package org.apache.activemq.bugs;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ4413Test.class */
/**
 * Exercises durable-subscriber delivery while the subscriber repeatedly disconnects and
 * reconnects (the class name refers to issue AMQ-4413).
 *
 * <p>Each round starts a publisher and a durable subscriber concurrently against an embedded,
 * non-persistent broker. The publisher sends {@link #numMsgs} numbered text messages; the
 * subscriber expects to receive them strictly in order and drops and re-establishes its
 * connection after every {@link #numMsgsTriggeringReconnection}-th message. A round fails if a
 * message times out or arrives with an unexpected number.
 */
public class AMQ4413Test {
    static final Logger LOG = LoggerFactory.getLogger(AMQ4413Test.class);
    private String connectionUri;
    final String brokerUrl = JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT;
    final int numMsgsTriggeringReconnection = 2;
    final int numMsgs = 30;
    final int numTests = 75;
    final ExecutorService threadPool = Executors.newCachedThreadPool();

    /**
     * Starts the embedded broker, runs {@link #numTests} publish/subscribe rounds and shuts
     * everything down again.
     *
     * @throws Exception if the broker cannot be started or stopped, or if any round fails with
     *     an exception; failures are logged and rethrown so that the test is reported as failed
     */
    @Test
    public void testDurableSubMessageLoss() throws Exception {
        BrokerService brokerService = new BrokerService();
        connectionUri = brokerService.addConnector(brokerUrl).getPublishableConnectString();
        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.setKeepDurableSubsActive(true);
        brokerService.setAdvisorySupport(false);
        brokerService.start();
        LOG.info("##### broker started");
        try {
            for (int i = 0; i < numTests; i++) {
                LOG.info("##### test {} started", i);
                test();
            }
            LOG.info("##### tests are done");
        } catch (Exception e) {
            // Rethrow: swallowing here would let a broken publisher/subscriber task pass the test.
            LOG.error("##### tests failed!", e);
            throw e;
        } finally {
            threadPool.shutdown();
            brokerService.stop();
            LOG.info("##### broker stopped");
        }
    }

    /**
     * Runs a single round: registers a fresh durable subscription, then runs one publisher and
     * one reconnecting subscriber concurrently and asserts that both report success.
     *
     * @throws Exception if setup fails or either task terminates with an exception
     */
    private void test() throws Exception {
        // Unique names per round so that rounds cannot interfere with each other.
        final String topicName = "topic-" + UUID.randomUUID();
        final String clientId = "client-" + UUID.randomUUID();
        final String subscriptionName = "sub-" + UUID.randomUUID();

        registerDurableSubscription(topicName, clientId, subscriptionName);

        Future<Boolean> published =
                threadPool.submit(new Publisher(connectionUri, topicName, numMsgs));
        Future<Boolean> consumed = threadPool.submit(new ReconnectingSubscriber(
                connectionUri, topicName, clientId, subscriptionName,
                numMsgs, numMsgsTriggeringReconnection));

        Assert.assertTrue("publisher did not complete successfully", published.get());
        Assert.assertTrue("subscriber timed out or received an unexpected message",
                consumed.get());
    }

