package org.apache.activemq.artemis.tests.integration.client;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.class */
public class LargeMessageTest extends LargeMessageTestBase {
    static final int RECEIVE_WAIT_TIME = 10000;
    protected ServerLocator locator;
    private final int LARGE_MESSAGE_SIZE = 20480;
    private final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
    protected boolean isCompressedTest = false;

    protected boolean isNetty() {
        return false;
    }

    @Test
    public void testRollbackPartiallyConsumedBuffer() throws Exception {
        for (int i = 0; i < 1; i++) {
            this.log.info("#test " + i);
            internalTestRollbackPartiallyConsumedBuffer(false);
            tearDown();
            setUp();
        }
    }

    @Test
    public void testRollbackPartiallyConsumedBufferWithRedeliveryDelay() throws Exception {
        internalTestRollbackPartiallyConsumedBuffer(true);
    }

    private void internalTestRollbackPartiallyConsumedBuffer(boolean z) throws Exception {
        ActiveMQServer createServer = createServer(true, isNetty());
        AddressSettings addressSettings = new AddressSettings();
        if (z) {
            addressSettings.setRedeliveryDelay(100L);
            if (this.locator.isCompressLargeMessage()) {
                this.locator.setConsumerWindowSize(0);
            }
        }
        addressSettings.setMaxDeliveryAttempts(-1);
        createServer.getAddressSettingsRepository().addMatch("#", addressSettings);
        createServer.start();
        final ClientSession createSession = addSessionFactory(createSessionFactory(this.locator)).createSession(false, false, false);
        createSession.createQueue(this.ADDRESS, this.ADDRESS, true);
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        for (int i = 0; i < 20; i++) {
            ClientMessage createLargeClientMessageStreaming = createLargeClientMessageStreaming(createSession, 102400L, true);
            createLargeClientMessageStreaming.putIntProperty("value", i);
            createProducer.send(createLargeClientMessageStreaming);
        }
        createSession.commit();
        createSession.start();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        ClientConsumer createConsumer = createSession.createConsumer(this.ADDRESS);
        createConsumer.setMessageHandler(new MessageHandler() { // from class: org.apache.activemq.artemis.tests.integration.client.LargeMessageTest.1
            int counter = 0;

            public void onMessage(ClientMessage clientMessage) {
                clientMessage.getBodyBuffer().readByte();
                try {
                    int i2 = this.counter;
                    this.counter = i2 + 1;
                    if (i2 < 20) {
                        Thread.sleep(100L);
                        clientMessage.acknowledge();
                        createSession.rollback();
                    } else {
                        clientMessage.acknowledge();
                        createSession.commit();
                    }
                    if (this.counter == 40) {
                        countDownLatch.countDown();
                    }
                } catch (Exception e) {
                    countDownLatch.countDown();
                    e.printStackTrace();
                    atomicInteger.incrementAndGet();
                }
            }
        });
        assertTrue(countDownLatch.await(40L, TimeUnit.SECONDS));
        createConsumer.close();
        createSession.close();
        validateNoFilesOnLargeDir();
    }

    @Test
    public void testCloseConsumer() throws Exception {
        createServer(true, isNetty()).start();
        ClientSession addClientSession = addClientSession(addSessionFactory(createSessionFactory(this.locator)).createSession(false, false, false));
        addClientSession.createTemporaryQueue(this.ADDRESS, this.ADDRESS);
        addClientSession.createProducer(this.ADDRESS).send(createLargeClientMessageStreaming(addClientSession, 358400L, true));
        addClientSession.commit();
        addClientSession.start();
        ClientConsumer createConsumer = addClientSession.createConsumer(this.ADDRESS);
        ClientMessage receive = createConsumer.receive(1000L);
        receive.acknowledge();
        addClientSession.commit();
        Assert.assertNotNull(receive);
        createConsumer.close();
        try {
            receive.getBodyBuffer().readByte();
            Assert.fail("Exception was expected");
        } catch (Exception e) {
        }
        addClientSession.close();
        validateNoFilesOnLargeDir();
    }

    @Test
    public void testDeleteOnNoBinding() throws Exception {
        createServer(true, isNetty()).start();
        ClientSession addClientSession = addClientSession(addSessionFactory(createSessionFactory(this.locator)).createSession(false, true, false));
        addClientSession.createProducer(UUID.randomUUID().toString()).send(createLargeClientMessageStreaming(addClientSession, 358400L, true));
        addClientSession.close();
        validateNoFilesOnLargeDir();
    }

    @Test
    public void testDeleteOnDrop() throws Exception {
        fillAddress();
        ClientSession addClientSession = addClientSession(addSessionFactory(createSessionFactory(this.locator)).createSession(false, true, false));
        addClientSession.createProducer(this.ADDRESS).send(createLargeClientMessageStreaming(addClientSession, 358400L, true));
        validateNoFilesOnLargeDir();
    }

