/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TransactionIdTransformer;
import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MKahaDBTxRecoveryTest {
    static final Logger LOG = LoggerFactory.getLogger(MKahaDBTxRecoveryTest.class);
    private static final int maxFileLength = 0x2000000;
    private static final String PREFIX_DESTINATION_NAME = "queue";
    private static final String DESTINATION_NAME = "queue.test";
    private static final String DESTINATION_NAME_2 = "queue2.test";
    private static final int CLEANUP_INTERVAL_MILLIS = 500;
    BrokerService broker;
    private List<KahaDBPersistenceAdapter> kahadbs = new LinkedList<KahaDBPersistenceAdapter>();

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setUseJmx(true);
        broker.setBrokerName("localhost");
        broker.setPersistenceAdapter(kaha);
        return broker;
    }

    @Test
    public void testCommitOutcomeDeliveryOnRecovery() throws Exception {
        this.prepareBrokerWithMultiStore(true);
        this.broker.start();
        this.broker.waitUntilStarted();
        Assert.assertTrue((String)"Broker doesn't have an Admin View.", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return MKahaDBTxRecoveryTest.this.broker.getAdminView() != null;
            }
        }));
        final AtomicBoolean injectFailure = new AtomicBoolean(true);
        final AtomicInteger reps = new AtomicInteger();
        final AtomicReference<TransactionIdTransformer> delegate = new AtomicReference<TransactionIdTransformer>();
        TransactionIdTransformer faultInjector = new TransactionIdTransformer(){

            public TransactionId transform(TransactionId txid) {
                if (injectFailure.get() && reps.incrementAndGet() > 5) {
                    throw new RuntimeException("Bla");
                }
                return ((TransactionIdTransformer)delegate.get()).transform(txid);
            }
        };
        for (KahaDBPersistenceAdapter pa : this.kahadbs) {
            if (delegate.get() == null) {
                delegate.set(pa.getStore().getTransactionIdTransformer());
            }
            pa.setTransactionIdTransformer(faultInjector);
        }
        ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost");
        f.setAlwaysSyncSend(true);
        Connection c = f.createConnection();
        c.start();
        Session s = c.createSession(true, 0);
        MessageProducer producer = s.createProducer((Destination)new ActiveMQQueue("queue.test,queue2.test"));
        producer.send((Message)s.createTextMessage("HI"));
        try {
            s.commit();
        }
        catch (Exception expected) {
            expected.printStackTrace();
        }
        Assert.assertNotNull((Object)this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME)));
        Assert.assertNotNull((Object)this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME_2)));
        final org.apache.activemq.broker.region.Destination destination1 = this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME));
        final org.apache.activemq.broker.region.Destination destination2 = this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME_2));
        Assert.assertTrue((String)"Partial commit - one dest has message", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return destination2.getMessageStore().getMessageCount() != destination1.getMessageStore().getMessageCount();
            }
        }));
        injectFailure.set(false);
        for (int i = 0; i < 100; ++i) {
            producer.send((Message)s.createTextMessage("HI"));
            s.commit();
        }
        this.broker.stop();
        this.prepareBrokerWithMultiStore(false);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport(){

            public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
                TimeUnit.SECONDS.sleep(2L);
                throw new RuntimeException("Sorry");
            }
        }});
        this.broker.start();
        this.broker.stop();
        this.prepareBrokerWithMultiStore(false);
        this.broker.start();
        this.broker.waitUntilStarted();
        org.apache.activemq.broker.region.Destination destination = this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME));
        Assert.assertEquals((long)101L, (long)destination.getMessageStore().getMessageCount());
        destination = this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME_2));
        Assert.assertEquals((long)101L, (long)destination.getMessageStore().getMessageCount());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testManualRecoveryOnCorruptTxStore() throws Exception {
        this.prepareBrokerWithMultiStore(true);
        ((MultiKahaDBPersistenceAdapter)this.broker.getPersistenceAdapter()).setCheckForCorruption(true);
        this.broker.start();
        this.broker.waitUntilStarted();
        Assert.assertTrue((String)"Broker doesn't have an Admin View.", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return MKahaDBTxRecoveryTest.this.broker.getAdminView() != null;
            }
        }));
        final AtomicBoolean injectFailure = new AtomicBoolean(true);
        final AtomicInteger reps = new AtomicInteger();
        final AtomicReference<TransactionIdTransformer> delegate = new AtomicReference<TransactionIdTransformer>();
        TransactionIdTransformer faultInjector = new TransactionIdTransformer(){

            public TransactionId transform(TransactionId txid) {
                if (injectFailure.get() && reps.incrementAndGet() > 5) {
                    throw new RuntimeException("Bla2");
                }
                return ((TransactionIdTransformer)delegate.get()).transform(txid);
            }
        };
        for (KahaDBPersistenceAdapter pa : this.kahadbs) {
            if (delegate.get() == null) {
                delegate.set(pa.getStore().getTransactionIdTransformer());
            }
            pa.setTransactionIdTransformer(faultInjector);
        }
        ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost");
        f.setAlwaysSyncSend(true);
        Connection c = f.createConnection();
        c.start();
        Session s = c.createSession(true, 0);
        MessageProducer producer = s.createProducer((Destination)new ActiveMQQueue("queue.test,queue2.test"));
        producer.send((Message)s.createTextMessage("HI"));
        try {
            s.commit();
            Assert.fail((String)"Expect commit failure on error injection!");
        }
        catch (Exception expected) {
            expected.printStackTrace();
        }
        Assert.assertNotNull((Object)this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME)));
        Assert.assertNotNull((Object)this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME_2)));
        final org.apache.activemq.broker.region.Destination destination1 = this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME));
        final org.apache.activemq.broker.region.Destination destination2 = this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME_2));
        Assert.assertTrue((String)"Partial commit - one dest has message", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return destination2.getMessageStore().getMessageCount() != destination1.getMessageStore().getMessageCount();
            }
        }));
        injectFailure.set(false);
        for (int i = 0; i < 100; ++i) {
            producer.send((Message)s.createTextMessage("HI"));
            s.commit();
        }
        ObjectName objectName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
        BrokerViewMBean brokerViewMBean = (BrokerViewMBean)this.broker.getManagementContext().newProxyInstance(objectName, BrokerViewMBean.class, true);
        String pathToDataDir = brokerViewMBean.getDataDirectory();
        this.broker.stop();
        this.corruptTxStoreJournal(pathToDataDir);
        org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger(MultiKahaDBTransactionStore.class);
        final AtomicBoolean foundSomeCorruption = new AtomicBoolean();
        DefaultTestAppender appender = new DefaultTestAppender(){

            public void doAppend(LoggingEvent event) {
                if (event.getLevel().equals((Object)Level.ERROR) && event.getMessage().toString().startsWith("Corrupt ")) {
                    LOG.info("received expected log message: " + event.getMessage());
                    foundSomeCorruption.set(true);
                }
            }
        };
        log4jLogger.addAppender((Appender)appender);
        try {
            this.prepareBrokerWithMultiStore(false);
            ((MultiKahaDBPersistenceAdapter)this.broker.getPersistenceAdapter()).setCheckForCorruption(true);
            this.broker.start();
            this.broker.waitUntilStarted();
            final org.apache.activemq.broker.region.Destination dest1 = this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME));
            final org.apache.activemq.broker.region.Destination dest2 = this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME_2));
            Assert.assertTrue((String)"Partial commit - one dest has message", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    return dest1.getMessageStore().getMessageCount() != dest2.getMessageStore().getMessageCount();
                }
            }));
            Assert.assertTrue((String)"broker/store found corruption", (boolean)foundSomeCorruption.get());
            this.broker.stop();
            LOG.info("Check for journal read failure... no checksum");
            foundSomeCorruption.set(false);
            this.prepareBrokerWithMultiStore(false);
            ((MultiKahaDBPersistenceAdapter)this.broker.getPersistenceAdapter()).setCheckForCorruption(false);
            this.broker.start();
            this.broker.waitUntilStarted();
            dest1 = this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME));
            dest2 = this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME_2));
            Assert.assertTrue((String)"Partial commit - one dest still has message", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    return dest1.getMessageStore().getMessageCount() != dest2.getMessageStore().getMessageCount();
                }
            }));
            Assert.assertTrue((String)"broker/store found corruption without checksum", (boolean)foundSomeCorruption.get());
            ObjectName matchAllPendingTx = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,transactionType=RecoveredXaTransaction,xid=*");
            Set pendingTx = this.broker.getManagementContext().queryNames(matchAllPendingTx, null);
            Assert.assertFalse((boolean)pendingTx.isEmpty());
            for (ObjectName pendingXAtxOn : pendingTx) {
                RecoveredXATransactionViewMBean proxy = (RecoveredXATransactionViewMBean)this.broker.getManagementContext().newProxyInstance(pendingXAtxOn, RecoveredXATransactionViewMBean.class, true);
                Assert.assertEquals((String)"matches ", (long)proxy.getFormatId(), (long)61616L);
                proxy.heuristicCommit();
            }
            pendingTx = this.broker.getManagementContext().queryNames(matchAllPendingTx, null);
            Assert.assertTrue((boolean)pendingTx.isEmpty());
            org.apache.activemq.broker.region.Destination destination = this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME));
            Assert.assertEquals((long)101L, (long)destination.getMessageStore().getMessageCount());
            destination = this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME_2));
            Assert.assertEquals((long)101L, (long)destination.getMessageStore().getMessageCount());
        }
        finally {
            log4jLogger.removeAppender((Appender)appender);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCorruptionDetectedOnTruncateAndIgnored() throws Exception {
        this.prepareBrokerWithMultiStore(true);
        this.broker.start();
        this.broker.waitUntilStarted();
        ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost");
        f.setAlwaysSyncSend(true);
        Connection c = f.createConnection();
        c.start();
        Session s = c.createSession(true, 0);
        MessageProducer producer = s.createProducer((Destination)new ActiveMQQueue("queue.test,queue2.test"));
        for (int i = 0; i < 20; ++i) {
            producer.send((Message)s.createTextMessage("HI"));
            s.commit();
        }
        Assert.assertNotNull((Object)this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME)));
        Assert.assertNotNull((Object)this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME_2)));
        ObjectName objectName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
        BrokerViewMBean brokerViewMBean = (BrokerViewMBean)this.broker.getManagementContext().newProxyInstance(objectName, BrokerViewMBean.class, true);
        String pathToDataDir = brokerViewMBean.getDataDirectory();
        this.broker.stop();
        this.corruptTxStoreJournalAndTruncate(pathToDataDir);
        org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger(MultiKahaDBTransactionStore.class);
        final AtomicBoolean foundSomeCorruption = new AtomicBoolean();
        final AtomicBoolean ignoringCorruption = new AtomicBoolean();
        DefaultTestAppender appender = new DefaultTestAppender(){

            public void doAppend(LoggingEvent event) {
                if (event.getLevel().equals((Object)Level.ERROR) && event.getMessage().toString().startsWith("Corrupt ")) {
                    LOG.info("received expected log message: " + event.getMessage());
                    foundSomeCorruption.set(true);
                } else if (event.getLevel().equals((Object)Level.INFO) && event.getMessage().toString().contains("auto resolving")) {
                    ignoringCorruption.set(true);
                }
            }
        };
        log4jLogger.addAppender((Appender)appender);
        try {
            this.prepareBrokerWithMultiStore(false);
            this.broker.start();
            this.broker.waitUntilStarted();
            Assert.assertTrue((String)"broker/store found corruption", (boolean)foundSomeCorruption.get());
            Assert.assertTrue((String)"broker/store ignored corruption", (boolean)ignoringCorruption.get());
            this.broker.stop();
            foundSomeCorruption.set(false);
            ignoringCorruption.set(false);
            this.prepareBrokerWithMultiStore(false);
            this.broker.start();
            this.broker.waitUntilStarted();
            Assert.assertFalse((String)"broker/store no corruption", (boolean)foundSomeCorruption.get());
            Assert.assertFalse((String)"broker/store no ignored corruption", (boolean)ignoringCorruption.get());
            Connection connection = f.createConnection();
            connection.start();
            Session session = connection.createSession(true, 0);
            MessageProducer messageProducer = session.createProducer((Destination)new ActiveMQQueue("queue.test,queue2.test"));
            for (int i = 0; i < 20; ++i) {
                messageProducer.send((Message)session.createTextMessage("HI"));
                session.commit();
            }
            Assert.assertNotNull((Object)this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME)));
            Assert.assertNotNull((Object)this.broker.getDestination((ActiveMQDestination)new ActiveMQQueue(DESTINATION_NAME_2)));
            this.broker.stop();
        }
        finally {
            log4jLogger.removeAppender((Appender)appender);
        }
    }

    private void corruptTxStoreJournal(String pathToDataDir) throws Exception {
        this.corruptTxStore(pathToDataDir, false);
    }

    private void corruptTxStoreJournalAndTruncate(String pathToDataDir) throws Exception {
        this.corruptTxStore(pathToDataDir, true);
    }

    private void corruptTxStore(String pathToDataDir, boolean truncate) throws Exception {
        LOG.info("Path to broker datadir: " + pathToDataDir);
        RandomAccessFile randomAccessFile = new RandomAccessFile(String.format("%s/mKahaDB/txStore/db-1.log", pathToDataDir), "rw");
        ByteSequence header = new ByteSequence(Journal.BATCH_CONTROL_RECORD_HEADER);
        byte[] data = new byte[20480];
        ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data, 0, data.length));
        int offset = bs.indexOf(header, 1);
        offset = bs.indexOf(header, offset + 1);
        offset = bs.indexOf(header, offset + 1);
        LOG.info("3rd batch record in file: 1:" + offset);
        offset += Journal.BATCH_CONTROL_RECORD_SIZE;
        offset += 4;
        byte fill = -81;
        LOG.info("Whacking batch record in file:1, at offset: " + ++offset + " with fill:" + fill);
        byte[] bla = new byte[2];
        Arrays.fill(bla, fill);
        randomAccessFile.seek(offset);
        randomAccessFile.write(bla, 0, bla.length);
        if (truncate) {
            randomAccessFile.setLength(randomAccessFile.getFilePointer());
        }
        randomAccessFile.getFD().sync();
    }

    protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
        KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
        kaha.setJournalMaxFileLength(0x2000000);
        kaha.setCleanupInterval(500L);
        if (delete) {
            kaha.deleteAllMessages();
        }
        this.kahadbs.add(kaha);
        return kaha;
    }

    public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
        MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
        if (deleteAllMessages) {
            multiKahaDBPersistenceAdapter.deleteAllMessages();
        }
        ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>();
        adapters.add(this.createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME, deleteAllMessages));
        adapters.add(this.createFilteredKahaDBByDestinationPrefix("queue2", deleteAllMessages));
        multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
        multiKahaDBPersistenceAdapter.setJournalMaxFileLength(4096);
        multiKahaDBPersistenceAdapter.setJournalCleanupInterval(10L);
        this.broker = this.createBroker((PersistenceAdapter)multiKahaDBPersistenceAdapter);
    }

    private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String destinationPrefix, boolean deleteAllMessages) throws IOException {
        FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
        template.setPersistenceAdapter((PersistenceAdapter)this.createStore(deleteAllMessages));
        if (destinationPrefix != null) {
            template.setQueue(destinationPrefix + ".>");
        }
        return template;
    }
}

