1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.usecases;
19
20 import org.codehaus.activemq.TestSupport;
21
22 import javax.jms.Connection;
23 import javax.jms.DeliveryMode;
24 import javax.jms.Destination;
25 import javax.jms.JMSException;
26 import javax.jms.Message;
27 import javax.jms.MessageConsumer;
28 import javax.jms.MessageProducer;
29 import javax.jms.Session;
30 import javax.jms.Topic;
31
32 /***
33 * @version $Revision: 1.7 $
34 */
35 public class DurableConsumerCloseAndReconnectTest extends TestSupport {
36 protected static final long RECEIVE_TIMEOUT = 5000L;
37
38 private Connection connection;
39 private Session session;
40 private MessageConsumer consumer;
41 private MessageProducer producer;
42 private Destination destination;
43
44 public void testCreateDurableConsumerCloseThenReconnect() throws Exception {
45
46 Connection dummyConnection = createConnection();
47
48 consumeMessagesDeliveredWhileConsumerClosed();
49
50 dummyConnection.close();
51
52
53 consumeMessagesDeliveredWhileConsumerClosed();
54 }
55
56 protected void consumeMessagesDeliveredWhileConsumerClosed() throws Exception {
57 makeConsumer();
58 closeConsumer();
59
60 publish();
61
62
63 Thread.sleep(1000);
64
65 makeConsumer();
66
67 Message message = consumer.receive(RECEIVE_TIMEOUT);
68 assertTrue("Should have received a message!", message != null);
69
70 closeConsumer();
71
72 System.out.println("Now lets create the consumer again and because we didn't ack, we should get it again");
73 makeConsumer();
74
75 message = consumer.receive(RECEIVE_TIMEOUT);
76 assertTrue("Should have received a message!", message != null);
77 message.acknowledge();
78
79 closeConsumer();
80
81 System.out.println("Now lets create the consumer again and because we didn't ack, we should get it again");
82 makeConsumer();
83
84 message = consumer.receive(2000);
85 assertTrue("Should have no more messages left!", message == null);
86
87 closeConsumer();
88
89 System.out.println("Lets publish one more message now");
90 publish();
91
92 makeConsumer();
93 message = consumer.receive(RECEIVE_TIMEOUT);
94 assertTrue("Should have received a message!", message != null);
95 message.acknowledge();
96
97 closeConsumer();
98 }
99
100 protected void publish() throws Exception {
101 connection = createConnection();
102 connection.start();
103
104 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
105 destination = createDestination();
106
107 producer = session.createProducer(destination);
108 producer.setDeliveryMode(DeliveryMode.PERSISTENT);
109
110 producer.send(session.createTextMessage("This is a test"));
111
112 producer.close();
113 producer = null;
114 closeSession();
115 }
116
117 protected Destination createDestination() throws JMSException {
118 if (isTopic()) {
119 return session.createTopic(getSubject());
120 }
121 else {
122 return session.createQueue(getSubject());
123 }
124 }
125
126 protected boolean isTopic() {
127 return true;
128 }
129
130 protected void closeConsumer() throws JMSException {
131 consumer.close();
132 consumer = null;
133 closeSession();
134 }
135
136 protected void closeSession() throws JMSException {
137 session.close();
138 session = null;
139 connection.close();
140 connection = null;
141 }
142
143 protected void makeConsumer() throws Exception {
144 String durableName = getName();
145 String clientID = getSubject();
146 System.out.println("Creating a durable subscribe for clientID: " + clientID + " and durable name: " + durableName);
147 createSession(clientID);
148 consumer = createConsumer(durableName);
149 }
150
151 private MessageConsumer createConsumer(String durableName) throws JMSException {
152 if (destination instanceof Topic) {
153 return session.createDurableSubscriber((Topic) destination, durableName);
154 }
155 else {
156 return session.createConsumer(destination);
157 }
158 }
159
160 protected void createSession(String clientID) throws Exception {
161 connection = createConnection();
162 connection.setClientID(clientID);
163 connection.start();
164
165 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
166 destination = createDestination();
167 }
168 }