package org.apache.activemq.broker.region.cursors;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/broker/region/cursors/CursorSupport.class */
public abstract class CursorSupport extends CombinationTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(CursorSupport.class);
    protected BrokerService broker;
    public int MESSAGE_COUNT = 500;
    public int PREFETCH_SIZE = 50;
    protected String bindAddress = "tcp://localhost:60706";

    protected abstract Destination getDestination(Session session) throws JMSException;

    protected abstract MessageConsumer getConsumer(Connection connection) throws Exception;

    protected abstract void configureBroker(BrokerService brokerService) throws Exception;

    public void testSendFirstThenConsume() throws Exception {
        ActiveMQConnectionFactory createConnectionFactory = createConnectionFactory();
        Connection consumerConnection = getConsumerConnection(createConnectionFactory);
        getConsumer(consumerConnection);
        consumerConnection.close();
        Connection createConnection = createConnectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(getDestination(createSession));
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.MESSAGE_COUNT; i++) {
            TextMessage createTextMessage = createSession.createTextMessage("test" + i);
            arrayList.add(createTextMessage);
            createProducer.send(createTextMessage);
        }
        createConnection.close();
        Connection consumerConnection2 = getConsumerConnection(createConnectionFactory);
        MessageConsumer consumer = getConsumer(consumerConnection2);
        ArrayList arrayList2 = new ArrayList();
        for (int i2 = 0; i2 < this.MESSAGE_COUNT; i2++) {
            Message receive = consumer.receive(5000L);
            assertNotNull("Message " + i2 + " was missing.", receive);
            arrayList2.add(receive);
        }
        assertEquals(arrayList, arrayList2);
        consumerConnection2.close();
    }

    public void initCombosForTestSendWhilstConsume() {
        addCombinationValues("MESSAGE_COUNT", new Object[]{Integer.valueOf(DurableSubProcessWithRestartTest.CARGO_SIZE), 500});
        addCombinationValues("PREFETCH_SIZE", new Object[]{100, 50});
    }

    public void testSendWhilstConsume() throws Exception {
        ActiveMQConnectionFactory createConnectionFactory = createConnectionFactory();
        Connection consumerConnection = getConsumerConnection(createConnectionFactory);
        getConsumer(consumerConnection);
        consumerConnection.close();
        Connection createConnection = createConnectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(getDestination(createSession));
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.MESSAGE_COUNT / 10; i++) {
            TextMessage createTextMessage = createSession.createTextMessage("test" + i);
            arrayList.add(createTextMessage);
            createProducer.send(createTextMessage);
        }
        Connection consumerConnection2 = getConsumerConnection(createConnectionFactory);
        MessageConsumer consumer = getConsumer(consumerConnection2);
        final ArrayList arrayList2 = new ArrayList();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        consumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.region.cursors.CursorSupport.1
            public void onMessage(Message message) {
                try {
                    Thread.sleep(50L);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                arrayList2.add(message);
                if (arrayList2.size() == CursorSupport.this.MESSAGE_COUNT) {
                    countDownLatch.countDown();
                }
            }
        });
        for (int i2 = this.MESSAGE_COUNT / 10; i2 < this.MESSAGE_COUNT; i2++) {
            TextMessage createTextMessage2 = createSession.createTextMessage("test" + i2);
            arrayList.add(createTextMessage2);
            createProducer.send(createTextMessage2);
        }
        countDownLatch.await(DurableSubProcessWithRestartTest.RUNTIME, TimeUnit.MILLISECONDS);
        createConnection.close();
        consumerConnection2.close();
        assertEquals("Still dipatching - count down latch not sprung", countDownLatch.getCount(), 0L);
        for (int i3 = 0; i3 < arrayList.size(); i3++) {
            Message message = (Message) arrayList.get(i3);
            Message message2 = (Message) arrayList2.get(i3);
            if (!message.equals(message2)) {
                LOG.error("BAD MATCH AT POS " + i3);
                LOG.error(message.toString());
                LOG.error(message2.toString());
            }
            assertEquals("This should be the same at pos " + i3 + " in the list", message.getJMSMessageID(), message2.getJMSMessageID());
        }
    }

    protected Connection getConsumerConnection(ConnectionFactory connectionFactory) throws JMSException {
        Connection createConnection = connectionFactory.createConnection();
        createConnection.setClientID("testConsumer");
        createConnection.start();
        return createConnection;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.AutoFailTestSupport
    public void setUp() throws Exception {
        if (this.broker == null) {
            this.broker = createBroker();
        }
        super.setUp();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.AutoFailTestSupport
    public void tearDown() throws Exception {
        super.tearDown();
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(this.bindAddress);
        Properties properties = new Properties();
        properties.setProperty("prefetchPolicy.durableTopicPrefetch", "" + this.PREFETCH_SIZE);
        properties.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch", "" + this.PREFETCH_SIZE);
        properties.setProperty("prefetchPolicy.queuePrefetch", "" + this.PREFETCH_SIZE);
        activeMQConnectionFactory.setProperties(properties);
        return activeMQConnectionFactory;
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        configureBroker(brokerService);
        brokerService.start();
        return brokerService;
    }
}
