/**
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/
18
19 package org.codehaus.activemq.broker.impl;
20
21 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
22 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.codehaus.activemq.broker.Broker;
26 import org.codehaus.activemq.broker.BrokerClient;
27 import org.codehaus.activemq.broker.ConsumerInfoListener;
28 import org.codehaus.activemq.capacity.DelegateCapacityMonitor;
29 import org.codehaus.activemq.jndi.ReadOnlyContext;
30 import org.codehaus.activemq.message.ActiveMQDestination;
31 import org.codehaus.activemq.message.ActiveMQMessage;
32 import org.codehaus.activemq.message.ActiveMQXid;
33 import org.codehaus.activemq.message.ConnectionInfo;
34 import org.codehaus.activemq.message.ConsumerInfo;
35 import org.codehaus.activemq.message.MessageAck;
36 import org.codehaus.activemq.message.ProducerInfo;
37 import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
38 import org.codehaus.activemq.security.SecurityAdapter;
39 import org.codehaus.activemq.service.MessageContainerManager;
40 import org.codehaus.activemq.service.Transaction;
41 import org.codehaus.activemq.service.TransactionManager;
42 import org.codehaus.activemq.service.RedeliveryPolicy;
43 import org.codehaus.activemq.service.boundedvm.TransientQueueBoundedMessageManager;
44 import org.codehaus.activemq.service.boundedvm.TransientTopicBoundedMessageManager;
45 import org.codehaus.activemq.service.impl.DurableTopicMessageContainerManager;
46 import org.codehaus.activemq.service.impl.MessageAckTransactionTask;
47 import org.codehaus.activemq.service.impl.DurableQueueMessageContainerManager;
48 import org.codehaus.activemq.service.impl.RedeliverMessageTransactionTask;
49 import org.codehaus.activemq.service.impl.SendMessageTransactionTask;
50 import org.codehaus.activemq.store.PersistenceAdapter;
51 import org.codehaus.activemq.store.PreparedTransactionStore;
52 import org.codehaus.activemq.store.vm.VMPersistenceAdapter;
53 import org.codehaus.activemq.store.vm.VMTransactionManager;
54 import org.codehaus.activemq.util.Callback;
55 import org.codehaus.activemq.util.ExceptionTemplate;
56 import org.codehaus.activemq.util.JMSExceptionHelper;
57
58 import javax.jms.JMSException;
59 import javax.naming.Context;
60 import javax.transaction.xa.XAException;
61 import java.io.File;
62 import java.lang.reflect.InvocationTargetException;
63 import java.lang.reflect.Method;
64 import java.util.Hashtable;
65 import java.util.Iterator;
66 import java.util.Map;
67
68 /***
69 * The default {@link Broker} implementation
70 *
71 * @version $Revision: 1.22 $
72 */
73 public class DefaultBroker extends DelegateCapacityMonitor implements Broker {
74
75 private static final Log log = LogFactory.getLog(DefaultBroker.class);
76
77 protected static final String PROPERTY_STORE_DIRECTORY = "activemq.store.dir";
78 protected static final String PERSISTENCE_ADAPTER_PROPERTY = "activemq.persistenceAdapter";
79
80 protected static final Class[] NEWINSTANCE_PARAMETER_TYPES = {File.class};
81
82 private static final long DEFAULT_MAX_MEMORY_USAGE = 20 * 1024 * 1024;
83
84 private PersistenceAdapter persistenceAdapter;
85 private TransactionManager transactionManager;
86 private MessageContainerManager[] containerManagers;
87 private File tempDir;
88 private MemoryBoundedQueueManager memoryManager;
89 private PreparedTransactionStore preparedTransactionStore;
90 private final String brokerName;
91 private final String brokerClusterName;
92 private Map containerManagerMap;
93 private CopyOnWriteArrayList consumerInfoListeners;
94 private MessageContainerManager persistentTopicMCM;
95 private MessageContainerManager transientTopicMCM;
96 private MessageContainerManager persistentQueueMCM;
97 private MessageContainerManager transientQueueMCM;
98 private SecurityAdapter securityAdapter;
99 private RedeliveryPolicy redeliveryPolicy;
100
101
102 public DefaultBroker(String brokerName, String brokerClusterName) {
103 this.brokerName = brokerName;
104 this.brokerClusterName = brokerClusterName;
105 memoryManager = new MemoryBoundedQueueManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE);
106 setDelegate(memoryManager);
107 containerManagerMap = new ConcurrentHashMap();
108 consumerInfoListeners = new CopyOnWriteArrayList();
109 }
110
111 public DefaultBroker(String brokerName) {
112 this(brokerName, "default");
113 }
114
115 public DefaultBroker(String brokerName, String brokerClusterName, PersistenceAdapter persistenceAdapter) {
116 this(brokerName, brokerClusterName);
117 this.persistenceAdapter = persistenceAdapter;
118 }
119
120 public DefaultBroker(String brokerName, PersistenceAdapter persistenceAdapter) {
121 this(brokerName);
122 this.persistenceAdapter = persistenceAdapter;
123 }
124
125 /***
126 * Start this Service
127 *
128 * @throws JMSException
129 */
130 public void start() throws JMSException {
131 if (redeliveryPolicy == null) {
132 redeliveryPolicy = new RedeliveryPolicy();
133 }
134 if (persistenceAdapter == null) {
135 persistenceAdapter = createPersistenceAdapter();
136 }
137 persistenceAdapter.start();
138
139 if (transactionManager == null) {
140 preparedTransactionStore = persistenceAdapter.createPreparedTransactionStore();
141 transactionManager = new VMTransactionManager(this, preparedTransactionStore);
142 }
143 transactionManager.start();
144
145
146 if (containerManagerMap.isEmpty()) {
147 makeDefaultContainerManagers();
148 }
149 getContainerManagers();
150
151 for (int i = 0; i < containerManagers.length; i++) {
152 containerManagers[i].start();
153 }
154 }
155
156
157 /***
158 * stop this Service
159 *
160 * @throws JMSException
161 */
162
163 public void stop() throws JMSException {
164 ExceptionTemplate template = new ExceptionTemplate();
165
166 if (containerManagers != null) {
167 for (int i = 0; i < containerManagers.length; i++) {
168 final MessageContainerManager containerManager = containerManagers[i];
169 template.run(new Callback() {
170 public void execute() throws Throwable {
171 containerManager.stop();
172 }
173 });
174 }
175 }
176 if (transactionManager != null) {
177 template.run(new Callback() {
178 public void execute() throws Throwable {
179 transactionManager.stop();
180 }
181 });
182 }
183
184 template.run(new Callback() {
185 public void execute() throws Throwable {
186 persistenceAdapter.stop();
187 }
188 });
189
190 template.throwJMSException();
191 }
192
193
194
195
196 public void addClient(BrokerClient client, ConnectionInfo info) throws JMSException {
197 if (securityAdapter != null) {
198 securityAdapter.authorizeConnection(client, info);
199 }
200 }
201
202 public void removeClient(BrokerClient client, ConnectionInfo info) throws JMSException {
203 if (transactionManager != null) {
204 transactionManager.cleanUpClient(client);
205 }
206 }
207
208 public void addMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
209 if (securityAdapter != null) {
210 securityAdapter.authorizeProducer(client, info);
211 }
212 }
213
214 public void removeMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
215 }
216
217 /***
218 * Add an active message consumer
219 */
220 public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
221 validateConsumer(info);
222 if (securityAdapter != null) {
223 securityAdapter.authorizeConsumer(client, info);
224 }
225
226 MessageContainerManager[] array = getContainerManagers();
227 for (int i = 0; i < array.length; i++) {
228 array[i].addMessageConsumer(client, info);
229 }
230 fireConsumerInfo(client, info);
231 }
232
233 /***
234 * remove an active message consumer
235 */
236 public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
237 validateConsumer(info);
238 for (int i = 0; i < containerManagers.length; i++) {
239 containerManagers[i].removeMessageConsumer(client, info);
240 }
241 fireConsumerInfo(client, info);
242 }
243
244
245 /***
246 * send a message to the broker
247 */
248 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
249 checkValid();
250 if (message.getJMSMessageID() == null) {
251 throw new JMSException("No messageID specified for the Message");
252 }
253 ActiveMQDestination destination = message.getJMSActiveMQDestination();
254 if (destination.isComposite()) {
255 boolean first = true;
256
257 for (Iterator iter = destination.getChildDestinations().iterator(); iter.hasNext();) {
258 ActiveMQDestination childDestination = (ActiveMQDestination) iter.next();
259
260
261 if (first) {
262 first = false;
263 }
264 else {
265 message = message.shallowCopy();
266 }
267 message.setJMSDestination(childDestination);
268
269 doMessageSend(client, message);
270 }
271 }
272 else {
273 doMessageSend(client, message);
274 }
275 }
276
277 /***
278 * send a message to the broker within a transaction
279 */
280 public void sendTransactedMessage(final BrokerClient client, final String transactionId, final ActiveMQMessage message) throws JMSException {
281 Transaction transaction;
282 if (message.isXaTransacted()) {
283 try {
284 transaction = transactionManager.getXATransaction(new ActiveMQXid(transactionId));
285 }
286 catch (XAException e) {
287 throw (JMSException) new JMSException(e.getMessage()).initCause(e);
288 }
289 }
290 else {
291 transaction = transactionManager.getLocalTransaction(transactionId);
292 }
293
294 transaction.addPostCommitTask(new SendMessageTransactionTask(client, message));
295 }
296
297
298 /***
299 * Acknowledge consumption of a message by the Message Consumer
300 */
301 public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
302 for (int i = 0; i < containerManagers.length; i++) {
303 containerManagers[i].acknowledgeMessage(client, ack);
304 }
305 }
306
307 /***
308 * Acknowledge consumption of a message within a transaction
309 */
310 public void acknowledgeTransactedMessage(final BrokerClient client, final String transactionId, final MessageAck ack) throws JMSException {
311 Transaction transaction;
312 if (ack.isXaTransacted()) {
313 try {
314 transaction = transactionManager.getXATransaction(new ActiveMQXid(transactionId));
315 }
316 catch (XAException e) {
317 throw (JMSException) new JMSException(e.getMessage()).initCause(e);
318 }
319 }
320 else {
321 transaction = transactionManager.getLocalTransaction(transactionId);
322 }
323 transaction.addPostCommitTask(new MessageAckTransactionTask(client, ack));
324 transaction.addPostRollbackTask(new RedeliverMessageTransactionTask(client, ack));
325
326
327
328
329
330 for (int i = 0; i < containerManagers.length; i++) {
331 containerManagers[i].acknowledgeTransactedMessage(client, transactionId, ack);
332 }
333 }
334
335 public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException {
336 for (int i = 0; i < containerManagers.length; i++) {
337 containerManagers[i].redeliverMessage(client, ack);
338 }
339 }
340
341 public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
342 for (int i = 0; i < containerManagers.length; i++) {
343 containerManagers[i].deleteSubscription(clientId, subscriberName);
344 }
345 }
346
347
348 /***
349 * Start a transaction.
350 *
351 * @see org.codehaus.activemq.broker.Broker#startTransaction(org.codehaus.activemq.broker.BrokerClient, java.lang.String)
352 */
353 public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
354 transactionManager.createLocalTransaction(client, transactionId);
355 }
356
357 public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
358 try {
359 for (int i = 0; i < containerManagers.length; i++) {
360 containerManagers[i].commitTransaction(client, transactionId);
361 }
362 Transaction transaction = transactionManager.getLocalTransaction(transactionId);
363 transaction.commit(true);
364 }
365 catch (XAException e) {
366
367 throw (JMSException) new JMSException(e.getMessage()).initCause(e);
368 }
369 }
370
371 /***
372 * rollback a transaction
373 */
374 public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
375 try {
376 for (int i = 0; i < containerManagers.length; i++) {
377 containerManagers[i].rollbackTransaction(client, transactionId);
378 }
379 Transaction transaction = transactionManager.getLocalTransaction(transactionId);
380 transaction.rollback();
381 }
382 catch (XAException e) {
383
384 throw (JMSException) new JMSException(e.getMessage()).initCause(e);
385 }
386 }
387
388 /***
389 * Starts an XA Transaction.
390 *
391 * @see org.codehaus.activemq.broker.Broker#startTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
392 */
393 public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
394 transactionManager.createXATransaction(client, xid);
395 }
396
397 /***
398 * Prepares an XA Transaciton.
399 *
400 * @see org.codehaus.activemq.broker.Broker#prepareTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
401 */
402 public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
403 Transaction transaction = transactionManager.getXATransaction(xid);
404 return transaction.prepare();
405 }
406
407 /***
408 * Rollback an XA Transaction.
409 *
410 * @see org.codehaus.activemq.broker.Broker#rollbackTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
411 */
412 public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
413 Transaction transaction = transactionManager.getXATransaction(xid);
414 transaction.rollback();
415 }
416
417 /***
418 * Commit an XA Transaction.
419 *
420 * @see org.codehaus.activemq.broker.Broker#commitTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid, boolean)
421 */
422 public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
423 Transaction transaction = transactionManager.getXATransaction(xid);
424 transaction.commit(onePhase);
425 }
426
427 /***
428 * Gets the prepared XA transactions.
429 *
430 * @see org.codehaus.activemq.broker.Broker#getPreparedTransactions(org.codehaus.activemq.broker.BrokerClient)
431 */
432 public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
433 return transactionManager.getPreparedXATransactions();
434 }
435
436
437
438
439
440 /***
441 * Get a temp directory - used for spooling
442 *
443 * @return a File ptr to the directory
444 */
445 public File getTempDir() {
446 if (tempDir == null) {
447 String dirName = System.getProperty("activemq.store.tempdir", "ActiveMQTemp");
448 tempDir = new File(dirName);
449 }
450 return tempDir;
451 }
452
453 public String getBrokerName() {
454 return brokerName;
455 }
456
457 /***
458 * @return Returns the brokerClusterName.
459 */
460 public String getBrokerClusterName() {
461 return brokerClusterName;
462 }
463
464 public void setTempDir(File tempDir) {
465 this.tempDir = tempDir;
466 }
467
468 public MessageContainerManager[] getContainerManagers() {
469 if (containerManagers == null) {
470 containerManagers = createContainerManagers();
471 }
472 return containerManagers;
473 }
474
475 public Map getContainerManagerMap() {
476 return containerManagerMap;
477 }
478
479 public void setContainerManagerMap(Map containerManagerMap) {
480 this.containerManagerMap = containerManagerMap;
481 this.containerManagers = null;
482 }
483
484 public PersistenceAdapter getPersistenceAdapter() {
485 return persistenceAdapter;
486 }
487
488 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
489 this.persistenceAdapter = persistenceAdapter;
490 }
491
492 public TransactionManager getTransactionManager() {
493 return transactionManager;
494 }
495
496 public void setTransactionManager(TransactionManager transactionManager) {
497 this.transactionManager = transactionManager;
498 }
499
500 public SecurityAdapter getSecurityAdapter() {
501 return securityAdapter;
502 }
503
504 public void setSecurityAdapter(SecurityAdapter securityAdapter) {
505 this.securityAdapter = securityAdapter;
506 }
507
508 public RedeliveryPolicy getRedeliveryPolicy() {
509 return redeliveryPolicy;
510 }
511
512 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
513 this.redeliveryPolicy = redeliveryPolicy;
514 }
515
516 public PreparedTransactionStore getPreparedTransactionStore() {
517 return preparedTransactionStore;
518 }
519
520 public void setPreparedTransactionStore(PreparedTransactionStore preparedTransactionStore) {
521 this.preparedTransactionStore = preparedTransactionStore;
522 }
523
524 /***
525 * @return Returns the maximumMemoryUsage.
526 */
527 public long getMaximumMemoryUsage() {
528 return memoryManager.getValueLimit();
529 }
530
531 /***
532 * @param maximumMemoryUsage The maximumMemoryUsage to set.
533 */
534 public void setMaximumMemoryUsage(long maximumMemoryUsage) {
535 this.memoryManager.setValueLimit(maximumMemoryUsage);
536 }
537
538
539 public Context getDestinationContext(Hashtable environment) {
540 Map data = new ConcurrentHashMap();
541 for (Iterator iter = containerManagerMap.entrySet().iterator(); iter.hasNext();) {
542 Map.Entry entry = (Map.Entry) iter.next();
543 String name = entry.getKey().toString();
544 MessageContainerManager manager = (MessageContainerManager) entry.getValue();
545 Context context = new ReadOnlyContext(environment, manager.getDestinations());
546 data.put(name, context);
547 }
548 return new ReadOnlyContext(environment, data);
549 }
550
551
552
553
554
555 protected void doMessageSend(BrokerClient client, ActiveMQMessage message) throws JMSException {
556 if (securityAdapter != null) {
557 securityAdapter.authorizeSendMessage(client, message);
558 }
559 for (int i = 0; i < containerManagers.length; i++) {
560 containerManagers[i].sendMessage(client, message);
561 }
562 }
563
564 /***
565 * Factory method to create a default persistence adapter
566 *
567 * @return
568 */
569 protected PersistenceAdapter createPersistenceAdapter() throws JMSException {
570 File directory = new File(getStoreDirectory());
571
572
573 PersistenceAdapter answer = null;
574 String property = System.getProperty(PERSISTENCE_ADAPTER_PROPERTY);
575 if (property != null) {
576 answer = tryCreatePersistenceAdapter(property, directory, false);
577 }
578 if (answer == null) {
579 answer = tryCreatePersistenceAdapter("org.codehaus.activemq.store.jdbm.JdbmPersistenceAdapter", directory, true);
580 }
581 if (answer == null) {
582 answer = tryCreatePersistenceAdapter("org.codehaus.activemq.store.bdb.BDbPersistenceAdapter", directory, true);
583 }
584 if (answer != null) {
585 return answer;
586 }
587 else {
588 log.warn("Neither JDBM or BDB on the classpath or property '" + PERSISTENCE_ADAPTER_PROPERTY
589 + "' not specified so defaulting to use RAM based message persistence");
590 return new VMPersistenceAdapter();
591 }
592 }
593
594 protected PersistenceAdapter tryCreatePersistenceAdapter(String className, File directory, boolean ignoreErrors) throws JMSException {
595 Class adapterClass = loadClass(className, ignoreErrors);
596 if (adapterClass != null) {
597
598 try {
599 Method method = adapterClass.getMethod("newInstance", NEWINSTANCE_PARAMETER_TYPES);
600 PersistenceAdapter answer = (PersistenceAdapter) method.invoke(null, new Object[]{directory});
601 log.info("Using persistence adapter: " + adapterClass.getName());
602 return answer;
603 }
604 catch (InvocationTargetException e) {
605 Throwable cause = e.getTargetException();
606 if (cause != null) {
607 if (cause instanceof JMSException) {
608 throw (JMSException) cause;
609 }
610 else {
611 if (cause instanceof Exception) {
612 throw createInstantiateAdapterException(adapterClass, (Exception) cause);
613 }
614 }
615 }
616 if (!ignoreErrors) {
617 throw createInstantiateAdapterException(adapterClass, e);
618 }
619 }
620 catch (Throwable e) {
621 if (!ignoreErrors) {
622 throw createInstantiateAdapterException(adapterClass, e);
623 }
624 }
625 }
626 return null;
627 }
628
629 protected JMSException createInstantiateAdapterException(Class adapterClass, Throwable e) {
630 return JMSExceptionHelper.newJMSException("Could not instantiate instance of "
631 + adapterClass.getName() + ". Reason: " + e, e);
632 }
633
634 /***
635 * Tries to load the given class from the current context class loader or
636 * class loader which loaded us or return null if the class could not be found
637 */
638 protected Class loadClass(String name, boolean ignoreErrors) throws JMSException {
639 try {
640 return Thread.currentThread().getContextClassLoader().loadClass(name);
641 }
642 catch (ClassNotFoundException e) {
643 try {
644 return getClass().getClassLoader().loadClass(name);
645 }
646 catch (ClassNotFoundException e2) {
647 if (ignoreErrors) {
648 log.trace("Could not find class: " + name + " on the classpath");
649 return null;
650 }
651 else {
652 throw JMSExceptionHelper.newJMSException("Could not find class: " + name + " on the classpath. Reason: " + e, e);
653 }
654 }
655 }
656 }
657
658 protected String getStoreDirectory() {
659 return System.getProperty(PROPERTY_STORE_DIRECTORY, "ActiveMQ");
660 }
661
662 /***
663 * Factory method to create the default container managers
664 *
665 * @return
666 */
667 protected MessageContainerManager[] createContainerManagers() {
668 int size = containerManagerMap.size();
669 MessageContainerManager[] answer = new MessageContainerManager[size];
670 containerManagerMap.values().toArray(answer);
671 return answer;
672 }
673
674 protected void makeDefaultContainerManagers() {
675 transientTopicMCM = new TransientTopicBoundedMessageManager(memoryManager);
676 containerManagerMap.put("transientTopicContainer", transientTopicMCM);
677 persistentTopicMCM = new DurableTopicMessageContainerManager(persistenceAdapter, redeliveryPolicy);
678 containerManagerMap.put("persistentTopicContainer", persistentTopicMCM);
679 persistentQueueMCM = new DurableQueueMessageContainerManager(persistenceAdapter, redeliveryPolicy);
680 containerManagerMap.put("persistentQueueContainer", persistentQueueMCM);
681 transientQueueMCM = new TransientQueueBoundedMessageManager(memoryManager);
682 containerManagerMap.put("transientQueueContainer", transientQueueMCM);
683 }
684
685 /***
686 * Ensures the consumer is valid, throwing a meaningful exception if not
687 *
688 * @param info
689 * @throws JMSException
690 */
691 protected void validateConsumer(ConsumerInfo info) throws JMSException {
692 if (info.getConsumerId() == null) {
693 throw new JMSException("No consumerId specified for the ConsumerInfo");
694 }
695 }
696
697 protected void checkValid() throws JMSException {
698 if (containerManagers == null) {
699 throw new JMSException("This Broker has not yet been started. Ensure start() is called before invoking action methods");
700 }
701 }
702
703 /***
704 * Add a ConsumerInfoListener to the Broker
705 *
706 * @param l
707 */
708 public void addConsumerInfoListener(ConsumerInfoListener l) {
709 consumerInfoListeners.add(l);
710 }
711
712 /***
713 * Remove a ConsumerInfoListener from the Broker
714 *
715 * @param l
716 */
717 public void removeConsumerInfoListener(ConsumerInfoListener l) {
718 consumerInfoListeners.remove(l);
719 }
720
721 protected void fireConsumerInfo(BrokerClient client, ConsumerInfo info) {
722 for (Iterator i = consumerInfoListeners.iterator(); i.hasNext();) {
723 ConsumerInfoListener l = (ConsumerInfoListener) i.next();
724 l.onConsumerInfo(client, info);
725 }
726 }
727
728 /***
729 * @return the MessageContainerManager for durable topics
730 */
731 public MessageContainerManager getPersistentTopicContainerManager() {
732 return persistentTopicMCM;
733 }
734
735 /***
736 * @return the MessageContainerManager for transient topics
737 */
738 public MessageContainerManager getTransientTopicContainerManager() {
739 return transientTopicMCM;
740 }
741
742 /***
743 * @return the MessageContainerManager for persistent queues
744 */
745 public MessageContainerManager getPersistentQueueContainerManager() {
746 return persistentQueueMCM;
747 }
748
749 /***
750 * @return the MessageContainerManager for transient queues
751 */
752 public MessageContainerManager getTransientQueueContainerManager() {
753 return transientQueueMCM;
754 }
755
756 }