package org.apache.activemq;

import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.IdGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activemq/LargeMessageTestSupport.class */
/**
 * Base test that sends {@link #MESSAGE_COUNT} bytes messages of
 * {@link #LARGE_MESSAGE_SIZE} bytes each and checks that an asynchronous
 * consumer receives every one of them with an unchanged payload.
 *
 * <p>The instance itself is registered as the consumer's {@link MessageListener}.
 * Results observed on the listener thread are published to the test thread through
 * {@link #messageCount} and {@link #validMessageConsumption}. Subclasses can change
 * the {@code isTopic}, {@code isDurable}, {@code deliveryMode} and
 * {@code prefetchValue} fields before {@link #setUp()} runs to vary the scenario.
 */
public class LargeMessageTestSupport extends ClientTestSupport implements MessageListener {
    /** Payload size of every message, in bytes (128 KiB). */
    protected static final int LARGE_MESSAGE_SIZE = 131072;
    /** Number of messages sent, and expected to be received, by the test. */
    protected static final int MESSAGE_COUNT = 100;
    private static final Log LOG = LogFactory.getLog(LargeMessageTestSupport.class);
    protected Connection producerConnection;
    protected Connection consumerConnection;
    protected MessageConsumer consumer;
    protected MessageProducer producer;
    protected Session producerSession;
    protected Session consumerSession;
    /** Reference payload; built in {@link #setUp()} and released in {@link #tearDown()}. */
    protected byte[] largeMessageData;
    protected Destination destination;
    protected boolean isTopic = true;
    protected boolean isDurable = true;
    /** JMS delivery mode handed to the producer; 2 is the value of DeliveryMode.PERSISTENT. */
    protected int deliveryMode = 2;
    protected IdGenerator idGen = new IdGenerator();
    /**
     * Cleared as soon as any received message fails validation. Written by the
     * listener thread and read by the test thread, hence volatile.
     */
    protected volatile boolean validMessageConsumption = true;
    /** Number of messages received so far; also the monitor the test thread waits on. */
    protected AtomicInteger messageCount = new AtomicInteger(0);
    protected int prefetchValue = 10000000;

    /**
     * Creates the destination used by the test, named after the concrete test class.
     *
     * @return a topic when {@code isTopic} is set, otherwise a queue
     */
    protected Destination createDestination() {
        String name = getClass().getName();
        if (this.isTopic) {
            return new ActiveMQTopic(name);
        }
        return new ActiveMQQueue(name);
    }

    /**
     * Creates the consumer on {@code consumerSession} for {@code destination}.
     *
     * @return a durable subscriber (with a generated subscription name) when both
     *         {@code isTopic} and {@code isDurable} are set, otherwise a plain consumer
     * @throws JMSException if the session fails to create the consumer
     */
    protected MessageConsumer createConsumer() throws JMSException {
        if (this.isTopic && this.isDurable) {
            return this.consumerSession.createDurableSubscriber(this.destination, this.idGen.generateId());
        }
        return this.consumerSession.createConsumer(this.destination);
    }

