/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.util.Iterator;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.usecases.ProducerConsumerTestSupport;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumeTopicPrefetchTest
extends ProducerConsumerTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumeTopicPrefetchTest.class);
    protected int prefetchSize = 100;
    protected String[] messageTexts;
    protected long consumerTimeout = 10000L;

    public void testSendPrefetchSize() throws JMSException {
        this.testWithMessageCount(this.prefetchSize);
    }

    public void testSendDoublePrefetchSize() throws JMSException {
        this.testWithMessageCount(this.prefetchSize * 2);
    }

    public void testSendPrefetchSizePlusOne() throws JMSException {
        this.testWithMessageCount(this.prefetchSize + 1);
    }

    protected void testWithMessageCount(int messageCount) throws JMSException {
        int i;
        this.makeMessages(messageCount);
        LOG.info("About to send and receive: " + messageCount + " on destination: " + this.destination + " of type: " + this.destination.getClass().getName());
        for (i = 0; i < messageCount; ++i) {
            TextMessage message = this.session.createTextMessage(this.messageTexts[i]);
            this.producer.send((Message)message);
        }
        this.validateConsumerPrefetch(this.getSubject(), this.prefetchSize);
        for (i = 0; i < messageCount; ++i) {
            this.consumeMessge(i);
        }
    }

    @Override
    protected Connection createConnection() throws Exception {
        ActiveMQConnection connection = (ActiveMQConnection)super.createConnection();
        connection.getPrefetchPolicy().setQueuePrefetch(this.prefetchSize);
        connection.getPrefetchPolicy().setTopicPrefetch(this.prefetchSize);
        return connection;
    }

    protected TextMessage consumeMessge(int i) throws JMSException {
        Message message = this.consumer.receive(this.consumerTimeout);
        ConsumeTopicPrefetchTest.assertTrue((String)("Should have received a message by now for message: " + i), (message != null ? 1 : 0) != 0);
        ConsumeTopicPrefetchTest.assertTrue((String)("Should be a TextMessage: " + message), (boolean)(message instanceof TextMessage));
        TextMessage textMessage = (TextMessage)message;
        ConsumeTopicPrefetchTest.assertEquals((String)"Message content", (String)this.messageTexts[i], (String)textMessage.getText());
        return textMessage;
    }

    protected void makeMessages(int messageCount) {
        this.messageTexts = new String[messageCount];
        for (int i = 0; i < messageCount; ++i) {
            this.messageTexts[i] = "Message for test: + " + this.getName() + " = " + i;
        }
    }

    protected void validateConsumerPrefetch(String destination, final long expectedCount) throws JMSException {
        RegionBroker regionBroker = (RegionBroker)BrokerRegistry.getInstance().lookup("localhost").getRegionBroker();
        Iterator i$ = regionBroker.getQueueRegion().getDestinationMap().values().iterator();
        while (i$.hasNext()) {
            Destination dest;
            final Destination target = dest = (Destination)i$.next();
            if (!dest.getName().equals(destination)) continue;
            try {
                Wait.waitFor((Wait.Condition)new Wait.Condition(){

                    public boolean isSatisified() throws Exception {
                        DestinationStatistics stats = target.getDestinationStatistics();
                        LOG.info("inflight for : " + target.getName() + ": " + stats.getInflight().getCount());
                        return stats.getInflight().getCount() == expectedCount;
                    }
                });
            }
            catch (Exception e) {
                throw new JMSException(e.toString());
            }
            DestinationStatistics stats = dest.getDestinationStatistics();
            LOG.info("inflight for : " + dest.getName() + ": " + stats.getInflight().getCount());
            ConsumeTopicPrefetchTest.assertEquals((String)("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " matches"), (long)expectedCount, (long)stats.getInflight().getCount());
        }
    }
}

