package org.apache.activemq.bugs;

import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.perf.NetworkedSyncTest;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.store.kahadb.KahaDBStoreRecoveryBrokerTest;
import org.apache.activemq.usecases.DurableSubProcessConcurrentCommitActivateNoDuplicateTest;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/DurableConsumerTest.class */
public class DurableConsumerTest extends CombinationTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(DurableConsumerTest.class);
    private static int COUNT = 1024;
    private static String CONSUMER_NAME = "DURABLE_TEST";
    protected BrokerService broker;
    protected ConnectionFactory factory;
    private static final String TOPIC_NAME = "failoverTopic";
    private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)";
    protected String bindAddress = NetworkedSyncTest.broker1URL;
    protected byte[] payload = new byte[32768];
    protected Vector<Exception> exceptions = new Vector<>();
    public boolean useDedicatedTaskRunner = false;

    /* loaded from: input_file:org/apache/activemq/bugs/DurableConsumerTest$MessagePublisher.class */
    private class MessagePublisher implements Runnable {
        private final boolean shouldPublish = true;

        private MessagePublisher() {
            this.shouldPublish = true;
        }

        @Override // java.lang.Runnable
        public void run() {
            TopicPublisher topicPublisher = null;
            Message message = null;
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DurableConsumerTest.CONNECTION_URL);
            try {
                ActiveMQTopic activeMQTopic = new ActiveMQTopic(DurableConsumerTest.TOPIC_NAME);
                TopicSession createTopicSession = activeMQConnectionFactory.createTopicConnection().createTopicSession(false, 1);
                topicPublisher = createTopicSession.createPublisher(activeMQTopic);
                message = createTopicSession.createMessage();
            } catch (Exception e) {
                DurableConsumerTest.this.exceptions.add(e);
            }
            while (true) {
                try {
                    topicPublisher.publish(message, 2, 1, 7200000L);
                } catch (JMSException e2) {
                    DurableConsumerTest.this.exceptions.add(e2);
                }
                try {
                    Thread.sleep(1L);
                } catch (Exception e3) {
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/bugs/DurableConsumerTest$SimpleTopicSubscriber.class */
    private class SimpleTopicSubscriber implements MessageListener, ExceptionListener {
        private TopicConnection topicConnection;

        public SimpleTopicSubscriber(String str, String str2, String str3) {
            this.topicConnection = null;
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(str);
            try {
                ActiveMQTopic activeMQTopic = new ActiveMQTopic(str3);
                this.topicConnection = activeMQConnectionFactory.createTopicConnection();
                this.topicConnection.setClientID(str2);
                this.topicConnection.start();
                this.topicConnection.createTopicSession(false, 1).createDurableSubscriber(activeMQTopic, str2).setMessageListener(this);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        public void onMessage(Message message) {
        }

        public void closeConnection() {
            if (this.topicConnection != null) {
                try {
                    this.topicConnection.close();
                } catch (JMSException e) {
                }
            }
        }

        public void onException(JMSException jMSException) {
            DurableConsumerTest.this.exceptions.add(jMSException);
        }
    }

    private void configurePersistence(BrokerService brokerService) throws Exception {
        File file = new File("target/" + getName());
        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
        kahaDBPersistenceAdapter.setDirectory(file);
        brokerService.setPersistenceAdapter(kahaDBPersistenceAdapter);
    }

    public void testFailover() throws Exception {
        configurePersistence(this.broker);
        this.broker.start();
        new Thread(new MessagePublisher()).start();
        final ArrayList arrayList = new ArrayList(100);
        for (int i = 0; i < 100; i++) {
            final int i2 = i;
            new Thread(new Runnable() { // from class: org.apache.activemq.bugs.DurableConsumerTest.1
                @Override // java.lang.Runnable
                public void run() {
                    arrayList.add(new SimpleTopicSubscriber(DurableConsumerTest.CONNECTION_URL, System.currentTimeMillis() + "-" + i2, DurableConsumerTest.TOPIC_NAME));
                }
            }).start();
        }
        Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.DurableConsumerTest.2
            public boolean isSatisified() throws Exception {
                return 100 == arrayList.size();
            }
        });
        this.broker.stop();
        this.broker = createBroker(false);
        configurePersistence(this.broker);
        this.broker.start();
        Thread.sleep(DurableSubProcessWithRestartTest.BROKER_RESTART);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((SimpleTopicSubscriber) it.next()).closeConnection();
        }
        assertTrue("no exceptions: " + this.exceptions, this.exceptions.isEmpty());
    }

    public void initCombosForTestConcurrentDurableConsumer() {
        addCombinationValues("useDedicatedTaskRunner", new Object[]{Boolean.TRUE, Boolean.FALSE});
    }

    public void testConcurrentDurableConsumer() throws Exception {
        this.broker.start();
        this.factory = createConnectionFactory();
        final String name = getName();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicInteger atomicInteger = new AtomicInteger();
        Runnable runnable = new Runnable() { // from class: org.apache.activemq.bugs.DurableConsumerTest.3
            @Override // java.lang.Runnable
            public void run() {
                Message receive;
                String name2 = Thread.currentThread().getName();
                int i = 0;
                int i2 = 0;
                while (i < 250) {
                    try {
                        ActiveMQConnection createConnection = DurableConsumerTest.this.factory.createConnection();
                        createConnection.setWatchTopicAdvisories(false);
                        createConnection.setClientID(name2);
                        Session createSession = createConnection.createSession(false, 2);
                        Topic createTopic = createSession.createTopic(name);
                        createConnection.start();
                        TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(createTopic, name2);
                        countDownLatch.countDown();
                        do {
                            receive = createDurableSubscriber.receive(5000L);
                            if (receive != null) {
                                atomicInteger.incrementAndGet();
                                if (i2 != 0 && i2 % 100 == 0) {
                                    DurableConsumerTest.LOG.info("Received msg: " + receive.getJMSMessageID());
                                }
                                i2++;
                                if (i2 % 2 == 0) {
                                    receive.acknowledge();
                                    i++;
                                }
                            }
                        } while (receive == null);
                        createConnection.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                        DurableConsumerTest.this.exceptions.add(e);
                        return;
                    }
                }
                TestCase.assertTrue(i2 >= i);
            }
        };
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        for (int i = 0; i < 1; i++) {
            newFixedThreadPool.execute(runnable);
        }
        assertTrue(countDownLatch.await(30L, TimeUnit.SECONDS));
        ActiveMQConnection createConnection = this.factory.createConnection();
        createConnection.setWatchTopicAdvisories(false);
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createTopic(name));
        createConnection.start();
        for (int i2 = 0; i2 < 500; i2++) {
            BytesMessage createBytesMessage = createSession.createBytesMessage();
            createBytesMessage.writeBytes(this.payload);
            createProducer.send(createBytesMessage);
            if (i2 != 0 && i2 % 100 == 0) {
                LOG.info("Sent msg " + i2);
            }
        }
        newFixedThreadPool.shutdown();
        newFixedThreadPool.awaitTermination(30L, TimeUnit.SECONDS);
        Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.DurableConsumerTest.4
            public boolean isSatisified() throws Exception {
                DurableConsumerTest.LOG.info("receivedCount: " + atomicInteger.get());
                return atomicInteger.get() == 500;
            }
        }, 360000L);
        assertEquals("got required some messages", DurableSubProcessConcurrentCommitActivateNoDuplicateTest.SERVER_SLEEP, atomicInteger.get());
        assertTrue("no exceptions, but: " + this.exceptions, this.exceptions.isEmpty());
    }

    public void testConsumerRecover() throws Exception {
        doTestConsumer(true);
    }

    public void testConsumer() throws Exception {
        doTestConsumer(false);
    }

    public void testPrefetchViaBrokerConfig() throws Exception {
        Integer num = new Integer(150);
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setDurableTopicPrefetch(num.intValue());
        policyEntry.setPrioritizedMessages(true);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.start();
        this.factory = createConnectionFactory();
        Connection createConnection = this.factory.createConnection();
        createConnection.setClientID(CONSUMER_NAME);
        Session createSession = createConnection.createSession(false, 1);
        createSession.createDurableSubscriber(createSession.createTopic(getClass().getName()), CONSUMER_NAME);
        createConnection.start();
        assertEquals(num, this.broker.getManagementContext().getAttribute(this.broker.getAdminView().getDurableTopicSubscribers()[0], "PrefetchSize"));
    }

    public void doTestConsumer(boolean z) throws Exception {
        if (z) {
            configurePersistence(this.broker);
        }
        this.broker.start();
        this.factory = createConnectionFactory();
        Connection createConnection = this.factory.createConnection();
        createConnection.setClientID(CONSUMER_NAME);
        Session createSession = createConnection.createSession(false, 1);
        Topic createTopic = createSession.createTopic(getClass().getName());
        createSession.createDurableSubscriber(createTopic, CONSUMER_NAME);
        createConnection.start();
        createConnection.close();
        this.broker.stop();
        this.broker = createBroker(false);
        if (z) {
            configurePersistence(this.broker);
        }
        this.broker.start();
        Connection createConnection2 = this.factory.createConnection();
        Session createSession2 = createConnection2.createSession(false, 1);
        MessageProducer createProducer = createSession2.createProducer(createTopic);
        createConnection2.start();
        for (int i = 0; i < COUNT; i++) {
            BytesMessage createBytesMessage = createSession2.createBytesMessage();
            createBytesMessage.writeBytes(this.payload);
            createProducer.send(createBytesMessage);
            if (i != 0 && i % 1000 == 0) {
                LOG.info("Sent msg " + i);
            }
        }
        createConnection2.close();
        this.broker.stop();
        this.broker = createBroker(false);
        if (z) {
            configurePersistence(this.broker);
        }
        this.broker.start();
        Connection createConnection3 = this.factory.createConnection();
        createConnection3.setClientID(CONSUMER_NAME);
        createConnection3.start();
        TopicSubscriber createDurableSubscriber = createConnection3.createSession(false, 1).createDurableSubscriber(createTopic, CONSUMER_NAME);
        for (int i2 = 0; i2 < COUNT; i2++) {
            assertNotNull("Missing message: " + i2, createDurableSubscriber.receive(DurableSubProcessWithRestartTest.BROKER_RESTART));
            if (i2 != 0 && i2 % 1000 == 0) {
                LOG.info("Received msg " + i2);
            }
        }
        createConnection3.close();
    }

    protected void setUp() throws Exception {
        if (this.broker == null) {
            this.broker = createBroker(true);
        }
        super.setUp();
    }

    protected void tearDown() throws Exception {
        super.tearDown();
        if (this.broker != null) {
            this.broker.stop();
            this.broker = null;
        }
    }

    protected Topic creatTopic(Session session, String str) throws JMSException {
        return session.createTopic(str);
    }

    protected BrokerService createBroker(boolean z) throws Exception {
        BrokerService brokerService = new BrokerService();
        configureBroker(brokerService, z);
        return brokerService;
    }

    protected void configureBroker(BrokerService brokerService, boolean z) throws Exception {
        brokerService.setDeleteAllMessagesOnStartup(z);
        KahaDBStore kahaDBStore = new KahaDBStore();
        File file = new File(KahaDBStoreRecoveryBrokerTest.KAHADB_DIR_BASE);
        if (z) {
            IOHelper.deleteChildren(file);
        }
        kahaDBStore.setDirectory(file);
        brokerService.setPersistenceAdapter(kahaDBStore);
        brokerService.addConnector(this.bindAddress);
        brokerService.setUseShutdownHook(false);
        brokerService.setAdvisorySupport(false);
        brokerService.setDedicatedTaskRunner(this.useDedicatedTaskRunner);
    }

    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(this.bindAddress);
        activeMQConnectionFactory.setUseDedicatedTaskRunner(this.useDedicatedTaskRunner);
        return activeMQConnectionFactory;
    }

    public static Test suite() {
        return suite(DurableConsumerTest.class);
    }

    public static void main(String[] strArr) {
        TestRunner.run(suite());
    }
}
