/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.kahadb;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.JournalCommand;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.store.kahadb.MessageDatabase;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KahaDBTransactionStore
implements TransactionStore {
    static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
    ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap();
    private final WireFormat wireFormat = new OpenWireFormat();
    private final KahaDBStore theStore;

    public KahaDBTransactionStore(KahaDBStore theStore) {
        this.theStore = theStore;
    }

    public MessageStore proxy(MessageStore messageStore) {
        return new ProxyMessageStore(messageStore){

            @Override
            public void addMessage(ConnectionContext context, Message send) throws IOException {
                KahaDBTransactionStore.this.addMessage(context, this.getDelegate(), send);
            }

            @Override
            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
                return KahaDBTransactionStore.this.asyncAddQueueMessage(context, this.getDelegate(), message);
            }

            @Override
            public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
                KahaDBTransactionStore.this.removeMessage(context, this.getDelegate(), ack);
            }

            @Override
            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
                KahaDBTransactionStore.this.removeAsyncMessage(context, this.getDelegate(), ack);
            }
        };
    }

    public TopicMessageStore proxy(TopicMessageStore messageStore) {
        return new ProxyTopicMessageStore(messageStore){

            @Override
            public void addMessage(ConnectionContext context, Message send) throws IOException {
                KahaDBTransactionStore.this.addMessage(context, this.getDelegate(), send);
            }

            @Override
            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
                return KahaDBTransactionStore.this.asyncAddTopicMessage(context, this.getDelegate(), message);
            }

            @Override
            public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
                KahaDBTransactionStore.this.removeMessage(context, this.getDelegate(), ack);
            }

            @Override
            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
                KahaDBTransactionStore.this.removeAsyncMessage(context, this.getDelegate(), ack);
            }

            @Override
            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
                KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)this.getDelegate(), clientId, subscriptionName, messageId, ack);
            }
        };
    }

    @Override
    public void prepare(TransactionId txid) throws IOException {
        this.inflightTransactions.remove(txid);
        KahaTransactionInfo info = this.getTransactionInfo(txid);
        this.theStore.store((JournalCommand)new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
    }

    public Tx getTx(Object txid) {
        Tx tx = this.inflightTransactions.get(txid);
        if (tx == null) {
            tx = new Tx();
            this.inflightTransactions.put(txid, tx);
        }
        return tx;
    }

    @Override
    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit) throws IOException {
        if (txid != null) {
            if (!txid.isXATransaction() && this.theStore.isConcurrentStoreAndDispatchTransactions()) {
                Tx tx;
                if (preCommit != null) {
                    preCommit.run();
                }
                if ((tx = this.inflightTransactions.remove(txid)) != null) {
                    List<Future<Object>> results = tx.commit();
                    boolean doneSomething = false;
                    for (Future<Object> result : results) {
                        try {
                            result.get();
                        }
                        catch (InterruptedException e) {
                            this.theStore.brokerService.handleIOException(new IOException(e.getMessage()));
                        }
                        catch (ExecutionException e) {
                            this.theStore.brokerService.handleIOException(new IOException(e.getMessage()));
                        }
                        catch (CancellationException e) {
                            // empty catch block
                        }
                        if (result.isCancelled()) continue;
                        doneSomething = true;
                    }
                    if (postCommit != null) {
                        postCommit.run();
                    }
                    if (doneSomething) {
                        KahaTransactionInfo info = this.getTransactionInfo(txid);
                        this.theStore.store((JournalCommand)new KahaCommitCommand().setTransactionInfo(info), true, null, null);
                    }
                } else if (postCommit != null) {
                    postCommit.run();
                }
            } else {
                KahaTransactionInfo info = this.getTransactionInfo(txid);
                this.theStore.store((JournalCommand)new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
                this.forgetRecoveredAcks(txid);
            }
        } else {
            LOG.error("Null transaction passed on commit");
        }
    }

    @Override
    public void rollback(TransactionId txid) throws IOException {
        if (txid.isXATransaction() || !this.theStore.isConcurrentStoreAndDispatchTransactions()) {
            KahaTransactionInfo info = this.getTransactionInfo(txid);
            this.theStore.store((JournalCommand)new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
            this.forgetRecoveredAcks(txid);
        } else {
            this.inflightTransactions.remove(txid);
        }
    }

    protected void forgetRecoveredAcks(TransactionId txid) throws IOException {
        if (txid.isXATransaction()) {
            XATransactionId xaTid = (XATransactionId)txid;
            this.theStore.forgetRecoveredAcks(xaTid.getPreparedAcks());
        }
    }

    @Override
    public void start() throws Exception {
    }

    @Override
    public void stop() throws Exception {
    }

    @Override
    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
        for (Map.Entry entry : this.theStore.preparedTransactions.entrySet()) {
            XATransactionId xid = (XATransactionId)entry.getKey();
            ArrayList<Message> messageList = new ArrayList<Message>();
            ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
            for (MessageDatabase.Operation op : (List)entry.getValue()) {
                if (op.getClass() == MessageDatabase.AddOpperation.class) {
                    MessageDatabase.AddOpperation addOp = (MessageDatabase.AddOpperation)op;
                    Message msg = (Message)this.wireFormat.unmarshal(new DataInputStream(((KahaAddMessageCommand)addOp.getCommand()).getMessage().newInput()));
                    messageList.add(msg);
                    continue;
                }
                MessageDatabase.RemoveOpperation rmOp = (MessageDatabase.RemoveOpperation)op;
                Buffer ackb = ((KahaRemoveMessageCommand)rmOp.getCommand()).getAck();
                MessageAck ack = (MessageAck)this.wireFormat.unmarshal(new DataInputStream(ackb.newInput()));
                ackList.add(ack);
            }
            Message[] addedMessages = new Message[messageList.size()];
            MessageAck[] acks = new MessageAck[ackList.size()];
            messageList.toArray(addedMessages);
            ackList.toArray(acks);
            xid.setPreparedAcks(ackList);
            this.theStore.trackRecoveredAcks(ackList);
            listener.recover(xid, addedMessages, acks);
        }
    }

    void addMessage(ConnectionContext context, final MessageStore destination, final Message message) throws IOException {
        if (message.getTransactionId() != null) {
            if (message.getTransactionId().isXATransaction() || !this.theStore.isConcurrentStoreAndDispatchTransactions()) {
                destination.addMessage(context, message);
            } else {
                Tx tx = this.getTx(message.getTransactionId());
                tx.add(new AddMessageCommand(context){

                    @Override
                    public Message getMessage() {
                        return message;
                    }

                    @Override
                    public Future<Object> run(ConnectionContext ctx) throws IOException {
                        destination.addMessage(ctx, message);
                        return AbstractMessageStore.FUTURE;
                    }
                });
            }
        } else {
            destination.addMessage(context, message);
        }
    }

    Future<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message) throws IOException {
        if (message.getTransactionId() != null) {
            if (message.getTransactionId().isXATransaction() || !this.theStore.isConcurrentStoreAndDispatchTransactions()) {
                destination.addMessage(context, message);
                return AbstractMessageStore.FUTURE;
            }
            Tx tx = this.getTx(message.getTransactionId());
            tx.add(new AddMessageCommand(context){

                @Override
                public Message getMessage() {
                    return message;
                }

                @Override
                public Future<Object> run(ConnectionContext ctx) throws IOException {
                    return destination.asyncAddQueueMessage(ctx, message);
                }
            });
            return AbstractMessageStore.FUTURE;
        }
        return destination.asyncAddQueueMessage(context, message);
    }

    Future<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message) throws IOException {
        if (message.getTransactionId() != null) {
            if (message.getTransactionId().isXATransaction() || !this.theStore.isConcurrentStoreAndDispatchTransactions()) {
                destination.addMessage(context, message);
                return AbstractMessageStore.FUTURE;
            }
            Tx tx = this.getTx(message.getTransactionId());
            tx.add(new AddMessageCommand(context){

                @Override
                public Message getMessage() {
                    return message;
                }

                @Override
                public Future<Object> run(ConnectionContext ctx) throws IOException {
                    return destination.asyncAddTopicMessage(ctx, message);
                }
            });
            return AbstractMessageStore.FUTURE;
        }
        return destination.asyncAddTopicMessage(context, message);
    }

    final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack) throws IOException {
        if (ack.isInTransaction()) {
            if (ack.getTransactionId().isXATransaction() || !this.theStore.isConcurrentStoreAndDispatchTransactions()) {
                destination.removeMessage(context, ack);
            } else {
                Tx tx = this.getTx(ack.getTransactionId());
                tx.add(new RemoveMessageCommand(context){

                    @Override
                    public MessageAck getMessageAck() {
                        return ack;
                    }

                    @Override
                    public Future<Object> run(ConnectionContext ctx) throws IOException {
                        destination.removeMessage(ctx, ack);
                        return AbstractMessageStore.FUTURE;
                    }
                });
            }
        } else {
            destination.removeMessage(context, ack);
        }
    }

    final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack) throws IOException {
        if (ack.isInTransaction()) {
            if (ack.getTransactionId().isXATransaction() || !this.theStore.isConcurrentStoreAndDispatchTransactions()) {
                destination.removeMessage(context, ack);
            } else {
                Tx tx = this.getTx(ack.getTransactionId());
                tx.add(new RemoveMessageCommand(context){

                    @Override
                    public MessageAck getMessageAck() {
                        return ack;
                    }

                    @Override
                    public Future<Object> run(ConnectionContext ctx) throws IOException {
                        destination.removeMessage(ctx, ack);
                        return AbstractMessageStore.FUTURE;
                    }
                });
            }
        } else {
            destination.removeAsyncMessage(context, ack);
        }
    }

    final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName, final MessageId messageId, final MessageAck ack) throws IOException {
        if (ack.isInTransaction()) {
            if (ack.getTransactionId().isXATransaction() || !this.theStore.isConcurrentStoreAndDispatchTransactions()) {
                destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
            } else {
                Tx tx = this.getTx(ack.getTransactionId());
                tx.add(new RemoveMessageCommand(context){

                    @Override
                    public MessageAck getMessageAck() {
                        return ack;
                    }

                    @Override
                    public Future<Object> run(ConnectionContext ctx) throws IOException {
                        destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
                        return AbstractMessageStore.FUTURE;
                    }
                });
            }
        } else {
            destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
        }
    }

    private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
        return this.theStore.getTransactionIdTransformer().transform(txid);
    }

    public abstract class RemoveMessageCommand {
        private final ConnectionContext ctx;

        RemoveMessageCommand(ConnectionContext ctx) {
            this.ctx = ctx;
        }

        abstract MessageAck getMessageAck();

        Future<Object> run() throws IOException {
            return this.run(this.ctx);
        }

        abstract Future<Object> run(ConnectionContext var1) throws IOException;
    }

    public abstract class AddMessageCommand {
        private final ConnectionContext ctx;

        AddMessageCommand(ConnectionContext ctx) {
            this.ctx = ctx;
        }

        abstract Message getMessage();

        Future<Object> run() throws IOException {
            return this.run(this.ctx);
        }

        abstract Future<Object> run(ConnectionContext var1) throws IOException;
    }

    public class Tx {
        private final ArrayList<AddMessageCommand> messages = new ArrayList();
        private final ArrayList<RemoveMessageCommand> acks = new ArrayList();

        public void add(AddMessageCommand msg) {
            this.messages.add(msg);
        }

        public void add(RemoveMessageCommand ack) {
            this.acks.add(ack);
        }

        public Message[] getMessages() {
            Message[] rc = new Message[this.messages.size()];
            int count = 0;
            for (AddMessageCommand cmd : this.messages) {
                rc[count++] = cmd.getMessage();
            }
            return rc;
        }

        public MessageAck[] getAcks() {
            MessageAck[] rc = new MessageAck[this.acks.size()];
            int count = 0;
            for (RemoveMessageCommand cmd : this.acks) {
                rc[count++] = cmd.getMessageAck();
            }
            return rc;
        }

        public List<Future<Object>> commit() throws IOException {
            ArrayList<Future<Object>> results = new ArrayList<Future<Object>>();
            for (AddMessageCommand addMessageCommand : this.messages) {
                results.add(addMessageCommand.run());
            }
            for (RemoveMessageCommand removeMessageCommand : this.acks) {
                removeMessageCommand.run();
                results.add(removeMessageCommand.run());
            }
            return results;
        }
    }
}

