package org.apache.activemq.usecases;

import java.net.URI;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.class */
public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSupport {
    protected static final int MESSAGE_COUNT = 100;
    protected static final int PREFETCH_COUNT = 1;
    private static final Log LOG = LogFactory.getLog(TwoBrokerQueueClientsReconnectTest.class);
    protected int msgsClient1;
    protected int msgsClient2;
    protected String broker1;
    protected String broker2;

    public void testClientAReceivesOnly() throws Exception {
        this.broker1 = "BrokerA";
        this.broker2 = "BrokerB";
        doOneClientReceivesOnly();
    }

    public void testClientBReceivesOnly() throws Exception {
        this.broker1 = "BrokerB";
        this.broker2 = "BrokerA";
        doOneClientReceivesOnly();
    }

    public void doOneClientReceivesOnly() throws Exception {
        bridgeBrokers(this.broker1, this.broker2);
        bridgeBrokers(this.broker2, this.broker1);
        startAllBrokers();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        MessageConsumer createConsumer = createConsumer(this.broker1, createDestination);
        MessageConsumer createConsumer2 = createConsumer(this.broker2, createDestination);
        Thread.sleep(500L);
        sendMessages("BrokerA", createDestination, 100);
        createConsumer2.close();
        this.msgsClient1 += receiveAllMessages(createConsumer);
        createConsumer.close();
        assertEquals("Client for " + this.broker1 + " should have receive all messages.", 100, this.msgsClient1);
    }

    public void testClientAReceivesOnlyAfterReconnect() throws Exception {
        this.broker1 = "BrokerA";
        this.broker2 = "BrokerB";
        doOneClientReceivesOnlyAfterReconnect();
    }

    public void testClientBReceivesOnlyAfterReconnect() throws Exception {
        this.broker1 = "BrokerB";
        this.broker2 = "BrokerA";
        doOneClientReceivesOnlyAfterReconnect();
    }

    public void doOneClientReceivesOnlyAfterReconnect() throws Exception {
        bridgeBrokers(this.broker1, this.broker2);
        bridgeBrokers(this.broker2, this.broker1);
        startAllBrokers();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        MessageConsumer createConsumer = createConsumer(this.broker1, createDestination);
        MessageConsumer createConsumer2 = createConsumer(this.broker2, createDestination);
        Thread.sleep(500L);
        sendMessages("BrokerA", createDestination, 100);
        this.msgsClient1 += receiveExactMessages(createConsumer, 20);
        createConsumer.close();
        MessageConsumer createConsumer3 = createConsumer(this.broker1, createDestination);
        Thread.sleep(500L);
        createConsumer2.close();
        this.msgsClient1 += receiveAllMessages(createConsumer3);
        createConsumer3.close();
        assertEquals("Client for " + this.broker1 + " should have received all messages.", 100, this.msgsClient1);
    }

    public void testTwoClientsReceiveClientADisconnects() throws Exception {
        this.broker1 = "BrokerA";
        this.broker2 = "BrokerB";
        doTwoClientsReceiveOneClientDisconnects();
    }

    public void testTwoClientsReceiveClientBDisconnects() throws Exception {
        this.broker1 = "BrokerB";
        this.broker2 = "BrokerA";
        doTwoClientsReceiveOneClientDisconnects();
    }

    public void doTwoClientsReceiveOneClientDisconnects() throws Exception {
        bridgeBrokers(this.broker1, this.broker2);
        bridgeBrokers(this.broker2, this.broker1);
        startAllBrokers();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        MessageConsumer createConsumer = createConsumer(this.broker1, createDestination);
        MessageConsumer createConsumer2 = createConsumer(this.broker2, createDestination);
        Thread.sleep(500L);
        sendMessages("BrokerA", createDestination, 100);
        this.msgsClient1 += receiveExactMessages(createConsumer, 20);
        this.msgsClient2 += receiveExactMessages(createConsumer2, 20);
        createConsumer.close();
        this.msgsClient2 += receiveAllMessages(createConsumer2);
        createConsumer2.close();
        assertEquals("Client for " + this.broker1 + " should have received 20% of the messages.", 20, this.msgsClient1);
        assertEquals("Client for " + this.broker2 + " should have received 80% of the messages.", 80, this.msgsClient2);
    }

    public void testTwoClientsReceiveClientAReconnects() throws Exception {
        this.broker1 = "BrokerA";
        this.broker2 = "BrokerB";
        doTwoClientsReceiveOneClientReconnects();
    }

    public void testTwoClientsReceiveClientBReconnects() throws Exception {
        this.broker1 = "BrokerB";
        this.broker2 = "BrokerA";
        doTwoClientsReceiveOneClientReconnects();
    }

    public void doTwoClientsReceiveOneClientReconnects() throws Exception {
        bridgeBrokers(this.broker1, this.broker2);
        bridgeBrokers(this.broker2, this.broker1);
        startAllBrokers();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        MessageConsumer createConsumer = createConsumer(this.broker1, createDestination);
        MessageConsumer createConsumer2 = createConsumer(this.broker2, createDestination);
        Thread.sleep(500L);
        sendMessages("BrokerA", createDestination, 100);
        this.msgsClient1 += receiveExactMessages(createConsumer, 20);
        this.msgsClient2 += receiveExactMessages(createConsumer2, 20);
        createConsumer.close();
        this.msgsClient2 += receiveExactMessages(createConsumer2, 20);
        MessageConsumer createConsumer3 = createConsumer(this.broker1, createDestination);
        Thread.sleep(500L);
        this.msgsClient1 += receiveExactMessages(createConsumer3, 20);
        createConsumer3.close();
        this.msgsClient2 += receiveExactMessages(createConsumer2, 20);
        createConsumer2.close();
        assertEquals("Client for " + this.broker1 + " should have received 40% of the messages.", 40, this.msgsClient1);
        assertEquals("Client for " + this.broker2 + " should have received 60% of the messages.", 60, this.msgsClient2);
    }

    public void testTwoClientsReceiveTwoClientReconnects() throws Exception {
        this.broker1 = "BrokerA";
        this.broker2 = "BrokerB";
        bridgeBrokers(this.broker1, this.broker2);
        bridgeBrokers(this.broker2, this.broker1);
        startAllBrokers();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        MessageConsumer createConsumer = createConsumer(this.broker1, createDestination);
        MessageConsumer createConsumer2 = createConsumer(this.broker2, createDestination);
        Thread.sleep(500L);
        sendMessages("BrokerA", createDestination, 100);
        this.msgsClient1 += receiveExactMessages(createConsumer, 20);
        this.msgsClient2 += receiveExactMessages(createConsumer2, 20);
        createConsumer.close();
        createConsumer2.close();
        MessageConsumer createConsumer3 = createConsumer(this.broker1, createDestination);
        MessageConsumer createConsumer4 = createConsumer(this.broker2, createDestination);
        Thread.sleep(500L);
        this.msgsClient1 += receiveExactMessages(createConsumer3, 30);
        createConsumer3.close();
        this.msgsClient2 += receiveExactMessages(createConsumer4, 30);
        createConsumer4.close();
        assertEquals("Client for " + this.broker1 + " should have received 50% of the messages.", 50, this.msgsClient1);
        assertEquals("Client for " + this.broker2 + " should have received 50% of the messages.", 50, this.msgsClient2);
    }

    protected int receiveExactMessages(MessageConsumer messageConsumer, int i) throws Exception {
        int i2 = 0;
        while (true) {
            if (i2 >= i) {
                break;
            }
            if (messageConsumer.receive(1000L) == null) {
                LOG.error("Consumer failed to receive exactly " + i + " messages. Actual messages received is: " + i2);
                break;
            }
            i2++;
        }
        return i2;
    }

    protected int receiveAllMessages(MessageConsumer messageConsumer) throws Exception {
        Message receive;
        int i = 0;
        do {
            receive = messageConsumer.receive(1000L);
            if (receive != null) {
                i++;
            }
        } while (receive != null);
        return i;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public MessageConsumer createConsumer(String str, Destination destination) throws Exception {
        Connection createConnection = createConnection(str);
        createConnection.start();
        return createConnection.createSession(false, 1).createConsumer(destination);
    }

    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport, org.apache.activemq.AutoFailTestSupport
    public void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();
        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
        ActiveMQConnectionFactory connectionFactory = getConnectionFactory("BrokerA");
        ActiveMQConnectionFactory connectionFactory2 = getConnectionFactory("BrokerB");
        ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy();
        activeMQPrefetchPolicy.setAll(1);
        connectionFactory.setPrefetchPolicy(activeMQPrefetchPolicy);
        connectionFactory2.setPrefetchPolicy(activeMQPrefetchPolicy);
        this.msgsClient1 = 0;
        this.msgsClient2 = 0;
    }
}
