1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.transport;
20 import java.net.URI;
21 import java.util.HashMap;
22 import java.util.Iterator;
23 import java.util.Map;
24 import javax.jms.ExceptionListener;
25 import javax.jms.JMSException;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.codehaus.activemq.UnsupportedWireFormatException;
29 import org.codehaus.activemq.message.Packet;
30 import org.codehaus.activemq.message.PacketListener;
31 import org.codehaus.activemq.message.Receipt;
32 import org.codehaus.activemq.message.ReceiptHolder;
33 import org.codehaus.activemq.message.WireFormatInfo;
34 import org.codehaus.activemq.util.ExecutorHelper;
35 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
36 import EDU.oswego.cs.dl.util.concurrent.Executor;
37
38 /***
39 * Some basic functionality, common across most transport implementations of channels
40 *
41 * @version $Revision: 1.5 $
42 */
43 public abstract class TransportChannelSupport implements TransportChannel {
44 private static final Log log = LogFactory.getLog(TransportChannelSupport.class);
45 private CopyOnWriteArrayList listeners = new CopyOnWriteArrayList();
46 private HashMap requestMap = new HashMap();
47 private PacketListener packetListener;
48 private ExceptionListener exceptionListener;
49 private String clientID;
50 private TransportChannelListener transportChannelListener;
51 private boolean serverSide;
52 protected boolean pendingStop = false;
53 protected boolean transportConnected = true;
54
55 /***
56 * Give the TransportChannel a hint it's about to stop
57 *
58 * @param pendingStop
59 */
60 public void setPendingStop(boolean pendingStop) {
61 this.pendingStop = pendingStop;
62 }
63
64 /***
65 * @return true if the channel is about to stop
66 */
67 public boolean isPendingStop() {
68 return pendingStop;
69 }
70
71 /***
72 * close the channel
73 */
74 public void stop() {
75 transportConnected = false;
76 Map map = (Map) this.requestMap.clone();
77 for (Iterator i = map.values().iterator();i.hasNext();) {
78 ReceiptHolder rh = (ReceiptHolder) i.next();
79 rh.close();
80 }
81 map.clear();
82 requestMap.clear();
83 if (transportChannelListener != null) {
84 transportChannelListener.removeClient(this);
85 }
86 exceptionListener = null;
87 packetListener = null;
88 }
89
90 /***
91 * synchronously send a Packet
92 *
93 * @param packet
94 * @return a Receipt
95 * @throws JMSException
96 */
97 public Receipt send(Packet packet) throws JMSException {
98 return send(packet, 0);
99 }
100
101 /***
102 * Synchronously send a Packet
103 *
104 * @param packet packet to send
105 * @param timeout amount of time to wait for a receipt
106 * @return the Receipt
107 * @throws JMSException
108 */
109 public Receipt send(Packet packet, int timeout) throws JMSException {
110 ReceiptHolder rh = new ReceiptHolder();
111 requestMap.put(packet.getId(), rh);
112 doAsyncSend(packet);
113 Receipt result = rh.getReceipt(timeout);
114 return result;
115 }
116
117
118
119 /***
120 * @return the transportChannelListener
121 */
122 public TransportChannelListener getTransportChannelListener() {
123 return transportChannelListener;
124 }
125
126 /***
127 * @param transportChannelListener
128 */
129 public void setTransportChannelListener(TransportChannelListener transportChannelListener) {
130 this.transportChannelListener = transportChannelListener;
131 }
132
133 /***
134 * Add a listener for changes in a channels status
135 *
136 * @param listener
137 */
138 public void addTransportStatusEventListener(TransportStatusEventListener listener) {
139 listeners.add(listener);
140 }
141
142 /***
143 * Remove a listener for changes in a channels status
144 *
145 * @param listener
146 */
147 public void removeTransportStatusEventListener(TransportStatusEventListener listener) {
148 listeners.remove(listener);
149 }
150
151 /***
152 * @return the clientID
153 */
154 public String getClientID() {
155 return clientID;
156 }
157
158 /***
159 * @param clientID set the clientID
160 */
161 public void setClientID(String clientID) {
162 this.clientID = clientID;
163 }
164
165 /***
166 * @return the exception listener
167 */
168 public ExceptionListener getExceptionListener() {
169 return exceptionListener;
170 }
171
172 /***
173 * @return the packet listener
174 */
175 public PacketListener getPacketListener() {
176 return packetListener;
177 }
178
179 /***
180 * Set a listener for Packets
181 *
182 * @param l
183 */
184 public void setPacketListener(PacketListener l) {
185 this.packetListener = l;
186 }
187
188 /***
189 * Set an exception listener to listen for asynchronously generated exceptions
190 *
191 * @param listener
192 */
193 public void setExceptionListener(ExceptionListener listener) {
194 this.exceptionListener = listener;
195 }
196
197 /***
198 * @return true if server side
199 */
200 public boolean isServerSide() {
201 return serverSide;
202 }
203
204 /***
205 * @param serverSide
206 */
207 public void setServerSide(boolean serverSide) {
208 this.serverSide = serverSide;
209 }
210
211 /***
212 * @return true if the transport channel is active,
213 * this value will be false through reconnecting
214 */
215 public boolean isTransportConnected(){
216 return transportConnected;
217 }
218
219 protected void setTransportConnected(boolean value){
220 transportConnected = value;
221 }
222
223
224
225 /***
226 * consume a packet from the channel
227 *
228 * @param packet
229 * @throws UnsupportedWireFormatException
230 */
231 protected void doConsumePacket(Packet packet) {
232 doConsumePacket(packet, packetListener);
233 }
234
235 protected void doConsumePacket(Packet packet, PacketListener listener) {
236 if (!doHandleReceipt(packet) && !doHandleWireFormat(packet)) {
237 if (listener != null) {
238 listener.consume(packet);
239 }
240 else {
241 log.warn("No packet listener set to receive packets");
242 }
243 }
244 }
245
246 protected boolean doHandleReceipt(Packet packet) {
247 boolean result = false;
248 if (packet != null) {
249 if (packet.isReceipt()) {
250 result = true;
251 Receipt receipt = (Receipt) packet;
252 ReceiptHolder rh = (ReceiptHolder) requestMap.remove(receipt.getCorrelationId());
253 if (rh != null) {
254 rh.setReceipt(receipt);
255 }
256 else {
257 log.warn("No Packet found to match Receipt correlationId: " + receipt.getCorrelationId());
258 }
259 }
260 }
261 return result;
262 }
263
264 protected boolean doHandleWireFormat(Packet packet) {
265 boolean handled = false;
266 if (packet.getPacketType() == Packet.WIRE_FORMAT_INFO) {
267 handled = true;
268 WireFormatInfo info = (WireFormatInfo) packet;
269 if (!canProcessWireFormatVersion(info.getVersion())) {
270 setPendingStop(true);
271 String errorStr = "Cannot process wire format of version: " + info.getVersion();
272 TransportStatusEvent event = new TransportStatusEvent();
273 event.setChannelStatus(TransportStatusEvent.FAILED);
274 fireStatusEvent(event);
275 onAsyncException(new UnsupportedWireFormatException(errorStr));
276 stop();
277 }
278 else {
279 if (log.isDebugEnabled()) {
280 log.debug(this + " using wire format version: " + info.getVersion());
281 }
282 }
283 }
284 return handled;
285 }
286
287 /***
288 * send a Packet to the raw underlying transport This method is here to allow specific implementations to override
289 * this method
290 *
291 * @param packet
292 * @throws JMSException
293 */
294 protected void doAsyncSend(Packet packet) throws JMSException {
295 asyncSend(packet);
296 }
297
298 /***
299 * Handles an exception thrown while performing async dispatch of messages
300 *
301 * @param e
302 */
303 protected void onAsyncException(JMSException e) {
304 if (exceptionListener != null) {
305 transportConnected = false;
306 exceptionListener.onException(e);
307 }
308 else {
309 log.warn("Caught exception dispatching message and no ExceptionListener registered: " + e, e);
310 }
311 }
312
313 /***
314 * Fire status event to any status event listeners
315 *
316 * @param remoteURI
317 * @param status
318 */
319 protected void fireStatusEvent(URI remoteURI, int status) {
320 TransportStatusEvent event = new TransportStatusEvent();
321 event.setChannelStatus(status);
322 event.setRemoteURI(remoteURI);
323 fireStatusEvent(event);
324 }
325
326 /***
327 * Fire status event to any status event listeners
328 *
329 * @param event
330 */
331 protected void fireStatusEvent(TransportStatusEvent event) {
332 if (event != null) {
333 for (Iterator i = listeners.iterator();i.hasNext();) {
334 TransportStatusEventListener l = (TransportStatusEventListener) i.next();
335 l.statusChanged(event);
336 }
337 }
338 }
339
340 /***
341 * A helper method to stop the execution of an executor
342 *
343 * @param executor the executor or null if one is not created yet
344 * @throws InterruptedException
345 * @throws JMSException
346 */
347 protected void stopExecutor(Executor executor) throws InterruptedException, JMSException {
348 ExecutorHelper.stopExecutor(executor);
349 }
350 }