1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.transport.vm;
19
20 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
21 import EDU.oswego.cs.dl.util.concurrent.Channel;
22 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.codehaus.activemq.broker.BrokerConnector;
26 import org.codehaus.activemq.message.Packet;
27 import org.codehaus.activemq.message.PacketListener;
28 import org.codehaus.activemq.transport.TransportChannelListener;
29 import org.codehaus.activemq.transport.TransportChannelSupport;
30
31 import javax.jms.JMSException;
32
33 /***
34 * A VM implementation of a TransportChannel
35 *
36 * @version $Revision: 1.22 $
37 */
38 public class VmTransportChannel extends TransportChannelSupport implements Runnable {
39
40 private static final Log log = LogFactory.getLog(VmTransportChannel.class);
41 private static final Object TERMINATE = new Object();
42 private static int lastThreadId = 0;
43
44
45 private Channel sendChannel;
46 private Channel receiveChannel;
47 private int sendCapacity = 10;
48 private int receiveCapacity = 10;
49 private boolean asyncSend = false;
50
51
52 private SynchronizedBoolean closed;
53 private SynchronizedBoolean started;
54 private Thread thread;
55 private PacketListener sendListener;
56 private VmTransportChannel clientSide;
57
58 public VmTransportChannel() {
59 closed = new SynchronizedBoolean(false);
60 started = new SynchronizedBoolean(false);
61 }
62
63 public VmTransportChannel(Channel sendChannel, Channel receiveChannel) {
64 this();
65 this.sendChannel = sendChannel;
66 this.receiveChannel = receiveChannel;
67 }
68
69 public VmTransportChannel(int capacity) {
70 this(new BoundedLinkedQueue(capacity), new BoundedLinkedQueue(capacity));
71 }
72
73 public void start() throws JMSException {
74 if (started.commit(false, true)) {
75 if (isAsyncSend()) {
76
77
78
79 getSendChannel();
80 getReceiveChannel();
81
82 thread = new Thread(this, "VM Transport: " + getNextThreadId());
83 if (isServerSide()) {
84 thread.setDaemon(true);
85 }
86 thread.start();
87 }
88 }
89 }
90
91 public void stop() {
92 if (closed.commit(false, true)) {
93 super.stop();
94 try {
95
96 if (sendChannel != null) {
97 sendChannel.put(TERMINATE);
98 }
99 if (receiveChannel != null) {
100 receiveChannel.put(TERMINATE);
101 }
102
103 if (thread != null) {
104
105 thread.join();
106 }
107 }
108 catch (Exception e) {
109 log.trace(toString() + " now closed with exception: " + e);
110 }
111 }
112 }
113
114 /***
115 * Asynchronously send a Packet
116 *
117 * @param packet
118 * @throws JMSException
119 */
120 public void asyncSend(Packet packet) throws JMSException {
121 if (sendChannel != null) {
122 while (true) {
123 try {
124 sendChannel.put(packet);
125 break;
126 }
127 catch (InterruptedException e) {
128
129 }
130 }
131 }
132 else {
133 if (sendListener == null) {
134 if (clientSide != null) {
135 sendListener = clientSide.createPacketListenerSender();
136 }
137 }
138 if (sendListener != null) {
139 sendListener.consume(packet);
140 }
141 else {
142 throw new JMSException("No sendListener available");
143 }
144 }
145 }
146
147
148 public boolean isMulticast() {
149 return false;
150 }
151
152 /***
153 * reads packets from a Socket
154 */
155 public void run() {
156 while (!closed.get()) {
157 try {
158 Object answer = receiveChannel.take();
159 if (answer == TERMINATE) {
160 log.trace("The socket peer is now closed");
161 stop();
162 return;
163 }
164 else if (answer != null) {
165 Packet packet = (Packet) answer;
166
167 if (closed.get()) {
168 break;
169 }
170 doConsumePacket(packet);
171 }
172 }
173 catch (InterruptedException e) {
174
175 }
176 }
177 }
178
179 /***
180 * pretty print for object
181 *
182 * @return String representation of this object
183 */
184 public String toString() {
185 return "VmTransportChannel: " + sendChannel;
186 }
187
188 /***
189 * Connects the client side transport channel with the broker
190 */
191 public void connect(BrokerConnector brokerConnector) throws JMSException {
192 TransportChannelListener listener = (TransportChannelListener) brokerConnector;
193 VmTransportChannel serverSide = createServerSide();
194 listener.addClient(serverSide);
195 serverSide.start();
196 }
197
198 /***
199 * Creates the server side version of this client side channel. On the server side
200 * the client's side sendChannel is the receiveChannel and vice versa
201 *
202 * @return
203 */
204 public VmTransportChannel createServerSide() throws JMSException {
205 VmTransportChannel channel = new VmTransportChannel(getReceiveChannel(), getSendChannel());
206 channel.clientSide = this;
207 return channel;
208 }
209
210 public void setPacketListener(PacketListener listener) {
211 super.setPacketListener(listener);
212 if (clientSide != null) {
213 clientSide.sendListener = listener;
214
215 }
216 }
217
218 /***
219 * Can this wireformat process packets of this version
220 * @param version the version number to test
221 * @return true if can accept the version
222 */
223 public boolean canProcessWireFormatVersion(int version){
224 return true;
225 }
226
227 /***
228 * @return the current version of this wire format
229 */
230 public int getCurrentWireFormatVersion(){
231 return 1;
232 }
233
234
235
236 public int getReceiveCapacity() {
237 return receiveCapacity;
238 }
239
240 public void setReceiveCapacity(int receiveCapacity) {
241 this.receiveCapacity = receiveCapacity;
242 }
243
244 public int getSendCapacity() {
245 return sendCapacity;
246 }
247
248 public void setSendCapacity(int sendCapacity) {
249 this.sendCapacity = sendCapacity;
250 }
251
252 public boolean isAsyncSend() {
253 return asyncSend;
254 }
255
256 public void setAsyncSend(boolean asyncSend) {
257 this.asyncSend = asyncSend;
258 }
259
260 public Channel getSendChannel() {
261 if (isAsyncSend()) {
262 if (sendChannel == null) {
263 sendChannel = createChannel(getSendCapacity());
264 }
265 }
266 return sendChannel;
267 }
268
269 public void setSendChannel(Channel sendChannel) {
270 this.sendChannel = sendChannel;
271 }
272
273 public Channel getReceiveChannel() {
274 if (isAsyncSend()) {
275 if (receiveChannel == null) {
276 receiveChannel = createChannel(getReceiveCapacity());
277 }
278 }
279 return receiveChannel;
280 }
281
282 public void setReceiveChannel(Channel receiveChannel) {
283 this.receiveChannel = receiveChannel;
284 }
285
286
287
288 protected static synchronized int getNextThreadId() {
289 return lastThreadId++;
290 }
291
292 protected Channel createChannel(int capacity) {
293 return new BoundedLinkedQueue(capacity);
294 }
295
296 /***
297 * Creates a sender PacketListener which handles any receipts then delegates
298 * to the ultimate PacketListener (typically the JMS client)
299 *
300 * @return
301 */
302 protected PacketListener createPacketListenerSender() {
303 return new PacketListener() {
304 public void consume(Packet packet) {
305 doConsumePacket(packet, getPacketListener());
306 }
307 };
308 }
309
310 protected void doClose(Exception ex) {
311 if (!closed.get()) {
312 JMSException jmsEx = new JMSException("Error reading socket: " + ex.getMessage());
313 jmsEx.setLinkedException(ex);
314 onAsyncException(jmsEx);
315 stop();
316 }
317 }
318 }