1 /*** 2 * 3 * Copyright 2004 Protique Ltd 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 **/ 18 19 package org.codehaus.activemq; 20 21 import javax.jms.DeliveryMode; 22 import javax.jms.Destination; 23 import javax.jms.JMSException; 24 import javax.jms.Message; 25 import javax.jms.MessageConsumer; 26 import javax.jms.MessageListener; 27 import javax.jms.MessageProducer; 28 import javax.jms.Session; 29 import javax.jms.TextMessage; 30 import java.util.ArrayList; 31 import java.util.Arrays; 32 import java.util.Collections; 33 import java.util.Date; 34 import java.util.Iterator; 35 import java.util.List; 36 37 /*** 38 * @version $Revision: 1.27 $ 39 */ 40 public class JmsSendReceiveTestSupport extends TestSupport implements MessageListener { 41 protected int messageCount = 100; 42 protected String[] data; 43 protected Session session; 44 protected MessageConsumer consumer; 45 protected MessageProducer producer; 46 protected Destination consumerDestination; 47 protected Destination producerDestination; 48 protected List messages = createConcurrentList(); 49 protected boolean topic = true; 50 protected boolean durable = false; 51 protected int deliveryMode = DeliveryMode.PERSISTENT; 52 protected final Object lock = new Object(); 53 protected boolean verbose = false; 54 55 public void testSendReceive() throws Exception { 56 messages.clear(); 57 for (int i = 0; i < data.length; i++) { 58 Message message = session.createTextMessage(data[i]); 59 if (verbose) { 60 System.out.println("About to send a message: " + message + " with text: " + data[i]); 61 } 62 producer.send(producerDestination, message); 63 } 64 assertMessagesAreReceived(); 65 System.out.println("" + data.length + " messages(s) received, closing down connections"); 66 } 67 68 protected void assertMessagesAreReceived() throws JMSException { 69 waitForMessagesToBeDelivered(); 70 assertMessagesReceivedAreValid(messages); 71 } 72 73 protected void assertMessagesReceivedAreValid(List receivedMessages) throws JMSException { 74 List copyOfMessages = Arrays.asList(receivedMessages.toArray()); 75 int counter = 0; 76 if (data.length != copyOfMessages.size()) { 77 for (Iterator iter = copyOfMessages.iterator(); iter.hasNext();) { 78 TextMessage message = (TextMessage) iter.next(); 79 System.out.println("<== " + counter++ + " = " + message); 80 } 81 } 82 assertEquals("Not enough messages received", data.length, receivedMessages.size()); 83 for (int i = 0; i < data.length; i++) { 84 TextMessage received = (TextMessage) receivedMessages.get(i); 85 String text = received.getText(); 86 if (verbose) { 87 System.out.println("Received Text: " + text); 88 } 89 assertEquals("Message: " + i, data[i], text); 90 } 91 } 92 93 protected void waitForMessagesToBeDelivered() { 94 long maxWaitTime = 30000; 95 long waitTime = maxWaitTime; 96 long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis(); 97 synchronized (lock) { 98 while (messages.size() < data.length && waitTime >= 0) { 99 try { 100 lock.wait(200); 101 } 102 catch (InterruptedException e) { 103 e.printStackTrace(); 104 } 105 waitTime = maxWaitTime - (System.currentTimeMillis() - start); 106 } 107 } 108 } 109 110 private void waitForTimeOrNotify(long timeout) { 111 try { 112 synchronized (lock) { 113 lock.wait(timeout); 114 } 115 } 116 catch (InterruptedException e) { 117 System.out.println("Caught: " + e); 118 } 119 } 120 121 protected void setUp() throws Exception { 122 super.setUp(); 123 String temp = System.getProperty("messageCount"); 124 if (temp != null) { 125 int i = Integer.parseInt(temp); 126 if (i > 0) { 127 messageCount = i; 128 } 129 } 130 System.out.println("Message count for test case is: " + messageCount); 131 data = new String[messageCount]; 132 for (int i = 0; i < messageCount; i++) { 133 data[i] = "Text for message: " + i + " at " + new Date(); 134 } 135 } 136 137 public synchronized void onMessage(Message message) { 138 consumeMessage(message, messages); 139 } 140 141 protected void consumeMessage(Message message, List messageList) { 142 if (verbose) { 143 System.out.println("Received message: " + message); 144 } 145 messageList.add(message); 146 if (messageList.size() >= data.length) { 147 synchronized (lock) { 148 lock.notifyAll(); 149 } 150 } 151 } 152 153 protected List createConcurrentList() { 154 return Collections.synchronizedList(new ArrayList()); 155 } 156 }