package org.apache.activemq.broker.region.cursors;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.TempUsage;
import org.apache.activemq.util.SocketProxy;

/* loaded from: input_file:org/apache/activemq/broker/region/cursors/NegativeQueueTest.class */
public class NegativeQueueTest extends TestCase {
    // Names of the two queues exercised by blastAndConsume(): messages are produced to queue 1,
    // and the queue-1 listeners forward them to queue 2.
    private static final String QUEUE_1_NAME = "conn.test.queue.1";
    private static final String QUEUE_2_NAME = "conn.test.queue.2";
    // Memory limit set on the default PolicyEntry in configureBroker() (2097152 = 2 * 1024 * 1024;
    // presumably bytes - confirm against PolicyEntry.setMemoryLimit).
    private static final long QUEUE_MEMORY_LIMIT = 2097152;
    // Limits handed to MemoryUsage, TempUsage and StoreUsage respectively in configureBroker().
    private static final long MEMORY_USAGE = 400000000;
    private static final long TEMP_USAGE = 200000000;
    private static final long STORE_USAGE = 1000000000;
    // NOTE(review): not referenced anywhere in this file; the producing loop and the listener
    // both hard-code the literal 2000 instead.
    private static final int MESSAGE_COUNT = 2000;
    // NOTE(review): TRANSACTED and DEBUG are not referenced anywhere in this file.
    protected static final boolean TRANSACTED = true;
    protected static final boolean DEBUG = false;
    // Broker under test; created in setUp() and stopped in tearDown().
    protected BrokerService broker;
    // Address used both for the broker's connector and for the client connection factory.
    protected String bindAddress = "tcp://localhost:60706";
    // Formats the current time into the text of each produced message.
    // NOTE(review): "hh" is the 12-hour clock field and the pattern has no AM/PM marker; also this
    // is a shared mutable SimpleDateFormat - only blastAndConsume() uses it in this file.
    public static SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd,hh:mm:ss:SSS");
    // Number of consumers created per queue by blastAndConsume(); overwritten by the test methods.
    protected static int NUM_CONSUMERS = 20;
    // Value written into the prefetchPolicy.* connection properties by createConnectionFactory().
    // NOTE(review): initialised from SocketProxy.ACCEPT_TIMEOUT_MILLIS, a constant whose name is
    // unrelated to prefetch - presumably a decompiler substitution for a literal; confirm.
    protected static int PREFETCH_SIZE = SocketProxy.ACCEPT_TIMEOUT_MILLIS;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/broker/region/cursors/NegativeQueueTest$SessionAwareMessageListener.class */
    /**
     * Message listener used by every consumer in this test.
     *
     * <p>It works in one of two modes, fixed at construction time:
     * <ul>
     *   <li><b>forwarding</b> - when a connection was supplied and a producer could be created,
     *       each received message is re-sent to the configured queue and the producer session is
     *       committed;</li>
     *   <li><b>sink</b> - otherwise the listener just sleeps for 50 ms per message.</li>
     * </ul>
     * In both modes the message is then appended to the shared list, the latch is released once
     * that list holds exactly {@code MESSAGE_COUNT} entries, and the consumer session is committed.
     * Failures are printed to standard error and otherwise ignored.
     */
    public class SessionAwareMessageListener implements MessageListener {
        private final List<Message> consumerList;
        private final CountDownLatch latch;
        private final Session consumerSession;
        private final Session producerSession;
        private final MessageProducer producer;

        /**
         * Creates a sink-mode listener (no forwarding).
         *
         * @param negativeQueueTest unused; callers in this file pass the enclosing test instance
         * @param session           consumer session that is committed after every message
         * @param countDownLatch    latch released when the shared list reaches the expected size
         * @param list              shared list every received message is appended to
         */
        public SessionAwareMessageListener(NegativeQueueTest negativeQueueTest, Session session, CountDownLatch countDownLatch, List<Message> list) {
            this(null, session, null, countDownLatch, list);
        }

