package org.apache.activemq.bugs;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.ObjectName;
import javax.management.QueryExp;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TransactionIdTransformer;
import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.transport.nio.NIOSSLConcurrencyTest;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/MKahaDBTxRecoveryTest.class */
public class MKahaDBTxRecoveryTest {
    /** Test logger; also used from the anonymous log appenders in the tests. */
    static final Logger LOG = LoggerFactory.getLogger(MKahaDBTxRecoveryTest.class);

    /** Journal file length (32 MiB) applied to each per-destination KahaDB store. */
    private static final int maxFileLength = 33554432;

    /** Destination prefix routed to the first filtered store. */
    private static final String PREFIX_DESTINATION_NAME = "queue";

    /** Queue that falls under the {@code queue.>} filter. */
    private static final String DESTINATION_NAME = "queue.test";

    /** Queue that falls under the {@code queue2.>} filter. */
    private static final String DESTINATION_NAME_2 = "queue2.test";

    /** Cleanup interval, in milliseconds, for the per-destination stores. */
    private static final int CLEANUP_INTERVAL_MILLIS = 500;

    /** Broker under test; replaced by every call to {@code prepareBrokerWithMultiStore}. */
    BrokerService broker;

    /** Every KahaDB store created through {@code createStore}, in creation order. */
    private List<KahaDBPersistenceAdapter> kahadbs = new LinkedList<>();

    /**
     * Stops the broker left behind by the test, if one was ever created.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void tearDown() throws Exception {
        final BrokerService current = this.broker;
        if (current == null) {
            return;
        }
        current.stop();
    }

    /**
     * Builds (but does not start) a JMX-enabled broker named {@code localhost}
     * that is backed by the supplied persistence adapter.
     *
     * @param persistenceAdapter the store the new broker should use
     * @return the configured, not yet started broker
     * @throws Exception if the broker rejects the configuration
     */
    protected BrokerService createBroker(PersistenceAdapter persistenceAdapter) throws Exception {
        final BrokerService service = new BrokerService();
        service.setUseJmx(true);
        service.setBrokerName("localhost");
        service.setPersistenceAdapter(persistenceAdapter);
        return service;
    }

