package org.apache.activemq.store.kahadb;

import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/store/kahadb/KahaDBStoreTest.class */
public class KahaDBStoreTest {
    KahaDBStore.KahaDBMessageStore underTest;
    KahaDBStore store;
    ActiveMQMessage message;
    private static final int MESSAGE_COUNT = 2000;
    ProducerId producerId = new ProducerId("1.1.1");
    private Vector<Throwable> exceptions = new Vector<>();

    @Before
    public void initStore() throws Exception {
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("Test");
        this.store = new KahaDBStore();
        this.store.setMaxAsyncJobs(100);
        this.store.setDeleteAllMessages(true);
        this.store.start();
        KahaDBStore kahaDBStore = this.store;
        kahaDBStore.getClass();
        this.underTest = new KahaDBStore.KahaDBMessageStore(kahaDBStore, activeMQQueue);
        this.underTest.start();
        this.message = new ActiveMQMessage();
        this.message.setDestination(activeMQQueue);
    }

    @After
    public void destroyStore() throws Exception {
        if (this.store != null) {
            this.store.stop();
        }
    }

    @Test
    public void testConcurrentStoreAndDispatchQueue() throws Exception {
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        int i = 0;
        while (i < 2000) {
            final int i2 = i + 1;
            newCachedThreadPool.execute(new Runnable() { // from class: org.apache.activemq.store.kahadb.KahaDBStoreTest.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        Message copy = KahaDBStoreTest.this.message.copy();
                        copy.setMessageId(new MessageId(KahaDBStoreTest.this.producerId, i2));
                        KahaDBStoreTest.this.underTest.asyncAddQueueMessage((ConnectionContext) null, copy);
                    } catch (Exception e) {
                        KahaDBStoreTest.this.exceptions.add(e);
                    }
                }
            });
            i = i2 + 1;
        }
        ExecutorService newCachedThreadPool2 = Executors.newCachedThreadPool();
        int i3 = 0;
        while (i3 < 2000) {
            final int i4 = i3 + 1;
            newCachedThreadPool2.execute(new Runnable() { // from class: org.apache.activemq.store.kahadb.KahaDBStoreTest.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        MessageAck messageAck = new MessageAck();
                        messageAck.setLastMessageId(new MessageId(KahaDBStoreTest.this.producerId, i4));
                        KahaDBStoreTest.this.underTest.removeAsyncMessage((ConnectionContext) null, messageAck);
                    } catch (Exception e) {
                        KahaDBStoreTest.this.exceptions.add(e);
                    }
                }
            });
            i3 = i4 + 1;
        }
        newCachedThreadPool.shutdown();
        newCachedThreadPool.awaitTermination(60L, TimeUnit.SECONDS);
        newCachedThreadPool2.shutdown();
        newCachedThreadPool2.awaitTermination(60L, TimeUnit.SECONDS);
        Assert.assertTrue("no exceptions " + this.exceptions, this.exceptions.isEmpty());
    }
}
