1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.service.impl;
20 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
21 import org.codehaus.activemq.DuplicateDurableSubscriptionException;
22 import org.codehaus.activemq.broker.BrokerClient;
23 import org.codehaus.activemq.filter.AndFilter;
24 import org.codehaus.activemq.filter.DestinationMap;
25 import org.codehaus.activemq.filter.Filter;
26 import org.codehaus.activemq.filter.FilterFactory;
27 import org.codehaus.activemq.filter.FilterFactoryImpl;
28 import org.codehaus.activemq.filter.NoLocalFilter;
29 import org.codehaus.activemq.message.ActiveMQDestination;
30 import org.codehaus.activemq.message.ActiveMQMessage;
31 import org.codehaus.activemq.message.ActiveMQTopic;
32 import org.codehaus.activemq.message.ConsumerInfo;
33 import org.codehaus.activemq.message.MessageAck;
34 import org.codehaus.activemq.service.Dispatcher;
35 import org.codehaus.activemq.service.MessageContainer;
36 import org.codehaus.activemq.service.Subscription;
37 import org.codehaus.activemq.service.SubscriptionContainer;
38 import org.codehaus.activemq.service.TopicMessageContainer;
39 import org.codehaus.activemq.service.RedeliveryPolicy;
40 import org.codehaus.activemq.store.PersistenceAdapter;
41 import javax.jms.DeliveryMode;
42 import javax.jms.Destination;
43 import javax.jms.IllegalStateException;
44 import javax.jms.JMSException;
45 import java.util.Iterator;
46 import java.util.Map;
47 import java.util.Set;
48
49 /***
50 * A default Broker used for Topic messages for durable consumers
51 *
52 * @version $Revision: 1.25 $
53 */
54 public class DurableTopicMessageContainerManager extends MessageContainerManagerSupport {
55 private PersistenceAdapter persistenceAdapter;
56 protected SubscriptionContainer subscriptionContainer;
57 protected FilterFactory filterFactory;
58 protected Map activeSubscriptions = new ConcurrentHashMap();
59 private DestinationMap destinationMap = new DestinationMap();
60 private boolean loadedMessageContainers;
61
62 public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, RedeliveryPolicy redeliveryPolicy) {
63 this(persistenceAdapter, new DurableTopicSubscriptionContainerImpl(redeliveryPolicy), new FilterFactoryImpl(),
64 new DispatcherImpl());
65 }
66
67 public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter,
68 SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
69 super(dispatcher);
70 this.persistenceAdapter = persistenceAdapter;
71 this.subscriptionContainer = subscriptionContainer;
72 this.filterFactory = filterFactory;
73 }
74
75 public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
76 if (info.isDurableTopic()) {
77 doAddMessageConsumer(client, info);
78 }
79 }
80
81 public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
82
83
84 Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId());
85 if (sub != null) {
86 sub.setActive(false);
87 dispatcher.removeActiveSubscription(client, sub);
88 }
89 }
90
91 /***
92 * Delete a durable subscriber
93 *
94 * @param clientId
95 * @param subscriberName
96 * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
97 */
98 public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
99 boolean subscriptionFound = false;
100 for (Iterator i = subscriptionContainer.subscriptionIterator();i.hasNext();) {
101 Subscription sub = (Subscription) i.next();
102 if (sub.getClientId().equals(clientId) && sub.getSubscriberName().equals(subscriberName)) {
103
104 if (sub.isActive()) {
105 throw new JMSException("The Consummer " + subscriberName + " is still active");
106 }
107 else {
108 subscriptionContainer.removeSubscription(sub.getConsumerId());
109 sub.clear();
110 subscriptionFound = true;
111 }
112 }
113 }
114 if (!subscriptionFound) {
115 throw new IllegalStateException("The Consumer " + subscriberName + " does not exist for client: "
116 + clientId);
117 }
118 }
119
120 /***
121 * @param client
122 * @param message
123 * @throws javax.jms.JMSException
124 */
125 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
126 ActiveMQDestination dest = (ActiveMQDestination) message.getJMSDestination();
127 if (dest != null && dest.isTopic() && message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT) {
128 MessageContainer container = getContainer(message.getJMSDestination().toString());
129 Set matchingSubscriptions = subscriptionContainer.getSubscriptions(message.getJMSActiveMQDestination());
130
131
132
133 container.addMessage(message);
134 if (!matchingSubscriptions.isEmpty()) {
135 for (Iterator i = matchingSubscriptions.iterator();i.hasNext();) {
136 Subscription sub = (Subscription) i.next();
137 if (sub.isTarget(message)) {
138 sub.addMessage(container, message);
139 }
140 }
141 updateSendStats(client, message);
142 }
143 }
144 }
145
146 /***
147 * Acknowledge a message as being read and consumed byh the Consumer
148 *
149 * @param client
150 * @param ack
151 * @throws javax.jms.JMSException
152 */
153 public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
154 if (ack.getDestination().isTopic()) {
155 Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId());
156 if (sub != null) {
157 sub.messageConsumed(ack);
158 }
159 }
160 }
161
162 public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack)
163 throws JMSException {
164 Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId());
165 if (sub != null) {
166 sub.onAcknowledgeTransactedMessageBeforeCommit(ack);
167 }
168 }
169
170 public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException {
171 Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId());
172 if (sub != null) {
173
174 for (Iterator iter = messageContainers.values().iterator();iter.hasNext();) {
175 MessageContainer container = (MessageContainer) iter.next();
176 if (container.containsMessage(ack.getMessageIdentity())) {
177 sub.redeliverMessage(container, ack);
178
179 break;
180 }
181 }
182 }
183 }
184
185 /***
186 * poll or messages
187 *
188 * @throws javax.jms.JMSException
189 */
190 public void poll() throws JMSException {
191
192 }
193
194 public void commitTransaction(BrokerClient client, String transactionId) {
195 }
196
197 public void rollbackTransaction(BrokerClient client, String transactionId) {
198 }
199
200
201
202 protected MessageContainer createContainer(String destinationName) throws JMSException {
203 TopicMessageContainer topicMessageContainer = persistenceAdapter.createTopicMessageContainer(destinationName);
204 destinationMap.put(new ActiveMQTopic(destinationName), topicMessageContainer);
205 return topicMessageContainer;
206 }
207
208 protected Destination createDestination(String destinationName) {
209 return new ActiveMQTopic(destinationName);
210 }
211
212 protected void doAddMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
213 boolean shouldRecover = false;
214 if (info.getConsumerName() != null && info.getClientId() != null) {
215 for (Iterator iter = activeSubscriptions.values().iterator();iter.hasNext();) {
216 Subscription subscription = (Subscription) iter.next();
217 if (subscription.isSameDurableSubscription(info)) {
218 throw new DuplicateDurableSubscriptionException(info);
219 }
220 }
221 }
222 Subscription subscription = subscriptionContainer.getSubscription(info.getConsumerId());
223 if (subscription != null && subscription.isDurableTopic()) {
224
225 if (!subscription.getDestination().equals(subscription.getDestination())
226 || !subscription.getSelector().equals(info.getSelector())) {
227 subscriptionContainer.removeSubscription(info.getConsumerId());
228 subscription.clear();
229 subscription = subscriptionContainer.makeSubscription(dispatcher, client,info, createFilter(info));
230 }
231 }
232 else {
233 subscription = subscriptionContainer.makeSubscription(dispatcher, client,info, createFilter(info));
234 shouldRecover = true;
235 }
236 subscription.setActiveConsumer(client,info);
237 activeSubscriptions.put(info.getConsumerId(), subscription);
238 dispatcher.addActiveSubscription(client, subscription);
239 if (subscription.isWildcard()) {
240 synchronized (this) {
241 if (!loadedMessageContainers) {
242 loadAllMessageContainers();
243 loadedMessageContainers = true;
244 }
245 }
246 }
247 else {
248
249 getContainer(subscription.getDestination().getPhysicalName());
250 }
251 Set containers = destinationMap.get(subscription.getDestination());
252 for (Iterator iter = containers.iterator();iter.hasNext();) {
253 TopicMessageContainer container = (TopicMessageContainer) iter.next();
254 if (container instanceof DurableTopicMessageContainer) {
255 ((DurableTopicMessageContainer) container).storeSubscription(info, subscription);
256 }
257 }
258 if (shouldRecover) {
259 recoverSubscriptions(subscription);
260 }
261
262
263
264
265 subscription.setActive(true);
266
267 }
268
269 /***
270 * This method is called when a new durable subscription is started and so we need to go through each matching
271 * message container and dispatch any matching messages that may be outstanding
272 *
273 * @param subscription
274 */
275 protected void recoverSubscriptions(Subscription subscription) throws JMSException {
276
277 if (subscription.isWildcard()) {
278 synchronized (this) {
279 if (!loadedMessageContainers) {
280 loadAllMessageContainers();
281 loadedMessageContainers = true;
282 }
283 }
284 }
285 else {
286
287 getContainer(subscription.getDestination().getPhysicalName());
288 }
289 Set containers = destinationMap.get(subscription.getDestination());
290 for (Iterator iter = containers.iterator();iter.hasNext();) {
291 TopicMessageContainer container = (TopicMessageContainer) iter.next();
292 container.recoverSubscription(subscription);
293 }
294 }
295
296 /***
297 * Called when recovering a wildcard subscription where we need to load all the durable message containers (for
298 * which we have any outstanding messages to deliver) into RAM
299 */
300 protected void loadAllMessageContainers() throws JMSException {
301 Map destinations = persistenceAdapter.getInitialDestinations();
302 if (destinations != null) {
303 for (Iterator iter = destinations.entrySet().iterator();iter.hasNext();) {
304 Map.Entry entry = (Map.Entry) iter.next();
305 String name = (String) entry.getKey();
306 Destination destination = (Destination) entry.getValue();
307 loadContainer(name, destination);
308 }
309 }
310 }
311
312 /***
313 * Create filter for a Consumer
314 *
315 * @param info
316 * @return the Fitler
317 * @throws javax.jms.JMSException
318 */
319 protected Filter createFilter(ConsumerInfo info) throws JMSException {
320 Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
321 if (info.isNoLocal()) {
322 filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
323 }
324 return filter;
325 }
326 }