package org.apache.activemq.tool;

import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Topic;
import org.apache.activemq.tool.properties.JmsClientProperties;
import org.apache.activemq.tool.properties.JmsConsumerProperties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activemq/tool/JmsConsumerClient.class */
/**
 * JMS client that consumes messages and records each one by calling the
 * inherited {@code incThroughput()}.
 *
 * <p>Messages can be consumed synchronously (blocking receive calls) or
 * asynchronously (a {@link MessageListener}), and the run is bounded either by
 * a duration in milliseconds or by a message count. Every {@code receive*}
 * method closes the connection when it finishes; if the configured properties
 * report both {@code isDurable()} and {@code isUnsubscribe()}, the consumer is
 * closed and the durable subscription is removed first.
 */
public class JmsConsumerClient extends AbstractJmsMeasurableClient {
    private static final Log LOG = LogFactory.getLog(JmsConsumerClient.class);

    /** Receive type that selects duration-bound receiving; any other value selects count-bound. */
    private static final String TIME_BASED_RECV_TYPE = "time";

    /** Subscription name used for a durable subscriber when no client name has been set. */
    private static final String DEFAULT_DURABLE_CLIENT_NAME = "JmsConsumer";

    protected MessageConsumer jmsConsumer;
    protected JmsConsumerProperties client;

    /**
     * Creates a consumer client with a fresh, default {@link JmsConsumerProperties}.
     *
     * @param connectionFactory factory handed to the superclass
     */
    public JmsConsumerClient(ConnectionFactory connectionFactory) {
        this(new JmsConsumerProperties(), connectionFactory);
    }

    /**
     * Creates a consumer client driven by the given properties.
     *
     * @param jmsConsumerProperties settings read by the receive and consumer-creation methods
     * @param connectionFactory factory handed to the superclass
     */
    public JmsConsumerClient(JmsConsumerProperties jmsConsumerProperties, ConnectionFactory connectionFactory) {
        super(connectionFactory);
        this.client = jmsConsumerProperties;
    }

    /**
     * Receives messages using the mode selected by the client properties:
     * {@code isAsyncRecv()} picks listener-based versus blocking receive, and a
     * receive type of {@code "time"} (case-insensitive) picks the duration-bound
     * variant; any other receive type (including {@code null}) picks the
     * count-bound variant.
     *
     * @throws JMSException if the selected receive method fails
     */
    public void receiveMessages() throws JMSException {
        // Constant-first comparison so an unset receive type falls through to count-based.
        boolean timeBased = TIME_BASED_RECV_TYPE.equalsIgnoreCase(this.client.getRecvType());
        if (this.client.isAsyncRecv()) {
            if (timeBased) {
                receiveAsyncTimeBasedMessages(this.client.getRecvDuration());
            } else {
                receiveAsyncCountBasedMessages(this.client.getRecvCount());
            }
        } else if (timeBased) {
            receiveSyncTimeBasedMessages(this.client.getRecvDuration());
        } else {
            receiveSyncCountBasedMessages(this.client.getRecvCount());
        }
    }

    /**
     * Stores {@code i} as the destination count and then calls {@link #receiveMessages()}.
     *
     * @param i value assigned to {@code destCount}
     * @throws JMSException if receiving fails
     */
    public void receiveMessages(int i) throws JMSException {
        this.destCount = i;
        receiveMessages();
    }

    /**
     * Stores {@code i} as the destination index and {@code i2} as the destination
     * count, then calls {@link #receiveMessages()}.
     *
     * @param i value assigned to {@code destIndex}
     * @param i2 value assigned to {@code destCount}
     * @throws JMSException if receiving fails
     */
    public void receiveMessages(int i, int i2) throws JMSException {
        this.destIndex = i;
        receiveMessages(i2);
    }

