View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.service.boundedvm;
20  import java.util.Collections;
21  import java.util.Iterator;
22  import java.util.Map;
23  import java.util.Set;
24  import javax.jms.Destination;
25  import javax.jms.JMSException;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.codehaus.activemq.broker.BrokerClient;
29  import org.codehaus.activemq.filter.AndFilter;
30  import org.codehaus.activemq.filter.DestinationMap;
31  import org.codehaus.activemq.filter.Filter;
32  import org.codehaus.activemq.filter.FilterFactory;
33  import org.codehaus.activemq.filter.FilterFactoryImpl;
34  import org.codehaus.activemq.filter.NoLocalFilter;
35  import org.codehaus.activemq.message.ActiveMQDestination;
36  import org.codehaus.activemq.message.ActiveMQMessage;
37  import org.codehaus.activemq.message.ConsumerInfo;
38  import org.codehaus.activemq.message.MessageAck;
39  import org.codehaus.activemq.message.util.MemoryBoundedQueue;
40  import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
41  import org.codehaus.activemq.service.MessageContainer;
42  import org.codehaus.activemq.service.MessageContainerManager;
43  import org.codehaus.activemq.service.impl.DispatcherImpl;
44  import org.codehaus.activemq.service.impl.MessageContainerManagerSupport;
45  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
46  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
47  
48  /***
49   * A MessageContainerManager for transient queues
50   * 
51   * @version $Revision: 1.4 $
52   */
53  /***
54   * A manager of MessageContainer instances
55   */
56  public class TransientQueueBoundedMessageManager implements MessageContainerManager, Runnable {
57      private static final int GARBAGE_COLLECTION_CAPACITY_LIMIT = 20;
58      private static final Log log = LogFactory.getLog(TransientQueueBoundedMessageManager.class);
59      private MemoryBoundedQueueManager queueManager;
60      private ConcurrentHashMap containers;
61      private ConcurrentHashMap subscriptions;
62      private FilterFactory filterFactory;
63      private SynchronizedBoolean started;
64      private SynchronizedBoolean doingGarbageCollection;
65      private Map destinations;
66      private DestinationMap destinationMap;
67      private Thread garbageCollectionThread;
68  
69      /***
70       * Constructor for TransientQueueBoundedMessageManager
71       * 
72       * @param mgr
73       */
74      public TransientQueueBoundedMessageManager(MemoryBoundedQueueManager mgr) {
75          this.queueManager = mgr;
76          this.containers = new ConcurrentHashMap();
77          this.destinationMap = new DestinationMap();
78          this.destinations = new ConcurrentHashMap();
79          this.subscriptions = new ConcurrentHashMap();
80          this.filterFactory = new FilterFactoryImpl();
81          this.started = new SynchronizedBoolean(false);
82          this.doingGarbageCollection = new SynchronizedBoolean(false);
83      }
84  
85      /***
86       * start the manager
87       * 
88       * @throws JMSException
89       */
90      public void start() throws JMSException {
91          if (started.commit(false, true)) {
92              for (Iterator i = containers.values().iterator();i.hasNext();) {
93                  TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
94                  container.start();
95              }
96              garbageCollectionThread = new Thread(this);
97              garbageCollectionThread.setName("TQMCMGarbageCollector");
98              garbageCollectionThread.start();
99          }
100     }
101 
102     /***
103      * stop the manager
104      * 
105      * @throws JMSException
106      */
107     public void stop() throws JMSException {
108         if (started.commit(true, false)) {
109             for (Iterator i = containers.values().iterator();i.hasNext();) {
110                 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
111                 container.stop();
112             }
113             if (garbageCollectionThread != null) {
114                 garbageCollectionThread.interrupt();
115             }
116         }
117     }
118 
119     /***
120      * collect expired messages
121      */
122     public void run() {
123         while (started.get()) {
124             doGarbageCollection();
125             try {
126                 Thread.sleep(2000);
127             }
128             catch (InterruptedException e) {
129             }
130         }
131     }
132 
133     /***
134      * Add a consumer if appropiate
135      * 
136      * @param client
137      * @param info
138      * @throws JMSException
139      */
140     public synchronized void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
141         ActiveMQDestination destination = info.getDestination();
142         if (destination.isQueue()) {
143             TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) containers
144                     .get(destination);
145             if (container == null) {
146                 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(client.toString());
147                 container = new TransientQueueBoundedMessageContainer(queueManager, destination);
148                 addContainer(container);
149                 if (started.get()) {
150                     container.start();
151                 }
152             }
153             TransientQueueSubscription ts = container.addConsumer(createFilter(info), info, client);
154             if (ts != null) {
155                 subscriptions.put(info.getConsumerId(), ts);
156             }
157             String name = destination.getPhysicalName();
158             if (!destinations.containsKey(name)) {
159                 destinations.put(name, destination);
160             }
161         }
162     }
163 
164     /***
165      * @param client
166      * @param info
167      * @throws JMSException
168      */
169     public synchronized void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
170         ActiveMQDestination destination = info.getDestination();
171         if (destination.isQueue()) {
172             for (Iterator i = containers.values().iterator();i.hasNext();) {
173                 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
174                 if (container != null) {
175                     container.removeConsumer(info);
176                 }
177             }
178             subscriptions.remove(info.getConsumerId());
179         }
180     }
181 
182     /***
183      * Delete a durable subscriber
184      * 
185      * @param clientId
186      * @param subscriberName
187      * @throws JMSException if the subscriber doesn't exist or is still active
188      */
189     public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
190     }
191 
192     /***
193      * @param client
194      * @param message
195      * @throws JMSException
196      */
197     public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
198         if (message != null && message.getJMSActiveMQDestination().isQueue() && (message.isTemporary())) {
199             if (queueManager.getCurrentCapacity() <= GARBAGE_COLLECTION_CAPACITY_LIMIT) {
200                 doGarbageCollection();
201             }
202             Set set = destinationMap.get(message.getJMSActiveMQDestination());
203             for (Iterator i = set.iterator();i.hasNext();) {
204                 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
205                 container.enqueue(message);
206             }
207         }
208     }
209 
210     /***
211      * @param client
212      * @param ack
213      * @throws JMSException
214      */
215     public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
216         TransientQueueSubscription ts = (TransientQueueSubscription) subscriptions.get(ack.getConsumerId());
217         if (ts != null) {
218             ActiveMQMessage message = ts.acknowledgeMessage(ack.getMessageID());
219             if (message != null && !ack.isMessageRead()){
220                 message.setJMSRedelivered(true);
221                 Set set = destinationMap.get(message.getJMSActiveMQDestination());
222                 for (Iterator i = set.iterator();i.hasNext();) {
223                     TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
224                     container.enqueueFirst(message);
225                     break;
226                 }
227             }  
228         }
229     }
230 
231     /***
232      * @param client
233      * @param transactionId
234      * @param ack
235      * @throws JMSException
236      */
237     public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack)
238             throws JMSException {
239     }
240 
241     /***
242      * @param client
243      * @param ack
244      * @throws JMSException
245      */
246     public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException {
247         TransientQueueSubscription ts = (TransientQueueSubscription) subscriptions.get(ack.getConsumerId());
248         if (ts != null) {
249             ActiveMQMessage message = ts.acknowledgeMessage(ack.getMessageID());
250             if (message != null){
251                 message.setJMSRedelivered(true);
252                 Set set = destinationMap.get(message.getJMSActiveMQDestination());
253                 for (Iterator i = set.iterator();i.hasNext();) {
254                     TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
255                     container.enqueueFirst(message);
256                     break;
257                 }
258             }  
259         }
260     }
261 
262     /***
263      * @throws JMSException
264      */
265     public void poll() throws JMSException {
266     }
267 
268     /***
269      * A hook when the transaction is about to be commited; so apply all outstanding commands to the Journal if using a
270      * Journal (transaction log)
271      * 
272      * @param client
273      * @param transactionId
274      * @throws JMSException
275      */
276     public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
277     }
278 
279     /***
280      * A hook when the transaction is about to be rolled back; so discard all outstanding commands that are pending to
281      * be written to the Journal
282      * 
283      * @param client
284      * @param transactionId
285      */
286     public void rollbackTransaction(BrokerClient client, String transactionId) {
287     }
288 
289     /***
290      * @param physicalName
291      * @return @throws JMSException
292      */
293     public MessageContainer getContainer(String physicalName) throws JMSException {
294         Object key = destinations.get(physicalName);
295         if (key != null) {
296             return (MessageContainer) containers.get(key);
297         }
298         return null;
299     }
300 
301     /***
302      * @return a map of destinations
303      */
304     public Map getDestinations() {
305         return Collections.unmodifiableMap(destinations);
306     }
307 
308     /***
309      * Create filter for a Consumer
310      * 
311      * @param info
312      * @return the Fitler
313      * @throws javax.jms.JMSException
314      */
315     protected Filter createFilter(ConsumerInfo info) throws JMSException {
316         Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
317         if (info.isNoLocal()) {
318             filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
319         }
320         return filter;
321     }
322 
323     private void doGarbageCollection() {
324         if (doingGarbageCollection.commit(true, false)) {
325             if (queueManager.getCurrentCapacity() <= GARBAGE_COLLECTION_CAPACITY_LIMIT) {
326                 for (Iterator i = containers.values().iterator();i.hasNext();) {
327                     TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
328                     container.removeExpiredMessages();
329                 }
330             }
331             //if still below the limit - clear queues with no subscribers
332             if (queueManager.getCurrentCapacity() <= GARBAGE_COLLECTION_CAPACITY_LIMIT) {
333                 for (Iterator i = containers.values().iterator();i.hasNext();) {
334                     TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
335                     if (!container.hasActiveSubscribers()) {
336                         container.clear();
337                     }
338                 }
339             }
340             for (Iterator i = containers.values().iterator();i.hasNext();) {
341                 TransientQueueBoundedMessageContainer container = (TransientQueueBoundedMessageContainer) i.next();
342                 if (container.isInactive()) {
343                     try {
344                         container.close();
345                         log.info("closed inactive transient queue container: " + container.getDestinationName());
346                     }
347                     catch (JMSException e) {
348                         log.warn("failure closing container", e);
349                     }
350                     removeContainer(container);
351                 }
352             }
353             //now close any inactive queues
354             doingGarbageCollection.set(false);
355         }
356     }
357 
358     private synchronized void addContainer(TransientQueueBoundedMessageContainer container) {
359         containers.put(container.getDestination(), container);
360         destinationMap.put(container.getDestination(), container);
361     }
362 
363     private synchronized void removeContainer(TransientQueueBoundedMessageContainer container) {
364         containers.remove(container.getDestination());
365         destinationMap.remove(container.getDestination(), container);
366     }
367 
368     
369     protected Destination createDestination(String destinationName) {
370         return null;
371     }
372 
373     protected MessageContainer createContainer(String destinationName) throws JMSException {
374         return null;
375     }
376 }