package org.apache.activemq.transport.vm;

import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/vm/VMTransportThreadSafeTest.class */
public class VMTransportThreadSafeTest {
    /** Logger used by the tests and their helper threads. */
    private static final Logger LOG = LoggerFactory.getLogger(VMTransportThreadSafeTest.class);
    /** URI handed to the first transport that each test constructs. */
    private static final String location1 = "vm://transport1";
    /** URI handed to the second transport that each test constructs. */
    private static final String location2 = "vm://transport2";
    /** Commands collected by listeners created with this queue; cleared in setUp(). */
    private final ConcurrentLinkedQueue<DummyCommand> localReceived = new ConcurrentLinkedQueue<>();
    /** Commands collected by listeners created with this queue; cleared in setUp(). */
    private final ConcurrentLinkedQueue<DummyCommand> remoteReceived = new ConcurrentLinkedQueue<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/transport/vm/VMTransportThreadSafeTest$DummyCommand.class */
    /**
     * Minimal command used as the payload in these tests. It carries only a
     * sequence number; {@link #visit} does nothing and returns {@code null}.
     */
    public class DummyCommand extends BaseCommand {
        /** Sequence number fixed at construction time; zero when none was given. */
        public final int sequenceId;

        /** Creates a command whose sequence id is zero. */
        public DummyCommand() {
            this(0);
        }

        /**
         * Creates a command with an explicit sequence id.
         *
         * @param i the sequence id to store
         */
        public DummyCommand(int i) {
            sequenceId = i;
        }

        /** No-op visitor hook; always returns {@code null}. */
        public Response visit(CommandVisitor commandVisitor) throws Exception {
            return null;
        }

        /** Returns the fixed type code 42 used for this test-only command. */
        public byte getDataStructureType() {
            final byte typeCode = 42;
            return typeCode;
        }
    }

    /* loaded from: input_file:org/apache/activemq/transport/vm/VMTransportThreadSafeTest$GatedVMTestTransportListener.class */
    /**
     * Listener that records each command through its superclass and then blocks
     * the delivering thread until its gate latch has been counted down.
     */
    private class GatedVMTestTransportListener extends VMTestTransportListener {
        /** Latch that releases threads blocked in {@link #onCommand(Object)}. */
        private final CountDownLatch gate;

        /**
         * Creates a listener guarded by a fresh single-count latch.
         *
         * @param vMTransportThreadSafeTest unused; kept so existing call sites compile
         * @param queue queue that receives the recorded commands
         */
        public GatedVMTestTransportListener(VMTransportThreadSafeTest vMTransportThreadSafeTest, Queue<DummyCommand> queue) {
            this(queue, new CountDownLatch(1));
        }

        /**
         * Creates a listener guarded by the supplied latch.
         *
         * @param queue queue that receives the recorded commands
         * @param countDownLatch latch awaited after every command
         */
        public GatedVMTestTransportListener(Queue<DummyCommand> queue, CountDownLatch countDownLatch) {
            super(queue);
            this.gate = countDownLatch;
        }

