1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.usecases;
20 import java.util.HashMap;
21 import javax.jms.Connection;
22 import javax.jms.DeliveryMode;
23 import javax.jms.Destination;
24 import javax.jms.JMSException;
25 import javax.jms.Message;
26 import javax.jms.MessageConsumer;
27 import javax.jms.MessageProducer;
28 import javax.jms.ObjectMessage;
29 import javax.jms.Queue;
30 import javax.jms.QueueConnection;
31 import javax.jms.QueueReceiver;
32 import javax.jms.QueueSender;
33 import javax.jms.QueueSession;
34 import javax.jms.Session;
35 import javax.jms.TextMessage;
36 import javax.jms.Topic;
37 import org.codehaus.activemq.ActiveMQConnection;
38 import org.codehaus.activemq.ActiveMQConnectionFactory;
39 import org.codehaus.activemq.TestSupport;
40 import org.codehaus.activemq.broker.BrokerContainer;
41 import org.codehaus.activemq.broker.impl.BrokerContainerImpl;
42 import org.codehaus.activemq.util.IdGenerator;
43 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
44 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
45
46 /***
47 * @version $Revision: 1.4 $
48 */
49 public class ReliableReconnectTest extends TestSupport {
50 private static final int RECEIVE_TIMEOUT = 10000;
51 protected static final int MESSAGE_COUNT = 100;
52 private IdGenerator idGen = new IdGenerator();
53 protected int deliveryMode = DeliveryMode.PERSISTENT;
54 protected String consumerClientId;
55 protected Destination destination;
56 protected SynchronizedBoolean closeBroker = new SynchronizedBoolean(false);
57 protected SynchronizedInt messagesReceived = new SynchronizedInt(0);
58 protected BrokerContainer brokerContainer;
59 protected int firstBatch = MESSAGE_COUNT/10;
60
61 public ReliableReconnectTest() {
62 }
63
64 public ReliableReconnectTest(String n) {
65 super(n);
66 }
67
68 protected void setUp() throws Exception {
69 consumerClientId = idGen.generateId();
70 super.setUp();
71 topic = true;
72 destination = createDestination(getClass().getName());
73 }
74
75 public ActiveMQConnectionFactory getConnectionFactory() throws Exception {
76 String url = "reliable:" + ActiveMQConnection.DEFAULT_URL;
77 return new ActiveMQConnectionFactory(url);
78 }
79
80 protected void startBroker() throws JMSException {
81 brokerContainer = new BrokerContainerImpl();
82 String url = ActiveMQConnection.DEFAULT_URL;
83 brokerContainer.addConnector(url);
84 brokerContainer.start();
85 }
86
87 protected Connection createConsumerConnection() throws Exception {
88 Connection consumerConnection = getConnectionFactory().createConnection();
89 consumerConnection.setClientID(consumerClientId);
90 consumerConnection.start();
91 return consumerConnection;
92 }
93
94 protected MessageConsumer createConsumer(Connection con) throws Exception {
95 Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
96 return s.createDurableSubscriber((Topic) destination, "TestFred");
97 }
98
99 protected void spawnConsumer() {
100 Thread thread = new Thread(new Runnable() {
101 public void run() {
102 try {
103 Connection consumerConnection = createConsumerConnection();
104 MessageConsumer consumer = createConsumer(consumerConnection);
105
106
107 for (int i = 0;i < firstBatch;i++) {
108 Message msg = consumer.receive(RECEIVE_TIMEOUT);
109 if (msg != null) {
110
111 messagesReceived.increment();
112 }
113 }
114 synchronized (closeBroker) {
115 closeBroker.set(true);
116 closeBroker.notify();
117 }
118 Thread.sleep(2000);
119 for (int i = firstBatch;i < MESSAGE_COUNT;i++) {
120 Message msg = consumer.receive(RECEIVE_TIMEOUT);
121
122 if (msg != null) {
123 messagesReceived.increment();
124 }
125 }
126 consumerConnection.close();
127 synchronized (messagesReceived) {
128 messagesReceived.notify();
129 }
130 }
131 catch (Throwable e) {
132 e.printStackTrace();
133 }
134 }
135 });
136 thread.start();
137 }
138
139 public void testReconnect() throws Exception {
140 startBroker();
141
142 Connection consumerConnection = createConsumerConnection();
143 createConsumer(consumerConnection);
144 consumerConnection.close();
145
146 Connection connection = createConnection();
147 connection.setClientID(idGen.generateId());
148 connection.start();
149 Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
150 MessageProducer producer = producerSession.createProducer(destination);
151 TextMessage msg = producerSession.createTextMessage();
152 for (int i = 0;i < MESSAGE_COUNT;i++) {
153 msg.setText("msg: " + i);
154 producer.send(msg);
155 }
156 connection.close();
157 spawnConsumer();
158 synchronized (closeBroker) {
159 if (!closeBroker.get()) {
160 closeBroker.wait();
161 }
162 }
163 System.err.println("Stopping broker");
164 brokerContainer.stop();
165 startBroker();
166 System.err.println("Started Broker again");
167 synchronized (messagesReceived) {
168 if (messagesReceived.get() < MESSAGE_COUNT) {
169 messagesReceived.wait(60000);
170 }
171 }
172
173 int count = messagesReceived.get();
174 assertTrue("Not enough messages received: " + count, count > firstBatch);
175 }
176 }