1 /*** 2 * 3 * Copyright 2004 Protique Ltd 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 **/ 18 package org.codehaus.activemq.usecases; 19 20 import org.codehaus.activemq.ActiveMQConnectionFactory; 21 import org.codehaus.activemq.JmsSendReceiveTestSupport; 22 import org.codehaus.activemq.message.ActiveMQTopic; 23 24 import javax.jms.Connection; 25 import javax.jms.Destination; 26 import javax.jms.JMSException; 27 import javax.jms.Message; 28 import javax.jms.MessageConsumer; 29 import javax.jms.MessageListener; 30 import javax.jms.Session; 31 import java.util.List; 32 33 /*** 34 * @version $Revision: 1.4 $ 35 */ 36 public class CompositePublishTest extends JmsSendReceiveTestSupport { 37 38 protected Connection sendConnection; 39 protected Connection receiveConnection; 40 protected Session receiveSession; 41 protected MessageConsumer[] consumers; 42 protected List[] messageLists; 43 44 protected void setUp() throws Exception { 45 super.setUp(); 46 47 connectionFactory = createConnectionFactory(); 48 49 sendConnection = createConnection(); 50 sendConnection.start(); 51 52 receiveConnection = createConnection(); 53 receiveConnection.start(); 54 55 System.out.println("Created sendConnection: " + sendConnection); 56 System.out.println("Created receiveConnection: " + receiveConnection); 57 58 session = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 59 receiveSession = receiveConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 60 61 System.out.println("Created sendSession: " + session); 62 System.out.println("Created receiveSession: " + receiveSession); 63 64 producer = session.createProducer(null); 65 66 System.out.println("Created producer: " + producer); 67 68 if (topic) { 69 consumerDestination = session.createTopic(getConsumerSubject()); 70 producerDestination = session.createTopic(getProducerSubject()); 71 } 72 else { 73 consumerDestination = session.createQueue(getConsumerSubject()); 74 producerDestination = session.createQueue(getProducerSubject()); 75 } 76 77 System.out.println("Created consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass()); 78 System.out.println("Created producer destination: " + producerDestination + " of type: " + producerDestination.getClass()); 79 80 Destination[] destinations = getDestinations(); 81 consumers = new MessageConsumer[destinations.length]; 82 messageLists = new List[destinations.length]; 83 for (int i = 0; i < destinations.length; i++) { 84 Destination dest = destinations[i]; 85 messageLists[i] = createConcurrentList(); 86 consumers[i] = receiveSession.createConsumer(dest); 87 consumers[i].setMessageListener(createMessageListener(i, messageLists[i])); 88 } 89 90 91 System.out.println("Started connections"); 92 } 93 94 protected MessageListener createMessageListener(int i, final List messageList) { 95 return new MessageListener() { 96 public void onMessage(Message message) { 97 consumeMessage(message, messageList); 98 } 99 }; 100 } 101 102 /*** 103 * Returns the subject on which we publish 104 */ 105 protected String getSubject() { 106 return getPrefix() + "FOO.BAR," + getPrefix() + "FOO.X.Y"; 107 } 108 109 /*** 110 * Returns the destinations to which we consume 111 */ 112 protected Destination[] getDestinations() { 113 return new Destination[]{new ActiveMQTopic(getPrefix() + "FOO.BAR"), new ActiveMQTopic(getPrefix() + "FOO.*"), new ActiveMQTopic(getPrefix() + "FOO.X.Y")}; 114 } 115 116 protected String getPrefix() { 117 return super.getSubject() + "."; 118 } 119 120 protected void assertMessagesAreReceived() throws JMSException { 121 waitForMessagesToBeDelivered(); 122 123 for (int i = 0, size = messageLists.length; i < size; i++) { 124 System.out.println("Message list: " + i + " contains: " + messageLists[i].size() + " message(s)"); 125 } 126 127 for (int i = 0, size = messageLists.length; i < size; i++) { 128 assertMessagesReceivedAreValid(messageLists[i]); 129 } 130 } 131 132 protected ActiveMQConnectionFactory createConnectionFactory() { 133 return new ActiveMQConnectionFactory("vm://localhost"); 134 } 135 136 protected void tearDown() throws Exception { 137 session.close(); 138 receiveSession.close(); 139 140 sendConnection.close(); 141 receiveConnection.close(); 142 } 143 }