/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker;

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Test;
import junit.textui.TestRunner;
import org.apache.activemq.broker.BrokerTestSupport;
import org.apache.activemq.broker.ProgressPrinter;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BrokerBenchmark
extends BrokerTestSupport {
    private static final transient Logger LOG = LoggerFactory.getLogger(BrokerBenchmark.class);
    public int produceCount = Integer.parseInt(System.getProperty("PRODUCE_COUNT", "10000"));
    public ActiveMQDestination destination;
    public int prodcuerCount;
    public int consumerCount;
    public boolean deliveryMode;

    public void initCombosForTestPerformance() {
        this.addCombinationValues("destination", new Object[]{new ActiveMQQueue("TEST"), new ActiveMQTopic("TEST")});
        this.addCombinationValues("PRODUCER_COUNT", new Object[]{new Integer("1"), new Integer("10")});
        this.addCombinationValues("CONSUMER_COUNT", new Object[]{new Integer("1"), new Integer("10")});
        this.addCombinationValues("CONSUMER_COUNT", new Object[]{new Integer("1"), new Integer("10")});
        this.addCombinationValues("deliveryMode", new Object[]{Boolean.TRUE});
    }

    public void testPerformance() throws Exception {
        int i;
        LOG.info("Running Benchmark for destination=" + this.destination + ", producers=" + this.prodcuerCount + ", consumers=" + this.consumerCount + ", deliveryMode=" + this.deliveryMode);
        final int consumeCount = this.destination.isTopic() ? this.consumerCount * this.produceCount : this.produceCount;
        final Semaphore consumersStarted = new Semaphore(1 - this.consumerCount);
        final Semaphore producersFinished = new Semaphore(1 - this.prodcuerCount);
        final Semaphore consumersFinished = new Semaphore(1 - this.consumerCount);
        final ProgressPrinter printer = new ProgressPrinter(this.produceCount + consumeCount, 10L);
        this.profilerPause("Benchmark ready.  Start profiler ");
        long start = System.currentTimeMillis();
        final AtomicInteger receiveCounter = new AtomicInteger(0);
        for (i = 0; i < this.consumerCount; ++i) {
            new Thread(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    try {
                        StubConnection connection = new StubConnection(BrokerBenchmark.this.broker);
                        ConnectionInfo connectionInfo = BrokerBenchmark.this.createConnectionInfo();
                        connection.send((Command)connectionInfo);
                        SessionInfo sessionInfo = BrokerBenchmark.this.createSessionInfo(connectionInfo);
                        ConsumerInfo consumerInfo = BrokerBenchmark.this.createConsumerInfo(sessionInfo, BrokerBenchmark.this.destination);
                        consumerInfo.setPrefetchSize(1000);
                        connection.send((Command)sessionInfo);
                        connection.send((Command)consumerInfo);
                        consumersStarted.release();
                        while (receiveCounter.get() < consumeCount) {
                            int counter = 0;
                            Message msg = BrokerBenchmark.this.receiveMessage(connection, 2000L);
                            if (msg != null) {
                                printer.increment();
                                receiveCounter.incrementAndGet();
                                ++counter;
                                Message extra = null;
                                while ((extra = BrokerBenchmark.this.receiveMessage(connection, 0L)) != null) {
                                    msg = extra;
                                    printer.increment();
                                    receiveCounter.incrementAndGet();
                                    ++counter;
                                }
                            }
                            if (msg != null) {
                                connection.send((Command)BrokerBenchmark.this.createAck(consumerInfo, msg, counter, (byte)2));
                                continue;
                            }
                            if (receiveCounter.get() >= consumeCount) continue;
                            LOG.info("Consumer stall, waiting for message #" + receiveCounter.get() + 1);
                        }
                        connection.send((Command)BrokerBenchmark.this.closeConsumerInfo(consumerInfo));
                    }
                    catch (Throwable e) {
                        e.printStackTrace();
                    }
                    finally {
                        consumersFinished.release();
                    }
                }
            }.start();
        }
        consumersStarted.acquire();
        for (i = 0; i < this.prodcuerCount; ++i) {
            new Thread(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    try {
                        StubConnection connection = new StubConnection(BrokerBenchmark.this.broker);
                        ConnectionInfo connectionInfo = BrokerBenchmark.this.createConnectionInfo();
                        connection.send((Command)connectionInfo);
                        SessionInfo sessionInfo = BrokerBenchmark.this.createSessionInfo(connectionInfo);
                        ProducerInfo producerInfo = BrokerBenchmark.this.createProducerInfo(sessionInfo);
                        connection.send((Command)sessionInfo);
                        connection.send((Command)producerInfo);
                        for (int i = 0; i < BrokerBenchmark.this.produceCount / BrokerBenchmark.this.prodcuerCount; ++i) {
                            Message message = BrokerBenchmark.this.createMessage(producerInfo, BrokerBenchmark.this.destination);
                            message.setPersistent(BrokerBenchmark.this.deliveryMode);
                            message.setResponseRequired(false);
                            connection.send((Command)message);
                            printer.increment();
                        }
                    }
                    catch (Throwable e) {
                        e.printStackTrace();
                    }
                    finally {
                        producersFinished.release();
                    }
                }
            }.start();
        }
        producersFinished.acquire();
        long end1 = System.currentTimeMillis();
        consumersFinished.acquire();
        long end2 = System.currentTimeMillis();
        LOG.info("Results for destination=" + this.destination + ", producers=" + this.prodcuerCount + ", consumers=" + this.consumerCount + ", deliveryMode=" + this.deliveryMode);
        LOG.info("Produced at messages/sec: " + (double)this.produceCount * 1000.0 / (double)(end1 - start));
        LOG.info("Consumed at messages/sec: " + (double)consumeCount * 1000.0 / (double)(end2 - start));
        this.profilerPause("Benchmark done.  Stop profiler ");
    }

    public static Test suite() {
        return BrokerBenchmark.suite(BrokerBenchmark.class);
    }

    public static void main(String[] args) {
        TestRunner.run((Test)BrokerBenchmark.suite());
    }
}

