View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.service.impl;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.broker.BrokerClient;
23  import org.codehaus.activemq.filter.FilterFactory;
24  import org.codehaus.activemq.filter.FilterFactoryImpl;
25  import org.codehaus.activemq.message.ActiveMQDestination;
26  import org.codehaus.activemq.message.ActiveMQMessage;
27  import org.codehaus.activemq.message.ConsumerInfo;
28  import org.codehaus.activemq.service.Dispatcher;
29  import org.codehaus.activemq.service.MessageContainer;
30  import org.codehaus.activemq.service.Subscription;
31  import org.codehaus.activemq.service.SubscriptionContainer;
32  import org.codehaus.activemq.service.RedeliveryPolicy;
33  import org.codehaus.activemq.store.PersistenceAdapter;
34  
35  import javax.jms.DeliveryMode;
36  import javax.jms.JMSException;
37  import java.util.Iterator;
38  import java.util.Set;
39  
40  /***
41   * A default implementation of a Broker of Topic messages for transient consumers
42   *
43   * @version $Revision: 1.6 $
44   */
45  public class TransientTopicMessageContainerManager extends DurableTopicMessageContainerManager {
46      private static final Log log = LogFactory.getLog(TransientTopicMessageContainerManager.class);
47  
48      public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter) {
49          this(persistenceAdapter, new SubscriptionContainerImpl(new RedeliveryPolicy()), new FilterFactoryImpl(), new DispatcherImpl());
50      }
51  
52      public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
53          super(persistenceAdapter, subscriptionContainer, filterFactory, dispatcher);
54      }
55  
56      /***
57       * @param client
58       * @param info
59       * @throws javax.jms.JMSException
60       */
61      public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
62          if (info.getDestination().isTopic()) {
63              doAddMessageConsumer(client, info);
64          }
65      }
66  
67  
68      /***
69       * @param client
70       * @param info
71       * @throws javax.jms.JMSException
72       */
73      public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
74          Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId());
75          if (sub != null) {
76              sub.setActive(false);
77              dispatcher.removeActiveSubscription(client, sub);
78              subscriptionContainer.removeSubscription(info.getConsumerId());
79              sub.clear();
80          }
81      }
82  
83  
84      /***
85       * @param client
86       * @param message
87       * @throws javax.jms.JMSException
88       */
89      public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
90          ActiveMQDestination destination = message.getJMSActiveMQDestination();
91          if (destination != null && destination.isTopic()) {
92              MessageContainer container = null;
93              if (log.isDebugEnabled()) {
94                  log.debug("Dispaching to " + subscriptionContainer + " subscriptions with message: " + message);
95              }
96              Set subscriptions = subscriptionContainer.getSubscriptions(destination);
97              for (Iterator i = subscriptions.iterator(); i.hasNext();) {
98                  Subscription sub = (Subscription) i.next();
99                  if (sub.isTarget(message) && (!sub.isDurableTopic() || message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT)) {
100                     if (container == null) {
101                         container = getContainer(message.getJMSDestination().toString());
102                         container.addMessage(message);
103                     }
104                     sub.addMessage(container, message);
105                 }
106             }
107             updateSendStats(client, message);
108         }
109     }
110 
111     /***
112      * Delete a durable subscriber
113      *
114      * @param clientId
115      * @param subscriberName
116      * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
117      */
118     public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
119     }
120 }