package org.apache.activemq.artemis.tests.integration.amqp.largemessages;

import java.net.URI;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedLargeMessageTest.class */
public class AmqpReplicatedLargeMessageTest extends AmqpReplicatedTestSupport {
    private String smallFrameLive = new String("tcp://localhost:5682");
    private String smallFrameBackup = new String("tcp://localhost:5682");

    @Override // org.apache.activemq.artemis.tests.integration.amqp.largemessages.AmqpReplicatedTestSupport
    protected TransportConfiguration getAcceptorTransportConfiguration(boolean z) {
        return TransportConfigurationUtils.getInVMAcceptor(z);
    }

    @Override // org.apache.activemq.artemis.tests.integration.amqp.largemessages.AmqpReplicatedTestSupport
    protected TransportConfiguration getConnectorTransportConfiguration(boolean z) {
        return TransportConfigurationUtils.getInVMConnector(z);
    }

    @Override // org.apache.activemq.artemis.tests.integration.amqp.largemessages.AmqpReplicatedTestSupport, org.apache.activemq.artemis.tests.util.ActiveMQTestBase
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        createReplicatedConfigs();
        this.primaryConfig.setResolveProtocols(true).addAcceptorConfiguration("amqp", this.smallFrameLive + "?protocols=AMQP;useEpoll=false;maxFrameSize=512");
        this.backupConfig.setResolveProtocols(true).addAcceptorConfiguration("amqp", this.smallFrameBackup + "?protocols=AMQP;useEpoll=false;maxFrameSize=512");
        this.primaryServer.start();
        this.backupServer.start();
        this.primaryServer.getServer().addAddressInfo(new AddressInfo(getQueueName(), RoutingType.ANYCAST));
        this.primaryServer.getServer().createQueue(QueueConfiguration.of(getQueueName()).setRoutingType(RoutingType.ANYCAST));
        waitForRemoteBackupSynchronization(this.backupServer.getServer());
    }

    public SimpleString getQueueName() {
        return SimpleString.of("replicatedTest");
    }

    @Timeout(60)
    @Test
    public void testSimpleSend() throws Exception {
        try {
            ActiveMQServer server = this.primaryServer.getServer();
            AmqpConnection createConnection = createAmqpClient(new URI(this.smallFrameLive)).createConnection();
            addConnection(createConnection);
            createConnection.setMaxFrameSize(2048);
            createConnection.connect();
            AmqpSession createSession = createConnection.createSession();
            AmqpSender createSender = createSession.createSender(getQueueName().toString());
            Queue locateQueue = server.locateQueue(getQueueName());
            Assertions.assertNotNull(locateQueue);
            Assertions.assertEquals(0L, locateQueue.getMessageCount());
            createSession.begin();
            for (int i = 0; i < 100; i++) {
                AmqpMessage amqpMessage = new AmqpMessage();
                amqpMessage.setDurable(true);
                amqpMessage.setApplicationProperty("i", "m " + i);
                byte[] bArr = new byte[102400];
                for (int i2 = 0; i2 < bArr.length; i2++) {
                    bArr[i2] = 122;
                }
                amqpMessage.setBytes(bArr);
                createSender.send(amqpMessage);
            }
            createSession.commit();
            AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server);
            createConnection.close();
            this.primaryServer.crash(new ClientSession[0]);
            TestableServer testableServer = this.backupServer;
            Objects.requireNonNull(testableServer);
            Wait.assertTrue(testableServer::isActive);
            ActiveMQServer server2 = this.backupServer.getServer();
            AmqpConnection createConnection2 = createAmqpClient(new URI(this.smallFrameBackup)).createConnection();
            addConnection(createConnection2);
            createConnection2.setMaxFrameSize(2048);
            createConnection2.connect();
            AmqpSession createSession2 = createConnection2.createSession();
            Queue locateQueue2 = server2.locateQueue(getQueueName());
            Objects.requireNonNull(locateQueue2);
            Wait.assertEquals(100L, locateQueue2::getMessageCount);
            AmqpReceiver createReceiver = createSession2.createReceiver(getQueueName().toString());
            createReceiver.flow(100);
            for (int i3 = 0; i3 < 100; i3++) {
                AmqpMessage receive = createReceiver.receive(10L, TimeUnit.SECONDS);
                Assertions.assertNotNull(receive);
                byte[] array = receive.getWrappedMessage().getBody().getValue().getArray();
                for (int i4 = 0; i4 < 102400; i4++) {
                    Assertions.assertEquals((byte) 122, array[i4]);
                }
                receive.accept(true);
            }
            createReceiver.flow(1);
            Assertions.assertNull(createReceiver.receiveNoWait());
            createReceiver.close();
            createConnection2.close();
            Objects.requireNonNull(locateQueue2);
            Wait.assertEquals(0L, locateQueue2::getMessageCount);
            validateNoFilesOnLargeDir(getLargeMessagesDir(0, true), 0);
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }
    }

    @Timeout(60)
    @Test
    public void testCloseFilesOnTarget() throws Exception {
        try {
            ActiveMQServer server = this.primaryServer.getServer();
            AmqpConnection createConnection = createAmqpClient(new URI(this.smallFrameLive)).createConnection();
            addConnection(createConnection);
            createConnection.setMaxFrameSize(2048);
            createConnection.connect();
            AmqpSession createSession = createConnection.createSession();
            AmqpSender createSender = createSession.createSender(getQueueName().toString());
            Queue locateQueue = server.locateQueue(getQueueName());
            Assertions.assertNotNull(locateQueue);
            Assertions.assertEquals(0L, locateQueue.getMessageCount());
            createSession.begin();
            for (int i = 0; i < 100; i++) {
                AmqpMessage amqpMessage = new AmqpMessage();
                amqpMessage.setDurable(true);
                amqpMessage.setApplicationProperty("i", "m " + i);
                byte[] bArr = new byte[102400];
                for (int i2 = 0; i2 < bArr.length; i2++) {
                    bArr[i2] = 122;
                }
                amqpMessage.setBytes(bArr);
                createSender.send(amqpMessage);
            }
            createSession.commit();
            AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server);
            Queue locateQueue2 = server.locateQueue(getQueueName());
            Objects.requireNonNull(locateQueue2);
            Wait.assertEquals(100L, locateQueue2::getMessageCount);
            SharedNothingBackupActivation activation = this.backupServer.getServer().getActivation();
            Wait.assertEquals(0, () -> {
                return activation.getReplicationEndpoint().getLargeMessages().size();
            }, 5000L);
            AmqpReceiver createReceiver = createSession.createReceiver(getQueueName().toString());
            createReceiver.flow(100);
            for (int i3 = 0; i3 < 100; i3++) {
                AmqpMessage receive = createReceiver.receive(10L, TimeUnit.SECONDS);
                Assertions.assertNotNull(receive);
                byte[] array = receive.getWrappedMessage().getBody().getValue().getArray();
                for (int i4 = 0; i4 < 102400; i4++) {
                    Assertions.assertEquals((byte) 122, array[i4]);
                }
                receive.accept(true);
            }
            createReceiver.flow(1);
            Assertions.assertNull(createReceiver.receiveNoWait());
            createReceiver.close();
            createConnection.close();
            Objects.requireNonNull(locateQueue2);
            Wait.assertEquals(0L, locateQueue2::getMessageCount);
            validateNoFilesOnLargeDir(getLargeMessagesDir(0, false), 0);
            validateNoFilesOnLargeDir(getLargeMessagesDir(0, true), 0);
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }
    }
}
