package org.apache.activemq.artemis.tests.integration.cluster.distribution;

import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.group.UnproposalListener;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.server.group.impl.Proposal;
import org.apache.activemq.artemis.core.server.group.impl.Response;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.persistence.XmlImportExportTest;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.class */
public class ClusteredGroupingTest extends ClusterTestBase {

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest$ThreadSender.class */
    /**
     * Runnable that forwards a message range ({@code msgStart}..{@code msgEnd}) to the enclosing
     * test's {@code sendInRange} helper, targeting {@code queues.testaddress} on the configured
     * node and tagging the messages with {@link Message#HDR_GROUP_ID} set to the supplied id.
     * <p>
     * Senders are coordinated through a shared latch: an instance created with
     * {@code wait == true} blocks on the latch for at most five seconds before sending, while any
     * other instance counts the latch down and sends immediately.
     */
    class ThreadSender implements Runnable {
        private final int msgStart;
        private final int msgEnd;
        private final SimpleString id;
        private final CountDownLatch latch;
        private final boolean wait;
        private final int node;

        /**
         * @param msgStart first bound of the message range handed to {@code sendInRange}
         * @param msgEnd   second bound of the message range handed to {@code sendInRange}
         * @param node     index of the server the messages are sent through
         * @param id       group id placed in the {@link Message#HDR_GROUP_ID} property
         * @param latch    latch shared between cooperating senders
         * @param wait     {@code true} to wait on the latch before sending, {@code false} to
         *                 count it down instead
         */
        public ThreadSender(int msgStart, int msgEnd, int node, SimpleString id, CountDownLatch latch, boolean wait) {
            this.msgStart = msgStart;
            this.msgEnd = msgEnd;
            this.node = node;
            this.id = id;
            this.latch = latch;
            this.wait = wait;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (this.wait) {
                try {
                    // The result is deliberately ignored: after five seconds the sender proceeds
                    // whether or not the latch reached zero.
                    this.latch.await(5L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    // Restore the interrupt status so the owner of this thread (for example an
                    // executor that is shutting down) can still observe the interruption.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            } else {
                this.latch.countDown();
            }
            try {
                ClusteredGroupingTest.this.sendInRange(this.node, "queues.testaddress", this.msgStart, this.msgEnd, false, Message.HDR_GROUP_ID, this.id);
            } catch (Exception e2) {
                // Best effort: a failed send is reported but must not kill the worker thread.
                e2.printStackTrace();
            }
        }
    }

    /**
     * Three-server cluster with a LOCAL grouping handler on server 0 and REMOTE handlers on
     * servers 1 and 2. Grouped messages for {@code id1} and {@code id2} are sent through server 0
     * and verified on consumer 0; the test then waits for four UNPROPOSAL notifications (counted
     * across servers 1 and 2) and for the groups recorded on server 2's {@code queue0} to drop to
     * zero. Finally consumer 0 is removed, re-created with different arguments, and the grouped
     * sends are repeated.
     */
    @Test
    public void testGroupingGroupTimeoutWithUnproposal() throws Exception {
        final String address = "queues.testaddress";
        final String queueName = "queue0";
        final String[] groups = {"id1", "id2"};

        for (int node = 0; node < 3; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }

        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);

        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, 2000, 1000L, 100L);
        for (int node = 1; node <= 2; node++) {
            setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, node, 2000, 1000L, 100L);
        }

        startServers(0, 1, 2);

        for (int node = 0; node < 3; node++) {
            setupSessionFactory(node, isNetty());
        }
        for (int node = 0; node < 3; node++) {
            createQueue(node, address, queueName, null, false);
        }

        addConsumer(0, 2, queueName, null);

