package org.apache.activemq.artemis.tests.integration.client;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/SessionStopStartTest.class */
public class SessionStopStartTest extends ActiveMQTestBase {
    // Shared integration-test logger; testStopStartConsumerAsyncSync uses it to log a failure before rethrowing.
    private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
    // Server created via createServer(false) and started in setUp().
    private ActiveMQServer server;
    // Passed as both of the first two createQueue(...) arguments and used for every producer/consumer in this class.
    private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
    // Locator obtained from createInVMNonHALocator() in setUp(); the tests build their session factories from it.
    private ServerLocator locator;

    /* renamed from: org.apache.activemq.artemis.tests.integration.client.SessionStopStartTest$1MyHandler, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/SessionStopStartTest$1MyHandler.class */
    /**
     * Message handler used by {@code testStopStartConsumerAsyncSyncStoppedByHandler}.
     *
     * <p>Counts deliveries; on the tenth one it acknowledges the message and stops the session
     * from inside the callback. A delivery that arrives after that stop, or any exception raised
     * while handling a message, sets {@link #failed}. The latch is counted down once per delivery.
     */
    class C1MyHandler implements MessageHandler {
        /** Read by the test thread after the latch opens, hence volatile. */
        volatile boolean failed;
        boolean started = true;
        int count = 0;
        final ClientSession session;
        final CountDownLatch latch;

        /**
         * @param clientSession  session to stop once the tenth message has been handled
         * @param countDownLatch latch decremented once for every delivered message
         */
        C1MyHandler(ClientSession clientSession, CountDownLatch countDownLatch) {
            this.session = clientSession;
            this.latch = countDownLatch;
        }

        public void onMessage(ClientMessage clientMessage) {
            try {
                if (!started) {
                    // A message was delivered although this handler already stopped the session.
                    failed = true;
                }
                count++;
                if (count == 10) {
                    clientMessage.acknowledge();
                    session.stop();
                    started = false;
                }
            } catch (Exception e) {
                // Previously swallowed silently; surface it through the flag the test asserts on.
                failed = true;
            } finally {
                // Always release the waiter so a failure is reported by the assertion instead of a latch timeout.
                latch.countDown();
            }
        }
    }

    /* renamed from: org.apache.activemq.artemis.tests.integration.client.SessionStopStartTest$2MyHandler, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/SessionStopStartTest$2MyHandler.class */
    /**
     * Message handler used by {@code testStopStartConsumerAsyncSync}.
     *
     * <p>Counts the latch down once per delivery. When the latch reaches zero it acknowledges the
     * current message and removes itself from the consumer. A delivery that arrives afterwards,
     * or any exception raised while handling a message, sets {@link #failed}.
     */
    class C2MyHandler implements MessageHandler {
        /** Read by the test thread after the latch opens, hence volatile. */
        volatile boolean failed;
        boolean started = true;
        final CountDownLatch latch;
        final ClientConsumer consumer;

        /**
         * @param countDownLatch latch decremented once for every delivered message
         * @param clientConsumer consumer whose handler is cleared once the latch reaches zero
         */
        C2MyHandler(CountDownLatch countDownLatch, ClientConsumer clientConsumer) {
            this.latch = countDownLatch;
            this.consumer = clientConsumer;
        }

        public void onMessage(ClientMessage clientMessage) {
            try {
                if (!started) {
                    // A message was delivered after this handler detached itself.
                    failed = true;
                }
                latch.countDown();
                if (latch.getCount() == 0) {
                    clientMessage.acknowledge();
                    started = false;
                    consumer.setMessageHandler((MessageHandler) null);
                }
            } catch (Exception e) {
                // Previously swallowed silently; surface it through the flag the test asserts on.
                failed = true;
            }
        }
    }

    /* renamed from: org.apache.activemq.artemis.tests.integration.client.SessionStopStartTest$3MyHandler, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/SessionStopStartTest$3MyHandler.class */
    /**
     * Message handler used by {@code testStopStartConsumerAsyncASyncStoppeeByHandler}.
     *
     * <p>Counts deliveries and decrements the latch once per message. When constructed with
     * stopping enabled (the default), it acknowledges the message that brings the latch to zero
     * and stops the session from inside the callback. A delivery that arrives after that stop, or
     * any exception raised while handling a message, sets {@link #failed}.
     */
    class C3MyHandler implements MessageHandler {
        /**
         * Number of deliveries seen so far. Volatile because the test thread reads it for its
         * failure message; it is only incremented from the delivery callback (assumes callbacks
         * for one consumer are not run concurrently - TODO confirm).
         */
        volatile int messageReceived;
        /** Read by the test thread after the latch opens, hence volatile. */
        volatile boolean failed;
        boolean started = true;
        private final CountDownLatch latch;
        private final boolean stop;
        final ClientSession session;

