package org.infinispan.statetransfer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.control.StateTransferControlCommand;
import org.infinispan.config.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextContainer;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.interceptors.InterceptorChain;
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.loaders.CacheStore;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.notifications.cachemanagerlistener.annotation.Merged;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
import org.infinispan.util.concurrent.ReclosableLatch;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.jgroups.Event;

/* loaded from: input_file:WEB-INF/lib/infinispan-core-5.1.0.ALPHA2.jar:org/infinispan/statetransfer/BaseStateTransferManagerImpl.class */
public abstract class BaseStateTransferManagerImpl implements StateTransferManager {
    private static final Log log = LogFactory.getLog(BaseStateTransferManagerImpl.class);
    /** Trace flag, sampled once when this class is initialised. */
    private static final boolean trace = log.isTraceEnabled();

    // Collaborators assigned by init(...).
    protected CacheLoaderManager cacheLoaderManager;
    protected Configuration configuration;
    protected RpcManager rpcManager;
    private CacheManagerNotifier notifier;
    protected CommandsFactory cf;
    protected DataContainer dataContainer;
    protected InterceptorChain interceptorChain;
    protected InvocationContextContainer icc;
    protected CacheNotifier cacheNotifier;
    protected StateTransferLock stateTransferLock;

    /** Consistent hash for the last completed view; replaced by {@link #chNew} in endStateTransfer(). */
    protected volatile ConsistentHash chOld;
    /** View id matching {@link #chOld}. */
    private volatile int oldViewId;
    /** Consistent hash built from the members of the most recently received view. */
    protected volatile ConsistentHash chNew;
    /** Id of the most recently received view. */
    private volatile int newViewId;

    /** Opened at the end of start() and by stop(). */
    private final CountDownLatch joinStartedLatch = new CountDownLatch(1);
    /** Opened by endStateTransfer() and by stop(). */
    private final CountDownLatch joinCompletedLatch = new CountDownLatch(1);
    /** Closed by startStateTransfer() while a transfer runs; opened by endStateTransfer() and stop(). */
    private final ReclosableLatch stateTransferInProgressLatch = new ReclosableLatch(false);

    /** Listener registered with the cache manager notifier in start() and removed in stop(). */
    protected final ViewChangeListener listener = new ViewChangeListener();
    protected final PushConfirmationsMap pushConfirmations = new PushConfirmationsMap();