        // Expected consumer counts per server, first for local bindings, then for remote ones.
        final int[] localBefore = {0, 0, 1};
        final int[] remoteBefore = {1, 1, 0};
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, localBefore[node], true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, remoteBefore[node], false);
        }

        // One listener instance per server; each UNPROPOSAL notification counts the latch down.
        final CountDownLatch unproposals = new CountDownLatch(4);
        for (int node = 1; node <= 2; node++) {
            getServer(node).getManagementService().addNotificationListener(new NotificationListener() {
                public void onNotification(Notification notification) {
                    if (!(notification.getType() instanceof CoreNotificationType)) {
                        return;
                    }
                    if (notification.getType() == CoreNotificationType.UNPROPOSAL) {
                        unproposals.countDown();
                    }
                }
            });
        }

        for (String group : groups) {
            sendWithProperty(0, address, 10, false, Message.HDR_GROUP_ID, new SimpleString(group));
            verifyReceiveAll(10, 0);
        }

        QueueImpl queueOnServer2 = servers[2].locateQueue(SimpleString.toSimpleString(queueName));
        assertEquals(2L, queueOnServer2.getGroupsUsed().size());

        assertTrue(unproposals.await(5L, TimeUnit.SECONDS));

        // Poll for up to five seconds until the queue no longer tracks any group.
        final long deadline = System.currentTimeMillis() + 5000;
        while (deadline > System.currentTimeMillis()) {
            if (queueOnServer2.getGroupsUsed().size() == 0) {
                break;
            }
            Thread.sleep(10L);
        }
        assertEquals("Unproposal should cleanup the queue group as well", 0L, queueOnServer2.getGroupsUsed().size());

        removeConsumer(0);
        assertTrue(unproposals.await(5L, TimeUnit.SECONDS));

        addConsumer(0, 0, queueName, null);

        final int[] localAfter = {1, 0, 0};
        final int[] remoteAfter = {0, 1, 1};
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, localAfter[node], true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, remoteAfter[node], false);
        }

        for (String group : groups) {
            sendWithProperty(0, address, 10, false, Message.HDR_GROUP_ID, new SimpleString(group));
            verifyReceiveAll(10, 0);
        }
    }

    /**
     * Variant of the group-timeout scenario in which the grouped messages are sent through the
     * servers carrying REMOTE handlers: {@code id1} through server 2 and {@code id2} through
     * server 1. After consumer 0 is removed the test waits for four UNPROPOSAL notifications
     * (counted across servers 1 and 2), re-creates consumer 0 with different arguments and
     * repeats the sends.
     */
    @Test
    public void testGroupingGroupTimeoutSendRemote() throws Exception {
        final String address = "queues.testaddress";
        final String queueName = "queue0";
        // Parallel arrays: sendingNodes[i] is the server used to send messages for groups[i].
        final String[] groups = {"id1", "id2"};
        final int[] sendingNodes = {2, 1};

        for (int node = 0; node < 3; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }

        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);

        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, -1, 2000L, 500L);
        for (int node = 1; node <= 2; node++) {
            setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, node);
        }

        startServers(0, 1, 2);

        for (int node = 0; node < 3; node++) {
            setupSessionFactory(node, isNetty());
        }
        for (int node = 0; node < 3; node++) {
            createQueue(node, address, queueName, null, false);
        }

        addConsumer(0, 2, queueName, null);

        final int[] localBefore = {0, 0, 1};
        final int[] remoteBefore = {1, 1, 0};
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, localBefore[node], true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, remoteBefore[node], false);
        }

        // One listener instance per server; each UNPROPOSAL notification counts the latch down.
        final CountDownLatch unproposals = new CountDownLatch(4);
        for (int node = 1; node <= 2; node++) {
            getServer(node).getManagementService().addNotificationListener(new NotificationListener() {
                public void onNotification(Notification notification) {
                    if (!(notification.getType() instanceof CoreNotificationType)) {
                        return;
                    }
                    if (notification.getType() == CoreNotificationType.UNPROPOSAL) {
                        unproposals.countDown();
                    }
                }
            });
        }

        for (int i = 0; i < groups.length; i++) {
            sendWithProperty(sendingNodes[i], address, 10, false, Message.HDR_GROUP_ID, new SimpleString(groups[i]));
            verifyReceiveAll(10, 0);
        }

        removeConsumer(0);
        assertTrue(unproposals.await(5L, TimeUnit.SECONDS));

        addConsumer(0, 1, queueName, null);

        final int[] localAfter = {0, 1, 0};
        final int[] remoteAfter = {1, 0, 1};
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, localAfter[node], true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, remoteAfter[node], false);
        }

        for (int i = 0; i < groups.length; i++) {
            sendWithProperty(sendingNodes[i], address, 10, false, Message.HDR_GROUP_ID, new SimpleString(groups[i]));
            verifyReceiveAll(10, 0);
        }
    }

    /**
     * Smallest grouping scenario: three clustered servers (LOCAL handler on server 0, REMOTE
     * handlers on servers 1 and 2), one {@code queue0} and one consumer per server. Ten messages
     * tagged with group {@code id1} are sent through server 0 and verified against consumer 0.
     */
    @Test
    public void testGroupingSimple() throws Exception {
        final String address = "queues.testaddress";
        final String queueName = "queue0";

        for (int node = 0; node < 3; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }

        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);

        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
        for (int node = 1; node <= 2; node++) {
            setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, node);
        }

        startServers(0, 1, 2);

        for (int node = 0; node < 3; node++) {
            setupSessionFactory(node, isNetty());
        }
        for (int node = 0; node < 3; node++) {
            createQueue(node, address, queueName, null, false);
        }
        for (int node = 0; node < 3; node++) {
            addConsumer(node, node, queueName, null);
        }

        // Every server expects one consumer on its local binding and two on its remote bindings.
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, 1, true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, 2, false);
        }

        sendWithProperty(0, address, 10, false, Message.HDR_GROUP_ID, new SimpleString("id1"));
        verifyReceiveAll(10, 0);
    }

    /**
     * Sends one message for each of three groups, expects every consumer to receive exactly one
     * of them, and remembers the group id that arrived on consumer 1. After confirming that
     * server 0's proposal for that group names {@code queue0} plus server 1's node id, all
     * servers are stopped and only servers 2 and 0 are restarted. The restart must take at least
     * five seconds, and a further message for the remembered group must be received by consumer 0
     * or consumer 2 while the proposal no longer names server 1.
     */
    @Test
    public void testGroupingSimpleFail2nd() throws Exception {
        final String address = "queues.testaddress";
        final String queueName = "queue0";

        for (int node = 0; node < 3; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }

        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);

        // NOTE(review): the restart assertion below expects a 5000 ms wait; presumably
        // XmlImportExportTest.CONSUMER_TIMEOUT has that value - confirm.
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, XmlImportExportTest.CONSUMER_TIMEOUT, -1L, -1L);
        for (int node = 1; node <= 2; node++) {
            setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, node, XmlImportExportTest.CONSUMER_TIMEOUT, -1L, -1L);
        }

        startServers(0, 1, 2);

        for (int node = 0; node < 3; node++) {
            setupSessionFactory(node, isNetty());
        }
        for (int node = 0; node < 3; node++) {
            createQueue(node, address, queueName, null, true);
        }
        for (int node = 0; node < 3; node++) {
            addConsumer(node, node, queueName, null);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, 1, true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, 2, false);
        }

        for (String group : new String[] {"id1", "id2", "id3"}) {
            sendWithProperty(0, address, 1, false, Message.HDR_GROUP_ID, new SimpleString(group));
        }

        // Each consumer must see exactly one message; keep the group id that reached consumer 1.
        SimpleString groupOnConsumer1 = null;
        for (int idx = 0; idx < 3; idx++) {
            ClientMessage delivered = consumers[idx].getConsumer().receive(1000L);
            assertNotNull(delivered);
            delivered.acknowledge();
            if (idx == 1) {
                groupOnConsumer1 = delivered.getSimpleStringProperty(Message.HDR_GROUP_ID);
            }
            assertNull(consumers[idx].getConsumer().receiveImmediate());
        }

        // A message for the same group sent through server 2 must again reach consumer 1.
        sendWithProperty(2, address, 1, false, Message.HDR_GROUP_ID, groupOnConsumer1);
        ClientMessage followUp = consumers[1].getConsumer().receive(1000L);
        assertNotNull(followUp);
        followUp.acknowledge();

        closeAllConsumers();
        closeAllSessionFactories();

        final SimpleString server1NodeId = servers[1].getNodeID();
        final String clusterNameOnServer1 = "queue0" + server1NodeId;
        final SimpleString proposalKey = groupOnConsumer1.concat(".").concat("queue0");

        String boundBefore = servers[0].getGroupingHandler().getProposal(proposalKey, false).getClusterName().toString();
        assertTrue(boundBefore.equals(clusterNameOnServer1));

        stopServers(0, 1, 2);

        final long restartBegan = System.currentTimeMillis();
        startServers(2, 0);
        final long restartEnded = System.currentTimeMillis();
        assertTrue("The group start should have waited the timeout on groups", restartEnded >= restartBegan + 5000);

        setupSessionFactory(0, isNetty());
        setupSessionFactory(2, isNetty());
        addConsumer(0, 0, queueName, null);
        addConsumer(2, 2, queueName, null);
        waitForBindings(0, address, 1, 1, false);
        waitForBindings(2, address, 1, 1, false);

        sendWithProperty(2, address, 1, false, Message.HDR_GROUP_ID, groupOnConsumer1);

        // Server 1 is down, so the message may surface on either remaining consumer.
        ClientMessage rerouted = consumers[0].getConsumer().receive(500L);
        if (rerouted == null) {
            rerouted = consumers[2].getConsumer().receive(500L);
        }

        String boundAfter = servers[0].getGroupingHandler().getProposal(proposalKey, false).getClusterName().toString();
        assertFalse("group should have been reassigned since server is not up yet", boundAfter.equals(clusterNameOnServer1));
        assertNotNull(rerouted);
        rerouted.acknowledge();
    }

    /**
     * Checks that the proposal stored under {@code id1.queue0} on server 0 disappears after the
     * messages for group {@code id1} stop flowing. The proposal is created, polled until it is
     * gone, re-created by sending through servers 0 and 1, and then polled again. While server 1
     * still returns the proposal, server 0 must keep returning it as well.
     */
    @Test
    public void testGroupTimeout() throws Exception {
        final String address = "queues.testaddress";
        final String queueName = "queue0";
        final SimpleString proposalKey = SimpleString.toSimpleString("id1.queue0");

        for (int node = 0; node < 3; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }

        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);

        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, 1000, 1000L, 100L);
        for (int node = 1; node <= 2; node++) {
            setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, node, 1000, 100L, 100L);
        }

        startServers(0, 1, 2);

        for (int node = 0; node < 3; node++) {
            setupSessionFactory(node, isNetty());
        }
        for (int node = 0; node < 3; node++) {
            createQueue(node, address, queueName, null, true);
        }
        for (int node = 0; node < 3; node++) {
            addConsumer(node, node, queueName, null);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, 1, true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, 2, false);
        }

        for (String group : new String[] {"id1", "id2", "id3"}) {
            sendWithProperty(0, address, 1, false, Message.HDR_GROUP_ID, new SimpleString(group));
        }

        assertNotNull(servers[0].getGroupingHandler().getProposal(proposalKey, false));

        Thread.sleep(1000L);

        // First expiry: poll server 0 for up to five seconds until the proposal is gone.
        final long firstDeadline = System.currentTimeMillis() + 5000;
        while (firstDeadline > System.currentTimeMillis()) {
            if (servers[0].getGroupingHandler().getProposal(proposalKey, false) == null) {
                break;
            }
            Thread.sleep(10L);
        }

        Thread.sleep(1000L);

        assertNull("Group should have timed out", servers[0].getGroupingHandler().getProposal(proposalKey, false));

        // Re-establish the group by sending through servers 0 and 1.
        for (int node = 0; node <= 1; node++) {
            sendWithProperty(node, address, 1, false, Message.HDR_GROUP_ID, new SimpleString("id1"));
        }
        for (int node = 0; node <= 1; node++) {
            assertNotNull(servers[node].getGroupingHandler().getProposal(proposalKey, false));
        }

        // For up to 1.5 seconds: as long as server 1 still returns the proposal (looked up with
        // the boolean flag set to true), server 0 must return it too.
        final long remoteDeadline = System.currentTimeMillis() + 1500;
        while (remoteDeadline > System.currentTimeMillis()) {
            if (servers[1].getGroupingHandler().getProposal(proposalKey, true) == null) {
                break;
            }
            assertNotNull(servers[0].getGroupingHandler().getProposal(proposalKey, false));
            Thread.sleep(10L);
        }

        Thread.sleep(1000L);

        // Second expiry: the proposal must vanish from server 0 once more.
        final long secondDeadline = System.currentTimeMillis() + 5000;
        while (secondDeadline > System.currentTimeMillis()) {
            if (servers[0].getGroupingHandler().getProposal(proposalKey, false) == null) {
                break;
            }
            Thread.sleep(10L);
        }

        assertNull("Group should have timed out", servers[0].getGroupingHandler().getProposal(proposalKey, false));
    }

    @Test
    public void testGroupingWith3Nodes() throws Exception {
        setupServer(0, isFileStorage(), isNetty());
        setupServer(1, isFileStorage(), isNetty());
        setupServer(2, isFileStorage(), isNetty());
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, 10000, 500L, 750L);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 10000, 500L, 750L);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2, 10000, 500L, 750L);
        startServers(0, 1, 2);
        AddressSettings addressSettings = new AddressSettings();
        addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
        this.servers[0].getAddressSettingsRepository().addMatch("#", addressSettings);
        this.servers[1].getAddressSettingsRepository().addMatch("#", addressSettings);
        this.servers[2].getAddressSettingsRepository().addMatch("#", addressSettings);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty(), 15);
        setupSessionFactory(2, isNetty());
        createQueue(0, "queues.testaddress", "queue0", null, true);
        createQueue(1, "queues.testaddress", "queue0", null, true);
        createQueue(2, "queues.testaddress", "queue0", null, true);
        waitForBindings(0, "queues.testaddress", 1, 0, true);
        waitForBindings(1, "queues.testaddress", 1, 0, true);
        waitForBindings(2, "queues.testaddress", 1, 0, true);
        waitForBindings(0, "queues.testaddress", 2, 0, false);
        waitForBindings(1, "queues.testaddress", 2, 0, false);
        waitForBindings(2, "queues.testaddress", 2, 0, false);
        final ClientSessionFactory clientSessionFactory = this.sfs[0];
        final ClientSessionFactory clientSessionFactory2 = this.sfs[1];
        final ClientSessionFactory clientSessionFactory3 = this.sfs[2];
        ClientSession addClientSession = addClientSession(clientSessionFactory2.createSession(false, false, false));
        ClientProducer addClientProducer = addClientProducer(addClientSession.createProducer("queues.testaddress"));
        ArrayList<String> arrayList = new ArrayList();
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        for (int i = 0; i < 500; i++) {
            ClientMessage createMessage = addClientSession.createMessage(true);
            String uuid = UUID.randomUUID().toString();
            createMessage.putStringProperty(Message.HDR_GROUP_ID, new SimpleString(uuid));
            SimpleString simpleString = new SimpleString(UUID.randomUUID().toString());
            createMessage.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, simpleString);
            if (i % 100 == 0) {
                arrayList.add(uuid);
            }
            addClientProducer.send(createMessage);
            IntegrationTestLogger.LOGGER.trace("Sent message to server 1 with dupID: " + simpleString);
        }
        addClientSession.commit();
        atomicInteger.addAndGet(500);
        IntegrationTestLogger.LOGGER.trace("Sent block of 500 messages to server 1. Total sent: " + atomicInteger.get());
        addClientSession.close();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool((arrayList.size() * 2) + 1);
        final AtomicInteger atomicInteger2 = new AtomicInteger(0);
        final CountDownLatch countDownLatch = new CountDownLatch(arrayList.size() + 1);
        final AtomicInteger atomicInteger3 = new AtomicInteger(0);
        final long currentTimeMillis = System.currentTimeMillis() + 5000;
        for (final String str : arrayList) {
            newFixedThreadPool.execute(new Runnable() { // from class: org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusteredGroupingTest.5
                @Override // java.lang.Runnable
                public void run() {
                    ClientSessionFactory clientSessionFactory4;
                    String str2 = str;
                    String uuid2 = UUID.randomUUID().toString();
                    IntegrationTestLogger.LOGGER.debug("Starting producer thread...");
                    int i2 = 0;
                    try {
                        int incrementAndGet = atomicInteger2.incrementAndGet();
                        if (incrementAndGet % 3 == 0) {
                            clientSessionFactory4 = clientSessionFactory3;
                            i2 = 2;
                        } else if (incrementAndGet % 2 == 0) {
                            clientSessionFactory4 = clientSessionFactory2;
                            i2 = 1;
                        } else {
                            clientSessionFactory4 = clientSessionFactory;
                        }
                        IntegrationTestLogger.LOGGER.debug("Creating producer session factory to node " + i2);
                        ClientSession addClientSession2 = ClusteredGroupingTest.this.addClientSession(clientSessionFactory4.createSession(false, true, true));
                        ClientProducer addClientProducer2 = ClusteredGroupingTest.this.addClientProducer(addClientSession2.createProducer("queues.testaddress"));
                        int i3 = 0;
                        while (currentTimeMillis > System.currentTimeMillis()) {
                            ClientMessage createMessage2 = addClientSession2.createMessage(true);
                            createMessage2.putStringProperty(Message.HDR_GROUP_ID, new SimpleString(str2));
                            createMessage2.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString(uuid2 + ":" + i3));
                            try {
                                addClientProducer2.send(createMessage2);
                                atomicInteger.incrementAndGet();
                                i3++;
                            } catch (Exception e) {
                                IntegrationTestLogger.LOGGER.warn("Producer thread threw unexpected exception while sending messages to " + i2 + ": " + e.getMessage());
                                String str3 = str2 + "afterFail";
                            } catch (ActiveMQException e2) {
                                IntegrationTestLogger.LOGGER.warn("Producer thread threw exception while sending messages to " + i2 + ": " + e2.getMessage());
                                str2 = str2 + "afterFail";
                            }
                        }
                        countDownLatch.countDown();
                    } catch (Exception e3) {
                        atomicInteger3.incrementAndGet();
                        IntegrationTestLogger.LOGGER.warn("Producer thread couldn't establish connection", e3);
                    }
                }
            });
        }
        newFixedThreadPool.execute(new Runnable() { // from class: org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusteredGroupingTest.6
            @Override // java.lang.Runnable
            public void run() {
                try {
                    try {
                        Thread.sleep(2000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Cycling server");
                    ClusteredGroupingTest.this.cycleServer(1);
                } finally {
                    countDownLatch.countDown();
                }
            }
        });
        final AtomicInteger atomicInteger4 = new AtomicInteger(0);
        final AtomicInteger atomicInteger5 = new AtomicInteger(0);
        final CountDownLatch countDownLatch2 = new CountDownLatch(arrayList.size());
        for (String str2 : arrayList) {
            newFixedThreadPool.execute(new Runnable() { // from class: org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusteredGroupingTest.7
                @Override // java.lang.Runnable
                public void run() {
                    ClientSessionFactory clientSessionFactory4;
                    ClientConsumer addClientConsumer;
                    ClientMessage receive;
                    try {
                        IntegrationTestLogger.LOGGER.info("Waiting to start consumer thread...");
                        countDownLatch.await(20L, TimeUnit.SECONDS);
                        IntegrationTestLogger.LOGGER.info("Starting consumer thread...");
                        int i2 = 0;
                        try {
                            synchronized (atomicInteger4) {
                                if (atomicInteger4.get() % 3 == 0) {
                                    clientSessionFactory4 = clientSessionFactory3;
                                    i2 = 2;
                                } else if (atomicInteger4.get() % 2 == 0) {
                                    clientSessionFactory4 = clientSessionFactory2;
                                    i2 = 1;
                                } else {
                                    clientSessionFactory4 = clientSessionFactory;
                                }
                                IntegrationTestLogger.LOGGER.info("Creating consumer session factory to node " + i2);
                                ClientSession addClientSession2 = ClusteredGroupingTest.this.addClientSession(clientSessionFactory4.createSession(false, false, true));
                                addClientConsumer = ClusteredGroupingTest.this.addClientConsumer(addClientSession2.createConsumer("queue0"));
                                addClientSession2.start();
                                atomicInteger4.incrementAndGet();
                            }
                            while (true) {
                                try {
                                    receive = addClientConsumer.receive(1000L);
                                } catch (ActiveMQException e) {
                                    atomicInteger3.incrementAndGet();
                                    IntegrationTestLogger.LOGGER.warn("Consumer thread threw exception while receiving messages from server " + i2 + ".: " + e.getMessage());
                                } catch (Exception e2) {
                                    atomicInteger3.incrementAndGet();
                                    IntegrationTestLogger.LOGGER.warn("Consumer thread threw unexpected exception while receiving messages from server " + i2 + ".: " + e2.getMessage());
                                    return;
                                }
                                if (receive == null) {
                                    countDownLatch2.countDown();
                                    return;
                                } else {
                                    receive.acknowledge();
                                    IntegrationTestLogger.LOGGER.trace("Consumed message " + receive.getStringProperty(Message.HDR_DUPLICATE_DETECTION_ID) + " from server " + i2 + ". Total consumed: " + atomicInteger5.incrementAndGet());
                                }
                            }
                        } catch (Exception e3) {
                            IntegrationTestLogger.LOGGER.info("Consumer thread couldn't establish connection", e3);
                            atomicInteger3.incrementAndGet();
                        }
                    } catch (InterruptedException e4) {
                        e4.printStackTrace();
                    }
                }
            });
        }
        countDownLatch2.await(20L, TimeUnit.SECONDS);
        newFixedThreadPool.shutdownNow();
        newFixedThreadPool.awaitTermination(10L, TimeUnit.SECONDS);
        assertEquals(0L, atomicInteger3.get());
        assertEquals(atomicInteger.longValue(), atomicInteger5.longValue());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Stops the server at index {@code i} and then starts it again.
     * <p>
     * An {@link Exception} thrown by either step is printed via
     * {@code printStackTrace()} and is not propagated to the caller.
     *
     * @param i index passed to {@code stopServers} and {@code startServers}
     */
    public void cycleServer(int i) {
        try {
            stopServers(i);
            startServers(i);
        } catch (Exception cycleFailure) {
            // Best effort: report the failure but let the caller carry on.
            cycleFailure.printStackTrace();
        }
    }

    /**
     * Brings up three clustered servers (node 0 with a LOCAL group handler configured with
     * a 50000 timeout value, nodes 1 and 2 REMOTE), sends one message for each of the group
     * ids "id1", "id2" and "id3" through node 0 and verifies reception on consumers 0, 1, 2.
     * <p>
     * All servers are then stopped and restarted in the order 1, 2, 0. The test asserts that
     * the restart took fewer milliseconds than that timeout value, and then repeats the
     * same send/verify round against freshly created consumers.
     */
    @Test
    public void testGroupingBindingNotPresentAtStart() throws Exception {
        final int groupHandlerTimeout = 50000;
        final String address = "queues.testaddress";
        final String queueName = "queue0";
        final String[] groupIds = {"id1", "id2", "id3"};

        for (int node = 0; node < 3; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, groupHandlerTimeout);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
        startServers(0, 1, 2);

        for (int node = 0; node < 3; node++) {
            setupSessionFactory(node, isNetty());
        }
        for (int node = 0; node < 3; node++) {
            createQueue(node, address, queueName, null, true);
        }
        for (int node = 0; node < 3; node++) {
            addConsumer(node, node, queueName, null);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, 1, true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, 2, false);
        }

        // First round: one message per group id, all sent through node 0.
        for (String groupId : groupIds) {
            sendWithProperty(0, address, 1, false, Message.HDR_GROUP_ID, new SimpleString(groupId));
        }
        verifyReceiveAll(1, 0, 1, 2);

        closeAllConsumers();
        for (int node = 0; node < 3; node++) {
            closeSessionFactory(node);
        }
        stopServers(0, 1, 2);

        // Restart with node 0 last and measure how long the restart takes.
        final long restartBegin = System.currentTimeMillis();
        startServers(1, 2, 0);
        final long restartElapsed = System.currentTimeMillis() - restartBegin;
        assertTrue("Server restart took a long wait even though it wasn't required as the server already had all the bindings", restartElapsed < groupHandlerTimeout);

        for (int node = 0; node < 3; node++) {
            setupSessionFactory(node, isNetty());
        }
        for (int node = 0; node < 3; node++) {
            addConsumer(node, node, queueName, null);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, 1, true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, 2, false);
        }

        // Second round after the restart: same group ids, same verification.
        for (String groupId : groupIds) {
            sendWithProperty(0, address, 1, false, Message.HDR_GROUP_ID, new SimpleString(groupId));
        }
        verifyReceiveAll(1, 0, 1, 2);
    }

    /**
     * Brings up three clustered servers (node 0 LOCAL group handler, nodes 1 and 2 REMOTE),
     * sends one message for each of the group ids "id1", "id2" and "id3" through node 0
     * without consuming them, then closes all consumers and session factories.
     * <p>
     * Servers 0 and 1 are stopped and only server 0 is started again. With a single
     * consumer on node 0 the three group ids are sent once more; two single-message
     * receptions are verified on consumer 0, and after attaching consumer 1 to node 2 one
     * more reception is verified on that consumer.
     */
    @Test
    public void testGroupingBindingsRemoved() throws Exception {
        final String address = "queues.testaddress";
        final String queueName = "queue0";
        final String[] groupIds = {"id1", "id2", "id3"};

        for (int node = 0; node < 3; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
        startServers(0, 1, 2);

        for (int node = 0; node < 3; node++) {
            setupSessionFactory(node, isNetty());
        }
        for (int node = 0; node < 3; node++) {
            createQueue(node, address, queueName, null, true);
        }
        for (int node = 0; node < 3; node++) {
            addConsumer(node, node, queueName, null);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, 1, true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, 2, false);
        }

        // First round of sends; nothing is consumed before the consumers are closed.
        for (String groupId : groupIds) {
            sendWithProperty(0, address, 1, false, Message.HDR_GROUP_ID, new SimpleString(groupId));
        }

        closeAllConsumers();
        for (int node = 0; node < 3; node++) {
            closeSessionFactory(node);
        }

        // Take nodes 0 and 1 down; only node 0 comes back.
        stopServers(0);
        stopServers(1);
        startServers(0);

        setupSessionFactory(0, isNetty());
        setupSessionFactory(2, isNetty());
        addConsumer(0, 0, queueName, null);

        waitForBindings(0, address, 1, 1, true);
        waitForBindings(2, address, 1, 0, true);
        waitForBindings(0, address, 1, 0, false);
        waitForBindings(2, address, 1, 1, false);

        for (String groupId : groupIds) {
            sendWithProperty(0, address, 1, false, Message.HDR_GROUP_ID, new SimpleString(groupId));
        }

        // Two separate single-message verifications on consumer 0.
        verifyReceiveAll(1, 0);
        verifyReceiveAll(1, 0);

        // Attach consumer 1 to node 2 and verify one message there.
        addConsumer(1, 2, queueName, null);
        verifyReceiveAll(1, 1);
    }

    /**
     * Installs a stub {@link GroupingHandler} on node 0 whose {@code receive} only counts
     * down a latch of 10 and returns {@code null}, while nodes 1 and 2 use REMOTE handlers
     * configured with a third argument of 0.
     * <p>
     * After the cluster is up, a single grouped message is sent through node 1 and the
     * test asserts that the latch reaches zero within 10 seconds. An {@link Exception}
     * thrown by the send or the await is printed and swallowed.
     */
    @Test
    public void testTimeoutOnSending() throws Exception {
        final String address = "queues.testaddress";
        final String queueName = "queue0";

        for (int node = 0; node < 3; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 0);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2, 0);

        // Counted down once per call to the stub's receive(...).
        final CountDownLatch receiveLatch = new CountDownLatch(10);

        // Stub handler for node 0: never answers a proposal, only records receive() calls.
        GroupingHandler stubHandler = new GroupingHandler() {
            public void awaitBindings() throws Exception {
                // no-op
            }

            public void addListener(UnproposalListener unproposalListener) {
                // no-op
            }

            public void resendPending() throws Exception {
                // no-op
            }

            public void remove(SimpleString simpleString, SimpleString simpleString2) throws Exception {
                // no-op
            }

            public void forceRemove(SimpleString simpleString, SimpleString simpleString2) throws Exception {
                // no-op
            }

            public SimpleString getName() {
                return null;
            }

            public void remove(SimpleString simpleString, SimpleString simpleString2, int i) {
                // no-op
            }

            public void start() throws Exception {
                // no-op
            }

            public void stop() throws Exception {
                // no-op
            }

            public boolean isStarted() {
                return false;
            }

            public Response propose(Proposal proposal) throws Exception {
                return null;
            }

            public void proposed(Response response) throws Exception {
                System.out.println("ClusteredGroupingTest.proposed");
            }

            public void sendProposalResponse(Response response, int i) throws Exception {
                System.out.println("ClusteredGroupingTest.send");
            }

            public Response receive(Proposal proposal, int i) throws Exception {
                // Record the attempt and deliberately give no response.
                receiveLatch.countDown();
                return null;
            }

            public void onNotification(Notification notification) {
                System.out.println("ClusteredGroupingTest.onNotification " + notification);
            }

            public void addGroupBinding(GroupBinding groupBinding) {
                System.out.println("ClusteredGroupingTest.addGroupBinding");
            }

            public Response getProposal(SimpleString simpleString, boolean z) {
                return null;
            }
        };
        setUpGroupHandler(stubHandler, 0);

        startServers(0, 1, 2);
        for (int node = 0; node < 3; node++) {
            setupSessionFactory(node, isNetty());
        }
        for (int node = 0; node < 3; node++) {
            createQueue(node, address, queueName, null, false);
        }
        for (int node = 0; node < 3; node++) {
            addConsumer(node, node, queueName, null);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, 1, true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, 2, false);
        }

        try {
            sendWithProperty(1, address, 1, false, Message.HDR_GROUP_ID, new SimpleString("id1"));
            // The latch only opens after ten receive() calls on the stub handler.
            assertTrue(receiveLatch.await(10L, TimeUnit.SECONDS));
        } catch (Exception sendFailure) {
            sendFailure.printStackTrace();
        }
    }

    /**
     * Brings up three clustered servers (node 0 LOCAL group handler, nodes 1 and 2 REMOTE)
     * with a consumer on every node, then sends two batches of ten messages carrying group
     * id "id1": counters 0-10 through node 0 and counters 10-20 through node 1. After each
     * batch the messages are verified on consumer 0.
     */
    @Test
    public void testGroupingSendTo2queues() throws Exception {
        final String address = "queues.testaddress";
        final String queueName = "queue0";
        final SimpleString groupId = new SimpleString("id1");

        for (int node = 0; node < 3; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
        startServers(0, 1, 2);

        for (int node = 0; node < 3; node++) {
            setupSessionFactory(node, isNetty());
        }
        for (int node = 0; node < 3; node++) {
            createQueue(node, address, queueName, null, false);
        }
        for (int node = 0; node < 3; node++) {
            addConsumer(node, node, queueName, null);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, 1, true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, 2, false);
        }

        // Batch k is sent through node k and covers counters [10k, 10k + 10).
        for (int sender = 0; sender < 2; sender++) {
            final int first = sender * 10;
            final int last = first + 10;
            sendInRange(sender, address, first, last, false, Message.HDR_GROUP_ID, groupId);
            verifyReceiveAllInRange(first, last, 0);
        }
    }

    /**
     * Brings up three clustered servers (node 0 LOCAL group handler, nodes 1 and 2 REMOTE)
     * with a consumer on every node, then sends three batches of ten messages carrying
     * group id "id1", one batch through each node. After each batch the messages are
     * verified on consumer 0.
     * <p>
     * Each batch uses its own counter range (0-10, 10-20, 20-30), matching the sibling
     * three-queue tests in this class.
     */
    @Test
    public void testGroupingSendTo3queues() throws Exception {
        final String address = "queues.testaddress";
        final String queueName = "queue0";
        final SimpleString groupId = new SimpleString("id1");

        for (int node = 0; node < 3; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
        startServers(0, 1, 2);

        for (int node = 0; node < 3; node++) {
            setupSessionFactory(node, isNetty());
        }
        for (int node = 0; node < 3; node++) {
            createQueue(node, address, queueName, null, false);
        }
        for (int node = 0; node < 3; node++) {
            addConsumer(node, node, queueName, null);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, 1, true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, 2, false);
        }

        sendInRange(0, address, 0, 10, false, Message.HDR_GROUP_ID, groupId);
        verifyReceiveAllInRange(0, 10, 0);

        sendInRange(1, address, 10, 20, false, Message.HDR_GROUP_ID, groupId);
        verifyReceiveAllInRange(10, 20, 0);

        // The third batch previously reused counters 10-20. A distinct range means a
        // stray or redelivered message from the second batch cannot satisfy this check.
        sendInRange(2, address, 20, 30, false, Message.HDR_GROUP_ID, groupId);
        verifyReceiveAllInRange(20, 30, 0);
    }

    /**
     * Brings up three clustered servers (node 0 LOCAL group handler, nodes 1 and 2 REMOTE)
     * with a single consumer (id 1) attached to node 1. Three batches of ten messages
     * carrying group id "id1" are sent through nodes 1, 2 and 0 in that order, using
     * counters 0-10, 10-20 and 20-30; each batch is verified on consumer 1.
     */
    @Test
    public void testGroupingSendTo3queuesRemoteArbitrator() throws Exception {
        final String address = "queues.testaddress";
        final String queueName = "queue0";
        final SimpleString groupId = new SimpleString("id1");

        for (int node = 0; node < 3; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
        startServers(0, 1, 2);

        for (int node = 0; node < 3; node++) {
            setupSessionFactory(node, isNetty());
        }
        for (int node = 0; node < 3; node++) {
            createQueue(node, address, queueName, null, false);
        }

        // Only node 1 gets a consumer.
        addConsumer(1, 1, queueName, null);

        // Per-node fourth argument of waitForBindings, indexed by node
        // (presumably the expected consumer count - confirm against waitForBindings).
        final int[] localCounts = {0, 1, 0};
        final int[] remoteCounts = {1, 0, 1};
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, localCounts[node], true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, remoteCounts[node], false);
        }

        // Batch k covers counters [10k, 10k + 10) and is always verified on consumer 1.
        final int[] senders = {1, 2, 0};
        for (int batch = 0; batch < senders.length; batch++) {
            final int first = batch * 10;
            final int last = first + 10;
            sendInRange(senders[batch], address, first, last, false, Message.HDR_GROUP_ID, groupId);
            verifyReceiveAllInRange(first, last, 1);
        }
    }

    /**
     * Brings up three clustered servers (node 0 LOCAL group handler, nodes 1 and 2 REMOTE)
     * with consumers on nodes 0 and 2 but none on node 1. Three batches of ten messages
     * carrying group id "id1" are sent through nodes 1, 2 and 0 in that order, using
     * counters 0-10, 10-20 and 20-30; each batch is verified on consumer 0.
     */
    @Test
    public void testGroupingSendTo3queuesNoConsumerOnLocalQueue() throws Exception {
        final String address = "queues.testaddress";
        final String queueName = "queue0";
        final SimpleString groupId = new SimpleString("id1");

        for (int node = 0; node < 3; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
        startServers(0, 1, 2);

        for (int node = 0; node < 3; node++) {
            setupSessionFactory(node, isNetty());
        }
        for (int node = 0; node < 3; node++) {
            createQueue(node, address, queueName, null, false);
        }

        // Node 1, the first sender below, deliberately has no consumer.
        addConsumer(0, 0, queueName, null);
        addConsumer(2, 2, queueName, null);

        // Per-node fourth argument of waitForBindings, indexed by node
        // (presumably the expected consumer count - confirm against waitForBindings).
        final int[] localCounts = {1, 0, 1};
        final int[] remoteCounts = {1, 2, 1};
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, localCounts[node], true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, remoteCounts[node], false);
        }

        // Batch k covers counters [10k, 10k + 10) and is always verified on consumer 0.
        final int[] senders = {1, 2, 0};
        for (int batch = 0; batch < senders.length; batch++) {
            final int first = batch * 10;
            final int last = first + 10;
            sendInRange(senders[batch], address, first, last, false, Message.HDR_GROUP_ID, groupId);
            verifyReceiveAllInRange(first, last, 0);
        }
    }

    /**
     * Brings up three clustered servers (node 0 LOCAL group handler, nodes 1 and 2 REMOTE)
     * with a consumer on every node, then sends three batches of ten messages through
     * node 0, one batch per group id ("id1", "id2", "id3") with counters 0-10, 10-20 and
     * 20-30. The outcome is checked with
     * {@code verifyReceiveAllWithGroupIDRoundRobin(0, 10, 0, 1, 2)}.
     */
    @Test
    public void testGroupingRoundRobin() throws Exception {
        final String address = "queues.testaddress";
        final String queueName = "queue0";
        final String[] groupIds = {"id1", "id2", "id3"};

        for (int node = 0; node < 3; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
        startServers(0, 1, 2);

        for (int node = 0; node < 3; node++) {
            setupSessionFactory(node, isNetty());
        }
        for (int node = 0; node < 3; node++) {
            createQueue(node, address, queueName, null, false);
        }
        for (int node = 0; node < 3; node++) {
            addConsumer(node, node, queueName, null);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, 1, true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, 2, false);
        }

        // Every batch goes through node 0; batch k uses group id k and counters [10k, 10k + 10).
        for (int batch = 0; batch < groupIds.length; batch++) {
            final int first = batch * 10;
            sendInRange(0, address, first, first + 10, false, Message.HDR_GROUP_ID, new SimpleString(groupIds[batch]));
        }
        verifyReceiveAllWithGroupIDRoundRobin(0, 10, 0, 1, 2);
    }

    /**
     * Brings up three clustered servers (node 0 LOCAL group handler, nodes 1 and 2 REMOTE)
     * with a consumer on every node and sends three batches of group "id1" messages
     * (counters 0-30, one batch through each node), each verified on consumer 0.
     * <p>
     * All three consumers are then removed and "queue0" is deleted on every node. A new
     * queue "queue1" is created on node 0 with consumer 3 attached, and three further
     * batches (counters 30-60, one through each node) are verified on consumer 3.
     */
    @Test
    public void testGroupingSendTo3queuesQueueRemoved() throws Exception {
        final String address = "queues.testaddress";
        final String originalQueue = "queue0";
        final String replacementQueue = "queue1";
        final SimpleString groupId = new SimpleString("id1");

        for (int node = 0; node < 3; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
        startServers(0, 1, 2);

        for (int node = 0; node < 3; node++) {
            setupSessionFactory(node, isNetty());
        }
        for (int node = 0; node < 3; node++) {
            createQueue(node, address, originalQueue, null, false);
        }
        for (int node = 0; node < 3; node++) {
            addConsumer(node, node, originalQueue, null);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, 1, true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, 2, false);
        }

        // Phase one: counters 0-30, sender k handles [10k, 10k + 10), verified on consumer 0.
        for (int sender = 0; sender < 3; sender++) {
            final int first = sender * 10;
            final int last = first + 10;
            sendInRange(sender, address, first, last, false, Message.HDR_GROUP_ID, groupId);
            verifyReceiveAllInRange(first, last, 0);
        }

        // Tear down every consumer and the original queue on all nodes.
        for (int consumer = 0; consumer < 3; consumer++) {
            removeConsumer(consumer);
        }
        for (int node = 0; node < 3; node++) {
            deleteQueue(node, originalQueue);
        }

        // Replace it with a single queue and consumer on node 0.
        createQueue(0, address, replacementQueue, null, false);
        addConsumer(3, 0, replacementQueue, null);
        waitForBindings(0, address, 1, 1, true);
        waitForBindings(1, address, 1, 1, false);
        waitForBindings(2, address, 1, 1, false);

        // Phase two: counters 30-60, sender k handles [30 + 10k, 40 + 10k), verified on consumer 3.
        for (int sender = 0; sender < 3; sender++) {
            final int first = 30 + sender * 10;
            final int last = first + 10;
            sendInRange(sender, address, first, last, false, Message.HDR_GROUP_ID, groupId);
            verifyReceiveAllInRange(first, last, 3);
        }
        System.out.println("*****************************************************************************");
    }

    /**
     * Brings up three clustered servers (node 0 LOCAL group handler, nodes 1 and 2 REMOTE;
     * cluster connections configured with the extra arguments {@code 0, 500L}) and attaches
     * consumer 0 to node 1. Ten group "id1" messages (counters 0-10) are sent through
     * node 1 and verified on consumer 0.
     * <p>
     * All consumers are closed, server 1 is stopped and started again, its session factory
     * is recreated and consumer 0 is re-attached to node 1. Ten more messages
     * (counters 10-20) are then sent through node 2 and verified on consumer 0.
     */
    @Test
    public void testGroupingSendTo3queuesPinnedNodeGoesDown() throws Exception {
        final String address = "queues.testaddress";
        final String queueName = "queue0";
        final SimpleString groupId = new SimpleString("id1");

        for (int node = 0; node < 3; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500L, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500L, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500L, isNetty(), 2, 0, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
        startServers(0, 1, 2);

        for (int node = 0; node < 3; node++) {
            setupSessionFactory(node, isNetty());
        }
        for (int node = 0; node < 3; node++) {
            createQueue(node, address, queueName, null, true);
        }

        // The only consumer (id 0) lives on node 1.
        addConsumer(0, 1, queueName, null);

        // Per-node fourth argument of waitForBindings, indexed by node
        // (presumably the expected consumer count - confirm against waitForBindings).
        final int[] localCounts = {0, 1, 0};
        final int[] remoteCounts = {1, 0, 1};
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, localCounts[node], true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, remoteCounts[node], false);
        }

        sendInRange(1, address, 0, 10, true, Message.HDR_GROUP_ID, groupId);
        verifyReceiveAllInRange(true, 0, 10, 0);

        closeAllConsumers();

        // Bounce node 1, then rebuild its session factory and consumer.
        stopServers(1);
        startServers(1);
        closeSessionFactory(1);
        setupSessionFactory(1, isNetty());
        addConsumer(0, 1, queueName, null);

        waitForBindings(2, address, 2, 1, false);
        waitForBindings(1, address, 1, 1, true);
        waitForBindings(0, address, 2, 1, false);

        sendInRange(2, address, 10, 20, true, Message.HDR_GROUP_ID, groupId);
        verifyReceiveAllInRange(10, 20, 0);
        System.out.println("*****************************************************************************");
    }

    /**
     * Brings up three clustered servers (node 0 LOCAL group handler, nodes 1 and 2 REMOTE;
     * cluster connections configured with the extra arguments {@code 0, 500L}) and attaches
     * consumer 0 to node 1. Ten group "id1" messages (counters 0-10) are sent through
     * node 1 and verified on consumer 0.
     * <p>
     * After all consumers are closed, ten more messages (counters 10-20) are sent through
     * node 2 <em>before</em> server 1 is stopped. Server 1 is then restarted, consumer 1 is
     * attached to it, and the second batch is verified on consumer 1.
     */
    @Test
    public void testGroupingSendTo3queuesPinnedNodeGoesDownSendBeforeStop() throws Exception {
        final String address = "queues.testaddress";
        final String queueName = "queue0";
        final SimpleString groupId = new SimpleString("id1");

        for (int node = 0; node < 3; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500L, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500L, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500L, isNetty(), 2, 0, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
        startServers(0, 1, 2);

        for (int node = 0; node < 3; node++) {
            setupSessionFactory(node, isNetty());
        }
        for (int node = 0; node < 3; node++) {
            createQueue(node, address, queueName, null, true);
        }

        // The only consumer (id 0) lives on node 1.
        addConsumer(0, 1, queueName, null);

        // Per-node fourth argument of waitForBindings, indexed by node
        // (presumably the expected consumer count - confirm against waitForBindings).
        final int[] localCounts = {0, 1, 0};
        final int[] remoteCounts = {1, 0, 1};
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, localCounts[node], true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, remoteCounts[node], false);
        }

        sendInRange(1, address, 0, 10, true, Message.HDR_GROUP_ID, groupId);
        verifyReceiveAllInRange(true, 0, 10, 0);

        closeAllConsumers();

        // Second batch is sent while node 1 is still up but has no consumer.
        sendInRange(2, address, 10, 20, true, Message.HDR_GROUP_ID, groupId);

        // Bounce node 1 and attach a fresh consumer (id 1) to it.
        stopServers(1);
        closeSessionFactory(1);
        startServers(1);
        setupSessionFactory(1, isNetty());
        addConsumer(1, 1, queueName, null);

        waitForBindings(1, address, 1, 1, true);
        waitForBindings(0, address, 2, 1, false);
        waitForBindings(2, address, 2, 1, false);

        verifyReceiveAllInRange(10, 20, 1);
    }

    /**
     * Brings up three clustered servers (node 0 LOCAL group handler, nodes 1 and 2 REMOTE;
     * cluster connections configured with the extra arguments {@code 0, 500L}) and attaches
     * consumer 0 to node 1. Ten group "id1" messages (counters 0-10) are sent through
     * node 1 and verified on consumer 0.
     * <p>
     * Server 1 is then stopped and restarted and consumer 1 is attached to it. Two further
     * batches (counters 10-20 through node 2, counters 20-30 through node 0) are each
     * verified on consumer 1.
     */
    @Test
    public void testGroupingSendTo3queuesPinnedNodeGoesDownSendAfterRestart() throws Exception {
        final String address = "queues.testaddress";
        final String queueName = "queue0";
        final SimpleString groupId = new SimpleString("id1");

        for (int node = 0; node < 3; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500L, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500L, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500L, isNetty(), 2, 0, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
        startServers(0, 1, 2);

        for (int node = 0; node < 3; node++) {
            setupSessionFactory(node, isNetty());
        }
        for (int node = 0; node < 3; node++) {
            createQueue(node, address, queueName, null, true);
        }

        // The only consumer (id 0) lives on node 1.
        addConsumer(0, 1, queueName, null);

        // Per-node fourth argument of waitForBindings, indexed by node
        // (presumably the expected consumer count - confirm against waitForBindings).
        final int[] localCounts = {0, 1, 0};
        final int[] remoteCounts = {1, 0, 1};
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 1, localCounts[node], true);
        }
        for (int node = 0; node < 3; node++) {
            waitForBindings(node, address, 2, remoteCounts[node], false);
        }

        sendInRange(1, address, 0, 10, false, Message.HDR_GROUP_ID, groupId);
        verifyReceiveAllInRange(0, 10, 0);

        // Bounce node 1 and attach a fresh consumer (id 1) to it.
        stopServers(1);
        closeSessionFactory(1);
        startServers(1);
        setupSessionFactory(1, isNetty());
        addConsumer(1, 1, queueName, null);

        waitForBindings(1, address, 1, 1, true);
        waitForBindings(0, address, 2, 1, false);
        waitForBindings(2, address, 2, 1, false);

        // Both post-restart batches are verified on the new consumer.
        sendInRange(2, address, 10, 20, false, Message.HDR_GROUP_ID, groupId);
        verifyReceiveAllInRange(10, 20, 1);

        sendInRange(0, address, 20, 30, false, Message.HDR_GROUP_ID, groupId);
        verifyReceiveAllInRange(20, 30, 1);
    }

    /**
     * Three-node cluster scenario with two queues bound to the same address on
     * every node.
     *
     * <p>Non-durable queues "queue0" and "queue1" are created on nodes 0, 1 and 2.
     * Six consumers (ids 0-5) are attached to "queue0", two per node. Ten messages
     * with group id "id1" are then sent through node 0 and the test verifies that
     * consumer 0 receives all of them.
     *
     * @throws Exception if any setup, send or verification step fails
     */
    @Test
    public void testGroupingMultipleQueuesOnAddress() throws Exception {
        final String address = "queues.testaddress";
        final int nodeCount = 3;

        for (int node = 0; node < nodeCount; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }

        // Fully connect the cluster: each node links to the other two.
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);

        // Node 0 hosts the LOCAL grouping handler; the others are REMOTE.
        for (int node = 0; node < nodeCount; node++) {
            GroupingHandlerConfiguration.TYPE handlerType = node == 0
                ? GroupingHandlerConfiguration.TYPE.LOCAL
                : GroupingHandlerConfiguration.TYPE.REMOTE;
            setUpGroupHandler(handlerType, node);
        }

        startServers(0, 1, 2);

        for (int node = 0; node < nodeCount; node++) {
            setupSessionFactory(node, isNetty());
        }

        // queue0 on every node first, then queue1 on every node.
        for (String queueName : new String[] {"queue0", "queue1"}) {
            for (int node = 0; node < nodeCount; node++) {
                createQueue(node, address, queueName, null, false);
            }
        }

        // Consumers 0-5 cycle over nodes 0, 1, 2, 0, 1, 2; all of them read from queue0.
        for (int consumerId = 0; consumerId < 2 * nodeCount; consumerId++) {
            addConsumer(consumerId, consumerId % nodeCount, "queue0", null);
        }

        // Local bindings on each node, followed by each node's remote view.
        for (int node = 0; node < nodeCount; node++) {
            waitForBindings(node, address, 2, 2, true);
        }
        for (int node = 0; node < nodeCount; node++) {
            waitForBindings(node, address, 4, 4, false);
        }

        sendWithProperty(0, address, 10, false, Message.HDR_GROUP_ID, new SimpleString("id1"));
        verifyReceiveAll(10, 0);
    }

    /**
     * Three-node cluster scenario in which nine sender threads publish grouped
     * messages concurrently.
     *
     * <p>A non-durable queue "queue0" with one consumer is created on each node.
     * Nine threads are then started, each wrapping a {@code ThreadSender} that is
     * given a distinct ten-message range (0-10, 10-20, ... 80-90), the value 1,
     * its own group id ("id0" .. "id8") and a shared latch. The final boolean
     * passed to {@code ThreadSender} is {@code true} for the first eight threads
     * and {@code false} for the ninth.
     * NOTE(review): presumably the flag makes a sender wait on the latch and the
     * ninth sender releases it, and the value 1 is the node to send through --
     * confirm against {@code ThreadSender}.
     *
     * <p>The sender threads are not joined; the test only verifies messages 0-30
     * against consumers 0, 1 and 2.
     *
     * <p>This method previously lacked {@code @Test}, so JUnit 4 never executed
     * it even though every sibling scenario in this class is annotated.
     *
     * @throws Exception if any setup, send or verification step fails
     */
    @Test
    public void testGroupingMultipleSending() throws Exception {
        final String address = "queues.testaddress";
        final String queueName = "queue0";
        final int senderCount = 9;
        final int messagesPerSender = 10;

        setupServer(0, isFileStorage(), isNetty());
        setupServer(1, isFileStorage(), isNetty());
        setupServer(2, isFileStorage(), isNetty());

        // Fully connect the cluster: each node links to the other two.
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
        setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);

        // Node 0 hosts the LOCAL grouping handler; the others are REMOTE.
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
        setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);

        startServers(0, 1, 2);

        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
        setupSessionFactory(2, isNetty());

        createQueue(0, address, queueName, null, false);
        createQueue(1, address, queueName, null, false);
        createQueue(2, address, queueName, null, false);

        // One consumer per node: consumer i on node i.
        addConsumer(0, 0, queueName, null);
        addConsumer(1, 1, queueName, null);
        addConsumer(2, 2, queueName, null);

        // Local bindings on each node, followed by each node's remote view.
        waitForBindings(0, address, 1, 1, true);
        waitForBindings(1, address, 1, 1, true);
        waitForBindings(2, address, 1, 1, true);
        waitForBindings(0, address, 2, 2, false);
        waitForBindings(1, address, 2, 2, false);
        waitForBindings(2, address, 2, 2, false);

        CountDownLatch startLatch = new CountDownLatch(1);
        Thread[] senders = new Thread[senderCount];
        for (int idx = 0; idx < senders.length; idx++) {
            int rangeStart = idx * messagesPerSender;
            // Every sender except the last one is created with the flag set to true.
            boolean flag = idx < senderCount - 1;
            senders[idx] = new Thread(new ThreadSender(rangeStart, rangeStart + messagesPerSender, 1,
                new SimpleString("id" + idx), startLatch, flag));
        }
        for (Thread sender : senders) {
            sender.start();
        }

        verifyReceiveAllWithGroupIDRoundRobin(0, 30, 0, 1, 2);
    }

    /**
     * Transport flag used by the scenarios in this class: the value is passed to
     * {@code setupServer}, {@code setupClusterConnection} and
     * {@code setupSessionFactory}.
     * NOTE(review): presumably {@code true} selects the Netty transport rather
     * than an in-VM one -- confirm against the base test class.
     *
     * @return always {@code true}
     */
    public boolean isNetty() {
        return true;
    }
}
