1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.usecases;
19
20 import org.codehaus.activemq.ActiveMQConnectionFactory;
21 import org.codehaus.activemq.TestSupport;
22
23 import javax.jms.Connection;
24 import javax.jms.DeliveryMode;
25 import javax.jms.Message;
26 import javax.jms.MessageProducer;
27 import javax.jms.Session;
28 import javax.jms.TextMessage;
29 import javax.jms.Topic;
30 import javax.jms.TopicSubscriber;
31
32 /***
33 * @author Paul Smith
34 * @version $Revision: 1.4 $
35 */
36 public class SubscribeClosePublishThenConsumeTest extends TestSupport {
37
38 public void testDurableTopic() throws Exception {
39 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
40 connectionFactory.setUseEmbeddedBroker(true);
41
42 String topicName = "TestTopic";
43 String clientID = "MyClientID";
44 String subscriberName = "MySubscriber";
45
46
47
48
49
50 Connection connection = connectionFactory.createConnection();
51 connection.setClientID(clientID);
52
53 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
54 Topic topic = session.createTopic(topicName);
55
56
57
58 TopicSubscriber subscriber = session.createDurableSubscriber(topic, subscriberName);
59 connection.start();
60
61 topic = null;
62 subscriber.close();
63 subscriber = null;
64
65 session.close();
66 session = null;
67
68 connection.close();
69 connection = null;
70
71
72 connection = connectionFactory.createConnection();
73
74 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
75 topic = session.createTopic(topicName);
76 MessageProducer producer = session.createProducer(topic);
77 producer.setDeliveryMode(DeliveryMode.PERSISTENT);
78 TextMessage textMessage = session.createTextMessage("Hello World");
79 producer.send(textMessage);
80 textMessage = null;
81
82 topic = null;
83 session.close();
84 session = null;
85
86 connection.close();
87 connection = null;
88
89
90
91
92 connection = connectionFactory.createConnection();
93 connection.setClientID(clientID);
94 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
95 topic = session.createTopic(topicName);
96
97 subscriber = session.createDurableSubscriber(topic, subscriberName);
98 connection.start();
99
100 log.info("Started connection - now about to try receive the textMessage");
101
102 long time = System.currentTimeMillis();
103 Message message = subscriber.receive(15000L);
104 long elapsed = System.currentTimeMillis() - time;
105
106 log.info("Waited for: " + elapsed + " millis");
107
108 assertNotNull("Should have received the message we published by now", message);
109 assertTrue("should be text textMessage", message instanceof TextMessage);
110 textMessage = (TextMessage) message;
111 assertEquals("Hello World", textMessage.getText());
112 }
113 }