package org.apache.activemq.usecases;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
import javax.management.ObjectName;
import junit.framework.Assert;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.apache.kahadb.page.PageFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/DurableSubscriptionOfflineTest.class */
public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport {
    // Broker under test; assigned by createBroker(boolean) and stopped by destroyBroker().
    private BrokerService broker;
    // Destination used by every test; assigned from createDestination() in setUp().
    private ActiveMQTopic topic;
    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTest.class);
    // Selector used by testMixOfOnLineAndOfflineSubsGetAllMatched and testRemovedDurableSubDeletes.
    private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
    // When true, createBroker(boolean) installs a default destination policy with prioritized messages.
    // Named in the initCombosFor* methods via addCombinationValues("usePrioritySupport", ...).
    public boolean usePrioritySupport = Boolean.TRUE.booleanValue();
    // Handed to the KahaDB adapter's setJournalMaxFileLength in createBroker(boolean); 33554432 = 32 * 1024 * 1024.
    public int journalMaxFileLength = 33554432;
    // Forwarded to BrokerService.setKeepDurableSubsActive; the JMX counter test expects different counts per value.
    public boolean keepDurableSubsActive = true;
    // Failures recorded by FilterCheckListener; cleared in setUp() and asserted empty by the mixed online/offline test.
    private List<Throwable> exceptions = new ArrayList();

    /* loaded from: input_file:org/apache/activemq/usecases/DurableSubscriptionOfflineTest$FilterCheckListener.class */
    public class FilterCheckListener extends Listener {
        public FilterCheckListener() {
        }

        @Override // org.apache.activemq.usecases.DurableSubscriptionOfflineTest.Listener
        public void onMessage(Message message) {
            this.count++;
            try {
                if (message.getObjectProperty("$b") != null) {
                    Assert.assertTrue("", message.getBooleanProperty("$c"));
                } else {
                    String stringProperty = message.getStringProperty("$d");
                    Assert.assertTrue("", "D1".equals(stringProperty) || "D2".equals(stringProperty));
                }
            } catch (JMSException e) {
                e.printStackTrace();
                DurableSubscriptionOfflineTest.this.exceptions.add(e);
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/usecases/DurableSubscriptionOfflineTest$Listener.class */
    /**
     * Message listener that counts deliveries and, when constructed with an id,
     * logs each delivered JMS message id prefixed by that id.
     */
    public static class Listener implements MessageListener {
        /** Number of messages passed to {@link #onMessage(Message)} so far. */
        int count;
        /** Optional log prefix; {@code null} disables per-message logging. */
        String id;

        /** Creates a listener that only counts. */
        Listener() {
            this(null);
        }

        /**
         * Creates a listener that counts and logs.
         *
         * @param str prefix for the per-message log line, or {@code null} for no logging
         */
        Listener(String str) {
            this.id = str;
        }

        /**
         * Increments {@link #count} and optionally logs the message id.
         *
         * @param message the delivered message
         */
        public void onMessage(Message message) {
            this.count++;
            if (this.id != null) {
                try {
                    DurableSubscriptionOfflineTest.LOG.info(this.id + ", " + message.getJMSMessageID());
                } catch (Exception e) {
                    // Logging is best effort and must not fail delivery, but the
                    // failure itself should still be visible in the test output.
                    DurableSubscriptionOfflineTest.LOG.warn(this.id + ": unable to log message id", e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds a connection factory for the {@code vm://} URL derived from the test name,
     * with topic advisory watching switched off.
     *
     * @return the configured factory
     */
    @Override // org.apache.activemq.TestSupport
    public ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        final String brokerUrl = "vm://" + getName(true);
        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
        factory.setWatchTopicAdvisories(false);
        return factory;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Opens a connection with the default client id {@code "cliName"}.
     *
     * @return the connection returned by {@link #createConnection(String)}
     */
    @Override // org.apache.activemq.TestSupport
    public Connection createConnection() throws Exception {
        final String defaultClientId = "cliName";
        return createConnection(defaultClientId);
    }

    /**
     * Opens a connection through the superclass, assigns the given client id and starts it.
     *
     * @param str the JMS client id to set on the new connection
     * @return the started connection
     */
    protected Connection createConnection(String str) throws Exception {
        final Connection con = super.createConnection();
        con.setClientID(str);
        con.start();
        return con;
    }

    /**
     * Builds the test suite for this class via the inherited {@code suite(Class)} helper.
     *
     * @return the suite for {@code DurableSubscriptionOfflineTest}
     */
    public static Test suite() {
        final Class<DurableSubscriptionOfflineTest> testClass = DurableSubscriptionOfflineTest.class;
        return suite(testClass);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Resets the recorded listener failures, creates the test topic and starts a fresh
     * broker before delegating to the superclass set-up.
     */
    @Override // org.apache.activemq.AutoFailTestSupport
    public void setUp() throws Exception {
        this.exceptions.clear();
        // The explicit cast is required whenever createDestination() is declared with a
        // broader destination type than ActiveMQTopic; it is a no-op otherwise.
        this.topic = (ActiveMQTopic) createDestination();
        createBroker();
        super.setUp();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the superclass tear-down and then stops the broker. The broker is stopped
     * even when the superclass tear-down throws, so it is not left running after a
     * failed clean-up.
     */
    @Override // org.apache.activemq.AutoFailTestSupport
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            destroyBroker();
        }
    }

    /** Starts a broker that deletes all persisted messages on start-up. */
    private void createBroker() throws Exception {
        final boolean deleteAllMessagesOnStartup = true;
        createBroker(deleteAllMessagesOnStartup);
    }

    /**
     * Creates, configures and starts the broker used by the tests and stores it in
     * {@code broker}.
     *
     * <p>The broker is named after the test, has its JMX connector and advisory support
     * disabled, gets a TCP connector on an ephemeral port, and optionally a default
     * destination policy with prioritized messages. Adapter-specific tuning is applied
     * after the default persistence adapter has been installed.
     *
     * @param z value passed to {@code setDeleteAllMessagesOnStartup}; {@code false} is used
     *          by tests that restart the broker and expect stored state to survive
     */
    private void createBroker(boolean z) throws Exception {
        this.broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) + ")");
        this.broker.setBrokerName(getName(true));
        this.broker.setDeleteAllMessagesOnStartup(z);
        this.broker.getManagementContext().setCreateConnector(false);
        this.broker.setAdvisorySupport(false);
        this.broker.setKeepDurableSubsActive(this.keepDurableSubsActive);
        this.broker.addConnector("tcp://0.0.0.0:0");
        if (this.usePrioritySupport) {
            PolicyEntry policyEntry = new PolicyEntry();
            policyEntry.setPrioritizedMessages(true);
            PolicyMap policyMap = new PolicyMap();
            policyMap.setDefaultEntry(policyEntry);
            this.broker.setDestinationPolicy(policyMap);
        }
        setDefaultPersistenceAdapter(this.broker);
        // The tuning setters live on the concrete adapter classes, so the adapter has to
        // be narrowed after the instanceof check before they can be called.
        if (this.broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
            ((JDBCPersistenceAdapter) this.broker.getPersistenceAdapter()).setCleanupPeriod(2000);
        } else if (this.broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
            ((KahaDBPersistenceAdapter) this.broker.getPersistenceAdapter()).setJournalMaxFileLength(this.journalMaxFileLength);
        }
        this.broker.start();
    }

    /** Stops the broker if one has been created; does nothing otherwise. */
    private void destroyBroker() throws Exception {
        if (broker == null) {
            return;
        }
        broker.stop();
    }

    /**
     * Declares the candidate values of the {@code defaultPersistenceAdapter} and
     * {@code usePrioritySupport} fields for {@code testConsumeOnlyMatchedMessages}.
     */
    public void initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
        final Object[] adapters = {
            TestSupport.PersistenceAdapterChoice.KahaDB,
            TestSupport.PersistenceAdapterChoice.JDBC
        };
        final Object[] priorityFlags = {Boolean.TRUE, Boolean.FALSE};
        addCombinationValues("defaultPersistenceAdapter", adapters);
        addCombinationValues("usePrioritySupport", priorityFlags);
    }

    /**
     * A durable subscriber with selector {@code filter = 'true'} goes offline, ten messages
     * are published of which every odd-indexed one matches, and on reconnect the subscriber
     * must receive exactly the matching ones.
     */
    public void testConsumeOnlyMatchedMessages() throws Exception {
        // Register the durable subscription, then disconnect.
        Connection con = createConnection();
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // Publish while the subscriber is offline.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageProducer producer = session.createProducer(null);
        int matching = 0;
        for (int n = 0; n < 10; n++) {
            final boolean matches = (n % 2) == 1;
            if (matches) {
                matching++;
            }
            final Message message = session.createMessage();
            message.setStringProperty("filter", String.valueOf(matches));
            producer.send(topic, message);
        }
        session.close();
        con.close();

        // Reconnect and give the listener three seconds to drain the backlog.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TopicSubscriber consumer =
                session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        final Listener received = new Listener();
        consumer.setMessageListener(received);
        Thread.sleep(3 * 1000L);
        session.close();
        con.close();

        assertEquals(matching, received.count);
    }

    /**
     * A durable subscriber with selector {@code filter = 'true'} goes offline, ten matching
     * messages are published, and on reconnect the subscriber must receive all of them.
     */
    public void testConsumeAllMatchedMessages() throws Exception {
        // Register the durable subscription, then disconnect.
        Connection con = createConnection();
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // Publish ten matching messages while the subscriber is offline.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageProducer producer = session.createProducer(null);
        int sent = 0;
        while (sent < 10) {
            final Message message = session.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(topic, message);
            sent++;
        }
        Thread.sleep(1000L);
        session.close();
        con.close();

        // Reconnect and give the listener three seconds to drain the backlog.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TopicSubscriber consumer =
                session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        final Listener received = new Listener();
        consumer.setMessageListener(received);
        Thread.sleep(3 * 1000L);
        session.close();
        con.close();

        assertEquals(sent, received.count);
    }

    /**
     * Declares the candidate values of the {@code defaultPersistenceAdapter} and
     * {@code usePrioritySupport} fields for {@code testVerifyAllConsumedAreAcked}.
     */
    public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception {
        final Object[] adapters = {
            TestSupport.PersistenceAdapterChoice.KahaDB,
            TestSupport.PersistenceAdapterChoice.JDBC
        };
        final Object[] priorityFlags = {Boolean.TRUE, Boolean.FALSE};
        addCombinationValues("defaultPersistenceAdapter", adapters);
        addCombinationValues("usePrioritySupport", priorityFlags);
    }

    /**
     * After an offline durable subscriber has reconnected and consumed its ten pending
     * messages, a further reconnect of the same subscription must deliver nothing.
     */
    public void testVerifyAllConsumedAreAcked() throws Exception {
        // Register the durable subscription, then disconnect.
        Connection con = createConnection();
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // Publish ten matching messages while the subscriber is offline.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageProducer producer = session.createProducer(null);
        int sent = 0;
        while (sent < 10) {
            final Message message = session.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(topic, message);
            sent++;
        }
        Thread.sleep(1000L);
        session.close();
        con.close();

        // First reconnect: everything that was sent must arrive.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber consumer =
                session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        final Listener firstPass = new Listener();
        consumer.setMessageListener(firstPass);
        Thread.sleep(3 * 1000L);
        session.close();
        con.close();
        LOG.info("Consumed: " + firstPass.count);
        assertEquals(sent, firstPass.count);

        // Second reconnect: nothing may be redelivered.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        final Listener secondPass = new Listener();
        consumer.setMessageListener(secondPass);
        Thread.sleep(3 * 1000L);
        session.close();
        con.close();
        assertEquals(0, secondPass.count);
    }

    /**
     * Two clients hold a durable subscription named "SubsId". "cliId2" is online while ten
     * messages are published and "cliId1" is offline; both must end up with all ten.
     */
    public void testTwoOfflineSubscriptionCanConsume() throws Exception {
        // cliId1 registers its subscription and goes offline.
        final Connection offlineCon = createConnection("cliId1");
        final Session offlineSession = offlineCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        offlineSession.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        offlineSession.close();
        offlineCon.close();

        // cliId2 stays connected with a listener attached.
        final Connection onlineCon = createConnection("cliId2");
        final Session onlineSession = onlineCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TopicSubscriber onlineConsumer =
                onlineSession.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        final Listener onlineListener = new Listener();
        onlineConsumer.setMessageListener(onlineListener);

        // Publish ten matching messages.
        final Connection producerCon = createConnection();
        final Session producerSession = producerCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageProducer producer = producerSession.createProducer(null);
        int sent = 0;
        while (sent < 10) {
            final Message message = producerSession.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(topic, message);
            sent++;
        }
        Thread.sleep(1000L);
        producerSession.close();
        producerCon.close();

        // Let the online subscriber finish, then check it.
        Thread.sleep(3 * 1000L);
        onlineSession.close();
        onlineCon.close();
        assertEquals(sent, onlineListener.count);

        // cliId1 comes back and must see the full backlog.
        final Connection returningCon = createConnection("cliId1");
        final Session returningSession = returningCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TopicSubscriber returningConsumer =
                returningSession.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        final Listener returningListener = new Listener();
        returningConsumer.setMessageListener(returningListener);
        Thread.sleep(3 * 1000L);
        returningSession.close();
        returningCon.close();
        assertEquals("offline consumer got all", sent, returningListener.count);
    }

    /**
     * Declares the candidate values of the {@code keepDurableSubsActive} field for
     * {@code testJMXCountersWithOfflineSubs}.
     */
    public void initCombosForTestJMXCountersWithOfflineSubs() throws Exception {
        final Object[] keepActiveFlags = {Boolean.TRUE, Boolean.FALSE};
        addCombinationValues("keepDurableSubsActive", keepActiveFlags);
    }

    /**
     * Checks the JMX counters of a durable subscription and its topic across a broker
     * restart, a partial consume, a disconnect and a final consume of the remainder.
     * The expected subscription enqueue/dequeue values depend on {@code keepDurableSubsActive}.
     */
    public void testJMXCountersWithOfflineSubs() throws Exception {
        // Create the durable subscription "SubsId" for client "cliId1" (no selector), then disconnect.
        Connection createConnection = createConnection("cliId1");
        Session createSession = createConnection.createSession(false, 1);
        createSession.createDurableSubscriber(this.topic, "SubsId", (String) null, true);
        createSession.close();
        createConnection.close();
        // Restart the broker without deleting stored state.
        this.broker.stop();
        createBroker(false);
        // Publish ten messages while the subscriber is offline.
        Connection createConnection2 = createConnection();
        Session createSession2 = createConnection2.createSession(false, 1);
        MessageProducer createProducer = createSession2.createProducer((Destination) null);
        int i = 0;
        for (int i2 = 0; i2 < 10; i2++) {
            i++;
            createProducer.send(this.topic, createSession2.createMessage());
        }
        createSession2.close();
        createConnection2.close();
        // Reconnect and synchronously receive the first half of the messages.
        Connection createConnection3 = createConnection("cliId1");
        Session createSession3 = createConnection3.createSession(false, 1);
        TopicSubscriber createDurableSubscriber = createSession3.createDurableSubscriber(this.topic, "SubsId", (String) null, true);
        for (int i3 = 0; i3 < i / 2; i3++) {
            Message receive = createDurableSubscriber.receive(4000L);
            assertNotNull("got message: " + i3, receive);
            LOG.info("Got :" + i3 + ", " + receive);
        }
        // Inspect the active subscription's MBean (first entry of the active durable subscribers).
        ObjectName objectName = this.broker.getAdminView().getDurableTopicSubscribers()[0];
        LOG.info("active durable sub name: " + objectName);
        final DurableSubscriptionViewMBean durableSubscriptionViewMBean = (DurableSubscriptionViewMBean) this.broker.getManagementContext().newProxyInstance(objectName, DurableSubscriptionViewMBean.class, true);
        assertTrue("is active", durableSubscriptionViewMBean.isActive());
        assertEquals("all enqueued", this.keepDurableSubsActive ? 10L : 0L, durableSubscriptionViewMBean.getEnqueueCounter());
        // The awaiting-ack count is polled rather than read once.
        assertTrue("correct waiting acks", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.DurableSubscriptionOfflineTest.1
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                return 5 == durableSubscriptionViewMBean.getMessageCountAwaitingAcknowledge();
            }
        }));
        assertEquals("correct dequeue", 5L, durableSubscriptionViewMBean.getDequeueCounter());
        // Topic-level counters while the subscriber is still connected.
        TopicViewMBean topicViewMBean = (TopicViewMBean) this.broker.getManagementContext().newProxyInstance(this.broker.getAdminView().getTopics()[0], TopicViewMBean.class, true);
        assertEquals("correct enqueue", 10L, topicViewMBean.getEnqueueCount());
        assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0L, topicViewMBean.getDequeueCount());
        assertEquals("inflight", 5L, topicViewMBean.getInFlightCount());
        // Disconnect; the subscription is now looked up among the inactive durable subscribers.
        createSession3.close();
        createConnection3.close();
        ObjectName objectName2 = this.broker.getAdminView().getInactiveDurableTopicSubscribers()[0];
        LOG.info("inactive durable sub name: " + objectName2);
        DurableSubscriptionViewMBean durableSubscriptionViewMBean2 = (DurableSubscriptionViewMBean) this.broker.getManagementContext().newProxyInstance(objectName2, DurableSubscriptionViewMBean.class, true);
        assertTrue("is not active", !durableSubscriptionViewMBean2.isActive());
        assertEquals("all enqueued", this.keepDurableSubsActive ? 10L : 0L, durableSubscriptionViewMBean2.getEnqueueCounter());
        assertEquals("correct awaiting ack", 0, durableSubscriptionViewMBean2.getMessageCountAwaitingAcknowledge());
        assertEquals("correct dequeue", this.keepDurableSubsActive ? 5L : 0L, durableSubscriptionViewMBean2.getDequeueCounter());
        assertEquals("correct enqueue", 10L, topicViewMBean.getEnqueueCount());
        assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0L, topicViewMBean.getDequeueCount());
        assertEquals("inflight back to 0 after deactivate", 0L, topicViewMBean.getInFlightCount());
        // Reconnect once more and receive the second half.
        // NOTE(review): this connection and its session are never closed in this method —
        // presumably left to the broker stop in tearDown(); confirm that is intended.
        TopicSubscriber createDurableSubscriber2 = createConnection("cliId1").createSession(false, 1).createDurableSubscriber(this.topic, "SubsId", (String) null, true);
        for (int i4 = 0; i4 < i / 2; i4++) {
            Message receive2 = createDurableSubscriber2.receive(Wait.MAX_WAIT_MILLIS);
            assertNotNull("got message: " + i4, receive2);
            LOG.info("Got :" + i4 + ", " + receive2);
        }
        ObjectName objectName3 = this.broker.getAdminView().getDurableTopicSubscribers()[0];
        LOG.info("durable sub name: " + objectName3);
        final DurableSubscriptionViewMBean durableSubscriptionViewMBean3 = (DurableSubscriptionViewMBean) this.broker.getManagementContext().newProxyInstance(objectName3, DurableSubscriptionViewMBean.class, true);
        assertTrue("is active", durableSubscriptionViewMBean3.isActive());
        assertEquals("all enqueued", this.keepDurableSubsActive ? 10L : 0L, durableSubscriptionViewMBean3.getEnqueueCounter());
        // Poll until the subscription reports all ten messages dequeued.
        assertTrue("correct dequeue", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.DurableSubscriptionOfflineTest.2
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                long dequeueCounter = durableSubscriptionViewMBean3.getDequeueCounter();
                DurableSubscriptionOfflineTest.LOG.info("dequeue count:" + dequeueCounter);
                return 10 == dequeueCounter;
            }
        }));
    }

    /**
     * Declares the candidate values of the {@code defaultPersistenceAdapter} and
     * {@code usePrioritySupport} fields for
     * {@code testOfflineSubscriptionCanConsumeAfterOnlineSubs}.
     */
    public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
        final Object[] adapters = {
            TestSupport.PersistenceAdapterChoice.KahaDB,
            TestSupport.PersistenceAdapterChoice.JDBC
        };
        final Object[] priorityFlags = {Boolean.TRUE, Boolean.FALSE};
        addCombinationValues("defaultPersistenceAdapter", adapters);
        addCombinationValues("usePrioritySupport", priorityFlags);
    }

    /**
     * Two offline durable subscribers and one online durable subscriber share a topic.
     * After the online subscriber has consumed ten messages and the broker has been
     * restarted without deleting its store, both offline subscribers must still receive
     * all ten.
     */
    public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
        // Register the two offline subscriptions.
        Connection con = createConnection("offCli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        con = createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // The online subscriber stays connected while messages are published.
        final Connection onlineCon = createConnection("onlineCli1");
        final Session onlineSession = onlineCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TopicSubscriber onlineConsumer =
                onlineSession.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        final Listener onlineListener = new Listener();
        onlineConsumer.setMessageListener(onlineListener);

        // Publish ten matching messages.
        final Connection producerCon = createConnection();
        final Session producerSession = producerCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageProducer producer = producerSession.createProducer(null);
        int sent = 0;
        while (sent < 10) {
            final Message message = producerSession.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(topic, message);
            sent++;
        }
        Thread.sleep(1000L);
        producerSession.close();
        producerCon.close();

        Thread.sleep(3 * 1000L);
        onlineSession.close();
        onlineCon.close();
        assertEquals(sent, onlineListener.count);

        // Restart the broker, keeping whatever it has stored.
        broker.stop();
        createBroker(false);

        // Both offline clients reconnect; subscribers are created before any listener is attached.
        final Connection firstCon = createConnection("offCli1");
        final Session firstSession = firstCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TopicSubscriber firstConsumer =
                firstSession.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        final Connection secondCon = createConnection("offCli2");
        final Session secondSession = secondCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TopicSubscriber secondConsumer =
                secondSession.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        final Listener firstListener = new Listener();
        firstConsumer.setMessageListener(firstListener);
        final Listener secondListener = new Listener();
        secondConsumer.setMessageListener(secondListener);
        Thread.sleep(3 * 1000L);
        firstSession.close();
        firstCon.close();
        secondSession.close();
        secondCon.close();

        assertEquals(sent, firstListener.count);
        assertEquals(sent, secondListener.count);
    }

    /**
     * Declares the candidate values of the {@code defaultPersistenceAdapter} field for
     * {@code testInterleavedOfflineSubscriptionCanConsume}.
     */
    public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception {
        final Object[] adapters = {
            TestSupport.PersistenceAdapterChoice.KahaDB,
            TestSupport.PersistenceAdapterChoice.JDBC
        };
        addCombinationValues("defaultPersistenceAdapter", adapters);
    }

    /**
     * "cliId1" subscribes before any message is sent; "cliId2" subscribes after the first
     * batch of ten and before the second batch of ten. On reconnect "cliId2" must receive
     * only the second batch (10 messages) and "cliId1" must receive both batches (20).
     *
     * <p>Every connection opened here is closed again, including the second "cliId2"
     * connection, so no consumer stays attached while the next client is checked.
     */
    public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
        // cliId1 registers its subscription and goes offline.
        Connection con = createConnection("cliId1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // First batch of ten; the producer connection stays open for the second batch.
        Connection producerCon = createConnection();
        Session producerSession = producerCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = producerSession.createProducer((Destination) null);
        int sent = 0;
        for (int n = 0; n < 10; n++) {
            sent++;
            Message message = producerSession.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(this.topic, message);
        }
        Thread.sleep(1000L);

        // cliId2 subscribes only now: it is checked to have received nothing right after
        // the listener is attached, then goes offline.
        con = createConnection("cliId2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber consumer = session.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true);
        Listener lateJoiner = new Listener();
        consumer.setMessageListener(lateJoiner);
        assertEquals(0, lateJoiner.count);
        session.close();
        con.close();

        // Second batch of ten.
        for (int n = 0; n < 10; n++) {
            sent++;
            Message message = producerSession.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(this.topic, message);
        }
        Thread.sleep(1000L);
        producerSession.close();
        producerCon.close();

        // cliId2 reconnects and must see only the second batch.
        con = createConnection("cliId2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        consumer = session.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true);
        Listener cliId2Listener = new Listener("cliId2");
        consumer.setMessageListener(cliId2Listener);
        Thread.sleep(3000L);
        // Previously this session and connection were never closed.
        session.close();
        con.close();
        assertEquals(10, cliId2Listener.count);

        // cliId1 reconnects and must see both batches.
        con = createConnection("cliId1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        consumer = session.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true);
        Listener cliId1Listener = new Listener("cliId1");
        consumer.setMessageListener(cliId1Listener);
        Thread.sleep(3000L);
        session.close();
        con.close();
        assertEquals("offline consumer got all", sent, cliId1Listener.count);
    }

    /**
     * Declares the candidate values of the {@code defaultPersistenceAdapter} field for
     * {@code testMixOfOnLineAndOfflineSubsGetAllMatched}.
     */
    public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
        final Object[] adapters = {
            TestSupport.PersistenceAdapterChoice.KahaDB,
            TestSupport.PersistenceAdapterChoice.JDBC
        };
        addCombinationValues("defaultPersistenceAdapter", adapters);
    }

    /**
     * Two offline durable subscribers, one online durable subscriber and one online
     * non-durable consumer all use the {@code filter} selector. 100 messages with a random
     * {@code $d} in D1..D9 plus one {@code $b}/{@code $c} message are published; every
     * consumer must end up with exactly the number of messages that match the selector,
     * and the offline listeners must have recorded no failures.
     */
    public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
        // Register the two offline durable subscriptions.
        Connection con = createConnection("offCli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", filter, true);
        session.close();
        con.close();

        con = createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", filter, true);
        session.close();
        con.close();

        // Online durable subscriber.
        final Connection durableCon = createConnection("onlineCli1");
        final Session durableSession = durableCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TopicSubscriber durableConsumer =
                durableSession.createDurableSubscriber(topic, "SubsId", filter, true);
        final Listener durableListener = new Listener();
        durableConsumer.setMessageListener(durableListener);

        // Online non-durable consumer.
        final Connection plainCon = createConnection("nondurableCli");
        final Session plainSession = plainCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageConsumer plainConsumer = plainSession.createConsumer(topic, filter, true);
        final Listener plainListener = new Listener();
        plainConsumer.setMessageListener(plainListener);

        // Publish 100 messages with $a='A1' and a random $d between D1 and D9.
        final Connection producerCon = createConnection();
        final Session producerSession = producerCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageProducer producer = producerSession.createProducer(null);
        boolean anyMatched = false;
        int expected = 0;
        for (int n = 0; n < 100; n++) {
            final int suffix = ((int) (Math.random() * 9)) + 1;
            final String d = "D" + suffix;
            if ("D1".equals(d) || "D2".equals(d)) {
                anyMatched = true;
                expected++;
            }
            final Message message = producerSession.createMessage();
            message.setStringProperty("$a", "A1");
            message.setStringProperty("$d", d);
            producer.send(topic, message);
        }

        // One extra message on the $b/$c branch; it matches only when $c is true.
        final Message flagged = producerSession.createMessage();
        flagged.setStringProperty("$a", "A1");
        flagged.setBooleanProperty("$b", true);
        flagged.setBooleanProperty("$c", anyMatched);
        producer.send(topic, flagged);
        if (anyMatched) {
            expected++;
        }
        Thread.sleep(1000L);
        producerSession.close();
        producerCon.close();

        // Let the online consumers finish, then check them.
        Thread.sleep(3 * 1000L);
        plainSession.close();
        plainCon.close();
        assertEquals(expected, plainListener.count);
        durableSession.close();
        durableCon.close();
        assertEquals(expected, durableListener.count);

        // First offline subscriber comes back.
        con = createConnection("offCli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber offlineConsumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
        final FilterCheckListener firstOffline = new FilterCheckListener();
        offlineConsumer.setMessageListener(firstOffline);
        Thread.sleep(3 * 1000L);
        session.close();
        con.close();
        assertEquals(expected, firstOffline.count);

        // Second offline subscriber comes back.
        con = createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        offlineConsumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
        final FilterCheckListener secondOffline = new FilterCheckListener();
        offlineConsumer.setMessageListener(secondOffline);
        Thread.sleep(3 * 1000L);
        session.close();
        con.close();
        assertEquals(expected, secondOffline.count);

        assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
    }

    /**
     * After a durable subscription with pending messages has been unsubscribed, a new
     * durable subscriber on the same topic (addressed with {@code consumer.retroactive=true})
     * must not receive anything.
     *
     * <p>The producer session and connection are closed once publishing is done; they were
     * previously left open for the rest of the test.
     */
    public void testRemovedDurableSubDeletes() throws Exception {
        // Register the durable subscription, then disconnect.
        Connection con = createConnection("cliId1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // Publish ten matching messages while the subscriber is offline.
        Connection producerCon = createConnection();
        Session producerSession = producerCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = producerSession.createProducer((Destination) null);
        for (int n = 0; n < 10; n++) {
            Message message = producerSession.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(this.topic, message);
        }
        Thread.sleep(1000L);
        producerSession.close();
        producerCon.close();

        // Remove the subscription.
        con = createConnection("cliId1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.unsubscribe("SubsId");
        session.close();
        con.close();

        // A new subscriber on the retroactive variant of the topic must see nothing.
        this.topic = new ActiveMQTopic(this.topic.getPhysicalName() + "?consumer.retroactive=true");
        con = createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber consumer = session.createDurableSubscriber(this.topic, "SubsId", filter, true);
        Listener listener = new Listener();
        consumer.setMessageListener(listener);
        session.close();
        con.close();
        assertEquals(0, listener.count);
    }

    /**
     * Checks that the number of index pages in use (page count minus free page count) is the
     * same after each subscribe / publish / unsubscribe cycle. Only runs when the broker uses a
     * {@link KahaDBPersistenceAdapter}; otherwise the test returns without asserting anything.
     *
     * @throws Exception if any JMS or store operation fails
     */
    public void testRemovedDurableSubDeletesFromIndex() throws Exception {
        if (!(this.broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter)) {
            return;
        }
        // The store accessor is specific to the KahaDB adapter, so the cast is required.
        KahaDBPersistenceAdapter kahaDbAdapter = (KahaDBPersistenceAdapter) this.broker.getPersistenceAdapter();
        PageFile pageFile = kahaDbAdapter.getStore().getPageFile();
        LOG.info("PageCount " + pageFile.getPageCount() + " f:" + pageFile.getFreePageCount() + ", fileSize:" + pageFile.getFile().length());

        // Pages in use after the previous iteration; 0 means "no baseline yet".
        long pagesInUseBaseline = 0;
        for (int iteration = 0; iteration < 2; iteration++) {
            LOG.info("Iteration: " + iteration + " Count:" + pageFile.getPageCount() + " f:" + pageFile.getFreePageCount());

            // Create the durable subscription and take it offline.
            Connection subscriberConnection = createConnection("cliId1-" + iteration);
            Session subscriberSession = subscriberConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            subscriberSession.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true);
            subscriberSession.close();
            subscriberConnection.close();

            // Publish messages matching the subscription's selector.
            Connection producerConnection = createConnection();
            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = producerSession.createProducer((Destination) null);
            for (int n = 0; n < 2750; n++) {
                Message message = producerSession.createMessage();
                message.setStringProperty("filter", "true");
                producer.send(this.topic, message);
            }
            producerConnection.close();

            // Remove the subscription again.
            Connection unsubscribeConnection = createConnection("cliId1-" + iteration);
            Session unsubscribeSession = unsubscribeConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            unsubscribeSession.unsubscribe("SubsId");
            unsubscribeSession.close();
            unsubscribeConnection.close();

            LOG.info("PageCount " + pageFile.getPageCount() + " f:" + pageFile.getFreePageCount() + " diff: " + (pageFile.getPageCount() - pageFile.getFreePageCount()) + " fileSize:" + pageFile.getFile().length());
            if (pagesInUseBaseline != 0) {
                assertEquals("Only use X pages per iteration: " + iteration, pagesInUseBaseline, pageFile.getPageCount() - pageFile.getFreePageCount());
            }
            pagesInUseBaseline = pageFile.getPageCount() - pageFile.getFreePageCount();
        }
    }

    /**
     * Registers the "defaultPersistenceAdapter" values (KahaDB and JDBC) to be combined for
     * {@code testOfflineSubscriptionWithSelectorAfterRestart}.
     */
    public void initCombosForTestOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
        Object[] adapterChoices = {
            TestSupport.PersistenceAdapterChoice.KahaDB,
            TestSupport.PersistenceAdapterChoice.JDBC
        };
        addCombinationValues("defaultPersistenceAdapter", adapterChoices);
    }

    /**
     * Creates two offline durable subscriptions with the selector {@code filter = 'true'},
     * publishes randomly matching messages before and after the broker is stopped and
     * re-created, and expects each subscription to then receive every matching message.
     */
    public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
        final String selector = "filter = 'true'";

        // Offline subscription 1.
        Connection con = createConnection("offCli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", selector, true);
        session.close();
        con.close();

        // Offline subscription 2.
        con = createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", selector, true);
        session.close();
        con.close();

        // First batch: each message matches the selector with a coin flip.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer((Destination) null);
        int matching = 0;
        for (int n = 0; n < 10; n++) {
            boolean matches = (int) (Math.random() * 2.0d) >= 1;
            if (matches) {
                matching++;
            }
            Message message = session.createMessage();
            if (matches) {
                message.setStringProperty("filter", "true");
            } else {
                message.setStringProperty("filter", "false");
            }
            producer.send(this.topic, message);
        }
        LOG.info("sent: " + matching);
        Thread.sleep(1000L);
        session.close();
        con.close();

        // Stop the broker and create a new one.
        Thread.sleep(3000L);
        this.broker.stop();
        createBroker(false);

        // Second batch, sent to the re-created broker.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer((Destination) null);
        for (int n = 0; n < 10; n++) {
            boolean matches = (int) (Math.random() * 2.0d) >= 1;
            if (matches) {
                matching++;
            }
            Message message = session.createMessage();
            if (matches) {
                message.setStringProperty("filter", "true");
            } else {
                message.setStringProperty("filter", "false");
            }
            producer.send(this.topic, message);
        }
        LOG.info("after restart, total sent with filter='true': " + matching);
        Thread.sleep(1000L);
        session.close();
        con.close();

        // Bring both subscriptions back online at the same time.
        Connection firstConnection = createConnection("offCli1");
        Session firstSession = firstConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber firstSubscriber = firstSession.createDurableSubscriber(this.topic, "SubsId", selector, true);
        Listener firstListener = new Listener("1>");
        firstSubscriber.setMessageListener(firstListener);

        Connection secondConnection = createConnection("offCli2");
        Session secondSession = secondConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber secondSubscriber = secondSession.createDurableSubscriber(this.topic, "SubsId", selector, true);
        Listener secondListener = new Listener();
        secondSubscriber.setMessageListener(secondListener);

        // Give the listeners time to drain before disconnecting.
        Thread.sleep(3000L);
        firstSession.close();
        firstConnection.close();
        secondSession.close();
        secondConnection.close();

        assertEquals(matching, firstListener.count);
        assertEquals(matching, secondListener.count);
    }

    /**
     * Registers the "defaultPersistenceAdapter" values (KahaDB and JDBC) to be combined.
     * Kept under its original name so any existing caller keeps working.
     */
    public void initCombosForTestOfflineAfterRestart() throws Exception {
        addCombinationValues("defaultPersistenceAdapter", new Object[]{TestSupport.PersistenceAdapterChoice.KahaDB, TestSupport.PersistenceAdapterChoice.JDBC});
    }

    /**
     * Supplies the same combinations under the name that corresponds to
     * {@code testOfflineSubscriptionAfterRestart}.
     *
     * NOTE(review): the other initCombosFor* methods in this class are named exactly after
     * their test method; presumably the combination support resolves them by that name, in
     * which case the original name above was never picked up - confirm against the base class.
     */
    public void initCombosForTestOfflineSubscriptionAfterRestart() throws Exception {
        initCombosForTestOfflineAfterRestart();
    }

    /**
     * Consumes a first batch of messages through an online durable subscription, then sends a
     * second batch after the broker has been stopped and re-created while the subscription is
     * offline, and expects the same listener to have counted both batches once it reconnects.
     */
    public void testOfflineSubscriptionAfterRestart() throws Exception {
        // Online phase: subscriber and producer share one session.
        Connection con = createConnection("offCli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber subscriber = session.createDurableSubscriber(this.topic, "SubsId", (String) null, false);
        Listener listener = new Listener();
        subscriber.setMessageListener(listener);

        MessageProducer producer = session.createProducer((Destination) null);
        int sent = 0;
        for (int n = 0; n < 10; n++) {
            sent++;
            Message message = session.createMessage();
            message.setStringProperty("filter", "false");
            producer.send(this.topic, message);
        }
        LOG.info("sent: " + sent);
        Thread.sleep(5000L);
        session.close();
        con.close();
        assertEquals(sent, listener.count);

        // Stop the broker and create a new one while the subscription is offline.
        Thread.sleep(3000L);
        this.broker.stop();
        createBroker(false);

        // Second batch, published while the subscriber is away.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer((Destination) null);
        for (int n = 0; n < 10; n++) {
            sent++;
            Message message = session.createMessage();
            message.setStringProperty("filter", "false");
            producer.send(this.topic, message);
        }
        LOG.info("after restart, sent: " + sent);
        Thread.sleep(1000L);
        session.close();
        con.close();

        // Reconnect with the same listener so its count accumulates across both phases.
        con = createConnection("offCli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        subscriber = session.createDurableSubscriber(this.topic, "SubsId", (String) null, true);
        subscriber.setMessageListener(listener);
        Thread.sleep(3000L);
        session.close();
        con.close();

        assertEquals(sent, listener.count);
    }

    /**
     * Creates two offline durable subscriptions (one with a selector, one without), publishes
     * messages, unsubscribes the selector-based one, and expects the remaining subscription
     * to still receive every message that was sent.
     *
     * @throws Exception if any JMS operation fails
     */
    public void testInterleavedOfflineSubscriptionCanConsumeAfterUnsub() throws Exception {
        // Offline subscription 1: only interested in filter = 'true'.
        Connection con = createConnection("offCli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // Offline subscription 2: no selector, so it is interested in everything.
        con = createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", (String) null, true);
        session.close();
        con.close();

        // Publish messages whose "filter" property is randomly "true" or "false".
        Connection producerConnection = createConnection();
        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = producerSession.createProducer((Destination) null);
        int sent = 0;
        for (int n = 0; n < 10; n++) {
            boolean matches = ((int) (Math.random() * 2.0d)) >= 1;
            sent++;
            Message message = producerSession.createMessage();
            message.setStringProperty("filter", matches ? "true" : "false");
            producer.send(this.topic, message);
        }
        Thread.sleep(1000L);
        // Release the producer's session and connection so they do not outlive the test.
        producerSession.close();
        producerConnection.close();

        // Remove subscription 1.
        con = createConnection("offCli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.unsubscribe("SubsId");
        session.close();
        con.close();

        // Subscription 2 comes online and must still see all messages.
        con = createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber subscriber = session.createDurableSubscriber(this.topic, "SubsId", (String) null, true);
        Listener listener = new Listener("SubsId");
        subscriber.setMessageListener(listener);
        Thread.sleep(3000L);
        session.close();
        con.close();

        assertEquals("offline consumer got all", sent, listener.count);
    }

    /**
     * Commits a 1000-message transaction on one thread while ten durable subscribers activate
     * concurrently, and expects every subscriber to receive each producer sequence id exactly
     * once (1000 messages, then nothing more).
     *
     * @throws Exception if setup fails; failures inside worker threads are collected in
     *         {@code exceptions} and asserted at the end
     */
    public void testNoDuplicateOnConcurrentSendTranCommitAndActivate() throws Exception {
        // Register offline durable subscriptions cli0..cli10.
        for (int i = 0; i <= 10; i++) {
            Connection con = createConnection("cli" + i);
            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            session.createDurableSubscriber(this.topic, "SubsId", (String) null, true);
            session.close();
            con.close();
        }

        // Stage 1000 messages in an uncommitted transaction.
        String payload = new String(new byte[1000]);
        Connection producerConnection = createConnection();
        final Session transactedSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
        MessageProducer producer = transactedSession.createProducer(this.topic);
        for (int i = 0; i < 1000; i++) {
            producer.send(transactedSession.createTextMessage(payload));
        }

        /**
         * Activates the durable subscription of client "cli" + id and checks that no producer
         * sequence id is delivered twice.
         */
        class CheckForDupsClient implements Runnable {
            // Producer sequence ids seen so far by this client.
            final HashSet<Long> ids = new HashSet<>();
            final int id;

            CheckForDupsClient(int id) {
                this.id = id;
            }

            @Override
            public void run() {
                try {
                    Connection con = DurableSubscriptionOfflineTest.this.createConnection("cli" + this.id);
                    Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    // Consume in two rounds of 500, re-creating the subscriber in between.
                    for (int round = 0; round < 2; round++) {
                        TopicSubscriber subscriber = session.createDurableSubscriber(DurableSubscriptionOfflineTest.this.topic, "SubsId", (String) null, true);
                        for (int n = 0; n < 500; n++) {
                            Message message = subscriber.receive(4000L);
                            Assert.assertNotNull(message);
                            long sequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId();
                            Assert.assertTrue("ID=" + this.id + " not a duplicate: " + sequenceId, this.ids.add(Long.valueOf(sequenceId)));
                        }
                        subscriber.close();
                    }

                    // All 1000 are consumed; anything further is unexpected.
                    Message extra = session.createDurableSubscriber(DurableSubscriptionOfflineTest.this.topic, "SubsId", (String) null, true).receive(4000L);
                    if (extra != null) {
                        long sequenceId = new MessageId(extra.getJMSMessageID()).getProducerSequenceId();
                        Assert.assertTrue("ID=" + this.id + " not a duplicate: " + sequenceId, this.ids.add(Long.valueOf(sequenceId)));
                    }
                    Assert.assertNull(extra);
                    session.close();
                    con.close();
                } catch (Throwable th) {
                    th.printStackTrace();
                    DurableSubscriptionOfflineTest.this.exceptions.add(th);
                }
            }
        }

        ExecutorService executor = Executors.newCachedThreadPool();
        // Commit concurrently with the subscriber activations below.
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    transactedSession.commit();
                } catch (JMSException e) {
                    e.printStackTrace();
                    DurableSubscriptionOfflineTest.this.exceptions.add(e);
                }
            }
        });
        for (int i = 0; i < 10; i++) {
            executor.execute(new CheckForDupsClient(i));
        }
        executor.shutdown();
        executor.awaitTermination(5L, TimeUnit.MINUTES);
        producerConnection.close();
        assertTrue("no exceptions: " + this.exceptions, this.exceptions.isEmpty());
    }

    /**
     * Runs {@link #doTestOrderOnActivateDeactivate()} ten times, stopping the broker and
     * creating a new one via {@code createBroker(true)} after every run.
     */
    public void testOrderOnActivateDeactivate() throws Exception {
        int iteration = 0;
        while (iteration < 10) {
            LOG.info("Iteration: " + iteration);
            doTestOrderOnActivateDeactivate();
            this.broker.stop();
            createBroker(true);
            iteration++;
        }
    }

    /**
     * Repeatedly activates and deactivates four durable subscribers over a failover transport
     * while a producer commits a 1000-message transaction, and checks that each activation
     * sees messages in producer-sequence order starting from 1.
     *
     * @throws Exception if setup fails; failures inside worker threads are collected in
     *         {@code exceptions} and asserted at the end
     */
    public void doTestOrderOnActivateDeactivate() throws Exception {
        // Register offline durable subscriptions cli0..cli4.
        for (int i = 0; i <= 4; i++) {
            Connection con = createConnection("cli" + i);
            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            session.createDurableSubscriber(this.topic, "SubsId", (String) null, true);
            session.close();
            con.close();
        }

        // Consumers connect through the broker's second transport connector.
        final ActiveMQConnectionFactory clientFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + ((TransportConnector) this.broker.getTransportConnectors().get(1)).getConnectUri().getPort() + "?wireFormat.maxInactivityDuration=0)?jms.watchTopicAdvisories=false&jms.alwaysSyncSend=true&jms.dispatchAsync=true&jms.sendAcksAsync=true&initialReconnectDelay=100&maxReconnectDelay=30000&useExponentialBackOff=true");

        // Sends 1000 messages in a single transaction and commits it.
        Runnable producerTask = new Runnable() {
            final String payLoad = new String(new byte[DurableSubProcessConcurrentCommitActivateNoDuplicateTest.CARGO_SIZE]);

            @Override
            public void run() {
                try {
                    Connection con = DurableSubscriptionOfflineTest.this.createConnection();
                    Session session = con.createSession(true, Session.SESSION_TRANSACTED);
                    MessageProducer producer = session.createProducer(DurableSubscriptionOfflineTest.this.topic);
                    for (int n = 0; n < 1000; n++) {
                        producer.send(session.createTextMessage(this.payLoad));
                    }
                    LOG.info("About to commit: 1000");
                    session.commit();
                    LOG.info("committed: 1000");
                    con.close();
                } catch (Exception e) {
                    e.printStackTrace();
                    DurableSubscriptionOfflineTest.this.exceptions.add(e);
                }
            }
        };

        /**
         * Activates the durable subscription of client "cli" + id, peeks at up to 500 messages
         * without acknowledging any of them, verifies their order and deactivates again.
         */
        class CheckOrderClient implements Runnable {
            final int id;
            int runCount = 0;

            CheckOrderClient(int id) {
                this.id = id;
            }

            @Override
            public void run() {
                try {
                    // The same instance is submitted many times; serialize its runs so only
                    // one connection per client id is active at a time.
                    synchronized (this) {
                        Connection con = clientFactory.createConnection();
                        con.setClientID("cli" + this.id);
                        con.start();
                        Session session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                        TopicSubscriber subscriber = session.createDurableSubscriber(DurableSubscriptionOfflineTest.this.topic, "SubsId", (String) null, true);
                        int expectedSequence = 0;
                        this.runCount++;
                        int peeked = 0;
                        Message message;
                        while (peeked < 500 && (message = subscriber.receiveNoWait()) != null) {
                            expectedSequence++;
                            Assert.assertEquals(this.id + " expected order: runCount: " + this.runCount + " id: " + message.getJMSMessageID(), expectedSequence, new MessageId(message.getJMSMessageID()).getProducerSequenceId());
                            peeked++;
                        }
                        LOG.info(con.getClientID() + " peeked " + peeked);
                        session.close();
                        con.close();
                    }
                } catch (Throwable th) {
                    th.printStackTrace();
                    DurableSubscriptionOfflineTest.this.exceptions.add(th);
                }
            }
        }

        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 4; i++) {
            Runnable client = new CheckOrderClient(i);
            // Queue 100 activations of the same client.
            for (int run = 0; run < 100; run++) {
                executor.execute(client);
            }
        }
        executor.execute(producerTask);
        executor.shutdown();
        executor.awaitTermination(5L, TimeUnit.MINUTES);
        assertTrue("no exceptions: " + this.exceptions, this.exceptions.isEmpty());
    }

    /**
     * Publishes a mix of matching and non-matching messages to an offline durable
     * subscription, unsubscribes it, re-creates it with the same client id and selector, and
     * expects the new subscription to receive nothing.
     */
    public void testUnmatchedSubUnsubscribeDeletesAll() throws Exception {
        final String selector = "filter = 'true'";

        // Create the offline subscription.
        Connection con = createConnection("offCli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", selector, true);
        session.close();
        con.close();

        // Every even-numbered message matches the selector.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer((Destination) null);
        int matching = 0;
        for (int n = 0; n < 10; n++) {
            Message message = session.createMessage();
            if (n % 2 == 0) {
                matching++;
                message.setStringProperty("filter", "true");
            } else {
                message.setStringProperty("filter", "false");
            }
            producer.send(this.topic, message);
        }
        LOG.info("sent: " + matching);
        Thread.sleep(1000L);
        session.close();
        con.close();

        // Drop the subscription.
        con = createConnection("offCli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.unsubscribe("SubsId");
        session.close();
        con.close();

        // A fresh subscription under the same identity must start empty.
        con = createConnection("offCli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber subscriber = session.createDurableSubscriber(this.topic, "SubsId", selector, true);
        Listener listener = new Listener();
        subscriber.setMessageListener(listener);
        Thread.sleep(3000L);
        session.close();
        con.close();

        assertEquals(0, listener.count);
    }

    /**
     * Exercises two durable subscriptions with the same selector: cli1 consumes a full batch,
     * cli2 consumes only two messages of it, and after two further messages (one matching)
     * are published, cli1 is expected to receive exactly one new message.
     */
    public void testAllConsumed() throws Exception {
        final String selector = "filter = 'true'";

        // Register both subscriptions and leave them offline.
        Connection con = createConnection("cli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", selector, true);
        session.close();
        con.close();

        con = createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", selector, true);
        session.close();
        con.close();

        // First batch: ten messages, all matching.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer((Destination) null);
        int sent = 0;
        while (sent < 10) {
            Message message = session.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(this.topic, message);
            sent++;
        }
        LOG.info("sent: " + sent);
        Thread.sleep(1000L);
        session.close();
        con.close();

        // cli1 drains the whole batch.
        con = createConnection("cli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber subscriber = session.createDurableSubscriber(this.topic, "SubsId", selector, true);
        Listener firstListener = new Listener();
        subscriber.setMessageListener(firstListener);
        Thread.sleep(3000L);
        session.close();
        con.close();
        assertEquals(sent, firstListener.count);

        // cli2 takes just two messages and disconnects.
        LOG.info("cli2 pull 2");
        con = createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        subscriber = session.createDurableSubscriber(this.topic, "SubsId", selector, true);
        assertNotNull("got message", subscriber.receive(2000L));
        assertNotNull("got message", subscriber.receive(2000L));
        session.close();
        con.close();

        // Second batch: the first message does not match, the second one does.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer((Destination) null);
        int sentAgain = 0;
        for (int n = 0; n < 2; n++) {
            Message message = session.createMessage();
            if (n == 1) {
                message.setStringProperty("filter", "true");
            } else {
                message.setStringProperty("filter", "false");
            }
            producer.send(this.topic, message);
            sentAgain++;
        }
        LOG.info("sent: " + sentAgain);
        Thread.sleep(1000L);
        session.close();
        con.close();

        // cli1 reconnects and should only see the single new matching message.
        LOG.info("cli1 again, should get 1 new ones");
        con = createConnection("cli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        subscriber = session.createDurableSubscriber(this.topic, "SubsId", selector, true);
        Listener secondListener = new Listener();
        subscriber.setMessageListener(secondListener);
        Thread.sleep(3000L);
        session.close();
        con.close();

        assertEquals(1, secondListener.count);
    }

    /**
     * Checks that, after the broker is destroyed and re-created, durable subscriptions do not
     * miss matching messages: cli2 (created after the first batch) must start at message 10,
     * while cli1 must get message 0 followed by message 10.
     *
     * @throws Exception if any JMS or broker operation fails
     */
    public void testNoMissOnMatchingSubAfterRestart() throws Exception {
        final String selector = "filter = 'true'";

        // cli1 subscribes before anything is sent.
        Connection con = createConnection("cli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", selector, true);
        session.close();
        con.close();

        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer((Destination) null);

        // Message 0 matches the selector; messages 1..9 do not.
        int sent = 0;
        Message message = session.createMessage();
        message.setStringProperty("filter", "true");
        message.setIntProperty("ID", 0);
        producer.send(this.topic, message);
        sent++;
        for (int id = sent; id < 10; id++) {
            message = session.createMessage();
            message.setStringProperty("filter", "false");
            message.setIntProperty("ID", id);
            producer.send(this.topic, message);
            sent++;
        }
        con.close();
        LOG.info("sent: " + sent);

        // cli2 subscribes only after the first batch.
        con = createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", selector, true);
        session.close();
        con.close();

        destroyBroker();
        createBroker(false);

        // Messages 10..29 all match the selector.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer((Destination) null);
        for (int id = sent; id < 30; id++) {
            message = session.createMessage();
            message.setStringProperty("filter", "true");
            message.setIntProperty("ID", id);
            producer.send(this.topic, message);
            sent++;
        }
        con.close();
        LOG.info("sent: " + sent);

        // receive(timeout) returns null when nothing arrives in time; assert on that
        // explicitly so a missing message fails the test instead of raising an NPE.
        con = createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber subscriber = session.createDurableSubscriber(this.topic, "SubsId", selector, true);
        Message received = subscriber.receive(3000L);
        assertNotNull("cli2 got a message", received);
        assertEquals("is message 10", 10, received.getIntProperty("ID"));
        session.close();
        con.close();

        con = createConnection("cli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        subscriber = session.createDurableSubscriber(this.topic, "SubsId", selector, true);
        received = subscriber.receive(3000L);
        assertNotNull("cli1 got first message", received);
        assertEquals("is message 0", 0, received.getIntProperty("ID"));
        received = subscriber.receive(3000L);
        assertNotNull("cli1 got second message", received);
        assertEquals("is message 10", 10, received.getIntProperty("ID"));
        session.close();
        con.close();
    }

    /**
     * Registers the combinations for {@code testCleanupDeletedSubAfterRestart}: a single
     * "journalMaxFileLength" of 65536 and both values of "keepDurableSubsActive".
     */
    public void initCombosForTestCleanupDeletedSubAfterRestart() throws Exception {
        // Integer.valueOf replaces the deprecated boxing constructor.
        addCombinationValues("journalMaxFileLength", new Object[]{Integer.valueOf(65536)});
        addCombinationValues("keepDurableSubsActive", new Object[]{Boolean.TRUE, Boolean.FALSE});
    }

    /**
     * Creates two offline durable subscriptions, publishes 500 messages, unsubscribes one of
     * them, then re-creates the broker, lets the remaining subscription consume everything,
     * re-creates the broker once more and expects a single journal file to be left.
     *
     * @throws Exception if any JMS, broker or store operation fails
     */
    public void testCleanupDeletedSubAfterRestart() throws Exception {
        // Register both subscriptions and leave them offline.
        Connection con = createConnection("cli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", (String) null, true);
        session.close();
        con.close();

        con = createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", (String) null, true);
        session.close();
        con.close();

        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer((Destination) null);
        // NOTE(review): toString() on a byte[] yields the array's identity string, not 40960
        // characters, so this payload is tiny. Left unchanged because enlarging it would alter
        // the journal volume this test asserts on - confirm the intended payload size.
        String payload = new byte[40960].toString();
        int sent = 0;
        for (int id = 0; id < 500; id++) {
            TextMessage message = session.createTextMessage(payload);
            message.setStringProperty("filter", "false");
            message.setIntProperty("ID", id);
            producer.send(this.topic, message);
            sent++;
        }
        con.close();
        LOG.info("sent: " + sent);

        // Remove cli1's subscription, closing the session and connection used to do so.
        con = createConnection("cli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.unsubscribe("SubsId");
        session.close();
        con.close();

        destroyBroker();
        createBroker(false);

        // cli2 comes back online and must receive all 500 messages.
        con = createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber subscriber = session.createDurableSubscriber(this.topic, "SubsId", (String) null, true);
        final Listener listener = new Listener();
        subscriber.setMessageListener(listener);
        assertTrue("got all sent", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                LOG.info("Want: 500, current: " + listener.count);
                return listener.count == 500;
            }
        }));
        session.close();
        con.close();

        destroyBroker();
        createBroker(false);

        // The journal accessor is specific to the KahaDB adapter, so the cast is required.
        KahaDBPersistenceAdapter kahaDbAdapter = (KahaDBPersistenceAdapter) this.broker.getPersistenceAdapter();
        assertEquals("only one journal file left after restart", 1, kahaDbAdapter.getStore().getJournal().getFileMap().size());
    }

    /**
     * Creates a batch of offline durable subscriptions, removes all but the last one,
     * restarts the broker and then registers a fresh batch of subscriptions.
     *
     * <p>The test contains no explicit assertions; it passes when none of the broker or
     * JMS calls throws. NOTE(review): judging by the method name this targets reuse of
     * freed index pages in the persistence store -- confirm against the store in use.
     *
     * @throws Exception if any connection, session, subscription or broker call fails
     */
    public void testPageReuse() throws Exception {
        final int lastClientIndex = 115;
        final String subscriptionName = "SubsId";
        final String clientIdPrefix = "cli";

        // Phase 1: register one selector-less durable subscription per client id
        // (cli0 .. cli115) and take each of them offline straight away.
        int index = 0;
        while (index <= lastClientIndex) {
            Connection subscriberConnection = createConnection(clientIdPrefix + index);
            Session subscriberSession = subscriberConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            subscriberSession.createDurableSubscriber(this.topic, subscriptionName, null, true);
            subscriberSession.close();
            subscriberConnection.close();
            index++;
        }

        // Phase 2: publish a single message to the topic. The payload is the default
        // toString() of a byte array (an identity string), not the array contents.
        Connection producerConnection = createConnection();
        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = producerSession.createProducer(null);
        TextMessage payload = producerSession.createTextMessage(new byte[10].toString());
        producer.send(this.topic, payload);
        producerConnection.close();

        // Phase 3: unsubscribe every client except the last one (cli0 .. cli114).
        for (int removed = 0; removed < lastClientIndex; removed++) {
            Connection unsubscribeConnection = createConnection(clientIdPrefix + removed);
            Session unsubscribeSession = unsubscribeConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            unsubscribeSession.unsubscribe(subscriptionName);
            unsubscribeSession.close();
            unsubscribeConnection.close();
        }

        // Phase 4: bounce the broker.
        // NOTE(review): the 'false' argument presumably keeps the existing store -- confirm.
        destroyBroker();
        createBroker(false);

        // Phase 5: register subscriptions for cli1 .. cli115 again, this time using the
        // class-level selector.
        for (int recreated = 1; recreated <= lastClientIndex; recreated++) {
            Connection resubscribeConnection = createConnection(clientIdPrefix + recreated);
            Session resubscribeSession = resubscribeConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            resubscribeSession.createDurableSubscriber(this.topic, subscriptionName, filter, true);
            resubscribeSession.close();
            resubscribeConnection.close();
        }
    }

    /**
     * Checks that messages which a durable subscriber received without acknowledging
     * are flagged as redelivered when they are handed out again on a later connection.
     *
     * <p>Flow:
     * <ol>
     *   <li>Two durable subscriptions ({@code cliId0}, {@code cliId1}) are registered with the
     *       selector {@code filter = 'true'} and taken offline.</li>
     *   <li>1000 matching messages are published.</li>
     *   <li>Each client runs concurrently: it reconnects a random number of times without
     *       consuming, consumes and acknowledges one message, receives the remaining 999
     *       without acknowledging a random number of times, then receives them once more
     *       asserting {@code getJMSRedelivered()} and acknowledges, and finally verifies
     *       that nothing is left.</li>
     * </ol>
     *
     * <p>Any {@link Throwable} raised by a client (including assertion failures) is logged
     * and recorded in {@code exceptions}; the test fails if that collection is non-empty or
     * if the clients do not finish within ten minutes.
     *
     * @throws Exception if the setup, publishing or executor handling fails
     */
    public void testRedeliveryFlag() throws Exception {
        final int numClients = 2;
        final int messageCount = 1000;
        final String subscriptionName = "SubsId";
        final String selector = "filter = 'true'";

        // Register the durable subscriptions and go offline immediately.
        for (int i = 0; i < numClients; i++) {
            Connection setupConnection = createConnection("cliId" + i);
            Session setupSession = setupConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            setupSession.createDurableSubscriber(this.topic, subscriptionName, selector, true);
            setupSession.close();
            setupConnection.close();
        }

        final Random random = new Random();

        // Publish the messages every subscription will match.
        Connection producerConnection = createConnection();
        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = producerSession.createProducer(null);
        for (int i = 0; i < messageCount; i++) {
            Message message = producerSession.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(this.topic, message);
        }
        producerSession.close();
        producerConnection.close();

        /** One durable subscriber that repeatedly reconnects under a fixed client id. */
        class Client implements Runnable {
            private final String clientId;
            private Connection con;
            private Session session;

            Client(String clientId) {
                this.clientId = clientId;
            }

            /** Opens a new connection/session pair and attaches to the durable subscription. */
            private TopicSubscriber reconnect() throws Exception {
                this.con = DurableSubscriptionOfflineTest.this.createConnection(this.clientId);
                this.session = this.con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                return this.session.createDurableSubscriber(
                        DurableSubscriptionOfflineTest.this.topic, subscriptionName, selector, true);
            }

            /** Closes the current session and connection. */
            private void disconnect() throws JMSException {
                this.session.close();
                this.con.close();
            }

            @Override
            public void run() {
                // The whole scenario sits inside one try block so that every failure,
                // including assertion errors, ends up in 'exceptions' and fails the test.
                try {
                    // Go online and take nothing; the bound is re-drawn on every iteration
                    // and the loop runs at least once because it starts at -1.
                    for (int i = -1; i < random.nextInt(10); i++) {
                        reconnect();
                        disconnect();
                    }

                    // Consume and acknowledge exactly one message.
                    TopicSubscriber consumer = reconnect();
                    Message message = consumer.receive(4000L);
                    Assert.assertNotNull("got message", message);
                    message.acknowledge();
                    disconnect();

                    // Receive the rest without acknowledging.
                    // NOTE(review): this loop starts at 0, so it may run zero times when
                    // nextInt(10) returns 0 -- confirm that is intended.
                    for (int round = 0; round < random.nextInt(10); round++) {
                        consumer = reconnect();
                        for (int i = 0; i < messageCount - 1; i++) {
                            Assert.assertNotNull("got message", consumer.receive(4000L));
                        }
                        disconnect();
                    }

                    // Consume the rest for real; each one must carry the redelivered flag.
                    consumer = reconnect();
                    for (int i = 0; i < messageCount - 1; i++) {
                        message = consumer.receive(4000L);
                        Assert.assertNotNull("got message", message);
                        Assert.assertTrue("is redelivered", message.getJMSRedelivered());
                    }
                    // With CLIENT_ACKNOWLEDGE, acknowledging the last message covers the
                    // messages consumed earlier in this session.
                    message.acknowledge();
                    disconnect();

                    // Nothing may be left for this subscription.
                    consumer = reconnect();
                    Assert.assertNull("no message left", consumer.receive(2000L));
                } catch (Throwable th) {
                    DurableSubscriptionOfflineTest.LOG.error("Client " + this.clientId + " failed", th);
                    DurableSubscriptionOfflineTest.this.exceptions.add(th);
                } finally {
                    // Best-effort release of whichever connection is still referenced; closing
                    // an already closed connection is permitted by the JMS contract.
                    if (this.con != null) {
                        try {
                            this.con.close();
                        } catch (JMSException ignored) {
                            // Cleanup only: the outcome is already recorded above.
                        }
                    }
                }
            }
        }

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < numClients; i++) {
            executorService.execute(new Client("cliId" + i));
        }
        executorService.shutdown();
        // A timeout must not be mistaken for success.
        assertTrue("clients finished within 10 minutes", executorService.awaitTermination(10L, TimeUnit.MINUTES));
        assertTrue("No exceptions expected, but was: " + this.exceptions, this.exceptions.isEmpty());
    }
}
