/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StorePerDestinationTest {
    static final Logger LOG = LoggerFactory.getLogger(StorePerDestinationTest.class);
    static final int maxFileLength = 102400;
    static final int numToSend = 5000;
    final Vector<Throwable> exceptions = new Vector();
    BrokerService brokerService;

    protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setUseJmx(false);
        broker.setPersistenceAdapter(kaha);
        return broker;
    }

    protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
        KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
        kaha.setJournalMaxFileLength(102400);
        kaha.setCleanupInterval(5000L);
        if (delete) {
            kaha.deleteAllMessages();
        }
        return kaha;
    }

    @Before
    public void prepareCleanBrokerWithMultiStore() throws Exception {
        this.prepareBrokerWithMultiStore(true);
    }

    public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
        MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
        if (deleteAllMessages) {
            multiKahaDBPersistenceAdapter.deleteAllMessages();
        }
        ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>();
        FilteredKahaDBPersistenceAdapter theRest = new FilteredKahaDBPersistenceAdapter();
        theRest.setPersistenceAdapter(this.createStore(deleteAllMessages));
        adapters.add(theRest);
        FilteredKahaDBPersistenceAdapter fastQStore = new FilteredKahaDBPersistenceAdapter();
        fastQStore.setPersistenceAdapter(this.createStore(deleteAllMessages));
        fastQStore.setDestination((ActiveMQDestination)new ActiveMQQueue("FastQ"));
        adapters.add(fastQStore);
        multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
        this.brokerService = this.createBroker((PersistenceAdapter)multiKahaDBPersistenceAdapter);
    }

    @After
    public void tearDown() throws Exception {
        this.brokerService.stop();
    }

    @Test
    public void testTransactedSendReceive() throws Exception {
        this.brokerService.start();
        this.sendMessages(true, "SlowQ", 1, 0L);
        Assert.assertEquals((String)"got one", (long)1L, (long)this.receiveMessages(true, "SlowQ", 1));
    }

    @Test
    public void testTransactedSendReceiveAcrossStores() throws Exception {
        this.brokerService.start();
        this.sendMessages(true, "SlowQ,FastQ", 1, 0L);
        Assert.assertEquals((String)"got one", (long)2L, (long)this.receiveMessages(true, "SlowQ,FastQ", 2));
    }

    @Test
    public void testCommitRecovery() throws Exception {
        this.doTestRecovery(true);
    }

    @Test
    public void testRollbackRecovery() throws Exception {
        this.doTestRecovery(false);
    }

    public void doTestRecovery(final boolean haveOutcome) throws Exception {
        final MultiKahaDBPersistenceAdapter persistenceAdapter = (MultiKahaDBPersistenceAdapter)this.brokerService.getPersistenceAdapter();
        MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(persistenceAdapter){

            public void persistOutcome(MultiKahaDBTransactionStore.Tx tx, TransactionId txid) throws IOException {
                if (haveOutcome) {
                    super.persistOutcome(tx, txid);
                }
                try {
                    persistenceAdapter.stop();
                }
                catch (Exception e) {
                    LOG.error("ex on stop ", (Throwable)e);
                    StorePerDestinationTest.this.exceptions.add(e);
                }
            }
        };
        persistenceAdapter.setTransactionStore(transactionStore);
        this.brokerService.start();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    StorePerDestinationTest.this.sendMessages(true, "SlowQ,FastQ", 1, 0L);
                }
                catch (Exception expected) {
                    LOG.info("expected", (Throwable)expected);
                }
            }
        });
        this.brokerService.waitUntilStopped();
        executorService.shutdownNow();
        this.prepareBrokerWithMultiStore(false);
        this.brokerService.start();
        Assert.assertEquals((String)"expect to get the recovered message", (long)(haveOutcome ? 2L : 0L), (long)this.receiveMessages(false, "SlowQ,FastQ", 2));
        Assert.assertEquals((String)"all transactions are complete", (long)0L, (long)this.brokerService.getBroker().getPreparedTransactions(null).length);
    }

    @Test
    public void testDirectoryDefault() throws Exception {
        MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
        ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>();
        FilteredKahaDBPersistenceAdapter otherFilteredKahaDBPersistenceAdapter = new FilteredKahaDBPersistenceAdapter();
        KahaDBPersistenceAdapter otherStore = this.createStore(false);
        File someOtherDisk = new File("target" + File.separator + "someOtherDisk");
        otherStore.setDirectory(someOtherDisk);
        otherFilteredKahaDBPersistenceAdapter.setPersistenceAdapter(otherStore);
        otherFilteredKahaDBPersistenceAdapter.setDestination((ActiveMQDestination)new ActiveMQQueue("Other"));
        adapters.add(otherFilteredKahaDBPersistenceAdapter);
        FilteredKahaDBPersistenceAdapter filteredKahaDBPersistenceAdapterDefault = new FilteredKahaDBPersistenceAdapter();
        KahaDBPersistenceAdapter storeDefault = this.createStore(false);
        filteredKahaDBPersistenceAdapterDefault.setPersistenceAdapter(storeDefault);
        adapters.add(filteredKahaDBPersistenceAdapterDefault);
        multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
        Assert.assertEquals((Object)multiKahaDBPersistenceAdapter.getDirectory(), (Object)storeDefault.getDirectory().getParentFile());
        Assert.assertEquals((Object)someOtherDisk, (Object)otherStore.getDirectory().getParentFile());
    }

    @Test
    public void testSlowFastDestinationsStoreUsage() throws Exception {
        this.brokerService.start();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    StorePerDestinationTest.this.sendMessages(false, "SlowQ", 50, 500L);
                }
                catch (Exception e) {
                    StorePerDestinationTest.this.exceptions.add(e);
                }
            }
        });
        executorService.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    StorePerDestinationTest.this.sendMessages(false, "FastQ", 5000, 0L);
                }
                catch (Exception e) {
                    StorePerDestinationTest.this.exceptions.add(e);
                }
            }
        });
        executorService.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    Assert.assertEquals((String)"Got all sent", (long)5000L, (long)StorePerDestinationTest.this.receiveMessages(false, "FastQ", 5000));
                }
                catch (Exception e) {
                    StorePerDestinationTest.this.exceptions.add(e);
                }
            }
        });
        executorService.shutdown();
        Assert.assertTrue((String)"consumers executor finished on time", (boolean)executorService.awaitTermination(300L, TimeUnit.SECONDS));
        final SystemUsage usage = this.brokerService.getSystemUsage();
        Assert.assertTrue((String)"Store is not hogged", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                long storeUsage = usage.getStoreUsage().getUsage();
                LOG.info("Store Usage: " + storeUsage);
                return storeUsage < 512000L;
            }
        }));
        Assert.assertTrue((String)"no exceptions", (boolean)this.exceptions.isEmpty());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendMessages(boolean transacted, String destName, int count, long sleep) throws Exception {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
        Connection connection = cf.createConnection();
        try {
            Session session = transacted ? connection.createSession(true, 0) : connection.createSession(false, 1);
            MessageProducer producer = session.createProducer((Destination)new ActiveMQQueue(destName));
            for (int i = 0; i < count; ++i) {
                if (sleep > 0L) {
                    TimeUnit.MILLISECONDS.sleep(sleep);
                }
                producer.send((Message)session.createTextMessage(this.createContent(i)));
            }
            if (transacted) {
                session.commit();
            }
        }
        finally {
            connection.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int receiveMessages(boolean transacted, String destName, int max) throws JMSException {
        int rc = 0;
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
        Connection connection = cf.createConnection();
        try {
            connection.start();
            Session session = transacted ? connection.createSession(true, 0) : connection.createSession(false, 1);
            MessageConsumer messageConsumer = session.createConsumer((Destination)new ActiveMQQueue(destName));
            while (rc < max && messageConsumer.receive(4000L) != null) {
                if (!transacted || ++rc % 200 != 0) continue;
                session.commit();
            }
            if (transacted) {
                session.commit();
            }
            int n = rc;
            return n;
        }
        finally {
            connection.close();
        }
    }

    private String createContent(int i) {
        StringBuilder sb = new StringBuilder(i + ":");
        while (sb.length() < 1024) {
            sb.append("*");
        }
        return sb.toString();
    }
}

