package org.apache.activemq.artemis.tests.integration.client;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/MessageHandlerTest.class */
public class MessageHandlerTest extends ActiveMQTestBase {
    // Server instance created and started in setUp() via createServer(false).
    private ActiveMQServer server;
    // Name passed to createQueue/createProducer/createConsumer by every test in this class.
    private final SimpleString QUEUE = SimpleString.of("ConsumerTestQueue");
    // Locator obtained in setUp() from createInVMNonHALocator().
    private ServerLocator locator;
    // Session factory built in setUp() from the locator; each test creates its session from it.
    private ClientSessionFactory sf;

    /* renamed from: org.apache.activemq.artemis.tests.integration.client.MessageHandlerTest$2MyHandler, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/MessageHandlerTest$2MyHandler.class */
    class C2MyHandler implements MessageHandler {
        boolean failed;
        private final CountDownLatch latch;
        final /* synthetic */ ClientConsumer val$consumer;
        int messageReceived = 0;
        boolean started = true;

        C2MyHandler(CountDownLatch countDownLatch, ClientConsumer clientConsumer) {
            this.val$consumer = clientConsumer;
            this.latch = countDownLatch;
        }

        public void onMessage(ClientMessage clientMessage) {
            try {
                if (!this.started) {
                    this.failed = true;
                }
                this.messageReceived++;
                this.latch.countDown();
                if (this.latch.getCount() == 0) {
                    clientMessage.acknowledge();
                    this.started = false;
                    this.val$consumer.setMessageHandler((MessageHandler) null);
                }
            } catch (Exception e) {
            }
        }
    }

    /* renamed from: org.apache.activemq.artemis.tests.integration.client.MessageHandlerTest$3MyHandler, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/MessageHandlerTest$3MyHandler.class */
    class C3MyHandler implements MessageHandler {
        boolean failed;
        private final CountDownLatch latch;
        final /* synthetic */ ClientConsumer val$consumer;
        int messageReceived = 0;
        boolean started = true;

        C3MyHandler(CountDownLatch countDownLatch, ClientConsumer clientConsumer) {
            this.val$consumer = clientConsumer;
            this.latch = countDownLatch;
        }

        public void onMessage(ClientMessage clientMessage) {
            try {
                if (!this.started) {
                    this.failed = true;
                }
                this.messageReceived++;
                this.latch.countDown();
                if (this.latch.getCount() == 0) {
                    clientMessage.acknowledge();
                    this.started = false;
                    this.val$consumer.setMessageHandler((MessageHandler) null);
                }
            } catch (Exception e) {
            }
        }
    }

    /* renamed from: org.apache.activemq.artemis.tests.integration.client.MessageHandlerTest$4MyHandler, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/MessageHandlerTest$4MyHandler.class */
    class C4MyHandler implements MessageHandler {
        boolean failed;
        private final CountDownLatch latch;
        final /* synthetic */ ClientConsumer val$consumer;
        int messageReceived = 0;
        boolean started = true;

        C4MyHandler(CountDownLatch countDownLatch, ClientConsumer clientConsumer) {
            this.val$consumer = clientConsumer;
            this.latch = countDownLatch;
        }

        public void onMessage(ClientMessage clientMessage) {
            try {
                if (!this.started) {
                    this.failed = true;
                }
                this.messageReceived++;
                this.latch.countDown();
                if (this.latch.getCount() == 0) {
                    clientMessage.acknowledge();
                    this.started = false;
                    this.val$consumer.setMessageHandler((MessageHandler) null);
                }
            } catch (Exception e) {
            }
        }
    }

    /**
     * Runs the base-class setup, then creates and starts a server, an in-VM non-HA locator and a
     * session factory for the tests to use.
     *
     * @throws Exception if any of the setup steps fails
     */
    @BeforeEach
    @Override
    public void setUp() throws Exception {
        super.setUp();

        // Server first: the locator and factory below connect to it.
        server = createServer(false);
        server.start();

        locator = createInVMNonHALocator();
        sf = createSessionFactory(locator);
    }

    /**
     * Sends 100 messages, attaches a slow acknowledging handler to a consumer on the already
     * started session, removes the handler again shortly afterwards and verifies that the consumer
     * reports no exception.
     */
    @Test
    public void testSetMessageHandlerWithMessagesPending() throws Exception {
        final int messageCount = 100;

        final ClientSession session = sf.createSession(false, true, true);
        session.createQueue(QueueConfiguration.of(QUEUE).setDurable(false));

        final ClientProducer producer = session.createProducer(QUEUE);
        int sent = 0;
        while (sent < messageCount) {
            producer.send(createTextMessage(session, "m" + sent));
            sent++;
        }

        final ClientConsumer consumer = session.createConsumer(QUEUE, (SimpleString) null, true);
        session.start();
        // Pause with the session running before any handler is attached.
        Thread.sleep(100L);

        final MessageHandler slowHandler = new MessageHandler() {
            @Override
            public void onMessage(ClientMessage delivered) {
                try {
                    // Slow the handler down so deliveries are still in progress when it is unset.
                    Thread.sleep(10L);
                    delivered.acknowledge();
                } catch (Exception ignored) {
                    // Intentionally ignored by this test.
                }
            }
        };
        consumer.setMessageHandler(slowHandler);

        Thread.sleep(100L);
        consumer.setMessageHandler((MessageHandler) null);

        // Leave time for any late failure to be recorded on the consumer.
        Thread.sleep(500L);
        Assertions.assertNull(consumer.getLastException());

        session.close();
    }