        /**
         * Creates a handler that stops the session once the latch reaches zero.
         *
         * @param countDownLatch latch decremented once for every delivered message
         * @param clientSession  session to stop when the latch reaches zero
         */
        public C3MyHandler(CountDownLatch countDownLatch, ClientSession clientSession) {
            this(null, countDownLatch, true, clientSession);
        }

        /**
         * @param sessionStopStartTest unused; kept so existing call sites keep compiling
         * @param countDownLatch       latch decremented once for every delivered message
         * @param z                    whether to acknowledge and stop the session when the latch reaches zero
         * @param clientSession        session to stop when {@code z} is true
         */
        public C3MyHandler(SessionStopStartTest sessionStopStartTest, CountDownLatch countDownLatch, boolean z, ClientSession clientSession) {
            this.session = clientSession;
            this.latch = countDownLatch;
            this.stop = z;
        }

        public void onMessage(ClientMessage clientMessage) {
            try {
                if (!started) {
                    // A message was delivered although this handler already stopped the session.
                    failed = true;
                }
                messageReceived++;
                latch.countDown();
                if (stop && latch.getCount() == 0) {
                    clientMessage.acknowledge();
                    session.stop();
                    started = false;
                }
            } catch (Exception e) {
                // Previously swallowed silently; surface it through the flag the test asserts on.
                failed = true;
            }
        }
    }

    /* renamed from: org.apache.activemq.artemis.tests.integration.client.SessionStopStartTest$4MyHandler, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/SessionStopStartTest$4MyHandler.class */
    /**
     * Message handler used by {@code testStopStartConsumerAsyncASync}.
     *
     * <p>Counts deliveries and decrements the latch once per message. When constructed with
     * stopping enabled (the default), it acknowledges the message that brings the latch to zero
     * and removes itself from the consumer. A delivery that arrives afterwards, or any exception
     * raised while handling a message, sets {@link #failed}.
     */
    class C4MyHandler implements MessageHandler {
        /**
         * Number of deliveries seen so far. Volatile because the test thread reads it for its
         * failure message; it is only incremented from the delivery callback (assumes callbacks
         * for one consumer are not run concurrently - TODO confirm).
         */
        volatile int messageReceived;
        /** Read by the test thread after the latch opens, hence volatile. */
        volatile boolean failed;
        boolean started = true;
        private final CountDownLatch latch;
        private final boolean stop;
        final ClientConsumer consumer;

        /**
         * Creates a handler that detaches itself once the latch reaches zero.
         *
         * @param countDownLatch latch decremented once for every delivered message
         * @param clientConsumer consumer whose handler is cleared when the latch reaches zero
         */
        public C4MyHandler(CountDownLatch countDownLatch, ClientConsumer clientConsumer) {
            this(null, countDownLatch, true, clientConsumer);
        }

        /**
         * @param sessionStopStartTest unused; kept so existing call sites keep compiling
         * @param countDownLatch       latch decremented once for every delivered message
         * @param z                    whether to acknowledge and detach when the latch reaches zero
         * @param clientConsumer       consumer whose handler is cleared when {@code z} is true
         */
        public C4MyHandler(SessionStopStartTest sessionStopStartTest, CountDownLatch countDownLatch, boolean z, ClientConsumer clientConsumer) {
            this.consumer = clientConsumer;
            this.latch = countDownLatch;
            this.stop = z;
        }

        public void onMessage(ClientMessage clientMessage) {
            try {
                if (!started) {
                    // A message was delivered after this handler detached itself.
                    failed = true;
                }
                messageReceived++;
                latch.countDown();
                if (stop && latch.getCount() == 0) {
                    clientMessage.acknowledge();
                    consumer.setMessageHandler((MessageHandler) null);
                    started = false;
                }
            } catch (Exception e) {
                // Previously swallowed silently; surface it through the flag the test asserts on.
                failed = true;
            }
        }
    }

