package org.apache.activemq.store.kahadb.perf;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.Test;
import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.transport.nio.NIOSSLConcurrencyTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/store/kahadb/perf/KahaBulkLoadingTest.class */
public class KahaBulkLoadingTest extends JmsTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(KahaBulkLoadingTest.class);
    protected int messageSize = NIOSSLConcurrencyTest.MESSAGE_SIZE;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.JmsTestSupport
    public BrokerService createBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        KahaDBStore kahaDBStore = new KahaDBStore();
        kahaDBStore.setDirectory(new File("target/activemq-data/kahadb"));
        brokerService.setPersistenceAdapter(kahaDBStore);
        brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        return brokerService;
    }

    @Override // org.apache.activemq.JmsTestSupport
    protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(((TransportConnector) this.broker.getTransportConnectors().get(0)).getServer().getConnectURI());
        activeMQConnectionFactory.setUseAsyncSend(true);
        return activeMQConnectionFactory;
    }

    public void testQueueSendThenAddConsumer() throws Exception {
        int i;
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        this.connection.setUseCompression(false);
        this.connection.getPrefetchPolicy().setAll(10);
        this.connection.start();
        Session createSession = this.connection.createSession(false, 3);
        LOG.info("Receiving messages that are in the queue");
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        BytesMessage receive = createConsumer.receive(2000L);
        int i2 = 0;
        if (receive != null) {
            i2 = 0 + 1;
        }
        while (true) {
            int i3 = 0;
            if (receive == null) {
                break;
            }
            long currentTimeMillis = System.currentTimeMillis();
            long j = currentTimeMillis;
            int i4 = 0;
            while (true) {
                i = i4;
                if (j - currentTimeMillis < 5000) {
                    receive = (BytesMessage) createConsumer.receive(5000L);
                    if (receive == null) {
                        break;
                    }
                    i3++;
                    i2++;
                    j = System.currentTimeMillis();
                    i4 = (int) (i + receive.getBodyLength());
                }
            }
            LOG.info("Consumed: " + ((i3 * 1000.0d) / (j - currentTimeMillis)) + "  messages/sec, " + (((1.0d * i) / 1048576.0d) * (1000.0d / (j - currentTimeMillis))) + " megs/sec ");
        }
        createConsumer.close();
        LOG.info("Consumed " + i2 + " messages from the queue.");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        createProducer.setDeliveryMode(2);
        LOG.info("Sending messages that are " + (this.messageSize / 1024.0d) + "k large");
        System.currentTimeMillis();
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: org.apache.activemq.store.kahadb.perf.KahaBulkLoadingTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                atomicBoolean.set(true);
            }
        });
        int i5 = 0;
        while (!atomicBoolean.get()) {
            long currentTimeMillis2 = System.currentTimeMillis();
            long j2 = currentTimeMillis2;
            int i6 = 0;
            while (j2 - currentTimeMillis2 < 5000 && !atomicBoolean.get()) {
                BytesMessage createBytesMessage = createSession.createBytesMessage();
                createBytesMessage.writeBytes(new byte[this.messageSize]);
                createProducer.send(createBytesMessage);
                i6++;
                i5++;
                j2 = System.currentTimeMillis();
            }
            LOG.info("Produced: " + ((i6 * 1000.0d) / (j2 - currentTimeMillis2)) + " messages/sec, " + ((((1.0d * i6) * this.messageSize) / 1048576.0d) * (1000.0d / (j2 - currentTimeMillis2))) + " megs/sec");
        }
        LOG.info("Prodcued " + i5 + " messages to the queue.");
    }

    public static Test suite() {
        return suite(KahaBulkLoadingTest.class);
    }

    public static void main(String[] strArr) {
        TestRunner.run(suite());
    }
}
