1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.codehaus.activemq.message.ActiveMQMessage;
24 import org.codehaus.activemq.message.util.MemoryBoundedQueue;
25
26 import javax.jms.JMSException;
27 import java.util.Iterator;
28
29 /***
30 * A utility class used by the Sessionfor dispatching messages asycnronously to consumers
31 *
32 * @version $Revision: 1.3 $
33 * @see javax.jms.Session
34 */
35 class ActiveMQSessionExecutor implements Runnable {
36 private static final Log log = LogFactory.getLog(ActiveMQSessionExecutor.class);
37 private ActiveMQSession session;
38 private MemoryBoundedQueue messageQueue;
39 private boolean closed;
40 private Thread runner;
41 private boolean doDispatch;
42
43 ActiveMQSessionExecutor(ActiveMQSession session, MemoryBoundedQueue queue) {
44 this.session = session;
45 this.messageQueue = queue;
46 this.doDispatch = true;
47 }
48
49 void setDoDispatch(boolean value) {
50 doDispatch = value;
51 }
52
53 void execute(ActiveMQMessage message) {
54 messageQueue.enqueue(message);
55 }
56
57 void executeFirst(ActiveMQMessage message) {
58 messageQueue.enqueueFirstNoBlock(message);
59 }
60
61 /***
62 * implementation of Runnable
63 */
64 public void run() {
65 while (!closed && doDispatch) {
66 ActiveMQMessage message = null;
67 try {
68 message = (ActiveMQMessage) messageQueue.dequeue(100);
69 }
70 catch (InterruptedException ie) {
71 }
72 if (!closed) {
73 if (message != null) {
74 if (doDispatch) {
75 for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
76 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
77 if (message.isConsumerTarget(consumer.getConsumerNumber())) {
78 try {
79 consumer.processMessage(message.shallowCopy());
80 }
81 catch (JMSException e) {
82 this.session.connection.handleAsyncException(e);
83 }
84 }
85 }
86 }
87 else {
88 messageQueue.enqueueFirstNoBlock(message);
89 }
90 }
91 }
92 }
93 }
94
95 synchronized void start() {
96 messageQueue.start();
97 if (runner == null && doDispatch) {
98 runner = new Thread(this, "JmsSessionDispather: " + session.getSessionId());
99 runner.setPriority(Thread.MAX_PRIORITY);
100
101 runner.start();
102 }
103 }
104
105 synchronized void stop() {
106 messageQueue.stop();
107 }
108
109 synchronized void close() {
110 closed = true;
111 messageQueue.close();
112 }
113
114 void clear() {
115 messageQueue.clear();
116 }
117
118 ActiveMQMessage dequeueNoWait() {
119 try {
120 return (ActiveMQMessage) messageQueue.dequeueNoWait();
121 }
122 catch (InterruptedException ie) {
123 return null;
124 }
125 }
126
127 protected void clearMessagesInProgress(){
128 messageQueue.clear();
129 }
130 }