    /**
     * Receives messages with blocking calls until {@code j} milliseconds have elapsed.
     *
     * <p>Each receive call is given the time still remaining as its timeout, so the
     * method returns once the duration is over even when no message arrives.
     *
     * @param j how long to keep receiving, in milliseconds
     * @throws JMSException if starting the connection, receiving, or cleanup fails
     */
    public void receiveSyncTimeBasedMessages(long j) throws JMSException {
        ensureJmsConsumer();
        try {
            getConnection().start();
            LOG.info("Starting to synchronously receive messages for " + j + " ms...");
            long deadline = System.currentTimeMillis() + j;
            long remaining = j;
            // A timeout of 0 would block indefinitely, so only receive while time remains.
            while (remaining > 0) {
                Message received = getJmsConsumer().receive(remaining);
                if (received != null) {
                    incThroughput();
                }
                remaining = deadline - System.currentTimeMillis();
            }
        } finally {
            releaseResources(false);
        }
    }

    /**
     * Performs {@code j} blocking receive calls, recording throughput after each one.
     *
     * @param j number of messages to receive
     * @throws JMSException if starting the connection, receiving, or cleanup fails
     */
    public void receiveSyncCountBasedMessages(long j) throws JMSException {
        ensureJmsConsumer();
        try {
            getConnection().start();
            LOG.info("Starting to synchronously receive " + j + " messages...");
            // long index: an int index would overflow before reaching counts above Integer.MAX_VALUE.
            for (long received = 0; received < j; received++) {
                getJmsConsumer().receive();
                incThroughput();
            }
        } finally {
            releaseResources(false);
        }
    }

