package org.apache.activemq.artemis.tests.integration.openwire.cluster;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.ConsumerThread;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.class */
public class MessageRedistributionTest extends ClusterTestBase {
    @Test
    public void testRemoteConsumerClose() throws Exception {
        setupServer(0, true, true);
        setupServer(1, true, true);
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 0, 1);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 0);
        startServers(0, 1);
        waitForTopology(this.servers[0], 2);
        waitForTopology(this.servers[1], 2);
        setupSessionFactory(0, true);
        setupSessionFactory(1, true);
        createAddressInfo(0, "queues.testAddress", RoutingType.ANYCAST, -1, false);
        createAddressInfo(1, "queues.testAddress", RoutingType.ANYCAST, -1, false);
        createQueue(0, "queues.testaddress", "queue0", null, true);
        createQueue(1, "queues.testaddress", "queue0", null, true);
        for (int i = 0; i < 50; i++) {
            closeConsumerAndConnectionConcurrently(i % 2, (i + 1) % 2);
        }
    }

    @Override // org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase
    protected boolean isResolveProtocols() {
        return true;
    }

    private void closeConsumerAndConnectionConcurrently(int i, int i2) throws Exception {
        String serverUri = getServerUri(i);
        System.out.println("uri is " + serverUri);
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(serverUri);
        Connection connection = null;
        CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            connection = activeMQConnectionFactory.createConnection();
            connection.start();
            ConsumerThread consumerThread = new ConsumerThread(connection.createSession(false, 1), ActiveMQDestination.createDestination("queue0", (byte) 1));
            consumerThread.setMessageCount(0);
            consumerThread.setFinished(countDownLatch);
            consumerThread.start();
            assertTrue("consumer takes too long to finish!", countDownLatch.await(5L, TimeUnit.SECONDS));
            connection.close();
            Wait.waitFor(() -> {
                return getRemoteQueueBinding(this.servers[i2]) != null;
            });
            RemoteQueueBinding remoteQueueBinding = getRemoteQueueBinding(this.servers[i2]);
            assertNotNull(remoteQueueBinding);
            Wait.waitFor(() -> {
                return remoteQueueBinding.consumerCount() >= 0;
            });
            int consumerCount = remoteQueueBinding.consumerCount();
            assertTrue("consumer count should never be negative " + consumerCount, consumerCount >= 0);
        } catch (Throwable th) {
            connection.close();
            throw th;
        }
    }

    private RemoteQueueBinding getRemoteQueueBinding(ActiveMQServer activeMQServer) throws Exception {
        return getRemoteQueueBinding(activeMQServer.getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings());
    }

    private RemoteQueueBinding getRemoteQueueBinding(Collection<Binding> collection) {
        RemoteQueueBinding remoteQueueBinding = null;
        Iterator<Binding> it = collection.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Binding next = it.next();
            if (next instanceof RemoteQueueBinding) {
                remoteQueueBinding = (RemoteQueueBinding) next;
                break;
            }
        }
        return remoteQueueBinding;
    }
}
