package org.apache.activemq.artemis.tests.integration.cluster.distribution;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.class */
public class MessageRedistributionTest extends ClusterTestBase {
    private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;

    @Override // org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase
    @Before
    public void setUp() throws Exception {
        super.setUp();
        start();
    }

    private void start() throws Exception {
        setupServers();
        setRedistributionDelay(0L);
    }

    protected boolean isNetty() {
        return false;
    }

    @Test
    public void testRedistributionWithMessageGroups() throws Exception {
        setupCluster(MessageLoadBalancingType.ON_DEMAND);
        log.info("Doing test");
        getServer(0).getConfiguration().setGroupingHandlerConfiguration(new GroupingHandlerConfiguration().setName(new SimpleString("handler")).setType(GroupingHandlerConfiguration.TYPE.LOCAL).setAddress(new SimpleString("queues")));
        getServer(1).getConfiguration().setGroupingHandlerConfiguration(new GroupingHandlerConfiguration().setName(new SimpleString("handler")).setType(GroupingHandlerConfiguration.TYPE.REMOTE).setAddress(new SimpleString("queues")));
        startServers(0, 1);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        createQueue(0, "queues.testaddress", "queue0", null, false);
        createQueue(1, "queues.testaddress", "queue0", null, false);
        addConsumer(1, 1, "queue0", null);
        waitForBindings(0, "queues.testaddress", 1, 0, true);
        waitForBindings(1, "queues.testaddress", 1, 1, true);
        waitForBindings(0, "queues.testaddress", 1, 1, false);
        waitForBindings(1, "queues.testaddress", 1, 0, false);
        sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("grp1"));
        addConsumer(0, 0, "queue0", null);
        waitForBindings(0, "queues.testaddress", 1, 1, true);
        waitForBindings(1, "queues.testaddress", 1, 1, true);
        waitForBindings(0, "queues.testaddress", 1, 1, false);
        waitForBindings(1, "queues.testaddress", 1, 1, false);
        send(0, "queues.testaddress", 10, false, null);
        for (int i = 0; i < 5; i++) {
            ClientMessage receive = getConsumer(1).receive(1000L);
            Assert.assertNotNull(receive);
            receive.acknowledge();
            Assert.assertNotNull(receive.getSimpleStringProperty(Message.HDR_GROUP_ID));
        }
        for (int i2 = 0; i2 < 5; i2++) {
            ClientMessage receive2 = getConsumer(0).receive(5000L);
            Assert.assertNotNull("" + i2, receive2);
            receive2.acknowledge();
            Assert.assertNull(receive2.getSimpleStringProperty(Message.HDR_GROUP_ID));
        }
        Assert.assertNull(getConsumer(0).receiveImmediate());
        waitForMessages(1, "queues.testaddress", 15);
        removeConsumer(1);
        for (int i3 = 0; i3 < 5; i3++) {
            ClientMessage receive3 = getConsumer(0).receive(5000L);
            if (receive3 == null) {
                System.out.println();
            }
            Assert.assertNotNull("" + i3, receive3);
            receive3.acknowledge();
            Assert.assertNull(receive3.getSimpleStringProperty(Message.HDR_GROUP_ID));
        }
        Assert.assertNull(getConsumer(0).receiveImmediate());
        removeConsumer(0);
        addConsumer(1, 1, "queue0", null);
        for (int i4 = 0; i4 < 5; i4++) {
            ClientMessage receive4 = getConsumer(1).receive(1000L);
            Assert.assertNotNull(receive4);
            receive4.acknowledge();
            Assert.assertNotNull(receive4.getSimpleStringProperty(Message.HDR_GROUP_ID));
        }
        log.info("Test done");
    }

    @Test
    public void testRedistributionStopsWhenConsumerAdded() throws Exception {
        setupCluster(MessageLoadBalancingType.ON_DEMAND);
        log.info("Doing test");
        startServers(0, 1, 2);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        setupSessionFactory(2, isNetty());
        createQueue(0, "queues.testaddress", "queue0", null, false);
        createQueue(1, "queues.testaddress", "queue0", null, false);
        createQueue(2, "queues.testaddress", "queue0", null, false);
        addConsumer(0, 0, "queue0", null);
        waitForBindings(0, "queues.testaddress", 1, 1, true);
        waitForBindings(1, "queues.testaddress", 1, 0, true);
        waitForBindings(2, "queues.testaddress", 1, 0, true);
        waitForBindings(0, "queues.testaddress", 2, 0, false);
        waitForBindings(1, "queues.testaddress", 2, 1, false);
        waitForBindings(2, "queues.testaddress", 2, 1, false);
        send(0, "queues.testaddress", 2000, false, null);
        removeConsumer(0);
        addConsumer(0, 0, "queue0", null);
        Assert.assertFalse(this.servers[0].getPostOffice().getBinding(new SimpleString("queue0")).getBindable().debug().contains(Redistributor.class.getName()));
        log.info("Test done");
    }

    @Test
    public void testRedistributionWhenConsumerIsClosed() throws Exception {
        setupCluster(MessageLoadBalancingType.ON_DEMAND);
        log.info("Doing test");
        startServers(0, 1, 2);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        setupSessionFactory(2, isNetty());
        createQueue(0, "queues.testaddress", "queue0", null, false);
        createQueue(1, "queues.testaddress", "queue0", null, false);
        createQueue(2, "queues.testaddress", "queue0", null, false);
        addConsumer(0, 0, "queue0", null);
        addConsumer(1, 1, "queue0", null);
        addConsumer(2, 2, "queue0", null);
        waitForBindings(0, "queues.testaddress", 1, 1, true);
        waitForBindings(1, "queues.testaddress", 1, 1, true);
        waitForBindings(2, "queues.testaddress", 1, 1, true);
        waitForBindings(0, "queues.testaddress", 2, 2, false);
        waitForBindings(1, "queues.testaddress", 2, 2, false);
        waitForBindings(2, "queues.testaddress", 2, 2, false);
        send(0, "queues.testaddress", 20, false, null);
        getReceivedOrder(0);
        int[] receivedOrder = getReceivedOrder(1);
        getReceivedOrder(2);
        removeConsumer(1);
        verifyReceiveRoundRobinInSomeOrderWithCounts(false, receivedOrder, 0, 2);
        log.info("Test done");
    }

    @Test
    public void testRedistributionWhenConsumerIsClosedDifferentQueues() throws Exception {
        setupCluster(MessageLoadBalancingType.ON_DEMAND);
        startServers(0, 1, 2);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        setupSessionFactory(2, isNetty());
        createQueue(0, "queues.testaddress", "queue0", null, true);
        createQueue(1, "queues.testaddress", "queue1", null, true);
        createQueue(2, "queues.testaddress", "queue2", null, true);
        ClientSession createSession = this.sfs[0].createSession();
        ClientConsumer createConsumer = createSession.createConsumer("queue0");
        ClientSession createSession2 = this.sfs[1].createSession();
        ClientConsumer createConsumer2 = createSession2.createConsumer("queue1");
        ClientSession createSession3 = this.sfs[2].createSession();
        ClientConsumer createConsumer3 = createSession3.createConsumer("queue2");
        ClientProducer createProducer = createSession.createProducer("queues.testaddress");
        for (int i = 0; i < 1000; i++) {
            createProducer.send(createSession.createMessage(true).putIntProperty("count", i));
        }
        createSession.start();
        createSession2.start();
        createSession3.start();
        for (int i2 = 0; i2 < 1000; i2++) {
            ClientMessage receive = createConsumer.receive(5000L);
            Assert.assertNotNull(receive);
            receive.acknowledge();
            Assert.assertEquals(i2, receive.getIntProperty("count").intValue());
        }
        Assert.assertNull(createConsumer.receiveImmediate());
        createConsumer2.close();
        Thread.sleep(500L);
        for (int i3 = 0; i3 < 1000; i3++) {
            ClientMessage receive2 = createConsumer3.receive(5000L);
            Assert.assertNotNull(receive2);
            receive2.acknowledge();
            Assert.assertEquals(i3, receive2.getIntProperty("count").intValue());
        }
        Assert.assertNull(createConsumer3.receiveImmediate());
        Assert.assertNull(createConsumer.receiveImmediate());
        ClientConsumer createConsumer4 = createSession2.createConsumer("queue1");
        for (int i4 = 0; i4 < 1000; i4++) {
            ClientMessage receive3 = createConsumer4.receive(5000L);
            Assert.assertNotNull(receive3);
            receive3.acknowledge();
            Assert.assertEquals(i4, receive3.getIntProperty("count").intValue());
        }
        Assert.assertNull(createConsumer.receiveImmediate());
        Assert.assertNull(createConsumer4.receiveImmediate());
        Assert.assertNull(createConsumer3.receiveImmediate());
        log.info("Test done");
    }

    @Test
    public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws Exception {
        setupCluster(MessageLoadBalancingType.ON_DEMAND);
        startServers(0, 1, 2);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        setupSessionFactory(2, isNetty());
        createQueue(0, "queues.testaddress", "queue0", null, false);
        createQueue(1, "queues.testaddress", "queue0", null, false);
        createQueue(2, "queues.testaddress", "queue0", null, false);
        addConsumer(1, 1, "queue0", null);
        addConsumer(2, 2, "queue0", null);
        waitForBindings(0, "queues.testaddress", 1, 0, true);
        waitForBindings(1, "queues.testaddress", 1, 1, true);
        waitForBindings(2, "queues.testaddress", 1, 1, true);
        waitForBindings(0, "queues.testaddress", 2, 2, false);
        waitForBindings(1, "queues.testaddress", 2, 1, false);
        waitForBindings(2, "queues.testaddress", 2, 1, false);
        send(0, "queues.testaddress", 20, false, null);
        int[] receivedOrder = getReceivedOrder(1);
        getReceivedOrder(2);
        removeConsumer(1);
        verifyReceiveRoundRobinInSomeOrderWithCounts(false, receivedOrder, 2);
    }

    @Test
    public void testNoRedistributionWhenConsumerIsClosedForwardWhenNoConsumersTrue() throws Exception {
        setupCluster(MessageLoadBalancingType.STRICT);
        startServers(0, 1, 2);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        setupSessionFactory(2, isNetty());
        createQueue(0, "queues.testaddress", "queue0", null, false);
        createQueue(1, "queues.testaddress", "queue0", null, false);
        createQueue(2, "queues.testaddress", "queue0", null, false);
        addConsumer(0, 0, "queue0", null);
        addConsumer(1, 1, "queue0", null);
        addConsumer(2, 2, "queue0", null);
        waitForBindings(0, "queues.testaddress", 1, 1, true);
        waitForBindings(1, "queues.testaddress", 1, 1, true);
        waitForBindings(2, "queues.testaddress", 1, 1, true);
        waitForBindings(0, "queues.testaddress", 2, 2, false);
        waitForBindings(1, "queues.testaddress", 2, 2, false);
        waitForBindings(2, "queues.testaddress", 2, 2, false);
        send(0, "queues.testaddress", 20, false, null);
        removeConsumer(1);
        waitForBindings(0, "queues.testaddress", 1, 1, true);
        waitForBindings(1, "queues.testaddress", 1, 0, true);
        waitForBindings(2, "queues.testaddress", 1, 1, true);
        waitForBindings(0, "queues.testaddress", 2, 1, false);
        waitForBindings(1, "queues.testaddress", 2, 2, false);
        waitForBindings(2, "queues.testaddress", 2, 1, false);
        addConsumer(1, 1, "queue0", null);
        waitForBindings(0, "queues.testaddress", 1, 1, true);
        waitForBindings(1, "queues.testaddress", 1, 1, true);
        waitForBindings(2, "queues.testaddress", 1, 1, true);
        waitForBindings(0, "queues.testaddress", 2, 2, false);
        waitForBindings(1, "queues.testaddress", 2, 2, false);
        waitForBindings(2, "queues.testaddress", 2, 2, false);
        verifyReceiveRoundRobinInSomeOrder(20, 0, 1, 2);
    }

    @Test
    public void testNoRedistributionWhenConsumerIsClosedNoConsumersOnOtherNodes() throws Exception {
        setupCluster(MessageLoadBalancingType.ON_DEMAND);
        startServers(0, 1, 2);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        setupSessionFactory(2, isNetty());
        createQueue(0, "queues.testaddress", "queue0", null, false);
        createQueue(1, "queues.testaddress", "queue0", null, false);
        createQueue(2, "queues.testaddress", "queue0", null, false);
        addConsumer(1, 1, "queue0", null);
        waitForBindings(0, "queues.testaddress", 1, 0, true);
        waitForBindings(1, "queues.testaddress", 1, 1, true);
        waitForBindings(2, "queues.testaddress", 1, 0, true);
        waitForBindings(0, "queues.testaddress", 2, 1, false);
        waitForBindings(1, "queues.testaddress", 2, 0, false);
        waitForBindings(2, "queues.testaddress", 2, 1, false);
        send(0, "queues.testaddress", 20, false, null);
        removeConsumer(1);
        waitForBindings(0, "queues.testaddress", 1, 0, true);
        waitForBindings(1, "queues.testaddress", 1, 0, true);
        waitForBindings(2, "queues.testaddress", 1, 0, true);
        waitForBindings(0, "queues.testaddress", 2, 0, false);
        waitForBindings(1, "queues.testaddress", 2, 0, false);
        waitForBindings(2, "queues.testaddress", 2, 0, false);
        addConsumer(1, 1, "queue0", null);
        waitForBindings(0, "queues.testaddress", 1, 0, true);
        waitForBindings(1, "queues.testaddress", 1, 1, true);
        waitForBindings(2, "queues.testaddress", 1, 0, true);
        waitForBindings(0, "queues.testaddress", 2, 1, false);
        waitForBindings(1, "queues.testaddress", 2, 0, false);
        waitForBindings(2, "queues.testaddress", 2, 1, false);
        verifyReceiveAll(20, 1);
    }

    @Test
    public void testRedistributeWithScheduling() throws Exception {
        setupCluster(MessageLoadBalancingType.ON_DEMAND);
        AddressSettings redeliveryDelay = new AddressSettings().setRedeliveryDelay(10000L);
        this.servers[0].getAddressSettingsRepository().addMatch("queues.testaddress", redeliveryDelay);
        this.servers[0].getAddressSettingsRepository().addMatch("queue0", redeliveryDelay);
        this.servers[1].getAddressSettingsRepository().addMatch("queue0", redeliveryDelay);
        this.servers[1].getAddressSettingsRepository().addMatch("queues.testaddress", redeliveryDelay);
        startServers(0);
        setupSessionFactory(0, isNetty());
        createQueue(0, "queues.testaddress", "queue0", null, false);
        ClientSession createSession = this.sfs[0].createSession(false, false, false);
        ClientProducer createProducer = createSession.createProducer("queues.testaddress");
        for (int i = 0; i < 100; i++) {
            ClientMessage createMessage = createSession.createMessage(true);
            createMessage.putIntProperty("key", i);
            byte[] bArr = new byte[24];
            ByteBuffer.wrap(bArr).putLong(i);
            createMessage.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bArr);
            createProducer.send(createMessage);
            createSession.commit();
        }
        createSession.close();
        ClientSession createSession2 = this.sfs[0].createSession(true, false, false);
        ClientConsumer createConsumer = createSession2.createConsumer("queue0");
        createSession2.start();
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < 100; i2++) {
            XidImpl newXID = newXID();
            createSession2.start(newXID, 0);
            createConsumer.receive(5000L).acknowledge();
            createSession2.end(newXID, 67108864);
            createSession2.prepare(newXID);
            arrayList.add(newXID);
        }
        createSession2.close();
        this.sfs[0].close();
        this.sfs[0] = null;
        startServers(0, 1, 2);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        setupSessionFactory(2, isNetty());
        createQueue(1, "queues.testaddress", "queue0", null, false);
        createQueue(2, "queues.testaddress", "queue0", null, false);
        ClientSession createSession3 = this.sfs[1].createSession(false, false);
        createSession3.start();
        ClientConsumer createConsumer2 = createSession3.createConsumer("queue0");
        waitForBindings(0, "queues.testaddress", 1, 0, true);
        waitForBindings(1, "queues.testaddress", 1, 1, true);
        waitForBindings(2, "queues.testaddress", 1, 0, true);
        waitForBindings(0, "queues.testaddress", 2, 1, false);
        waitForBindings(1, "queues.testaddress", 2, 0, false);
        waitForBindings(2, "queues.testaddress", 2, 1, false);
        ClientSession createSession4 = this.sfs[0].createSession(true, false, false);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            createSession4.rollback((Xid) it.next());
        }
        for (int i3 = 0; i3 < 100; i3++) {
            ClientMessage receive = createConsumer2.receive(15000L);
            Assert.assertNotNull(receive);
            receive.acknowledge();
        }
        createSession3.commit();
    }

    @Test
    public void testRedistributionWhenConsumerIsClosedQueuesWithFilters() throws Exception {
        setupCluster(MessageLoadBalancingType.ON_DEMAND);
        startServers(0, 1, 2);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        setupSessionFactory(2, isNetty());
        createQueue(0, "queues.testaddress", "queue0", "giraffe", false);
        createQueue(1, "queues.testaddress", "queue0", "platypus", false);
        createQueue(2, "queues.testaddress", "queue0", "giraffe", false);
        addConsumer(0, 0, "queue0", null);
        addConsumer(1, 1, "queue0", null);
        addConsumer(2, 2, "queue0", null);
        waitForBindings(0, "queues.testaddress", 1, 1, true);
        waitForBindings(1, "queues.testaddress", 1, 1, true);
        waitForBindings(2, "queues.testaddress", 1, 1, true);
        waitForBindings(0, "queues.testaddress", 2, 2, false);
        waitForBindings(1, "queues.testaddress", 2, 2, false);
        waitForBindings(2, "queues.testaddress", 2, 2, false);
        send(0, "queues.testaddress", 20, false, "giraffe");
        int[] receivedOrder = getReceivedOrder(0);
        getReceivedOrder(1);
        getReceivedOrder(2);
        removeConsumer(0);
        verifyReceiveRoundRobinInSomeOrderWithCounts(false, receivedOrder, 2);
    }

    @Test
    public void testRedistributionWhenConsumerIsClosedConsumersWithFilters() throws Exception {
        setupCluster(MessageLoadBalancingType.ON_DEMAND);
        startServers(0, 1, 2);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        setupSessionFactory(2, isNetty());
        createQueue(0, "queues.testaddress", "queue0", null, false);
        createQueue(1, "queues.testaddress", "queue0", null, false);
        createQueue(2, "queues.testaddress", "queue0", null, false);
        addConsumer(0, 0, "queue0", "giraffe");
        addConsumer(1, 1, "queue0", "platypus");
        addConsumer(2, 2, "queue0", "giraffe");
        waitForBindings(0, "queues.testaddress", 1, 1, true);
        waitForBindings(1, "queues.testaddress", 1, 1, true);
        waitForBindings(2, "queues.testaddress", 1, 1, true);
        waitForBindings(0, "queues.testaddress", 2, 2, false);
        waitForBindings(1, "queues.testaddress", 2, 2, false);
        waitForBindings(2, "queues.testaddress", 2, 2, false);
        send(0, "queues.testaddress", 20, false, "giraffe");
        int[] receivedOrder = getReceivedOrder(0);
        getReceivedOrder(1);
        getReceivedOrder(2);
        removeConsumer(0);
        verifyReceiveRoundRobinInSomeOrderWithCounts(false, receivedOrder, 2);
    }

    @Test
    public void testRedistributionWhenRemoteConsumerIsAdded() throws Exception {
        setupCluster(MessageLoadBalancingType.ON_DEMAND);
        startServers(0, 1, 2);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        setupSessionFactory(2, isNetty());
        createQueue(0, "queues.testaddress", "queue0", null, false);
        createQueue(1, "queues.testaddress", "queue0", null, false);
        createQueue(2, "queues.testaddress", "queue0", null, false);
        addConsumer(0, 0, "queue0", null);
        waitForBindings(0, "queues.testaddress", 1, 1, true);
        waitForBindings(1, "queues.testaddress", 1, 0, true);
        waitForBindings(2, "queues.testaddress", 1, 0, true);
        waitForBindings(0, "queues.testaddress", 2, 0, false);
        waitForBindings(1, "queues.testaddress", 2, 1, false);
        waitForBindings(2, "queues.testaddress", 2, 1, false);
        send(0, "queues.testaddress", 20, false, null);
        removeConsumer(0);
        addConsumer(1, 1, "queue0", null);
        verifyReceiveAll(20, 1);
        verifyNotReceive(1);
    }

    @Test
    public void testBackAndForth() throws Exception {
        for (int i = 0; i < 10; i++) {
            setupCluster(MessageLoadBalancingType.ON_DEMAND);
            startServers(0, 1, 2);
            setupSessionFactory(0, isNetty());
            setupSessionFactory(1, isNetty());
            setupSessionFactory(2, isNetty());
            createQueue(0, "queues.testaddress", "queue0", null, false);
            createQueue(1, "queues.testaddress", "queue0", null, false);
            createQueue(2, "queues.testaddress", "queue0", null, false);
            addConsumer(0, 0, "queue0", null);
            waitForBindings(0, "queues.testaddress", 1, 1, true);
            waitForBindings(1, "queues.testaddress", 1, 0, true);
            waitForBindings(2, "queues.testaddress", 1, 0, true);
            waitForBindings(0, "queues.testaddress", 2, 0, false);
            waitForBindings(1, "queues.testaddress", 2, 1, false);
            waitForBindings(2, "queues.testaddress", 2, 1, false);
            send(0, "queues.testaddress", 20, false, null);
            waitForMessages(0, "queues.testaddress", 20);
            removeConsumer(0);
            waitForBindings(0, "queues.testaddress", 1, 0, true);
            waitForBindings(1, "queues.testaddress", 1, 0, true);
            waitForBindings(2, "queues.testaddress", 1, 0, true);
            waitForBindings(0, "queues.testaddress", 2, 0, false);
            waitForBindings(1, "queues.testaddress", 2, 0, false);
            waitForBindings(2, "queues.testaddress", 2, 0, false);
            addConsumer(1, 1, "queue0", null);
            waitForBindings(0, "queues.testaddress", 1, 0, true);
            waitForBindings(1, "queues.testaddress", 1, 1, true);
            waitForBindings(2, "queues.testaddress", 1, 0, true);
            waitForMessages(1, "queues.testaddress", 20);
            waitForMessages(0, "queues.testaddress", 0);
            waitForBindings(0, "queues.testaddress", 2, 1, false);
            waitForBindings(1, "queues.testaddress", 2, 0, false);
            waitForBindings(2, "queues.testaddress", 2, 1, false);
            removeConsumer(1);
            waitForBindings(0, "queues.testaddress", 1, 0, true);
            waitForBindings(1, "queues.testaddress", 1, 0, true);
            waitForBindings(2, "queues.testaddress", 1, 0, true);
            waitForBindings(0, "queues.testaddress", 2, 0, false);
            waitForBindings(1, "queues.testaddress", 2, 0, false);
            waitForBindings(2, "queues.testaddress", 2, 0, false);
            addConsumer(0, 0, "queue0", null);
            waitForBindings(0, "queues.testaddress", 1, 1, true);
            waitForBindings(1, "queues.testaddress", 1, 0, true);
            waitForBindings(2, "queues.testaddress", 1, 0, true);
            waitForBindings(0, "queues.testaddress", 2, 0, false);
            waitForBindings(1, "queues.testaddress", 2, 1, false);
            waitForBindings(2, "queues.testaddress", 2, 1, false);
            waitForMessages(0, "queues.testaddress", 20);
            verifyReceiveAll(20, 0);
            verifyNotReceive(0);
            addConsumer(1, 1, "queue0", null);
            verifyNotReceive(1);
            removeConsumer(1);
            stopServers();
            start();
        }
    }

    @Test
    public void testBackAndForth2WithDuplicDetection() throws Exception {
        internalTestBackAndForth2(true);
    }

    @Test
    public void testBackAndForth2() throws Exception {
        internalTestBackAndForth2(false);
    }

    public void internalTestBackAndForth2(boolean z) throws Exception {
        AtomicInteger atomicInteger = z ? new AtomicInteger(0) : null;
        for (int i = 0; i < 10; i++) {
            setupCluster(MessageLoadBalancingType.ON_DEMAND);
            startServers(0, 1);
            setupSessionFactory(0, isNetty());
            setupSessionFactory(1, isNetty());
            createQueue(0, "queues.testaddress", "queue0", null, false);
            createQueue(1, "queues.testaddress", "queue0", null, false);
            addConsumer(0, 0, "queue0", null);
            waitForBindings(0, "queues.testaddress", 1, 1, true);
            waitForBindings(1, "queues.testaddress", 1, 0, true);
            waitForBindings(0, "queues.testaddress", 1, 0, false);
            waitForBindings(1, "queues.testaddress", 1, 1, false);
            send(1, "queues.testaddress", 20, false, null, atomicInteger);
            waitForMessages(0, "queues.testaddress", 20);
            removeConsumer(0);
            waitForBindings(0, "queues.testaddress", 1, 0, true);
            waitForBindings(1, "queues.testaddress", 1, 0, true);
            waitForBindings(0, "queues.testaddress", 1, 0, false);
            waitForBindings(1, "queues.testaddress", 1, 0, false);
            addConsumer(1, 1, "queue0", null);
            waitForMessages(1, "queues.testaddress", 20);
            waitForMessages(0, "queues.testaddress", 0);
            waitForBindings(0, "queues.testaddress", 1, 1, false);
            waitForBindings(1, "queues.testaddress", 1, 0, false);
            removeConsumer(1);
            addConsumer(0, 0, "queue0", null);
            waitForMessages(1, "queues.testaddress", 0);
            waitForMessages(0, "queues.testaddress", 20);
            removeConsumer(0);
            addConsumer(1, 1, "queue0", null);
            waitForMessages(1, "queues.testaddress", 20);
            waitForMessages(0, "queues.testaddress", 0);
            verifyReceiveAll(20, 1);
            stopServers();
            start();
        }
    }

    @Test
    public void testRedistributionToQueuesWhereNotAllMessagesMatch() throws Exception {
        setupCluster(MessageLoadBalancingType.ON_DEMAND);
        startServers(0, 1, 2);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        setupSessionFactory(2, isNetty());
        createQueue(0, "queues.testaddress", "queue0", null, false);
        createQueue(1, "queues.testaddress", "queue0", null, false);
        createQueue(2, "queues.testaddress", "queue0", null, false);
        addConsumer(0, 0, "queue0", null);
        waitForBindings(0, "queues.testaddress", 1, 1, true);
        waitForBindings(1, "queues.testaddress", 1, 0, true);
        waitForBindings(2, "queues.testaddress", 1, 0, true);
        waitForBindings(0, "queues.testaddress", 2, 0, false);
        waitForBindings(1, "queues.testaddress", 2, 1, false);
        waitForBindings(2, "queues.testaddress", 2, 1, false);
        sendInRange(0, "queues.testaddress", 0, 10, false, "giraffe");
        sendInRange(0, "queues.testaddress", 10, 20, false, "platypus");
        removeConsumer(0);
        addConsumer(1, 1, "queue0", "giraffe");
        addConsumer(2, 2, "queue0", "platypus");
        verifyReceiveAllInRange(0, 10, 1);
        verifyReceiveAllInRange(10, 20, 2);
    }

    @Test
    public void testDelayedRedistribution() throws Exception {
        setRedistributionDelay(1000L);
        setupCluster(MessageLoadBalancingType.ON_DEMAND);
        startServers(0, 1, 2);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        setupSessionFactory(2, isNetty());
        createQueue(0, "queues.testaddress", "queue0", null, false);
        createQueue(1, "queues.testaddress", "queue0", null, false);
        createQueue(2, "queues.testaddress", "queue0", null, false);
        addConsumer(0, 0, "queue0", null);
        waitForBindings(0, "queues.testaddress", 1, 1, true);
        waitForBindings(1, "queues.testaddress", 1, 0, true);
        waitForBindings(2, "queues.testaddress", 1, 0, true);
        waitForBindings(0, "queues.testaddress", 2, 0, false);
        waitForBindings(1, "queues.testaddress", 2, 1, false);
        waitForBindings(2, "queues.testaddress", 2, 1, false);
        send(0, "queues.testaddress", 20, false, null);
        long currentTimeMillis = System.currentTimeMillis();
        removeConsumer(0);
        addConsumer(1, 1, "queue0", null);
        verifyReceiveAllNotBefore(currentTimeMillis + 1000, 20, 1);
    }

    @Test
    public void testDelayedRedistributionCancelled() throws Exception {
        setRedistributionDelay(1000L);
        setupCluster(MessageLoadBalancingType.ON_DEMAND);
        startServers(0, 1, 2);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        setupSessionFactory(2, isNetty());
        createQueue(0, "queues.testaddress", "queue0", null, false);
        createQueue(1, "queues.testaddress", "queue0", null, false);
        createQueue(2, "queues.testaddress", "queue0", null, false);
        addConsumer(0, 0, "queue0", null);
        waitForBindings(0, "queues.testaddress", 1, 1, true);
        waitForBindings(1, "queues.testaddress", 1, 0, true);
        waitForBindings(2, "queues.testaddress", 1, 0, true);
        waitForBindings(0, "queues.testaddress", 2, 0, false);
        waitForBindings(1, "queues.testaddress", 2, 1, false);
        waitForBindings(2, "queues.testaddress", 2, 1, false);
        send(0, "queues.testaddress", 20, false, null);
        removeConsumer(0);
        addConsumer(1, 1, "queue0", null);
        Thread.sleep(500L);
        addConsumer(0, 0, "queue0", null);
        Thread.sleep(1000L);
        verifyReceiveAll(20, 0);
    }

    @Test
    public void testRedistributionNumberOfMessagesGreaterThanBatchSize() throws Exception {
        setupCluster(MessageLoadBalancingType.ON_DEMAND);
        startServers(0, 1, 2);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        setupSessionFactory(2, isNetty());
        createQueue(0, "queues.testaddress", "queue0", null, false);
        createQueue(1, "queues.testaddress", "queue0", null, false);
        createQueue(2, "queues.testaddress", "queue0", null, false);
        addConsumer(0, 0, "queue0", null);
        waitForBindings(0, "queues.testaddress", 1, 1, true);
        waitForBindings(1, "queues.testaddress", 1, 0, true);
        waitForBindings(2, "queues.testaddress", 1, 0, true);
        waitForBindings(0, "queues.testaddress", 2, 0, false);
        waitForBindings(1, "queues.testaddress", 2, 1, false);
        waitForBindings(2, "queues.testaddress", 2, 1, false);
        send(0, "queues.testaddress", 200, false, null);
        removeConsumer(0);
        addConsumer(1, 1, "queue0", null);
        verifyReceiveAll(200, 1);
    }

    @Test
    public void testRedistributionWhenNewNodeIsAddedWithConsumer() throws Exception {
        setupCluster(MessageLoadBalancingType.ON_DEMAND);
        startServers(0);
        setupSessionFactory(0, isNetty());
        createQueue(0, "queues.testaddress", "queue0", null, false);
        waitForBindings(0, "queues.testaddress", 1, 0, true);
        send(0, "queues.testaddress", 20, false, null);
        startServers(1);
        setupSessionFactory(1, isNetty());
        createQueue(1, "queues.testaddress", "queue0", null, false);
        waitForBindings(1, "queues.testaddress", 1, 0, true);
        waitForBindings(0, "queues.testaddress", 1, 0, false);
        addConsumer(0, 1, "queue0", null);
        verifyReceiveAll(20, 0);
        verifyNotReceive(0);
    }

    @Test
    public void testRedistributionWithPagingOnTarget() throws Exception {
        setupCluster(MessageLoadBalancingType.ON_DEMAND);
        AddressSettings maxSizeBytes = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setPageSizeBytes(10000L).setMaxSizeBytes(20000L);
        getServer(0).getAddressSettingsRepository().addMatch("queues.*", maxSizeBytes);
        getServer(1).getAddressSettingsRepository().addMatch("queues.*", maxSizeBytes);
        getServer(2).getAddressSettingsRepository().addMatch("queues.*", maxSizeBytes);
        startServers(0);
        startServers(1);
        waitForTopology(getServer(0), 2);
        waitForTopology(getServer(1), 2);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        createQueue(0, "queues.testaddress", "queue0", null, true);
        createQueue(1, "queues.testaddress", "queue0", null, true);
        waitForBindings(1, "queues.testaddress", 1, 0, true);
        waitForBindings(0, "queues.testaddress", 1, 0, false);
        getServer(0).getPagingManager().getPageStore(new SimpleString("queues.testaddress")).startPaging();
        ClientSession createSession = this.sfs[0].createSession(true, true, 0);
        ClientProducer createProducer = createSession.createProducer("queues.testaddress");
        ClientConsumer createConsumer = createSession.createConsumer("queue0");
        createSession.start();
        ClientSession createSession2 = this.sfs[1].createSession(true, true, 0);
        ClientConsumer createConsumer2 = createSession2.createConsumer("queue0");
        createSession2.start();
        for (int i = 0; i < 10; i++) {
            ClientMessage createMessage = createSession.createMessage(true);
            createMessage.putIntProperty("i", i);
            createProducer.send(createMessage);
            createProducer.send(createMessage);
            Assert.assertNotNull(createConsumer.receive(5000L));
            Assert.assertEquals(i, r0.getIntProperty("i").intValue());
            ClientMessage receive = createConsumer2.receive(5000L);
            Assert.assertNotNull(receive);
            Assert.assertEquals(i, receive.getIntProperty("i").intValue());
            receive.acknowledge();
        }
        createSession.close();
        createSession2.close();
    }

    protected void setupCluster(MessageLoadBalancingType messageLoadBalancingType) throws Exception {
        setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", messageLoadBalancingType, 1, isNetty(), 2, 0, 1);
    }

    protected void setRedistributionDelay(long j) {
        AddressSettings redistributionDelay = new AddressSettings().setRedistributionDelay(j);
        getServer(0).getAddressSettingsRepository().addMatch("queues.*", redistributionDelay);
        getServer(1).getAddressSettingsRepository().addMatch("queues.*", redistributionDelay);
        getServer(2).getAddressSettingsRepository().addMatch("queues.*", redistributionDelay);
    }

    protected void setupServers() throws Exception {
        setupServer(0, isFileStorage(), isNetty());
        setupServer(1, isFileStorage(), isNetty());
        setupServer(2, isFileStorage(), isNetty());
    }

    protected void stopServers() throws Exception {
        closeAllConsumers();
        closeAllSessionFactories();
        closeAllServerLocatorsFactories();
        stopServers(0, 1, 2);
        clearServer(0, 1, 2);
    }
}
