package org.apache.activemq.artemis.tests.integration.amqp;

import jakarta.jms.JMSException;
import jakarta.jms.MessageProducer;
import jakarta.jms.ResourceAllocationException;
import jakarta.jms.Session;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Integration tests for AMQP sender credit handling when an address is blocked or full.
 *
 * <p>The broker is configured with the {@code BLOCK} address-full policy, a max size of
 * {@link #MAX_SIZE_BYTES} and a reject threshold of {@link #MAX_SIZE_BYTES_REJECT_THRESHOLD}
 * for every address ({@code "#"}). An extra acceptor on {@link #SINGLE_CREDIT_ACCEPTOR_URI}
 * is registered with {@code amqpCredits=1;amqpLowCredits=1} so that tests can observe the
 * credit granted to a sender link one unit at a time.
 */
public class AmqpFlowControlTest extends JMSClientTestSupport {
    /** Address size (bytes) configured as {@code maxSizeBytes}. */
    private static final long MAX_SIZE_BYTES = 1024 * 1024;

    /** Address size (bytes) configured as {@code maxSizeBytesRejectThreshold}. */
    private static final long MAX_SIZE_BYTES_REJECT_THRESHOLD = 2 * 1024 * 1024;

    /** Body size (bytes) of every message used to fill an address. */
    private static final int PAYLOAD_SIZE = 50 * 1024;

    /** Upper bound on the number of messages {@link #sendUntilFull(AmqpSender)} attempts to send. */
    private static final int MAX_MESSAGES = 50;

    /** URI of the additional acceptor that is configured with a single AMQP credit. */
    private static final String SINGLE_CREDIT_ACCEPTOR_URI = "tcp://localhost:5680";

    /** Number of messages successfully sent by the most recent {@link #sendUntilFull(AmqpSender)} call. */
    private int messagesSent;

    /**
     * Runs the parent set-up and resets the sent-message counter.
     *
     * @throws Exception if the parent set-up fails
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        this.messagesSent = 0;
    }

    /**
     * Registers the single-credit AMQP acceptor named {@code "flow"} on the given server.
     *
     * @param activeMQServer server whose configuration receives the extra acceptor
     * @throws Exception if the acceptor configuration cannot be added
     */
    @Override
    protected void addAdditionalAcceptors(ActiveMQServer activeMQServer) throws Exception {
        activeMQServer.getConfiguration().addAcceptorConfiguration(
            "flow",
            SINGLE_CREDIT_ACCEPTOR_URI + "?protocols=AMQP;useEpoll=false;amqpCredits=1;amqpLowCredits=1");
    }

    /**
     * Applies the BLOCK policy and the size limits used by these tests to the {@code "#"} match.
     *
     * <p>Kept {@code public}: the decompiled source notes the modifier was widened from
     * {@code protected}, and narrowing it here could conflict with the parent declaration.
     *
     * @param activeMQServer server whose address settings repository is updated
     */
    @Override
    public void configureAddressPolicy(ActiveMQServer activeMQServer) {
        AddressSettings settings = (AddressSettings) activeMQServer.getAddressSettingsRepository().getMatch("#");
        settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
        settings.setMaxSizeBytes(MAX_SIZE_BYTES);
        settings.setMaxSizeBytesRejectThreshold(MAX_SIZE_BYTES_REJECT_THRESHOLD);
        activeMQServer.getAddressSettingsRepository().addMatch("#", settings);
    }

    /**
     * A sender attached through the single-credit acceptor must hold exactly one credit.
     *
     * @throws Exception on any connection or link failure
     */
    @Timeout(60)
    @Test
    public void testCreditsAreAllocatedOnceOnLinkCreated() throws Exception {
        AmqpConnection connection = addConnection(createAmqpClient(new URI(SINGLE_CREDIT_ACCEPTOR_URI)).connect());
        try {
            AmqpSender sender = connection.createSession().createSender(getQueueName());
            Assertions.assertEquals(1, sender.getSender().getCredit(), "Should only be issued one credit");
        } finally {
            connection.close();
        }
    }

    /**
     * While the address is blocked through its management control a new sender sees zero
     * credit; after unblocking it sees one credit, and so does a freshly attached sender.
     *
     * @throws Exception on any connection, link or management failure
     */
    @Timeout(60)
    @Test
    public void testCreditIsNotGivenOnLinkCreationWhileBlockedAndIsGivenOnceThenUnblocked() throws Exception {
        AmqpConnection connection = addConnection(createAmqpClient(new URI(SINGLE_CREDIT_ACCEPTOR_URI)).connect());
        try {
            AddressControl addressControl =
                ManagementControlHelper.createAddressControl(SimpleString.of(getQueueName()), this.mBeanServer);
            addressControl.block();

            AmqpSession session = connection.createSession();
            AmqpSender blockedSender = session.createSender(getQueueName());
            Assertions.assertTrue(
                Wait.waitFor(() -> 0 == blockedSender.getSender().getCredit(), 5000L, 20L),
                "Should get 0 credit");

            addressControl.unblock();
            Assertions.assertTrue(
                Wait.waitFor(() -> 1 == blockedSender.getSender().getCredit(), 5000L, 20L),
                "Should now get issued one credit");
            blockedSender.close();

            AmqpSender freshSender = session.createSender(getQueueName());
            Assertions.assertEquals(1, freshSender.getSender().getCredit(), "Should only be issued one credit");
        } finally {
            connection.close();
        }
    }

    /**
     * Sends with an unbounded send timeout until the sender stalls, then checks the link
     * credit and that the address size lies between the max size and the reject threshold.
     *
     * @throws Exception on any connection or link failure
     */
    @Timeout(60)
    @Test
    public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception {
        AmqpConnection connection = addConnection(createAmqpClient(new URI(SINGLE_CREDIT_ACCEPTOR_URI)).connect());
        try {
            AmqpSender sender = connection.createSession().createSender(getQueueName());
            sender.setSendTimeout(-1L);
            sendUntilFull(sender);

            // NOTE(review): -1 presumably reflects one message still buffered in the client while
            // the broker has granted no further credit - confirm against the AMQP test client.
            Assertions.assertEquals(-1, sender.getSender().getCredit(), "Unexpected sender credit once address is full");

            long addressSize =
                this.server.getPagingManager().getPageStore(SimpleString.of(getQueueName())).getAddressSize();
            Assertions.assertTrue(
                addressSize >= MAX_SIZE_BYTES && addressSize <= MAX_SIZE_BYTES_REJECT_THRESHOLD,
                "Address size out of expected range: " + addressSize);
        } finally {
            connection.close();
        }
    }

    /**
     * Once another client has filled the address, a JMS producer's send must fail with a
     * {@link ResourceAllocationException} mentioning {@code resource-limit-exceeded}.
     *
     * @throws Exception on any connection or session failure
     */
    @Timeout(60)
    @Test
    public void testAddressIsBlockedForOtherProducersWhenFull() throws Exception {
        Session session = createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));

        fillAddress(getQueueName());

        ResourceAllocationException error = Assertions.assertThrows(
            ResourceAllocationException.class,
            () -> producer.send(session.createBytesMessage()));
        assertMessageContains(error, "resource-limit-exceeded");

        long addressSize =
            this.server.getPagingManager().getPageStore(SimpleString.of(getQueueName())).getAddressSize();
        Assertions.assertTrue(
            addressSize >= MAX_SIZE_BYTES_REJECT_THRESHOLD,
            "Address size below reject threshold: " + addressSize);
    }

    /**
     * With the address blocked, the first send uses the single available credit; a second
     * send on a background thread must not complete until the address is unblocked.
     *
     * @throws Exception on any connection, session or management failure
     */
    @Timeout(60)
    @Test
    public void testSendBlocksWhenAddressBlockedAndCompletesAfterUnblocked() throws Exception {
        Session session = createConnection(
            new URI(SINGLE_CREDIT_ACCEPTOR_URI.replace("tcp", "amqp")), null, null, null, true)
            .createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));

        CountDownLatch sendStarted = new CountDownLatch(1);
        CountDownLatch sendFinished = new CountDownLatch(1);

        AddressControl addressControl =
            ManagementControlHelper.createAddressControl(SimpleString.of(getQueueName()), this.mBeanServer);
        Assertions.assertTrue(addressControl.block(), "blocked ok");

        producer.send(session.createBytesMessage());

        new Thread(() -> {
            try {
                sendStarted.countDown();
                producer.send(session.createBytesMessage());
            } catch (JMSException ignored) {
                // A failed send is detected by the message-count assertion at the end of the test.
            } finally {
                sendFinished.countDown();
            }
        }).start();

        Assertions.assertTrue(sendStarted.await(5L, TimeUnit.SECONDS));
        // The background send must still be pending while the address is blocked.
        Assertions.assertFalse(sendFinished.await(200L, TimeUnit.MILLISECONDS));

        addressControl.unblock();
        Assertions.assertTrue(sendFinished.await(5L, TimeUnit.SECONDS));

        producer.send(session.createBytesMessage());
        Assertions.assertEquals(3L, addressControl.getMessageCount());
    }

    /**
     * After the address has been filled a new sender gets no credit; the test then drains
     * all but one of the sent messages and re-checks the sender's credit.
     *
     * @throws Exception on any connection or link failure
     */
    @Timeout(60)
    @Test
    public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception {
        fillAddress(getQueueName());

        AmqpConnection connection = addConnection(createAmqpClient().connect());
        try {
            AmqpSession session = connection.createSession();
            AmqpSender sender = session.createSender(getQueueName());

            // Give a potential flow frame time to arrive before checking the credit.
            Thread.sleep(500L);
            Assertions.assertEquals(0, sender.getSender().getCredit());

            AmqpReceiver receiver = session.createReceiver(getQueueName());
            receiver.flow(100);
            for (int i = 0; i < this.messagesSent - 1; i++) {
                AmqpMessage received = receiver.receive(5000L, TimeUnit.MILLISECONDS);
                Assertions.assertNotNull(received, "No message received at index " + i);
                received.accept();
            }

            // Give the broker time to react to the drained address.
            Thread.sleep(500L);
            // NOTE(review): ">= 0" also holds for the zero credit asserted above, so this does not
            // prove credit was refreshed - confirm whether "> 0" is the intended expectation.
            Assertions.assertTrue(sender.getSender().getCredit() >= 0);
        } finally {
            connection.close();
        }
    }

    /**
     * A sender attached after the address has been filled must hold zero credit.
     *
     * @throws Exception on any connection or link failure
     */
    @Timeout(60)
    @Test
    public void testNewLinkAttachAreNotAllocatedCreditsWhenAddressIsBlocked() throws Exception {
        fillAddress(getQueueName());

        AmqpConnection connection = addConnection(createAmqpClient().connect());
        try {
            AmqpSender sender = connection.createSession().createSender(getQueueName());
            // Give a potential flow frame time to arrive before checking the credit.
            Thread.sleep(1000L);
            Assertions.assertEquals(0, sender.getSender().getCredit());
        } finally {
            connection.close();
        }
    }

    /**
     * A pre-settled transactional send to a full address must fail with an error that names
     * {@code resource-limit-exceeded} and the full address.
     *
     * @throws Throwable on any unexpected failure
     */
    @Timeout(60)
    @Test
    public void testTxIsRolledBackOnRejectedPreSettledMessage() throws Throwable {
        AmqpConnection connection = addConnection(createAmqpClient().connect());
        AmqpSession session = connection.createSession();
        AmqpSender sender = session.createSender(getQueueName());
        sender.setPresettle(true);

        fillAddress(getQueueName());

        AmqpMessage message = new AmqpMessage();
        message.setBytes(new byte[PAYLOAD_SIZE]);

        Exception expected = null;
        try {
            session.begin();
            sender.send(message);
            session.commit();
        } catch (Exception e) {
            expected = e;
        } finally {
            connection.close();
        }

        assertMessageContains(expected, "resource-limit-exceeded");
        assertMessageContains(expected, "Address is full: " + getQueueName());
    }

    /**
     * Fills the given address over a dedicated connection and verifies that filling ended
     * with an {@code amqp:resource-limit-exceeded} error.
     *
     * @param str name of the address to fill
     * @throws Exception on any connection or link failure other than the expected rejection
     */
    private void fillAddress(String str) throws Exception {
        AmqpConnection connection = addConnection(createAmqpClient().connect());
        Exception expected = null;
        try {
            sendUntilFull(connection.createSession().createSender(str));
        } catch (Exception e) {
            expected = e;
        } finally {
            connection.close();
        }
        assertMessageContains(expected, "amqp:resource-limit-exceeded");
    }

    /**
     * Sends up to {@link #MAX_MESSAGES} messages of {@link #PAYLOAD_SIZE} bytes on a background
     * thread, waiting at most one second for them all to be sent, and records the number of
     * successful sends in {@link #messagesSent}.
     *
     * <p>The background thread is always interrupted and joined before this method returns.
     *
     * @param amqpSender sender used to publish the messages
     * @throws Exception the {@link IOException} raised by a failed send, if one was recorded
     *                   by the time the wait ended
     */
    private void sendUntilFull(AmqpSender amqpSender) throws Exception {
        final AmqpMessage message = new AmqpMessage();
        message.setBytes(new byte[PAYLOAD_SIZE]);

        final AtomicInteger sentCount = new AtomicInteger(0);
        // AtomicReference (rather than a plain array slot) so the failure written by the worker
        // thread is guaranteed to be visible to the test thread even when the latch wait times out.
        final AtomicReference<IOException> failure = new AtomicReference<>();
        final CountDownLatch allSent = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < MAX_MESSAGES; i++) {
                    amqpSender.send(message);
                    sentCount.getAndIncrement();
                }
                // Only reached when every message was sent without error.
                allSent.countDown();
            } catch (IOException e) {
                failure.set(e);
            }
        });

        try {
            worker.start();
            allSent.await(1L, TimeUnit.SECONDS);
            this.messagesSent = sentCount.get();
            IOException error = failure.get();
            if (error != null) {
                throw error;
            }
        } finally {
            // Unblock a sender that is still waiting for credit and make sure it terminates.
            worker.interrupt();
            worker.join(1000L);
            Assertions.assertFalse(worker.isAlive());
        }
    }

    /**
     * Asserts that {@code error} is present and that its message contains {@code fragment},
     * failing with a descriptive assertion instead of a {@link NullPointerException}.
     *
     * @param error    exception captured by the test, possibly {@code null}
     * @param fragment text expected inside the exception message
     */
    private static void assertMessageContains(Exception error, String fragment) {
        Assertions.assertNotNull(error, "Expected an exception mentioning '" + fragment + "'");
        String message = error.getMessage();
        Assertions.assertTrue(
            message != null && message.contains(fragment),
            "Expected '" + fragment + "' in: " + error);
    }
}
