package org.apache.activemq.artemis.tests.integration.cluster.distribution;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsInstanceOf;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/distribution/AMQPLargeMessageClusterTest.class */
public class AMQPLargeMessageClusterTest extends ClusterTestBase {
    private static final int RECEIVE_TIMEOUT_MILLIS = 20000;
    private static final int MESSAGE_SIZE = 1048576;
    private static final int MESSAGES = 1;
    private final boolean persistenceEnabled;

    @Parameters(name = "persistenceEnabled = {0}")
    public static Iterable<? extends Object> persistenceEnabled() {
        return Arrays.asList(new Object[]{true}, new Object[]{false});
    }

    public AMQPLargeMessageClusterTest(boolean z) {
        this.persistenceEnabled = z;
    }

    @Override // org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase, org.apache.activemq.artemis.tests.util.ActiveMQTestBase
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        deleteDirectory(this.temporaryFolder);
        this.temporaryFolder.mkdirs();
        start();
    }

    private void start() throws Exception {
        setupServers();
        setRedistributionDelay(0L);
    }

    protected boolean isNetty() {
        return true;
    }

    @Timeout(value = 40000, unit = TimeUnit.MILLISECONDS)
    @TestTemplate
    public void testSendReceiveLargeMessage() throws Exception {
        testSendReceiveLargeMessage(message -> {
        }, message2 -> {
        });
    }

    @Timeout(value = 40000, unit = TimeUnit.MILLISECONDS)
    @TestTemplate
    public void testSendReceiveLargeMessageWithJMSCorrelationID() throws Exception {
        testSendReceiveLargeMessage(message -> {
            try {
                message.setJMSCorrelationID("123456");
            } catch (JMSException e) {
                Assertions.fail("Exception not expected: " + e);
            }
        }, message2 -> {
            try {
                Assertions.assertEquals("123456", message2.getJMSCorrelationID());
            } catch (JMSException e) {
                Assertions.fail("Exception not expected: " + e);
            }
        });
    }

    private void testSendReceiveLargeMessage(Consumer<Message> consumer, Consumer<Message> consumer2) throws Exception {
        setupCluster(MessageLoadBalancingType.ON_DEMAND);
        startServers(0, 1);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        createQueue(0, "queues.0", "queues.0", null, false, null, null, RoutingType.ANYCAST);
        createQueue(1, "queues.0", "queues.0", null, false, null, null, RoutingType.ANYCAST);
        waitForBindings(0, "queues.0", 1, 0, true);
        waitForBindings(1, "queues.0", 1, 0, true);
        waitForBindings(0, "queues.0", 1, 0, false);
        waitForBindings(1, "queues.0", 1, 0, false);
        Connection createConnection = new JmsConnectionFactory("amqp://localhost:61616").createConnection();
        try {
            Session createSession = createConnection.createSession(false, 1);
            try {
                createConnection.start();
                Queue createQueue = createSession.createQueue("queues.0");
                createConnection = new JmsConnectionFactory("amqp://localhost:61617").createConnection();
                try {
                    Session createSession2 = createConnection.createSession(false, 1);
                    try {
                        MessageConsumer createConsumer = createSession2.createConsumer(createQueue);
                        try {
                            MessageProducer createProducer = createSession.createProducer(createQueue);
                            try {
                                createProducer.setDeliveryMode(2);
                                createConnection.start();
                                byte[] bArr = new byte[MESSAGE_SIZE];
                                byte[] bArr2 = new byte[bArr.length];
                                ThreadLocalRandom.current().nextBytes(bArr);
                                for (int i = 0; i < 1; i++) {
                                    BytesMessage createBytesMessage = createSession.createBytesMessage();
                                    createBytesMessage.writeBytes(bArr);
                                    consumer.accept(createBytesMessage);
                                    createProducer.send(createBytesMessage);
                                    BytesMessage receive = createConsumer.receive(20000L);
                                    Assertions.assertNotNull(receive, "A message should be received in 20000 ms");
                                    MatcherAssert.assertThat(receive, IsInstanceOf.instanceOf(createBytesMessage.getClass()));
                                    try {
                                        Assertions.assertEquals(bArr.length, receive.readBytes(bArr2));
                                        Assertions.assertArrayEquals(bArr, bArr2);
                                    } catch (Throwable th) {
                                        th.printStackTrace();
                                        System.exit(-1);
                                    }
                                    consumer2.accept(receive);
                                }
                                if (createProducer != null) {
                                    createProducer.close();
                                }
                                if (createConsumer != null) {
                                    createConsumer.close();
                                }
                                if (createSession2 != null) {
                                    createSession2.close();
                                }
                                if (createConnection != null) {
                                    createConnection.close();
                                }
                                if (createSession != null) {
                                    createSession.close();
                                }
                                if (createConnection != null) {
                                    createConnection.close();
                                }
                                stopServers(0, 1);
                            } catch (Throwable th2) {
                                if (createProducer != null) {
                                    try {
                                        createProducer.close();
                                    } catch (Throwable th3) {
                                        th2.addSuppressed(th3);
                                    }
                                }
                                throw th2;
                            }
                        } catch (Throwable th4) {
                            if (createConsumer != null) {
                                try {
                                    createConsumer.close();
                                } catch (Throwable th5) {
                                    th4.addSuppressed(th5);
                                }
                            }
                            throw th4;
                        }
                    } catch (Throwable th6) {
                        if (createSession2 != null) {
                            try {
                                createSession2.close();
                            } catch (Throwable th7) {
                                th6.addSuppressed(th7);
                            }
                        }
                        throw th6;
                    }
                } finally {
                    if (createConnection != null) {
                        try {
                            createConnection.close();
                        } catch (Throwable th8) {
                            th.addSuppressed(th8);
                        }
                    }
                }
            } finally {
            }
        } catch (Throwable th9) {
            throw th9;
        }
    }

    protected void setupCluster(MessageLoadBalancingType messageLoadBalancingType) throws Exception {
        setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1);
        setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0);
    }

    protected void setRedistributionDelay(long j) {
        AddressSettings redistributionDelay = new AddressSettings().setRedistributionDelay(j);
        getServer(0).getAddressSettingsRepository().addMatch("queues.*", redistributionDelay);
        getServer(1).getAddressSettingsRepository().addMatch("queues.*", redistributionDelay);
    }

    protected void setupServers() throws Exception {
        setupServer(0, this.persistenceEnabled, isNetty());
        setupServer(1, this.persistenceEnabled, isNetty());
        this.servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
        this.servers[1].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
    }

    protected void stopServers() throws Exception {
        closeAllConsumers();
        closeAllSessionFactories();
        closeAllServerLocatorsFactories();
        stopServers(0, 1);
        clearServer(0, 1);
    }
}
