package org.apache.activemq.transport.failover;

import java.net.URI;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.NetworkTestSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activemq/transport/failover/FailoverConsumerTest.class */
public class FailoverConsumerTest extends NetworkTestSupport {
    public static final int MSG_COUNT = 100;
    private static final Log LOG = LogFactory.getLog(FailoverConsumerTest.class);

    public void testPublisherFailsOver() throws Exception {
        Message receive;
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(new URI("failover://tcp://localhost:61616"));
        ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy();
        activeMQPrefetchPolicy.setQueuePrefetch(90);
        activeMQConnectionFactory.setPrefetchPolicy(activeMQPrefetchPolicy);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(new ActiveMQQueue("Test"));
        for (int i = 0; i < 100; i++) {
            createProducer.send(createSession.createTextMessage("Test"));
        }
        createProducer.close();
        createSession.close();
        Session createSession2 = createConnection.createSession(false, 2);
        MessageConsumer createConsumer = createSession2.createConsumer(new ActiveMQQueue("Test"));
        createConnection.start();
        Message receive2 = createConsumer.receive(3000L);
        LOG.info("You should restart remote broker now and press enter!");
        System.in.read();
        restartRemoteBroker();
        receive2.acknowledge();
        int i2 = 0 + 1;
        int i3 = 1;
        while (true) {
            if (i3 >= 100) {
                break;
            }
            Message receive3 = createConsumer.receive(3000L);
            if (receive3 == null) {
                LOG.error("No messages received! Received:" + i2);
                break;
            } else {
                receive3.acknowledge();
                i2++;
                i3++;
            }
        }
        assertEquals(i2, 100);
        createConsumer.close();
        createSession2.close();
        createConnection.close();
        Connection createConnection2 = activeMQConnectionFactory.createConnection();
        Session createSession3 = createConnection2.createSession(false, 2);
        MessageConsumer createConsumer2 = createSession3.createConsumer(new ActiveMQQueue("Test"));
        createConnection2.start();
        int i4 = 0;
        do {
            receive = createConsumer2.receive(1000L);
            if (receive != null) {
                receive.acknowledge();
                i4++;
            }
        } while (receive != null);
        assertEquals(i4, 0);
        createConsumer2.close();
        createSession3.close();
        createConnection2.close();
    }

    @Override // org.apache.activemq.network.NetworkTestSupport
    protected String getRemoteURI() {
        return "tcp://localhost:55555";
    }
}
