package org.apache.activemq.bugs;

import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/TransactionNotStartedErrorTest.class */
/**
 * Round-trip messaging test against an embedded broker.
 *
 * <p>Three producer threads ("hector", "xena", "troy") each send {@value #MESSAGE_COUNT}
 * messages to a {@code *ToHalo} queue. The "halo" connection consumes those queues on
 * transacted sessions and, for every message received, sends one message on to a
 * {@code haloTo*} queue through a sender built by {@link #buildTransactionalProducer}.
 * The test passes when all six queues have delivered exactly {@value #MESSAGE_COUNT}
 * messages to their listeners.
 *
 * <p>All counters are per-instance, so repeated runs in one JVM start from zero.
 */
public class TransactionNotStartedErrorTest extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(TransactionNotStartedErrorTest.class);

    /** Number of messages each producer sends and each listener is expected to receive. */
    private static final int MESSAGE_COUNT = 500;
    /** Upper bound, in milliseconds, on how long the test waits for delivery to finish. */
    private static final long MAX_WAIT_MILLIS = 1500000L;
    /** How often, in milliseconds, the waiting test thread re-checks the counters. */
    private static final long POLL_INTERVAL_MILLIS = 200L;

    private static final String HECTOR_TO_HALO = "hectorToHalo";
    private static final String XENA_TO_HALO = "xenaToHalo";
    private static final String TROY_TO_HALO = "troyToHalo";
    private static final String HALO_TO_HECTOR = "haloToHector";
    private static final String HALO_TO_XENA = "haloToXena";
    private static final String HALO_TO_TROY = "haloToTroy";

    // Written by JMS listener threads, read by the test thread: hence atomic, and
    // instance-scoped so that one test run cannot leak counts into the next.
    private final AtomicInteger hectorToHaloCtr = new AtomicInteger();
    private final AtomicInteger xenaToHaloCtr = new AtomicInteger();
    private final AtomicInteger troyToHaloCtr = new AtomicInteger();
    private final AtomicInteger haloToHectorCtr = new AtomicInteger();
    private final AtomicInteger haloToXenaCtr = new AtomicInteger();
    private final AtomicInteger haloToTroyCtr = new AtomicInteger();

    /** Monitor on which the test thread waits and listeners signal completion. */
    private final Object lock = new Object();

    private BrokerService broker;
    private Connection hectorConnection;
    private Connection xenaConnection;
    private Connection troyConnection;
    private Connection haloConnection;

    /**
     * Opens a new, not yet started, connection to the embedded broker's first
     * transport connector.
     *
     * @return a fresh connection; the caller is responsible for starting and closing it
     * @throws Exception if the connect string cannot be obtained or the connection fails
     */
    public Connection createConnection() throws Exception {
        TransportConnector connector = (TransportConnector) broker.getTransportConnectors().get(0);
        String brokerUri = connector.getPublishableConnectString();
        return new ActiveMQConnectionFactory(brokerUri).createConnection();
    }

    /**
     * Creates a session on the given connection, requesting auto-acknowledge mode.
     *
     * @param connection the connection that will own the session
     * @param transacted whether the session should be transacted
     * @return the new session
     * @throws JMSException if the provider fails to create the session
     */
    public Session createSession(Connection connection, boolean transacted) throws JMSException {
        return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Configures and starts the embedded broker: persistent, JMX enabled, stored
     * messages wiped on startup, and one connector named "Default".
     *
     * @throws Exception if the broker cannot be configured or started
     */
    public void startBroker() throws Exception {
        LOG.info("Starting broker..");
        broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setPersistent(true);
        broker.setUseJmx(true);
        broker.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT).setName("Default");
        broker.start();
    }

    /**
     * Closes whichever connections were opened and stops the broker.
     *
     * <p>Every step is null-safe, and the broker is stopped even when closing a
     * connection fails, so a test that died half-way through setup is still cleaned up
     * and its original failure is not masked by a {@code NullPointerException}.
     *
     * @throws Exception the first connection-close failure, or a broker-stop failure
     */
    @Override
    public void tearDown() throws Exception {
        try {
            closeAll(hectorConnection, xenaConnection, troyConnection, haloConnection);
        } finally {
            if (broker != null) {
                broker.stop();
            }
        }
    }

    /**
     * Runs the full scenario and asserts that each of the six queues delivered exactly
     * {@value #MESSAGE_COUNT} messages.
     *
     * @throws Exception if broker, connection or session setup fails
     */
    public void testTransactionNotStartedError() throws Exception {
        startBroker();

        hectorConnection = createConnection();
        Thread hectorProducer = buildProducer(hectorConnection, HECTOR_TO_HALO);
        buildReceiver(hectorConnection, HALO_TO_HECTOR, false, countingReceiver(haloToHectorCtr));

        troyConnection = createConnection();
        Thread troyProducer = buildProducer(troyConnection, TROY_TO_HALO);
        // Listen on troy's own connection, mirroring the hector and xena wiring.
        buildReceiver(troyConnection, HALO_TO_TROY, false, countingReceiver(haloToTroyCtr));

        xenaConnection = createConnection();
        Thread xenaProducer = buildProducer(xenaConnection, XENA_TO_HALO);
        buildReceiver(xenaConnection, HALO_TO_XENA, false, countingReceiver(haloToXenaCtr));

        haloConnection = createConnection();
        MessageSender toHector = buildTransactionalProducer(HALO_TO_HECTOR, haloConnection);
        MessageSender toTroy = buildTransactionalProducer(HALO_TO_TROY, haloConnection);
        MessageSender toXena = buildTransactionalProducer(HALO_TO_XENA, haloConnection);

        // Each inbound queue on halo triggers a send to a *different* peer's queue.
        buildReceiver(haloConnection, HECTOR_TO_HALO, true,
                forwardingReceiver(hectorToHaloCtr, toTroy, "halo to troy because of hector"));
        buildReceiver(haloConnection, XENA_TO_HALO, true,
                forwardingReceiver(xenaToHaloCtr, toHector, "halo to hector because of xena"));
        buildReceiver(haloConnection, TROY_TO_HALO, true,
                forwardingReceiver(troyToHaloCtr, toXena, "halo to xena because of troy"));

        haloConnection.start();
        troyConnection.start();
        troyProducer.start();
        xenaConnection.start();
        xenaProducer.start();
        hectorConnection.start();
        hectorProducer.start();

        waitForMessagesToBeDelivered();

        assertAllReceived(HECTOR_TO_HALO, hectorToHaloCtr);
        assertAllReceived(XENA_TO_HALO, xenaToHaloCtr);
        assertAllReceived(TROY_TO_HALO, troyToHaloCtr);
        assertAllReceived(HALO_TO_HECTOR, haloToHectorCtr);
        assertAllReceived(HALO_TO_XENA, haloToXenaCtr);
        assertAllReceived(HALO_TO_TROY, haloToTroyCtr);
    }

    /**
     * Blocks until every counter has reached {@value #MESSAGE_COUNT}, the
     * {@link #MAX_WAIT_MILLIS} deadline passes, or the thread is interrupted.
     *
     * <p>On interruption the interrupt flag is restored and the method returns at once;
     * the caller's assertions then report whatever is still missing.
     */
    protected void waitForMessagesToBeDelivered() {
        final long deadline = System.currentTimeMillis() + MAX_WAIT_MILLIS;
        synchronized (lock) {
            while (!allMessagesDelivered() && System.currentTimeMillis() <= deadline) {
                try {
                    // Bounded wait: a listener only notifies once its own counter is
                    // full, so poll periodically rather than rely on a single wake-up.
                    lock.wait(POLL_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    LOG.error(e.toString());
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Creates a {@link MessageSender} for the named destination with its third
     * constructor flag set to {@code true} and its fourth set to {@code false}.
     *
     * @param queueName name of the destination to send to
     * @param connection connection the sender is created on
     * @return the new sender
     * @throws Exception if the sender cannot be created
     */
    public MessageSender buildTransactionalProducer(String queueName, Connection connection) throws Exception {
        return new MessageSender(queueName, connection, true, false);
    }

    /**
     * Builds (but does not start) a thread that sends {@value #MESSAGE_COUNT} messages
     * to the named destination, using the destination name itself as the payload.
     *
     * <p>A send failure terminates the thread with a {@code RuntimeException} that
     * wraps the cause.
     *
     * @param connection connection the underlying sender is created on
     * @param queueName destination name, also used as the message payload
     * @return an unstarted producer thread
     * @throws Exception if the sender cannot be created
     */
    public Thread buildProducer(Connection connection, final String queueName) throws Exception {
        final MessageSender sender = new MessageSender(queueName, connection, false, false);
        return new Thread(queueName + "-producer") {
            @Override
            public void run() {
                for (int sent = 0; sent < MESSAGE_COUNT; sent++) {
                    try {
                        sender.send(queueName);
                    } catch (Exception e) {
                        throw new RuntimeException("on " + queueName + " send", e);
                    }
                }
            }
        };
    }

    /**
     * Registers a message listener on the named queue that unwraps each
     * {@link ObjectMessage} payload as a {@code String}, hands it to {@code receiver},
     * and then commits the session if it is transacted.
     *
     * <p>Any exception thrown while handling a message is logged and not rethrown, so
     * the listener keeps running; a lost message shows up as a short count in the
     * test's final assertions.
     *
     * @param connection connection to create the consuming session on
     * @param queueName name of the queue to consume from
     * @param transacted {@code true} for a transacted session, {@code false} for an
     *     auto-acknowledge one
     * @param receiver callback invoked with each message payload
     * @throws Exception if the session, queue or consumer cannot be created
     */
    public void buildReceiver(Connection connection, final String queueName, boolean transacted,
            final Receiver receiver) throws Exception {
        final Session session = transacted
                ? connection.createSession(true, Session.SESSION_TRANSACTED)
                : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createConsumer(session.createQueue(queueName)).setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    receiver.receive((String) ((ObjectMessage) message).getObject());
                    if (session.getTransacted()) {
                        session.commit();
                    }
                } catch (Exception e) {
                    LOG.error("Failed to process message from " + queueName, e);
                }
            }
        });
    }

    /** Returns a receiver that counts each message and signals the waiter once the counter is full. */
    private Receiver countingReceiver(final AtomicInteger counter) {
        return new Receiver() {
            @Override
            public void receive(String payload) throws Exception {
                if (counter.incrementAndGet() >= MESSAGE_COUNT) {
                    signalWaiter();
                }
            }
        };
    }

    /**
     * Returns a receiver that counts each message, sends {@code reply} through
     * {@code sender}, and then signals the waiter once the counter is full.
     */
    private Receiver forwardingReceiver(final AtomicInteger counter, final MessageSender sender,
            final String reply) {
        return new Receiver() {
            @Override
            public void receive(String payload) throws Exception {
                int seen = counter.incrementAndGet();
                sender.send(reply);
                if (seen >= MESSAGE_COUNT) {
                    signalWaiter();
                }
            }
        };
    }

    /** Wakes the test thread blocked in {@link #waitForMessagesToBeDelivered()}. */
    private void signalWaiter() {
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    /** Returns whether every one of the six counters has reached the expected total. */
    private boolean allMessagesDelivered() {
        return hectorToHaloCtr.get() >= MESSAGE_COUNT
                && xenaToHaloCtr.get() >= MESSAGE_COUNT
                && troyToHaloCtr.get() >= MESSAGE_COUNT
                && haloToHectorCtr.get() >= MESSAGE_COUNT
                && haloToXenaCtr.get() >= MESSAGE_COUNT
                && haloToTroyCtr.get() >= MESSAGE_COUNT;
    }

    /** Asserts that the queue's counter equals the expected total, then logs the count. */
    private static void assertAllReceived(String queueName, AtomicInteger counter) {
        int received = counter.get();
        assertEquals("messages received on " + queueName, MESSAGE_COUNT, received);
        LOG.info("{} received {} messages", queueName, Integer.valueOf(received));
    }

    /**
     * Closes every non-null connection, attempting all of them even if one fails, and
     * rethrows the first failure afterwards; later failures are logged.
     */
    private static void closeAll(Connection... connections) throws JMSException {
        JMSException firstFailure = null;
        for (Connection connection : connections) {
            if (connection == null) {
                continue;
            }
            try {
                connection.close();
            } catch (JMSException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    LOG.warn("Additional failure while closing connection", e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}
