1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.usecases;
19
20 import java.util.HashMap;
21 import javax.jms.Connection;
22 import javax.jms.DeliveryMode;
23 import javax.jms.Destination;
24 import javax.jms.Message;
25 import javax.jms.MessageConsumer;
26 import javax.jms.MessageProducer;
27 import javax.jms.ObjectMessage;
28 import javax.jms.Queue;
29 import javax.jms.QueueConnection;
30 import javax.jms.QueueReceiver;
31 import javax.jms.QueueSender;
32 import javax.jms.QueueSession;
33 import javax.jms.Session;
34 import javax.jms.TextMessage;
35 import javax.jms.Topic;
36 import org.codehaus.activemq.TestSupport;
37 import org.codehaus.activemq.util.IdGenerator;
38
39 /***
40 * @version $Revision: 1.3 $
41 */
42 public class TopicRedeliverTest extends TestSupport {
43
44 private static final int RECEIVE_TIMEOUT = 10000;
45 private IdGenerator idGen = new IdGenerator();
46 protected int deliveryMode = DeliveryMode.PERSISTENT;
47 public TopicRedeliverTest(){
48 }
49
50 public TopicRedeliverTest(String n){
51 super(n);
52 }
53
54 protected void setup() throws Exception{
55 super.setUp();
56 topic = true;
57 }
58
59
60 /***
61 * test messages are acknowledged and recovered properly
62 * @throws Exception
63 */
64 public void testClientAcknowledge() throws Exception {
65 Destination destination = createDestination(getClass().getName());
66 Connection connection = createConnection();
67 connection.setClientID(idGen.generateId());
68 connection.start();
69 Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
70 MessageConsumer consumer = consumerSession.createConsumer(destination);
71 Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
72 MessageProducer producer = producerSession.createProducer(destination);
73 producer.setDeliveryMode(deliveryMode);
74
75
76
77 TextMessage sent1 = producerSession.createTextMessage();
78 sent1.setText("msg1");
79 producer.send(sent1);
80
81 TextMessage sent2 = producerSession.createTextMessage();
82 sent1.setText("msg2");
83 producer.send(sent2);
84
85 TextMessage sent3 = producerSession.createTextMessage();
86 sent1.setText("msg3");
87 producer.send(sent3);
88
89 Message rec1 = consumer.receive(RECEIVE_TIMEOUT);
90 Message rec2 = consumer.receive(RECEIVE_TIMEOUT);
91 Message rec3 = consumer.receive(RECEIVE_TIMEOUT);
92
93
94 rec2.acknowledge();
95
96 TextMessage sent4 = producerSession.createTextMessage();
97 sent4.setText("msg4");
98 producer.send(sent4);
99
100 Message rec4 = consumer.receive(RECEIVE_TIMEOUT);
101 assertTrue(rec4.equals(sent4));
102 consumerSession.recover();
103 rec4 = consumer.receive(RECEIVE_TIMEOUT);
104 assertTrue(rec4.equals(sent4));
105 assertTrue(rec4.getJMSRedelivered());
106 rec4.acknowledge();
107 connection.close();
108
109 }
110
111 /***
112 * Test redelivered flag is set on rollbacked transactions
113 * @throws Exception
114 */
115 public void testRedilveredFlagSetOnRollback() throws Exception {
116 Destination destination = createDestination(getClass().getName());
117 Connection connection = createConnection();
118 connection.setClientID(idGen.generateId());
119 connection.start();
120 Session consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
121 MessageConsumer consumer = null;
122 if (topic){
123 consumer = consumerSession.createDurableSubscriber((Topic)destination, "TESTRED");
124 }else{
125 consumer = consumerSession.createConsumer(destination);
126 }
127 Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
128 MessageProducer producer = producerSession.createProducer(destination);
129 producer.setDeliveryMode(deliveryMode);
130
131 TextMessage sentMsg = producerSession.createTextMessage();
132 sentMsg.setText("msg1");
133 producer.send(sentMsg);
134 producerSession.commit();
135
136 Message recMsg = consumer.receive(RECEIVE_TIMEOUT);
137 assertTrue(recMsg.getJMSRedelivered() == false);
138 recMsg = consumer.receive(RECEIVE_TIMEOUT);
139 consumerSession.rollback();
140 recMsg = consumer.receive(RECEIVE_TIMEOUT);
141 assertTrue(recMsg.getJMSRedelivered());
142 consumerSession.commit();
143 assertTrue(recMsg.equals(sentMsg));
144 assertTrue(recMsg.getJMSRedelivered());
145 connection.close();
146 }
147
148
149 /***
150 * Check a session is rollbacked on a Session close();
151 * @throws Exception
152 */
153
154 public void XtestTransactionRollbackOnSessionClose() throws Exception {
155 Destination destination = createDestination(getClass().getName());
156 Connection connection = createConnection();
157 connection.setClientID(idGen.generateId());
158 connection.start();
159 Session consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
160 MessageConsumer consumer = null;
161 if (topic){
162 consumer = consumerSession.createDurableSubscriber((Topic)destination, "TESTRED");
163 }else{
164 consumer = consumerSession.createConsumer(destination);
165 }
166 Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
167 MessageProducer producer = producerSession.createProducer(destination);
168 producer.setDeliveryMode(deliveryMode);
169
170 TextMessage sentMsg = producerSession.createTextMessage();
171 sentMsg.setText("msg1");
172 producer.send(sentMsg);
173
174 producerSession.commit();
175
176 Message recMsg = consumer.receive(RECEIVE_TIMEOUT);
177 assertTrue(recMsg.getJMSRedelivered() == false);
178 consumerSession.close();
179 consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
180 consumer = consumerSession.createConsumer(destination);
181
182 recMsg = consumer.receive(RECEIVE_TIMEOUT);
183 consumerSession.commit();
184 assertTrue(recMsg.equals(sentMsg));
185 connection.close();
186 }
187
188 /***
189 * check messages are actuallly sent on a tx rollback
190 * @throws Exception
191 */
192
193 public void testTransactionRollbackOnSend() throws Exception {
194 Destination destination = createDestination(getClass().getName());
195 Connection connection = createConnection();
196 connection.setClientID(idGen.generateId());
197 connection.start();
198 Session consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
199 MessageConsumer consumer = consumerSession.createConsumer(destination);
200 Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
201 MessageProducer producer = producerSession.createProducer(destination);
202 producer.setDeliveryMode(deliveryMode);
203
204 TextMessage sentMsg = producerSession.createTextMessage();
205 sentMsg.setText("msg1");
206 producer.send(sentMsg);
207 producerSession.commit();
208
209 Message recMsg = consumer.receive(RECEIVE_TIMEOUT);
210 consumerSession.commit();
211 assertTrue(recMsg.equals(sentMsg));
212
213 sentMsg = producerSession.createTextMessage();
214 sentMsg.setText("msg2");
215 producer.send(sentMsg);
216 producerSession.rollback();
217
218 sentMsg = producerSession.createTextMessage();
219 sentMsg.setText("msg3");
220 producer.send(sentMsg);
221 producerSession.commit();
222
223 recMsg = consumer.receive(RECEIVE_TIMEOUT);
224 assertTrue(recMsg.equals(sentMsg));
225 consumerSession.commit();
226
227 connection.close();
228 }
229
230 }