/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JmsRollbackRedeliveryTest {
    @Rule
    public TestName testName = new TestName();
    protected static final Logger LOG = LoggerFactory.getLogger(JmsRollbackRedeliveryTest.class);
    final int nbMessages = 10;
    final String destinationName = "Destination";
    final String brokerUrl = "vm://localhost?create=false";
    boolean consumerClose = true;
    boolean rollback = true;
    BrokerService broker;

    @Before
    public void setUp() throws Exception {
        LOG.debug("Starting " + this.testName.getMethodName());
        this.broker = new BrokerService();
        this.broker.setPersistent(false);
        this.broker.setUseJmx(false);
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
        LOG.debug("Finishing " + this.testName.getMethodName());
        Thread.sleep(100L);
    }

    @Test
    public void testRedelivery() throws Exception {
        this.doTestRedelivery("vm://localhost?create=false", false);
    }

    @Test
    public void testRedeliveryWithInterleavedProducer() throws Exception {
        this.doTestRedelivery("vm://localhost?create=false", true);
    }

    @Test
    public void testRedeliveryWithPrefetch0() throws Exception {
        this.doTestRedelivery("vm://localhost?create=false?jms.prefetchPolicy.queuePrefetch=0", true);
    }

    @Test
    public void testRedeliveryWithPrefetch1() throws Exception {
        this.doTestRedelivery("vm://localhost?create=false?jms.prefetchPolicy.queuePrefetch=1", true);
    }

    public void doTestRedelivery(String brokerUrl, boolean interleaveProducer) throws Exception {
        LOG.debug("entering doTestRedelivery interleaveProducer is " + interleaveProducer);
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        if (interleaveProducer) {
            this.populateDestinationWithInterleavedProducer(10, "Destination", connection);
        } else {
            this.populateDestination(10, "Destination", connection);
        }
        AtomicInteger received = new AtomicInteger();
        ConcurrentHashMap<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>();
        while (received.get() < 10) {
            Queue destination;
            Session session = connection.createSession(true, 1);
            MessageConsumer consumer = session.createConsumer((Destination)(destination = session.createQueue("Destination")));
            TextMessage msg = (TextMessage)consumer.receive(6000000L);
            if (msg != null) {
                if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) {
                    LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
                    Assert.assertTrue((boolean)msg.getJMSRedelivered());
                    Assert.assertEquals((long)2L, (long)msg.getLongProperty("JMSXDeliveryCount"));
                    session.commit();
                } else {
                    LOG.info("Rollback message " + msg.getText() + " id: " + msg.getJMSMessageID());
                    Assert.assertFalse((String)("should not have redelivery flag set, id: " + msg.getJMSMessageID()), (boolean)msg.getJMSRedelivered());
                    session.rollback();
                }
            }
            consumer.close();
            session.close();
        }
    }

    @Test
    public void testRedeliveryOnSingleConsumer() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        this.populateDestinationWithInterleavedProducer(10, "Destination", connection);
        AtomicInteger received = new AtomicInteger();
        ConcurrentHashMap<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>();
        Session session = connection.createSession(true, 1);
        Queue destination = session.createQueue("Destination");
        MessageConsumer consumer = session.createConsumer((Destination)destination);
        while (received.get() < 10) {
            TextMessage msg = (TextMessage)consumer.receive(6000000L);
            if (msg == null) continue;
            if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) {
                LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
                Assert.assertTrue((boolean)msg.getJMSRedelivered());
                session.commit();
                continue;
            }
            LOG.info("Rollback message " + msg.getText() + " id: " + msg.getJMSMessageID());
            session.rollback();
        }
        consumer.close();
        session.close();
    }

    @Test
    public void testRedeliveryOnSingleSession() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        this.populateDestination(10, "Destination", connection);
        AtomicInteger received = new AtomicInteger();
        ConcurrentHashMap<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>();
        Session session = connection.createSession(true, 1);
        Queue destination = session.createQueue("Destination");
        while (received.get() < 10) {
            MessageConsumer consumer = session.createConsumer((Destination)destination);
            TextMessage msg = (TextMessage)consumer.receive(6000000L);
            if (msg != null) {
                if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) {
                    LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
                    Assert.assertTrue((boolean)msg.getJMSRedelivered());
                    session.commit();
                } else {
                    LOG.info("Rollback message " + msg.getText() + " id: " + msg.getJMSMessageID());
                    session.rollback();
                }
            }
            consumer.close();
        }
        session.close();
    }

    @Test
    public void testValidateRedeliveryCountOnRollback() throws Exception {
        boolean numMessages = true;
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        this.populateDestination(1, "Destination", connection);
        AtomicInteger received = new AtomicInteger();
        int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries();
        while (received.get() < maxRetries) {
            Queue destination;
            Session session = connection.createSession(true, 0);
            MessageConsumer consumer = session.createConsumer((Destination)(destination = session.createQueue("Destination")));
            TextMessage msg = (TextMessage)consumer.receive(1000L);
            if (msg != null) {
                LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
                Assert.assertEquals((String)"redelivery property matches deliveries", (long)received.get(), (long)msg.getLongProperty("JMSXDeliveryCount"));
                session.rollback();
            }
            session.close();
        }
        this.consumeMessage(connection, maxRetries + 1);
    }

    @Test
    public void testValidateRedeliveryCountOnRollbackWithPrefetch0() throws Exception {
        boolean numMessages = true;
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false?jms.prefetchPolicy.queuePrefetch=0");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        this.populateDestination(1, "Destination", connection);
        AtomicInteger received = new AtomicInteger();
        int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries();
        while (received.get() < maxRetries) {
            Queue destination;
            Session session = connection.createSession(true, 0);
            MessageConsumer consumer = session.createConsumer((Destination)(destination = session.createQueue("Destination")));
            TextMessage msg = (TextMessage)consumer.receive(1000L);
            if (msg != null) {
                LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
                Assert.assertEquals((String)"redelivery property matches deliveries", (long)received.get(), (long)msg.getLongProperty("JMSXDeliveryCount"));
                session.rollback();
            }
            session.close();
        }
        this.consumeMessage(connection, maxRetries + 1);
    }

    private void consumeMessage(Connection connection, int deliveryCount) throws JMSException {
        Session session = connection.createSession(true, 0);
        Queue destination = session.createQueue("Destination");
        MessageConsumer consumer = session.createConsumer((Destination)destination);
        TextMessage msg = (TextMessage)consumer.receive(1000L);
        Assert.assertNotNull((Object)msg);
        Assert.assertEquals((String)"redelivery property matches deliveries", (long)deliveryCount, (long)msg.getLongProperty("JMSXDeliveryCount"));
        session.commit();
        session.close();
    }

    @Test
    public void testRedeliveryPropertyWithNoRollback() throws Exception {
        boolean numMessages = true;
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        this.populateDestination(1, "Destination", connection);
        connection.close();
        AtomicInteger received = new AtomicInteger();
        int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries();
        while (received.get() < maxRetries) {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(true, 0);
            Queue destination = session.createQueue("Destination");
            MessageConsumer consumer = session.createConsumer((Destination)destination);
            TextMessage msg = (TextMessage)consumer.receive(2000L);
            if (msg != null) {
                LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
                Assert.assertEquals((String)"redelivery property matches deliveries", (long)received.get(), (long)msg.getLongProperty("JMSXDeliveryCount"));
            }
            session.close();
            connection.close();
        }
        connection = connectionFactory.createConnection();
        connection.start();
        this.consumeMessage(connection, maxRetries + 1);
    }

    private void populateDestination(int nbMessages, String destinationName, Connection connection) throws JMSException {
        Session session = connection.createSession(false, 1);
        Queue destination = session.createQueue(destinationName);
        MessageProducer producer = session.createProducer((Destination)destination);
        for (int i = 1; i <= nbMessages; ++i) {
            producer.send((Message)session.createTextMessage("<hello id='" + i + "'/>"));
        }
        producer.close();
        session.close();
    }

    private void populateDestinationWithInterleavedProducer(int nbMessages, String destinationName, Connection connection) throws JMSException {
        Session session1 = connection.createSession(false, 1);
        Queue destination1 = session1.createQueue(destinationName);
        MessageProducer producer1 = session1.createProducer((Destination)destination1);
        Session session2 = connection.createSession(false, 1);
        Queue destination2 = session2.createQueue(destinationName);
        MessageProducer producer2 = session2.createProducer((Destination)destination2);
        for (int i = 1; i <= nbMessages; ++i) {
            if (i % 2 == 0) {
                producer1.send((Message)session1.createTextMessage("<hello id='" + i + "'/>"));
                continue;
            }
            producer2.send((Message)session2.createTextMessage("<hello id='" + i + "'/>"));
        }
        producer1.close();
        session1.close();
        producer2.close();
        session2.close();
    }
}

