package org.apache.activemq.artemis.tests.integration.amqp;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Sender;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Enclosed.class)
/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.class */
public class AmqpFlowControlFailTest {

    @RunWith(Parameterized.class)
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest$AmqpFlowControlFailDispositionTests.class */
    public static class AmqpFlowControlFailDispositionTests extends JMSClientTestSupport {

        @Parameterized.Parameter
        public boolean useModified;

        @Parameterized.Parameter(1)
        public Symbol[] outcomes;

        @Parameterized.Parameter(2)
        public String expectedMessage;

        @Parameterized.Parameters(name = "useModified={0}")
        public static Collection<Object[]> parameters() {
            return Arrays.asList(new Object[]{true, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL}, "failure at remote"}, new Object[]{true, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL}, "[condition = amqp:resource-limit-exceeded]"}, new Object[]{false, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL}, "[condition = amqp:resource-limit-exceeded]"}, new Object[]{false, new Symbol[0], "[condition = amqp:resource-limit-exceeded]"});
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport
        public void configureAddressPolicy(ActiveMQServer activeMQServer) {
            AmqpFlowControlFailTest.configureAddressPolicy(activeMQServer);
        }

        @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport
        protected void configureAMQPAcceptorParameters(Map<String, Object> map) {
            map.put("amqpUseModifiedForTransientDeliveryErrors", Boolean.valueOf(this.useModified));
        }

        @Test(timeout = 60000)
        public void testAddressFullDisposition() throws Exception {
            AmqpConnection addConnection = addConnection(createAmqpClient(getBrokerAmqpConnectionURI()).connect());
            try {
                AmqpSender createSender = addConnection.createSession().createSender(getQueueName(), (SenderSettleMode) null, (ReceiverSettleMode) null, this.outcomes);
                boolean z = false;
                for (int i = 0; i < 1000; i++) {
                    AmqpMessage amqpMessage = new AmqpMessage();
                    amqpMessage.setBytes(new byte[10]);
                    try {
                        createSender.send(amqpMessage);
                    } catch (IOException e) {
                        z = true;
                        assertTrue(String.format("Unexpected message expected %s to contain %s", e.getMessage(), this.expectedMessage), e.getMessage().contains(this.expectedMessage));
                    }
                }
                assertTrue("Expected messages to be refused by broker", z);
                addConnection.close();
            } catch (Throwable th) {
                addConnection.close();
                throw th;
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest$AmqpFlowControlFailOrdinaryTests.class */
    public static class AmqpFlowControlFailOrdinaryTests extends JMSClientTestSupport {
        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport
        public void configureAddressPolicy(ActiveMQServer activeMQServer) {
            AmqpFlowControlFailTest.configureAddressPolicy(activeMQServer);
        }

        @Test(timeout = 60000)
        public void testMesagesNotSent() throws Exception {
            AmqpConnection addConnection = addConnection(createAmqpClient(getBrokerAmqpConnectionURI()).connect());
            int i = 0;
            try {
                AmqpSender createSender = addConnection.createSession().createSender(getQueueName());
                boolean z = false;
                for (int i2 = 0; i2 < 1000; i2++) {
                    AmqpMessage amqpMessage = new AmqpMessage();
                    amqpMessage.setBytes(new byte[10]);
                    try {
                        createSender.send(amqpMessage);
                        i++;
                    } catch (IOException e) {
                        z = true;
                    }
                }
                assertTrue(z);
                boolean z2 = false;
                assertEquals(0L, createSender.getSender().getCredit());
                AmqpSession createSession = addConnection.createSession();
                AmqpReceiver createReceiver = createSession.createReceiver(getQueueName());
                createReceiver.flow(i);
                for (int i3 = 0; i3 < i; i3++) {
                    createReceiver.receive().accept();
                }
                createReceiver.close();
                createSession.close();
                Sender sender = createSender.getSender();
                sender.getClass();
                Wait.assertEquals(1000, sender::getCredit);
                for (int i4 = 0; i4 < 1000; i4++) {
                    AmqpMessage amqpMessage2 = new AmqpMessage();
                    amqpMessage2.setBytes(new byte[100]);
                    try {
                        createSender.send(amqpMessage2);
                    } catch (IOException e2) {
                        z2 = true;
                    }
                }
                assertTrue(z2);
                assertEquals(0L, createSender.getSender().getCredit());
                addConnection.close();
            } catch (Throwable th) {
                addConnection.close();
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void configureAddressPolicy(ActiveMQServer activeMQServer) {
        AddressSettings addressSettings = (AddressSettings) activeMQServer.getAddressSettingsRepository().getMatch("#");
        addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
        addressSettings.setMaxSizeBytes(1000L);
        activeMQServer.getAddressSettingsRepository().addMatch("#", addressSettings);
    }
}
