package org.infinispan.statetransfer;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.control.StateTransferControlCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.config.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextContainer;
import org.infinispan.context.impl.RemoteTxInvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.InterceptorChain;
import org.infinispan.io.UnclosableObjectInputStream;
import org.infinispan.io.UnclosableObjectOutputStream;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.loaders.CacheStore;
import org.infinispan.marshall.StreamingMarshaller;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.DistributedSync;
import org.infinispan.transaction.TransactionLog;
import org.infinispan.transaction.TransactionTable;
import org.infinispan.util.Util;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:WEB-INF/lib/infinispan-core-5.0.0.CR4.jar:org/infinispan/statetransfer/StateTransferManagerImpl.class */
public class StateTransferManagerImpl implements StateTransferManager {
    RpcManager rpcManager;
    AdvancedCache<Object, Object> cache;
    Configuration configuration;
    DataContainer dataContainer;
    CacheLoaderManager clm;
    CacheStore cs;
    StreamingMarshaller marshaller;
    TransactionLog transactionLog;
    InvocationContextContainer invocationContextContainer;
    InterceptorChain interceptorChain;
    CommandsFactory commandsFactory;
    TransactionTable txTable;
    private static final Log log = LogFactory.getLog(StateTransferManagerImpl.class);
    private static final boolean trace = log.isTraceEnabled();
    private static final Byte DELIMITER = (byte) 123;
    boolean transientState;
    boolean persistentState;
    boolean alwaysProvideTransientState;
    int maxNonProgressingLogWrites;
    long flushTimeout;
    volatile boolean needToUnblockRPC = false;
    volatile Address stateSender;

    @Inject
    public void injectDependencies(RpcManager rpcManager, AdvancedCache advancedCache, Configuration configuration, DataContainer dataContainer, CacheLoaderManager cacheLoaderManager, StreamingMarshaller streamingMarshaller, TransactionLog transactionLog, InterceptorChain interceptorChain, InvocationContextContainer invocationContextContainer, CommandsFactory commandsFactory, TransactionTable transactionTable) {
        this.rpcManager = rpcManager;
        this.cache = advancedCache;
        this.configuration = configuration;
        this.dataContainer = dataContainer;
        this.clm = cacheLoaderManager;
        this.marshaller = streamingMarshaller;
        this.transactionLog = transactionLog;
        this.invocationContextContainer = invocationContextContainer;
        this.interceptorChain = interceptorChain;
        this.commandsFactory = commandsFactory;
        this.txTable = transactionTable;
    }

    @Start(priority = 55)
    public void start() throws StateTransferException {
        log.tracef("Data container is %s", Util.hexIdHashCode(this.dataContainer));
        this.cs = this.clm == null ? null : this.clm.getCacheStore();
        this.transientState = this.configuration.isFetchInMemoryState();
        this.alwaysProvideTransientState = this.configuration.isAlwaysProvideInMemoryState();
        this.persistentState = this.cs != null && this.clm.isEnabled() && this.clm.isFetchPersistentState() && !this.clm.isShared();
        this.maxNonProgressingLogWrites = this.configuration.getStateRetrievalMaxNonProgressingLogWrites();
        this.flushTimeout = this.configuration.getStateRetrievalLogFlushTimeout();
        if (this.transientState || this.persistentState) {
            long j = 0;
            if (log.isDebugEnabled()) {
                log.debug("Initiating state transfer process");
                j = System.currentTimeMillis();
            }
            this.rpcManager.retrieveState(this.cache.getName(), this.configuration.getStateRetrievalTimeout());
            if (log.isDebugEnabled()) {
                log.debugf("State transfer process completed in %s", Util.prettyPrintTime(System.currentTimeMillis() - j));
            }
        }
    }

    @Start(priority = 1000)
    public void releaseRPCBlock() throws Exception {
        if (this.needToUnblockRPC) {
            if (trace) {
                log.trace("Stopping RPC block");
            }
            mimicPartialFlushViaRPC(this.stateSender, false);
        }
    }

    @Override // org.infinispan.statetransfer.StateTransferManager
    public void generateState(OutputStream outputStream) throws StateTransferException {
        try {
            try {
                try {
                    boolean activate = this.transactionLog.activate();
                    if (log.isDebugEnabled()) {
                        log.debugf("Generating state.  Can provide? %s", Boolean.valueOf(activate));
                    }
                    ObjectOutput startObjectOutput = this.marshaller.startObjectOutput(outputStream, false);
                    this.marshaller.objectToObjectStream(true, startObjectOutput);
                    this.marshaller.objectToObjectStream(Boolean.valueOf(activate), startObjectOutput);
                    if (activate) {
                        delimit(startObjectOutput);
                        if (this.transientState || this.alwaysProvideTransientState) {
                            generateInMemoryState(startObjectOutput);
                        }
                        delimit(startObjectOutput);
                        if (this.persistentState) {
                            generatePersistentState(startObjectOutput);
                        }
                        delimit(startObjectOutput);
                        generateTransactionLog(startObjectOutput);
                        if (log.isDebugEnabled()) {
                            log.debug("State generated, closing object stream");
                        }
                    } else if (log.isDebugEnabled()) {
                        log.debug("Not providing state!");
                    }
                    this.marshaller.finishObjectOutput(startObjectOutput);
                    if (activate) {
                        this.transactionLog.deactivate();
                    }
                } catch (StateTransferException e) {
                    throw e;
                }
            } catch (Exception e2) {
                throw new StateTransferException(e2);
            }
        } catch (Throwable th) {
            this.marshaller.finishObjectOutput(null);
            if (0 != 0) {
                this.transactionLog.deactivate();
            }
            throw th;
        }
    }

