package org.apache.activemq.artemis.tests.integration.cluster.bridge;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.class */
/**
 * Integration test that pushes messages through a two-node cluster while the
 * connection used by node 0's cluster bridge is deliberately failed part-way
 * through the send loop, then checks that every message sent is still received
 * by one of the two consumers and that the bridge still has a session factory.
 */
public class ClusteredBridgeReconnectTest extends ClusterTestBase {
    /**
     * Sends 100 durable messages to an address bound on both nodes, failing the
     * bridge connection once after the message with index 17 has been sent, and
     * asserts that the consumers on node 0 and node 1 together receive all 100.
     *
     * @throws Exception if any cluster set-up, messaging or assertion step fails
     */
    @Test
    public void testReconnectBridge() throws Exception {
        final String address = "queues.testaddress";
        final String queueName = "queue0";
        final int messageCount = 100;
        final int failAtIndex = 17;

        // Bring up two servers that are clustered with each other in both directions.
        for (int node = 0; node <= 1; node++) {
            setupServer(node, isFileStorage(), isNetty());
        }
        setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
        setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
        startServers(0, 1);
        for (int node = 0; node <= 1; node++) {
            setupSessionFactory(node, isNetty());
        }

        // Same queue name on the same address on each node, with one consumer per node.
        for (int node = 0; node <= 1; node++) {
            createQueue(node, address, queueName, null, true);
        }
        for (int node = 0; node <= 1; node++) {
            addConsumer(node, node, queueName, null);
        }

        // Wait for the bindings with the last flag set to true first, then with it set to false.
        for (int node = 0; node <= 1; node++) {
            waitForBindings(node, address, 1, 1, true);
        }
        for (int node = 0; node <= 1; node++) {
            waitForBindings(node, address, 1, 1, false);
        }

        // Both sessions are created from node 0's session factory.
        ClientSession firstSession = this.sfs[0].createSession();
        ClientSession secondSession = this.sfs[0].createSession();
        firstSession.start();
        secondSession.start();
        ClientProducer producer = firstSession.createProducer(address);

        // Locate the single bridge that node 0's only cluster connection maintains.
        Assert.assertEquals(1L, this.servers[0].getClusterManager().getClusterConnections().size());
        ClusterConnectionImpl[] clusterConnections = (ClusterConnectionImpl[]) this.servers[0]
            .getClusterManager()
            .getClusterConnections()
            .toArray(new ClusterConnectionImpl[0]);
        ClusterConnectionImpl clusterConnection = clusterConnections[0];
        Assert.assertEquals(1L, clusterConnection.getRecords().size());
        MessageFlowRecord[] flowRecords = (MessageFlowRecord[]) clusterConnection
            .getRecords()
            .values()
            .toArray(new MessageFlowRecord[1]);
        ClusterConnectionBridge bridge = flowRecords[0].getBridge();

        for (int sent = 0; sent < messageCount; sent++) {
            producer.send(firstSession.createMessage(true));
            firstSession.commit();
            if (sent == failAtIndex) {
                // Fail the bridge's connection exactly once, in the middle of the send loop.
                bridge.getSessionFactory().getConnection().fail(new ActiveMQException("failed once!"));
            }
        }

        // Drain consumer 0 completely before consumer 1 is touched.
        int receivedOnNode0 = drainAndAcknowledge(0, firstSession);
        int receivedOnNode1 = drainAndAcknowledge(1, secondSession);

        Assert.assertEquals("cons0 = " + receivedOnNode0 + ", cons1 = " + receivedOnNode1,
            messageCount, receivedOnNode0 + receivedOnNode1);
        firstSession.commit();
        secondSession.commit();

        // After the failure there must still be exactly one record, and the bridge must keep a factory.
        ClusterConnectionImpl[] connectionsAfterFailure = (ClusterConnectionImpl[]) this.servers[0]
            .getClusterManager()
            .getClusterConnections()
            .toArray(new ClusterConnectionImpl[0]);
        Assert.assertEquals(1L, connectionsAfterFailure[0].getRecords().size());
        Assert.assertNotNull(bridge.getSessionFactory());
        stopServers(0, 1);
    }

    /**
     * Receives from the consumer at the given index until a receive with a
     * 1000 ms timeout returns {@code null}, acknowledging each message and
     * committing the supplied session after every acknowledgement.
     *
     * @param consumerIndex index into the {@code consumers} array
     * @param sessionToCommit session whose {@code commit()} is invoked after each message
     * @return the number of messages received before the first {@code null}
     * @throws Exception if receiving, acknowledging or committing fails
     */
    private int drainAndAcknowledge(int consumerIndex, ClientSession sessionToCommit) throws Exception {
        int received = 0;
        ClientMessage message;
        while ((message = this.consumers[consumerIndex].getConsumer().receive(1000L)) != null) {
            received++;
            message.acknowledge();
            sessionToCommit.commit();
        }
        return received;
    }

    /**
     * Closes the consumers, session factories and server locators opened by the
     * test before handing over to the superclass tear-down.
     *
     * @throws Exception if any close step or the superclass tear-down fails
     */
    @Override // org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase
    @After
    public void tearDown() throws Exception {
        closeAllConsumers();
        closeAllSessionFactories();
        closeAllServerLocatorsFactories();
        super.tearDown();
    }

    /**
     * Value passed as the netty flag to the server, cluster-connection and
     * session-factory set-up helpers used by this test.
     *
     * @return always {@code true}
     */
    public boolean isNetty() {
        return true;
    }
}
