View Javadoc

1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.service.impl;
20  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
21  import org.codehaus.activemq.DuplicateDurableSubscriptionException;
22  import org.codehaus.activemq.broker.BrokerClient;
23  import org.codehaus.activemq.filter.AndFilter;
24  import org.codehaus.activemq.filter.DestinationMap;
25  import org.codehaus.activemq.filter.Filter;
26  import org.codehaus.activemq.filter.FilterFactory;
27  import org.codehaus.activemq.filter.FilterFactoryImpl;
28  import org.codehaus.activemq.filter.NoLocalFilter;
29  import org.codehaus.activemq.message.ActiveMQDestination;
30  import org.codehaus.activemq.message.ActiveMQMessage;
31  import org.codehaus.activemq.message.ActiveMQTopic;
32  import org.codehaus.activemq.message.ConsumerInfo;
33  import org.codehaus.activemq.message.MessageAck;
34  import org.codehaus.activemq.service.Dispatcher;
35  import org.codehaus.activemq.service.MessageContainer;
36  import org.codehaus.activemq.service.Subscription;
37  import org.codehaus.activemq.service.SubscriptionContainer;
38  import org.codehaus.activemq.service.TopicMessageContainer;
39  import org.codehaus.activemq.service.RedeliveryPolicy;
40  import org.codehaus.activemq.store.PersistenceAdapter;
41  import javax.jms.DeliveryMode;
42  import javax.jms.Destination;
43  import javax.jms.IllegalStateException;
44  import javax.jms.JMSException;
45  import java.util.Iterator;
46  import java.util.Map;
47  import java.util.Set;
48  
49  /***
50   * A default Broker used for Topic messages for durable consumers
51   * 
52   * @version $Revision: 1.25 $
53   */
54  public class DurableTopicMessageContainerManager extends MessageContainerManagerSupport {
55      private PersistenceAdapter persistenceAdapter;
56      protected SubscriptionContainer subscriptionContainer;
57      protected FilterFactory filterFactory;
58      protected Map activeSubscriptions = new ConcurrentHashMap();
59      private DestinationMap destinationMap = new DestinationMap();
60      private boolean loadedMessageContainers;
61  
62      public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, RedeliveryPolicy redeliveryPolicy) {
63          this(persistenceAdapter, new DurableTopicSubscriptionContainerImpl(redeliveryPolicy), new FilterFactoryImpl(),
64                  new DispatcherImpl());
65      }
66  
67      public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter,
68              SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
69          super(dispatcher);
70          this.persistenceAdapter = persistenceAdapter;
71          this.subscriptionContainer = subscriptionContainer;
72          this.filterFactory = filterFactory;
73      }
74  
75      public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
76          if (info.isDurableTopic()) {
77              doAddMessageConsumer(client, info);
78          }
79      }
80  
81      public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
82          // we should not remove a durable topic subscription from the subscriptionContainer
83          // unless via the deleteSubscription() method
84          Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId());
85          if (sub != null) {
86              sub.setActive(false);
87              dispatcher.removeActiveSubscription(client, sub);
88          }
89      }
90  
91      /***
92       * Delete a durable subscriber
93       * 
94       * @param clientId
95       * @param subscriberName
96       * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
97       */
98      public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
99          boolean subscriptionFound = false;
100         for (Iterator i = subscriptionContainer.subscriptionIterator();i.hasNext();) {
101             Subscription sub = (Subscription) i.next();
102             if (sub.getClientId().equals(clientId) && sub.getSubscriberName().equals(subscriberName)) {
103                 //only delete if not active
104                 if (sub.isActive()) {
105                     throw new JMSException("The Consummer " + subscriberName + " is still active");
106                 }
107                 else {
108                     subscriptionContainer.removeSubscription(sub.getConsumerId());
109                     sub.clear();
110                     subscriptionFound = true;
111                 }
112             }
113         }
114         if (!subscriptionFound) {
115             throw new IllegalStateException("The Consumer " + subscriberName + " does not exist for client: "
116                     + clientId);
117         }
118     }
119 
120     /***
121      * @param client
122      * @param message
123      * @throws javax.jms.JMSException
124      */
125     public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
126         ActiveMQDestination dest = (ActiveMQDestination) message.getJMSDestination();
127         if (dest != null && dest.isTopic() && message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT) {
128             MessageContainer container = getContainer(message.getJMSDestination().toString());
129             Set matchingSubscriptions = subscriptionContainer.getSubscriptions(message.getJMSActiveMQDestination());
130             // note that we still need to persist the message even if there are no matching
131             // subscribers as they may come along later
132             // plus we don't pre-load subscription information
133             container.addMessage(message);
134             if (!matchingSubscriptions.isEmpty()) {
135                 for (Iterator i = matchingSubscriptions.iterator();i.hasNext();) {
136                     Subscription sub = (Subscription) i.next();
137                     if (sub.isTarget(message)) {
138                         sub.addMessage(container, message);
139                     }
140                 }
141                 updateSendStats(client, message);
142             }
143         }
144     }
145 
146     /***
147      * Acknowledge a message as being read and consumed byh the Consumer
148      * 
149      * @param client
150      * @param ack
151      * @throws javax.jms.JMSException
152      */
153     public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
154         if (ack.getDestination().isTopic()) {
155             Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId());
156             if (sub != null) {
157                 sub.messageConsumed(ack);
158             }
159         }
160     }
161 
162     public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack)
163             throws JMSException {
164         Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId());
165         if (sub != null) {
166             sub.onAcknowledgeTransactedMessageBeforeCommit(ack);
167         }
168     }
169 
170     public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException {
171         Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId());
172         if (sub != null) {
173             // lets find all the containers that contain this message
174             for (Iterator iter = messageContainers.values().iterator();iter.hasNext();) {
175                 MessageContainer container = (MessageContainer) iter.next();
176                 if (container.containsMessage(ack.getMessageIdentity())) {
177                     sub.redeliverMessage(container, ack);
178                     // we only need to redeliver the message from one container
179                     break;
180                 }
181             }
182         }
183     }
184 
185     /***
186      * poll or messages
187      * 
188      * @throws javax.jms.JMSException
189      */
190     public void poll() throws JMSException {
191         //do nothing
192     }
193 
194     public void commitTransaction(BrokerClient client, String transactionId) {
195     }
196 
197     public void rollbackTransaction(BrokerClient client, String transactionId) {
198     }
199 
200     // Implementation methods
201     //-------------------------------------------------------------------------
202     protected MessageContainer createContainer(String destinationName) throws JMSException {
203         TopicMessageContainer topicMessageContainer = persistenceAdapter.createTopicMessageContainer(destinationName);
204         destinationMap.put(new ActiveMQTopic(destinationName), topicMessageContainer);
205         return topicMessageContainer;
206     }
207 
208     protected Destination createDestination(String destinationName) {
209         return new ActiveMQTopic(destinationName);
210     }
211 
212     protected void doAddMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
213         boolean shouldRecover = false;
214         if (info.getConsumerName() != null && info.getClientId() != null) {
215             for (Iterator iter = activeSubscriptions.values().iterator();iter.hasNext();) {
216                 Subscription subscription = (Subscription) iter.next();
217                 if (subscription.isSameDurableSubscription(info)) {
218                     throw new DuplicateDurableSubscriptionException(info);
219                 }
220             }
221         }
222         Subscription subscription = subscriptionContainer.getSubscription(info.getConsumerId());
223         if (subscription != null && subscription.isDurableTopic()) {
224             //check the subscription hasn't changed
225             if (!subscription.getDestination().equals(subscription.getDestination())
226                     || !subscription.getSelector().equals(info.getSelector())) {
227                 subscriptionContainer.removeSubscription(info.getConsumerId());
228                 subscription.clear();
229                 subscription = subscriptionContainer.makeSubscription(dispatcher, client,info, createFilter(info));
230             }
231         }
232         else {
233             subscription = subscriptionContainer.makeSubscription(dispatcher, client,info, createFilter(info));
234             shouldRecover = true;
235         }
236         subscription.setActiveConsumer(client,info);
237         activeSubscriptions.put(info.getConsumerId(), subscription);
238         dispatcher.addActiveSubscription(client, subscription);
239         if (subscription.isWildcard()) {
240             synchronized (this) {
241                 if (!loadedMessageContainers) {
242                     loadAllMessageContainers();
243                     loadedMessageContainers = true;
244                 }
245             }
246         }
247         else {
248             // load the container
249             getContainer(subscription.getDestination().getPhysicalName());
250         }
251         Set containers = destinationMap.get(subscription.getDestination());
252         for (Iterator iter = containers.iterator();iter.hasNext();) {
253             TopicMessageContainer container = (TopicMessageContainer) iter.next();
254             if (container instanceof DurableTopicMessageContainer) {
255                 ((DurableTopicMessageContainer) container).storeSubscription(info, subscription);
256             }
257         }
258         if (shouldRecover) {
259             recoverSubscriptions(subscription);
260         }
261         // lets not make the subscription active until later
262         // as we can't start dispatching until we've sent back the receipt
263         // TODO we might wish to register a post-receipt action here
264         // to perform the wakeup
265         subscription.setActive(true);
266         //dispatcher.wakeup(subscription);
267     }
268 
269     /***
270      * This method is called when a new durable subscription is started and so we need to go through each matching
271      * message container and dispatch any matching messages that may be outstanding
272      * 
273      * @param subscription
274      */
275     protected void recoverSubscriptions(Subscription subscription) throws JMSException {
276         // we should load all of the message containers from disk if we're a wildcard
277         if (subscription.isWildcard()) {
278             synchronized (this) {
279                 if (!loadedMessageContainers) {
280                     loadAllMessageContainers();
281                     loadedMessageContainers = true;
282                 }
283             }
284         }
285         else {
286             // load the container
287             getContainer(subscription.getDestination().getPhysicalName());
288         }
289         Set containers = destinationMap.get(subscription.getDestination());
290         for (Iterator iter = containers.iterator();iter.hasNext();) {
291             TopicMessageContainer container = (TopicMessageContainer) iter.next();
292             container.recoverSubscription(subscription);
293         }
294     }
295 
296     /***
297      * Called when recovering a wildcard subscription where we need to load all the durable message containers (for
298      * which we have any outstanding messages to deliver) into RAM
299      */
300     protected void loadAllMessageContainers() throws JMSException {
301         Map destinations = persistenceAdapter.getInitialDestinations();
302         if (destinations != null) {
303             for (Iterator iter = destinations.entrySet().iterator();iter.hasNext();) {
304                 Map.Entry entry = (Map.Entry) iter.next();
305                 String name = (String) entry.getKey();
306                 Destination destination = (Destination) entry.getValue();
307                 loadContainer(name, destination);
308             }
309         }
310     }
311 
312     /***
313      * Create filter for a Consumer
314      * 
315      * @param info
316      * @return the Fitler
317      * @throws javax.jms.JMSException
318      */
319     protected Filter createFilter(ConsumerInfo info) throws JMSException {
320         Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
321         if (info.isNoLocal()) {
322             filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
323         }
324         return filter;
325     }
326 }