package org.apache.activemq.store.kahadb;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.util.IOHelper;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.class */
public class MultiKahaDBTransactionStore implements TransactionStore {
    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
    final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
    private Journal journal;
    final ConcurrentHashMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<>();
    final Set<TransactionId> recoveredPendingCommit = new HashSet();
    private int journalMaxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
    private int journalWriteBatchSize = 4194304;

    /* loaded from: input_file:org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore$Tx.class */
    public class Tx {
        private final Set<TransactionStore> stores = new HashSet();
        private int prepareLocationId = 0;

        public Tx() {
        }

        public void trackStore(TransactionStore transactionStore) {
            this.stores.add(transactionStore);
        }

        public Set<TransactionStore> getStores() {
            return this.stores;
        }

        public void trackPrepareLocation(Location location) {
            this.prepareLocationId = location.getDataFileId();
        }

        public int getPreparedLocationId() {
            return this.prepareLocationId;
        }
    }

    public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
        this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
    }

    public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) {
        return new ProxyMessageStore(messageStore) { // from class: org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.1
            @Override // org.apache.activemq.store.ProxyMessageStore, org.apache.activemq.store.MessageStore
            public void addMessage(ConnectionContext connectionContext, Message message) throws IOException {
                MultiKahaDBTransactionStore.this.addMessage(transactionStore, connectionContext, getDelegate(), message);
            }

            @Override // org.apache.activemq.store.ProxyMessageStore, org.apache.activemq.store.MessageStore
            public Future<Object> asyncAddQueueMessage(ConnectionContext connectionContext, Message message) throws IOException {
                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, connectionContext, getDelegate(), message);
            }

            @Override // org.apache.activemq.store.ProxyMessageStore, org.apache.activemq.store.MessageStore
            public void removeMessage(ConnectionContext connectionContext, MessageAck messageAck) throws IOException {
                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, connectionContext, getDelegate(), messageAck);
            }

            @Override // org.apache.activemq.store.ProxyMessageStore, org.apache.activemq.store.MessageStore
            public void removeAsyncMessage(ConnectionContext connectionContext, MessageAck messageAck) throws IOException {
                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, connectionContext, getDelegate(), messageAck);
            }
        };
    }

    public TopicMessageStore proxy(final TransactionStore transactionStore, TopicMessageStore topicMessageStore) {
        return new ProxyTopicMessageStore(topicMessageStore) { // from class: org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.2
            @Override // org.apache.activemq.store.ProxyTopicMessageStore, org.apache.activemq.store.MessageStore
            public void addMessage(ConnectionContext connectionContext, Message message) throws IOException {
                MultiKahaDBTransactionStore.this.addMessage(transactionStore, connectionContext, getDelegate(), message);
            }

            @Override // org.apache.activemq.store.ProxyTopicMessageStore, org.apache.activemq.store.MessageStore
            public Future<Object> asyncAddTopicMessage(ConnectionContext connectionContext, Message message) throws IOException {
                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, connectionContext, getDelegate(), message);
            }

            @Override // org.apache.activemq.store.ProxyTopicMessageStore, org.apache.activemq.store.MessageStore
            public void removeMessage(ConnectionContext connectionContext, MessageAck messageAck) throws IOException {
                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, connectionContext, getDelegate(), messageAck);
            }

            @Override // org.apache.activemq.store.ProxyTopicMessageStore, org.apache.activemq.store.MessageStore
            public void removeAsyncMessage(ConnectionContext connectionContext, MessageAck messageAck) throws IOException {
                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, connectionContext, getDelegate(), messageAck);
            }

            @Override // org.apache.activemq.store.ProxyTopicMessageStore, org.apache.activemq.store.TopicMessageStore
            public void acknowledge(ConnectionContext connectionContext, String str, String str2, MessageId messageId, MessageAck messageAck) throws IOException {
                MultiKahaDBTransactionStore.this.acknowledge(transactionStore, connectionContext, (TopicMessageStore) getDelegate(), str, str2, messageId, messageAck);
            }
        };
    }

    public void deleteAllMessages() {
        IOHelper.deleteChildren(getDirectory());
    }

    public int getJournalMaxFileLength() {
        return this.journalMaxFileLength;
    }

    public void setJournalMaxFileLength(int i) {
        this.journalMaxFileLength = i;
    }

    public int getJournalMaxWriteBatchSize() {
        return this.journalWriteBatchSize;
    }

    public void setJournalMaxWriteBatchSize(int i) {
        this.journalWriteBatchSize = i;
    }

    public Tx getTx(TransactionId transactionId) {
        Tx tx = this.inflightTransactions.get(transactionId);
        if (tx == null) {
            tx = new Tx();
            this.inflightTransactions.put(transactionId, tx);
        }
        return tx;
    }

    public Tx removeTx(TransactionId transactionId) {
        return this.inflightTransactions.remove(transactionId);
    }

    @Override // org.apache.activemq.store.TransactionStore
    public void prepare(TransactionId transactionId) throws IOException {
        Iterator<TransactionStore> it = getTx(transactionId).getStores().iterator();
        while (it.hasNext()) {
            it.next().prepare(transactionId);
        }
    }

    @Override // org.apache.activemq.store.TransactionStore
    public void commit(TransactionId transactionId, boolean z, Runnable runnable, Runnable runnable2) throws IOException {
        if (runnable != null) {
            runnable.run();
        }
        Tx tx = getTx(transactionId);
        if (z) {
            Iterator<TransactionStore> it = tx.getStores().iterator();
            while (it.hasNext()) {
                it.next().commit(transactionId, true, null, null);
            }
        } else if (tx.getStores().size() == 1) {
            Iterator<TransactionStore> it2 = tx.getStores().iterator();
            while (it2.hasNext()) {
                it2.next().commit(transactionId, false, null, null);
            }
        } else {
            Iterator<TransactionStore> it3 = tx.getStores().iterator();
            while (it3.hasNext()) {
                it3.next().prepare(transactionId);
            }
            persistOutcome(tx, transactionId);
            Iterator<TransactionStore> it4 = tx.getStores().iterator();
            while (it4.hasNext()) {
                it4.next().commit(transactionId, true, null, null);
            }
            persistCompletion(transactionId);
        }
        removeTx(transactionId);
        if (runnable2 != null) {
            runnable2.run();
        }
    }

    public void persistOutcome(Tx tx, TransactionId transactionId) throws IOException {
        tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(this.multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(transactionId))));
    }

    public void persistCompletion(TransactionId transactionId) throws IOException {
        store(new KahaCommitCommand().setTransactionInfo(this.multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(transactionId)));
    }

    private Location store(JournalCommand<?> journalCommand) throws IOException {
        DataByteArrayOutputStream dataByteArrayOutputStream = new DataByteArrayOutputStream(journalCommand.serializedSizeFramed() + 1);
        dataByteArrayOutputStream.writeByte(journalCommand.type().getNumber());
        journalCommand.writeFramed((OutputStream) dataByteArrayOutputStream);
        Location write = this.journal.write(dataByteArrayOutputStream.toByteSequence(), true);
        this.journal.setLastAppendLocation(write);
        return write;
    }

    @Override // org.apache.activemq.store.TransactionStore
    public void rollback(TransactionId transactionId) throws IOException {
        Tx removeTx = removeTx(transactionId);
        if (removeTx != null) {
            Iterator<TransactionStore> it = removeTx.getStores().iterator();
            while (it.hasNext()) {
                it.next().rollback(transactionId);
            }
        }
    }

    @Override // org.apache.activemq.Service
    public void start() throws Exception {
        this.journal = new Journal() { // from class: org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.3
            protected void cleanup() {
                super.cleanup();
                MultiKahaDBTransactionStore.this.txStoreCleanup();
            }
        };
        this.journal.setDirectory(getDirectory());
        this.journal.setMaxFileLength(this.journalMaxFileLength);
        this.journal.setWriteBatchSize(this.journalWriteBatchSize);
        IOHelper.mkdirs(this.journal.getDirectory());
        this.journal.start();
        recoverPendingLocalTransactions();
        store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void txStoreCleanup() {
        TreeSet treeSet = new TreeSet(this.journal.getFileMap().keySet());
        Iterator<Tx> it = this.inflightTransactions.values().iterator();
        while (it.hasNext()) {
            treeSet.remove(Integer.valueOf(it.next().getPreparedLocationId()));
        }
        try {
            this.journal.removeDataFiles(treeSet);
        } catch (Exception e) {
            LOG.error(this + ", Failed to remove tx journal datafiles " + treeSet);
        }
    }

    private File getDirectory() {
        return new File(this.multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
    }

    @Override // org.apache.activemq.Service
    public void stop() throws Exception {
        this.journal.close();
        this.journal = null;
    }

    private void recoverPendingLocalTransactions() throws IOException {
        Location nextLocation = this.journal.getNextLocation((Location) null);
        while (true) {
            Location location = nextLocation;
            if (location == null) {
                this.recoveredPendingCommit.addAll(this.inflightTransactions.keySet());
                LOG.info("pending local transactions: " + this.recoveredPendingCommit);
                return;
            } else {
                process(load(location));
                nextLocation = this.journal.getNextLocation(location);
            }
        }
    }

    public JournalCommand<?> load(Location location) throws IOException {
        DataByteArrayInputStream dataByteArrayInputStream = new DataByteArrayInputStream(this.journal.read(location));
        KahaEntryType valueOf = KahaEntryType.valueOf(dataByteArrayInputStream.readByte());
        if (valueOf == null) {
            throw new IOException("Could not load journal record. Invalid location: " + location);
        }
        JournalCommand<?> journalCommand = (JournalCommand) valueOf.createMessage();
        journalCommand.mergeFramed((InputStream) dataByteArrayInputStream);
        return journalCommand;
    }

    public void process(JournalCommand<?> journalCommand) throws IOException {
        switch (journalCommand.type()) {
            case KAHA_PREPARE_COMMAND:
                getTx(TransactionIdConversion.convert(((KahaPrepareCommand) journalCommand).getTransactionInfo()));
                return;
            case KAHA_COMMIT_COMMAND:
                removeTx(TransactionIdConversion.convert(((KahaCommitCommand) journalCommand).getTransactionInfo()));
                return;
            case KAHA_TRACE_COMMAND:
                return;
            default:
                throw new IOException("Unexpected command in transaction journal: " + journalCommand);
        }
    }

    @Override // org.apache.activemq.store.TransactionStore
    public synchronized void recover(final TransactionRecoveryListener transactionRecoveryListener) throws IOException {
        for (final KahaDBPersistenceAdapter kahaDBPersistenceAdapter : this.multiKahaDBPersistenceAdapter.adapters) {
            kahaDBPersistenceAdapter.createTransactionStore().recover(new TransactionRecoveryListener() { // from class: org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.4
                @Override // org.apache.activemq.store.TransactionRecoveryListener
                public void recover(XATransactionId xATransactionId, Message[] messageArr, MessageAck[] messageAckArr) {
                    try {
                        MultiKahaDBTransactionStore.this.getTx(xATransactionId).trackStore(kahaDBPersistenceAdapter.createTransactionStore());
                    } catch (IOException e) {
                        MultiKahaDBTransactionStore.LOG.error("Failed to access transaction store: " + kahaDBPersistenceAdapter + " for prepared xa tid: " + xATransactionId, e);
                    }
                    transactionRecoveryListener.recover(xATransactionId, messageArr, messageAckArr);
                }
            });
        }
        try {
            Broker broker = this.multiKahaDBPersistenceAdapter.getBrokerService().getBroker();
            for (TransactionId transactionId : broker.getPreparedTransactions(null)) {
                if (this.multiKahaDBPersistenceAdapter.isLocalXid(transactionId)) {
                    try {
                        if (this.recoveredPendingCommit.contains(transactionId)) {
                            LOG.info("delivering pending commit outcome for tid: " + transactionId);
                            broker.commitTransaction(null, transactionId, false);
                        } else {
                            LOG.info("delivering rollback outcome to store for tid: " + transactionId);
                            broker.forgetTransaction(null, transactionId);
                        }
                        persistCompletion(transactionId);
                    } catch (Exception e) {
                        LOG.error("failed to deliver pending outcome for tid: " + transactionId, e);
                    }
                }
            }
        } catch (Exception e2) {
            LOG.error("failed to resolve pending local transactions", e2);
        }
    }

    void addMessage(TransactionStore transactionStore, ConnectionContext connectionContext, MessageStore messageStore, Message message) throws IOException {
        if (message.getTransactionId() != null) {
            getTx(message.getTransactionId()).trackStore(transactionStore);
        }
        messageStore.addMessage(connectionContext, message);
    }

    Future<Object> asyncAddQueueMessage(TransactionStore transactionStore, ConnectionContext connectionContext, MessageStore messageStore, Message message) throws IOException {
        if (message.getTransactionId() == null) {
            return messageStore.asyncAddQueueMessage(connectionContext, message);
        }
        getTx(message.getTransactionId()).trackStore(transactionStore);
        messageStore.addMessage(connectionContext, message);
        return AbstractMessageStore.FUTURE;
    }

    Future<Object> asyncAddTopicMessage(TransactionStore transactionStore, ConnectionContext connectionContext, MessageStore messageStore, Message message) throws IOException {
        if (message.getTransactionId() == null) {
            return messageStore.asyncAddTopicMessage(connectionContext, message);
        }
        getTx(message.getTransactionId()).trackStore(transactionStore);
        messageStore.addMessage(connectionContext, message);
        return AbstractMessageStore.FUTURE;
    }

    final void removeMessage(TransactionStore transactionStore, ConnectionContext connectionContext, MessageStore messageStore, MessageAck messageAck) throws IOException {
        if (messageAck.getTransactionId() != null) {
            getTx(messageAck.getTransactionId()).trackStore(transactionStore);
        }
        messageStore.removeMessage(connectionContext, messageAck);
    }

    final void removeAsyncMessage(TransactionStore transactionStore, ConnectionContext connectionContext, MessageStore messageStore, MessageAck messageAck) throws IOException {
        if (messageAck.getTransactionId() != null) {
            getTx(messageAck.getTransactionId()).trackStore(transactionStore);
        }
        messageStore.removeAsyncMessage(connectionContext, messageAck);
    }

    final void acknowledge(TransactionStore transactionStore, ConnectionContext connectionContext, TopicMessageStore topicMessageStore, String str, String str2, MessageId messageId, MessageAck messageAck) throws IOException {
        if (messageAck.getTransactionId() != null) {
            getTx(messageAck.getTransactionId()).trackStore(transactionStore);
        }
        topicMessageStore.acknowledge(connectionContext, str, str2, messageId, messageAck);
    }
}
