package org.apache.activemq.broker.policy;

import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.management.InstanceNotFoundException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleClientsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.MessageIdList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/broker/policy/AbortSlowConsumerTest.class */
public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport implements ExceptionListener {
    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerTest.class);
    protected AbortSlowConsumerStrategy underTest;
    protected boolean abortConnection = false;
    protected long checkPeriod = 2000;
    protected long maxSlowDuration = 5000;
    protected final List<Throwable> exceptions = new ArrayList();

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.JmsMultipleClientsTestSupport
    public void setUp() throws Exception {
        this.exceptions.clear();
        this.topic = true;
        this.underTest = createSlowConsumerStrategy();
        super.setUp();
        createDestination();
    }

    protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
        return new AbortSlowConsumerStrategy();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.JmsMultipleClientsTestSupport
    public BrokerService createBroker() throws Exception {
        BrokerService createBroker = super.createBroker();
        PolicyEntry policyEntry = new PolicyEntry();
        this.underTest.setAbortConnection(this.abortConnection);
        this.underTest.setCheckPeriod(this.checkPeriod);
        this.underTest.setMaxSlowDuration(this.maxSlowDuration);
        policyEntry.setSlowConsumerStrategy(this.underTest);
        policyEntry.setQueuePrefetch(10);
        policyEntry.setTopicPrefetch(10);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        createBroker.setDestinationPolicy(policyMap);
        return createBroker;
    }

    public void testRegularConsumerIsNotAborted() throws Exception {
        startConsumers(this.destination);
        Iterator<Connection> it = this.connections.iterator();
        while (it.hasNext()) {
            it.next().setExceptionListener(this);
        }
        startProducers(this.destination, 100);
        this.allMessagesList.waitForMessagesToArrive(10);
        this.allMessagesList.assertAtLeastMessagesReceived(10);
    }

    public void initCombosForTestLittleSlowConsumerIsNotAborted() {
        addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
    }

    public void testLittleSlowConsumerIsNotAborted() throws Exception {
        startConsumers(this.destination);
        this.consumers.entrySet().iterator().next().getValue().setProcessingDelay(500L);
        Iterator<Connection> it = this.connections.iterator();
        while (it.hasNext()) {
            it.next().setExceptionListener(this);
        }
        startProducers(this.destination, 12);
        this.allMessagesList.waitForMessagesToArrive(10);
        this.allMessagesList.assertAtLeastMessagesReceived(10);
    }

    public void initCombosForTestSlowConsumerIsAborted() {
        addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
        addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
    }

    public void testSlowConsumerIsAborted() throws Exception {
        startConsumers(this.destination);
        Map.Entry<MessageConsumer, MessageIdList> next = this.consumers.entrySet().iterator().next();
        next.getValue().setProcessingDelay(8000L);
        Iterator<Connection> it = this.connections.iterator();
        while (it.hasNext()) {
            it.next().setExceptionListener(this);
        }
        startProducers(this.destination, 100);
        next.getValue().assertMessagesReceived(1);
        TimeUnit.SECONDS.sleep(5L);
        next.getValue().assertAtMostMessagesReceived(1);
    }

    public void testSlowConsumerIsAbortedViaJmx() throws Exception {
        this.underTest.setMaxSlowDuration(60000L);
        startConsumers(this.destination);
        Map.Entry<MessageConsumer, MessageIdList> next = this.consumers.entrySet().iterator().next();
        next.getValue().setProcessingDelay(8000L);
        Iterator<Connection> it = this.connections.iterator();
        while (it.hasNext()) {
            it.next().setExceptionListener(this);
        }
        startProducers(this.destination, 100);
        next.getValue().assertMessagesReceived(1);
        ActiveMQDestination activeMQDestination = this.destination;
        ObjectName slowConsumerStrategy = ((DestinationViewMBean) this.broker.getManagementContext().newProxyInstance(new ObjectName("org.apache.activemq:destinationType=" + (activeMQDestination.isTopic() ? "Topic" : "Queue") + ",destinationName=" + activeMQDestination.getPhysicalName() + ",type=Broker,brokerName=localhost"), DestinationViewMBean.class, true)).getSlowConsumerStrategy();
        assertNotNull(slowConsumerStrategy);
        AbortSlowConsumerStrategyViewMBean abortSlowConsumerStrategyViewMBean = (AbortSlowConsumerStrategyViewMBean) this.broker.getManagementContext().newProxyInstance(slowConsumerStrategy, AbortSlowConsumerStrategyViewMBean.class, true);
        TimeUnit.SECONDS.sleep(3L);
        TabularData slowConsumers = abortSlowConsumerStrategyViewMBean.getSlowConsumers();
        assertEquals("one slow consumers", 1, slowConsumers.size());
        LOG.info("slow ones:" + slowConsumers);
        CompositeData compositeData = (CompositeData) slowConsumers.values().iterator().next();
        LOG.info("Slow one: " + compositeData);
        assertTrue("we have an object name", compositeData.get("subscription") instanceof ObjectName);
        abortSlowConsumerStrategyViewMBean.abortConsumer((ObjectName) compositeData.get("subscription"));
        next.getValue().assertAtMostMessagesReceived(1);
        assertEquals("no slow consumers left", 0, abortSlowConsumerStrategyViewMBean.getSlowConsumers().size());
        this.broker.getAdminView().removeTopic(activeMQDestination.getPhysicalName());
        try {
            abortSlowConsumerStrategyViewMBean.getSlowConsumers();
            fail("expect not found post destination removal");
        } catch (UndeclaredThrowableException e) {
            assertTrue("correct exception: " + e.getCause(), e.getCause() instanceof InstanceNotFoundException);
        }
    }

    public void testOnlyOneSlowConsumerIsAborted() throws Exception {
        this.consumerCount = 10;
        startConsumers(this.destination);
        Map.Entry<MessageConsumer, MessageIdList> next = this.consumers.entrySet().iterator().next();
        next.getValue().setProcessingDelay(8000L);
        Iterator<Connection> it = this.connections.iterator();
        while (it.hasNext()) {
            it.next().setExceptionListener(this);
        }
        startProducers(this.destination, 100);
        this.allMessagesList.waitForMessagesToArrive(99);
        this.allMessagesList.assertAtLeastMessagesReceived(99);
        next.getValue().assertMessagesReceived(1);
        TimeUnit.SECONDS.sleep(5L);
        next.getValue().assertAtMostMessagesReceived(1);
    }

    public void testAbortAlreadyClosingConsumers() throws Exception {
        this.consumerCount = 1;
        startConsumers(this.destination);
        Iterator<MessageIdList> it = this.consumers.values().iterator();
        while (it.hasNext()) {
            it.next().setProcessingDelay(6000L);
        }
        Iterator<Connection> it2 = this.connections.iterator();
        while (it2.hasNext()) {
            it2.next().setExceptionListener(this);
        }
        startProducers(this.destination, 100);
        this.allMessagesList.waitForMessagesToArrive(this.consumerCount);
        for (MessageConsumer messageConsumer : this.consumers.keySet()) {
            LOG.info("closing consumer: " + messageConsumer);
            messageConsumer.close();
        }
    }

    public void initCombosForTestAbortAlreadyClosedConsumers() {
        addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
        addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
    }

    public void testAbortAlreadyClosedConsumers() throws Exception {
        Connection createConnection = createConnectionFactory().createConnection();
        createConnection.setExceptionListener(this);
        this.connections.add(createConnection);
        MessageConsumer createConsumer = createConnection.createSession(false, 2).createConsumer(this.destination);
        createConnection.start();
        startProducers(this.destination, 20);
        TimeUnit.SECONDS.sleep(1L);
        LOG.info("closing consumer: " + createConsumer);
        createConsumer.close();
        TimeUnit.SECONDS.sleep(5L);
        assertTrue("no exceptions : " + this.exceptions.toArray(), this.exceptions.isEmpty());
    }

    public void initCombosForTestAbortAlreadyClosedConnection() {
        addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
        addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
    }

    public void testAbortAlreadyClosedConnection() throws Exception {
        Connection createConnection = createConnectionFactory().createConnection();
        createConnection.setExceptionListener(this);
        createConnection.createSession(false, 2).createConsumer(this.destination);
        createConnection.start();
        startProducers(this.destination, 20);
        TimeUnit.SECONDS.sleep(1L);
        LOG.info("closing connection: " + createConnection);
        createConnection.close();
        TimeUnit.SECONDS.sleep(5L);
        assertTrue("no exceptions : " + this.exceptions.toArray(), this.exceptions.isEmpty());
    }

    public void testAbortConsumerOnDeadConnection() throws Exception {
    }

    public void onException(JMSException jMSException) {
        this.exceptions.add(jMSException);
        jMSException.printStackTrace();
    }

    public static Test suite() {
        return suite(AbortSlowConsumerTest.class);
    }
}
