View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.transport;
20  import java.net.URI;
21  import java.util.HashMap;
22  import java.util.Iterator;
23  import java.util.Map;
24  import javax.jms.ExceptionListener;
25  import javax.jms.JMSException;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.codehaus.activemq.UnsupportedWireFormatException;
29  import org.codehaus.activemq.message.Packet;
30  import org.codehaus.activemq.message.PacketListener;
31  import org.codehaus.activemq.message.Receipt;
32  import org.codehaus.activemq.message.ReceiptHolder;
33  import org.codehaus.activemq.message.WireFormatInfo;
34  import org.codehaus.activemq.util.ExecutorHelper;
35  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
36  import EDU.oswego.cs.dl.util.concurrent.Executor;
37  
38  /***
39   * Some basic functionality, common across most transport implementations of channels
40   * 
41   * @version $Revision: 1.5 $
42   */
43  public abstract class TransportChannelSupport implements TransportChannel {
44      private static final Log log = LogFactory.getLog(TransportChannelSupport.class);
45      private CopyOnWriteArrayList listeners = new CopyOnWriteArrayList();
46      private HashMap requestMap = new HashMap();
47      private PacketListener packetListener;
48      private ExceptionListener exceptionListener;
49      private String clientID;
50      private TransportChannelListener transportChannelListener;
51      private boolean serverSide;
52      protected boolean pendingStop = false;
53      protected boolean transportConnected = true;
54  
55      /***
56       * Give the TransportChannel a hint it's about to stop
57       * 
58       * @param pendingStop
59       */
60      public void setPendingStop(boolean pendingStop) {
61          this.pendingStop = pendingStop;
62      }
63  
64      /***
65       * @return true if the channel is about to stop
66       */
67      public boolean isPendingStop() {
68          return pendingStop;
69      }
70  
71      /***
72       * close the channel
73       */
74      public void stop() {
75          transportConnected = false;
76          Map map = (Map) this.requestMap.clone();
77          for (Iterator i = map.values().iterator();i.hasNext();) {
78              ReceiptHolder rh = (ReceiptHolder) i.next();
79              rh.close();
80          }
81          map.clear();
82          requestMap.clear();
83          if (transportChannelListener != null) {
84              transportChannelListener.removeClient(this);
85          }
86          exceptionListener = null;
87          packetListener = null;
88      }
89  
90      /***
91       * synchronously send a Packet
92       * 
93       * @param packet
94       * @return a Receipt
95       * @throws JMSException
96       */
97      public Receipt send(Packet packet) throws JMSException {
98          return send(packet, 0);
99      }
100 
101     /***
102      * Synchronously send a Packet
103      * 
104      * @param packet packet to send
105      * @param timeout amount of time to wait for a receipt
106      * @return the Receipt
107      * @throws JMSException
108      */
109     public Receipt send(Packet packet, int timeout) throws JMSException {
110         ReceiptHolder rh = new ReceiptHolder();
111         requestMap.put(packet.getId(), rh);
112         doAsyncSend(packet);
113         Receipt result = rh.getReceipt(timeout);
114         return result;
115     }
116 
117     // Properties
118     //-------------------------------------------------------------------------
119     /***
120      * @return the transportChannelListener
121      */
122     public TransportChannelListener getTransportChannelListener() {
123         return transportChannelListener;
124     }
125 
126     /***
127      * @param transportChannelListener
128      */
129     public void setTransportChannelListener(TransportChannelListener transportChannelListener) {
130         this.transportChannelListener = transportChannelListener;
131     }
132 
133     /***
134      * Add a listener for changes in a channels status
135      * 
136      * @param listener
137      */
138     public void addTransportStatusEventListener(TransportStatusEventListener listener) {
139         listeners.add(listener);
140     }
141 
142     /***
143      * Remove a listener for changes in a channels status
144      * 
145      * @param listener
146      */
147     public void removeTransportStatusEventListener(TransportStatusEventListener listener) {
148         listeners.remove(listener);
149     }
150 
151     /***
152      * @return the clientID
153      */
154     public String getClientID() {
155         return clientID;
156     }
157 
158     /***
159      * @param clientID set the clientID
160      */
161     public void setClientID(String clientID) {
162         this.clientID = clientID;
163     }
164 
165     /***
166      * @return the exception listener
167      */
168     public ExceptionListener getExceptionListener() {
169         return exceptionListener;
170     }
171 
172     /***
173      * @return the packet listener
174      */
175     public PacketListener getPacketListener() {
176         return packetListener;
177     }
178 
179     /***
180      * Set a listener for Packets
181      * 
182      * @param l
183      */
184     public void setPacketListener(PacketListener l) {
185         this.packetListener = l;
186     }
187 
188     /***
189      * Set an exception listener to listen for asynchronously generated exceptions
190      * 
191      * @param listener
192      */
193     public void setExceptionListener(ExceptionListener listener) {
194         this.exceptionListener = listener;
195     }
196 
197     /***
198      * @return true if server side
199      */
200     public boolean isServerSide() {
201         return serverSide;
202     }
203 
204     /***
205      * @param serverSide
206      */
207     public void setServerSide(boolean serverSide) {
208         this.serverSide = serverSide;
209     }
210     
211     /***
212      * @return true if the transport channel is active,
213      * this value will be false through reconnecting
214      */
215     public boolean isTransportConnected(){
216         return transportConnected;
217     }
218     
219     protected void setTransportConnected(boolean value){
220         transportConnected = value;
221     }
222 
223     // Implementation methods
224     //-------------------------------------------------------------------------
225     /***
226      * consume a packet from the channel
227      * 
228      * @param packet
229      * @throws UnsupportedWireFormatException
230      */
231     protected void doConsumePacket(Packet packet) {
232         doConsumePacket(packet, packetListener);
233     }
234 
235     protected void doConsumePacket(Packet packet, PacketListener listener) {
236         if (!doHandleReceipt(packet) && !doHandleWireFormat(packet)) {
237             if (listener != null) {
238                 listener.consume(packet);
239             }
240             else {
241                 log.warn("No packet listener set to receive packets");
242             }
243         }
244     }
245 
246     protected boolean doHandleReceipt(Packet packet) {
247         boolean result = false;
248         if (packet != null) {
249             if (packet.isReceipt()) {
250                 result = true;
251                 Receipt receipt = (Receipt) packet;
252                 ReceiptHolder rh = (ReceiptHolder) requestMap.remove(receipt.getCorrelationId());
253                 if (rh != null) {
254                     rh.setReceipt(receipt);
255                 }
256                 else {
257                     log.warn("No Packet found to match Receipt correlationId: " + receipt.getCorrelationId());
258                 }
259             }
260         }
261         return result;
262     }
263 
264     protected boolean doHandleWireFormat(Packet packet) {
265         boolean handled = false;
266         if (packet.getPacketType() == Packet.WIRE_FORMAT_INFO) {
267             handled = true;
268             WireFormatInfo info = (WireFormatInfo) packet;
269             if (!canProcessWireFormatVersion(info.getVersion())) {
270                 setPendingStop(true);
271                 String errorStr = "Cannot process wire format of version: " + info.getVersion();
272                 TransportStatusEvent event = new TransportStatusEvent();
273                 event.setChannelStatus(TransportStatusEvent.FAILED);
274                 fireStatusEvent(event);
275                 onAsyncException(new UnsupportedWireFormatException(errorStr));
276                 stop();
277             }
278             else {
279                 if (log.isDebugEnabled()) {
280                     log.debug(this + " using wire format version: " + info.getVersion());
281                 }
282             }
283         }
284         return handled;
285     }
286 
287     /***
288      * send a Packet to the raw underlying transport This method is here to allow specific implementations to override
289      * this method
290      * 
291      * @param packet
292      * @throws JMSException
293      */
294     protected void doAsyncSend(Packet packet) throws JMSException {
295         asyncSend(packet);
296     }
297 
298     /***
299      * Handles an exception thrown while performing async dispatch of messages
300      * 
301      * @param e
302      */
303     protected void onAsyncException(JMSException e) {
304         if (exceptionListener != null) {
305             transportConnected = false;
306             exceptionListener.onException(e);
307         }
308         else {
309             log.warn("Caught exception dispatching message and no ExceptionListener registered: " + e, e);
310         }
311     }
312 
313     /***
314      * Fire status event to any status event listeners
315      * 
316      * @param remoteURI
317      * @param status
318      */
319     protected void fireStatusEvent(URI remoteURI, int status) {
320         TransportStatusEvent event = new TransportStatusEvent();
321         event.setChannelStatus(status);
322         event.setRemoteURI(remoteURI);
323         fireStatusEvent(event);
324     }
325 
326     /***
327      * Fire status event to any status event listeners
328      * 
329      * @param event
330      */
331     protected void fireStatusEvent(TransportStatusEvent event) {
332         if (event != null) {
333             for (Iterator i = listeners.iterator();i.hasNext();) {
334                 TransportStatusEventListener l = (TransportStatusEventListener) i.next();
335                 l.statusChanged(event);
336             }
337         }
338     }
339 
340     /***
341      * A helper method to stop the execution of an executor
342      * 
343      * @param executor the executor or null if one is not created yet
344      * @throws InterruptedException
345      * @throws JMSException
346      */
347     protected void stopExecutor(Executor executor) throws InterruptedException, JMSException {
348         ExecutorHelper.stopExecutor(executor);
349     }
350 }