package org.apache.activemq.artemis.tests.integration.cluster.bridge;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeTestAccessor;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Integration tests for the cluster bridge between two clustered nodes (0 and 1).
 *
 * <p>Every test configures the two nodes with a symmetric {@code ON_DEMAND} cluster connection,
 * starts them, and then exercises the bridge: forced connection failures while messages are in
 * flight, re-creation of a queue whose removal was not notified, and pausing an address.
 */
public class ClusteredBridgeReconnectTest extends ClusterTestBase {

    /** Address shared by all tests in this class. */
    private static final String ADDRESS = "queues.testaddress";

    /** Value of the {@code origin} property stamped on messages by the pause test. */
    private static final String ORIGIN_PRODUCER_0 = "from producer 0";

    /**
     * Repeatedly fails the bridge of node 0 while a background thread is sending, then verifies
     * that the bridge queue drains, that the topology seen by the bridge still contains every
     * member it had before, and that the two consumers together receive every message sent.
     */
    @Test
    public void testReconnectBridge() throws Exception {
        final int rounds = 5;
        final int messagesPerRound = 100;

        setupTwoNodeCluster();
        startTwoNodeCluster();

        createQueue(0, ADDRESS, "queue0", null, true);
        createQueue(1, ADDRESS, "queue0", null, true);
        addConsumer(0, 0, "queue0", null);
        addConsumer(1, 1, "queue0", null);
        waitForBindings(0, ADDRESS, 1, 1, true);
        waitForBindings(1, ADDRESS, 1, 1, true);
        waitForBindings(0, ADDRESS, 1, 1, false);
        waitForBindings(1, ADDRESS, 1, 1, false);

        // NOTE(review): both sessions are created from node 0's factory — confirm this is intended.
        ClientSession session0 = this.sfs[0].createSession();
        ClientSession session1 = this.sfs[0].createSession();
        session0.start();
        session1.start();
        ClientProducer producer = session0.createProducer(ADDRESS);

        Assertions.assertEquals(1, this.servers[0].getClusterManager().getClusterConnections().size());
        ClusterConnectionImpl clusterConnection = firstClusterConnectionOfNodeZero();
        Assertions.assertEquals(1, clusterConnection.getRecords().size());
        MessageFlowRecord record = clusterConnection.getRecords().values().toArray(new MessageFlowRecord[1])[0];
        // Defensive cast: the declared return type of getBridge() is not visible here — confirm.
        ClusterConnectionBridge bridge = (ClusterConnectionBridge) record.getBridge();

        Wait.assertEquals(2, () -> bridge.getSessionFactory().getServerLocator().getTopology().getMembers().size());
        ArrayList<TopologyMemberImpl> membersBefore = new ArrayList<>(bridge.getSessionFactory().getServerLocator().getTopology().getMembers());

        AtomicInteger sendErrors = new AtomicInteger(0);
        for (int round = 0; round < rounds; round++) {
            CountDownLatch firstMessageSent = new CountDownLatch(1);
            Thread sender = new Thread(() -> {
                try {
                    for (int n = 0; n < messagesPerRound; n++) {
                        producer.send(session0.createMessage(true));
                        firstMessageSent.countDown();
                        if (n % 10 == 0) {
                            session0.commit();
                        }
                    }
                    // Kept inside the try so a failing final commit is counted like any other send error.
                    session0.commit();
                } catch (Exception e) {
                    sendErrors.incrementAndGet();
                    e.printStackTrace();
                }
            });
            sender.start();

            ArtemisExecutor executor = this.servers[0].getExecutorFactory().getExecutor();
            Assertions.assertTrue(firstMessageSent.await(10L, TimeUnit.SECONDS));

            // Fail the bridge connection as soon as the accessor exposes at least one reference.
            // NOTE(review): the waitFor result is ignored, so a round in which no reference is
            // observed within 500 ms continues without an injected failure — confirm this is intended.
            Wait.waitFor(() -> BridgeTestAccessor.withinRefs(bridge, refs -> {
                synchronized (refs) {
                    if (refs.size() <= 0) {
                        return false;
                    }
                    executor.execute(() -> bridge.connectionFailed(new ActiveMQException("bye"), false));
                    return true;
                }
            }), 500L, 1L);

            Queue bridgeQueue = Objects.requireNonNull(bridge.getQueue());
            Wait.assertEquals(0L, bridgeQueue::getMessageCount, 5000L, 1L);
            Wait.assertEquals(0, bridgeQueue::getDeliveringCount, 5000L, 1L);

            sender.join(5000L);
            // The next round reuses session0 from a new thread, so the previous sender must be done.
            Assertions.assertFalse(sender.isAlive(), "sender thread still running after round " + round);
        }
        Assertions.assertEquals(0, sendErrors.get());

        Wait.assertEquals(2, () -> bridge.getSessionFactory().getServerLocator().getTopology().getMembers().size());
        ArrayList<TopologyMemberImpl> membersAfter = new ArrayList<>(bridge.getSessionFactory().getServerLocator().getTopology().getMembers());
        // Every member known before the forced failures must still be present afterwards.
        Assertions.assertTrue(membersAfter.containsAll(membersBefore), "The topology is slightly different after a reconnect");

        int cons0Count = drainImmediately(this.consumers[0].getConsumer(), session0);
        int cons1Count = drainImmediately(this.consumers[1].getConsumer(), session1);
        Assertions.assertEquals(messagesPerRound * rounds, cons0Count + cons1Count, "cons0 = " + cons0Count + ", cons1 = " + cons1Count);

        session0.commit();
        session1.commit();
        Assertions.assertEquals(1, firstClusterConnectionOfNodeZero().getRecords().size());
        Assertions.assertNotNull(bridge.getSessionFactory());
        stopServers(0, 1);
    }

