/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.management.ObjectName;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExpiredMessagesWithNoConsumerTest
extends CombinationTestSupport {
    /** Logger used by the test methods and by the producer threads and listeners they create. */
    private static final Logger LOG = LoggerFactory.getLogger(ExpiredMessagesWithNoConsumerTest.class);
    /** Queue named "test" that the queue-based tests produce to and inspect through {@link #createView}. */
    private final ActiveMQDestination destination = new ActiveMQQueue("test");
    /**
     * Value passed to {@code PolicyEntry.setOptimizedDispatch} when the broker is created.
     * Presumably injected through {@link #setOptimizedDispatch} by the combination support
     * (see {@link #initCombosForTestExpiredMessagesWithNoConsumer}) - confirm against the superclass.
     */
    private boolean optimizedDispatch = true;
    /**
     * Pending-queue policy passed to the default policy entry when the broker is created;
     * stays {@code null} unless set through {@link #setPendingQueuePolicy}.
     */
    private PendingQueueMessageStoragePolicy pendingQueuePolicy;
    /** Broker under test; assigned by {@code doCreateBroker} and stopped in {@link #tearDown}. */
    private BrokerService broker;
    /** Publishable connect string of the broker's first transport connector. */
    private String connectionUri;
    /** Client connection opened by each test method. */
    private Connection connection;
    /** Session created on {@link #connection} by each test method. */
    private Session session;
    /** Producer created on {@link #session} by each test method. */
    private MessageProducer producer;

    /**
     * Builds the suite for this test class by handing the class to the
     * inherited {@code suite(Class)} factory.
     *
     * @return the suite covering the tests declared in this class
     */
    public static Test suite() {
        final Class<ExpiredMessagesWithNoConsumerTest> testClass = ExpiredMessagesWithNoConsumerTest.class;
        return suite(testClass);
    }

    /**
     * Command-line entry point: runs {@link #suite()} with the JUnit text runner.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final Test allTests = suite();
        TestRunner.run(allTests);
    }

    /**
     * Starts a memory-limited broker using an expire-messages period of 800.
     *
     * @throws Exception if the broker cannot be created or started
     */
    protected void createBrokerWithMemoryLimit() throws Exception {
        final int defaultExpireMessagesPeriod = 800;
        createBrokerWithMemoryLimit(defaultExpireMessagesPeriod);
    }

    /**
     * Starts a memory-limited broker with the given expire-messages period.
     *
     * @param expireMessagesPeriod value forwarded to the default policy entry's
     *        expire-messages period (presumably milliseconds - confirm against PolicyEntry)
     * @throws Exception if the broker cannot be created or started
     */
    protected void createBrokerWithMemoryLimit(int expireMessagesPeriod) throws Exception {
        final boolean withMemoryLimit = true;
        doCreateBroker(withMemoryLimit, expireMessagesPeriod);
    }

    /**
     * Starts a broker without a destination memory limit, using an
     * expire-messages period of 800.
     *
     * @throws Exception if the broker cannot be created or started
     */
    protected void createBroker() throws Exception {
        final boolean withMemoryLimit = false;
        final int defaultExpireMessagesPeriod = 800;
        doCreateBroker(withMemoryLimit, defaultExpireMessagesPeriod);
    }

    /**
     * Creates, configures and starts the broker used by a test, then records the
     * connect string of its first transport connector in {@code connectionUri}.
     *
     * <p>The default destination policy carries the current {@code optimizedDispatch}
     * and {@code pendingQueuePolicy} settings, the requested expire-messages period and
     * a max expire page size of 800.
     *
     * @param memoryLimit when {@code true}, the default policy additionally gets a
     *        {@code null} dead-letter strategy and a memory limit of 200000
     * @param expireMessagesPeriod expire-messages period for the default policy entry
     * @throws Exception if the broker cannot be configured or started
     */
    private void doCreateBroker(boolean memoryLimit, int expireMessagesPeriod) throws Exception {
        broker = new BrokerService();
        broker.setBrokerName("localhost");
        broker.setUseJmx(true);
        broker.setDeleteAllMessagesOnStartup(true);
        // Port 0: the actual address is read back from the connector once the broker is up.
        broker.addConnector("tcp://localhost:0");

        final PolicyEntry policy = new PolicyEntry();
        policy.setOptimizedDispatch(optimizedDispatch);
        policy.setExpireMessagesPeriod(expireMessagesPeriod);
        policy.setMaxExpirePageSize(800);
        policy.setPendingQueuePolicy(pendingQueuePolicy);
        if (memoryLimit) {
            policy.setDeadLetterStrategy(null);
            policy.setMemoryLimit(200000L);
        }

        final PolicyMap policies = new PolicyMap();
        policies.setDefaultEntry(policy);
        broker.setDestinationPolicy(policies);

        broker.start();
        broker.waitUntilStarted();

        final TransportConnector firstConnector = (TransportConnector) broker.getTransportConnectors().get(0);
        connectionUri = firstConnector.getPublishableConnectString();
    }

    /**
     * Sends 2000 non-persistent messages with a one second time-to-live to a
     * memory-limited queue that has no consumer, pausing the producer for five
     * seconds after the 135th message, and verifies that the queue ends up empty
     * with zero memory usage.
     *
     * <p>A failure inside the producer thread is recorded and fails the test.
     * Without that check the final assertions would also hold for a queue that
     * never received a message.
     *
     * @throws Exception if broker setup, JMS setup or a JMX read fails
     */
    public void testExpiredNonPersistentMessagesWithNoConsumer() throws Exception {
        createBrokerWithMemoryLimit(2000);
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
        connection = factory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(destination);
        producer.setTimeToLive(1000L);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        connection.start();

        final long sendCount = 2000L;
        // Holds whatever the producer thread died with, so the test thread can report it.
        final AtomicReference<Throwable> producerFailure = new AtomicReference<Throwable>();
        final Thread producingThread = new Thread("Producing Thread") {

            @Override
            public void run() {
                try {
                    long tStamp = System.currentTimeMillis();
                    for (int i = 1; i <= sendCount; i++) {
                        producer.send(session.createTextMessage("test"));
                        if (i % 100 == 0) {
                            LOG.info("sent: " + i + " @ " + (System.currentTimeMillis() - tStamp) / 100L + "m/ms");
                            tStamp = System.currentTimeMillis();
                        }
                        if (i == 135) {
                            // Pause long enough for the messages sent so far to pass their TTL.
                            TimeUnit.SECONDS.sleep(5L);
                        }
                    }
                }
                catch (Throwable ex) {
                    producerFailure.set(ex);
                    LOG.error("producer thread failed", ex);
                }
            }
        };
        producingThread.start();

        assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() {

            public boolean isSatisified() throws Exception {
                // NOTE(review): this join can block for up to 3000 seconds, far longer than
                // the surrounding wait; left unchanged to keep the test's timing as it was.
                producingThread.join(TimeUnit.SECONDS.toMillis(3000L));
                return !producingThread.isAlive();
            }
        }));
        assertNull("producer thread failed", producerFailure.get());

        TimeUnit.SECONDS.sleep(5L);

        final DestinationViewMBean view = createView(destination);
        // Best-effort wait for the counters to settle; the outcome is checked by the asserts below.
        Wait.waitFor(new Wait.Condition() {

            public boolean isSatisified() throws Exception {
                try {
                    LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + ", size= " + view.getQueueSize());
                    return view.getDequeueCount() != 0L
                        && view.getDequeueCount() == view.getExpiredCount()
                        && view.getDequeueCount() == view.getEnqueueCount()
                        && view.getQueueSize() == 0L;
                }
                catch (Exception ignored) {
                    // A failed MBean read just means "not satisfied yet"; keep polling.
                    LOG.info(ignored.toString());
                    return false;
                }
            }
        }, 300000L);

        LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + ", size= " + view.getQueueSize());
        assertEquals("memory usage doesn't go to duck egg", 0, view.getMemoryPercentUsage());
        assertEquals("0 queue", 0L, view.getQueueSize());
    }

    /**
     * Registers the combination values for {@code testExpiredMessagesWithNoConsumer}:
     * both settings of {@code optimizedDispatch}, and {@code pendingQueuePolicy} as
     * unset, VM-backed and file-backed.
     */
    public void initCombosForTestExpiredMessagesWithNoConsumer() {
        final Object[] dispatchSettings = {Boolean.TRUE, Boolean.FALSE};
        addCombinationValues("optimizedDispatch", dispatchSettings);

        final Object[] pendingPolicies = {
            null,
            new VMPendingQueueMessageStoragePolicy(),
            new FilePendingQueueMessageStoragePolicy()
        };
        addCombinationValues("pendingQueuePolicy", pendingPolicies);
    }

    /**
     * Sends 2000 messages with a one second time-to-live to a memory-limited queue
     * that has no consumer, then verifies that all of them are counted as expired
     * and that the destination's memory usage returns to zero.
     *
     * @throws Exception if broker setup, JMS setup or a JMX read fails
     */
    public void testExpiredMessagesWithNoConsumer() throws Exception {
        final long messageCount = 2000L;

        createBrokerWithMemoryLimit();
        final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(destination);
        producer.setTimeToLive(1000L);
        connection.start();

        final Thread sender = new Thread("Producing Thread") {
            @Override
            public void run() {
                try {
                    long batchStart = System.currentTimeMillis();
                    for (int sent = 1; sent <= messageCount; sent++) {
                        producer.send(session.createTextMessage("test"));
                        if (sent % 100 == 0) {
                            LOG.info("sent: " + sent + " @ " + (System.currentTimeMillis() - batchStart) / 100L + "m/ms");
                            batchStart = System.currentTimeMillis();
                        }
                    }
                } catch (Throwable failure) {
                    failure.printStackTrace();
                }
            }
        };
        sender.start();

        final Wait.Condition senderFinished = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                sender.join(TimeUnit.SECONDS.toMillis(3000L));
                return !sender.isAlive();
            }
        };
        assertTrue("producer failed to complete within allocated time", Wait.waitFor(senderFinished));

        final DestinationViewMBean view = createView(destination);
        final Wait.Condition everythingExpired = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + ", size= " + view.getQueueSize());
                return view.getExpiredCount() == messageCount;
            }
        };
        // Result intentionally unchecked here; the assertions below report the outcome.
        Wait.waitFor(everythingExpired, 300000L);

        LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + ", size= " + view.getQueueSize());
        assertEquals("Not all sent messages have expired", messageCount, view.getExpiredCount());
        assertEquals("memory usage doesn't go to duck egg", 0, view.getMemoryPercentUsage());
    }

    /**
     * Exercises expiry while a consumer is stuck on its first message.
     *
     * <p>A client-acknowledge listener with a prefetch of 600 blocks inside
     * {@code onMessage} while 1500 messages with a four second time-to-live are
     * produced. The test checks that 600 messages are dispatched and that all 1500
     * are counted as expired. It then releases the listener and checks that
     * in-flight and queue size fall to zero, that the dequeue count equals the
     * number sent, and that in-flight is still zero after the consumer is closed.
     *
     * @throws Exception if broker setup, JMS setup or a JMX read fails
     */
    public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
        final long prefetch = 600L;
        final int timeToLive = 4000;
        final long messageCount = 1500L;

        createBroker();
        final String prefetchUri = connectionUri + "?jms.prefetchPolicy.queuePrefetch=" + prefetch;
        final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(prefetchUri);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        producer = session.createProducer(destination);
        producer.setTimeToLive(timeToLive);

        final CountDownLatch firstMessageSeen = new CountDownLatch(1);
        final CountDownLatch releaseListener = new CountDownLatch(1);
        final MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                try {
                    LOG.info("Got my message: " + message);
                    firstMessageSeen.countDown();
                    // Hold the message un-acked until the test releases the latch.
                    releaseListener.await(6L, TimeUnit.MINUTES);
                    LOG.info("acking message: " + message);
                    message.acknowledge();
                } catch (Exception e) {
                    e.printStackTrace();
                    TestCase.fail(e.toString());
                }
            }
        });
        connection.start();

        final Thread sender = new Thread("Producing Thread") {
            @Override
            public void run() {
                try {
                    long batchStart = System.currentTimeMillis();
                    for (int sent = 1; sent <= messageCount; sent++) {
                        producer.send(session.createTextMessage("test"));
                        if (sent % 100 == 0) {
                            LOG.info("sent: " + sent + " @ " + (System.currentTimeMillis() - batchStart) / 100L + "m/ms");
                            batchStart = System.currentTimeMillis();
                        }
                    }
                } catch (Throwable failure) {
                    failure.printStackTrace();
                }
            }
        };
        sender.start();

        assertTrue("got one message", firstMessageSeen.await(20L, TimeUnit.SECONDS));

        final Wait.Condition senderFinished = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                sender.join(1000L);
                return !sender.isAlive();
            }
        };
        assertTrue("producer failed to complete within allocated time", Wait.waitFor(senderFinished, 300000L));

        final DestinationViewMBean view = createView(destination);
        // The same check is needed after releasing the listener and after closing the consumer.
        final Wait.Condition nothingInFlight = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return view.getInFlightCount() == 0L;
            }
        };

        assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return view.getDispatchCount() == prefetch;
            }
        }));
        assertTrue("Not all sent have expired ", Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return view.getExpiredCount() == messageCount;
            }
        }));
        LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + ", size= " + view.getQueueSize());

        releaseListener.countDown();
        Wait.waitFor(nothingInFlight);
        LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + ", size= " + view.getQueueSize());
        assertEquals("inflight reduced to duck", 0L, view.getInFlightCount());
        assertEquals("size didn't get back to 0 ", 0L, view.getQueueSize());
        assertEquals("dequeues didn't match sent/expired ", messageCount, view.getDequeueCount());

        consumer.close();
        Wait.waitFor(nothingInFlight);
        assertEquals("inflight goes to zero on close", 0L, view.getInFlightCount());
        LOG.info("done: " + getName());
    }

    /**
     * Runs the blocked-listener expiry scenario and then checks that the consumer
     * keeps working afterwards.
     *
     * <p>First phase: a client-acknowledge listener with a prefetch of 600 blocks
     * in {@code onMessage} while 1500 messages with a four second time-to-live are
     * produced. The test checks the dispatch, expired, in-flight, size and dequeue
     * counters once the listener has been released. Second phase: the time-to-live
     * is set to zero, another 1500 messages are sent, and the test waits for the
     * listener to have seen at least 1500 messages. It then closes the consumer and
     * checks that in-flight is zero.
     *
     * @throws Exception if broker setup, JMS setup or a JMX read fails
     */
    public void testExpiredMessagesWithVerySlowConsumerCanContinue() throws Exception {
        final long prefetch = 600L;
        final int timeToLive = 4000;
        final long messageCount = 1500L;

        createBroker();
        final String prefetchUri = connectionUri + "?jms.prefetchPolicy.queuePrefetch=" + prefetch;
        final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(prefetchUri);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        producer = session.createProducer(destination);
        producer.setTimeToLive(timeToLive);

        final CountDownLatch firstMessageSeen = new CountDownLatch(1);
        final CountDownLatch releaseListener = new CountDownLatch(1);
        final AtomicLong received = new AtomicLong();
        final MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                try {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Got my message: " + message);
                    }
                    firstMessageSeen.countDown();
                    received.incrementAndGet();
                    // Blocks only until the latch is released; later deliveries pass straight through.
                    releaseListener.await(5L, TimeUnit.MINUTES);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("acking message: " + message);
                    }
                    message.acknowledge();
                } catch (Exception e) {
                    e.printStackTrace();
                    TestCase.fail(e.toString());
                }
            }
        });
        connection.start();

        final Thread sender = new Thread("Producing Thread") {
            @Override
            public void run() {
                try {
                    long batchStart = System.currentTimeMillis();
                    for (int sent = 1; sent <= messageCount; sent++) {
                        producer.send(session.createTextMessage("test"));
                        if (sent % 100 == 0) {
                            LOG.info("sent: " + sent + " @ " + (System.currentTimeMillis() - batchStart) / 100L + "m/ms");
                            batchStart = System.currentTimeMillis();
                        }
                    }
                } catch (Throwable failure) {
                    failure.printStackTrace();
                }
            }
        };
        sender.start();

        assertTrue("got one message", firstMessageSeen.await(20L, TimeUnit.SECONDS));

        final Wait.Condition senderFinished = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                sender.join(1000L);
                return !sender.isAlive();
            }
        };
        assertTrue("producer failed to complete within allocated time", Wait.waitFor(senderFinished, 300000L));

        final DestinationViewMBean view = createView(destination);
        // The same check is needed after releasing the listener and after closing the consumer.
        final Wait.Condition nothingInFlight = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return view.getInFlightCount() == 0L;
            }
        };

        assertTrue("Not all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return view.getDispatchCount() == prefetch;
            }
        }));
        assertTrue("All have not sent have expired ", Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return view.getExpiredCount() == messageCount;
            }
        }));
        LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + ", size= " + view.getQueueSize());

        releaseListener.countDown();
        Wait.waitFor(nothingInFlight);
        LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + ", size= " + view.getQueueSize());
        assertEquals("inflight didn't reduce to duck", 0L, view.getInFlightCount());
        assertEquals("size doesn't get back to 0 ", 0L, view.getQueueSize());
        assertEquals("dequeues don't match sent/expired ", messageCount, view.getDequeueCount());

        // Second phase: messages that never expire must still reach the same consumer.
        producer.setTimeToLive(0L);
        long batchStart = System.currentTimeMillis();
        for (int index = 0; index < messageCount; index++) {
            producer.send(session.createTextMessage("test-" + index));
            if (index % 100 == 0) {
                LOG.info("sent: " + index + " @ " + (System.currentTimeMillis() - batchStart) / 100L + "m/ms");
                batchStart = System.currentTimeMillis();
            }
        }
        Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return received.get() >= messageCount;
            }
        });

        consumer.close();
        Wait.waitFor(nothingInFlight);
        assertEquals("inflight did not go to zero on close", 0L, view.getInFlightCount());
        LOG.info("done: " + getName());
    }

    /**
     * Checks expiry of messages held for a closed durable subscriber.
     *
     * <p>A durable subscription "mySub" is created on topic "test" and closed
     * straight away. Ten messages with a one second time-to-live are then sent.
     * Immediately after sending, the test expects zero expired and ten enqueued.
     * After a five second sleep it expects ten expired and an enqueue count of zero.
     * Finally the subscription is re-opened with a listener and the test expects
     * that no message is delivered and that the counters are unchanged.
     *
     * @throws Exception if broker setup, JMS setup or a JMX read fails
     */
    public void testExpireMessagesForDurableSubscriber() throws Exception {
        final int timeToLive = 1000;
        final long messageCount = 10L;

        createBroker();
        final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
        connection = connectionFactory.createConnection();
        connection.setClientID("myConnection");
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        connection.start();

        final Topic topic = session.createTopic("test");
        producer = session.createProducer(topic);
        producer.setTimeToLive(timeToLive);

        // Register the durable subscription, then close it so the sends happen with no active subscriber.
        TopicSubscriber subscriber = session.createDurableSubscriber(topic, "mySub");
        subscriber.close();

        for (int sent = 0; sent < messageCount; sent++) {
            producer.send(session.createTextMessage("test"));
        }

        final DestinationViewMBean view = createView((ActiveMQTopic) topic);
        LOG.info("messages sent");
        LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount());
        assertEquals(0L, view.getExpiredCount());
        assertEquals(messageCount, view.getEnqueueCount());

        Thread.sleep(5000L);

        LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount());
        assertEquals(messageCount, view.getExpiredCount());
        assertEquals(0L, view.getEnqueueCount());

        final AtomicLong delivered = new AtomicLong();
        subscriber = session.createDurableSubscriber(topic, "mySub");
        subscriber.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                delivered.incrementAndGet();
            }
        });

        LOG.info("Waiting for messages to arrive");
        // Expected to time out: nothing should be delivered. The result is not checked.
        Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return delivered.get() >= messageCount;
            }
        }, 1000L);

        LOG.info("received=" + delivered.get());
        LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount());
        assertEquals(0L, delivered.get());
        assertEquals(messageCount, view.getExpiredCount());
        assertEquals(0L, view.getEnqueueCount());
    }

    /**
     * Creates a JMX proxy for the destination MBean of the given destination on
     * the broker named "localhost".
     *
     * <p>The destination type ("Queue" or "Topic") and the destination name are
     * taken from the argument instead of being fixed to "test", so the helper works
     * for any destination a test passes in. The physical name is inserted into the
     * {@link ObjectName} verbatim, so names containing characters that are special
     * in an object name (for example {@code ,}, {@code =} or {@code :}) are not
     * supported.
     *
     * @param destination the queue or topic whose MBean should be proxied
     * @return a proxy implementing {@link DestinationViewMBean}
     * @throws Exception if the object name is malformed or the proxy cannot be created
     */
    protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
        String domain = "org.apache.activemq";
        String destinationType = destination.isQueue() ? "Queue" : "Topic";
        ObjectName name = new ObjectName(domain
            + ":type=Broker,brokerName=localhost,destinationType=" + destinationType
            + ",destinationName=" + destination.getPhysicalName());
        return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
    }

    /**
     * Releases the JMS connection and stops the broker after each test.
     *
     * <p>Either field may still be {@code null} when a test failed during setup,
     * so both are checked. The broker is stopped even if closing the connection
     * throws, and the superclass {@code tearDown} is always invoked last so that
     * whatever cleanup it performs still runs.
     *
     * @throws Exception if closing the connection or stopping the broker fails
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            if (connection != null) {
                // close() rather than stop(): stop() only pauses delivery and leaves the
                // connection's resources allocated.
                connection.close();
            }
        } finally {
            try {
                if (broker != null) {
                    broker.stop();
                    broker.waitUntilStopped();
                }
            } finally {
                super.tearDown();
            }
        }
    }

    /**
     * @return the optimized-dispatch setting that will be applied to the broker's
     *         default policy entry
     */
    public boolean getOptimizedDispatch() {
        return optimizedDispatch;
    }

    /**
     * Sets the optimized-dispatch flag used when the broker is next created.
     *
     * @param option the value to apply to the default policy entry
     */
    public void setOptimizedDispatch(boolean option) {
        optimizedDispatch = option;
    }

    /**
     * @return the pending-queue storage policy that will be applied to the broker's
     *         default policy entry, or {@code null} if none has been set
     */
    public PendingQueueMessageStoragePolicy getPendingQueuePolicy() {
        return pendingQueuePolicy;
    }

    /**
     * Sets the pending-queue storage policy used when the broker is next created.
     *
     * @param policy the policy to apply to the default policy entry; may be {@code null}
     */
    public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy policy) {
        pendingQueuePolicy = policy;
    }
}

