package org.apache.activemq.store.kahadb;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.MessageDatabase;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:activemq-core-5.4.0-fusesource-SNAPSHOT.jar:org/apache/activemq/store/kahadb/KahaDBTransactionStore.class */
public class KahaDBTransactionStore implements TransactionStore {
    static final Log LOG = LogFactory.getLog(KahaDBTransactionStore.class);
    ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<>();
    private final WireFormat wireFormat = new OpenWireFormat();
    private final KahaDBStore theStore;

    /* loaded from: input_file:activemq-core-5.4.0-fusesource-SNAPSHOT.jar:org/apache/activemq/store/kahadb/KahaDBTransactionStore$AddMessageCommand.class */
    /**
     * A deferred "add message" operation. The connection context supplied at
     * construction time is remembered and replayed when the no-argument
     * {@link #run()} is invoked.
     */
    public abstract class AddMessageCommand {
        /** Context captured when the command was created. */
        private final ConnectionContext ctx;

        AddMessageCommand(ConnectionContext connectionContext) {
            ctx = connectionContext;
        }

        /** @return the message this command will add */
        abstract Message getMessage();

        /**
         * Performs the add using the supplied context.
         *
         * @param connectionContext context to perform the add with
         * @return a future for the outcome of the add
         * @throws IOException if the underlying operation fails
         */
        abstract Future<Object> run(ConnectionContext connectionContext) throws IOException;

        /**
         * Performs the add using the context captured at construction.
         *
         * @return the future produced by {@link #run(ConnectionContext)}
         * @throws IOException if the underlying operation fails
         */
        Future<Object> run() throws IOException {
            final Future<Object> outcome = run(ctx);
            return outcome;
        }
    }

    /* loaded from: input_file:activemq-core-5.4.0-fusesource-SNAPSHOT.jar:org/apache/activemq/store/kahadb/KahaDBTransactionStore$RemoveMessageCommand.class */
    /**
     * A deferred "remove message" (acknowledgement) operation. The connection
     * context supplied at construction time is remembered and replayed when the
     * no-argument {@link #run()} is invoked.
     */
    public abstract class RemoveMessageCommand {
        /** Context captured when the command was created. */
        private final ConnectionContext ctx;

        RemoveMessageCommand(ConnectionContext connectionContext) {
            ctx = connectionContext;
        }

        /** @return the acknowledgement this command will apply */
        abstract MessageAck getMessageAck();

        /**
         * Performs the removal using the supplied context.
         *
         * @param connectionContext context to perform the removal with
         * @return a future for the outcome of the removal
         * @throws IOException if the underlying operation fails
         */
        abstract Future<Object> run(ConnectionContext connectionContext) throws IOException;

        /**
         * Performs the removal using the context captured at construction.
         *
         * @return the future produced by {@link #run(ConnectionContext)}
         * @throws IOException if the underlying operation fails
         */
        Future<Object> run() throws IOException {
            final Future<Object> outcome = run(ctx);
            return outcome;
        }
    }

    /* loaded from: input_file:activemq-core-5.4.0-fusesource-SNAPSHOT.jar:org/apache/activemq/store/kahadb/KahaDBTransactionStore$Tx.class */
    /**
     * Accumulates the add and remove commands recorded for one in-flight
     * transaction so they can be executed together by {@link #commit()}.
     */
    public class Tx {
        /** Deferred adds, in the order they were recorded. */
        private final ArrayList<AddMessageCommand> messages = new ArrayList<>();
        /** Deferred removals (acks), in the order they were recorded. */
        private final ArrayList<RemoveMessageCommand> acks = new ArrayList<>();

        public Tx() {
        }

        /** Records a deferred add. */
        public void add(AddMessageCommand addMessageCommand) {
            this.messages.add(addMessageCommand);
        }

        /** Records a deferred removal. */
        public void add(RemoveMessageCommand removeMessageCommand) {
            this.acks.add(removeMessageCommand);
        }

        /**
         * @return the messages of all recorded adds, in recording order
         */
        public Message[] getMessages() {
            Message[] result = new Message[this.messages.size()];
            int index = 0;
            for (AddMessageCommand command : this.messages) {
                result[index++] = command.getMessage();
            }
            return result;
        }

        /**
         * @return the acknowledgements of all recorded removals, in recording order
         */
        public MessageAck[] getAcks() {
            MessageAck[] result = new MessageAck[this.acks.size()];
            int index = 0;
            for (RemoveMessageCommand command : this.acks) {
                result[index++] = command.getMessageAck();
            }
            return result;
        }