    /**
     * Runs the base-class setup, then creates and starts a server and obtains the locator
     * shared by the tests.
     */
    @Before
    public void setUp() throws Exception {
        super.setUp();

        final ActiveMQServer broker = createServer(false);
        // Publish the instance to the field before starting it, exactly as before.
        server = broker;
        broker.start();

        locator = createInVMNonHALocator();
    }

    /**
     * Consumes half of the queued messages, stops the session and checks that
     * {@code receiveImmediate()} returns nothing, then restarts the session and consumes the rest.
     */
    @Test
    public void testStopStartConsumerSyncReceiveImmediate() throws Exception {
        final ClientSessionFactory factory = createSessionFactory(locator);
        final ClientSession session = factory.createSession(false, true, true);
        session.createQueue(QUEUE, QUEUE, (SimpleString) null, false);

        // Preload the queue with 100 text messages, each carrying its index in property "i".
        final ClientProducer producer = session.createProducer(QUEUE);
        int sent = 0;
        while (sent < 100) {
            final ClientMessage outgoing = createTextMessage(session, "m" + sent);
            outgoing.putIntProperty(new SimpleString("i"), sent);
            producer.send(outgoing);
            sent++;
        }

        final ClientConsumer consumer = session.createConsumer(QUEUE);
        session.start();

        // First half: consumed while the session is running.
        for (int n = 1; n <= 50; n++) {
            final ClientMessage incoming = consumer.receive(5000L);
            Assert.assertNotNull(incoming);
            incoming.acknowledge();
        }

        // While stopped, nothing may be handed out.
        session.stop();
        Assert.assertNull(consumer.receiveImmediate());

        // Second half: consumed after the restart.
        session.start();
        for (int n = 1; n <= 50; n++) {
            final ClientMessage incoming = consumer.receive(5000L);
            Assert.assertNotNull(incoming);
            incoming.acknowledge();
        }

        session.close();
    }

    /**
     * Consumes half of the queued messages, stops the session and checks that a timed
     * {@code receive(1000)} blocks for at least the full timeout and returns nothing, then
     * restarts the session and consumes the rest.
     */
    @Test
    public void testStopStartConsumerSyncReceive() throws Exception {
        final ClientSessionFactory factory = createSessionFactory(locator);
        final ClientSession session = factory.createSession(false, true, true);
        session.createQueue(QUEUE, QUEUE, (SimpleString) null, false);

        // Preload the queue with 100 text messages, each carrying its index in property "i".
        final ClientProducer producer = session.createProducer(QUEUE);
        int sent = 0;
        while (sent < 100) {
            final ClientMessage outgoing = createTextMessage(session, "m" + sent);
            outgoing.putIntProperty(new SimpleString("i"), sent);
            producer.send(outgoing);
            sent++;
        }

        final ClientConsumer consumer = session.createConsumer(QUEUE);
        session.start();

        // First half: consumed while the session is running.
        for (int n = 1; n <= 50; n++) {
            final ClientMessage incoming = consumer.receive(5000L);
            Assert.assertNotNull(incoming);
            incoming.acknowledge();
        }

        session.stop();

        // The timed receive on a stopped session must use up its whole timeout and come back empty.
        final long before = System.currentTimeMillis();
        final ClientMessage whileStopped = consumer.receive(1000L);
        final long elapsedMillis = System.currentTimeMillis() - before;
        Assert.assertTrue(elapsedMillis >= 1000);
        Assert.assertNull(whileStopped);

        // Second half: consumed after the restart.
        session.start();
        for (int n = 1; n <= 50; n++) {
            final ClientMessage incoming = consumer.receive(5000L);
            Assert.assertNotNull(incoming);
            incoming.acknowledge();
        }

        session.close();
    }

