package org.apache.activemq.artemis.tests.integration.largemessage;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.DeflaterReader;
import org.junit.Assert;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.class */
public abstract class LargeMessageTestBase extends ActiveMQTestBase {
    // Logger shared by this class and the anonymous helper classes it creates.
    private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
    // Used as both the address name and the queue name when testChunks creates its queue.
    protected final SimpleString ADDRESS = new SimpleString("SimpleAddress");
    // Store type this parameterized instance runs against; assigned by the constructor.
    protected StoreConfiguration.StoreType storeType;

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase$TestLargeMessageInputStream.class */
    /**
     * Synthetic input stream that produces {@code size} upper-case ASCII letters.
     *
     * <p>In deterministic mode the byte at position {@code p} is {@code 'A' + (p % 26)}; in random
     * mode every byte is a uniformly chosen letter {@code 'A'..'Z'}. The initial size is
     * {@code minLarge + 1024} and can be changed with {@link #setSize(int)} or
     * {@link #resetAdjust(int)}.
     *
     * <p>Instances are not thread-safe.
     */
    public static class TestLargeMessageInputStream extends InputStream {
        /** Number of distinct letters produced ('A'..'Z'). */
        private static final int ALPHABET_SIZE = 26;

        private final int minLarge;
        private final boolean random;
        // One generator per stream instead of a fresh Random for every byte produced.
        private final Random generator = new Random();
        private int size;
        private int pos;

        /**
         * Creates a deterministic stream of {@code i + 1024} bytes.
         *
         * @param i the "minimum large message" threshold this stream is sized against
         */
        public TestLargeMessageInputStream(int i) {
            this(i, false);
        }

        /**
         * Creates a stream of {@code i + 1024} bytes.
         *
         * @param i the "minimum large message" threshold this stream is sized against
         * @param z {@code true} to produce random letters, {@code false} for the repeating alphabet
         */
        public TestLargeMessageInputStream(int i, boolean z) {
            this.pos = 0;
            this.minLarge = i;
            this.size = i + 1024;
            this.random = z;
        }

        /**
         * Copy constructor: duplicates threshold, size, read position and the random flag.
         *
         * @param testLargeMessageInputStream the stream to copy
         */
        public TestLargeMessageInputStream(TestLargeMessageInputStream testLargeMessageInputStream) {
            this.minLarge = testLargeMessageInputStream.minLarge;
            this.size = testLargeMessageInputStream.size;
            this.pos = testLargeMessageInputStream.pos;
            // Previously omitted, which silently turned copies of random streams into deterministic ones.
            this.random = testLargeMessageInputStream.random;
        }

        /**
         * Returns the letter for the given position.
         *
         * @param i stream position; ignored in random mode
         * @return a value in {@code 'A'..'Z'} (for non-negative {@code i})
         */
        public int getChar(int i) {
            return this.random ? 'A' + this.generator.nextInt(ALPHABET_SIZE) : 'A' + (i % ALPHABET_SIZE);
        }

        /** Sets the total number of bytes this stream produces; the read position is left untouched. */
        public void setSize(int i) {
            this.size = i;
        }

        /** @return the total number of bytes this stream produces */
        public int getSize() {
            return this.size;
        }

        /** @return the threshold passed at construction time */
        public int getMinLarge() {
            return this.minLarge;
        }

        /**
         * Reads the next letter.
         *
         * @return the next byte, or {@code -1} once {@code size} bytes have been read
         */
        @Override // java.io.InputStream
        public int read() throws IOException {
            // ">=" rather than "==": setSize may shrink the stream below the current position,
            // and the stream must still report end-of-data in that case.
            if (this.pos >= this.size) {
                return -1;
            }
            return getChar(this.pos++);
        }

        /**
         * Changes the size by {@code i} (may be negative) and rewinds the stream to position 0.
         *
         * @param i delta applied to the current size
         * @throws IllegalStateException if the resulting size is not greater than {@code minLarge};
         *                               the size change is kept but the position is not reset
         */
        public void resetAdjust(int i) {
            this.size += i;
            if (this.size <= this.minLarge) {
                throw new IllegalStateException("Couldn't adjust anymore, size smaller than minLarge " + this.minLarge);
            }
            this.pos = 0;
        }

