package org.apache.activemq.bugs;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.leveldb.LevelDBStore;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ2832Test.class */
public class AMQ2832Test {
    // Shared SLF4J logger for the test and its inner helper classes.
    private static final Logger LOG = LoggerFactory.getLogger(AMQ2832Test.class);
    // Connection factory bound to connectionUri; (re)created by doStartBroker().
    private ActiveMQConnectionFactory cf;
    // URI used to build cf; doStartBroker() sets it to "vm://localhost?create=false".
    private String connectionUri;
    // Broker under test; replaced with a fresh instance on every (re)start and stopped in tearDown().
    BrokerService broker = null;
    // Queue that the StagedConsumer helper reads from and that
    // produceMessagesToConsumeMultipleDataFiles() writes to.
    private final Destination destination = new ActiveMQQueue("AMQ2832Test");
    // Message body prefix: a 1024-byte zero-filled array decoded with the platform default charset.
    final String payload = new String(new byte[1024]);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/bugs/AMQ2832Test$StagedConsumer.class */
    /**
     * Test helper that owns one connection and one consumer on the test queue and
     * lets a test receive messages in stages, leaving the last message of each
     * stage unacknowledged.
     */
    public class StagedConsumer {
        Connection connection;
        MessageConsumer consumer;

        /**
         * Connects to the first transport connector of the enclosing test's broker
         * through a failover transport and creates a consumer on the test queue.
         *
         * @throws Exception if the connection, session or consumer cannot be created
         */
        StagedConsumer() throws Exception {
            String brokerUri = ((TransportConnector) AMQ2832Test.this.broker.getTransportConnectors().get(0)).getConnectUri().toString();
            this.connection = new ActiveMQConnectionFactory("failover://" + brokerUri).createConnection();
            this.connection.start();
            // Acknowledge mode 4 is presumably ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, so that
            // acknowledging one message does not implicitly acknowledge earlier ones - confirm.
            this.consumer = this.connection.createSession(false, 4).createConsumer(AMQ2832Test.this.destination);
        }

        /**
         * Receives {@code i} messages with the default budget of two one-second attempts.
         *
         * @param i number of messages to receive
         * @return the last message received (unacknowledged), or {@code null}
         * @throws Exception on any JMS failure
         */
        public Message receive(int i) throws Exception {
            return receive(i, 2);
        }

        /**
         * Receives up to {@code i} messages, acknowledging every one except the last.
         *
         * <p>Each receive attempt waits one second. The attempt budget {@code i2} is
         * shared across the whole call: it is only decremented when an attempt times
         * out, and once it is exhausted every remaining message gets a single attempt.
         *
         * @param i  number of messages to receive
         * @param i2 number of one-second receive attempts that may time out before giving up
         * @return the message obtained in the final iteration, left unacknowledged, or
         *         {@code null} if that iteration timed out (or {@code i <= 0})
         * @throws Exception on any JMS failure
         */
        public Message receive(int i, int i2) throws Exception {
            int attemptsLeft = i2;
            Message message = null;
            for (int remaining = i; remaining > 0; remaining--) {
                do {
                    message = this.consumer.receive(1000L);
                } while (message == null && --attemptsLeft > 0);

                // A timed-out receive yields null; skip it instead of failing with a
                // NullPointerException so callers can assert on the returned value.
                if (message != null) {
                    if (remaining > 1) {
                        message.acknowledge();
                    }
                    AMQ2832Test.LOG.debug("received: {}", message.getJMSMessageID());
                }
            }
            // Last message of the stage: intentionally left unacknowledged.
            return message;
        }

        /**
         * Closes the consumer and then the connection; the connection is closed even
         * if closing the consumer fails.
         *
         * @throws JMSException if either close operation fails
         */
        void close() throws JMSException {
            try {
                this.consumer.close();
            } finally {
                this.connection.close();
            }
        }
    }

    /**
     * Starts a fresh broker: all persisted messages are deleted on startup and no
     * forced index recovery is requested.
     *
     * @throws Exception if the broker fails to start
     */
    protected void startBroker() throws Exception {
        final boolean deleteAllMessages = true;
        final boolean forceRecoverIndex = false;
        doStartBroker(deleteAllMessages, forceRecoverIndex);
    }

    /**
     * Stops the current broker (if any), waits for it to stop, and starts a new
     * one that keeps the persisted messages and does not force index recovery.
     *
     * @throws Exception if stopping or starting the broker fails
     */
    protected void restartBroker() throws Exception {
        final BrokerService running = this.broker;
        if (running != null) {
            running.stop();
            running.waitUntilStopped();
        }
        final boolean deleteAllMessages = false;
        final boolean forceRecoverIndex = false;
        doStartBroker(deleteAllMessages, forceRecoverIndex);
    }

