package org.apache.activemq.artemis.tests.integration.openwire.cluster;

import jakarta.jms.Connection;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.ConsumerThread;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.class */
/**
 * Integration tests that drive brokers started through {@link ClusterTestBase} with
 * OpenWire client classes ({@code org.apache.activemq.*}) and, in one case, a producer
 * created through {@link ActiveMQJMSClient}.
 *
 * <p>Every client connection opened by a test is closed even when an assertion fails, so a
 * failing test does not leak connections into the next one.
 */
public class MessageRedistributionTest extends ClusterTestBase {

    /** Number of messages {@link #produceWithCoreTo(int, int)} sends to each broker in the failover test. */
    private static final int MESSAGES_PER_NODE = 1000;

    /** Address inspected for a {@link RemoteQueueBinding} by {@link #getRemoteQueueBinding(ActiveMQServer)}. */
    private static final String CLUSTERED_QUEUE_ADDRESS = "queues.testaddress";

    /**
     * Base-class hook; this test always answers {@code false}.
     *
     * @return {@code false}
     */
    @Override
    protected boolean isForceUniqueStorageManagerIds() {
        return false;
    }

    /**
     * Base-class hook; this test always answers {@code true}.
     *
     * @return {@code true}
     */
    @Override
    protected boolean isResolveProtocols() {
        return true;
    }