    /**
     * Builds the reference payload, opens the producer and consumer connections and
     * registers this instance as the consumer's message listener.
     *
     * @throws Exception if the parent set-up or any JMS call fails; an interruption
     *         of the initial pause is reported as a {@link JMSException}
     */
    @Override // org.apache.activemq.ClientTestSupport
    public void setUp() throws Exception {
        super.setUp();
        ClientTestSupport.removeMessageStore();
        LOG.info("Setting up . . . . . ");
        this.messageCount.set(0);
        this.validMessageConsumption = true;
        this.destination = createDestination();
        // Alternating 'a' / 'z' pattern so that a shifted or truncated body is detected.
        this.largeMessageData = new byte[LARGE_MESSAGE_SIZE];
        for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) {
            this.largeMessageData[i] = (i % 2 == 0) ? (byte) 'a' : (byte) 'z';
        }
        try {
            Thread.sleep(1000L);
            ActiveMQConnectionFactory connectionFactory = getConnectionFactory();
            this.producerConnection = connectionFactory.createConnection();
            setPrefetchPolicy((ActiveMQConnection) this.producerConnection);
            this.producerConnection.start();
            this.consumerConnection = connectionFactory.createConnection();
            setPrefetchPolicy((ActiveMQConnection) this.consumerConnection);
            this.consumerConnection.setClientID(this.idGen.generateId());
            this.consumerConnection.start();
            this.producerSession = this.producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            this.producer = this.producerSession.createProducer(createDestination());
            this.producer.setDeliveryMode(this.deliveryMode);
            this.consumerSession = this.consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            this.consumer = createConsumer();
            this.consumer.setMessageListener(this);
            LOG.info("Setup complete");
        } catch (InterruptedException e) {
            // Restore the interrupt flag and keep the cause attached to the
            // JMSException that callers of this method have always received.
            Thread.currentThread().interrupt();
            JMSException wrapped = new JMSException(e.getMessage());
            wrapped.setLinkedException(e);
            wrapped.initCause(e);
            throw wrapped;
        }
    }

    /**
     * Applies {@code prefetchValue} to every prefetch setting of the given connection.
     *
     * @param activeMQConnection connection whose prefetch policy is updated
     */
    protected void setPrefetchPolicy(ActiveMQConnection activeMQConnection) {
        activeMQConnection.getPrefetchPolicy().setTopicPrefetch(this.prefetchValue);
        activeMQConnection.getPrefetchPolicy().setQueuePrefetch(this.prefetchValue);
        activeMQConnection.getPrefetchPolicy().setDurableTopicPrefetch(this.prefetchValue);
        activeMQConnection.getPrefetchPolicy().setQueueBrowserPrefetch(this.prefetchValue);
        activeMQConnection.getPrefetchPolicy().setOptimizeDurableTopicPrefetch(this.prefetchValue);
    }

    /**
     * Closes both connections and runs the parent tear-down.
     *
     * <p>Every clean-up step is attempted even when an earlier one fails, and
     * connections that were never opened (because {@link #setUp()} failed part-way)
     * are skipped instead of causing a {@link NullPointerException}.
     *
     * @throws Exception the failure raised by the last clean-up step that failed
     */
    @Override // org.apache.activemq.ClientTestSupport
    public void tearDown() throws Exception {
        try {
            Thread.sleep(1000L);
            if (this.producerConnection != null) {
                this.producerConnection.close();
            }
        } finally {
            try {
                if (this.consumerConnection != null) {
                    this.consumerConnection.close();
                }
            } finally {
                try {
                    super.tearDown();
                } finally {
                    this.largeMessageData = null;
                }
            }
        }
    }

    /**
     * Compares the first {@link #LARGE_MESSAGE_SIZE} bytes of the message body with
     * the reference payload.
     *
     * @param bytesMessage received message; it is switched to read-only mode first
     * @return {@code true} if every compared byte matches the reference payload
     * @throws Exception if reading from the message fails, e.g. because the body is
     *         shorter than the reference payload
     */
    protected boolean isSame(BytesMessage bytesMessage) throws Exception {
        ((ActiveMQMessage) bytesMessage).setReadOnlyBody(true);
        for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) {
            if (bytesMessage.readByte() != this.largeMessageData[i]) {
                return false;
            }
        }
        return true;
    }

    /**
     * Validates a received message and counts it.
     *
     * <p>This runs on the consumer's delivery thread, so problems are recorded in
     * {@link #validMessageConsumption} for the test thread to assert on rather than
     * being raised here, where a JUnit failure would never reach the test thread.
     *
     * @param message the delivered message, expected to be a {@link BytesMessage}
     */
    public void onMessage(Message message) {
        try {
            BytesMessage bytesMessage = (BytesMessage) message;
            boolean contentMatches = isSame(bytesMessage);
            long bodyLength = bytesMessage.getBodyLength();
            if (!contentMatches || bodyLength != LARGE_MESSAGE_SIZE) {
                this.validMessageConsumption = false;
                LOG.error("Invalid message: contentMatches = " + contentMatches + ", bodyLength = " + bodyLength);
            }
            int received = this.messageCount.incrementAndGet();
            if (received >= MESSAGE_COUNT) {
                // Wake the test thread waiting in testLargeMessages().
                synchronized (this.messageCount) {
                    this.messageCount.notify();
                }
            }
            LOG.info("got message = " + received);
            if (received % 50 == 0) {
                LOG.info("count = " + received);
            }
        } catch (Exception e) {
            this.validMessageConsumption = false;
            LOG.error("Failed to process received message", e);
        }
    }

    /**
     * Sends {@link #MESSAGE_COUNT} large messages, waits up to 60 seconds for the
     * listener to receive them, then asserts on the count and on payload validity.
     *
     * @throws Exception if sending fails or the waiting thread is interrupted
     */
    public void testLargeMessages() throws Exception {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            LOG.info("Sending message: " + i);
            BytesMessage outgoing = this.producerSession.createBytesMessage();
            outgoing.writeBytes(this.largeMessageData);
            this.producer.send(outgoing);
        }
        long deadline = System.currentTimeMillis() + 60000;
        while (deadline > System.currentTimeMillis() && this.messageCount.get() < MESSAGE_COUNT) {
            LOG.info("message count = " + this.messageCount);
            // Timed wait: the loop re-checks the count at least once per second.
            synchronized (this.messageCount) {
                this.messageCount.wait(1000L);
            }
        }
        LOG.info("Finished count = " + this.messageCount);
        assertTrue("Not enough messages - expected " + MESSAGE_COUNT + " but got " + this.messageCount,
                this.messageCount.get() == MESSAGE_COUNT);
        assertTrue("received messages are not valid", this.validMessageConsumption);
        Thread.sleep(1000L);
        LOG.info("FINAL count = " + this.messageCount);
    }
}
