/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.policy;

import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.management.InstanceNotFoundException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleClientsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.MessageIdList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AbortSlowConsumerTest
extends JmsMultipleClientsTestSupport
implements ExceptionListener {
    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerTest.class);
    AbortSlowConsumerStrategy underTest;
    public boolean abortConnection = false;
    public long checkPeriod = 2000L;
    public long maxSlowDuration = 5000L;
    private final List<Throwable> exceptions = new ArrayList<Throwable>();

    @Override
    protected void setUp() throws Exception {
        this.exceptions.clear();
        this.topic = true;
        this.underTest = new AbortSlowConsumerStrategy();
        super.setUp();
        this.createDestination();
    }

    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService broker = super.createBroker();
        PolicyEntry policy = new PolicyEntry();
        this.underTest.setAbortConnection(this.abortConnection);
        this.underTest.setCheckPeriod(this.checkPeriod);
        this.underTest.setMaxSlowDuration(this.maxSlowDuration);
        policy.setSlowConsumerStrategy((SlowConsumerStrategy)this.underTest);
        policy.setQueuePrefetch(10);
        policy.setTopicPrefetch(10);
        PolicyMap pMap = new PolicyMap();
        pMap.setDefaultEntry(policy);
        broker.setDestinationPolicy(pMap);
        return broker;
    }

    public void testRegularConsumerIsNotAborted() throws Exception {
        this.startConsumers(this.destination);
        for (Connection c : this.connections) {
            c.setExceptionListener((ExceptionListener)this);
        }
        this.startProducers(this.destination, 100);
        this.allMessagesList.waitForMessagesToArrive(10);
        this.allMessagesList.assertAtLeastMessagesReceived(10);
    }

    public void initCombosForTestLittleSlowConsumerIsNotAborted() {
        this.addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
    }

    public void testLittleSlowConsumerIsNotAborted() throws Exception {
        this.startConsumers(this.destination);
        Map.Entry consumertoAbort = this.consumers.entrySet().iterator().next();
        ((MessageIdList)((Object)consumertoAbort.getValue())).setProcessingDelay(500L);
        for (Connection c : this.connections) {
            c.setExceptionListener((ExceptionListener)this);
        }
        this.startProducers(this.destination, 12);
        this.allMessagesList.waitForMessagesToArrive(10);
        this.allMessagesList.assertAtLeastMessagesReceived(10);
    }

    public void initCombosForTestSlowConsumerIsAborted() {
        this.addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
        this.addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
    }

    public void testSlowConsumerIsAborted() throws Exception {
        this.startConsumers(this.destination);
        Map.Entry consumertoAbort = this.consumers.entrySet().iterator().next();
        ((MessageIdList)((Object)consumertoAbort.getValue())).setProcessingDelay(8000L);
        for (Connection c : this.connections) {
            c.setExceptionListener((ExceptionListener)this);
        }
        this.startProducers(this.destination, 100);
        ((MessageIdList)((Object)consumertoAbort.getValue())).assertMessagesReceived(1);
        TimeUnit.SECONDS.sleep(5L);
        ((MessageIdList)((Object)consumertoAbort.getValue())).assertAtMostMessagesReceived(1);
    }

    public void testSlowConsumerIsAbortedViaJmx() throws Exception {
        this.underTest.setMaxSlowDuration(60000L);
        this.startConsumers(this.destination);
        Map.Entry consumertoAbort = this.consumers.entrySet().iterator().next();
        ((MessageIdList)((Object)consumertoAbort.getValue())).setProcessingDelay(8000L);
        for (Connection c : this.connections) {
            c.setExceptionListener((ExceptionListener)this);
        }
        this.startProducers(this.destination, 100);
        ((MessageIdList)((Object)consumertoAbort.getValue())).assertMessagesReceived(1);
        ActiveMQDestination amqDest = (ActiveMQDestination)this.destination;
        ObjectName destinationViewMBean = new ObjectName("org.apache.activemq:destinationType=" + (amqDest.isTopic() ? "Topic" : "Queue") + ",destinationName=" + amqDest.getPhysicalName() + ",type=Broker,brokerName=localhost");
        DestinationViewMBean queue = (DestinationViewMBean)this.broker.getManagementContext().newProxyInstance(destinationViewMBean, DestinationViewMBean.class, true);
        ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
        AbortSlowConsumerTest.assertNotNull((Object)slowConsumerPolicyMBeanName);
        AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean)this.broker.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true);
        TimeUnit.SECONDS.sleep(3L);
        TabularData slowOnes = abortPolicy.getSlowConsumers();
        AbortSlowConsumerTest.assertEquals((String)"one slow consumers", (int)1, (int)slowOnes.size());
        LOG.info("slow ones:" + slowOnes);
        CompositeData slowOne = (CompositeData)slowOnes.values().iterator().next();
        LOG.info("Slow one: " + slowOne);
        AbortSlowConsumerTest.assertTrue((String)"we have an object name", (boolean)(slowOne.get("subscription") instanceof ObjectName));
        abortPolicy.abortConsumer((ObjectName)slowOne.get("subscription"));
        ((MessageIdList)((Object)consumertoAbort.getValue())).assertAtMostMessagesReceived(1);
        slowOnes = abortPolicy.getSlowConsumers();
        AbortSlowConsumerTest.assertEquals((String)"no slow consumers left", (int)0, (int)slowOnes.size());
        this.broker.getAdminView().removeTopic(amqDest.getPhysicalName());
        try {
            abortPolicy.getSlowConsumers();
            AbortSlowConsumerTest.fail((String)"expect not found post destination removal");
        }
        catch (UndeclaredThrowableException expected) {
            AbortSlowConsumerTest.assertTrue((String)("correct exception: " + expected.getCause()), (boolean)(expected.getCause() instanceof InstanceNotFoundException));
        }
    }

    public void testOnlyOneSlowConsumerIsAborted() throws Exception {
        this.consumerCount = 10;
        this.startConsumers(this.destination);
        Map.Entry consumertoAbort = this.consumers.entrySet().iterator().next();
        ((MessageIdList)((Object)consumertoAbort.getValue())).setProcessingDelay(8000L);
        for (Connection c : this.connections) {
            c.setExceptionListener((ExceptionListener)this);
        }
        this.startProducers(this.destination, 100);
        this.allMessagesList.waitForMessagesToArrive(99);
        this.allMessagesList.assertAtLeastMessagesReceived(99);
        ((MessageIdList)((Object)consumertoAbort.getValue())).assertMessagesReceived(1);
        TimeUnit.SECONDS.sleep(5L);
        ((MessageIdList)((Object)consumertoAbort.getValue())).assertAtMostMessagesReceived(1);
    }

    public void testAbortAlreadyClosingConsumers() throws Exception {
        this.consumerCount = 1;
        this.startConsumers(this.destination);
        for (MessageIdList list : this.consumers.values()) {
            list.setProcessingDelay(6000L);
        }
        for (Connection c : this.connections) {
            c.setExceptionListener((ExceptionListener)this);
        }
        this.startProducers(this.destination, 100);
        this.allMessagesList.waitForMessagesToArrive(this.consumerCount);
        for (MessageConsumer consumer : this.consumers.keySet()) {
            LOG.info("closing consumer: " + consumer);
            consumer.close();
        }
    }

    public void initCombosForTestAbortAlreadyClosedConsumers() {
        this.addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
        this.addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
    }

    public void testAbortAlreadyClosedConsumers() throws Exception {
        Connection conn = this.createConnectionFactory().createConnection();
        conn.setExceptionListener((ExceptionListener)this);
        this.connections.add(conn);
        Session sess = conn.createSession(false, 2);
        MessageConsumer consumer = sess.createConsumer(this.destination);
        conn.start();
        this.startProducers(this.destination, 20);
        TimeUnit.SECONDS.sleep(1L);
        LOG.info("closing consumer: " + consumer);
        consumer.close();
        TimeUnit.SECONDS.sleep(5L);
        AbortSlowConsumerTest.assertTrue((String)("no exceptions : " + this.exceptions.toArray()), (boolean)this.exceptions.isEmpty());
    }

    public void initCombosForTestAbortAlreadyClosedConnection() {
        this.addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
        this.addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
    }

    public void testAbortAlreadyClosedConnection() throws Exception {
        Connection conn = this.createConnectionFactory().createConnection();
        conn.setExceptionListener((ExceptionListener)this);
        Session sess = conn.createSession(false, 2);
        sess.createConsumer(this.destination);
        conn.start();
        this.startProducers(this.destination, 20);
        TimeUnit.SECONDS.sleep(1L);
        LOG.info("closing connection: " + conn);
        conn.close();
        TimeUnit.SECONDS.sleep(5L);
        AbortSlowConsumerTest.assertTrue((String)("no exceptions : " + this.exceptions.toArray()), (boolean)this.exceptions.isEmpty());
    }

    public void testAbortConsumerOnDeadConnection() throws Exception {
    }

    public void onException(JMSException exception) {
        this.exceptions.add(exception);
        exception.printStackTrace();
    }

    public static Test suite() {
        return AbortSlowConsumerTest.suite(AbortSlowConsumerTest.class);
    }
}