    /**
     * Starts two servers joined by ON_DEMAND cluster connections, creates the same address and
     * queue on both, and then runs {@link #closeConsumerAndConnectionConcurrently(int, int)}
     * 50 times, alternating which node is the target and which is the remote one.
     *
     * @throws Exception if setup or any iteration fails
     */
    @Test
    public void testRemoteConsumerClose() throws Exception {
        setupServer(0, true, true);
        setupServer(1, true, true);
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 0, 1);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 0);
        startServers(0, 1);
        waitForTopology(this.servers[0], 2);
        waitForTopology(this.servers[1], 2);
        setupSessionFactory(0, true);
        setupSessionFactory(1, true);

        // NOTE(review): the address is created as "queues.testAddress" but the queues are created
        // on "queues.testaddress" (different case). Left untouched; confirm this is intentional.
        createAddressInfo(0, "queues.testAddress", RoutingType.ANYCAST, -1, false);
        createAddressInfo(1, "queues.testAddress", RoutingType.ANYCAST, -1, false);
        createQueue(0, CLUSTERED_QUEUE_ADDRESS, "queue0", null, true);
        createQueue(1, CLUSTERED_QUEUE_ADDRESS, "queue0", null, true);

        for (int iteration = 0; iteration < 50; iteration++) {
            int targetNode = iteration % 2;
            int remoteNode = (iteration + 1) % 2;
            closeConsumerAndConnectionConcurrently(targetNode, remoteNode);
        }
    }

    /**
     * Starts two servers without any cluster connection, loads {@value #MESSAGES_PER_NODE}
     * messages onto queue {@code q} of each, and consumes through a single OpenWire
     * {@code failover:} connection listing both brokers. Once at least half of the messages
     * have arrived, server 0 is stopped and the test waits for the remaining messages.
     *
     * @throws Exception if setup fails or the consumer connection cannot be created or closed
     */
    @Test
    public void testFailoverNonClusteredBrokersInteropWithCoreProducer() throws Exception {
        setupServer(0, true, true);
        setupServer(1, true, true);
        startServers(0, 1);

        ((AddressSettings) this.servers[0].getAddressSettingsRepository().getMatch("#")).setRedeliveryDelay(0L).setRedistributionDelay(0L);
        ((AddressSettings) this.servers[1].getAddressSettingsRepository().getMatch("#")).setRedeliveryDelay(0L).setRedistributionDelay(0L);

        setupSessionFactory(0, true);
        setupSessionFactory(1, true);
        createAddressInfo(0, "q", RoutingType.ANYCAST, -1, false);
        createAddressInfo(1, "q", RoutingType.ANYCAST, -1, false);
        createQueue(0, "q", "q", null, true, RoutingType.ANYCAST);
        createQueue(1, "q", "q", null, true, RoutingType.ANYCAST);

        produceWithCoreTo(0, MESSAGES_PER_NODE);
        produceWithCoreTo(1, MESSAGES_PER_NODE);

        String failoverUri = "failover:(" + getServerUri(0) + "," + getServerUri(1) + ")"
            + "?jms.prefetchPolicy.all=10&randomize=false&timeout=400&reconnectDelay=500"
            + "&useExponentialBackOff=false&initialReconnectDelay=500"
            + "&nested.wireFormat.maxInactivityDuration=500"
            + "&nested.wireFormat.maxInactivityDurationInitalDelay=500"
            + "&nested.ignoreRemoteWireFormat=true&nested.soTimeout=500&nested.connectionTimeout=400"
            + "&jms.connectResponseTimeout=400&jms.sendTimeout=400&jms.closeTimeout=400";
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(failoverUri);
        factory.setWatchTopicAdvisories(false);

        // One count per message produced across both brokers.
        CountDownLatch remaining = new CountDownLatch(2 * MESSAGES_PER_NODE);

        ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
        try {
            connection.start();
            connection.setClientInternalExceptionListener(Throwable::printStackTrace);

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            session.createConsumer(ActiveMQDestination.createDestination("q", ActiveMQDestination.QUEUE_TYPE))
                .setMessageListener(message -> remaining.countDown());

            // Wait until at least one broker's worth of messages has been consumed ...
            Assertions.assertTrue(Wait.waitFor(() -> remaining.getCount() <= MESSAGES_PER_NODE));
            // ... then take server 0 down and expect the rest to arrive as well.
            this.servers[0].stop(false, true);
            Assertions.assertTrue(Wait.waitFor(() -> remaining.await(1L, TimeUnit.SECONDS)));
        } finally {
            // Close on every path so a failed assertion does not leak the failover connection.
            connection.close();
        }
    }

    /**
     * Sends {@code messageCount} text messages to queue {@code q} on the given server using a
     * connection factory obtained from {@link ActiveMQJMSClient}. Each message carries the int
     * properties {@code MM} (message index) and {@code SN} (server index).
     *
     * @param serverIndex index of the server to connect to
     * @param messageCount number of messages to send
     * @throws Exception if connecting or sending fails
     */
    private void produceWithCoreTo(int serverIndex, int messageCount) throws Exception {
        try (Connection connection = ActiveMQJMSClient.createConnectionFactory(getServerUri(serverIndex), "cf" + serverIndex).createConnection()) {
            connection.setClientID("theProducer");
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            TextMessage message = session.createTextMessage("TEXT");
            MessageProducer producer = session.createProducer(session.createQueue("q"));
            for (int index = 0; index < messageCount; index++) {
                message.setIntProperty("MM", index);
                message.setIntProperty("SN", serverIndex);
                producer.send(message);
            }
        }
    }

    /**
     * Starts two clustered servers, sends one message from a producer built on a default
     * {@link ActiveMQConnectionFactory} to a durable subscriber connected to server 1, and then
     * asserts that each server has exactly one binding on each of the
     * {@code ActiveMQ.Advisory.TempQueue} and {@code ActiveMQ.Advisory.TempTopic} addresses.
     *
     * @throws Exception if setup, messaging or an assertion fails
     */
    @Test
    public void testAdvisoriesNotClustered() throws Exception {
        setupServer(0, true, true);
        setupServer(1, true, true);
        setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 0, 1);
        setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 0);
        startServers(0, 1);
        waitForTopology(this.servers[0], 2);
        waitForTopology(this.servers[1], 2);
        setupSessionFactory(0, true);
        setupSessionFactory(1, true);
        createAddressInfo(0, "testAddress", RoutingType.MULTICAST, -1, false);
        createAddressInfo(1, "testAddress", RoutingType.MULTICAST, -1, false);

        // NOTE(review): the producer factory uses the client's default broker URL rather than
        // getServerUri(0) — presumably that resolves to server 0; confirm.
        ActiveMQConnectionFactory producerFactory = new ActiveMQConnectionFactory();
        ActiveMQConnectionFactory subscriberFactory = new ActiveMQConnectionFactory(getServerUri(1));

        // try-with-resources closes whichever connections were actually opened, even if the
        // second createConnection() or an assertion throws.
        try (Connection producerConnection = producerFactory.createConnection();
             Connection subscriberConnection = subscriberFactory.createConnection()) {
            subscriberConnection.setClientID("id");
            subscriberConnection.start();

            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Session subscriberSession = subscriberConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = (Topic) ActiveMQDestination.createDestination("testAddress", ActiveMQDestination.TOPIC_TYPE);
            TopicSubscriber subscriber = subscriberSession.createDurableSubscriber(topic, "mySubscriber");

            producerSession.createProducer(topic).send(producerSession.createTextMessage("test message"));
            // The received message itself is not checked; the call only gives delivery a chance to happen.
            subscriber.receive(5000L);

            SimpleString tempQueueAdvisory = SimpleString.of("ActiveMQ.Advisory.TempQueue");
            SimpleString tempTopicAdvisory = SimpleString.of("ActiveMQ.Advisory.TempTopic");
            Assertions.assertEquals(1, this.servers[0].getPostOffice().getBindingsForAddress(tempQueueAdvisory).getBindings().size(), "");
            Assertions.assertEquals(1, this.servers[0].getPostOffice().getBindingsForAddress(tempTopicAdvisory).getBindings().size(), "");
            Assertions.assertEquals(1, this.servers[1].getPostOffice().getBindingsForAddress(tempQueueAdvisory).getBindings().size(), "");
            Assertions.assertEquals(1, this.servers[1].getPostOffice().getBindingsForAddress(tempTopicAdvisory).getBindings().size(), "");
        }
    }

    /**
     * Opens an OpenWire connection to server {@code i}, runs a {@link ConsumerThread} on
     * {@code queue0} configured with a message count of 0, waits up to five seconds for it to
     * signal completion, closes the connection, and finally asserts that the
     * {@link RemoteQueueBinding} seen on server {@code i2} reports a non-negative consumer count.
     *
     * @param i index of the server the consumer connects to
     * @param i2 index of the server whose remote queue binding is inspected
     * @throws Exception if connecting fails or an assertion does not hold
     */
    private void closeConsumerAndConnectionConcurrently(int i, int i2) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(getServerUri(i));
        CountDownLatch consumerFinished = new CountDownLatch(1);

        try (Connection connection = factory.createConnection()) {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            ConsumerThread consumer = new ConsumerThread(session, ActiveMQDestination.createDestination("queue0", ActiveMQDestination.QUEUE_TYPE));
            consumer.setMessageCount(0);
            consumer.setFinished(consumerFinished);
            consumer.start();
            Assertions.assertTrue(consumerFinished.await(5L, TimeUnit.SECONDS), "consumer takes too long to finish!");
        }

        // The connection is closed at this point; now look at the other node's view of the queue.
        Wait.waitFor(() -> getRemoteQueueBinding(this.servers[i2]) != null);
        RemoteQueueBinding remoteBinding = getRemoteQueueBinding(this.servers[i2]);
        Assertions.assertNotNull(remoteBinding);

        Wait.waitFor(() -> remoteBinding.consumerCount() >= 0);
        int consumerCount = remoteBinding.consumerCount();
        Assertions.assertTrue(consumerCount >= 0, "consumer count should never be negative " + consumerCount);
    }

    /**
     * Looks up the bindings of address {@code queues.testaddress} on the given server and returns
     * the first {@link RemoteQueueBinding} among them.
     *
     * @param activeMQServer server whose post office is queried
     * @return the first remote queue binding, or {@code null} if there is none
     * @throws Exception if the bindings lookup fails
     */
    private RemoteQueueBinding getRemoteQueueBinding(ActiveMQServer activeMQServer) throws Exception {
        SimpleString address = SimpleString.of(CLUSTERED_QUEUE_ADDRESS);
        return getRemoteQueueBinding(activeMQServer.getPostOffice().getBindingsForAddress(address).getBindings());
    }

    /**
     * Returns the first element of {@code collection} that is a {@link RemoteQueueBinding}.
     *
     * @param collection bindings to search, in iteration order
     * @return the first remote queue binding, or {@code null} if none is present
     */
    private RemoteQueueBinding getRemoteQueueBinding(Collection<Binding> collection) {
        for (Binding binding : collection) {
            if (binding instanceof RemoteQueueBinding) {
                return (RemoteQueueBinding) binding;
            }
        }
        return null;
    }
}