        /**
         * Creates a listener that forwards to queue {@code str} when {@code connection} is non-null.
         *
         * @param connection     connection used to create the forwarding session, or {@code null} for sink mode
         * @param session        consumer session that is committed after every message
         * @param str            name of the queue messages are forwarded to (ignored in sink mode)
         * @param countDownLatch latch released when the shared list reaches the expected size
         * @param list           shared list every received message is appended to
         */
        public SessionAwareMessageListener(Connection connection, Session session, String str, CountDownLatch countDownLatch, List<Message> list) {
            this.consumerList = list;
            this.latch = countDownLatch;
            this.consumerSession = session;
            // Built in locals first so the fields can be final even though creation may fail half-way.
            Session forwardSession = null;
            MessageProducer forwardProducer = null;
            if (connection != null) {
                try {
                    forwardSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                    forwardProducer = forwardSession.createProducer(forwardSession.createQueue(str));
                } catch (JMSException e) {
                    // Without a producer the listener silently degrades to sink mode.
                    e.printStackTrace();
                }
            }
            this.producerSession = forwardSession;
            this.producer = forwardProducer;
        }

        /**
         * Handles one message as described on the class: forward or sleep, record, then commit.
         *
         * @param message the message delivered by the consumer session
         */
        public void onMessage(Message message) {
            boolean interrupted = false;
            try {
                if (this.producer == null) {
                    Thread.sleep(50L);
                } else {
                    this.producer.send(message);
                    this.producerSession.commit();
                }
            } catch (InterruptedException e) {
                // Remember the interrupt; it is restored at the end so the bookkeeping and commit
                // below still run exactly as before.
                interrupted = true;
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
            // The list is shared by all listeners of one stage, hence the lock.
            synchronized (this.consumerList) {
                this.consumerList.add(message);
                if (this.consumerList.size() == MESSAGE_COUNT) {
                    this.latch.countDown();
                }
            }
            try {
                this.consumerSession.commit();
            } catch (JMSException e2) {
                e2.printStackTrace();
            }
            if (interrupted) {
                // Do not swallow the interruption: hand it back to whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Runs {@code blastAndConsume()} with 20 consumers per queue and the prefetch value taken from
     * {@code SocketProxy.ACCEPT_TIMEOUT_MILLIS}.
     * NOTE(review): that constant's name has nothing to do with prefetch - presumably it stands in
     * for a literal "default" prefetch size; confirm.
     *
     * @throws Exception if the scenario fails
     */
    public void testWithDefaultPrefetch() throws Exception {
        NUM_CONSUMERS = 20;
        PREFETCH_SIZE = SocketProxy.ACCEPT_TIMEOUT_MILLIS;
        blastAndConsume();
    }

    /**
     * Runs {@code blastAndConsume()} with 5 consumers per queue and the prefetch value taken from
     * {@code SocketProxy.ACCEPT_TIMEOUT_MILLIS}.
     * NOTE(review): that constant's name has nothing to do with prefetch - presumably it stands in
     * for a literal "default" prefetch size; confirm.
     *
     * @throws Exception if the scenario fails
     */
    public void testWithDefaultPrefetchFiveConsumers() throws Exception {
        NUM_CONSUMERS = 5;
        PREFETCH_SIZE = SocketProxy.ACCEPT_TIMEOUT_MILLIS;
        blastAndConsume();
    }

    /**
     * Runs {@code blastAndConsume()} with 2 consumers per queue and the prefetch value taken from
     * {@code SocketProxy.ACCEPT_TIMEOUT_MILLIS}.
     * NOTE(review): that constant's name has nothing to do with prefetch - presumably it stands in
     * for a literal "default" prefetch size; confirm.
     *
     * @throws Exception if the scenario fails
     */
    public void testWithDefaultPrefetchTwoConsumers() throws Exception {
        NUM_CONSUMERS = 2;
        PREFETCH_SIZE = SocketProxy.ACCEPT_TIMEOUT_MILLIS;
        blastAndConsume();
    }

    /**
     * Runs {@code blastAndConsume()} with a single consumer per queue and the prefetch value taken
     * from {@code SocketProxy.ACCEPT_TIMEOUT_MILLIS}.
     * NOTE(review): that constant's name has nothing to do with prefetch - presumably it stands in
     * for a literal "default" prefetch size; confirm.
     *
     * @throws Exception if the scenario fails
     */
    public void testWithDefaultPrefetchOneConsumer() throws Exception {
        NUM_CONSUMERS = 1;
        PREFETCH_SIZE = SocketProxy.ACCEPT_TIMEOUT_MILLIS;
        blastAndConsume();
    }

    /**
     * Runs {@code blastAndConsume()} with 20 consumers per queue and a prefetch value of 50.
     *
     * @throws Exception if the scenario fails
     */
    public void testWithMediumPrefetch() throws Exception {
        NUM_CONSUMERS = 20;
        PREFETCH_SIZE = 50;
        blastAndConsume();
    }

    /**
     * Runs {@code blastAndConsume()} with 20 consumers per queue and a prefetch value of 10.
     *
     * @throws Exception if the scenario fails
     */
    public void testWithSmallPrefetch() throws Exception {
        NUM_CONSUMERS = 20;
        PREFETCH_SIZE = 10;
        blastAndConsume();
    }

    /**
     * Runs {@code blastAndConsume()} with 20 consumers per queue and a prefetch value of 1
     * (despite the method name, the prefetch is 1, not 0).
     *
     * @throws Exception if the scenario fails
     */
    public void testWithNoPrefetch() throws Exception {
        PREFETCH_SIZE = 1;
        // NUM_CONSUMERS is a static that the other tests overwrite (down to 1); set it explicitly
        // so this test does not depend on which test happened to run before it.
        NUM_CONSUMERS = 20;
        blastAndConsume();
    }

    /**
     * Core scenario shared by all test methods.
     *
     * <ol>
     *   <li>Produces {@code MESSAGE_COUNT} text messages to queue 1, committing after each send.</li>
     *   <li>Starts {@code NUM_CONSUMERS} transacted consumers on queue 1 whose listeners forward
     *       every message to queue 2, and waits (up to 5 minutes) until they have seen
     *       {@code MESSAGE_COUNT} messages.</li>
     *   <li>Starts {@code NUM_CONSUMERS} transacted consumers on queue 2 and waits the same way.</li>
     *   <li>Closes the producer and consumer connections, pauses 500 ms, and asserts through JMX
     *       that both queues report a size of exactly 0.</li>
     * </ol>
     *
     * @throws Exception if any JMS/JMX call fails, a wait times out, or an assertion fails
     */
    public void blastAndConsume() throws Exception {
        final long consumeTimeoutMillis = 300000L;
        ActiveMQConnectionFactory factory = createConnectionFactory();
        // Safety net: every connection is recorded as soon as it exists so the finally block can
        // release it when a JMS call or an assertion fails part-way through.
        List<Connection> opened = new ArrayList<Connection>();
        try {
            Connection adminConnection = factory.createConnection();
            opened.add(adminConnection);
            adminConnection.start();
            Session adminSession = adminConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            QueueViewMBean queue1View = getProxyToQueueViewMBean(adminSession.createQueue(QUEUE_1_NAME));
            QueueViewMBean queue2View = getProxyToQueueViewMBean(adminSession.createQueue(QUEUE_2_NAME));

            // Stage 0: fill queue 1, one transaction per message.
            Connection producerConnection = factory.createConnection();
            opened.add(producerConnection);
            producerConnection.start();
            Session producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = producerSession.createProducer(producerSession.createQueue(QUEUE_1_NAME));
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                producer.send(producerSession.createTextMessage(formatter.format(new Date())));
                producerSession.commit();
            }

            int consumerCount = NUM_CONSUMERS;
            CountDownLatch queue1Consumed = new CountDownLatch(1);
            CountDownLatch queue2Consumed = new CountDownLatch(1);
            Connection[] queue1Consumers = new Connection[consumerCount];
            Connection[] queue2Consumers = new Connection[consumerCount];
            Connection[] forwarders = new Connection[consumerCount];
            List<Message> seenOnQueue1 = new ArrayList<Message>();
            List<Message> seenOnQueue2 = new ArrayList<Message>();

            // Stage 1: consume queue 1 and forward each message to queue 2.
            for (int i = 0; i < consumerCount; i++) {
                forwarders[i] = factory.createConnection();
                opened.add(forwarders[i]);
                forwarders[i].start();
                queue1Consumers[i] = getConsumerConnection(factory);
                opened.add(queue1Consumers[i]);
                Session consumerSession = queue1Consumers[i].createSession(true, Session.AUTO_ACKNOWLEDGE);
                consumerSession.createConsumer(producerSession.createQueue(QUEUE_1_NAME)).setMessageListener(
                        new SessionAwareMessageListener(forwarders[i], consumerSession, QUEUE_2_NAME, queue1Consumed, seenOnQueue1));
            }
            // await() returns false on timeout; fail here instead of carrying on with a half-drained queue.
            assertTrue("Timed out waiting for " + MESSAGE_COUNT + " messages from " + QUEUE_1_NAME,
                    queue1Consumed.await(consumeTimeoutMillis, TimeUnit.MILLISECONDS));

            // Stage 2: drain queue 2 with sink-mode listeners.
            for (int i = 0; i < consumerCount; i++) {
                queue2Consumers[i] = getConsumerConnection(factory);
                opened.add(queue2Consumers[i]);
                Session consumerSession = queue2Consumers[i].createSession(true, Session.AUTO_ACKNOWLEDGE);
                consumerSession.createConsumer(producerSession.createQueue(QUEUE_2_NAME)).setMessageListener(
                        new SessionAwareMessageListener(this, consumerSession, queue2Consumed, seenOnQueue2));
            }
            assertTrue("Timed out waiting for " + MESSAGE_COUNT + " messages from " + QUEUE_2_NAME,
                    queue2Consumed.await(consumeTimeoutMillis, TimeUnit.MILLISECONDS));

            // Close everything except the admin connection before looking at the queue sizes.
            producerConnection.close();
            for (int i = 0; i < consumerCount; i++) {
                queue1Consumers[i].close();
                queue2Consumers[i].close();
                forwarders[i].close();
            }
            // Give the broker a moment to process the closes before reading the counters.
            Thread.sleep(500L);
            assertEquals("Queue1 has gone negative,", 0L, queue1View.getQueueSize());
            assertEquals("Queue2 has gone negative,", 0L, queue2View.getQueueSize());
            adminConnection.close();
        } finally {
            // On the success path these are all closed already; the JMS API documents that
            // closing a closed connection must not throw, so this is a no-op there.
            for (Connection connection : opened) {
                closeQuietly(connection);
            }
        }
    }

    /**
     * Closes a connection during cleanup, reporting but not propagating a failure so that it
     * cannot mask the exception or assertion error that is already in flight.
     */
    private static void closeQuietly(Connection connection) {
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Obtains a JMX proxy for the given queue from the broker's management context.
     *
     * @param queue the queue whose MBean is wanted; its name is embedded in the JMX object name
     * @return a {@code QueueViewMBean} proxy for that queue
     * @throws MalformedObjectNameException if the assembled object name is not valid
     * @throws JMSException if the queue name cannot be read
     */
    private QueueViewMBean getProxyToQueueViewMBean(Queue queue) throws MalformedObjectNameException, JMSException {
        StringBuilder jmxName = new StringBuilder("org.apache.activemq:Type=Queue,Destination=");
        jmxName.append(queue.getQueueName()).append(",BrokerName=localhost");
        ObjectName queueObjectName = new ObjectName(jmxName.toString());
        Object proxy = this.broker.getManagementContext().newProxyInstance(queueObjectName, QueueViewMBean.class, true);
        return (QueueViewMBean) proxy;
    }

    /**
     * Creates and starts a connection for a consumer.
     *
     * @param connectionFactory factory the connection is obtained from
     * @return a started connection; the caller is responsible for closing it
     * @throws JMSException if the connection cannot be created or started
     */
    protected Connection getConsumerConnection(ConnectionFactory connectionFactory) throws JMSException {
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
        } catch (JMSException startFailure) {
            // The caller never sees this connection, so nobody else could close it.
            try {
                connection.close();
            } catch (JMSException ignored) {
                // Best-effort cleanup; the start failure is the error worth reporting.
            }
            throw startFailure;
        }
        return connection;
    }

    /**
     * Creates the broker (unless one is already assigned) before delegating to the superclass.
     *
     * @throws Exception if the broker cannot be created or the superclass set-up fails
     */
    protected void setUp() throws Exception {
        boolean brokerMissing = (this.broker == null);
        if (brokerMissing) {
            this.broker = createBroker();
        }
        super.setUp();
    }

    /**
     * Runs the superclass tear-down and then stops the broker.
     *
     * <p>The broker is stopped even if the superclass tear-down throws, and the field is cleared
     * so that a later {@code setUp()} on this instance creates a fresh broker instead of reusing
     * a stopped one (setUp() only creates a broker when the field is {@code null}).
     *
     * @throws Exception if the superclass tear-down or stopping the broker fails
     */
    protected void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            BrokerService stopping = this.broker;
            if (stopping != null) {
                // Clear first so the reference is dropped even when stop() itself throws.
                this.broker = null;
                stopping.stop();
            }
        }
    }

