View Javadoc

1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    * Copyright 2004 Protique Ltd
5    *
6    * Licensed under the Apache License, Version 2.0 (the "License");
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   *
18   **/
19  package org.codehaus.activemq.transport.gnet;
20  
21  import EDU.oswego.cs.dl.util.concurrent.Latch;
22  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.geronimo.network.SelectorManager;
26  import org.apache.geronimo.network.protocol.AbstractProtocol;
27  import org.apache.geronimo.network.protocol.DownPacket;
28  import org.apache.geronimo.network.protocol.PlainDownPacket;
29  import org.apache.geronimo.network.protocol.Protocol;
30  import org.apache.geronimo.network.protocol.ProtocolException;
31  import org.apache.geronimo.network.protocol.SocketProtocol;
32  import org.apache.geronimo.network.protocol.UpPacket;
33  import org.apache.geronimo.pool.ClockPool;
34  import org.apache.geronimo.pool.ThreadPool;
35  import org.codehaus.activemq.message.Packet;
36  import org.codehaus.activemq.message.WireFormat;
37  import org.codehaus.activemq.transport.TransportChannelSupport;
38  
39  import javax.jms.JMSException;
40  import java.io.ByteArrayOutputStream;
41  import java.io.DataInputStream;
42  import java.io.DataOutputStream;
43  import java.io.IOException;
44  import java.io.InputStream;
45  import java.net.InetAddress;
46  import java.net.InetSocketAddress;
47  import java.net.URI;
48  import java.net.UnknownHostException;
49  import java.nio.ByteBuffer;
50  import java.util.ArrayList;
51  
52  /***
53   * An implementation of a TransportChannel which uses the Geronimo network layer
54   * for connectivity.
55   *
56   * @version $Revision: 1.16 $
57   */
58  public class GTransportChannel extends TransportChannelSupport {
59      private static final Log log = LogFactory.getLog(GTransportChannel.class);
60  
61      private SynchronizedBoolean closed;
62      private SynchronizedBoolean started;
63      private Protocol protocol;
64      private Latch dispatchLatch;
65      private ThreadPool threadPool;
66      private WireFormat wireFormat;
67  
68      /***
69       * Construct basic helpers
70       */
71      protected GTransportChannel(WireFormat wireFormat, ThreadPool tp) {
72          this.wireFormat = wireFormat;
73          closed = new SynchronizedBoolean(false);
74          started = new SynchronizedBoolean(false);
75          dispatchLatch = new Latch();
76          threadPool = tp;
77      }
78  
79      /***
80       * @param protocol
81       */
82      public GTransportChannel(WireFormat wireFormat, Protocol protocol, ThreadPool tp) {
83          this(wireFormat, tp);
84          init(protocol);
85      }
86  
87      /***
88       * @param remoteLocation
89       * @param localLocation
90       */
91      public GTransportChannel(WireFormat wireFormat, URI remoteLocation, URI localLocation,
92                               SelectorManager sm, ThreadPool tp, ClockPool cp)
93              throws UnknownHostException, ProtocolException {
94          this(wireFormat, tp);
95  
96  /*
97  		ControlClientProtocolStack clientStack = new ControlClientProtocolStack();
98  		clientStack.setClassLoader(Thread.currentThread()
99  				.getContextClassLoader());
100 		clientStack.setThreadPool(tp);
101 		clientStack.setClockPool(cp);
102 		clientStack.setSelectorManager(sm);
103 */
104         SocketProtocol sp = new SocketProtocol();
105         sp.setTimeout(1000 * 30);
106         if (localLocation != null) {
107             sp.setInterface(new InetSocketAddress(InetAddress
108                     .getByName(localLocation.getHost()), localLocation
109                     .getPort()));
110         }
111         sp.setAddress(new InetSocketAddress(InetAddress
112                 .getByName(remoteLocation.getHost()), remoteLocation
113                 .getPort()));
114         sp.setSelectorManager(sm);
115 /*
116 		clientStack.push(sp);
117 		ControlClientProtocol ccp = new ControlClientProtocol();
118 		ccp.setTimeout(1000 * 30);
119 		clientStack.push(ccp);
120 		clientStack.setup();
121 */
122 //		init(clientStack);
123         init(sp);
124         sp.setup();
125     }
126 
127     /***
128      * @param protocol
129      */
130     private void init(Protocol protocol) {
131         this.protocol = protocol;
132         // Hookup a new Up protocol so we can get the up stream packets.
133         protocol.setUpProtocol(new AbstractProtocol() {
134             public void setup() {
135             }
136 
137             public void drain() {
138             }
139 
140             public void teardown() {
141             }
142 
143             public void sendUp(final UpPacket p) {
144                 try {
145                     log.trace("AQUIRING: " + dispatchLatch);
146                     dispatchLatch.acquire();
147                     log.trace("AQUIRED: " + dispatchLatch);
148 
149                     dispatch(p);
150                 }
151                 catch (InterruptedException e) {
152                     log.warn("Caught exception dispatching packet: " + p + ". Reason: "
153                             + e, e);
154                     // TODO: notify exception listner and close the connection.
155                 }
156             }
157 
158             public void sendDown(DownPacket p) throws ProtocolException {
159                 getDownProtocol().sendDown(p);
160             }
161 
162             public void flush() throws ProtocolException {
163                 getDownProtocol().flush();
164             }
165         });
166     }
167 
168     private void dispatch(UpPacket p) {
169         try {
170             // Dont dispatch messages until the channel is started..
171             Packet packet = toPacket(p);
172             log.trace("<<<< SENDING UP <<<< " + packet);
173             if (packet != null) {
174                 doConsumePacket(packet);
175             }
176         }
177         catch (IOException e) {
178             log.warn("Caught exception dispatching packet: " + p + ". Reason: "
179                     + e, e);
180             // TODO: notify exception listner and close the connection.
181         }
182     }
183 
184     /***
185      * close the channel
186      */
187     public void stop() {
188         super.stop();
189         if (closed.commit(false, true)) {
190             try {
191                 protocol.drain();
192             }
193             catch (Exception e) {
194                 log.trace(toString() + " now closed");
195             }
196         }
197     }
198 
199     /***
200      * start listeneing for events
201      *
202      * @throws JMSException if an error occurs
203      */
204     public void start() throws JMSException {
205         if (started.commit(false, true)) {
206             // Allow messages to get dispatched.
207             dispatchLatch.release();
208         }
209     }
210 
211 
212     /***
213      * Asynchronously send a Packet
214      *
215      * @param packet
216      * @throws JMSException
217      */
218     public void asyncSend(Packet packet) throws JMSException {
219         try {
220             if (log.isTraceEnabled()) {
221                 log.trace(">>>> ASYNC SENDING DOWN >>>> " + packet);
222             }
223 
224             // lets sync for now to avoid multiple threads writing to the same socket
225             synchronized (protocol) {
226                 protocol.sendDown(toPlainDownPacket(packet));
227             }
228         }
229         catch (IOException e) {
230             System.out.println("Caught: " + e);
231             e.printStackTrace();
232             JMSException jmsEx = new JMSException("asyncSend failed "
233                     + e.getMessage());
234             jmsEx.setLinkedException(e);
235             throw jmsEx;
236         }
237         catch (ProtocolException e) {
238             System.out.println("Caught: " + e);
239             e.printStackTrace();
240             JMSException jmsEx = new JMSException("asyncSend failed "
241                     + e.getMessage());
242             jmsEx.setLinkedException(e);
243             throw jmsEx;
244         }
245     }
246 
247     public boolean isMulticast() {
248         return false;
249     }
250 
251     protected PlainDownPacket toPlainDownPacket(Packet mqpacket)
252             throws IOException, JMSException {
253 
254         ByteArrayOutputStream baos = new ByteArrayOutputStream();
255         DataOutputStream dos = new DataOutputStream(baos);
256         wireFormat.writePacket(mqpacket, dos);
257         dos.close();
258         ArrayList list = new ArrayList(1);
259         ByteBuffer buffer = ByteBuffer.wrap(baos.toByteArray());
260         buffer.limit(buffer.capacity());
261         list.add(buffer);
262         PlainDownPacket packet = new PlainDownPacket();
263         packet.setBuffers(list);
264         return packet;
265     }
266 
267     protected Packet toPacket(UpPacket packet) throws IOException {
268         final ByteBuffer buffer = packet.getBuffer();
269         InputStream is = new InputStream() {
270             public int read() {
271                 if (!buffer.hasRemaining()) {
272                     return -1;
273                 }
274                 int rc = 0xFF & buffer.get();
275                 return rc;
276             }
277 
278             public synchronized int read(byte[] bytes, int off, int len) {
279                 len = Math.min(len, buffer.remaining());
280                 buffer.get(bytes, off, len);
281                 return len;
282             }
283         };
284         DataInputStream dis = new DataInputStream(is);
285         return wireFormat.readPacket(dis);
286     }
287 
288     /***
289      * pretty print for object
290      *
291      * @return String representation of this object
292      */
293     public String toString() {
294         return "GTransportChannel: " + protocol;
295     }
296     
297     /***
298      * Can this wireformat process packets of this version
299      * @param version the version number to test
300      * @return true if can accept the version
301      */
302     public boolean canProcessWireFormatVersion(int version){
303         return wireFormat.canProcessWireFormatVersion(version);
304     }
305     
306     /***
307      * @return the current version of this wire format
308      */
309     public int getCurrentWireFormatVersion(){
310         return wireFormat.getCurrentWireFormatVersion();
311     }
312 }