package org.apache.activemq.artemis.tests.integration.client;

import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionContinuationMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.class */
public class InterruptedLargeMessageTest extends LargeMessageTestBase {
    static final int RECEIVE_WAIT_TIME = 60000;
    private final int LARGE_MESSAGE_SIZE = 307200;
    private final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
    protected ServerLocator locator;

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest$LargeMessageTestInterceptorIgnoreLastPacket.class */
    public static class LargeMessageTestInterceptorIgnoreLastPacket implements Interceptor {
        private static boolean intMessages = false;
        private static CountDownLatch latch = new CountDownLatch(1);

        public static void clearInterrupt() {
            intMessages = true;
            latch = new CountDownLatch(1);
        }

        public static void disableInterrupt() {
            intMessages = false;
        }

        public static void awaitInterrupt() throws Exception {
            latch.await();
        }

        public boolean intercept(Packet packet, RemotingConnection remotingConnection) throws ActiveMQException {
            if (!(packet instanceof SessionContinuationMessage) || ((SessionContinuationMessage) packet).isContinues() || !intMessages) {
                return true;
            }
            System.out.println("Ignored a message");
            latch.countDown();
            return false;
        }
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        LargeMessageTestInterceptorIgnoreLastPacket.clearInterrupt();
        this.locator = createFactory(isNetty());
    }

    protected boolean isNetty() {
        return false;
    }

    @Test
    public void testInterruptLargeMessageSend() throws Exception {
        LargeMessageTestInterceptorIgnoreLastPacket.clearInterrupt();
        ActiveMQServer createServer = createServer(true, isNetty());
        createServer.getConfiguration().getIncomingInterceptorClassNames().add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
        createServer.start();
        this.locator.setBlockOnNonDurableSend(false).setBlockOnDurableSend(false);
        ClientSession createSession = createSessionFactory(this.locator).createSession(false, true, true);
        createSession.createQueue(this.ADDRESS, this.ADDRESS, true);
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        ClientMessage createLargeClientMessageStreaming = createLargeClientMessageStreaming(createSession, 307200L, true);
        createLargeClientMessageStreaming.setExpiration(System.currentTimeMillis());
        createProducer.send(createLargeClientMessageStreaming);
        Thread.sleep(500L);
        Iterator it = createServer.getSessions().iterator();
        while (it.hasNext()) {
            ((ServerSession) it.next()).clearLargeMessage();
        }
        createServer.stop(false);
        ActiveMQTestBase.forceGC();
        createServer.start();
        createServer.stop();
        validateNoFilesOnLargeDir();
    }