    /**
     * Single-threaded executor that runs the state transfer tasks. The work queue holds at most
     * one pending task, and {@link ThreadPoolExecutor.DiscardOldestPolicy} drops the queued task
     * in favour of a newly submitted one when the queue is full.
     */
    protected final ExecutorService rehashExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1), new ThreadFactory() {
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            // The name is built when the thread is created, i.e. after init(...) has
            // populated configuration and rpcManager.
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName("Rehasher," + BaseStateTransferManagerImpl.this.configuration.getName() + "," + BaseStateTransferManagerImpl.this.rpcManager.getTransport().getAddress());
            return thread;
        }
    }, new ThreadPoolExecutor.DiscardOldestPolicy());

    /**
     * Cache-manager listener that forwards view changes and merges to the enclosing
     * manager's {@code newViewReceived}, always flagged as a non-initial view.
     */
    @Listener
    public class ViewChangeListener {

        /**
         * Handles both regular view changes and merge views.
         *
         * @param viewChangedEvent event supplying the view id, the new member list and the merge flag
         */
        @Merged
        @ViewChanged
        public void handleViewChange(ViewChangedEvent viewChangedEvent) {
            final int viewId = viewChangedEvent.getViewId();
            final List<Address> members = viewChangedEvent.getNewMembers();
            final boolean mergeView = viewChangedEvent.isMergeView();
            newViewReceived(viewId, members, false, mergeView);
        }
    }

    /**
     * Injection point: stores every collaborator in its corresponding field.
     * Assignments follow the parameter order; no other work is done here.
     */
    @Inject
    public void init(Configuration configuration, RpcManager rpcManager, CacheManagerNotifier cacheManagerNotifier, CommandsFactory commandsFactory, DataContainer dataContainer, InterceptorChain interceptorChain, InvocationContextContainer invocationContextContainer, CacheLoaderManager cacheLoaderManager, CacheNotifier cacheNotifier, StateTransferLock stateTransferLock) {
        this.configuration = configuration;
        this.rpcManager = rpcManager;
        this.notifier = cacheManagerNotifier;
        this.cf = commandsFactory;
        this.dataContainer = dataContainer;
        this.interceptorChain = interceptorChain;
        this.icc = invocationContextContainer;
        this.cacheLoaderManager = cacheLoaderManager;
        this.cacheNotifier = cacheNotifier;
        this.stateTransferLock = stateTransferLock;
    }

    /**
     * Lifecycle start: registers the view listener, records the transport's current view as
     * the "old" view, processes that view as the initial one, and finally opens
     * {@code joinStartedLatch}.
     *
     * @throws Exception declared for the lifecycle contract
     */
    @Start(priority = 21)
    private void start() throws Exception {
        if (trace) {
            // Pass the address as a format argument: concatenating it into the format
            // string would make any '%' in its text be parsed as a format specifier.
            log.tracef("Starting distribution manager on %s", getAddress());
        }
        this.notifier.addListener(this.listener);
        Transport transport = this.rpcManager.getTransport();
        this.oldViewId = transport.getViewId();
        List<Address> members = transport.getMembers();
        this.chOld = createConsistentHash(members);
        // Treat the current view as the initial view (third argument true, not a merge).
        newViewReceived(this.oldViewId, members, true, false);
        this.joinStartedLatch.countDown();
    }

    /**
     * Builds the consistent hash for a member list. Called from start() for the initial
     * members and from newViewReceived(...) for every subsequent view.
     *
     * @param list the cluster members of the view
     * @return the consistent hash to use for that membership
     */
    protected abstract ConsistentHash createConsistentHash(List<Address> list);

    /**
     * Blocks until {@code joinCompletedLatch} opens or the configured rehash wait time
     * elapses. The result of the timed wait is discarded, so a timeout returns normally.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    // NOTE(review): the priority refers to org.jgroups.Event.USER_DEFINED; this looks like a
    // decompiler substituting a named constant for a numeric literal - confirm against the
    // original source.
    @Override // org.infinispan.statetransfer.StateTransferManager
    @Start(priority = Event.USER_DEFINED)
    public void waitForJoinToComplete() throws InterruptedException {
        final CountDownLatch completed = this.joinCompletedLatch;
        completed.await(this.configuration.getRehashWaitTime(), TimeUnit.MILLISECONDS);
    }

    /**
     * Lifecycle stop: unregisters the view listener, shuts the rehash executor down
     * immediately, and opens all three latches so that no caller remains blocked on them.
     */
    @Stop(priority = 20)
    public void stop() {
        this.notifier.removeListener(this.listener);
        this.rehashExecutor.shutdownNow();
        // Release anyone waiting for join start, join completion or state transfer completion.
        this.joinStartedLatch.countDown();
        this.joinCompletedLatch.countDown();
        this.stateTransferInProgressLatch.open();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return the address reported by the RPC manager
     */
    public Address getAddress() {
        final Address localAddress = this.rpcManager.getAddress();
        return localAddress;
    }

    /**
     * @return whether {@code joinStartedLatch} is reported open by {@code isLatchOpen}
     */
    @Override // org.infinispan.statetransfer.StateTransferManager
    public boolean hasJoinStarted() {
        final boolean started = isLatchOpen(this.joinStartedLatch);
        return started;
    }

    /**
     * Blocks until {@code joinStartedLatch} opens or the configured rehash wait time elapses.
     * The result of the timed wait is discarded, so a timeout returns normally.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override // org.infinispan.statetransfer.StateTransferManager
    public void waitForJoinToStart() throws InterruptedException {
        final CountDownLatch started = this.joinStartedLatch;
        started.await(this.configuration.getRehashWaitTime(), TimeUnit.MILLISECONDS);
    }

    /**
     * @return whether {@code joinCompletedLatch} is reported open by {@code isLatchOpen}
     */
    @Override // org.infinispan.statetransfer.StateTransferManager
    public boolean isJoinComplete() {
        final boolean completed = isLatchOpen(this.joinCompletedLatch);
        return completed;
    }

    /**
     * A state transfer is considered in progress while {@code stateTransferInProgressLatch}
     * is not reported open.
     *
     * @return {@code true} when the latch is closed, {@code false} otherwise
     */
    @Override // org.infinispan.statetransfer.StateTransferManager
    public boolean isStateTransferInProgress() {
        if (isLatchOpen(this.stateTransferInProgressLatch)) {
            return false;
        }
        return true;
    }

    /**
     * Polls in 1 ms steps until either the state transfer latch is no longer open or the
     * latest known view id has reached {@code i}. There is no upper bound on the wait.
     *
     * @param i the view id the caller is waiting for
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public void waitForStateTransferToStart(int i) throws InterruptedException {
        for (;;) {
            final boolean noTransferRunning = isLatchOpen(this.stateTransferInProgressLatch);
            if (!noTransferRunning || this.newViewId >= i) {
                return;
            }
            Thread.sleep(1L);
        }
    }

    /**
     * Blocks until {@code stateTransferInProgressLatch} opens or the configured rehash wait
     * time elapses. The result of the timed wait is discarded, so a timeout returns normally.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override // org.infinispan.statetransfer.StateTransferManager
    public void waitForStateTransferToComplete() throws InterruptedException {
        final ReclosableLatch inProgress = this.stateTransferInProgressLatch;
        inProgress.await(this.configuration.getRehashWaitTime(), TimeUnit.MILLISECONDS);
    }

    /**
     * Reports whether a count-down latch has reached zero, without blocking.
     * <p>
     * This reads the count directly instead of calling a zero-timeout {@code await}: the
     * timed wait throws {@link InterruptedException} when the calling thread's interrupt
     * flag is set, which previously made a still-closed latch be reported as open. The
     * interrupt flag of the caller is left untouched.
     *
     * @param countDownLatch the latch to inspect
     * @return {@code true} if the latch count is zero
     */
    private boolean isLatchOpen(CountDownLatch countDownLatch) {
        return countDownLatch.getCount() == 0;
    }

    /**
     * Probes a reclosable latch with a zero-length timed wait.
     *
     * @param reclosableLatch the latch to inspect
     * @return the result of the zero-timeout wait; if that wait is interrupted, the interrupt
     *         flag is restored and {@code true} is returned
     */
    private boolean isLatchOpen(ReclosableLatch reclosableLatch) {
        boolean open;
        try {
            open = reclosableLatch.await(0L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException interrupted) {
            // Keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
            open = true;
        }
        return open;
    }

    /**
     * Records in the push confirmations map that a node has completed its push.
     *
     * @param address the node that completed its push
     * @param i       the view id the confirmation refers to
     */
    @Override // org.infinispan.statetransfer.StateTransferManager
    public void nodeCompletedPush(Address address, int i) {
        final PushConfirmationsMap confirmations = this.pushConfirmations;
        confirmations.confirmPush(address, i);
    }

    /**
     * Records in the push confirmations map that a node has confirmed its join.
     *
     * @param address the joining node
     * @param i       the view id the join refers to
     */
    @Override // org.infinispan.statetransfer.StateTransferManager
    public void requestJoin(Address address, int i) {
        final PushConfirmationsMap confirmations = this.pushConfirmations;
        confirmations.confirmJoin(address, i);
    }

    /**
     * Applies entries pushed by another node by running one put command per entry through
     * the interceptor chain.
     * <p>
     * The call first waits via {@code waitForStateTransferToStart(i)}. State for a view id
     * older than the latest known view is rejected with a debug message. A failure on a
     * single entry is reported through the log and does not stop the remaining entries.
     *
     * @param collection the entries to apply
     * @param address    the node that pushed the state
     * @param i          the view id the state belongs to
     * @throws InterruptedException if interrupted while waiting for the state transfer to start
     */
    @Override // org.infinispan.statetransfer.StateTransferManager
    public void applyState(Collection<InternalCacheEntry> collection, Address address, int i) throws InterruptedException {
        waitForStateTransferToStart(i);
        if (i < this.newViewId) {
            log.debugf("Rejecting state pushed by node %s for old rehash %d (last view id we know is %d)", address, Integer.valueOf(i), Integer.valueOf(this.newViewId));
            return;
        }

        log.debugf("Applying new state from %s: received %d keys", address, Integer.valueOf(collection.size()));
        if (trace) {
            log.tracef("Received keys: %s", keys(collection));
        }

        Iterator<InternalCacheEntry> entries = collection.iterator();
        while (entries.hasNext()) {
            InternalCacheEntry entry = entries.next();
            // A fresh context per entry, flagged so the put is applied purely locally.
            InvocationContext ctx = this.icc.createInvocationContext();
            ctx.setFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_CACHE_LOAD, Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_SHARED_CACHE_STORE, Flag.SKIP_LOCKING, Flag.SKIP_OWNERSHIP_CHECK);
            try {
                this.interceptorChain.invoke(ctx, this.cf.buildPutKeyValueCommand(entry.getKey(), entry.getValue(), entry.getLifespan(), entry.getMaxIdle(), ctx.getFlags()));
            } catch (Exception failure) {
                log.problemApplyingStateForKey(failure.getMessage(), entry.getKey());
            }
        }

        if (trace) {
            log.tracef("After applying state data container has %d keys", Integer.valueOf(this.dataContainer.size()));
        }
    }

    /**
     * Extracts the keys of the given entries, in iteration order.
     *
     * @param collection the entries whose keys are wanted
     * @return a new list holding one key per entry
     */
    private Collection<Object> keys(Collection<InternalCacheEntry> collection) {
        // Typed list instead of a raw ArrayList, pre-sized to avoid regrowth.
        List<Object> keyList = new ArrayList<Object>(collection.size());
        for (InternalCacheEntry entry : collection) {
            keyList.add(entry.getKey());
        }
        return keyList;
    }

    /**
     * Begins a state transfer for a view. When {@code z} is set, the join request is
     * broadcast first. The method then waits, up to the configured rehash wait time, for the
     * cluster to confirm the join; only on confirmation is the in-progress latch closed.
     *
     * @param i          the view id of the transfer
     * @param collection member collection; not read by this implementation
     * @param z          whether to broadcast a join request before waiting
     * @return {@code true} if the cluster confirmed and the transfer was marked in progress,
     *         {@code false} if the confirmation wait was unsuccessful
     */
    public boolean startStateTransfer(int i, Collection<Address> collection, boolean z) throws TimeoutException, InterruptedException, PendingStateTransferException {
        if (z) {
            signalJoinStarted(i);
        }
        final boolean confirmed = this.pushConfirmations.waitForClusterToConfirmJoin(i, this.configuration.getRehashWaitTime());
        if (confirmed) {
            this.stateTransferInProgressLatch.close();
        }
        return confirmed;
    }

    /**
     * Finishes a state transfer: promotes the new view id and consistent hash to "old",
     * reopens the in-progress latch and opens {@code joinCompletedLatch}.
     */
    public void endStateTransfer() {
        // Promote the latest view to be the current one before releasing waiters.
        this.oldViewId = this.newViewId;
        this.chOld = this.chNew;
        this.stateTransferInProgressLatch.open();
        this.joinCompletedLatch.countDown();
    }

    /**
     * Confirms this node's own join locally and then broadcasts a synchronous
     * {@code REQUEST_JOIN} command for the given view, using the configured rehash RPC timeout.
     *
     * @param i the view id this node is joining in
     */
    void signalJoinStarted(int i) throws InterruptedException, TimeoutException, PendingStateTransferException {
        Address address = getAddress();
        if (trace) {
            // The view id was passed as an argument but had no placeholder, so it never
            // appeared in the message; it is now part of the format string.
            log.tracef("Node %s joining the cluster in view %d, broadcasting join request.", address, Integer.valueOf(i));
        }
        this.pushConfirmations.confirmJoin(address, i);
        // The null target collection is cast to select the Collection<Address> overload.
        this.rpcManager.invokeRemotely((Collection<Address>) null, this.cf.buildStateTransferCommand(StateTransferControlCommand.Type.REQUEST_JOIN, address, i), ResponseMode.SYNCHRONOUS, this.configuration.getRehashRpcTimeout());
    }

    /**
     * Confirms this node's own push locally, broadcasts a synchronous {@code PUSH_COMPLETED}
     * command, and then waits up to the configured rehash wait time for the cluster to
     * complete its pushes.
     *
     * @param i the view id the push belongs to
     * @throws PendingStateTransferException if the wait for the cluster to complete the push
     *                                       is unsuccessful
     */
    public void signalPushCompleted(int i) throws InterruptedException, TimeoutException, PendingStateTransferException {
        final Address self = getAddress();
        if (trace) {
            log.tracef("Node %s finished pushing state for view %s, broadcasting push complete signal.", self, Integer.valueOf(i));
        }
        this.pushConfirmations.confirmPush(self, i);
        this.rpcManager.invokeRemotely((Collection<Address>) null, this.cf.buildStateTransferCommand(StateTransferControlCommand.Type.PUSH_COMPLETED, self, i), ResponseMode.SYNCHRONOUS, this.configuration.getRehashRpcTimeout());

        final boolean clusterDone = this.pushConfirmations.waitForClusterToCompletePush(i, this.configuration.getRehashWaitTime());
        if (clusterDone) {
            return;
        }
        throw new PendingStateTransferException();
    }

    /**
     * Supplies the cache store associated with state transfer. This class does not call the
     * method itself; the concrete behaviour is defined by subclasses.
     *
     * @return the cache store chosen by the subclass
     */
    public abstract CacheStore getCacheStoreForStateTransfer();

    /**
     * Sends a batch of entries to a single node as an {@code APPLY_STATE} command through
     * {@code invokeRemotelyInFuture}, using the configured rehash RPC timeout.
     *
     * @param notifyingNotifiableFuture future handed to the RPC manager for this invocation
     * @param i                         the view id the state belongs to
     * @param address                   the destination node
     * @param collection                the entries to push
     * @throws PendingStateTransferException if {@code i} is no longer the latest view id
     */
    public void pushStateToNode(NotifyingNotifiableFuture<Object> notifyingNotifiableFuture, int i, Address address, Collection<InternalCacheEntry> collection) throws PendingStateTransferException {
        checkForPendingRehash(i);
        log.debugf("Pushing to node %s %d keys", address, Integer.valueOf(collection.size()));
        // Guarded like the trace output in applyState: keys(...) copies every key into a new
        // list, which is wasted work on each push when tracing is off.
        if (trace) {
            log.tracef("Pushing to node %s keys: %s", address, keys(collection));
        }
        this.rpcManager.invokeRemotelyInFuture(Collections.singleton(address), this.cf.buildStateTransferCommand(StateTransferControlCommand.Type.APPLY_STATE, getAddress(), i, collection), false, notifyingNotifiableFuture, this.configuration.getRehashRpcTimeout());
    }

    /**
     * @param i the view id to compare
     * @return whether {@code i} equals the most recently received view id
     */
    public boolean isLastViewId(int i) {
        final int latestViewId = this.newViewId;
        return latestViewId == i;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Fails when the given view id is not the most recently received one.
     *
     * @param i the view id the caller is working on
     * @throws PendingStateTransferException if {@code i} differs from the latest view id
     */
    public void checkForPendingRehash(int i) throws PendingStateTransferException {
        final boolean stillCurrent = (i == this.newViewId);
        if (stillCurrent) {
            return;
        }
        throw new PendingStateTransferException();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Processes a cluster view: informs the push confirmations map, records the view id,
     * rebuilds the "new" consistent hash from the members and submits a state transfer task
     * to the rehash executor.
     *
     * @param i    the id of the received view
     * @param list the members of the received view
     * @param z    {@code true} when this is the initial view seen by start()
     * @param z2   merge flag, forwarded to the confirmations map for non-initial views
     */
    public void newViewReceived(int i, List<Address> list, boolean z, boolean z2) {
        log.tracef("Received new cluster view: %d %s", Integer.valueOf(i), list);

        if (!z) {
            this.pushConfirmations.newViewReceived(i, list, z2);
        } else {
            this.pushConfirmations.initialViewReceived(i, list);
        }

        this.newViewId = i;
        this.chNew = createConsistentHash(list);

        final BaseStateTransferTask transferTask = createStateTransferTask(i, list, z);
        this.rehashExecutor.submit(transferTask);
    }

    /**
     * Creates the task that newViewReceived(...) submits to the rehash executor for a view.
     *
     * @param i    the id of the view
     * @param list the members of the view
     * @param z    {@code true} when the view is the initial one processed by start()
     * @return the task to run for this view
     */
    protected abstract BaseStateTransferTask createStateTransferTask(int i, List<Address> list, boolean z);
}
