1 /*** 2 * 3 * Copyright 2004 Protique Ltd 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 **/ 18 19 package org.codehaus.activemq.message.util; 20 import java.util.Iterator; 21 import org.apache.commons.logging.Log; 22 import org.apache.commons.logging.LogFactory; 23 import org.codehaus.activemq.capacity.BasicCapacityMonitor; 24 import org.codehaus.activemq.message.Packet; 25 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 26 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong; 27 28 /*** 29 * A factory manager for MemoryBoundedQueue and also ensures that the maximum memory used by all active 30 * MemoryBoundedQueues cewated by this instance stays within the memory usage bounds set. 31 * 32 * @version $Revision: 1.2 $ 33 */ 34 public class MemoryBoundedQueueManager extends BasicCapacityMonitor { 35 private static final int OBJECT_OVERHEAD = 50; 36 SynchronizedLong totalMemoryUsedSize = new SynchronizedLong(0); 37 private ConcurrentHashMap activeQueues = new ConcurrentHashMap(); 38 private static final Log log = LogFactory.getLog(MemoryBoundedQueueManager.class); 39 40 /*** 41 * @param name 42 * @param maxSize 43 */ 44 public MemoryBoundedQueueManager(String name, long maxSize) { 45 super(name, maxSize); 46 } 47 48 /*** 49 * retrieve a named MemoryBoundedQueue or creates one if not found 50 * 51 * @param name 52 * @return an named instance of a MemoryBoundedQueue 53 */ 54 public MemoryBoundedQueue getMemoryBoundedQueue(String name) { 55 MemoryBoundedQueue result = (MemoryBoundedQueue) activeQueues.get(name); 56 if (result == null) { 57 result = new MemoryBoundedQueue(name, this); 58 activeQueues.put(name, result); 59 } 60 return result; 61 } 62 63 /*** 64 * close this queue manager and all associated MemoryBoundedQueues 65 */ 66 public void close() { 67 for (Iterator i = activeQueues.values().iterator();i.hasNext();) { 68 MemoryBoundedQueue mbq = (MemoryBoundedQueue) i.next(); 69 mbq.close(); 70 } 71 activeQueues.clear(); 72 } 73 74 /*** 75 * @return the calculated total memory usage assocated with all it's queues 76 */ 77 public long getTotalMemoryUsedSize() { 78 return totalMemoryUsedSize.get(); 79 } 80 81 /*** 82 * @return true if this MemoryBoundedQueueManager has reached it's predefined limit 83 */ 84 public boolean isFull() { 85 boolean result = totalMemoryUsedSize.get() >= super.getValueLimit(); 86 return result; 87 } 88 89 int incrementMemoryUsed(Packet obj) { 90 int size = OBJECT_OVERHEAD; 91 if (obj != null) { 92 if (obj.getMemoryUsageReferenceCount() == 0) { 93 size += obj.getMemoryUsage(); 94 } 95 obj.incrementMemoryReferenceCount(); 96 } 97 totalMemoryUsedSize.add(size); 98 super.setCurrentValue(totalMemoryUsedSize.get()); 99 return size; 100 } 101 102 int decrementMemoryUsed(Packet obj) { 103 int size = OBJECT_OVERHEAD; 104 if (obj != null) { 105 obj.decrementMemoryReferenceCount(); 106 if (obj.getMemoryUsageReferenceCount() == 0) { 107 size += obj.getMemoryUsage(); 108 } 109 } 110 totalMemoryUsedSize.subtract(size); 111 super.setCurrentValue(totalMemoryUsedSize.get()); 112 return size; 113 } 114 115 protected void finalize() { 116 close(); 117 } 118 119 void removeMemoryBoundedQueue(String name) { 120 activeQueues.remove(name); 121 } 122 }