View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.transport.udp;
19  
20  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.codehaus.activemq.message.Packet;
24  import org.codehaus.activemq.message.WireFormat;
25  import org.codehaus.activemq.transport.TransportChannelSupport;
26  import org.codehaus.activemq.util.IdGenerator;
27  
28  import javax.jms.JMSException;
29  import java.io.IOException;
30  import java.net.DatagramPacket;
31  import java.net.DatagramSocket;
32  import java.net.InetAddress;
33  import java.net.SocketTimeoutException;
34  import java.net.URI;
35  
36  /***
37   * A UDP implementation of a TransportChannel
38   *
39   * @version $Revision: 1.23 $
40   */
41  public class UdpTransportChannel extends TransportChannelSupport implements Runnable {
42  
43      private static final int SOCKET_BUFFER_SIZE = 32 * 1024;
44      private static final int SO_TIMEOUT = 5000;
45      private static final Log log = LogFactory.getLog(UdpTransportChannel.class);
46  
47      protected DatagramSocket socket;
48      protected int port;
49      protected InetAddress inetAddress;
50  
51      private WireFormat wireFormat;
52      private SynchronizedBoolean closed;
53      private SynchronizedBoolean started;
54      private Thread thread; //need to change this - and use a thread pool
55      private IdGenerator idGenerator = new IdGenerator();
56      private Object lock;
57  
58  
59      /***
60       * Construct basic helpers
61       */
62      protected UdpTransportChannel(WireFormat wireFormat) {
63          this.wireFormat = wireFormat;
64          closed = new SynchronizedBoolean(false);
65          started = new SynchronizedBoolean(false);
66          lock = new Object();
67      }
68  
69      public UdpTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
70          this(wireFormat, remoteLocation, remoteLocation.getPort());
71      }
72  
73      public UdpTransportChannel(WireFormat wireFormat, URI remoteLocation, int port) throws JMSException {
74          this(wireFormat);
75          try {
76              this.port = port;
77              this.inetAddress = InetAddress.getByName(remoteLocation.getHost());
78              this.socket = createSocket(remoteLocation.getPort());
79  
80              //log.info("Creating multicast socket on port: " + port + " on
81              // host: " + remoteLocation.getHost());
82  
83              socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE);
84              socket.setSendBufferSize(SOCKET_BUFFER_SIZE);
85  
86              connect();
87              
88              // now lets update the port so that sends will go elsewhere
89          }
90          catch (Exception ioe) {
91              JMSException jmsEx = new JMSException("Initialization of TransportChannel failed: " + ioe);
92              jmsEx.setLinkedException(ioe);
93              throw jmsEx;
94          }
95      }
96  
97      /***
98       * @param socket
99       * @throws JMSException
100      */
101     public UdpTransportChannel(WireFormat wireFormat, DatagramSocket socket) throws JMSException {
102         this(wireFormat);
103         this.socket = socket;
104         this.port = socket.getPort();
105         this.inetAddress = socket.getInetAddress();
106         try {
107             socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE);
108             socket.setSendBufferSize(SOCKET_BUFFER_SIZE);
109         }
110         catch (IOException ioe) {
111             JMSException jmsEx = new JMSException("Initialization of TransportChannel failed");
112             jmsEx.setLinkedException(ioe);
113             throw jmsEx;
114         }
115     }
116 
117     public UdpTransportChannel(WireFormat wireFormat, DatagramSocket socket, int port) throws JMSException {
118         this(wireFormat, socket);
119         this.port = port;
120     }
121 
122     /***
123      * close the channel
124      */
125     public void stop() {
126         if (closed.commit(false, true)) {
127             super.stop();
128             try {
129                 socket.close();
130             }
131             catch (Exception e) {
132                 log.trace(toString() + " now closed");
133             }
134         }
135     }
136 
137     /***
138      * start listeneing for events
139      *
140      * @throws JMSException if an error occurs
141      */
142     public void start() throws JMSException {
143         if (started.commit(false, true)) {
144             thread = new Thread(this, toString());
145             if (isServerSide()) {
146                 thread.setDaemon(true);
147             }
148             thread.start();
149         }
150     }
151 
152 
153     /***
154      * Asynchronously send a Packet
155      *
156      * @param packet
157      * @throws JMSException
158      */
159     public void asyncSend(Packet packet) throws JMSException {
160         try {
161             if (log.isDebugEnabled()) {
162                 log.debug("Sending packet: " + packet);
163             }
164             DatagramPacket dpacket = createDatagramPacket(packet);
165 
166 
167             // lets sync to avoid concurrent writes
168             //synchronized (lock) {
169             socket.send(dpacket);
170             //}
171         }
172         catch (IOException e) {
173             JMSException jmsEx = new JMSException("asyncSend failed " + e);
174             jmsEx.setLinkedException(e);
175             throw jmsEx;
176         }
177     }
178 
179     public boolean isMulticast() {
180         return false;
181     }
182 
183     /***
184      * reads packets from a Socket
185      */
186     public void run() {
187         while (!closed.get()) {
188             try {
189                 socket.setSoTimeout(SO_TIMEOUT);
190 
191                 DatagramPacket dpacket = createDatagramPacket();
192                 while (!socket.isClosed()) {
193                     socket.setSoTimeout(0);
194                     socket.receive(dpacket);
195                     Packet packet = wireFormat.readPacket(getClientID(), dpacket);
196                     if (packet != null) {
197                         doConsumePacket(packet);
198                     }
199                 }
200 
201                 log.trace("The socket peer is now closed");
202                 doClose(new IOException("Socket peer is now closed"));
203             }
204             catch (SocketTimeoutException ste) {
205                 //continue;
206             }
207             catch (IOException e) {
208                 doClose(e);
209             }
210         }
211     }
212     
213     /***
214      * Can this wireformat process packets of this version
215      * @param version the version number to test
216      * @return true if can accept the version
217      */
218     public boolean canProcessWireFormatVersion(int version){
219         return wireFormat.canProcessWireFormatVersion(version);
220     }
221     
222     /***
223      * @return the current version of this wire format
224      */
225     public int getCurrentWireFormatVersion(){
226         return wireFormat.getCurrentWireFormatVersion();
227     }
228 
229     /***
230      * @return
231      */
232     protected DatagramPacket createDatagramPacket() {
233         DatagramPacket answer = new DatagramPacket(new byte[SOCKET_BUFFER_SIZE], SOCKET_BUFFER_SIZE);
234         if (port >= 0) {
235             answer.setPort(port);
236         }
237         answer.setAddress(inetAddress);
238         return answer;
239     }
240 
241     protected DatagramPacket createDatagramPacket(Packet packet) throws IOException, JMSException {
242         /*if (packet instanceof ActiveMQMessage) {
243             ActiveMQMessage message = (ActiveMQMessage) packet;
244             System.out.println(">>> about to send message with clientID: " + message.getJMSClientID());
245         }*/
246         DatagramPacket answer = wireFormat.writePacket(getClientID(), packet);
247         if (port >= 0) {
248             answer.setPort(port);
249         }
250         answer.setAddress(inetAddress);
251         return answer;
252     }
253 
254     private void doClose(Exception ex) {
255         if (!closed.get()) {
256             JMSException jmsEx = new JMSException("Error reading socket: " + ex.getMessage());
257             jmsEx.setLinkedException(ex);
258             onAsyncException(jmsEx);
259             stop();
260         }
261     }
262 
263     protected void connect() throws IOException {
264         //socket.connect(inetAddress, port);
265     }
266 
267     protected DatagramSocket createSocket(int port) throws IOException {
268         return new DatagramSocket(port, inetAddress);
269     }
270 
271     /***
272      * pretty print for object
273      *
274      * @return String representation of this object
275      */
276     public String toString() {
277         return "UdpTransportChannel: " + socket;
278     }
279 }