    /**
     * Creates the durable subscription and disconnects again, so that the subscription is
     * registered with the broker before the publisher task is submitted.
     */
    private void registerDurableSubscription(String topicName, String clientId,
            String subscriptionName) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
        factory.setWatchTopicAdvisories(false);
        Connection connection = factory.createConnection();
        try {
            connection.setClientID(clientId);
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber subscriber =
                    session.createDurableSubscriber(session.createTopic(topicName), subscriptionName);
            connection.stop();
            subscriber.close();
            session.close();
        } finally {
            // Always release the connection, even if one of the setup calls above threw.
            connection.close();
        }
    }

    /** Sends {@code messageCount} persistent text messages "1".."messageCount" to a topic. */
    private static final class Publisher implements Callable<Boolean> {
        private final String connectionUri;
        private final String topicName;
        private final int messageCount;

        Publisher(String connectionUri, String topicName, int messageCount) {
            this.connectionUri = connectionUri;
            this.topicName = topicName;
            this.messageCount = messageCount;
        }

        /**
         * @return {@link Boolean#TRUE} once all messages have been sent
         * @throws Exception if connecting or sending fails, or the thread is interrupted
         */
        @Override
        public Boolean call() throws Exception {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
            factory.setWatchTopicAdvisories(false);
            Connection connection = factory.createConnection();
            try {
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageProducer producer = session.createProducer(session.createTopic(topicName));
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                producer.setPriority(Message.DEFAULT_PRIORITY);
                producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
                for (int i = 1; i <= messageCount; i++) {
                    producer.send(session.createTextMessage(String.valueOf(i)));
                    LOG.info("pub sent msg: {}", i);
                    Thread.sleep(1L);
                }
                LOG.info("pub is done");
                return Boolean.TRUE;
            } finally {
                try {
                    connection.close();
                } catch (JMSException e) {
                    // Best effort: a close failure must not mask the outcome of the send loop.
                    LOG.warn("pub failed to close its connection", e);
                }
            }
        }
    }

    /**
     * Durable subscriber that expects messages "1".."messageCount" in order and reconnects after
     * every {@code reconnectInterval}-th message.
     */
    private static final class ReconnectingSubscriber implements Callable<Boolean> {
        /** Maximum time to wait for each individual message. */
        private static final long RECEIVE_TIMEOUT_MS = 3000L;

        private final ActiveMQConnectionFactory factory;
        private final String topicName;
        private final String clientId;
        private final String subscriptionName;
        private final int messageCount;
        private final int reconnectInterval;

        private Connection connection;
        private Session session;
        private TopicSubscriber consumer;

        ReconnectingSubscriber(String connectionUri, String topicName, String clientId,
                String subscriptionName, int messageCount, int reconnectInterval) {
            this.factory = new ActiveMQConnectionFactory(connectionUri);
            this.factory.setWatchTopicAdvisories(false);
            this.topicName = topicName;
            this.clientId = clientId;
            this.subscriptionName = subscriptionName;
            this.messageCount = messageCount;
            this.reconnectInterval = reconnectInterval;
        }

        /**
         * @return {@link Boolean#TRUE} if every expected message arrived in order,
         *     {@link Boolean#FALSE} if a receive timed out or a message carried the wrong number
         * @throws Exception if a JMS operation fails or a message body is not a number
         */
        @Override
        public Boolean call() throws Exception {
            try {
                connect();
                for (int expected = 1; expected <= messageCount; expected++) {
                    TextMessage message = (TextMessage) consumer.receive(RECEIVE_TIMEOUT_MS);
                    if (message == null) {
                        LOG.info("expected: {}, actual: timed out", expected);
                        return Boolean.FALSE;
                    }
                    int actual = Integer.parseInt(message.getText());
                    LOG.info("sub received msg: {}", actual);
                    if (expected != actual) {
                        LOG.info("expected: {}, actual: {}", expected, actual);
                        return Boolean.FALSE;
                    }
                    if (actual % reconnectInterval == 0) {
                        // Drop the connection but keep the durable subscription, then resume.
                        close(false);
                        connect();
                        LOG.info("sub reconnected");
                    }
                }
                LOG.info("sub is done");
                return Boolean.TRUE;
            } finally {
                // Single cleanup point for every exit path; also removes the subscription.
                try {
                    close(true);
                } catch (Exception e) {
                    LOG.warn("sub failed to clean up", e);
                }
            }
        }

        /** Opens a connection under the fixed client id and attaches to the durable subscription. */
        void connect() throws Exception {
            connection = factory.createConnection();
            connection.setClientID(clientId);
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(topicName);
            consumer = session.createDurableSubscriber(topic, subscriptionName);
        }

        /**
         * Stops and closes the current consumer, session and connection, if any.
         *
         * @param unsubscribe whether to also remove the durable subscription before closing
         * @throws Exception if any JMS call fails; the connection is still closed in that case
         */
        void close(boolean unsubscribe) throws Exception {
            try {
                if (connection != null) {
                    connection.stop();
                }
                if (consumer != null) {
                    consumer.close();
                }
                if (session != null) {
                    if (unsubscribe) {
                        session.unsubscribe(subscriptionName);
                    }
                    session.close();
                }
            } finally {
                // Reset the handles first so a later close() never touches closed objects.
                Connection toClose = connection;
                connection = null;
                session = null;
                consumer = null;
                if (toClose != null) {
                    toClose.close();
                }
            }
        }
    }
}
