1. Project Clover database Tue Dec 20 2016 21:24:09 CET
  2. Package org.xwiki.observation.remote.internal.jgroups

File JGroupsNetworkAdapter.java

 

Coverage histogram

../../../../../../img/srcFileCovDistChart7.png
64% of files have more coverage

Code metrics

8
54
6
1
228
133
17
0.31
9
6
2.83

Classes

Class Line # Actions
JGroupsNetworkAdapter 59 54 0% 17 21
0.6911765 (69.1%)
 

Contributing tests

This file is covered by 1 test.

Source view

1    /*
2    * See the NOTICE file distributed with this work for additional
3    * information regarding copyright ownership.
4    *
5    * This is free software; you can redistribute it and/or modify it
6    * under the terms of the GNU Lesser General Public License as
7    * published by the Free Software Foundation; either version 2.1 of
8    * the License, or (at your option) any later version.
9    *
10    * This software is distributed in the hope that it will be useful,
11    * but WITHOUT ANY WARRANTY; without even the implied warranty of
12    * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13    * Lesser General Public License for more details.
14    *
15    * You should have received a copy of the GNU Lesser General Public
16    * License along with this software; if not, write to the Free
17    * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
18    * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
19    */
20    package org.xwiki.observation.remote.internal.jgroups;
21   
22    import java.io.IOException;
23    import java.io.InputStream;
24    import java.lang.management.ManagementFactory;
25    import java.text.MessageFormat;
26    import java.util.Map;
27    import java.util.concurrent.ConcurrentHashMap;
28   
29    import javax.inject.Inject;
30    import javax.inject.Named;
31    import javax.inject.Singleton;
32    import javax.management.MBeanServer;
33   
34    import org.jgroups.JChannel;
35    import org.jgroups.Message;
36    import org.jgroups.conf.ConfiguratorFactory;
37    import org.jgroups.conf.ProtocolStackConfigurator;
38    import org.jgroups.conf.XmlConfigurator;
39    import org.jgroups.jmx.JmxConfigurator;
40    import org.slf4j.Logger;
41    import org.xwiki.component.annotation.Component;
42    import org.xwiki.component.manager.ComponentLookupException;
43    import org.xwiki.component.manager.ComponentManager;
44    import org.xwiki.environment.Environment;
45    import org.xwiki.observation.remote.NetworkAdapter;
46    import org.xwiki.observation.remote.RemoteEventData;
47    import org.xwiki.observation.remote.RemoteEventException;
48    import org.xwiki.observation.remote.jgroups.JGroupsReceiver;
49   
50    /**
51    * JGroups based implementation of {@link NetworkAdapter}.
52    *
53    * @version $Id: fa865dc354639e3a78908733b5624096382c4783 $
54    * @since 2.0RC1
55    */
56    @Component
57    @Named("jgroups")
58    @Singleton
 
