1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.service.impl;
19
20 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.codehaus.activemq.broker.BrokerClient;
24 import org.codehaus.activemq.filter.AndFilter;
25 import org.codehaus.activemq.filter.DestinationMap;
26 import org.codehaus.activemq.filter.Filter;
27 import org.codehaus.activemq.filter.FilterFactory;
28 import org.codehaus.activemq.filter.FilterFactoryImpl;
29 import org.codehaus.activemq.filter.NoLocalFilter;
30 import org.codehaus.activemq.message.ActiveMQDestination;
31 import org.codehaus.activemq.message.ActiveMQMessage;
32 import org.codehaus.activemq.message.ActiveMQQueue;
33 import org.codehaus.activemq.message.ConsumerInfo;
34 import org.codehaus.activemq.message.MessageAck;
35 import org.codehaus.activemq.service.Dispatcher;
36 import org.codehaus.activemq.service.MessageContainer;
37 import org.codehaus.activemq.service.QueueList;
38 import org.codehaus.activemq.service.QueueListEntry;
39 import org.codehaus.activemq.service.QueueMessageContainer;
40 import org.codehaus.activemq.service.Subscription;
41 import org.codehaus.activemq.service.SubscriptionContainer;
42 import org.codehaus.activemq.service.RedeliveryPolicy;
43 import org.codehaus.activemq.service.boundedvm.TransientQueueBoundedMessageContainer;
44 import org.codehaus.activemq.store.PersistenceAdapter;
45
46 import javax.jms.Destination;
47 import javax.jms.JMSException;
48 import java.util.Iterator;
49 import java.util.Map;
50 import java.util.Set;
51
52 /***
53 * A default Broker used for Queue messages
54 *
55 * @version $Revision: 1.1 $
56 */
57 public class DurableQueueMessageContainerManager extends MessageContainerManagerSupport {
58 private static final Log log = LogFactory.getLog(DurableQueueMessageContainerManager.class);
59 private static final int MAX_MESSAGES_DISPATCHED_FROM_POLL = 50;
60
61 private PersistenceAdapter persistenceAdapter;
62 protected SubscriptionContainer subscriptionContainer;
63 protected FilterFactory filterFactory;
64 protected Map activeSubscriptions = new ConcurrentHashMap();
65 protected Map browsers = new ConcurrentHashMap();
66 protected DestinationMap destinationMap = new DestinationMap();
67 private Object subscriptionMutex = new Object();
68
69
70
71 public DurableQueueMessageContainerManager(PersistenceAdapter persistenceAdapter, RedeliveryPolicy redeliveryPolicy) {
72 this(persistenceAdapter, new SubscriptionContainerImpl(redeliveryPolicy), new FilterFactoryImpl(), new DispatcherImpl());
73 }
74
75 public DurableQueueMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
76 super(dispatcher);
77 this.persistenceAdapter = persistenceAdapter;
78 this.subscriptionContainer = subscriptionContainer;
79 this.filterFactory = filterFactory;
80 }
81
82 /***
83 * @param client
84 * @param info
85 * @throws javax.jms.JMSException
86 */
87 public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
88 if (log.isDebugEnabled()) {
89 log.debug("Adding consumer: " + info);
90 }
91 if (info.getDestination().isQueue() && !info.getDestination().isTemporary()) {
92
93 getContainer(info.getDestination().getPhysicalName());
94
95 Subscription sub = subscriptionContainer.makeSubscription(dispatcher, client,info, createFilter(info));
96 dispatcher.addActiveSubscription(client, sub);
97 updateActiveSubscriptions(sub);
98
99
100
101 sub.setActive(true);
102 }
103 }
104
105 /***
106 * @param client
107 * @param info
108 * @throws javax.jms.JMSException
109 */
110 public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
111 if (log.isDebugEnabled()) {
112 log.debug("Removing consumer: " + info);
113 }
114 if (info.getDestination() != null && info.getDestination().isQueue()) {
115 synchronized (subscriptionMutex) {
116 Subscription sub = (Subscription) subscriptionContainer.removeSubscription(info.getConsumerId());
117 if (sub != null) {
118 sub.setActive(false);
119 sub.clear();
120 dispatcher.removeActiveSubscription(client, sub);
121
122 for (Iterator iter = messageContainers.values().iterator(); iter.hasNext();) {
123 QueueMessageContainer container = (QueueMessageContainer) iter.next();
124
125 if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
126 QueueList list = getSubscriptionList(container);
127 list.remove(sub);
128 if (list.isEmpty()) {
129 activeSubscriptions.remove(sub.getDestination().getPhysicalName());
130 }
131 list = getBrowserList(container);
132 list.remove(sub);
133 if (list.isEmpty()) {
134 browsers.remove(sub.getDestination().getPhysicalName());
135 }
136 }
137 }
138 }
139 }
140 }
141 }
142
143 /***
144 * Delete a durable subscriber
145 *
146 * @param clientId
147 * @param subscriberName
148 * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
149 */
150 public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
151 }
152
153 /***
154 * @param client
155 * @param message
156 * @throws javax.jms.JMSException
157 */
158 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
159 ActiveMQDestination dest = (ActiveMQDestination) message.getJMSDestination();
160 if (dest != null && dest.isQueue() && !message.isTemporary()) {
161 if (log.isDebugEnabled()) {
162 log.debug("Dispaching message: " + message);
163 }
164
165 getContainer(((ActiveMQDestination) message.getJMSDestination()).getPhysicalName());
166 Set set = destinationMap.get(message.getJMSActiveMQDestination());
167 for (Iterator i = set.iterator();i.hasNext();) {
168 QueueMessageContainer container = (QueueMessageContainer) i.next();
169 container.addMessage(message);
170 dispatcher.wakeup();
171 updateSendStats(client, message);
172 }
173 }
174 }
175
176 /***
177 * Acknowledge a message as being read and consumed by the Consumer
178 *
179 * @param client
180 * @param ack
181 * @throws javax.jms.JMSException
182 */
183 public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
184 if (!ack.isTemporary() && ack.getDestination().isQueue()){
185 Subscription sub = subscriptionContainer.getSubscription(ack.getConsumerId());
186 if (sub != null) {
187 sub.messageConsumed(ack);
188 if (ack.isMessageRead()) {
189 updateAcknowledgeStats(client, sub);
190 }
191 }
192 }
193 }
194
195 public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack) throws JMSException {
196 if (!ack.isTemporary() && ack.getDestination().isQueue()){
197 Subscription sub = subscriptionContainer.getSubscription(ack.getConsumerId());
198 if (sub != null) {
199 sub.onAcknowledgeTransactedMessageBeforeCommit(ack);
200 }
201 }
202 }
203
204 public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException {
205 if (!ack.isTemporary() && ack.getDestination().isQueue()){
206 Subscription sub = subscriptionContainer.getSubscription(ack.getConsumerId());
207 if (sub != null) {
208 sub.redeliverMessage(null, ack);
209 }
210 }
211 }
212
213 /***
214 * Poll for messages
215 *
216 * @throws javax.jms.JMSException
217 */
218 public void poll() throws JMSException {
219 synchronized (subscriptionMutex) {
220 for (Iterator iter = activeSubscriptions.keySet().iterator(); iter.hasNext();) {
221 QueueMessageContainer container = (QueueMessageContainer) iter.next();
222
223 QueueList browserList = (QueueList) browsers.get(container);
224 doPeek(container, browserList);
225 QueueList list = (QueueList) activeSubscriptions.get(container);
226 doPoll(container, list);
227 }
228 }
229 }
230
231 public void commitTransaction(BrokerClient client, String transactionId) {
232 }
233
234 public void rollbackTransaction(BrokerClient client, String transactionId) {
235 }
236
237 public MessageContainer getContainer(String destinationName) throws JMSException {
238 synchronized (subscriptionMutex) {
239 return super.getContainer(destinationName);
240 }
241 }
242
243
244
245
246 protected MessageContainer createContainer(String destinationName) throws JMSException {
247 QueueMessageContainer container = persistenceAdapter.createQueueMessageContainer(destinationName);
248
249
250 for (Iterator iter = subscriptionContainer.subscriptionIterator(); iter.hasNext();) {
251 Subscription sub = (Subscription) iter.next();
252 if (sub.isBrowser()) {
253 updateBrowsers(container, sub);
254 }
255 else {
256 updateActiveSubscriptions(container, sub);
257 }
258 }
259
260 ActiveMQDestination key = new ActiveMQQueue(destinationName);
261 destinationMap.put(key, container);
262 return container;
263 }
264
265 protected Destination createDestination(String destinationName) {
266 return new ActiveMQQueue(destinationName);
267 }
268
269 private void doPeek(QueueMessageContainer container, QueueList browsers) throws JMSException {
270 if (browsers != null && browsers.size() > 0) {
271 for (int i = 0; i < browsers.size(); i++) {
272 SubscriptionImpl sub = (SubscriptionImpl) browsers.get(i);
273 int count = 0;
274 ActiveMQMessage msg = null;
275 do {
276 msg = container.peekNext(sub.getLastMessageIdentity());
277 if (msg != null) {
278 if (sub.isTarget(msg)) {
279 sub.addMessage(container, msg);
280 dispatcher.wakeup(sub);
281 }
282 else {
283 sub.setLastMessageIdentifier(msg.getJMSMessageIdentity());
284 }
285 }
286 }
287 while (msg != null && !sub.isAtPrefetchLimit() && count++ < MAX_MESSAGES_DISPATCHED_FROM_POLL);
288 }
289 }
290 }
291
292 private void doPoll(QueueMessageContainer container, QueueList subList) throws JMSException {
293 int count = 0;
294 ActiveMQMessage msg = null;
295 if (subList != null && subList.size() > 0) {
296 do {
297 boolean dispatched = false;
298 msg = container.poll();
299 if (msg != null) {
300 QueueListEntry entry = subList.getFirstEntry();
301 boolean targeted = false;
302 while (entry != null) {
303 SubscriptionImpl sub = (SubscriptionImpl) entry.getElement();
304 if (sub.isTarget(msg)) {
305 targeted = true;
306 if (!sub.isAtPrefetchLimit()) {
307 sub.addMessage(container, msg);
308 dispatched = true;
309 dispatcher.wakeup(sub);
310 subList.rotate();
311 break;
312 }
313 }
314 entry = subList.getNextEntry(entry);
315 }
316 if (!dispatched) {
317 if (targeted) {
318
319
320 container.returnMessage(msg.getJMSMessageIdentity());
321 }
322 break;
323 }
324 }
325 }
326 while (msg != null && count++ < MAX_MESSAGES_DISPATCHED_FROM_POLL);
327 }
328 }
329
330 private void updateActiveSubscriptions(Subscription subscription) throws JMSException {
331
332 synchronized (subscriptionMutex) {
333 boolean processedSubscriptionContainer = false;
334
335 String subscriptionPhysicalName = subscription.getDestination().getPhysicalName();
336 for (Iterator iter = messageContainers.entrySet().iterator(); iter.hasNext();) {
337 Map.Entry entry = (Map.Entry) iter.next();
338 String destinationName = (String) entry.getKey();
339 QueueMessageContainer container = (QueueMessageContainer) entry.getValue();
340
341 if (destinationName.equals(subscriptionPhysicalName)) {
342 processedSubscriptionContainer = true;
343 }
344 processSubscription(subscription, container);
345 }
346 if (!processedSubscriptionContainer) {
347 processSubscription(subscription, (QueueMessageContainer) getContainer(subscriptionPhysicalName));
348 }
349 }
350 }
351
352 protected void processSubscription(Subscription subscription, QueueMessageContainer container) throws JMSException {
353
354 if (subscription.isBrowser()) {
355 updateBrowsers(container, subscription);
356 }
357 else {
358 updateActiveSubscriptions(container, subscription);
359 }
360 }
361
362 private void updateActiveSubscriptions(QueueMessageContainer container, Subscription sub) throws JMSException {
363
364
365 if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
366 container.reset();
367 QueueList list = getSubscriptionList(container);
368 if (!list.contains(sub)) {
369 list.add(sub);
370 }
371 }
372 }
373
374 private QueueList getSubscriptionList(QueueMessageContainer container) {
375 QueueList list = (QueueList) activeSubscriptions.get(container);
376 if (list == null) {
377 list = new DefaultQueueList();
378 activeSubscriptions.put(container, list);
379 }
380 return list;
381 }
382
383 private void updateBrowsers(QueueMessageContainer container, Subscription sub) throws JMSException {
384
385
386 if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
387 container.reset();
388 QueueList list = getBrowserList(container);
389 if (!list.contains(sub)) {
390 list.add(sub);
391 }
392 }
393 }
394
395 private QueueList getBrowserList(QueueMessageContainer container) {
396 QueueList list = (QueueList) browsers.get(container);
397 if (list == null) {
398 list = new DefaultQueueList();
399 browsers.put(container, list);
400 }
401 return list;
402 }
403
404 /***
405 * Create filter for a Consumer
406 *
407 * @param info
408 * @return the Fitler
409 * @throws javax.jms.JMSException
410 */
411 protected Filter createFilter(ConsumerInfo info) throws JMSException {
412 Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
413 if (info.isNoLocal()) {
414 filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
415 }
416 return filter;
417 }
418
419 }