/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.management.ObjectName;
import junit.framework.Test;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DurableSubscriptionOfflineTest
extends TestSupport {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTest.class);
    /**
     * When true, createBroker(boolean) installs a default destination policy with prioritized
     * messages enabled. Listed as a combination value by several initCombosFor* methods, so it is
     * presumably assigned per run by the test framework -- confirm in TestSupport.
     */
    public boolean usePrioritySupport = Boolean.TRUE;
    /**
     * Value passed to KahaDBPersistenceAdapter.setJournalMaxFileLength in createBroker(boolean).
     * 0x2000000 == 33554432; presumably a size in bytes (32 MiB) -- confirm against KahaDB docs.
     */
    public int journalMaxFileLength = 0x2000000;
    /**
     * Forwarded to BrokerService.setKeepDurableSubsActive in createBroker(boolean); listed as a
     * combination value by initCombosForTestJMXCountersWithOfflineSubs.
     */
    public boolean keepDurableSubsActive = true;
    /** Broker under test; created in setUp() and stopped by destroyBroker(). */
    private BrokerService broker;
    /** Destination used by the tests; assigned from createDestination() in setUp(). */
    private ActiveMQTopic topic;
    /**
     * Collected failures; cleared in setUp() and asserted empty at the end of
     * testMixOfOnLineAndOfflineSubsGetAllMatched. Nothing in the visible part of the file adds to
     * it -- presumably a listener defined further down does.
     */
    private final List<Throwable> exceptions = new ArrayList<Throwable>();
    /** Value of the "os.name" system property; consulted by getPersistenceAdapters(). */
    private static final String osName = System.getProperty("os.name");
    /** Compound message selector over the $a, $b, $c and $d message properties. */
    private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";

    /**
     * Builds a connection factory for the in-VM broker named after the current test, with topic
     * advisory watching switched off.
     *
     * @return the configured factory
     * @throws Exception declared by the overridden method
     */
    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        String brokerUrl = "vm://" + getName(true);
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
        factory.setWatchTopicAdvisories(false);
        return factory;
    }

    /**
     * Opens a started connection using the default client id {@code "cliName"}.
     *
     * @return a started connection
     * @throws Exception if the connection cannot be created
     */
    @Override
    protected Connection createConnection() throws Exception {
        final String defaultClientId = "cliName";
        return createConnection(defaultClientId);
    }

    /**
     * Opens a connection through the superclass, assigns it the given client id and starts it.
     *
     * @param name the JMS client id to set on the new connection
     * @return the started connection
     * @throws Exception if creating, identifying or starting the connection fails
     */
    protected Connection createConnection(String name) throws Exception {
        Connection connection = super.createConnection();
        connection.setClientID(name);
        connection.start();
        return connection;
    }

    /**
     * Builds the test suite for this class via the inherited {@code suite(Class)} factory.
     *
     * @return the suite for this test class
     */
    public static Test suite() {
        Class<DurableSubscriptionOfflineTest> testClass = DurableSubscriptionOfflineTest.class;
        return DurableSubscriptionOfflineTest.suite(testClass);
    }

    /**
     * Prepares a test run: enables auto-fail with a five minute limit, resets the recorded
     * exceptions, creates the topic and the broker, then delegates to the superclass.
     *
     * @throws Exception if the broker or the superclass set-up fails
     */
    protected void setUp() throws Exception {
        setAutoFail(true);
        // Five minutes, expressed in milliseconds (300000).
        setMaxTestTime(TimeUnit.MINUTES.toMillis(5));
        exceptions.clear();
        topic = (ActiveMQTopic) createDestination();
        // The broker has to exist before the superclass set-up runs.
        createBroker();
        super.setUp();
    }

    /**
     * Runs the superclass tear-down and then stops the broker.
     *
     * <p>The broker is stopped in a {@code finally} block so that a failure inside
     * {@code super.tearDown()} cannot leave a running broker behind for the next test.
     *
     * @throws Exception if the superclass tear-down or the broker shutdown fails; when both
     *     fail, the broker shutdown failure is the one propagated
     */
    protected void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            this.destroyBroker();
        }
    }

    /**
     * Creates and starts a broker that deletes all messages on start-up.
     *
     * @throws Exception if the broker cannot be created or started
     */
    private void createBroker() throws Exception {
        final boolean deleteAllMessages = true;
        createBroker(deleteAllMessages);
    }

    /**
     * Creates, configures and starts the broker used by the test, storing it in {@code broker}.
     *
     * <p>The broker is named after the current test, has advisories and the JMX connector
     * disabled, and listens on an additional TCP connector. Prioritized messages are enabled
     * when {@code usePrioritySupport} is set, and the persistence adapter is tuned depending on
     * whether it is JDBC or KahaDB.
     *
     * @param deleteAllMessages forwarded to {@code setDeleteAllMessagesOnStartup}
     * @throws Exception if the broker cannot be created, configured or started
     */
    private void createBroker(boolean deleteAllMessages) throws Exception {
        String brokerUri = "broker:(vm://" + getName(true) + ")";
        // Assign the field first so tearDown can stop the broker even if configuration fails.
        broker = BrokerFactory.createBroker(brokerUri);
        broker.setBrokerName(getName(true));
        broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        broker.getManagementContext().setCreateConnector(false);
        broker.setAdvisorySupport(false);
        broker.setKeepDurableSubsActive(keepDurableSubsActive);
        broker.addConnector("tcp://0.0.0.0:0");

        if (usePrioritySupport) {
            PolicyEntry prioritized = new PolicyEntry();
            prioritized.setPrioritizedMessages(true);
            PolicyMap policies = new PolicyMap();
            policies.setDefaultEntry(prioritized);
            broker.setDestinationPolicy(policies);
        }

        setDefaultPersistenceAdapter(broker);
        if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
            JDBCPersistenceAdapter jdbc = (JDBCPersistenceAdapter) broker.getPersistenceAdapter();
            jdbc.setCleanupPeriod(2000);
        } else if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
            KahaDBPersistenceAdapter kahaDb = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
            kahaDb.setJournalMaxFileLength(journalMaxFileLength);
        }

        broker.start();
    }

    /**
     * Stops the broker if one has been created; does nothing otherwise.
     *
     * @throws Exception if stopping the broker fails
     */
    private void destroyBroker() throws Exception {
        if (broker == null) {
            return;
        }
        broker.stop();
    }

    /**
     * Lists the persistence adapter choices to combine tests over. LevelDB is left out when the
     * operating system name is AIX or SunOS (compared case-insensitively).
     *
     * @return the adapter choices applicable to the current operating system
     */
    private Object[] getPersistenceAdapters() {
        boolean levelDbExcluded = osName.equalsIgnoreCase("AIX") || osName.equalsIgnoreCase("SunOS");
        if (levelDbExcluded) {
            return new Object[]{
                TestSupport.PersistenceAdapterChoice.KahaDB,
                TestSupport.PersistenceAdapterChoice.JDBC};
        }
        return new Object[]{
            TestSupport.PersistenceAdapterChoice.KahaDB,
            TestSupport.PersistenceAdapterChoice.LevelDB,
            TestSupport.PersistenceAdapterChoice.JDBC};
    }

    /**
     * Registers the combination values for testConsumeOnlyMatchedMessages: every applicable
     * persistence adapter, with priority support both on and off.
     *
     * @throws Exception declared for the combination framework
     */
    public void initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
        addCombinationValues("defaultPersistenceAdapter", getPersistenceAdapters());
        Object[] priorityChoices = {Boolean.TRUE, Boolean.FALSE};
        addCombinationValues("usePrioritySupport", priorityChoices);
    }

    /**
     * Verifies that a durable subscriber that was offline while messages were published
     * receives exactly the messages matching its selector once it reconnects.
     *
     * @throws Exception on any JMS failure
     */
    public void testConsumeOnlyMatchedMessages() throws Exception {
        final String selector = "filter = 'true'";

        // Register the durable subscription, then take it offline.
        Connection connection = createConnection();
        Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        jmsSession.createDurableSubscriber(topic, "SubsId", selector, true);
        jmsSession.close();
        connection.close();

        // Publish ten messages; only the odd-indexed ones carry filter='true'.
        connection = createConnection();
        jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer publisher = jmsSession.createProducer(null);
        int expected = 0;
        for (int index = 0; index < 10; index++) {
            boolean matches = (index % 2) == 1;
            if (matches) {
                expected++;
            }
            Message msg = jmsSession.createMessage();
            msg.setStringProperty("filter", String.valueOf(matches));
            publisher.send(topic, msg);
        }
        jmsSession.close();
        connection.close();

        // Bring the subscription back online and give it time to drain.
        connection = createConnection();
        jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber subscriber = jmsSession.createDurableSubscriber(topic, "SubsId", selector, true);
        Listener received = new Listener();
        subscriber.setMessageListener(received);
        Thread.sleep(3000L);
        jmsSession.close();
        connection.close();

        DurableSubscriptionOfflineTest.assertEquals(expected, (int) received.count);
    }

    /**
     * Verifies that an offline durable subscriber receives every message when all published
     * messages match its selector.
     *
     * @throws Exception on any JMS failure
     */
    public void testConsumeAllMatchedMessages() throws Exception {
        final String selector = "filter = 'true'";

        // Register the durable subscription, then take it offline.
        Connection connection = createConnection();
        Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        jmsSession.createDurableSubscriber(topic, "SubsId", selector, true);
        jmsSession.close();
        connection.close();

        // Publish ten matching messages.
        connection = createConnection();
        jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer publisher = jmsSession.createProducer(null);
        int published = 0;
        while (published < 10) {
            published++;
            Message msg = jmsSession.createMessage();
            msg.setStringProperty("filter", "true");
            publisher.send(topic, msg);
        }
        Thread.sleep(1000L);
        jmsSession.close();
        connection.close();

        // Reconnect the subscription and give it time to drain.
        connection = createConnection();
        jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber subscriber = jmsSession.createDurableSubscriber(topic, "SubsId", selector, true);
        Listener received = new Listener();
        subscriber.setMessageListener(received);
        Thread.sleep(3000L);
        jmsSession.close();
        connection.close();

        DurableSubscriptionOfflineTest.assertEquals(published, (int) received.count);
    }

    /**
     * Registers the combination values for testVerifyAllConsumedAreAcked: every applicable
     * persistence adapter, with priority support both on and off.
     *
     * @throws Exception declared for the combination framework
     */
    public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception {
        addCombinationValues("defaultPersistenceAdapter", getPersistenceAdapters());
        Object[] priorityChoices = {Boolean.TRUE, Boolean.FALSE};
        addCombinationValues("usePrioritySupport", priorityChoices);
    }

    /**
     * Verifies that messages consumed by a reconnected durable subscriber are acknowledged:
     * a first reconnect receives all ten messages, a second reconnect receives none.
     *
     * @throws Exception on any JMS failure
     */
    public void testVerifyAllConsumedAreAcked() throws Exception {
        final String selector = "filter = 'true'";

        // Register the durable subscription, then take it offline.
        Connection connection = createConnection();
        Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        jmsSession.createDurableSubscriber(topic, "SubsId", selector, true);
        jmsSession.close();
        connection.close();

        // Publish ten matching messages.
        connection = createConnection();
        jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer publisher = jmsSession.createProducer(null);
        int published = 0;
        while (published < 10) {
            published++;
            Message msg = jmsSession.createMessage();
            msg.setStringProperty("filter", "true");
            publisher.send(topic, msg);
        }
        Thread.sleep(1000L);
        jmsSession.close();
        connection.close();

        // First reconnect: everything published must arrive.
        connection = createConnection();
        jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber subscriber = jmsSession.createDurableSubscriber(topic, "SubsId", selector, true);
        Listener received = new Listener();
        subscriber.setMessageListener(received);
        Thread.sleep(3000L);
        jmsSession.close();
        connection.close();
        LOG.info("Consumed: " + received.count);
        DurableSubscriptionOfflineTest.assertEquals(published, (int) received.count);

        // Second reconnect: nothing may be redelivered.
        connection = createConnection();
        jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        subscriber = jmsSession.createDurableSubscriber(topic, "SubsId", selector, true);
        received = new Listener();
        subscriber.setMessageListener(received);
        Thread.sleep(3000L);
        jmsSession.close();
        connection.close();
        DurableSubscriptionOfflineTest.assertEquals(0, (int) received.count);
    }

    /**
     * Verifies that, with one durable subscriber online and another offline while messages are
     * published, both end up receiving all of them.
     *
     * @throws Exception on any JMS failure
     */
    public void testTwoOfflineSubscriptionCanConsume() throws Exception {
        final String selector = "filter = 'true'";

        // Subscription for cliId1 is created and immediately taken offline.
        Connection connection = createConnection("cliId1");
        Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        jmsSession.createDurableSubscriber(topic, "SubsId", selector, true);
        jmsSession.close();
        connection.close();

        // Subscription for cliId2 stays online while messages are published.
        Connection onlineConnection = createConnection("cliId2");
        Session onlineSession = onlineConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber onlineSubscriber = onlineSession.createDurableSubscriber(topic, "SubsId", selector, true);
        Listener onlineReceived = new Listener();
        onlineSubscriber.setMessageListener(onlineReceived);

        // Publish ten matching messages.
        connection = createConnection();
        jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer publisher = jmsSession.createProducer(null);
        int published = 0;
        while (published < 10) {
            published++;
            Message msg = jmsSession.createMessage();
            msg.setStringProperty("filter", "true");
            publisher.send(topic, msg);
        }
        Thread.sleep(1000L);
        jmsSession.close();
        connection.close();

        // Let the online subscriber drain, then check it.
        Thread.sleep(3000L);
        onlineSession.close();
        onlineConnection.close();
        DurableSubscriptionOfflineTest.assertEquals(published, (int) onlineReceived.count);

        // Bring cliId1 back and check it catches up.
        connection = createConnection("cliId1");
        jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber offlineSubscriber = jmsSession.createDurableSubscriber(topic, "SubsId", selector, true);
        Listener offlineReceived = new Listener();
        offlineSubscriber.setMessageListener(offlineReceived);
        Thread.sleep(3000L);
        jmsSession.close();
        connection.close();
        DurableSubscriptionOfflineTest.assertEquals("offline consumer got all", published, (int) offlineReceived.count);
    }

    /**
     * Registers the combination values for testJMXCountersWithOfflineSubs: durable
     * subscriptions kept active and not kept active.
     *
     * @throws Exception declared for the combination framework
     */
    public void initCombosForTestJMXCountersWithOfflineSubs() throws Exception {
        Object[] keepActiveChoices = {Boolean.TRUE, Boolean.FALSE};
        addCombinationValues("keepDurableSubsActive", keepActiveChoices);
    }

    /**
     * Checks the JMX counters of a durable subscription and of its topic across three phases:
     * while the subscriber is active with half the messages consumed, after it disconnects, and
     * after it reconnects and consumes the rest. The expected enqueue/dequeue values on the
     * subscription view depend on keepDurableSubsActive.
     *
     * @throws Exception on any JMS or JMX failure
     */
    public void testJMXCountersWithOfflineSubs() throws Exception {
        // Create the durable subscription (no selector) and take it offline.
        Connection con = this.createConnection("cliId1");
        Session session = con.createSession(false, 1);
        session.createDurableSubscriber((Topic)this.topic, "SubsId", null, true);
        session.close();
        con.close();
        // Restart the broker without deleting persisted messages.
        this.broker.stop();
        this.createBroker(false);
        // Publish ten messages while the subscriber is offline.
        con = this.createConnection();
        session = con.createSession(false, 1);
        MessageProducer producer = session.createProducer(null);
        int sent = 0;
        for (int i = 0; i < 10; ++i) {
            ++sent;
            Message message = session.createMessage();
            producer.send((Destination)this.topic, message);
        }
        session.close();
        con.close();
        // Phase 1: reconnect and consume only the first half.
        con = this.createConnection("cliId1");
        session = con.createSession(false, 1);
        TopicSubscriber consumer = session.createDurableSubscriber((Topic)this.topic, "SubsId", null, true);
        for (int i = 0; i < sent / 2; ++i) {
            Message m = consumer.receive(4000L);
            DurableSubscriptionOfflineTest.assertNotNull((String)("got message: " + i), (Object)m);
            LOG.info("Got :" + i + ", " + m);
        }
        // Inspect the active subscription through its MBean proxy.
        ObjectName activeDurableSubName = this.broker.getAdminView().getDurableTopicSubscribers()[0];
        LOG.info("active durable sub name: " + activeDurableSubName);
        final DurableSubscriptionViewMBean durableSubscriptionView = (DurableSubscriptionViewMBean)this.broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
        DurableSubscriptionOfflineTest.assertTrue((String)"is active", (boolean)durableSubscriptionView.isActive());
        DurableSubscriptionOfflineTest.assertEquals((String)"all enqueued", (long)(this.keepDurableSubsActive ? 10L : 0L), (long)durableSubscriptionView.getEnqueueCounter());
        // The awaiting-ack count is polled via Wait.waitFor rather than asserted immediately.
        DurableSubscriptionOfflineTest.assertTrue((String)"correct waiting acks", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return 5 == durableSubscriptionView.getMessageCountAwaitingAcknowledge();
            }
        }));
        DurableSubscriptionOfflineTest.assertEquals((String)"correct dequeue", (long)5L, (long)durableSubscriptionView.getDequeueCounter());
        // Topic-level counters while the subscriber is still connected.
        ObjectName destinationName = this.broker.getAdminView().getTopics()[0];
        TopicViewMBean topicView = (TopicViewMBean)this.broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
        DurableSubscriptionOfflineTest.assertEquals((String)"correct enqueue", (long)10L, (long)topicView.getEnqueueCount());
        DurableSubscriptionOfflineTest.assertEquals((String)"still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", (long)0L, (long)topicView.getDequeueCount());
        DurableSubscriptionOfflineTest.assertEquals((String)"inflight", (long)5L, (long)topicView.getInFlightCount());
        // Phase 2: disconnect and inspect the now-inactive subscription.
        session.close();
        con.close();
        ObjectName inActiveDurableSubName = this.broker.getAdminView().getInactiveDurableTopicSubscribers()[0];
        LOG.info("inactive durable sub name: " + inActiveDurableSubName);
        DurableSubscriptionViewMBean durableSubscriptionView1 = (DurableSubscriptionViewMBean)this.broker.getManagementContext().newProxyInstance(inActiveDurableSubName, DurableSubscriptionViewMBean.class, true);
        // The "? 1 : 0) != 0" form is a decompiler rendering of a plain boolean negation.
        DurableSubscriptionOfflineTest.assertTrue((String)"is not active", (!durableSubscriptionView1.isActive() ? 1 : 0) != 0);
        DurableSubscriptionOfflineTest.assertEquals((String)"all enqueued", (long)(this.keepDurableSubsActive ? 10L : 0L), (long)durableSubscriptionView1.getEnqueueCounter());
        DurableSubscriptionOfflineTest.assertEquals((String)"correct awaiting ack", (int)0, (int)durableSubscriptionView1.getMessageCountAwaitingAcknowledge());
        DurableSubscriptionOfflineTest.assertEquals((String)"correct dequeue", (long)(this.keepDurableSubsActive ? 5L : 0L), (long)durableSubscriptionView1.getDequeueCounter());
        DurableSubscriptionOfflineTest.assertEquals((String)"correct enqueue", (long)10L, (long)topicView.getEnqueueCount());
        DurableSubscriptionOfflineTest.assertEquals((String)"still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", (long)0L, (long)topicView.getDequeueCount());
        DurableSubscriptionOfflineTest.assertEquals((String)"inflight back to 0 after deactivate", (long)0L, (long)topicView.getInFlightCount());
        // Phase 3: reconnect and consume the remaining half (longer receive timeout).
        con = this.createConnection("cliId1");
        session = con.createSession(false, 1);
        consumer = session.createDurableSubscriber((Topic)this.topic, "SubsId", null, true);
        for (int i = 0; i < sent / 2; ++i) {
            Message m = consumer.receive(30000L);
            DurableSubscriptionOfflineTest.assertNotNull((String)("got message: " + i), (Object)m);
            LOG.info("Got :" + i + ", " + m);
        }
        activeDurableSubName = this.broker.getAdminView().getDurableTopicSubscribers()[0];
        LOG.info("durable sub name: " + activeDurableSubName);
        final DurableSubscriptionViewMBean durableSubscriptionView2 = (DurableSubscriptionViewMBean)this.broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
        DurableSubscriptionOfflineTest.assertTrue((String)"is active", (boolean)durableSubscriptionView2.isActive());
        DurableSubscriptionOfflineTest.assertEquals((String)"all enqueued", (long)(this.keepDurableSubsActive ? 10L : 0L), (long)durableSubscriptionView2.getEnqueueCounter());
        // Poll until the dequeue counter reaches 10.
        // NOTE(review): this connection and session are not closed inside this method.
        DurableSubscriptionOfflineTest.assertTrue((String)"correct dequeue", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                long val = durableSubscriptionView2.getDequeueCounter();
                LOG.info("dequeue count:" + val);
                return 10L == val;
            }
        }));
    }

    /**
     * Registers the combination values for testOfflineSubscriptionCanConsumeAfterOnlineSubs:
     * every applicable persistence adapter, with priority support both on and off.
     *
     * @throws Exception declared for the combination framework
     */
    public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
        addCombinationValues("defaultPersistenceAdapter", getPersistenceAdapters());
        Object[] priorityChoices = {Boolean.TRUE, Boolean.FALSE};
        addCombinationValues("usePrioritySupport", priorityChoices);
    }

    /**
     * Verifies that two offline durable subscribers still receive every message after an online
     * subscriber has consumed them and the broker has been restarted.
     *
     * @throws Exception on any JMS or broker failure
     */
    public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
        final String selector = "filter = 'true'";

        // Two subscriptions that are created and immediately taken offline.
        for (String offlineClient : new String[]{"offCli1", "offCli2"}) {
            Connection setupConnection = createConnection(offlineClient);
            Session setupSession = setupConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            setupSession.createDurableSubscriber(topic, "SubsId", selector, true);
            setupSession.close();
            setupConnection.close();
        }

        // A third subscription that stays online while messages are published.
        Connection onlineConnection = createConnection("onlineCli1");
        Session onlineSession = onlineConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber onlineSubscriber = onlineSession.createDurableSubscriber(topic, "SubsId", selector, true);
        Listener onlineReceived = new Listener();
        onlineSubscriber.setMessageListener(onlineReceived);

        // Publish ten matching messages.
        Connection publishConnection = createConnection();
        Session publishSession = publishConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer publisher = publishSession.createProducer(null);
        int published = 0;
        while (published < 10) {
            published++;
            Message msg = publishSession.createMessage();
            msg.setStringProperty("filter", "true");
            publisher.send(topic, msg);
        }
        Thread.sleep(1000L);
        publishSession.close();
        publishConnection.close();

        // Let the online subscriber drain, then check it.
        Thread.sleep(3000L);
        onlineSession.close();
        onlineConnection.close();
        DurableSubscriptionOfflineTest.assertEquals(published, (int) onlineReceived.count);

        // Restart the broker, keeping persisted messages.
        broker.stop();
        createBroker(false);

        // Bring both offline subscriptions back at the same time.
        Connection firstConnection = createConnection("offCli1");
        Session firstSession = firstConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber firstSubscriber = firstSession.createDurableSubscriber(topic, "SubsId", selector, true);
        Connection secondConnection = createConnection("offCli2");
        Session secondSession = secondConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber secondSubscriber = secondSession.createDurableSubscriber(topic, "SubsId", selector, true);
        Listener firstReceived = new Listener();
        firstSubscriber.setMessageListener(firstReceived);
        Listener secondReceived = new Listener();
        secondSubscriber.setMessageListener(secondReceived);
        Thread.sleep(3000L);
        firstSession.close();
        firstConnection.close();
        secondSession.close();
        secondConnection.close();

        DurableSubscriptionOfflineTest.assertEquals(published, (int) firstReceived.count);
        DurableSubscriptionOfflineTest.assertEquals(published, (int) secondReceived.count);
    }

    /**
     * Registers the combination values for testInterleavedOfflineSubscriptionCanConsume: every
     * applicable persistence adapter.
     *
     * @throws Exception declared for the combination framework
     */
    public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception {
        addCombinationValues("defaultPersistenceAdapter", getPersistenceAdapters());
    }

    /**
     * Verifies interleaved offline subscriptions: cliId1 subscribes before any message is sent,
     * cliId2 subscribes after the first batch of ten. After a second batch of ten, cliId2 must
     * receive ten messages and cliId1 all twenty.
     *
     * <p>The reconnected cliId2 connection is closed in a {@code finally} block; it was
     * previously left open when the test finished.
     *
     * @throws Exception on any JMS failure
     */
    public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
        // Create the cliId1 subscription and take it offline.
        Connection con = createConnection("cliId1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // First batch of ten messages.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);
        int sent = 0;
        for (int i = 0; i < 10; i++) {
            sent++;
            Message message = session.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(topic, message);
        }
        Thread.sleep(1000L);

        // Create the cliId2 subscription after the first batch; it must not see that batch.
        Connection con2 = createConnection("cliId2");
        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        Listener listener2 = new Listener();
        consumer2.setMessageListener(listener2);
        DurableSubscriptionOfflineTest.assertEquals(0, (int) listener2.count);
        session2.close();
        con2.close();

        // Second batch of ten messages, published while both subscriptions are offline.
        for (int i = 0; i < 10; i++) {
            sent++;
            Message message = session.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(topic, message);
        }
        Thread.sleep(1000L);
        session.close();
        con.close();

        // Reconnect cliId2 and keep it online while cliId1 catches up.
        con2 = createConnection("cliId2");
        try {
            session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
            consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
            listener2 = new Listener("cliId2");
            consumer2.setMessageListener(listener2);
            Thread.sleep(3000L);
            DurableSubscriptionOfflineTest.assertEquals(10, (int) listener2.count);

            // Reconnect cliId1; it must receive both batches.
            con = createConnection("cliId1");
            session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
            Listener listener = new Listener("cliId1");
            consumer.setMessageListener(listener);
            Thread.sleep(3000L);
            session.close();
            con.close();
            DurableSubscriptionOfflineTest.assertEquals("offline consumer got all", sent, (int) listener.count);
        } finally {
            // Closing the connection also closes its sessions and consumers.
            con2.close();
        }
    }

    /**
     * Registers the combination values for testMixOfOnLineAndOfflineSubsGetAllMatched: every
     * applicable persistence adapter.
     *
     * @throws Exception declared for the combination framework
     */
    public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
        addCombinationValues("defaultPersistenceAdapter", getPersistenceAdapters());
    }

    /**
     * Verifies that an online durable subscriber, a non-durable consumer and two offline
     * durable subscribers, all using the compound {@code filter} selector, each receive exactly
     * the matching messages out of 101 published with randomised properties.
     *
     * @throws Exception on any JMS failure
     */
    public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
        // Two durable subscriptions that are created and immediately taken offline.
        for (String offlineClient : new String[]{"offCli1", "offCli2"}) {
            Connection setupConnection = createConnection(offlineClient);
            Session setupSession = setupConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            setupSession.createDurableSubscriber(topic, "SubsId", filter, true);
            setupSession.close();
            setupConnection.close();
        }

        // An online durable subscriber.
        Connection durableConnection = createConnection("onlineCli1");
        Session durableSession = durableConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber durableSubscriber = durableSession.createDurableSubscriber(topic, "SubsId", filter, true);
        Listener durableReceived = new Listener();
        durableSubscriber.setMessageListener(durableReceived);

        // An online non-durable consumer.
        Connection plainConnection = createConnection("nondurableCli");
        Session plainSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer plainConsumer = plainSession.createConsumer(topic, filter, true);
        Listener plainReceived = new Listener();
        plainConsumer.setMessageListener(plainReceived);

        // Publish 100 messages with $a='A1' and a random $d in D1..D9; D1 and D2 match.
        Connection publishConnection = createConnection();
        Session publishSession = publishConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer publisher = publishSession.createProducer(null);
        boolean anyRelevant = false;
        int expected = 0;
        for (int index = 0; index < 100; index++) {
            int suffix = 1 + (int) (Math.random() * 9.0);
            if (suffix == 1 || suffix == 2) {
                anyRelevant = true;
                expected++;
            }
            Message msg = publishSession.createMessage();
            msg.setStringProperty("$a", "A1");
            msg.setStringProperty("$d", "D" + suffix);
            publisher.send(topic, msg);
        }

        // One final message that matches through the $b/$c branch only if $c is true.
        Message closing = publishSession.createMessage();
        closing.setStringProperty("$a", "A1");
        closing.setBooleanProperty("$b", true);
        closing.setBooleanProperty("$c", anyRelevant);
        publisher.send(topic, closing);
        if (anyRelevant) {
            expected++;
        }
        Thread.sleep(1000L);
        publishSession.close();
        publishConnection.close();

        // Let the online consumers drain, then check them.
        Thread.sleep(3000L);
        plainSession.close();
        plainConnection.close();
        DurableSubscriptionOfflineTest.assertEquals(expected, (int) plainReceived.count);
        durableSession.close();
        durableConnection.close();
        DurableSubscriptionOfflineTest.assertEquals(expected, (int) durableReceived.count);

        // Bring each offline subscription back in turn and check it.
        for (String offlineClient : new String[]{"offCli1", "offCli2"}) {
            Connection catchUpConnection = createConnection(offlineClient);
            Session catchUpSession = catchUpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber catchUpSubscriber = catchUpSession.createDurableSubscriber(topic, "SubsId", filter, true);
            FilterCheckListener catchUpReceived = new FilterCheckListener();
            catchUpSubscriber.setMessageListener(catchUpReceived);
            Thread.sleep(3000L);
            catchUpSession.close();
            catchUpConnection.close();
            DurableSubscriptionOfflineTest.assertEquals(expected, (int) catchUpReceived.count);
        }

        DurableSubscriptionOfflineTest.assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
    }

    /**
     * Verifies that after a durable subscription is unsubscribed, a new subscription created on
     * the same topic with {@code consumer.retroactive=true} receives none of the earlier
     * messages.
     *
     * <p>The producer connection is now closed before its variables are reused; it was
     * previously overwritten and left open.
     *
     * @throws Exception on any JMS failure
     */
    public void testRemovedDurableSubDeletes() throws Exception {
        // Create the durable subscription and take it offline.
        Connection con = createConnection("cliId1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // Publish ten matching messages.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);
        for (int i = 0; i < 10; i++) {
            Message message = session.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(topic, message);
        }
        Thread.sleep(1000L);

        // Remove the durable subscription.
        Connection con2 = createConnection("cliId1");
        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session2.unsubscribe("SubsId");
        session2.close();
        con2.close();

        // Release the producer connection before `session` and `con` are reassigned.
        session.close();
        con.close();

        // A retroactive subscriber on the same topic must not see the old messages.
        topic = new ActiveMQTopic(topic.getPhysicalName() + "?consumer.retroactive=true");
        con = createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
        Listener listener = new Listener();
        consumer.setMessageListener(listener);
        session.close();
        con.close();
        DurableSubscriptionOfflineTest.assertEquals(0, (int) listener.count);
    }

    /**
     * KahaDB only: verifies that creating a durable subscription, publishing 2750 messages and
     * unsubscribing leaves the same number of used index pages (page count minus free page
     * count) on each of two iterations. Returns immediately for other persistence adapters.
     *
     * @throws Exception on any JMS or store failure
     */
    public void testRemovedDurableSubDeletesFromIndex() throws Exception {
        if (!(broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter)) {
            return;
        }

        final int numMessages = 2750;
        KahaDBPersistenceAdapter kahaDb = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
        PageFile pages = kahaDb.getStore().getPageFile();
        LOG.info("PageCount " + pages.getPageCount() + " f:" + pages.getFreePageCount() + ", fileSize:" + pages.getFile().length());

        long previousUsed = 0L;
        for (int iteration = 0; iteration < 2; iteration++) {
            LOG.info("Iteration: " + iteration + " Count:" + pages.getPageCount() + " f:" + pages.getFreePageCount());
            String clientId = "cliId1-" + iteration;

            // Create the durable subscription and take it offline.
            Connection connection = createConnection(clientId);
            Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            jmsSession.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
            jmsSession.close();
            connection.close();

            // Publish the batch of matching messages.
            connection = createConnection();
            jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer publisher = jmsSession.createProducer(null);
            for (int sentSoFar = 0; sentSoFar < numMessages; sentSoFar++) {
                Message msg = jmsSession.createMessage();
                msg.setStringProperty("filter", "true");
                publisher.send(topic, msg);
            }
            connection.close();

            // Remove the subscription again.
            Connection removalConnection = createConnection(clientId);
            Session removalSession = removalConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            removalSession.unsubscribe("SubsId");
            removalSession.close();
            removalConnection.close();

            LOG.info("PageCount " + pages.getPageCount() + " f:" + pages.getFreePageCount() + " diff: " + (pages.getPageCount() - pages.getFreePageCount()) + " fileSize:" + pages.getFile().length());
            // From the second iteration on, the used-page figure must match the previous one.
            if (previousUsed != 0L) {
                DurableSubscriptionOfflineTest.assertEquals("Only use X pages per iteration: " + iteration, previousUsed, pages.getPageCount() - pages.getFreePageCount());
            }
            previousUsed = pages.getPageCount() - pages.getFreePageCount();
        }
    }

    /**
     * Registers every value returned by {@code getPersistenceAdapters()} as a combination value
     * for the {@code defaultPersistenceAdapter} field.
     *
     * @throws Exception if the combination values cannot be registered
     */
    public void initCombosForTestOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
        this.addCombinationValues("defaultPersistenceAdapter", this.getPersistenceAdapters());
    }

    /**
     * Verifies that two offline durable subscriptions with a selector receive every matching
     * message, including those sent before and after a broker restart.
     * <p>
     * Two subscriptions ("offCli1" and "offCli2") are registered and closed. Ten messages with
     * a randomly chosen {@code filter} property are published, the broker is stopped and
     * re-created, and ten more are published. Both subscribers are then re-activated with
     * listeners and, after a fixed wait, each must have counted exactly the number of messages
     * that were sent with {@code filter = 'true'}. The test returns immediately for the LevelDB
     * persistence adapter choice.
     *
     * @throws Exception if any JMS or broker operation fails
     */
    public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
        if (TestSupport.PersistenceAdapterChoice.LevelDB == this.defaultPersistenceAdapter) {
            return;
        }
        final String selector = "filter = 'true'";
        final int perBatch = 10;

        // Register both durable subscriptions and leave them offline.
        for (String clientId : new String[] {"offCli1", "offCli2"}) {
            Connection subCon = this.createConnection(clientId);
            Session subSession = subCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
            subSession.createDurableSubscriber(this.topic, "SubsId", selector, true);
            subSession.close();
            subCon.close();
        }

        // First batch, published before the restart.
        Connection sendCon = this.createConnection();
        Session sendSession = sendCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = sendSession.createProducer(null);
        int matching = 0;
        for (int n = 0; n < perBatch; n++) {
            boolean matches = (int) (Math.random() * 2.0) >= 1;
            if (matches) {
                matching++;
            }
            Message outgoing = sendSession.createMessage();
            outgoing.setStringProperty("filter", matches ? "true" : "false");
            producer.send(this.topic, outgoing);
        }
        LOG.info("sent: " + matching);
        Thread.sleep(1000L);
        sendSession.close();
        sendCon.close();

        // Restart the broker without deleting the stored messages.
        Thread.sleep(3000L);
        this.broker.stop();
        this.createBroker(false);

        // Second batch, published after the restart.
        sendCon = this.createConnection();
        sendSession = sendCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = sendSession.createProducer(null);
        for (int n = 0; n < perBatch; n++) {
            boolean matches = (int) (Math.random() * 2.0) >= 1;
            if (matches) {
                matching++;
            }
            Message outgoing = sendSession.createMessage();
            outgoing.setStringProperty("filter", matches ? "true" : "false");
            producer.send(this.topic, outgoing);
        }
        LOG.info("after restart, total sent with filter='true': " + matching);
        Thread.sleep(1000L);
        sendSession.close();
        sendCon.close();

        // Bring both subscribers online and let them drain their backlog.
        Connection firstCon = this.createConnection("offCli1");
        Session firstSession = firstCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber firstSubscriber = firstSession.createDurableSubscriber(this.topic, "SubsId", selector, true);
        Listener firstListener = new Listener("1>");
        firstSubscriber.setMessageListener(firstListener);

        Connection secondCon = this.createConnection("offCli2");
        Session secondSession = secondCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber secondSubscriber = secondSession.createDurableSubscriber(this.topic, "SubsId", selector, true);
        Listener secondListener = new Listener();
        secondSubscriber.setMessageListener(secondListener);

        Thread.sleep(3000L);
        firstSession.close();
        firstCon.close();
        secondSession.close();
        secondCon.close();

        DurableSubscriptionOfflineTest.assertEquals(matching, firstListener.count);
        DurableSubscriptionOfflineTest.assertEquals(matching, secondListener.count);
    }

    /**
     * Registers the KahaDB, LevelDB and JDBC persistence adapter choices as combination values
     * for the {@code defaultPersistenceAdapter} field.
     * <p>
     * NOTE(review): no method named {@code testOfflineAfterRestart} is visible next to this
     * initializer (the neighbouring test is {@code testOfflineSubscriptionAfterRestart}); if
     * combination initializers are matched to tests by name, this one may never be applied -
     * confirm against the combination test support class.
     *
     * @throws Exception if the combination values cannot be registered
     */
    public void initCombosForTestOfflineAfterRestart() throws Exception {
        Object[] adapterChoices = new Object[] {
            TestSupport.PersistenceAdapterChoice.KahaDB,
            TestSupport.PersistenceAdapterChoice.LevelDB,
            TestSupport.PersistenceAdapterChoice.JDBC
        };
        this.addCombinationValues("defaultPersistenceAdapter", adapterChoices);
    }

    /**
     * Verifies that a durable subscriber which was online before a broker restart receives the
     * messages published while it was offline after the restart.
     * <p>
     * Ten messages are published on the subscriber's own connection (the subscription is created
     * with {@code noLocal = false}) and must all be counted by the listener. The broker is then
     * stopped and re-created, ten more messages are published from an anonymous connection, and
     * the subscriber is re-activated with the same listener, which must end up with the total
     * number of messages sent.
     *
     * @throws Exception if any JMS or broker operation fails
     */
    public void testOfflineSubscriptionAfterRestart() throws Exception {
        final int perBatch = 10;

        // Online phase: the subscriber and the producer share one connection.
        Connection onlineCon = this.createConnection("offCli1");
        Session onlineSession = onlineCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber subscriber = onlineSession.createDurableSubscriber(this.topic, "SubsId", null, false);
        Listener listener = new Listener();
        subscriber.setMessageListener(listener);

        MessageProducer producer = onlineSession.createProducer(null);
        int sent = 0;
        for (int n = 0; n < perBatch; n++) {
            sent++;
            Message outgoing = onlineSession.createMessage();
            outgoing.setStringProperty("filter", "false");
            producer.send(this.topic, outgoing);
        }
        LOG.info("sent: " + sent);
        Thread.sleep(5000L);
        onlineSession.close();
        onlineCon.close();
        DurableSubscriptionOfflineTest.assertEquals(sent, listener.count);

        // Restart the broker while the subscription is offline.
        Thread.sleep(3000L);
        this.broker.stop();
        this.createBroker(false);

        // Publish a second batch from a connection without a client id.
        Connection sendCon = this.createConnection();
        Session sendSession = sendCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = sendSession.createProducer(null);
        for (int n = 0; n < perBatch; n++) {
            sent++;
            Message outgoing = sendSession.createMessage();
            outgoing.setStringProperty("filter", "false");
            producer.send(this.topic, outgoing);
        }
        LOG.info("after restart, sent: " + sent);
        Thread.sleep(1000L);
        sendSession.close();
        sendCon.close();

        // Re-activate the subscription, reusing the listener so its count accumulates.
        Connection resumeCon = this.createConnection("offCli1");
        Session resumeSession = resumeCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        subscriber = resumeSession.createDurableSubscriber(this.topic, "SubsId", null, true);
        subscriber.setMessageListener(listener);
        Thread.sleep(3000L);
        resumeSession.close();
        resumeCon.close();

        DurableSubscriptionOfflineTest.assertEquals(sent, listener.count);
    }

    /**
     * Verifies that an offline durable subscription still receives all of its messages after a
     * different subscription on the same topic has been unsubscribed.
     * <p>
     * "offCli1" subscribes with a selector and "offCli2" without one; both go offline. Ten
     * messages with a randomly chosen {@code filter} property are published, "offCli1" is
     * unsubscribed, and "offCli2" is then re-activated and must count every message sent.
     *
     * @throws Exception if any JMS or broker operation fails
     */
    public void testInterleavedOfflineSubscriptionCanConsumeAfterUnsub() throws Exception {
        // Selector-based subscription that will later be removed.
        Connection con = this.createConnection("offCli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // Unfiltered subscription that must survive the unsubscribe of the other one.
        con = this.createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", null, true);
        session.close();
        con.close();

        con = this.createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);
        int sent = 0;
        for (int i = 0; i < 10; ++i) {
            boolean filter = (int) (Math.random() * 2.0) >= 1;
            ++sent;
            Message message = session.createMessage();
            message.setStringProperty("filter", filter ? "true" : "false");
            producer.send(this.topic, message);
        }
        Thread.sleep(1000L);
        // Release the producer's session and connection; previously they were never closed
        // because 'con' and 'session' were simply reassigned further down.
        session.close();
        con.close();

        Connection con2 = this.createConnection("offCli1");
        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session2.unsubscribe("SubsId");
        session2.close();
        con2.close();

        // Re-activate the remaining subscription and let it drain its backlog.
        con = this.createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber consumer = session.createDurableSubscriber(this.topic, "SubsId", null, true);
        Listener listener = new Listener("SubsId");
        consumer.setMessageListener(listener);
        Thread.sleep(3000L);
        session.close();
        con.close();

        DurableSubscriptionOfflineTest.assertEquals("offline consumer got all", sent, listener.count);
    }

    /**
     * Verifies that durable subscribers activated concurrently with the commit of a large
     * transacted send receive every message exactly once.
     * <p>
     * Offline subscriptions are registered, {@code messageCount} messages are sent in a single
     * transaction, and the commit is executed on a pool thread while {@code numConsumers}
     * clients activate their subscriptions on other pool threads. Each client receives the
     * messages in two halves (closing the consumer in between), records every producer sequence
     * id and fails on a repeated id; a final receive must return no further message. Any
     * failure raised on a pool thread is collected in {@code exceptions}, and the test fails if
     * that list is not empty or if the pool does not terminate within the timeout.
     *
     * @throws Exception if any JMS or broker operation fails
     */
    public void testNoDuplicateOnConcurrentSendTranCommitAndActivate() throws Exception {
        final int messageCount = 1000;
        final int numConsumers = 10;

        // NOTE(review): the inclusive bound registers numConsumers + 1 subscriptions
        // ("cli0".."cli10") although only numConsumers clients consume below - confirm intended.
        for (int i = 0; i <= numConsumers; ++i) {
            Connection subCon = this.createConnection("cli" + i);
            Session subSession = subCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
            subSession.createDurableSubscriber(this.topic, "SubsId", null, true);
            subSession.close();
            subCon.close();
        }

        // Queue up all messages in one transaction; nothing is visible until the commit.
        String payLoad = new String(new byte[1000]);
        Connection con = this.createConnection();
        final Session sendSession = con.createSession(true, Session.SESSION_TRANSACTED);
        MessageProducer producer = sendSession.createProducer(this.topic);
        for (int i = 0; i < messageCount; ++i) {
            producer.send(sendSession.createTextMessage(payLoad));
        }

        /** Consumes all messages for one client id and fails on any repeated sequence id. */
        class CheckForDupsClient
        implements Runnable {
            final HashSet<Long> ids = new HashSet<Long>();
            final int id;

            public CheckForDupsClient(int id) {
                this.id = id;
            }

            @Override
            public void run() {
                try {
                    Connection clientCon = DurableSubscriptionOfflineTest.this.createConnection("cli" + this.id);
                    Session clientSession = clientCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    // Consume in two halves, deactivating the subscription in between.
                    for (int j = 0; j < 2; ++j) {
                        TopicSubscriber consumer = clientSession.createDurableSubscriber(DurableSubscriptionOfflineTest.this.topic, "SubsId", null, true);
                        for (int i = 0; i < messageCount / 2; ++i) {
                            Message message = consumer.receive(4000L);
                            TestCase.assertNotNull(message);
                            long producerSequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId();
                            TestCase.assertTrue("ID=" + this.id + " not a duplicate: " + producerSequenceId, this.ids.add(producerSequenceId));
                        }
                        consumer.close();
                    }
                    // Nothing more may arrive; if something does, report a duplicate first.
                    TopicSubscriber consumer = clientSession.createDurableSubscriber(DurableSubscriptionOfflineTest.this.topic, "SubsId", null, true);
                    Message message = consumer.receive(4000L);
                    if (message != null) {
                        long producerSequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId();
                        TestCase.assertTrue("ID=" + this.id + " not a duplicate: " + producerSequenceId, this.ids.add(producerSequenceId));
                    }
                    TestCase.assertNull(message);
                    clientSession.close();
                    clientCon.close();
                }
                catch (Throwable e) {
                    e.printStackTrace();
                    DurableSubscriptionOfflineTest.this.exceptions.add(e);
                }
            }
        }

        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    sendSession.commit();
                }
                catch (JMSException e) {
                    e.printStackTrace();
                    DurableSubscriptionOfflineTest.this.exceptions.add(e);
                }
            }
        });
        for (int i = 0; i < numConsumers; ++i) {
            executorService.execute(new CheckForDupsClient(i));
        }
        executorService.shutdown();
        // Remember whether the pool actually finished: on a timeout the clients are still
        // running and an empty exception list would prove nothing.
        boolean finished = executorService.awaitTermination(5L, TimeUnit.MINUTES);
        con.close();
        DurableSubscriptionOfflineTest.assertTrue("all tasks finished within the timeout", finished);
        DurableSubscriptionOfflineTest.assertTrue("no exceptions: " + this.exceptions, this.exceptions.isEmpty());
    }

    /**
     * Runs {@link #doTestOrderOnActivateDeactivate()} ten times, stopping the broker and
     * re-creating it with {@code createBroker(true)} after every run.
     *
     * @throws Exception if a run fails or the broker cannot be restarted
     */
    public void testOrderOnActivateDeactivate() throws Exception {
        int iteration = 0;
        while (iteration < 10) {
            LOG.info("Iteration: " + iteration);
            this.doTestOrderOnActivateDeactivate();
            this.broker.stop();
            this.createBroker(true);
            iteration++;
        }
    }

    /**
     * Single run of the activate/deactivate ordering check.
     * <p>
     * Registers offline durable subscriptions "cli0".."cli4", then submits 100 executions of
     * each of four {@code CheckOrderClient} instances plus one transactional producer to a
     * cached thread pool. Every client execution connects through a failover URL, activates its
     * subscription, reads up to 500 messages with {@code receiveNoWait()} and asserts that the
     * producer sequence ids it sees are 1, 2, 3, ... in order, then disconnects without
     * acknowledging anything. Failures on pool threads are collected in {@code exceptions},
     * which must be empty at the end.
     *
     * @throws Exception if any JMS or broker operation fails on the calling thread
     */
    public void doTestOrderOnActivateDeactivate() throws Exception {
        int messageCount = 1000;
        Connection con = null;
        Session session = null;
        int numConsumers = 4;
        // NOTE(review): the inclusive bound registers five subscriptions while only four
        // clients are started below - confirm intended.
        for (int i = 0; i <= 4; ++i) {
            con = this.createConnection("cli" + i);
            session = con.createSession(false, 1);
            session.createDurableSubscriber((Topic)this.topic, "SubsId", null, true);
            session.close();
            con.close();
        }
        // Clients connect via failover to the broker's second transport connector (index 1).
        String url = "failover:(tcp://localhost:" + ((TransportConnector)this.broker.getTransportConnectors().get(1)).getConnectUri().getPort() + "?wireFormat.maxInactivityDuration=0)?" + "jms.watchTopicAdvisories=false&" + "jms.alwaysSyncSend=true&jms.dispatchAsync=true&" + "jms.sendAcksAsync=true&" + "initialReconnectDelay=100&maxReconnectDelay=30000&" + "useExponentialBackOff=true";
        final ActiveMQConnectionFactory clientFactory = new ActiveMQConnectionFactory(url);
        // Sends 1000 messages in one transaction and commits them.
        Runnable producer = new Runnable(){
            final String payLoad = new String(new byte[600]);

            @Override
            public void run() {
                try {
                    Connection con = DurableSubscriptionOfflineTest.this.createConnection();
                    Session sendSession = con.createSession(true, 0);
                    MessageProducer producer = sendSession.createProducer((Destination)DurableSubscriptionOfflineTest.this.topic);
                    for (int i = 0; i < 1000; ++i) {
                        producer.send((Message)sendSession.createTextMessage(this.payLoad));
                    }
                    LOG.info("About to commit: 1000");
                    sendSession.commit();
                    LOG.info("committed: 1000");
                    con.close();
                }
                catch (Exception e) {
                    e.printStackTrace();
                    DurableSubscriptionOfflineTest.this.exceptions.add(e);
                }
            }
        };
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 4; ++i) {
            // One instance per client id; the same instance is executed many times.
            class CheckOrderClient
            implements Runnable {
                final int id;
                int runCount = 0;

                public CheckOrderClient(int id) {
                    this.id = id;
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    try {
                        CheckOrderClient checkOrderClient = this;
                        // Executions of the same client instance are serialized on the instance.
                        synchronized (checkOrderClient) {
                            Message message;
                            int i;
                            Connection con = clientFactory.createConnection();
                            con.setClientID("cli" + this.id);
                            con.start();
                            Session session = con.createSession(false, 2);
                            TopicSubscriber consumer = session.createDurableSubscriber((Topic)DurableSubscriptionOfflineTest.this.topic, "SubsId", null, true);
                            int nextId = 0;
                            ++this.runCount;
                            // Read what is immediately available (at most 500) without
                            // acknowledging; sequence ids must start at 1 and be contiguous.
                            for (i = 0; i < 500 && (message = consumer.receiveNoWait()) != null; ++i) {
                                long producerSequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId();
                                TestCase.assertEquals((String)(this.id + " expected order: runCount: " + this.runCount + " id: " + message.getJMSMessageID()), (long)(++nextId), (long)producerSequenceId);
                            }
                            LOG.info(con.getClientID() + " peeked " + i);
                            session.close();
                            con.close();
                        }
                    }
                    catch (Throwable e) {
                        e.printStackTrace();
                        DurableSubscriptionOfflineTest.this.exceptions.add(e);
                    }
                }
            }
            CheckOrderClient client = new CheckOrderClient(i);
            for (int j = 0; j < 100; ++j) {
                executorService.execute(client);
            }
        }
        executorService.execute(producer);
        executorService.shutdown();
        // NOTE(review): the boolean result of awaitTermination is discarded, so a timeout is
        // not detected here.
        executorService.awaitTermination(5L, TimeUnit.MINUTES);
        // 'con' still refers to the last subscription-setup connection, which was already
        // closed in the loop above.
        con.close();
        DurableSubscriptionOfflineTest.assertTrue((String)("no exceptions: " + this.exceptions), (boolean)this.exceptions.isEmpty());
    }

    /**
     * Verifies that unsubscribing a durable subscription discards its pending messages.
     * <p>
     * A selector-based subscription is registered and taken offline, ten messages are published
     * of which every second one (even index) matches the selector, and the subscription is then
     * unsubscribed. When a subscription with the same client id, name and selector is created
     * afterwards, its listener must not receive any message.
     *
     * @throws Exception if any JMS or broker operation fails
     */
    public void testUnmatchedSubUnsubscribeDeletesAll() throws Exception {
        final String selector = "filter = 'true'";

        Connection subCon = this.createConnection("offCli1");
        Session subSession = subCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        subSession.createDurableSubscriber(this.topic, "SubsId", selector, true);
        subSession.close();
        subCon.close();

        // Publish alternating matching / non-matching messages while the subscriber is offline.
        Connection sendCon = this.createConnection();
        Session sendSession = sendCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = sendSession.createProducer(null);
        int matching = 0;
        for (int n = 0; n < 10; n++) {
            boolean matches = n % 2 == 0;
            if (matches) {
                matching++;
            }
            Message outgoing = sendSession.createMessage();
            outgoing.setStringProperty("filter", matches ? "true" : "false");
            producer.send(this.topic, outgoing);
        }
        LOG.info("sent: " + matching);
        Thread.sleep(1000L);
        sendSession.close();
        sendCon.close();

        // Drop the subscription together with whatever was pending for it.
        Connection unsubCon = this.createConnection("offCli1");
        Session unsubSession = unsubCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        unsubSession.unsubscribe("SubsId");
        unsubSession.close();
        unsubCon.close();

        // A fresh subscription under the same name must start empty.
        Connection freshCon = this.createConnection("offCli1");
        Session freshSession = freshCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber subscriber = freshSession.createDurableSubscriber(this.topic, "SubsId", selector, true);
        Listener listener = new Listener();
        subscriber.setMessageListener(listener);
        Thread.sleep(3000L);
        freshSession.close();
        freshCon.close();

        DurableSubscriptionOfflineTest.assertEquals(0, listener.count);
    }

    /**
     * Verifies that a subscriber which has consumed its whole backlog only receives newly
     * published matching messages when it is activated again.
     * <p>
     * Two selector-based subscriptions ("cli1", "cli2") are registered offline and ten matching
     * messages are published. "cli1" consumes all of them through a listener; "cli2" pulls two
     * messages and disconnects. Two further messages are published, only the second of which
     * matches the selector, and "cli1" must then receive exactly one message.
     *
     * @throws Exception if any JMS or broker operation fails
     */
    public void testAllConsumed() throws Exception {
        final String filter = "filter = 'true'";

        // Register both subscriptions and leave them offline.
        for (String clientId : new String[] {"cli1", "cli2"}) {
            Connection subCon = this.createConnection(clientId);
            Session subSession = subCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
            subSession.createDurableSubscriber(this.topic, "SubsId", filter, true);
            subSession.close();
            subCon.close();
        }

        // Ten messages that all match the selector.
        Connection con = this.createConnection();
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);
        int sent = 0;
        while (sent < 10) {
            Message outgoing = session.createMessage();
            outgoing.setStringProperty("filter", "true");
            producer.send(this.topic, outgoing);
            sent++;
        }
        LOG.info("sent: " + sent);
        Thread.sleep(1000L);
        session.close();
        con.close();

        // cli1 drains everything.
        con = this.createConnection("cli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber consumer = session.createDurableSubscriber(this.topic, "SubsId", filter, true);
        Listener listener = new Listener();
        consumer.setMessageListener(listener);
        Thread.sleep(3000L);
        session.close();
        con.close();
        DurableSubscriptionOfflineTest.assertEquals(sent, listener.count);

        // cli2 takes only two messages and goes offline again.
        LOG.info("cli2 pull 2");
        con = this.createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        consumer = session.createDurableSubscriber(this.topic, "SubsId", filter, true);
        DurableSubscriptionOfflineTest.assertNotNull("got message", consumer.receive(2000L));
        DurableSubscriptionOfflineTest.assertNotNull("got message", consumer.receive(2000L));
        session.close();
        con.close();

        // Two more messages: the first does not match the selector, the second does.
        con = this.createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(null);
        sent = 0;
        for (int n = 0; n < 2; n++) {
            Message outgoing = session.createMessage();
            outgoing.setStringProperty("filter", n == 1 ? "true" : "false");
            producer.send(this.topic, outgoing);
            sent++;
        }
        LOG.info("sent: " + sent);
        Thread.sleep(1000L);
        session.close();
        con.close();

        LOG.info("cli1 again, should get 1 new ones");
        con = this.createConnection("cli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        consumer = session.createDurableSubscriber(this.topic, "SubsId", filter, true);
        listener = new Listener();
        consumer.setMessageListener(listener);
        Thread.sleep(3000L);
        session.close();
        con.close();
        DurableSubscriptionOfflineTest.assertEquals(1, listener.count);
    }

    /**
     * Verifies that, after a broker restart, each durable subscription receives exactly the
     * matching messages published since it was created.
     * <p>
     * "cli1" subscribes with a selector before any message is sent. Message 0 matches the
     * selector and messages 1-9 do not. "cli2" then subscribes with the same selector, the
     * broker is destroyed and re-created, and messages 10-29 (all matching) are published.
     * "cli2" must see message 10 first, while "cli1" must see message 0 followed by message 10.
     * A receive that times out is reported as an assertion failure.
     *
     * @throws Exception if any JMS or broker operation fails
     */
    public void testNoMissOnMatchingSubAfterRestart() throws Exception {
        final String filter = "filter = 'true'";

        Connection con = this.createConnection("cli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", filter, true);
        session.close();
        con.close();

        con = this.createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);
        int sent = 0;
        // Message 0 is the only pre-restart message that matches the selector.
        Message message = session.createMessage();
        message.setStringProperty("filter", "true");
        message.setIntProperty("ID", 0);
        producer.send(this.topic, message);
        ++sent;
        // Messages 1..9 do not match.
        for (int i = sent; i < 10; ++i) {
            message = session.createMessage();
            message.setStringProperty("filter", "false");
            message.setIntProperty("ID", i);
            producer.send(this.topic, message);
            ++sent;
        }
        con.close();
        LOG.info("sent: " + sent);

        // cli2 subscribes only now, so it has no matching backlog yet.
        con = this.createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", filter, true);
        session.close();
        con.close();

        this.destroyBroker();
        this.createBroker(false);

        // Messages 10..29 all match the selector.
        con = this.createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(null);
        for (int i = sent; i < 30; ++i) {
            message = session.createMessage();
            message.setStringProperty("filter", "true");
            message.setIntProperty("ID", i);
            producer.send(this.topic, message);
            ++sent;
        }
        con.close();
        LOG.info("sent: " + sent);

        con = this.createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber consumer = session.createDurableSubscriber(this.topic, "SubsId", filter, true);
        Message m = consumer.receive(3000L);
        // Fail with a clear message instead of a NullPointerException when receive times out.
        DurableSubscriptionOfflineTest.assertNotNull("cli2 got a message", m);
        DurableSubscriptionOfflineTest.assertEquals("is message 10", 10, m.getIntProperty("ID"));
        session.close();
        con.close();

        con = this.createConnection("cli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        consumer = session.createDurableSubscriber(this.topic, "SubsId", filter, true);
        m = consumer.receive(3000L);
        DurableSubscriptionOfflineTest.assertNotNull("cli1 got first message", m);
        DurableSubscriptionOfflineTest.assertEquals("is message 0", 0, m.getIntProperty("ID"));
        m = consumer.receive(3000L);
        DurableSubscriptionOfflineTest.assertNotNull("cli1 got second message", m);
        DurableSubscriptionOfflineTest.assertEquals("is message 10", 10, m.getIntProperty("ID"));
        session.close();
        con.close();
    }

    /**
     * Registers combination values for the {@code journalMaxFileLength} field (65536) and the
     * {@code keepDurableSubsActive} field (both {@code true} and {@code false}).
     *
     * @throws Exception if the combination values cannot be registered
     */
    public void initCombosForTestCleanupDeletedSubAfterRestart() throws Exception {
        // Integer.valueOf replaces the deprecated Integer(int) constructor.
        this.addCombinationValues("journalMaxFileLength", new Object[]{Integer.valueOf(65536)});
        this.addCombinationValues("keepDurableSubsActive", new Object[]{Boolean.TRUE, Boolean.FALSE});
    }

    /**
     * Verifies that journal files referenced only by a deleted subscription are cleaned up
     * after broker restarts.
     * <p>
     * Two offline subscriptions ("cli1", "cli2") are registered and 500 text messages are
     * published. "cli1" is unsubscribed and the broker is destroyed and re-created; "cli2" must
     * then receive all 500 messages. After a second restart the persistence adapter, which is
     * cast to {@link KahaDBPersistenceAdapter}, must report exactly two journal files.
     *
     * @throws Exception if any JMS or broker operation fails
     */
    public void testCleanupDeletedSubAfterRestart() throws Exception {
        final int toSend = 500;

        // Register both subscriptions and leave them offline.
        for (String clientId : new String[] {"cli1", "cli2"}) {
            Connection subCon = this.createConnection(clientId);
            Session subSession = subCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
            subSession.createDurableSubscriber(this.topic, "SubsId", null, true);
            subSession.close();
            subCon.close();
        }

        Connection con = this.createConnection();
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);
        // NOTE(review): toString() on a byte[] yields the array's short identity string, not
        // 40960 characters of payload - confirm whether a large payload was intended before
        // changing it, since the journal file count asserted below may depend on message size.
        String payload = new byte[40960].toString();
        int sent = 0;
        for (int id = 0; id < toSend; id++) {
            TextMessage outgoing = session.createTextMessage(payload);
            outgoing.setStringProperty("filter", "false");
            outgoing.setIntProperty("ID", id);
            producer.send(this.topic, outgoing);
            sent++;
        }
        con.close();
        LOG.info("sent: " + sent);

        // Delete cli1's subscription; this connection is not closed before the broker goes down.
        con = this.createConnection("cli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.unsubscribe("SubsId");

        this.destroyBroker();
        this.createBroker(false);

        // The remaining subscription must still get every message.
        con = this.createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber consumer = session.createDurableSubscriber(this.topic, "SubsId", null, true);
        final Listener listener = new Listener();
        consumer.setMessageListener(listener);
        Wait.Condition allReceived = new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                LOG.info("Want: 500, current: " + listener.count);
                return listener.count == toSend;
            }
        };
        DurableSubscriptionOfflineTest.assertTrue("got all sent", Wait.waitFor(allReceived));
        session.close();
        con.close();

        this.destroyBroker();
        this.createBroker(false);

        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) this.broker.getPersistenceAdapter();
        int journalFiles = adapter.getStore().getJournal().getFileMap().size();
        DurableSubscriptionOfflineTest.assertEquals("only two journal file(s) left after restart", 2, journalFiles);
    }

    /**
     * Exercises re-creation of many durable subscriptions after most of them were removed and
     * the broker was restarted.
     * <p>
     * Subscriptions "cli0".."cli115" are registered offline and a single text message is
     * published. "cli0".."cli114" are then unsubscribed, the broker is destroyed and
     * re-created, and subscriptions "cli1".."cli115" are created again. The method contains no
     * assertions; it passes when none of these operations throws.
     *
     * @throws Exception if any JMS or broker operation fails
     */
    public void testPageReuse() throws Exception {
        final int numConsumers = 115;

        // Register numConsumers + 1 offline subscriptions.
        for (int id = 0; id <= numConsumers; id++) {
            Connection subCon = this.createConnection("cli" + id);
            Session subSession = subCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
            subSession.createDurableSubscriber(this.topic, "SubsId", null, true);
            subSession.close();
            subCon.close();
        }

        // Publish one message; the body is the short identity string of a byte[].
        Connection sendCon = this.createConnection();
        Session sendSession = sendCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = sendSession.createProducer(null);
        TextMessage outgoing = sendSession.createTextMessage(new byte[10].toString());
        producer.send(this.topic, outgoing);
        sendCon.close();

        // Remove every subscription except the last one.
        for (int id = 0; id <= numConsumers - 1; id++) {
            Connection unsubCon = this.createConnection("cli" + id);
            Session unsubSession = unsubCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
            unsubSession.unsubscribe("SubsId");
            unsubSession.close();
            unsubCon.close();
        }

        this.destroyBroker();
        this.createBroker(false);

        // Create subscriptions again after the restart.
        // NOTE(review): 'filter' is not declared inside this method; presumably it resolves to
        // a member declared elsewhere in the class - confirm.
        for (int id = 1; id <= numConsumers; id++) {
            Connection subCon = this.createConnection("cli" + id);
            Session subSession = subCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
            subSession.createDurableSubscriber(this.topic, "SubsId", filter, true);
            subSession.close();
            subCon.close();
        }
    }

    /**
     * Verifies that messages delivered to a client-acknowledge durable subscriber but never
     * acknowledged are flagged as redelivered on the next activation.
     * <p>
     * Two selector-based subscriptions are registered offline and {@code count} matching
     * messages are published. Each client then, on its own pool thread:
     * <ol>
     * <li>activates and deactivates its subscription a random number of times (at least once);</li>
     * <li>receives and acknowledges one message;</li>
     * <li>a random number of times (at least once) receives the remaining {@code count - 1}
     * messages without acknowledging them;</li>
     * <li>receives the {@code count - 1} messages again, asserting each is marked redelivered,
     * and acknowledges the last one;</li>
     * <li>activates once more and asserts that nothing is left.</li>
     * </ol>
     * Failures on pool threads are collected in {@code exceptions}; the test fails if that list
     * is not empty or if the pool does not terminate within the timeout.
     *
     * @throws Exception if any JMS or broker operation fails on the calling thread
     */
    public void testRedeliveryFlag() throws Exception {
        final int numClients = 2;
        final int count = 1000;
        final String selector = "filter = 'true'";

        for (int i = 0; i < numClients; ++i) {
            Connection subCon = this.createConnection("cliId" + i);
            Session subSession = subCon.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            subSession.createDurableSubscriber(this.topic, "SubsId", selector, true);
            subSession.close();
            subCon.close();
        }

        final Random random = new Random();

        Connection sendCon = this.createConnection();
        Session sendSession = sendCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = sendSession.createProducer(null);
        for (int i = 0; i < count; ++i) {
            Message message = sendSession.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(this.topic, message);
        }
        sendSession.close();
        sendCon.close();

        /** One subscriber's activate/consume/deactivate sequence. */
        class Client
        implements Runnable {
            Connection con;
            Session session;
            final String clientId;

            Client(String id) {
                this.clientId = id;
            }

            /** Opens a new connection and client-acknowledge session and activates the subscription. */
            private TopicSubscriber activate() throws Exception {
                this.con = DurableSubscriptionOfflineTest.this.createConnection(this.clientId);
                this.session = this.con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                return this.session.createDurableSubscriber(DurableSubscriptionOfflineTest.this.topic, "SubsId", selector, true);
            }

            /** Closes the current session and connection, deactivating the subscription. */
            private void deactivate() throws Exception {
                this.session.close();
                this.con.close();
            }

            @Override
            public void run() {
                try {
                    // The random bound is re-drawn on every evaluation of the loop condition;
                    // starting at -1 guarantees at least one pass.
                    for (int i = -1; i < random.nextInt(10); ++i) {
                        this.activate();
                        this.deactivate();
                    }

                    // Consume and acknowledge exactly one message.
                    TopicSubscriber consumer = this.activate();
                    Message message = consumer.receive(4000L);
                    TestCase.assertNotNull("got message", message);
                    message.acknowledge();
                    this.deactivate();

                    // Receive the rest without acknowledging, possibly several times over.
                    for (int j = -1; j < random.nextInt(10); ++j) {
                        consumer = this.activate();
                        for (int i = 0; i < count - 1; ++i) {
                            TestCase.assertNotNull("got message", consumer.receive(4000L));
                        }
                        this.deactivate();
                    }

                    // Everything delivered above must now come back flagged as redelivered.
                    consumer = this.activate();
                    for (int i = 0; i < count - 1; ++i) {
                        message = consumer.receive(4000L);
                        TestCase.assertNotNull("got message", message);
                        TestCase.assertTrue("is redelivered", message.getJMSRedelivered());
                    }
                    message.acknowledge();
                    this.deactivate();

                    consumer = this.activate();
                    TestCase.assertNull("no message left", consumer.receive(2000L));
                }
                catch (Throwable throwable) {
                    throwable.printStackTrace();
                    DurableSubscriptionOfflineTest.this.exceptions.add(throwable);
                }
                finally {
                    // Release the last connection, which was previously left open both after
                    // the final check and whenever an assertion failed mid-sequence.
                    if (this.con != null) {
                        try {
                            this.con.close();
                        }
                        catch (JMSException e) {
                            // Best-effort cleanup: do not turn a close failure into a test failure.
                            LOG.warn("Failed to close connection for " + this.clientId, e);
                        }
                    }
                }
            }
        }

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < numClients; ++i) {
            executorService.execute(new Client("cliId" + i));
        }
        executorService.shutdown();
        // On a timeout the clients are still running and an empty exception list proves nothing.
        boolean finished = executorService.awaitTermination(10L, TimeUnit.MINUTES);
        DurableSubscriptionOfflineTest.assertTrue("all clients finished within the timeout", finished);
        DurableSubscriptionOfflineTest.assertTrue("No exceptions expected, but was: " + this.exceptions, this.exceptions.isEmpty());
    }

    public class FilterCheckListener
    extends Listener {
        @Override
        public void onMessage(Message message) {
            ++this.count;
            try {
                Object b = message.getObjectProperty("$b");
                if (b != null) {
                    boolean c = message.getBooleanProperty("$c");
                    TestCase.assertTrue((String)"", (boolean)c);
                } else {
                    String d = message.getStringProperty("$d");
                    TestCase.assertTrue((String)"", ("D1".equals(d) || "D2".equals(d) ? 1 : 0) != 0);
                }
            }
            catch (JMSException e) {
                e.printStackTrace();
                DurableSubscriptionOfflineTest.this.exceptions.add(e);
            }
        }
    }

    public static class Listener
    implements MessageListener {
        int count = 0;
        String id = null;

        Listener() {
        }

        Listener(String id) {
            this.id = id;
        }

        public void onMessage(Message message) {
            ++this.count;
            if (this.id != null) {
                try {
                    LOG.info(this.id + ", " + message.getJMSMessageID());
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }
    }
}