    /**
     * Destroys a queue on node 0 while management notifications are disabled, creates it again,
     * and verifies that messages produced on node 1 still reach the consumer on node 0.
     */
    @Test
    public void testClusterBridgeAddRemoteBinding() throws Exception {
        final int messageCount = 10;
        String queueName = UUID.randomUUID().toString();

        setupTwoNodeCluster();
        startTwoNodeCluster();

        createQueue(0, ADDRESS, queueName, null, false);
        addConsumer(0, 0, queueName, null);
        waitForQueueOnNodeZeroOnly();

        ClientSession session0 = this.sfs[0].createSession();
        ClientSession session1 = this.sfs[1].createSession();
        session0.start();
        session1.start();
        ClientProducer producer = session1.createProducer(ADDRESS);

        sendAndCommitEach(session1, producer, messageCount);
        Assertions.assertEquals(messageCount, receiveUntilEmpty(this.consumers[0].getConsumer(), 1000L, session0));

        removeConsumer(0);
        // Destroy the queue with notifications switched off, then switch them back on.
        this.servers[0].getManagementService().enableNotifications(false);
        this.servers[0].destroyQueue(SimpleString.of(queueName));
        this.servers[0].getManagementService().enableNotifications(true);

        createQueue(0, ADDRESS, queueName, null, false);
        addConsumer(0, 0, queueName, null);
        waitForQueueOnNodeZeroOnly();

        sendAndCommitEach(session1, producer, messageCount);
        Assertions.assertEquals(messageCount, receiveUntilEmpty(this.consumers[0].getConsumer(), 2000L, session0));
        stopServers(0, 1);
    }

    /**
     * Pauses the address on node 0 and verifies that only the local queue binding reports itself
     * paused, and that messages sent on node 0 still reach the consumer attached to node 1.
     */
    @Test
    public void testPauseAddressBlockingSnFQueue() throws Exception {
        final int messageCount = 10;

        setupTwoNodeCluster();
        AddressSettings addressSettings = new AddressSettings();
        addressSettings.setRedistributionDelay(0L);
        this.servers[0].getAddressSettingsRepository().addMatch("#", addressSettings);
        this.servers[1].getAddressSettingsRepository().addMatch("#", addressSettings);
        startTwoNodeCluster();

        ClientSession session0 = this.sfs[0].createSession();
        ClientSession session1 = this.sfs[1].createSession();
        session0.start();
        session1.start();

        createQueue(0, ADDRESS, "queue1", null, true);
        createQueue(1, ADDRESS, "queue1", null, true);
        ClientConsumer consumer1 = session1.createConsumer("queue1");
        waitForBindings(0, ADDRESS, 1, 0, true);
        waitForBindings(1, ADDRESS, 1, 1, true);
        waitForBindings(0, ADDRESS, 1, 1, false);
        waitForBindings(1, ADDRESS, 1, 0, false);

        ClientProducer producer0 = session0.createProducer(ADDRESS);
        sendTagged(session0, producer0, messageCount);
        assertReceivesTagged(consumer1, messageCount);

        ((AddressControl) this.servers[0].getManagementService().getResource("address." + ADDRESS)).pause();

        Bindings bindings = this.servers[0].getPostOffice().getBindingsForAddress(SimpleString.of(ADDRESS));
        Assertions.assertNotNull(bindings);
        Assertions.assertEquals(2, bindings.getBindings().size());

        // Defaults are the failing values, so each kind of binding has to be seen for the asserts to pass.
        boolean localBindingPaused = false;
        boolean remoteBindingPaused = true;
        // The collection mixes local and remote bindings, so each element is type-tested before casting.
        for (Object binding : bindings.getBindings()) {
            if (binding instanceof LocalQueueBinding) {
                localBindingPaused = ((LocalQueueBinding) binding).getQueue().isPaused();
            }
            if (binding instanceof RemoteQueueBinding) {
                remoteBindingPaused = ((RemoteQueueBinding) binding).getQueue().isPaused();
            }
        }
        Assertions.assertTrue(localBindingPaused);
        Assertions.assertFalse(remoteBindingPaused);

        sendTagged(session0, producer0, messageCount);
        assertReceivesTagged(consumer1, messageCount);
        stopServers(0, 1);
    }

