package org.apache.activemq.usecases;

import java.net.URI;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.perf.NetworkedSyncTest;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.transport.vm.VMTransportServer;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.class */
public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTestSupport {
    private static final Log LOG = LogFactory.getLog(BrokerQueueNetworkWithDisconnectTest.class);
    private static final int NETWORK_DOWN_TIME = 5000;
    protected static final int MESSAGE_COUNT = 200;
    private static final String HUB = "HubBroker";
    private static final String SPOKE = "SpokeBroker";
    private SocketProxy socketProxy;
    private long networkDownTimeStart;
    public boolean simulateStalledNetwork;
    public boolean useDuplexNetworkBridge = true;
    private long inactiveDuration = 1000;
    private boolean useSocketProxy = true;

    public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() {
        addCombinationValues("useDuplexNetworkBridge", new Object[]{Boolean.TRUE, Boolean.FALSE});
        addCombinationValues("simulateStalledNetwork", new Object[]{Boolean.TRUE});
    }

    public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
        bridgeBrokers(SPOKE, HUB);
        startAllBrokers();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        MessageConsumer createConsumer = createConsumer(HUB, createDestination);
        sleep(600);
        sendMessages(SPOKE, createDestination, MESSAGE_COUNT);
        MessageIdList consumerMessages = getConsumerMessages(HUB, createConsumer);
        consumerMessages.waitForMessagesToArrive(MESSAGE_COUNT);
        assertTrue("At least message 200 must be recieved, duplicates are expected, count=" + consumerMessages.getMessageCount(), MESSAGE_COUNT <= consumerMessages.getMessageCount());
    }

    public void testNoStuckConnectionsWithTransportDisconnect() throws Exception {
        this.inactiveDuration = 60000L;
        this.useDuplexNetworkBridge = true;
        bridgeBrokers(SPOKE, HUB);
        final JmsMultipleBrokersTestSupport.BrokerItem brokerItem = this.brokers.get(HUB);
        brokerItem.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { // from class: org.apache.activemq.usecases.BrokerQueueNetworkWithDisconnectTest.1
            int sleepCount = 2;

            public void removeConnection(ConnectionContext connectionContext, ConnectionInfo connectionInfo, Throwable th) throws Exception {
                while (true) {
                    try {
                        int i = this.sleepCount - 1;
                        this.sleepCount = i;
                        if (i < 0) {
                            break;
                        }
                        BrokerQueueNetworkWithDisconnectTest.LOG.info("sleeping for a bit in close impl to simulate load where reconnect fails due to a pending close");
                        TimeUnit.SECONDS.sleep(2L);
                    } catch (Exception e) {
                    }
                }
                super.removeConnection(connectionContext, connectionInfo, th);
            }
        }});
        startAllBrokers();
        waitForBridgeFormation();
        for (int i = 0; i < 3; i++) {
            this.socketProxy.halfClose();
            sleep(NetworkedSyncTest.MESSAGE_COUNT);
        }
        boolean waitFor = Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.BrokerQueueNetworkWithDisconnectTest.2
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                long size = ((TransportConnector) brokerItem.broker.getTransportConnectors().get(0)).getConnections().size();
                BrokerQueueNetworkWithDisconnectTest.LOG.info("Num connetions:" + size);
                return size == 1;
            }
        });
        if (!waitFor) {
            dumpAllThreads("ExtraHubConnection");
        }
        assertTrue("should be only one transport connection for the single duplex network connector", waitFor);
        boolean waitFor2 = Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.BrokerQueueNetworkWithDisconnectTest.3
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                long connectionCount = ((VMTransportServer) VMTransportFactory.SERVERS.get(BrokerQueueNetworkWithDisconnectTest.HUB)).getConnectionCount();
                BrokerQueueNetworkWithDisconnectTest.LOG.info("Num VM connetions:" + connectionCount);
                return connectionCount == 1;
            }
        });
        if (!waitFor2) {
            dumpAllThreads("ExtraHubVMConnection");
        }
        assertTrue("should be only one vm connection for the single network duplex network connector", waitFor2);
    }

    public void testTwoDuplexNCsAreAllowed() throws Exception {
        this.useDuplexNetworkBridge = true;
        this.useSocketProxy = false;
        bridgeBrokers(SPOKE, HUB).setName("FirstDuplex");
        bridgeBrokers(SPOKE, HUB).setName("SecondDuplex");
        startAllBrokers();
        waitForBridgeFormation();
        assertEquals("Has two transport Connectors", 2, ((TransportConnector) this.brokers.get(HUB).broker.getTransportConnectors().get(0)).getConnections().size());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public void startAllBrokers() throws Exception {
        this.brokers.get(HUB).broker.start();
        this.brokers.get(SPOKE).broker.start();
        sleep(600);
    }

    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport, org.apache.activemq.AutoFailTestSupport
    public void setUp() throws Exception {
        this.networkDownTimeStart = 0L;
        this.inactiveDuration = 1000L;
        this.useSocketProxy = true;
        super.setAutoFail(true);
        super.setUp();
        createBroker(new URI("broker:(tcp://localhost:61617)/HubBroker?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true"));
        createBroker(new URI("broker:(tcp://localhost:61616)/SpokeBroker?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true"));
    }

    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport, org.apache.activemq.AutoFailTestSupport
    public void tearDown() throws Exception {
        super.tearDown();
        if (this.socketProxy != null) {
            this.socketProxy.close();
        }
    }

    public static Test suite() {
        return suite(BrokerQueueNetworkWithDisconnectTest.class);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public void onSend(int i, TextMessage textMessage) {
        sleep(50);
        if (i == 50 || i == 150) {
            if (this.simulateStalledNetwork) {
                this.socketProxy.pause();
            } else {
                this.socketProxy.close();
            }
            this.networkDownTimeStart = System.currentTimeMillis();
        } else if (this.networkDownTimeStart > 0) {
            if (this.networkDownTimeStart + 5000 < System.currentTimeMillis()) {
                if (this.simulateStalledNetwork) {
                    this.socketProxy.goOn();
                } else {
                    this.socketProxy.reopen();
                }
                this.networkDownTimeStart = 0L;
            } else {
                sleep(500);
            }
        }
        super.onSend(i, textMessage);
    }

    private void sleep(int i) {
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public NetworkConnector bridgeBrokers(BrokerService brokerService, BrokerService brokerService2, boolean z, int i, boolean z2, boolean z3) throws Exception {
        List transportConnectors = brokerService2.getTransportConnectors();
        if (transportConnectors.isEmpty()) {
            throw new Exception("Remote broker has no registered connectors.");
        }
        URI connectUri = ((TransportConnector) transportConnectors.get(0)).getConnectUri();
        if (this.useSocketProxy) {
            this.socketProxy = new SocketProxy(connectUri);
            connectUri = this.socketProxy.getUrl();
        }
        DiscoveryNetworkConnector discoveryNetworkConnector = new DiscoveryNetworkConnector(new URI("static:(" + connectUri + "?wireFormat.maxInactivityDuration=" + this.inactiveDuration + "&wireFormat.maxInactivityDurationInitalDelay=" + this.inactiveDuration + ")?useExponentialBackOff=false"));
        discoveryNetworkConnector.setDynamicOnly(z);
        discoveryNetworkConnector.setNetworkTTL(i);
        brokerService.addNetworkConnector(discoveryNetworkConnector);
        maxSetupTime = 2000;
        if (this.useDuplexNetworkBridge) {
            discoveryNetworkConnector.setDuplex(true);
        }
        return discoveryNetworkConnector;
    }
}
