package org.apache.activemq.usecases;

import java.util.Arrays;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.ConsumerThread;
import org.apache.activemq.util.ProducerThread;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/usecases/MemoryLimitTest.class */
public class MemoryLimitTest extends org.apache.activemq.TestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(MemoryLimitTest.class);
    final String payload = new String(new byte[10240]);
    protected BrokerService broker;

    @Parameterized.Parameter
    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;

    @Parameterized.Parameters(name = "store={0}")
    public static Iterable<Object[]> getTestParameters() {
        return Arrays.asList(new Object[]{TestSupport.PersistenceAdapterChoice.KahaDB}, new Object[]{TestSupport.PersistenceAdapterChoice.LevelDB}, new Object[]{TestSupport.PersistenceAdapterChoice.JDBC});
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.getSystemUsage().getMemoryUsage().setLimit(1048576L);
        brokerService.deleteAllMessages();
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setProducerFlowControl(false);
        policyMap.put(new ActiveMQQueue(">"), policyEntry);
        brokerService.setDestinationPolicy(policyMap);
        LOG.info("Starting broker with persistenceAdapterChoice " + this.persistenceAdapterChoice.toString());
        setPersistenceAdapter(brokerService, this.persistenceAdapterChoice);
        brokerService.getPersistenceAdapter().deleteAllMessages();
        return brokerService;
    }

    @Before
    public void setUp() throws Exception {
        if (this.broker == null) {
            this.broker = createBroker();
        }
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    @Test(timeout = 120000)
    public void testCursorBatch() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10");
        activeMQConnectionFactory.setOptimizeAcknowledge(true);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 2);
        ActiveMQQueue createQueue = createSession.createQueue("STORE");
        ProducerThread producerThread = new ProducerThread(createSession, createQueue) { // from class: org.apache.activemq.usecases.MemoryLimitTest.1
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.apache.activemq.util.ProducerThread
            public Message createMessage(int i) throws Exception {
                return this.sess.createTextMessage(MemoryLimitTest.this.payload + "::" + i);
            }
        };
        producerThread.setMessageCount(2000);
        producerThread.start();
        producerThread.join();
        Thread.sleep(1000L);
        final Destination destination = this.broker.getDestination(createQueue);
        LOG.info("Destination usage: " + destination.getMemoryUsage());
        int percentUsage = destination.getMemoryUsage().getPercentUsage();
        assertTrue("Should be less than 70% of limit but was: " + percentUsage, percentUsage <= 71);
        LOG.info("Broker usage: " + this.broker.getSystemUsage().getMemoryUsage());
        assertTrue(this.broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 71);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        createConsumer.receive(5000L).acknowledge();
        assertTrue("Limit is exceeded", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.MemoryLimitTest.2
            public boolean isSatisified() throws Exception {
                MemoryLimitTest.LOG.info("Destination usage: " + destination.getMemoryUsage());
                return destination.getMemoryUsage().getPercentUsage() >= 470;
            }
        }));
        LOG.info("Broker usage: " + this.broker.getSystemUsage().getMemoryUsage());
        assertTrue(this.broker.getSystemUsage().getMemoryUsage().getPercentUsage() >= 470);
        for (int i = 1; i < 2000; i++) {
            Message receive = createConsumer.receive(5000L);
            if (receive == null) {
                dumpAllThreads("NoMessage");
            }
            assertNotNull("Didn't receive message " + i, receive);
            receive.acknowledge();
        }
    }

    @Test(timeout = 120000)
    public void testMoveMessages() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10");
        activeMQConnectionFactory.setOptimizeAcknowledge(true);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 2);
        Queue createQueue = createSession.createQueue("IN");
        final byte[] bArr = new byte[204800];
        ProducerThread producerThread = new ProducerThread(createSession, createQueue) { // from class: org.apache.activemq.usecases.MemoryLimitTest.3
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.apache.activemq.util.ProducerThread
            public Message createMessage(int i) throws Exception {
                BytesMessage createBytesMessage = this.sess.createBytesMessage();
                createBytesMessage.writeBytes(bArr);
                return createBytesMessage;
            }
        };
        producerThread.setMessageCount(4);
        producerThread.start();
        producerThread.join();
        Thread.sleep(1000L);
        final QueueViewMBean proxyToQueue = getProxyToQueue("IN");
        Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.MemoryLimitTest.4
            public boolean isSatisified() throws Exception {
                return proxyToQueue.getQueueSize() == 4;
            }
        });
        assertEquals("Messages not sent", 4L, proxyToQueue.getQueueSize());
        assertEquals("Didn't move all messages", 4, proxyToQueue.moveMatchingMessagesTo("", "OUT"));
        final QueueViewMBean proxyToQueue2 = getProxyToQueue("OUT");
        Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.MemoryLimitTest.5
            public boolean isSatisified() throws Exception {
                return proxyToQueue2.getQueueSize() == 4;
            }
        });
        assertEquals("Messages not moved", 4L, proxyToQueue2.getQueueSize());
    }

    @Test(timeout = 120000)
    @Ignore
    public void testLimit() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10");
        activeMQConnectionFactory.setOptimizeAcknowledge(true);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        ProducerThread producerThread = new ProducerThread(createSession, createSession.createQueue("STORE.1")) { // from class: org.apache.activemq.usecases.MemoryLimitTest.6
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.apache.activemq.util.ProducerThread
            public Message createMessage(int i) throws Exception {
                return this.sess.createTextMessage(MemoryLimitTest.this.payload + "::" + i);
            }
        };
        producerThread.setMessageCount(1000);
        ProducerThread producerThread2 = new ProducerThread(createSession, createSession.createQueue("STORE.2")) { // from class: org.apache.activemq.usecases.MemoryLimitTest.7
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.apache.activemq.util.ProducerThread
            public Message createMessage(int i) throws Exception {
                return this.sess.createTextMessage(MemoryLimitTest.this.payload + "::" + i);
            }
        };
        producerThread2.setMessageCount(1000);
        ConsumerThread consumerThread = new ConsumerThread(createSession, createSession.createQueue("STORE.1"));
        consumerThread.setBreakOnNull(false);
        consumerThread.setMessageCount(1000);
        producerThread.start();
        producerThread.join();
        producerThread2.start();
        Thread.sleep(300L);
        consumerThread.start();
        consumerThread.join();
        producerThread2.join();
        assertEquals("consumer got all produced messages", producerThread.getMessageCount(), consumerThread.getReceived());
    }

    protected QueueViewMBean getProxyToQueue(String str) throws MalformedObjectNameException, JMSException {
        return (QueueViewMBean) BrokerRegistry.getInstance().lookup("localhost").getManagementContext().newProxyInstance(new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + str), QueueViewMBean.class, true);
    }
}
