/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.transport.vm;

import java.io.IOException;
import java.net.URI;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.vm.VMTransport;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VMTransportThreadSafeTest {
    private static final Logger LOG = LoggerFactory.getLogger(VMTransportThreadSafeTest.class);
    private static final String location1 = "vm://transport1";
    private static final String location2 = "vm://transport2";
    private final ConcurrentLinkedQueue<DummyCommand> localReceived = new ConcurrentLinkedQueue();
    private final ConcurrentLinkedQueue<DummyCommand> remoteReceived = new ConcurrentLinkedQueue();

    private void assertMessageAreOrdered(ConcurrentLinkedQueue<DummyCommand> queue) {
        int lastSequenceId = 0;
        for (DummyCommand command : queue) {
            int id = command.sequenceId;
            Assert.assertTrue((String)("Last id: " + lastSequenceId + " should be less than current id: " + id), (id > lastSequenceId ? 1 : 0) != 0);
        }
    }

    @Before
    public void setUp() throws Exception {
        this.localReceived.clear();
        this.remoteReceived.clear();
    }

    @After
    public void tearDown() throws Exception {
    }

    @Test(timeout=60000L)
    public void testStartWthoutListenerIOE() throws Exception {
        VMTransport local = new VMTransport(new URI(location1));
        VMTransport remote = new VMTransport(new URI(location2));
        local.setPeer(remote);
        remote.setPeer(local);
        remote.setTransportListener((TransportListener)new VMTestTransportListener(this.localReceived));
        try {
            local.start();
            Assert.fail((String)"Should have thrown an IOExcoption");
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    @Test(timeout=60000L)
    public void testOnewayOnStoppedTransportTDE() throws Exception {
        VMTransport local = new VMTransport(new URI(location1));
        VMTransport remote = new VMTransport(new URI(location2));
        local.setPeer(remote);
        remote.setPeer(local);
        local.setTransportListener((TransportListener)new VMTestTransportListener(this.localReceived));
        remote.setTransportListener((TransportListener)new VMTestTransportListener(this.remoteReceived));
        local.start();
        local.stop();
        try {
            local.oneway((Object)new DummyCommand());
            Assert.fail((String)"Should have thrown a TransportDisposedException");
        }
        catch (TransportDisposedIOException transportDisposedIOException) {
            // empty catch block
        }
    }

    @Test(timeout=60000L)
    public void testStopSendsShutdownToPeer() throws Exception {
        VMTransport local = new VMTransport(new URI(location1));
        VMTransport remote = new VMTransport(new URI(location2));
        local.setPeer(remote);
        remote.setPeer(local);
        final VMTestTransportListener remoteListener = new VMTestTransportListener(this.remoteReceived);
        local.setTransportListener((TransportListener)new VMTestTransportListener(this.localReceived));
        remote.setTransportListener((TransportListener)remoteListener);
        local.start();
        local.stop();
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return remoteListener.shutdownReceived;
            }
        }));
    }

    @Test(timeout=60000L)
    public void testMultipleStartsAndStops() throws Exception {
        int i;
        VMTransport local = new VMTransport(new URI(location1));
        VMTransport remote = new VMTransport(new URI(location2));
        local.setPeer(remote);
        remote.setPeer(local);
        local.setTransportListener((TransportListener)new VMTestTransportListener(this.localReceived));
        remote.setTransportListener((TransportListener)new VMTestTransportListener(this.remoteReceived));
        local.start();
        remote.start();
        local.start();
        remote.start();
        for (i = 0; i < 100; ++i) {
            local.oneway((Object)new DummyCommand());
        }
        for (i = 0; i < 100; ++i) {
            remote.oneway((Object)new DummyCommand());
        }
        local.start();
        remote.start();
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return VMTransportThreadSafeTest.this.remoteReceived.size() == 100;
            }
        }));
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return VMTransportThreadSafeTest.this.localReceived.size() == 100;
            }
        }));
        local.stop();
        local.stop();
        remote.stop();
        remote.stop();
    }

    @Test(timeout=60000L)
    public void testStartWithPeerNotStartedEnqueusCommandsNonAsync() throws Exception {
        this.doTestStartWithPeerNotStartedEnqueusCommands(false);
    }

    private void doTestStartWithPeerNotStartedEnqueusCommands(boolean async) throws Exception {
        VMTransport local = new VMTransport(new URI(location1));
        VMTransport remote = new VMTransport(new URI(location2));
        remote.setAsync(async);
        local.setPeer(remote);
        remote.setPeer(local);
        local.setTransportListener((TransportListener)new VMTestTransportListener(this.localReceived));
        remote.setTransportListener((TransportListener)new VMTestTransportListener(this.remoteReceived));
        local.start();
        for (int i = 0; i < 100; ++i) {
            local.oneway((Object)new DummyCommand());
        }
        Assert.assertEquals((long)100L, (long)remote.getMessageQueue().size());
        remote.start();
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return VMTransportThreadSafeTest.this.remoteReceived.size() == 100;
            }
        }));
        local.stop();
        remote.stop();
    }

    @Test(timeout=60000L)
    public void testBlockedOnewayEnqeueAandStopTransportAsync() throws Exception {
        this.doTestBlockedOnewayEnqeueAandStopTransport(true);
    }

    @Test(timeout=60000L)
    public void testBlockedOnewayEnqeueAandStopTransportNonAsync() throws Exception {
        this.doTestBlockedOnewayEnqeueAandStopTransport(false);
    }

    private void doTestBlockedOnewayEnqeueAandStopTransport(boolean async) throws Exception {
        final VMTransport local = new VMTransport(new URI(location1));
        final VMTransport remote = new VMTransport(new URI(location2));
        final AtomicInteger sequenceId = new AtomicInteger();
        remote.setAsync(async);
        remote.setAsyncQueueDepth(99);
        local.setPeer(remote);
        remote.setPeer(local);
        local.setTransportListener((TransportListener)new VMTestTransportListener(this.localReceived));
        remote.setTransportListener((TransportListener)new VMTestTransportListener(this.remoteReceived));
        local.start();
        Thread t = new Thread(new Runnable(){

            @Override
            public void run() {
                for (int i = 0; i < 100; ++i) {
                    try {
                        local.oneway((Object)new DummyCommand(sequenceId.incrementAndGet()));
                        continue;
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                }
            }
        });
        t.start();
        LOG.debug("Started async delivery, wait for remote's queue to fill up");
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return remote.getMessageQueue().remainingCapacity() == 0;
            }
        }));
        LOG.debug("Remote messageQ is full, start it and stop all");
        remote.start();
        local.stop();
        remote.stop();
    }

    @Test(timeout=60000L)
    public void testBlockedOnewayEnqeueWhileStartedDetectsStop() throws Exception {
        final VMTransport local = new VMTransport(new URI(location1));
        final VMTransport remote = new VMTransport(new URI(location2));
        final AtomicInteger sequenceId = new AtomicInteger();
        remote.setAsync(true);
        remote.setAsyncQueueDepth(2);
        local.setPeer(remote);
        remote.setPeer(local);
        local.setTransportListener((TransportListener)new VMTestTransportListener(this.localReceived));
        remote.setTransportListener((TransportListener)new GatedVMTestTransportListener(this.remoteReceived));
        local.start();
        remote.start();
        Thread t = new Thread(new Runnable(){

            @Override
            public void run() {
                for (int i = 0; i < 3; ++i) {
                    try {
                        local.oneway((Object)new DummyCommand(sequenceId.incrementAndGet()));
                        continue;
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                }
            }
        });
        t.start();
        LOG.debug("Started async delivery, wait for remote's queue to fill up");
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return remote.getMessageQueue().remainingCapacity() == 0;
            }
        }));
        LOG.debug("Starting async gate open.");
        Thread gateman = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                ((GatedVMTestTransportListener)remote.getTransportListener()).gate.countDown();
            }
        });
        gateman.start();
        Thread.sleep(10L);
        remote.stop();
        local.stop();
        Assert.assertEquals((long)1L, (long)this.remoteReceived.size());
        this.assertMessageAreOrdered(this.remoteReceived);
    }

    @Test(timeout=60000L)
    public void testStopWhileStartingAsyncWithNoAsyncLimit() throws Exception {
        this.doTestStopWhileStartingWithNoAsyncLimit(true, 49);
    }

    @Test(timeout=60000L)
    public void testStopWhileStartingNonAsyncWithNoAsyncLimit() throws Exception {
        this.doTestStopWhileStartingWithNoAsyncLimit(false, 100);
    }

    private void doTestStopWhileStartingWithNoAsyncLimit(boolean async, final int expect) throws Exception {
        VMTransport local = new VMTransport(new URI(location1));
        final VMTransport remote = new VMTransport(new URI(location2));
        remote.setAsync(async);
        local.setPeer(remote);
        remote.setPeer(local);
        local.setTransportListener((TransportListener)new VMTestTransportListener(this.localReceived));
        remote.setTransportListener((TransportListener)new SlowVMTestTransportListener(this.remoteReceived));
        local.start();
        for (int i = 0; i < 100; ++i) {
            local.oneway((Object)new DummyCommand(i));
        }
        Thread t = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    Thread.sleep(1000L);
                    remote.stop();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        });
        remote.start();
        t.start();
        Assert.assertTrue((String)("Remote should receive: " + expect + ", commands but got: " + this.remoteReceived.size()), (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return VMTransportThreadSafeTest.this.remoteReceived.size() >= expect;
            }
        }));
        LOG.debug("Remote listener received " + this.remoteReceived.size() + " messages");
        local.stop();
        Assert.assertTrue((String)"Remote transport never was disposed.", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return remote.isDisposed();
            }
        }));
    }

    @Test(timeout=120000L)
    public void TestTwoWayMessageThroughPutSync() throws Exception {
        long totalTimes = 0L;
        long executions = 20L;
        for (int i = 0; i < 20; ++i) {
            totalTimes += this.doTestTwoWayMessageThroughPut(false);
        }
        LOG.info("Total time of one way sync send throughput test: " + totalTimes / 20L + "ms");
    }

    @Test(timeout=120000L)
    public void TestTwoWayMessageThroughPutAsnyc() throws Exception {
        long totalTimes = 0L;
        long executions = 50L;
        int i = 0;
        while ((long)i < 50L) {
            totalTimes += this.doTestTwoWayMessageThroughPut(false);
            ++i;
        }
        LOG.info("Total time of one way async send throughput test: " + totalTimes / 50L + "ms");
    }

    private long doTestTwoWayMessageThroughPut(boolean async) throws Exception {
        final VMTransport local = new VMTransport(new URI(location1));
        final VMTransport remote = new VMTransport(new URI(location2));
        final AtomicInteger sequenceId = new AtomicInteger();
        remote.setAsync(async);
        local.setPeer(remote);
        remote.setPeer(local);
        local.setTransportListener((TransportListener)new VMTestTransportListener(this.localReceived));
        remote.setTransportListener((TransportListener)new VMTestTransportListener(this.remoteReceived));
        int messageCount = 200000;
        local.start();
        remote.start();
        long startTime = System.currentTimeMillis();
        Thread localSend = new Thread(new Runnable(){

            @Override
            public void run() {
                for (int i = 0; i < 200000; ++i) {
                    try {
                        local.oneway((Object)new DummyCommand(sequenceId.incrementAndGet()));
                        continue;
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                }
            }
        });
        Thread remoteSend = new Thread(new Runnable(){

            @Override
            public void run() {
                for (int i = 0; i < 200000; ++i) {
                    try {
                        remote.oneway((Object)new DummyCommand(sequenceId.incrementAndGet()));
                        continue;
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                }
            }
        });
        localSend.start();
        remoteSend.start();
        localSend.join();
        remoteSend.join();
        long endTime = System.currentTimeMillis();
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return VMTransportThreadSafeTest.this.remoteReceived.size() == 200000;
            }
        }));
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return VMTransportThreadSafeTest.this.localReceived.size() == 200000;
            }
        }));
        LOG.debug("All messages sent,stop all");
        local.stop();
        remote.stop();
        this.localReceived.clear();
        this.remoteReceived.clear();
        return endTime - startTime;
    }

    @Test(timeout=120000L)
    public void TestOneWayMessageThroughPutSync() throws Exception {
        long totalTimes = 0L;
        long executions = 30L;
        int i = 0;
        while ((long)i < 30L) {
            totalTimes += this.doTestOneWayMessageThroughPut(false);
            ++i;
        }
        LOG.info("Total time of one way sync send throughput test: " + totalTimes / 30L + "ms");
    }

    @Test(timeout=120000L)
    public void TestOneWayMessageThroughPutAsnyc() throws Exception {
        long totalTimes = 0L;
        long executions = 20L;
        for (int i = 0; i < 20; ++i) {
            totalTimes += this.doTestOneWayMessageThroughPut(true);
        }
        LOG.info("Total time of one way async send throughput test: " + totalTimes / 20L + "ms");
    }

    private long doTestOneWayMessageThroughPut(boolean async) throws Exception {
        final VMTransport local = new VMTransport(new URI(location1));
        VMTransport remote = new VMTransport(new URI(location2));
        final AtomicInteger sequenceId = new AtomicInteger();
        remote.setAsync(async);
        local.setPeer(remote);
        remote.setPeer(local);
        local.setTransportListener((TransportListener)new VMTestTransportListener(this.localReceived));
        remote.setTransportListener((TransportListener)new VMTestTransportListener(this.remoteReceived));
        int messageCount = 100000;
        local.start();
        remote.start();
        long startTime = System.currentTimeMillis();
        Thread localSend = new Thread(new Runnable(){

            @Override
            public void run() {
                for (int i = 0; i < 100000; ++i) {
                    try {
                        local.oneway((Object)new DummyCommand(sequenceId.incrementAndGet()));
                        continue;
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                }
            }
        });
        localSend.start();
        localSend.join();
        long endTime = System.currentTimeMillis();
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return VMTransportThreadSafeTest.this.remoteReceived.size() == 100000;
            }
        }));
        LOG.debug("All messages sent,stop all");
        local.stop();
        remote.stop();
        this.localReceived.clear();
        this.remoteReceived.clear();
        return endTime - startTime;
    }

    @Test(timeout=120000L)
    public void testTwoWayTrafficWithMutexTransportSync1() throws Exception {
        for (int i = 0; i < 20; ++i) {
            this.doTestTwoWayTrafficWithMutexTransport(false, false);
        }
    }

    @Test(timeout=120000L)
    public void testTwoWayTrafficWithMutexTransportSync2() throws Exception {
        for (int i = 0; i < 20; ++i) {
            this.doTestTwoWayTrafficWithMutexTransport(true, false);
        }
    }

    @Test(timeout=120000L)
    public void testTwoWayTrafficWithMutexTransportSync3() throws Exception {
        for (int i = 0; i < 20; ++i) {
            this.doTestTwoWayTrafficWithMutexTransport(false, true);
        }
    }

    @Test(timeout=120000L)
    public void testTwoWayTrafficWithMutexTransportSync4() throws Exception {
        for (int i = 0; i < 20; ++i) {
            this.doTestTwoWayTrafficWithMutexTransport(false, false);
        }
    }

    public void doTestTwoWayTrafficWithMutexTransport(boolean localAsync, boolean remoteAsync) throws Exception {
        VMTransport vmlocal = new VMTransport(new URI(location1));
        VMTransport vmremote = new VMTransport(new URI(location2));
        final MutexTransport local = new MutexTransport((Transport)vmlocal);
        final MutexTransport remote = new MutexTransport((Transport)vmremote);
        final AtomicInteger sequenceId = new AtomicInteger();
        vmlocal.setAsync(localAsync);
        vmremote.setAsync(remoteAsync);
        vmlocal.setPeer(vmremote);
        vmremote.setPeer(vmlocal);
        local.setTransportListener((TransportListener)new VMTestTransportListener(this.localReceived));
        remote.setTransportListener((TransportListener)new VMResponderTransportListener(this.remoteReceived, (Transport)remote));
        int messageCount = 200000;
        Thread localSend = new Thread(new Runnable(){

            @Override
            public void run() {
                for (int i = 0; i < 200000; ++i) {
                    try {
                        local.oneway((Object)new DummyCommand(sequenceId.incrementAndGet()));
                        continue;
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                }
            }
        });
        Thread remoteSend = new Thread(new Runnable(){

            @Override
            public void run() {
                for (int i = 0; i < 200000; ++i) {
                    try {
                        remote.oneway((Object)new DummyCommand(sequenceId.incrementAndGet()));
                        continue;
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                }
            }
        });
        localSend.start();
        remoteSend.start();
        Thread.sleep(10L);
        local.start();
        remote.start();
        localSend.join();
        remoteSend.join();
        Assert.assertTrue((String)("Remote should have received (200000) but got ()" + this.remoteReceived.size()), (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return VMTransportThreadSafeTest.this.remoteReceived.size() == 200000;
            }
        }));
        Assert.assertTrue((String)("Local should have received (400000) but got ()" + this.localReceived.size()), (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return VMTransportThreadSafeTest.this.localReceived.size() == 400000;
            }
        }));
        LOG.debug("All messages sent,stop all");
        local.stop();
        remote.stop();
        this.localReceived.clear();
        this.remoteReceived.clear();
    }

    private class GatedVMTestTransportListener
    extends VMTestTransportListener {
        private final CountDownLatch gate;

        public GatedVMTestTransportListener(Queue<DummyCommand> receiveQueue) {
            this(receiveQueue, new CountDownLatch(1));
        }

        public GatedVMTestTransportListener(Queue<DummyCommand> receiveQueue, CountDownLatch gate) {
            super(receiveQueue);
            this.gate = gate;
        }

        @Override
        public void onCommand(Object command) {
            super.onCommand(command);
            try {
                this.gate.await();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    private class SlowVMTestTransportListener
    extends VMTestTransportListener {
        private final TimeUnit delayUnit;
        private final long delay;

        public SlowVMTestTransportListener(Queue<DummyCommand> receiveQueue) {
            this(receiveQueue, 10L, TimeUnit.MILLISECONDS);
        }

        public SlowVMTestTransportListener(Queue<DummyCommand> receiveQueue, long delay, TimeUnit delayUnit) {
            super(receiveQueue);
            this.delay = delay;
            this.delayUnit = delayUnit;
        }

        @Override
        public void onCommand(Object command) {
            super.onCommand(command);
            try {
                this.delayUnit.sleep(this.delay);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    private class VMResponderTransportListener
    implements TransportListener {
        protected final Queue<DummyCommand> received;
        private final Transport peer;

        public VMResponderTransportListener(Queue<DummyCommand> receiveQueue, Transport peer) {
            this.received = receiveQueue;
            this.peer = peer;
        }

        public void onCommand(Object command) {
            if (command instanceof ShutdownInfo) {
                return;
            }
            this.received.add((DummyCommand)((Object)command));
            if (this.peer != null) {
                try {
                    this.peer.oneway(command);
                }
                catch (IOException iOException) {
                    // empty catch block
                }
            }
        }

        public void onException(IOException error) {
        }

        public void transportInterupted() {
        }

        public void transportResumed() {
        }
    }

    private class VMTestTransportListener
    implements TransportListener {
        protected final Queue<DummyCommand> received;
        public boolean shutdownReceived = false;

        public VMTestTransportListener(Queue<DummyCommand> receiveQueue) {
            this.received = receiveQueue;
        }

        public void onCommand(Object command) {
            if (command instanceof ShutdownInfo) {
                this.shutdownReceived = true;
            } else {
                this.received.add((DummyCommand)((Object)command));
            }
        }

        public void onException(IOException error) {
        }

        public void transportInterupted() {
        }

        public void transportResumed() {
        }
    }

    private class DummyCommand
    extends BaseCommand {
        public final int sequenceId;

        public DummyCommand() {
            this.sequenceId = 0;
        }

        public DummyCommand(int id) {
            this.sequenceId = id;
        }

        public Response visit(CommandVisitor visitor) throws Exception {
            return null;
        }

        public byte getDataStructureType() {
            return 42;
        }
    }
}