    /**
     * Sends 100 messages and consumes them with two consecutive self-removing handlers of 50
     * messages each, checking after each one that no message reached a handler that had already
     * removed itself and that the consumer recorded no exception.
     */
    @Test
    public void testSetResetMessageHandler() throws Exception {
        final int messageCount = 100;
        final int batchSize = 50;

        final ClientSession session = sf.createSession(false, true, true);
        session.createQueue(QueueConfiguration.of(QUEUE).setDurable(false));

        final ClientProducer producer = session.createProducer(QUEUE);
        final SimpleString indexKey = SimpleString.of("i");
        for (int index = 0; index < messageCount; index++) {
            final ClientMessage outgoing = createTextMessage(session, "m" + index);
            outgoing.putIntProperty(indexKey, index);
            producer.send(outgoing);
        }

        final ClientConsumer consumer = session.createConsumer(QUEUE);

        // First handler: installed before the session is started.
        final CountDownLatch firstLatch = new CountDownLatch(batchSize);
        final C2MyHandler firstHandler = new C2MyHandler(firstLatch, consumer);
        consumer.setMessageHandler(firstHandler);
        session.start();
        waitForLatch(firstLatch);
        Thread.sleep(100L);
        Assertions.assertFalse(firstHandler.failed);
        Assertions.assertNull(consumer.getLastException());

        // Second handler: installed on the same consumer after the first one removed itself.
        final CountDownLatch secondLatch = new CountDownLatch(batchSize);
        final C2MyHandler secondHandler = new C2MyHandler(secondLatch, consumer);
        consumer.setMessageHandler(secondHandler);
        session.start();
        final boolean secondBatchDone = secondLatch.await(5L, TimeUnit.SECONDS);
        Assertions.assertTrue(secondBatchDone, "message received " + secondHandler.messageReceived);
        Thread.sleep(100L);
        Assertions.assertFalse(secondHandler.failed);
        Assertions.assertNull(consumer.getLastException());

        session.close();
    }

    /**
     * Sends 100 messages, lets a self-removing handler take 50 of them, then unsets the handler
     * explicitly and verifies that a message can still be fetched with receiveImmediate().
     */
    @Test
    public void testSetUnsetMessageHandler() throws Exception {
        final int messageCount = 100;

        final ClientSession session = sf.createSession(false, true, true);
        session.createQueue(QueueConfiguration.of(QUEUE).setDurable(false));

        final ClientProducer producer = session.createProducer(QUEUE);
        final SimpleString indexKey = SimpleString.of("i");
        for (int index = 0; index < messageCount; index++) {
            final ClientMessage outgoing = createTextMessage(session, "m" + index);
            outgoing.putIntProperty(indexKey, index);
            producer.send(outgoing);
        }

        final ClientConsumer consumer = session.createConsumer(QUEUE);
        session.start();

        // Handler is attached to an already started session here.
        final CountDownLatch handled = new CountDownLatch(50);
        final C3MyHandler handler = new C3MyHandler(handled, consumer);
        consumer.setMessageHandler(handler);
        waitForLatch(handled);
        Thread.sleep(100L);
        Assertions.assertFalse(handler.failed);
        Assertions.assertNull(consumer.getLastException());

        // With no handler set, the consumer is read synchronously.
        consumer.setMessageHandler((MessageHandler) null);
        final ClientMessage next = consumer.receiveImmediate();
        Assertions.assertNotNull(next);

        session.close();
    }

    /**
     * Sends 100 messages, lets a self-removing handler take 50, fetches one message synchronously
     * with the handler unset, then installs a second self-removing handler for the remaining 49.
     */
    @Test
    public void testSetUnsetResetMessageHandler() throws Exception {
        final int messageCount = 100;

        final ClientSession session = sf.createSession(false, true, true);
        session.createQueue(QueueConfiguration.of(QUEUE).setDurable(false));

        final ClientProducer producer = session.createProducer(QUEUE);
        final SimpleString indexKey = SimpleString.of("i");
        for (int index = 0; index < messageCount; index++) {
            final ClientMessage outgoing = createTextMessage(session, "m" + index);
            outgoing.putIntProperty(indexKey, index);
            producer.send(outgoing);
        }

        final ClientConsumer consumer = session.createConsumer(QUEUE);
        session.start();

        // Phase 1: the first handler takes 50 messages and removes itself.
        final CountDownLatch firstLatch = new CountDownLatch(50);
        final C4MyHandler firstHandler = new C4MyHandler(firstLatch, consumer);
        consumer.setMessageHandler(firstHandler);
        waitForLatch(firstLatch);
        Thread.sleep(100L);
        Assertions.assertFalse(firstHandler.failed);
        Assertions.assertNull(consumer.getLastException());

        // Phase 2: one message is read synchronously while no handler is set.
        consumer.setMessageHandler((MessageHandler) null);
        final ClientMessage polled = consumer.receiveImmediate();
        Assertions.assertNotNull(polled);

        // Phase 3: 100 - 50 - 1 = 49 messages are left for the second handler.
        final CountDownLatch secondLatch = new CountDownLatch(49);
        final C4MyHandler secondHandler = new C4MyHandler(secondLatch, consumer);
        consumer.setMessageHandler(secondHandler);
        session.start();
        final boolean remainderDone = secondLatch.await(5L, TimeUnit.SECONDS);
        Assertions.assertTrue(remainderDone, "message received " + secondHandler.messageReceived);
        Thread.sleep(100L);
        Assertions.assertFalse(secondHandler.failed);
        Assertions.assertNull(consumer.getLastException());

        session.close();
    }
}
