package org.apache.activemq.test.rollback;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.RedeliveryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.MessageCreator;

/* loaded from: input_file:org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.class */
public class RollbacksWhileConsumingLargeQueueTest extends EmbeddedBrokerTestSupport implements MessageListener {
    private static final transient Logger LOG = LoggerFactory.getLogger(RollbacksWhileConsumingLargeQueueTest.class);
    private Connection connection;
    private CountDownLatch latch;
    private Throwable failure;
    protected int numberOfMessagesOnQueue = 650;
    private AtomicInteger deliveryCounter = new AtomicInteger(0);
    private AtomicInteger ackCounter = new AtomicInteger(0);

    public void testWithReciever() throws Throwable {
        this.latch = new CountDownLatch(this.numberOfMessagesOnQueue);
        Session createSession = this.connection.createSession(true, 0);
        MessageConsumer createConsumer = createSession.createConsumer(this.destination);
        long currentTimeMillis = System.currentTimeMillis();
        while (System.currentTimeMillis() - currentTimeMillis < 1000000) {
            if (getFailure() != null) {
                throw getFailure();
            }
            if (this.ackCounter.get() == this.numberOfMessagesOnQueue) {
                return;
            }
            Message receive = createConsumer.receive(1000L);
            if (receive != null) {
                try {
                    onMessage(receive);
                    createSession.commit();
                } catch (Throwable th) {
                    createSession.rollback();
                }
            }
        }
        fail("Did not receive all the messages.");
    }

    public void testWithMessageListener() throws Throwable {
        this.latch = new CountDownLatch(this.numberOfMessagesOnQueue);
        new DelegatingTransactionalMessageListener(this, this.connection, this.destination);
        long currentTimeMillis = System.currentTimeMillis();
        while (System.currentTimeMillis() - currentTimeMillis < 1000000) {
            if (getFailure() != null) {
                throw getFailure();
            }
            if (this.latch.await(1L, TimeUnit.SECONDS)) {
                LOG.debug("Received: " + this.deliveryCounter.get() + "  message(s)");
                return;
            }
        }
        fail("Did not receive all the messages.");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.EmbeddedBrokerTestSupport
    public ConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory createConnectionFactory = super.createConnectionFactory();
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(3);
        redeliveryPolicy.setRedeliveryDelay(0L);
        redeliveryPolicy.setInitialRedeliveryDelay(0L);
        redeliveryPolicy.setUseExponentialBackOff(false);
        createConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        return createConnectionFactory;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.EmbeddedBrokerTestSupport
    public void setUp() throws Exception {
        super.setUp();
        this.connection = createConnection();
        this.connection.start();
        for (int i = 0; i < this.numberOfMessagesOnQueue; i++) {
            this.template.send(createMessageCreator(i));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.EmbeddedBrokerTestSupport
    public void tearDown() throws Exception {
        if (this.connection != null) {
            this.connection.close();
        }
        super.tearDown();
    }

    protected MessageCreator createMessageCreator(final int i) {
        return new MessageCreator() { // from class: org.apache.activemq.test.rollback.RollbacksWhileConsumingLargeQueueTest.1
            public Message createMessage(Session session) throws JMSException {
                TextMessage createTextMessage = session.createTextMessage("Message: " + i);
                createTextMessage.setIntProperty("Counter", i);
                return createTextMessage;
            }
        };
    }

    public void onMessage(Message message) {
        String str = null;
        String str2 = null;
        try {
            str = message.getJMSMessageID();
            str2 = ((TextMessage) message).getText();
        } catch (JMSException e) {
            setFailure(e);
        }
        try {
            assertEquals("Message: " + this.ackCounter.get(), str2);
        } catch (Throwable th) {
            setFailure(th);
        }
        int incrementAndGet = this.deliveryCounter.incrementAndGet();
        if (incrementAndGet % 2 == 0) {
            LOG.info("Rolling Back message: " + this.ackCounter.get() + " id: " + str + ", content: " + str2);
            throw new RuntimeException("Dummy exception on message: " + incrementAndGet);
        }
        LOG.info("Received message: " + this.ackCounter.get() + " id: " + str + ", content: " + str2);
        this.ackCounter.incrementAndGet();
        this.latch.countDown();
    }

    public synchronized Throwable getFailure() {
        return this.failure;
    }

    public synchronized void setFailure(Throwable th) {
        this.failure = th;
    }
}
