/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.net.URI;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.TabularData;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that starts an embedded, non-persistent broker whose queue
 * policy uses an {@link AbortSlowAckConsumerStrategy}, and checks that a slow
 * consumer advisory is published for a queue consumer and that the strategy's
 * slow-consumer list is empty again once the client connection is closed.
 */
public class AMQ7077Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ7077Test.class);
    private BrokerService brokerService;
    private String connectionUri;

    /**
     * Builds a connection factory that targets the embedded broker's TCP connector.
     *
     * @return a factory with topic-advisory watching turned off
     * @throws Exception declared for subclasses; not thrown by this implementation
     */
    protected ConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory(this.connectionUri);
        conFactory.setWatchTopicAdvisories(false);
        return conFactory;
    }

    /**
     * Creates the slow-consumer strategy installed on every queue by {@link #setUp()}.
     *
     * @return a strategy with a 500 check period, a 1000 max time since last ack,
     *         a max slow duration of 0, a max slow count of 4 and
     *         {@code ignoreIdleConsumers} set to {@code false}
     */
    protected AbortSlowAckConsumerStrategy createSlowConsumerStrategy() {
        AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
        strategy.setCheckPeriod(500L);
        strategy.setMaxTimeSinceLastAck(1000L);
        strategy.setMaxSlowDuration(0L);
        strategy.setMaxSlowCount(4L);
        // NOTE(review): presumably this makes consumers that never receive a message
        // eligible for slow detection, which the test relies on - confirm.
        strategy.setIgnoreIdleConsumers(false);
        return strategy;
    }

    /**
     * Starts the embedded broker with JMX enabled, applies the slow-consumer policy
     * to all queues ({@code ">"}), and records the TCP connector's connect string.
     *
     * @throws Exception if the broker cannot be created or started
     */
    @Before
    public void setUp() throws Exception {
        this.brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true"));
        PolicyEntry policy = new PolicyEntry();
        policy.setSlowConsumerStrategy(this.createSlowConsumerStrategy());
        policy.setQueuePrefetch(10);
        policy.setTopicPrefetch(10);
        policy.setAdvisoryForSlowConsumers(true);
        PolicyMap pMap = new PolicyMap();
        pMap.put(new ActiveMQQueue(">"), policy);
        this.brokerService.setDestinationPolicy(pMap);
        // Port 0: the actual port is read back from the connector below.
        this.brokerService.addConnector("tcp://0.0.0.0:0");
        this.brokerService.start();
        this.connectionUri = this.brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
    }

    /**
     * Opens a consumer on queue {@code DD} that never receives anything, waits for
     * a slow consumer advisory for that queue, closes the connection, and then
     * waits until the strategy MBean reports no slow consumers.
     *
     * @throws Exception on any JMS or JMX failure
     */
    @Test
    public void testAdvisoryOnSlowAckDetection() throws Exception {
        Connection connection = this.createConnectionFactory().createConnection();
        Queue destination;
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("DD");
            MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getSlowConsumerAdvisoryTopic(destination));
            // Never read from; it exists only so the broker has a subscription on the queue.
            MessageConsumer consumer = session.createConsumer(destination);
            Message message = advisoryConsumer.receive(10000L);
            if (message == null) {
                message = advisoryConsumer.receive(2000L);
            }
            Assert.assertNotNull("Got advisory", message);
        } finally {
            // Close even when the assertion fails so the connection is not leaked.
            connection.close();
        }
        QueueViewMBean queue = this.getProxyToQueue(destination.getQueueName());
        ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
        Assert.assertNotNull(slowConsumerPolicyMBeanName);
        final AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean)this.brokerService.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true);
        Assert.assertTrue("slow list is gone on remove", Wait.waitFor(new Wait.Condition(){

            @Override
            public boolean isSatisified() throws Exception {
                TabularData slowOnes = abortPolicy.getSlowConsumers();
                LOG.info("slow ones:{}", slowOnes);
                return slowOnes.size() == 0;
            }
        }));
    }

    /**
     * Creates a JMX proxy for the named queue on the broker named {@code localhost}.
     *
     * @param name the queue name used as {@code destinationName} in the object name
     * @return a proxy for the queue's management view
     * @throws MalformedObjectNameException if {@code name} yields an invalid object name
     * @throws JMSException declared for callers; not thrown directly by this method
     */
    protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + name);
        return (QueueViewMBean)this.brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
    }

    /**
     * Stops the embedded broker and blocks until it has fully stopped.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void tearDown() throws Exception {
        // setUp may have failed before the broker was created; avoid masking that
        // failure with a NullPointerException here.
        if (this.brokerService != null) {
            this.brokerService.stop();
            this.brokerService.waitUntilStopped();
        }
    }
}

