View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.message.util;
20  import java.util.ArrayList;
21  import java.util.List;
22  import javax.jms.JMSException;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.codehaus.activemq.message.Packet;
26  import org.codehaus.activemq.service.QueueListEntry;
27  import org.codehaus.activemq.service.impl.DefaultQueueList;
28  
29  /***
30   * MemoryBoundedQueue is a queue bounded by memory usage for Packets
31   * 
32   * @version $Revision: 1.6 $
33   */
34  public class MemoryBoundedQueue implements BoundedPacketQueue {
35      private MemoryBoundedQueueManager queueManager;
36      private String name;
37      private boolean stopped = false;
38      private boolean closed = false;
39      private long memoryUsedByThisQueue;
40      private Object outLock = new Object();
41      private Object inLock = new Object();
42      private DefaultQueueList internalList = new DefaultQueueList();
43      private static final int WAIT_TIMEOUT = 100;
44      private static final Log log = LogFactory.getLog(MemoryBoundedQueueManager.class);
45  
46      /***
47       * Constructor
48       * 
49       * @param name
50       * @param manager
51       */
52      MemoryBoundedQueue(String name, MemoryBoundedQueueManager manager) {
53          this.name = name;
54          this.queueManager = manager;
55      }
56  
57      /***
58       * @return the name of this MemoryBoundedQueue
59       */
60      public String getName() {
61          return name;
62      }
63  
64      /***
65       * @return a pretty print of this queue
66       */
67      public String toString() {
68          return "" + name + " , cardinality = " + size() + " memory usage = " + memoryUsedByThisQueue;
69      }
70  
71      /***
72       * @return the number of items held by this queue
73       */
74      public int size() {
75          return internalList.size();
76      }
77  
78      /***
79       * @return an aproximation the memory used by this queue
80       */
81      public long getLocalMemoryUsedByThisQueue() {
82          return memoryUsedByThisQueue;
83      }
84  
85      /***
86       * close and remove this queue from the MemoryBoundedQueueManager
87       */
88      public void close() {
89          try {
90              clear();
91              closed = true;
92              synchronized (outLock) {
93                  outLock.notifyAll();
94              }
95              synchronized (inLock) {
96                  inLock.notifyAll();
97              }
98          }
99          catch (Throwable e) {
100             e.printStackTrace();
101         }
102         finally {
103             queueManager.removeMemoryBoundedQueue(getName());
104         }
105     }
106 
107     /***
108      * Enqueue a Packet without checking memory usage limits
109      * 
110      * @param packet
111      */
112     public void enqueueNoBlock(Packet packet) {
113         if (!closed) {
114             internalList.add(packet);
115             incrementMemoryUsed(packet);
116             synchronized (outLock) {
117                 outLock.notify();
118             }
119         }
120     }
121 
122     /***
123      * Enqueue a Packet to this queue
124      * 
125      * @param packet
126      */
127     public void enqueue(Packet packet) {
128         if (!queueManager.isFull()) {
129             enqueueNoBlock(packet);
130         }
131         else {
132             synchronized (inLock) {
133                 try {
134                     while (queueManager.isFull() && !closed) {
135                         inLock.wait(WAIT_TIMEOUT);
136                     }
137                 }
138                 catch (InterruptedException ie) {
139                 }
140             }
141             enqueueNoBlock(packet);
142         }
143     }
144 
145     /***
146      * Enqueue a packet to the head of the queue with total disregard for memory constraints
147      * 
148      * @param packet
149      */
150     public final void enqueueFirstNoBlock(Packet packet) {
151         if (!closed) {
152             internalList.addFirst(packet);
153             incrementMemoryUsed(packet);
154             synchronized (outLock) {
155                 outLock.notify();
156             }
157         }
158     }
159 
160     /***
161      * Enqueue a Packet to the head of the queue
162      * 
163      * @param packet
164      * @throws InterruptedException
165      */
166     public void enqueueFirst(Packet packet) throws InterruptedException {
167         if (!queueManager.isFull()) {
168             enqueueFirstNoBlock(packet);
169         }
170         else {
171             synchronized (inLock) {
172                 while (queueManager.isFull() && !closed) {
173                     inLock.wait(WAIT_TIMEOUT);
174                 }
175             }
176             enqueueFirstNoBlock(packet);
177         }
178     }
179 
180     /***
181      * @return the first dequeued Packet or blocks until one is available
182      * @throws InterruptedException
183      */
184     public Packet dequeue() throws InterruptedException {
185         Packet result = null;
186         synchronized (outLock) {
187             while (internalList.isEmpty() && !closed) {
188                 outLock.wait(WAIT_TIMEOUT);
189             }
190             result = dequeueNoWait();
191         }
192         return result;
193     }
194 
195     /***
196      * Dequeues a Packet from the head of the queue
197      * 
198      * @param timeInMillis time to wait for a Packet to be available
199      * @return the first Packet or null if none available within <I>timeInMillis </I>
200      * @throws InterruptedException
201      */
202     public Packet dequeue(long timeInMillis) throws InterruptedException {
203         Packet result = null;
204         if (timeInMillis == 0) {
205             result = dequeue();
206         }
207         else {
208             synchronized (outLock) {
209                 // if timeInMillis is less than zero assume nowait
210                 long waitTime = timeInMillis;
211                 long start = (timeInMillis <= 0) ? 0 : System.currentTimeMillis();
212                 while (!closed) {
213                     result = dequeueNoWait();
214                     if (result != null || waitTime <= 0) {
215                         break;
216                     }
217                     else {
218                         outLock.wait(waitTime);
219                         waitTime = timeInMillis - (System.currentTimeMillis() - start);
220                     }
221                 }
222             }
223         }
224         return result;
225     }
226 
227     /***
228      * dequeues a Packet from the head of the queue
229      * 
230      * @return the Packet at the head of the queue or null, if none is available
231      * @throws InterruptedException
232      */
233     public Packet dequeueNoWait() throws InterruptedException {
234         Packet packet = null;
235         if (stopped) {
236             synchronized (outLock) {
237                 while (stopped && !closed) {
238                     outLock.wait(WAIT_TIMEOUT);
239                 }
240             }
241         }
242         packet = (Packet) internalList.removeFirst();
243         decrementMemoryUsed(packet);
244         if (packet != null) {
245             synchronized (inLock) {
246                 inLock.notify();
247             }
248         }
249         return packet;
250     }
251 
252     /***
253      * @return true if the queue is enabled for dequeing (default = true)
254      */
255     public boolean isStarted() {
256         return stopped == false;
257     }
258 
259     /***
260      * disable dequeueing
261      */
262     public void stop() {
263         synchronized (outLock) {
264             stopped = true;
265         }
266     }
267 
268     /***
269      * enable dequeueing
270      */
271     public void start() {
272         stopped = false;
273         synchronized (outLock) {
274             outLock.notifyAll();
275         }
276         synchronized (inLock) {
277             inLock.notifyAll();
278         }
279     }
280 
281     /***
282      * Remove a packet from the queue
283      * 
284      * @param packet
285      * @return true if the packet was found
286      */
287     public boolean remove(Packet packet) {
288         boolean result = false;
289         if (!internalList.isEmpty()) {
290             result = internalList.remove(packet);
291         }
292         if (result) {
293             decrementMemoryUsed(packet);
294         }
295         synchronized (inLock) {
296             inLock.notify();
297         }
298         return result;
299     }
300 
301     /***
302      * Remove a Packet by it's id
303      * 
304      * @param id
305      * @return
306      */
307     public Packet remove(String id) {
308         Packet result = null;
309         QueueListEntry entry = internalList.getFirstEntry();
310         try {
311             while (entry != null) {
312                 Packet p = (Packet) entry.getElement();
313                 if (p.getId().equals(id)) {
314                     result = p;
315                     internalList.remove(entry);
316                     break;
317                 }
318                 entry = internalList.getNextEntry(entry);
319             }
320         }
321         catch (JMSException jmsEx) {
322             jmsEx.printStackTrace();
323         }
324         synchronized (inLock) {
325             inLock.notify();
326         }
327         return result;
328     }
329 
330     /***
331      * remove any Packets in the queue
332      */
333     public void clear() {
334         while (!internalList.isEmpty()) {
335             Packet packet = (Packet) internalList.removeFirst();
336             decrementMemoryUsed(packet);
337         }
338         synchronized (inLock) {
339             inLock.notifyAll();
340         }
341     }
342 
343     /***
344      * @return true if the queue is empty
345      */
346     public boolean isEmpty() {
347         return internalList.isEmpty();
348     }
349 
350     /***
351      * retrieve a Packet at an indexed position in the queue
352      * 
353      * @param index
354      * @return
355      */
356     public Packet get(int index) {
357         return (Packet) internalList.get(index);
358     }
359 
360     /***
361      * Retrieve a shallow copy of the contents as a list
362      * 
363      * @return a list containing the bounded queue contents
364      */
365     public List getContents() {
366         Object[] array = internalList.toArray();
367         List list = new ArrayList();
368         for (int i = 0;i < array.length;i++) {
369             list.add(array[i]);
370         }
371         return list;
372     }
373 
374     private synchronized void incrementMemoryUsed(Packet packet) {
375         if (packet != null) {
376             memoryUsedByThisQueue += queueManager.incrementMemoryUsed(packet);
377         }
378     }
379 
380     private synchronized void decrementMemoryUsed(Packet packet) {
381         if (packet != null) {
382             memoryUsedByThisQueue -= queueManager.decrementMemoryUsed(packet);
383         }
384     }
385 }