View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.message.util;
20  import java.util.Iterator;
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.codehaus.activemq.capacity.BasicCapacityMonitor;
24  import org.codehaus.activemq.message.Packet;
25  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
26  import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
27  
28  /***
29   * A factory and manager for MemoryBoundedQueue instances that also ensures the total memory used by all active
30   * MemoryBoundedQueues created by this instance stays within the memory usage bounds set.
31   * 
32   * @version $Revision: 1.2 $
33   */
34  public class MemoryBoundedQueueManager extends BasicCapacityMonitor {
35      private static final int OBJECT_OVERHEAD = 50;
36      SynchronizedLong totalMemoryUsedSize = new SynchronizedLong(0);
37      private ConcurrentHashMap activeQueues = new ConcurrentHashMap();
38      private static final Log log = LogFactory.getLog(MemoryBoundedQueueManager.class);
39  
40      /***
41       * @param name
42       * @param maxSize
43       */
44      public MemoryBoundedQueueManager(String name, long maxSize) {
45          super(name, maxSize);
46      }
47  
48      /***
49       * retrieve a named MemoryBoundedQueue or creates one if not found
50       * 
51       * @param name
52       * @return an named instance of a MemoryBoundedQueue
53       */
54      public MemoryBoundedQueue getMemoryBoundedQueue(String name) {
55          MemoryBoundedQueue result = (MemoryBoundedQueue) activeQueues.get(name);
56          if (result == null) {
57              result = new MemoryBoundedQueue(name, this);
58              activeQueues.put(name, result);
59          }
60          return result;
61      }
62  
63      /***
64       * close this queue manager and all associated MemoryBoundedQueues
65       */
66      public void close() {
67          for (Iterator i = activeQueues.values().iterator();i.hasNext();) {
68              MemoryBoundedQueue mbq = (MemoryBoundedQueue) i.next();
69              mbq.close();
70          }
71          activeQueues.clear();
72      }
73  
74      /***
75       * @return the calculated total memory usage assocated with all it's queues
76       */
77      public long getTotalMemoryUsedSize() {
78          return totalMemoryUsedSize.get();
79      }
80  
81      /***
82       * @return true if this MemoryBoundedQueueManager has reached it's predefined limit
83       */
84      public boolean isFull() {
85          boolean result = totalMemoryUsedSize.get() >= super.getValueLimit();
86          return result;
87      }
88  
89      int incrementMemoryUsed(Packet obj) {
90          int size = OBJECT_OVERHEAD;
91          if (obj != null) {
92              if (obj.getMemoryUsageReferenceCount() == 0) {
93                  size += obj.getMemoryUsage();
94              }
95              obj.incrementMemoryReferenceCount();
96          }
97          totalMemoryUsedSize.add(size);
98          super.setCurrentValue(totalMemoryUsedSize.get());
99          return size;
100     }
101 
102     int decrementMemoryUsed(Packet obj) {
103         int size = OBJECT_OVERHEAD;
104         if (obj != null) {
105             obj.decrementMemoryReferenceCount();
106             if (obj.getMemoryUsageReferenceCount() == 0) {
107                 size += obj.getMemoryUsage();
108             }
109         }
110         totalMemoryUsedSize.subtract(size);
111         super.setCurrentValue(totalMemoryUsedSize.get());
112         return size;
113     }
114 
115     protected void finalize() {
116         close();
117     }
118 
119     void removeMemoryBoundedQueue(String name) {
120         activeQueues.remove(name);
121     }
122 }