1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.usecases;
20 import javax.jms.Connection;
21 import javax.jms.DeliveryMode;
22 import javax.jms.Destination;
23 import javax.jms.JMSException;
24 import javax.jms.Message;
25 import javax.jms.MessageConsumer;
26 import javax.jms.MessageListener;
27 import javax.jms.MessageProducer;
28 import javax.jms.Session;
29 import javax.jms.TextMessage;
30 import junit.framework.TestCase;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.codehaus.activemq.ActiveMQConnectionFactory;
34 import org.codehaus.activemq.broker.BrokerContainer;
35 import org.codehaus.activemq.broker.impl.BrokerContainerImpl;
36 import org.codehaus.activemq.message.ActiveMQQueue;
37 import org.codehaus.activemq.message.ActiveMQTextMessage;
38 import org.codehaus.activemq.message.ActiveMQTopic;
39 import org.codehaus.activemq.transport.DiscoveryNetworkConnector;
40 import org.codehaus.activemq.transport.zeroconf.ZeroconfDiscoveryAgent;
41 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
42
43 /***
44 * @version $Revision: 1.3 $
45 */
46 public class TopicClusterTest extends TestCase implements MessageListener {
47 protected Log log = LogFactory.getLog(getClass());
48 protected Destination destination;
49 protected boolean topic = true;
50 protected SynchronizedInt receivedMessageCount = new SynchronizedInt(0);
51 protected static int MESSAGE_COUNT = 50;
52 protected static int NUMBER_IN_CLUSTER = 3;
53 protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
54 protected MessageProducer[] producers;
55 protected Connection[] connections;
56
57 protected void setUp() throws Exception {
58 connections = new Connection[NUMBER_IN_CLUSTER];
59 producers = new MessageProducer[NUMBER_IN_CLUSTER];
60 Destination destination = createDestination();
61 int portStart = 50000;
62 for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
63 connections[i] = createConnection("broker(" + i + ")");
64 connections[i].setClientID("ClusterTest" + i);
65 connections[i].start();
66 Session session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
67 producers[i] = session.createProducer(destination);
68 producers[i].setDeliveryMode(deliveryMode);
69 MessageConsumer consumer = createMessageConsumer(session,destination);
70 consumer.setMessageListener(this);
71
72 }
73 System.out.println("Sleeping to ensure cluster is fully connected");
74 Thread.sleep(5000);
75 }
76
77 protected void tearDown() throws Exception {
78 if (connections != null) {
79 for (int i = 0;i < connections.length;i++) {
80 connections[i].close();
81 }
82 }
83 }
84
85 protected MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException{
86 return session.createConsumer(destination);
87 }
88
89 protected ActiveMQConnectionFactory createGenericClusterFactory(String brokerName) throws JMSException {
90 BrokerContainer container = new BrokerContainerImpl(brokerName);
91 ZeroconfDiscoveryAgent agent = new ZeroconfDiscoveryAgent();
92 agent.setType(getClass().getName() + ".");
93 container.setDiscoveryAgent(agent);
94 String url = "tcp://localhost:0";
95 container.addConnector(url);
96 container.addNetworkConnector(new DiscoveryNetworkConnector(container));
97 container.start();
98
99
100 return new ActiveMQConnectionFactory(container,"vm://"+brokerName);
101 }
102
103 protected int expectedReceiveCount() {
104 return MESSAGE_COUNT * NUMBER_IN_CLUSTER * NUMBER_IN_CLUSTER;
105 }
106
107 protected Connection createConnection(String name) throws JMSException {
108 return createGenericClusterFactory(name).createConnection();
109 }
110
111 protected Destination createDestination() {
112 return createDestination(getClass().getName());
113 }
114
115 protected Destination createDestination(String name) {
116 if (topic) {
117 return new ActiveMQTopic(name);
118 }
119 else {
120 return new ActiveMQQueue(name);
121 }
122 }
123
124
125 /***
126 * @param msg
127 */
128 public void onMessage(Message msg) {
129
130 receivedMessageCount.increment();
131 synchronized (receivedMessageCount) {
132 if (receivedMessageCount.get() >= expectedReceiveCount()) {
133 receivedMessageCount.notify();
134 }
135 }
136 }
137
138 /***
139 * @throws Exception
140 */
141 public void testSendReceive() throws Exception {
142 for (int i = 0;i < MESSAGE_COUNT;i++) {
143 TextMessage textMessage = new ActiveMQTextMessage();
144 textMessage.setText("MSG-NO:" + i);
145 for (int x = 0;x < producers.length;x++) {
146 producers[x].send(textMessage);
147 }
148 }
149 synchronized (receivedMessageCount) {
150 if (receivedMessageCount.get() < expectedReceiveCount()) {
151 receivedMessageCount.wait(20000);
152 }
153 }
154
155 Thread.sleep(2000);
156 System.err.println("GOT: " + receivedMessageCount.get());
157 assertEquals("Expected message count not correct", expectedReceiveCount(), receivedMessageCount.get());
158 }
159
160 }