        /* renamed from: clone, reason: merged with bridge method [inline-methods] */
        /** @return an independent copy of this stream, including its read position and mode */
        public TestLargeMessageInputStream m116clone() {
            return new TestLargeMessageInputStream(this);
        }

        /**
         * Reads from the current position into a {@code size}-long array, consuming the stream.
         * If the stream was already partially read, the trailing slots hold {@code (char) -1}.
         *
         * @return the characters read
         */
        public char[] toArray() throws IOException {
            char[] cArr = new char[this.size];
            for (int i = 0; i < cArr.length; i++) {
                cArr[i] = (char) read();
            }
            return cArr;
        }
    }

    /**
     * Binds this test instance to one store type.
     *
     * @param storeType the store type this instance runs against; see {@link #data()} for the
     *                  values supplied to the parameterized runner
     */
    public LargeMessageTestBase(StoreConfiguration.StoreType storeType) {
        super();
        this.storeType = storeType;
    }

    /**
     * Runs the inherited tear-down and, for the DATABASE store type, destroys the tables listed
     * below.
     *
     * <p>The table cleanup sits in a {@code finally} block so that a failing inherited tear-down
     * does not leave tables behind for the next parameterized run.
     *
     * @throws Exception if the inherited tear-down or the table cleanup fails
     */
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            if (this.storeType == StoreConfiguration.StoreType.DATABASE) {
                destroyTables(Arrays.asList("BINDINGS", "LARGE_MESSAGE", "MESSAGE", "NODE_MANAGER_STORE"));
            }
        }
    }

    @Parameterized.Parameters(name = "storeType={0}")
    /**
     * Supplies the parameter sets for the parameterized runner: one single-element row per
     * store type (FILE first, then DATABASE).
     *
     * @return the constructor argument rows
     */
    public static Collection<Object[]> data() {
        Object[] fileRow = {StoreConfiguration.StoreType.FILE};
        Object[] databaseRow = {StoreConfiguration.StoreType.DATABASE};
        return Arrays.asList(fileRow, databaseRow);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Convenience overload of the 15-argument {@code testChunks}: forwards every argument
     * unchanged and supplies {@code -1} for the confirmation window size (which the full overload
     * ignores because it is not positive) and {@code 10240} for the minimum large message size.
     *
     * @throws Exception on any failure of the delegated scenario
     */
    public void testChunks(boolean z, boolean z2, boolean z3, boolean z4, boolean z5, boolean z6, boolean z7, boolean z8, boolean z9, int i, long j, int i2, long j2) throws Exception {
        final int confirmationWindowNotSet = -1;
        final int minLargeMessageSize = 10240;
        testChunks(z, z2, z3, z4, z5, z6, z7, z8, z9, i, j, i2, j2, confirmationWindowNotSet, minLargeMessageSize);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Sends {@code i} messages of {@code j} bytes each to {@link #ADDRESS}, consumes them again
     * and verifies order, content and (optionally) delivery delay, finishing with checks that
     * the queue is empty and no large-message files remain.
     *
     * <p>The server and locator are always released in a {@code finally} block, whatever step
     * fails.
     *
     * @param z   run in XA mode: passed as the XA flag to {@code createSession} and gates all Xid handling
     * @param z2  together with {@code z5}: stop and start the server between XA prepare and recover
     * @param z3  send the batch once and roll it back before the real send
     * @param z4  verify the body through {@code saveToOutputStream} instead of the body buffer
     * @param z5  passed as first argument to {@code createServer}; when set, the server is also
     *            recreated before consuming (presumably selects persistent files; confirm in ActiveMQTestBase)
     * @param z6  passed as the pre-acknowledge argument to {@code createSession}; when set, messages
     *            are not acknowledged explicitly
     * @param z7  make durable sends, non-durable sends and acknowledgements blocking on the locator
     * @param z8  add a first pass whose consumer is created with the third {@code createConsumer}
     *            argument {@code true} (browse-only, per the Artemis API) and whose work is rolled back
     * @param z9  consume through a {@link MessageHandler} instead of {@code receive}
     * @param i   number of messages to send and expect
     * @param j   expected body size in bytes
     * @param i2  wait budget in milliseconds: handler latch timeout, and added to {@code j2} for {@code receive}
     * @param j2  scheduled delivery delay in milliseconds; values &gt; 0 enable the delay check and
     *            disable the ordering check
     * @param i3  confirmation window size; only applied when positive
     * @param i4  minimum large message size set on the locator
     * @throws Exception on any failure; the cause is also printed before being rethrown
     */
    public void testChunks(boolean z, boolean z2, boolean z3, final boolean z4, boolean z5, final boolean z6, boolean z7, boolean z8, boolean z9, int i, final long j, int i2, final long j2, int i3, int i4) throws Exception {
        clearDataRecreateServerDirs();
        Configuration config = this.storeType == StoreConfiguration.StoreType.DATABASE ? createDefaultJDBCConfig(true) : createDefaultConfig(false);
        ActiveMQServer server = createServer(z5, config);
        server.start();
        ServerLocator locator = createInVMNonHALocator();
        try {
            if (z7) {
                locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
            }
            if (i3 > 0) {
                locator.setConfirmationWindowSize(i3);
            }
            locator.setMinLargeMessageSize(i4);
            ClientSessionFactory sessionFactory = locator.createSessionFactory();
            Xid xid = null;
            ClientSession session = openChunkTestSession(sessionFactory, z, z6);
            if (z) {
                xid = newXID();
                session.start(xid, XAResource.TMNOFLAGS);
            }
            session.createQueue(this.ADDRESS, this.ADDRESS, (SimpleString) null, true);
            ClientProducer producer = session.createProducer(this.ADDRESS);

            if (z3) {
                // First attempt: send everything, then roll it back and make sure nothing is left on disk.
                sendMessages(i, j, j2, session, producer);
                if (z) {
                    session.end(xid, XAResource.TMSUCCESS);
                    session.prepare(xid);
                    session.close();
                    if (z5 && z2) {
                        server.stop();
                        server.start();
                        sessionFactory = locator.createSessionFactory();
                    }
                    session = openChunkTestSession(sessionFactory, z, z6);
                    Xid[] prepared = session.recover(XAResource.TMSTARTRSCAN);
                    Assert.assertEquals(1L, prepared.length);
                    Assert.assertEquals(xid, prepared[0]);
                    session.rollback(xid);
                    producer = session.createProducer(this.ADDRESS);
                    xid = newXID();
                    session.start(xid, XAResource.TMNOFLAGS);
                } else {
                    session.rollback();
                }
                validateNoFilesOnLargeDir();
            }

            sendMessages(i, j, j2, session, producer);
            if (z) {
                session.end(xid, XAResource.TMSUCCESS);
                session.prepare(xid);
                session.close();
                if (z5 && z2) {
                    server.stop();
                    server.start();
                    sessionFactory = locator.createSessionFactory();
                }
                session = openChunkTestSession(sessionFactory, z, z6);
                Xid[] prepared = session.recover(XAResource.TMSTARTRSCAN);
                Assert.assertEquals(1L, prepared.length);
                Assert.assertEquals(xid, prepared[0]);
                // Result intentionally unused; the call is kept to preserve the original interaction sequence.
                session.createProducer(this.ADDRESS);
                session.commit(xid, false);
                xid = newXID();
                session.start(xid, XAResource.TMNOFLAGS);
            } else {
                session.commit();
            }
            session.close();

            if (z5) {
                server.stop();
                server = createServer(z5, config);
                server.start();
                sessionFactory = locator.createSessionFactory();
            }
            ClientSession consumerSession = openChunkTestSession(sessionFactory, z, z6);
            if (z) {
                xid = newXID();
                consumerSession.start(xid, XAResource.TMNOFLAGS);
            }

            // Pass 0 (only when z8 is set) is rolled back; pass 1 consumes and commits.
            for (int pass = z8 ? 0 : 1; pass < 2; pass++) {
                final boolean firstPass = pass == 0;
                consumerSession.stop();
                ClientConsumer consumer = consumerSession.createConsumer(this.ADDRESS, (SimpleString) null, firstPass);
                if (z9) {
                    final CountDownLatch latch = new CountDownLatch(i);
                    final AtomicInteger errors = new AtomicInteger(0);
                    MessageHandler handler = new MessageHandler() {
                        int msgCounter;

                        public void onMessage(ClientMessage clientMessage) {
                            try {
                                Assert.assertNotNull(clientMessage);
                                if (j2 > 0) {
                                    assertScheduledDelayElapsed(clientMessage, j2);
                                }
                                if (!z6) {
                                    clientMessage.acknowledge();
                                }
                                if (j2 <= 0) {
                                    Assert.assertEquals(this.msgCounter, ((Integer) clientMessage.getObjectProperty(new SimpleString("counter-message"))).intValue());
                                }
                                if (z4) {
                                    assertStreamedSampleBody(clientMessage, j);
                                } else {
                                    ActiveMQBuffer body = assertBufferedSampleBody(clientMessage, j);
                                    try {
                                        body.readByte();
                                        Assert.fail("Supposed to throw an exception");
                                    } catch (Exception expected) {
                                        // Reading past the end of the body must fail; the exception type is not checked.
                                    }
                                }
                            } catch (Throwable t) {
                                t.printStackTrace();
                                LargeMessageTestBase.log.warn("Got an error", t);
                                errors.incrementAndGet();
                            } finally {
                                // Exactly one count-down per message, on success and on failure alike.
                                latch.countDown();
                                this.msgCounter++;
                            }
                        }
                    };
                    consumerSession.start();
                    consumer.setMessageHandler(handler);
                    Assert.assertTrue(latch.await(i2, TimeUnit.MILLISECONDS));
                    Assert.assertEquals(0L, errors.get());
                } else {
                    consumerSession.start();
                    for (int n = 0; n < i; n++) {
                        ClientMessage message = consumer.receive(i2 + j2);
                        Assert.assertNotNull(message);
                        if (j2 > 0) {
                            assertScheduledDelayElapsed(message, j2);
                        }
                        if (!z6) {
                            message.acknowledge();
                        }
                        if (j2 <= 0) {
                            Assert.assertEquals(n, ((Integer) message.getObjectProperty(new SimpleString("counter-message"))).intValue());
                        }
                        if (z4) {
                            assertStreamedSampleBody(message, j);
                        } else {
                            assertBufferedSampleBody(message, j);
                        }
                    }
                }
                consumer.close();

                if (firstPass) {
                    if (z) {
                        consumerSession.end(xid, XAResource.TMSUCCESS);
                        consumerSession.rollback(xid);
                        xid = newXID();
                        consumerSession.start(xid, XAResource.TMNOFLAGS);
                    } else {
                        consumerSession.rollback();
                    }
                } else if (z) {
                    consumerSession.end(xid, XAResource.TMSUCCESS);
                    consumerSession.commit(xid, true);
                    xid = newXID();
                    consumerSession.start(xid, XAResource.TMNOFLAGS);
                } else {
                    consumerSession.commit();
                }
            }
            consumerSession.close();

            Assert.assertEquals(0L, server.getPostOffice().getBinding(this.ADDRESS).getBindable().getDeliveringCount());
            Assert.assertEquals(0L, server.getPostOffice().getBinding(this.ADDRESS).getBindable().getMessageCount());
            validateNoFilesOnLargeDir();
        } catch (Throwable t) {
            t.printStackTrace();
            throw t;
        } finally {
            locator.close();
            try {
                server.stop();
            } catch (Throwable stopFailure) {
                // Best effort: a failing stop must not hide the outcome of the test itself.
                stopFailure.printStackTrace();
            }
        }
    }

    /** Opens a session with the fixed settings used throughout testChunks; only the XA and pre-acknowledge flags vary. */
    private static ClientSession openChunkTestSession(ClientSessionFactory factory, boolean xa, boolean preAcknowledge) throws Exception {
        return factory.createSession((String) null, (String) null, xa, false, false, preAcknowledge, 0);
    }

    /** Asserts that at least {@code delay} ms have passed since the "original-time" stamp written by sendMessages. */
    private static void assertScheduledDelayElapsed(ClientMessage message, long delay) {
        long sentAt = ((Long) message.getObjectProperty(new SimpleString("original-time"))).longValue();
        long elapsed = System.currentTimeMillis() - sentAt;
        Assert.assertTrue(elapsed + "<" + delay, elapsed >= delay);
    }

    /** Streams the body through a checking sink and asserts that exactly {@code expectedSize} valid bytes arrived. */
    private static void assertStreamedSampleBody(ClientMessage message, long expectedSize) throws Exception {
        final AtomicLong bytesRead = new AtomicLong(0L);
        message.saveToOutputStream(new OutputStream() {
            @Override // java.io.OutputStream
            public void write(byte[] bArr) throws IOException {
                if (bArr.length == 0) {
                    return;
                }
                // Only the first byte of each chunk is checked against the expected sample pattern.
                if (bArr[0] == ActiveMQTestBase.getSamplebyte(bytesRead.get())) {
                    bytesRead.addAndGet(bArr.length);
                } else {
                    LargeMessageTestBase.log.warn("Received invalid packet at position " + bytesRead.get());
                }
            }

            @Override // java.io.OutputStream
            public void write(int b) throws IOException {
                if (bytesRead.get() % 1048576 == 0) {
                    LargeMessageTestBase.log.debug("Read " + bytesRead.get() + " bytes");
                }
                // Uncounted bytes make the size assertion below fail, which is how mismatches surface.
                if ((byte) b == ActiveMQTestBase.getSamplebyte(bytesRead.get())) {
                    bytesRead.incrementAndGet();
                } else {
                    LargeMessageTestBase.log.warn("byte not as expected!");
                }
            }
        });
        Assert.assertEquals(expectedSize, bytesRead.get());
    }

    /** Reads {@code expectedSize} bytes from the body buffer, asserting each one, and returns the buffer positioned after them. */
    private static ActiveMQBuffer assertBufferedSampleBody(ClientMessage message, long expectedSize) {
        ActiveMQBuffer body = message.getBodyBuffer();
        body.resetReaderIndex();
        for (long position = 0; position < expectedSize; position++) {
            if (position % 1048576 == 0) {
                log.debug("Read " + position + " bytes");
            }
            Assert.assertEquals(ActiveMQTestBase.getSamplebyte(position), body.readByte());
        }
        return body;
    }

    /**
     * Sends {@code i} durable messages of {@code j} bytes each through the given producer.
     *
     * <p>Bodies larger than 1 MiB are always attached as an input stream; smaller ones alternate
     * between stream (even index) and in-memory byte array (odd index). Every message carries its
     * index in the "counter-message" property. When {@code j2} is positive the message is also
     * stamped with "original-time" and scheduled for delivery {@code j2} ms later.
     *
     * @param i              number of messages
     * @param j              body size in bytes
     * @param j2             scheduled delivery delay in milliseconds; not applied unless positive
     * @param clientSession  session used to create the messages
     * @param clientProducer producer used to send them
     */
    private void sendMessages(int i, long j, long j2, ClientSession clientSession, ClientProducer clientProducer) throws Exception {
        log.debug("NumberOfBytes = " + j);
        final boolean tooBigForArray = j > 1048576;
        int index = 0;
        while (index < i) {
            ClientMessage message = clientSession.createMessage(true);
            boolean asArray = !tooBigForArray && index % 2 != 0;
            if (asArray) {
                log.debug("Sending message (array)" + index);
                byte[] payload = new byte[(int) j];
                for (int offset = 0; offset < payload.length; offset++) {
                    payload[offset] = ActiveMQTestBase.getSamplebyte(offset);
                }
                message.getBodyBuffer().writeBytes(payload);
            } else {
                log.debug("Sending message (stream)" + index);
                message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(j));
            }
            message.putIntProperty(new SimpleString("counter-message"), index);
            if (j2 > 0) {
                long sentAt = System.currentTimeMillis();
                message.putLongProperty(new SimpleString("original-time"), sentAt);
                message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, sentAt + j2);
            }
            clientProducer.send(message);
            index++;
        }
    }

    /**
     * Builds a fixed buffer of {@code 4 * i} bytes filled with the ints {@code 0 .. i-1} in order.
     *
     * @param i number of ints to write
     * @return the populated buffer
     */
    protected ActiveMQBuffer createLargeBuffer(int i) {
        final ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(i * 4);
        int value = 0;
        while (value < i) {
            buffer.writeInt(value);
            value++;
        }
        return buffer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Shorthand for the three-argument overload with the last argument fixed to {@code true}.
     *
     * @param clientSession session used to create the message
     * @param i             body size in bytes
     * @return the message returned by the three-argument overload
     */
    public ClientMessage createLargeClientMessageStreaming(ClientSession clientSession, int i) throws Exception {
        final long bodySize = i;
        return createLargeClientMessageStreaming(clientSession, bodySize, true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a message whose body buffer holds the given bytes.
     *
     * @param clientSession session used to create the message
     * @param bArr          body content, written to the body buffer as-is
     * @param z             forwarded unchanged to {@code ClientSession.createMessage}
     * @return the populated message
     */
    public ClientMessage createLargeClientMessage(ClientSession clientSession, byte[] bArr, boolean z) throws Exception {
        final ClientMessage message = clientSession.createMessage(z);
        ActiveMQBuffer body = message.getBodyBuffer();
        body.writeBytes(bArr);
        return message;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a message whose body is the stream returned by
     * {@code ActiveMQTestBase.createFakeLargeStream(j)}.
     *
     * @param clientSession session used to create the message
     * @param j             size argument passed to {@code createFakeLargeStream}
     * @param z             forwarded unchanged to {@code ClientSession.createMessage}
     * @return the message with its body input stream set
     */
    public ClientMessage createLargeClientMessageStreaming(ClientSession clientSession, long j, boolean z) throws Exception {
        final ClientMessage message = clientSession.createMessage(z);
        InputStream body = ActiveMQTestBase.createFakeLargeStream(j);
        message.setBodyInputStream(body);
        return message;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts the session, receives one message from the given queue (waiting up to five seconds),
     * asserts that one arrived, acknowledges it, commits the session and closes the consumer.
     *
     * @param clientSession session to consume with
     * @param simpleString  queue to read from
     * @param i             not used by this method
     */
    public void readMessage(ClientSession clientSession, SimpleString simpleString, int i) throws ActiveMQException, IOException {
        final long receiveTimeoutMillis = 5000L;
        clientSession.start();
        ClientConsumer consumer = clientSession.createConsumer(simpleString);
        ClientMessage message = consumer.receive(receiveTimeoutMillis);
        Assert.assertNotNull(message);
        message.acknowledge();
        clientSession.commit();
        consumer.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a sink that discards everything written to it, counts the bytes, and rejects
     * writes after {@code close()}.
     *
     * <p>A debug line is logged once per full MiB received. The previous condition
     * {@code (count % 1024) * 1024 == 0} logged every 1024 bytes because the multiplication
     * applied to the remainder instead of the divisor.
     *
     * @return the counting, discarding output stream
     */
    public OutputStream createFakeOutputStream() throws Exception {
        return new OutputStream() { // from class: org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase.3
            private boolean closed = false;
            // long: an int counter would overflow on bodies larger than 2 GiB.
            private long count;

            @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
            public void close() throws IOException {
                super.close();
                this.closed = true;
            }

            @Override // java.io.OutputStream
            public void write(int i) throws IOException {
                // Reject first, so a write on a closed stream is neither counted nor logged.
                if (this.closed) {
                    throw new IOException("Stream was closed");
                }
                this.count++;
                if (this.count % (1024 * 1024) == 0) {
                    LargeMessageTestBase.log.debug("OutputStream received " + this.count + " bytes");
                }
            }
        };
    }

    /**
     * Repeatedly runs the stream through a {@link DeflaterReader} and resizes it by
     * {@code |i|} bytes per round until the deflated size ends up on the requested side of the
     * stream's {@code minLarge} threshold.
     *
     * <p>With {@code z == true} the stream shrinks until the deflated size is below the threshold;
     * with {@code z == false} it grows until the deflated size is above it. The stream is rewound
     * via {@code resetAdjust} after every round, including the final one.
     *
     * @param z                           {@code true} to shrink towards "below threshold",
     *                                    {@code false} to grow towards "above threshold"
     * @param testLargeMessageInputStream the stream to measure and resize
     * @param i                           step size; only its absolute value is used
     * @throws IOException if reading or closing the deflating reader fails
     */
    public static void adjustLargeCompression(boolean z, TestLargeMessageInputStream testLargeMessageInputStream, int i) throws IOException {
        final int step = Math.abs(i);
        for (;;) {
            DeflaterReader deflated = new DeflaterReader(testLargeMessageInputStream, new AtomicLong());
            try {
                byte[] chunk = new byte[52400];
                int deflatedSize = 0;
                int count;
                while ((count = deflated.read(chunk)) != -1) {
                    deflatedSize += count;
                }
                int threshold = testLargeMessageInputStream.getMinLarge();
                boolean targetReached = z ? deflatedSize < threshold : deflatedSize > threshold;
                if (targetReached) {
                    testLargeMessageInputStream.resetAdjust(0);
                    return;
                }
                testLargeMessageInputStream.resetAdjust(z ? -step : step);
            } finally {
                deflated.close();
            }
        }
    }
}
