package org.apache.activemq.artemis.tests.integration.amqp.largemessages;

import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/largemessages/InterruptedAMQPLargeMessage.class */
public class InterruptedAMQPLargeMessage extends AmqpClientTestSupport {
    private static final int NUMBER_OF_THREADS = 10;
    private static final int MINIMAL_SEND = 2;
    private static final int MESSAGE_SIZE = 307200;
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final String smallFrameAcceptor = new String("tcp://localhost:5680");

    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport
    protected void addAdditionalAcceptors(ActiveMQServer activeMQServer) throws Exception {
        activeMQServer.getConfiguration().addAcceptorConfiguration("flow", smallFrameAcceptor + "?protocols=AMQP;useEpoll=false;maxFrameSize=512;amqpMinLargeMessageSize=10000");
    }

    public static void main(String[] strArr) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
        CountDownLatch countDownLatch = new CountDownLatch(20);
        Runnable runnable = () -> {
            try {
                AmqpConnection createConnection = createLocalClient().createConnection();
                createConnection.setMaxFrameSize(2048);
                createConnection.connect();
                AmqpSender createSender = createConnection.createSession().createSender(strArr[0]);
                cyclicBarrier.await();
                for (int i = 0; i < 1000; i++) {
                    AmqpMessage amqpMessage = new AmqpMessage();
                    amqpMessage.setDurable(true);
                    byte[] bArr = new byte[MESSAGE_SIZE];
                    for (int i2 = 0; i2 < bArr.length; i2++) {
                        bArr[i2] = 122;
                    }
                    amqpMessage.setBytes(bArr);
                    createSender.send(amqpMessage);
                    countDownLatch.countDown();
                }
                createConnection.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
        for (int i = 0; i < 10; i++) {
            new Thread(runnable).start();
        }
        try {
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(-1);
    }

    private static AmqpClient createLocalClient() throws URISyntaxException {
        return new AmqpClient(new URI(smallFrameAcceptor), (String) null, (String) null);
    }

    @Test
    public void testInterruptedLargeMessage() throws Exception {
        SpawnedVMSupport.spawnVM(InterruptedAMQPLargeMessage.class.getName(), new String[]{getQueueName()}).waitFor();
        Queue locateQueue = this.server.locateQueue(getQueueName());
        Assert.assertTrue(locateQueue.getMessageCount() >= 20);
        LinkedListIterator browserIterator = locateQueue.browserIterator();
        while (browserIterator.hasNext()) {
            Message message = ((MessageReference) browserIterator.next()).getMessage();
            Assert.assertNotNull(message);
            Assert.assertTrue(message instanceof LargeServerMessage);
        }
        browserIterator.close();
        logger.debug("There are {} on the queue", Long.valueOf(locateQueue.getMessageCount()));
        int messageCount = (int) locateQueue.getMessageCount();
        AmqpConnection addConnection = addConnection(createLocalClient().createConnection());
        addConnection.setMaxFrameSize(2048);
        addConnection.connect();
        AmqpReceiver createReceiver = addConnection.createSession().createReceiver(getQueueName());
        int i = 0;
        createReceiver.flow(messageCount + 10);
        for (int i2 = 0; i2 < messageCount; i2++) {
            createReceiver.flow(1);
            AmqpMessage receive = createReceiver.receive(10L, TimeUnit.SECONDS);
            Assert.assertNotNull(receive);
            receive.accept(true);
            i++;
            logger.debug("Received {}", Integer.valueOf(i));
            byte[] array = receive.getWrappedMessage().getBody().getValue().getArray();
            Assert.assertEquals(307200L, array.length);
            for (byte b : array) {
                Assert.assertEquals(122L, b);
            }
        }
        Assert.assertNull(createReceiver.receiveNoWait());
        validateNoFilesOnLargeDir();
    }
}
