package org.apache.activemq.usecases;

import java.net.InetAddress;
import java.net.Socket;
import java.net.URI;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.TestUtils;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/DuplexAdvisoryRaceTest.class */
public class DuplexAdvisoryRaceTest {
    private static final Logger LOG = LoggerFactory.getLogger(DuplexAdvisoryRaceTest.class);
    private static String hostName;
    final AtomicLong responseReceived = new AtomicLong(0);
    BrokerService brokerA;
    BrokerService brokerB;
    String networkConnectorUrlString;

    @BeforeClass
    public static void initIp() throws Exception {
        hostName = InetAddress.getLocalHost().getHostAddress();
    }

    @Before
    public void createBrokers() throws Exception {
        this.networkConnectorUrlString = "tcp://" + hostName + ":" + TestUtils.findOpenPort();
        this.brokerA = newBroker("A");
        this.brokerB = newBroker("B");
        this.responseReceived.set(0L);
    }

    @After
    public void stopBrokers() throws Exception {
        this.brokerA.stop();
        this.brokerB.stop();
    }

    public void repeatTestHang() throws Exception {
        for (int i = 0; i < 10; i++) {
            testHang();
            stopBrokers();
            createBrokers();
        }
    }

    @Test
    public void testHang() throws Exception {
        this.brokerA.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { // from class: org.apache.activemq.usecases.DuplexAdvisoryRaceTest.1
            public Subscription addConsumer(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws Exception {
                Subscription addConsumer = super.addConsumer(connectionContext, consumerInfo);
                if (connectionContext.isNetworkConnection()) {
                    TimeUnit.MILLISECONDS.sleep(200L);
                }
                return addConsumer;
            }
        }});
        NetworkConnector bridgeBrokers = bridgeBrokers(this.brokerA, this.brokerB);
        this.brokerA.start();
        this.brokerB.start();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(this.brokerA.getTransportConnectorByScheme("tcp").getPublishableConnectString() + "?jms.watchTopicAdvisories=false");
        ActiveMQConnectionFactory activeMQConnectionFactory2 = new ActiveMQConnectionFactory(this.brokerB.getTransportConnectorByScheme("tcp").getPublishableConnectString() + "?jms.watchTopicAdvisories=false");
        populate(activeMQConnectionFactory, 0, 200, 50);
        populate(activeMQConnectionFactory2, 200, DurableSubProcessWithRestartTest.CARGO_SIZE, 50);
        LinkedList linkedList = new LinkedList();
        linkedList.add(demand(activeMQConnectionFactory2, 0, 200, 5));
        linkedList.add(demand(activeMQConnectionFactory, 200, DurableSubProcessWithRestartTest.CARGO_SIZE, 5));
        LOG.info("Allow duplex bridge to connect....");
        this.brokerB.startTransportConnector(this.brokerB.addConnector(this.networkConnectorUrlString + "?transport.socketBufferSize=1024"));
        if (!Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.DuplexAdvisoryRaceTest.2
            public boolean isSatisified() throws Exception {
                DuplexAdvisoryRaceTest.LOG.info("received: " + DuplexAdvisoryRaceTest.this.responseReceived.get());
                return DuplexAdvisoryRaceTest.this.responseReceived.get() >= 20000;
            }
        }, 600000L)) {
            org.apache.activemq.TestSupport.dumpAllThreads("DD");
            for (DemandForwardingBridge demandForwardingBridge : bridgeBrokers.activeBridges()) {
                if (demandForwardingBridge instanceof DemandForwardingBridge) {
                    ((Socket) demandForwardingBridge.getRemoteBroker().narrow(Socket.class)).close();
                }
            }
        }
        bridgeBrokers.stop();
        Iterator it = linkedList.iterator();
        while (it.hasNext()) {
            try {
                ((Connection) it.next()).close();
            } catch (Exception e) {
            }
        }
        Assert.assertTrue("received all sent: " + this.responseReceived.get(), this.responseReceived.get() >= 20000);
    }

    private void populate(ActiveMQConnectionFactory activeMQConnectionFactory, int i, int i2, int i3) throws JMSException {
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        BytesMessage createBytesMessage = createSession.createBytesMessage();
        MessageProducer createProducer = createSession.createProducer((Destination) null);
        for (int i4 = i; i4 < i2; i4++) {
            Destination qFromInt = qFromInt(i4);
            for (int i5 = 0; i5 < i3; i5++) {
                createProducer.send(qFromInt, createBytesMessage);
            }
        }
        createConnection.close();
    }

    private Connection demand(ActiveMQConnectionFactory activeMQConnectionFactory, int i, int i2, int i3) throws Exception {
        Connection createConnection = activeMQConnectionFactory.createConnection();
        Session createSession = createConnection.createSession(false, 1);
        for (int i4 = i; i4 < i2; i4++) {
            Destination qFromInt = qFromInt(i4);
            for (int i5 = 0; i5 < i3; i5++) {
                createSession.createConsumer(qFromInt).setMessageListener(new MessageListener() { // from class: org.apache.activemq.usecases.DuplexAdvisoryRaceTest.3
                    public void onMessage(Message message) {
                        DuplexAdvisoryRaceTest.this.responseReceived.incrementAndGet();
                    }
                });
            }
        }
        createConnection.start();
        return createConnection;
    }

    private Destination qFromInt(int i) {
        StringBuilder sb = new StringBuilder();
        String format = String.format("%03d", Integer.valueOf(i));
        for (int i2 = 0; i2 < 3; i2++) {
            sb.append(format.charAt(i2));
            if (i2 < 2) {
                sb.append('.');
            }
        }
        return new ActiveMQQueue("Test." + sb.toString());
    }

    private BrokerService newBroker(String str) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.setBrokerName(str);
        brokerService.addConnector("tcp://" + hostName + ":0?transport.socketBufferSize=1024");
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setExpireMessagesPeriod(0L);
        policyMap.setDefaultEntry(policyEntry);
        brokerService.setDestinationPolicy(policyMap);
        return brokerService;
    }

    protected NetworkConnector bridgeBrokers(BrokerService brokerService, BrokerService brokerService2) throws Exception {
        DiscoveryNetworkConnector discoveryNetworkConnector = new DiscoveryNetworkConnector(new URI("static:(failover:(" + this.networkConnectorUrlString + "?socketBufferSize=1024&trace=false)?maxReconnectAttempts=0)"));
        discoveryNetworkConnector.setName(brokerService.getBrokerName() + "-to-" + brokerService2.getBrokerName());
        discoveryNetworkConnector.setDuplex(true);
        brokerService.addNetworkConnector(discoveryNetworkConnector);
        return discoveryNetworkConnector;
    }
}
