View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.transport.jrms;
19  
20  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
21  import com.sun.multicast.reliable.RMException;
22  import com.sun.multicast.reliable.transport.RMPacketSocket;
23  import com.sun.multicast.reliable.transport.SessionDoneException;
24  import com.sun.multicast.reliable.transport.TransportProfile;
25  import com.sun.multicast.reliable.transport.lrmp.LRMPTransportProfile;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.codehaus.activemq.message.Packet;
29  import org.codehaus.activemq.message.WireFormat;
30  import org.codehaus.activemq.transport.TransportChannelSupport;
31  import org.codehaus.activemq.util.IdGenerator;
32  
33  import javax.jms.JMSException;
34  import java.io.IOException;
35  import java.net.DatagramPacket;
36  import java.net.InetAddress;
37  import java.net.URI;
38  
39  /***
40   * A JRMS implementation of a TransportChannel
41   *
42   * @version $Revision: 1.20 $
43   */
44  public class JRMSTransportChannel extends TransportChannelSupport implements Runnable {
45  
46      private static final int SOCKET_BUFFER_SIZE = 32 * 1024;
47      private static final Log log = LogFactory.getLog(JRMSTransportChannel.class);
48  
49      private WireFormat wireFormat;
50      private SynchronizedBoolean closed;
51      private SynchronizedBoolean started;
52      private Thread thread; //need to change this - and use a thread pool
53      // need to see our own messages
54      private RMPacketSocket socket;
55      private IdGenerator idGenerator;
56      private String channelId;
57      private int port;
58      private InetAddress inetAddress;
59      private Object lock;
60  
61      /***
62       * Construct basic helpers
63       */
64      protected JRMSTransportChannel(WireFormat wireFormat) {
65          this.wireFormat = wireFormat;
66          idGenerator = new IdGenerator();
67          channelId = idGenerator.generateId();
68          closed = new SynchronizedBoolean(false);
69          started = new SynchronizedBoolean(false);
70          lock = new Object();
71      }
72  
73      /***
74       * Connect to a remote Node - e.g. a Broker
75       *
76       * @param remoteLocation
77       * @throws JMSException
78       */
79      public JRMSTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
80          this(wireFormat);
81          try {
82              this.port = remoteLocation.getPort();
83              this.inetAddress = InetAddress.getByName(remoteLocation.getHost());
84              LRMPTransportProfile profile = new LRMPTransportProfile(inetAddress, port);
85              profile.setTTL((byte) 1);
86              profile.setOrdered(true);
87              this.socket = profile.createRMPacketSocket(TransportProfile.SEND_RECEIVE);
88          }
89          catch (Exception ioe) {
90              ioe.printStackTrace();
91              JMSException jmsEx = new JMSException("Initialization of JRMSTransportChannel failed: " + ioe);
92              jmsEx.setLinkedException(ioe);
93              throw jmsEx;
94          }
95      }
96  
97      /***
98       * close the channel
99       */
100     public void stop() {
101         if (closed.commit(false, true)) {
102             super.stop();
103             try {
104                 socket.close();
105             }
106             catch (Exception e) {
107                 log.trace(toString() + " now closed");
108             }
109         }
110     }
111 
112     /***
113      * start listeneing for events
114      *
115      * @throws JMSException if an error occurs
116      */
117     public void start() throws JMSException {
118         if (started.commit(false, true)) {
119             thread = new Thread(this, toString());
120             if (isServerSide()) {
121                 thread.setDaemon(true);
122             }
123             thread.start();
124         }
125     }
126 
127     /***
128      * Asynchronously send a Packet
129      *
130      * @param packet
131      * @throws JMSException
132      */
133     public void asyncSend(Packet packet) throws JMSException {
134         try {
135             DatagramPacket dpacket = createDatagramPacket(packet);
136 
137             // lets sync to avoid concurrent writes
138             //synchronized (lock) {
139             socket.send(dpacket);
140             //}
141         }
142         catch (RMException rme) {
143             JMSException jmsEx = new JMSException("syncSend failed " + rme.getMessage());
144             jmsEx.setLinkedException(rme);
145             throw jmsEx;
146         }
147         catch (IOException e) {
148             JMSException jmsEx = new JMSException("asyncSend failed " + e.getMessage());
149             jmsEx.setLinkedException(e);
150             throw jmsEx;
151         }
152     }
153 
154 
155     public boolean isMulticast() {
156         return true;
157     }
158 
159     /***
160      * reads packets from a Socket
161      */
162     public void run() {
163         try {
164             while (!closed.get()) {
165                 DatagramPacket dpacket = socket.receive();
166                 Packet packet = wireFormat.readPacket(channelId, dpacket);
167                 if (packet != null) {
168                     doConsumePacket(packet);
169                 }
170             }
171             log.trace("The socket peer is now closed");
172             //doClose(new IOException("Socket peer is now closed"));
173             stop();
174         }
175         catch (SessionDoneException e) {
176             // this isn't really an exception, it just indicates
177             // that the socket has closed normally
178             log.trace("Session completed", e);
179             stop();
180         }
181         catch (RMException ste) {
182             doClose(ste);
183         }
184         catch (IOException e) {
185             doClose(e);
186         }
187     }
188     
189     /***
190      * Can this wireformat process packets of this version
191      * @param version the version number to test
192      * @return true if can accept the version
193      */
194     public boolean canProcessWireFormatVersion(int version){
195         return wireFormat.canProcessWireFormatVersion(version);
196     }
197     
198     /***
199      * @return the current version of this wire format
200      */
201     public int getCurrentWireFormatVersion(){
202         return wireFormat.getCurrentWireFormatVersion();
203     }
204 
205     protected DatagramPacket createDatagramPacket() {
206         DatagramPacket answer = new DatagramPacket(new byte[SOCKET_BUFFER_SIZE], SOCKET_BUFFER_SIZE);
207         answer.setPort(port);
208         answer.setAddress(inetAddress);
209         return answer;
210     }
211 
212     protected DatagramPacket createDatagramPacket(Packet packet) throws IOException, JMSException {
213         DatagramPacket answer = wireFormat.writePacket(channelId, packet);
214         answer.setPort(port);
215         answer.setAddress(inetAddress);
216         return answer;
217     }
218 
219     private void doClose(Exception ex) {
220         if (!closed.get()) {
221             JMSException jmsEx = new JMSException("Error reading socket: " + ex);
222             jmsEx.setLinkedException(ex);
223             onAsyncException(jmsEx);
224             stop();
225         }
226     }
227 
228     /***
229      * pretty print for object
230      *
231      * @return String representation of this object
232      */
233     public String toString() {
234         return "JRMSTransportChannel: " + socket;
235     }
236 }