    /** Closes consumers, session factories and server locators before delegating to the parent tear-down. */
    @Override
    @AfterEach
    public void tearDown() throws Exception {
        closeAllConsumers();
        closeAllSessionFactories();
        closeAllServerLocatorsFactories();
        super.tearDown();
    }

    /**
     * Transport switch passed to every setup call in this class.
     *
     * @return always {@code true}
     */
    public boolean isNetty() {
        return true;
    }

    /** Configures nodes 0 and 1 and one ON_DEMAND cluster connection in each direction; nothing is started. */
    private void setupTwoNodeCluster() throws Exception {
        setupServer(0, isFileStorage(), isNetty());
        setupServer(1, isFileStorage(), isNetty());
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
    }

    /** Starts nodes 0 and 1 and creates a session factory for each of them. */
    private void startTwoNodeCluster() throws Exception {
        startServers(0, 1);
        setupSessionFactory(0, isNetty());
        setupSessionFactory(1, isNetty());
    }

    /** Returns the first cluster connection registered on node 0. */
    private ClusterConnectionImpl firstClusterConnectionOfNodeZero() {
        return this.servers[0].getClusterManager().getClusterConnections().toArray(new ClusterConnectionImpl[0])[0];
    }

    /** Waits for the binding layout used when the only queue and consumer on the address live on node 0. */
    private void waitForQueueOnNodeZeroOnly() throws Exception {
        waitForBindings(0, ADDRESS, 1, 1, true);
        waitForBindings(1, ADDRESS, 0, 0, true);
        waitForBindings(0, ADDRESS, 0, 0, false);
        waitForBindings(1, ADDRESS, 1, 1, false);
    }

    /**
     * Acknowledges every message {@code receiveImmediate()} returns, committing {@code commitSession}
     * after each one.
     *
     * @return the number of messages received before {@code receiveImmediate()} returned {@code null}
     */
    private static int drainImmediately(ClientConsumer consumer, ClientSession commitSession) throws Exception {
        int received = 0;
        ClientMessage message;
        while ((message = consumer.receiveImmediate()) != null) {
            received++;
            message.acknowledge();
            commitSession.commit();
        }
        return received;
    }

    /**
     * Acknowledges messages until {@code receive(timeoutMillis)} returns {@code null}, committing
     * {@code commitSession} after each one.
     *
     * @return the number of messages received
     */
    private static int receiveUntilEmpty(ClientConsumer consumer, long timeoutMillis, ClientSession commitSession) throws Exception {
        int received = 0;
        ClientMessage message;
        while ((message = consumer.receive(timeoutMillis)) != null) {
            received++;
            message.acknowledge();
            commitSession.commit();
        }
        return received;
    }

    /** Sends {@code count} durable messages, committing the session after each send. */
    private static void sendAndCommitEach(ClientSession session, ClientProducer producer, int count) throws Exception {
        for (int n = 0; n < count; n++) {
            producer.send(session.createMessage(true));
            session.commit();
        }
    }

    /** Sends {@code count} durable messages carrying the {@code origin} property. */
    private static void sendTagged(ClientSession session, ClientProducer producer, int count) throws Exception {
        for (int n = 0; n < count; n++) {
            ClientMessage message = session.createMessage(true);
            message.putStringProperty("origin", ORIGIN_PRODUCER_0);
            producer.send(message);
        }
    }

    /** Asserts that exactly {@code count} messages carrying the {@code origin} property arrive and nothing follows. */
    private static void assertReceivesTagged(ClientConsumer consumer, int count) throws Exception {
        for (int n = 0; n < count; n++) {
            ClientMessage received = consumer.receive(5000L);
            Assertions.assertNotNull(received);
            Assertions.assertEquals(ORIGIN_PRODUCER_0, received.getStringProperty("origin"));
            received.acknowledge();
        }
        Assertions.assertNull(consumer.receiveImmediate());
    }
}