        /**
         * Runs every recorded command exactly once: all adds first, then all
         * removals.
         *
         * @return the futures returned by the commands, adds before removals
         * @throws IOException if any command fails; commands after the failing
         *         one are not run
         */
        public List<Future<Object>> commit() throws IOException {
            List<Future<Object>> results = new ArrayList<>(this.messages.size() + this.acks.size());
            for (AddMessageCommand command : this.messages) {
                results.add(command.run());
            }
            for (RemoveMessageCommand command : this.acks) {
                // Run each removal once only; the previous code invoked run()
                // twice per ack, applying every removal to the store twice.
                results.add(command.run());
            }
            return results;
        }
    }

    /**
     * Creates a transaction store backed by the given KahaDB store.
     *
     * @param kahaDBStore the store that journal commands are written to
     */
    public KahaDBTransactionStore(KahaDBStore kahaDBStore) {
        this.theStore = kahaDBStore;
    }

    /**
     * Wraps a message store so that its add and remove operations are routed
     * through this transaction store, which decides whether to execute them
     * immediately or defer them.
     *
     * @param messageStore the store to wrap
     * @return a proxy that delegates adds/removes to this transaction store
     */
    public MessageStore proxy(MessageStore messageStore) {
        final KahaDBTransactionStore txStore = this;
        return new ProxyMessageStore(messageStore) {
            @Override
            public void addMessage(ConnectionContext context, Message send) throws IOException {
                txStore.addMessage(context, getDelegate(), send);
            }

            @Override
            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message send) throws IOException {
                return txStore.asyncAddQueueMessage(context, getDelegate(), send);
            }

            @Override
            public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
                txStore.removeMessage(context, getDelegate(), ack);
            }

