package org.apache.activemq.broker.region;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/broker/region/QueueResendDuringShutdownTest.class */
public class QueueResendDuringShutdownTest {
    public static final int NUM_CONNECTION_TO_TEST = 8;
    private BrokerService broker;
    private ActiveMQConnectionFactory factory;
    private Connection[] connections;
    private Connection producerConnection;
    private Queue queue;
    private Object messageReceiveSync = new Object();
    private int receiveCount;
    private static final Logger LOG = LoggerFactory.getLogger(QueueResendDuringShutdownTest.class);
    private static boolean iterationFoundFailure = false;

    @Before
    public void setUp() throws Exception {
        this.receiveCount = 0;
        this.broker = new BrokerService();
        this.broker.setPersistent(false);
        this.broker.start();
        this.broker.waitUntilStarted();
        this.factory = new ActiveMQConnectionFactory(this.broker.getVmConnectorURI());
        this.queue = new ActiveMQQueue("TESTQUEUE");
        this.connections = new Connection[8];
        for (int i = 0; i < 8; i++) {
            this.connections[i] = this.factory.createConnection();
        }
        this.producerConnection = this.factory.createConnection();
        this.producerConnection.start();
    }

    @After
    public void cleanup() throws Exception {
        for (Connection connection : this.connections) {
            if (connection != null) {
                closeConnection(connection);
            }
        }
        this.connections = null;
        if (this.producerConnection != null) {
            closeConnection(this.producerConnection);
            this.producerConnection = null;
        }
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    @Test(timeout = 3000)
    public void testRedeliverAtBrokerShutdownAutoAckMsgListenerIter1() throws Throwable {
        runTestIteration();
    }

    @Test(timeout = 3000)
    public void testRedeliverAtBrokerShutdownAutoAckMsgListenerIter2() throws Throwable {
        runTestIteration();
    }

    @Test(timeout = 3000)
    public void testRedeliverAtBrokerShutdownAutoAckMsgListenerIter3() throws Throwable {
        runTestIteration();
    }

    protected void runTestIteration() throws Throwable {
        if (iterationFoundFailure) {
            LOG.info("skipping test iteration; failure previously detected");
            return;
        }
        try {
            testRedeliverAtBrokerShutdownAutoAckMsgListener();
        } catch (Throwable th) {
            iterationFoundFailure = true;
            throw th;
        }
    }

    protected void testRedeliverAtBrokerShutdownAutoAckMsgListener() throws Exception {
        for (Connection connection : this.connections) {
            configureMessageListener(startupConsumer(connection, false, 1));
            connection.start();
        }
        sendMessage();
        waitForMessage(1000L);
        Assert.assertEquals(1L, this.receiveCount);
        this.broker.stop();
        this.broker.waitUntilStopped();
        delay(100L, "give queue time flush");
        Assert.assertEquals(1L, this.receiveCount);
    }

    protected MessageConsumer startupConsumer(Connection connection, boolean z, int i) throws JMSException {
        return connection.createSession(z, i).createConsumer(this.queue);
    }

    protected void messageReceived() {
        synchronized (this) {
            this.receiveCount++;
            synchronized (this.messageReceiveSync) {
                this.messageReceiveSync.notifyAll();
            }
        }
    }

    protected void configureMessageListener(final MessageConsumer messageConsumer) throws JMSException {
        messageConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.region.QueueResendDuringShutdownTest.1
            public void onMessage(Message message) {
                QueueResendDuringShutdownTest.LOG.debug("got a message on consumer {}", messageConsumer);
                QueueResendDuringShutdownTest.this.messageReceived();
                QueueResendDuringShutdownTest.this.delay(3000L, "pause so connection shutdown leads to unacked message redelivery");
            }
        });
    }

    protected void sendMessage() throws JMSException {
        Session createSession = this.producerConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(this.queue);
        createProducer.send(createSession.createTextMessage("X-TEST-MSG-X"));
        createProducer.close();
        createSession.close();
    }

    protected void closeConnection(Connection connection) {
        try {
            connection.close();
        } catch (JMSException e) {
            LOG.info("failed to cleanup connection", e);
        }
    }

    protected void delay(long j, String str) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
            LOG.warn("sleep interrupted: " + str, e);
        }
    }

    protected void waitForMessage(long j) {
        try {
            synchronized (this.messageReceiveSync) {
                if (this.receiveCount == 0) {
                    this.messageReceiveSync.wait(j);
                }
            }
        } catch (InterruptedException e) {
            LOG.warn("sleep interrupted: wait for message to arrive");
        }
    }
}