        /**
         * Records the command, then waits for the gate to open. If the wait is
         * interrupted the method returns early with the interrupt status restored.
         */
        @Override // org.apache.activemq.transport.vm.VMTransportThreadSafeTest.VMTestTransportListener
        public void onCommand(Object obj) {
            super.onCommand(obj);
            try {
                this.gate.await();
            } catch (InterruptedException e) {
                // Do not lose the interrupt: the delivering thread must still be able to see it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/transport/vm/VMTransportThreadSafeTest$SlowVMTestTransportListener.class */
    /**
     * Listener that records each command through its superclass and then sleeps
     * for a fixed delay, simulating a slow consumer.
     */
    public class SlowVMTestTransportListener extends VMTestTransportListener {
        /** Unit in which {@link #delay} is expressed. */
        private final TimeUnit delayUnit;
        /** Amount of time to sleep after each command. */
        private final long delay;

        /**
         * Creates a listener that sleeps 10 milliseconds after each command.
         *
         * @param vMTransportThreadSafeTest unused; kept so existing call sites compile
         * @param queue queue that receives the recorded commands
         */
        public SlowVMTestTransportListener(VMTransportThreadSafeTest vMTransportThreadSafeTest, Queue<DummyCommand> queue) {
            this(queue, 10L, TimeUnit.MILLISECONDS);
        }

        /**
         * Creates a listener with an explicit per-command delay.
         *
         * @param queue queue that receives the recorded commands
         * @param j length of the delay
         * @param timeUnit unit of the delay
         */
        public SlowVMTestTransportListener(Queue<DummyCommand> queue, long j, TimeUnit timeUnit) {
            super(queue);
            this.delay = j;
            this.delayUnit = timeUnit;
        }

        /**
         * Records the command, then sleeps for the configured delay. If the sleep
         * is interrupted the method returns early with the interrupt status restored.
         */
        @Override // org.apache.activemq.transport.vm.VMTransportThreadSafeTest.VMTestTransportListener
        public void onCommand(Object obj) {
            super.onCommand(obj);
            try {
                this.delayUnit.sleep(this.delay);
            } catch (InterruptedException e) {
                // Do not lose the interrupt: the delivering thread must still be able to see it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/transport/vm/VMTransportThreadSafeTest$VMResponderTransportListener.class */
    /**
     * Listener that records every non-shutdown command and, when a transport was
     * supplied, sends the same command back out through that transport.
     */
    public class VMResponderTransportListener implements TransportListener {
        /** Queue collecting every recorded command. */
        protected final Queue<DummyCommand> received;
        /** Transport used to send each command onward; may be {@code null}. */
        private final Transport peer;

        /**
         * @param queue queue that receives the recorded commands
         * @param transport transport used for re-sending, or {@code null} to only record
         */
        public VMResponderTransportListener(Queue<DummyCommand> queue, Transport transport) {
            received = queue;
            peer = transport;
        }

        /** Ignores {@link ShutdownInfo}; records anything else and re-sends it if possible. */
        public void onCommand(Object obj) {
            if (obj instanceof ShutdownInfo) {
                return;
            }
            received.add((DummyCommand) obj);
            if (peer == null) {
                return;
            }
            try {
                peer.oneway(obj);
            } catch (IOException ignored) {
                // A failed re-send is deliberately ignored by this listener.
            }
        }

        /** No-op. */
        public void onException(IOException iOException) {
        }

        /** No-op. */
        public void transportInterupted() {
        }

        /** No-op. */
        public void transportResumed() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/transport/vm/VMTransportThreadSafeTest$VMTestTransportListener.class */
    /**
     * Basic listener: stores every received {@link DummyCommand} in a queue and
     * raises a flag when a {@link ShutdownInfo} arrives.
     */
    public class VMTestTransportListener implements TransportListener {
        /** Queue collecting every non-shutdown command. */
        protected final Queue<DummyCommand> received;
        /** Set to {@code true} once a {@link ShutdownInfo} has been delivered. */
        public boolean shutdownReceived = false;

        /**
         * @param queue queue that receives the recorded commands
         */
        public VMTestTransportListener(Queue<DummyCommand> queue) {
            received = queue;
        }

        /** Records the command, or flags the shutdown when it is a {@link ShutdownInfo}. */
        public void onCommand(Object obj) {
            if (!(obj instanceof ShutdownInfo)) {
                received.add((DummyCommand) obj);
                return;
            }
            shutdownReceived = true;
        }

        /** No-op. */
        public void onException(IOException iOException) {
        }

        /** No-op. */
        public void transportInterupted() {
        }

        /** No-op. */
        public void transportResumed() {
        }
    }

    /**
     * Asserts that the sequence ids of the queued commands are strictly
     * increasing in iteration order, starting above zero.
     *
     * @param concurrentLinkedQueue the received commands to check; an empty queue passes
     */
    private void assertMessageAreOrdered(ConcurrentLinkedQueue<DummyCommand> concurrentLinkedQueue) {
        int lastSequenceId = 0;
        for (DummyCommand command : concurrentLinkedQueue) {
            int id = command.sequenceId;
            Assert.assertTrue("Last id: " + lastSequenceId + " should be less than current id: " + id, id > lastSequenceId);
            // Advance the watermark; without this every id would only be compared against zero.
            lastSequenceId = id;
        }
    }

    /** Empties both receive queues so every test starts without recorded commands. */
    @Before
    public void setUp() throws Exception {
        remoteReceived.clear();
        localReceived.clear();
    }

    /** Per-test cleanup hook; the body is currently empty. */
    @After
    public void tearDown() throws Exception {
    }

    /**
     * Starting a transport that has no listener of its own (only its peer has
     * one) is expected to throw an {@link IOException}.
     */
    @Test(timeout = 60000)
    public void testStartWthoutListenerIOE() throws Exception {
        final VMTransport local = new VMTransport(new URI(location1));
        final VMTransport remote = new VMTransport(new URI(location2));
        local.setPeer(remote);
        remote.setPeer(local);
        // Only the peer gets a listener; 'local' is deliberately left without one.
        remote.setTransportListener(new VMTestTransportListener(localReceived));

        boolean rejected = false;
        try {
            local.start();
        } catch (IOException expected) {
            rejected = true;
        }
        if (!rejected) {
            Assert.fail("Should have thrown an IOExcoption");
        }
    }

    /**
     * Sending through a transport that was started and then stopped is expected
     * to throw a {@link TransportDisposedIOException}.
     */
    @Test(timeout = 60000)
    public void testOnewayOnStoppedTransportTDE() throws Exception {
        final VMTransport local = new VMTransport(new URI(location1));
        final VMTransport remote = new VMTransport(new URI(location2));
        local.setPeer(remote);
        remote.setPeer(local);
        local.setTransportListener(new VMTestTransportListener(localReceived));
        remote.setTransportListener(new VMTestTransportListener(remoteReceived));

        local.start();
        local.stop();

        boolean rejected = false;
        try {
            local.oneway(new DummyCommand());
        } catch (TransportDisposedIOException expected) {
            rejected = true;
        }
        if (!rejected) {
            Assert.fail("Should have thrown a TransportDisposedException");
        }
    }

    /**
     * Stopping a started transport is expected to make the peer's listener
     * observe a shutdown command.
     */
    @Test(timeout = 60000)
    public void testStopSendsShutdownToPeer() throws Exception {
        final VMTransport local = new VMTransport(new URI(location1));
        final VMTransport remote = new VMTransport(new URI(location2));
        local.setPeer(remote);
        remote.setPeer(local);

        final VMTestTransportListener remoteListener = new VMTestTransportListener(remoteReceived);
        local.setTransportListener(new VMTestTransportListener(localReceived));
        remote.setTransportListener(remoteListener);

        local.start();
        local.stop();

        // Poll until the remote listener has flagged the shutdown.
        final Wait.Condition shutdownSeen = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return remoteListener.shutdownReceived;
            }
        };
        final boolean sawShutdown = Wait.waitFor(shutdownSeen);
        Assert.assertTrue(sawShutdown);
    }

    /**
     * Issues an asynchronous request through a {@link ResponseCorrelator}, stops
     * the remote transport, and expects the pending request to complete with an
     * {@link ExceptionResponse} wrapping a {@link TransportDisposedIOException}.
     */
    @Test(timeout = 60000)
    public void testRemoteStopSendsExceptionToPendingRequests() throws Exception {
        VMTransport vMTransport = new VMTransport(new URI(location1));
        VMTransport vMTransport2 = new VMTransport(new URI(location2));
        vMTransport.setPeer(vMTransport2);
        vMTransport2.setPeer(vMTransport);
        vMTransport2.setTransportListener(new VMTestTransportListener(this.remoteReceived));
        // Single-slot holder so the callback can hand its result to the polling condition.
        final Response[] responseArr = new Response[1];
        ResponseCorrelator responseCorrelator = new ResponseCorrelator(vMTransport);
        responseCorrelator.setTransportListener(new VMTestTransportListener(this.localReceived));
        responseCorrelator.start();
        responseCorrelator.asyncRequest(new DummyCommand(), new ResponseCallback() { // from class: org.apache.activemq.transport.vm.VMTransportThreadSafeTest.2
            public void onCompletion(FutureResponse futureResponse) {
                try {
                    responseArr[0] = futureResponse.getResult();
                } catch (IOException e) {
                    // Route the failure through the class logger instead of stderr.
                    VMTransportThreadSafeTest.LOG.error("Failed to obtain the result of the pending request", e);
                }
            }
        });
        vMTransport2.stop();
        Assert.assertTrue(Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.transport.vm.VMTransportThreadSafeTest.3
            public boolean isSatisified() throws Exception {
                VMTransportThreadSafeTest.LOG.info("answer: " + responseArr[0]);
                return (responseArr[0] instanceof ExceptionResponse) && (responseArr[0].getException() instanceof TransportDisposedIOException);
            }
        }));
        vMTransport.stop();
    }

    /**
     * Calls start() and stop() repeatedly on both transports and checks that
     * 100 commands sent in each direction are all received.
     */
    @Test(timeout = 60000)
    public void testMultipleStartsAndStops() throws Exception {
        final VMTransport local = new VMTransport(new URI(location1));
        final VMTransport remote = new VMTransport(new URI(location2));
        local.setPeer(remote);
        remote.setPeer(local);
        local.setTransportListener(new VMTestTransportListener(localReceived));
        remote.setTransportListener(new VMTestTransportListener(remoteReceived));

        // Start both sides twice before any traffic.
        local.start();
        remote.start();
        local.start();
        remote.start();

        int sent = 0;
        while (sent < 100) {
            local.oneway(new DummyCommand());
            sent++;
        }
        sent = 0;
        while (sent < 100) {
            remote.oneway(new DummyCommand());
            sent++;
        }

        // A further redundant start after traffic has been sent.
        local.start();
        remote.start();

        final Wait.Condition remoteGotAll = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return remoteReceived.size() == 100;
            }
        };
        Assert.assertTrue(Wait.waitFor(remoteGotAll));

        final Wait.Condition localGotAll = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return localReceived.size() == 100;
            }
        };
        Assert.assertTrue(Wait.waitFor(localGotAll));