            @Override
            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
                txStore.removeAsyncMessage(context, getDelegate(), ack);
            }
        };
    }

    /**
     * Wraps a topic message store so that its add and remove operations are
     * routed through this transaction store, which decides whether to execute
     * them immediately or defer them.
     *
     * @param topicMessageStore the topic store to wrap
     * @return a proxy that delegates adds/removes to this transaction store
     */
    public TopicMessageStore proxy(TopicMessageStore topicMessageStore) {
        final KahaDBTransactionStore txStore = this;
        return new ProxyTopicMessageStore(topicMessageStore) {
            @Override
            public void addMessage(ConnectionContext context, Message send) throws IOException {
                txStore.addMessage(context, getDelegate(), send);
            }

            @Override
            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message send) throws IOException {
                return txStore.asyncAddTopicMessage(context, getDelegate(), send);
            }

            @Override
            public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
                txStore.removeMessage(context, getDelegate(), ack);
            }

            @Override
            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
                txStore.removeAsyncMessage(context, getDelegate(), ack);
            }
        };
    }

    /**
     * Drops any in-flight state recorded for the transaction and writes a
     * prepare command for it to the store.
     *
     * @param transactionId the transaction being prepared
     * @throws IOException if the store write fails
     */
    @Override
    public void prepare(TransactionId transactionId) throws IOException {
        inflightTransactions.remove(transactionId);
        final KahaTransactionInfo info = getTransactionInfo(transactionId);
        theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
    }

    /**
     * Returns the in-flight {@link Tx} registered under the given key, creating
     * and registering one if none exists yet.
     *
     * <p>Registration uses {@code putIfAbsent} so that concurrent callers racing
     * on the same key all end up with the same {@code Tx}. A plain get-then-put
     * could let one caller's {@code Tx} (and the commands recorded on it) be
     * silently replaced by another's.
     *
     * @param obj the key identifying the transaction
     * @return the existing or newly registered {@code Tx}; never {@code null}
     */
    public Tx getTx(Object obj) {
        Tx tx = this.inflightTransactions.get(obj);
        if (tx == null) {
            Tx created = new Tx();
            Tx winner = this.inflightTransactions.putIfAbsent(obj, created);
            // putIfAbsent returns the previously mapped value, or null if ours was stored.
            tx = winner != null ? winner : created;
        }
        return tx;
    }

    /**
     * Commits a transaction.
     *
     * <p>For XA transactions, or when the store does not use concurrent store
     * and dispatch for transactions, a commit command is written to the store
     * and both callbacks are handed to that store call.
     *
     * <p>Otherwise the commands deferred in the in-flight {@link Tx} are run
     * here: {@code runnable} is invoked first, every resulting future is
     * awaited, {@code runnable2} is invoked, and a commit command is written
     * only if at least one future was not cancelled.
     *
     * @param transactionId the transaction to commit; a {@code null} id is
     *        logged as an error and otherwise ignored
     * @param z not used by this implementation
     * @param runnable callback run before the deferred work (may be {@code null})
     * @param runnable2 callback run after the deferred work (may be {@code null})
     * @throws IOException if running a deferred command or writing to the store fails
     */
    @Override // org.apache.activemq.store.TransactionStore
    public void commit(TransactionId transactionId, boolean z, Runnable runnable, Runnable runnable2) throws IOException {
        if (transactionId == null) {
            LOG.error("Null transaction passed on commit");
            return;
        }
        if (transactionId.isXATransaction() || !this.theStore.isConcurrentStoreAndDispatchTransactions()) {
            KahaTransactionInfo transactionInfo = getTransactionInfo(transactionId);
            synchronized (this) {
                this.theStore.store(new KahaCommitCommand().setTransactionInfo(transactionInfo), true, runnable, runnable2);
            }
            return;
        }
        if (runnable != null) {
            runnable.run();
        }
        Tx tx = this.inflightTransactions.remove(transactionId);
        if (tx == null) {
            // Nothing was deferred for this transaction; only the post callback remains.
            if (runnable2 != null) {
                runnable2.run();
            }
            return;
        }
        boolean doneSomething = false;
        boolean interrupted = false;
        try {
            for (Future<Object> future : tx.commit()) {
                try {
                    future.get();
                } catch (InterruptedException e) {
                    // Remember the interrupt so it can be restored once the commit has
                    // finished; restoring it immediately would abort the remaining waits.
                    interrupted = true;
                    this.theStore.brokerService.handleIOException(new IOException(e.getMessage(), e));
                } catch (CancellationException ignored) {
                    // A cancelled operation is not an error; it is accounted for by the
                    // isCancelled() check below.
                } catch (ExecutionException e) {
                    this.theStore.brokerService.handleIOException(new IOException(e.getMessage(), e));
                }
                if (!future.isCancelled()) {
                    doneSomething = true;
                }
            }
            if (runnable2 != null) {
                runnable2.run();
            }
            if (doneSomething) {
                this.theStore.store(new KahaCommitCommand().setTransactionInfo(getTransactionInfo(transactionId)), true, null, null);
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Rolls back a transaction.
     *
     * <p>The branch taken mirrors the one used when operations were recorded
     * (see {@code commit}, {@code addMessage}, {@code removeMessage}): for XA
     * transactions, or when concurrent store and dispatch for transactions is
     * disabled, the operations were passed straight to the underlying store, so
     * a rollback command is written there. Otherwise the operations were only
     * deferred in memory, so discarding the in-flight {@link Tx} is sufficient.
     *
     * @param transactionId the transaction to roll back
     * @throws IOException if writing the rollback command fails
     */
    @Override // org.apache.activemq.store.TransactionStore
    public void rollback(TransactionId transactionId) throws IOException {
        if (transactionId.isXATransaction() || !this.theStore.isConcurrentStoreAndDispatchTransactions()) {
            this.theStore.store(new KahaRollbackCommand().setTransactionInfo(getTransactionInfo(transactionId)), false, null, null);
        } else {
            this.inflightTransactions.remove(transactionId);
        }
    }

    /** Does nothing: this implementation has no work to perform on start. */
    @Override // org.apache.activemq.Service
    public void start() throws Exception {
    }

    /** Does nothing: this implementation has no work to perform on stop. */
    @Override // org.apache.activemq.Service
    public void stop() throws Exception {
    }

    /**
     * Reports every prepared transaction held by the store to the listener.
     * For each one, the recorded operations are unmarshalled into the messages
     * that were added and the acknowledgements that were applied.
     *
     * @param transactionRecoveryListener receives one callback per prepared transaction
     * @throws IOException if an operation's payload cannot be unmarshalled
     */
    @Override
    public synchronized void recover(TransactionRecoveryListener transactionRecoveryListener) throws IOException {
        for (Map.Entry<TransactionId, List<MessageDatabase.Operation>> prepared : theStore.preparedTransactions.entrySet()) {
            final XATransactionId xid = (XATransactionId) prepared.getKey();
            final List<Message> recoveredMessages = new ArrayList<Message>();
            final List<MessageAck> recoveredAcks = new ArrayList<MessageAck>();

            for (MessageDatabase.Operation op : prepared.getValue()) {
                // Exact class match: anything that is not precisely an add is treated as a remove.
                if (op.getClass() == MessageDatabase.AddOpperation.class) {
                    MessageDatabase.AddOpperation addOp = (MessageDatabase.AddOpperation) op;
                    DataInputStream payload = new DataInputStream(addOp.getCommand().getMessage().newInput());
                    recoveredMessages.add((Message) wireFormat.unmarshal(payload));
                } else {
                    MessageDatabase.RemoveOpperation removeOp = (MessageDatabase.RemoveOpperation) op;
                    DataInputStream payload = new DataInputStream(removeOp.getCommand().getAck().newInput());
                    recoveredAcks.add((MessageAck) wireFormat.unmarshal(payload));
                }
            }

            Message[] messages = recoveredMessages.toArray(new Message[recoveredMessages.size()]);
            MessageAck[] acks = recoveredAcks.toArray(new MessageAck[recoveredAcks.size()]);
            transactionRecoveryListener.recover(xid, messages, acks);
        }
    }

    /**
     * Adds a message, either immediately or as part of an in-flight transaction.
     * The add is deferred only when the message belongs to a non-XA transaction
     * and the store uses concurrent store and dispatch for transactions; in
     * every other case it is forwarded to {@code messageStore} right away.
     *
     * @param connectionContext context of the add
     * @param messageStore the store that ultimately receives the message
     * @param message the message to add
     * @throws IOException if an immediate add fails
     */
    void addMessage(ConnectionContext connectionContext, final MessageStore messageStore, final Message message) throws IOException {
        final TransactionId txId = message.getTransactionId();
        final boolean deferred = txId != null
                && !txId.isXATransaction()
                && theStore.isConcurrentStoreAndDispatchTransactions();
        if (!deferred) {
            messageStore.addMessage(connectionContext, message);
            return;
        }

        Tx tx = getTx(txId);
        tx.add(new AddMessageCommand(connectionContext) {
            @Override
            public Message getMessage() {
                return message;
            }

            @Override
            public Future<Object> run(ConnectionContext runContext) throws IOException {
                messageStore.addMessage(runContext, message);
                return AbstractMessageStore.FUTURE;
            }
        });
    }

    /**
     * Asynchronously adds a queue message, honouring its transaction if any.
     * <ul>
     *   <li>No transaction: forwarded to {@code messageStore.asyncAddQueueMessage}.</li>
     *   <li>XA transaction, or concurrent store and dispatch for transactions
     *       disabled: added synchronously via {@code messageStore.addMessage}.</li>
     *   <li>Otherwise: recorded on the in-flight {@link Tx} and run at commit.</li>
     * </ul>
     *
     * @param connectionContext context of the add
     * @param messageStore the store that ultimately receives the message
     * @param message the message to add
     * @return the store's future for a non-transactional add, otherwise
     *         {@code AbstractMessageStore.FUTURE}
     * @throws IOException if an immediate add fails
     */
    Future<Object> asyncAddQueueMessage(ConnectionContext connectionContext, final MessageStore messageStore, final Message message) throws IOException {
        final TransactionId txId = message.getTransactionId();
        if (txId == null) {
            return messageStore.asyncAddQueueMessage(connectionContext, message);
        }

        if (!txId.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) {
            Tx tx = getTx(txId);
            tx.add(new AddMessageCommand(connectionContext) {
                @Override
                public Message getMessage() {
                    return message;
                }

                @Override
                public Future<Object> run(ConnectionContext runContext) throws IOException {
                    return messageStore.asyncAddQueueMessage(runContext, message);
                }
            });
        } else {
            messageStore.addMessage(connectionContext, message);
        }
        return AbstractMessageStore.FUTURE;
    }

    /**
     * Asynchronously adds a topic message, honouring its transaction if any.
     * <ul>
     *   <li>No transaction: forwarded to {@code messageStore.asyncAddTopicMessage}.</li>
     *   <li>XA transaction, or concurrent store and dispatch for transactions
     *       disabled: added synchronously via {@code messageStore.addMessage}.</li>
     *   <li>Otherwise: recorded on the in-flight {@link Tx} and run at commit.</li>
     * </ul>
     *
     * @param connectionContext context of the add
     * @param messageStore the store that ultimately receives the message
     * @param message the message to add
     * @return the store's future for a non-transactional add, otherwise
     *         {@code AbstractMessageStore.FUTURE}
     * @throws IOException if an immediate add fails
     */
    Future<Object> asyncAddTopicMessage(ConnectionContext connectionContext, final MessageStore messageStore, final Message message) throws IOException {
        if (message.getTransactionId() == null) {
            return messageStore.asyncAddTopicMessage(connectionContext, message);
        }
        if (message.getTransactionId().isXATransaction() || !this.theStore.isConcurrentStoreAndDispatchTransactions()) {
            messageStore.addMessage(connectionContext, message);
            return AbstractMessageStore.FUTURE;
        }
        getTx(message.getTransactionId()).add(new AddMessageCommand(connectionContext) {
            @Override // org.apache.activemq.store.kahadb.KahaDBTransactionStore.AddMessageCommand
            public Message getMessage() {
                return message;
            }

            // Parameterized return type, matching the abstract declaration and the
            // sibling queue variant (previously a raw Future).
            @Override // org.apache.activemq.store.kahadb.KahaDBTransactionStore.AddMessageCommand
            public Future<Object> run(ConnectionContext connectionContext2) throws IOException {
                return messageStore.asyncAddTopicMessage(connectionContext2, message);
            }
        });
        return AbstractMessageStore.FUTURE;
    }

    /**
     * Removes (acknowledges) a message, either immediately or as part of an
     * in-flight transaction. The removal is deferred only when the ack is in a
     * non-XA transaction and the store uses concurrent store and dispatch for
     * transactions; in every other case it is forwarded to
     * {@code messageStore} right away.
     *
     * @param connectionContext context of the removal
     * @param messageStore the store the message is removed from
     * @param messageAck the acknowledgement describing the removal
     * @throws IOException if an immediate removal fails
     */
    final void removeMessage(ConnectionContext connectionContext, final MessageStore messageStore, final MessageAck messageAck) throws IOException {
        final boolean deferred = messageAck.isInTransaction()
                && !messageAck.getTransactionId().isXATransaction()
                && theStore.isConcurrentStoreAndDispatchTransactions();
        if (!deferred) {
            messageStore.removeMessage(connectionContext, messageAck);
            return;
        }

        Tx tx = getTx(messageAck.getTransactionId());
        tx.add(new RemoveMessageCommand(connectionContext) {
            @Override
            public MessageAck getMessageAck() {
                return messageAck;
            }

            @Override
            public Future<Object> run(ConnectionContext runContext) throws IOException {
                messageStore.removeMessage(runContext, messageAck);
                return AbstractMessageStore.FUTURE;
            }
        });
    }

    /**
     * Asynchronous counterpart of {@code removeMessage}. Immediate removals are
     * forwarded to {@code messageStore.removeAsyncMessage}; a removal in a
     * non-XA transaction, with concurrent store and dispatch for transactions
     * enabled, is recorded on the in-flight {@link Tx} instead.
     *
     * @param connectionContext context of the removal
     * @param messageStore the store the message is removed from
     * @param messageAck the acknowledgement describing the removal
     * @throws IOException if an immediate removal fails
     */
    final void removeAsyncMessage(ConnectionContext connectionContext, final MessageStore messageStore, final MessageAck messageAck) throws IOException {
        final boolean deferred = messageAck.isInTransaction()
                && !messageAck.getTransactionId().isXATransaction()
                && theStore.isConcurrentStoreAndDispatchTransactions();
        if (!deferred) {
            messageStore.removeAsyncMessage(connectionContext, messageAck);
            return;
        }

        Tx tx = getTx(messageAck.getTransactionId());
        tx.add(new RemoveMessageCommand(connectionContext) {
            @Override
            public MessageAck getMessageAck() {
                return messageAck;
            }

            @Override
            public Future<Object> run(ConnectionContext runContext) throws IOException {
                // The deferred command goes through removeMessage, not removeAsyncMessage.
                messageStore.removeMessage(runContext, messageAck);
                return AbstractMessageStore.FUTURE;
            }
        });
    }

    /**
     * Asks the backing store to build the journal-level transaction info for
     * the given transaction id.
     *
     * @param transactionId the broker-level transaction id
     * @return the info object produced by the store
     */
    private KahaTransactionInfo getTransactionInfo(TransactionId transactionId) {
        final KahaTransactionInfo info = theStore.createTransactionInfo(transactionId);
        return info;
    }
}
