1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.service.impl;
19
20 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.codehaus.activemq.broker.BrokerClient;
24 import org.codehaus.activemq.message.ActiveMQMessage;
25 import org.codehaus.activemq.service.MessageContainerManager;
26 import org.codehaus.activemq.service.Service;
27 import org.codehaus.activemq.service.Subscription;
28
29 import javax.jms.JMSException;
30 import java.util.Map;
31 import java.util.Iterator;
32
33 /***
34 * A Dispatcher that polls for updates for active Message Consumers
35 *
36 * @version $Revision: 1.9 $
37 */
38 public class DispatchWorker implements Runnable, Service {
39 private static final Log log = LogFactory.getLog(DispatchWorker.class);
40 private static final int POLL_TIMEOUT = 250;
41
42 private Map subscriptions = new ConcurrentHashMap(1000, 0.75f);
43 private Object lock = new Object();
44 private boolean active = true;
45 private boolean started = false;
46 private MessageContainerManager containerManager;
47
48 /***
49 * Register the MessageContainerManager for the Dispatcher
50 *
51 * @param mcm
52 */
53 public void register(MessageContainerManager mcm) {
54 this.containerManager = mcm;
55 }
56
57 /***
58 * Called to indicate that there is work to do on a Subscription this will wake up a Dispatch Worker if it is
59 * waiting for messages to dispatch
60 */
61 public void wakeup() {
62 synchronized (lock) {
63 active = true;
64 lock.notifyAll();
65 }
66 }
67
68 /***
69 * Add an active subscription
70 *
71 * @param client
72 * @param sub
73 */
74 public void addActiveSubscription(BrokerClient client, Subscription sub) {
75 if (log.isDebugEnabled()) {
76 log.info("Adding subscription: " + sub + " to client: " + client);
77 }
78 subscriptions.put(sub, client);
79 }
80
81 /***
82 * remove an active subscription
83 *
84 * @param client
85 * @param sub
86 */
87 public void removeActiveSubscription(BrokerClient client, Subscription sub) {
88 if (log.isDebugEnabled()) {
89 log.info("Removing subscription: " + sub + " from client: " + client);
90 }
91 subscriptions.remove(sub);
92 }
93
94 /***
95 * dispatch messages to active Consumers
96 *
97 * @see java.lang.Runnable#run()
98 */
99 public void run() {
100 while (started) {
101 doPoll();
102 boolean dispatched = false;
103 try {
104
105 for (Iterator iter = subscriptions.keySet().iterator(); iter.hasNext();) {
106 Subscription sub = (Subscription) iter.next();
107 if (sub != null && sub.isReadyToDispatch()) {
108 dispatched = dispatchMessages(sub, dispatched);
109 }
110 }
111 }
112 catch (JMSException jmsEx) {
113 log.error("Could not dispatch to Subscription: " + jmsEx, jmsEx);
114 }
115 if (!dispatched) {
116 synchronized (lock) {
117 active = false;
118 if (!active && started) {
119 try {
120 lock.wait(POLL_TIMEOUT);
121 }
122 catch (InterruptedException e) {
123 }
124 }
125 }
126 }
127 }
128 }
129
130
131 /***
132 * start the DispatchWorker
133 *
134 * @see org.codehaus.activemq.service.Service#start()
135 */
136 public void start() {
137 started = true;
138 }
139
140 /***
141 * stop the DispatchWorker
142 *
143 * @see org.codehaus.activemq.service.Service#stop()
144 */
145 public void stop() {
146 started = false;
147 }
148
149
150
151
152
153 protected boolean dispatchMessages(Subscription subscription, boolean dispatched) throws JMSException {
154 ActiveMQMessage[] msgs = subscription.getMessagesToDispatch();
155 if (msgs != null && msgs.length > 0) {
156 BrokerClient client = (BrokerClient) subscriptions.get(subscription);
157 if (client == null) {
158 log.warn("Null client for subscription: " + subscription);
159 }
160 else {
161 for (int i = 0; i < msgs.length; i++) {
162 ActiveMQMessage msg = msgs[i].shallowCopy();
163
164 if (log.isDebugEnabled()) {
165 log.debug("Dispatching message: " + msg);
166 }
167 int[] consumerNos = new int[1];
168 consumerNos[0] = subscription.getConsumerNumber();
169 msg.setConsumerNos(consumerNos);
170 client.dispatch(msg);
171 dispatched = true;
172 }
173 }
174 }
175 return dispatched;
176 }
177
178 protected void doPoll() {
179 if (containerManager != null && started) {
180 try {
181 containerManager.poll();
182 }
183 catch (JMSException e) {
184 log.error("Error polling from the ContainerManager: ", e);
185 }
186 }
187 }
188 }