package org.apache.activemq.store;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises a broker whose persistence is a {@link MultiKahaDBPersistenceAdapter} built from
 * two KahaDB stores: one filtered to the queue {@code FastQ} and one with no destination set.
 *
 * <p>The tests cover transacted send/receive on one store and across both stores, recovery
 * after the store is stopped while a transaction outcome is being persisted, default
 * directory assignment, and store usage when a fast and a slow destination are used
 * concurrently.
 */
public class StorePerDestinationTest {
    static final Logger LOG = LoggerFactory.getLogger(StorePerDestinationTest.class);
    /** Journal file length configured on every KahaDB store created by {@link #createStore}. */
    static final int maxFileLength = 102400;
    /** Number of messages pushed through {@code FastQ} in the store-usage test. */
    static final int numToSend = 5000;
    /** Length, in characters, of every message body produced by {@link #createContent}. */
    private static final int CONTENT_LENGTH = 1024;
    /** Failures recorded by background tasks and callbacks; filled from several threads. */
    final Vector<Throwable> exceptions = new Vector<>();
    BrokerService brokerService;

    /**
     * Creates (but does not start) a broker with JMX disabled that uses the given store.
     *
     * @param persistenceAdapter the store the broker should use
     * @return the configured, unstarted broker
     * @throws Exception if the broker cannot be configured
     */
    public BrokerService createBroker(PersistenceAdapter persistenceAdapter) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setUseJmx(false);
        broker.setPersistenceAdapter(persistenceAdapter);
        return broker;
    }

    /**
     * Creates a KahaDB store with a journal file length of {@link #maxFileLength} and a
     * cleanup interval of 5000.
     *
     * @param deleteAllMessages when {@code true}, {@code deleteAllMessages()} is invoked on the new store
     * @return the configured store
     * @throws IOException if deleting existing messages fails
     */
    public PersistenceAdapter createStore(boolean deleteAllMessages) throws IOException {
        KahaDBPersistenceAdapter kahaDb = new KahaDBPersistenceAdapter();
        kahaDb.setJournalMaxFileLength(maxFileLength);
        kahaDb.setCleanupInterval(5000L);
        if (deleteAllMessages) {
            kahaDb.deleteAllMessages();
        }
        return kahaDb;
    }

    /** Prepares a broker on top of freshly wiped stores before each test. */
    @Before
    public void prepareCleanBrokerWithMultiStore() throws Exception {
        prepareBrokerWithMultiStore(true);
    }

    /**
     * Builds the multi-store persistence adapter and assigns a new, unstarted broker using it
     * to {@link #brokerService}.
     *
     * @param deleteAllMessages when {@code true}, existing messages are deleted from the multi
     *     adapter and from each nested store; pass {@code false} to keep existing data
     * @throws Exception if the stores or the broker cannot be created
     */
    public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
        MultiKahaDBPersistenceAdapter multiStore = new MultiKahaDBPersistenceAdapter();
        if (deleteAllMessages) {
            multiStore.deleteAllMessages();
        }
        ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<>();

        // No destination is set on this entry; the tests rely on it serving the queues other
        // than FastQ (e.g. SlowQ) - presumably a catch-all, confirm against the adapter docs.
        FilteredKahaDBPersistenceAdapter unfiltered = new FilteredKahaDBPersistenceAdapter();
        unfiltered.setPersistenceAdapter(createStore(deleteAllMessages));
        adapters.add(unfiltered);

        FilteredKahaDBPersistenceAdapter fastQueueOnly = new FilteredKahaDBPersistenceAdapter();
        fastQueueOnly.setPersistenceAdapter(createStore(deleteAllMessages));
        fastQueueOnly.setDestination(new ActiveMQQueue("FastQ"));
        adapters.add(fastQueueOnly);

        multiStore.setFilteredPersistenceAdapters(adapters);
        this.brokerService = createBroker(multiStore);
    }

    /** Stops the current broker, if one was created. */
    @After
    public void tearDown() throws Exception {
        // JUnit 4 still runs @After when @Before failed, in which case no broker exists.
        if (this.brokerService != null) {
            this.brokerService.stop();
        }
    }

    /** Sends one message in a transaction to {@code SlowQ} and receives it in a transaction. */
    @Test
    public void testTransactedSendReceive() throws Exception {
        this.brokerService.start();
        sendMessages(true, "SlowQ", 1, 0L);
        Assert.assertEquals("got one", 1L, receiveMessages(true, "SlowQ", 1));
    }

    /**
     * Sends one message in a transaction to the composite destination {@code SlowQ,FastQ} and
     * expects two messages back, one per queue.
     */
    @Test
    public void testTransactedSendReceiveAcrossStores() throws Exception {
        this.brokerService.start();
        sendMessages(true, "SlowQ,FastQ", 1, 0L);
        Assert.assertEquals("got one from each queue", 2L, receiveMessages(true, "SlowQ,FastQ", 2));
    }

    /** Recovery when the transaction outcome was persisted before the store stopped. */
    @Test
    public void testCommitRecovery() throws Exception {
        doTestRecovery(true);
    }

    /** Recovery when the transaction outcome was not persisted before the store stopped. */
    @Test
    public void testRollbackRecovery() throws Exception {
        doTestRecovery(false);
    }

    /**
     * Stops the persistence adapter from inside {@code persistOutcome} while a transacted send
     * to {@code SlowQ,FastQ} is in flight, waits for the broker to stop, then restarts a broker
     * on the same data and checks what was recovered.
     *
     * @param commit when {@code true} the outcome is persisted before the store is stopped and
     *     two messages are expected after restart; when {@code false} it is skipped and none
     *     are expected
     * @throws Exception on any broker, store or JMS failure
     */
    public void doTestRecovery(final boolean commit) throws Exception {
        // getPersistenceAdapter() is declared with the interface type; the broker was built
        // with a MultiKahaDBPersistenceAdapter in prepareBrokerWithMultiStore, hence the cast.
        final MultiKahaDBPersistenceAdapter persistenceAdapter =
                (MultiKahaDBPersistenceAdapter) this.brokerService.getPersistenceAdapter();
        persistenceAdapter.setTransactionStore(new MultiKahaDBTransactionStore(persistenceAdapter) {
            @Override
            public void persistOutcome(MultiKahaDBTransactionStore.Tx tx, TransactionId transactionId) throws IOException {
                if (commit) {
                    super.persistOutcome(tx, transactionId);
                }
                try {
                    // Pull the store out from under the in-flight transaction.
                    persistenceAdapter.stop();
                } catch (Exception e) {
                    StorePerDestinationTest.LOG.error("ex on stop ", e);
                    StorePerDestinationTest.this.exceptions.add(e);
                }
            }
        });
        this.brokerService.start();

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        sendMessages(true, "SlowQ,FastQ", 1, 0L);
                    } catch (Exception e) {
                        // The store is stopped mid-commit, so the send is expected to fail.
                        LOG.info("expected", e);
                    }
                }
            });
            this.brokerService.waitUntilStopped();
        } finally {
            // Always release the sender thread, even if waiting for the broker failed.
            executor.shutdownNow();
        }

        // Restart on the existing data (no delete) to exercise recovery.
        prepareBrokerWithMultiStore(false);
        this.brokerService.start();
        Assert.assertEquals("expect to get the recovered message", commit ? 2L : 0L, receiveMessages(false, "SlowQ,FastQ", 2));
        Assert.assertEquals("all transactions are complete", 0L, this.brokerService.getBroker().getPreparedTransactions((ConnectionContext) null).length);
    }

    /**
     * Checks the directories of nested stores after they are registered with the multi adapter:
     * the store without an explicit directory must sit directly under the multi adapter's
     * directory, and the store with an explicit directory must sit directly under that one.
     */
    @Test
    public void testDirectoryDefault() throws Exception {
        MultiKahaDBPersistenceAdapter multiStore = new MultiKahaDBPersistenceAdapter();
        ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<>();

        FilteredKahaDBPersistenceAdapter otherEntry = new FilteredKahaDBPersistenceAdapter();
        PersistenceAdapter otherStore = createStore(false);
        File someOtherDisk = new File("target" + File.separator + "someOtherDisk");
        otherStore.setDirectory(someOtherDisk);
        otherEntry.setPersistenceAdapter(otherStore);
        otherEntry.setDestination(new ActiveMQQueue("Other"));
        adapters.add(otherEntry);

        FilteredKahaDBPersistenceAdapter defaultEntry = new FilteredKahaDBPersistenceAdapter();
        PersistenceAdapter defaultStore = createStore(false);
        defaultEntry.setPersistenceAdapter(defaultStore);
        adapters.add(defaultEntry);

        multiStore.setFilteredPersistenceAdapters(adapters);
        Assert.assertEquals(multiStore.getDirectory(), defaultStore.getDirectory().getParentFile());
        Assert.assertEquals(someOtherDisk, otherStore.getDirectory().getParentFile());
    }

    /**
     * Runs a slow producer on {@code SlowQ} (50 messages, 500 ms apart) alongside a fast
     * producer and a consumer on {@code FastQ} ({@link #numToSend} messages), then waits for
     * the reported store usage to drop below five journal files' worth of bytes.
     */
    @Test
    public void testSlowFastDestinationsStoreUsage() throws Exception {
        this.brokerService.start();
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        sendMessages(false, "SlowQ", 50, 500L);
                    } catch (Exception e) {
                        exceptions.add(e);
                    }
                }
            });
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        sendMessages(false, "FastQ", numToSend, 0L);
                    } catch (Exception e) {
                        exceptions.add(e);
                    }
                }
            });
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Assert.assertEquals("Got all sent", numToSend, receiveMessages(false, "FastQ", numToSend));
                    } catch (Throwable t) {
                        // AssertionError is an Error, not an Exception: catch Throwable so a
                        // wrong receive count is recorded and fails the test on the main thread.
                        exceptions.add(t);
                    }
                }
            });
            executor.shutdown();
            Assert.assertTrue("consumers executor finished on time", executor.awaitTermination(300L, TimeUnit.SECONDS));
        } finally {
            // No-op after a clean termination; otherwise interrupts tasks that are still running.
            executor.shutdownNow();
        }

        final SystemUsage systemUsage = this.brokerService.getSystemUsage();
        Assert.assertTrue("Store is not hogged", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                long usage = systemUsage.getStoreUsage().getUsage();
                LOG.info("Store Usage: {}", usage);
                return usage < 5 * maxFileLength;
            }
        }));
        Assert.assertTrue("no exceptions: " + this.exceptions, this.exceptions.isEmpty());
    }

    /**
     * Sends text messages of {@link #CONTENT_LENGTH} characters over a new {@code vm://localhost}
     * connection, which is always closed on return.
     *
     * @param transacted when {@code true}, all sends happen in one transaction committed at the end
     * @param destinationName queue name passed to {@link ActiveMQQueue}
     * @param count number of messages to send
     * @param sleepMillis pause before each send, in milliseconds; no pause when not positive
     * @throws Exception on JMS failure or if interrupted while pausing
     */
    public void sendMessages(boolean transacted, String destinationName, int count, long sleepMillis) throws Exception {
        Connection connection = new ActiveMQConnectionFactory("vm://localhost").createConnection();
        try {
            Session session = createSession(connection, transacted);
            MessageProducer producer = session.createProducer(new ActiveMQQueue(destinationName));
            for (int index = 0; index < count; index++) {
                if (sleepMillis > 0) {
                    TimeUnit.MILLISECONDS.sleep(sleepMillis);
                }
                producer.send(session.createTextMessage(createContent(index)));
            }
            if (transacted) {
                session.commit();
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Receives up to {@code max} messages over a new {@code vm://localhost} connection, giving
     * up as soon as a single receive waits 4 seconds without a message.
     *
     * @param transacted when {@code true}, consumes in a transaction that is committed every
     *     200 messages and once more at the end
     * @param destinationName queue name passed to {@link ActiveMQQueue}
     * @param max maximum number of messages to consume
     * @return the number of messages actually received
     * @throws JMSException on any JMS failure
     */
    public int receiveMessages(boolean transacted, String destinationName, int max) throws JMSException {
        int received = 0;
        Connection connection = new ActiveMQConnectionFactory("vm://localhost").createConnection();
        try {
            connection.start();
            Session session = createSession(connection, transacted);
            MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(destinationName));
            while (received < max && consumer.receive(4000L) != null) {
                received++;
                if (transacted && received % 200 == 0) {
                    session.commit();
                }
            }
            if (transacted) {
                session.commit();
            }
            return received;
        } finally {
            connection.close();
        }
    }

    /** Opens a transacted session, or a non-transacted auto-acknowledge session. */
    private Session createSession(Connection connection, boolean transacted) throws JMSException {
        return transacted
                ? connection.createSession(true, Session.SESSION_TRANSACTED)
                : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Builds a message body of the form {@code "<index>:"} padded with {@code '*'} up to
     * {@link #CONTENT_LENGTH} characters.
     */
    private String createContent(int index) {
        StringBuilder content = new StringBuilder(CONTENT_LENGTH);
        content.append(index).append(':');
        while (content.length() < CONTENT_LENGTH) {
            content.append('*');
        }
        return content.toString();
    }
}
