/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.bugs.MessageSender;
import org.apache.activemq.bugs.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionNotStartedErrorTest
extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(TransactionNotStartedErrorTest.class);
    private static final int counter = 500;
    private static int hectorToHaloCtr;
    private static int xenaToHaloCtr;
    private static int troyToHaloCtr;
    private static int haloToHectorCtr;
    private static int haloToXenaCtr;
    private static int haloToTroyCtr;
    private final String hectorToHalo = "hectorToHalo";
    private final String xenaToHalo = "xenaToHalo";
    private final String troyToHalo = "troyToHalo";
    private final String haloToHector = "haloToHector";
    private final String haloToXena = "haloToXena";
    private final String haloToTroy = "haloToTroy";
    private BrokerService broker;
    private Connection hectorConnection;
    private Connection xenaConnection;
    private Connection troyConnection;
    private Connection haloConnection;
    private final Object lock = new Object();

    public Connection createConnection() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(((TransportConnector)this.broker.getTransportConnectors().get(0)).getPublishableConnectString());
        return factory.createConnection();
    }

    public Session createSession(Connection connection, boolean transacted) throws JMSException {
        return connection.createSession(transacted, 1);
    }

    public void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.setPersistent(true);
        this.broker.setUseJmx(true);
        this.broker.addConnector("tcp://localhost:0").setName("Default");
        this.broker.start();
        LOG.info("Starting broker..");
    }

    public void tearDown() throws Exception {
        this.hectorConnection.close();
        this.xenaConnection.close();
        this.troyConnection.close();
        this.haloConnection.close();
        this.broker.stop();
    }

    public void testTransactionNotStartedError() throws Exception {
        this.startBroker();
        this.hectorConnection = this.createConnection();
        Thread hectorThread = this.buildProducer(this.hectorConnection, "hectorToHalo");
        Receiver hHectorReceiver = new Receiver(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void receive(String s) throws Exception {
                haloToHectorCtr++;
                if (haloToHectorCtr >= 500) {
                    Object object = TransactionNotStartedErrorTest.this.lock;
                    synchronized (object) {
                        TransactionNotStartedErrorTest.this.lock.notifyAll();
                    }
                }
            }
        };
        this.buildReceiver(this.hectorConnection, "haloToHector", false, hHectorReceiver);
        this.troyConnection = this.createConnection();
        Thread troyThread = this.buildProducer(this.troyConnection, "troyToHalo");
        Receiver hTroyReceiver = new Receiver(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void receive(String s) throws Exception {
                haloToTroyCtr++;
                if (haloToTroyCtr >= 500) {
                    Object object = TransactionNotStartedErrorTest.this.lock;
                    synchronized (object) {
                        TransactionNotStartedErrorTest.this.lock.notifyAll();
                    }
                }
            }
        };
        this.buildReceiver(this.hectorConnection, "haloToTroy", false, hTroyReceiver);
        this.xenaConnection = this.createConnection();
        Thread xenaThread = this.buildProducer(this.xenaConnection, "xenaToHalo");
        Receiver hXenaReceiver = new Receiver(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void receive(String s) throws Exception {
                haloToXenaCtr++;
                if (haloToXenaCtr >= 500) {
                    Object object = TransactionNotStartedErrorTest.this.lock;
                    synchronized (object) {
                        TransactionNotStartedErrorTest.this.lock.notifyAll();
                    }
                }
            }
        };
        this.buildReceiver(this.xenaConnection, "haloToXena", false, hXenaReceiver);
        this.haloConnection = this.createConnection();
        final MessageSender hectorSender = this.buildTransactionalProducer("haloToHector", this.haloConnection);
        final MessageSender troySender = this.buildTransactionalProducer("haloToTroy", this.haloConnection);
        final MessageSender xenaSender = this.buildTransactionalProducer("haloToXena", this.haloConnection);
        Receiver hectorReceiver = new Receiver(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void receive(String s) throws Exception {
                hectorToHaloCtr++;
                troySender.send("halo to troy because of hector");
                if (hectorToHaloCtr >= 500) {
                    Object object = TransactionNotStartedErrorTest.this.lock;
                    synchronized (object) {
                        TransactionNotStartedErrorTest.this.lock.notifyAll();
                    }
                }
            }
        };
        Receiver xenaReceiver = new Receiver(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void receive(String s) throws Exception {
                xenaToHaloCtr++;
                hectorSender.send("halo to hector because of xena");
                if (xenaToHaloCtr >= 500) {
                    Object object = TransactionNotStartedErrorTest.this.lock;
                    synchronized (object) {
                        TransactionNotStartedErrorTest.this.lock.notifyAll();
                    }
                }
            }
        };
        Receiver troyReceiver = new Receiver(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void receive(String s) throws Exception {
                troyToHaloCtr++;
                xenaSender.send("halo to xena because of troy");
                if (troyToHaloCtr >= 500) {
                    Object object = TransactionNotStartedErrorTest.this.lock;
                    synchronized (object) {
                        TransactionNotStartedErrorTest.this.lock.notifyAll();
                    }
                }
            }
        };
        this.buildReceiver(this.haloConnection, "hectorToHalo", true, hectorReceiver);
        this.buildReceiver(this.haloConnection, "xenaToHalo", true, xenaReceiver);
        this.buildReceiver(this.haloConnection, "troyToHalo", true, troyReceiver);
        this.haloConnection.start();
        this.troyConnection.start();
        troyThread.start();
        this.xenaConnection.start();
        xenaThread.start();
        this.hectorConnection.start();
        hectorThread.start();
        this.waitForMessagesToBeDelivered();
        TransactionNotStartedErrorTest.assertEquals((int)hectorToHaloCtr, (int)500);
        LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages");
        TransactionNotStartedErrorTest.assertEquals((int)xenaToHaloCtr, (int)500);
        LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages");
        TransactionNotStartedErrorTest.assertEquals((int)troyToHaloCtr, (int)500);
        LOG.info("troyToHalo received " + troyToHaloCtr + " messages");
        TransactionNotStartedErrorTest.assertEquals((int)haloToHectorCtr, (int)500);
        LOG.info("haloToHector received " + haloToHectorCtr + " messages");
        TransactionNotStartedErrorTest.assertEquals((int)haloToXenaCtr, (int)500);
        LOG.info("haloToXena received " + haloToXenaCtr + " messages");
        TransactionNotStartedErrorTest.assertEquals((int)haloToTroyCtr, (int)500);
        LOG.info("haloToTroy received " + haloToTroyCtr + " messages");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void waitForMessagesToBeDelivered() {
        long maxWaitTime;
        long waitTime = maxWaitTime = 1500000L;
        long start = maxWaitTime <= 0L ? 0L : System.currentTimeMillis();
        Object object = this.lock;
        synchronized (object) {
            boolean hasMessages = true;
            while (hasMessages && waitTime >= 0L) {
                try {
                    this.lock.wait(200L);
                }
                catch (InterruptedException e) {
                    LOG.error(e.toString());
                }
                hasMessages = hectorToHaloCtr < 500 || xenaToHaloCtr < 500 || troyToHaloCtr < 500 || haloToHectorCtr < 500 || haloToXenaCtr < 500 || haloToTroyCtr < 500;
                waitTime = maxWaitTime - (System.currentTimeMillis() - start);
            }
        }
    }

    public MessageSender buildTransactionalProducer(String queueName, Connection connection) throws Exception {
        return new MessageSender(queueName, connection, true, false);
    }

    public Thread buildProducer(Connection connection, final String queueName) throws Exception {
        final Session session = connection.createSession(false, 1);
        final MessageSender producer = new MessageSender(queueName, connection, false, false);
        Thread thread = new Thread(){

            @Override
            public synchronized void run() {
                for (int i = 0; i < 500; ++i) {
                    try {
                        producer.send(queueName);
                        if (!session.getTransacted()) continue;
                        session.commit();
                        continue;
                    }
                    catch (Exception e) {
                        throw new RuntimeException("on " + queueName + " send", e);
                    }
                }
            }
        };
        return thread;
    }

    public void buildReceiver(Connection connection, String queueName, boolean transacted, final Receiver receiver) throws Exception {
        final Session session = transacted ? connection.createSession(true, 0) : connection.createSession(false, 1);
        MessageConsumer inputMessageConsumer = session.createConsumer((Destination)session.createQueue(queueName));
        MessageListener messageListener = new MessageListener(){

            public void onMessage(Message message) {
                try {
                    ObjectMessage objectMessage = (ObjectMessage)message;
                    String s = (String)((Object)objectMessage.getObject());
                    receiver.receive(s);
                    if (session.getTransacted()) {
                        session.commit();
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        inputMessageConsumer.setMessageListener(messageListener);
    }
}

