package org.apache.activemq.transport.fanout;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.Test;
import junit.textui.TestRunner;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.network.NetworkTestSupport;
import org.apache.activemq.perf.NetworkedSyncTest;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.mock.MockTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.class */
/**
 * Exercises a {@code fanout://} transport that targets two brokers at once: the local
 * connector directly and the remote connector through a {@code mock://} wrapper
 * (see {@link #createFanoutConnection()}).
 *
 * <p>Each test attaches one consumer to the local broker and one to the remote broker,
 * publishes through the fanout connection and checks what each consumer receives.
 */
public class FanoutTransportBrokerTest extends NetworkTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(FanoutTransportBrokerTest.class);

    /** How long the publish is expected to stay blocked while commands are being dropped. */
    private static final long BLOCKED_PUBLISH_WAIT_SECONDS = 3L;

    /** How long the publish may take to complete once the remote broker has been restarted. */
    private static final long PUBLISH_COMPLETION_WAIT_SECONDS = 20L;

    // These two fields are referenced by name in addCombinationValues(...) below, so they
    // presumably have to stay public and non-final for the combination support to set them
    // -- confirm against the test base class before tightening their visibility.
    public ActiveMQDestination destination;
    public int deliveryMode;

    /**
     * Builds the test suite for this class.
     *
     * @return the suite produced by the inherited {@code suite(Class)} factory
     */
    public static Test suite() {
        return suite(FanoutTransportBrokerTest.class);
    }

    /**
     * Runs the suite with the JUnit text runner.
     *
     * @param strArr command-line arguments (unused)
     */
    public static void main(String[] strArr) {
        TestRunner.run(suite());
    }

    /**
     * Registers the field combinations for {@link #testPublisherFansout()}: two delivery
     * modes crossed with a topic and a queue, both named {@code TEST}.
     */
    public void initCombosForTestPublisherFansout() {
        // 1 and 2 are presumably javax.jms.DeliveryMode.NON_PERSISTENT / PERSISTENT
        // -- confirm against createMessage(...) in the base class.
        addCombinationValues("deliveryMode", new Object[]{1, 2});
        addCombinationValues("destination", new Object[]{new ActiveMQTopic("TEST"), new ActiveMQQueue("TEST")});
    }

    /**
     * Publishes one message through the fanout connection and verifies that the consumer on
     * the local broker and the consumer on the remote broker each receive exactly one message.
     *
     * @throws Exception if any broker interaction fails
     */
    public void testPublisherFansout() throws Exception {
        StubConnection localConsumer = createConnection();
        registerConsumer(localConsumer);
        StubConnection remoteConsumer = createRemoteConnection();
        registerConsumer(remoteConsumer);

        LOG.info("Starting the fanout connection.");
        StubConnection fanout = createFanoutConnection();
        ProducerInfo producer = registerProducer(fanout);
        fanout.request(createMessage(producer, this.destination, this.deliveryMode));

        assertSingleMessageReceived(localConsumer);
        assertSingleMessageReceived(remoteConsumer);
    }

    /**
     * Registers the field combinations for {@link #testPublisherWaitsForServerToBeUp()}:
     * two delivery modes against a single topic named {@code TEST}.
     */
    public void initCombosForTestPublisherWaitsForServerToBeUp() {
        // 1 and 2 are presumably javax.jms.DeliveryMode.NON_PERSISTENT / PERSISTENT
        // -- confirm against createMessage(...) in the base class.
        addCombinationValues("deliveryMode", new Object[]{1, 2});
        addCombinationValues("destination", new Object[]{new ActiveMQTopic("TEST")});
    }

    /**
     * Verifies that a publish through the fanout connection does not complete while the
     * mock-wrapped leg silently drops commands, and that it does complete, without error,
     * after the remote broker has been restarted.
     *
     * @throws Exception if any broker interaction fails
     */
    public void testPublisherWaitsForServerToBeUp() throws Exception {
        StubConnection localConsumer = createConnection();
        registerConsumer(localConsumer);
        StubConnection remoteConsumer = createRemoteConnection();
        registerConsumer(remoteConsumer);

        LOG.info("Starting the fanout connection.");
        final StubConnection fanout = createFanoutConnection();
        final ProducerInfo producer = registerProducer(fanout);

        // Sanity check: with both legs healthy the first message reaches both consumers.
        fanout.request(createMessage(producer, this.destination, this.deliveryMode));
        assertSingleMessageReceived(localConsumer);
        assertSingleMessageReceived(remoteConsumer);

        final CountDownLatch publishDone = new CountDownLatch(1);
        final AtomicReference<Throwable> publishFailure = new AtomicReference<Throwable>();

        // Replace the filter behind the MockTransport with one whose oneway(...) only logs
        // the command instead of passing it on.
        MockTransport mockTransport = (MockTransport) fanout.getTransport().narrow(MockTransport.class);
        mockTransport.install(new TransportFilter(mockTransport.getNext()) {
            public void oneway(Object command) throws IOException {
                LOG.info("Dropping: {}", command);
            }
        });

        // The request is expected to block, so it runs on its own thread. Any failure is
        // recorded (not just printed) so that the test cannot pass on a failed publish.
        Thread publisher = new Thread("fanout-publisher") {
            @Override
            public void run() {
                try {
                    fanout.request(FanoutTransportBrokerTest.this.createMessage(
                            producer,
                            FanoutTransportBrokerTest.this.destination,
                            FanoutTransportBrokerTest.this.deliveryMode));
                } catch (Throwable failure) {
                    publishFailure.set(failure);
                    LOG.error("Publish through the fanout connection failed", failure);
                } finally {
                    publishDone.countDown();
                }
            }
        };
        publisher.start();

        assertFalse("Publish finished although commands were being dropped",
                publishDone.await(BLOCKED_PUBLISH_WAIT_SECONDS, TimeUnit.SECONDS));

        LOG.info("Restarting Broker");
        restartRemoteBroker();
        LOG.info("Broker Restarted");

        assertTrue("Publish did not finish after the remote broker was restarted",
                publishDone.await(PUBLISH_COMPLETION_WAIT_SECONDS, TimeUnit.SECONDS));
        // countDown() happens-before a successful await(), so the recorded failure is visible here.
        assertNull("Publish failed instead of completing after the restart", publishFailure.get());
    }

    /**
     * Returns the URI used for the local broker.
     *
     * @return the value of {@code NetworkedSyncTest.broker1URL}
     */
    @Override // org.apache.activemq.network.NetworkTestSupport
    protected String getLocalURI() {
        return NetworkedSyncTest.broker1URL;
    }

    /**
     * Returns the URI used for the remote broker.
     *
     * @return {@code tcp://localhost:61617}
     */
    @Override // org.apache.activemq.network.NetworkTestSupport
    protected String getRemoteURI() {
        return "tcp://localhost:61617";
    }

    /**
     * Opens a fanout connection over a static list of two URIs: the local connector's
     * connect URI as-is, and the remote connector's connect URI prefixed with
     * {@code mock://}. The connection is added to {@code connections}.
     *
     * @return the newly created fanout connection
     * @throws Exception if the URI is malformed or the transport cannot be connected
     */
    protected StubConnection createFanoutConnection() throws Exception {
        URI fanoutUri = new URI("fanout://(static://("
                + this.connector.getServer().getConnectURI()
                + ",mock://"
                + this.remoteConnector.getServer().getConnectURI()
                + "))?fanOutQueues=true");
        StubConnection fanout = new StubConnection(TransportFactory.connect(fanoutUri));
        this.connections.add(fanout);
        return fanout;
    }

    /**
     * Sends a fresh connection info and session info over {@code connection}, then requests
     * a consumer on {@link #destination}.
     *
     * @param connection the connection the consumer is attached to
     * @throws Exception if sending any of the commands fails
     */
    private void registerConsumer(StubConnection connection) throws Exception {
        ConnectionInfo connectionInfo = createConnectionInfo();
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, this.destination);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        connection.request(consumerInfo);
    }

    /**
     * Sends a fresh connection info, session info and producer info over {@code connection}.
     *
     * @param connection the connection the producer is attached to
     * @return the producer info that was sent, for use when creating messages
     * @throws Exception if sending any of the commands fails
     */
    private ProducerInfo registerProducer(StubConnection connection) throws Exception {
        ConnectionInfo connectionInfo = createConnectionInfo();
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        connection.send(producerInfo);
        return producerInfo;
    }

    /**
     * Asserts that {@code consumer} yields one message and then has no messages left.
     *
     * @param consumer the consumer connection to check
     * @throws Exception if receiving fails
     */
    private void assertSingleMessageReceived(StubConnection consumer) throws Exception {
        assertNotNull(receiveMessage(consumer));
        assertNoMessagesLeft(consumer);
    }
}