    @Test
    public void testCloseConsumerDuringTransmission() throws Exception {
        ActiveMQServer createServer = createServer(true, isNetty());
        LargeMessageTestInterceptorIgnoreLastPacket.disableInterrupt();
        createServer.start();
        this.locator.setBlockOnNonDurableSend(false).setBlockOnDurableSend(false).addIncomingInterceptor(new LargeMessageTestInterceptorIgnoreLastPacket());
        ClientSession createSession = createSessionFactory(this.locator).createSession(false, true, true);
        createSession.createQueue(this.ADDRESS, this.ADDRESS, true);
        createSession.createProducer(this.ADDRESS).send(createLargeClientMessageStreaming(createSession, 307200L, true));
        createSession.commit();
        LargeMessageTestInterceptorIgnoreLastPacket.clearInterrupt();
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final AtomicInteger atomicInteger2 = new AtomicInteger(0);
        final ClientConsumer createConsumer = createSession.createConsumer(this.ADDRESS);
        createSession.start();
        Thread thread = new Thread() { // from class: org.apache.activemq.artemis.tests.integration.client.InterruptedLargeMessageTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    System.out.println("Receiving message");
                    ClientMessage receive = createConsumer.receive(5000L);
                    if (receive != null) {
                        receive.checkCompletion();
                    } else {
                        System.err.println("Message not received");
                        atomicInteger.incrementAndGet();
                    }
                } catch (ActiveMQException e) {
                    e.printStackTrace();
                    atomicInteger2.incrementAndGet();
                }
            }
        };
        thread.start();
        LargeMessageTestInterceptorIgnoreLastPacket.awaitInterrupt();
        createConsumer.close();
        thread.join();
        Assert.assertEquals(0L, atomicInteger.get());
        Assert.assertEquals(1L, atomicInteger2.get());
        createSession.close();
        createServer.stop();
    }

    @Test
    public void testSendNonPersistentQueue() throws Exception {
        LargeMessageTestInterceptorIgnoreLastPacket.disableInterrupt();
        ActiveMQServer createServer = createServer(true, isNetty());
        createServer.start();
        this.locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true);
        ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
        ClientSession createSession = createSessionFactory.createSession(false, true, true);
        createSession.createQueue(this.ADDRESS, this.ADDRESS, false);
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        for (int i = 0; i < 10; i++) {
            createProducer.send(createLargeClientMessageStreaming(createSession, 307200L, true));
        }
        createSession.commit();
        createSession.close();
        ClientSession createSession2 = createSessionFactory.createSession(false, false);
        ClientConsumer createConsumer = createSession2.createConsumer(this.ADDRESS);
        createSession2.start();
        for (int i2 = 0; i2 < 5; i2++) {
            for (int i3 = 0; i3 < 10; i3++) {
                ClientMessage receive = createConsumer.receive(5000L);
                Assert.assertNotNull(receive);
                for (int i4 = 0; i4 < 307200; i4++) {
                    Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i4), receive.getBodyBuffer().readByte());
                }
                receive.acknowledge();
            }
            createSession2.rollback();
        }
        createServer.stop(false);
        createServer.start();
        createServer.stop();
        validateNoFilesOnLargeDir();
    }

    @Test
    public void testSendPaging() throws Exception {
        LargeMessageTestInterceptorIgnoreLastPacket.disableInterrupt();
        ActiveMQServer createServer = createServer(true, createDefaultConfig(isNetty()), 10000L, 20000L, new HashMap());
        createServer.start();
        this.locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true);
        ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
        ClientSession createSession = createSessionFactory.createSession(false, true, true);
        createSession.createQueue(this.ADDRESS, this.ADDRESS, true);
        createServer.getPagingManager().getPageStore(this.ADDRESS).startPaging();
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        for (int i = 0; i < 10; i++) {
            createProducer.send(createLargeClientMessageStreaming(createSession, 307200L, true));
        }
        createSession.commit();
        validateNoFilesOnLargeDir(createServer.getConfiguration().getLargeMessagesDirectory(), 10);
        for (int i2 = 0; i2 < 5; i2++) {
            createSession.close();
            createSessionFactory.close();
            createServer.stop();
            createServer.start();
            createSessionFactory = createSessionFactory(this.locator);
            createSession = createSessionFactory.createSession(false, false);
            ClientConsumer createConsumer = createSession.createConsumer(this.ADDRESS);
            createSession.start();
            for (int i3 = 0; i3 < 10; i3++) {
                ClientMessage receive = createConsumer.receive(5000L);
                Assert.assertNotNull(receive);
                for (int i4 = 0; i4 < 307200; i4++) {
                    Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i4), receive.getBodyBuffer().readByte());
                }
                receive.acknowledge();
            }
            if (i2 == 4) {
                createSession.commit();
            } else {
                createSession.rollback();
            }
            createSession.close();
            createSessionFactory.close();
        }
        createServer.stop(false);
        createServer.start();
        validateNoFilesOnLargeDir();
    }

    @Test
    public void testSendPreparedXA() throws Exception {
        LargeMessageTestInterceptorIgnoreLastPacket.disableInterrupt();
        ActiveMQServer createServer = createServer(true, createDefaultConfig(isNetty()), 10000L, 20000L, new HashMap());
        createServer.getConfiguration().getIncomingInterceptorClassNames().add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
        createServer.start();
        this.locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true);
        ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
        ClientSession createSession = createSessionFactory.createSession(true, false, false);
        XidImpl newXID = newXID();
        XidImpl newXID2 = newXID();
        createSession.createQueue(this.ADDRESS, this.ADDRESS, true);
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        createSession.start(newXID, 0);
        for (int i = 0; i < 10; i++) {
            ClientMessage createLargeClientMessageStreaming = createLargeClientMessageStreaming(createSession, 307200L, true);
            createLargeClientMessageStreaming.putIntProperty("txid", 1);
            createProducer.send(createLargeClientMessageStreaming);
        }
        createSession.end(newXID, 67108864);
        createSession.prepare(newXID);
        createSession.start(newXID2, 0);
        for (int i2 = 0; i2 < 10; i2++) {
            ClientMessage createLargeClientMessageStreaming2 = createLargeClientMessageStreaming(createSession, 307200L, true);
            createLargeClientMessageStreaming2.putIntProperty("txid", 2);
            createLargeClientMessageStreaming2.putIntProperty("i", i2);
            createProducer.send(createLargeClientMessageStreaming2);
        }
        createSession.end(newXID2, 67108864);
        createSession.prepare(newXID2);
        createSession.close();
        createSessionFactory.close();
        createServer.stop(false);
        createServer.start();
        for (int i3 = 0; i3 < 2; i3++) {
            System.out.println("Start " + i3);
            ClientSessionFactory createSessionFactory2 = createSessionFactory(this.locator);
            if (i3 == 0) {
                ClientSession createSession2 = createSessionFactory2.createSession(true, false, false);
                createSession2.commit(newXID, false);
                createSession2.close();
            }
            ClientSession createSession3 = createSessionFactory2.createSession(false, false, false);
            ClientConsumer createConsumer = createSession3.createConsumer(this.ADDRESS);
            createSession3.start();
            for (int i4 = 0; i4 < 10; i4++) {
                this.log.info("I = " + i4);
                ClientMessage receive = createConsumer.receive(5000L);
                Assert.assertNotNull(receive);
                Assert.assertEquals(1L, receive.getIntProperty("txid").intValue());
                receive.acknowledge();
            }
            if (i3 == 1) {
                createSession3.commit();
            } else {
                createSession3.rollback();
            }
            createSession3.close();
            createSessionFactory2.close();
            createServer.stop();
            createServer.start();
        }
        createServer.stop();
        validateNoFilesOnLargeDir(createServer.getConfiguration().getLargeMessagesDirectory(), 10);
        createServer.start();
        ClientSessionFactory createSessionFactory3 = createSessionFactory(this.locator);
        createSessionFactory3.createSession(true, false, false).rollback(newXID2);
        createSessionFactory3.close();
        createServer.stop();
        createServer.start();
        createServer.stop();
        validateNoFilesOnLargeDir();
    }

    @Test
    public void testRestartBeforeDelete() throws Exception {
        LargeMessageTestInterceptorIgnoreLastPacket.disableInterrupt();
        ActiveMQServerImpl createServer = createServer(true, isNetty());
        createServer.start();
        QueueFactory queueFactory = createServer.getQueueFactory();
        createServer.replaceQueueFactory(new QueueFactory(createServer.getStorageManager(), createServer.getPostOffice(), createServer.getScheduledPool(), createServer.getAddressSettingsRepository(), createServer.getExecutorFactory()) { // from class: org.apache.activemq.artemis.tests.integration.client.InterruptedLargeMessageTest.1NoPostACKQueueFactory
            final StorageManager storageManager;
            final PostOffice postOffice;
            final ScheduledExecutorService scheduledExecutor;
            final HierarchicalRepository<AddressSettings> addressSettingsRepository;
            final ExecutorFactory execFactory;

            {
                this.storageManager = r5;
                this.postOffice = r6;
                this.scheduledExecutor = r7;
                this.addressSettingsRepository = r8;
                this.execFactory = r9;
            }

            public Queue createQueue(long j, SimpleString simpleString, SimpleString simpleString2, Filter filter, PageSubscription pageSubscription, SimpleString simpleString3, boolean z, boolean z2, boolean z3) {
                final InterruptedLargeMessageTest interruptedLargeMessageTest = InterruptedLargeMessageTest.this;
                return new QueueImpl(j, simpleString, simpleString2, filter, simpleString3, pageSubscription, z, z2, z3, this.scheduledExecutor, this.postOffice, this.storageManager, this.addressSettingsRepository, this.execFactory.getExecutor()) { // from class: org.apache.activemq.artemis.tests.integration.client.InterruptedLargeMessageTest.1NoPostACKQueue
                    public void postAcknowledge(MessageReference messageReference) {
                        System.out.println("Ignoring postACK on message " + messageReference);
                    }

                    public void deliverScheduledMessages() {
                    }
                };
            }

            public void setPostOffice(PostOffice postOffice) {
            }
        });
        this.locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true);
        ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
        ClientSession createSession = createSessionFactory.createSession(false, true, true);
        createSession.createQueue(this.ADDRESS, this.ADDRESS, true);
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        for (int i = 0; i < 10; i++) {
            createProducer.send(createLargeClientMessageStreaming(createSession, 307200L, true));
        }
        createSession.commit();
        createSession.close();
        ClientSession createSession2 = createSessionFactory.createSession(false, false);
        ClientConsumer createConsumer = createSession2.createConsumer(this.ADDRESS);
        createSession2.start();
        for (int i2 = 0; i2 < 10; i2++) {
            ClientMessage receive = createConsumer.receive(5000L);
            Assert.assertNotNull(receive);
            receive.saveToOutputStream(new OutputStream() { // from class: org.apache.activemq.artemis.tests.integration.client.InterruptedLargeMessageTest.2
                @Override // java.io.OutputStream
                public void write(int i3) throws IOException {
                }
            });
            receive.acknowledge();
            createSession2.commit();
        }
        createServer.replaceQueueFactory(queueFactory);
        createServer.stop(false);
        createServer.start();
        createServer.stop();
        validateNoFilesOnLargeDir();
    }

    @Test
    public void testConsumeAfterRestart() throws Exception {
        LargeMessageTestInterceptorIgnoreLastPacket.clearInterrupt();
        ActiveMQServerImpl createServer = createServer(true, isNetty());
        createServer.start();
        QueueFactory queueFactory = createServer.getQueueFactory();
        this.locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true);
        ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
        ClientSession createSession = createSessionFactory.createSession(false, true, true);
        createSession.createQueue(this.ADDRESS, this.ADDRESS, true);
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        for (int i = 0; i < 10; i++) {
            createProducer.send(createLargeClientMessageStreaming(createSession, 307200L, true));
        }
        createSession.commit();
        createSession.close();
        createSessionFactory.close();
        createServer.stop();
        createServer.start();
        ClientSession createSession2 = createSessionFactory(this.locator).createSession(false, false);
        ClientConsumer createConsumer = createSession2.createConsumer(this.ADDRESS);
        createSession2.start();
        for (int i2 = 0; i2 < 10; i2++) {
            ClientMessage receive = createConsumer.receive(5000L);
            Assert.assertNotNull(receive);
            receive.saveToOutputStream(new OutputStream() { // from class: org.apache.activemq.artemis.tests.integration.client.InterruptedLargeMessageTest.3
                @Override // java.io.OutputStream
                public void write(int i3) throws IOException {
                }
            });
            receive.acknowledge();
            createSession2.commit();
        }
        createServer.replaceQueueFactory(queueFactory);
        createServer.stop(false);
        createServer.start();
        createServer.stop();
        validateNoFilesOnLargeDir();
    }
}
