package org.apache.activemq.bugs;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ2832Test.class */
public class AMQ2832Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ2832Test.class);
    BrokerService broker = null;
    private final Destination destination = new ActiveMQQueue("AMQ2832Test");
    final String payload = new String(new byte[1024]);

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ2832Test$StagedConsumer.class */
    private class StagedConsumer {
        Connection connection;
        MessageConsumer consumer;

        StagedConsumer() throws Exception {
            this.connection = new ActiveMQConnectionFactory("failover://" + ((TransportConnector) AMQ2832Test.this.broker.getTransportConnectors().get(0)).getConnectUri().toString()).createConnection();
            this.connection.start();
            this.consumer = this.connection.createSession(false, 4).createConsumer(AMQ2832Test.this.destination);
        }

        public Message receive(int i) throws Exception {
            return receive(i, 2);
        }

        public Message receive(int i, int i2) throws Exception {
            Message message = null;
            while (i > 0) {
                do {
                    message = this.consumer.receive(1000L);
                    if (message != null) {
                        break;
                    }
                    i2--;
                } while (i2 > 0);
                if (i > 1) {
                    message.acknowledge();
                }
                if (message != null) {
                    AMQ2832Test.LOG.debug("received: " + message.getJMSMessageID());
                }
                i--;
            }
            return message;
        }

        void close() throws JMSException {
            this.consumer.close();
            this.connection.close();
        }
    }

    protected void startBroker(boolean z) throws Exception {
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(z);
        this.broker.setPersistent(true);
        this.broker.setUseJmx(false);
        this.broker.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        configurePersistence(this.broker, z);
        this.broker.start();
        LOG.info("Starting broker..");
    }

    protected void configurePersistence(BrokerService brokerService, boolean z) throws Exception {
        KahaDBPersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter();
        persistenceAdapter.setJournalMaxFileLength(20480);
        persistenceAdapter.setCheckpointInterval(500L);
        persistenceAdapter.setCleanupInterval(500L);
        if (z) {
            return;
        }
        persistenceAdapter.setForceRecoverIndex(true);
    }

    @Test
    public void testAckRemovedMessageReplayedAfterRecovery() throws Exception {
        startBroker(true);
        StagedConsumer stagedConsumer = new StagedConsumer();
        int produceMessagesToConsumeMultipleDataFiles = produceMessagesToConsumeMultipleDataFiles(20);
        Message receive = stagedConsumer.receive(10);
        LOG.info("first unacked: " + receive.getJMSMessageID());
        LOG.info("second unacked: " + stagedConsumer.receive(1).getJMSMessageID());
        int produceMessagesToConsumeMultipleDataFiles2 = (produceMessagesToConsumeMultipleDataFiles - 11) + produceMessagesToConsumeMultipleDataFiles(10);
        LOG.info("Acking firstUnacked: " + receive.getJMSMessageID());
        receive.acknowledge();
        stagedConsumer.receive(produceMessagesToConsumeMultipleDataFiles2 + produceMessagesToConsumeMultipleDataFiles(10)).acknowledge();
        stagedConsumer.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
        startBroker(false);
        StagedConsumer stagedConsumer2 = new StagedConsumer();
        Message receive2 = stagedConsumer2.receive(1, 5);
        Assert.assertNotNull("One messages left after recovery", receive2);
        receive2.acknowledge();
        Message receive3 = stagedConsumer2.receive(1, 5);
        Assert.assertEquals("Only one messages left after recovery: " + receive3, (Object) null, receive3);
        stagedConsumer2.close();
    }

    private int produceMessagesToConsumeMultipleDataFiles(int i) throws Exception {
        int i2 = 0;
        Connection createConnection = new ActiveMQConnectionFactory(((TransportConnector) this.broker.getTransportConnectors().get(0)).getConnectUri()).createConnection();
        createConnection.start();
        try {
            Session createSession = createConnection.createSession(false, 1);
            MessageProducer createProducer = createSession.createProducer(this.destination);
            for (int i3 = 0; i3 < i; i3++) {
                createProducer.send(createMessage(createSession, i3));
                i2++;
            }
            return i2;
        } finally {
            createConnection.close();
        }
    }

    private Message createMessage(Session session, int i) throws Exception {
        return session.createTextMessage(this.payload + "::" + i);
    }
}
