/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.util.Arrays;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.ConsumerThread;
import org.apache.activemq.util.ProducerThread;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class MemoryLimitTest
extends TestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(MemoryLimitTest.class);
    final String payload = new String(new byte[10240]);
    protected BrokerService broker;
    @Parameterized.Parameter
    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;

    @Parameterized.Parameters(name="store={0}")
    public static Iterable<Object[]> getTestParameters() {
        return Arrays.asList({TestSupport.PersistenceAdapterChoice.KahaDB}, {TestSupport.PersistenceAdapterChoice.LevelDB}, {TestSupport.PersistenceAdapterChoice.JDBC});
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.getSystemUsage().getMemoryUsage().setLimit(0x100000L);
        broker.deleteAllMessages();
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setProducerFlowControl(false);
        policyMap.put((ActiveMQDestination)new ActiveMQQueue(">"), (Object)policyEntry);
        broker.setDestinationPolicy(policyMap);
        LOG.info("Starting broker with persistenceAdapterChoice " + this.persistenceAdapterChoice.toString());
        this.setPersistenceAdapter(broker, this.persistenceAdapterChoice);
        broker.getPersistenceAdapter().deleteAllMessages();
        return broker;
    }

    @Before
    public void setUp() throws Exception {
        if (this.broker == null) {
            this.broker = this.createBroker();
        }
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    @Test(timeout=120000L)
    public void testCursorBatch() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10");
        factory.setOptimizeAcknowledge(true);
        Connection conn = factory.createConnection();
        conn.start();
        Session sess = conn.createSession(false, 2);
        Queue queue = sess.createQueue("STORE");
        ProducerThread producer = new ProducerThread(sess, (Destination)queue){

            @Override
            protected Message createMessage(int i) throws Exception {
                return this.sess.createTextMessage(MemoryLimitTest.this.payload + "::" + i);
            }
        };
        producer.setMessageCount(2000);
        producer.start();
        producer.join();
        Thread.sleep(1000L);
        final org.apache.activemq.broker.region.Destination dest = this.broker.getDestination((ActiveMQDestination)((ActiveMQQueue)queue));
        LOG.info("Destination usage: " + dest.getMemoryUsage());
        int percentUsage = dest.getMemoryUsage().getPercentUsage();
        MemoryLimitTest.assertTrue((String)("Should be less than 70% of limit but was: " + percentUsage), (percentUsage <= 71 ? 1 : 0) != 0);
        LOG.info("Broker usage: " + this.broker.getSystemUsage().getMemoryUsage());
        MemoryLimitTest.assertTrue((this.broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 71 ? 1 : 0) != 0);
        MessageConsumer consumer = sess.createConsumer((Destination)queue);
        Message msg = consumer.receive(5000L);
        msg.acknowledge();
        MemoryLimitTest.assertTrue((String)"Limit is exceeded", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                LOG.info("Destination usage: " + dest.getMemoryUsage());
                return dest.getMemoryUsage().getPercentUsage() >= 470;
            }
        }));
        LOG.info("Broker usage: " + this.broker.getSystemUsage().getMemoryUsage());
        MemoryLimitTest.assertTrue((this.broker.getSystemUsage().getMemoryUsage().getPercentUsage() >= 470 ? 1 : 0) != 0);
        for (int i = 1; i < 2000; ++i) {
            msg = consumer.receive(5000L);
            if (msg == null) {
                MemoryLimitTest.dumpAllThreads((String)"NoMessage");
            }
            MemoryLimitTest.assertNotNull((String)("Didn't receive message " + i), (Object)msg);
            msg.acknowledge();
        }
    }

    @Ignore
    @Test(timeout=120000L)
    public void testLimit() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10");
        factory.setOptimizeAcknowledge(true);
        Connection conn = factory.createConnection();
        conn.start();
        Session sess = conn.createSession(false, 1);
        ProducerThread producer = new ProducerThread(sess, (Destination)sess.createQueue("STORE.1")){

            @Override
            protected Message createMessage(int i) throws Exception {
                return this.sess.createTextMessage(MemoryLimitTest.this.payload + "::" + i);
            }
        };
        producer.setMessageCount(1000);
        ProducerThread producer2 = new ProducerThread(sess, (Destination)sess.createQueue("STORE.2")){

            @Override
            protected Message createMessage(int i) throws Exception {
                return this.sess.createTextMessage(MemoryLimitTest.this.payload + "::" + i);
            }
        };
        producer2.setMessageCount(1000);
        ConsumerThread consumer = new ConsumerThread(sess, (Destination)sess.createQueue("STORE.1"));
        consumer.setBreakOnNull(false);
        consumer.setMessageCount(1000);
        producer.start();
        producer.join();
        producer2.start();
        Thread.sleep(300L);
        consumer.start();
        consumer.join();
        producer2.join();
        MemoryLimitTest.assertEquals((String)"consumer got all produced messages", (int)producer.getMessageCount(), (int)consumer.getReceived());
    }
}

