View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.transport.reliable;
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.TimeoutExpiredException;
23  import org.codehaus.activemq.UnsupportedWireFormatException;
24  import org.codehaus.activemq.message.Packet;
25  import org.codehaus.activemq.message.PacketListener;
26  import org.codehaus.activemq.message.Receipt;
27  import org.codehaus.activemq.message.WireFormat;
28  import org.codehaus.activemq.transport.TransportChannel;
29  import org.codehaus.activemq.transport.TransportStatusEvent;
30  import org.codehaus.activemq.transport.composite.CompositeTransportChannel;
31  import javax.jms.ExceptionListener;
32  import javax.jms.JMSException;
33  import java.net.URI;
34  import java.util.LinkedList;
35  import java.util.List;
36  
37  /***
38   * A Compsite implementation of a TransportChannel
39   * 
40   * @version $Revision: 1.11 $
41   */
42  public class ReliableTransportChannel extends CompositeTransportChannel implements PacketListener, ExceptionListener {
43      private static final Log log = LogFactory.getLog(ReliableTransportChannel.class);
44      private Object lock = new Object();
45      private LinkedList packetList = new LinkedList();
46      private boolean cacheMessagesForFailover;
47  
48      /***
49       * Construct this transport
50       * 
51       * @param wireFormat
52       */
53      public ReliableTransportChannel(WireFormat wireFormat) {
54          super(wireFormat);
55      }
56  
57      /***
58       * Construct this transport
59       * 
60       * @param wireFormat
61       * @param uris
62       */
63      public ReliableTransportChannel(WireFormat wireFormat, List uris) {
64          super(wireFormat, uris);
65      }
66  
67      /***
68       * @return pretty print for this
69       */
70      public String toString() {
71          return "ReliableTransportChannel: " + channel;
72      }
73  
74      /***
75       * start the connection
76       * 
77       * @throws JMSException
78       */
79      public void start() throws JMSException {
80          if (started.commit(false, true)) {
81              if (channel != null) {
82                  channel.start();
83              }
84          }
85      }
86  
87      /***
88       * @param packet
89       * @param timeout
90       * @return receipt - or null
91       * @throws JMSException
92       */
93      public Receipt send(Packet packet, int timeout) throws JMSException {
94          do {
95              TransportChannel tc = getEstablishedChannel(timeout);
96              if (tc != null) {
97                  try {
98                      return tc.send(packet, timeout);
99                  }
100                 catch (TimeoutExpiredException e) {
101                     throw e;
102                 }
103                 catch (UnsupportedWireFormatException uwf) {
104                     throw uwf;
105                 }
106                 catch (JMSException jmsEx) {
107                     if (isPendingStop()) {
108                         break;
109                     }
110                     doReconnect(tc, timeout);
111                 }
112             }
113         }
114         while (!closed.get() && !isPendingStop());
115         return null;
116     }
117 
118     /***
119      * @param packet
120      * @throws JMSException
121      */
122     public void asyncSend(Packet packet) throws JMSException {
123         long timeout = getEstablishConnectionTimeout();
124         do {
125             TransportChannel tc = getEstablishedChannel(timeout);
126             if (tc != null) {
127                 try {
128                     tc.asyncSend(packet);
129                     break;
130                 }
131                 catch (TimeoutExpiredException e) {
132                     throw e;
133                 }
134                 catch (UnsupportedWireFormatException uwf) {
135                     throw uwf;
136                 }
137                 catch (JMSException jmsEx) {
138                     if (isPendingStop()) {
139                         break;
140                     }
141                     doReconnect(tc, timeout);
142                 }
143             }
144         }
145         while (!closed.get() && !isPendingStop());
146     }
147 
148     protected void configureChannel() {
149         channel.setPacketListener(this);
150         channel.setExceptionListener(this);
151     }
152 
153     protected URI extractURI(List list) throws JMSException {
154         int idx = 0;
155         if (list.size() > 1) {
156             SMLCGRandom rand = new SMLCGRandom();
157             do {
158                 idx = (int) (rand.nextDouble() * list.size());
159             }
160             while (idx < 0 || idx >= list.size());
161         }
162         Object answer = list.remove(idx);
163         if (answer instanceof URI) {
164             return (URI) answer;
165         }
166         else {
167             log.error("#### got: " + answer + " of type: " + answer.getClass());
168             return null;
169         }
170     }
171 
172     /***
173      * consume a packet from the enbedded channel
174      * 
175      * @param packet to consume
176      */
177     public void consume(Packet packet) {
178         //do processing
179         //avoid a lock
180         PacketListener listener = getPacketListener();
181         if (listener != null) {
182             listener.consume(packet);
183         }
184     }
185 
186     /***
187      * handle exception from the embedded channel
188      * 
189      * @param jmsEx
190      */
191     public void onException(JMSException jmsEx) {
192         TransportChannel tc = this.channel;
193         if (jmsEx instanceof UnsupportedWireFormatException) {
194             fireException(jmsEx);
195         }
196         else {
197             try {
198                 doReconnect(tc, getEstablishConnectionTimeout());
199             }
200             catch (JMSException ex) {
201                 ex.setLinkedException(jmsEx);
202                 fireException(ex);
203             }
204         }
205     }
206 
207     /***
208      * stop this channel
209      */
210     public void stop() {
211         super.stop();
212         fireStatusEvent(super.currentURI, TransportStatusEvent.STOPPED);
213     }
214 
215     /***
216      * Fire a JMSException to the exception listener
217      * 
218      * @param jmsEx
219      */
220     protected void fireException(JMSException jmsEx) {
221         ExceptionListener listener = getExceptionListener();
222         if (listener != null) {
223             listener.onException(jmsEx);
224         }
225     }
226 
227     protected TransportChannel getEstablishedChannel(long timeout) throws JMSException {
228         if (!closed.get() && this.channel == null && !isPendingStop()) {
229             establishConnection(timeout);
230         }
231         return this.channel;
232     }
233 
234     protected void doReconnect(TransportChannel currentChannel, long timeout) throws JMSException {
235         setTransportConnected(false);
236         if (!closed.get() && !isPendingStop()) {
237             synchronized (lock) {
238                 //Loss of connectivity can be signalled from more than one
239                 //thread - hence the check here - we want to avoid doing it more than once
240                 if (this.channel == currentChannel) {
241                     fireStatusEvent(super.currentURI, TransportStatusEvent.DISCONNECTED);
242                     try {
243                         establishConnection(timeout);
244                     }
245                     catch (JMSException jmsEx) {
246                         fireStatusEvent(super.currentURI, TransportStatusEvent.FAILED);
247                         throw jmsEx;
248                     }
249                     setTransportConnected(true);
250                     fireStatusEvent(super.currentURI, TransportStatusEvent.RECONNECTED);
251                 }
252             }
253         }
254     }
255 }