    private void generateTransactionLog(ObjectOutput objectOutput) throws Exception {
        DistributedSync distributedSync = this.rpcManager.getTransport().getDistributedSync();
        try {
            if (trace) {
                log.tracef("Transaction log size is %s", Integer.valueOf(this.transactionLog.size()));
            }
            int i = 0;
            int size = this.transactionLog.size();
            while (size > 0) {
                if (trace) {
                    log.tracef("Tx Log remaining entries = %d", Integer.valueOf(size));
                }
                this.transactionLog.writeCommitLog(this.marshaller, objectOutput);
                int size2 = this.transactionLog.size();
                if (size2 >= size) {
                    i++;
                    if (i >= this.maxNonProgressingLogWrites) {
                        break;
                    }
                }
                size = size2;
            }
            distributedSync.acquireProcessingLock(true, this.configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS);
            delimit(objectOutput);
            objectOutput.flush();
            if (trace) {
                log.trace("Waiting for a distributed sync block");
            }
            distributedSync.blockUntilAcquired(this.flushTimeout, TimeUnit.MILLISECONDS);
            if (trace) {
                log.trace("Distributed sync block received, proceeding with writing commit log");
            }
            this.transactionLog.writeCommitLog(this.marshaller, objectOutput);
            delimit(objectOutput);
            this.transactionLog.writePendingPrepares(this.marshaller, objectOutput);
            delimit(objectOutput);
            objectOutput.flush();
            distributedSync.releaseProcessingLock(true);
        } catch (Throwable th) {
            distributedSync.releaseProcessingLock(true);
            throw th;
        }
    }

