/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TransactionIdTransformer;
import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MKahaDBTxRecoveryTest {
    static final Logger LOG = LoggerFactory.getLogger(MKahaDBTxRecoveryTest.class);
    private static final int maxFileLength = 0x2000000;
    private static final String PREFIX_DESTINATION_NAME = "queue";
    private static final String DESTINATION_NAME = "queue.test";
    private static final String DESTINATION_NAME_2 = "queue2.test";
    private static final int CLEANUP_INTERVAL_MILLIS = 500;
    BrokerService broker;
    private List<KahaDBPersistenceAdapter> kahadbs = new LinkedList<KahaDBPersistenceAdapter>();

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setUseJmx(true);
        broker.setBrokerName("localhost");
        broker.setPersistenceAdapter(kaha);
        return broker;
    }

    @Test
    public void testCommitOutcomeDeliveryOnRecovery() throws Exception {
        this.prepareBrokerWithMultiStore(true);
        this.broker.start();
        this.broker.waitUntilStarted();
        Assert.assertTrue((String)"Broker doesn't have an Admin View.", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return MKahaDBTxRecoveryTest.this.broker.getAdminView() != null;
            }
        }));
        final AtomicBoolean injectFailure = new AtomicBoolean(true);
        final AtomicInteger reps = new AtomicInteger();
        final AtomicReference<TransactionIdTransformer> delegate = new AtomicReference<TransactionIdTransformer>();
        TransactionIdTransformer faultInjector = new TransactionIdTransformer(){

            public TransactionId transform(TransactionId txid) {
                if (injectFailure.get() && reps.incrementAndGet() > 5) {
                    throw new RuntimeException("Bla");
                }
                return ((TransactionIdTransformer)delegate.get()).transform(txid);
            }
        };
        for (KahaDBPersistenceAdapter pa : this.kahadbs) {
            if (delegate.get() == null) {
                delegate.set(pa.getStore().getTransactionIdTransformer());
            }
            pa.setTransactionIdTransformer(faultInjector);
        }
        ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost");
        f.setAlwaysSyncSend(true);
        Connection c = f.createConnection();
        c.start();
        Session s = c.createSession(true, 0);
        MessageProducer producer = s.createProducer((Destination)new ActiveMQQueue("queue.test,queue2.test"));
        producer.send((Message)s.createTextMessage("HI"));
        try {
            s.commit();
        }
        catch (Exception expected) {
            expected.printStackTrace();
        }
        Assert.assertNotNull((Object)this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME)));
        Assert.assertNotNull((Object)this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME_2)));
        final org.apache.activemq.broker.region.Destination destination1 = this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME));
        final org.apache.activemq.broker.region.Destination destination2 = this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME_2));
        Assert.assertTrue((String)"Partial commit - one dest has message", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return destination2.getMessageStore().getMessageCount() != destination1.getMessageStore().getMessageCount();
            }
        }));
        injectFailure.set(false);
        for (int i = 0; i < 100; ++i) {
            producer.send((Message)s.createTextMessage("HI"));
            s.commit();
        }
        this.broker.stop();
        this.prepareBrokerWithMultiStore(false);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport(){

            public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
                TimeUnit.SECONDS.sleep(2L);
                throw new RuntimeException("Sorry");
            }
        }});
        this.broker.start();
        this.broker.stop();
        this.prepareBrokerWithMultiStore(false);
        this.broker.start();
        this.broker.waitUntilStarted();
        org.apache.activemq.broker.region.Destination destination = this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME));
        Assert.assertEquals((long)101L, (long)destination.getMessageStore().getMessageCount());
        destination = this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME_2));
        Assert.assertEquals((long)101L, (long)destination.getMessageStore().getMessageCount());
    }

    protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
        KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
        kaha.setJournalMaxFileLength(0x2000000);
        kaha.setCleanupInterval(500L);
        if (delete) {
            kaha.deleteAllMessages();
        }
        this.kahadbs.add(kaha);
        return kaha;
    }

    public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
        MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
        if (deleteAllMessages) {
            multiKahaDBPersistenceAdapter.deleteAllMessages();
        }
        ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>();
        adapters.add(this.createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME, deleteAllMessages));
        adapters.add(this.createFilteredKahaDBByDestinationPrefix("queue2", deleteAllMessages));
        multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
        multiKahaDBPersistenceAdapter.setJournalMaxFileLength(4096);
        multiKahaDBPersistenceAdapter.setJournalCleanupInterval(10L);
        this.broker = this.createBroker((PersistenceAdapter)multiKahaDBPersistenceAdapter);
    }

    private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String destinationPrefix, boolean deleteAllMessages) throws IOException {
        FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
        template.setPersistenceAdapter((PersistenceAdapter)this.createStore(deleteAllMessages));
        if (destinationPrefix != null) {
            template.setQueue(destinationPrefix + ".>");
        }
        return template;
    }
}

