1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.transport.reliable;
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.TimeoutExpiredException;
23 import org.codehaus.activemq.UnsupportedWireFormatException;
24 import org.codehaus.activemq.message.Packet;
25 import org.codehaus.activemq.message.PacketListener;
26 import org.codehaus.activemq.message.Receipt;
27 import org.codehaus.activemq.message.WireFormat;
28 import org.codehaus.activemq.transport.TransportChannel;
29 import org.codehaus.activemq.transport.TransportStatusEvent;
30 import org.codehaus.activemq.transport.composite.CompositeTransportChannel;
31 import javax.jms.ExceptionListener;
32 import javax.jms.JMSException;
33 import java.net.URI;
34 import java.util.LinkedList;
35 import java.util.List;
36
37 /***
38 * A Composite implementation of a TransportChannel
39 *
40 * @version $Revision: 1.11 $
41 */
42 public class ReliableTransportChannel extends CompositeTransportChannel implements PacketListener, ExceptionListener {
43 private static final Log log = LogFactory.getLog(ReliableTransportChannel.class);
44 private Object lock = new Object();
45 private LinkedList packetList = new LinkedList();
46 private boolean cacheMessagesForFailover;
47
48 /***
49 * Construct this transport
50 *
51 * @param wireFormat
52 */
53 public ReliableTransportChannel(WireFormat wireFormat) {
54 super(wireFormat);
55 }
56
57 /***
58 * Construct this transport
59 *
60 * @param wireFormat
61 * @param uris
62 */
63 public ReliableTransportChannel(WireFormat wireFormat, List uris) {
64 super(wireFormat, uris);
65 }
66
67 /***
68 * @return pretty print for this
69 */
70 public String toString() {
71 return "ReliableTransportChannel: " + channel;
72 }
73
74 /***
75 * start the connection
76 *
77 * @throws JMSException
78 */
79 public void start() throws JMSException {
80 if (started.commit(false, true)) {
81 if (channel != null) {
82 channel.start();
83 }
84 }
85 }
86
87 /***
88 * @param packet
89 * @param timeout
90 * @return receipt - or null
91 * @throws JMSException
92 */
93 public Receipt send(Packet packet, int timeout) throws JMSException {
94 do {
95 TransportChannel tc = getEstablishedChannel(timeout);
96 if (tc != null) {
97 try {
98 return tc.send(packet, timeout);
99 }
100 catch (TimeoutExpiredException e) {
101 throw e;
102 }
103 catch (UnsupportedWireFormatException uwf) {
104 throw uwf;
105 }
106 catch (JMSException jmsEx) {
107 if (isPendingStop()) {
108 break;
109 }
110 doReconnect(tc, timeout);
111 }
112 }
113 }
114 while (!closed.get() && !isPendingStop());
115 return null;
116 }
117
118 /***
119 * @param packet
120 * @throws JMSException
121 */
122 public void asyncSend(Packet packet) throws JMSException {
123 long timeout = getEstablishConnectionTimeout();
124 do {
125 TransportChannel tc = getEstablishedChannel(timeout);
126 if (tc != null) {
127 try {
128 tc.asyncSend(packet);
129 break;
130 }
131 catch (TimeoutExpiredException e) {
132 throw e;
133 }
134 catch (UnsupportedWireFormatException uwf) {
135 throw uwf;
136 }
137 catch (JMSException jmsEx) {
138 if (isPendingStop()) {
139 break;
140 }
141 doReconnect(tc, timeout);
142 }
143 }
144 }
145 while (!closed.get() && !isPendingStop());
146 }
147
148 protected void configureChannel() {
149 channel.setPacketListener(this);
150 channel.setExceptionListener(this);
151 }
152
153 protected URI extractURI(List list) throws JMSException {
154 int idx = 0;
155 if (list.size() > 1) {
156 SMLCGRandom rand = new SMLCGRandom();
157 do {
158 idx = (int) (rand.nextDouble() * list.size());
159 }
160 while (idx < 0 || idx >= list.size());
161 }
162 Object answer = list.remove(idx);
163 if (answer instanceof URI) {
164 return (URI) answer;
165 }
166 else {
167 log.error("#### got: " + answer + " of type: " + answer.getClass());
168 return null;
169 }
170 }
171
172 /***
173 * consume a packet from the enbedded channel
174 *
175 * @param packet to consume
176 */
177 public void consume(Packet packet) {
178
179
180 PacketListener listener = getPacketListener();
181 if (listener != null) {
182 listener.consume(packet);
183 }
184 }
185
186 /***
187 * handle exception from the embedded channel
188 *
189 * @param jmsEx
190 */
191 public void onException(JMSException jmsEx) {
192 TransportChannel tc = this.channel;
193 if (jmsEx instanceof UnsupportedWireFormatException) {
194 fireException(jmsEx);
195 }
196 else {
197 try {
198 doReconnect(tc, getEstablishConnectionTimeout());
199 }
200 catch (JMSException ex) {
201 ex.setLinkedException(jmsEx);
202 fireException(ex);
203 }
204 }
205 }
206
207 /***
208 * stop this channel
209 */
210 public void stop() {
211 super.stop();
212 fireStatusEvent(super.currentURI, TransportStatusEvent.STOPPED);
213 }
214
215 /***
216 * Fire a JMSException to the exception listener
217 *
218 * @param jmsEx
219 */
220 protected void fireException(JMSException jmsEx) {
221 ExceptionListener listener = getExceptionListener();
222 if (listener != null) {
223 listener.onException(jmsEx);
224 }
225 }
226
227 protected TransportChannel getEstablishedChannel(long timeout) throws JMSException {
228 if (!closed.get() && this.channel == null && !isPendingStop()) {
229 establishConnection(timeout);
230 }
231 return this.channel;
232 }
233
234 protected void doReconnect(TransportChannel currentChannel, long timeout) throws JMSException {
235 setTransportConnected(false);
236 if (!closed.get() && !isPendingStop()) {
237 synchronized (lock) {
238
239
240 if (this.channel == currentChannel) {
241 fireStatusEvent(super.currentURI, TransportStatusEvent.DISCONNECTED);
242 try {
243 establishConnection(timeout);
244 }
245 catch (JMSException jmsEx) {
246 fireStatusEvent(super.currentURI, TransportStatusEvent.FAILED);
247 throw jmsEx;
248 }
249 setTransportConnected(true);
250 fireStatusEvent(super.currentURI, TransportStatusEvent.RECONNECTED);
251 }
252 }
253 }
254 }
255 }