View Javadoc

1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.transport;
20  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.codehaus.activemq.ActiveMQConnection;
24  import org.codehaus.activemq.ActiveMQConnectionFactory;
25  import org.codehaus.activemq.broker.BrokerClient;
26  import org.codehaus.activemq.broker.BrokerContainer;
27  import org.codehaus.activemq.broker.ConsumerInfoListener;
28  import org.codehaus.activemq.message.ActiveMQDestination;
29  import org.codehaus.activemq.message.BrokerInfo;
30  import org.codehaus.activemq.message.ConsumerInfo;
31  import org.codehaus.activemq.message.Receipt;
32  import org.codehaus.activemq.service.MessageContainerManager;
33  import org.codehaus.activemq.service.Service;
34  import org.codehaus.activemq.transport.composite.CompositeTransportChannel;
35  import javax.jms.JMSException;
36  import javax.jms.Session;
37  import java.util.Iterator;
38  import java.util.Map;
39  
40  /***
41   * Represents a broker's connection with a single remote broker which bridges the two brokers to form a network. <p/>
42   * The NetworkChannel contains a JMS connection with the remote broker. <p/>New subscriptions on the local broker are
43   * multiplexed into the JMS connection so that messages published on the remote broker can be replayed onto the local
44   * broker.
45   * 
46   * @version $Revision: 1.18 $
47   */
48  public class NetworkChannel implements Service, ConsumerInfoListener {
49      private static final Log log = LogFactory.getLog(NetworkChannel.class);
50      private String uri;
51      private BrokerContainer brokerContainer;
52      private ActiveMQConnection localConnection;
53      private ActiveMQConnection remoteConnection;
54      private ConcurrentHashMap consumerMap;
55      private String remoteUserName;
56      private String remotePassword;
57      private String remoteBrokerName;
58      private String remoteClusterName;
59      private int maximumRetries = 0;
60      private long reconnectSleepTime = 1000L;
61  
62      /***
63       * Default Constructor
64       */
65      public NetworkChannel() {
66          this.consumerMap = new ConcurrentHashMap();
67      }
68  
69      /***
70       * Constructor
71       * 
72       * @param brokerContainer
73       * @param uri
74       */
75      public NetworkChannel(BrokerContainer brokerContainer, String uri) {
76          this();
77          this.brokerContainer = brokerContainer;
78          this.uri = uri;
79      }
80  
81      /***
82       * @return text info on this
83       */
84      public String toString() {
85          return super.toString() + "[uri=" + uri + "]";
86      }
87  
88      /***
89       * Start the channel
90       */
91      public void start() throws JMSException {
92          Thread runner = new Thread(new Runnable() {
93              public void run() {
94                  try {
95                      initialize();
96                      brokerContainer.getBroker().addConsumerInfoListener(NetworkChannel.this);
97                      startSubscriptions();
98                      log.info("Started NetworkChannel to " + uri);
99                  }
100                 catch (JMSException jmsEx) {
101                     log.error("Failed to start NetworkChannel: " + uri);
102                 }
103             }
104         }, "NetworkChannel Starter");
105         runner.setDaemon(true);
106         runner.start();
107     }
108 
109     /***
110      * stop the channel
111      * 
112      * @throws JMSException on error
113      */
114     public void stop() throws JMSException {
115         consumerMap.clear();
116         if (remoteConnection != null) {
117             remoteConnection.close();
118             remoteConnection = null;
119         }
120         if (localConnection != null) {
121             localConnection.close();
122             localConnection = null;
123         }
124         for (Iterator i = consumerMap.values().iterator();i.hasNext();) {
125             NetworkMessageBridge consumer = (NetworkMessageBridge) i.next();
126             consumer.stop();
127         }
128     }
129 
130     /***
131      * Listen for new Consumer events at this broker
132      * 
133      * @param client
134      * @param info
135      */
136     public void onConsumerInfo(BrokerClient client, ConsumerInfo info) {
137         if (!client.isClusteredConnection()) {
138             if (!info.hasVisited(remoteBrokerName)) {
139                 if (info.isStarted()) {
140                     addConsumerInfo(info);
141                 }
142                 else {
143                     removeConsumerInfo(info);
144                 }
145             }
146         }
147     }
148 
149     /***
150      * @return the uri of the broker(s) this channel is connected to
151      */
152     public String getUri() {
153         return uri;
154     }
155 
156     /***
157      * set the uri of the broker(s) this channel is connected to
158      * 
159      * @param uri
160      */
161     public void setUri(String uri) {
162         this.uri = uri;
163     }
164 
165     /***
166      * @return Returns the remotePassword.
167      */
168     public String getRemotePassword() {
169         return remotePassword;
170     }
171 
172     /***
173      * @param remotePassword The remotePassword to set.
174      */
175     public void setRemotePassword(String remotePassword) {
176         this.remotePassword = remotePassword;
177     }
178 
179     /***
180      * @return Returns the remoteUserName.
181      */
182     public String getRemoteUserName() {
183         return remoteUserName;
184     }
185 
186     /***
187      * @param remoteUserName The remoteUserName to set.
188      */
189     public void setRemoteUserName(String remoteUserName) {
190         this.remoteUserName = remoteUserName;
191     }
192 
193     /***
194      * @return Returns the brokerContainer.
195      */
196     public BrokerContainer getBrokerContainer() {
197         return brokerContainer;
198     }
199 
200     /***
201      * @param brokerContainer The brokerContainer to set.
202      */
203     public void setBrokerContainer(BrokerContainer brokerContainer) {
204         this.brokerContainer = brokerContainer;
205     }
206 
207     public int getMaximumRetries() {
208         return maximumRetries;
209     }
210 
211     public void setMaximumRetries(int maximumRetries) {
212         this.maximumRetries = maximumRetries;
213     }
214 
215     public long getReconnectSleepTime() {
216         return reconnectSleepTime;
217     }
218 
219     public void setReconnectSleepTime(long reconnectSleepTime) {
220         this.reconnectSleepTime = reconnectSleepTime;
221     }
222 
223     public String getRemoteBrokerName() {
224         return remoteBrokerName;
225     }
226 
227     public void setRemoteBrokerName(String remoteBrokerName) {
228         this.remoteBrokerName = remoteBrokerName;
229     }
230 
231     // Implementation methods
232     //-------------------------------------------------------------------------
233     private void addConsumerInfo(ConsumerInfo info) {
234         addConsumerInfo(info.getDestination(), info.isDurableTopic());
235     }
236 
237     private void addConsumerInfo(ActiveMQDestination destination, boolean durableTopic) {
238         NetworkMessageBridge key = new NetworkMessageBridge();
239         key.setDestination(destination);
240         key.setDurableTopic(durableTopic);
241         NetworkMessageBridge bridge = (NetworkMessageBridge) consumerMap.get(key);
242         if (bridge == null) {
243             try {
244                 bridge = key;//might as well recycle :-)
245                 bridge.setLocalBrokerName(brokerContainer.getBroker().getBrokerName());
246                 bridge.setLocalSession(localConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE));
247                 bridge.setRemoteSession(remoteConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE));
248                 consumerMap.put(bridge, bridge);
249                 bridge.start();
250                 log.info("started NetworkMessageBridge for destination: " + destination);
251             }
252             catch (JMSException jmsEx) {
253                 log.error("Failed to start NetworkMessageBridge for destination: " + destination);
254             }
255         }
256         bridge.incrementReferenceCount();
257     }
258 
259     private void removeConsumerInfo(final ConsumerInfo info) {
260         NetworkMessageBridge key = new NetworkMessageBridge();
261         key.setDestination(info.getDestination());
262         key.setDurableTopic(info.isDurableTopic());
263         final NetworkMessageBridge bridge = (NetworkMessageBridge) consumerMap.get(key);
264         if (bridge != null) {
265             if (bridge.decrementReferenceCount() <= 0 && !bridge.isDurableTopic()
266                     && (bridge.getDestination().isTopic() || bridge.getDestination().isTemporary())) {
267                 Thread runner = new Thread(new Runnable() {
268                     public void run() {
269                         bridge.stop();
270                         consumerMap.remove(bridge);
271                         log.info("stopped MetworkMessageBridge for destination: " + info.getDestination());
272                     }
273                 });
274                 runner.setDaemon(true);
275                 runner.start();
276             }
277         }
278     }
279 
280     private void startSubscriptions() {
281         MessageContainerManager durableTopicMCM = brokerContainer.getBroker().getPersistentTopicContainerManager();
282         if (durableTopicMCM != null) {
283             Map map = durableTopicMCM.getDestinations();
284             startSubscriptions(map, true);
285         }
286         for (Iterator i = brokerContainer.getBroker().getContainerManagerMap().values().iterator();i.hasNext();) {
287             MessageContainerManager mcm = (MessageContainerManager) i.next();
288             if (mcm != durableTopicMCM) {
289                 startSubscriptions(mcm.getDestinations(), false);
290             }
291         }
292     }
293 
294     private void startSubscriptions(Map destinations, boolean durableTopic) {
295         if (destinations != null) {
296             for (Iterator i = destinations.values().iterator();i.hasNext();) {
297                 ActiveMQDestination dest = (ActiveMQDestination) i.next();
298                 addConsumerInfo(dest, durableTopic);
299             }
300         }
301     }
302 
303     private void initialize() throws JMSException {
304         initializeRemote();
305         initializeLocal();
306         //start subscribing for already established subscriptions
307     }
308 
309     private void initializeRemote() throws JMSException {
310         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(remoteUserName, remotePassword, uri);
311         factory.setUseAsyncSend(true);
312         remoteConnection = (ActiveMQConnection) factory.createConnection();
313         remoteConnection.setClientID(brokerContainer.getBroker().getBrokerName() + "_NetworkChannel");
314         TransportChannel transportChannel = remoteConnection.getTransportChannel();
315         if (transportChannel instanceof CompositeTransportChannel) {
316             CompositeTransportChannel composite = (CompositeTransportChannel) transportChannel;
317             composite.setMaximumRetries(maximumRetries);
318             composite.setFailureSleepTime(reconnectSleepTime);
319         }
320         remoteConnection.start();
321         BrokerInfo info = new BrokerInfo();
322         info.setBrokerName(brokerContainer.getBroker().getBrokerName());
323         info.setClusterName(brokerContainer.getBroker().getBrokerClusterName());
324         Receipt receipt = remoteConnection.syncSendRequest(info);
325         remoteBrokerName = receipt.getBrokerName();
326         remoteClusterName = receipt.getClusterName();
327     }
328 
329     private void initializeLocal() throws JMSException {
330         String brokerName = brokerContainer.getBroker().getBrokerName();
331         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://" + brokerName);
332         factory.setUseAsyncSend(true);
333         factory.setBrokerName(brokerName);
334         localConnection = (ActiveMQConnection) factory.createConnection();
335         localConnection.start();
336         BrokerInfo info = new BrokerInfo();
337         info.setBrokerName(remoteBrokerName);
338         info.setClusterName(remoteClusterName);
339         localConnection.asyncSendPacket(info);
340     }
341 }