1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.message.ActiveMQDestination;
23
24 import javax.jms.Connection;
25 import javax.jms.Destination;
26 import javax.jms.JMSException;
27 import javax.jms.Message;
28 import javax.jms.MessageConsumer;
29 import javax.jms.MessageListener;
30 import javax.jms.MessageProducer;
31 import javax.jms.Session;
32 import javax.jms.TextMessage;
33 import java.util.List;
34 import java.util.Vector;
35
36 /***
37 * @version $Revision: 1.6 $
38 */
39 public class JmsTopicRequestReplyTest extends TestSupport implements MessageListener {
40 private final Log log = LogFactory.getLog(getClass());
41
42 private Connection serverConnection;
43 private Connection clientConnection;
44 private MessageProducer replyProducer;
45 private Session serverSession;
46 private Destination requestDestination;
47 private List failures = new Vector();
48 private boolean dynamicallyCreateProducer;
49 protected boolean useAsyncConsume = false;
50 private String clientSideClientID;
51
52 public void testSendAndReceive() throws Exception {
53 clientConnection = createConnection();
54 clientConnection.setClientID("ClientConnection:" + getSubject());
55
56 Session session = clientConnection.createSession(false,
57 Session.AUTO_ACKNOWLEDGE);
58
59 clientConnection.start();
60
61 Destination replyDestination = createTemporaryDestination(session);
62
63
64
65 clientSideClientID = clientConnection.getClientID();
66 String value = ActiveMQDestination.getClientId((ActiveMQDestination) replyDestination);
67 assertEquals("clientID from the temporary destination must be the same", clientSideClientID, value);
68 log.info("Both the clientID and destination clientID match properly: " + clientSideClientID);
69
70
71
72 MessageProducer requestProducer =
73 session.createProducer(requestDestination);
74 MessageConsumer replyConsumer =
75 session.createConsumer(replyDestination);
76
77
78
79 TextMessage requestMessage = session.createTextMessage("Olivier");
80 requestMessage.setJMSReplyTo(replyDestination);
81 requestProducer.send(requestMessage);
82
83 log.info("Sent request.");
84 log.info(requestMessage.toString());
85
86 Message msg = replyConsumer.receive(4000);
87
88
89 if (msg instanceof TextMessage) {
90 TextMessage replyMessage = (TextMessage) msg;
91 log.info("Received reply.");
92 log.info(replyMessage.toString());
93
94 assertEquals("Wrong message content", "Hello: Olivier", replyMessage.getText());
95 }
96 else {
97 fail("Should have received a reply by now");
98 }
99
100 assertEquals("Should not have had any failures: " + failures, 0, failures.size());
101 }
102
103 public void testSendAndReceiveWithDynamicallyCreatedProducer() throws Exception {
104 dynamicallyCreateProducer = true;
105 testSendAndReceive();
106 }
107
108 /***
109 * Use the asynchronous subscription mechanism
110 */
111 public void onMessage(Message message) {
112 try {
113 TextMessage requestMessage = (TextMessage) message;
114
115 log.info("Received request.");
116 log.info(requestMessage.toString());
117
118 Destination replyDestination = requestMessage.getJMSReplyTo();
119
120 String value = ActiveMQDestination.getClientId((ActiveMQDestination) replyDestination);
121 assertEquals("clientID from the temporary destination must be the same", clientSideClientID, value);
122
123 TextMessage replyMessage = serverSession.createTextMessage("Hello: " + requestMessage.getText());
124
125 replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
126
127 if (dynamicallyCreateProducer) {
128 replyProducer = serverSession.createProducer(replyDestination);
129 replyProducer.send(replyMessage);
130 }
131 else {
132 replyProducer.send(replyDestination, replyMessage);
133 }
134
135 log.info("Sent reply.");
136 log.info(replyMessage.toString());
137 }
138 catch (JMSException e) {
139 onException(e);
140 }
141 }
142
143 /***
144 * Use the synchronous subscription mechanism
145 */
146 protected void syncConsumeLoop(MessageConsumer requestConsumer) {
147 try {
148 Message message = requestConsumer.receive(5000);
149 if (message != null) {
150 onMessage(message);
151 }
152 else {
153 log.error("No message received");
154 }
155 }
156 catch (JMSException e) {
157 onException(e);
158 }
159 }
160
161
162 protected void setUp() throws Exception {
163 super.setUp();
164
165 serverConnection = createConnection();
166 serverConnection.setClientID("serverConnection:" + getSubject());
167 serverSession = serverConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
168
169 replyProducer = serverSession.createProducer(null);
170
171 requestDestination = createDestination(serverSession);
172
173
174 final MessageConsumer requestConsumer = serverSession.createConsumer(requestDestination);
175 if (useAsyncConsume) {
176 requestConsumer.setMessageListener(this);
177 }
178 else {
179 Thread thread = new Thread(new Runnable() {
180 public void run() {
181 syncConsumeLoop(requestConsumer);
182 }
183 });
184 thread.start();
185 }
186 serverConnection.start();
187 }
188
189 protected void tearDown() throws Exception {
190 super.tearDown();
191
192 serverConnection.close();
193 clientConnection.stop();
194 clientConnection.close();
195 }
196
197 protected void onException(JMSException e) {
198 log.info("Caught: " + e);
199 e.printStackTrace();
200 failures.add(e);
201 }
202
203 protected Destination createDestination(Session session) throws JMSException {
204 if (topic) {
205 return session.createTopic(getSubject());
206 }
207 return session.createQueue(getSubject());
208 }
209
210 protected Destination createTemporaryDestination(Session session) throws JMSException {
211 if (topic) {
212 return session.createTemporaryTopic();
213 }
214 return session.createTemporaryQueue();
215 }
216
217 }