View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.codehaus.activemq.transport.tcp;
20  import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
21  import EDU.oswego.cs.dl.util.concurrent.BoundedChannel;
22  import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
23  import EDU.oswego.cs.dl.util.concurrent.Executor;
24  import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
25  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.codehaus.activemq.message.Packet;
29  import org.codehaus.activemq.message.WireFormat;
30  import org.codehaus.activemq.message.WireFormatInfo;
31  import org.codehaus.activemq.transport.TransportChannelSupport;
32  import org.codehaus.activemq.util.JMSExceptionHelper;
33  import javax.jms.JMSException;
34  import java.io.BufferedInputStream;
35  import java.io.DataInputStream;
36  import java.io.DataOutputStream;
37  import java.io.IOException;
38  import java.io.InterruptedIOException;
39  import java.net.InetAddress;
40  import java.net.Socket;
41  import java.net.SocketTimeoutException;
42  import java.net.URI;
43  import java.net.UnknownHostException;
44  
45  /***
46   * A tcp implementation of a TransportChannel
47   * 
48   * @version $Revision: 1.53 $
49   */
50  public class TcpTransportChannel extends TransportChannelSupport implements Runnable {
51      private static final int SOCKET_BUFFER_SIZE = 64 * 1024;
52      private static final Log log = LogFactory.getLog(TcpTransportChannel.class);
53      protected Socket socket;
54      private WireFormat wireFormat;
55      private DataOutputStream dataOut;
56      private DataInputStream dataIn;
57      private SynchronizedBoolean closed;
58      private SynchronizedBoolean started;
59      private Object outboundLock;
60      private Executor executor;
61      private Thread thread;
62      private boolean useAsyncSend = false;
63      private boolean changeTimeout = false;
64      private int soTimeout = 5000;
65      private BoundedChannel exceptionsList;
66  
67      /***
68       * Construct basic helpers
69       */
70      protected TcpTransportChannel(WireFormat wireFormat) {
71          this.wireFormat = wireFormat;
72          closed = new SynchronizedBoolean(false);
73          started = new SynchronizedBoolean(false);
74          // there's not much point logging all exceptions, lets just keep a few around
75          exceptionsList = new BoundedLinkedQueue(10);
76          outboundLock = new Object();
77          if (useAsyncSend) {
78              executor = new PooledExecutor(new BoundedBuffer(1000), 1);
79          }
80      }
81  
82      /***
83       * Connect to a remote Node - e.g. a Broker
84       * 
85       * @param remoteLocation
86       * @throws JMSException
87       */
88      public TcpTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
89          this(wireFormat);
90          try {
91              this.socket = createSocket(remoteLocation);
92              initialiseSocket();
93          }
94          catch (Exception ioe) {
95              throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
96          }
97      }
98  
99      /***
100      * Connect to a remote Node - e.g. a Broker
101      * 
102      * @param remoteLocation
103      * @param localLocation - e.g. local InetAddress and local port
104      * @throws JMSException
105      */
106     public TcpTransportChannel(WireFormat wireFormat, URI remoteLocation, URI localLocation) throws JMSException {
107         this(wireFormat);
108         try {
109             this.socket = createSocket(remoteLocation, localLocation);
110             initialiseSocket();
111         }
112         catch (Exception ioe) {
113             throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
114         }
115     }
116 
117     /***
118      * @param socket
119      * @throws JMSException
120      */
121     public TcpTransportChannel(WireFormat wireFormat, Socket socket, Executor executor) throws JMSException {
122         this(wireFormat);
123         this.socket = socket;
124         this.executor = executor;
125         setServerSide(true);
126         try {
127             initialiseSocket();
128         }
129         catch (IOException ioe) {
130             throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
131         }
132     }
133 
134     /***
135      * start listeneing for events
136      * 
137      * @throws JMSException if an error occurs
138      */
139     public void start() throws JMSException {
140         if (started.commit(false, true)) {
141             thread = new Thread(this, toString());
142             if (isServerSide()) {
143                 thread.setDaemon(true);
144             }
145             else {
146                 thread.setPriority(Thread.NORM_PRIORITY + 2);
147             }
148             thread.start();
149             //send the wire format
150             if (isServerSide()) {
151                 WireFormatInfo info = new WireFormatInfo();
152                 info.setVersion(getCurrentWireFormatVersion());
153                 asyncSend(info);
154             }
155         }
156     }
157 
158     /***
159      * close the channel
160      */
161     public void stop() {
162         if (closed.commit(false, true)) {
163             super.stop();
164             try {
165                 stopExecutor(executor);
166                 dataOut.close();
167                 dataIn.close();
168                 socket.close();
169                 // lets wait for the receive thread to terminate
170                 // TODO thread.join();
171             }
172             catch (Exception e) {
173                 log.warn("Caught while closing: " + e + ". Now Closed", e);
174             }
175         }
176     }
177 
178     /***
179      * Asynchronously send a Packet
180      * 
181      * @param packet
182      * @throws JMSException
183      */
184     public void asyncSend(final Packet packet) throws JMSException {
185         if (executor != null) {
186             try {
187                 executor.execute(new Runnable() {
188                     public void run() {
189                         try {
190                             if (!closed.get()) {
191                                 doAsyncSend(packet);
192                             }
193                         }
194                         catch (JMSException e) {
195                             try {
196                                 exceptionsList.put(e);
197                             }
198                             catch (InterruptedException e1) {
199                                 log.warn("Failed to add element to exception list: " + e1);
200                             }
201                         }
202                     }
203                 });
204             }
205             catch (InterruptedException e) {
206                 log.info("Caught: " + e, e);
207             }
208             try {
209                 JMSException e = (JMSException) exceptionsList.poll(0);
210                 if (e != null) {
211                     throw e;
212                 }
213             }
214             catch (InterruptedException e1) {
215                 log.warn("Failed to remove element to exception list: " + e1);
216             }
217         }
218         else {
219             doAsyncSend(packet);
220         }
221     }
222 
223     /***
224      * @return false
225      */
226     public boolean isMulticast() {
227         return false;
228     }
229 
230     /***
231      * reads packets from a Socket
232      */
233     public void run() {
234         log.trace("TCP consumer thread starting");
235         int count = 0;
236         while (!closed.get()) {
237             if (isServerSide() && ++count > 500) {
238                 count = 0;
239                 Thread.yield();
240             }
241             int type = 0;
242             try {
243                 if (changeTimeout) {
244                     socket.setSoTimeout(soTimeout);
245                 }
246                 while ((type = dataIn.read()) == 0) {
247                 }
248                 if (type == -1) {
249                     log.info("The socket peer is now closed");
250                     onAsyncException(new JMSException("Socket peer is now closed"));
251                     stop();
252                 }
253                 else {
254                     if (changeTimeout) {
255                         socket.setSoTimeout(0);
256                     }
257                     Packet packet = wireFormat.readPacket(type, dataIn);
258                     if (packet != null) {
259                         doConsumePacket(packet);
260                     }
261                 }
262             }
263             catch (SocketTimeoutException e) {
264                 //onAsyncException(JMSExceptionHelper.newJMSException(e));
265             }
266             catch (InterruptedIOException e) {
267                 // TODO confirm that this really is a bug in the AS/400 JVM
268                 // Patch for AS/400 JVM
269                 // lets ignore these exceptions
270                 // as they typically just indicate the thread was interupted
271                 // while waiting for input, not that the socket is in error
272                 //onAsyncException(JMSExceptionHelper.newJMSException(e));
273             }
274             catch (IOException e) {
275                 doClose(e);
276             }
277         }
278     }
279 
280     /***
281      * pretty print for object
282      * 
283      * @return String representation of this object
284      */
285     public String toString() {
286         return "TcpTransportChannel: " + socket;
287     }
288 
289     public Socket getSocket() {
290         return socket;
291     }
292 
293     /***
294      * Can this wireformat process packets of this version
295      * 
296      * @param version the version number to test
297      * @return true if can accept the version
298      */
299     public boolean canProcessWireFormatVersion(int version) {
300         return wireFormat.canProcessWireFormatVersion(version);
301     }
302 
303     /***
304      * @return the current version of this wire format
305      */
306     public int getCurrentWireFormatVersion() {
307         return wireFormat.getCurrentWireFormatVersion();
308     }
309 
310     // Properties
311     //-------------------------------------------------------------------------
312     public boolean isChangeTimeout() {
313         return changeTimeout;
314     }
315 
316     public void setChangeTimeout(boolean changeTimeout) {
317         this.changeTimeout = changeTimeout;
318     }
319 
320     public boolean isUseAsyncSend() {
321         return useAsyncSend;
322     }
323 
324     public void setUseAsyncSend(boolean useAsyncSend) {
325         this.useAsyncSend = useAsyncSend;
326     }
327 
328     public int getSoTimeout() {
329         return soTimeout;
330     }
331 
332     public void setSoTimeout(int soTimeout) {
333         this.soTimeout = soTimeout;
334         this.changeTimeout = true;
335     }
336 
337     // Implementation methods
338     //-------------------------------------------------------------------------
339     /***
340      * Actually performs the async send of a packet
341      * 
342      * @param packet
343      * @throws JMSException
344      */
345     protected void doAsyncSend(Packet packet) throws JMSException {
346         try {
347             synchronized (outboundLock) {
348                 wireFormat.writePacket(packet, dataOut);
349                 dataOut.flush();
350             }
351         }
352         catch (IOException e) {
353             if (closed.get()) {
354                 log.trace("Caught exception while closed: " + e, e);
355             }
356             else {
357                 throw JMSExceptionHelper.newJMSException("asyncSend failed: " + e, e);
358             }
359         }
360         catch (JMSException e) {
361             if (closed.get()) {
362                 log.trace("Caught exception while closed: " + e, e);
363             }
364             else {
365                 throw e;
366             }
367         }
368     }
369 
370     private void doClose(Exception ex) {
371         if (!closed.get()) {
372             setPendingStop(true);
373             onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
374             stop();
375         }
376     }
377 
378     /***
379      * Configures the socket for use
380      * 
381      * @throws IOException
382      */
383     protected void initialiseSocket() throws IOException {
384         socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE);
385         socket.setSendBufferSize(SOCKET_BUFFER_SIZE);
386         socket.setSoTimeout(soTimeout);
387         BufferedInputStream buffIn = new BufferedInputStream(socket.getInputStream());
388         this.dataIn = new DataInputStream(buffIn);
389         TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream());
390         this.dataOut = new DataOutputStream(buffOut);
391     }
392 
393     /***
394      * Factory method to create a new socket
395      * 
396      * @param remoteLocation the URI to connect to
397      * @return the newly created socket
398      * @throws UnknownHostException
399      * @throws IOException
400      */
401     protected Socket createSocket(URI remoteLocation) throws UnknownHostException, IOException {
402         return new Socket(remoteLocation.getHost(), remoteLocation.getPort());
403     }
404 
405     /***
406      * Factory method to create a new socket
407      * 
408      * @param remoteLocation
409      * @param localLocation
410      * @return @throws IOException
411      * @throws IOException
412      * @throws UnknownHostException
413      */
414     protected Socket createSocket(URI remoteLocation, URI localLocation) throws IOException, UnknownHostException {
415         return new Socket(remoteLocation.getHost(), remoteLocation.getPort(), InetAddress.getByName(localLocation
416                 .getHost()), localLocation.getPort());
417     }
418 }