1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.transport;
20 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.codehaus.activemq.ActiveMQConnection;
24 import org.codehaus.activemq.ActiveMQConnectionFactory;
25 import org.codehaus.activemq.broker.BrokerClient;
26 import org.codehaus.activemq.broker.BrokerContainer;
27 import org.codehaus.activemq.broker.ConsumerInfoListener;
28 import org.codehaus.activemq.message.ActiveMQDestination;
29 import org.codehaus.activemq.message.BrokerInfo;
30 import org.codehaus.activemq.message.ConsumerInfo;
31 import org.codehaus.activemq.message.Receipt;
32 import org.codehaus.activemq.service.MessageContainerManager;
33 import org.codehaus.activemq.service.Service;
34 import org.codehaus.activemq.transport.composite.CompositeTransportChannel;
35 import javax.jms.JMSException;
36 import javax.jms.Session;
37 import java.util.Iterator;
38 import java.util.Map;
39
40 /***
41 * Represents a broker's connection with a single remote broker which bridges the two brokers to form a network. <p/>
42 * The NetworkChannel contains a JMS connection with the remote broker. <p/>New subscriptions on the local broker are
43 * multiplexed into the JMS connection so that messages published on the remote broker can be replayed onto the local
44 * broker.
45 *
46 * @version $Revision: 1.18 $
47 */
48 public class NetworkChannel implements Service, ConsumerInfoListener {
49 private static final Log log = LogFactory.getLog(NetworkChannel.class);
50 private String uri;
51 private BrokerContainer brokerContainer;
52 private ActiveMQConnection localConnection;
53 private ActiveMQConnection remoteConnection;
54 private ConcurrentHashMap consumerMap;
55 private String remoteUserName;
56 private String remotePassword;
57 private String remoteBrokerName;
58 private String remoteClusterName;
59 private int maximumRetries = 0;
60 private long reconnectSleepTime = 1000L;
61
62 /***
63 * Default Constructor
64 */
65 public NetworkChannel() {
66 this.consumerMap = new ConcurrentHashMap();
67 }
68
69 /***
70 * Constructor
71 *
72 * @param brokerContainer
73 * @param uri
74 */
75 public NetworkChannel(BrokerContainer brokerContainer, String uri) {
76 this();
77 this.brokerContainer = brokerContainer;
78 this.uri = uri;
79 }
80
81 /***
82 * @return text info on this
83 */
84 public String toString() {
85 return super.toString() + "[uri=" + uri + "]";
86 }
87
88 /***
89 * Start the channel
90 */
91 public void start() throws JMSException {
92 Thread runner = new Thread(new Runnable() {
93 public void run() {
94 try {
95 initialize();
96 brokerContainer.getBroker().addConsumerInfoListener(NetworkChannel.this);
97 startSubscriptions();
98 log.info("Started NetworkChannel to " + uri);
99 }
100 catch (JMSException jmsEx) {
101 log.error("Failed to start NetworkChannel: " + uri);
102 }
103 }
104 }, "NetworkChannel Starter");
105 runner.setDaemon(true);
106 runner.start();
107 }
108
109 /***
110 * stop the channel
111 *
112 * @throws JMSException on error
113 */
114 public void stop() throws JMSException {
115 consumerMap.clear();
116 if (remoteConnection != null) {
117 remoteConnection.close();
118 remoteConnection = null;
119 }
120 if (localConnection != null) {
121 localConnection.close();
122 localConnection = null;
123 }
124 for (Iterator i = consumerMap.values().iterator();i.hasNext();) {
125 NetworkMessageBridge consumer = (NetworkMessageBridge) i.next();
126 consumer.stop();
127 }
128 }
129
130 /***
131 * Listen for new Consumer events at this broker
132 *
133 * @param client
134 * @param info
135 */
136 public void onConsumerInfo(BrokerClient client, ConsumerInfo info) {
137 if (!client.isClusteredConnection()) {
138 if (!info.hasVisited(remoteBrokerName)) {
139 if (info.isStarted()) {
140 addConsumerInfo(info);
141 }
142 else {
143 removeConsumerInfo(info);
144 }
145 }
146 }
147 }
148
149 /***
150 * @return the uri of the broker(s) this channel is connected to
151 */
152 public String getUri() {
153 return uri;
154 }
155
156 /***
157 * set the uri of the broker(s) this channel is connected to
158 *
159 * @param uri
160 */
161 public void setUri(String uri) {
162 this.uri = uri;
163 }
164
165 /***
166 * @return Returns the remotePassword.
167 */
168 public String getRemotePassword() {
169 return remotePassword;
170 }
171
172 /***
173 * @param remotePassword The remotePassword to set.
174 */
175 public void setRemotePassword(String remotePassword) {
176 this.remotePassword = remotePassword;
177 }
178
179 /***
180 * @return Returns the remoteUserName.
181 */
182 public String getRemoteUserName() {
183 return remoteUserName;
184 }
185
186 /***
187 * @param remoteUserName The remoteUserName to set.
188 */
189 public void setRemoteUserName(String remoteUserName) {
190 this.remoteUserName = remoteUserName;
191 }
192
193 /***
194 * @return Returns the brokerContainer.
195 */
196 public BrokerContainer getBrokerContainer() {
197 return brokerContainer;
198 }
199
200 /***
201 * @param brokerContainer The brokerContainer to set.
202 */
203 public void setBrokerContainer(BrokerContainer brokerContainer) {
204 this.brokerContainer = brokerContainer;
205 }
206
207 public int getMaximumRetries() {
208 return maximumRetries;
209 }
210
211 public void setMaximumRetries(int maximumRetries) {
212 this.maximumRetries = maximumRetries;
213 }
214
215 public long getReconnectSleepTime() {
216 return reconnectSleepTime;
217 }
218
219 public void setReconnectSleepTime(long reconnectSleepTime) {
220 this.reconnectSleepTime = reconnectSleepTime;
221 }
222
223 public String getRemoteBrokerName() {
224 return remoteBrokerName;
225 }
226
227 public void setRemoteBrokerName(String remoteBrokerName) {
228 this.remoteBrokerName = remoteBrokerName;
229 }
230
231
232
233 private void addConsumerInfo(ConsumerInfo info) {
234 addConsumerInfo(info.getDestination(), info.isDurableTopic());
235 }
236
237 private void addConsumerInfo(ActiveMQDestination destination, boolean durableTopic) {
238 NetworkMessageBridge key = new NetworkMessageBridge();
239 key.setDestination(destination);
240 key.setDurableTopic(durableTopic);
241 NetworkMessageBridge bridge = (NetworkMessageBridge) consumerMap.get(key);
242 if (bridge == null) {
243 try {
244 bridge = key;
245 bridge.setLocalBrokerName(brokerContainer.getBroker().getBrokerName());
246 bridge.setLocalSession(localConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE));
247 bridge.setRemoteSession(remoteConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE));
248 consumerMap.put(bridge, bridge);
249 bridge.start();
250 log.info("started NetworkMessageBridge for destination: " + destination);
251 }
252 catch (JMSException jmsEx) {
253 log.error("Failed to start NetworkMessageBridge for destination: " + destination);
254 }
255 }
256 bridge.incrementReferenceCount();
257 }
258
259 private void removeConsumerInfo(final ConsumerInfo info) {
260 NetworkMessageBridge key = new NetworkMessageBridge();
261 key.setDestination(info.getDestination());
262 key.setDurableTopic(info.isDurableTopic());
263 final NetworkMessageBridge bridge = (NetworkMessageBridge) consumerMap.get(key);
264 if (bridge != null) {
265 if (bridge.decrementReferenceCount() <= 0 && !bridge.isDurableTopic()
266 && (bridge.getDestination().isTopic() || bridge.getDestination().isTemporary())) {
267 Thread runner = new Thread(new Runnable() {
268 public void run() {
269 bridge.stop();
270 consumerMap.remove(bridge);
271 log.info("stopped MetworkMessageBridge for destination: " + info.getDestination());
272 }
273 });
274 runner.setDaemon(true);
275 runner.start();
276 }
277 }
278 }
279
280 private void startSubscriptions() {
281 MessageContainerManager durableTopicMCM = brokerContainer.getBroker().getPersistentTopicContainerManager();
282 if (durableTopicMCM != null) {
283 Map map = durableTopicMCM.getDestinations();
284 startSubscriptions(map, true);
285 }
286 for (Iterator i = brokerContainer.getBroker().getContainerManagerMap().values().iterator();i.hasNext();) {
287 MessageContainerManager mcm = (MessageContainerManager) i.next();
288 if (mcm != durableTopicMCM) {
289 startSubscriptions(mcm.getDestinations(), false);
290 }
291 }
292 }
293
294 private void startSubscriptions(Map destinations, boolean durableTopic) {
295 if (destinations != null) {
296 for (Iterator i = destinations.values().iterator();i.hasNext();) {
297 ActiveMQDestination dest = (ActiveMQDestination) i.next();
298 addConsumerInfo(dest, durableTopic);
299 }
300 }
301 }
302
303 private void initialize() throws JMSException {
304 initializeRemote();
305 initializeLocal();
306
307 }
308
309 private void initializeRemote() throws JMSException {
310 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(remoteUserName, remotePassword, uri);
311 factory.setUseAsyncSend(true);
312 remoteConnection = (ActiveMQConnection) factory.createConnection();
313 remoteConnection.setClientID(brokerContainer.getBroker().getBrokerName() + "_NetworkChannel");
314 TransportChannel transportChannel = remoteConnection.getTransportChannel();
315 if (transportChannel instanceof CompositeTransportChannel) {
316 CompositeTransportChannel composite = (CompositeTransportChannel) transportChannel;
317 composite.setMaximumRetries(maximumRetries);
318 composite.setFailureSleepTime(reconnectSleepTime);
319 }
320 remoteConnection.start();
321 BrokerInfo info = new BrokerInfo();
322 info.setBrokerName(brokerContainer.getBroker().getBrokerName());
323 info.setClusterName(brokerContainer.getBroker().getBrokerClusterName());
324 Receipt receipt = remoteConnection.syncSendRequest(info);
325 remoteBrokerName = receipt.getBrokerName();
326 remoteClusterName = receipt.getClusterName();
327 }
328
329 private void initializeLocal() throws JMSException {
330 String brokerName = brokerContainer.getBroker().getBrokerName();
331 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://" + brokerName);
332 factory.setUseAsyncSend(true);
333 factory.setBrokerName(brokerName);
334 localConnection = (ActiveMQConnection) factory.createConnection();
335 localConnection.start();
336 BrokerInfo info = new BrokerInfo();
337 info.setBrokerName(remoteBrokerName);
338 info.setClusterName(remoteClusterName);
339 localConnection.asyncSendPacket(info);
340 }
341 }