/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Durable-subscriber message-loss test (class is named after issue AMQ-4413;
 * presumably the ActiveMQ tracker entry of that number).
 *
 * <p>An embedded, non-persistent broker is started with
 * {@code keepDurableSubsActive} enabled. The scenario in {@link #test()} is then
 * repeated {@link #numTests} times: a publisher sends {@link #numMsgs} numbered
 * text messages to a fresh topic while a durable subscriber receives them,
 * disconnecting and reconnecting after every
 * {@link #numMsgsTriggeringReconnection}-th message. The subscriber must see
 * every sequence number exactly once and in order.
 */
public class AMQ4413Test {
    static final Logger LOG = LoggerFactory.getLogger(AMQ4413Test.class);

    /** Bind address handed to the broker; port 0 lets the OS pick a free port. */
    final String brokerUrl = "tcp://localhost:0";

    /** Connect string published by the broker's connector; set when the test starts. */
    private String connectionUri;

    /** The subscriber reconnects each time the received sequence number is a multiple of this. */
    final int numMsgsTriggeringReconnection = 2;

    /** Number of messages published (and expected by the subscriber) per scenario run. */
    final int numMsgs = 30;

    /** Number of times the publish/subscribe scenario is repeated. */
    final int numTests = 75;

    /** Runs the publisher and the subscriber of each scenario concurrently. */
    final ExecutorService threadPool = Executors.newCachedThreadPool();

    /**
     * Starts the embedded broker, runs the scenario {@link #numTests} times and
     * shuts everything down again.
     *
     * @throws Exception if the broker cannot be started or stopped, or if any
     *                   scenario run fails; failures are logged and rethrown so
     *                   that they fail the test instead of being swallowed
     */
    @Test
    public void testDurableSubMessageLoss() throws Exception {
        BrokerService brokerService = new BrokerService();
        this.connectionUri = brokerService.addConnector(this.brokerUrl).getPublishableConnectString();
        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.setKeepDurableSubsActive(true);
        brokerService.setAdvisorySupport(false);
        brokerService.start();
        LOG.info("##### broker started");
        try {
            for (int i = 0; i < this.numTests; ++i) {
                LOG.info("##### test " + i + " started");
                this.test();
            }
            LOG.info("##### tests are done");
        } catch (Exception e) {
            // Log for context, then propagate: an exception here is a test failure.
            LOG.error("##### tests failed!", e);
            throw e;
        } finally {
            this.threadPool.shutdown();
            brokerService.stop();
            LOG.info("##### broker stopped");
        }
    }

    /**
     * Runs one scenario: registers a durable subscription on a fresh topic, then
     * runs a publisher and a reconnecting durable subscriber concurrently and
     * asserts that both report success.
     *
     * @throws Exception if setup fails or either task terminates with an exception
     */
    private void test() throws Exception {
        // Unique names per run so that runs cannot interfere with each other.
        String topicName = "topic-" + UUID.randomUUID();
        String clientId = "client-" + UUID.randomUUID();
        String subName = "sub-" + UUID.randomUUID();

        this.createDurableSubscription(topicName, clientId, subName);

        ArrayList<Future<Boolean>> results = new ArrayList<Future<Boolean>>();
        results.add(this.threadPool.submit(this.newPublisher(topicName)));
        results.add(this.threadPool.submit(new ReconnectingSubscriber(topicName, clientId, subName)));
        for (Future<Boolean> result : results) {
            Assert.assertTrue("publisher or durable subscriber reported a failure",
                    result.get().booleanValue());
        }
    }

    /**
     * Registers the durable subscription with the broker before anything is
     * published, then disconnects again.
     *
     * @param topicName name of the topic to subscribe to
     * @param clientId  JMS client id owning the subscription
     * @param subName   durable subscription name
     * @throws JMSException if any JMS call fails; the connection is closed regardless
     */
    private void createDurableSubscription(String topicName, String clientId, String subName)
            throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.connectionUri);
        factory.setWatchTopicAdvisories(false);
        Connection connection = factory.createConnection();
        try {
            connection.setClientID(clientId);
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(topicName);
            TopicSubscriber durableSubscriptionCreator = session.createDurableSubscriber(topic, subName);
            connection.stop();
            durableSubscriptionCreator.close();
            session.close();
        } finally {
            // Always release the connection, even when a step above throws.
            connection.close();
        }
    }

    /**
     * Builds the publisher task: sends {@link #numMsgs} text messages whose body
     * is the sequence number (starting at 1) to the given topic.
     *
     * @param topicName name of the topic to publish to
     * @return a task that returns {@link Boolean#TRUE} once all messages are sent
     */
    private Callable<Boolean> newPublisher(final String topicName) {
        return new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                Connection connection = null;
                try {
                    ActiveMQConnectionFactory factory =
                            new ActiveMQConnectionFactory(AMQ4413Test.this.connectionUri);
                    factory.setWatchTopicAdvisories(false);
                    connection = factory.createConnection();
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    Topic topic = session.createTopic(topicName);
                    MessageProducer producer = session.createProducer(topic);
                    producer.setDeliveryMode(2); // DeliveryMode.PERSISTENT
                    producer.setPriority(Message.DEFAULT_PRIORITY);
                    producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
                    for (int seq = 1; seq <= AMQ4413Test.this.numMsgs; ++seq) {
                        TextMessage msg = session.createTextMessage(String.valueOf(seq));
                        producer.send(msg);
                        LOG.info("pub sent msg: " + seq);
                        Thread.sleep(1L);
                    }
                    LOG.info("pub is done");
                } finally {
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (JMSException e) {
                            // Best-effort cleanup; do not mask the task's own outcome.
                            LOG.warn("pub failed to close its connection", e);
                        }
                    }
                }
                return Boolean.TRUE;
            }
        };
    }

    /**
     * Durable subscriber task. Receives {@link #numMsgs} messages, verifying that
     * sequence numbers arrive in order, and drops and re-establishes its
     * connection after every {@link #numMsgsTriggeringReconnection}-th message.
     * Returns {@link Boolean#FALSE} on a receive timeout or an out-of-sequence
     * message, {@link Boolean#TRUE} otherwise.
     */
    private final class ReconnectingSubscriber implements Callable<Boolean> {
        private final String topicName;
        private final String clientId;
        private final String subName;

        private ActiveMQConnectionFactory factory;
        private Connection connection;
        private Session session;
        private Topic topic;
        private TopicSubscriber consumer;

        ReconnectingSubscriber(String topicName, String clientId, String subName) {
            this.topicName = topicName;
            this.clientId = clientId;
            this.subName = subName;
        }

        @Override
        public Boolean call() throws Exception {
            this.factory = new ActiveMQConnectionFactory(AMQ4413Test.this.connectionUri);
            this.factory.setWatchTopicAdvisories(false);
            try {
                this.connect();
                for (int seqExpected = 1; seqExpected <= AMQ4413Test.this.numMsgs; ++seqExpected) {
                    TextMessage msg = (TextMessage) this.consumer.receive(3000L);
                    if (msg == null) {
                        LOG.info("expected: {}, actual: timed out", seqExpected);
                        return Boolean.FALSE;
                    }
                    int seq = Integer.parseInt(msg.getText());
                    LOG.info("sub received msg: " + seq);
                    if (seqExpected != seq) {
                        LOG.info("expected: {}, actual: {}", seqExpected, seq);
                        return Boolean.FALSE;
                    }
                    if (seq % AMQ4413Test.this.numMsgsTriggeringReconnection == 0) {
                        // Drop the connection but keep the durable subscription.
                        this.close(false);
                        this.connect();
                        LOG.info("sub reconnected");
                    }
                }
                LOG.info("sub is done");
            } finally {
                try {
                    // Final cleanup also removes the durable subscription.
                    this.close(true);
                } catch (Exception e) {
                    // Best-effort cleanup; do not mask the task's own outcome.
                    LOG.warn("sub failed to clean up", e);
                }
            }
            return Boolean.TRUE;
        }

        /** Opens a new connection under the same client id and re-attaches to the durable subscription. */
        void connect() throws Exception {
            this.connection = this.factory.createConnection();
            this.connection.setClientID(this.clientId);
            this.connection.start();
            this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            this.topic = this.session.createTopic(this.topicName);
            this.consumer = this.session.createDurableSubscriber(this.topic, this.subName);
        }

        /**
         * Stops the connection, closes the consumer and session, and closes the
         * connection. The connection is closed even if an earlier step throws,
         * and all handles are cleared so a later call never touches stale objects.
         *
         * @param unsubscribe whether to remove the durable subscription before
         *                    closing the session
         */
        void close(boolean unsubscribe) throws Exception {
            try {
                if (this.connection != null) {
                    this.connection.stop();
                }
                if (this.consumer != null) {
                    this.consumer.close();
                }
                if (this.session != null) {
                    if (unsubscribe) {
                        this.session.unsubscribe(this.subName);
                    }
                    this.session.close();
                }
            } finally {
                Connection toClose = this.connection;
                this.consumer = null;
                this.session = null;
                this.connection = null;
                if (toClose != null) {
                    toClose.close();
                }
            }
        }
    }
}

