package org.apache.activemq.transport.failover;

import java.util.Iterator;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQMessageTransformation;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.class */
public class FailoverConsumerUnconsumedTest {
    private static final String QUEUE_NAME = "FailoverWithUnconsumed";
    private static final String TRANSPORT_URI = "tcp://localhost:0";
    private String url;
    final int prefetch = 10;
    BrokerService broker;
    private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerUnconsumedTest.class);
    static long idGen = 100;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest$TestConsumer.class */
    public class TestConsumer extends ActiveMQMessageConsumer {
        TestConsumer(Session session, Destination destination, ActiveMQConnection activeMQConnection) throws Exception {
            super((ActiveMQSession) session, new ConsumerId(new SessionId(activeMQConnection.getConnectionInfo().getConnectionId(), 1L), FailoverConsumerUnconsumedTest.access$100()), ActiveMQMessageTransformation.transformDestination(destination), (String) null, "", 10, -1, false, false, true, (MessageListener) null);
        }

        public int unconsumedSize() {
            return this.unconsumedMessages.size();
        }
    }

    @After
    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    public void startBroker(boolean z) throws Exception {
        this.broker = createBroker(z);
        this.broker.start();
    }

    public BrokerService createBroker(boolean z) throws Exception {
        return createBroker(z, "tcp://localhost:0");
    }

    public BrokerService createBroker(boolean z, String str) throws Exception {
        this.broker = new BrokerService();
        this.broker.addConnector(str);
        this.broker.setDeleteAllMessagesOnStartup(z);
        this.url = ((TransportConnector) this.broker.getTransportConnectors().get(0)).getConnectUri().toString();
        return this.broker;
    }

    @Test
    public void testFailoverConsumerDups() throws Exception {
        doTestFailoverConsumerDups(true);
    }

    @Test
    public void testFailoverConsumerDupsNoAdvisoryWatch() throws Exception {
        doTestFailoverConsumerDups(false);
    }

    public void doTestFailoverConsumerDups(final boolean z) throws Exception {
        this.broker = createBroker(true);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { // from class: org.apache.activemq.transport.failover.FailoverConsumerUnconsumedTest.1
            int consumerCount;

            public Subscription addConsumer(ConnectionContext connectionContext, final ConsumerInfo consumerInfo) throws Exception {
                int i = this.consumerCount + 1;
                this.consumerCount = i;
                if (i == 4 + (z ? 1 : 0)) {
                    connectionContext.setDontSendReponse(true);
                    Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverConsumerUnconsumedTest.1.1
                        @Override // java.lang.Runnable
                        public void run() {
                            FailoverConsumerUnconsumedTest.LOG.info("Stopping broker on consumer: " + consumerInfo.getConsumerId());
                            try {
                                FailoverConsumerUnconsumedTest.this.broker.stop();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
                return super.addConsumer(connectionContext, consumerInfo);
            }
        }});
        this.broker.start();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        activeMQConnectionFactory.setWatchTopicAdvisories(z);
        final ActiveMQConnection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        final Session createSession = createConnection.createSession(false, 1);
        final Queue createQueue = createSession.createQueue("FailoverWithUnconsumed?jms.consumer.prefetch=10");
        final Vector vector = new Vector();
        for (int i = 0; i < 3; i++) {
            vector.add(new TestConsumer(createSession, createQueue, createConnection));
        }
        produceMessage(createSession, createQueue, 40L);
        Assert.assertTrue("add messages are dispatched", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.transport.failover.FailoverConsumerUnconsumedTest.2
            public boolean isSatisified() throws Exception {
                int i2 = 0;
                Iterator it = vector.iterator();
                while (it.hasNext()) {
                    TestConsumer testConsumer = (TestConsumer) it.next();
                    long unconsumedSize = testConsumer.unconsumedSize();
                    FailoverConsumerUnconsumedTest.LOG.info(testConsumer.getConsumerId() + " unconsumed: " + unconsumedSize);
                    i2 = (int) (i2 + unconsumedSize);
                }
                return i2 == 30;
            }
        }));
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverConsumerUnconsumedTest.3
            @Override // java.lang.Runnable
            public void run() {
                try {
                    FailoverConsumerUnconsumedTest.LOG.info("add last consumer...");
                    vector.add(new TestConsumer(createSession, createQueue, createConnection));
                    countDownLatch.countDown();
                    FailoverConsumerUnconsumedTest.LOG.info("done add last consumer");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        this.broker.waitUntilStopped();
        Assert.assertTrue("add messages dispatched and unconsumed are cleaned up", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.transport.failover.FailoverConsumerUnconsumedTest.4
            public boolean isSatisified() throws Exception {
                int i2 = 0;
                Iterator it = vector.iterator();
                while (it.hasNext()) {
                    TestConsumer testConsumer = (TestConsumer) it.next();
                    long unconsumedSize = testConsumer.unconsumedSize();
                    FailoverConsumerUnconsumedTest.LOG.info(testConsumer.getConsumerId() + " unconsumed: " + unconsumedSize);
                    i2 = (int) (i2 + unconsumedSize);
                }
                return i2 == 0;
            }
        }));
        this.broker = createBroker(false, this.url);
        this.broker.start();
        Assert.assertTrue("consumer added through failover", countDownLatch.await(30L, TimeUnit.SECONDS));
        Assert.assertTrue("after start all messages are re dispatched", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.transport.failover.FailoverConsumerUnconsumedTest.5
            public boolean isSatisified() throws Exception {
                int i2 = 0;
                Iterator it = vector.iterator();
                while (it.hasNext()) {
                    TestConsumer testConsumer = (TestConsumer) it.next();
                    long unconsumedSize = testConsumer.unconsumedSize();
                    FailoverConsumerUnconsumedTest.LOG.info(testConsumer.getConsumerId() + " after restart: unconsumed: " + unconsumedSize);
                    i2 = (int) (i2 + unconsumedSize);
                }
                return i2 == 40;
            }
        }));
        createConnection.close();
    }

    private void produceMessage(Session session, Queue queue, long j) throws JMSException {
        MessageProducer createProducer = session.createProducer(queue);
        for (int i = 0; i < j; i++) {
            createProducer.send(session.createTextMessage("Test message " + i));
        }
        createProducer.close();
    }

    private static long nextGen() {
        idGen -= 5;
        return idGen;
    }

    static /* synthetic */ long access$100() {
        return nextGen();
    }
}
