1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.transport.jgroups;
19
20 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
21 import EDU.oswego.cs.dl.util.concurrent.Executor;
22 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
23 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.codehaus.activemq.message.Packet;
27 import org.codehaus.activemq.message.WireFormat;
28 import org.codehaus.activemq.transport.TransportChannelSupport;
29 import org.codehaus.activemq.util.JMSExceptionHelper;
30 import org.jgroups.Address;
31 import org.jgroups.Channel;
32 import org.jgroups.ChannelClosedException;
33 import org.jgroups.ChannelException;
34 import org.jgroups.ChannelNotConnectedException;
35 import org.jgroups.Message;
36 import org.jgroups.TimeoutException;
37
38 import javax.jms.JMSException;
39 import java.io.IOException;
40
41 /***
42 * A JGroups implementation of a TransportChannel
43 *
44 * @version $Revision: 1.9 $
45 */
46 public class JGroupsTransportChannel extends TransportChannelSupport implements Runnable {
47 private static final Log log = LogFactory.getLog(JGroupsTransportChannel.class);
48
49 private Channel channel;
50 private Address localAddress = null;
51 private WireFormat wireFormat;
52 private SynchronizedBoolean closed;
53 private SynchronizedBoolean started;
54 private Object outboundLock;
55 private Executor executor;
56 private Thread thread;
57 private boolean useAsyncSend = false;
58
59 public JGroupsTransportChannel(WireFormat wireFormat, Channel channel, Executor executor) {
60 this.wireFormat = wireFormat;
61 this.channel = channel;
62 this.executor = executor;
63 this.localAddress = channel.getLocalAddress();
64
65 closed = new SynchronizedBoolean(false);
66 started = new SynchronizedBoolean(false);
67 outboundLock = new Object();
68 if (useAsyncSend) {
69 executor = new PooledExecutor(new BoundedBuffer(1000), 1);
70 }
71 }
72
73 public String toString() {
74 return "JGroupsTransportChannel: " + channel;
75 }
76
77 /***
78 * close the channel
79 */
80 public void stop() {
81 if (closed.commit(false, true)) {
82 super.stop();
83 try {
84 stopExecutor(executor);
85 channel.disconnect();
86 channel.close();
87 }
88 catch (Exception e) {
89 log.warn("Caught while closing: " + e + ". Now Closed", e);
90 }
91 }
92 }
93
94 /***
95 * start listeneing for events
96 *
97 * @throws javax.jms.JMSException if an error occurs
98 */
99 public void start() throws JMSException {
100 if (started.commit(false, true)) {
101 thread = new Thread(this, toString());
102 if (isServerSide()) {
103 thread.setDaemon(true);
104 }
105 thread.start();
106 }
107 }
108
109
110 /***
111 * Asynchronously send a Packet
112 *
113 * @param packet
114 * @throws javax.jms.JMSException
115 */
116 public void asyncSend(final Packet packet) throws JMSException {
117 if (executor != null) {
118 try {
119 executor.execute(new Runnable() {
120 public void run() {
121 try {
122 writePacket(packet);
123 }
124 catch (JMSException e) {
125 onAsyncException(e);
126 }
127 }
128 });
129 }
130 catch (InterruptedException e) {
131 log.info("Caught: " + e, e);
132 }
133 }
134 else {
135 writePacket(packet);
136 }
137 }
138
139
140 public boolean isMulticast() {
141 return true;
142 }
143
144 /***
145 * Can this wireformat process packets of this version
146 * @param version the version number to test
147 * @return true if can accept the version
148 */
149 public boolean canProcessWireFormatVersion(int version){
150 return wireFormat.canProcessWireFormatVersion(version);
151 }
152
153 /***
154 * @return the current version of this wire format
155 */
156 public int getCurrentWireFormatVersion(){
157 return wireFormat.getCurrentWireFormatVersion();
158 }
159
160 /***
161 * reads packets from a Socket
162 */
163 public void run() {
164 log.trace("JGroups consumer thread starting");
165 while (!closed.get()) {
166 try {
167 Object value = channel.receive(0L);
168 if (value instanceof Message) {
169 Message message = (Message) value;
170
171
172
173 if (!localAddress.equals(message.getSrc())) {
174 byte[] data = message.getBuffer();
175 Packet packet = wireFormat.fromBytes(data);
176 if (packet != null) {
177 doConsumePacket(packet);
178 }
179 }
180 }
181
182
183
184
185
186
187
188
189
190 }
191 catch (IOException e) {
192 doClose(e);
193 }
194 catch (ChannelClosedException e) {
195 stop();
196 }
197 catch (ChannelNotConnectedException e) {
198 doClose(e);
199 }
200 catch (TimeoutException e) {
201
202 }
203 }
204 }
205
206 /***
207 * writes the packet to the channel
208 */
209 protected void writePacket(Packet packet) throws JMSException {
210 try {
211 synchronized (outboundLock) {
212 Address dest = null;
213 Message message = new Message(dest, localAddress, wireFormat.toBytes(packet));
214 channel.send(message);
215 }
216 }
217 catch (ChannelException e) {
218 throw JMSExceptionHelper.newJMSException("writePacket failed: " + e, e);
219 }
220 catch (IOException e) {
221 throw JMSExceptionHelper.newJMSException("writePacket failed: " + e, e);
222 }
223 }
224
225
226 private void doClose(Exception ex) {
227 if (!closed.get()) {
228 onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
229 stop();
230 }
231 }
232 }