/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.policy;

import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.policy.AbortSlowConsumerTest;
import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Variant of {@link AbortSlowConsumerTest} that installs an
 * {@link AbortSlowAckConsumerStrategy} as the broker's default slow-consumer strategy
 * and adds tests for consumers that stop acknowledging or never receive anything.
 *
 * <p>All tests in this class obtain the strategy by casting the inherited
 * {@code underTest} field to {@link AbortSlowAckConsumerStrategy}.
 */
public class AbortSlowAckConsumerTest
extends AbortSlowConsumerTest {
    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumerTest.class);

    /** Timeout, in milliseconds, for receiving the first message of a test. */
    private static final long FIRST_MESSAGE_TIMEOUT_MS = 5000L;

    /** Timeout, in milliseconds, of the receive call that is expected to be interrupted by the abort. */
    private static final long ABORT_WAIT_TIMEOUT_MS = 20000L;

    /** Value passed to {@code setMaxTimeSinceLastAck} when the broker is created (milliseconds). */
    protected long maxTimeSinceLastAck = 5000L;

    /**
     * Creates the strategy instance exercised by this test class.
     *
     * <p>This must be an {@link AbortSlowAckConsumerStrategy}: {@link #createBroker()} and the
     * tests below cast {@code underTest} to that type, so a plain
     * {@link AbortSlowConsumerStrategy} would fail with a {@link ClassCastException}.
     * NOTE(review): assumes the base class assigns the result of this factory to
     * {@code underTest} — confirm against {@link AbortSlowConsumerTest}.
     *
     * @return a new ack-based slow-consumer strategy
     */
    @Override
    protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
        return new AbortSlowAckConsumerStrategy();
    }

    /**
     * Builds the broker from the superclass and replaces its destination policy with a
     * default entry that uses the strategy under test and a prefetch of 10 for both
     * queues and topics.
     *
     * @return the configured broker
     * @throws Exception if the superclass fails to create the broker
     */
    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService broker = super.createBroker();

        AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) this.underTest;
        strategy.setAbortConnection(this.abortConnection);
        strategy.setCheckPeriod(this.checkPeriod);
        strategy.setMaxSlowDuration(this.maxSlowDuration);
        strategy.setMaxTimeSinceLastAck(this.maxTimeSinceLastAck);

        PolicyEntry policy = new PolicyEntry();
        policy.setSlowConsumerStrategy((SlowConsumerStrategy) strategy);
        policy.setQueuePrefetch(10);
        policy.setTopicPrefetch(10);

        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policy);
        broker.setDestinationPolicy(policyMap);
        return broker;
    }

    /**
     * Creates a connection factory for {@code vm://localhost} whose prefetch policy is
     * set to 1 for all consumer kinds.
     *
     * @return the connection factory used by the tests in this class
     * @throws Exception declared by the overridden method; not thrown by this implementation
     */
    @Override
    protected ConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
        factory.getPrefetchPolicy().setAll(1);
        return factory;
    }

    /**
     * Lowers the strategy's max-time-since-last-ack to 500 ms (the broker is created with
     * {@link #maxTimeSinceLastAck}) and then runs the inherited JMX abort test.
     *
     * @throws Exception if the inherited test fails with an exception
     */
    @Override
    public void testSlowConsumerIsAbortedViaJmx() throws Exception {
        AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) this.underTest;
        strategy.setMaxTimeSinceLastAck(500L);
        super.testSlowConsumerIsAbortedViaJmx();
    }

    /**
     * Registers both boolean values for the {@code abortConnection} and {@code topic}
     * combination parameters of {@code testSlowConsumerIsAborted}.
     */
    @Override
    public void initCombosForTestSlowConsumerIsAborted() {
        this.addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
        this.addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
    }

    /**
     * Receives one message without acknowledging it and expects the following receive
     * call to fail with an exception.
     *
     * @throws Exception if the connection, session or consumer cannot be set up
     */
    public void testZeroPrefetchConsumerIsAborted() throws Exception {
        MessageConsumer consumer = createStartedClientAckConsumer();
        this.startProducers(this.destination, 20);

        Message message = consumer.receive(FIRST_MESSAGE_TIMEOUT_MS);
        assertNotNull(message);
        // Deliberately no acknowledge(): the session is CLIENT_ACKNOWLEDGE, so the message stays unacked.

        assertReceiveIsAborted(consumer, "Slow consumer not aborted.");
    }

    /**
     * Disables {@code ignoreIdleConsumers} on the strategy and expects the consumer's
     * first receive call to fail with an exception.
     *
     * @throws Exception if the connection, session or consumer cannot be set up
     */
    public void testIdleConsumerCanBeAbortedNoMessages() throws Exception {
        AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) this.underTest;
        strategy.setIgnoreIdleConsumers(false);

        MessageConsumer consumer = createStartedClientAckConsumer();
        this.startProducers(this.destination, 20);

        assertReceiveIsAborted(consumer, "Idle consumer not aborted.");
    }

    /**
     * Disables {@code ignoreIdleConsumers} on the strategy, receives and acknowledges one
     * message, and expects the following receive call to fail with an exception.
     *
     * @throws Exception if the connection, session or consumer cannot be set up
     */
    public void testIdleConsumerCanBeAborted() throws Exception {
        AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) this.underTest;
        strategy.setIgnoreIdleConsumers(false);

        MessageConsumer consumer = createStartedClientAckConsumer();
        this.startProducers(this.destination, 20);

        Message message = consumer.receive(FIRST_MESSAGE_TIMEOUT_MS);
        assertNotNull(message);
        message.acknowledge();

        assertReceiveIsAborted(consumer, "Slow consumer not aborted.");
    }

    /**
     * Opens a connection from {@link #createConnectionFactory()}, registers this test as
     * its exception listener, records it in {@code connections}, creates a
     * non-transacted {@code CLIENT_ACKNOWLEDGE} session with a consumer on
     * {@code destination}, and starts the connection.
     *
     * @return the newly created consumer
     * @throws Exception if any of the JMS objects cannot be created or started
     */
    private MessageConsumer createStartedClientAckConsumer() throws Exception {
        ActiveMQConnection conn = (ActiveMQConnection) this.createConnectionFactory().createConnection();
        conn.setExceptionListener((ExceptionListener) this);
        this.connections.add(conn);

        Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(this.destination);
        assertNotNull(consumer);

        conn.start();
        return consumer;
    }

    /**
     * Calls {@code receive} on the consumer and fails the test if the call returns
     * normally; any {@link Exception} thrown by the call is treated as the expected abort.
     *
     * @param consumer the consumer expected to be aborted
     * @param failureMessage message used to fail the test when no exception is thrown
     */
    private void assertReceiveIsAborted(MessageConsumer consumer, String failureMessage) {
        try {
            consumer.receive(ABORT_WAIT_TIMEOUT_MS);
            fail(failureMessage);
        }
        catch (Exception expected) {
            // The exception is the success signal; log it instead of dropping it silently.
            LOG.debug("Receive call was interrupted as expected", expected);
        }
    }
}

