package org.apache.activemq.bugs;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.perf.NetworkedSyncTest;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usecases.DurableSubProcessConcurrentCommitActivateNoDuplicateTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/MissingDataFileTest.class */
/**
 * Drives six queues through a persistent, KahaDB-backed broker configured with very small
 * journal files (16384 bytes) and a 500 ms cleanup interval.
 *
 * <p>Three "outer" clients (hector, xena, troy) each send {@link #counter} messages to a
 * {@code *ToHalo} queue. The "halo" connection consumes those in transacted sessions and, for
 * every message received, sends one message on a {@code haloTo*} queue. The test passes when all
 * six counters reach {@link #counter}.
 *
 * <p>NOTE(review): judging by the class and test names this guards against a missing-journal-file
 * / "no data found" regression; the code itself only asserts on message counts.
 */
public class MissingDataFileTest extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(MissingDataFileTest.class);

    /** Number of messages each producer sends and each receiver is expected to see. */
    private static int counter = DurableSubProcessConcurrentCommitActivateNoDuplicateTest.SERVER_SLEEP;

    /** Message body; a string built from {@code SERVER_SLEEP} zero bytes. */
    protected static final String payload = new String(new byte[DurableSubProcessConcurrentCommitActivateNoDuplicateTest.SERVER_SLEEP]);

    // Per-queue receive counters. They are incremented from message listeners and read by the
    // test thread in waitForMessagesToBeDelivered() and in the final assertions.
    private static int hectorToHaloCtr;
    private static int xenaToHaloCtr;
    private static int troyToHaloCtr;
    private static int haloToHectorCtr;
    private static int haloToXenaCtr;
    private static int haloToTroyCtr;

    private BrokerService broker;
    private Connection hectorConnection;
    private Connection xenaConnection;
    private Connection troyConnection;
    private Connection haloConnection;

    // Destination names.
    private final String hectorToHalo = "hectorToHalo";
    private final String xenaToHalo = "xenaToHalo";
    private final String troyToHalo = "troyToHalo";
    private final String haloToHector = "haloToHector";
    private final String haloToXena = "haloToXena";
    private final String haloToTroy = "haloToTroy";

    /** Monitor used by receivers to wake the test thread once a counter reaches {@link #counter}. */
    private final Object lock = new Object();

    // NOTE(review): these two flags are not referenced anywhere in this class as it stands.
    final boolean useTopic = false;
    final boolean useSleep = true;

    /**
     * Opens a new (not yet started) connection to the broker at {@code NetworkedSyncTest.broker1URL}.
     *
     * @return the new connection
     * @throws JMSException if the connection cannot be created
     */
    public Connection createConnection() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(NetworkedSyncTest.broker1URL);
        return factory.createConnection();
    }

    /**
     * Creates a session on the given connection, requesting auto-acknowledge mode.
     *
     * @param connection the connection to create the session on
     * @param transacted whether the session is transacted
     * @return the new session
     * @throws JMSException if the session cannot be created
     */
    public Session createSession(Connection connection, boolean transacted) throws JMSException {
        return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Creates and starts a persistent broker with JMX enabled, a 10 MiB memory limit and a KahaDB
     * store using 16384-byte journal files and a 500 ms cleanup interval. All stored messages are
     * deleted on startup.
     *
     * @throws Exception if the broker fails to start
     */
    public void startBroker() throws Exception {
        BrokerService service = new BrokerService();
        service.setDeleteAllMessagesOnStartup(true);
        service.setPersistent(true);
        service.setUseJmx(true);
        service.addConnector(NetworkedSyncTest.broker1URL).setName("Default");

        SystemUsage usage = new SystemUsage();
        usage.getMemoryUsage().setLimit(10485760L); // 10 * 1024 * 1024
        service.setSystemUsage(usage);

        KahaDBPersistenceAdapter kahaDb = new KahaDBPersistenceAdapter();
        kahaDb.setJournalMaxFileLength(16384);
        kahaDb.setCleanupInterval(500L);
        service.setPersistenceAdapter(kahaDb);

        this.broker = service;
        service.start();
        LOG.info("Starting broker..");
    }

    /**
     * Closes every connection that was opened and stops the broker if it was created.
     *
     * <p>Each resource is released even if an earlier one fails or was never created; the first
     * failure encountered is rethrown once everything has been attempted.
     *
     * @throws Exception the first failure raised while closing a connection or stopping the broker
     */
    @Override
    public void tearDown() throws Exception {
        Exception firstFailure = null;
        Connection[] connections = {
            this.hectorConnection, this.xenaConnection, this.troyConnection, this.haloConnection
        };
        for (Connection connection : connections) {
            if (connection == null) {
                continue; // never created, e.g. the test failed part-way through setup
            }
            try {
                connection.close();
            } catch (Exception e) {
                LOG.warn("Failed to close connection", e);
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (this.broker != null) {
            try {
                this.broker.stop();
            } catch (Exception e) {
                LOG.warn("Failed to stop broker", e);
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Starts the broker, wires up producers and receivers on all six queues, runs the producers
     * and asserts that every queue delivered exactly {@link #counter} messages.
     *
     * @throws Exception on any broker or JMS failure
     */
    public void testForNoDataFoundError() throws Exception {
        startBroker();

        this.hectorConnection = createConnection();
        Thread hectorProducer = buildProducer(this.hectorConnection, this.hectorToHalo, false, false);
        buildReceiver(this.hectorConnection, this.haloToHector, false, new Receiver() {
            @Override
            public void receive(String str) throws Exception {
                haloToHectorCtr++;
                signalIfComplete(haloToHectorCtr);
                possiblySleep(haloToHectorCtr);
            }
        }, false);

        this.troyConnection = createConnection();
        Thread troyProducer = buildProducer(this.troyConnection, this.troyToHalo);
        // NOTE(review): this consumer is created on hectorConnection rather than troyConnection;
        // kept as found — confirm whether that is intentional.
        buildReceiver(this.hectorConnection, this.haloToTroy, false, new Receiver() {
            @Override
            public void receive(String str) throws Exception {
                haloToTroyCtr++;
                signalIfComplete(haloToTroyCtr);
                possiblySleep(haloToTroyCtr);
            }
        }, false);

        this.xenaConnection = createConnection();
        Thread xenaProducer = buildProducer(this.xenaConnection, this.xenaToHalo);
        buildReceiver(this.xenaConnection, this.haloToXena, false, new Receiver() {
            @Override
            public void receive(String str) throws Exception {
                haloToXenaCtr++;
                signalIfComplete(haloToXenaCtr);
                possiblySleep(haloToXenaCtr);
            }
        }, false);

        this.haloConnection = createConnection();
        final MessageSender hectorSender = buildTransactionalProducer(this.haloToHector, this.haloConnection, false);
        final MessageSender troySender = buildTransactionalProducer(this.haloToTroy, this.haloConnection, false);
        final MessageSender xenaSender = buildTransactionalProducer(this.haloToXena, this.haloConnection, false);

        // Each halo-side receiver forwards to a *different* client's reply queue:
        // hectorToHalo -> haloToTroy, xenaToHalo -> haloToHector, troyToHalo -> haloToXena.
        Receiver fromHector = new Receiver() {
            @Override
            public void receive(String str) throws Exception {
                hectorToHaloCtr++;
                troySender.send(payload);
                // Unlike the other receivers, the sleep check here only runs once the target
                // count has been reached (kept as found).
                if (hectorToHaloCtr >= counter) {
                    synchronized (lock) {
                        lock.notifyAll();
                    }
                    possiblySleep(hectorToHaloCtr);
                }
            }
        };
        Receiver fromXena = new Receiver() {
            @Override
            public void receive(String str) throws Exception {
                xenaToHaloCtr++;
                hectorSender.send(payload);
                signalIfComplete(xenaToHaloCtr);
                possiblySleep(xenaToHaloCtr);
            }
        };
        Receiver fromTroy = new Receiver() {
            @Override
            public void receive(String str) throws Exception {
                troyToHaloCtr++;
                xenaSender.send(payload);
                signalIfComplete(troyToHaloCtr);
            }
        };
        buildReceiver(this.haloConnection, this.hectorToHalo, true, fromHector, false);
        buildReceiver(this.haloConnection, this.xenaToHalo, true, fromXena, false);
        buildReceiver(this.haloConnection, this.troyToHalo, true, fromTroy, false);

        this.haloConnection.start();
        this.troyConnection.start();
        troyProducer.start();
        this.xenaConnection.start();
        xenaProducer.start();
        this.hectorConnection.start();
        hectorProducer.start();

        waitForMessagesToBeDelivered();

        assertEquals("hectorToHalo message count", counter, hectorToHaloCtr);
        LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages");
        assertEquals("xenaToHalo message count", counter, xenaToHaloCtr);
        LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages");
        assertEquals("troyToHalo message count", counter, troyToHaloCtr);
        LOG.info("troyToHalo received " + troyToHaloCtr + " messages");
        assertEquals("haloToHector message count", counter, haloToHectorCtr);
        LOG.info("haloToHector received " + haloToHectorCtr + " messages");
        assertEquals("haloToXena message count", counter, haloToXenaCtr);
        LOG.info("haloToXena received " + haloToXenaCtr + " messages");
        assertEquals("haloToTroy message count", counter, haloToTroyCtr);
        LOG.info("haloToTroy received " + haloToTroyCtr + " messages");
    }

    /**
     * Sleeps for five seconds whenever {@code i} is a multiple of 100.
     *
     * @param i the current message count of the calling receiver
     * @throws InterruptedException if the sleep is interrupted
     */
    protected void possiblySleep(int i) throws InterruptedException {
        if (i % 100 == 0) {
            Thread.sleep(5000L);
        }
    }

    /**
     * Blocks until all six counters have reached {@link #counter} or roughly
     * {@code counter * 1000} milliseconds have elapsed, polling every 200 ms (or sooner when a
     * receiver notifies {@link #lock}).
     *
     * <p>If the waiting thread is interrupted, the interrupt status is restored and the method
     * returns immediately.
     */
    protected void waitForMessagesToBeDelivered() {
        // 1000L keeps the multiplication in long arithmetic.
        final long maxWaitMillis = counter * 1000L;
        final long startMillis = maxWaitMillis <= 0 ? 0L : System.currentTimeMillis();
        synchronized (this.lock) {
            boolean pending = true;
            long remainingMillis = maxWaitMillis;
            while (pending && remainingMillis >= 0) {
                try {
                    this.lock.wait(200L);
                } catch (InterruptedException e) {
                    LOG.error(e.toString());
                    Thread.currentThread().interrupt();
                    return;
                }
                pending = deliveriesPending();
                remainingMillis = maxWaitMillis - (System.currentTimeMillis() - startMillis);
            }
        }
    }

    /**
     * Creates a {@link MessageSender} for the given destination with its third constructor
     * argument fixed to {@code true}.
     *
     * @param str the destination name
     * @param connection the connection the sender uses
     * @param z forwarded as the fourth {@code MessageSender} constructor argument
     *     (presumably "use topic" — confirm against {@code MessageSender})
     * @return the new sender
     * @throws Exception if the sender cannot be created
     */
    public MessageSender buildTransactionalProducer(String str, Connection connection, boolean z) throws Exception {
        return new MessageSender(str, connection, true, z);
    }

    /**
     * Convenience overload of {@link #buildProducer(Connection, String, boolean, boolean)} with
     * both flags {@code false}.
     *
     * @param connection the connection the producer uses
     * @param str the destination name
     * @return an unstarted producer thread
     * @throws Exception if the underlying sender cannot be created
     */
    public Thread buildProducer(Connection connection, String str) throws Exception {
        return buildProducer(connection, str, false, false);
    }

    /**
     * Builds an unstarted thread that sends {@link #payload} {@link #counter} times.
     *
     * @param connection the connection the producer uses
     * @param str the destination name; also used in the failure message
     * @param transacted forwarded as the third {@code MessageSender} constructor argument
     *     (presumably "transacted" — confirm against {@code MessageSender})
     * @param topic forwarded as the fourth {@code MessageSender} constructor argument
     *     (presumably "use topic" — confirm against {@code MessageSender})
     * @return the unstarted producer thread; a send failure terminates it with a
     *     {@link RuntimeException} wrapping the cause
     * @throws Exception if the underlying sender cannot be created
     */
    public Thread buildProducer(Connection connection, final String str, boolean transacted, boolean topic) throws Exception {
        final MessageSender sender = new MessageSender(str, connection, transacted, topic);
        return new Thread() {
            @Override
            public void run() {
                for (int sent = 0; sent < counter; sent++) {
                    try {
                        sender.send(payload);
                    } catch (Exception e) {
                        throw new RuntimeException("on " + str + " send", e);
                    }
                }
            }
        };
    }

    /**
     * Registers a message listener that hands each message's object payload (cast to
     * {@code String}) to {@code receiver} and then commits the session if it is transacted.
     *
     * <p>Exceptions raised while handling a message are logged and not rethrown; in that case
     * the commit for that message is skipped.
     *
     * @param connection the connection to create the consuming session on
     * @param str the destination name
     * @param transacted {@code true} for a transacted session, {@code false} for auto-acknowledge
     * @param receiver callback invoked for every message
     * @param topic {@code true} to consume from a topic, {@code false} for a queue
     * @throws Exception if the session or consumer cannot be created
     */
    public void buildReceiver(Connection connection, final String str, boolean transacted, final Receiver receiver, boolean topic) throws Exception {
        final Session session;
        if (transacted) {
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
        } else {
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        }
        MessageListener listener = new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    receiver.receive((String) ((ObjectMessage) message).getObject());
                    if (session.getTransacted()) {
                        session.commit();
                    }
                } catch (Exception e) {
                    LOG.error("Failed to process message on " + str, e);
                }
            }
        };
        session.createConsumer(topic ? session.createTopic(str) : session.createQueue(str))
                .setMessageListener(listener);
    }

    /** Wakes any thread waiting on {@link #lock} once {@code received} has reached {@link #counter}. */
    private void signalIfComplete(int received) {
        if (received >= counter) {
            synchronized (this.lock) {
                this.lock.notifyAll();
            }
        }
    }

    /** Returns {@code true} while at least one of the six counters is still below {@link #counter}. */
    private static boolean deliveriesPending() {
        return hectorToHaloCtr < counter
                || xenaToHaloCtr < counter
                || troyToHaloCtr < counter
                || haloToHectorCtr < counter
                || haloToXenaCtr < counter
                || haloToTroyCtr < counter;
    }
}