    private void processCommitLog(ObjectInput objectInput) throws Exception {
        Object obj;
        if (trace) {
            log.trace("Applying commit log");
        }
        Object objectFromObjectStream = this.marshaller.objectFromObjectStream(objectInput);
        while (true) {
            obj = objectFromObjectStream;
            if (!(obj instanceof TransactionLog.LogEntry)) {
                break;
            }
            InvocationContext createRemoteInvocationContext = this.invocationContextContainer.createRemoteInvocationContext(null);
            WriteCommand[] modifications = ((TransactionLog.LogEntry) obj).getModifications();
            if (trace) {
                log.tracef("Mods = %s", Arrays.toString(modifications));
            }
            for (WriteCommand writeCommand : modifications) {
                this.commandsFactory.initializeReplicableCommand(writeCommand, false);
                createRemoteInvocationContext.setFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_CACHE_STATUS_CHECK);
                this.interceptorChain.invoke(createRemoteInvocationContext, writeCommand);
            }
            objectFromObjectStream = this.marshaller.objectFromObjectStream(objectInput);
        }
        assertDelimited(obj);
        if (trace) {
            log.trace("Finished applying commit log");
        }
    }

    private void applyTransactionLog(ObjectInput objectInput) throws Exception {
        if (trace) {
            log.trace("Integrating transaction log");
        }
        processCommitLog(objectInput);
        this.stateSender = this.rpcManager.getCurrentStateTransferSource();
        mimicPartialFlushViaRPC(this.stateSender, true);
        this.needToUnblockRPC = true;
        try {
            if (trace) {
                log.trace("Retrieving/Applying post-flush commits");
            }
            processCommitLog(objectInput);
            if (trace) {
                log.trace("Retrieving/Applying pending prepares");
            }
            Object objectFromObjectStream = this.marshaller.objectFromObjectStream(objectInput);
            while (objectFromObjectStream instanceof PrepareCommand) {
                PrepareCommand prepareCommand = (PrepareCommand) objectFromObjectStream;
                if (!this.transactionLog.hasPendingPrepare(prepareCommand)) {
                    if (trace) {
                        log.tracef("Applying pending prepare %s", prepareCommand);
                    }
                    this.commandsFactory.initializeReplicableCommand(prepareCommand, false);
                    RemoteTxInvocationContext createRemoteTxInvocationContext = this.invocationContextContainer.createRemoteTxInvocationContext(null);
                    createRemoteTxInvocationContext.setRemoteTransaction(this.txTable.createRemoteTransaction(prepareCommand.getGlobalTransaction(), prepareCommand.getModifications()));
                    createRemoteTxInvocationContext.setFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_CACHE_STATUS_CHECK);
                    this.interceptorChain.invoke(createRemoteTxInvocationContext, prepareCommand);
                } else if (trace) {
                    log.tracef("Prepare %s not in tx log; not applying", prepareCommand);
                }
                objectFromObjectStream = this.marshaller.objectFromObjectStream(objectInput);
            }
            assertDelimited(objectFromObjectStream);
        } catch (Exception e) {
            if (trace) {
                log.trace("Stopping RPC block");
            }
            mimicPartialFlushViaRPC(this.stateSender, false);
            this.needToUnblockRPC = false;
            throw e;
        }
    }

    private void mimicPartialFlushViaRPC(Address address, boolean z) throws Exception {
        StateTransferControlCommand buildStateTransferControlCommand = this.commandsFactory.buildStateTransferControlCommand(z);
        if (!z) {
            this.rpcManager.getTransport().getDistributedSync().releaseSync();
        }
        this.rpcManager.invokeRemotely(Collections.singletonList(address), buildStateTransferControlCommand, ResponseMode.SYNCHRONOUS, this.configuration.getStateRetrievalTimeout(), true);
        if (z) {
            this.rpcManager.getTransport().getDistributedSync().acquireSync();
        }
    }

    @Override // org.infinispan.statetransfer.StateTransferManager
    public void applyState(InputStream inputStream) throws StateTransferException {
        if (log.isDebugEnabled()) {
            log.debug("Applying state");
        }
        try {
            try {
                try {
                    ObjectInput startObjectInput = this.marshaller.startObjectInput(inputStream, false);
                    if (((Boolean) this.marshaller.objectFromObjectStream(startObjectInput)).booleanValue()) {
                        if (!((Boolean) this.marshaller.objectFromObjectStream(startObjectInput)).booleanValue()) {
                            if (log.isDebugEnabled()) {
                                log.debug("Provider cannot provide state!");
                            }
                            throw new StateTransferException("Provider cannot provide state!");
                        }
                        assertDelimited(startObjectInput);
                        if (this.cs != null) {
                            this.cs.clear();
                        }
                        if (this.transientState) {
                            applyInMemoryState(startObjectInput);
                        }
                        assertDelimited(startObjectInput);
                        if (this.persistentState) {
                            applyPersistentState(startObjectInput);
                        }
                        assertDelimited(startObjectInput);
                        applyTransactionLog(startObjectInput);
                        if (log.isDebugEnabled()) {
                            log.debug("State applied, closing object stream");
                        }
                    }
                    this.marshaller.finishObjectInput(startObjectInput);
                } catch (Exception e) {
                    throw new StateTransferException(e);
                }
            } catch (StateTransferException e2) {
                throw e2;
            }
        } catch (Throwable th) {
            this.marshaller.finishObjectInput(null);
            throw th;
        }
    }

    private void applyInMemoryState(ObjectInput objectInput) throws StateTransferException {
        this.dataContainer.clear();
        try {
            for (InternalCacheEntry internalCacheEntry : (Set) this.marshaller.objectFromObjectStream(objectInput)) {
                this.cache.withFlags(Flag.CACHE_MODE_LOCAL).put(internalCacheEntry.getKey(), internalCacheEntry.getValue(), internalCacheEntry.getLifespan(), TimeUnit.MILLISECONDS, internalCacheEntry.getMaxIdle(), TimeUnit.MILLISECONDS);
            }
        } catch (Exception e) {
            this.dataContainer.clear();
            throw new StateTransferException(e);
        }
    }

    private void generateInMemoryState(ObjectOutput objectOutput) throws StateTransferException {
        try {
            HashSet hashSet = new HashSet();
            for (InternalCacheEntry internalCacheEntry : this.dataContainer) {
                if (!internalCacheEntry.isExpired()) {
                    hashSet.add(internalCacheEntry);
                }
            }
            if (log.isDebugEnabled()) {
                log.debugf("Writing %s StoredEntries to stream", Integer.valueOf(hashSet.size()));
            }
            this.marshaller.objectToObjectStream(hashSet, objectOutput);
        } catch (Exception e) {
            throw new StateTransferException(e);
        }
    }

    private void applyPersistentState(ObjectInput objectInput) throws StateTransferException {
        try {
            this.cs.fromStream(new UnclosableObjectInputStream(objectInput));
        } catch (CacheLoaderException e) {
            throw new StateTransferException(e);
        }
    }

    private void generatePersistentState(ObjectOutput objectOutput) throws StateTransferException {
        try {
            if (trace) {
                log.trace("Generate persistent state");
            }
            this.cs.toStream(new UnclosableObjectOutputStream(objectOutput));
        } catch (CacheLoaderException e) {
            throw new StateTransferException(e);
        }
    }

    private void delimit(ObjectOutput objectOutput) throws IOException {
        this.marshaller.objectToObjectStream(DELIMITER, objectOutput);
    }

    private void assertDelimited(ObjectInput objectInput) throws StateTransferException {
        try {
            assertDelimited(this.marshaller.objectFromObjectStream(objectInput));
        } catch (Exception e) {
            throw new StateTransferException(e);
        }
    }

    private void assertDelimited(Object obj) throws StateTransferException {
        if (obj instanceof Exception) {
            throw new StateTransferException((Exception) obj);
        }
        if (!DELIMITER.equals(obj)) {
            throw new StateTransferException("Expected a delimiter, recieved " + obj);
        }
    }
}