    /**
     * Stops the current broker (if any), waits for it to stop, and starts a new
     * one that keeps the persisted messages and forces the persistence adapter to
     * recover its index.
     *
     * @throws Exception if stopping or starting the broker fails
     */
    protected void recoverBroker() throws Exception {
        final BrokerService running = this.broker;
        if (running != null) {
            running.stop();
            running.waitUntilStopped();
        }
        final boolean deleteAllMessages = false;
        final boolean forceRecoverIndex = true;
        doStartBroker(deleteAllMessages, forceRecoverIndex);
    }

    /**
     * Creates, configures and starts a new persistent, JMX-enabled broker and
     * stores it in {@link #broker}; also rebuilds the VM connection factory.
     *
     * @param z  whether the broker deletes all persisted messages on startup
     * @param z2 whether the persistence adapter is told to force index recovery
     * @throws Exception if configuration or startup fails
     */
    private void doStartBroker(boolean z, boolean z2) throws Exception {
        final BrokerService service = new BrokerService();
        // Publish the instance first so tearDown() can stop it even if setup fails part-way.
        this.broker = service;

        service.setDeleteAllMessagesOnStartup(z);
        service.setPersistent(true);
        service.setUseJmx(true);
        service.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        configurePersistence(service, z2);

        final String vmUri = "vm://localhost?create=false";
        this.connectionUri = vmUri;
        this.cf = new ActiveMQConnectionFactory(vmUri);

        service.start();
        LOG.info("Starting broker..");
    }

    /**
     * Tunes the broker's KahaDB persistence adapter for this test: small journal
     * files (20480 bytes) and 5-second checkpoint and cleanup intervals.
     *
     * @param brokerService broker whose persistence adapter is configured; the
     *                      adapter is expected to be a {@link KahaDBPersistenceAdapter}
     * @param z             when {@code true}, the adapter is told to force index recovery
     * @throws Exception if the persistence adapter cannot be obtained
     */
    protected void configurePersistence(BrokerService brokerService, boolean z) throws Exception {
        // getPersistenceAdapter() is declared to return the generic PersistenceAdapter
        // interface, so an explicit downcast is needed to reach the KahaDB-specific setters.
        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
        adapter.setJournalMaxFileLength(20480);
        adapter.setCheckpointInterval(5000L);
        adapter.setCleanupInterval(5000L);
        if (z) {
            adapter.setForceRecoverIndex(true);
        }
    }

    /**
     * Stops the broker after each test, if one was created, and waits until it
     * has fully stopped.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void tearDown() throws Exception {
        final BrokerService running = this.broker;
        if (running == null) {
            return;
        }
        running.stop();
        running.waitUntilStopped();
    }

    /**
     * Builds an ack chain, restarts the broker with forced index recovery, and
     * checks that exactly one message is still deliverable afterwards.
     *
     * @throws Exception on any broker or JMS failure
     */
    @Test
    public void testAckChain() throws Exception {
        startBroker();
        makeAckChain();

        this.broker.stop();
        this.broker.waitUntilStopped();
        recoverBroker();

        final StagedConsumer consumer = new StagedConsumer();

        final Message leftover = consumer.receive(1);
        Assert.assertNotNull("One message stays unacked from db-1.log", leftover);
        leftover.acknowledge();

        final Message shouldBeAbsent = consumer.receive(1);
        Assert.assertNull("There should not be any unconsumed messages any more", shouldBeAbsent);

        consumer.close();
    }

    /**
     * Produces and consumes messages so that one early message is left
     * unacknowledged while many later messages are produced and acknowledged.
     *
     * @throws Exception on any JMS failure
     */
    private void makeAckChain() throws Exception {
        final StagedConsumer consumer = new StagedConsumer();

        produceMessagesToConsumeMultipleDataFiles(5);
        // Receives three messages; the third one is returned unacknowledged and dropped here.
        consumer.receive(3);

        produceAndConsumeImmediately(20, consumer);

        // Consume the remaining two messages, acknowledging the last one explicitly.
        final Message last = consumer.receive(2);
        last.acknowledge();

        consumer.close();
    }