59    public class JGroupsNetworkAdapter implements NetworkAdapter
60    {
61    /**
62    * Relative path where to find jgroups channels configurations.
63    */
64    public static final String CONFIGURATION_PATH = "observation/remote/jgroups/";
65   
66    /**
67    * Used to lookup the receiver corresponding to the channel identifier.
68    */
69    @Inject
70    private ComponentManager componentManager;
71   
72    /**
73    * The logger to log.
74    */
75    @Inject
76    private Logger logger;
77   
78    /**
79    * The network channels.
80    */
81    private Map<String, JChannel> channels = new ConcurrentHashMap<String, JChannel>();
82   
 
83  34 toggle @Override
84    public void send(RemoteEventData remoteEvent)
85    {
86  34 this.logger.debug("Send JGroups remote event [" + remoteEvent + "]");
87   
88    // Send the message to the whole group
89  34 Message message = new Message(null, null, remoteEvent);
90   
91    // Send message to JGroups channels
92  34 for (Map.Entry<String, JChannel> entry : this.channels.entrySet()) {
93  34 try {
94  34 entry.getValue().send(message);
95    } catch (Exception e) {
96  0 this.logger.error("Failed to send message [" + remoteEvent + "] to the channel [" + entry.getKey()
97    + "]", e);
98    }
99    }
100    }
101   
 
102  5 toggle @Override
103    public void startChannel(String channelId) throws RemoteEventException
104    {
105  5 if (this.channels.containsKey(channelId)) {
106  0 throw new RemoteEventException(MessageFormat.format("Channel [{0}] already started", channelId));
107    }
108   
109  5 JChannel channel;
110  5 try {
111  5 channel = createChannel(channelId);
112  5 channel.connect("event");
113   
114  5 this.channels.put(channelId, channel);
115    } catch (Exception e) {
116  0 throw new RemoteEventException("Failed to create channel [" + channelId + "]", e);
117    }
118   
119    // Register the channel against the JMX Server
120  5 try {
121  5 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
122  5 JmxConfigurator.registerChannel(channel, mbs, channel.getClusterName());
123    } catch (Exception e) {
124  0 this.logger.warn("Failed to register channel [" + channelId + "] against the JMX Server", e);
125    }
126   
127  5 this.logger.info("Channel [{}] started", channelId);
128    }
129   
 
130  0 toggle @Override
131    public void stopChannel(String channelId) throws RemoteEventException
132    {
133  0 JChannel channel = this.channels.get(channelId);
134   
135  0 if (channel == null) {
136  0 throw new RemoteEventException(MessageFormat.format("Channel [{0}] is not started", channelId));
137    }
138   
139  0 channel.close();
140   
141  0 this.channels.remove(channelId);
142   
143    // Unregister the channel from the JMX Server
144  0 try {
145  0 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
146  0 JmxConfigurator.unregister(channel, mbs, channel.getClusterName());
147    } catch (Exception e) {
148  0 this.logger.warn("Failed to unregister channel [" + channelId + "] from the JMX Server", e);
149    }
150   
151  0 this.logger.info("Channel [{}] stopped", channelId);
152    }
153   
154    /**
155    * Create a new channel.
156    *
157    * @param channelId the identifier of the channel to create
158    * @return the new channel
159    * @throws Exception failed to create new channel
160    */
 
161  5 toggle private JChannel createChannel(String channelId) throws Exception
162    {
163    // load configuration
164  5 ProtocolStackConfigurator channelConf = loadChannelConfiguration(channelId);
165   
166    // get Receiver
167  5 JGroupsReceiver channelReceiver;
168  5 try {
169  5 channelReceiver = this.componentManager.getInstance(JGroupsReceiver.class, channelId);
170    } catch (ComponentLookupException e) {
171  5 channelReceiver = this.componentManager.getInstance(JGroupsReceiver.class);
172    }
173   
174    // create channel
175  5 JChannel channel = new JChannel(channelConf);
176   
177  5 channel.setReceiver(channelReceiver);
178  5 channel.setDiscardOwnMessages(true);
179   
180  5 return channel;
181    }
182   
183    /**
184    * Load channel configuration.
185    *
186    * @param channelId the identifier of the channel
187    * @return the channel configuration
188    * @throws IOException failed to load configuration file
189    */
 
190  5 toggle private ProtocolStackConfigurator loadChannelConfiguration(String channelId) throws IOException
191    {
192  5 String channelFile = channelId + ".xml";
193  5 String path = "/WEB-INF/" + CONFIGURATION_PATH + channelFile;
194   
195  5 InputStream is = null;
196  5 try {
197  5 Environment environment = this.componentManager.getInstance(Environment.class);
198  3 is = environment.getResourceAsStream(path);
199    } catch (ComponentLookupException e) {
200    // Environment not found, continue by fallbacking on JGroups's standard configuration.
201  2 this.logger.debug("Failed to lookup the Environment component.", e);
202    }
203   
204  5 if (is == null) {
205    // Fallback on JGroups standard configuration locations
206  5 is = ConfiguratorFactory.getConfigStream(channelFile);
207   
208  5 if (is == null && !JChannel.DEFAULT_PROTOCOL_STACK.equals(channelFile)) {
209    // Fallback on default JGroups configuration
210  0 is = ConfiguratorFactory.getConfigStream(JChannel.DEFAULT_PROTOCOL_STACK);
211    }
212    }
213   
214  5 return XmlConfigurator.getInstance(is);
215    }
216   
 
217  3 toggle @Override
218    public void stopAllChannels() throws RemoteEventException
219    {
220  3 for (Map.Entry<String, JChannel> channelEntry : this.channels.entrySet()) {
221  3 channelEntry.getValue().close();
222    }
223   
224  3 this.channels.clear();
225   
226  3 this.logger.info("All channels stopped");
227    }
228    }