package org.apache.activemq.artemis.tests.integration.cluster.bridge;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnector;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.Wait;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.class */
public class BridgeReconnectTest extends BridgeTestBase {

    /**
     * Injected by the {@link Parameterized} runner. It is copied onto servers via
     * {@code setPersistIDCache} in the {@code createActiveMQServer} override and decides which
     * journal expectation is checked in the blocking-redelivery test.
     */
    @Parameterized.Parameter(0)
    public boolean persistCache;
    private static final Logger log = Logger.getLogger(BridgeReconnectTest.class);
    // Size of the message batches the tests send and receive.
    // NOTE(review): the test bodies in this source use the literal 100 rather than this constant.
    private static final int NUM_MESSAGES = 100;
    // Transport parameter maps, recreated in setUp() and handed to server creation and to the
    // TransportConfiguration objects built for each server.
    Map<String, Object> server0Params;
    Map<String, Object> server1Params;
    Map<String, Object> server2Params;
    // server0 is created by each test and carries the bridge configuration in most of them.
    ActiveMQServer server0;
    // server1 is created in setUp(); its connector is the only entry in staticConnectors.
    ActiveMQServer server1;
    // server2 is only created by the failover tests (as a backup); it stays null otherwise,
    // which is why startServers()/closeServers()/assertNoMoreConnections() null-check it.
    ActiveMQServer server2;
    // Client-side resources; closeServers() closes whichever of these are non-null.
    ServerLocator locator;
    ClientSession session0;
    ClientSession session1;
    ClientSession session2;
    // Connector pointing at server1, registered in `connectors` under the name "server1tc".
    private TransportConfiguration server1tc;
    // Connector name -> configuration map installed on the servers' configurations.
    private Map<String, TransportConfiguration> connectors;
    // Connector names passed to BridgeConfiguration.setStaticConnectors().
    private ArrayList<String> staticConnectors;
    // Named values for the bridge/queue settings.
    // NOTE(review): createBridgeConfig() and the tests repeat these values as literals in this source.
    final String bridgeName = "bridge1";
    final String testAddress = "testAddress";
    final String queueName = "queue0";
    final String forwardAddress = "forwardAddress";
    final long retryInterval = 50;
    final double retryIntervalMultiplier = 1.0d;
    final int confirmationWindowSize = 1024;
    // Reconnect attempts applied by createBridgeConfig(); several tests overwrite it
    // (e.g. with -1 or 1) before building the bridge configuration.
    int reconnectAttempts = 3;

    /**
     * Supplies the argument sets for the {@link Parameterized} runner: one run with
     * {@code persistCache} set to {@code true} and one with {@code false}.
     *
     * @return a fixed-size list holding the two single-element argument arrays
     */
    @Parameterized.Parameters(name = "persistentCache={0}")
    public static Collection<Object[]> parameters() {
        final Object[][] variants = {{Boolean.TRUE}, {Boolean.FALSE}};
        return Arrays.asList(variants);
    }

    /**
     * Prepares the state shared by every test: fresh transport-parameter maps, an empty connector
     * registry, {@code server1}, and the {@code server1tc} connector (registered in
     * {@code connectors} and listed in {@code staticConnectors}). {@code server0} and
     * {@code server2} are created by the individual tests.
     *
     * @throws Exception if the base-class setup or the creation of {@code server1} fails
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.server0Params = new HashMap<>();
        this.server1Params = new HashMap<>();
        this.server2Params = new HashMap<>();
        this.connectors = new HashMap<>();

        this.server1 = createActiveMQServer(1, isNetty(), this.server1Params);
        this.server1tc = new TransportConfiguration(getConnector(), this.server1Params, "server1tc");
        this.connectors.put(this.server1tc.getName(), this.server1tc);

        this.staticConnectors = new ArrayList<>();
        this.staticConnectors.add(this.server1tc.getName());
    }

    /**
     * Selects the transport used by this test class. It is passed to server creation and decides
     * which connector factory {@code getConnector()} returns.
     *
     * @return {@code false} in this class
     */
    protected boolean isNetty() {
        return false;
    }

    /**
     * Picks the connector factory that matches {@link #isNetty()}.
     *
     * @return {@code NETTY_CONNECTOR_FACTORY} when Netty is selected, otherwise {@code INVM_CONNECTOR_FACTORY}
     */
    private String getConnector() {
        if (isNetty()) {
            return NETTY_CONNECTOR_FACTORY;
        }
        return INVM_CONNECTOR_FACTORY;
    }

