package org.apache.activemq.artemis.tests.integration.client;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/SessionSendAcknowledgementHandlerTest.class */
public class SessionSendAcknowledgementHandlerTest extends ActiveMQTestBase {
    private ActiveMQServer server;
    private final SimpleString address = new SimpleString("address");
    private final SimpleString queueName = new SimpleString("queue");

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/SessionSendAcknowledgementHandlerTest$LatchAckHandler.class */
    public static final class LatchAckHandler implements SendAcknowledgementHandler {
        public CountDownLatch latch;
        private final String name;

        public LatchAckHandler(String str, CountDownLatch countDownLatch) {
            this.name = str;
            this.latch = countDownLatch;
        }

        public void sendAcknowledged(Message message) {
            this.latch.countDown();
        }

        public String toString() {
            return "SendAckHandler(name=" + this.name + ", latch=" + this.latch + ")";
        }
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.server = createServer(false);
        this.server.start();
    }

    @Test
    public void testSetInvalidSendACK() throws Exception {
        ServerLocator createInVMNonHALocator = createInVMNonHALocator();
        createInVMNonHALocator.setConfirmationWindowSize(-1);
        ClientSession createSession = createSessionFactory(createInVMNonHALocator).createSession((String) null, (String) null, false, true, true, false, 1);
        boolean z = false;
        try {
            createSession.setSendAcknowledgementHandler(new SendAcknowledgementHandler() { // from class: org.apache.activemq.artemis.tests.integration.client.SessionSendAcknowledgementHandlerTest.1
                public void sendAcknowledged(Message message) {
                }
            });
        } catch (Throwable th) {
            z = true;
        }
        assertTrue("Expected a failure on setting ACK Handler", z);
        createSession.createQueue(new QueueConfiguration(this.queueName).setAddress(this.address).setDurable(false));
    }

    @Test
    public void testSendAcknowledgementsNoWindowSize() throws Exception {
        verifySendAcknowledgements(0);
    }

    @Test
    public void testSendAcknowledgements() throws Exception {
        verifySendAcknowledgements(1024);
    }

    @Test
    public void testSendAcknowledgementsNoWindowSizeProducerOnly() throws Exception {
        verifySendAcknowledgementsProducerOnly(0);
    }

    @Test
    public void testSendAcknowledgementsProducer() throws Exception {
        verifySendAcknowledgementsProducerOnly(1024);
    }

    public void verifySendAcknowledgements(int i) throws Exception {
        ServerLocator createInVMNonHALocator = createInVMNonHALocator();
        createInVMNonHALocator.setConfirmationWindowSize(i);
        ClientSession createSession = createSessionFactory(createInVMNonHALocator).createSession((String) null, (String) null, false, true, true, false, 1);
        createSession.createQueue(new QueueConfiguration(this.queueName).setAddress(this.address).setDurable(false));
        ClientProducer createProducer = createSession.createProducer(this.address);
        LatchAckHandler latchAckHandler = new LatchAckHandler("session", new CountDownLatch(1000));
        LatchAckHandler latchAckHandler2 = new LatchAckHandler("producer", new CountDownLatch(1000));
        createSession.setSendAcknowledgementHandler(latchAckHandler);
        for (int i2 = 0; i2 < 1000; i2++) {
            ClientMessage createMessage = createSession.createMessage(false);
            ClientMessage createMessage2 = createSession.createMessage(false);
            createProducer.send(createMessage);
            createProducer.send(this.address, createMessage2, latchAckHandler2);
        }
        Assert.assertTrue("session must have acked, " + latchAckHandler, latchAckHandler.latch.await(5L, TimeUnit.SECONDS));
        Assert.assertTrue("producer specific handler must have acked, " + latchAckHandler2, latchAckHandler2.latch.await(5L, TimeUnit.SECONDS));
    }

    public void verifySendAcknowledgementsProducerOnly(int i) throws Exception {
        ServerLocator createInVMNonHALocator = createInVMNonHALocator();
        createInVMNonHALocator.setConfirmationWindowSize(i);
        ClientSession createSession = createSessionFactory(createInVMNonHALocator).createSession((String) null, (String) null, false, true, true, false, 1);
        createSession.createQueue(new QueueConfiguration(this.queueName).setAddress(this.address).setDurable(false));
        ClientProducer createProducer = createSession.createProducer(this.address);
        LatchAckHandler latchAckHandler = new LatchAckHandler("producer", new CountDownLatch(1000));
        for (int i2 = 0; i2 < 1000; i2++) {
            createProducer.send(this.address, createSession.createMessage(false), latchAckHandler);
        }
        Assert.assertTrue("producer specific handler must have acked, " + latchAckHandler, latchAckHandler.latch.await(5L, TimeUnit.SECONDS));
    }

    @Test
    public void testHandlerOnSend() throws Exception {
        ServerLocator createInVMNonHALocator = createInVMNonHALocator();
        createInVMNonHALocator.setConfirmationWindowSize(256);
        ClientSession createSession = createInVMNonHALocator.createSessionFactory().createSession();
        ClientProducer createProducer = createSession.createProducer(this.address);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        for (int i = 0; i < 750; i++) {
            createProducer.send(createSession.createMessage(true), message -> {
                atomicInteger.incrementAndGet();
            });
        }
        Wait.assertEquals(750, () -> {
            return atomicInteger.get();
        }, 2000L, 100L);
    }

    @Test
    public void testHandlerOnSendWithAnonymousProducer() throws Exception {
        ServerLocator createInVMNonHALocator = createInVMNonHALocator();
        createInVMNonHALocator.setConfirmationWindowSize(256);
        ClientSession createSession = createInVMNonHALocator.createSessionFactory().createSession();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ClientProducer createProducer = createSession.createProducer();
        for (int i = 0; i < 750; i++) {
            createProducer.send(this.address, createSession.createMessage(true), message -> {
                atomicInteger.incrementAndGet();
            });
        }
        Wait.assertEquals(750, () -> {
            return atomicInteger.get();
        }, 2000L, 100L);
    }

    @Test
    public void testHandlerOnSession() throws Exception {
        ServerLocator createInVMNonHALocator = createInVMNonHALocator();
        createInVMNonHALocator.setConfirmationWindowSize(256);
        ClientSession createSession = createInVMNonHALocator.createSessionFactory().createSession();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        createSession.setSendAcknowledgementHandler(message -> {
            atomicInteger.incrementAndGet();
        });
        ClientProducer createProducer = createSession.createProducer(this.address);
        for (int i = 0; i < 750; i++) {
            createProducer.send(createSession.createMessage(true));
        }
        Wait.assertEquals(750, () -> {
            return atomicInteger.get();
        }, 2000L, 100L);
    }

    @Test
    public void testHandlerOnSessionWithAnonymousProducer() throws Exception {
        ServerLocator createInVMNonHALocator = createInVMNonHALocator();
        createInVMNonHALocator.setConfirmationWindowSize(256);
        ClientSession createSession = createInVMNonHALocator.createSessionFactory().createSession();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        createSession.setSendAcknowledgementHandler(message -> {
            atomicInteger.incrementAndGet();
        });
        ClientProducer createProducer = createSession.createProducer();
        for (int i = 0; i < 750; i++) {
            createProducer.send(this.address, createSession.createMessage(true));
        }
        Wait.assertEquals(750, () -> {
            return atomicInteger.get();
        }, 2000L, 100L);
    }
}
