/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.xsite;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.transaction.TransactionManager;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commands.AbstractVisitor;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.concurrent.NotifyingFutureImpl;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.metadata.Metadata;
import org.infinispan.remoting.LocalInvocation;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.transaction.impl.LocalTransaction;
import org.infinispan.transaction.impl.TransactionTable;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.BackupReceiver;
import org.infinispan.xsite.statetransfer.XSiteState;
import org.infinispan.xsite.statetransfer.XSiteStatePushCommand;
import org.infinispan.xsite.statetransfer.XSiteStateTransferControlCommand;

public class BackupReceiverImpl
implements BackupReceiver {
    /** Logger used by the receiver and by its static helper methods. */
    private static final Log log = LogFactory.getLog(BackupReceiverImpl.class);
    /**
     * Evaluated once at class initialisation. It guards the {@code log.tracef(...)} calls of the
     * outer class, so it has to mirror the TRACE level (not DEBUG) of the logger.
     */
    private static final boolean trace = log.isTraceEnabled();
    /** Cache on which the commands and state received by this instance are applied. */
    private final Cache<Object, Object> cache;
    /** Visitor handed to every command passed to {@code handleRemoteCommand}. */
    private final BackupCacheUpdater siteUpdater;

    /**
     * Creates a receiver for the given cache and builds the {@link BackupCacheUpdater} visitor
     * that operates on that same cache.
     *
     * @param cache the cache this receiver applies updates to
     */
    public BackupReceiverImpl(Cache<Object, Object> cache) {
        this.siteUpdater = new BackupCacheUpdater(cache);
        this.cache = cache;
    }

    /**
     * Returns the cache passed to the constructor.
     *
     * @return the cache this receiver operates on
     */
    @Override
    public Cache getCache() {
        return cache;
    }

    /**
     * Dispatches {@code command} to this receiver's {@link BackupCacheUpdater} visitor.
     *
     * @param command the command to apply
     * @return whatever the visitor returns for this command
     * @throws Throwable if the visitor fails
     */
    @Override
    public Object handleRemoteCommand(VisitableCommand command) throws Throwable {
        // The visitor is invoked without an invocation context (null).
        final BackupCacheUpdater visitor = siteUpdater;
        return command.acceptVisitor(null, visitor);
    }

    /**
     * Handles a state transfer control command: its site name is overwritten with its own origin
     * site and the command is then executed through {@code invokeRemotelyInLocalSite}. The
     * collected responses are discarded.
     *
     * @param command the control command; it is modified in place
     * @throws Exception if the local or remote invocation fails
     */
    @Override
    public void handleStateTransferControl(XSiteStateTransferControlCommand command) throws Exception {
        command.setSiteName(command.getOriginSite());
        invokeRemotelyInLocalSite(command);
    }

    /**
     * Applies a chunk of cross-site state. The states are grouped by the primary owner of their
     * key; the group owned by the local address is applied through a {@link LocalInvocation},
     * every other group is pushed to its owner with a {@code StatePushTask}. The method returns
     * only after {@code awaitRemoteTask} has been called for every remote task.
     *
     * <p>NOTE(review): when {@code awaitRemoteTask} re-sends a chunk remotely after a failure, the
     * task is still dropped from the pending list right afterwards, so that retry is never awaited
     * here - confirm this is intended.
     *
     * @param cmd the command carrying the chunk to apply
     * @throws CacheException if the cache does not allow invocations before or after the chunk is applied
     * @throws Exception if the local invocation or the handling of a remote task fails
     */
    @Override
    public void handleStateTransferState(XSiteStatePushCommand cmd) throws Exception {
        if (!cache.getStatus().allowInvocations()) {
            throw new CacheException("Cache is stopping or terminated: " + cache.getStatus());
        }
        final ClusteringDependentLogic ownershipLogic = cache.getAdvancedCache().getComponentRegistry().getComponent(ClusteringDependentLogic.class);
        final Map<Address, List<XSiteState>> chunksByOwner = new HashMap<Address, List<XSiteState>>();
        if (trace) {
            log.tracef("Received X-Site state transfer '%s'. Splitting by primary owner.", cmd);
        }

        // Step 1: bucket every state under the primary owner of its key.
        for (XSiteState state : cmd.getChunk()) {
            final Address owner = ownershipLogic.getPrimaryOwner(state.key());
            List<XSiteState> ownerStates = chunksByOwner.get(owner);
            if (ownerStates == null) {
                ownerStates = new LinkedList<XSiteState>();
                chunksByOwner.put(owner, ownerStates);
            }
            ownerStates.add(state);
        }

        // Step 2: take the local bucket out and start one remote push per remaining owner.
        final List<XSiteState> localStates = chunksByOwner.remove(ownershipLogic.getAddress());
        final List<StatePushTask> pendingTasks = new ArrayList<StatePushTask>(chunksByOwner.size());
        for (Map.Entry<Address, List<XSiteState>> ownerChunk : chunksByOwner.entrySet()) {
            final List<XSiteState> states = ownerChunk.getValue();
            if (states != null && !states.isEmpty()) {
                if (trace) {
                    log.tracef("Node '%s' will apply %s", ownerChunk.getKey(), states);
                }
                final StatePushTask task = new StatePushTask(states, ownerChunk.getKey(), cache);
                pendingTasks.add(task);
                task.executeRemote();
            }
        }
        // The tasks hold their own reference to each list, so the map is no longer needed.
        chunksByOwner.clear();

        // Step 3: apply the local bucket while the remote pushes are in flight.
        if (trace) {
            log.tracef("Local node '%s' will apply %s", cache.getAdvancedCache().getRpcManager().getAddress(), localStates);
        }
        if (localStates != null) {
            LocalInvocation.newInstanceFromCache(cache, newStatePushCommand(cache, localStates)).call();
            localStates.clear();
        }

        // Step 4: handle the outcome of every remote push; one pass drains the whole list.
        if (trace) {
            log.tracef("Waiting for the remote tasks...");
        }
        for (Iterator<StatePushTask> remaining = pendingTasks.iterator(); remaining.hasNext(); ) {
            awaitRemoteTask(cache, remaining.next());
            remaining.remove();
        }

        // Re-check the cache status once everything has been applied.
        if (!cache.getStatus().allowInvocations()) {
            throw new CacheException("Cache is stopping or terminated: " + cache.getStatus());
        }
    }

    /**
     * Waits for the reply of a remote state push and recovers when the push did not succeed.
     * <ul>
     * <li>If the single reply is {@link CacheNotFoundResponse#INSTANCE}, the chunk is applied
     * locally, provided the cache still allows invocations.</li>
     * <li>If waiting for or validating the reply throws (this includes the
     * {@link IllegalStateException} raised for an unexpected response map), exactly one recovery
     * is attempted: the chunk is re-sent to the target when it is still listed by
     * {@code RpcManager.getMembers()}, otherwise it is applied locally.</li>
     * </ul>
     * The re-sent remote push is only started; this method does not wait for its reply.
     *
     * @param cache the cache whose status and membership are consulted
     * @param task  the push to wait for
     * @throws CacheException if a failure occurs and the cache no longer allows invocations;
     *                        the triggering exception is attached as the cause
     * @throws Exception      if the recovery invocation itself fails
     */
    private static void awaitRemoteTask(Cache<?, ?> cache, StatePushTask task) throws Exception {
        try {
            if (trace) {
                log.tracef("Waiting reply from %s", task.address);
            }
            Map<Address, Response> responseMap = task.awaitRemote();
            if (trace) {
                log.tracef("Response received is %s", responseMap);
            }
            // The push targets a single node, so exactly that node is expected in the reply.
            if (responseMap.size() > 1 || !responseMap.containsKey(task.address)) {
                throw new IllegalStateException("Shouldn't happen. Map is " + responseMap);
            }
            Response response = responseMap.get(task.address);
            if (response == CacheNotFoundResponse.INSTANCE) {
                if (trace) {
                    log.tracef("Cache not found in node '%s'. Retrying locally!", task.address);
                }
                if (!cache.getStatus().allowInvocations()) {
                    throw new CacheException("Cache is stopping or terminated: " + cache.getStatus());
                }
                task.executeLocal();
            }
        }
        catch (Exception e) {
            if (!cache.getStatus().allowInvocations()) {
                throw new CacheException("Cache is stopping or terminated: " + cache.getStatus(), e);
            }
            if (cache.getAdvancedCache().getRpcManager().getMembers().contains(task.address)) {
                // Target is still a member: send the chunk to it again.
                if (trace) {
                    log.tracef(e, "An exception was sent by %s. Retrying!", task.address);
                }
                task.executeRemote();
            } else {
                // Target is no longer a member: apply the chunk on this node instead.
                if (trace) {
                    log.tracef(e, "An exception was sent by %s. Retrying locally!", task.address);
                }
                task.executeLocal();
            }
        }
    }

    /**
     * Builds an {@link XSiteStatePushCommand} for the given states using the
     * {@link CommandsFactory} of {@code cache}.
     *
     * @param cache     the cache whose component registry supplies the commands factory
     * @param stateList the states to put in the command; copied into a new array
     * @return the command built by the factory
     */
    private static XSiteStatePushCommand newStatePushCommand(Cache<?, ?> cache, List<XSiteState> stateList) {
        final CommandsFactory factory = cache.getAdvancedCache().getComponentRegistry().getCommandsFactory();
        final XSiteState[] chunk = stateList.toArray(new XSiteState[stateList.size()]);
        return factory.buildXSiteStatePushCommand(chunk);
    }

    /**
     * Executes {@code command} remotely through the {@link RpcManager} and on this node, and
     * returns all responses in one map. The remote call is started first, the local invocation
     * runs next, and only then is the remote future awaited.
     *
     * <p>NOTE(review): the recipient list passed to the RPC is {@code null}; presumably this means
     * "all members" - confirm against {@code RpcManager.invokeRemotelyInFuture}.
     *
     * @param command the command to execute
     * @return the local response keyed by the local address, plus every remote response
     * @throws Exception if the local invocation fails or waiting for the remote future fails
     */
    private Map<Address, Response> invokeRemotelyInLocalSite(CacheRpcCommand command) throws Exception {
        final RpcManager rpc = cache.getAdvancedCache().getRpcManager();
        final NotifyingFutureImpl<Map<Address, Response>> pendingRemote = new NotifyingFutureImpl<Map<Address, Response>>();
        final Map<Address, Response> allResponses = new HashMap<Address, Response>();

        rpc.invokeRemotelyInFuture(pendingRemote, null, command, rpc.getDefaultRpcOptions(true, false));

        final Response localResponse = LocalInvocation.newInstanceFromCache(cache, command).call();
        allResponses.put(rpc.getAddress(), localResponse);

        allResponses.putAll(pendingRemote.get());
        return allResponses;
    }

    /**
     * One chunk of state together with the node it should be pushed to. The chunk can be sent to
     * that node ({@link #executeRemote()}), applied on this node ({@link #executeLocal()}), and
     * the reply of the most recent remote send can be awaited ({@link #awaitRemote()}).
     */
    private static class StatePushTask {
        /** States carried by this task. */
        private final List<XSiteState> chunk;
        /** Node the chunk is pushed to. */
        private final Address address;
        /** Cache used to reach the RPC manager and to build or run the push command. */
        private final Cache<?, ?> cache;
        /** Future of the latest remote send; {@code null} until {@link #executeRemote()} runs. */
        private volatile Future<Map<Address, Response>> remoteFuture;

        private StatePushTask(List<XSiteState> chunk, Address address, Cache<?, ?> cache) {
            this.cache = cache;
            this.address = address;
            this.chunk = chunk;
        }

        /**
         * Sends the chunk to {@link #address} without waiting for the reply. The future of this
         * send replaces any previous one and is stored before the RPC is issued.
         */
        public void executeRemote() {
            final RpcManager rpc = cache.getAdvancedCache().getRpcManager();
            final NotifyingFutureImpl<Map<Address, Response>> pending = new NotifyingFutureImpl<Map<Address, Response>>();
            remoteFuture = pending;
            rpc.invokeRemotelyInFuture(pending, Collections.singletonList(address), newStatePushCommand(cache, chunk), rpc.getDefaultRpcOptions(true));
        }

        /**
         * Applies the chunk on this node through a {@link LocalInvocation}.
         *
         * @return the response of the local invocation
         * @throws Exception if the local invocation fails
         */
        public Response executeLocal() throws Exception {
            final XSiteStatePushCommand pushCommand = newStatePushCommand(cache, chunk);
            return LocalInvocation.newInstanceFromCache(cache, pushCommand).call();
        }

        /**
         * Blocks until the latest remote send completes.
         *
         * @return the responses of the remote send
         * @throws NullPointerException if {@link #executeRemote()} was never called
         * @throws Exception            if waiting on the future fails
         */
        public Map<Address, Response> awaitRemote() throws Exception {
            final Future<Map<Address, Response>> pending = remoteFuture;
            if (pending != null) {
                return pending.get();
            }
            throw new NullPointerException("Should not happen!");
        }
    }

    /**
     * Visitor that replays write and transaction commands on the backup cache. Writes are applied
     * through an {@link AdvancedCache} view carrying the {@code IGNORE_RETURN_VALUES} and
     * {@code SKIP_XSITE_BACKUP} flags. For two-phase transactions the local transaction started
     * at prepare time is suspended and remembered until the matching commit or rollback arrives.
     */
    public static final class BackupCacheUpdater
    extends AbstractVisitor {
        private static Log log = LogFactory.getLog(BackupCacheUpdater.class);
        /** Global transaction of the prepare command mapped to the local transaction replaying it. */
        private final ConcurrentMap<GlobalTransaction, GlobalTransaction> remote2localTx;
        /** Flagged view of the backup cache on which every operation is performed. */
        private final AdvancedCache<Object, Object> backupCache;

        BackupCacheUpdater(Cache<Object, Object> backup) {
            this.remote2localTx = new ConcurrentHashMap<GlobalTransaction, GlobalTransaction>();
            this.backupCache = backup.getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES, Flag.SKIP_XSITE_BACKUP);
        }

        /** Replays a put; a conditional put is applied as {@code putIfAbsent}. */
        @Override
        public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
            log.tracef("Processing a remote put %s", command);
            final boolean onlyIfAbsent = command.isConditional();
            return onlyIfAbsent
                    ? backupCache.putIfAbsent(command.getKey(), command.getValue(), command.getMetadata())
                    : backupCache.put(command.getKey(), command.getValue(), command.getMetadata());
        }

        /** Replays a remove; a conditional remove also passes the value carried by the command. */
        @Override
        public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
            if (!command.isConditional()) {
                return backupCache.remove(command.getKey());
            }
            return backupCache.remove(command.getKey(), command.getValue());
        }

        /**
         * Replays a replace. The old value is compared only when the command is conditional and
         * actually carries an old value; otherwise the plain replace is used.
         */
        @Override
        public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
            final boolean compareOldValue = command.isConditional() && command.getOldValue() != null;
            if (!compareOldValue) {
                return backupCache.replace(command.getKey(), command.getNewValue(), command.getMetadata());
            }
            return backupCache.replace(command.getKey(), command.getOldValue(), command.getNewValue(), command.getMetadata());
        }

        /** Replays a putAll, passing the metadata's lifespan and max idle in milliseconds. */
        @Override
        public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
            final Metadata entryMetadata = command.getMetadata();
            backupCache.putAll(
                    command.getMap(),
                    entryMetadata.lifespan(), TimeUnit.MILLISECONDS,
                    entryMetadata.maxIdle(), TimeUnit.MILLISECONDS);
            return null;
        }

        /** Replays a clear on the backup cache. */
        @Override
        public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {
            backupCache.clear();
            return null;
        }

        /**
         * Replays the modifications of a prepare. On a transactional backup cache they run inside
         * a local transaction; otherwise they are applied directly.
         *
         * @throws IllegalStateException if the backup cache is transactional and the command has no modifications
         */
        @Override
        public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
            if (!isTransactional()) {
                replayModifications(command);
                return null;
            }
            if (!command.hasModifications()) {
                throw new IllegalStateException("TxInvocationContext has no modifications!");
            }
            replayModificationsInTransaction(command, command.isOnePhaseCommit());
            return null;
        }

        /** Tells whether the backup cache is configured with {@link TransactionMode#TRANSACTIONAL}. */
        private boolean isTransactional() {
            final TransactionMode configuredMode = backupCache.getCacheConfiguration().transaction().transactionMode();
            return configuredMode == TransactionMode.TRANSACTIONAL;
        }

        /** Commits the local transaction paired with the command's global transaction. */
        @Override
        public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
            if (isTransactional()) {
                log.tracef("Committing remote transaction %s", command.getGlobalTransaction());
                completeTransaction(command.getGlobalTransaction(), true);
            } else {
                // A non-transactional backup cache has nothing to commit; only report it.
                log.cannotRespondToCommit(command.getGlobalTransaction(), backupCache.getName());
            }
            return null;
        }

        /** Rolls back the local transaction paired with the command's global transaction. */
        @Override
        public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
            if (isTransactional()) {
                log.tracef("Rolling back remote transaction %s", command.getGlobalTransaction());
                completeTransaction(command.getGlobalTransaction(), false);
            } else {
                // A non-transactional backup cache has nothing to roll back; only report it.
                log.cannotRespondToRollback(command.getGlobalTransaction(), backupCache.getName());
            }
            return null;
        }

        /**
         * Resumes the local transaction recorded for {@code globalTransaction} and commits it or
         * rolls it back. The mapping is removed before the local transaction is looked up.
         *
         * @param globalTransaction the global transaction carried by the commit or rollback command
         * @param commit            {@code true} to commit, {@code false} to roll back
         * @throws CacheException        if no local transaction id was recorded for {@code globalTransaction}
         * @throws IllegalStateException if the transaction table has no local transaction for the recorded id
         */
        private void completeTransaction(GlobalTransaction globalTransaction, boolean commit) throws Throwable {
            final TransactionTable table = txTable();
            final GlobalTransaction localId = remote2localTx.remove(globalTransaction);
            if (localId == null) {
                throw new CacheException("Couldn't find a local transaction corresponding to remote transaction " + globalTransaction);
            }
            final LocalTransaction local = table.getLocalTransaction(localId);
            if (local == null) {
                throw new IllegalStateException("Local tx not found but present in the tx table!");
            }
            final TransactionManager manager = txManager();
            manager.resume(local.getTransaction());
            if (!commit) {
                manager.rollback();
                return;
            }
            manager.commit();
        }

        /**
         * Begins a local transaction and replays the command's modifications in it. The cleanup
         * in the {@code finally} block runs whether or not the replay succeeded:
         * <ul>
         * <li>one-phase commit: the transaction is committed after a successful replay and rolled
         * back otherwise;</li>
         * <li>two-phase commit: the global transaction of the command is mapped to the local one
         * and the transaction is suspended until {@code completeTransaction} resumes it.</li>
         * </ul>
         * When the transaction table has no local transaction for the current transaction, the
         * cleanup does nothing.
         *
         * @param command        the prepare command whose modifications are replayed
         * @param onePhaseCommit whether the transaction is finished here instead of being suspended
         */
        private void replayModificationsInTransaction(PrepareCommand command, boolean onePhaseCommit) throws Throwable {
            final TransactionManager manager = txManager();
            boolean replayed = false;
            try {
                manager.begin();
                replayModifications(command);
                replayed = true;
            }
            finally {
                final LocalTransaction local = txTable().getLocalTransaction(manager.getTransaction());
                if (local != null) {
                    local.setFromRemoteSite(true);
                    if (!onePhaseCommit) {
                        // Remember the pairing so that completeTransaction can find the local tx.
                        remote2localTx.put(command.getGlobalTransaction(), local.getGlobalTransaction());
                        manager.suspend();
                    } else if (replayed) {
                        log.tracef("Committing remotely originated tx %s as it is 1PC", command.getGlobalTransaction());
                        manager.commit();
                    } else {
                        log.tracef("Rolling back remotely originated tx %s", command.getGlobalTransaction());
                        manager.rollback();
                    }
                }
            }
        }

        /** Returns the transaction manager of the backup cache. */
        private TransactionManager txManager() {
            final AdvancedCache<Object, Object> advanced = backupCache.getAdvancedCache();
            return advanced.getTransactionManager();
        }

        /** Looks up the {@link TransactionTable} in the backup cache's component registry. */
        public TransactionTable txTable() {
            return backupCache.getComponentRegistry().getComponent(TransactionTable.class);
        }

        /** Feeds every modification of {@code command} to this visitor, in the order returned. */
        private void replayModifications(PrepareCommand command) throws Throwable {
            for (WriteCommand modification : command.getModifications()) {
                modification.acceptVisitor(null, this);
            }
        }
    }
}