        // Stop each side twice.
        local.stop();
        local.stop();
        remote.stop();
        remote.stop();
    }

    /** Runs the "peer not started" enqueue scenario with the remote transport's async flag off. */
    @Test(timeout = 60000)
    public void testStartWithPeerNotStartedEnqueusCommandsNonAsync() throws Exception {
        final boolean async = false;
        doTestStartWithPeerNotStartedEnqueusCommands(async);
    }

    /**
     * Sends 100 commands to a peer that has not been started, checks that they
     * sit in the peer's message queue, then starts the peer and waits until its
     * listener has received all of them.
     *
     * @param z value passed to the remote transport's setAsync
     */
    private void doTestStartWithPeerNotStartedEnqueusCommands(boolean z) throws Exception {
        final VMTransport local = new VMTransport(new URI(location1));
        final VMTransport remote = new VMTransport(new URI(location2));
        remote.setAsync(z);
        local.setPeer(remote);
        remote.setPeer(local);
        local.setTransportListener(new VMTestTransportListener(localReceived));
        remote.setTransportListener(new VMTestTransportListener(remoteReceived));

        // Only the sending side is started at this point.
        local.start();
        int sent = 0;
        while (sent < 100) {
            local.oneway(new DummyCommand());
            sent++;
        }

        final int queued = remote.getMessageQueue().size();
        Assert.assertEquals(100L, queued);

        remote.start();
        final Wait.Condition remoteGotAll = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return remoteReceived.size() == 100;
            }
        };
        Assert.assertTrue(Wait.waitFor(remoteGotAll));

        local.stop();
        remote.stop();
    }

    /** Runs the blocked-enqueue-then-stop scenario with the remote transport's async flag on. */
    @Test(timeout = 60000)
    public void testBlockedOnewayEnqeueAandStopTransportAsync() throws Exception {
        final boolean async = true;
        doTestBlockedOnewayEnqeueAandStopTransport(async);
    }

    /** Runs the blocked-enqueue-then-stop scenario with the remote transport's async flag off. */
    @Test(timeout = 60000)
    public void testBlockedOnewayEnqeueAandStopTransportNonAsync() throws Exception {
        final boolean async = false;
        doTestBlockedOnewayEnqeueAandStopTransport(async);
    }

    /**
     * A background thread sends 100 commands to an unstarted peer whose queue
     * depth is set to 99. Once the peer's queue reports no remaining capacity,
     * the peer is started and both transports are stopped.
     *
     * @param z value passed to the remote transport's setAsync
     */
    private void doTestBlockedOnewayEnqeueAandStopTransport(boolean z) throws Exception {
        final VMTransport local = new VMTransport(new URI(location1));
        final VMTransport remote = new VMTransport(new URI(location2));
        final AtomicInteger sequence = new AtomicInteger();
        remote.setAsync(z);
        remote.setAsyncQueueDepth(99);
        local.setPeer(remote);
        remote.setPeer(local);
        local.setTransportListener(new VMTestTransportListener(localReceived));
        remote.setTransportListener(new VMTestTransportListener(remoteReceived));
        local.start();

        final Runnable sender = new Runnable() {
            @Override
            public void run() {
                int attempts = 0;
                while (attempts < 100) {
                    try {
                        local.oneway(new DummyCommand(sequence.incrementAndGet()));
                    } catch (Exception ignored) {
                        // Send failures are ignored; the loop simply moves on to the next command.
                    }
                    attempts++;
                }
            }
        };
        final Thread senderThread = new Thread(sender);
        senderThread.start();

        LOG.debug("Started async delivery, wait for remote's queue to fill up");
        final Wait.Condition remoteQueueFull = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return remote.getMessageQueue().remainingCapacity() == 0;
            }
        };
        Assert.assertTrue(Wait.waitFor(remoteQueueFull));

        LOG.debug("Remote messageQ is full, start it and stop all");
        remote.start();
        local.stop();
        remote.stop();
    }

    /**
     * With an async remote of queue depth 2 and a gated listener, a background
     * thread sends 3 commands. Once the remote queue is full, a second thread
     * opens the gate after 100 ms while both transports are stopped. Exactly one
     * command is expected to have reached the remote listener.
     */
    @Test(timeout = 60000)
    public void testBlockedOnewayEnqeueWhileStartedDetectsStop() throws Exception {
        final VMTransport local = new VMTransport(new URI(location1));
        final VMTransport remote = new VMTransport(new URI(location2));
        final AtomicInteger sequence = new AtomicInteger();
        remote.setAsync(true);
        remote.setAsyncQueueDepth(2);
        local.setPeer(remote);
        remote.setPeer(local);
        local.setTransportListener(new VMTestTransportListener(localReceived));
        remote.setTransportListener(new GatedVMTestTransportListener(this, remoteReceived));
        local.start();
        remote.start();

        final Runnable sender = new Runnable() {
            @Override
            public void run() {
                int attempts = 0;
                while (attempts < 3) {
                    try {
                        local.oneway(new DummyCommand(sequence.incrementAndGet()));
                    } catch (Exception ignored) {
                        // Send failures are ignored; the loop simply moves on to the next command.
                    }
                    attempts++;
                }
            }
        };
        new Thread(sender).start();

        LOG.debug("Started async delivery, wait for remote's queue to fill up");
        final Wait.Condition remoteQueueFull = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return remote.getMessageQueue().remainingCapacity() == 0;
            }
        };
        Assert.assertTrue(Wait.waitFor(remoteQueueFull));

        LOG.debug("Starting async gate open.");
        final Runnable gateOpener = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException ignored) {
                    // Fall through and open the gate right away.
                }
                final GatedVMTestTransportListener gated = (GatedVMTestTransportListener) remote.getTransportListener();
                gated.gate.countDown();
            }
        };
        new Thread(gateOpener).start();

        remote.stop();
        local.stop();

        Assert.assertEquals(1L, remoteReceived.size());
        assertMessageAreOrdered(remoteReceived);
    }

    /** Async variant of the stop-while-starting scenario; requires at least 49 received commands. */
    @Test(timeout = 60000)
    public void testStopWhileStartingAsyncWithNoAsyncLimit() throws Exception {
        final boolean async = true;
        final int minimumReceived = 49;
        doTestStopWhileStartingWithNoAsyncLimit(async, minimumReceived);
    }

    /** Non-async variant of the stop-while-starting scenario; requires all 100 commands. */
    @Test(timeout = 60000)
    public void testStopWhileStartingNonAsyncWithNoAsyncLimit() throws Exception {
        final boolean async = false;
        final int minimumReceived = 100;
        doTestStopWhileStartingWithNoAsyncLimit(async, minimumReceived);
    }

    /**
     * Queues 100 commands for an unstarted remote that uses a slow listener,
     * then starts the remote while a background thread stops it one second
     * later. Verifies that the remote listener received at least {@code i}
     * commands and that the remote transport ends up disposed.
     *
     * @param z value passed to the remote transport's setAsync
     * @param i minimum number of commands the remote listener must receive
     */
    private void doTestStopWhileStartingWithNoAsyncLimit(boolean z, final int i) throws Exception {
        VMTransport vMTransport = new VMTransport(new URI(location1));
        final VMTransport vMTransport2 = new VMTransport(new URI(location2));
        vMTransport2.setAsync(z);
        vMTransport.setPeer(vMTransport2);
        vMTransport2.setPeer(vMTransport);
        vMTransport.setTransportListener(new VMTestTransportListener(this.localReceived));
        vMTransport2.setTransportListener(new SlowVMTestTransportListener(this, this.remoteReceived));
        vMTransport.start();
        for (int i2 = 0; i2 < 100; i2++) {
            vMTransport.oneway(new DummyCommand(i2));
        }
        Thread thread = new Thread(new Runnable() { // from class: org.apache.activemq.transport.vm.VMTransportThreadSafeTest.12
            @Override // java.lang.Runnable
            public void run() {
                try {
                    Thread.sleep(1000L);
                    vMTransport2.stop();
                } catch (Exception e) {
                }
            }
        });
        vMTransport2.start();
        thread.start();
        // Wait first, then build the message, so a failure reports the count seen
        // after the wait rather than the count captured before it began.
        boolean receivedEnough = Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.transport.vm.VMTransportThreadSafeTest.13
            public boolean isSatisified() throws Exception {
                return VMTransportThreadSafeTest.this.remoteReceived.size() >= i;
            }
        });
        Assert.assertTrue("Remote should receive: " + i + ", commands but got: " + this.remoteReceived.size(), receivedEnough);
        LOG.debug("Remote listener received " + this.remoteReceived.size() + " messages");
        vMTransport.stop();
        Assert.assertTrue("Remote transport never was disposed.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.transport.vm.VMTransportThreadSafeTest.14
            public boolean isSatisified() throws Exception {
                return vMTransport2.isDisposed();
            }
        }));
    }

    /**
     * Runs the two-way throughput scenario 20 times with the remote transport's
     * async flag off and logs the mean elapsed send time per run.
     */
    @Test(timeout = 120000)
    public void TestTwoWayMessageThroughPutSync() throws Exception {
        long j = 0;
        for (int i = 0; i < 20; i++) {
            j += doTestTwoWayMessageThroughPut(false);
        }
        // This is the two-way scenario; the message previously said "one way".
        LOG.info("Total time of two way sync send throughput test: " + (j / 20) + "ms");
    }

    /**
     * Runs the two-way throughput scenario 50 times with the remote transport's
     * async flag on and logs the mean elapsed send time per run.
     */
    @Test(timeout = 120000)
    public void TestTwoWayMessageThroughPutAsnyc() throws Exception {
        long j = 0;
        for (int i = 0; i < 50; i++) {
            // Pass true so the async path is exercised; passing false only
            // repeated the sync variant under a different test name.
            j += doTestTwoWayMessageThroughPut(true);
        }
        // This is the two-way scenario; the message previously said "one way".
        LOG.info("Total time of two way async send throughput test: " + (j / 50) + "ms");
    }

    /**
     * Two threads concurrently send 200000 commands each, one per direction,
     * between two started transports; the method then waits until both
     * listeners have received 200000 commands.
     *
     * @param z value passed to the remote transport's setAsync
     * @return milliseconds between starting the sender threads and both having finished
     */
    private long doTestTwoWayMessageThroughPut(boolean z) throws Exception {
        final VMTransport local = new VMTransport(new URI(location1));
        final VMTransport remote = new VMTransport(new URI(location2));
        final AtomicInteger sequence = new AtomicInteger();
        remote.setAsync(z);
        local.setPeer(remote);
        remote.setPeer(local);
        local.setTransportListener(new VMTestTransportListener(localReceived));
        remote.setTransportListener(new VMTestTransportListener(remoteReceived));
        local.start();
        remote.start();

        final long startedAt = System.currentTimeMillis();

        final Runnable localSender = new Runnable() {
            @Override
            public void run() {
                int attempts = 0;
                while (attempts < 200000) {
                    try {
                        local.oneway(new DummyCommand(sequence.incrementAndGet()));
                    } catch (Exception ignored) {
                        // Send failures are ignored; the loop simply moves on to the next command.
                    }
                    attempts++;
                }
            }
        };
        final Runnable remoteSender = new Runnable() {
            @Override
            public void run() {
                int attempts = 0;
                while (attempts < 200000) {
                    try {
                        remote.oneway(new DummyCommand(sequence.incrementAndGet()));
                    } catch (Exception ignored) {
                        // Send failures are ignored; the loop simply moves on to the next command.
                    }
                    attempts++;
                }
            }
        };
        final Thread localThread = new Thread(localSender);
        final Thread remoteThread = new Thread(remoteSender);
        localThread.start();
        remoteThread.start();
        localThread.join();
        remoteThread.join();

        final long finishedAt = System.currentTimeMillis();

        final Wait.Condition remoteGotAll = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return remoteReceived.size() == 200000;
            }
        };
        Assert.assertTrue(Wait.waitFor(remoteGotAll));

        final Wait.Condition localGotAll = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return localReceived.size() == 200000;
            }
        };
        Assert.assertTrue(Wait.waitFor(localGotAll));

        LOG.debug("All messages sent,stop all");
        local.stop();
        remote.stop();

        // Reset the queues because callers invoke this method repeatedly within one test.
        localReceived.clear();
        remoteReceived.clear();
        return finishedAt - startedAt;
    }

    /**
     * Runs the one-way throughput scenario 30 times with the remote transport's
     * async flag off and logs the summed time divided by the number of runs.
     */
    @Test(timeout = 120000)
    public void TestOneWayMessageThroughPutSync() throws Exception {
        final int runs = 30;
        long elapsed = 0;
        int run = 0;
        while (run < runs) {
            elapsed += doTestOneWayMessageThroughPut(false);
            run++;
        }
        final long perRun = elapsed / runs;
        LOG.info("Total time of one way sync send throughput test: " + perRun + "ms");
    }

    /**
     * Runs the one-way throughput scenario 20 times with the remote transport's
     * async flag on and logs the summed time divided by the number of runs.
     */
    @Test(timeout = 120000)
    public void TestOneWayMessageThroughPutAsnyc() throws Exception {
        final int runs = 20;
        long elapsed = 0;
        int run = 0;
        while (run < runs) {
            elapsed += doTestOneWayMessageThroughPut(true);
            run++;
        }
        final long perRun = elapsed / runs;
        LOG.info("Total time of one way async send throughput test: " + perRun + "ms");
    }

    /**
     * A single thread sends 100000 commands from the first transport to the
     * second; the method then waits until the remote listener has received all
     * of them.
     *
     * @param z value passed to the remote transport's setAsync
     * @return milliseconds between starting the sender thread and it having finished
     */
    private long doTestOneWayMessageThroughPut(boolean z) throws Exception {
        final VMTransport local = new VMTransport(new URI(location1));
        final VMTransport remote = new VMTransport(new URI(location2));
        final AtomicInteger sequence = new AtomicInteger();
        remote.setAsync(z);
        local.setPeer(remote);
        remote.setPeer(local);
        local.setTransportListener(new VMTestTransportListener(localReceived));
        remote.setTransportListener(new VMTestTransportListener(remoteReceived));
        local.start();
        remote.start();

        final long startedAt = System.currentTimeMillis();

        final Runnable sender = new Runnable() {
            @Override
            public void run() {
                int attempts = 0;
                while (attempts < 100000) {
                    try {
                        local.oneway(new DummyCommand(sequence.incrementAndGet()));
                    } catch (Exception ignored) {
                        // Send failures are ignored; the loop simply moves on to the next command.
                    }
                    attempts++;
                }
            }
        };
        final Thread senderThread = new Thread(sender);
        senderThread.start();
        senderThread.join();

        final long finishedAt = System.currentTimeMillis();

        final Wait.Condition remoteGotAll = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return remoteReceived.size() == 100000;
            }
        };
        Assert.assertTrue(Wait.waitFor(remoteGotAll));

        LOG.debug("All messages sent,stop all");
        local.stop();
        remote.stop();

        // Reset the queues because callers invoke this method repeatedly within one test.
        localReceived.clear();
        remoteReceived.clear();
        return finishedAt - startedAt;
    }

    /** Runs the mutex-transport traffic scenario 20 times with both async flags off. */
    @Test(timeout = 120000)
    public void testTwoWayTrafficWithMutexTransportSync1() throws Exception {
        int remaining = 20;
        while (remaining > 0) {
            doTestTwoWayTrafficWithMutexTransport(false, false);
            remaining--;
        }
    }

    /** Runs the mutex-transport traffic scenario 20 times with only the first transport async. */
    @Test(timeout = 120000)
    public void testTwoWayTrafficWithMutexTransportSync2() throws Exception {
        int remaining = 20;
        while (remaining > 0) {
            doTestTwoWayTrafficWithMutexTransport(true, false);
            remaining--;
        }
    }

    /** Runs the mutex-transport traffic scenario 20 times with only the second transport async. */
    @Test(timeout = 120000)
    public void testTwoWayTrafficWithMutexTransportSync3() throws Exception {
        int remaining = 20;
        while (remaining > 0) {
            doTestTwoWayTrafficWithMutexTransport(false, true);
            remaining--;
        }
    }

    /**
     * Runs the mutex-transport traffic scenario 20 times with both transports
     * async. The sibling tests cover (false, false), (true, false) and
     * (false, true); this one previously repeated (false, false), leaving the
     * both-async combination untested.
     */
    @Test(timeout = 120000)
    public void testTwoWayTrafficWithMutexTransportSync4() throws Exception {
        for (int i = 0; i < 20; i++) {
            doTestTwoWayTrafficWithMutexTransport(true, true);
        }
    }

    /**
     * Wraps both transports in {@link MutexTransport}s and has two threads send
     * 200000 commands each through them. The sender threads are started shortly
     * before the transports are. The remote side uses a responder listener that
     * re-sends each command it receives, so the remote listener is expected to
     * record 200000 commands and the local listener 400000.
     *
     * @param z value passed to the first transport's setAsync
     * @param z2 value passed to the second transport's setAsync
     */
    public void doTestTwoWayTrafficWithMutexTransport(boolean z, boolean z2) throws Exception {
        VMTransport vMTransport = new VMTransport(new URI(location1));
        VMTransport vMTransport2 = new VMTransport(new URI(location2));
        final MutexTransport mutexTransport = new MutexTransport(vMTransport);
        final MutexTransport mutexTransport2 = new MutexTransport(vMTransport2);
        final AtomicInteger atomicInteger = new AtomicInteger();
        vMTransport.setAsync(z);
        vMTransport2.setAsync(z2);
        vMTransport.setPeer(vMTransport2);
        vMTransport2.setPeer(vMTransport);
        mutexTransport.setTransportListener(new VMTestTransportListener(this.localReceived));
        mutexTransport2.setTransportListener(new VMResponderTransportListener(this.remoteReceived, mutexTransport2));
        Thread thread = new Thread(new Runnable() { // from class: org.apache.activemq.transport.vm.VMTransportThreadSafeTest.21
            @Override // java.lang.Runnable
            public void run() {
                for (int i = 0; i < 200000; i++) {
                    try {
                        mutexTransport.oneway(new DummyCommand(atomicInteger.incrementAndGet()));
                    } catch (Exception e) {
                    }
                }
            }
        });
        Thread thread2 = new Thread(new Runnable() { // from class: org.apache.activemq.transport.vm.VMTransportThreadSafeTest.22
            @Override // java.lang.Runnable
            public void run() {
                for (int i = 0; i < 200000; i++) {
                    try {
                        mutexTransport2.oneway(new DummyCommand(atomicInteger.incrementAndGet()));
                    } catch (Exception e) {
                    }
                }
            }
        });
        // The senders are started before the transports, with a short pause in between.
        thread.start();
        thread2.start();
        Thread.sleep(10L);
        mutexTransport.start();
        mutexTransport2.start();
        thread.join();
        thread2.join();
        // Wait first, then build each message, so a failure reports the count seen
        // after the wait rather than the count captured before it began.
        boolean remoteComplete = Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.transport.vm.VMTransportThreadSafeTest.23
            public boolean isSatisified() throws Exception {
                return VMTransportThreadSafeTest.this.remoteReceived.size() == 200000;
            }
        });
        Assert.assertTrue("Remote should have received (200000) but got (" + this.remoteReceived.size() + ")", remoteComplete);
        boolean localComplete = Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.transport.vm.VMTransportThreadSafeTest.24
            public boolean isSatisified() throws Exception {
                return VMTransportThreadSafeTest.this.localReceived.size() == 400000;
            }
        });
        Assert.assertTrue("Local should have received (400000) but got (" + this.localReceived.size() + ")", localComplete);
        LOG.debug("All messages sent,stop all");
        mutexTransport.stop();
        mutexTransport2.stop();
        // Reset the queues because callers invoke this method repeatedly within one test.
        this.localReceived.clear();
        this.remoteReceived.clear();
    }
}
