/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.management.ObjectName;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.region.QueueSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JMSConsumerTest
extends JmsTestSupport {
    /** Class-wide logger; used by the listener callbacks and wait conditions in this test. */
    private static final Logger LOG = LoggerFactory.getLogger(JMSConsumerTest.class);
    // NOTE(review): the public fields below share their names with the attribute keys passed to
    // addCombinationValues(...) in the initCombosFor* methods; presumably the base class assigns
    // them per combination run - confirm in JmsTestSupport / its parent.
    /** Destination under test; the test methods assign it before use. */
    public ActiveMQDestination destination;
    /** Delivery mode attribute; read by testDurableConsumerSelectorChange via producer.setDeliveryMode. */
    public int deliveryMode;
    /** Prefetch attribute; not read by any method visible in this part of the file. */
    public int prefetch;
    /** Acknowledge mode passed to connection.createSession(false, ackMode) by several tests. */
    public int ackMode;
    /** Destination type code passed to createDestination(session, destinationType). */
    public byte destinationType;
    /** Durable-consumer flag; not read by any method visible in this part of the file. */
    public boolean durableConsumer;

    /**
     * Builds the JUnit 3 suite for this class by delegating to the
     * {@code suite(Class)} overload (not defined in the visible part of this
     * file; presumably inherited from the test support hierarchy).
     *
     * @return the suite produced for {@code JMSConsumerTest}
     */
    public static Test suite() {
        final Class<JMSConsumerTest> testClass = JMSConsumerTest.class;
        return JMSConsumerTest.suite(testClass);
    }

    /**
     * Command-line entry point: runs the suite returned by {@link #suite()}
     * with the JUnit text runner.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final Test everything = JMSConsumerTest.suite();
        TestRunner.run(everything);
    }

    /**
     * Declares the attribute values to combine for the test named after the
     * {@code initCombosFor} prefix (pairing presumably done by the base class).
     */
    public void initCombosForTestMessageListenerWithConsumerCanBeStopped() {
        final Object[] deliveryModes = {1, 2};
        final Object[] destinationTypes = {(byte)1, (byte)2, (byte)5, (byte)6};
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Checks that {@code stop()} on a consumer pauses delivery to its listener
     * and that {@code start()} resumes it: one message arrives before the stop,
     * a second one is held back while stopped and delivered after the restart.
     *
     * @throws Exception if a JMS call or latch wait fails
     */
    public void testMessageListenerWithConsumerCanBeStopped() throws Exception {
        final AtomicInteger received = new AtomicInteger(0);
        final CountDownLatch firstSeen = new CountDownLatch(1);
        final CountDownLatch secondSeen = new CountDownLatch(1);

        connection.start();
        // Non-transacted session, acknowledge mode 1 (Session.AUTO_ACKNOWLEDGE).
        Session session = connection.createSession(false, 1);
        destination = createDestination(session, destinationType);
        ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(destination);

        MessageListener countingListener = new MessageListener() {
            @Override
            public void onMessage(Message m) {
                received.incrementAndGet();
                if (received.get() == 1) {
                    firstSeen.countDown();
                }
                if (received.get() == 2) {
                    secondSeen.countDown();
                }
            }
        };
        consumer.setMessageListener(countingListener);

        // Phase 1: a running consumer receives the first message.
        sendMessages(session, destination, 1);
        assertTrue(firstSeen.await(1L, TimeUnit.SECONDS));
        assertEquals(1, received.get());

        // Phase 2: while stopped, the second message must not reach the listener.
        consumer.stop();
        sendMessages(session, destination, 1);
        assertFalse(secondSeen.await(500L, TimeUnit.MILLISECONDS));
        assertEquals(1, received.get());

        // Phase 3: restarting releases the held-back message.
        consumer.start();
        assertTrue(secondSeen.await(500L, TimeUnit.MILLISECONDS));
        assertEquals(2, received.get());
    }

    /**
     * Checks that a consumer can be closed from a worker thread while its
     * listener keeps handing incoming messages to an executor, and that neither
     * the close nor the periodic acknowledgements raise an unexpected exception.
     *
     * <p>The queue is preloaded with 2000 messages. Each delivered message is
     * processed by an {@code AckAndClose} task: the 590th task closes the
     * consumer, and every 200th task acknowledges its message.
     *
     * <p>The JVM-wide default uncaught-exception handler is replaced for the
     * duration of the test and restored afterwards; the executor is always shut
     * down, even when an assertion fails.
     *
     * @throws Exception if a JMS call or a wait fails
     */
    public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception {
        final AtomicInteger counter = new AtomicInteger(0);
        final CountDownLatch closeDone = new CountDownLatch(1);

        connection.start();
        // Non-transacted session, acknowledge mode 2 (Session.CLIENT_ACKNOWLEDGE).
        Session session = connection.createSession(false, 2);
        destination = createDestination(session, (byte)1);
        // Preload the destination before the consumer exists.
        sendMessages(session, destination, 2000);
        final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(destination);

        final Map<Thread, Throwable> exceptions =
            Collections.synchronizedMap(new HashMap<Thread, Throwable>());

        // Remember the current handler so this test does not leak its own into later tests.
        final Thread.UncaughtExceptionHandler previousHandler = Thread.getDefaultUncaughtExceptionHandler();
        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                LOG.error("Uncaught exception:", e);
                exceptions.put(t, e);
            }
        });

        /** Task run per delivered message: counts it, closes the consumer once, acks periodically. */
        final class AckAndClose implements Runnable {
            private final Message message;

            AckAndClose(Message m) {
                this.message = m;
            }

            @Override
            public void run() {
                try {
                    int count = counter.incrementAndGet();
                    if (count == 590) {
                        // Close from a thread other than the one delivering messages.
                        consumer.close();
                        closeDone.countDown();
                    }
                    if (count % 200 == 0) {
                        try {
                            message.acknowledge();
                        } catch (IllegalStateException ignored) {
                            // javax.jms.IllegalStateException: tolerated on purpose, as in the
                            // original; presumably the ack can race with the close above.
                        }
                    }
                } catch (Exception e) {
                    LOG.error("Exception on close or ack:", (Throwable) e);
                    exceptions.put(Thread.currentThread(), e);
                }
            }
        }

        final ExecutorService executor = Executors.newCachedThreadPool();
        try {
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message m) {
                    // Hand the message off so ack/close run on executor threads.
                    executor.execute(new AckAndClose(m));
                }
            });

            assertTrue(closeDone.await(20L, TimeUnit.SECONDS));
            executor.shutdown();
            // Give in-flight tasks a moment to finish so late exceptions are recorded.
            executor.awaitTermination(1L, TimeUnit.SECONDS);
            assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
        } finally {
            // Runs on failure paths too: stop worker threads and undo the global handler change.
            executor.shutdownNow();
            Thread.setDefaultUncaughtExceptionHandler(previousHandler);
        }
    }

    /**
     * Declares the delivery-mode, ack-mode and destination-type values to
     * combine for the test named after the {@code initCombosFor} prefix.
     */
    public void initCombosForTestMutiReceiveWithPrefetch1() {
        final Object[] deliveryModes = {1, 2};
        final Object[] ackModes = {1, 3, 2};
        final Object[] destinationTypes = {(byte)1, (byte)2, (byte)5, (byte)6};
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("ackMode", ackModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * With every prefetch limit set to 1, sends four messages and checks that
     * four consecutive {@code receive} calls each return a message, that
     * nothing further is available, and then acknowledges the last message.
     *
     * @throws Exception if a JMS call fails
     */
    public void testMutiReceiveWithPrefetch1() throws Exception {
        connection.getPrefetchPolicy().setAll(1);
        connection.start();

        Session session = connection.createSession(false, ackMode);
        destination = createDestination(session, destinationType);
        MessageConsumer consumer = session.createConsumer(destination);
        sendMessages(session, destination, 4);

        Message last = null;
        int received = 0;
        while (received < 4) {
            last = consumer.receive(1000L);
            assertNotNull(last);
            received++;
        }

        assertNull(consumer.receiveNoWait());
        last.acknowledge();
    }

    /**
     * Topic variant of the prefetch-extension check: with prefetch 1 and
     * client acknowledgement, receiving four messages without acking must
     * raise some {@link TopicSubscription}'s prefetch extension to 4, and
     * acknowledging the last message must bring one back to 0.
     *
     * @throws Exception if a JMS call or a wait condition fails
     */
    public void testReceiveTopicWithPrefetch1() throws Exception {
        connection.getPrefetchPolicy().setAll(1);
        connection.start();

        // Non-transacted session, acknowledge mode 2 (Session.CLIENT_ACKNOWLEDGE).
        Session session = connection.createSession(false, 2);
        destination = createDestination(session, (byte)2);
        MessageConsumer consumer = session.createConsumer(destination);
        sendMessages(session, destination, 4);

        Message last = null;
        for (int n = 0; n < 4; n++) {
            last = consumer.receive(1000L);
            assertNotNull(last);
        }

        final List<Subscription> subscriptions = TestSupport.getDestinationConsumers(broker, destination);

        Wait.Condition extendedToFour = new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                boolean found = false;
                for (Subscription candidate : subscriptions) {
                    if (candidate instanceof TopicSubscription
                            && ((TopicSubscription) candidate).getPrefetchExtension().get() == 4) {
                        found = true;
                    }
                }
                return found;
            }
        };
        assertTrue("prefetch extension..", Wait.waitFor(extendedToFour));

        assertNull(consumer.receiveNoWait());
        last.acknowledge();

        Wait.Condition backToZero = new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                boolean found = false;
                for (Subscription candidate : subscriptions) {
                    if (candidate instanceof TopicSubscription
                            && ((TopicSubscription) candidate).getPrefetchExtension().get() == 0) {
                        found = true;
                    }
                }
                return found;
            }
        };
        assertTrue("prefetch extension back to 0", Wait.waitFor(backToZero));
    }

    /**
     * Queue variant of the prefetch-extension check: with prefetch 1 and
     * client acknowledgement, receiving four messages without acking must
     * raise some {@link QueueSubscription}'s prefetch extension to 4, and
     * acknowledging the last message must bring one back to 0.
     *
     * @throws Exception if a JMS call or a wait condition fails
     */
    public void testReceiveQueueWithPrefetch1() throws Exception {
        connection.getPrefetchPolicy().setAll(1);
        connection.start();

        // Non-transacted session, acknowledge mode 2 (Session.CLIENT_ACKNOWLEDGE).
        Session session = connection.createSession(false, 2);
        destination = createDestination(session, (byte)1);
        MessageConsumer consumer = session.createConsumer(destination);
        sendMessages(session, destination, 4);

        Message last = null;
        for (int n = 0; n < 4; n++) {
            last = consumer.receive(1000L);
            assertNotNull(last);
        }

        final List<Subscription> subscriptions = TestSupport.getDestinationConsumers(broker, destination);

        Wait.Condition extendedToFour = new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                boolean found = false;
                // Every subscription is logged on every poll, so the loop never exits early.
                for (Subscription candidate : subscriptions) {
                    LOG.info("Sub:" + candidate);
                    if (candidate instanceof QueueSubscription
                            && ((QueueSubscription) candidate).getPrefetchExtension().get() == 4) {
                        found = true;
                    }
                }
                return found;
            }
        };
        assertTrue("prefetch extension..", Wait.waitFor(extendedToFour));

        assertNull(consumer.receiveNoWait());
        last.acknowledge();

        Wait.Condition backToZero = new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                boolean found = false;
                for (Subscription candidate : subscriptions) {
                    if (candidate instanceof QueueSubscription
                            && ((QueueSubscription) candidate).getPrefetchExtension().get() == 0) {
                        found = true;
                    }
                }
                return found;
            }
        };
        assertTrue("prefetch extension back to 0", Wait.waitFor(backToZero));
    }

    /**
     * Declares the delivery-mode values and the single destination type to
     * combine for the test named after the {@code initCombosFor} prefix.
     */
    public void initCombosForTestDurableConsumerSelectorChange() {
        final Object[] deliveryModes = {1, 2};
        final Object[] destinationTypes = {(byte)2};
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Checks that re-creating a durable subscription under the same name with
     * a different selector makes the new selector take effect: after switching
     * from {@code color='red'} to {@code color='blue'}, only the blue message
     * is delivered.
     *
     * @throws Exception if a JMS call fails
     */
    public void testDurableConsumerSelectorChange() throws Exception {
        connection.setClientID("test");
        connection.start();

        // Non-transacted session, acknowledge mode 1 (Session.AUTO_ACKNOWLEDGE).
        Session session = connection.createSession(false, 1);
        destination = createDestination(session, destinationType);
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(deliveryMode);

        // First incarnation of the subscription selects red messages.
        TopicSubscriber subscriber = session.createDurableSubscriber((Topic) destination, "test", "color='red'", false);
        TextMessage outgoing = session.createTextMessage("1st");
        outgoing.setStringProperty("color", "red");
        producer.send(outgoing);

        Message incoming = subscriber.receive(1000L);
        assertNotNull(incoming);
        assertEquals("1st", ((TextMessage) incoming).getText());
        subscriber.close();

        // Same subscription name, new selector: blue only.
        subscriber = session.createDurableSubscriber((Topic) destination, "test", "color='blue'", false);

        outgoing = session.createTextMessage("2nd");
        outgoing.setStringProperty("color", "red");
        producer.send(outgoing);

        outgoing = session.createTextMessage("3rd");
        outgoing.setStringProperty("color", "blue");
        producer.send(outgoing);

        // The red "2nd" message must be filtered out.
        incoming = subscriber.receive(1000L);
        assertNotNull(incoming);
        assertEquals("3rd", ((TextMessage) incoming).getText());
        assertNull(subscriber.receiveNoWait());
    }

    /**
     * Declares the delivery-mode and destination-type values to combine for
     * the test named after the {@code initCombosFor} prefix.
     */
    public void initCombosForTestSendReceiveBytesMessage() {
        final Object[] modes = {1, 2};
        final Object[] types = {(byte)1, (byte)2, (byte)5, (byte)6};
        addCombinationValues("deliveryMode", modes);
        addCombinationValues("destinationType", types);
    }

    /**
     * Round-trips a {@link BytesMessage} holding {@code true} then
     * {@code false} and checks the booleans are read back in that order, with
     * no further message pending.
     *
     * @throws Exception if a JMS call fails
     */
    public void testSendReceiveBytesMessage() throws Exception {
        connection.start();
        // Non-transacted session, acknowledge mode 1 (Session.AUTO_ACKNOWLEDGE).
        Session session = connection.createSession(false, 1);
        destination = createDestination(session, destinationType);
        MessageConsumer consumer = session.createConsumer(destination);
        MessageProducer producer = session.createProducer(destination);

        BytesMessage outgoing = session.createBytesMessage();
        outgoing.writeBoolean(true);
        outgoing.writeBoolean(false);
        producer.send(outgoing);

        BytesMessage incoming = (BytesMessage) consumer.receive(1000L);
        assertNotNull(incoming);
        assertTrue(incoming.readBoolean());
        assertFalse(incoming.readBoolean());

        assertNull(consumer.receiveNoWait());
    }

    /**
     * Declares the delivery-mode and destination-type values to combine for
     * the test named after the {@code initCombosFor} prefix.
     */
    public void initCombosForTestSetMessageListenerAfterStart() {
        final Object[] deliveryModeOptions = {1, 2};
        final Object[] destinationTypeOptions = {(byte)1, (byte)2, (byte)5, (byte)6};
        addCombinationValues("deliveryMode", deliveryModeOptions);
        addCombinationValues("destinationType", destinationTypeOptions);
    }

    /**
     * Sends four messages first and only then installs a listener on the
     * already-started connection; the listener must still see exactly four.
     *
     * @throws Exception if a JMS call or a wait fails
     */
    public void testSetMessageListenerAfterStart() throws Exception {
        final AtomicInteger seen = new AtomicInteger(0);
        final CountDownLatch allSeen = new CountDownLatch(1);

        connection.start();
        // Non-transacted session, acknowledge mode 1 (Session.AUTO_ACKNOWLEDGE).
        Session session = connection.createSession(false, 1);
        destination = createDestination(session, destinationType);
        MessageConsumer consumer = session.createConsumer(destination);
        sendMessages(session, destination, 4);

        MessageListener lateListener = new MessageListener() {
            @Override
            public void onMessage(Message m) {
                seen.incrementAndGet();
                if (seen.get() == 4) {
                    allSeen.countDown();
                }
            }
        };
        consumer.setMessageListener(lateListener);

        assertTrue(allSeen.await(1000L, TimeUnit.MILLISECONDS));
        // Short grace period so an unexpected extra delivery would show up in the count.
        Thread.sleep(200L);
        assertEquals(4, seen.get());
    }

    /**
     * Declares the destination-type values to combine for the test named
     * after the {@code initCombosFor} prefix.
     */
    public void initCombosForTestPassMessageListenerIntoCreateConsumer() {
        final Object[] destinationTypes = {(byte)1, (byte)2};
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Uses the {@link ActiveMQSession} overload of {@code createConsumer} that
     * takes the listener up front, then checks that four sent messages reach
     * that listener and no more.
     *
     * @throws Exception if a JMS call or a wait fails
     */
    public void testPassMessageListenerIntoCreateConsumer() throws Exception {
        final AtomicInteger seen = new AtomicInteger(0);
        final CountDownLatch allSeen = new CountDownLatch(1);

        connection.start();
        // Non-transacted session, acknowledge mode 1 (Session.AUTO_ACKNOWLEDGE).
        ActiveMQSession session = (ActiveMQSession) connection.createSession(false, 1);
        destination = createDestination(session, destinationType);

        MessageListener upfrontListener = new MessageListener() {
            @Override
            public void onMessage(Message m) {
                seen.incrementAndGet();
                if (seen.get() == 4) {
                    allSeen.countDown();
                }
            }
        };
        MessageConsumer consumer = session.createConsumer(destination, upfrontListener);
        assertNotNull(consumer);

        sendMessages(session, destination, 4);
        assertTrue(allSeen.await(1000L, TimeUnit.MILLISECONDS));
        // Short grace period so an unexpected extra delivery would show up in the count.
        Thread.sleep(200L);
        assertEquals(4, seen.get());
    }

    /**
     * Declares the delivery-mode values plus a single ack mode and a single
     * destination type for the test named after the {@code initCombosFor}
     * prefix.
     */
    public void initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() {
        final Object[] deliveryModes = {1, 2};
        final Object[] ackModes = {2};
        final Object[] destinationTypes = {(byte)1};
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("ackMode", ackModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Closes the connection from inside a listener before the current message
     * is acknowledged, then checks that a second connection still receives
     * messages from the same destination.
     *
     * <p>Flow: four messages are sent with prefetch 1. The first listener
     * acknowledges message one normally; on message two it waits until sending
     * has finished, closes the connection, signals {@code got2Done} and only
     * then calls {@code acknowledge()}. A second connection with a counting
     * listener is then attached, and the shared counter must end at 5.
     * NOTE(review): 5 presumably means 2 deliveries on the first connection
     * plus 3 on the second (the un-acked second message redelivered) - confirm
     * against broker redelivery semantics.
     *
     * @throws Exception if a JMS call or a wait fails
     */
    public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception {
        // Shared by both listeners, so it counts deliveries across both connections.
        final AtomicInteger counter = new AtomicInteger(0);
        final CountDownLatch sendDone = new CountDownLatch(1);
        final CountDownLatch got2Done = new CountDownLatch(1);
        this.connection.getPrefetchPolicy().setAll(1);
        this.connection.setOptimizedMessageDispatch(false);
        this.connection.start();
        Session session = this.connection.createSession(false, this.ackMode);
        this.destination = this.createDestination(session, this.destinationType);
        MessageConsumer consumer = session.createConsumer((Destination)this.destination);
        consumer.setMessageListener(new MessageListener(){

            public void onMessage(Message m) {
                try {
                    TextMessage tm = (TextMessage)m;
                    LOG.info("Got in first listener: " + tm.getText());
                    // The message text is expected to equal the number of messages seen so far.
                    TestCase.assertEquals((String)("" + counter.get()), (String)tm.getText());
                    counter.incrementAndGet();
                    if (counter.get() == 2) {
                        // Wait for the main thread to finish sending before closing the connection.
                        sendDone.await();
                        JMSConsumerTest.this.connection.close();
                        got2Done.countDown();
                    }
                    // For the second message this runs after the connection was closed above.
                    tm.acknowledge();
                }
                catch (Throwable e) {
                    // Anything thrown here (including assertion failures) is only printed,
                    // so it does not fail the test directly.
                    e.printStackTrace();
                }
            }
        });
        this.sendMessages(session, (Destination)this.destination, 4);
        sendDone.countDown();
        // Wait until the first listener has closed its connection.
        JMSConsumerTest.assertTrue((boolean)got2Done.await(100000L, TimeUnit.MILLISECONDS));
        // Replace the closed connection with a fresh one; it is added to this.connections
        // (presumably so the base class can clean it up - confirm in the test support class).
        this.connection = (ActiveMQConnection)this.factory.createConnection();
        this.connections.add(this.connection);
        this.connection.getPrefetchPolicy().setAll(1);
        this.connection.start();
        final CountDownLatch done2 = new CountDownLatch(1);
        session = this.connection.createSession(false, this.ackMode);
        consumer = session.createConsumer((Destination)this.destination);
        consumer.setMessageListener(new MessageListener(){

            public void onMessage(Message m) {
                try {
                    TextMessage tm = (TextMessage)m;
                    LOG.info("Got in second listener: " + tm.getText());
                    counter.incrementAndGet();
                    if (counter.get() == 4) {
                        done2.countDown();
                    }
                }
                catch (Throwable e) {
                    LOG.error("unexpected ex onMessage: ", e);
                }
            }
        });
        // done2 fires when the counter reaches 4; the sleep leaves time for one more delivery
        // before the final count of 5 is checked.
        JMSConsumerTest.assertTrue((boolean)done2.await(1000L, TimeUnit.MILLISECONDS));
        Thread.sleep(200L);
        JMSConsumerTest.assertEquals((int)5, (int)counter.get());
    }

    /**
     * Declares the delivery-mode and ack-mode values plus a single destination
     * type for the test named after the {@code initCombosFor} prefix.
     */
    public void initCombosForTestMessageListenerAutoAckOnCloseWithPrefetch1() {
        final Object[] deliveryModes = {1, 2};
        final Object[] ackModes = {1, 2};
        final Object[] destinationTypes = {(byte)1};
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("ackMode", ackModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Closes the connection from inside a listener after the current message
     * has already been acknowledged, then checks that a second connection
     * receives only the remaining messages.
     *
     * <p>Flow: four messages are sent with prefetch 1. The first listener
     * calls {@code acknowledge()} on every message it sees; on message two it
     * then waits until sending has finished, closes the connection and signals
     * {@code got2Done}. A second connection with a counting listener is then
     * attached, and the shared counter must end at exactly 4.
     *
     * @throws Exception if a JMS call or a wait fails
     */
    public void testMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception {
        // Shared by both listeners, so it counts deliveries across both connections.
        final AtomicInteger counter = new AtomicInteger(0);
        final CountDownLatch sendDone = new CountDownLatch(1);
        final CountDownLatch got2Done = new CountDownLatch(1);
        this.connection.getPrefetchPolicy().setAll(1);
        this.connection.setOptimizedMessageDispatch(false);
        this.connection.start();
        Session session = this.connection.createSession(false, this.ackMode);
        this.destination = this.createDestination(session, this.destinationType);
        MessageConsumer consumer = session.createConsumer((Destination)this.destination);
        consumer.setMessageListener(new MessageListener(){

            public void onMessage(Message m) {
                try {
                    TextMessage tm = (TextMessage)m;
                    LOG.info("Got in first listener: " + tm.getText());
                    // The message text is expected to equal the number of messages seen so far.
                    TestCase.assertEquals((String)("" + counter.get()), (String)tm.getText());
                    counter.incrementAndGet();
                    // Acknowledge before the possible close below (the reverse of the
                    // ...CloseUnacked... test, which acknowledges after closing).
                    m.acknowledge();
                    if (counter.get() == 2) {
                        // Wait for the main thread to finish sending before closing the connection.
                        sendDone.await();
                        JMSConsumerTest.this.connection.close();
                        got2Done.countDown();
                    }
                }
                catch (Throwable e) {
                    // Anything thrown here (including assertion failures) is only printed,
                    // so it does not fail the test directly.
                    e.printStackTrace();
                }
            }
        });
        this.sendMessages(session, (Destination)this.destination, 4);
        sendDone.countDown();
        // Wait until the first listener has closed its connection.
        JMSConsumerTest.assertTrue((boolean)got2Done.await(100000L, TimeUnit.MILLISECONDS));
        // Replace the closed connection with a fresh one; it is added to this.connections
        // (presumably so the base class can clean it up - confirm in the test support class).
        this.connection = (ActiveMQConnection)this.factory.createConnection();
        this.connections.add(this.connection);
        this.connection.getPrefetchPolicy().setAll(1);
        this.connection.start();
        final CountDownLatch done2 = new CountDownLatch(1);
        session = this.connection.createSession(false, this.ackMode);
        consumer = session.createConsumer((Destination)this.destination);
        consumer.setMessageListener(new MessageListener(){

            public void onMessage(Message m) {
                try {
                    TextMessage tm = (TextMessage)m;
                    LOG.info("Got in second listener: " + tm.getText());
                    counter.incrementAndGet();
                    if (counter.get() == 4) {
                        done2.countDown();
                    }
                }
                catch (Throwable e) {
                    LOG.error("unexpected ex onMessage: ", e);
                }
            }
        });
        // done2 fires when the counter reaches 4; the sleep gives any unexpected extra
        // delivery time to arrive before the final count is checked.
        JMSConsumerTest.assertTrue((boolean)done2.await(1000L, TimeUnit.MILLISECONDS));
        Thread.sleep(200L);
        JMSConsumerTest.assertEquals((int)4, (int)counter.get());
    }

    /**
     * Declares the delivery-mode and destination-type values to combine for
     * the test named after the {@code initCombosFor} prefix.
     */
    public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {
        final Object[] deliveryModeValues = {1, 2};
        final Object[] destinationTypeValues = {(byte)1, (byte)2, (byte)5, (byte)6};
        addCombinationValues("deliveryMode", deliveryModeValues);
        addCombinationValues("destinationType", destinationTypeValues);
    }

    /**
     * With every prefetch limit set to 1, installs a listener, sends four
     * messages and checks that the listener sees exactly four.
     *
     * @throws Exception if a JMS call or a wait fails
     */
    public void testMessageListenerWithConsumerWithPrefetch1() throws Exception {
        final AtomicInteger seen = new AtomicInteger(0);
        final CountDownLatch allSeen = new CountDownLatch(1);

        connection.getPrefetchPolicy().setAll(1);
        connection.start();
        // Non-transacted session, acknowledge mode 1 (Session.AUTO_ACKNOWLEDGE).
        Session session = connection.createSession(false, 1);
        destination = createDestination(session, destinationType);
        MessageConsumer consumer = session.createConsumer(destination);

        MessageListener counting = new MessageListener() {
            @Override
            public void onMessage(Message m) {
                seen.incrementAndGet();
                if (seen.get() == 4) {
                    allSeen.countDown();
                }
            }
        };
        consumer.setMessageListener(counting);

        sendMessages(session, destination, 4);
        assertTrue(allSeen.await(1000L, TimeUnit.MILLISECONDS));
        // Short grace period so an unexpected extra delivery would show up in the count.
        Thread.sleep(200L);
        assertEquals(4, seen.get());
    }

    /**
     * Declares the delivery-mode and destination-type values to combine for
     * the test named after the {@code initCombosFor} prefix.
     */
    public void initCombosForTestMessageListenerWithConsumer() {
        final Object[] modeChoices = {1, 2};
        final Object[] typeChoices = {(byte)1, (byte)2, (byte)5, (byte)6};
        addCombinationValues("deliveryMode", modeChoices);
        addCombinationValues("destinationType", typeChoices);
    }

    /**
     * Installs a listener on a consumer, sends four messages and checks that
     * the listener sees exactly four (default prefetch settings).
     *
     * @throws Exception if a JMS call or a wait fails
     */
    public void testMessageListenerWithConsumer() throws Exception {
        final AtomicInteger seen = new AtomicInteger(0);
        final CountDownLatch allSeen = new CountDownLatch(1);

        connection.start();
        // Non-transacted session, acknowledge mode 1 (Session.AUTO_ACKNOWLEDGE).
        Session session = connection.createSession(false, 1);
        destination = createDestination(session, destinationType);
        MessageConsumer consumer = session.createConsumer(destination);

        MessageListener counting = new MessageListener() {
            @Override
            public void onMessage(Message m) {
                seen.incrementAndGet();
                if (seen.get() == 4) {
                    allSeen.countDown();
                }
            }
        };
        consumer.setMessageListener(counting);

        sendMessages(session, destination, 4);
        assertTrue(allSeen.await(1000L, TimeUnit.MILLISECONDS));
        // Short grace period so an unexpected extra delivery would show up in the count.
        Thread.sleep(200L);
        assertEquals(4, seen.get());
    }

    /**
     * Declares the delivery-mode and ack-mode values plus a single destination
     * type for the test named after the {@code initCombosFor} prefix.
     */
    public void initCombosForTestUnackedWithPrefetch1StayInQueue() {
        final Object[] deliveryModes = {1, 2};
        final Object[] ackModes = {1, 3, 2};
        final Object[] destinationTypes = {(byte)1};
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("ackMode", ackModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * With prefetch 1, consumes two of four messages, acknowledges and closes
     * the connection, then checks that a fresh connection can consume exactly
     * the two messages left on the destination.
     *
     * @throws Exception if a JMS call fails
     */
    public void testUnackedWithPrefetch1StayInQueue() throws Exception {
        connection.getPrefetchPolicy().setAll(1);
        connection.start();

        Session session = connection.createSession(false, ackMode);
        destination = createDestination(session, destinationType);
        MessageConsumer consumer = session.createConsumer(destination);
        sendMessages(session, destination, 4);

        // First connection: take two messages, ack the last one, and disconnect.
        Message latest = null;
        for (int taken = 0; taken < 2; taken++) {
            latest = consumer.receive(1000L);
            assertNotNull(latest);
        }
        latest.acknowledge();
        connection.close();

        // Second connection, same prefetch setting.
        connection = (ActiveMQConnection) factory.createConnection();
        connections.add(connection);
        connection.getPrefetchPolicy().setAll(1);
        connection.start();

        session = connection.createSession(false, ackMode);
        consumer = session.createConsumer(destination);
        for (int taken = 0; taken < 2; taken++) {
            latest = consumer.receive(1000L);
            assertNotNull(latest);
        }
        latest.acknowledge();

        assertNull(consumer.receiveNoWait());
    }

    /**
     * Declares the delivery-mode values to combine for the test named after
     * the {@code initCombosFor} prefix.
     */
    public void initCombosForTestPrefetch1MessageNotDispatched() {
        final Object[] deliveryModes = {1, 2};
        addCombinationValues("deliveryMode", deliveryModes);
    }

    /**
     * With prefetch 1 on the first connection and two messages on the queue
     * "TEST", checks that a consumer on a second connection can also receive
     * a message, and that nothing is left for the first consumer once both
     * transacted sessions commit.
     *
     * @throws Exception if a JMS call fails
     */
    public void testPrefetch1MessageNotDispatched() throws Exception {
        connection.getPrefetchPolicy().setAll(1);
        connection.start();

        // Transacted session (ack-mode argument 0).
        Session firstSession = connection.createSession(true, 0);
        destination = new ActiveMQQueue("TEST");
        MessageConsumer firstConsumer = firstSession.createConsumer(destination);

        sendMessages(firstSession, destination, 2);
        firstSession.commit();

        // Second, independent connection consuming from the same queue.
        ActiveMQConnection secondConnection = (ActiveMQConnection) factory.createConnection();
        secondConnection.start();
        connections.add(secondConnection);
        Session secondSession = secondConnection.createSession(true, 0);
        MessageConsumer secondConsumer = secondSession.createConsumer(destination);

        Message fromFirst = firstConsumer.receive(1000L);
        assertNotNull(fromFirst);
        Message fromSecond = secondConsumer.receive(5000L);
        assertNotNull(fromSecond);

        firstSession.commit();
        secondSession.commit();

        assertNull(firstConsumer.receiveNoWait());
    }

    /**
     * Declares the single delivery mode and the destination-type values to
     * combine for the test named after the {@code initCombosFor} prefix.
     */
    public void initCombosForTestDontStart() {
        final Object[] deliveryModes = {1};
        final Object[] destinationTypes = {(byte)1, (byte)2};
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Checks that a consumer on a connection that was never started receives
     * nothing, even though a message has been sent.
     *
     * @throws Exception if a JMS call fails
     */
    public void testDontStart() throws Exception {
        // Deliberately no connection.start() in this test.
        // Non-transacted session, acknowledge mode 1 (Session.AUTO_ACKNOWLEDGE).
        Session session = connection.createSession(false, 1);
        destination = createDestination(session, destinationType);
        MessageConsumer consumer = session.createConsumer(destination);

        sendMessages(session, destination, 1);

        Message unexpected = consumer.receive(1000L);
        assertNull(unexpected);
    }

    /**
     * Declares the single delivery mode and the destination-type values to
     * combine for the test named after the {@code initCombosFor} prefix.
     */
    public void initCombosForTestStartAfterSend() {
        final Object[] modes = {1};
        final Object[] types = {(byte)1, (byte)2};
        addCombinationValues("deliveryMode", modes);
        addCombinationValues("destinationType", types);
    }

    /**
     * Checks that a message sent before the connection is started is delivered
     * once {@code start()} is called, and that it is the only one.
     *
     * @throws Exception if a JMS call fails
     */
    public void testStartAfterSend() throws Exception {
        // Non-transacted session, acknowledge mode 1 (Session.AUTO_ACKNOWLEDGE).
        Session session = connection.createSession(false, 1);
        destination = createDestination(session, destinationType);
        MessageConsumer consumer = session.createConsumer(destination);
        sendMessages(session, destination, 1);

        // Start only after the message is already on the destination.
        connection.start();

        Message delivered = consumer.receive(1000L);
        assertNotNull(delivered);
        assertNull(consumer.receiveNoWait());
    }

    /**
     * Declares the delivery-mode and destination-type values to combine for
     * the test named after the {@code initCombosFor} prefix.
     */
    public void initCombosForTestReceiveMessageWithConsumer() {
        final Object[] deliveryModes = {1, 2};
        final Object[] destinationTypes = {(byte)1, (byte)2, (byte)5, (byte)6};
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Sends one message and checks that a synchronous {@code receive} returns
     * it as a text message with body "0" and that nothing else is pending.
     *
     * @throws Exception if a JMS call fails
     */
    public void testReceiveMessageWithConsumer() throws Exception {
        connection.start();
        // Non-transacted session, acknowledge mode 1 (Session.AUTO_ACKNOWLEDGE).
        Session session = connection.createSession(false, 1);
        destination = createDestination(session, destinationType);
        MessageConsumer consumer = session.createConsumer(destination);
        sendMessages(session, destination, 1);

        Message received = consumer.receive(1000L);
        assertNotNull(received);
        String body = ((TextMessage) received).getText();
        assertEquals("0", body);

        assertNull(consumer.receiveNoWait());
    }

    /**
     * Checks that messages consumed through a {@code DUPS_OK_ACKNOWLEDGE}
     * session are not delivered again to a consumer created afterwards.
     *
     * <p>Four messages are sent and received; a further {@code receive(500)}
     * must return {@code null}. The consumer is then closed and a new consumer
     * on the same destination must also receive nothing within 500 ms.
     *
     * @throws Exception if any JMS operation fails
     */
    public void testDupsOkConsumer() throws Exception {
        connection.start();

        Session dupsOkSession = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
        // Destination type code 1 is hard-coded for this test.
        destination = createDestination(dupsOkSession, (byte) 1);
        MessageConsumer firstConsumer = dupsOkSession.createConsumer(destination);
        sendMessages(dupsOkSession, destination, 4);

        for (int remaining = 4; remaining > 0; remaining--) {
            assertNotNull(firstConsumer.receive(1000L));
        }
        assertNull(firstConsumer.receive(500L));

        // Closing the consumer must not cause the consumed messages to reappear.
        firstConsumer.close();
        MessageConsumer reopenedConsumer = dupsOkSession.createConsumer(destination);
        assertNull(reopenedConsumer.receive(500L));
    }

    /**
     * Checks that messages consumed in a transacted session that is closed
     * without a commit are redispatched to another consumer.
     *
     * <p>Two messages are received in a first transacted session. A second
     * transacted session and consumer are created before the first session is
     * closed. The second consumer must then receive two messages, each with
     * {@code JMSRedelivered} set and a {@code JMSXDeliveryCount} of 2. After the
     * second session commits, no further message may arrive within 500 ms.
     *
     * @throws Exception if any JMS operation fails
     */
    public void testRedispatchOfUncommittedTx() throws Exception {
        connection.start();

        Session abandonedSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        // Destination type code 1 is hard-coded for this test.
        destination = createDestination(abandonedSession, (byte) 1);
        sendMessages(connection, destination, 2);

        MessageConsumer abandonedConsumer = abandonedSession.createConsumer(destination);
        assertNotNull(abandonedConsumer.receive(1000L));
        assertNotNull(abandonedConsumer.receive(1000L));

        // The redispatch target exists before the first session goes away.
        Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination);

        // Closed without commit: both messages remain unacknowledged.
        abandonedSession.close();

        Message firstRedelivery = redispatchConsumer.receive(1000L);
        assertNotNull(firstRedelivery);
        assertTrue("redelivered flag set", firstRedelivery.getJMSRedelivered());
        assertEquals(2L, firstRedelivery.getLongProperty("JMSXDeliveryCount"));

        Message secondRedelivery = redispatchConsumer.receive(1000L);
        assertNotNull(secondRedelivery);
        assertTrue(secondRedelivery.getJMSRedelivered());
        assertEquals(2L, secondRedelivery.getLongProperty("JMSXDeliveryCount"));

        redispatchSession.commit();
        assertNull(redispatchConsumer.receive(500L));
        redispatchSession.close();
    }

    /**
     * Checks that messages consumed in a transacted session that is rolled back
     * and closed are redispatched to another consumer.
     *
     * <p>Two messages are received in a first transacted session. A second
     * transacted session and consumer are created, then the first session is
     * rolled back and closed. The second consumer must receive two messages,
     * each with {@code JMSRedelivered} set and a {@code JMSXDeliveryCount} of 2.
     * After the second session commits, no further message may arrive within
     * 500 ms.
     *
     * @throws Exception if any JMS operation fails
     */
    public void testRedispatchOfRolledbackTx() throws Exception {
        connection.start();

        Session rolledBackSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        // Destination type code 1 is hard-coded for this test.
        destination = createDestination(rolledBackSession, (byte) 1);
        sendMessages(connection, destination, 2);

        MessageConsumer rolledBackConsumer = rolledBackSession.createConsumer(destination);
        assertNotNull(rolledBackConsumer.receive(1000L));
        assertNotNull(rolledBackConsumer.receive(1000L));

        // The redispatch target exists before the first session is rolled back.
        Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination);

        rolledBackSession.rollback();
        rolledBackSession.close();

        // Both messages must come back flagged as redelivered.
        for (int expected = 0; expected < 2; expected++) {
            Message redelivered = redispatchConsumer.receive(1000L);
            assertNotNull(redelivered);
            assertTrue(redelivered.getJMSRedelivered());
            assertEquals(2L, redelivered.getLongProperty("JMSXDeliveryCount"));
        }

        redispatchSession.commit();
        assertNull(redispatchConsumer.receive(500L));
        redispatchSession.close();
    }

    /**
     * Registers two combination values (type codes 1 and 2) for the
     * {@code destinationType} field.
     *
     * <p>NOTE(review): presumably picked up by the combination test framework for
     * {@code testAckOfExpired} via the method-name convention — confirm.
     */
    public void initCombosForTestAckOfExpired() {
        Object[] destinationTypes = {Byte.valueOf((byte) 1), Byte.valueOf((byte) 2)};
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Checks the consumer-side and broker-side accounting of messages that
     * expire before the application receives them.
     *
     * <p>A batch of {@code count} messages with a 500 ms time-to-live is sent and
     * left for one second; a second batch of {@code count} messages without
     * expiry follows. The consumer must hand over only the "no expiry" messages.
     * Afterwards the consumer statistics must report {@code count} expired
     * messages, and the destination MBean must show no in-flight messages,
     * {@code 2 * count} dispatched and dequeued messages, and {@code count}
     * expired messages.
     *
     * @throws Exception if any JMS or JMX operation fails
     */
    public void testAckOfExpired() throws Exception {
        // Size of each batch. It equals jms.prefetchPolicy.all in the URL below;
        // NOTE(review): presumably intentional, so the expired batch fills the prefetch — confirm.
        final int count = 4;
        // Upper bound for each receive, so a stalled dispatch fails the test instead of hanging it.
        final long receiveTimeoutMillis = 5000L;

        ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=4&jms.sendAcksAsync=false");
        connection = fact.createActiveMQConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = (ActiveMQDestination) (destinationType == ActiveMQDestination.QUEUE_TYPE
                ? session.createQueue("test")
                : session.createTopic("test"));
        MessageConsumer consumer = session.createConsumer(destination);
        connection.setStatsEnabled(true);

        Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = sendSession.createProducer(destination);

        // First batch: short-lived messages that are left to expire.
        producer.setTimeToLive(500L);
        for (int i = 0; i < count; ++i) {
            TextMessage message = sendSession.createTextMessage("" + i);
            producer.send(message);
        }
        Thread.sleep(1000L);

        // Second batch: messages that never expire.
        producer.setTimeToLive(0L);
        for (int i = 0; i < count; ++i) {
            TextMessage message = sendSession.createTextMessage("no expiry" + i);
            producer.send(message);
        }

        ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer) consumer;
        for (int i = 0; i < count; ++i) {
            TextMessage msg = (TextMessage) amqConsumer.receive(receiveTimeoutMillis);
            assertNotNull(msg);
            assertTrue("message has \"no expiry\" text: " + msg.getText(), msg.getText().contains("no expiry"));
            amqConsumer.acknowledge();
        }
        assertEquals("consumer has expiredMessages", (long) count, amqConsumer.getConsumerStats().getExpiredMessageCount().getCount());

        DestinationViewMBean view = createView(destination);
        assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0L, view.getInFlightCount());
        assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 2L * count, view.getDispatchCount());
        assertEquals("Wrong dequeue count: " + view.getDequeueCount(), 2L * count, view.getDequeueCount());
        assertEquals("Wrong expired count: " + view.getExpiredCount(), (long) count, view.getExpiredCount());
    }

    /**
     * Creates a JMX proxy for the destination named {@code test} on the broker
     * named {@code localhost}.
     *
     * <p>Only {@code destination.isQueue()} is consulted, to choose between the
     * {@code Queue} and {@code Topic} destination type in the object name; the
     * destination's own name is not used.
     *
     * @param destination the destination whose kind (queue or topic) selects the MBean
     * @return a {@link DestinationViewMBean} proxy obtained from the broker's management context
     * @throws Exception if the object name is malformed or the proxy cannot be created
     */
    protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
        String destinationKind = destination.isQueue() ? "Queue" : "Topic";
        ObjectName name = new ObjectName("org.apache.activemq"
                + ":type=Broker,brokerName=localhost,destinationType=" + destinationKind
                + ",destinationName=test");
        Object proxy = broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
        return (DestinationViewMBean) proxy;
    }
}

