1 /*** 2 * 3 * Copyright 2004 Protique Ltd 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 **/ 18 package org.codehaus.activemq.service.impl; 19 20 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 21 import org.codehaus.activemq.broker.BrokerClient; 22 import org.codehaus.activemq.filter.DestinationMap; 23 import org.codehaus.activemq.filter.Filter; 24 import org.codehaus.activemq.message.ActiveMQDestination; 25 import org.codehaus.activemq.message.ConsumerInfo; 26 import org.codehaus.activemq.service.Dispatcher; 27 import org.codehaus.activemq.service.Subscription; 28 import org.codehaus.activemq.service.SubscriptionContainer; 29 import org.codehaus.activemq.service.RedeliveryPolicy; 30 31 import java.util.HashSet; 32 import java.util.Iterator; 33 import java.util.Map; 34 import java.util.Set; 35 36 /*** 37 * A default RAM only implementation of the {@link SubscriptionContainer} 38 * 39 * @version $Revision: 1.6 $ 40 */ 41 public class SubscriptionContainerImpl implements SubscriptionContainer { 42 private Map subscriptions; 43 private DestinationMap destinationIndex = new DestinationMap(); 44 private RedeliveryPolicy redeliveryPolicy; 45 46 public SubscriptionContainerImpl(RedeliveryPolicy redeliveryPolicy) { 47 this(new ConcurrentHashMap(), redeliveryPolicy); 48 } 49 50 public SubscriptionContainerImpl(Map subscriptions, RedeliveryPolicy redeliveryPolicy) { 51 this.subscriptions = subscriptions; 52 this.redeliveryPolicy = redeliveryPolicy; 53 } 54 55 public String toString() { 56 return super.toString() + "[size:" + subscriptions.size() + "]"; 57 } 58 59 public RedeliveryPolicy getRedeliveryPolicy() { 60 return redeliveryPolicy; 61 } 62 63 public Subscription getSubscription(String consumerId) { 64 return (Subscription) subscriptions.get(consumerId); 65 } 66 67 public Subscription removeSubscription(String consumerId) { 68 Subscription subscription = (Subscription) subscriptions.remove(consumerId); 69 if (subscription != null) { 70 destinationIndex.remove(subscription.getDestination(), subscription); 71 } 72 return subscription; 73 } 74 75 public Set getSubscriptions(ActiveMQDestination destination) { 76 Object answer = destinationIndex.get(destination); 77 if (answer instanceof Set) { 78 return (Set) answer; 79 } 80 else { 81 Set set = new HashSet(1); 82 set.add(answer); 83 return set; 84 } 85 } 86 87 public Iterator subscriptionIterator() { 88 return subscriptions.values().iterator(); 89 } 90 91 public Subscription makeSubscription(Dispatcher dispatcher,BrokerClient client, ConsumerInfo info, Filter filter) { 92 Subscription subscription = createSubscription(dispatcher, client,info, filter); 93 subscriptions.put(info.getConsumerId(), subscription); 94 destinationIndex.put(subscription.getDestination(), subscription); 95 return subscription; 96 } 97 98 protected Subscription createSubscription(Dispatcher dispatcher, BrokerClient client,ConsumerInfo info, Filter filter) { 99 return new SubscriptionImpl(dispatcher, client,info, filter, getRedeliveryPolicy()); 100 } 101 }