package org.apache.activemq.store.kahadb;

import java.util.concurrent.TimeUnit;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.Assert;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/store/kahadb/KahaDBStoreRecoveryExpiryTest.class */
public class KahaDBStoreRecoveryExpiryTest {
    private BrokerService broker;
    private ActiveMQConnection connection;
    private Destination destination = new ActiveMQQueue("Test");
    private Session session;

    @Test
    public void testRestartWitExpired() throws Exception {
        publishMessages(1, 0);
        publishMessages(1, 2000);
        publishMessages(1, 0);
        restartBroker(3000);
        consumeMessages(2);
    }

    @Test
    public void testRestartWitExpiredLargerThanBatchRecovery() throws Exception {
        publishMessages(210, 2000);
        publishMessages(10, 0);
        restartBroker(3000);
        consumeMessages(10);
    }

    private void consumeMessages(int i) throws Exception {
        MessageConsumer createConsumer = this.session.createConsumer(this.destination);
        for (int i2 = 0; i2 < i; i2++) {
            Assert.assertNotNull("got message " + i2, createConsumer.receive(4000L));
        }
        Assert.assertNull("none left over", createConsumer.receive(2000L));
    }

    private void restartBroker(int i) throws Exception {
        stopBroker();
        TimeUnit.MILLISECONDS.sleep(i);
        startBroker();
    }

    @After
    public void stopBroker() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    private void publishMessages(int i, int i2) throws Exception {
        MessageProducer createProducer = this.session.createProducer(this.destination);
        for (int i3 = 0; i3 < i; i3++) {
            createProducer.send(this.session.createTextMessage(), 2, 5, i2);
        }
    }

    @Before
    public void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.getPersistenceAdapter().setIndexCacheSize(0);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
        policyMap.setDefaultEntry(policyEntry);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.setUseJmx(false);
        this.broker.start();
        this.connection = new ActiveMQConnectionFactory("vm://localhost?create=false").createConnection();
        this.connection.setWatchTopicAdvisories(false);
        this.connection.start();
        this.session = this.connection.createSession(false, 1);
    }
}
