package org.apache.activemq.bugs;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MessageDatabase;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.config.Configurator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ6133PersistJMSRedeliveryTest.class */
public class AMQ6133PersistJMSRedeliveryTest {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ6133PersistJMSRedeliveryTest.class);
    private static final String QUEUE_NAME = "test.queue";
    private BrokerService broker;

    @Test
    public void testPersistJMSRedeliveredMessageLossOnIndexRebuild() throws Exception {
        sendMessages();
        LOG.info("#### Finished sending messages, test starting. ####");
        long queueSize = getProxyToQueue(QUEUE_NAME).getQueueSize();
        for (int i = 0; i < 3; i++) {
            LOG.info("Consumer and Rollback iteration: {}", Integer.valueOf(i));
            consumerAndRollback(i);
        }
        TimeUnit.SECONDS.sleep(20L);
        restart();
        Assert.assertEquals(queueSize, getProxyToQueue(QUEUE_NAME).getQueueSize());
        restartWithRecovery(getPersistentDir());
        Assert.assertEquals(queueSize, getProxyToQueue(QUEUE_NAME).getQueueSize());
    }

    @Before
    public void setup() throws Exception {
        LogManager.getLogger(MessageDatabase.class).setLevel(Level.TRACE);
        createBroker(true);
    }

    @After
    public void tearDown() throws Exception {
        this.broker.stop();
    }

    private void restart() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
        createBroker(false);
    }

    private void restartWithRecovery(File file) throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
        Iterator it = FileUtils.listFiles(file, new WildcardFileFilter("db.*"), TrueFileFilter.INSTANCE).iterator();
        while (it.hasNext()) {
            FileUtils.deleteQuietly((File) it.next());
        }
        createBroker(false);
    }

    private void sendMessages() throws Exception {
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue(QUEUE_NAME);
        Queue createQueue2 = createSession.createQueue("test.queue-retain");
        MessageProducer createProducer = createSession.createProducer((Destination) null);
        createProducer.setDeliveryMode(2);
        BytesMessage createBytesMessage = createSession.createBytesMessage();
        createBytesMessage.writeBytes(new byte[1000]);
        while (getLogFileCount() < 3) {
            createProducer.send(createQueue, createBytesMessage);
        }
        while (getLogFileCount() < 6) {
            createProducer.send(createQueue2, createBytesMessage);
        }
        createConnection.close();
    }

    private void consumerAndRollback(int i) throws Exception {
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 2);
        Queue createQueue = createSession.createQueue(QUEUE_NAME);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        long queueSize = getProxyToQueue(createQueue.getQueueName()).getQueueSize();
        for (int i2 = 0; i2 < queueSize; i2++) {
            Message receive = createConsumer.receive(50000L);
            Assert.assertNotNull(receive);
            if (i > 0) {
                Assert.assertTrue(receive.getJMSRedelivered());
            }
        }
        createConnection.close();
    }

    private Connection createConnection() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=0");
        activeMQConnectionFactory.setAlwaysSyncSend(true);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        return createConnection;
    }

    private void createBroker(boolean z) throws Exception {
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setPersistJMSRedelivered(true);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(z);
        this.broker.setPersistent(true);
        this.broker.setDestinationPolicy(policyMap);
        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
        kahaDBPersistenceAdapter.setJournalMaxFileLength(131072);
        kahaDBPersistenceAdapter.setCleanupInterval(8000L);
        this.broker.setPersistenceAdapter(kahaDBPersistenceAdapter);
        this.broker.getSystemUsage().getStoreUsage().setLimit(7340032L);
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    private int getLogFileCount() throws Exception {
        return new ArrayList(FileUtils.listFiles(getPersistentDir(), new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size();
    }

    private File getPersistentDir() throws IOException {
        return this.broker.getPersistenceAdapter().getDirectory();
    }

    protected QueueViewMBean getProxyToQueue(String str) throws MalformedObjectNameException, JMSException {
        return (QueueViewMBean) this.broker.getManagementContext().newProxyInstance(new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + str), QueueViewMBean.class, true);
    }
}