    /**
     * Builds an ack chain, stops the broker, deletes the journal file
     * {@code db-3.log} from the persistence directory, and verifies that the
     * broker then refuses to start with an {@link IOException}.
     *
     * @throws Exception on any unexpected broker or JMS failure
     */
    @Test
    public void testNoRestartOnMissingAckDataFile() throws Exception {
        startBroker();
        makeAckChain();

        File directory = this.broker.getPersistenceAdapter().getDirectory();
        this.broker.stop();
        this.broker.waitUntilStopped();

        File file = new File(directory, "db-3.log");
        LOG.info("Whacking data file with acks: " + file);
        // Fail fast if the file could not be removed; otherwise the precondition of
        // the test is not met and the later failure would be misleading.
        Assert.assertTrue("Could not delete journal data file: " + file, file.delete());

        try {
            doStartBroker(false, false);
            Assert.fail("Expect failure to start with corrupt journal");
        } catch (IOException expected) {
            // Expected: the broker must not start when a journal data file is missing.
        }
    }

    /**
     * Repeats {@code i} times: produce one message to the test queue, then
     * receive one message through the given consumer and acknowledge it.
     *
     * @param i              number of produce/consume rounds
     * @param stagedConsumer consumer used to receive and acknowledge each message
     * @throws Exception on any JMS failure
     */
    private void produceAndConsumeImmediately(int i, StagedConsumer stagedConsumer) throws Exception {
        int roundsLeft = i;
        while (roundsLeft > 0) {
            produceMessagesToConsumeMultipleDataFiles(1);
            final Message received = stagedConsumer.receive(1);
            received.acknowledge();
            roundsLeft--;
        }
    }

    /**
     * Leaves one message unacknowledged, acknowledges an earlier one after more
     * messages have been produced, consumes everything else, and then checks
     * that after a forced index recovery exactly one message is redelivered.
     *
     * @throws Exception on any broker or JMS failure
     */
    @Test
    public void testAckRemovedMessageReplayedAfterRecovery() throws Exception {
        startBroker();

        final StagedConsumer consumer = new StagedConsumer();
        int available = produceMessagesToConsumeMultipleDataFiles(20);

        // The tenth message comes back unacknowledged.
        final Message firstUnacked = consumer.receive(10);
        LOG.info("first unacked: " + firstUnacked.getJMSMessageID());
        final Message secondUnacked = consumer.receive(1);
        LOG.info("second unacked: " + secondUnacked.getJMSMessageID());
        available -= 11;

        available += produceMessagesToConsumeMultipleDataFiles(10);

        // Acknowledge the first unacked message only after more messages were produced.
        LOG.info("Acking firstUnacked: " + firstUnacked.getJMSMessageID());
        firstUnacked.acknowledge();

        available += produceMessagesToConsumeMultipleDataFiles(10);

        final Message lastConsumed = consumer.receive(available);
        lastConsumed.acknowledge();
        consumer.close();

        this.broker.stop();
        this.broker.waitUntilStopped();
        recoverBroker();

        final StagedConsumer recoveredConsumer = new StagedConsumer();

        final Message survivor = recoveredConsumer.receive(1, 5);
        Assert.assertNotNull("One messages left after recovery", survivor);
        survivor.acknowledge();

        final Message unexpected = recoveredConsumer.receive(1, 5);
        Assert.assertEquals("Only one messages left after recovery: " + unexpected, (Object) null, unexpected);

        recoveredConsumer.close();
    }