    /**
     * Checks that the backup server ({@code server2}) has at least one bridge deployed after the
     * live server ({@code server0}) is failed and the backup has started.
     *
     * <p>Both servers share one node manager and receive the same bridge and queue configuration;
     * the bridge is configured with unlimited reconnect attempts ({@code -1}).
     *
     * @throws Exception if server setup, failover or session creation fails
     */
    @Test
    public void testFailoverDeploysBridge() throws Exception {
        InVMNodeManager nodeManager = new InVMNodeManager(false);
        this.server0 = createActiveMQServer(0, this.server0Params, isNetty(), nodeManager);
        this.server2 = createBackupActiveMQServer(2, this.server2Params, isNetty(), 0, nodeManager);

        TransportConfiguration server0tc = new TransportConfiguration(getConnector(), this.server0Params, "server0tc");
        TransportConfiguration server2tc = new TransportConfiguration(getConnector(), this.server2Params, "server2tc");
        this.connectors.put(server2tc.getName(), server2tc);

        this.server0.getConfiguration().setConnectorConfigurations(this.connectors);
        this.server1.getConfiguration().setConnectorConfigurations(this.connectors);
        this.server2.getConfiguration().setConnectorConfigurations(this.connectors);

        // reconnectAttempts must be set before createBridgeConfig() reads it.
        this.reconnectAttempts = -1;
        BridgeConfiguration bridgeConfiguration = createBridgeConfig();
        bridgeConfiguration.setQueueName(this.queueName);
        List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
        bridgeConfigs.add(bridgeConfiguration);
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
        this.server2.getConfiguration().setBridgeConfigurations(bridgeConfigs);

        List<QueueConfiguration> server1Queues = new ArrayList<>();
        server1Queues.add(new QueueConfiguration(this.queueName).setAddress(this.testAddress));
        this.server1.getConfiguration().setQueueConfigs(server1Queues);

        // The live server and its backup share the same queue definition.
        List<QueueConfiguration> haPairQueues = new ArrayList<>();
        haPairQueues.add(new QueueConfiguration(this.queueName).setAddress(this.forwardAddress));
        this.server0.getConfiguration().setQueueConfigs(haPairQueues);
        this.server2.getConfiguration().setQueueConfigs(haPairQueues);

        startServers();
        waitForServerStart(this.server0);
        this.server0.fail(true);
        waitForServerStart(this.server2);

        this.locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration[]{server0tc, server2tc}));
        this.session0 = addSessionFactory(this.locator.createSessionFactory(server2tc)).createSession(false, true, true);

        assertTrue("backup must deploy bridge on failover", !this.server2.getClusterManager().getBridges().isEmpty());
    }

    /**
     * Fails the live server ({@code server0}) right after startup, waits for the backup
     * ({@code server2}) to start, then sends {@code NUM_MESSAGES} durable messages to the test
     * address on the backup and expects to consume them in order from {@code queue0} on the backup.
     *
     * <p>The bridge is configured on {@code server0} only, with a single reconnect attempt.
     *
     * @throws Exception if setup, failover or messaging fails
     */
    @Test
    public void testFailoverAndReconnectImmediately() throws Exception {
        InVMNodeManager nodeManager = new InVMNodeManager(false);
        this.server0 = createActiveMQServer(0, this.server0Params, isNetty(), nodeManager);
        this.server2 = createBackupActiveMQServer(2, this.server2Params, isNetty(), 0, nodeManager);

        TransportConfiguration server0tc = new TransportConfiguration(getConnector(), this.server0Params, "server0tc");
        TransportConfiguration server2tc = new TransportConfiguration(getConnector(), this.server2Params, "server2tc");
        this.connectors.put(server2tc.getName(), server2tc);

        this.server0.getConfiguration().setConnectorConfigurations(this.connectors);
        this.server1.getConfiguration().setConnectorConfigurations(this.connectors);

        // reconnectAttempts must be set before createBridgeConfig() reads it.
        this.reconnectAttempts = 1;
        List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
        bridgeConfigs.add(createBridgeConfig());
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);

        List<QueueConfiguration> sourceQueues = new ArrayList<>();
        sourceQueues.add(new QueueConfiguration(this.queueName).setAddress(this.testAddress));
        this.server0.getConfiguration().setQueueConfigs(sourceQueues);

        List<QueueConfiguration> targetQueues = new ArrayList<>();
        targetQueues.add(new QueueConfiguration(this.queueName).setAddress(this.forwardAddress));
        this.server1.getConfiguration().setQueueConfigs(targetQueues);
        this.server2.getConfiguration().setQueueConfigs(targetQueues);

        startServers();

        log.debug("** failing connection");
        this.server0.fail(true);
        waitForServerStart(this.server2);

        this.locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration[]{server0tc, server2tc}));
        this.session0 = addSessionFactory(this.locator.createSessionFactory(server2tc)).createSession(false, true, true);
        ClientProducer producer = this.session0.createProducer(this.testAddress);
        this.session2 = addSessionFactory(this.locator.createSessionFactory(server2tc)).createSession(false, true, true);
        ClientConsumer consumer = this.session2.createConsumer(this.queueName);
        this.session2.start();

        SimpleString propKey = new SimpleString("propkey");
        for (int i = 0; i < NUM_MESSAGES; i++) {
            ClientMessage message = this.session0.createMessage(true);
            message.putIntProperty(propKey, i);
            producer.send(message);
        }

        // Messages must arrive complete and in sending order.
        for (int i = 0; i < NUM_MESSAGES; i++) {
            ClientMessage received = consumer.receive(1500L);
            assertNotNull(received);
            assertEquals(Integer.valueOf(i), received.getObjectProperty(propKey));
        }

        closeServers();
        assertNoMoreConnections();
    }

    /**
     * Builds the bridge configuration shared by the tests, using the bridge/queue settings
     * declared as fields of this class and the current value of {@code reconnectAttempts}.
     * Tests that need a different number of reconnect attempts must therefore assign
     * {@code reconnectAttempts} before calling this method.
     *
     * @return a new bridge configuration pointing at {@code staticConnectors}
     */
    private BridgeConfiguration createBridgeConfig() {
        return new BridgeConfiguration()
            .setName(this.bridgeName)
            .setQueueName(this.queueName)
            .setForwardingAddress(this.forwardAddress)
            .setRetryInterval(this.retryInterval)
            .setRetryIntervalMultiplier(this.retryIntervalMultiplier)
            .setReconnectAttempts(this.reconnectAttempts)
            .setReconnectAttemptsOnSameNode(0)
            .setConfirmationWindowSize(this.confirmationWindowSize)
            .setStaticConnectors(this.staticConnectors)
            .setPassword("UnitTestsClusterPassword");
    }

    /**
     * Fails the live server ({@code server0}) and then connects to the backup ({@code server2})
     * through an HA locator that retries up to 100 times. Sends {@code NUM_MESSAGES} non-durable
     * messages to the test address and expects to consume them in order from {@code queue0}.
     *
     * <p>The bridge (default {@code reconnectAttempts}) is configured on {@code server0} only.
     *
     * @throws Exception if setup, failover or messaging fails
     */
    @Test
    public void testFailoverAndReconnectAfterAFewTries() throws Exception {
        InVMNodeManager nodeManager = new InVMNodeManager(false);
        this.server0 = createActiveMQServer(0, this.server0Params, isNetty(), nodeManager);
        this.server2 = createBackupActiveMQServer(2, this.server2Params, isNetty(), 0, nodeManager);

        TransportConfiguration server2tc = new TransportConfiguration(getConnector(), this.server2Params, "server2tc");
        this.connectors.put(server2tc.getName(), server2tc);

        this.server0.getConfiguration().setConnectorConfigurations(this.connectors);
        this.server1.getConfiguration().setConnectorConfigurations(this.connectors);

        List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
        bridgeConfigs.add(createBridgeConfig());
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);

        List<QueueConfiguration> sourceQueues = new ArrayList<>();
        sourceQueues.add(new QueueConfiguration(this.queueName).setAddress(this.testAddress));
        this.server0.getConfiguration().setQueueConfigs(sourceQueues);

        List<QueueConfiguration> targetQueues = new ArrayList<>();
        targetQueues.add(new QueueConfiguration(this.queueName).setAddress(this.forwardAddress));
        this.server1.getConfiguration().setQueueConfigs(targetQueues);
        this.server2.getConfiguration().setQueueConfigs(targetQueues);

        startServers();
        this.server0.fail(true);

        // The locator's own reconnect attempts (100) are independent of the message count.
        this.locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(new TransportConfiguration[]{server2tc})).setReconnectAttempts(100);
        this.session0 = addSessionFactory(this.locator.createSessionFactory(server2tc)).createSession(false, true, true);
        this.session2 = addSessionFactory(this.locator.createSessionFactory(server2tc)).createSession(false, true, true);
        ClientProducer producer = this.session0.createProducer(this.testAddress);
        ClientConsumer consumer = this.session2.createConsumer(this.queueName);
        this.session2.start();

        SimpleString propKey = new SimpleString("propkey");
        for (int i = 0; i < NUM_MESSAGES; i++) {
            ClientMessage message = this.session0.createMessage(false);
            message.putIntProperty(propKey, i);
            producer.send(message);
        }

        // Messages must arrive complete and in sending order.
        for (int i = 0; i < NUM_MESSAGES; i++) {
            ClientMessage received = consumer.receive(1500L);
            assertNotNull(received);
            assertEquals(Integer.valueOf(i), received.getObjectProperty(propKey));
        }

        closeServers();
        assertNoMoreConnections();
    }

    /**
     * Breaks the bridge's forwarding connection twice (with in-VM connection creation configured
     * to fail {@code reconnectAttempts - 1} times), checks that nothing is left in delivering
     * state on the source queue, and then verifies that {@code NUM_MESSAGES} messages sent to
     * {@code server0} are forwarded to {@code server1} in order.
     *
     * @throws Exception if setup or messaging fails
     */
    @Test
    public void testReconnectSameNode() throws Exception {
        this.server0 = createActiveMQServer(0, isNetty(), this.server0Params);
        TransportConfiguration server0tc = new TransportConfiguration(getConnector(), this.server0Params, "server0tc");

        this.server0.getConfiguration().setConnectorConfigurations(this.connectors);
        this.server1.getConfiguration().setConnectorConfigurations(this.connectors);

        List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
        bridgeConfigs.add(createBridgeConfig());
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);

        List<QueueConfiguration> sourceQueues = new ArrayList<>();
        sourceQueues.add(new QueueConfiguration(this.queueName).setAddress(this.testAddress));
        this.server0.getConfiguration().setQueueConfigs(sourceQueues);

        List<QueueConfiguration> targetQueues = new ArrayList<>();
        targetQueues.add(new QueueConfiguration(this.queueName).setAddress(this.forwardAddress));
        this.server1.getConfiguration().setQueueConfigs(targetQueues);

        startServers();

        this.locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(new TransportConfiguration[]{server0tc, this.server1tc}));
        this.session0 = this.locator.createSessionFactory(server0tc).createSession(false, true, true);
        this.session1 = this.locator.createSessionFactory(this.server1tc).createSession(false, true, true);
        ClientProducer producer = this.session0.createProducer(this.testAddress);
        ClientConsumer consumer = this.session1.createConsumer(this.queueName);
        this.session1.start();

        Bridge bridge = (Bridge) this.server0.getClusterManager().getBridges().get(this.bridgeName);
        assertNotNull(bridge);
        RemotingConnection forwardingConnection = getForwardingConnection(bridge);

        // Make the next connection attempts fail so the bridge has to retry before it reconnects.
        InVMConnector.failOnCreateConnection = true;
        InVMConnector.numberOfFailures = this.reconnectAttempts - 1;
        forwardingConnection.fail(new ActiveMQNotConnectedException());

        // Wait for the re-established connection and break it a second time.
        getForwardingConnection(bridge).fail(new ActiveMQNotConnectedException());

        QueueControl queueControl = (QueueControl) this.server0.getManagementService().getResource("queue." + this.queueName);
        assertEquals(0L, queueControl.getDeliveringCount());

        SimpleString propKey = new SimpleString("propkey");
        for (int i = 0; i < NUM_MESSAGES; i++) {
            ClientMessage message = this.session0.createMessage(false);
            message.putIntProperty(propKey, i);
            producer.send(message);
        }

        // Messages must arrive complete and in sending order.
        for (int i = 0; i < NUM_MESSAGES; i++) {
            ClientMessage received = consumer.receive(1500L);
            assertNotNull(received);
            assertEquals(Integer.valueOf(i), received.getObjectProperty(propKey));
        }

        closeServers();
        assertNoMoreConnections();
    }

    /**
     * Breaks the bridge connection while all {@code NUM_MESSAGES} messages are in delivering state
     * and verifies that every message is consumed from {@code server1} exactly once.
     *
     * <p>Coordination between the two broker plugins:
     * <ul>
     *   <li>the plugin on {@code server0} counts every bridge delivery on two latches, one sized
     *       {@code NUM_MESSAGES} and one sized {@code 2 * NUM_MESSAGES};</li>
     *   <li>the plugin on {@code server1} records each message in {@code beforeSend} and blocks
     *       there until the larger latch is released, and in {@code beforeMessageRoute} makes
     *       recorded messages wait on a two-party barrier.</li>
     * </ul>
     * The test thread waits for the first {@code NUM_MESSAGES} deliveries, asserts the delivering
     * count, fails the forwarding connection and then consumes the messages.
     *
     * <p>After shutdown the journal of {@code server1} is inspected: with {@code persistCache}
     * enabled {@code NUM_MESSAGES} records of type 37 are expected, otherwise none.
     * NOTE(review): 37 is presumably the duplicate-ID journal record type - confirm against the
     * journal record id constants.
     *
     * @throws Exception if setup, messaging or journal inspection fails
     */
    @Test
    public void testReconnectSameNodeAfterDeliveryWithBlocking() throws Exception {
        this.server0 = createActiveMQServer(0, isNetty(), this.server0Params);
        TransportConfiguration server0tc = new TransportConfiguration(getConnector(), this.server0Params, "server0tc");

        this.server0.getConfiguration().setConnectorConfigurations(this.connectors);
        this.server1.getConfiguration().setConnectorConfigurations(this.connectors);

        List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
        bridgeConfigs.add(createBridgeConfig());
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);

        List<QueueConfiguration> sourceQueues = new ArrayList<>();
        sourceQueues.add(new QueueConfiguration(this.queueName).setAddress(this.testAddress));
        this.server0.getConfiguration().setQueueConfigs(sourceQueues);

        List<QueueConfiguration> targetQueues = new ArrayList<>();
        targetQueues.add(new QueueConfiguration(this.queueName).setAddress(this.forwardAddress));
        this.server1.getConfiguration().setQueueConfigs(targetQueues);

        startServers();

        this.locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(new TransportConfiguration[]{server0tc, this.server1tc}));
        this.session0 = this.locator.createSessionFactory(server0tc).createSession(false, true, true);
        this.session1 = this.locator.createSessionFactory(this.server1tc).createSession(false, true, true);
        ClientProducer producer = this.session0.createProducer(this.testAddress);
        ClientConsumer consumer = this.session1.createConsumer(this.queueName);
        this.session1.start();

        QueueControl queueControl = (QueueControl) this.server0.getManagementService().getResource("queue." + this.queueName);
        assertEquals(0L, queueControl.getDeliveringCount());

        SimpleString propKey = new SimpleString("propkey");
        final CyclicBarrier routingBarrier = new CyclicBarrier(2);
        final CountDownLatch firstDeliveryLatch = new CountDownLatch(NUM_MESSAGES);
        final CountDownLatch allDeliveriesLatch = new CountDownLatch(NUM_MESSAGES * 2);
        final List<Message> blockedMessages = Collections.synchronizedList(new ArrayList<>());
        Map<Integer, ClientMessage> receivedByIndex = new ConcurrentHashMap<>();

        this.server0.getConfiguration().registerBrokerPlugin(new ActiveMQServerPlugin() {
            @Override
            public void afterDeliverBridge(Bridge bridge, MessageReference messageReference, HandleStatus handleStatus) throws ActiveMQException {
                // Inside an anonymous implementation of an interface the inherited default
                // method must be invoked through the qualified InterfaceName.super form.
                ActiveMQServerPlugin.super.afterDeliverBridge(bridge, messageReference, handleStatus);
                firstDeliveryLatch.countDown();
                allDeliveriesLatch.countDown();
            }
        });

        this.server1.getConfiguration().registerBrokerPlugin(new ActiveMQServerPlugin() {
            @Override
            public void beforeSend(ServerSession serverSession, Transaction transaction, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException {
                blockedMessages.add(message);
                try {
                    // Hold the send until the bridge has performed 2 * NUM_MESSAGES deliveries.
                    allDeliveriesLatch.await();
                } catch (InterruptedException e) {
                    log.debug(e);
                }
            }

            @Override
            public void beforeMessageRoute(Message message, RoutingContext routingContext, boolean direct, boolean rejectDuplicates) throws ActiveMQException {
                if (blockedMessages.contains(message)) {
                    try {
                        routingBarrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        log.debug(e);
                    }
                }
            }
        });

        for (int i = 0; i < NUM_MESSAGES; i++) {
            ClientMessage message = this.session0.createMessage(false);
            message.putIntProperty(propKey, i);
            producer.send(message);
        }

        // NOTE(review): this wait is unbounded; a missing delivery hangs the test instead of failing it.
        firstDeliveryLatch.await();
        assertEquals(NUM_MESSAGES, queueControl.getDeliveringCount());

        Bridge bridge = (Bridge) this.server0.getClusterManager().getBridges().get(this.bridgeName);
        assertNotNull(bridge);
        getForwardingConnection(bridge).fail(new ActiveMQNotConnectedException());

        // Every index must be seen exactly once: putIfAbsent returns non-null on a duplicate.
        for (int i = 0; i < NUM_MESSAGES; i++) {
            ClientMessage received = consumer.receive(1500L);
            assertNotNull(received);
            assertNull(receivedByIndex.putIfAbsent(received.getIntProperty(propKey), received));
        }

        closeServers();
        assertNoMoreConnections();

        HashMap<Integer, AtomicInteger> journalCounts = countJournal(this.server1.getConfiguration());
        if (this.persistCache) {
            Assert.assertEquals(NUM_MESSAGES, journalCounts.get(37).intValue());
        } else {
            Assert.assertNull(journalCounts.get(37));
        }
    }

    /**
     * Runs the clean-shutdown/reconnect scenario with the pause between stopping and
     * restarting {@code server1} enabled.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testShutdownServerCleanlyAndReconnectSameNodeWithSleep() throws Exception {
        final boolean pauseBeforeRestart = true;
        testShutdownServerCleanlyAndReconnectSameNode(pauseBeforeRestart);
    }

    /**
     * Runs the clean-shutdown/reconnect scenario, restarting {@code server1} immediately
     * after it has been stopped.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testShutdownServerCleanlyAndReconnectSameNode() throws Exception {
        final boolean pauseBeforeRestart = false;
        testShutdownServerCleanlyAndReconnectSameNode(pauseBeforeRestart);
    }

    /**
     * Stops and restarts the bridge target ({@code server1}) and verifies that
     * {@code NUM_MESSAGES} messages sent to {@code server0} afterwards are forwarded in order.
     *
     * <p>Only runs when {@code persistCache} is enabled (JUnit assumption). The bridge uses the
     * shared configuration from {@code createBridgeConfig()} with unlimited reconnect attempts
     * and a client failure check period of 1000 ms.
     *
     * @param sleep when {@code true}, wait two seconds between stopping and restarting {@code server1}
     * @throws Exception if setup, restart or messaging fails
     */
    private void testShutdownServerCleanlyAndReconnectSameNode(boolean sleep) throws Exception {
        Assume.assumeTrue(this.persistCache);

        this.server0 = createActiveMQServer(0, isNetty(), this.server0Params);
        TransportConfiguration server0tc = new TransportConfiguration(getConnector(), this.server0Params, "server0tc");

        this.server0.getConfiguration().setConnectorConfigurations(this.connectors);
        this.server1.getConfiguration().setConnectorConfigurations(this.connectors);

        // reconnectAttempts must be set before createBridgeConfig() reads it.
        this.reconnectAttempts = -1;
        BridgeConfiguration bridgeConfiguration = createBridgeConfig().setClientFailureCheckPeriod(1000L);
        List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
        bridgeConfigs.add(bridgeConfiguration);
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);

        List<QueueConfiguration> sourceQueues = new ArrayList<>();
        sourceQueues.add(new QueueConfiguration(this.queueName).setAddress(this.testAddress));
        this.server0.getConfiguration().setQueueConfigs(sourceQueues);

        List<QueueConfiguration> targetQueues = new ArrayList<>();
        targetQueues.add(new QueueConfiguration(this.queueName).setAddress(this.forwardAddress));
        this.server1.getConfiguration().setQueueConfigs(targetQueues);

        startServers();
        waitForServerStart(this.server0);
        waitForServerStart(this.server1);

        this.locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(new TransportConfiguration[]{server0tc, this.server1tc}));
        this.session0 = this.locator.createSessionFactory(server0tc).createSession(false, true, true);
        ClientProducer producer = this.session0.createProducer(this.testAddress);

        log.debug("stopping server1");
        this.server1.stop();
        if (sleep) {
            Thread.sleep(2000L);
        }
        log.debug("restarting server1");
        this.server1.start();
        log.debug("server 1 restarted");

        // The consumer session can only be created once server1 is back up.
        this.session1 = this.locator.createSessionFactory(this.server1tc).createSession(false, true, true);
        ClientConsumer consumer = this.session1.createConsumer(this.queueName);
        this.session1.start();

        SimpleString propKey = new SimpleString("propkey");
        for (int i = 0; i < NUM_MESSAGES; i++) {
            ClientMessage message = this.session0.createMessage(false);
            message.putIntProperty(propKey, i);
            producer.send(message);
        }
        log.debug("sent messages");

        for (int i = 0; i < NUM_MESSAGES; i++) {
            ClientMessage received = consumer.receive(30000L);
            assertNotNull("received expected msg", received);
            assertEquals("property value matches", Integer.valueOf(i), received.getObjectProperty(propKey));
        }
        log.debug("got messages");

        closeServers();
        assertNoMoreConnections();
    }

    /**
     * Closes whichever client sessions and locator were opened by the test, then stops
     * {@code server0}, {@code server1} and, when present, {@code server2}.
     *
     * @throws Exception if closing a client resource or stopping a server fails
     */
    private void closeServers() throws Exception {
        for (ClientSession session : new ClientSession[]{this.session0, this.session1, this.session2}) {
            if (session != null) {
                session.close();
            }
        }

        if (this.locator != null) {
            this.locator.close();
        }

        this.server0.stop();
        this.server1.stop();
        if (this.server2 == null) {
            return;
        }
        this.server2.stop();
    }

    /**
     * Asserts that the remoting service of {@code server0}, {@code server1} and, when present,
     * {@code server2} reports no connections.
     */
    private void assertNoMoreConnections() {
        for (ActiveMQServer server : new ActiveMQServer[]{this.server0, this.server1}) {
            assertEquals(0L, server.getRemotingService().getConnections().size());
        }
        if (this.server2 == null) {
            return;
        }
        assertEquals(0L, this.server2.getRemotingService().getConnections().size());
    }

    /**
     * Breaks the bridge's forwarding connection, verifies that {@code NUM_MESSAGES} messages are
     * forwarded in order, then breaks the connection a second time and verifies a second batch.
     *
     * <p>Before each failure, in-VM connection creation is configured to fail
     * {@code reconnectAttempts - 1} times. For each batch the first out-of-order message (if any)
     * is remembered and reported once the whole batch has been received.
     *
     * @throws Exception if setup or messaging fails
     */
    @Test
    public void testFailoverThenFailAgainAndReconnect() throws Exception {
        this.server0 = createActiveMQServer(0, isNetty(), this.server0Params);
        TransportConfiguration server0tc = new TransportConfiguration(getConnector(), this.server0Params, "server0tc");

        this.server0.getConfiguration().setConnectorConfigurations(this.connectors);

        List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
        bridgeConfigs.add(createBridgeConfig());
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);

        List<QueueConfiguration> sourceQueues = new ArrayList<>();
        sourceQueues.add(new QueueConfiguration(this.queueName).setAddress(this.testAddress));
        this.server0.getConfiguration().setQueueConfigs(sourceQueues);

        List<QueueConfiguration> targetQueues = new ArrayList<>();
        targetQueues.add(new QueueConfiguration(this.queueName).setAddress(this.forwardAddress));
        this.server1.getConfiguration().setQueueConfigs(targetQueues);

        startServers();

        this.locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(new TransportConfiguration[]{server0tc, this.server1tc}));
        this.session0 = this.locator.createSessionFactory(server0tc).createSession(false, true, true);
        this.session1 = this.locator.createSessionFactory(this.server1tc).createSession(false, true, true);
        ClientProducer producer = this.session0.createProducer(this.testAddress);
        ClientConsumer consumer = this.session1.createConsumer(this.queueName);
        this.session1.start();

        Bridge bridge = (Bridge) this.server0.getClusterManager().getBridges().get(this.bridgeName);

        // First failure.
        RemotingConnection forwardingConnection = getForwardingConnection(bridge);
        InVMConnector.failOnCreateConnection = true;
        InVMConnector.numberOfFailures = this.reconnectAttempts - 1;
        forwardingConnection.fail(new ActiveMQNotConnectedException());

        SimpleString propKey = new SimpleString("propkey");
        for (int i = 0; i < NUM_MESSAGES; i++) {
            ClientMessage message = this.session0.createMessage(false);
            message.putIntProperty(propKey, i);
            producer.send(message);
        }

        // Drain the whole batch before failing so no messages are left behind in the queue.
        int outOfOrder = -1;
        int supposed = -1;
        for (int i = 0; i < NUM_MESSAGES; i++) {
            ClientMessage received = consumer.receive(1500L);
            assertNotNull(received);
            if (outOfOrder == -1 && i != received.getIntProperty(propKey).intValue()) {
                outOfOrder = received.getIntProperty(propKey).intValue();
                supposed = i;
            }
        }
        if (outOfOrder != -1) {
            fail("Message " + outOfOrder + " was received out of order, it was supposed to be " + supposed);
        }

        log.debug("=========== second failure, sending message");

        // Second failure: go through the polling helper so a connection that is momentarily
        // absent is waited for instead of causing a NullPointerException.
        forwardingConnection = getForwardingConnection(bridge);
        InVMConnector.failOnCreateConnection = true;
        InVMConnector.numberOfFailures = this.reconnectAttempts - 1;
        forwardingConnection.fail(new ActiveMQException(ActiveMQExceptionType.UNBLOCKED));

        for (int i = 0; i < NUM_MESSAGES; i++) {
            ClientMessage message = this.session0.createMessage(false);
            message.putIntProperty(propKey, i);
            producer.send(message);
        }

        for (int i = 0; i < NUM_MESSAGES; i++) {
            ClientMessage received = consumer.receive(1500L);
            assertNotNull("Didn't receive message", received);
            if (outOfOrder == -1 && i != received.getIntProperty(propKey).intValue()) {
                outOfOrder = received.getIntProperty(propKey).intValue();
                supposed = i;
            }
        }
        if (outOfOrder != -1) {
            fail("Message " + outOfOrder + " was received out of order, it was supposed to be " + supposed);
        }

        closeServers();
        assertNoMoreConnections();
    }

    /**
     * Fails the bridge's forwarding connection while messages are still being sent (right after
     * the message with index 50) and verifies that the delivering count of the source queue on
     * {@code server0} eventually reaches zero.
     *
     * <p>In-VM connection creation is configured to fail {@code reconnectAttempts - 1} times
     * before the messages are sent.
     *
     * @throws Exception if setup or messaging fails
     */
    @Test
    public void testDeliveringCountOnBridgeConnectionFailure() throws Exception {
        this.server0 = createActiveMQServer(0, isNetty(), this.server0Params);
        TransportConfiguration server0tc = new TransportConfiguration(getConnector(), this.server0Params, "server0tc");

        this.server0.getConfiguration().setConnectorConfigurations(this.connectors);

        List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
        bridgeConfigs.add(createBridgeConfig());
        this.server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);

        List<QueueConfiguration> sourceQueues = new ArrayList<>();
        sourceQueues.add(new QueueConfiguration(this.queueName).setAddress(this.testAddress));
        this.server0.getConfiguration().setQueueConfigs(sourceQueues);

        List<QueueConfiguration> targetQueues = new ArrayList<>();
        targetQueues.add(new QueueConfiguration(this.queueName).setAddress(this.forwardAddress));
        this.server1.getConfiguration().setQueueConfigs(targetQueues);

        startServers();

        this.locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(new TransportConfiguration[]{server0tc, this.server1tc}));
        this.session0 = this.locator.createSessionFactory(server0tc).createSession(false, true, true);
        this.session1 = this.locator.createSessionFactory(this.server1tc).createSession(false, true, true);
        ClientProducer producer = this.session0.createProducer(this.testAddress);
        this.session1.start();

        Bridge bridge = (Bridge) this.server0.getClusterManager().getBridges().get(this.bridgeName);
        RemotingConnection forwardingConnection = getForwardingConnection(bridge);
        InVMConnector.failOnCreateConnection = true;
        InVMConnector.numberOfFailures = this.reconnectAttempts - 1;

        SimpleString propKey = new SimpleString("propkey");
        // The binding exposes its target through getBindable(); the test needs it as a Queue.
        Queue queue = (Queue) this.server0.getPostOffice().getBinding(new SimpleString(this.queueName)).getBindable();

        for (int i = 0; i < NUM_MESSAGES; i++) {
            ClientMessage message = this.session0.createMessage(false);
            message.putIntProperty(propKey, i);
            producer.send(message);
            if (i == 50) {
                // Break the connection mid-stream.
                forwardingConnection.fail(new ActiveMQException(ActiveMQExceptionType.UNBLOCKED));
            }
        }

        Wait.assertEquals(0, queue::getDeliveringCount);

        closeServers();
        assertNoMoreConnections();
    }

    /**
     * Starts the servers in the order {@code server2} (only when a test created it),
     * {@code server1}, {@code server0}.
     *
     * @throws Exception if any server fails to start
     */
    private void startServers() throws Exception {
        final ActiveMQServer optionalServer = this.server2;
        if (optionalServer != null) {
            optionalServer.start();
        }
        for (ActiveMQServer server : new ActiveMQServer[]{this.server1, this.server0}) {
            server.start();
        }
    }

    /**
     * Polls the bridge every 10 ms until it exposes a forwarding connection.
     *
     * @param bridge the bridge to inspect; it is cast to {@link BridgeImpl}
     * @return the first non-null forwarding connection observed
     * @throws IllegalStateException if no connection shows up within 50 seconds
     * @throws Exception if the polling sleep is interrupted
     */
    private RemotingConnection getForwardingConnection(Bridge bridge) throws Exception {
        final long timeoutMillis = 50000;
        final long startedAt = System.currentTimeMillis();
        final BridgeImpl bridgeImpl = (BridgeImpl) bridge;
        for (;;) {
            final RemotingConnection connection = bridgeImpl.getForwardingConnection();
            if (connection != null) {
                return connection;
            }
            Thread.sleep(10L);
            if (System.currentTimeMillis() - startedAt >= timeoutMillis) {
                throw new IllegalStateException("Failed to get forwarding connection");
            }
        }
    }

    /**
     * Creates a server through the base class and then applies this run's {@code persistCache}
     * parameter to its configuration via {@code setPersistIDCache}.
     *
     * @param id          passed through to the base-class factory
     * @param params      passed through to the base-class factory
     * @param netty       passed through to the base-class factory
     * @param nodeManager passed through to the base-class factory
     * @return the server returned by the base class, with the ID-cache persistence flag set
     * @throws Exception if the base class fails to create the server
     */
    @Override
    public ActiveMQServer createActiveMQServer(int id, Map<String, Object> params, boolean netty, NodeManager nodeManager) throws Exception {
        final ActiveMQServer server = super.createActiveMQServer(id, params, netty, nodeManager);
        server.getConfiguration().setPersistIDCache(this.persistCache);
        return server;
    }
}
