package org.apache.activemq.bugs;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ1866.class */
public class AMQ1866 extends TestCase {
    private static final Logger log = LoggerFactory.getLogger(ConsumerThread.class);
    private BrokerService brokerService;
    private String ACTIVEMQ_BROKER_URI;
    private ActiveMQQueue destination;
    private ArrayList<Thread> threads = new ArrayList<>();
    private final String ACTIVEMQ_BROKER_BIND = JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT;
    AtomicBoolean shutdown = new AtomicBoolean();

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ1866$ConsumerThread.class */
    public class ConsumerThread extends Thread {
        final AtomicLong counter;

        public ConsumerThread(String str) {
            super(str);
            this.counter = new AtomicLong();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Connection connection = null;
            try {
                try {
                    AMQ1866.log.debug(getName() + ": is running");
                    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(AMQ1866.this.ACTIVEMQ_BROKER_URI);
                    activeMQConnectionFactory.setDispatchAsync(true);
                    connection = activeMQConnectionFactory.createConnection();
                    MessageConsumer createConsumer = connection.createSession(false, 1).createConsumer(AMQ1866.this.destination);
                    connection.start();
                    while (!AMQ1866.this.shutdown.get()) {
                        if (createConsumer.receive(1000L) != null) {
                            int i = getName().equals("Consumer-1") ? 1000000 : 1;
                            this.counter.incrementAndGet();
                            Thread.sleep(i);
                        }
                    }
                    AMQ1866.log.debug(getName() + ": is stopping");
                    try {
                        connection.close();
                    } catch (Throwable th) {
                    }
                } catch (Exception e) {
                    AMQ1866.log.debug(getName() + ": is stopping");
                    try {
                        connection.close();
                    } catch (Throwable th2) {
                    }
                }
            } catch (Throwable th3) {
                AMQ1866.log.debug(getName() + ": is stopping");
                try {
                    connection.close();
                } catch (Throwable th4) {
                }
                throw th3;
            }
        }
    }

    protected void setUp() throws Exception {
        this.brokerService = new BrokerService();
        AMQPersistenceAdapter aMQPersistenceAdapter = new AMQPersistenceAdapter();
        aMQPersistenceAdapter.setIndexBinSize(4096);
        this.brokerService.setPersistenceAdapter(aMQPersistenceAdapter);
        this.brokerService.deleteAllMessages();
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setMaxPageSize(1);
        policyMap.put(new ActiveMQQueue(">"), policyEntry);
        this.brokerService.setDestinationPolicy(policyMap);
        this.brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.brokerService.start();
        this.ACTIVEMQ_BROKER_URI = ((TransportConnector) this.brokerService.getTransportConnectors().get(0)).getPublishableConnectString();
        this.destination = new ActiveMQQueue(getName());
    }

    protected void tearDown() throws Exception {
        this.shutdown.set(true);
        Iterator<Thread> it = this.threads.iterator();
        while (it.hasNext()) {
            Thread next = it.next();
            next.interrupt();
            next.join();
        }
        this.brokerService.stop();
    }

    public void testConsumerSlowDownPrefetch0() throws Exception {
        this.ACTIVEMQ_BROKER_URI += "?jms.prefetchPolicy.queuePrefetch=0";
        doTestConsumerSlowDown();
    }

    public void testConsumerSlowDownPrefetch10() throws Exception {
        this.ACTIVEMQ_BROKER_URI += "?jms.prefetchPolicy.queuePrefetch=10";
        doTestConsumerSlowDown();
    }

    public void testConsumerSlowDownDefaultPrefetch() throws Exception {
        doTestConsumerSlowDown();
    }

    public void doTestConsumerSlowDown() throws Exception {
        produce(20000);
        Thread thread = new Thread() { // from class: org.apache.activemq.bugs.AMQ1866.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (!AMQ1866.this.shutdown.get()) {
                    try {
                        AMQ1866.this.produce(1000);
                    } catch (Exception e) {
                        return;
                    }
                }
            }
        };
        this.threads.add(thread);
        thread.start();
        ConsumerThread consumerThread = new ConsumerThread("Consumer-1");
        this.threads.add(consumerThread);
        consumerThread.start();
        Thread.sleep(500L);
        ConsumerThread consumerThread2 = new ConsumerThread("Consumer-2");
        this.threads.add(consumerThread2);
        consumerThread2.start();
        int i = 0;
        for (int i2 = 0; i2 < 30; i2++) {
            Thread.sleep(1000L);
            long andSet = consumerThread.counter.getAndSet(0L);
            long andSet2 = consumerThread2.counter.getAndSet(0L);
            log.debug("c1: " + andSet + ", c2: " + andSet2);
            i = (int) (((int) (i + andSet)) + andSet2);
            if (i2 > 10) {
                assertTrue("Total received=" + i + ", Consumer 2 should be receiving new messages every second.", andSet2 > 0);
            }
        }
    }

    public void produce(int i) throws Exception {
        Connection connection = null;
        try {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(this.ACTIVEMQ_BROKER_URI);
            activeMQConnectionFactory.setDispatchAsync(true);
            connection = activeMQConnectionFactory.createConnection();
            Session createSession = connection.createSession(false, 1);
            MessageProducer createProducer = createSession.createProducer(this.destination);
            connection.start();
            int i2 = 0;
            while (i2 < i) {
                int i3 = i2 + 1;
                createProducer.send(createSession.createTextMessage(getName() + " Message " + i3));
                i2 = i3 + 1;
            }
            try {
                connection.close();
            } catch (Throwable th) {
            }
        } catch (Throwable th2) {
            try {
                connection.close();
            } catch (Throwable th3) {
            }
            throw th2;
        }
    }
}
