package org.apache.activemq.bugs;

import java.io.File;
import java.io.FilenameFilter;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.leveldb.LevelDBStore;
import org.apache.activemq.leveldb.LevelDBStoreViewMBean;
import org.apache.activemq.usecases.DurableSubDelayedUnsubscribeTest;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ4677Test.class */
public class AMQ4677Test {
    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4677Test.class);
    private static BrokerService brokerService;

    @Rule
    public TestName name = new TestName();
    private File dataDirFile;

    @Before
    public void setUp() throws Exception {
        this.dataDirFile = new File("target/LevelDBCleanupTest");
        brokerService = new BrokerService();
        brokerService.setBrokerName("LevelDBBroker");
        brokerService.setPersistent(true);
        brokerService.setUseJmx(true);
        brokerService.setAdvisorySupport(false);
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setDataDirectoryFile(this.dataDirFile);
        LevelDBStore levelDBStore = new LevelDBStore();
        levelDBStore.setDirectory(this.dataDirFile);
        brokerService.setPersistenceAdapter(levelDBStore);
        brokerService.start();
        brokerService.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        brokerService.stop();
        brokerService.waitUntilStopped();
    }

    @Test
    public void testSendAndReceiveAllMessages() throws Exception {
        Connection createConnection = new ActiveMQConnectionFactory("vm://LevelDBBroker").createConnection();
        createConnection.setClientID(getClass().getName());
        createConnection.start();
        final Session createSession = createConnection.createSession(true, 1);
        Queue createQueue = createSession.createQueue(this.name.toString());
        MessageProducer createProducer = createSession.createProducer(createQueue);
        createProducer.setDeliveryMode(2);
        LevelDBStoreViewMBean levelDBStoreMBean = getLevelDBStoreMBean();
        Assert.assertNotNull(levelDBStoreMBean);
        levelDBStoreMBean.compact();
        final CountDownLatch countDownLatch = new CountDownLatch(DurableSubDelayedUnsubscribeTest.Client.lifetime);
        byte[] bArr = new byte[30720];
        for (int i = 0; i < 30720; i++) {
            bArr[i] = Byte.MIN_VALUE;
        }
        for (int i2 = 0; i2 < 60000; i2++) {
            BytesMessage createBytesMessage = createSession.createBytesMessage();
            createBytesMessage.writeBytes(bArr);
            createProducer.send(createBytesMessage);
            if (i2 % 1000 == 0) {
                LOG.info("Sent message #{}", Integer.valueOf(i2));
                createSession.commit();
            }
        }
        createSession.commit();
        LOG.info("Finished sending all messages.");
        createSession.createConsumer(createQueue).setMessageListener(new MessageListener() { // from class: org.apache.activemq.bugs.AMQ4677Test.1
            public void onMessage(Message message) {
                if (countDownLatch.getCount() % 1000 == 0) {
                    try {
                        AMQ4677Test.LOG.info("Received message #{}", Long.valueOf(60000 - countDownLatch.getCount()));
                        createSession.commit();
                    } catch (JMSException e) {
                    }
                }
                countDownLatch.countDown();
            }
        });
        countDownLatch.await(10L, TimeUnit.MINUTES);
        createSession.commit();
        LOG.info("Finished receiving all messages.");
        LOG.info("Current number of logs {}", Long.valueOf(countLogFiles()));
        Assert.assertTrue("Should only have one log file left.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ4677Test.2
            public boolean isSatisified() throws Exception {
                return AMQ4677Test.this.countLogFiles() == 1;
            }
        }, TimeUnit.MINUTES.toMillis(5L)));
        levelDBStoreMBean.compact();
        LOG.info("Current number of logs {}", Long.valueOf(countLogFiles()));
    }

    protected long countLogFiles() {
        return this.dataDirFile.list(new FilenameFilter() { // from class: org.apache.activemq.bugs.AMQ4677Test.3
            @Override // java.io.FilenameFilter
            public boolean accept(File file, String str) {
                return str.endsWith("log");
            }
        }).length;
    }

    protected LevelDBStoreViewMBean getLevelDBStoreMBean() throws Exception {
        Set queryNames = brokerService.getManagementContext().queryNames((ObjectName) null, new ObjectName("org.apache.activemq:type=Broker,brokerName=LevelDBBroker,Service=PersistenceAdapter,InstanceName=LevelDB*"));
        if (queryNames.isEmpty() || queryNames.size() > 1) {
            throw new IllegalStateException("Can't find levelDB store name.");
        }
        return (LevelDBStoreViewMBean) brokerService.getManagementContext().newProxyInstance((ObjectName) queryNames.iterator().next(), LevelDBStoreViewMBean.class, true);
    }
}