    /**
     * Lets a message handler consume the first ten messages and stop the session from inside the
     * callback, then switches to synchronous receives for the remaining ninety after restarting
     * the session.
     */
    @Test
    public void testStopStartConsumerAsyncSyncStoppedByHandler() throws Exception {
        ClientSession session = createSessionFactory(this.locator).createSession(false, true, true);
        session.createQueue(this.QUEUE, this.QUEUE, (SimpleString) null, false);

        // Preload the queue with 100 text messages, each carrying its index in property "i".
        ClientProducer producer = session.createProducer(this.QUEUE);
        for (int i = 0; i < 100; i++) {
            ClientMessage message = createTextMessage(session, "m" + i);
            message.putIntProperty(new SimpleString("i"), i);
            producer.send(message);
        }

        ClientConsumer consumer = session.createConsumer(this.QUEUE);
        session.start();

        // Asynchronous phase: the handler stops the session itself on the tenth message.
        CountDownLatch firstTen = new CountDownLatch(10);
        C1MyHandler handler = new C1MyHandler(session, firstTen);
        consumer.setMessageHandler(handler);
        waitForLatch(firstTen);
        Assert.assertFalse(handler.failed);
        Assert.assertNull(consumer.getLastException());

        // Synchronous phase: detach the handler, restart and drain the rest.
        consumer.setMessageHandler((MessageHandler) null);
        session.start();
        for (int i = 0; i < 90; i++) {
            ClientMessage received = consumer.receive(1000L);
            // The assertion message identifies the missing message; the former debug print
            // named an unrelated test and has been dropped.
            Assert.assertNotNull("message " + i, received);
            received.acknowledge();
        }
        Assert.assertNull(consumer.receiveImmediate());
        session.close();
    }