    /**
     * Exercises journal cleanup with an inactive durable subscription, one
     * consumed queue message and a bulk "disposable" queue that is later removed.
     *
     * <p>After the disposable queue is removed the journal is expected to shrink
     * to at most three files, and after a forced index recovery the already
     * consumed queue message must not reappear. The test is skipped when the
     * broker uses a {@link LevelDBStore}.
     *
     * @throws Exception on any broker or JMS failure
     */
    @Test
    public void testAlternateLossScenario() throws Exception {
        startBroker();
        if (this.broker.getPersistenceAdapter() instanceof LevelDBStore) {
            // The journal file accounting below does not apply to LevelDB.
            return;
        }

        ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
        ActiveMQQueue disposable = new ActiveMQQueue("MyDisposableQueue");
        createInactiveDurableSub(new ActiveMQTopic("MyDurableTopic"));
        Assert.assertEquals(1L, getNumberOfJournalFiles());

        // A single message on the real queue still fits in the first journal file.
        produceMessages(queue, 1);
        Assert.assertEquals(1L, getNumberOfJournalFiles());

        // Bulk messages on the disposable queue roll the journal over.
        produceMessages(disposable, 50);
        int filesAfterFirstBatch = getNumberOfJournalFiles();
        Assert.assertTrue(filesAfterFirstBatch > 1);

        // Consume the queue message, then add more bulk messages.
        drainQueue(queue);
        produceMessages(disposable, 50);
        Assert.assertTrue(filesAfterFirstBatch < getNumberOfJournalFiles());

        restartBroker();
        this.broker.getAdminView().removeQueue(disposable.getQueueName());

        // Evaluate the wait first so the failure message reports the file count
        // observed after waiting rather than the count from before the wait.
        boolean reclaimed = Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ2832Test.1
            public boolean isSatisified() throws Exception {
                return AMQ2832Test.this.getNumberOfJournalFiles() <= 3;
            }
        }, TimeUnit.MINUTES.toMillis(3L));
        Assert.assertTrue("No more than three journal files expected, was " + getNumberOfJournalFiles(), reclaimed);

        recoverBroker();
        Assert.assertEquals("Consumed message must not be replayed after recovery", 0L, drainQueue(queue));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Counts the non-null data files currently tracked by the KahaDB journal of
     * the running broker.
     *
     * @return number of non-null entries in the journal's file map
     * @throws IOException if the persistence adapter cannot be obtained
     */
    public int getNumberOfJournalFiles() throws IOException {
        // getStore() is specific to KahaDB, so the generic adapter returned by the
        // broker has to be downcast before the journal can be reached.
        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) this.broker.getPersistenceAdapter();
        int count = 0;
        for (DataFile dataFile : adapter.getStore().getJournal().getFileMap().values()) {
            if (dataFile != null) {
                count++;
            }
        }
        return count;
    }

    /**
     * Registers a durable subscription named "Inactive" (client id "Inactive")
     * on the given topic, closes it straight away so it stays inactive, and then
     * publishes one message to the topic.
     *
     * @param topic topic to subscribe to and publish on
     * @throws Exception on any JMS failure
     */
    private void createInactiveDurableSub(Topic topic) throws Exception {
        Connection connection = this.cf.createConnection();
        try {
            connection.setClientID("Inactive");
            // Acknowledge mode 1 is javax.jms.Session.AUTO_ACKNOWLEDGE.
            connection.createSession(false, 1).createDurableSubscriber(topic, "Inactive").close();
        } finally {
            // Always release the connection, even if creating the subscription fails.
            connection.close();
        }
        produceMessages(topic, 1);
    }

    /**
     * Consumes every message currently available on the given queue.
     *
     * <p>Draining stops as soon as a receive call returns no message within
     * five seconds.
     *
     * @param queue queue to drain
     * @return number of messages consumed
     * @throws Exception on any JMS failure
     */
    private int drainQueue(Queue queue) throws Exception {
        Connection connection = this.cf.createConnection();
        try {
            connection.setClientID("Inactive");
            connection.start();
            // Acknowledge mode 1 is javax.jms.Session.AUTO_ACKNOWLEDGE.
            MessageConsumer consumer = connection.createSession(false, 1).createConsumer(queue);
            int drained = 0;
            while (consumer.receive(5000L) != null) {
                drained++;
            }
            consumer.close();
            return drained;
        } finally {
            // Always release the connection, even if receiving fails part-way.
            connection.close();
        }
    }

    /**
     * Sends {@code i} text messages to the given destination over a new
     * connection to the broker's first transport connector.
     *
     * @param destination queue or topic to send to
     * @param i           number of messages to send
     * @return number of messages actually sent
     * @throws Exception on any JMS failure
     */
    private int produceMessages(Destination destination, int i) throws Exception {
        TransportConnector connector = (TransportConnector) this.broker.getTransportConnectors().get(0);
        Connection connection = new ActiveMQConnectionFactory(connector.getConnectUri()).createConnection();
        try {
            // start() is inside the try so the connection is closed even if starting fails.
            connection.start();
            // Acknowledge mode 1 is javax.jms.Session.AUTO_ACKNOWLEDGE.
            Session session = connection.createSession(false, 1);
            MessageProducer producer = session.createProducer(destination);
            int sent = 0;
            for (int index = 0; index < i; index++) {
                producer.send(createMessage(session, index));
                sent++;
            }
            return sent;
        } finally {
            connection.close();
        }
    }

    /**
     * Sends {@code i} messages to the test queue.
     *
     * @param i number of messages to send
     * @return number of messages sent, as reported by {@code produceMessages}
     * @throws Exception on any JMS failure
     */
    private int produceMessagesToConsumeMultipleDataFiles(int i) throws Exception {
        final Destination target = this.destination;
        final int sent = produceMessages(target, i);
        return sent;
    }

    /**
     * Builds a text message whose body is the shared payload followed by
     * {@code "::"} and the given index.
     *
     * @param session session used to create the message
     * @param i       index appended to the payload
     * @return the new text message
     * @throws Exception if the session cannot create the message
     */
    private Message createMessage(Session session, int i) throws Exception {
        final StringBuilder body = new StringBuilder(this.payload);
        body.append("::").append(i);
        return session.createTextMessage(body.toString());
    }
}