    /**
     * Installs a listener that records throughput for every delivered message,
     * starts the connection, and sleeps for {@code j} milliseconds.
     *
     * @param j how long to keep the listener active, in milliseconds
     * @throws JMSException if the sleep is interrupted (the interruption is attached
     *         as the linked exception and the thread's interrupt flag is restored),
     *         or if starting the connection or cleanup fails
     */
    public void receiveAsyncTimeBasedMessages(long j) throws JMSException {
        ensureJmsConsumer();
        getJmsConsumer().setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                JmsConsumerClient.this.incThroughput();
            }
        });
        boolean interrupted = false;
        try {
            getConnection().start();
            LOG.info("Starting to asynchronously receive messages for " + j + " ms...");
            try {
                Thread.sleep(j);
            } catch (InterruptedException e) {
                interrupted = true;
                throw interruptedAsJmsException(
                        "JMS consumer thread sleep has been interrupted. Message: " + e.getMessage(), e);
            }
        } finally {
            releaseResources(interrupted);
        }
    }

    /**
     * Installs a listener that records throughput and counts delivered messages,
     * starts the connection, and blocks until {@code j} messages have been counted.
     *
     * @param j number of messages to wait for
     * @throws JMSException if the wait is interrupted (the interruption is attached
     *         as the linked exception and the thread's interrupt flag is restored),
     *         or if starting the connection or cleanup fails
     */
    public void receiveAsyncCountBasedMessages(long j) throws JMSException {
        ensureJmsConsumer();
        final AtomicInteger receivedCount = new AtomicInteger(0);
        getJmsConsumer().setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                JmsConsumerClient.this.incThroughput();
                // wait()/notifyAll() require the caller to own the object's monitor.
                synchronized (receivedCount) {
                    receivedCount.incrementAndGet();
                    receivedCount.notifyAll();
                }
            }
        });
        boolean interrupted = false;
        try {
            getConnection().start();
            LOG.info("Starting to asynchronously receive " + j + " messages...");
            try {
                // Check and wait under the same monitor the listener uses, so a
                // notification cannot slip in between the check and the wait.
                synchronized (receivedCount) {
                    while (receivedCount.get() < j) {
                        receivedCount.wait();
                    }
                }
            } catch (InterruptedException e) {
                interrupted = true;
                throw interruptedAsJmsException(
                        "JMS consumer thread wait has been interrupted. Message: " + e.getMessage(), e);
            }
        } finally {
            releaseResources(interrupted);
        }
    }

    /**
     * Creates a consumer on the first destination returned by
     * {@code createDestination(destIndex, destCount)}.
     *
     * @return the newly created consumer, which is also stored in {@link #jmsConsumer}
     * @throws JMSException if the destination or consumer cannot be created
     */
    public MessageConsumer createJmsConsumer() throws JMSException {
        return createJmsConsumer(createDestination(this.destIndex, this.destCount)[0]);
    }

    /**
     * Creates a consumer on the given destination and stores it in {@link #jmsConsumer}.
     *
     * <p>When the client properties report {@code isDurable()}, the destination is
     * cast to {@link Topic} and a durable subscriber is created under the client
     * name (defaulting to {@code "JmsConsumer"} when none is set); otherwise a
     * plain consumer is created.
     *
     * @param destination destination to consume from
     * @return the newly created consumer
     * @throws JMSException if the session rejects the request
     */
    public MessageConsumer createJmsConsumer(Destination destination) throws JMSException {
        if (this.client.isDurable()) {
            String subscriptionName = resolveDurableClientName();
            LOG.info("Creating durable subscriber (" + subscriptionName + ") to: " + destination.toString());
            this.jmsConsumer = getSession().createDurableSubscriber((Topic) destination, subscriptionName);
        } else {
            LOG.info("Creating non-durable consumer to: " + destination.toString());
            this.jmsConsumer = getSession().createConsumer(destination);
        }
        return this.jmsConsumer;
    }

    /**
     * Same as {@link #createJmsConsumer(Destination)}, but forwards two extra
     * arguments to the session's consumer-creation call.
     *
     * @param destination destination to consume from
     * @param str passed as the message-selector argument of the JMS create call
     * @param z passed as the no-local argument of the JMS create call
     * @return the newly created consumer, which is also stored in {@link #jmsConsumer}
     * @throws JMSException if the session rejects the request
     */
    public MessageConsumer createJmsConsumer(Destination destination, String str, boolean z) throws JMSException {
        if (this.client.isDurable()) {
            String subscriptionName = resolveDurableClientName();
            LOG.info("Creating durable subscriber (" + subscriptionName + ") to: " + destination.toString());
            this.jmsConsumer = getSession().createDurableSubscriber((Topic) destination, subscriptionName, str, z);
        } else {
            LOG.info("Creating non-durable consumer to: " + destination.toString());
            this.jmsConsumer = getSession().createConsumer(destination, str, z);
        }
        return this.jmsConsumer;
    }

    /**
     * @return the consumer created by the last {@code createJmsConsumer} call,
     *         or {@code null} if none has been created
     */
    public MessageConsumer getJmsConsumer() {
        return this.jmsConsumer;
    }

    /** @return the consumer properties held by this client */
    @Override // org.apache.activemq.tool.AbstractJmsClient
    public JmsClientProperties getClient() {
        return this.client;
    }

    /**
     * Replaces the properties held by this client.
     *
     * @param jmsClientProperties new settings; cast to {@link JmsConsumerProperties},
     *        so any other type raises {@link ClassCastException}
     */
    @Override // org.apache.activemq.tool.AbstractJmsClient
    public void setClient(JmsClientProperties jmsClientProperties) {
        this.client = (JmsConsumerProperties) jmsClientProperties;
    }

    /** Creates the default consumer if none exists yet. */
    private void ensureJmsConsumer() throws JMSException {
        if (getJmsConsumer() == null) {
            createJmsConsumer();
        }
    }

    /** Returns the client name, first setting it to the default when it is {@code null}. */
    private String resolveDurableClientName() throws JMSException {
        String name = getClientName();
        if (name == null) {
            name = DEFAULT_DURABLE_CLIENT_NAME;
            setClientName(name);
        }
        return name;
    }

    /**
     * Removes the durable subscription when configured to, then closes the
     * connection; the connection is closed even if the unsubscribe step fails.
     *
     * @param restoreInterrupt whether to re-set the calling thread's interrupt flag
     *        once cleanup is done (deferred so the flag cannot disturb the cleanup calls)
     */
    private void releaseResources(boolean restoreInterrupt) throws JMSException {
        try {
            try {
                if (this.client.isDurable() && this.client.isUnsubscribe()) {
                    LOG.info("Unsubscribing durable subscriber: " + getClientName());
                    getJmsConsumer().close();
                    getSession().unsubscribe(getClientName());
                }
            } finally {
                getConnection().close();
            }
        } finally {
            if (restoreInterrupt) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Builds a {@link JMSException} with the given reason and the interruption as its linked exception. */
    private static JMSException interruptedAsJmsException(String reason, InterruptedException cause) {
        JMSException wrapped = new JMSException(reason);
        wrapped.setLinkedException(cause);
        return wrapped;
    }
}
