1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.broker.impl;
20
21 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
22 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.codehaus.activemq.broker.Broker;
26 import org.codehaus.activemq.broker.BrokerClient;
27 import org.codehaus.activemq.broker.BrokerConnector;
28 import org.codehaus.activemq.broker.BrokerContainer;
29 import org.codehaus.activemq.broker.BrokerContext;
30 import org.codehaus.activemq.capacity.CapacityMonitorEvent;
31 import org.codehaus.activemq.capacity.CapacityMonitorEventListener;
32 import org.codehaus.activemq.message.*;
33 import org.codehaus.activemq.security.SecurityAdapter;
34 import org.codehaus.activemq.service.Service;
35 import org.codehaus.activemq.service.RedeliveryPolicy;
36 import org.codehaus.activemq.store.PersistenceAdapter;
37 import org.codehaus.activemq.transport.DiscoveryAgent;
38 import org.codehaus.activemq.transport.NetworkConnector;
39 import org.codehaus.activemq.transport.TransportServerChannel;
40 import org.codehaus.activemq.util.IdGenerator;
41
42 import javax.jms.InvalidClientIDException;
43 import javax.jms.InvalidDestinationException;
44 import javax.jms.JMSException;
45 import javax.jms.JMSSecurityException;
46 import javax.transaction.xa.XAException;
47 import java.util.ArrayList;
48 import java.util.HashMap;
49 import java.util.Iterator;
50 import java.util.List;
51 import java.util.Map;
52
53 /***
54 * Represents the ActiveMQ JMS Broker which typically has one or many connectors
55 *
56 * @version $Revision: 1.34 $
57 */
58 public class BrokerContainerImpl implements BrokerContainer, CapacityMonitorEventListener {
59 private static final Log log = LogFactory.getLog(BrokerContainerImpl.class);
60
61 private BrokerContext context;
62 private Broker broker;
63 private Map clientIds;
64 private Map consumerInfos;
65 private Map producerInfos;
66 private List transportConnectors;
67 private Thread shutdownHook;
68 private boolean stopped;
69 private List networkConnectors;
70 private DiscoveryAgent discoveryAgent;
71 private Map localDiscoveryDetails;
72
73
74 public BrokerContainerImpl() {
75 this(new IdGenerator().generateId());
76 }
77
78 public BrokerContainerImpl(String brokerName) {
79 this(brokerName, BrokerContext.getInstance());
80 }
81
82 public BrokerContainerImpl(String brokerName, PersistenceAdapter persistenceAdapter) {
83 this(brokerName, persistenceAdapter, BrokerContext.getInstance());
84 }
85
86 public BrokerContainerImpl(String brokerName, BrokerContext context) {
87 this(new DefaultBroker(brokerName), context);
88 }
89
90 public BrokerContainerImpl(String brokerName, PersistenceAdapter persistenceAdapter, BrokerContext context) {
91 this(new DefaultBroker(brokerName, persistenceAdapter), context);
92 }
93
94 /***
95 * @param broker
96 */
97 public BrokerContainerImpl(Broker broker, BrokerContext context) {
98 this.broker = broker;
99 this.context = context;
100 this.clientIds = new ConcurrentHashMap();
101 this.consumerInfos = new ConcurrentHashMap();
102 this.producerInfos = new ConcurrentHashMap();
103 this.transportConnectors = new CopyOnWriteArrayList();
104 this.networkConnectors = new CopyOnWriteArrayList();
105 this.broker.addCapacityEventListener(this);
106
107
108 context.registerContainer(broker.getBrokerName(), this);
109 }
110
111 /***
112 * start the Container
113 *
114 * @throws JMSException
115 */
116 public void start() throws JMSException {
117 log.info("ActiveMQ JMS Message Broker (" + broker.getBrokerName() + ") is starting");
118 log.info("For help or more information please see: www.protique.com");
119 broker.start();
120 addShutdownHook();
121
122
123
124
125
126 for (Iterator iter = new ArrayList(networkConnectors).iterator(); iter.hasNext();) {
127 Service connector = (Service) iter.next();
128 connector.start();
129 }
130
131 for (Iterator iter = new ArrayList(transportConnectors).iterator(); iter.hasNext();) {
132 Service connector = (Service) iter.next();
133 connector.start();
134 }
135
136 if (discoveryAgent != null) {
137 discoveryAgent.start();
138
139 localDiscoveryDetails = createLocalDiscoveryDetails();
140 discoveryAgent.registerService(getLocalBrokerName(), localDiscoveryDetails);
141 }
142
143 log.info("ActiveMQ JMS Message Broker (" + broker.getBrokerName() + ") has started");
144 }
145
146 /***
147 * Stop the Container
148 *
149 * @throws JMSException
150 */
151 public synchronized void stop() throws JMSException {
152 if (!stopped) {
153 stopped = true;
154
155 log.info("ActiveMQ Message Broker (" + broker.getBrokerName() + ") is shutting down");
156
157 context.deregisterContainer(broker.getBrokerName(), this);
158
159 try {
160 Runtime.getRuntime().removeShutdownHook(shutdownHook);
161 }
162 catch (Exception e) {
163 log.debug("Caught exception, must be shutting down: " + e);
164 }
165
166 JMSException firstException = null;
167
168 for (Iterator iter = new ArrayList(transportConnectors).iterator(); iter.hasNext();) {
169 Service connector = (Service) iter.next();
170 try {
171 connector.stop();
172 }
173 catch (JMSException e) {
174 if (firstException == null) {
175 firstException = e;
176 }
177 log.warn("Could not close transport connector: " + connector + " due to: " + e, e);
178 }
179 }
180 transportConnectors.clear();
181
182 for (Iterator iter = new ArrayList(networkConnectors).iterator(); iter.hasNext();) {
183 Service connector = (Service) iter.next();
184 try {
185 connector.stop();
186 }
187 catch (JMSException e) {
188 if (firstException == null) {
189 firstException = e;
190 }
191 log.warn("Could not close network connector: " + connector + " due to: " + e, e);
192 }
193 }
194 networkConnectors.clear();
195
196
197
198
199
200 for (Iterator iter = clientIds.values().iterator(); iter.hasNext();) {
201
202 BrokerClient client = (BrokerClient) iter.next();
203 try {
204 client.stop();
205 }
206 catch (JMSException e) {
207 if (firstException == null) {
208 firstException = e;
209 }
210 log.warn("Could not close client: " + client + " due to: " + e, e);
211 }
212 }
213 clientIds.clear();
214
215 broker.removeCapacityEventListener(this);
216 broker.stop();
217
218 log.info("ActiveMQ JMS Message Broker (" + broker.getBrokerName() + ") stopped");
219
220 if (firstException != null) {
221 throw firstException;
222 }
223 }
224 }
225
226 /***
227 * registers a new Connection
228 *
229 * @param client
230 * @param info infomation about the client-side Connection
231 * @throws InvalidClientIDException if the ClientID of the Connection is a duplicate
232 */
233 public void registerConnection(BrokerClient client, ConnectionInfo info) throws JMSException {
234 String clientId = info.getClientId();
235 if (clientIds.containsKey(clientId)) {
236 throw new InvalidClientIDException("Duplicate clientId: " + info);
237 }
238 getBroker().addClient(client, info);
239 log.info("Adding new client: " + clientId + " on transport: " + client.getChannel());
240 clientIds.put(clientId, client);
241 }
242
243 /***
244 * un-registers a Connection
245 *
246 * @param client
247 * @param info infomation about the client-side Connection
248 * @throws JMSException
249 */
250 public void deregisterConnection(BrokerClient client, ConnectionInfo info) throws JMSException {
251 String clientId = client.getClientID();
252 if (clientId != null) {
253 Object answer = clientIds.remove(clientId);
254 if (answer != null) {
255 log.info("Removing client: " + clientId + " on transport: " + client.getChannel());
256 getBroker().removeClient(client, info);
257 }
258 else {
259 log.warn("Got duplicate deregisterConnection for client: " + clientId);
260 }
261 }
262 else {
263 log.warn("No clientID available for client: " + client);
264 }
265 }
266
267 /***
268 * Registers a MessageConsumer
269 *
270 * @param client
271 * @param info
272 * @throws JMSException
273 * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
274 */
275 public void registerMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
276 consumerInfos.put(info, client);
277 getBroker().addMessageConsumer(client, info);
278 }
279
280 /***
281 * De-register a MessageConsumer from the Broker
282 *
283 * @param client
284 * @param info
285 * @throws JMSException
286 */
287 public void deregisterMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
288 consumerInfos.remove(info);
289 getBroker().removeMessageConsumer(client, info);
290 }
291
292 /***
293 * Registers a MessageProducer
294 *
295 * @param client
296 * @param info
297 * @throws JMSException
298 * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
299 */
300 public void registerMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
301 ActiveMQDestination dest = info.getDestination();
302 if (dest != null && dest.isTemporary()) {
303
304 String clientId = ActiveMQDestination.getClientId(dest);
305 if (clientId == null) {
306 throw new InvalidDestinationException("Destination " + dest.getPhysicalName()
307 + " is a temporary destination with null clientId");
308 }
309 if (!clientIds.containsKey(clientId)) {
310 throw new InvalidDestinationException("Destination " + dest.getPhysicalName()
311 + " is no longer valid because the client " + clientId + " no longer exists");
312 }
313 }
314 getBroker().addMessageProducer(client, info);
315
316 producerInfos.put(info, client);
317 }
318
319 /***
320 * De-register a MessageProducer from the Broker
321 *
322 * @param client
323 * @param info
324 * @throws JMSException
325 */
326 public void deregisterMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
327 getBroker().removeMessageProducer(client, info);
328
329 producerInfos.remove(info);
330 }
331
332 /***
333 * Register a client-side Session (used for Monitoring)
334 *
335 * @param client
336 * @param info
337 * @throws JMSException
338 */
339 public void registerSession(BrokerClient client, SessionInfo info) throws JMSException {
340 }
341
342 /***
343 * De-register a client-side Session from the Broker (used for monitoring)
344 *
345 * @param client
346 * @param info
347 * @throws JMSException
348 */
349 public void deregisterSession(BrokerClient client, SessionInfo info) throws JMSException {
350 }
351
352 /***
353 * Start a transaction from the Client session
354 *
355 * @param client
356 * @param transactionId
357 * @throws JMSException
358 */
359 public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
360 getBroker().startTransaction(client, transactionId);
361 }
362
363 /***
364 * Rollback a transacton
365 *
366 * @param client
367 * @param transactionId
368 * @throws JMSException
369 */
370 public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
371 getBroker().rollbackTransaction(client, transactionId);
372 }
373
374 /***
375 * Commit a transaction
376 *
377 * @param client
378 * @param transactionId
379 * @throws JMSException
380 */
381 public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
382 getBroker().commitTransaction(client, transactionId);
383 }
384
385 /***
386 * send message with a transaction context
387 *
388 * @param client
389 * @param transactionId
390 * @param message
391 * @throws JMSException
392 */
393 public void sendTransactedMessage(BrokerClient client, String transactionId, ActiveMQMessage message)
394 throws JMSException {
395 getBroker().sendTransactedMessage(client, transactionId, message);
396 }
397
398 /***
399 * Acknowledge receipt of a message within a transaction context
400 *
401 * @param client
402 * @param transactionId
403 * @param ack
404 * @throws JMSException
405 */
406 public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack)
407 throws JMSException {
408 getBroker().acknowledgeTransactedMessage(client, transactionId, ack);
409 }
410
411 /***
412 * Send a non-transacted message to the Broker
413 *
414 * @param client
415 * @param message
416 * @throws JMSException
417 */
418 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
419 getBroker().sendMessage(client, message);
420 }
421
422 /***
423 * Acknowledge reciept of a message
424 *
425 * @param client
426 * @param ack
427 * @throws JMSException
428 */
429 public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
430 getBroker().acknowledgeMessage(client, ack);
431 }
432
433 /***
434 * Command to delete a durable topic subscription
435 *
436 * @param client
437 * @param ds
438 * @throws JMSException
439 */
440 public void durableUnsubscribe(BrokerClient client, DurableUnsubscribe ds) throws JMSException {
441 getBroker().deleteSubscription(ds.getClientId(), ds.getSubscriberName());
442 }
443
444 /***
445 * Start an XA transaction.
446 *
447 * @param client
448 * @param xid
449 */
450 public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
451 getBroker().startTransaction(client, xid);
452 }
453
454 /***
455 * Gets the prepared XA transactions.
456 *
457 * @param client
458 * @return
459 */
460 public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
461 return getBroker().getPreparedTransactions(client);
462 }
463
464 /***
465 * Prepare an XA transaction.
466 *
467 * @param client
468 * @param xid
469 */
470 public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
471 return getBroker().prepareTransaction(client, xid);
472 }
473
474 /***
475 * Rollback an XA transaction.
476 *
477 * @param client
478 * @param xid
479 */
480 public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
481 getBroker().rollbackTransaction(client, xid);
482 }
483
484 /***
485 * Commit an XA transaction.
486 *
487 * @param client
488 * @param xid
489 * @param onePhase
490 */
491 public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
492 getBroker().commitTransaction(client, xid, onePhase);
493 }
494
495
496 /***
497 * Update any message producers about our capacity to handle messages
498 *
499 * @param event
500 */
501 public void capacityChanged(CapacityMonitorEvent event) {
502
503 for (Iterator i = producerInfos.values().iterator(); i.hasNext();) {
504 BrokerClient client = (BrokerClient) i.next();
505 client.updateBrokerCapacity(event.getCapacity());
506 }
507 }
508
509
510
511
512
513 public List getTransportConnectors() {
514 return transportConnectors;
515 }
516
517 public void setTransportConnectors(List transportConnectors) {
518 this.transportConnectors = transportConnectors;
519 }
520
521 public void addConnector(BrokerConnector connector) {
522 transportConnectors.add(connector);
523 context.registerConnector(connector.getServerChannel().getUrl(), connector);
524 }
525
526 public void removeConnector(BrokerConnector connector) {
527 transportConnectors.remove(connector);
528 context.deregisterConnector(connector.getServerChannel().getUrl());
529 }
530
531
532 public void addConnector(String bindAddress) throws JMSException {
533 addConnector(bindAddress, new DefaultWireFormat());
534 }
535
536 public void addConnector(String bindAddress, WireFormat wireFormat) throws JMSException {
537 addConnector(new BrokerConnectorImpl(this, bindAddress, wireFormat));
538 }
539
540 public void addConnector(TransportServerChannel transportConnector) {
541 addConnector(new BrokerConnectorImpl(this, transportConnector));
542 }
543
544 public List getNetworkConnectors() {
545 return networkConnectors;
546 }
547
548 public void setNetworkConnectors(List networkConnectors) {
549 this.networkConnectors = networkConnectors;
550 }
551
552 public NetworkConnector addNetworkConnector(String uri) {
553 NetworkConnector connector = addNetworkConnector();
554 connector.addNetworkChannel(uri);
555 return connector;
556 }
557
558 public NetworkConnector addNetworkConnector() {
559 NetworkConnector connector = new NetworkConnector(this);
560 addNetworkConnector(connector);
561 return connector;
562 }
563
564 public void addNetworkConnector(NetworkConnector connector) {
565 networkConnectors.add(connector);
566 }
567
568 public void removeNetworkConnector(NetworkConnector connector) {
569 networkConnectors.remove(connector);
570 }
571
572
573 public Broker getBroker() {
574 return broker;
575 }
576
577 public PersistenceAdapter getPersistenceAdapter() {
578 return broker != null ? broker.getPersistenceAdapter() : null;
579 }
580
581 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
582 checkBrokerSet();
583 broker.setPersistenceAdapter(persistenceAdapter);
584 }
585
586 public DiscoveryAgent getDiscoveryAgent() {
587 return discoveryAgent;
588 }
589
590 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
591 this.discoveryAgent = discoveryAgent;
592 }
593
594 public SecurityAdapter getSecurityAdapter() {
595 return broker != null ? broker.getSecurityAdapter() : null;
596 }
597
598 public void setSecurityAdapter(SecurityAdapter securityAdapter) {
599 checkBrokerSet();
600 broker.setSecurityAdapter(securityAdapter);
601 }
602
603 public RedeliveryPolicy getRedeliveryPolicy() {
604 return broker != null ? broker.getRedeliveryPolicy() : null;
605 }
606
607 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
608 checkBrokerSet();
609 broker.setRedeliveryPolicy(redeliveryPolicy);
610 }
611
612
613
614
615 protected void checkBrokerSet() {
616 if (broker == null) {
617 throw new IllegalStateException("Cannot set this property as we don't have a broker yet");
618 }
619 }
620
621 protected Map createLocalDiscoveryDetails() {
622 Map map = new HashMap();
623 map.put("brokerName", getLocalBrokerName());
624 map.put("connectURL", getLocalConnectionURL());
625 return map;
626 }
627
628 protected String getLocalBrokerName() {
629 return getBroker().getBrokerName();
630 }
631
632 protected String getLocalConnectionURL() {
633 StringBuffer buffer = new StringBuffer("reliable:");
634 List list = getTransportConnectors();
635 boolean first = true;
636 for (Iterator iter = list.iterator(); iter.hasNext();) {
637 BrokerConnector brokerConnector = (BrokerConnector) iter.next();
638 TransportServerChannel connector = brokerConnector.getServerChannel();
639 String url = connector.getUrl();
640 if (first) {
641 first = false;
642 }
643 else {
644 buffer.append(",");
645 }
646 buffer.append(url);
647 }
648 return buffer.toString();
649 }
650
651 protected void addShutdownHook() {
652 shutdownHook = new Thread("ActiveMQ ShutdownHook") {
653 public void run() {
654 containerShutdown();
655 }
656 };
657 Runtime.getRuntime().addShutdownHook(shutdownHook);
658 }
659
660 /***
661 * Causes a clean shutdown of the container when the VM is being shut down
662 */
663 protected void containerShutdown() {
664 try {
665 stop();
666 }
667 catch (JMSException e) {
668 Exception linkedException = e.getLinkedException();
669 if (linkedException != null) {
670 log.error("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
671 }
672 else {
673 log.error("Failed to shut down: " + e, e);
674 }
675 }
676 catch (Exception e) {
677 log.error("Failed to shut down: " + e, e);
678 }
679 }
680 }