    /**
     * Builds a connection factory for {@code bindAddress} whose three prefetch properties are all
     * set to the current {@code PREFETCH_SIZE}.
     *
     * @return the configured connection factory
     * @throws Exception declared for overriding implementations; see ActiveMQConnectionFactory
     *                   for what the calls made here can throw
     */
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        String prefetch = String.valueOf(PREFETCH_SIZE);
        String[] prefetchKeys = {
            "prefetchPolicy.durableTopicPrefetch",
            "prefetchPolicy.optimizeDurableTopicPrefetch",
            "prefetchPolicy.queuePrefetch"
        };
        Properties prefetchSettings = new Properties();
        for (String key : prefetchKeys) {
            prefetchSettings.setProperty(key, prefetch);
        }
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.bindAddress);
        factory.setProperties(prefetchSettings);
        return factory;
    }

    /**
     * Creates a new broker, applies {@code configureBroker} to it and starts it.
     *
     * @return the started broker
     * @throws Exception if configuring or starting the broker fails
     */
    protected BrokerService createBroker() throws Exception {
        final BrokerService service = new BrokerService();
        configureBroker(service);
        service.start();
        return service;
    }

    /**
     * Applies this test's configuration to a broker that has not been started yet: a default
     * destination policy with a store-based pending-queue policy and the queue memory limit,
     * message deletion on startup, a connector on {@code bindAddress}, and explicit
     * memory/temp/store usage limits.
     *
     * @param brokerService the broker to configure
     * @throws Exception if the connector cannot be added
     */
    protected void configureBroker(BrokerService brokerService) throws Exception {
        // Default policy applied to every destination.
        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setMemoryLimit(QUEUE_MEMORY_LIMIT);
        defaultPolicy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
        PolicyMap destinationPolicies = new PolicyMap();
        destinationPolicies.setDefaultEntry(defaultPolicy);

        // Broker-wide resource limits.
        MemoryUsage memoryLimits = new MemoryUsage();
        memoryLimits.setLimit(MEMORY_USAGE);
        memoryLimits.setPercentUsageMinDelta(20);
        TempUsage tempLimits = new TempUsage();
        tempLimits.setLimit(TEMP_USAGE);
        StoreUsage storeLimits = new StoreUsage();
        storeLimits.setLimit(STORE_USAGE);
        SystemUsage usageLimits = new SystemUsage();
        usageLimits.setMemoryUsage(memoryLimits);
        usageLimits.setTempUsage(tempLimits);
        usageLimits.setStoreUsage(storeLimits);

        // Broker calls are issued in the same relative order as before.
        brokerService.setDestinationPolicy(destinationPolicies);
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.addConnector(this.bindAddress);
        brokerService.setSystemUsage(usageLimits);
    }
}
