/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.io.File;
import java.util.HashSet;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TransactionRolledBackException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.util.LoggingBrokerPlugin;
import org.apache.activemq.bugs.Configurer;
import org.apache.activemq.bugs.TeardownTask;
import org.apache.activemq.command.ActiveMQDestination;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ2149Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ2149Test.class);

    /** JUnit rule exposing the name of the currently running test method. */
    @Rule
    public TestName testName = new TestName();

    /** Transport connector URI that the embedded broker is asked to listen on. */
    private static final String BROKER_CONNECTOR = "tcp://localhost:61617";
    /** Default client URL: the failover transport wrapped around {@link #BROKER_CONNECTOR}. */
    private static final String DEFAULT_BROKER_URL = "failover:(" + BROKER_CONNECTOR + ")?maxReconnectDelay=1000&useExponentialBackOff=false";
    /** Name of the long message property that carries a message's sequence number. */
    private final String SEQ_NUM_PROPERTY = "seqNum";
    /** Payload length; equals the 76800 characters generated by buildLongString(). */
    final int MESSAGE_LENGTH_BYTES = 76800;
    /** Default pause between two sends, in milliseconds. */
    final long SLEEP_BETWEEN_SEND_MS = 25L;
    /** Number of sender/receiver pairs used by the non-transactional ordering checks. */
    final int NUM_SENDERS_AND_RECEIVERS = 10;
    /** Monitor held while the broker is being stopped and recreated; also handed to the teardown task. */
    final Object brokerLock = new Object();
    /** Default delay, in milliseconds, before each scheduled broker restart. */
    private static final long DEFAULT_BROKER_STOP_PERIOD = 10000L;
    /** Default number of messages each sender publishes. */
    private static final long DEFAULT_NUM_TO_SEND = 1400L;

    // Per-test tunables; setUp() resets them and individual tests may override them.
    long brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD;
    long numtoSend = DEFAULT_NUM_TO_SEND;
    long sleepBetweenSend = SLEEP_BETWEEN_SEND_MS;
    String brokerURL = DEFAULT_BROKER_URL;

    /** Count of broker restarts performed so far by the restart task. */
    int numBrokerRestarts = 0;
    /** Upper bound on the number of scheduled broker restarts. */
    static final int MAX_BROKER_RESTARTS = 4;
    /** The broker currently in use; replaced on every restart. */
    BrokerService broker;
    /** Failures recorded by sender, receiver and restart threads; a non-empty list fails the test. */
    Vector<Throwable> exceptions = new Vector<Throwable>();
    /** Broker data directory, derived from the test method name in setUp(). */
    protected File dataDirFile;
    final LoggingBrokerPlugin[] plugins = new LoggingBrokerPlugin[]{new LoggingBrokerPlugin()};
    /** Connections opened by senders and receivers, kept so they can be closed in bulk. */
    HashSet<Connection> connections = new HashSet<Connection>();

    /**
     * Creates a fresh {@link BrokerService}, stores it in {@link #broker}, applies the
     * standard test configuration and starts it.
     *
     * @param configurer optional hook invoked just before the broker is started; may be {@code null}
     * @throws Exception if configuring or starting the broker fails
     */
    public void createBroker(Configurer configurer) throws Exception {
        final BrokerService service = new BrokerService();
        // Publish the new instance first, exactly as before: the field is updated even if
        // a later configuration step throws.
        this.broker = service;

        configurePersistenceAdapter(service);

        // 153,600,000 == 200 * MESSAGE_LENGTH_BYTES * NUM_SENDERS_AND_RECEIVERS
        final long memoryLimitBytes = 153600000L;
        service.getSystemUsage().getMemoryUsage().setLimit(memoryLimitBytes);

        service.addConnector(BROKER_CONNECTOR);
        service.setBrokerName(testName.getMethodName());
        service.setDataDirectoryFile(dataDirFile);

        if (configurer != null) {
            configurer.configure(service);
        }
        service.start();
    }

    /**
     * Extension hook called by {@code createBroker} right after the broker object is
     * constructed and before any other configuration is applied. This implementation
     * does nothing.
     *
     * @param brokerService the newly constructed, not yet started broker
     * @throws Exception declared so that overriding implementations may fail
     */
    protected void configurePersistenceAdapter(BrokerService brokerService) throws Exception {
    }

    /**
     * Resets the per-test tunables to their defaults and derives the broker data
     * directory ({@code target/<methodName>}) from the running test's name.
     */
    @Before
    public void setUp() throws Exception {
        final String methodName = testName.getMethodName();
        LOG.debug("Starting test {}", (Object) methodName);

        dataDirFile = new File("target/" + methodName);

        // Defaults; individual tests override these before creating the broker.
        brokerURL = DEFAULT_BROKER_URL;
        sleepBetweenSend = 25L;
        brokerStopPeriod = 10000L;
        numtoSend = 1400L;
    }

    /**
     * Runs a {@link TeardownTask} for the current broker on a helper thread and waits
     * up to 30 seconds for it to finish.
     *
     * <p>If the wait times out, all threads are dumped and the test is then failed.
     * The dump is taken before failing because {@code Assert.fail} throws and nothing
     * after it would run. The helper executor is always shut down and the recorded
     * exceptions are always cleared, including on the failure paths.
     *
     * @throws Exception if the teardown task itself fails or the wait is interrupted
     */
    @After
    public void tearDown() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> future = executor.submit(new TeardownTask(this.brokerLock, this.broker));
            LOG.debug("Teardown started.");
            long start = System.currentTimeMillis();
            Boolean result = future.get(30L, TimeUnit.SECONDS);
            long finish = System.currentTimeMillis();
            LOG.debug("Result of teardown: {} after {} ms ", (Object) result, (Object) (finish - start));
        } catch (TimeoutException e) {
            // Capture diagnostics first; fail() does not return.
            AutoFailTestSupport.dumpAllThreads(this.testName.getMethodName());
            Assert.fail("Teardown timed out");
        } finally {
            // Interrupt a still-running teardown task and release the worker thread.
            executor.shutdownNow();
            this.exceptions.clear();
        }
    }

    /**
     * Builds the message payload: a string of 76800 pseudo-random decimal digits.
     *
     * @return a newly generated 76800-character digit string
     */
    private String buildLongString() {
        final int length = 76800;
        final StringBuilder digits = new StringBuilder(length);
        // Every appended value is a single digit 0-9, so each pass adds exactly one character.
        while (digits.length() < length) {
            int digit = (int) (Math.random() * 10.0);
            digits.append(digit);
        }
        return digits.toString();
    }

    /**
     * Not annotated as a test, so not run by JUnit. Sends the configured number of
     * messages to a queue, then consumes them in four batches on a transacted session,
     * running a broker restart after each batch and asserting that sequence numbers
     * arrive in order.
     *
     * <p>The consumer connection is closed in a {@code finally} block so that it is
     * released even when an assertion fails part-way through.
     *
     * @throws Exception on broker, JMS or assertion failure
     */
    public void x_testRestartReReceive() throws Exception {
        this.createBroker(new Configurer() {

            @Override
            public void configure(BrokerService broker) throws Exception {
                broker.deleteAllMessages();
            }
        });

        final Destination destination = ActiveMQDestination.createDestination("test.dest.X", ActiveMQDestination.QUEUE_TYPE);

        // Publish everything up front; the sender closes its own connection when done.
        Thread thread = new Thread(new Sender(destination));
        thread.start();
        thread.join();

        Connection connection = new ActiveMQConnectionFactory(this.brokerURL).createConnection();
        try {
            connection.setClientID(destination.toString());
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer messageConsumer = session.createConsumer(destination);
            connection.start();

            final int batch = 200;
            // A null timer means the task is never scheduled; it is run by hand below.
            TimerTask restartTask = this.schedualRestartTask(null, new Configurer() {

                @Override
                public void configure(BrokerService broker) throws Exception {
                }
            });

            long expectedSeq = 0L;
            for (int s = 0; s < 4; ++s) {
                for (int i = 0; i < batch; ++i) {
                    Message message = messageConsumer.receive(20000L);
                    Assert.assertNotNull("s:" + s + ", i:" + i, message);
                    long seqNum = message.getLongProperty("seqNum");
                    Assert.assertEquals("expected order s:" + s, expectedSeq++, seqNum);
                    // With batch == 200 this condition never holds; nothing is committed here.
                    if (i > 0 && i % 600 == 0) {
                        LOG.info("Commit on %5");
                    }
                }
                restartTask.run();
            }
        } finally {
            try {
                connection.close();
            } catch (JMSException e) {
                // Best effort: do not let a close failure mask an assertion failure.
                LOG.warn("failed to close consumer connection", e);
            }
        }
    }

    /**
     * Not annotated as a test. Baseline run without broker restarts: starts a broker
     * with all stored messages deleted, runs the default ordering check and asserts
     * that enqueue and dequeue counts match.
     */
    public void vanilaVerify_testOrder() throws Exception {
        final Configurer purgeMessages = new Configurer() {

            @Override
            public void configure(BrokerService broker) throws Exception {
                broker.deleteAllMessages();
            }
        };
        createBroker(purgeMessages);

        verifyOrderedMessageReceipt();
        verifyStats(false);
    }

    /**
     * Runs the default ordering check while a timer-driven task restarts the broker.
     * The timer is cancelled once the check returns or throws; statistics are then
     * only logged, not asserted.
     */
    @Test(timeout=300000L)
    public void testOrderWithRestart() throws Exception {
        final Configurer purgeMessages = new Configurer() {

            @Override
            public void configure(BrokerService broker) throws Exception {
                broker.deleteAllMessages();
            }
        };
        // Restarted brokers get a configurer that changes nothing.
        final Configurer noOp = new Configurer() {

            @Override
            public void configure(BrokerService broker) throws Exception {
            }
        };

        createBroker(purgeMessages);

        final Timer restartTimer = new Timer();
        schedualRestartTask(restartTimer, noOp);
        try {
            verifyOrderedMessageReceipt();
        } finally {
            restartTimer.cancel();
        }
        verifyStats(true);
    }

    /**
     * Topic variant of the restart ordering test: runs the ordering check against
     * topic destinations while the broker is restarted by a timer task. Restarted
     * brokers receive no extra configuration.
     */
    @Test(timeout=300000L)
    public void testTopicOrderWithRestart() throws Exception {
        final Configurer purgeMessages = new Configurer() {

            @Override
            public void configure(BrokerService broker) throws Exception {
                broker.deleteAllMessages();
            }
        };
        createBroker(purgeMessages);

        final Timer restartTimer = new Timer();
        schedualRestartTask(restartTimer, null);
        try {
            verifyOrderedMessageReceipt(ActiveMQDestination.TOPIC_TYPE);
        } finally {
            restartTimer.cancel();
        }
        verifyStats(true);
    }

    /** Transactional restart ordering test against a queue destination. */
    @Test(timeout=300000L)
    public void testQueueTransactionalOrderWithRestart() throws Exception {
        final byte queueType = ActiveMQDestination.QUEUE_TYPE;
        doTestTransactionalOrderWithRestart(queueType);
    }

    /** Transactional restart ordering test against a topic destination. */
    @Test(timeout=300000L)
    public void testTopicTransactionalOrderWithRestart() throws Exception {
        final byte topicType = ActiveMQDestination.TOPIC_TYPE;
        doTestTransactionalOrderWithRestart(topicType);
    }

    /**
     * Shared body of the transactional restart tests. Raises the message count to
     * 10000, shortens the send pause to 3 ms, then runs a single transactional
     * sender/receiver pair while the broker is restarted by a timer task.
     *
     * @param destinationType ActiveMQ destination type byte passed on to the ordering check
     * @throws Exception on broker, JMS or assertion failure
     */
    public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception {
        // Tunables must be in place before the broker and the restart task are created.
        brokerStopPeriod = 10000L;
        sleepBetweenSend = 3L;
        numtoSend = 10000L;

        final Configurer purgeMessages = new Configurer() {

            @Override
            public void configure(BrokerService broker) throws Exception {
                broker.deleteAllMessages();
            }
        };
        createBroker(purgeMessages);

        final Timer restartTimer = new Timer();
        schedualRestartTask(restartTimer, null);
        try {
            final int pairs = 1;
            final boolean transactional = true;
            verifyOrderedMessageReceipt(destinationType, pairs, transactional);
        } finally {
            restartTimer.cancel();
        }
        verifyStats(true);
    }

    /**
     * Walks every destination in the broker's queue region and compares its enqueue
     * and dequeue counters.
     *
     * @param brokerRestarts when {@code true} the counters are only logged; when
     *        {@code false} they are asserted to be equal
     * @throws Exception if the broker's region broker cannot be obtained
     */
    private void verifyStats(boolean brokerRestarts) throws Exception {
        final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
        for (org.apache.activemq.broker.region.Destination queue : regionBroker.getQueueRegion().getDestinationMap().values()) {
            final DestinationStatistics statistics = queue.getDestinationStatistics();
            if (!brokerRestarts) {
                Assert.assertEquals("qneue/dequeue match for: " + queue.getName(),
                        statistics.getEnqueues().getCount(),
                        statistics.getDequeues().getCount());
            } else {
                LOG.info("with restart: not asserting qneue/dequeue stat match for: " + queue.getName()
                        + " " + statistics.getEnqueues().getCount()
                        + " <= " + statistics.getDequeues().getCount());
            }
        }
    }

    /**
     * Creates a task that stops the current broker and creates a new one, and, when a
     * timer is supplied, schedules it to run after {@code brokerStopPeriod} milliseconds.
     *
     * <p>Each run holds {@code brokerLock} for the whole stop/start cycle and records
     * any failure in {@code exceptions}. After a run the restart counter is
     * incremented; while it is below {@code MAX_BROKER_RESTARTS} and a timer was
     * supplied, a new task is scheduled on that timer.
     *
     * @param timer timer used for scheduling, or {@code null} to only build the task
     * @param configurer configurer handed to {@code createBroker} on each restart; may be {@code null}
     * @return the first restart task, so callers can also run it directly
     */
    private TimerTask schedualRestartTask(final Timer timer, final Configurer configurer) {
        class RestartTask extends TimerTask {

            @Override
            public void run() {
                synchronized (brokerLock) {
                    LOG.info("stopping broker..");
                    try {
                        broker.stop();
                        broker.waitUntilStopped();
                    } catch (Exception e) {
                        LOG.error("ex on broker stop", (Throwable) e);
                        exceptions.add(e);
                    }

                    // A restart is attempted even when the stop above failed.
                    LOG.info("restarting broker");
                    try {
                        createBroker(configurer);
                        broker.waitUntilStarted();
                    } catch (Exception e) {
                        LOG.error("ex on broker restart", (Throwable) e);
                        exceptions.add(e);
                    }
                }

                numBrokerRestarts++;
                final boolean scheduleAnother = numBrokerRestarts < MAX_BROKER_RESTARTS && timer != null;
                if (!scheduleAnother) {
                    LOG.info("no longer stopping broker on reaching Max restarts: " + MAX_BROKER_RESTARTS);
                    return;
                }
                try {
                    timer.schedule(new RestartTask(), brokerStopPeriod);
                } catch (IllegalStateException ignored) {
                    // Thrown by Timer.schedule once the timer has been cancelled; deliberately ignored.
                }
            }
        }

        final RestartTask firstTask = new RestartTask();
        if (timer != null) {
            timer.schedule(firstTask, brokerStopPeriod);
        }
        return firstTask;
    }

    /**
     * Runs the non-transactional ordering check with ten sender/receiver pairs on
     * destinations of the given type.
     *
     * @param destinationType ActiveMQ destination type byte
     */
    private void verifyOrderedMessageReceipt(byte destinationType) throws Exception {
        final boolean transactional = false;
        verifyOrderedMessageReceipt(destinationType, NUM_SENDERS_AND_RECEIVERS, transactional);
    }

    /** Runs the non-transactional ordering check with ten sender/receiver pairs on queues. */
    private void verifyOrderedMessageReceipt() throws Exception {
        final boolean transactional = false;
        verifyOrderedMessageReceipt(ActiveMQDestination.QUEUE_TYPE, NUM_SENDERS_AND_RECEIVERS, transactional);
    }

    /**
     * Starts {@code concurrentPairs} sender/receiver pairs, one pair per destination
     * {@code test.dest.<i>}, and waits up to four minutes for them to finish.
     *
     * <p>Steps:
     * <ol>
     *   <li>wait for the sender threads, dumping all threads whenever a 30 second join
     *       expires with the sender still alive;</li>
     *   <li>wait until each receiver has seen {@code numtoSend} messages, or an
     *       exception has been recorded, then close it;</li>
     *   <li>close every tracked connection, assert that the deadline was not
     *       exceeded, interrupt any sender still running and assert that no
     *       exception was recorded.</li>
     * </ol>
     *
     * <p>The receiver wait sleeps briefly between checks rather than spinning on the
     * volatile counter, so the waiting thread does not consume a full core.
     *
     * @param destinationType ActiveMQ destination type byte
     * @param concurrentPairs number of sender/receiver pairs to start
     * @param transactional whether receivers use a transacted session
     * @throws Exception on JMS failure, interruption or assertion failure
     */
    private void verifyOrderedMessageReceipt(byte destinationType, int concurrentPairs, boolean transactional) throws Exception {
        final long receiverPollMs = 100L;

        Vector<Thread> threads = new Vector<Thread>();
        Vector<Receiver> receivers = new Vector<Receiver>();
        for (int i = 0; i < concurrentPairs; ++i) {
            final Destination destination = ActiveMQDestination.createDestination("test.dest." + i, destinationType);
            // Register the receiver before its sender starts publishing.
            receivers.add(new Receiver(destination, transactional));
            Thread thread = new Thread(new Sender(destination));
            thread.start();
            threads.add(thread);
        }

        final long expiry = System.currentTimeMillis() + 240000L;
        while (!threads.isEmpty() && this.exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
            Thread sendThread = threads.firstElement();
            sendThread.join(30000L);
            if (!sendThread.isAlive()) {
                threads.remove(sendThread);
                continue;
            }
            AutoFailTestSupport.dumpAllThreads("Send blocked");
        }
        LOG.info("senders done..." + threads);

        while (!receivers.isEmpty() && System.currentTimeMillis() < expiry) {
            Receiver receiver = receivers.firstElement();
            boolean finished = receiver.getNextExpectedSeqNo() >= this.numtoSend || !this.exceptions.isEmpty();
            if (finished) {
                receiver.close();
                receivers.remove(receiver);
            } else {
                // Back off instead of busy-spinning while the receiver catches up.
                Thread.sleep(receiverPollMs);
            }
        }

        for (Connection connection : this.connections) {
            try {
                connection.close();
            } catch (Exception ignored) {
                // Best-effort cleanup; the connection may already be closed.
            }
        }
        this.connections.clear();

        Assert.assertTrue("No timeout waiting for senders/receivers to complete", System.currentTimeMillis() < expiry);
        if (!this.exceptions.isEmpty()) {
            LOG.error("first recorded exception", this.exceptions.get(0));
        }

        LOG.info("Dangling threads: " + threads);
        for (Thread dangling : threads) {
            dangling.interrupt();
            dangling.join(10000L);
        }
        Assert.assertTrue("No exceptions", this.exceptions.isEmpty());
    }

    private class Sender
    implements Runnable {
        private final Destination dest;
        private final Connection connection;
        private final Session session;
        private final MessageProducer messageProducer;
        private volatile long nextSequenceNumber = 0L;

        public Sender(Destination dest) throws JMSException {
            this.dest = dest;
            this.connection = new ActiveMQConnectionFactory(AMQ2149Test.this.brokerURL).createConnection();
            this.session = this.connection.createSession(false, 1);
            this.messageProducer = this.session.createProducer(dest);
            this.messageProducer.setDeliveryMode(2);
            this.connection.start();
            AMQ2149Test.this.connections.add(this.connection);
        }

        @Override
        public void run() {
            String longString = AMQ2149Test.this.buildLongString();
            while (this.nextSequenceNumber < AMQ2149Test.this.numtoSend) {
                try {
                    TextMessage message = this.session.createTextMessage(longString);
                    message.setLongProperty("seqNum", this.nextSequenceNumber);
                    ++this.nextSequenceNumber;
                    this.messageProducer.send((Message)message);
                    if (this.nextSequenceNumber % 500L == 0L) {
                        LOG.info(this.dest + " sent " + this.nextSequenceNumber);
                    }
                }
                catch (javax.jms.IllegalStateException e) {
                    LOG.error(this.dest + " bailing on send error", (Throwable)e);
                    AMQ2149Test.this.exceptions.add(e);
                    break;
                }
                catch (Exception e) {
                    LOG.error(this.dest + " send error", (Throwable)e);
                    AMQ2149Test.this.exceptions.add(e);
                }
                if (AMQ2149Test.this.sleepBetweenSend <= 0L) continue;
                try {
                    Thread.sleep(AMQ2149Test.this.sleepBetweenSend);
                }
                catch (InterruptedException e) {
                    LOG.warn(this.dest + " sleep interrupted", (Throwable)e);
                }
            }
            try {
                this.connection.close();
            }
            catch (JMSException jMSException) {
                // empty catch block
            }
        }
    }

    private class Receiver
    implements MessageListener {
        private final Destination dest;
        private final Connection connection;
        private final Session session;
        private final MessageConsumer messageConsumer;
        private volatile long nextExpectedSeqNum = 0L;
        private final boolean transactional;
        private String lastId = null;
        final int TRANSACITON_BATCH = 500;
        boolean resumeOnNextOrPreviousIsOk = false;

        public Receiver(Destination dest, boolean transactional) throws JMSException {
            this.dest = dest;
            this.transactional = transactional;
            this.connection = new ActiveMQConnectionFactory(AMQ2149Test.this.brokerURL).createConnection();
            this.connection.setClientID(dest.toString());
            this.session = this.connection.createSession(transactional, transactional ? 0 : 1);
            this.messageConsumer = ActiveMQDestination.transform((Destination)dest).isTopic() ? this.session.createDurableSubscriber((Topic)dest, dest.toString()) : this.session.createConsumer(dest);
            this.messageConsumer.setMessageListener((MessageListener)this);
            this.connection.start();
            AMQ2149Test.this.connections.add(this.connection);
        }

        public void close() throws JMSException {
            this.connection.close();
        }

        public long getNextExpectedSeqNo() {
            return this.nextExpectedSeqNum;
        }

        public void onMessage(Message message) {
            try {
                long seqNum = message.getLongProperty("seqNum");
                if (seqNum % 500L == 0L) {
                    LOG.info(this.dest + " received " + seqNum);
                    if (this.transactional) {
                        LOG.info("committing..");
                        this.session.commit();
                    }
                }
                if (this.resumeOnNextOrPreviousIsOk) {
                    if (seqNum != this.nextExpectedSeqNum && seqNum == this.nextExpectedSeqNum - 499L) {
                        this.nextExpectedSeqNum -= 499L;
                        LOG.info("In doubt commit failed, getting replay at:" + this.nextExpectedSeqNum);
                    }
                    this.resumeOnNextOrPreviousIsOk = false;
                }
                if (seqNum != this.nextExpectedSeqNum) {
                    LOG.warn(this.dest + " received " + seqNum + " in msg: " + message.getJMSMessageID() + " expected " + this.nextExpectedSeqNum + ", lastId: " + this.lastId + ", message:" + message);
                    Assert.fail((String)(this.dest + " received " + seqNum + " expected " + this.nextExpectedSeqNum));
                }
                ++this.nextExpectedSeqNum;
                this.lastId = message.getJMSMessageID();
            }
            catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery) {
                LOG.info("got rollback: " + (Object)((Object)expectedSometimesOnFailoverRecovery));
                if (expectedSometimesOnFailoverRecovery.getMessage().contains("completion in doubt")) {
                    this.resumeOnNextOrPreviousIsOk = true;
                    ++this.nextExpectedSeqNum;
                    LOG.info("in doubt transaction completion: ok to get next or previous batch. next:" + this.nextExpectedSeqNum);
                } else {
                    this.resumeOnNextOrPreviousIsOk = false;
                    this.nextExpectedSeqNum -= 499L;
                }
            }
            catch (Throwable e) {
                LOG.error(this.dest + " onMessage error", e);
                AMQ2149Test.this.exceptions.add(e);
            }
        }
    }
}

