/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OptimizeAcknowledgeWithExpiredMsgsTest {
    private static final Logger LOG = LoggerFactory.getLogger(OptimizeAcknowledgeWithExpiredMsgsTest.class);
    private BrokerService broker = null;
    private String connectionUri;

    protected BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setPersistent(false);
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setUseJmx(false);
        this.connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
        return broker;
    }

    @Before
    public void setUp() throws Exception {
        this.broker = this.createBroker();
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
            this.broker = null;
        }
    }

    @Test
    public void testOptimizedAckWithExpiredMsgs() throws Exception {
        TextMessage message;
        int i;
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, 1);
        Queue destination = session.createQueue("TEST.FOO");
        MessageConsumer consumer = session.createConsumer((Destination)destination);
        final MyMessageListener listener = new MyMessageListener();
        connection.setExceptionListener((ExceptionListener)listener);
        MessageProducer producer = session.createProducer((Destination)destination);
        producer.setDeliveryMode(1);
        String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
        for (i = 0; i < 45; ++i) {
            message = session.createTextMessage(text);
            producer.send((Message)message, 1, 1, 100L);
            LOG.trace("Sent message: " + message.getJMSMessageID() + " with expiry 10 msec");
        }
        for (i = 0; i < 60; ++i) {
            message = session.createTextMessage(text);
            producer.send((Message)message, 1, 1, 60000L);
            LOG.trace("Sent message: " + message.getJMSMessageID() + " with expiry 30 sec");
        }
        consumer.setMessageListener((MessageListener)listener);
        this.sleep(1000);
        connection.start();
        Assert.assertTrue((String)("Should receive all expected messages, counter at " + listener.getCounter()), (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return listener.getCounter() == 60;
            }
        }));
        LOG.info("Received all expected messages with counter at: " + listener.getCounter());
        producer.close();
        consumer.close();
        session.close();
        connection.close();
    }

    @Test
    public void testOptimizedAckWithExpiredMsgsSync() throws Exception {
        int counter;
        TextMessage message;
        int i;
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, 1);
        Queue destination = session.createQueue("TEST.FOO");
        MessageConsumer consumer = session.createConsumer((Destination)destination);
        MessageProducer producer = session.createProducer((Destination)destination);
        producer.setDeliveryMode(1);
        String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
        for (i = 0; i < 45; ++i) {
            message = session.createTextMessage(text);
            producer.send((Message)message, 1, 1, 10L);
            LOG.trace("Sent message: " + message.getJMSMessageID() + " with expiry 10 msec");
        }
        for (i = 0; i < 60; ++i) {
            message = session.createTextMessage(text);
            producer.send((Message)message, 1, 1, 30000L);
            LOG.trace("Sent message: " + message.getJMSMessageID() + " with expiry 30 sec");
        }
        this.sleep(200);
        for (counter = 1; counter <= 60; ++counter) {
            Assert.assertNotNull((Object)consumer.receive(2000L));
            LOG.info("counter at " + counter);
        }
        LOG.info("Received all expected messages with counter at: " + counter);
        producer.close();
        consumer.close();
        session.close();
        connection.close();
    }

    @Test
    public void testOptimizedAckWithExpiredMsgsSync2() throws Exception {
        int counter;
        TextMessage message;
        int i;
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, 1);
        Queue destination = session.createQueue("TEST.FOO");
        MessageConsumer consumer = session.createConsumer((Destination)destination);
        MessageProducer producer = session.createProducer((Destination)destination);
        producer.setDeliveryMode(1);
        String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
        for (i = 0; i < 56; ++i) {
            message = session.createTextMessage(text);
            producer.send((Message)message, 1, 1, 30000L);
            LOG.trace("Sent message: " + message.getJMSMessageID() + " with expiry 30 sec");
        }
        for (i = 0; i < 44; ++i) {
            message = session.createTextMessage(text);
            producer.send((Message)message, 1, 1, 10L);
            LOG.trace("Sent message: " + message.getJMSMessageID() + " with expiry 10 msec");
        }
        for (i = 0; i < 4; ++i) {
            message = session.createTextMessage(text);
            producer.send((Message)message, 1, 1, 30000L);
            LOG.trace("Sent message: " + message.getJMSMessageID() + " with expiry 30 sec");
        }
        this.sleep(200);
        for (counter = 1; counter <= 60; ++counter) {
            Assert.assertNotNull((Object)consumer.receive(2000L));
            LOG.info("counter at " + counter);
        }
        LOG.info("Received all expected messages with counter at: " + counter);
        producer.close();
        consumer.close();
        session.close();
        connection.close();
    }

    private void sleep(int milliSecondTime) {
        try {
            Thread.sleep(milliSecondTime);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private class MyMessageListener
    implements MessageListener,
    ExceptionListener {
        private AtomicInteger counter = new AtomicInteger(0);

        private MyMessageListener() {
        }

        public void onMessage(Message message) {
            try {
                LOG.trace("Got Message " + message.getJMSMessageID());
                LOG.info("counter at " + this.counter.incrementAndGet());
            }
            catch (Exception exception) {
                // empty catch block
            }
        }

        public int getCounter() {
            return this.counter.get();
        }

        public synchronized void onException(JMSException ex) {
            LOG.error("JMS Exception occured.  Shutting down client.");
        }
    }
}

