package org.apache.activemq;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/JmsRollbackRedeliveryTest.class */
public class JmsRollbackRedeliveryTest extends AutoFailTestSupport {
    protected static final Logger LOG = LoggerFactory.getLogger(JmsRollbackRedeliveryTest.class);
    final int nbMessages = 10;
    final String destinationName = "Destination";
    final String brokerUrl = "vm://localhost?create=false";
    boolean consumerClose = true;
    boolean rollback = true;
    BrokerService broker;

    public void setUp() throws Exception {
        setAutoFail(true);
        super.setUp();
        this.broker = new BrokerService();
        this.broker.setPersistent(false);
        this.broker.setUseJmx(false);
        this.broker.start();
    }

    public void tearDown() throws Exception {
        super.tearDown();
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    public void testRedelivery() throws Exception {
        doTestRedelivery("vm://localhost?create=false", false);
    }

    public void testRedeliveryWithInterleavedProducer() throws Exception {
        doTestRedelivery("vm://localhost?create=false", true);
    }

    public void testRedeliveryWithPrefetch0() throws Exception {
        doTestRedelivery("vm://localhost?create=false?jms.prefetchPolicy.queuePrefetch=0", true);
    }

    public void testRedeliveryWithPrefetch1() throws Exception {
        doTestRedelivery("vm://localhost?create=false?jms.prefetchPolicy.queuePrefetch=1", true);
    }

    public void doTestRedelivery(String str, boolean z) throws Exception {
        Connection createConnection = new ActiveMQConnectionFactory(str).createConnection();
        createConnection.start();
        if (z) {
            populateDestinationWithInterleavedProducer(10, "Destination", createConnection);
        } else {
            populateDestination(10, "Destination", createConnection);
        }
        AtomicInteger atomicInteger = new AtomicInteger();
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        while (atomicInteger.get() < 10) {
            Session createSession = createConnection.createSession(true, 1);
            MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue("Destination"));
            TextMessage receive = createConsumer.receive(6000000L);
            if (receive != null) {
                if (receive == null || concurrentHashMap.put(receive.getText(), Boolean.TRUE) == null) {
                    LOG.info("Rollback message " + receive.getText() + " id: " + receive.getJMSMessageID());
                    assertFalse("should not have redelivery flag set, id: " + receive.getJMSMessageID(), receive.getJMSRedelivered());
                    createSession.rollback();
                } else {
                    LOG.info("Received message " + receive.getText() + " (" + atomicInteger.getAndIncrement() + ")" + receive.getJMSMessageID());
                    assertTrue(receive.getJMSRedelivered());
                    assertEquals(2L, receive.getLongProperty("JMSXDeliveryCount"));
                    createSession.commit();
                }
            }
            createConsumer.close();
            createSession.close();
        }
    }

