package org.apache.activemq.bugs;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.MessageDatabase;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Wait;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ6131Test.class */
public class AMQ6131Test {
    protected BrokerService broker;
    protected URI brokerConnectURI;

    @Before
    public void startBroker() throws Exception {
        ((Logger) Logger.class.cast(LogManager.getLogger(MessageDatabase.class))).setLevel(Level.TRACE);
        setUpBroker(true);
    }

    protected void setUpBroker(boolean z) throws Exception {
        this.broker = new BrokerService();
        this.broker.setPersistent(true);
        this.broker.setDeleteAllMessagesOnStartup(z);
        TransportConnector addConnector = this.broker.addConnector(new TransportConnector());
        addConnector.setUri(new URI("tcp://0.0.0.0:0"));
        addConnector.setName("tcp");
        this.broker.start();
        this.broker.waitUntilStarted();
        this.brokerConnectURI = this.broker.getConnectorByName("tcp").getConnectUri();
    }

    @After
    public void stopBroker() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    protected BrokerService getBroker() {
        return this.broker;
    }

    public File getPersistentDir() throws IOException {
        return getBroker().getPersistenceAdapter().getDirectory();
    }

    @Test(timeout = 300000)
    public void testDurableWithOnePendingAfterRestartAndIndexRecovery() throws Exception {
        final File persistentDir = getPersistentDir();
        this.broker.getBroker().addDestination(this.broker.getAdminConnectionContext(), new ActiveMQTopic("durable.sub"), false);
        ActiveMQConnection createConnection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        createConnection.setClientID("myId");
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(new ActiveMQTopic("durable.sub"), "sub");
        final MessageProducer createProducer = createSession.createProducer(new ActiveMQTopic("durable.sub"));
        final int size = new ArrayList(FileUtils.listFiles(persistentDir, new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size();
        final byte[] bArr = new byte[100000];
        new Random().nextBytes(bArr);
        final AtomicInteger atomicInteger = new AtomicInteger();
        Assert.assertTrue("Should have added a journal file", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ6131Test.1
            public boolean isSatisified() throws Exception {
                ActiveMQBytesMessage activeMQBytesMessage = new ActiveMQBytesMessage();
                activeMQBytesMessage.setContent(new ByteSequence(bArr));
                for (int i = 0; i < 100; i++) {
                    createProducer.send(activeMQBytesMessage);
                    atomicInteger.getAndIncrement();
                }
                return new ArrayList(FileUtils.listFiles(persistentDir, new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size() > size;
            }
        }));
        for (int i = 0; i < atomicInteger.get() - 1; i++) {
            createDurableSubscriber.receive();
        }
        createDurableSubscriber.close();
        Assert.assertTrue("Subscription should go inactive", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ6131Test.2
            public boolean isSatisified() throws Exception {
                return AMQ6131Test.this.broker.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
            }
        }));
        getBroker().getPersistenceAdapter().checkpoint(true);
        Assert.assertFalse("Should not have garbage collected", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ6131Test.3
            public boolean isSatisified() throws Exception {
                return new ArrayList(FileUtils.listFiles(persistentDir, new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size() == size;
            }
        }, 5000L, 500L));
        getBroker().stop();
        getBroker().waitUntilStopped();
        Iterator it = FileUtils.listFiles(persistentDir, new WildcardFileFilter("db.*"), TrueFileFilter.INSTANCE).iterator();
        while (it.hasNext()) {
            FileUtils.deleteQuietly((File) it.next());
        }
        stopBroker();
        setUpBroker(false);
        Assert.assertEquals(1L, this.broker.getAdminView().getInactiveDurableTopicSubscribers().length);
        Assert.assertEquals(0L, this.broker.getAdminView().getDurableTopicSubscribers().length);
        ActiveMQConnection createConnection2 = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        createConnection2.setClientID("myId");
        createConnection2.start();
        TopicSubscriber createDurableSubscriber2 = createConnection2.createSession(false, 1).createDurableSubscriber(new ActiveMQTopic("durable.sub"), "sub");
        Assert.assertEquals(0L, this.broker.getAdminView().getInactiveDurableTopicSubscribers().length);
        Assert.assertEquals(1L, this.broker.getAdminView().getDurableTopicSubscribers().length);
        Assert.assertNotNull(createDurableSubscriber2.receive(5000L));
    }

    @Test(timeout = 300000)
    public void testDurableWithNoMessageAfterRestartAndIndexRecovery() throws Exception {
        final File persistentDir = getPersistentDir();
        this.broker.getBroker().addDestination(this.broker.getAdminConnectionContext(), new ActiveMQTopic("durable.sub"), false);
        ActiveMQConnection createConnection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        createConnection.setClientID("myId");
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(new ActiveMQTopic("durable.sub"), "sub");
        final MessageProducer createProducer = createSession.createProducer(new ActiveMQTopic("durable.sub"));
        final int size = new ArrayList(FileUtils.listFiles(persistentDir, new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size();
        final byte[] bArr = new byte[100000];
        new Random().nextBytes(bArr);
        final AtomicInteger atomicInteger = new AtomicInteger();
        Assert.assertTrue("Should have added a journal file", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ6131Test.4
            public boolean isSatisified() throws Exception {
                ActiveMQBytesMessage activeMQBytesMessage = new ActiveMQBytesMessage();
                activeMQBytesMessage.setContent(new ByteSequence(bArr));
                for (int i = 0; i < 100; i++) {
                    createProducer.send(activeMQBytesMessage);
                    atomicInteger.getAndIncrement();
                }
                return new ArrayList(FileUtils.listFiles(persistentDir, new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size() > size;
            }
        }));
        for (int i = 0; i < atomicInteger.get(); i++) {
            createDurableSubscriber.receive();
        }
        createDurableSubscriber.close();
        Assert.assertTrue("Subscription should go inactive", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ6131Test.5
            public boolean isSatisified() throws Exception {
                return AMQ6131Test.this.broker.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
            }
        }));
        getBroker().getPersistenceAdapter().checkpoint(true);
        Assert.assertTrue("Should have garbage collected", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ6131Test.6
            public boolean isSatisified() throws Exception {
                return new ArrayList(FileUtils.listFiles(persistentDir, new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size() == size;
            }
        }));
        getBroker().stop();
        getBroker().waitUntilStopped();
        Iterator it = FileUtils.listFiles(persistentDir, new WildcardFileFilter("db.*"), TrueFileFilter.INSTANCE).iterator();
        while (it.hasNext()) {
            FileUtils.deleteQuietly((File) it.next());
        }
        stopBroker();
        setUpBroker(false);
        Assert.assertEquals(1L, this.broker.getAdminView().getInactiveDurableTopicSubscribers().length);
        Assert.assertEquals(0L, this.broker.getAdminView().getDurableTopicSubscribers().length);
        ActiveMQConnection createConnection2 = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        createConnection2.setClientID("myId");
        createConnection2.start();
        TopicSubscriber createDurableSubscriber2 = createConnection2.createSession(false, 1).createDurableSubscriber(new ActiveMQTopic("durable.sub"), "sub");
        Assert.assertEquals(0L, this.broker.getAdminView().getInactiveDurableTopicSubscribers().length);
        Assert.assertEquals(1L, this.broker.getAdminView().getDurableTopicSubscribers().length);
        Assert.assertNull(createDurableSubscriber2.receive(500L));
    }
}
