package org.apache.activemq.artemis.tests.integration.client;

import java.io.IOException;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.class */
public class ConsumerWindowSizeTest extends ActiveMQTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final SimpleString addressA = new SimpleString("addressA");
    private final SimpleString queueA = new SimpleString("queueA");
    private final int TIMEOUT = 5;
    private ServerLocator locator;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.activemq.artemis.tests.integration.client.ConsumerWindowSizeTest$1LocalHandler, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest$1LocalHandler.class */
    public class C1LocalHandler implements MessageHandler {
        boolean failed = false;
        int count = 0;
        final /* synthetic */ CountDownLatch val$latchReceived;
        final /* synthetic */ CountDownLatch val$latchDone;
        final /* synthetic */ boolean val$largeMessages;
        final /* synthetic */ CountDownLatch val$latchRead;

        C1LocalHandler(CountDownLatch countDownLatch, CountDownLatch countDownLatch2, boolean z, CountDownLatch countDownLatch3) {
            this.val$latchReceived = countDownLatch;
            this.val$latchDone = countDownLatch2;
            this.val$largeMessages = z;
            this.val$latchRead = countDownLatch3;
        }

        public synchronized void onMessage(ClientMessage clientMessage) {
            try {
                this.failed = this.failed || !ConsumerWindowSizeTest.this.getTextMessage(clientMessage).equals("Msg" + this.count);
                clientMessage.acknowledge();
                this.val$latchReceived.countDown();
                int i = this.count;
                this.count = i + 1;
                if (i == 1) {
                    if (!this.val$latchDone.await(5L, TimeUnit.SECONDS)) {
                        new Exception("ClientConsuemrWindowSizeTest Handler couldn't receive signal in less than 5 seconds").printStackTrace();
                        this.failed = true;
                    }
                    if (this.val$largeMessages) {
                        clientMessage.getBodyBuffer().readBytes(new byte[600]);
                    }
                    this.val$latchRead.countDown();
                }
            } catch (Exception e) {
                e.printStackTrace();
                this.failed = true;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.activemq.artemis.tests.integration.client.ConsumerWindowSizeTest$2LocalHandler, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest$2LocalHandler.class */
    public class C2LocalHandler implements MessageHandler {
        boolean failed = false;
        int count = 0;
        final /* synthetic */ CountDownLatch val$latchReceived;
        final /* synthetic */ CountDownLatch val$latchReceivedBuffered;
        final /* synthetic */ CountDownLatch val$latchDone;

        C2LocalHandler(CountDownLatch countDownLatch, CountDownLatch countDownLatch2, CountDownLatch countDownLatch3) {
            this.val$latchReceived = countDownLatch;
            this.val$latchReceivedBuffered = countDownLatch2;
            this.val$latchDone = countDownLatch3;
        }

        public synchronized void onMessage(ClientMessage clientMessage) {
            try {
                this.failed = this.failed || !ConsumerWindowSizeTest.this.getTextMessage(clientMessage).equals("Msg" + this.count);
                clientMessage.acknowledge();
                this.val$latchReceived.countDown();
                this.val$latchReceivedBuffered.countDown();
                int i = this.count;
                this.count = i + 1;
                if (i == 1 && !this.val$latchDone.await(5L, TimeUnit.SECONDS)) {
                    new Exception("ClientConsuemrWindowSizeTest Handler couldn't receive signal in less than 5 seconds").printStackTrace();
                    this.failed = true;
                }
            } catch (Exception e) {
                e.printStackTrace();
                this.failed = true;
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest$FakeOutputStream.class */
    class FakeOutputStream extends OutputStream {
        FakeOutputStream() {
        }

        @Override // java.io.OutputStream
        public void write(int i) throws IOException {
        }
    }

    protected boolean isNetty() {
        return false;
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.locator = createFactory(isNetty());
    }

    private int getMessageEncodeSize(SimpleString simpleString) throws Exception {
        ClientSessionFactory createSessionFactory = createSessionFactory(createInVMNonHALocator());
        ClientSession createSession = createSessionFactory.createSession(false, true, true);
        ClientMessage createMessage = createSession.createMessage(false);
        createMessage.setAddress(simpleString);
        int encodeSize = createMessage.getEncodeSize();
        createSession.close();
        createSessionFactory.close();
        return encodeSize;
    }

    @Test
    public void testReceiveImmediateWithZeroWindow() throws Exception {
        createServer(false, isNetty()).start();
        this.locator.setConsumerWindowSize(0);
        ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
        ClientSession createSession = createSessionFactory.createSession(false, false, false);
        createSession.createQueue(new QueueConfiguration("testWindow"));
        createSession.close();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (int i = 0; i < 5; i++) {
            ClientSession createSession2 = createSessionFactory.createSession();
            ClientConsumer createConsumer = createSession2.createConsumer("testWindow");
            arrayList.add(createConsumer);
            createSession2.start();
            arrayList2.add(createSession2);
            createConsumer.receiveImmediate();
        }
        ClientSession createSession3 = createSessionFactory.createSession(false, false);
        ClientProducer createProducer = createSession3.createProducer("testWindow");
        ClientMessage createMessage = createSession3.createMessage(true);
        createMessage.putStringProperty("hello", "world");
        createProducer.send(createMessage);
        createSession3.commit();
        createSession3.start();
        Assert.assertNotNull(((ClientConsumer) arrayList.get(2)).receive(1000L));
        Iterator it = arrayList2.iterator();
        while (it.hasNext()) {
            ((ClientSession) it.next()).close();
        }
        createSession3.close();
    }

    @Test
    public void testReceiveImmediateWithZeroWindow2() throws Exception {
        ActiveMQServer createServer = createServer(true);
        ServerLocator createInVMNonHALocator = createInVMNonHALocator();
        try {
            createServer.start();
            createInVMNonHALocator.setConsumerWindowSize(0);
            ClientSessionFactory createSessionFactory = createSessionFactory(createInVMNonHALocator);
            ClientSession createSession = createSessionFactory.createSession(false, false, false);
            createSession.createQueue(new QueueConfiguration("testReceive"));
            createSession.close();
            ClientSession createSession2 = createSessionFactory.createSession(false, false);
            ClientMessage createMessage = createSession2.createMessage(true);
            createMessage.putStringProperty("hello", "world");
            ClientProducer createProducer = createSession2.createProducer("testReceive");
            createProducer.send(createMessage);
            createSession2.commit();
            ClientSession createSession3 = createSessionFactory.createSession();
            ClientConsumer createConsumer = createSession3.createConsumer("testReceive");
            createSession3.start();
            Thread.sleep(1000L);
            ClientMessage receiveImmediate = createConsumer.receiveImmediate();
            Assert.assertNotNull(receiveImmediate);
            receiveImmediate.acknowledge();
            createProducer.send(createMessage);
            createSession2.commit();
            ClientMessage receive = createConsumer.receive(10000L);
            Assert.assertNotNull(receive);
            receive.acknowledge();
            createSession.close();
            createSession3.close();
            createSession2.close();
            if (createInVMNonHALocator != null) {
                createInVMNonHALocator.close();
            }
        } catch (Throwable th) {
            if (createInVMNonHALocator != null) {
                try {
                    createInVMNonHALocator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testReceiveImmediateWithZeroWindow3() throws Exception {
        createServer(false, isNetty()).start();
        this.locator.setConsumerWindowSize(0);
        ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
        ClientSession createSession = createSessionFactory.createSession(false, false, false);
        createSession.createQueue(new QueueConfiguration("testWindow"));
        createSession.close();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (int i = 0; i < 5; i++) {
            ClientSession createSession2 = createSessionFactory.createSession();
            ClientConsumer createConsumer = createSession2.createConsumer("testWindow");
            arrayList.add(createConsumer);
            createSession2.start();
            arrayList2.add(createSession2);
            createConsumer.receive(10L);
        }
        ClientSession createSession3 = createSessionFactory.createSession(false, false);
        ClientProducer createProducer = createSession3.createProducer("testWindow");
        ClientMessage createMessage = createSession3.createMessage(true);
        createMessage.putStringProperty("hello", "world");
        createProducer.send(createMessage);
        createSession3.commit();
        createSession3.start();
        Assert.assertNotNull(((ClientConsumer) arrayList.get(2)).receive(1000L));
        Iterator it = arrayList2.iterator();
        while (it.hasNext()) {
            ((ClientSession) it.next()).close();
        }
        createSession3.close();
    }

    @Test
    public void testReceiveImmediateWithZeroWindow4() throws Exception {
        createServer(false, isNetty()).start();
        this.locator.setConsumerWindowSize(0);
        ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
        ClientSession createSession = createSessionFactory.createSession(false, false, false);
        createSession.createQueue(new QueueConfiguration("testWindow"));
        createSession.close();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (int i = 0; i < 5; i++) {
            ClientSession createSession2 = createSessionFactory.createSession();
            ClientConsumer createConsumer = createSession2.createConsumer("testWindow");
            arrayList.add(createConsumer);
            createSession2.start();
            arrayList2.add(createSession2);
            Assert.assertNull(createConsumer.receive(10L));
        }
        ClientSession createSession3 = createSessionFactory.createSession(false, false);
        ClientProducer createProducer = createSession3.createProducer("testWindow");
        ClientMessage createMessage = createSession3.createMessage(true);
        createMessage.putStringProperty("hello", "world");
        createProducer.send(createMessage);
        createSession3.commit();
        createSession3.start();
        Assert.assertNotNull(((ClientConsumer) arrayList.get(2)).receive(5000L));
        Iterator it = arrayList2.iterator();
        while (it.hasNext()) {
            ((ClientSession) it.next()).close();
        }
        createSession3.close();
    }

    @Test
    public void testMultipleImmediate() throws Exception {
        createServer(false, isNetty()).start();
        this.locator.setConsumerWindowSize(0);
        final ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
        ClientSession createSession = createSessionFactory.createSession(false, false, false);
        createSession.createQueue(new QueueConfiguration("testWindow"));
        createSession.close();
        Thread[] threadArr = new Thread[10];
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicInteger atomicInteger2 = new AtomicInteger(0);
        for (int i = 0; i < threadArr.length; i++) {
            threadArr[i] = new Thread() { // from class: org.apache.activemq.artemis.tests.integration.client.ConsumerWindowSizeTest.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        ClientSession createSession2 = createSessionFactory.createSession(false, false);
                        ClientConsumer createConsumer = createSession2.createConsumer("testWindow");
                        createSession2.start();
                        countDownLatch.await(10L, TimeUnit.SECONDS);
                        while (atomicInteger2.incrementAndGet() <= 200) {
                            createConsumer.receive(1000L).acknowledge();
                            createSession2.commit();
                        }
                        atomicInteger2.decrementAndGet();
                    } catch (Throwable th) {
                        th.printStackTrace();
                        atomicInteger.incrementAndGet();
                    }
                }
            };
            threadArr[i].start();
        }
        ClientSession createSession2 = createSessionFactory.createSession(false, false);
        ClientProducer createProducer = createSession2.createProducer("testWindow");
        ClientMessage createMessage = createSession2.createMessage(true);
        createMessage.putStringProperty("hello", "world");
        for (int i2 = 0; i2 < 200; i2++) {
            createProducer.send(createMessage);
            createSession2.commit();
        }
        countDownLatch.countDown();
        for (Thread thread : threadArr) {
            thread.join();
        }
        Assert.assertEquals(0L, atomicInteger.get());
        Assert.assertEquals(200L, atomicInteger2.get());
    }

    @Test
    public void testSingleImmediate() throws Exception {
        createServer(false, isNetty()).start();
        this.locator.setConsumerWindowSize(0);
        ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
        ClientSession createSession = createSessionFactory.createSession(false, false, false);
        createSession.createQueue(new QueueConfiguration("testWindow"));
        createSession.close();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ClientSession createSession2 = createSessionFactory.createSession(false, false);
        ClientProducer createProducer = createSession2.createProducer("testWindow");
        ClientMessage createMessage = createSession2.createMessage(true);
        createMessage.putStringProperty("hello", "world");
        for (int i = 0; i < 200; i++) {
            createProducer.send(createMessage);
        }
        createSession2.commit();
        ClientSession createSession3 = createSessionFactory.createSession(false, false);
        ClientConsumer createConsumer = createSession3.createConsumer("testWindow");
        createSession3.start();
        while (true) {
            ClientMessage receiveImmediate = createConsumer.receiveImmediate();
            if (receiveImmediate == null) {
                logger.debug("Returning null");
                Assert.assertEquals(200L, atomicInteger.get());
                return;
            } else {
                receiveImmediate.acknowledge();
                createSession3.commit();
                atomicInteger.incrementAndGet();
            }
        }
    }

    @Test
    public void testSendWindowSize() throws Exception {
        ActiveMQServer createServer = createServer(false, isNetty());
        this.locator.setBlockOnNonDurableSend(false);
        createServer.start();
        this.locator.setConsumerWindowSize(100 * getMessageEncodeSize(this.addressA));
        ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
        ClientSession createSession = createSessionFactory.createSession(false, true, true);
        ClientSession createSession2 = createSessionFactory.createSession(false, true, true);
        createSession.createQueue(new QueueConfiguration(this.queueA).setAddress(this.addressA).setDurable(false));
        ClientConsumer createConsumer = createSession2.createConsumer(this.queueA);
        ClientSession createSession3 = createSessionFactory.createSession(false, true, true);
        ClientProducer createProducer = createSession.createProducer(this.addressA);
        ClientConsumer createConsumer2 = createSession3.createConsumer(this.queueA);
        createSession3.start();
        createSession2.start();
        for (int i = 0; i < 100 * 4; i++) {
            createProducer.send(createSession.createMessage(false));
        }
        for (int i2 = 0; i2 < 100 * 2; i2++) {
            ClientMessage receive = createConsumer.receive(5000L);
            Assert.assertNotNull(receive);
            receive.acknowledge();
        }
        createSession2.close();
        for (int i3 = 0; i3 < 100 * 2; i3++) {
            ClientMessage receive2 = createConsumer2.receive(5000L);
            Assert.assertNotNull(receive2);
            receive2.acknowledge();
        }
        createSession3.close();
        createSession.close();
        Assert.assertEquals(0L, getMessageCount(createServer, this.queueA.toString()));
    }

    @Test
    public void testSlowConsumerBufferingOne() throws Exception {
        ClientSession clientSession = null;
        ClientSession clientSession2 = null;
        try {
            createServer(false, isNetty()).start();
            this.locator.setConsumerWindowSize(1);
            ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
            ClientSession createSession = createSessionFactory.createSession(false, true, true);
            SimpleString simpleString = this.addressA;
            createSession.createQueue(new QueueConfiguration(simpleString));
            ClientSession createSession2 = createSessionFactory.createSession(false, true, true);
            createSession2.start();
            createSession.start();
            ClientConsumer createConsumer = createSession2.createConsumer(simpleString);
            ClientConsumer createConsumer2 = createSession.createConsumer(simpleString);
            ClientProducer createProducer = createSession.createProducer(simpleString);
            for (int i = 0; i < 100; i++) {
                createProducer.send(createTextMessage(createSession, "Msg" + i));
            }
            for (int i2 = 0; i2 < 99; i2++) {
                ClientMessage receive = createConsumer2.receive(1000L);
                Assert.assertNotNull("expected message at i = " + i2, receive);
                receive.acknowledge();
            }
            ClientMessage receive2 = createConsumer.receive(500L);
            Assert.assertNotNull(receive2);
            receive2.acknowledge();
            createSession.close();
            clientSession2 = null;
            createSession2.close();
            clientSession = null;
            Assert.assertEquals(0L, getMessageCount(r0, simpleString.toString()));
            if (0 != 0) {
                try {
                    clientSession2.close();
                } catch (Exception e) {
                    return;
                }
            }
            if (0 != 0) {
                clientSession.close();
            }
        } catch (Throwable th) {
            if (clientSession2 != null) {
                try {
                    clientSession2.close();
                } catch (Exception e2) {
                    throw th;
                }
            }
            if (clientSession != null) {
                clientSession.close();
            }
            throw th;
        }
    }

    @Test
    public void testSlowConsumerNoBuffer() throws Exception {
        internalTestSlowConsumerNoBuffer(false);
    }

    @Test
    @Ignore("I believe this test became invalid after we started using another thread to deliver the large message")
    public void testSlowConsumerNoBufferLargeMessages() throws Exception {
        internalTestSlowConsumerNoBuffer(true);
    }

    private void internalTestSlowConsumerNoBuffer(boolean z) throws Exception {
        ClientSession clientSession = null;
        ClientSession clientSession2 = null;
        try {
            createServer(false, isNetty()).start();
            this.locator.setConsumerWindowSize(0);
            if (z) {
                this.locator.setMinLargeMessageSize(100);
            }
            ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
            ClientSession createSession = createSessionFactory.createSession(false, true, true);
            SimpleString simpleString = this.addressA;
            createSession.createQueue(new QueueConfiguration(simpleString));
            ClientSession createSession2 = createSessionFactory.createSession(false, true, true);
            createSession2.start();
            createSession.start();
            ClientConsumerInternal createConsumer = createSession2.createConsumer(simpleString);
            ClientProducer createProducer = createSession.createProducer(simpleString);
            Assert.assertNull(createConsumer.receive(1L));
            ClientMessage createTextMessage = createTextMessage(createSession, "This one will expire");
            if (z) {
                createTextMessage.getBodyBuffer().writeBytes(new byte[600]);
            }
            createTextMessage.setExpiration(System.currentTimeMillis() + 100);
            createProducer.send(createTextMessage);
            createProducer.send(createTextMessage(createSession, "First-on-non-buffered"));
            Thread.sleep(110L);
            ClientMessage receive = createConsumer.receive(5000L);
            Assert.assertNotNull(receive);
            Assert.assertEquals("First-on-non-buffered", getTextMessage(receive));
            receive.acknowledge();
            ClientConsumer createConsumer2 = createSession.createConsumer(simpleString);
            for (int i = 0; i < 100; i++) {
                ClientMessage createTextMessage2 = createTextMessage(createSession, "Msg" + i);
                if (z) {
                    createTextMessage2.getBodyBuffer().writeBytes(new byte[600]);
                }
                createProducer.send(createTextMessage2);
            }
            for (int i2 = 0; i2 < 100; i2++) {
                ClientMessage receive2 = createConsumer2.receive(1000L);
                Assert.assertNotNull("expected message at i = " + i2, receive2);
                Assert.assertEquals("Msg" + i2, getTextMessage(receive2));
                receive2.acknowledge();
            }
            Assert.assertEquals(0L, createConsumer.getBufferSize());
            createSession.close();
            clientSession2 = null;
            createSession2.close();
            clientSession = null;
            Assert.assertEquals(0L, getMessageCount(r0, simpleString.toString()));
            if (0 != 0) {
                try {
                    clientSession2.close();
                } catch (Exception e) {
                    return;
                }
            }
            if (0 != 0) {
                clientSession.close();
            }
        } catch (Throwable th) {
            if (clientSession2 != null) {
                try {
                    clientSession2.close();
                } catch (Exception e2) {
                    throw th;
                }
            }
            if (clientSession != null) {
                clientSession.close();
            }
            throw th;
        }
    }

    @Test
    public void testSlowConsumerNoBuffer2() throws Exception {
        internalTestSlowConsumerNoBuffer2(false);
    }

    @Test
    public void testSlowConsumerNoBuffer2LargeMessages() throws Exception {
        internalTestSlowConsumerNoBuffer2(true);
    }

    private void internalTestSlowConsumerNoBuffer2(boolean z) throws Exception {
        ClientSession clientSession = null;
        ClientSession clientSession2 = null;
        try {
            createServer(false, isNetty()).start();
            this.locator.setConsumerWindowSize(0);
            if (z) {
                this.locator.setMinLargeMessageSize(100);
            }
            ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
            ClientSession createSession = createSessionFactory.createSession(false, true, true);
            ClientSession createSession2 = createSessionFactory.createSession(false, true, true);
            createSession.start();
            createSession2.start();
            SimpleString simpleString = new SimpleString("some-queue");
            createSession.createQueue(new QueueConfiguration(simpleString));
            ClientConsumerInternal createConsumer = createSession.createConsumer(simpleString);
            ClientProducer createProducer = createSession.createProducer(simpleString);
            for (int i = 0; i < 10; i++) {
                ClientMessage createTextMessage = createTextMessage(createSession, "Msg" + i);
                if (z) {
                    createTextMessage.getBodyBuffer().writeBytes(new byte[600]);
                }
                createProducer.send(createTextMessage);
            }
            ClientConsumerInternal createConsumer2 = createSession2.createConsumer(simpleString);
            for (int i2 = 0; i2 < 5; i2++) {
                ClientMessage receive = createConsumer.receive(1000L);
                Assert.assertNotNull("expected message at i = " + i2, receive);
                Assert.assertEquals("Msg" + i2, getTextMessage(receive));
                receive.acknowledge();
                Assert.assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0L, createConsumer.getBufferSize());
            }
            for (int i3 = 5; i3 < 10; i3++) {
                ClientMessage receive2 = createConsumer2.receive(1000L);
                Assert.assertNotNull("expected message at i = " + i3, receive2);
                Assert.assertEquals("Msg" + i3, getTextMessage(receive2));
                receive2.acknowledge();
                Assert.assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0L, createConsumer2.getBufferSize());
            }
            createSession.close();
            createSession2.close();
            ClientSession createSession3 = createSessionFactory.createSession(false, true, true);
            createSession3.start();
            ClientSession createSession4 = createSessionFactory.createSession(false, true, true);
            createSession4.start();
            ClientProducer createProducer2 = createSession3.createProducer(simpleString);
            Assert.assertEquals(0L, getMessageCount(r0, simpleString.toString()));
            createConsumer.close();
            createConsumer2.close();
            ClientConsumerInternal createConsumer3 = createSession3.createConsumer(simpleString);
            for (int i4 = 0; i4 < 10; i4++) {
                ClientMessage createTextMessage2 = createTextMessage(createSession3, "Msg" + i4);
                if (z) {
                    createTextMessage2.getBodyBuffer().writeBytes(new byte[600]);
                }
                createProducer2.send(createTextMessage2);
            }
            ClientConsumerInternal createConsumer4 = createSession4.createConsumer(simpleString);
            for (int i5 = 0; i5 < 5; i5++) {
                ClientMessage receive3 = createConsumer4.receive(1000L);
                Assert.assertNotNull("expected message at i = " + i5, receive3);
                Assert.assertEquals("Msg" + i5, receive3.getBodyBuffer().readString());
                receive3.acknowledge();
                Assert.assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0L, createConsumer4.getBufferSize());
            }
            for (int i6 = 5; i6 < 10; i6++) {
                ClientMessage receive4 = createConsumer3.receive(1000L);
                Assert.assertNotNull("expected message at i = " + i6, receive4);
                Assert.assertEquals("Msg" + i6, receive4.getBodyBuffer().readString());
                receive4.acknowledge();
                Assert.assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0L, createConsumer3.getBufferSize());
            }
            createSession3.close();
            clientSession = null;
            createSession4.close();
            clientSession2 = null;
            Assert.assertEquals(0L, getMessageCount(r0, simpleString.toString()));
            if (0 != 0) {
                try {
                    clientSession.close();
                } catch (Exception e) {
                    return;
                }
            }
            if (0 != 0) {
                clientSession2.close();
            }
        } catch (Throwable th) {
            if (clientSession != null) {
                try {
                    clientSession.close();
                } catch (Exception e2) {
                    throw th;
                }
            }
            if (clientSession2 != null) {
                clientSession2.close();
            }
            throw th;
        }
    }

    @Test
    public void testSaveBuffersOnLargeMessage() throws Exception {
        ClientSession clientSession = null;
        try {
            createServer(false, isNetty()).start();
            this.locator.setConsumerWindowSize(0).setMinLargeMessageSize(100);
            ClientSession createSession = createSessionFactory(this.locator).createSession(false, true, true);
            createSession.start();
            SimpleString simpleString = new SimpleString("some-queue");
            createSession.createQueue(new QueueConfiguration(simpleString));
            ClientConsumerInternal createConsumer = createSession.createConsumer(simpleString);
            ClientProducer createProducer = createSession.createProducer(simpleString);
            for (int i = 0; i < 10; i++) {
                ClientMessage createMessage = createSession.createMessage(true);
                createMessage.getBodyBuffer().writeBytes(new byte[600]);
                createProducer.send(createMessage);
            }
            for (int i2 = 0; i2 < 10; i2++) {
                ClientMessage receive = createConsumer.receive(1000L);
                Assert.assertNotNull("expected message at i = " + i2, receive);
                receive.saveToOutputStream(new FakeOutputStream());
                receive.acknowledge();
                Assert.assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0L, createConsumer.getBufferSize());
            }
            createSession.close();
            createSession.close();
            clientSession = null;
            Assert.assertEquals(0L, getMessageCount(r0, simpleString.toString()));
            if (0 != 0) {
                try {
                    clientSession.close();
                } catch (Exception e) {
                }
            }
        } catch (Throwable th) {
            if (clientSession != null) {
                try {
                    clientSession.close();
                } catch (Exception e2) {
                    throw th;
                }
            }
            throw th;
        }
    }

    @Test
    public void testSlowConsumerOnMessageHandlerNoBuffers() throws Exception {
        internalTestSlowConsumerOnMessageHandlerNoBuffers(false);
    }

    @Test
    public void testSlowConsumerOnMessageHandlerNoBuffersLargeMessage() throws Exception {
        internalTestSlowConsumerOnMessageHandlerNoBuffers(true);
    }

    @Test
    public void testFlowControl() throws Exception {
        internalTestFlowControlOnRollback(false);
    }

    @Test
    public void testFlowControlLargeMessage() throws Exception {
        internalTestFlowControlOnRollback(true);
    }

    private void internalTestFlowControlOnRollback(boolean z) throws Exception {
        ActiveMQServer createServer = createServer(false, isNetty());
        createServer.getAddressSettingsRepository().addMatch("#", new AddressSettings().setMaxDeliveryAttempts(-1));
        ClientSession clientSession = null;
        try {
            createServer.start();
            this.locator.setConsumerWindowSize(300000);
            if (z) {
                this.locator.setMinLargeMessageSize(100);
            } else {
                this.locator.setMinLargeMessageSize(Integer.MAX_VALUE);
            }
            clientSession = createSessionFactory(this.locator).createSession(false, false, false);
            SimpleString simpleString = new SimpleString("some-queue");
            clientSession.createQueue(new QueueConfiguration(simpleString));
            ClientProducer createProducer = clientSession.createProducer(simpleString);
            for (int i = 0; i < 100; i++) {
                ClientMessage createMessage = clientSession.createMessage(true);
                createMessage.putIntProperty("count", i);
                createMessage.getBodyBuffer().writeBytes(new byte[1024]);
                createProducer.send(createMessage);
            }
            clientSession.commit();
            ClientConsumerInternal createConsumer = clientSession.createConsumer(simpleString);
            clientSession.start();
            for (int i2 = 0; i2 < 100; i2++) {
                long currentTimeMillis = System.currentTimeMillis() + 2000;
                while (currentTimeMillis > System.currentTimeMillis() && createConsumer.getBufferSize() <= 10) {
                    Thread.sleep(10L);
                }
                Assert.assertTrue(createConsumer.getBufferSize() >= 10);
                ClientMessage receive = createConsumer.receive(500L);
                receive.getBodyBuffer().readByte();
                Assert.assertNotNull(receive);
                receive.acknowledge();
                clientSession.rollback();
            }
            for (int i3 = 0; i3 < 100; i3++) {
                ClientMessage receive2 = createConsumer.receive(5000L);
                Assert.assertNotNull(receive2);
                receive2.getBodyBuffer().readByte();
                receive2.acknowledge();
                clientSession.commit();
            }
            if (clientSession != null) {
                try {
                    clientSession.close();
                } catch (Exception e) {
                }
            }
        } catch (Throwable th) {
            if (clientSession != null) {
                try {
                    clientSession.close();
                } catch (Exception e2) {
                    throw th;
                }
            }
            throw th;
        }
    }

    public void internalTestSlowConsumerOnMessageHandlerNoBuffers(boolean z) throws Exception {
        ClientSession clientSession = null;
        ClientSession clientSession2 = null;
        try {
            createServer(false, isNetty()).start();
            this.locator.setConsumerWindowSize(0);
            if (z) {
                this.locator.setMinLargeMessageSize(100);
            }
            ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
            ClientSession createSession = createSessionFactory.createSession(false, true, true);
            SimpleString simpleString = new SimpleString("some-queue");
            createSession.createQueue(new QueueConfiguration(simpleString));
            ClientSession createSession2 = createSessionFactory.createSession(false, true, true);
            createSession2.start();
            createSession.start();
            ClientConsumerInternal createConsumer = createSession2.createConsumer(simpleString);
            CountDownLatch countDownLatch = new CountDownLatch(2);
            CountDownLatch countDownLatch2 = new CountDownLatch(1);
            CountDownLatch countDownLatch3 = new CountDownLatch(1);
            C1LocalHandler c1LocalHandler = new C1LocalHandler(countDownLatch, countDownLatch2, z, countDownLatch3);
            ClientConsumer createConsumer2 = createSession.createConsumer(simpleString);
            ClientProducer createProducer = createSession.createProducer(simpleString);
            for (int i = 0; i < 100; i++) {
                ClientMessage createTextMessage = createTextMessage(createSession, "Msg" + i);
                if (z) {
                    createTextMessage.getBodyBuffer().writeBytes(new byte[600]);
                }
                createProducer.send(createTextMessage);
            }
            createConsumer.setMessageHandler(c1LocalHandler);
            Assert.assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
            Assert.assertEquals(0L, createConsumer.getBufferSize());
            for (int i2 = 2; i2 < 100; i2++) {
                ClientMessage receive = createConsumer2.receive(1000L);
                Assert.assertNotNull("expected message at i = " + i2, receive);
                Assert.assertEquals("Msg" + i2, getTextMessage(receive));
                receive.acknowledge();
            }
            Assert.assertEquals(0L, createConsumer.getBufferSize());
            countDownLatch2.countDown();
            Assert.assertTrue(countDownLatch3.await(10L, TimeUnit.SECONDS));
            createSession.close();
            clientSession2 = null;
            createSession2.close();
            clientSession = null;
            Assert.assertEquals(0L, getMessageCount(r0, simpleString.toString()));
            Assert.assertFalse("MessageHandler received a failure", c1LocalHandler.failed);
            if (0 != 0) {
                try {
                    clientSession2.close();
                } catch (Exception e) {
                    return;
                }
            }
            if (0 != 0) {
                clientSession.close();
            }
        } catch (Throwable th) {
            if (clientSession2 != null) {
                try {
                    clientSession2.close();
                } catch (Exception e2) {
                    throw th;
                }
            }
            if (clientSession != null) {
                clientSession.close();
            }
            throw th;
        }
    }

    @Test
    public void testSlowConsumerOnMessageHandlerBufferOne() throws Exception {
        internalTestSlowConsumerOnMessageHandlerBufferOne(false);
    }

    private void internalTestSlowConsumerOnMessageHandlerBufferOne(boolean z) throws Exception {
        ClientSession clientSession = null;
        ClientSession clientSession2 = null;
        try {
            createServer(false, isNetty()).start();
            this.locator.setConsumerWindowSize(1);
            if (z) {
                this.locator.setMinLargeMessageSize(100);
            }
            ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
            ClientSession createSession = createSessionFactory.createSession(false, true, true);
            SimpleString simpleString = new SimpleString("some-queue");
            createSession.createQueue(new QueueConfiguration(simpleString));
            ClientSession createSession2 = createSessionFactory.createSession(false, true, true);
            createSession2.start();
            createSession.start();
            ClientConsumerInternal createConsumer = createSession2.createConsumer(simpleString);
            CountDownLatch countDownLatch = new CountDownLatch(2);
            CountDownLatch countDownLatch2 = new CountDownLatch(3);
            CountDownLatch countDownLatch3 = new CountDownLatch(1);
            C2LocalHandler c2LocalHandler = new C2LocalHandler(countDownLatch, countDownLatch2, countDownLatch3);
            ClientProducer createProducer = createSession.createProducer(simpleString);
            for (int i = 0; i < 100; i++) {
                ClientMessage createTextMessage = createTextMessage(createSession, "Msg" + i);
                if (z) {
                    createTextMessage.getBodyBuffer().writeBytes(new byte[600]);
                }
                createProducer.send(createTextMessage);
            }
            createConsumer.setMessageHandler(c2LocalHandler);
            Assert.assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
            long currentTimeMillis = System.currentTimeMillis() + 5000;
            while (createConsumer.getBufferSize() == 0 && System.currentTimeMillis() < currentTimeMillis) {
                Thread.sleep(10L);
            }
            Assert.assertEquals(1L, createConsumer.getBufferSize());
            ClientConsumer createConsumer2 = createSession.createConsumer(simpleString);
            for (int i2 = 3; i2 < 100; i2++) {
                ClientMessage receive = createConsumer2.receive(1000L);
                Assert.assertNotNull("expected message at i = " + i2, receive);
                Assert.assertEquals("Msg" + i2, getTextMessage(receive));
                receive.acknowledge();
            }
            countDownLatch3.countDown();
            Assert.assertTrue(countDownLatch2.await(5L, TimeUnit.SECONDS));
            createSession.close();
            clientSession2 = null;
            createSession2.close();
            clientSession = null;
            Assert.assertEquals(0L, getMessageCount(r0, simpleString.toString()));
            Assert.assertFalse("MessageHandler received a failure", c2LocalHandler.failed);
            if (0 != 0) {
                try {
                    clientSession2.close();
                } catch (Exception e) {
                    e.printStackTrace();
                    return;
                }
            }
            if (0 != 0) {
                clientSession.close();
            }
        } catch (Throwable th) {
            if (clientSession2 != null) {
                try {
                    clientSession2.close();
                } catch (Exception e2) {
                    e2.printStackTrace();
                    throw th;
                }
            }
            if (clientSession != null) {
                clientSession.close();
            }
            throw th;
        }
    }

    @Test
    public void testNoWindowRoundRobin() throws Exception {
        testNoWindowRoundRobin(false);
    }

    private void testNoWindowRoundRobin(boolean z) throws Exception {
        boolean z2;
        boolean z3;
        ActiveMQServer createServer = createServer(false, isNetty());
        ClientSession clientSession = null;
        ClientSession clientSession2 = null;
        try {
            createServer.start();
            this.locator.setConsumerWindowSize(-1);
            if (z) {
                this.locator.setMinLargeMessageSize(100);
            }
            ClientSessionFactory createSessionFactory = createSessionFactory(this.locator);
            clientSession = createSessionFactory.createSession(false, true, true);
            SimpleString simpleString = new SimpleString("some-queue");
            clientSession.createQueue(new QueueConfiguration(simpleString));
            clientSession2 = createSessionFactory.createSession(false, true, true);
            clientSession.start();
            clientSession2.start();
            ClientConsumerInternal createConsumer = clientSession.createConsumer(simpleString);
            ClientConsumerInternal createConsumer2 = clientSession2.createConsumer(simpleString);
            Bindings bindingsForAddress = createServer.getPostOffice().getBindingsForAddress(simpleString);
            Assert.assertEquals(1L, bindingsForAddress.getBindings().size());
            Iterator it = bindingsForAddress.getBindings().iterator();
            while (it.hasNext()) {
                for (ServerConsumerImpl serverConsumerImpl : ((Binding) it.next()).getQueue().getConsumers()) {
                    long currentTimeMillis = System.currentTimeMillis() + 5000;
                    while (currentTimeMillis > System.currentTimeMillis() && serverConsumerImpl.getAvailableCredits() != null) {
                        Thread.sleep(10L);
                    }
                    Assert.assertNull(serverConsumerImpl.getAvailableCredits());
                }
            }
            ClientProducer createProducer = clientSession.createProducer(simpleString);
            for (int i = 0; i < 100; i++) {
                ClientMessage createTextMessage = createTextMessage(clientSession, "Msg" + i);
                if (z) {
                    createTextMessage.getBodyBuffer().writeBytes(new byte[600]);
                }
                createProducer.send(createTextMessage);
            }
            long currentTimeMillis2 = System.currentTimeMillis() + 5000;
            do {
                z2 = createConsumer.getBufferSize() == 50;
                z3 = createConsumer2.getBufferSize() == 50;
                Thread.sleep(10L);
                if (z2 && z3) {
                    break;
                }
            } while (System.currentTimeMillis() < currentTimeMillis2);
            Assert.assertTrue("ConsumerA didn't receive the expected number of messages on buffer (consA=" + createConsumer.getBufferSize() + ", consB=" + createConsumer2.getBufferSize() + ") foundA = " + z2 + " foundB = " + z3, z2);
            Assert.assertTrue("ConsumerB didn't receive the expected number of messages on buffer (consA=" + createConsumer.getBufferSize() + ", consB=" + createConsumer2.getBufferSize() + ") foundA = " + z2 + " foundB = " + z3, z3);
            if (clientSession != null) {
                try {
                    clientSession.close();
                } catch (Exception e) {
                    return;
                }
            }
            if (clientSession2 != null) {
                clientSession2.close();
            }
        } catch (Throwable th) {
            if (clientSession != null) {
                try {
                    clientSession.close();
                } catch (Exception e2) {
                    throw th;
                }
            }
            if (clientSession2 != null) {
                clientSession2.close();
            }
            throw th;
        }
    }

    @Test
    public void testDefaultConsumerWindowSize() throws Exception {
        ActiveMQServer createServer = createServer(false, isNetty());
        createServer.start();
        createServer.createQueue(new QueueConfiguration(this.queueA).setRoutingType(RoutingType.ANYCAST));
        createSessionFactory(this.locator).createSession(false, true, true).createConsumer(this.queueA).start();
        assertEquals(524288L, r0.getClientWindowSize());
    }

    @Test
    public void testConsumerWindowSizeAddressSettings() throws Exception {
        ActiveMQServer createServer = createServer(false, isNetty());
        AddressSettings addressSettings = new AddressSettings();
        addressSettings.setDefaultConsumerWindowSize(5120);
        createServer.getConfiguration().getAddressSettings().put(this.queueA.toString(), addressSettings);
        createServer.start();
        createServer.createQueue(new QueueConfiguration(this.queueA).setRoutingType(RoutingType.ANYCAST));
        ClientSession createSession = createSessionFactory(this.locator).createSession(false, true, true);
        ClientConsumerImpl createConsumer = createSession.createConsumer(this.queueA);
        createSession.start();
        assertEquals(2560L, createConsumer.getClientWindowSize());
    }

    @Test
    public void testConsumerWindowSizeAddressSettingsDifferentAddressAndQueueName() throws Exception {
        ActiveMQServer createServer = createServer(false, isNetty());
        AddressSettings addressSettings = new AddressSettings();
        addressSettings.setDefaultConsumerWindowSize(5120);
        createServer.getConfiguration().getAddressSettings().put(this.addressA.toString(), addressSettings);
        createServer.start();
        createServer.createQueue(new QueueConfiguration(this.queueA).setAddress(this.addressA).setRoutingType(RoutingType.ANYCAST));
        ClientSessionImpl createSession = createSessionFactory(this.locator).createSession(false, true, true);
        ClientConsumerImpl createConsumer = createSession.createConsumer(this.queueA);
        createSession.start();
        assertEquals(2560L, createConsumer.getClientWindowSize());
        ServerConsumerImpl locateConsumer = createServer.getSessionByID(createSession.getName()).locateConsumer(createConsumer.getConsumerContext().getId());
        assertTrue(Wait.waitFor(() -> {
            return locateConsumer.getAvailableCredits().get() == createConsumer.getClientWindowSize() * 2;
        }, 5000L, 50L));
    }

    @Test
    public void testConsumerWindowSizeAddressSettingsWildCard() throws Exception {
        ActiveMQServer createServer = createServer(false, isNetty());
        AddressSettings addressSettings = new AddressSettings();
        addressSettings.setDefaultConsumerWindowSize(5120);
        createServer.getConfiguration().getAddressSettings().put("#", addressSettings);
        createServer.start();
        createServer.createQueue(new QueueConfiguration(this.queueA).setRoutingType(RoutingType.ANYCAST));
        ClientSession createSession = createSessionFactory(this.locator).createSession(false, true, true);
        ClientConsumerImpl createConsumer = createSession.createConsumer(this.queueA);
        ClientConsumerImpl createConsumer2 = createSession.createConsumer(this.queueA);
        createSession.start();
        assertEquals(2560L, createConsumer.getClientWindowSize());
        assertEquals(2560L, createConsumer2.getClientWindowSize());
    }
}
