1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.transport;
19  
20  import junit.framework.TestCase;
21  import org.codehaus.activemq.message.ActiveMQMessage;
22  import org.codehaus.activemq.message.DefaultWireFormat;
23  import org.codehaus.activemq.message.Packet;
24  import org.codehaus.activemq.message.PacketListener;
25  import org.codehaus.activemq.message.Receipt;
26  import org.codehaus.activemq.message.WireFormat;
27  import org.codehaus.activemq.util.IdGenerator;
28  
29  import javax.jms.ExceptionListener;
30  import javax.jms.JMSException;
31  import java.net.URI;
32  import java.net.URISyntaxException;
33  import java.util.ArrayList;
34  import java.util.List;
35  import java.util.Vector;
36  
37  /***
38   * @version $Revision: 1.18 $
39   */
40  public class TransportChannelTestSupport extends TestCase implements PacketListener, TransportChannelListener {
41  
42      protected int TEST_SIZE = 100;
43      protected Object mutex;
44      protected TransportChannel sender;
45      protected TransportChannel receiver;
46      protected TransportServerChannel server;
47      protected ArrayList packets;
48      protected List exceptions = new Vector();
49      protected boolean rpcTest = false;
50      private IdGenerator idGenerator = new IdGenerator();
51      protected WireFormat wireFormat = new DefaultWireFormat();
52      private boolean closeReceiver = true;
53  
54      public TransportChannelTestSupport(String name) {
55          super(name);
56      }
57  
58      /*
59       * test for Receipt send(Packet, int)
60       */
61      public void testSendPacket() throws Exception {
62          System.out.println("Sending packets");
63  
64          List tmpList = (List) packets.clone();
65          for (int i = 0; i < TEST_SIZE; i++) {
66              sender.asyncSend((Packet) tmpList.get(i));
67          }
68          System.out.println("Sent: " + TEST_SIZE + " packets");
69  
70          for (int i = 0; i < 10; i++) {
71              synchronized (mutex) {
72                  if (!packets.isEmpty()) {
73                      mutex.wait(500);
74                  }
75              }
76          }
77          assertTrue("Packets not consumed, still have: " + packets.size() + " packet(s) unconsumed", packets.isEmpty());
78          assertTrue("Exceptions were thrown: " + exceptions, exceptions.size() == 0);
79      }
80  
81      public void testRpc() throws Exception {
82          rpcTest = true;
83  
84          List tmpList = (List) packets.clone();
85          for (int i = 0; i < TEST_SIZE; i++) {
86              Packet packet = (Packet) tmpList.get(i);
87              Receipt receipt = sender.send(packet, 4000);
88              assertTrue("Receipt should not be null!", receipt != null);
89              System.out.println("Got receipt: " + receipt + " for packet: " + packet);
90          }
91  
92      }
93  
94      public void consume(Packet packet) {
95          System.out.println("Received packet: " + packet);
96  
97          if (rpcTest) {
98              // lets send a receipt
99              Receipt receipt = new Receipt();
100             receipt.setId(idGenerator.generateId());
101             receipt.setCorrelationId(packet.getId());
102             try {
103                 receiver.asyncSend(receipt);
104             }
105             catch (JMSException e) {
106                 logMessage("Sending receipt: " + receipt + " for packet: " + packet, e);
107             }
108         }
109         else {
110             packets.remove(packet);
111             if (packets.isEmpty()) {
112                 synchronized (mutex) {
113                     mutex.notify();
114                 }
115             }
116         }
117     }
118 
119     /***
120      * Assume that sender and receiver are created before we're invoked
121      */
122     protected void setUp() throws Exception {
123         super.setUp();
124 
125         assertTrue("sender must be constructed in the TestCase before setUp() is invoked", sender != null);
126         assertTrue("receiver or server must be constructed in the TestCase before setUp() is invoked", receiver != null
127                 || server != null);
128 
129         mutex = new Object();
130 
131         sender.setExceptionListener(new ExceptionListener() {
132 
133             public void onException(JMSException ex) {
134                 String message = "Sender got an exception:";
135                 logMessage(message, ex);
136             }
137         });
138 
139         sender.setPacketListener(new PacketListener() {
140 
141             public void consume(Packet packet) {
142                 System.err.println("Error - sender received a packet: " + packet);
143                 exceptions.add(packet);
144             }
145 
146         });
147 
148         sender.setClientID("sender");
149         sender.start();
150 
151         packets = new ArrayList(TEST_SIZE);
152         for (int i = 0; i < TEST_SIZE; i++) {
153             Packet test = new ActiveMQMessage();
154             test.setId("test:" + i);
155             packets.add(test);
156         }
157     }
158 
159     protected void tearDown() throws Exception {
160         //getting exceptions when peers stop is acceptable
161         sender.setExceptionListener(null);
162         receiver.setExceptionListener(null);
163         super.tearDown();
164 
165         System.out.println("Stopping sender");
166         sender.stop();
167         if (receiver == null) {
168             System.out.println("No receiver created!");
169         }
170         else {
171             if (closeReceiver) {
172                 System.out.println("Stopping receiver");
173                 //assertTrue("No receiver created!", receiver != null);
174                 receiver.stop();
175             }
176             else {
177                 System.out.println("Receiver will be closed by the server");
178             }
179         }
180         if (server != null) {
181             System.out.println("Stopping server");
182             server.stop();
183         }
184         assertTrue("Exceptions were thrown: " + exceptions, exceptions.size() == 0);
185     }
186 
187     protected void configureServer() throws JMSException {
188         if (server != null) {
189             server.setTransportChannelListener(this);
190             server.start();
191             System.out.println("Server has started");
192 
193             // lets wait a little for the server to startup
194 /*
195             try {
196                 Thread.sleep(500);
197             }
198             catch (InterruptedException e) {
199                 System.out.println("Caught: " + e);
200                 e.printStackTrace();
201             }
202 */
203         }
204     }
205 
206     protected void configureReceiver() {
207         receiver.setPacketListener(this);
208 
209         receiver.setExceptionListener(new ExceptionListener() {
210 
211             public void onException(JMSException ex) {
212                 logMessage("Receiver got an exception:", ex);
213             }
214 
215         });
216 
217         receiver.setClientID("receiver");
218 
219         try {
220             receiver.start();
221         }
222         catch (JMSException e) {
223             logMessage("Failure starting receiver: ", e);
224         }
225         System.out.println("Receiver has started");
226     }
227 
228 
229     protected void createSenderAndReceiver(String string) throws URISyntaxException, JMSException {
230         URI uri = new URI(string);
231         sender = TransportChannelProvider.create(wireFormat, uri);
232         receiver = TransportChannelProvider.create(wireFormat, uri);
233         if (receiver != null) {
234             configureReceiver();
235         }
236     }
237 
238     protected void createSenderAndServer(String subject) throws URISyntaxException, JMSException {
239         URI uri = new URI(subject);
240         server = TransportServerChannelProvider.create(wireFormat, uri);
241         configureServer();
242         sender = TransportChannelProvider.create(wireFormat, uri);
243     }
244 
245     protected void logMessage(String message, JMSException ex) {
246         System.err.println(message);
247         ex.printStackTrace();
248         Throwable t = ex.getLinkedException();
249         if (t != null && t != ex) {
250             System.out.println("Reason: " + t);
251             t.printStackTrace();
252         }
253         exceptions.add(ex);
254     }
255 
256     public void addClient(TransportChannel channel) {
257         this.receiver = channel;
258         this.closeReceiver = false;
259 
260         System.out.println("addClient() with receiver: " + receiver);
261 
262         assertTrue("Should have received a receiver by now", receiver != null);
263 
264         configureReceiver();
265     }
266 
267     public void removeClient(TransportChannel channel) {
268     }
269 }