    private void fillAddress() throws Exception {
        ActiveMQServer createServer = createServer(true, createDefaultInVMConfig().setJournalSyncNonTransactional(false), 10240L, 102400L, new HashMap());
        createServer.start();
        ((AddressSettings) createServer.getAddressSettingsRepository().getMatch("#")).setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP);
        this.locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
        ClientSession createSession = createSessionFactory(this.locator).createSession(false, false, false);
        createSession.createQueue(this.ADDRESS, this.ADDRESS, (SimpleString) null, true);
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        byte[] bArr = new byte[1024];
        ByteBuffer wrap = ByteBuffer.wrap(bArr);
        for (int i = 1; i <= 1024; i++) {
            wrap.put(getSamplebyte(i));
        }
        for (int i2 = 0; i2 < 5000; i2++) {
            ClientMessage createMessage = createSession.createMessage(true);
            createMessage.getBodyBuffer().writeBytes(bArr);
            createMessage.putIntProperty(new SimpleString("id"), i2);
            createProducer.send(createMessage);
            if (i2 % 1000 == 0) {
                createSession.commit();
            }
        }
        createSession.commit();
        createSession.close();
    }

    @Test
    public void testLargeBufferTransacted() throws Exception {
        doTestLargeBuffer(true);
    }

    @Test
    public void testLargeBufferNotTransacted() throws Exception {
        doTestLargeBuffer(false);
    }

    public void doTestLargeBuffer(boolean z) throws Exception {
        createServer(true, createDefaultConfig(isNetty()).setJournalFileSize(102400).setJournalBufferSize_AIO(10240).setJournalBufferSize_NIO(10240)).start();
        ClientSession addClientSession = addClientSession(addSessionFactory(createSessionFactory(this.locator)).createSession(!z, !z, 0));
        addClientSession.createQueue(this.ADDRESS, this.ADDRESS, true);
        ClientProducer createProducer = addClientSession.createProducer(this.ADDRESS);
        ClientMessage createMessage = addClientSession.createMessage(true);
        for (int i = 0; i < 307200; i++) {
            createMessage.getBodyBuffer().writeByte(getSamplebyte(i));
        }
        createProducer.send(createMessage);
        if (z) {
            addClientSession.commit();
        }
        addClientSession.start();
        ClientConsumer createConsumer = addClientSession.createConsumer(this.ADDRESS);
        ClientMessage receive = createConsumer.receive(5000L);
        assertNotNull(receive);
        Assert.assertNotNull(receive);
        for (int i2 = 0; i2 < 307200; i2++) {
            assertEquals("position = " + i2, getSamplebyte(i2), receive.getBodyBuffer().readByte());
        }
        receive.acknowledge();
        createConsumer.close();
        if (z) {
            addClientSession.commit();
        }
        addClientSession.close();
        validateNoFilesOnLargeDir();
    }

    @Test
    public void testDLALargeMessage() throws Exception {
        ActiveMQServer createServer = createServer(true, isNetty());
        createServer.start();
        ClientSession addClientSession = addClientSession(addSessionFactory(createSessionFactory(this.locator)).createSession(false, false, false));
        addClientSession.createQueue(this.ADDRESS, this.ADDRESS, true);
        addClientSession.createQueue(this.ADDRESS, this.ADDRESS.concat("-2"), true);
        SimpleString concat = this.ADDRESS.concat("-dla");
        createServer.getAddressSettingsRepository().addMatch("*", new AddressSettings().setDeadLetterAddress(concat).setMaxDeliveryAttempts(1));
        addClientSession.createQueue(concat, concat, true);
        addClientSession.createProducer(this.ADDRESS).send(createLargeClientMessageStreaming(addClientSession, 358400L, true));
        addClientSession.commit();
        addClientSession.start();
        ClientConsumer createConsumer = addClientSession.createConsumer(concat);
        ClientConsumer createConsumer2 = addClientSession.createConsumer(this.ADDRESS);
        ClientMessage receive = createConsumer2.receive(1000L);
        Assert.assertNotNull(receive);
        receive.acknowledge();
        addClientSession.rollback();
        createConsumer2.close();
        Assert.assertNotNull(createConsumer.receive(10000L));
        for (int i = 0; i < 358400; i++) {
            Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i), r0.getBodyBuffer().readByte());
        }
        addClientSession.close();
        createServer.stop();
        ActiveMQServer createServer2 = createServer(true, isNetty());
        createServer2.start();
        ClientSession createSession = createSessionFactory(this.locator).createSession(false, false, false);
        createSession.start();
        ClientMessage receive2 = createSession.createConsumer(concat).receive(10000L);
        Assert.assertNotNull(receive2);
        for (int i2 = 0; i2 < 358400; i2++) {
            Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i2), receive2.getBodyBuffer().readByte());
        }
        receive2.acknowledge();
        createSession.commit();
        validateNoFilesOnLargeDir(createServer2.getConfiguration().getLargeMessagesDirectory(), this.isCompressedTest ? 0 : 1);
        ClientMessage receive3 = createSession.createConsumer(this.ADDRESS.concat("-2")).receive(10000L);
        Assert.assertNotNull(receive3);
        for (int i3 = 0; i3 < 358400; i3++) {
            Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i3), receive3.getBodyBuffer().readByte());
        }
        receive3.acknowledge();
        createSession.commit();
        createSession.close();
        validateNoFilesOnLargeDir();
    }

    @Test
    public void testDeliveryCount() throws Exception {
        createServer(true, isNetty()).start();
        ClientSessionFactory addSessionFactory = addSessionFactory(createSessionFactory(this.locator));
        ClientSession createSession = addSessionFactory.createSession(false, false, false);
        createSession.createQueue(this.ADDRESS, this.ADDRESS, true);
        createSession.createProducer(this.ADDRESS).send(createLargeClientMessageStreaming(createSession, 358400L, true));
        createSession.commit();
        createSession.start();
        ClientMessage receive = createSession.createConsumer(this.ADDRESS).receive(10000L);
        Assert.assertNotNull(receive);
        receive.acknowledge();
        Assert.assertEquals(1L, receive.getDeliveryCount());
        this.log.info("body buffer is " + receive.getBodyBuffer());
        for (int i = 0; i < 358400; i++) {
            Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i), receive.getBodyBuffer().readByte());
        }
        createSession.rollback();
        createSession.close();
        ClientSession createSession2 = addSessionFactory.createSession(false, false, false);
        createSession2.start();
        ClientConsumer createConsumer = createSession2.createConsumer(this.ADDRESS);
        ClientMessage receive2 = createConsumer.receive(10000L);
        Assert.assertNotNull(receive2);
        receive2.acknowledge();
        for (int i2 = 0; i2 < 358400; i2++) {
            Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i2), receive2.getBodyBuffer().readByte());
        }
        Assert.assertEquals(2L, receive2.getDeliveryCount());
        receive2.acknowledge();
        createConsumer.close();
        createSession2.commit();
        validateNoFilesOnLargeDir();
    }

    @Test
    public void testDLAOnExpiryNonDurableMessage() throws Exception {
        ActiveMQServer createServer = createServer(true, isNetty());
        createServer.start();
        ClientSessionFactory addSessionFactory = addSessionFactory(createSessionFactory(this.locator));
        SimpleString concat = this.ADDRESS.concat("-dla");
        SimpleString concat2 = this.ADDRESS.concat("-expiry");
        createServer.getAddressSettingsRepository().addMatch("*", new AddressSettings().setDeadLetterAddress(concat).setExpiryAddress(concat2).setMaxDeliveryAttempts(1));
        ClientSession createSession = addSessionFactory.createSession(false, false, false);
        createSession.createQueue(this.ADDRESS, this.ADDRESS, true);
        createSession.createQueue(concat, concat, true);
        createSession.createQueue(concat2, concat2, true);
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        ClientMessage createLargeClientMessageStreaming = createLargeClientMessageStreaming(createSession, 358400L, false);
        createLargeClientMessageStreaming.setExpiration(System.currentTimeMillis());
        createProducer.send(createLargeClientMessageStreaming);
        createSession.commit();
        createSession.start();
        ClientConsumer createConsumer = createSession.createConsumer(this.ADDRESS);
        Assert.assertNull(createConsumer.receiveImmediate());
        createConsumer.close();
        ClientConsumer createConsumer2 = createSession.createConsumer(concat2);
        ClientMessage receive = createConsumer2.receive(5000L);
        assertTrue(receive.isLargeMessage());
        Assert.assertNotNull(receive);
        receive.acknowledge();
        for (int i = 0; i < 358400; i++) {
            Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i), receive.getBodyBuffer().readByte());
        }
        createSession.rollback();
        createConsumer2.close();
        for (int i2 = 0; i2 < 10; i2++) {
            ClientConsumer createConsumer3 = createSession.createConsumer(concat);
            ClientMessage receive2 = createConsumer3.receive(5000L);
            Assert.assertNotNull(receive2);
            receive2.acknowledge();
            for (int i3 = 0; i3 < 358400; i3++) {
                Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i3), receive2.getBodyBuffer().readByte());
            }
            createSession.rollback();
            createConsumer3.close();
        }
        createSession.close();
        ClientSession createSession2 = addSessionFactory.createSession(false, false, false);
        createSession2.start();
        ClientConsumer createConsumer4 = createSession2.createConsumer(concat);
        ClientMessage receive3 = createConsumer4.receive(5000L);
        Assert.assertNotNull(receive3);
        receive3.acknowledge();
        for (int i4 = 0; i4 < 358400; i4++) {
            Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i4), receive3.getBodyBuffer().readByte());
        }
        createSession2.commit();
        createConsumer4.close();
        createSession2.commit();
        createSession2.close();
        createServer.stop();
        createServer.start();
        validateNoFilesOnLargeDir();
    }

    @Test
    public void testDLAOnExpiry() throws Exception {
        ActiveMQServer createServer = createServer(true, isNetty());
        createServer.start();
        ClientSessionFactory addSessionFactory = addSessionFactory(createSessionFactory(this.locator));
        SimpleString concat = this.ADDRESS.concat("-dla");
        SimpleString concat2 = this.ADDRESS.concat("-expiry");
        createServer.getAddressSettingsRepository().addMatch("*", new AddressSettings().setDeadLetterAddress(concat).setExpiryAddress(concat2).setMaxDeliveryAttempts(1));
        ClientSession createSession = addSessionFactory.createSession(false, false, false);
        createSession.createQueue(this.ADDRESS, this.ADDRESS, true);
        createSession.createQueue(concat, concat, true);
        createSession.createQueue(concat2, concat2, true);
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        ClientMessage createLargeClientMessageStreaming = createLargeClientMessageStreaming(createSession, 358400L, true);
        createLargeClientMessageStreaming.setExpiration(System.currentTimeMillis());
        createProducer.send(createLargeClientMessageStreaming);
        createSession.commit();
        createSession.start();
        ClientConsumer createConsumer = createSession.createConsumer(this.ADDRESS);
        Assert.assertNull(createConsumer.receiveImmediate());
        createConsumer.close();
        ClientConsumer createConsumer2 = createSession.createConsumer(concat2);
        ClientMessage receive = createConsumer2.receive(5000L);
        Assert.assertNotNull(receive);
        receive.acknowledge();
        for (int i = 0; i < 358400; i++) {
            Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i), receive.getBodyBuffer().readByte());
        }
        createSession.rollback();
        createConsumer2.close();
        for (int i2 = 0; i2 < 10; i2++) {
            ClientConsumer createConsumer3 = createSession.createConsumer(concat);
            ClientMessage receive2 = createConsumer3.receive(5000L);
            Assert.assertNotNull(receive2);
            receive2.acknowledge();
            for (int i3 = 0; i3 < 358400; i3++) {
                Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i3), receive2.getBodyBuffer().readByte());
            }
            createSession.rollback();
            createConsumer3.close();
        }
        createSession.close();
        createServer.stop();
        createServer(true, isNetty()).start();
        ClientSession createSession2 = createSessionFactory(this.locator).createSession(false, false, false);
        createSession2.start();
        ClientConsumer createConsumer4 = createSession2.createConsumer(concat);
        ClientMessage receive3 = createConsumer4.receive(5000L);
        Assert.assertNotNull(receive3);
        receive3.acknowledge();
        for (int i4 = 0; i4 < 358400; i4++) {
            Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i4), receive3.getBodyBuffer().readByte());
        }
        createSession2.commit();
        createConsumer4.close();
        createSession2.commit();
        createSession2.close();
        validateNoFilesOnLargeDir();
    }

    @Test
    public void testExpiryLargeMessage() throws Exception {
        ClientSession clientSession = null;
        try {
            ActiveMQServer createServer = createServer(true, isNetty());
            createServer.start();
            SimpleString concat = this.ADDRESS.concat("-expiry");
            createServer.getAddressSettingsRepository().addMatch("*", new AddressSettings().setExpiryAddress(concat));
            ClientSession createSession = addSessionFactory(createSessionFactory(this.locator)).createSession(false, false, false);
            createSession.createQueue(this.ADDRESS, this.ADDRESS, true);
            createSession.createQueue(concat, concat, true);
            ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
            ClientMessage createLargeClientMessageStreaming = createLargeClientMessageStreaming(createSession, 307200L, true);
            createLargeClientMessageStreaming.setExpiration(System.currentTimeMillis());
            createProducer.send(createLargeClientMessageStreaming);
            createSession.commit();
            createSession.start();
            ClientConsumer createConsumer = createSession.createConsumer(concat);
            Assert.assertNull(createSession.createConsumer(this.ADDRESS).receiveImmediate());
            Assert.assertNotNull(createConsumer.receive(50000L));
            for (int i = 0; i < 307200; i++) {
                Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i), r0.getBodyBuffer().readByte());
            }
            createSession.close();
            createServer.stop();
            createServer(true, isNetty()).start();
            clientSession = createSessionFactory(this.locator).createSession(false, false, false);
            clientSession.start();
            ClientMessage receive = clientSession.createConsumer(concat).receive(10000L);
            Assert.assertNotNull(receive);
            for (int i2 = 0; i2 < 307200; i2++) {
                Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i2), receive.getBodyBuffer().readByte());
            }
            receive.acknowledge();
            clientSession.commit();
            clientSession.close();
            validateNoFilesOnLargeDir();
            try {
                clientSession.close();
            } catch (Throwable th) {
            }
        } catch (Throwable th2) {
            try {
                clientSession.close();
            } catch (Throwable th3) {
            }
            throw th2;
        }
    }

    @Test
    public void testSentWithDuplicateIDBridge() throws Exception {
        internalTestSentWithDuplicateID(true);
    }

    @Test
    public void testSentWithDuplicateID() throws Exception {
        internalTestSentWithDuplicateID(false);
    }

    private void internalTestSentWithDuplicateID(boolean z) throws Exception {
        ClientSession clientSession = null;
        try {
            createServer(true, isNetty()).start();
            clientSession = addSessionFactory(createSessionFactory(this.locator)).createSession(true, true, 0);
            clientSession.createQueue(this.ADDRESS, this.ADDRESS, true);
            ClientProducer createProducer = clientSession.createProducer(this.ADDRESS);
            for (int i = 0; i < 10; i++) {
                ClientMessage createLargeClientMessageStreaming = createLargeClientMessageStreaming(clientSession, 307200L, true);
                if (z) {
                    createLargeClientMessageStreaming.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, "Anything".getBytes());
                } else {
                    createLargeClientMessageStreaming.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, "Anything".getBytes());
                }
                createProducer.send(createLargeClientMessageStreaming);
            }
            ClientConsumer createConsumer = clientSession.createConsumer(this.ADDRESS);
            clientSession.start();
            ClientMessage receive = createConsumer.receive(10000L);
            for (int i2 = 0; i2 < 307200; i2++) {
                assertEquals(getSamplebyte(i2), receive.getBodyBuffer().readByte());
            }
            assertNotNull(receive);
            receive.acknowledge();
            assertNull(createConsumer.receiveImmediate());
            clientSession.commit();
            validateNoFilesOnLargeDir();
            try {
                clientSession.close();
            } catch (Throwable th) {
            }
        } catch (Throwable th2) {
            try {
                clientSession.close();
            } catch (Throwable th3) {
            }
            throw th2;
        }
    }

    @Test
    public void testResendSmallStreamMessage() throws Exception {
        internalTestResendMessage(50000L);
    }

    @Test
    public void testResendLargeStreamMessage() throws Exception {
        internalTestResendMessage(153600L);
    }

    public void internalTestResendMessage(long j) throws Exception {
        clearDataRecreateServerDirs();
        ClientSession clientSession = null;
        try {
            createServer(true, isNetty()).start();
            clientSession = addSessionFactory(createSessionFactory(this.locator)).createSession(false, false, false);
            clientSession.createQueue(this.ADDRESS, this.ADDRESS, true);
            SimpleString concat = this.ADDRESS.concat("-2");
            clientSession.createQueue(concat, concat, true);
            ClientProducer createProducer = clientSession.createProducer(this.ADDRESS);
            ClientProducer createProducer2 = clientSession.createProducer(concat);
            createProducer.send(createLargeClientMessageStreaming(clientSession, j, false));
            clientSession.commit();
            clientSession.start();
            ClientConsumer createConsumer = clientSession.createConsumer(this.ADDRESS);
            ClientConsumer createConsumer2 = clientSession.createConsumer(concat);
            ClientMessage receive = createConsumer.receive(10000L);
            receive.acknowledge();
            createProducer2.send(receive);
            clientSession.commit();
            ClientMessage receive2 = createConsumer2.receive(10000L);
            Assert.assertNotNull(receive2);
            receive2.acknowledge();
            clientSession.commit();
            Assert.assertEquals(j, receive2.getBodySize());
            compareString(j, receive2);
            clientSession.close();
            validateNoFilesOnLargeDir();
            try {
                clientSession.close();
            } catch (Throwable th) {
            }
        } catch (Throwable th2) {
            try {
                clientSession.close();
            } catch (Throwable th3) {
            }
            throw th2;
        }
    }

    @Test
    public void testResendCachedSmallStreamMessage() throws Exception {
        internalTestResendMessage(50000L);
    }

    @Test
    public void testResendCachedLargeStreamMessage() throws Exception {
        internalTestCachedResendMessage(153600L);
    }

    public void internalTestCachedResendMessage(long j) throws Exception {
        ClientSession clientSession = null;
        try {
            createServer(true, isNetty()).start();
            this.locator.setMinLargeMessageSize(200).setCacheLargeMessagesClient(true);
            clientSession = addSessionFactory(createSessionFactory(this.locator)).createSession(false, false, false);
            clientSession.createQueue(this.ADDRESS, this.ADDRESS, true);
            ClientProducer createProducer = clientSession.createProducer(this.ADDRESS);
            createProducer.send(createLargeClientMessageStreaming(clientSession, j, false));
            clientSession.commit();
            ClientConsumer createConsumer = clientSession.createConsumer(this.ADDRESS);
            clientSession.start();
            ClientMessage receive = createConsumer.receive(10000L);
            receive.acknowledge();
            clientSession.commit();
            compareString(j, receive);
            receive.getBodyBuffer().readerIndex(0);
            createProducer.send(receive);
            clientSession.commit();
            ClientMessage receive2 = createConsumer.receive(10000L);
            receive2.acknowledge();
            compareString(j, receive2);
            clientSession.commit();
            clientSession.close();
            validateNoFilesOnLargeDir();
            try {
                clientSession.close();
            } catch (Throwable th) {
            }
        } catch (Throwable th2) {
            try {
                clientSession.close();
            } catch (Throwable th3) {
            }
            throw th2;
        }
    }

    private void compareString(long j, ClientMessage clientMessage) {
        assertNotNull(clientMessage);
        long j2 = 0;
        while (true) {
            long j3 = j2;
            if (j3 >= j) {
                return;
            }
            Assert.assertEquals("position " + j3, ActiveMQTestBase.getSamplebyte(j3), clientMessage.getBodyBuffer().readByte());
            j2 = j3 + 1;
        }
    }

    @Test
    public void testFilePersistenceOneHugeMessage() throws Exception {
        testChunks(false, false, false, true, true, false, false, false, false, 1, 104857600L, RECEIVE_WAIT_TIME, 0L, 10485760, 1048576);
    }

    @Test
    public void testFilePersistenceOneMessageStreaming() throws Exception {
        testChunks(false, false, false, true, true, false, false, false, false, 1, 104857600L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceSmallMessageStreaming() throws Exception {
        testChunks(false, false, false, true, true, false, false, false, false, 100, 1024L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceOneHugeMessageConsumer() throws Exception {
        testChunks(false, false, false, true, true, false, false, false, true, 1, 104857600L, 120000, 0L, 10485760, 1048576);
    }

    @Test
    public void testFilePersistence() throws Exception {
        testChunks(false, false, true, false, true, false, false, true, false, 100, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceConsumer() throws Exception {
        testChunks(false, false, true, false, true, false, false, true, true, 2, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceXA() throws Exception {
        testChunks(true, false, true, false, true, false, false, true, false, 100, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceXAStream() throws Exception {
        testChunks(true, false, false, true, true, false, false, false, false, 1, 1048576L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceXAStreamRestart() throws Exception {
        testChunks(true, true, false, true, true, false, false, false, false, 1, 1048576L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceXAConsumer() throws Exception {
        testChunks(true, false, true, false, true, false, false, true, true, 100, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceXAConsumerRestart() throws Exception {
        testChunks(true, true, true, false, true, false, false, true, true, 100, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceBlocked() throws Exception {
        testChunks(false, false, true, false, true, false, true, true, false, 100, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceBlockedConsumer() throws Exception {
        testChunks(false, false, true, false, true, false, true, true, true, 100, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceBlockedXA() throws Exception {
        testChunks(true, false, true, false, true, false, true, true, false, 100, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceBlockedXAConsumer() throws Exception {
        testChunks(true, false, true, false, true, false, true, true, true, 100, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceBlockedPreACK() throws Exception {
        testChunks(false, false, true, false, true, true, true, true, false, 1, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceBlockedPreACKConsumer() throws Exception {
        testChunks(false, false, true, false, true, true, true, true, true, 1, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceBlockedPreACKXA() throws Exception {
        testChunks(true, false, true, false, true, true, true, true, false, 100, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceBlockedPreACKXARestart() throws Exception {
        testChunks(true, true, true, false, true, true, true, true, false, 100, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceBlockedPreACKXAConsumer() throws Exception {
        testChunks(true, false, true, false, true, true, true, true, true, 100, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceBlockedPreACKXAConsumerRestart() throws Exception {
        testChunks(true, true, true, false, true, true, true, true, true, 100, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testFilePersistenceDelayed() throws Exception {
        testChunks(false, false, true, false, true, false, false, false, false, 1, 20480L, RECEIVE_WAIT_TIME, 2000L);
    }

    @Test
    public void testFilePersistenceDelayedConsumer() throws Exception {
        testChunks(false, false, true, false, true, false, false, false, true, 1, 20480L, RECEIVE_WAIT_TIME, 2000L);
    }

    @Test
    public void testFilePersistenceDelayedXA() throws Exception {
        testChunks(true, false, true, false, true, false, false, false, false, 1, 20480L, RECEIVE_WAIT_TIME, 2000L);
    }

    @Test
    public void testFilePersistenceDelayedXAConsumer() throws Exception {
        testChunks(true, false, true, false, true, false, false, false, true, 1, 20480L, RECEIVE_WAIT_TIME, 2000L);
    }

    @Test
    public void testNullPersistence() throws Exception {
        testChunks(false, false, true, false, false, false, false, true, true, 1, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testNullPersistenceConsumer() throws Exception {
        testChunks(false, false, true, false, false, false, false, true, true, 1, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testNullPersistenceXA() throws Exception {
        testChunks(true, false, true, false, false, false, false, true, false, 1, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testNullPersistenceXAConsumer() throws Exception {
        testChunks(true, false, true, false, false, false, false, true, true, 1, 20480L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testNullPersistenceDelayed() throws Exception {
        testChunks(false, false, true, false, false, false, false, false, false, 100, 20480L, RECEIVE_WAIT_TIME, 100L);
    }

    @Test
    public void testNullPersistenceDelayedConsumer() throws Exception {
        testChunks(false, false, true, false, false, false, false, false, true, 100, 20480L, RECEIVE_WAIT_TIME, 100L);
    }

    @Test
    public void testNullPersistenceDelayedXA() throws Exception {
        testChunks(true, false, true, false, false, false, false, false, false, 100, 20480L, RECEIVE_WAIT_TIME, 100L);
    }

    @Test
    public void testNullPersistenceDelayedXAConsumer() throws Exception {
        testChunks(true, false, true, false, false, false, false, false, true, 100, 20480L, RECEIVE_WAIT_TIME, 100L);
    }

    @Test
    public void testPageOnLargeMessage() throws Exception {
        testPageOnLargeMessage(true, false);
    }

    @Test
    public void testSendSmallMessageXA() throws Exception {
        testChunks(true, false, true, false, true, false, false, true, false, 100, 4L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testSendSmallMessageXAConsumer() throws Exception {
        testChunks(true, false, true, false, true, false, false, true, true, 100, 4L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testSendSmallMessageNullPersistenceXA() throws Exception {
        testChunks(true, false, true, false, false, false, false, true, false, 100, 100L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testSendSmallMessageNullPersistenceXAConsumer() throws Exception {
        testChunks(true, false, true, false, false, false, false, true, true, 100, 100L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testSendRegularMessageNullPersistenceDelayed() throws Exception {
        testChunks(false, false, true, false, false, false, false, false, false, 100, 100L, RECEIVE_WAIT_TIME, 1000L);
    }

    @Test
    public void testSendRegularMessageNullPersistenceDelayedConsumer() throws Exception {
        testChunks(false, false, true, false, false, false, false, false, true, 100, 100L, RECEIVE_WAIT_TIME, 1000L);
    }

    @Test
    public void testSendRegularMessageNullPersistenceDelayedXA() throws Exception {
        testChunks(true, false, true, false, false, false, false, false, false, 100, 100L, RECEIVE_WAIT_TIME, 1000L);
    }

    @Test
    public void testSendRegularMessageNullPersistenceDelayedXAConsumer() throws Exception {
        testChunks(true, false, true, false, false, false, false, false, true, 100, 100L, RECEIVE_WAIT_TIME, 1000L);
    }

    @Test
    public void testSendRegularMessagePersistence() throws Exception {
        testChunks(false, false, true, false, true, false, false, true, false, 100, 100L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testSendRegularMessagePersistenceConsumer() throws Exception {
        testChunks(false, false, true, false, true, false, false, true, true, 100, 100L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testSendRegularMessagePersistenceXA() throws Exception {
        testChunks(true, false, true, false, true, false, false, true, false, 100, 100L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testSendRegularMessagePersistenceXAConsumer() throws Exception {
        testChunks(true, false, true, false, true, false, false, true, true, 100, 100L, RECEIVE_WAIT_TIME, 0L);
    }

    @Test
    public void testSendRegularMessagePersistenceDelayed() throws Exception {
        testChunks(false, false, true, false, true, false, false, false, false, 100, 100L, RECEIVE_WAIT_TIME, 1000L);
    }

    @Test
    public void testSendRegularMessagePersistenceDelayedConsumer() throws Exception {
        testChunks(false, false, true, false, true, false, false, false, true, 100, 100L, RECEIVE_WAIT_TIME, 1000L);
    }

    @Test
    public void testSendRegularMessagePersistenceDelayedXA() throws Exception {
        testChunks(false, false, true, false, true, false, false, false, false, 100, 100L, RECEIVE_WAIT_TIME, 1000L);
    }

    @Test
    public void testSendRegularMessagePersistenceDelayedXAConsumer() throws Exception {
        testChunks(false, false, true, false, true, false, false, false, true, 100, 100L, RECEIVE_WAIT_TIME, 1000L);
    }

    @Test
    public void testTwoBindingsTwoStartedConsumers() throws Exception {
        createServer(true, isNetty()).start();
        SimpleString[] simpleStringArr = {new SimpleString("queue1"), new SimpleString("queue2")};
        ClientSession createSession = addSessionFactory(createSessionFactory(this.locator)).createSession((String) null, (String) null, false, true, true, false, 0);
        createSession.createQueue(this.ADDRESS, simpleStringArr[0], (SimpleString) null, true);
        createSession.createQueue(this.ADDRESS, simpleStringArr[1], (SimpleString) null, true);
        ClientMessage createLargeClientMessageStreaming = createLargeClientMessageStreaming(createSession, 400000);
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        createSession.start();
        createProducer.send(createLargeClientMessageStreaming);
        createProducer.close();
        ClientConsumer createConsumer = createSession.createConsumer(simpleStringArr[1]);
        ClientMessage receive = createConsumer.receive(10000L);
        receive.getBodyBuffer().readByte();
        Assert.assertNull(createConsumer.receiveImmediate());
        Assert.assertNotNull(receive);
        receive.acknowledge();
        createConsumer.close();
        this.log.debug("Stopping");
        createSession.stop();
        ClientConsumer createConsumer2 = createSession.createConsumer(simpleStringArr[0]);
        createSession.start();
        ClientMessage receive2 = createConsumer2.receive(10000L);
        Assert.assertNotNull(receive2);
        receive2.acknowledge();
        createConsumer2.close();
        createSession.commit();
        createSession.close();
        validateNoFilesOnLargeDir();
    }

    @Test
    public void testTwoBindingsAndRestart() throws Exception {
        testTwoBindings(true);
    }

    @Test
    public void testTwoBindingsNoRestart() throws Exception {
        testTwoBindings(false);
    }

    public void testTwoBindings(boolean z) throws Exception {
        ActiveMQServer createServer = createServer(true, isNetty());
        createServer.start();
        SimpleString[] simpleStringArr = {new SimpleString("queue1"), new SimpleString("queue2")};
        ClientSession createSession = addSessionFactory(createSessionFactory(this.locator)).createSession((String) null, (String) null, false, true, true, false, 0);
        createSession.createQueue(this.ADDRESS, simpleStringArr[0], (SimpleString) null, true);
        createSession.createQueue(this.ADDRESS, simpleStringArr[1], (SimpleString) null, true);
        ClientMessage createLargeClientMessageStreaming = createLargeClientMessageStreaming(createSession, 400000);
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        createProducer.send(createLargeClientMessageStreaming);
        createProducer.close();
        readMessage(createSession, simpleStringArr[1], 400000);
        if (z) {
            createSession.close();
            createServer.stop();
            createServer(true, isNetty()).start();
            createSession = createSessionFactory(this.locator).createSession((String) null, (String) null, false, true, true, false, 0);
        }
        readMessage(createSession, simpleStringArr[0], 400000);
        createSession.close();
        validateNoFilesOnLargeDir();
    }

    @Test
    public void testSendRollbackXADurable() throws Exception {
        internalTestSendRollback(true, true);
    }

    @Test
    public void testSendRollbackXANonDurable() throws Exception {
        internalTestSendRollback(true, false);
    }

    @Test
    public void testSendRollbackDurable() throws Exception {
        internalTestSendRollback(false, true);
    }

    @Test
    public void testSendRollbackNonDurable() throws Exception {
        internalTestSendRollback(false, false);
    }

    private void internalTestSendRollback(boolean z, boolean z2) throws Exception {
        ActiveMQServer createServer = createServer(true, isNetty());
        createServer.start();
        ClientSession createSession = addSessionFactory(createSessionFactory(this.locator)).createSession(z, false, false);
        createSession.createQueue(this.ADDRESS, this.ADDRESS, true);
        Xid xid = null;
        if (z) {
            xid = RandomUtil.randomXid();
            createSession.start(xid, 0);
        }
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        ClientMessage createLargeClientMessageStreaming = createLargeClientMessageStreaming(createSession, 50000L, z2);
        for (int i = 0; i < 1; i++) {
            createProducer.send(createLargeClientMessageStreaming);
        }
        if (z) {
            createSession.end(xid, 67108864);
            createSession.prepare(xid);
            createSession.close();
            createServer.stop();
            createServer.start();
            createSession = createSessionFactory(this.locator).createSession(z, false, false);
            createSession.rollback(xid);
        } else {
            createSession.rollback();
        }
        createSession.close();
        validateNoFilesOnLargeDir();
    }

    @Test
    public void testSimpleRollback() throws Exception {
        simpleRollbackInternalTest(false);
    }

    @Test
    public void testSimpleRollbackXA() throws Exception {
        simpleRollbackInternalTest(true);
    }

    public void simpleRollbackInternalTest(boolean z) throws Exception {
        createServer(true, isNetty()).start();
        ClientSession createSession = addSessionFactory(createSessionFactory(this.locator)).createSession(z, false, false);
        Xid xid = null;
        if (z) {
            xid = newXID();
            createSession.start(xid, 0);
        }
        createSession.createQueue(this.ADDRESS, this.ADDRESS, (SimpleString) null, true);
        createSession.start();
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        ClientConsumer createConsumer = createSession.createConsumer(this.ADDRESS);
        for (int i = 0; i < 10; i++) {
            createProducer.send(createLargeClientMessageStreaming(createSession, 200000, i % 2 == 0));
            Assert.assertNull(createConsumer.receiveImmediate());
            if (z) {
                createSession.end(xid, 67108864);
                createSession.rollback(xid);
                xid = newXID();
                createSession.start(xid, 0);
            } else {
                createSession.rollback();
            }
            createProducer.send(createLargeClientMessageStreaming(createSession, 200000, i % 2 == 0));
            Assert.assertNull(createConsumer.receiveImmediate());
            if (z) {
                createSession.end(xid, 67108864);
                createSession.commit(xid, true);
                xid = newXID();
                createSession.start(xid, 0);
            } else {
                createSession.commit();
            }
            for (int i2 = 0; i2 < 2; i2++) {
                ClientMessage receive = createConsumer.receive(5000L);
                Assert.assertNotNull(receive);
                Assert.assertEquals(200000, receive.getBodySize());
                receive.acknowledge();
                if (z) {
                    if (i2 == 0) {
                        createSession.end(xid, 67108864);
                        createSession.prepare(xid);
                        createSession.rollback(xid);
                        xid = newXID();
                        createSession.start(xid, 0);
                    } else {
                        createSession.end(xid, 67108864);
                        createSession.commit(xid, true);
                        xid = newXID();
                        createSession.start(xid, 0);
                    }
                } else if (i2 == 0) {
                    createSession.rollback();
                } else {
                    createSession.commit();
                }
            }
        }
        createSession.close();
        validateNoFilesOnLargeDir();
    }

    @Test
    public void testBufferMultipleLargeMessages() throws Exception {
        ClientSession clientSession = null;
        ActiveMQServer activeMQServer = null;
        try {
            activeMQServer = createServer(true, isNetty());
            activeMQServer.start();
            this.locator.setMinLargeMessageSize(1024).setConsumerWindowSize(1048576);
            clientSession = addSessionFactory(createSessionFactory(this.locator)).createSession((String) null, (String) null, false, false, false, false, 0);
            clientSession.createQueue(this.ADDRESS, this.ADDRESS, (SimpleString) null, true);
            ClientProducer createProducer = clientSession.createProducer(this.ADDRESS);
            for (int i = 0; i < 30; i++) {
                ClientMessage createMessage = clientSession.createMessage(true);
                createMessage.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10240L));
                createProducer.send(createMessage);
            }
            clientSession.commit();
            createProducer.close();
            clientSession.start();
            ClientConsumerInternal createConsumer = clientSession.createConsumer(this.ADDRESS);
            long currentTimeMillis = System.currentTimeMillis() + 10000;
            while (createConsumer.getBufferSize() < 30 && currentTimeMillis > System.currentTimeMillis()) {
                Thread.sleep(10L);
            }
            Assert.assertEquals(30L, createConsumer.getBufferSize());
            for (int i2 = 0; i2 < 2; i2++) {
                for (int i3 = 0; i3 < 30; i3++) {
                    ClientMessage receive = createConsumer.receive(10000L);
                    Assert.assertNotNull(receive);
                    if (i2 == 0) {
                        for (int i4 = 0; i4 < 10240; i4++) {
                            Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i4), receive.getBodyBuffer().readByte());
                        }
                    }
                    receive.acknowledge();
                }
                if (i2 == 0) {
                    clientSession.rollback();
                } else {
                    clientSession.commit();
                }
            }
            Assert.assertEquals(0L, activeMQServer.getPostOffice().getBinding(this.ADDRESS).getBindable().getDeliveringCount());
            Assert.assertEquals(0L, getMessageCount((Queue) activeMQServer.getPostOffice().getBinding(this.ADDRESS).getBindable()));
            try {
                clientSession.close();
            } catch (Throwable th) {
            }
            try {
                activeMQServer.stop();
            } catch (Throwable th2) {
            }
        } catch (Throwable th3) {
            try {
                clientSession.close();
            } catch (Throwable th4) {
            }
            try {
                activeMQServer.stop();
            } catch (Throwable th5) {
            }
            throw th3;
        }
    }

    @Test
    public void testReceiveMultipleMessages() throws Exception {
        ClientSession clientSession = null;
        ActiveMQServer activeMQServer = null;
        try {
            activeMQServer = createServer(true, isNetty());
            activeMQServer.start();
            this.locator.setMinLargeMessageSize(1024).setConsumerWindowSize(1048576);
            clientSession = addSessionFactory(createSessionFactory(this.locator)).createSession((String) null, (String) null, false, false, false, false, 0);
            clientSession.createQueue(this.ADDRESS, this.ADDRESS, (SimpleString) null, true);
            ClientProducer createProducer = clientSession.createProducer(this.ADDRESS);
            for (int i = 0; i < 1000; i++) {
                ClientMessage createMessage = clientSession.createMessage(true);
                createMessage.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10240L));
                createProducer.send(createMessage);
            }
            clientSession.commit();
            createProducer.close();
            clientSession.start();
            for (int i2 = 0; i2 < 2; i2++) {
                ClientConsumerInternal createConsumer = clientSession.createConsumer(this.ADDRESS);
                long currentTimeMillis = System.currentTimeMillis() + 10000;
                while (createConsumer.getBufferSize() < 10 && currentTimeMillis > System.currentTimeMillis()) {
                    Thread.sleep(10L);
                }
                for (int i3 = 0; i3 < 1000; i3++) {
                    ClientMessage receive = createConsumer.receive(10000L);
                    Assert.assertNotNull(receive);
                    if (i2 == 0) {
                        for (int i4 = 0; i4 < 10240; i4++) {
                            Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i4), receive.getBodyBuffer().readByte());
                        }
                    }
                    receive.acknowledge();
                }
                if (i2 == 0) {
                    clientSession.rollback();
                } else {
                    clientSession.commit();
                }
                createConsumer.close();
            }
            Assert.assertEquals(0L, activeMQServer.getPostOffice().getBinding(this.ADDRESS).getBindable().getDeliveringCount());
            Assert.assertEquals(0L, getMessageCount((Queue) activeMQServer.getPostOffice().getBinding(this.ADDRESS).getBindable()));
            try {
                clientSession.close();
            } catch (Throwable th) {
            }
            try {
                activeMQServer.stop();
            } catch (Throwable th2) {
            }
        } catch (Throwable th3) {
            try {
                clientSession.close();
            } catch (Throwable th4) {
            }
            try {
                activeMQServer.stop();
            } catch (Throwable th5) {
            }
            throw th3;
        }
    }

    @Test
    public void testPageOnLargeMessageMultipleQueues() throws Exception {
        Configuration createDefaultConfig = createDefaultConfig(isNetty());
        HashMap hashMap = new HashMap();
        hashMap.put(this.ADDRESS.toString(), new AddressSettings());
        ActiveMQServer createServer = createServer(true, createDefaultConfig, 10240L, 20480L, hashMap);
        createServer.start();
        this.locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
        ClientSession createSession = addSessionFactory(createSessionFactory(this.locator)).createSession((String) null, (String) null, false, true, true, false, 0);
        createSession.createQueue(this.ADDRESS, this.ADDRESS.concat("-0"), (SimpleString) null, true);
        createSession.createQueue(this.ADDRESS, this.ADDRESS.concat("-1"), (SimpleString) null, true);
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        for (int i = 0; i < 100; i++) {
            ClientMessage createMessage = createSession.createMessage(true);
            createMessage.getBodyBuffer().writerIndex(0);
            createMessage.getBodyBuffer().writeBytes(new byte[1024]);
            for (int i2 = 1; i2 <= 1024; i2++) {
                createMessage.getBodyBuffer().writeInt(i2);
            }
            createProducer.send(createMessage);
        }
        ClientMessage createLargeClientMessageStreaming = createLargeClientMessageStreaming(createSession, 400000);
        createLargeClientMessageStreaming.putBooleanProperty("TestLarge", true);
        createProducer.send(createLargeClientMessageStreaming);
        for (int i3 = 0; i3 < 100; i3++) {
            ClientMessage createMessage2 = createSession.createMessage(true);
            createMessage2.getBodyBuffer().writeBytes(new byte[1024]);
            createProducer.send(createMessage2);
        }
        createSession.close();
        createServer.stop();
        createServer(true, createDefaultConfig, 10240L, 20480L, hashMap).start();
        ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
        for (int i4 = 0; i4 < 2; i4++) {
            ClientSession createSession2 = createSessionFactory.createSession(false, false, false);
            ClientConsumer createConsumer = createSession2.createConsumer(this.ADDRESS.concat("-" + i4));
            createSession2.start();
            for (int i5 = 0; i5 < 100; i5++) {
                ClientMessage receive = createConsumer.receive(10000L);
                Assert.assertNotNull(receive);
                receive.acknowledge();
                Assert.assertNotNull(receive);
            }
            createSession2.commit();
            for (int i6 = 0; i6 < 5; i6++) {
                ClientMessage receive2 = createConsumer.receive(10000L);
                assertTrue(receive2.getBooleanProperty("TestLarge").booleanValue());
                Assert.assertNotNull(receive2);
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                receive2.acknowledge();
                receive2.saveToOutputStream(byteArrayOutputStream);
                byte[] byteArray = byteArrayOutputStream.toByteArray();
                assertEquals(400000L, byteArray.length);
                for (int i7 = 0; i7 < byteArray.length; i7++) {
                    assertEquals(getSamplebyte(i7), byteArray[i7]);
                }
                if (i6 < 4) {
                    createSession2.rollback();
                } else {
                    createSession2.commit();
                }
            }
            for (int i8 = 0; i8 < 100; i8++) {
                ClientMessage receive3 = createConsumer.receive(10000L);
                Assert.assertNotNull(receive3);
                receive3.acknowledge();
                Assert.assertNotNull(receive3);
            }
            createSession2.commit();
            createConsumer.close();
            createSession2.close();
        }
    }

    @Test
    public void testPageOnLargeMessageMultipleQueues2() throws Exception {
        Configuration createDefaultConfig = createDefaultConfig(isNetty());
        HashMap hashMap = new HashMap();
        hashMap.put(this.ADDRESS.toString(), new AddressSettings());
        createServer(true, createDefaultConfig, 10240L, 20480L, hashMap).start();
        this.locator.setBlockOnNonDurableSend(false).setBlockOnDurableSend(false).setBlockOnAcknowledge(false).setCompressLargeMessage(true);
        ClientSessionFactory addSessionFactory = addSessionFactory(createSessionFactory(this.locator));
        ClientSession createSession = addSessionFactory.createSession(false, true, true);
        createSession.createQueue(this.ADDRESS, this.ADDRESS.concat("-0"), (SimpleString) null, true);
        createSession.createQueue(this.ADDRESS, this.ADDRESS.concat("-1"), (SimpleString) null, true);
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        int i = 0;
        for (int i2 = 0; i2 < 100; i2++) {
            ClientMessage createMessage = createSession.createMessage(true);
            int i3 = i;
            i++;
            createMessage.putIntProperty("msgID", i3);
            createMessage.putBooleanProperty("TestLarge", false);
            createMessage.getBodyBuffer().writerIndex(0);
            createMessage.getBodyBuffer().writeBytes(new byte[1024]);
            for (int i4 = 1; i4 <= 1024; i4++) {
                createMessage.getBodyBuffer().writeInt(i4);
            }
            createProducer.send(createMessage);
        }
        for (int i5 = 0; i5 < 10; i5++) {
            ClientMessage createLargeClientMessageStreaming = createLargeClientMessageStreaming(createSession, 400000);
            createLargeClientMessageStreaming.putBooleanProperty("TestLarge", true);
            createProducer.send(createLargeClientMessageStreaming);
        }
        createSession.close();
        for (int i6 = 0; i6 < 2; i6++) {
            ClientSession createSession2 = addSessionFactory.createSession(false, false, false);
            ClientConsumer createConsumer = createSession2.createConsumer(this.ADDRESS.concat("-" + i6));
            createSession2.start();
            for (int i7 = 0; i7 < 5; i7++) {
                for (int i8 = 0; i8 < 100; i8++) {
                    ClientMessage receive = createConsumer.receive(10000L);
                    Assert.assertNotNull(receive);
                    assertFalse(receive.getBooleanProperty("TestLarge").booleanValue());
                    receive.acknowledge();
                    Assert.assertNotNull(receive);
                }
                for (int i9 = 0; i9 < 10; i9++) {
                    ClientMessage receive2 = createConsumer.receive(10000L);
                    Assert.assertNotNull(receive2);
                    assertTrue(receive2.getBooleanProperty("TestLarge").booleanValue());
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                    receive2.acknowledge();
                    receive2.saveToOutputStream(byteArrayOutputStream);
                    assertEquals(400000L, byteArrayOutputStream.toByteArray().length);
                }
                createSession2.rollback();
            }
            createSession2.commit();
            createConsumer.close();
            createSession2.close();
        }
    }

    @Test
    public void testSendStreamingSingleMessage() throws Exception {
        ClientSession clientSession = null;
        ActiveMQServer activeMQServer = null;
        try {
            activeMQServer = createServer(true, isNetty());
            activeMQServer.start();
            this.locator.setMinLargeMessageSize(102400);
            clientSession = addSessionFactory(createSessionFactory(this.locator)).createSession((String) null, (String) null, false, true, true, false, 0);
            clientSession.createQueue(this.ADDRESS, this.ADDRESS, (SimpleString) null, true);
            ClientMessage createMessage = clientSession.createMessage(true);
            createMessage.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10485760L));
            ClientProducer createProducer = clientSession.createProducer(this.ADDRESS);
            clientSession.start();
            this.log.debug("Sending");
            createProducer.send(createMessage);
            createProducer.close();
            this.log.debug("Waiting");
            ClientMessage receive = clientSession.createConsumer(this.ADDRESS).receive(10000L);
            receive.acknowledge();
            receive.setOutputStream(createFakeOutputStream());
            Assert.assertTrue(receive.waitOutputStreamCompletion(60000L));
            clientSession.commit();
            Assert.assertEquals(0L, activeMQServer.getPostOffice().getBinding(this.ADDRESS).getBindable().getDeliveringCount());
            Assert.assertEquals(0L, getMessageCount((Queue) activeMQServer.getPostOffice().getBinding(this.ADDRESS).getBindable()));
            try {
                clientSession.close();
            } catch (Throwable th) {
            }
            try {
                activeMQServer.stop();
            } catch (Throwable th2) {
            }
        } catch (Throwable th3) {
            try {
                clientSession.close();
            } catch (Throwable th4) {
            }
            try {
                activeMQServer.stop();
            } catch (Throwable th5) {
            }
            throw th3;
        }
    }

    @Test
    public void testIgnoreStreaming() throws Exception {
        createServer(true, isNetty()).start();
        this.locator.setMinLargeMessageSize(1024);
        ClientSession createSession = addSessionFactory(createSessionFactory(this.locator)).createSession((String) null, (String) null, false, true, true, false, 0);
        createSession.createQueue(this.ADDRESS, this.ADDRESS, (SimpleString) null, true);
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        for (int i = 0; i < 1; i++) {
            ClientMessage createMessage = createSession.createMessage(true);
            createMessage.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10240L));
            createMessage.putIntProperty(new SimpleString("key"), i);
            createProducer.send(createMessage);
            this.log.debug("Sent msg " + i);
        }
        createSession.start();
        this.log.debug("Sending");
        createProducer.close();
        this.log.debug("Waiting");
        ClientConsumer createConsumer = createSession.createConsumer(this.ADDRESS);
        for (int i2 = 0; i2 < 1; i2++) {
            ClientMessage receive = createConsumer.receive(50000L);
            Assert.assertNotNull(receive);
            Assert.assertEquals(Integer.valueOf(i2), receive.getObjectProperty(new SimpleString("key")));
            receive.acknowledge();
        }
        createConsumer.close();
        createSession.commit();
        Assert.assertEquals(0L, r0.getPostOffice().getBinding(this.ADDRESS).getBindable().getDeliveringCount());
        Assert.assertEquals(0L, getMessageCount((Queue) r0.getPostOffice().getBinding(this.ADDRESS).getBindable()));
        this.log.debug("Thread done");
    }

    @Test
    public void testSendServerMessage() throws Exception {
        ActiveMQServer createServer = createServer(true);
        createServer.start();
        ClientSession createSession = addSessionFactory(createSessionFactory(this.locator)).createSession(false, false);
        LargeServerMessageImpl largeServerMessageImpl = new LargeServerMessageImpl(createServer.getStorageManager());
        largeServerMessageImpl.setMessageID(1005L);
        for (int i = 0; i < 20480; i++) {
            largeServerMessageImpl.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
        }
        largeServerMessageImpl.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 20480L);
        largeServerMessageImpl.releaseResources();
        createSession.createQueue(this.ADDRESS, this.ADDRESS, true);
        createSession.createProducer(this.ADDRESS).send(largeServerMessageImpl);
        largeServerMessageImpl.deleteFile();
        createSession.commit();
        createSession.start();
        ClientMessage receive = createSession.createConsumer(this.ADDRESS).receive(5000L);
        Assert.assertNotNull(receive);
        Assert.assertEquals(receive.getBodySize(), 20480L);
        for (int i2 = 0; i2 < 20480; i2++) {
            Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i2), receive.getBodyBuffer().readByte());
        }
        receive.acknowledge();
        createSession.commit();
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.locator = createFactory(isNetty());
    }

    protected void testPageOnLargeMessage(boolean z, boolean z2) throws Exception {
        Configuration createDefaultConfig = createDefaultConfig(isNetty());
        HashMap hashMap = new HashMap();
        hashMap.put(this.ADDRESS.toString(), new AddressSettings());
        ActiveMQServer createServer = createServer(z, createDefaultConfig, 10240L, 20480L, hashMap);
        createServer.start();
        ClientSessionFactory addSessionFactory = addSessionFactory(createSessionFactory(this.locator));
        if (z2) {
            addSessionFactory.getServerLocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
        }
        ClientSession createSession = addSessionFactory.createSession((String) null, (String) null, false, true, true, false, 0);
        createSession.createQueue(this.ADDRESS, this.ADDRESS, (SimpleString) null, true);
        ClientProducer createProducer = createSession.createProducer(this.ADDRESS);
        ClientMessage clientMessage = null;
        for (int i = 0; i < 100; i++) {
            clientMessage = createSession.createMessage(true);
            clientMessage.getBodyBuffer().writerIndex(0);
            for (int i2 = 1; i2 <= 1024; i2++) {
                clientMessage.getBodyBuffer().writeInt(i2);
            }
            createProducer.send(clientMessage);
        }
        createProducer.send(createLargeClientMessageStreaming(createSession, 400000));
        createSession.close();
        if (z) {
            createServer.stop();
            createServer(true, createDefaultConfig, 10240L, 20480L, hashMap).start();
            addSessionFactory = createSessionFactory(this.locator);
        }
        ClientSession createSession2 = addSessionFactory.createSession((String) null, (String) null, false, true, true, false, 0);
        ClientConsumer createConsumer = createSession2.createConsumer(this.ADDRESS);
        createSession2.start();
        for (int i3 = 0; i3 < 100; i3++) {
            ClientMessage receive = createConsumer.receive(10000L);
            Assert.assertNotNull(receive);
            receive.acknowledge();
            Assert.assertNotNull(receive);
            clientMessage.getBodyBuffer().readerIndex(0);
            for (int i4 = 1; i4 <= 1024; i4++) {
                Assert.assertEquals(i4, clientMessage.getBodyBuffer().readInt());
            }
        }
        createConsumer.close();
        createSession2.close();
        ClientSession createSession3 = addSessionFactory.createSession((String) null, (String) null, false, true, true, false, 0);
        readMessage(createSession3, this.ADDRESS, 400000);
        createSession3.close();
    }
}