    /**
     * Lets a message handler consume the first ten messages and detach itself, stops the session
     * from the test thread, then switches to synchronous receives for the remaining ninety after
     * restarting the session.
     */
    @Test
    public void testStopStartConsumerAsyncSync() throws Exception {
        ClientSession session = createSessionFactory(this.locator).createSession(false, true, true);
        session.createQueue(this.QUEUE, this.QUEUE, (SimpleString) null, false);

        // Preload the queue with 100 text messages, each carrying its index in property "i".
        ClientProducer producer = session.createProducer(this.QUEUE);
        for (int i = 0; i < 100; i++) {
            ClientMessage message = createTextMessage(session, "m" + i);
            message.putIntProperty(new SimpleString("i"), i);
            producer.send(message);
        }

        ClientConsumer consumer = session.createConsumer(this.QUEUE);
        session.start();

        // Asynchronous phase: the handler removes itself after the tenth message.
        CountDownLatch firstTen = new CountDownLatch(10);
        C2MyHandler handler = new C2MyHandler(firstTen, consumer);
        consumer.setMessageHandler(handler);
        waitForLatch(firstTen);

        try {
            session.stop();
            Assert.assertFalse(handler.failed);
            Assert.assertNull(consumer.getLastException());

            // Synchronous phase: make sure no handler is attached, restart and drain the rest.
            consumer.setMessageHandler((MessageHandler) null);
            session.start();
            for (int i = 0; i < 90; i++) {
                ClientMessage received = consumer.receive(1000L);
                // The assertion message identifies the missing message; the former debug print
                // named an unrelated test and has been dropped.
                Assert.assertNotNull("message " + i, received);
                received.acknowledge();
            }
            Assert.assertNull(consumer.receiveImmediate());
            session.close();
        } catch (Exception e) {
            // Log for diagnosis, then let the test fail with the original exception.
            log.warn(e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Lets a first handler consume ten messages and stop the session from inside the callback,
     * then installs a second, non-stopping handler, restarts the session and expects the
     * remaining ninety messages within five seconds.
     */
    @Test
    public void testStopStartConsumerAsyncASyncStoppeeByHandler() throws Exception {
        ClientSession session = createSessionFactory(this.locator).createSession(false, true, true);
        session.createQueue(this.QUEUE, this.QUEUE, (SimpleString) null, false);

        // Preload the queue with 100 text messages, each carrying its index in property "i".
        ClientProducer producer = session.createProducer(this.QUEUE);
        for (int i = 0; i < 100; i++) {
            ClientMessage message = createTextMessage(session, "m" + i);
            message.putIntProperty(new SimpleString("i"), i);
            producer.send(message);
        }

        ClientConsumer consumer = session.createConsumer(this.QUEUE);
        session.start();

        // Phase one: the handler stops the session itself when its latch reaches zero.
        CountDownLatch firstTen = new CountDownLatch(10);
        C3MyHandler stoppingHandler = new C3MyHandler(firstTen, session);
        consumer.setMessageHandler(stoppingHandler);
        waitForLatch(firstTen);
        // Grace period in which a stray delivery after the stop would flip the failed flag.
        Thread.sleep(100L);
        Assert.assertFalse(stoppingHandler.failed);
        Assert.assertNull(consumer.getLastException());

        // Phase two: a handler that never stops the session receives the remainder.
        CountDownLatch remaining = new CountDownLatch(90);
        C3MyHandler drainingHandler = new C3MyHandler(this, remaining, false, session);
        consumer.setMessageHandler(drainingHandler);
        session.start();
        // Wait first and only then build the diagnostic, so it reports the count actually
        // reached instead of the count at the moment the assertion was set up.
        boolean allDelivered = remaining.await(5L, TimeUnit.SECONDS);
        Assert.assertTrue("message received " + drainingHandler.messageReceived, allDelivered);
        Thread.sleep(100L);
        Assert.assertFalse(drainingHandler.failed);
        Assert.assertNull(consumer.getLastException());
        session.close();
    }

    /**
     * Lets a first handler consume ten messages and detach itself from the consumer, then
     * installs a second, non-detaching handler, calls {@code start()} again and expects the
     * remaining ninety messages within five seconds.
     */
    @Test
    public void testStopStartConsumerAsyncASync() throws Exception {
        ClientSession session = createSessionFactory(this.locator).createSession(false, true, true);
        session.createQueue(this.QUEUE, this.QUEUE, (SimpleString) null, false);

        // Preload the queue with 100 text messages, each carrying its index in property "i".
        ClientProducer producer = session.createProducer(this.QUEUE);
        for (int i = 0; i < 100; i++) {
            ClientMessage message = createTextMessage(session, "m" + i);
            message.putIntProperty(new SimpleString("i"), i);
            producer.send(message);
        }

        ClientConsumer consumer = session.createConsumer(this.QUEUE);
        session.start();

        // Phase one: the handler removes itself when its latch reaches zero.
        CountDownLatch firstTen = new CountDownLatch(10);
        C4MyHandler detachingHandler = new C4MyHandler(firstTen, consumer);
        consumer.setMessageHandler(detachingHandler);
        waitForLatch(firstTen);
        // Grace period in which a stray delivery after detaching would flip the failed flag.
        Thread.sleep(100L);
        Assert.assertFalse(detachingHandler.failed);
        Assert.assertNull(consumer.getLastException());

        // Phase two: a handler that never detaches receives the remainder.
        CountDownLatch remaining = new CountDownLatch(90);
        C4MyHandler drainingHandler = new C4MyHandler(this, remaining, false, consumer);
        consumer.setMessageHandler(drainingHandler);
        session.start();
        // Wait first and only then build the diagnostic, so it reports the count actually
        // reached instead of the count at the moment the assertion was set up.
        boolean allDelivered = remaining.await(5L, TimeUnit.SECONDS);
        Assert.assertTrue("message received " + drainingHandler.messageReceived, allDelivered);
        Thread.sleep(100L);
        Assert.assertFalse(drainingHandler.failed);
        Assert.assertNull(consumer.getLastException());
        session.close();
    }

    /**
     * Returns the encode size of an empty message whose address is set to the given value.
     *
     * <p>A throwaway session factory and session are opened for the measurement and are closed
     * again even if the measurement throws.
     *
     * @param simpleString address to set on the probe message
     * @return the value of {@code getEncodeSize()} for that message
     * @throws Exception if the factory or session cannot be created, or closing the session fails
     */
    private int getMessageEncodeSize(SimpleString simpleString) throws Exception {
        ClientSessionFactory factory = createSessionFactory(createInVMNonHALocator());
        try {
            ClientSession session = factory.createSession(false, true, true);
            try {
                ClientMessage probe = session.createMessage(false);
                probe.setAddress(simpleString);
                return probe.getEncodeSize();
            } finally {
                // Close in reverse order of creation: session first, then its factory.
                session.close();
            }
        } finally {
            factory.close();
        }
    }

    /**
     * Runs three consumers on one session: each receives one message, the session is stopped and
     * none of them may receive immediately, then the session is restarted and each must receive
     * again.
     */
    @Test
    public void testStopStartMultipleConsumers() throws Exception {
        // Consumer window sized to 33 times the encode size of an empty message on this address.
        final int singleMessageSize = getMessageEncodeSize(QUEUE);
        locator.setConsumerWindowSize(singleMessageSize * 33);

        final ClientSessionFactory factory = createSessionFactory(locator);
        final ClientSession session = factory.createSession(false, true, true);
        session.createQueue(QUEUE, QUEUE, (SimpleString) null, false);

        // Preload the queue with 100 text messages, each carrying its index in property "i".
        final ClientProducer producer = session.createProducer(QUEUE);
        int sent = 0;
        while (sent < 100) {
            final ClientMessage outgoing = createTextMessage(session, "m" + sent);
            outgoing.putIntProperty(new SimpleString("i"), sent);
            producer.send(outgoing);
            sent++;
        }

        final ClientConsumer first = session.createConsumer(QUEUE);
        final ClientConsumer second = session.createConsumer(QUEUE);
        final ClientConsumer third = session.createConsumer(QUEUE);
        session.start();

        // Every consumer gets one message while the session is running.
        final ClientMessage fromFirst = first.receive(5000L);
        Assert.assertNotNull(fromFirst);
        fromFirst.acknowledge();

        final ClientMessage fromSecond = second.receive(5000L);
        Assert.assertNotNull(fromSecond);
        fromSecond.acknowledge();

        final ClientMessage fromThird = third.receive(5000L);
        Assert.assertNotNull(fromThird);
        fromThird.acknowledge();

        // Stopping the session silences all three consumers.
        session.stop();
        Assert.assertNull(first.receiveImmediate());
        Assert.assertNull(second.receiveImmediate());
        Assert.assertNull(third.receiveImmediate());

        // After the restart each consumer receives again.
        session.start();
        Assert.assertNotNull(first.receive(5000L));
        Assert.assertNotNull(second.receive(5000L));
        Assert.assertNotNull(third.receive(5000L));

        session.close();
    }

    /**
     * Checks that calling {@code start()} on a session that is already started does not disturb
     * consumption: fifty messages are received before the second start and fifty after it.
     */
    @Test
    public void testStopStartAlreadyStartedSession() throws Exception {
        final ClientSessionFactory factory = createSessionFactory(locator);
        final ClientSession session = factory.createSession(false, true, true);
        session.createQueue(QUEUE, QUEUE, (SimpleString) null, false);

        // Preload the queue with 100 text messages, each carrying its index in property "i".
        final ClientProducer producer = session.createProducer(QUEUE);
        int sent = 0;
        while (sent < 100) {
            final ClientMessage outgoing = createTextMessage(session, "m" + sent);
            outgoing.putIntProperty(new SimpleString("i"), sent);
            producer.send(outgoing);
            sent++;
        }

        final ClientConsumer consumer = session.createConsumer(QUEUE);
        session.start();

        for (int n = 1; n <= 50; n++) {
            final ClientMessage incoming = consumer.receive(5000L);
            Assert.assertNotNull(incoming);
            incoming.acknowledge();
        }

        // Redundant start on a running session.
        session.start();

        for (int n = 1; n <= 50; n++) {
            final ClientMessage incoming = consumer.receive(5000L);
            Assert.assertNotNull(incoming);
            incoming.acknowledge();
        }

        session.close();
    }

    /**
     * Checks that calling {@code stop()} twice in a row is harmless: nothing is received after
     * either stop, and the remaining fifty messages are received once the session is started
     * again.
     */
    @Test
    public void testStopAlreadyStoppedSession() throws Exception {
        final ClientSessionFactory factory = createSessionFactory(locator);
        final ClientSession session = factory.createSession(false, true, true);
        session.createQueue(QUEUE, QUEUE, (SimpleString) null, false);

        // Preload the queue with 100 text messages, each carrying its index in property "i".
        final ClientProducer producer = session.createProducer(QUEUE);
        int sent = 0;
        while (sent < 100) {
            final ClientMessage outgoing = createTextMessage(session, "m" + sent);
            outgoing.putIntProperty(new SimpleString("i"), sent);
            producer.send(outgoing);
            sent++;
        }

        final ClientConsumer consumer = session.createConsumer(QUEUE);
        session.start();

        for (int n = 1; n <= 50; n++) {
            final ClientMessage incoming = consumer.receive(5000L);
            Assert.assertNotNull(incoming);
            incoming.acknowledge();
        }

        // First stop, then a redundant second stop; neither may let a message through.
        session.stop();
        Assert.assertNull(consumer.receiveImmediate());
        session.stop();
        Assert.assertNull(consumer.receiveImmediate());

        session.start();
        for (int n = 1; n <= 50; n++) {
            final ClientMessage incoming = consumer.receive(5000L);
            Assert.assertNotNull(incoming);
            incoming.acknowledge();
        }

        session.close();
    }
}