    /**
     * Checks that a transaction which was only partially committed across the two
     * KahaDB stores ends up delivered to both destinations after broker restarts.
     *
     * <p>Flow: a transaction id transformer that throws once it has been invoked
     * more than five times is installed on every store, one message is sent to both
     * queues and committed, and the test waits until the two stores report different
     * message counts. Failure injection is then switched off and 100 further
     * messages are committed. The broker is restarted once with a plugin whose
     * {@code commitTransaction} sleeps and then throws, and restarted a second time
     * without it; both destinations must then hold 101 messages.
     *
     * @throws Exception on any broker, JMS or assertion failure
     */
    @Test
    public void testCommitOutcomeDeliveryOnRecovery() throws Exception {
        prepareBrokerWithMultiStore(true);
        broker.start();
        broker.waitUntilStarted();
        Assert.assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return broker.getAdminView() != null;
            }
        }));

        final AtomicBoolean injectFailure = new AtomicBoolean(true);
        final AtomicInteger transformCalls = new AtomicInteger();
        final AtomicReference<TransactionIdTransformer> delegate = new AtomicReference<>();
        TransactionIdTransformer faultyTransformer = new TransactionIdTransformer() {
            @Override
            public TransactionId transform(TransactionId txid) {
                // Only count invocations while injection is active; fail from the sixth one on.
                if (injectFailure.get() && transformCalls.incrementAndGet() > 5) {
                    throw new RuntimeException("Bla");
                }
                return delegate.get().transform(txid);
            }
        };
        for (KahaDBPersistenceAdapter kahadb : kahadbs) {
            // Remember the first store's original transformer so the wrapper can delegate to it.
            if (delegate.get() == null) {
                delegate.set(kahadb.getStore().getTransactionIdTransformer());
            }
            kahadb.setTransactionIdTransformer(faultyTransformer);
        }

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
        factory.setAlwaysSyncSend(true);
        Connection connection = factory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer =
                    session.createProducer(new ActiveMQQueue(DESTINATION_NAME + "," + DESTINATION_NAME_2));
            producer.send(session.createTextMessage("HI"));
            try {
                session.commit();
            } catch (Exception e) {
                // The injected transformer failure may surface here; the test carries on regardless.
                LOG.info("commit failed after error injection", e);
            }

            Assert.assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
            Assert.assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)));
            final Destination first = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME));
            final Destination second = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
            Assert.assertTrue("Partial commit - one dest has message", Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return second.getMessageStore().getMessageCount() != first.getMessageStore().getMessageCount();
                }
            }));

            injectFailure.set(false);
            for (int i = 0; i < 100; i++) {
                producer.send(session.createTextMessage("HI"));
                session.commit();
            }
        } finally {
            // Release the client connection before the broker goes away.
            connection.close();
        }
        broker.stop();

        // First restart: every commitTransaction call stalls for two seconds and then fails.
        prepareBrokerWithMultiStore(false);
        broker.setPlugins(new BrokerPlugin[] {new BrokerPluginSupport() {
            @Override
            public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase)
                    throws Exception {
                TimeUnit.SECONDS.sleep(2L);
                throw new RuntimeException("Sorry");
            }
        }});
        broker.start();
        broker.stop();

        // Second restart without the failing plugin.
        prepareBrokerWithMultiStore(false);
        broker.start();
        broker.waitUntilStarted();
        Assert.assertEquals(101L,
                broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)).getMessageStore().getMessageCount());
        Assert.assertEquals(101L,
                broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)).getMessageStore().getMessageCount());
    }

    /**
     * Checks that a partially committed transaction survives corruption of the
     * shared transaction-store journal and can be completed manually over JMX.
     *
     * <p>Flow: the same failure-injecting transformer as in
     * {@code testCommitOutcomeDeliveryOnRecovery} produces a partial commit, 100
     * further messages are committed, the broker is stopped and the tx store journal
     * is corrupted. The broker is then restarted twice, first with corruption
     * checking enabled and then disabled. Each time the stores must still disagree on
     * message count and an ERROR event starting with {@code "Corrupt "} must be
     * logged on the {@code MultiKahaDBTransactionStore} logger. Finally every
     * recovered XA transaction MBean is heuristically committed and both
     * destinations must hold 101 messages.
     *
     * @throws Exception on any broker, JMS, JMX or assertion failure
     */
    @Test
    public void testManualRecoveryOnCorruptTxStore() throws Exception {
        prepareBrokerWithMultiStore(true);
        // prepareBrokerWithMultiStore always installs a MultiKahaDBPersistenceAdapter.
        ((MultiKahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setCheckForCorruption(true);
        broker.start();
        broker.waitUntilStarted();
        Assert.assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return broker.getAdminView() != null;
            }
        }));

        final AtomicBoolean injectFailure = new AtomicBoolean(true);
        final AtomicInteger transformCalls = new AtomicInteger();
        final AtomicReference<TransactionIdTransformer> delegate = new AtomicReference<>();
        TransactionIdTransformer faultyTransformer = new TransactionIdTransformer() {
            @Override
            public TransactionId transform(TransactionId txid) {
                // Only count invocations while injection is active; fail from the sixth one on.
                if (injectFailure.get() && transformCalls.incrementAndGet() > 5) {
                    throw new RuntimeException("Bla2");
                }
                return delegate.get().transform(txid);
            }
        };
        for (KahaDBPersistenceAdapter kahadb : kahadbs) {
            // Remember the first store's original transformer so the wrapper can delegate to it.
            if (delegate.get() == null) {
                delegate.set(kahadb.getStore().getTransactionIdTransformer());
            }
            kahadb.setTransactionIdTransformer(faultyTransformer);
        }

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
        factory.setAlwaysSyncSend(true);
        Connection connection = factory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer =
                    session.createProducer(new ActiveMQQueue(DESTINATION_NAME + "," + DESTINATION_NAME_2));
            producer.send(session.createTextMessage("HI"));
            try {
                session.commit();
                Assert.fail("Expect commit failure on error injection!");
            } catch (Exception expected) {
                LOG.info("commit failed as expected after error injection", expected);
            }

            Assert.assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
            Assert.assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)));
            final Destination first = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME));
            final Destination second = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
            Assert.assertTrue("Partial commit - one dest has message", Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return second.getMessageStore().getMessageCount() != first.getMessageStore().getMessageCount();
                }
            }));

            injectFailure.set(false);
            for (int i = 0; i < 100; i++) {
                producer.send(session.createTextMessage("HI"));
                session.commit();
            }
        } finally {
            // Release the client connection before the broker goes away.
            connection.close();
        }

        // The data directory has to be read over JMX while the broker is still running.
        BrokerViewMBean brokerView = (BrokerViewMBean) broker.getManagementContext().newProxyInstance(
                new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost"), BrokerViewMBean.class, true);
        String dataDirectory = brokerView.getDataDirectory();
        broker.stop();
        corruptTxStoreJournal(dataDirectory);

        org.apache.log4j.Logger txStoreLogger = org.apache.log4j.Logger.getLogger(MultiKahaDBTransactionStore.class);
        final AtomicBoolean corruptionReported = new AtomicBoolean();
        DefaultTestAppender appender = new DefaultTestAppender() {
            @Override
            public void doAppend(LoggingEvent event) {
                if (event.getLevel().equals(Level.ERROR) && event.getMessage().toString().startsWith("Corrupt ")) {
                    LOG.info("received expected log message: " + event.getMessage());
                    corruptionReported.set(true);
                }
            }
        };
        txStoreLogger.addAppender(appender);
        try {
            // Restart with corruption checking enabled.
            prepareBrokerWithMultiStore(false);
            ((MultiKahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setCheckForCorruption(true);
            broker.start();
            broker.waitUntilStarted();
            final Destination firstChecked = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME));
            final Destination secondChecked = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
            Assert.assertTrue("Partial commit - one dest has message", Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return firstChecked.getMessageStore().getMessageCount()
                            != secondChecked.getMessageStore().getMessageCount();
                }
            }));
            Assert.assertTrue("broker/store found corruption", corruptionReported.get());
            broker.stop();

            // Restart again with corruption checking disabled.
            LOG.info("Check for journal read failure... no checksum");
            corruptionReported.set(false);
            prepareBrokerWithMultiStore(false);
            ((MultiKahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setCheckForCorruption(false);
            broker.start();
            broker.waitUntilStarted();
            final Destination firstUnchecked = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME));
            final Destination secondUnchecked = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
            Assert.assertTrue("Partial commit - one dest still has message", Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return firstUnchecked.getMessageStore().getMessageCount()
                            != secondUnchecked.getMessageStore().getMessageCount();
                }
            }));
            Assert.assertTrue("broker/store found corruption without checksum", corruptionReported.get());

            // Complete every recovered XA transaction by hand through its MBean.
            ObjectName recoveredTxPattern = new ObjectName(
                    "org.apache.activemq:type=Broker,brokerName=localhost,transactionType=RecoveredXaTransaction,xid=*");
            Set<ObjectName> recoveredTxNames =
                    broker.getManagementContext().queryNames(recoveredTxPattern, (QueryExp) null);
            Assert.assertFalse(recoveredTxNames.isEmpty());
            for (ObjectName recoveredTxName : recoveredTxNames) {
                RecoveredXATransactionViewMBean recoveredTx =
                        (RecoveredXATransactionViewMBean) broker.getManagementContext().newProxyInstance(
                                recoveredTxName, RecoveredXATransactionViewMBean.class, true);
                // Expected value first, so a failure reports the right way round.
                Assert.assertEquals("matches ", 61616L, recoveredTx.getFormatId());
                recoveredTx.heuristicCommit();
            }
            Assert.assertTrue(broker.getManagementContext().queryNames(recoveredTxPattern, (QueryExp) null).isEmpty());
            Assert.assertEquals(101L,
                    broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)).getMessageStore().getMessageCount());
            Assert.assertEquals(101L,
                    broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)).getMessageStore().getMessageCount());
        } finally {
            txStoreLogger.removeAppender(appender);
        }
    }

    /**
     * Checks that a corrupted and truncated transaction-store journal is reported
     * and auto-resolved on the first restart, and not reported again on the next.
     *
     * <p>Flow: 20 transactions are committed across both queues, the broker is
     * stopped and the tx store journal is corrupted and truncated. On the first
     * restart the {@code MultiKahaDBTransactionStore} logger must emit an ERROR event
     * starting with {@code "Corrupt "} and an INFO event containing
     * {@code "auto resolving"}. On the second restart neither may appear, and a
     * further 20 transactions are committed.
     *
     * @throws Exception on any broker, JMS, JMX or assertion failure
     */
    @Test
    public void testCorruptionDetectedOnTruncateAndIgnored() throws Exception {
        prepareBrokerWithMultiStore(true);
        broker.start();
        broker.waitUntilStarted();

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
        factory.setAlwaysSyncSend(true);
        Connection connection = factory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer =
                    session.createProducer(new ActiveMQQueue(DESTINATION_NAME + "," + DESTINATION_NAME_2));
            for (int i = 0; i < 20; i++) {
                producer.send(session.createTextMessage("HI"));
                session.commit();
            }
            Assert.assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
            Assert.assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)));
        } finally {
            // Release the client connection before the broker goes away.
            connection.close();
        }

        // The data directory has to be read over JMX while the broker is still running.
        BrokerViewMBean brokerView = (BrokerViewMBean) broker.getManagementContext().newProxyInstance(
                new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost"), BrokerViewMBean.class, true);
        String dataDirectory = brokerView.getDataDirectory();
        broker.stop();
        corruptTxStoreJournalAndTruncate(dataDirectory);

        org.apache.log4j.Logger txStoreLogger = org.apache.log4j.Logger.getLogger(MultiKahaDBTransactionStore.class);
        final AtomicBoolean corruptionReported = new AtomicBoolean();
        final AtomicBoolean corruptionIgnored = new AtomicBoolean();
        DefaultTestAppender appender = new DefaultTestAppender() {
            @Override
            public void doAppend(LoggingEvent event) {
                if (event.getLevel().equals(Level.ERROR) && event.getMessage().toString().startsWith("Corrupt ")) {
                    LOG.info("received expected log message: " + event.getMessage());
                    corruptionReported.set(true);
                } else if (event.getLevel().equals(Level.INFO)
                        && event.getMessage().toString().contains("auto resolving")) {
                    corruptionIgnored.set(true);
                }
            }
        };
        txStoreLogger.addAppender(appender);
        try {
            // First restart: corruption must be both reported and auto-resolved.
            prepareBrokerWithMultiStore(false);
            broker.start();
            broker.waitUntilStarted();
            Assert.assertTrue("broker/store found corruption", corruptionReported.get());
            Assert.assertTrue("broker/store ignored corruption", corruptionIgnored.get());
            broker.stop();

            // Second restart: neither log event may appear again.
            corruptionReported.set(false);
            corruptionIgnored.set(false);
            prepareBrokerWithMultiStore(false);
            broker.start();
            broker.waitUntilStarted();
            Assert.assertFalse("broker/store no corruption", corruptionReported.get());
            Assert.assertFalse("broker/store no ignored corruption", corruptionIgnored.get());

            Connection secondConnection = factory.createConnection();
            try {
                secondConnection.start();
                Session secondSession = secondConnection.createSession(true, Session.SESSION_TRANSACTED);
                MessageProducer secondProducer = secondSession.createProducer(
                        new ActiveMQQueue(DESTINATION_NAME + "," + DESTINATION_NAME_2));
                for (int i = 0; i < 20; i++) {
                    secondProducer.send(secondSession.createTextMessage("HI"));
                    secondSession.commit();
                }
                Assert.assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
                Assert.assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)));
            } finally {
                secondConnection.close();
            }
            broker.stop();
        } finally {
            txStoreLogger.removeAppender(appender);
        }
    }

    /**
     * Corrupts the transaction-store journal under the given broker data
     * directory, leaving the file length untouched.
     *
     * @param dataDirectory broker data directory containing {@code mKahaDB/txStore}
     * @throws Exception if the journal cannot be read or written
     */
    private void corruptTxStoreJournal(String dataDirectory) throws Exception {
        final boolean truncate = false;
        corruptTxStore(dataDirectory, truncate);
    }

    /**
     * Corrupts the transaction-store journal under the given broker data
     * directory and cuts the file off right after the corrupted bytes.
     *
     * @param dataDirectory broker data directory containing {@code mKahaDB/txStore}
     * @throws Exception if the journal cannot be read or written
     */
    private void corruptTxStoreJournalAndTruncate(String dataDirectory) throws Exception {
        final boolean truncate = true;
        corruptTxStore(dataDirectory, truncate);
    }

    /**
     * Overwrites two bytes inside the third batch record of the transaction-store
     * journal ({@code mKahaDB/txStore/db-1.log}) and optionally truncates the file
     * immediately after the overwritten bytes.
     *
     * <p>Only the first 20 KiB of the journal are scanned for batch control record
     * headers; the test fails if fewer than three are found there.
     *
     * @param dataDirectory broker data directory containing {@code mKahaDB/txStore}
     * @param truncate      when {@code true}, the file is cut off right after the corrupted bytes
     * @throws Exception if the journal cannot be read or written
     */
    private void corruptTxStore(String dataDirectory, boolean truncate) throws Exception {
        LOG.info("Path to broker datadir: " + dataDirectory);
        // try-with-resources so the file handle is released even when an assertion or I/O call fails.
        try (RandomAccessFile journal =
                new RandomAccessFile(String.format("%s/mKahaDB/txStore/db-1.log", dataDirectory), "rw")) {
            ByteSequence header = new ByteSequence(Journal.BATCH_CONTROL_RECORD_HEADER);
            byte[] buffer = new byte[20480];
            ByteSequence content = new ByteSequence(buffer, 0, journal.read(buffer, 0, buffer.length));

            // Walk to the third header occurrence; each search starts one byte past the previous hit.
            int recordOffset = 0;
            for (int found = 0; found < 3; found++) {
                recordOffset = content.indexOf(header, recordOffset + 1);
                Assert.assertTrue("batch control record " + (found + 1) + " not found in journal", recordOffset >= 0);
            }
            LOG.info("3rd batch record in file: 1:" + recordOffset);

            int corruptionOffset = recordOffset + Journal.BATCH_CONTROL_RECORD_SIZE + 4 + 1;
            LOG.info("Whacking batch record in file:1, at offset: " + corruptionOffset + " with fill:-81");
            byte[] garbage = new byte[2];
            Arrays.fill(garbage, (byte) -81);
            journal.seek(corruptionOffset);
            journal.write(garbage, 0, garbage.length);
            if (truncate) {
                journal.setLength(journal.getFilePointer());
            }
            // Force the change to disk before the broker reopens the journal.
            journal.getFD().sync();
        }
    }

    /**
     * Creates a KahaDB store with this test's journal file length and cleanup
     * interval, and records it in {@code kahadbs}.
     *
     * @param z when {@code true}, {@code deleteAllMessages()} is called on the new store
     * @return the newly created store
     * @throws IOException if deleting existing messages fails
     */
    protected KahaDBPersistenceAdapter createStore(boolean z) throws IOException {
        final KahaDBPersistenceAdapter store = new KahaDBPersistenceAdapter();
        store.setJournalMaxFileLength(maxFileLength);
        store.setCleanupInterval(CLEANUP_INTERVAL_MILLIS);
        if (z) {
            store.deleteAllMessages();
        }
        kahadbs.add(store);
        return store;
    }

    /**
     * Replaces {@code broker} with a fresh, not yet started broker backed by a
     * multi-KahaDB adapter that has one filtered store for {@code queue.>} and one
     * for {@code queue2.>}.
     *
     * @param z when {@code true}, {@code deleteAllMessages()} is called on the
     *          multi-store adapter and on each per-destination store
     * @throws Exception if a store or the broker cannot be configured
     */
    public void prepareBrokerWithMultiStore(boolean z) throws Exception {
        final MultiKahaDBPersistenceAdapter multiStore = new MultiKahaDBPersistenceAdapter();
        if (z) {
            multiStore.deleteAllMessages();
        }

        final List<FilteredKahaDBPersistenceAdapter> filteredStores = new ArrayList<>();
        for (String prefix : new String[] {PREFIX_DESTINATION_NAME, "queue2"}) {
            filteredStores.add(createFilteredKahaDBByDestinationPrefix(prefix, z));
        }
        multiStore.setFilteredPersistenceAdapters(filteredStores);

        // NOTE(review): the journal length is borrowed from an unrelated test's constant - confirm the intended value.
        multiStore.setJournalMaxFileLength(NIOSSLConcurrencyTest.MESSAGE_SIZE);
        multiStore.setJournalCleanupInterval(10L);
        this.broker = createBroker(multiStore);
    }

    /**
     * Wraps a new KahaDB store in a filtered adapter that is bound to the queue
     * pattern {@code <prefix>.>}.
     *
     * @param prefix            destination prefix; when {@code null} no queue pattern is set
     * @param deleteAllMessages passed through to {@code createStore}
     * @return the filtered adapter wrapping the new store
     * @throws IOException if creating the underlying store fails
     */
    private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String prefix, boolean deleteAllMessages) throws IOException {
        final FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
        filtered.setPersistenceAdapter(createStore(deleteAllMessages));
        if (prefix == null) {
            return filtered;
        }
        filtered.setQueue(prefix + ".>");
        return filtered;
    }
}
