/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.network;

import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NetworkLoadTest
extends TestCase {
    private static final transient Logger LOG = LoggerFactory.getLogger(NetworkLoadTest.class);
    private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "12"));
    private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "5000"));
    protected static final int BROKER_COUNT = 4;
    protected static final int MESSAGE_SIZE = 2000;
    String groupId;
    private BrokerService[] brokers;
    private ForwardingClient[] forwardingClients;

    protected void setUp() throws Exception {
        int i;
        this.groupId = "network-load-test-" + System.currentTimeMillis();
        this.brokers = new BrokerService[4];
        for (i = 0; i < this.brokers.length; ++i) {
            LOG.info("Starting broker: " + i);
            this.brokers[i] = this.createBroker(i);
            this.brokers[i].start();
        }
        Thread.sleep(800L);
        this.forwardingClients = new ForwardingClient[3];
        for (i = 0; i < this.forwardingClients.length; ++i) {
            LOG.info("Starting fowarding client " + i);
            this.forwardingClients[i] = new ForwardingClient(i, i + 1);
            this.forwardingClients[i].start();
        }
    }

    protected void tearDown() throws Exception {
        int i;
        for (i = 0; i < this.forwardingClients.length; ++i) {
            LOG.info("Stoping fowarding client " + i);
            this.forwardingClients[i].close();
        }
        for (i = 0; i < this.brokers.length; ++i) {
            LOG.info("Stoping broker " + i);
            this.brokers[i].stop();
        }
    }

    protected Connection createConnection(int brokerId) throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + (60000 + brokerId));
        connectionFactory.setOptimizedMessageDispatch(true);
        connectionFactory.setCopyMessageOnSend(false);
        connectionFactory.setUseCompression(false);
        connectionFactory.setDispatchAsync(true);
        connectionFactory.setUseAsyncSend(false);
        connectionFactory.setOptimizeAcknowledge(false);
        connectionFactory.setWatchTopicAdvisories(false);
        ActiveMQPrefetchPolicy qPrefetchPolicy = new ActiveMQPrefetchPolicy();
        qPrefetchPolicy.setQueuePrefetch(100);
        qPrefetchPolicy.setTopicPrefetch(1000);
        connectionFactory.setPrefetchPolicy(qPrefetchPolicy);
        connectionFactory.setAlwaysSyncSend(true);
        return connectionFactory.createConnection();
    }

    protected BrokerService createBroker(int brokerId) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName("broker-" + brokerId);
        broker.setPersistent(false);
        broker.setUseJmx(true);
        broker.getManagementContext().setCreateConnector(false);
        SystemUsage memoryManager = new SystemUsage();
        memoryManager.getMemoryUsage().setLimit(0x3200000L);
        broker.setSystemUsage(memoryManager);
        ArrayList<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
        PolicyEntry entry = new PolicyEntry();
        entry.setQueue(">");
        entry.setMemoryLimit(0x100000L);
        entry.setPendingSubscriberPolicy((PendingSubscriberMessageStoragePolicy)new VMPendingSubscriberMessageStoragePolicy());
        entry.setPendingQueuePolicy((PendingQueueMessageStoragePolicy)new VMPendingQueueMessageStoragePolicy());
        policyEntries.add(entry);
        PolicyEntry topicPolicyEntry = new PolicyEntry();
        topicPolicyEntry.setTopic(">");
        NoSubscriptionRecoveryPolicy noSubscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
        topicPolicyEntry.setSubscriptionRecoveryPolicy((SubscriptionRecoveryPolicy)noSubscriptionRecoveryPolicy);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(policyEntries);
        broker.setDestinationPolicy(policyMap);
        TransportConnector transportConnector = new TransportConnector();
        transportConnector.setUri(new URI("tcp://localhost:" + (60000 + brokerId)));
        transportConnector.setDiscoveryUri(new URI("multicast://default?group=" + this.groupId));
        broker.addConnector(transportConnector);
        DiscoveryNetworkConnector networkConnector = new DiscoveryNetworkConnector();
        networkConnector.setUri(new URI("multicast://default?group=" + this.groupId));
        networkConnector.setBridgeTempDestinations(true);
        networkConnector.setPrefetchSize(1);
        broker.addNetworkConnector((NetworkConnector)networkConnector);
        return broker;
    }

    public void testRequestReply() throws Exception {
        boolean to = false;
        int from = this.brokers.length - 1;
        LOG.info("Staring Final Consumer");
        Connection fromConnection = this.createConnection(from);
        fromConnection.start();
        Session fromSession = fromConnection.createSession(false, 1);
        MessageConsumer consumer = fromSession.createConsumer((Destination)new ActiveMQQueue("Q" + from));
        final AtomicReference lastMessageReceived = new AtomicReference();
        final AtomicLong producedMessages = new AtomicLong();
        final AtomicLong receivedMessages = new AtomicLong();
        final AtomicBoolean done = new AtomicBoolean();
        consumer.setMessageListener(new MessageListener(){

            public void onMessage(Message msg) {
                ActiveMQTextMessage m = (ActiveMQTextMessage)msg;
                ActiveMQTextMessage last = (ActiveMQTextMessage)lastMessageReceived.get();
                if (last != null && last.getMessageId().getProducerSequenceId() > m.getMessageId().getProducerSequenceId()) {
                    System.out.println("Received an out of order message. Got " + m.getMessageId() + ", expected something after " + last.getMessageId());
                }
                lastMessageReceived.set(m);
                receivedMessages.incrementAndGet();
            }
        });
        LOG.info("Staring Initial Producer");
        final Connection toConnection = this.createConnection(0);
        Thread producer = new Thread("Producer"){

            @Override
            public void run() {
                try {
                    toConnection.start();
                    Session toSession = toConnection.createSession(false, 1);
                    MessageProducer producer = toSession.createProducer((Destination)new ActiveMQQueue("Q0"));
                    producer.setDeliveryMode(1);
                    producer.setDisableMessageID(true);
                    int i = 0;
                    while (!done.get()) {
                        TextMessage msg = toSession.createTextMessage(this.createMessageText(i));
                        producer.send((Message)msg);
                        producedMessages.incrementAndGet();
                        ++i;
                    }
                }
                catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            private String createMessageText(int index) {
                StringBuffer buffer = new StringBuffer(2000);
                buffer.append(index + " on " + new Date() + " ...");
                if (buffer.length() > 2000) {
                    return buffer.substring(0, 2000);
                }
                for (int i = buffer.length(); i < 2000; ++i) {
                    buffer.append(' ');
                }
                return buffer.toString();
            }
        };
        producer.start();
        Thread.sleep(800L);
        int i = 0;
        while ((long)i < SAMPLES) {
            long start = System.currentTimeMillis();
            producedMessages.set(0L);
            receivedMessages.set(0L);
            for (int j = 0; j < this.forwardingClients.length; ++j) {
                this.forwardingClients[j].forwardCounter.set(0L);
            }
            Thread.sleep(SAMPLE_DURATION);
            long end = System.currentTimeMillis();
            long r = receivedMessages.get();
            long p = producedMessages.get();
            LOG.info("published: " + p + " msgs at " + (float)p * 1000.0f / (float)(end - start) + " msgs/sec, " + "consumed: " + r + " msgs at " + (float)r * 1000.0f / (float)(end - start) + " msgs/sec");
            StringBuffer fwdingmsg = new StringBuffer(500);
            fwdingmsg.append("  forwarding counters: ");
            for (int j = 0; j < this.forwardingClients.length; ++j) {
                if (j != 0) {
                    fwdingmsg.append(", ");
                }
                fwdingmsg.append(this.forwardingClients[j].forwardCounter.get());
            }
            LOG.info(fwdingmsg.toString());
            NetworkLoadTest.assertTrue((String)"Recieved some messages since last sample", (r > 0L ? 1 : 0) != 0);
            NetworkLoadTest.assertTrue((String)"Produced some messages since last sample", (p > 0L ? 1 : 0) != 0);
            ++i;
        }
        LOG.info("Sample done.");
        done.set(true);
        producer.join(5000L);
        toConnection.close();
        fromConnection.close();
    }

    class ForwardingClient {
        private final AtomicLong forwardCounter = new AtomicLong();
        private final Connection toConnection;
        private final Connection fromConnection;

        public ForwardingClient(int from, int to) throws JMSException {
            this.toConnection = NetworkLoadTest.this.createConnection(from);
            Session toSession = this.toConnection.createSession(false, 1);
            final MessageProducer producer = toSession.createProducer((Destination)new ActiveMQQueue("Q" + to));
            producer.setDeliveryMode(1);
            producer.setDisableMessageID(true);
            this.fromConnection = NetworkLoadTest.this.createConnection(from);
            Session fromSession = this.fromConnection.createSession(false, 1);
            MessageConsumer consumer = fromSession.createConsumer((Destination)new ActiveMQQueue("Q" + from));
            consumer.setMessageListener(new MessageListener(){

                public void onMessage(Message msg) {
                    try {
                        producer.send(msg);
                        ForwardingClient.this.forwardCounter.incrementAndGet();
                    }
                    catch (JMSException jMSException) {
                        // empty catch block
                    }
                }
            });
        }

        public void start() throws JMSException {
            this.toConnection.start();
            this.fromConnection.start();
        }

        public void stop() throws JMSException {
            this.toConnection.stop();
            this.fromConnection.stop();
        }

        public void close() throws JMSException {
            this.toConnection.close();
            this.fromConnection.close();
        }
    }
}

