1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.transport;
19
20 import junit.framework.TestCase;
21 import org.codehaus.activemq.message.ActiveMQMessage;
22 import org.codehaus.activemq.message.DefaultWireFormat;
23 import org.codehaus.activemq.message.Packet;
24 import org.codehaus.activemq.message.PacketListener;
25 import org.codehaus.activemq.message.Receipt;
26 import org.codehaus.activemq.message.WireFormat;
27 import org.codehaus.activemq.util.IdGenerator;
28
29 import javax.jms.ExceptionListener;
30 import javax.jms.JMSException;
31 import java.net.URI;
32 import java.net.URISyntaxException;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Vector;
36
37 /***
38 * @version $Revision: 1.18 $
39 */
40 public class TransportChannelTestSupport extends TestCase implements PacketListener, TransportChannelListener {
41
42 protected int TEST_SIZE = 100;
43 protected Object mutex;
44 protected TransportChannel sender;
45 protected TransportChannel receiver;
46 protected TransportServerChannel server;
47 protected ArrayList packets;
48 protected List exceptions = new Vector();
49 protected boolean rpcTest = false;
50 private IdGenerator idGenerator = new IdGenerator();
51 protected WireFormat wireFormat = new DefaultWireFormat();
52 private boolean closeReceiver = true;
53
54 public TransportChannelTestSupport(String name) {
55 super(name);
56 }
57
58
59
60
61 public void testSendPacket() throws Exception {
62 System.out.println("Sending packets");
63
64 List tmpList = (List) packets.clone();
65 for (int i = 0; i < TEST_SIZE; i++) {
66 sender.asyncSend((Packet) tmpList.get(i));
67 }
68 System.out.println("Sent: " + TEST_SIZE + " packets");
69
70 for (int i = 0; i < 10; i++) {
71 synchronized (mutex) {
72 if (!packets.isEmpty()) {
73 mutex.wait(500);
74 }
75 }
76 }
77 assertTrue("Packets not consumed, still have: " + packets.size() + " packet(s) unconsumed", packets.isEmpty());
78 assertTrue("Exceptions were thrown: " + exceptions, exceptions.size() == 0);
79 }
80
81 public void testRpc() throws Exception {
82 rpcTest = true;
83
84 List tmpList = (List) packets.clone();
85 for (int i = 0; i < TEST_SIZE; i++) {
86 Packet packet = (Packet) tmpList.get(i);
87 Receipt receipt = sender.send(packet, 4000);
88 assertTrue("Receipt should not be null!", receipt != null);
89 System.out.println("Got receipt: " + receipt + " for packet: " + packet);
90 }
91
92 }
93
94 public void consume(Packet packet) {
95 System.out.println("Received packet: " + packet);
96
97 if (rpcTest) {
98
99 Receipt receipt = new Receipt();
100 receipt.setId(idGenerator.generateId());
101 receipt.setCorrelationId(packet.getId());
102 try {
103 receiver.asyncSend(receipt);
104 }
105 catch (JMSException e) {
106 logMessage("Sending receipt: " + receipt + " for packet: " + packet, e);
107 }
108 }
109 else {
110 packets.remove(packet);
111 if (packets.isEmpty()) {
112 synchronized (mutex) {
113 mutex.notify();
114 }
115 }
116 }
117 }
118
119 /***
120 * Assume that sender and receiver are created before we're invoked
121 */
122 protected void setUp() throws Exception {
123 super.setUp();
124
125 assertTrue("sender must be constructed in the TestCase before setUp() is invoked", sender != null);
126 assertTrue("receiver or server must be constructed in the TestCase before setUp() is invoked", receiver != null
127 || server != null);
128
129 mutex = new Object();
130
131 sender.setExceptionListener(new ExceptionListener() {
132
133 public void onException(JMSException ex) {
134 String message = "Sender got an exception:";
135 logMessage(message, ex);
136 }
137 });
138
139 sender.setPacketListener(new PacketListener() {
140
141 public void consume(Packet packet) {
142 System.err.println("Error - sender received a packet: " + packet);
143 exceptions.add(packet);
144 }
145
146 });
147
148 sender.setClientID("sender");
149 sender.start();
150
151 packets = new ArrayList(TEST_SIZE);
152 for (int i = 0; i < TEST_SIZE; i++) {
153 Packet test = new ActiveMQMessage();
154 test.setId("test:" + i);
155 packets.add(test);
156 }
157 }
158
159 protected void tearDown() throws Exception {
160
161 sender.setExceptionListener(null);
162 receiver.setExceptionListener(null);
163 super.tearDown();
164
165 System.out.println("Stopping sender");
166 sender.stop();
167 if (receiver == null) {
168 System.out.println("No receiver created!");
169 }
170 else {
171 if (closeReceiver) {
172 System.out.println("Stopping receiver");
173
174 receiver.stop();
175 }
176 else {
177 System.out.println("Receiver will be closed by the server");
178 }
179 }
180 if (server != null) {
181 System.out.println("Stopping server");
182 server.stop();
183 }
184 assertTrue("Exceptions were thrown: " + exceptions, exceptions.size() == 0);
185 }
186
187 protected void configureServer() throws JMSException {
188 if (server != null) {
189 server.setTransportChannelListener(this);
190 server.start();
191 System.out.println("Server has started");
192
193
194
195
196
197
198
199
200
201
202
203 }
204 }
205
206 protected void configureReceiver() {
207 receiver.setPacketListener(this);
208
209 receiver.setExceptionListener(new ExceptionListener() {
210
211 public void onException(JMSException ex) {
212 logMessage("Receiver got an exception:", ex);
213 }
214
215 });
216
217 receiver.setClientID("receiver");
218
219 try {
220 receiver.start();
221 }
222 catch (JMSException e) {
223 logMessage("Failure starting receiver: ", e);
224 }
225 System.out.println("Receiver has started");
226 }
227
228
229 protected void createSenderAndReceiver(String string) throws URISyntaxException, JMSException {
230 URI uri = new URI(string);
231 sender = TransportChannelProvider.create(wireFormat, uri);
232 receiver = TransportChannelProvider.create(wireFormat, uri);
233 if (receiver != null) {
234 configureReceiver();
235 }
236 }
237
238 protected void createSenderAndServer(String subject) throws URISyntaxException, JMSException {
239 URI uri = new URI(subject);
240 server = TransportServerChannelProvider.create(wireFormat, uri);
241 configureServer();
242 sender = TransportChannelProvider.create(wireFormat, uri);
243 }
244
245 protected void logMessage(String message, JMSException ex) {
246 System.err.println(message);
247 ex.printStackTrace();
248 Throwable t = ex.getLinkedException();
249 if (t != null && t != ex) {
250 System.out.println("Reason: " + t);
251 t.printStackTrace();
252 }
253 exceptions.add(ex);
254 }
255
256 public void addClient(TransportChannel channel) {
257 this.receiver = channel;
258 this.closeReceiver = false;
259
260 System.out.println("addClient() with receiver: " + receiver);
261
262 assertTrue("Should have received a receiver by now", receiver != null);
263
264 configureReceiver();
265 }
266
267 public void removeClient(TransportChannel channel) {
268 }
269 }