1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.transport.ember;
19
20 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.codehaus.activemq.message.Packet;
24 import org.codehaus.activemq.message.WireFormat;
25 import org.codehaus.activemq.transport.TransportChannelSupport;
26 import pyrasun.eio.EIOGlobalContext;
27 import pyrasun.eio.services.EmberServiceController;
28 import pyrasun.eio.services.EmberServiceException;
29 import pyrasun.eio.services.bytearray.ByteArrayServerClient;
30 import pyrasun.eio.services.bytearray.ByteArrayServerClientListener;
31
32 import javax.jms.JMSException;
33 import java.io.IOException;
34
35 /***
36 * An EmberIO (using NIO) implementation of a TransportChannel
37 *
38 * @version $Revision: 1.15 $
39 */
40 public class EmberTransportChannel extends TransportChannelSupport implements ByteArrayServerClientListener {
41
42 private static final Log log = LogFactory.getLog(EmberTransportChannel.class);
43
44 private WireFormat wireFormat;
45 private EIOGlobalContext context;
46 private EmberServiceController controller;
47 private ByteArrayServerClient client;
48
49 private SynchronizedBoolean closed;
50 private SynchronizedBoolean started;
51
52
53 /***
54 * Construct basic helpers
55 */
56 protected EmberTransportChannel(WireFormat wireFormat) {
57 this.wireFormat = wireFormat;
58
59 closed = new SynchronizedBoolean(false);
60 started = new SynchronizedBoolean(false);
61 }
62
63 /***
64 * Connect to a remote Node - e.g. a Broker
65 */
66 public EmberTransportChannel(WireFormat wireFormat, EIOGlobalContext context, EmberServiceController controller, ByteArrayServerClient client) {
67 this(wireFormat);
68 this.context = context;
69 this.client = client;
70 this.controller = controller;
71 client.setListener(this);
72 }
73
74 /***
75 * close the channel
76 */
77 public void stop() {
78 super.stop();
79 if (closed.commit(false, true)) {
80 try {
81
82 if (controller != null) {
83 controller.stopAll();
84 }
85 if (context != null) {
86 context.stop();
87 }
88 }
89 catch (EmberServiceException e) {
90 log.error("Caught while closing: " + e, e);
91 }
92 }
93 }
94
95 /***
96 * start listeneing for events
97 *
98 * @throws JMSException if an error occurs
99 */
100 public void start() throws JMSException {
101 if (started.commit(false, true)) {
102
103 try {
104
105
106 if (context != null) {
107 context.start();
108 }
109 if (controller != null) {
110 controller.startAll();
111 }
112 }
113 catch (EmberServiceException e) {
114 JMSException jmsEx = new JMSException("Error starting NIO client: " + e.getMessage());
115 jmsEx.setLinkedException(e);
116 throw jmsEx;
117 }
118 }
119 }
120
121
122 /***
123 * Asynchronously send a Packet
124 *
125 * @param packet
126 * @throws JMSException
127 */
128 public void asyncSend(Packet packet) throws JMSException {
129 try {
130 byte[] bytes = wireFormat.toBytes(packet);
131
132 synchronized (client) {
133 client.write(bytes);
134 }
135 }
136 catch (IOException e) {
137 throw createJMSException("Failed to write packet: " + packet + ". ", e);
138 }
139 }
140
141
142 public boolean isMulticast() {
143 return false;
144 }
145
146 /***
147 * Factory method to create a JMSException which is linked to the base exception
148 */
149 protected JMSException createJMSException(String message, Exception ex) {
150 JMSException jmsEx = new JMSException(message + ex.getMessage());
151 jmsEx.setLinkedException(ex);
152 return jmsEx;
153 }
154
155 /***
156 * pretty print for object
157 *
158 * @return String representation of this object
159 */
160 public String toString() {
161 return "EmberTransportChannel: " + client;
162 }
163
164 public void newMessage(ByteArrayServerClient client, Object msg) {
165 byte[] bytes = (byte[]) msg;
166 Packet packet = null;
167 try {
168 packet = wireFormat.fromBytes(bytes);
169 doConsumePacket(packet);
170 }
171 catch (IOException e) {
172 log.error("Could not parse byte[] of size: " + bytes.length + ". Reason: " + e, e);
173 }
174
175 }
176
177 /***
178 * Can this wireformat process packets of this version
179 * @param version the version number to test
180 * @return true if can accept the version
181 */
182 public boolean canProcessWireFormatVersion(int version){
183 return wireFormat.canProcessWireFormatVersion(version);
184 }
185
186 /***
187 * @return the current version of this wire format
188 */
189 public int getCurrentWireFormatVersion(){
190 return wireFormat.getCurrentWireFormatVersion();
191 }
192 }