package org.apache.activemq.usecases;

import java.net.URI;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;

/* loaded from: input_file:org/apache/activemq/usecases/VirtualTopicNetworkClusterReactivationTest.class */
public class VirtualTopicNetworkClusterReactivationTest extends JmsMultipleBrokersTestSupport {
    private static final String BROKER_A = "brokerA";
    private static final String BROKER_B = "brokerB";
    private static final String BROKER_A_TRANSPORT_URL = "tcp://localhost:61616";
    private static final String BROKER_B_TRANSPORT_URL = "tcp://localhost:61617";
    private static final long DEFAULT_SLEEP_MS = 1000;
    private ActiveMQTopic topic = new ActiveMQTopic("VirtualTopic.FOO.TEST");
    private ActiveMQQueue queue = new ActiveMQQueue("Consumer.FOO.VirtualTopic.FOO.TEST");

    public void testDurableSubReconnectFromAtoB() throws JMSException {
        Connection createConnection = new ActiveMQConnectionFactory("tcp://localhost:61617?jms.prefetchPolicy.queuePrefetch=0").createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        MessageConsumer createConsumer = createSession.createConsumer(this.queue);
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection createConnection2 = activeMQConnectionFactory.createConnection();
        createConnection2.start();
        Session createSession2 = createConnection2.createSession(false, 1);
        produce(createSession2.createProducer(this.topic), createSession2, 5);
        sleep();
        createConsumer.close();
        createSession.close();
        createConnection.close();
        sleep();
        Connection createConnection3 = activeMQConnectionFactory.createConnection();
        createConnection3.start();
        MessageConsumer createConsumer2 = createConnection3.createSession(false, 1).createConsumer(this.queue);
        sleep();
        consume(createConsumer2, 5);
    }

    private void consume(MessageConsumer messageConsumer, int i) throws JMSException {
        for (int i2 = 0; i2 < i; i2++) {
            TextMessage receive = messageConsumer.receive(DEFAULT_SLEEP_MS);
            assertNotNull(receive);
            TextMessage textMessage = receive;
            System.out.println("received: " + textMessage.getText());
            assertEquals("message: " + i2, textMessage.getText());
        }
    }

    private void produce(MessageProducer messageProducer, Session session, int i) throws JMSException {
        for (int i2 = 0; i2 < i; i2++) {
            messageProducer.send(session.createTextMessage("message: " + i2));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public void setUp() throws Exception {
        maxSetupTime = 1000;
        super.setAutoFail(true);
        super.setUp();
        BrokerService createBroker = createBroker(new URI(String.format("broker:(%s)/%s%s", "tcp://localhost:61616", BROKER_A, "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true")));
        createBroker.setDestinationPolicy(buildPolicyMap());
        createBroker.setDestinations(new ActiveMQDestination[]{this.queue});
        BrokerService createBroker2 = createBroker(new URI(String.format("broker:(%s)/%s%s", BROKER_B_TRANSPORT_URL, BROKER_B, "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true")));
        createBroker2.setDestinationPolicy(buildPolicyMap());
        createBroker2.setDestinations(new ActiveMQDestination[]{this.queue});
        bridgeBrokers(BROKER_A, BROKER_B);
        bridgeBrokers(BROKER_B, BROKER_A);
        startAllBrokers();
    }

    private PolicyMap buildPolicyMap() {
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setOptimizedDispatch(true);
        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
        conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
        policyEntry.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
        policyEntry.setEnableAudit(false);
        policyMap.put(new ActiveMQQueue("Consumer.*.VirtualTopic.>"), policyEntry);
        return policyMap;
    }

    private void sleep() {
        try {
            Thread.sleep(DEFAULT_SLEEP_MS);
        } catch (InterruptedException e) {
        }
    }

    private void sleep(int i) {
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
        }
    }
}
