1 /*** 2 * 3 * Copyright 2004 Protique Ltd 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 **/ 18 package org.codehaus.activemq; 19 20 import org.apache.commons.logging.Log; 21 import org.apache.commons.logging.LogFactory; 22 import org.codehaus.activemq.message.ActiveMQDestination; 23 24 import javax.jms.Connection; 25 import javax.jms.Destination; 26 import javax.jms.JMSException; 27 import javax.jms.Message; 28 import javax.jms.MessageConsumer; 29 import javax.jms.MessageListener; 30 import javax.jms.MessageProducer; 31 import javax.jms.Session; 32 import javax.jms.TextMessage; 33 import javax.jms.Topic; 34 import java.util.List; 35 import java.util.Vector; 36 37 /*** 38 * @version $Revision: 1.2 $ 39 */ 40 public class JmsCreateConsumerInOnMessageTest extends TestSupport implements MessageListener { 41 private final Log log = LogFactory.getLog(getClass()); 42 43 private Connection connection; 44 private Session publisherSession; 45 private Session consumerSession; 46 private MessageConsumer consumer; 47 private MessageConsumer testConsumer; 48 private MessageProducer producer; 49 private Topic topic; 50 private Object lock = new Object(); 51 52 53 /*** 54 * @throws Exception 55 */ 56 public void testCreateConsumer() throws Exception{ 57 Message msg = super.createMessage(); 58 producer.send(msg); 59 if (testConsumer == null){ 60 synchronized(lock){ 61 lock.wait(3000); 62 } 63 } 64 assertTrue(testConsumer != null); 65 } 66 67 /*** 68 * Use the asynchronous subscription mechanism 69 * @param message 70 */ 71 public void onMessage(Message message) { 72 try { 73 testConsumer = consumerSession.createConsumer(topic); 74 MessageProducer anotherProducer = consumerSession.createProducer(topic); 75 synchronized(lock){ 76 lock.notify(); 77 } 78 }catch (Exception ex){ 79 ex.printStackTrace(); 80 assertTrue(false); 81 } 82 } 83 84 85 86 87 protected void setUp() throws Exception { 88 super.setUp(); 89 super.topic = true; 90 connection = createConnection(); 91 connection.setClientID("connection:" + getSubject()); 92 publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 93 consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 94 topic = (Topic)super.createDestination("Test.Topic"); 95 consumer = consumerSession.createConsumer(topic); 96 consumer.setMessageListener(this); 97 producer = publisherSession.createProducer(topic); 98 connection.start(); 99 } 100 101 protected void tearDown() throws Exception { 102 super.tearDown(); 103 connection.close(); 104 } 105 }