    public void testRedeliveryOnSingleConsumer() throws Exception {
        Connection createConnection = new ActiveMQConnectionFactory("vm://localhost?create=false").createConnection();
        createConnection.start();
        populateDestinationWithInterleavedProducer(10, "Destination", createConnection);
        AtomicInteger atomicInteger = new AtomicInteger();
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        Session createSession = createConnection.createSession(true, 1);
        MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue("Destination"));
        while (atomicInteger.get() < 10) {
            TextMessage receive = createConsumer.receive(6000000L);
            if (receive != null) {
                if (receive == null || concurrentHashMap.put(receive.getText(), Boolean.TRUE) == null) {
                    LOG.info("Rollback message " + receive.getText() + " id: " + receive.getJMSMessageID());
                    createSession.rollback();
                } else {
                    LOG.info("Received message " + receive.getText() + " (" + atomicInteger.getAndIncrement() + ")" + receive.getJMSMessageID());
                    assertTrue(receive.getJMSRedelivered());
                    createSession.commit();
                }
            }
        }
        createConsumer.close();
        createSession.close();
    }

    public void testRedeliveryOnSingleSession() throws Exception {
        Connection createConnection = new ActiveMQConnectionFactory("vm://localhost?create=false").createConnection();
        createConnection.start();
        populateDestination(10, "Destination", createConnection);
        AtomicInteger atomicInteger = new AtomicInteger();
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        Session createSession = createConnection.createSession(true, 1);
        Queue createQueue = createSession.createQueue("Destination");
        while (atomicInteger.get() < 10) {
            MessageConsumer createConsumer = createSession.createConsumer(createQueue);
            TextMessage receive = createConsumer.receive(6000000L);
            if (receive != null) {
                if (receive == null || concurrentHashMap.put(receive.getText(), Boolean.TRUE) == null) {
                    LOG.info("Rollback message " + receive.getText() + " id: " + receive.getJMSMessageID());
                    createSession.rollback();
                } else {
                    LOG.info("Received message " + receive.getText() + " (" + atomicInteger.getAndIncrement() + ")" + receive.getJMSMessageID());
                    assertTrue(receive.getJMSRedelivered());
                    createSession.commit();
                }
            }
            createConsumer.close();
        }
        createSession.close();
    }

    public void testValidateRedeliveryCountOnRollback() throws Exception {
        Connection createConnection = new ActiveMQConnectionFactory("vm://localhost?create=false").createConnection();
        createConnection.start();
        populateDestination(1, "Destination", createConnection);
        AtomicInteger atomicInteger = new AtomicInteger();
        int maximumRedeliveries = new RedeliveryPolicy().getMaximumRedeliveries();
        while (atomicInteger.get() < maximumRedeliveries) {
            Session createSession = createConnection.createSession(true, 0);
            TextMessage receive = createSession.createConsumer(createSession.createQueue("Destination")).receive(1000L);
            if (receive != null) {
                LOG.info("Received message " + receive.getText() + " (" + atomicInteger.getAndIncrement() + ")" + receive.getJMSMessageID());
                assertEquals("redelivery property matches deliveries", atomicInteger.get(), receive.getLongProperty("JMSXDeliveryCount"));
                createSession.rollback();
            }
            createSession.close();
        }
        consumeMessage(createConnection, maximumRedeliveries + 1);
    }

    public void testValidateRedeliveryCountOnRollbackWithPrefetch0() throws Exception {
        Connection createConnection = new ActiveMQConnectionFactory("vm://localhost?create=false?jms.prefetchPolicy.queuePrefetch=0").createConnection();
        createConnection.start();
        populateDestination(1, "Destination", createConnection);
        AtomicInteger atomicInteger = new AtomicInteger();
        int maximumRedeliveries = new RedeliveryPolicy().getMaximumRedeliveries();
        while (atomicInteger.get() < maximumRedeliveries) {
            Session createSession = createConnection.createSession(true, 0);
            TextMessage receive = createSession.createConsumer(createSession.createQueue("Destination")).receive(1000L);
            if (receive != null) {
                LOG.info("Received message " + receive.getText() + " (" + atomicInteger.getAndIncrement() + ")" + receive.getJMSMessageID());
                assertEquals("redelivery property matches deliveries", atomicInteger.get(), receive.getLongProperty("JMSXDeliveryCount"));
                createSession.rollback();
            }
            createSession.close();
        }
        consumeMessage(createConnection, maximumRedeliveries + 1);
    }

    private void consumeMessage(Connection connection, int i) throws JMSException {
        Session createSession = connection.createSession(true, 0);
        TextMessage receive = createSession.createConsumer(createSession.createQueue("Destination")).receive(1000L);
        assertNotNull(receive);
        assertEquals("redelivery property matches deliveries", i, receive.getLongProperty("JMSXDeliveryCount"));
        createSession.commit();
        createSession.close();
    }

    public void testRedeliveryPropertyWithNoRollback() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        populateDestination(1, "Destination", createConnection);
        createConnection.close();
        AtomicInteger atomicInteger = new AtomicInteger();
        int maximumRedeliveries = new RedeliveryPolicy().getMaximumRedeliveries();
        while (atomicInteger.get() < maximumRedeliveries) {
            Connection createConnection2 = activeMQConnectionFactory.createConnection();
            createConnection2.start();
            Session createSession = createConnection2.createSession(true, 0);
            TextMessage receive = createSession.createConsumer(createSession.createQueue("Destination")).receive(2000L);
            if (receive != null) {
                LOG.info("Received message " + receive.getText() + " (" + atomicInteger.getAndIncrement() + ")" + receive.getJMSMessageID());
                assertEquals("redelivery property matches deliveries", atomicInteger.get(), receive.getLongProperty("JMSXDeliveryCount"));
            }
            createSession.close();
            createConnection2.close();
        }
        Connection createConnection3 = activeMQConnectionFactory.createConnection();
        createConnection3.start();
        consumeMessage(createConnection3, maximumRedeliveries + 1);
    }

    private void populateDestination(int i, String str, Connection connection) throws JMSException {
        Session createSession = connection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue(str));
        for (int i2 = 1; i2 <= i; i2++) {
            createProducer.send(createSession.createTextMessage("<hello id='" + i2 + "'/>"));
        }
        createProducer.close();
        createSession.close();
    }

    private void populateDestinationWithInterleavedProducer(int i, String str, Connection connection) throws JMSException {
        Session createSession = connection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue(str));
        Session createSession2 = connection.createSession(false, 1);
        MessageProducer createProducer2 = createSession2.createProducer(createSession2.createQueue(str));
        for (int i2 = 1; i2 <= i; i2++) {
            if (i2 % 2 == 0) {
                createProducer.send(createSession.createTextMessage("<hello id='" + i2 + "'/>"));
            } else {
                createProducer2.send(createSession2.createTextMessage("<hello id='" + i2 + "'/>"));
            }
        }
        createProducer.close();
        createSession.close();
        createProducer2.close();
        createSession2.close();
    }
}
