View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.broker.impl;
20  
21  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
22  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.codehaus.activemq.broker.Broker;
26  import org.codehaus.activemq.broker.BrokerClient;
27  import org.codehaus.activemq.broker.ConsumerInfoListener;
28  import org.codehaus.activemq.capacity.DelegateCapacityMonitor;
29  import org.codehaus.activemq.jndi.ReadOnlyContext;
30  import org.codehaus.activemq.message.ActiveMQDestination;
31  import org.codehaus.activemq.message.ActiveMQMessage;
32  import org.codehaus.activemq.message.ActiveMQXid;
33  import org.codehaus.activemq.message.ConnectionInfo;
34  import org.codehaus.activemq.message.ConsumerInfo;
35  import org.codehaus.activemq.message.MessageAck;
36  import org.codehaus.activemq.message.ProducerInfo;
37  import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
38  import org.codehaus.activemq.security.SecurityAdapter;
39  import org.codehaus.activemq.service.MessageContainerManager;
40  import org.codehaus.activemq.service.Transaction;
41  import org.codehaus.activemq.service.TransactionManager;
42  import org.codehaus.activemq.service.RedeliveryPolicy;
43  import org.codehaus.activemq.service.boundedvm.TransientQueueBoundedMessageManager;
44  import org.codehaus.activemq.service.boundedvm.TransientTopicBoundedMessageManager;
45  import org.codehaus.activemq.service.impl.DurableTopicMessageContainerManager;
46  import org.codehaus.activemq.service.impl.MessageAckTransactionTask;
47  import org.codehaus.activemq.service.impl.DurableQueueMessageContainerManager;
48  import org.codehaus.activemq.service.impl.RedeliverMessageTransactionTask;
49  import org.codehaus.activemq.service.impl.SendMessageTransactionTask;
50  import org.codehaus.activemq.store.PersistenceAdapter;
51  import org.codehaus.activemq.store.PreparedTransactionStore;
52  import org.codehaus.activemq.store.vm.VMPersistenceAdapter;
53  import org.codehaus.activemq.store.vm.VMTransactionManager;
54  import org.codehaus.activemq.util.Callback;
55  import org.codehaus.activemq.util.ExceptionTemplate;
56  import org.codehaus.activemq.util.JMSExceptionHelper;
57  
58  import javax.jms.JMSException;
59  import javax.naming.Context;
60  import javax.transaction.xa.XAException;
61  import java.io.File;
62  import java.lang.reflect.InvocationTargetException;
63  import java.lang.reflect.Method;
64  import java.util.Hashtable;
65  import java.util.Iterator;
66  import java.util.Map;
67  
68  /***
69   * The default {@link Broker} implementation
70   *
71   * @version $Revision: 1.22 $
72   */
73  public class DefaultBroker extends DelegateCapacityMonitor implements Broker {
74  
75      private static final Log log = LogFactory.getLog(DefaultBroker.class);
76  
77      protected static final String PROPERTY_STORE_DIRECTORY = "activemq.store.dir";
78      protected static final String PERSISTENCE_ADAPTER_PROPERTY = "activemq.persistenceAdapter";
79  
80      protected static final Class[] NEWINSTANCE_PARAMETER_TYPES = {File.class};
81  
82      private static final long DEFAULT_MAX_MEMORY_USAGE = 20 * 1024 * 1024; //20mb
83  
84      private PersistenceAdapter persistenceAdapter;
85      private TransactionManager transactionManager;
86      private MessageContainerManager[] containerManagers;
87      private File tempDir;
88      private MemoryBoundedQueueManager memoryManager;
89      private PreparedTransactionStore preparedTransactionStore;
90      private final String brokerName;
91      private final String brokerClusterName;
92      private Map containerManagerMap;
93      private CopyOnWriteArrayList consumerInfoListeners;
94      private MessageContainerManager persistentTopicMCM;
95      private MessageContainerManager transientTopicMCM;
96      private MessageContainerManager persistentQueueMCM;
97      private MessageContainerManager transientQueueMCM;
98      private SecurityAdapter securityAdapter;
99      private RedeliveryPolicy redeliveryPolicy;
100 
101 
102     public DefaultBroker(String brokerName, String brokerClusterName) {
103         this.brokerName = brokerName;
104         this.brokerClusterName = brokerClusterName;
105         memoryManager = new MemoryBoundedQueueManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE);
106         setDelegate(memoryManager);
107         containerManagerMap = new ConcurrentHashMap();
108         consumerInfoListeners = new CopyOnWriteArrayList();
109     }
110 
111     public DefaultBroker(String brokerName) {
112         this(brokerName, "default");
113     }
114 
115     public DefaultBroker(String brokerName, String brokerClusterName, PersistenceAdapter persistenceAdapter) {
116         this(brokerName, brokerClusterName);
117         this.persistenceAdapter = persistenceAdapter;
118     }
119 
120     public DefaultBroker(String brokerName, PersistenceAdapter persistenceAdapter) {
121         this(brokerName);
122         this.persistenceAdapter = persistenceAdapter;
123     }
124 
125     /***
126      * Start this Service
127      *
128      * @throws JMSException
129      */
130     public void start() throws JMSException {
131         if (redeliveryPolicy == null) {
132             redeliveryPolicy = new RedeliveryPolicy();
133         }
134         if (persistenceAdapter == null) {
135             persistenceAdapter = createPersistenceAdapter();
136         }
137         persistenceAdapter.start();
138 
139         if (transactionManager == null) {
140             preparedTransactionStore = persistenceAdapter.createPreparedTransactionStore();
141             transactionManager = new VMTransactionManager(this, preparedTransactionStore);
142         }
143         transactionManager.start();
144 
145         // force containers to be created
146         if (containerManagerMap.isEmpty()) {
147             makeDefaultContainerManagers();
148         }
149         getContainerManagers();
150 
151         for (int i = 0; i < containerManagers.length; i++) {
152             containerManagers[i].start();
153         }
154     }
155 
156 
157     /***
158      * stop this Service
159      *
160      * @throws JMSException
161      */
162 
163     public void stop() throws JMSException {
164         ExceptionTemplate template = new ExceptionTemplate();
165 
166         if (containerManagers != null) {
167             for (int i = 0; i < containerManagers.length; i++) {
168                 final MessageContainerManager containerManager = containerManagers[i];
169                 template.run(new Callback() {
170                     public void execute() throws Throwable {
171                         containerManager.stop();
172                     }
173                 });
174             }
175         }
176         if (transactionManager != null) {
177             template.run(new Callback() {
178                 public void execute() throws Throwable {
179                     transactionManager.stop();
180                 }
181             });
182         }
183 
184         template.run(new Callback() {
185             public void execute() throws Throwable {
186                 persistenceAdapter.stop();
187             }
188         });
189 
190         template.throwJMSException();
191     }
192 
193     // Broker interface
194     //-------------------------------------------------------------------------
195 
196     public void addClient(BrokerClient client, ConnectionInfo info) throws JMSException {
197         if (securityAdapter != null) {
198             securityAdapter.authorizeConnection(client, info);
199         }
200     }
201 
202     public void removeClient(BrokerClient client, ConnectionInfo info) throws JMSException {
203         if (transactionManager != null) {
204             transactionManager.cleanUpClient(client);
205         }
206     }
207 
208     public void addMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
209         if (securityAdapter != null) {
210             securityAdapter.authorizeProducer(client, info);
211         }
212     }
213 
214     public void removeMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
215     }
216 
217     /***
218      * Add an active message consumer
219      */
220     public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
221         validateConsumer(info);
222         if (securityAdapter != null) {
223             securityAdapter.authorizeConsumer(client, info);
224         }
225 
226         MessageContainerManager[] array = getContainerManagers();
227         for (int i = 0; i < array.length; i++) {
228             array[i].addMessageConsumer(client, info);
229         }
230         fireConsumerInfo(client, info);
231     }
232 
233     /***
234      * remove an active message consumer
235      */
236     public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
237         validateConsumer(info);
238         for (int i = 0; i < containerManagers.length; i++) {
239             containerManagers[i].removeMessageConsumer(client, info);
240         }
241         fireConsumerInfo(client, info);
242     }
243 
244 
245     /***
246      * send a message to the broker
247      */
248     public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
249         checkValid();
250         if (message.getJMSMessageID() == null) {
251             throw new JMSException("No messageID specified for the Message");
252         }
253         ActiveMQDestination destination = message.getJMSActiveMQDestination();
254         if (destination.isComposite()) {
255             boolean first = true;
256 
257             for (Iterator iter = destination.getChildDestinations().iterator(); iter.hasNext();) {
258                 ActiveMQDestination childDestination = (ActiveMQDestination) iter.next();
259 
260                 // lets shallow copy just in case
261                 if (first) {
262                     first = false;
263                 }
264                 else {
265                     message = message.shallowCopy();
266                 }
267                 message.setJMSDestination(childDestination);
268 
269                 doMessageSend(client, message);
270             }
271         }
272         else {
273             doMessageSend(client, message);
274         }
275     }
276 
277     /***
278      * send a message to the broker within a transaction
279      */
280     public void sendTransactedMessage(final BrokerClient client, final String transactionId, final ActiveMQMessage message) throws JMSException {
281         Transaction transaction;
282         if (message.isXaTransacted()) {
283             try {
284                 transaction = transactionManager.getXATransaction(new ActiveMQXid(transactionId));
285             }
286             catch (XAException e) {
287                 throw (JMSException) new JMSException(e.getMessage()).initCause(e);
288             }
289         }
290         else {
291             transaction = transactionManager.getLocalTransaction(transactionId);
292         }
293 
294         transaction.addPostCommitTask(new SendMessageTransactionTask(client, message));
295     }
296 
297 
298     /***
299      * Acknowledge consumption of a message by the Message Consumer
300      */
301     public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
302         for (int i = 0; i < containerManagers.length; i++) {
303             containerManagers[i].acknowledgeMessage(client, ack);
304         }
305     }
306 
307     /***
308      * Acknowledge consumption of a message within a transaction
309      */
310     public void acknowledgeTransactedMessage(final BrokerClient client, final String transactionId, final MessageAck ack) throws JMSException {
311         Transaction transaction;
312         if (ack.isXaTransacted()) {
313             try {
314                 transaction = transactionManager.getXATransaction(new ActiveMQXid(transactionId));
315             }
316             catch (XAException e) {
317                 throw (JMSException) new JMSException(e.getMessage()).initCause(e);
318             }
319         }
320         else {
321             transaction = transactionManager.getLocalTransaction(transactionId);
322         }
323         transaction.addPostCommitTask(new MessageAckTransactionTask(client, ack));
324         transaction.addPostRollbackTask(new RedeliverMessageTransactionTask(client, ack));
325 
326         // we need to tell the dispatcher that we can now accept another message
327         // even though we don't really ack the message until the commit
328         // this is because if we have a prefetch value of 1, we can never consume 2 messages
329         // in a transaction, since the ack for the first message never arrives until the commit
330         for (int i = 0; i < containerManagers.length; i++) {
331             containerManagers[i].acknowledgeTransactedMessage(client, transactionId, ack);
332         }
333     }
334 
335     public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException {
336         for (int i = 0; i < containerManagers.length; i++) {
337             containerManagers[i].redeliverMessage(client, ack);
338         }
339     }
340 
341     public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
342         for (int i = 0; i < containerManagers.length; i++) {
343             containerManagers[i].deleteSubscription(clientId, subscriberName);
344         }
345     }
346 
347 
348     /***
349      * Start a transaction.
350      *
351      * @see org.codehaus.activemq.broker.Broker#startTransaction(org.codehaus.activemq.broker.BrokerClient, java.lang.String)
352      */
353     public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
354         transactionManager.createLocalTransaction(client, transactionId);
355     }
356 
357     public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
358         try {
359             for (int i = 0; i < containerManagers.length; i++) {
360                 containerManagers[i].commitTransaction(client, transactionId);
361             }
362             Transaction transaction = transactionManager.getLocalTransaction(transactionId);
363             transaction.commit(true);
364         }
365         catch (XAException e) {
366             // TODO: I think the XAException should propagate all the way to the client.
367             throw (JMSException) new JMSException(e.getMessage()).initCause(e);
368         }
369     }
370 
371     /***
372      * rollback a transaction
373      */
374     public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
375         try {
376             for (int i = 0; i < containerManagers.length; i++) {
377                 containerManagers[i].rollbackTransaction(client, transactionId);
378             }
379             Transaction transaction = transactionManager.getLocalTransaction(transactionId);
380             transaction.rollback();
381         }
382         catch (XAException e) {
383             // TODO: I think the XAException should propagate all the way to the client.
384             throw (JMSException) new JMSException(e.getMessage()).initCause(e);
385         }
386     }
387 
388     /***
389      * Starts an XA Transaction.
390      *
391      * @see org.codehaus.activemq.broker.Broker#startTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
392      */
393     public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
394         transactionManager.createXATransaction(client, xid);
395     }
396 
397     /***
398      * Prepares an XA Transaciton.
399      *
400      * @see org.codehaus.activemq.broker.Broker#prepareTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
401      */
402     public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
403         Transaction transaction = transactionManager.getXATransaction(xid);
404         return transaction.prepare();
405     }
406 
407     /***
408      * Rollback an XA Transaction.
409      *
410      * @see org.codehaus.activemq.broker.Broker#rollbackTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
411      */
412     public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
413         Transaction transaction = transactionManager.getXATransaction(xid);
414         transaction.rollback();
415     }
416 
417     /***
418      * Commit an XA Transaction.
419      *
420      * @see org.codehaus.activemq.broker.Broker#commitTransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid, boolean)
421      */
422     public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
423         Transaction transaction = transactionManager.getXATransaction(xid);
424         transaction.commit(onePhase);
425     }
426 
427     /***
428      * Gets the prepared XA transactions.
429      *
430      * @see org.codehaus.activemq.broker.Broker#getPreparedTransactions(org.codehaus.activemq.broker.BrokerClient)
431      */
432     public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
433         return transactionManager.getPreparedXATransactions();
434     }
435 
436 
437     // Properties
438     //-------------------------------------------------------------------------
439 
440     /***
441      * Get a temp directory - used for spooling
442      *
443      * @return a File ptr to the directory
444      */
445     public File getTempDir() {
446         if (tempDir == null) {
447             String dirName = System.getProperty("activemq.store.tempdir", "ActiveMQTemp");
448             tempDir = new File(dirName);
449         }
450         return tempDir;
451     }
452 
453     public String getBrokerName() {
454         return brokerName;
455     }
456 
457     /***
458      * @return Returns the brokerClusterName.
459      */
460     public String getBrokerClusterName() {
461         return brokerClusterName;
462     }
463 
464     public void setTempDir(File tempDir) {
465         this.tempDir = tempDir;
466     }
467 
468     public MessageContainerManager[] getContainerManagers() {
469         if (containerManagers == null) {
470             containerManagers = createContainerManagers();
471         }
472         return containerManagers;
473     }
474 
475     public Map getContainerManagerMap() {
476         return containerManagerMap;
477     }
478 
479     public void setContainerManagerMap(Map containerManagerMap) {
480         this.containerManagerMap = containerManagerMap;
481         this.containerManagers = null;
482     }
483 
484     public PersistenceAdapter getPersistenceAdapter() {
485         return persistenceAdapter;
486     }
487 
488     public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
489         this.persistenceAdapter = persistenceAdapter;
490     }
491 
492     public TransactionManager getTransactionManager() {
493         return transactionManager;
494     }
495 
496     public void setTransactionManager(TransactionManager transactionManager) {
497         this.transactionManager = transactionManager;
498     }
499 
500     public SecurityAdapter getSecurityAdapter() {
501         return securityAdapter;
502     }
503 
504     public void setSecurityAdapter(SecurityAdapter securityAdapter) {
505         this.securityAdapter = securityAdapter;
506     }
507 
508     public RedeliveryPolicy getRedeliveryPolicy() {
509         return redeliveryPolicy;
510     }
511 
512     public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
513         this.redeliveryPolicy = redeliveryPolicy;
514     }
515 
516     public PreparedTransactionStore getPreparedTransactionStore() {
517         return preparedTransactionStore;
518     }
519 
520     public void setPreparedTransactionStore(PreparedTransactionStore preparedTransactionStore) {
521         this.preparedTransactionStore = preparedTransactionStore;
522     }
523 
524     /***
525      * @return Returns the maximumMemoryUsage.
526      */
527     public long getMaximumMemoryUsage() {
528         return memoryManager.getValueLimit();
529     }
530 
531     /***
532      * @param maximumMemoryUsage The maximumMemoryUsage to set.
533      */
534     public void setMaximumMemoryUsage(long maximumMemoryUsage) {
535         this.memoryManager.setValueLimit(maximumMemoryUsage);
536     }
537 
538 
539     public Context getDestinationContext(Hashtable environment) {
540         Map data = new ConcurrentHashMap();
541         for (Iterator iter = containerManagerMap.entrySet().iterator(); iter.hasNext();) {
542             Map.Entry entry = (Map.Entry) iter.next();
543             String name = entry.getKey().toString();
544             MessageContainerManager manager = (MessageContainerManager) entry.getValue();
545             Context context = new ReadOnlyContext(environment, manager.getDestinations());
546             data.put(name, context);
547         }
548         return new ReadOnlyContext(environment, data);
549     }
550 
551     // Implementation methods
552     //-------------------------------------------------------------------------
553 
554 
555     protected void doMessageSend(BrokerClient client, ActiveMQMessage message) throws JMSException {
556         if (securityAdapter != null) {
557             securityAdapter.authorizeSendMessage(client, message);
558         }
559         for (int i = 0; i < containerManagers.length; i++) {
560             containerManagers[i].sendMessage(client, message);
561         }
562     }
563 
564     /***
565      * Factory method to create a default persistence adapter
566      *
567      * @return
568      */
569     protected PersistenceAdapter createPersistenceAdapter() throws JMSException {
570         File directory = new File(getStoreDirectory());
571 
572         // lets use reflection to avoid runtime dependency on persistence libraries
573         PersistenceAdapter answer = null;
574         String property = System.getProperty(PERSISTENCE_ADAPTER_PROPERTY);
575         if (property != null) {
576             answer = tryCreatePersistenceAdapter(property, directory, false);
577         }
578         if (answer == null) {
579             answer = tryCreatePersistenceAdapter("org.codehaus.activemq.store.jdbm.JdbmPersistenceAdapter", directory, true);
580         }
581         if (answer == null) {
582             answer = tryCreatePersistenceAdapter("org.codehaus.activemq.store.bdb.BDbPersistenceAdapter", directory, true);
583         }
584         if (answer != null) {
585             return answer;
586         }
587         else {
588             log.warn("Neither JDBM or BDB on the classpath or property '" + PERSISTENCE_ADAPTER_PROPERTY
589                     + "' not specified so defaulting to use RAM based message persistence");
590             return new VMPersistenceAdapter();
591         }
592     }
593 
594     protected PersistenceAdapter tryCreatePersistenceAdapter(String className, File directory, boolean ignoreErrors) throws JMSException {
595         Class adapterClass = loadClass(className, ignoreErrors);
596         if (adapterClass != null) {
597 
598             try {
599                 Method method = adapterClass.getMethod("newInstance", NEWINSTANCE_PARAMETER_TYPES);
600                 PersistenceAdapter answer = (PersistenceAdapter) method.invoke(null, new Object[]{directory});
601                 log.info("Using persistence adapter: " + adapterClass.getName());
602                 return answer;
603             }
604             catch (InvocationTargetException e) {
605                 Throwable cause = e.getTargetException();
606                 if (cause != null) {
607                     if (cause instanceof JMSException) {
608                         throw (JMSException) cause;
609                     }
610                     else {
611                         if (cause instanceof Exception) {
612                             throw createInstantiateAdapterException(adapterClass, (Exception) cause);
613                         }
614                     }
615                 }
616                 if (!ignoreErrors) {
617                     throw createInstantiateAdapterException(adapterClass, e);
618                 }
619             }
620             catch (Throwable e) {
621                 if (!ignoreErrors) {
622                     throw createInstantiateAdapterException(adapterClass, e);
623                 }
624             }
625         }
626         return null;
627     }
628 
629     protected JMSException createInstantiateAdapterException(Class adapterClass, Throwable e) {
630         return JMSExceptionHelper.newJMSException("Could not instantiate instance of "
631                 + adapterClass.getName() + ". Reason: " + e, e);
632     }
633 
634     /***
635      * Tries to load the given class from the current context class loader or
636      * class loader which loaded us or return null if the class could not be found
637      */
638     protected Class loadClass(String name, boolean ignoreErrors) throws JMSException {
639         try {
640             return Thread.currentThread().getContextClassLoader().loadClass(name);
641         }
642         catch (ClassNotFoundException e) {
643             try {
644                 return getClass().getClassLoader().loadClass(name);
645             }
646             catch (ClassNotFoundException e2) {
647                 if (ignoreErrors) {
648                     log.trace("Could not find class: " + name + " on the classpath");
649                     return null;
650                 }
651                 else {
652                     throw JMSExceptionHelper.newJMSException("Could not find class: " + name + " on the classpath. Reason: " + e, e);
653                 }
654             }
655         }
656     }
657 
658     protected String getStoreDirectory() {
659         return System.getProperty(PROPERTY_STORE_DIRECTORY, "ActiveMQ");
660     }
661 
662     /***
663      * Factory method to create the default container managers
664      *
665      * @return
666      */
667     protected MessageContainerManager[] createContainerManagers() {
668         int size = containerManagerMap.size();
669         MessageContainerManager[] answer = new MessageContainerManager[size];
670         containerManagerMap.values().toArray(answer);
671         return answer;
672     }
673 
674     protected void makeDefaultContainerManagers() {
675         transientTopicMCM = new TransientTopicBoundedMessageManager(memoryManager);
676         containerManagerMap.put("transientTopicContainer", transientTopicMCM);
677         persistentTopicMCM = new DurableTopicMessageContainerManager(persistenceAdapter, redeliveryPolicy);
678         containerManagerMap.put("persistentTopicContainer", persistentTopicMCM);
679         persistentQueueMCM = new DurableQueueMessageContainerManager(persistenceAdapter, redeliveryPolicy);
680         containerManagerMap.put("persistentQueueContainer", persistentQueueMCM);
681         transientQueueMCM = new TransientQueueBoundedMessageManager(memoryManager);
682         containerManagerMap.put("transientQueueContainer", transientQueueMCM);
683     }
684 
685     /***
686      * Ensures the consumer is valid, throwing a meaningful exception if not
687      *
688      * @param info
689      * @throws JMSException
690      */
691     protected void validateConsumer(ConsumerInfo info) throws JMSException {
692         if (info.getConsumerId() == null) {
693             throw new JMSException("No consumerId specified for the ConsumerInfo");
694         }
695     }
696 
697     protected void checkValid() throws JMSException {
698         if (containerManagers == null) {
699             throw new JMSException("This Broker has not yet been started. Ensure start() is called before invoking action methods");
700         }
701     }
702 
703     /***
704      * Add a ConsumerInfoListener to the Broker
705      *
706      * @param l
707      */
708     public void addConsumerInfoListener(ConsumerInfoListener l) {
709         consumerInfoListeners.add(l);
710     }
711 
712     /***
713      * Remove a ConsumerInfoListener from the Broker
714      *
715      * @param l
716      */
717     public void removeConsumerInfoListener(ConsumerInfoListener l) {
718         consumerInfoListeners.remove(l);
719     }
720 
721     protected void fireConsumerInfo(BrokerClient client, ConsumerInfo info) {
722         for (Iterator i = consumerInfoListeners.iterator(); i.hasNext();) {
723             ConsumerInfoListener l = (ConsumerInfoListener) i.next();
724             l.onConsumerInfo(client, info);
725         }
726     }
727 
728     /***
729      * @return the MessageContainerManager for durable topics
730      */
731     public MessageContainerManager getPersistentTopicContainerManager() {
732         return persistentTopicMCM;
733     }
734 
735     /***
736      * @return the MessageContainerManager for transient topics
737      */
738     public MessageContainerManager getTransientTopicContainerManager() {
739         return transientTopicMCM;
740     }
741 
742     /***
743      * @return the MessageContainerManager for persistent queues
744      */
745     public MessageContainerManager getPersistentQueueContainerManager() {
746         return persistentQueueMCM;
747     }
748 
749     /***
750      * @return the MessageContainerManager for transient queues
751      */
752     public MessageContainerManager getTransientQueueContainerManager() {
753         return transientQueueMCM;
754     }
755 
756 }