package org.infinispan.scattered.impl;

import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PrimitiveIterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import net.jcip.annotations.GuardedBy;
import org.infinispan.commands.remote.ClusteredGetAllCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.container.entries.RemoteMetadata;
import org.infinispan.container.impl.InternalEntryFactory;
import org.infinispan.container.versioning.EntryVersion;
import org.infinispan.container.versioning.SimpleClusteredVersion;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.filter.CollectionKeyFilter;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.metadata.InternalMetadata;
import org.infinispan.metadata.Metadata;
import org.infinispan.metadata.impl.InternalMetadataImpl;
import org.infinispan.persistence.spi.AdvancedCacheLoader;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.MapResponseCollector;
import org.infinispan.remoting.transport.impl.SingleResponseCollector;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.scattered.ScatteredVersionManager;
import org.infinispan.statetransfer.InboundTransferTask;
import org.infinispan.statetransfer.StateConsumerImpl;
import org.infinispan.statetransfer.StateRequestCommand;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.ReadOnlyDataContainerBackedKeySet;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:BOOT-INF/lib/infinispan-core-9.4.19.Final-redhat-00001.jar:org/infinispan/scattered/impl/ScatteredStateConsumerImpl.class */
/**
 * State consumer variant living in the scattered-cache package. On top of {@link StateConsumerImpl} it
 * <ol>
 *    <li>asks all members to confirm revoked segments and then requests the keys of the newly added segments,</li>
 *    <li>once the key transfer of a segment is complete, walks the local entries (data container and, if present,
 *        the state-transfer cache store) and, per entry, either fetches the value from the node recorded in its
 *        {@link RemoteMetadata} or pushes the local value to the backup node,</li>
 *    <li>sends version invalidations to the remaining members.</li>
 * </ol>
 * All remote operations are batched into chunks of at most {@code chunkSize} elements. {@link #chunkCounter} counts
 * the chunks that are currently in flight; whenever it drops back to zero
 * {@code notifyEndOfStateTransferIfNeeded()} is invoked.
 */
public class ScatteredStateConsumerImpl extends StateConsumerImpl {
    private static final Log log = LogFactory.getLog(ScatteredStateConsumerImpl.class);
    private static final boolean trace = log.isTraceEnabled();
    protected static final long SKIP_OWNERSHIP_FLAGS = FlagBitSets.SKIP_OWNERSHIP_CHECK;

    @Inject
    protected InternalEntryFactory entryFactory;

    @ComponentName(KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR)
    @Inject
    protected ExecutorService asyncExecutor;

    @Inject
    protected ScatteredVersionManager svm;

    /** Segments whose key transfer has been requested but not completed yet. */
    @GuardedBy("transferMapsLock")
    protected IntSet inboundSegments;

    /** Local entries waiting to be sent to the backup node as one chunk. */
    protected BlockingQueue<InternalCacheEntry> backupQueue;

    /** Either empty or the single member following the local node in the list of actual members. */
    protected Collection<Address> backupAddress;

    /** Actual members without the backup node and without the local node. */
    protected Collection<Address> nonBackupAddresses;

    /** Maximum number of elements per chunk; read from the state-transfer configuration in {@link #start()}. */
    private int chunkSize;

    /** Number of chunk RPCs (invalidation, backup, value retrieval) that have been sent and not completed yet. */
    protected AtomicLong chunkCounter = new AtomicLong();

    /** Per source node: keys whose values still have to be fetched from that node. */
    protected final ConcurrentMap<Address, BlockingQueue<Object>> retrievedEntries = new ConcurrentHashMap<>();

    /** Per target node: key/version pairs that still have to be invalidated on that node. */
    protected final ConcurrentMap<Address, BlockingQueue<KeyAndVersion>> invalidations = new ConcurrentHashMap<>();

    /** Immutable pair of a key and the version that should be invalidated for it. */
    public static class KeyAndVersion {
        public final Object key;
        public final EntryVersion version;

        public KeyAndVersion(Object obj, EntryVersion entryVersion) {
            this.key = obj;
            this.version = entryVersion;
        }
    }

    /**
     * Starts the parent consumer, then reads the configured chunk size and allocates the backup queue with
     * exactly that capacity.
     */
    @Override // org.infinispan.statetransfer.StateConsumerImpl
    public void start() {
        super.start();
        this.chunkSize = this.configuration.clustering().stateTransfer().chunkSize();
        this.backupQueue = new ArrayBlockingQueue<>(this.chunkSize);
    }

    /**
     * Recomputes {@link #backupAddress} and {@link #nonBackupAddresses} from the actual members of the new
     * topology and then delegates to the parent implementation.
     *
     * @param cacheTopology the topology being installed
     * @param z             passed through unchanged to the parent implementation
     * @return the future returned by the parent implementation
     */
    @Override // org.infinispan.statetransfer.StateConsumerImpl, org.infinispan.statetransfer.StateConsumer
    public CompletableFuture<Void> onTopologyUpdate(CacheTopology cacheTopology, boolean z) {
        Address nextMember = getNextMember(cacheTopology);
        this.backupAddress = nextMember == null ? Collections.<Address>emptySet() : Collections.singleton(nextMember);

        // Build the list locally and publish it once, so that concurrent readers never observe a list that
        // still contains the backup node or the local node.
        List<Address> others = new ArrayList<>(cacheTopology.getActualMembers());
        others.remove(nextMember);
        others.remove(this.rpcManager.getAddress());
        this.nonBackupAddresses = others;

        return super.onTopologyUpdate(cacheTopology, z);
    }

    /**
     * Prepares the version manager for the topology that is about to be installed: transfers for segments that
     * are not local in {@code consistentHash2} are cancelled and the segments unregistered, the topology id is
     * recorded, and the segments that are owned in {@code consistentHash2} but were not owned in
     * {@code consistentHash} are either set as owned directly (first topology or fetching disabled) or
     * registered for value transfer.
     *
     * @param i               topology id handed to the version manager
     * @param z               not used by this override
     * @param consistentHash  previous consistent hash, {@code null} for the first topology
     * @param consistentHash2 consistent hash of the topology being installed
     */
    @Override // org.infinispan.statetransfer.StateConsumerImpl
    protected void beforeTopologyInstalled(int i, boolean z, ConsistentHash consistentHash, ConsistentHash consistentHash2) {
        int numSegments = consistentHash2.getNumSegments();
        for (int segment = 0; segment < numSegments; segment++) {
            if (!consistentHash2.isSegmentLocalToNode(this.rpcManager.getAddress(), segment)) {
                cancelTransfers(IntSets.immutableSet(segment));
                this.svm.unregisterSegment(segment);
            }
        }

        IntSet newlyOwned = getOwnedSegments(consistentHash2);
        if (consistentHash != null && !newlyOwned.isEmpty()) {
            newlyOwned.removeAll(getOwnedSegments(consistentHash));
        }

        this.svm.setTopologyId(i);
        if (consistentHash == null || !this.isFetchEnabled) {
            log.trace("This is the first topology or state transfer is disabled, not expecting any state transfer.");
            this.svm.setOwnedSegments(newlyOwned);
        } else if (!newlyOwned.isEmpty()) {
            this.svm.setValuesTransferTopology(i);
            for (PrimitiveIterator.OfInt it = newlyOwned.iterator(); it.hasNext(); ) {
                this.svm.registerSegment(it.nextInt());
            }
        }
    }

    /**
     * On a rebalance with at least one added segment: records the added segments as inbound, resets the chunk
     * counter, asks all members to confirm the revoked segments and, when that succeeds, starts the key
     * transfer. When the confirmation fails, every added segment is reported as finished unsuccessfully.
     *
     * @param z       {@code true} when this is a rebalance; otherwise nothing is done
     * @param intSet  segments that were added to this node
     * @param intSet2 not used by this override
     */
    @Override // org.infinispan.statetransfer.StateConsumerImpl
    protected void handleSegments(boolean z, IntSet intSet, IntSet intSet2) {
        if (!z) {
            log.trace("This is not a rebalance, not doing anything...");
            return;
        }
        if (intSet.isEmpty()) {
            log.trace("No segments missing");
            return;
        }
        synchronized (this.transferMapsLock) {
            this.inboundSegments = IntSets.mutableFrom(intSet);
        }
        this.chunkCounter.set(0L);
        if (trace) {
            log.tracef("Revoking all segments, chunk counter reset to 0");
        }

        StateRequestCommand confirmRevoked = this.commandsFactory.buildStateRequestCommand(
                StateRequestCommand.Type.CONFIRM_REVOKED_SEGMENTS, this.rpcManager.getAddress(),
                this.cacheTopology.getTopologyId(), intSet);
        this.rpcManager.blocking(this.rpcManager
                .invokeCommandOnAll(confirmRevoked, MapResponseCollector.ignoreLeavers(), this.rpcManager.getSyncRpcOptions())
                .whenComplete((responses, throwable) -> {
                    if (throwable == null) {
                        try {
                            this.svm.startKeyTransfer(intSet);
                            requestKeyTransfer(intSet);
                        } catch (SuspectException e) {
                            log.tracef("Key transfer source %s was suspected, another source will be selected", e.getSuspect());
                        } catch (Throwable t) {
                            log.failedToRequestSegments(this.cacheName, null, intSet, t);
                        }
                        return;
                    }
                    // Only report the failure loudly while the cache is running.
                    if (this.cache.wired().getStatus() == ComponentStatus.RUNNING) {
                        log.failedConfirmingRevokedSegments(throwable);
                    } else {
                        log.debug("Failed confirming revoked segments", throwable);
                    }
                    finishSegmentsWithoutTransfer(intSet);
                }));
    }

    /**
     * Creates one inbound key-transfer task per actual member other than the local node (in random order) and
     * schedules each of them on the state request executor. When there is no other member, the segments are
     * finished immediately.
     */
    private void requestKeyTransfer(IntSet segments) {
        boolean requested = false;
        synchronized (this.transferMapsLock) {
            List<Address> sources = new ArrayList<>(this.cacheTopology.getActualMembers());
            Collections.shuffle(sources);
            for (Address source : sources) {
                if (source.equals(this.rpcManager.getAddress())) {
                    continue;
                }
                requested = true;
                InboundTransferTask task = new InboundTransferTask(segments, source, this.cacheTopology.getTopologyId(),
                        this.rpcManager, this.commandsFactory, this.configuration.clustering().stateTransfer().timeout(),
                        this.cacheName, true);
                addTransfer(task, segments);
                this.stateRequestExecutor.executeAsync(() -> {
                    log.tracef("Requesting keys for segments %s from %s", task.getSegments(), task.getSource());
                    return task.requestKeys().whenComplete((ignored, throwable) -> onTaskCompletion(task));
                });
            }
        }
        if (requested) {
            return;
        }
        log.trace("No keys in transfer, finishing segments " + segments);
        finishSegmentsWithoutTransfer(segments);
    }

    /**
     * Reports every given segment to the version manager as finished (neither successful nor cancelled) and
     * then checks whether the whole state transfer is over.
     */
    private void finishSegmentsWithoutTransfer(IntSet segments) {
        for (PrimitiveIterator.OfInt it = segments.iterator(); it.hasNext(); ) {
            this.svm.notifyKeyTransferFinished(it.nextInt(), false, false);
        }
        notifyEndOfStateTransferIfNeeded();
    }

    /**
     * Called when a key-transfer task ends. Segments for which this was the last outstanding task are marked as
     * finished; if the task succeeded, value retrieval, backup and invalidation are scheduled for the entries of
     * those segments. Once no inbound segment is left, all partially filled chunks are flushed.
     *
     * @param inboundTransferTask the task that has just completed
     */
    @Override // org.infinispan.statetransfer.StateConsumerImpl
    protected void onTaskCompletion(InboundTransferTask inboundTransferTask) {
        if (trace) {
            log.tracef("Inbound transfer finished %s: %s", inboundTransferTask, inboundTransferTask.isCompletedSuccessfully() ? "successfully" : "unsuccessfuly");
        }

        IntSet completedSegments = completeSegmentsOf(inboundTransferTask);
        if (completedSegments.isEmpty()) {
            log.tracef("Not requesting any values yet because no segments have been completed.");
        } else if (inboundTransferTask.isCompletedSuccessfully()) {
            log.tracef("Requesting values from segments %s, for in-memory keys", completedSegments);
            transferInMemoryEntries(completedSegments);
            transferStoreEntries(completedSegments);
        }

        boolean allSegmentsDone;
        synchronized (this.transferMapsLock) {
            this.inboundSegments.removeAll(completedSegments);
            log.tracef("Unfinished inbound segments: %s", this.inboundSegments);
            allSegmentsDone = this.inboundSegments.isEmpty();
        }
        if (allSegmentsDone) {
            flushPendingChunks();
        }

        removeTransfer(inboundTransferTask);
        if (trace) {
            log.tracef("Inbound transfer removed, chunk counter is %s", this.chunkCounter.get());
        }
        if (this.chunkCounter.get() == 0) {
            notifyEndOfStateTransferIfNeeded();
        }
    }

    /**
     * Removes the task from the per-segment transfer lists and returns the segments for which it was the last
     * remaining task; the version manager is notified for each of those segments.
     */
    private IntSet completeSegmentsOf(InboundTransferTask task) {
        IntSet completed = IntSets.immutableEmptySet();
        synchronized (this.transferMapsLock) {
            for (PrimitiveIterator.OfInt it = task.getSegments().iterator(); it.hasNext(); ) {
                int segment = it.nextInt();
                List<InboundTransferTask> pending = this.transfersBySegment.get(Integer.valueOf(segment));
                if (pending == null) {
                    log.tracef("Transfers for segment %d have not been found.", segment);
                    continue;
                }
                pending.remove(task);
                if (!pending.isEmpty()) {
                    continue;
                }
                this.transfersBySegment.remove(Integer.valueOf(segment));
                if (trace) {
                    log.tracef("All transfer tasks for segment %d have completed.", segment);
                }
                this.svm.notifyKeyTransferFinished(segment, task.isCompletedSuccessfully(), task.isCancelled());
                // Start with immutable sets and switch to a mutable copy only when a second segment shows up.
                switch (completed.size()) {
                    case 0:
                        completed = IntSets.immutableSet(segment);
                        break;
                    case 1:
                        completed = IntSets.mutableCopyFrom(completed);
                        // intentional fall-through
                    default:
                        completed.add(segment);
                }
            }
        }
        return completed;
    }

    /** Schedules retrieval or backup, plus invalidations, for every data-container entry of the given segments. */
    private void transferInMemoryEntries(IntSet segments) {
        this.dataContainer.forEach(segments, entry -> {
            // Read the metadata once so that the type check and the cast see the same object.
            Metadata metadata = entry.getMetadata();
            if (metadata instanceof RemoteMetadata) {
                retrieveAndInvalidateOthers(entry.getKey(), metadata, ((RemoteMetadata) metadata).getAddress());
            } else {
                backupAndInvalidateOthers(entry, entry.getKey(), metadata);
            }
        });
    }

    /**
     * Does the same as {@link #transferInMemoryEntries(IntSet)} for entries published by the state-transfer
     * cache store, if there is one. Entries outside the given segments are skipped.
     */
    @SuppressWarnings("unchecked") // the provider may be declared as a raw type; keys and values are handled as Object
    private void transferStoreEntries(IntSet segments) {
        AdvancedCacheLoader<Object, Object> storeProvider = this.persistenceManager.getStateTransferProvider();
        if (storeProvider == null) {
            return;
        }
        try {
            // NOTE(review): presumably this filter rejects keys that are already in the data container,
            // so that those are not processed twice - confirm against CollectionKeyFilter.
            CollectionKeyFilter<Object> keyFilter = new CollectionKeyFilter<>(new ReadOnlyDataContainerBackedKeySet(this.dataContainer));
            AdvancedCacheLoader.CacheLoaderTask<Object, Object> storeTask = (storeEntry, taskContext) -> {
                Object key = storeEntry.getKey();
                if (!segments.contains(this.keyPartitioner.getSegment(key))) {
                    return;
                }
                try {
                    InternalMetadata metadata = storeEntry.getMetadata();
                    if (metadata instanceof RemoteMetadata) {
                        retrieveAndInvalidateOthers(key, metadata, ((RemoteMetadata) metadata).getAddress());
                    } else {
                        InternalCacheEntry localEntry = this.entryFactory.create(key, storeEntry.getValue(), (Metadata) metadata);
                        backupAndInvalidateOthers(localEntry, key, metadata);
                    }
                } catch (CacheException e) {
                    log.failedLoadingValueFromCacheStore(key, e);
                }
            };
            Flowable.fromPublisher(storeProvider.publishEntries(keyFilter::accept, true, true))
                    .blockingForEach(storeEntry -> storeTask.processEntry(storeEntry, null));
        } catch (CacheException e) {
            log.failedLoadingKeysFromCacheStore(e);
        }
    }

    /**
     * Queues the key for value retrieval from {@code owner} and queues an invalidation of the entry's version
     * on every other actual member.
     */
    private void retrieveAndInvalidateOthers(Object key, Metadata metadata, Address owner) {
        retrieveEntry(key, owner);
        for (Address member : this.cacheTopology.getActualMembers()) {
            if (!member.equals(owner)) {
                invalidate(key, metadata.version(), member);
            }
        }
    }

    /**
     * Queues the entry for backup and queues an invalidation of its version on every member in
     * {@link #nonBackupAddresses}.
     */
    private void backupAndInvalidateOthers(InternalCacheEntry entry, Object key, Metadata metadata) {
        backupEntry(entry);
        for (Address member : this.nonBackupAddresses) {
            invalidate(key, metadata.version(), member);
        }
    }

    /** Sends whatever is left in the retrieval, backup and invalidation queues, regardless of chunk size. */
    private void flushPendingChunks() {
        for (Map.Entry<Address, BlockingQueue<Object>> pending : this.retrievedEntries.entrySet()) {
            List<Object> keys = drainAll(pending.getValue());
            if (!keys.isEmpty()) {
                getValuesAndApply(pending.getKey(), keys);
            }
        }
        List<InternalCacheEntry> backups = drainAll(this.backupQueue);
        if (!backups.isEmpty()) {
            backupEntries(backups);
        }
        for (Map.Entry<Address, BlockingQueue<KeyAndVersion>> pending : this.invalidations.entrySet()) {
            List<KeyAndVersion> versions = drainAll(pending.getValue());
            if (!versions.isEmpty()) {
                invalidate(versions, pending.getKey());
            }
        }
    }

    /** Moves all currently queued elements into a new list. */
    private static <T> List<T> drainAll(BlockingQueue<T> queue) {
        List<T> drained = new ArrayList<>(queue.size());
        queue.drainTo(drained);
        return drained;
    }

    /**
     * Adds the element to the queue and, when that fills a chunk, removes and returns the chunk.
     *
     * @return the elements to send now, or an empty list when the chunk is not full yet
     */
    private <T> List<T> offerAndDrain(BlockingQueue<T> queue, T element) {
        if (!queue.offer(element)) {
            // The queue is full: the new element travels together with up to chunkSize - 1 queued ones.
            List<T> chunk = new ArrayList<>(this.chunkSize);
            chunk.add(element);
            queue.drainTo(chunk, this.chunkSize - 1);
            return chunk;
        }
        if (queue.size() >= this.chunkSize) {
            List<T> chunk = new ArrayList<>(this.chunkSize);
            queue.drainTo(chunk, this.chunkSize);
            return chunk;
        }
        return Collections.emptyList();
    }

    /** Queues one invalidation for the given member and sends a chunk when one is full. */
    private void invalidate(Object key, EntryVersion version, Address member) {
        BlockingQueue<KeyAndVersion> queue = this.invalidations.computeIfAbsent(member, ignored -> new ArrayBlockingQueue<>(this.chunkSize));
        List<KeyAndVersion> chunk = offerAndDrain(queue, new KeyAndVersion(key, version));
        if (!chunk.isEmpty()) {
            invalidate(chunk, member);
        }
    }

    /**
     * Sends one invalidate-versions command for the given chunk to the given member. The versions are expected
     * to be {@link SimpleClusteredVersion}s.
     */
    private void invalidate(List<KeyAndVersion> chunk, Address member) {
        int size = chunk.size();
        Object[] keys = new Object[size];
        int[] topologyIds = new int[size];
        long[] versions = new long[size];
        int index = 0;
        for (KeyAndVersion keyAndVersion : chunk) {
            SimpleClusteredVersion clusteredVersion = (SimpleClusteredVersion) keyAndVersion.version;
            keys[index] = keyAndVersion.key;
            topologyIds[index] = clusteredVersion.topologyId;
            versions[index] = clusteredVersion.version;
            index++;
        }

        long inFlight = this.chunkCounter.incrementAndGet();
        if (trace) {
            log.tracef("Invalidating versions on %s, chunk counter incremented to %d", member, Long.valueOf(inFlight));
        }
        this.rpcManager.invokeCommand(member,
                this.commandsFactory.buildInvalidateVersionsCommand(this.cacheTopology.getTopologyId(), keys, topologyIds, versions, true),
                SingleResponseCollector.validOnly(), this.rpcManager.getSyncRpcOptions())
                .whenComplete((response, throwable) -> {
                    try {
                        if (throwable != null) {
                            log.failedInvalidatingRemoteCache(throwable);
                        }
                    } finally {
                        // The chunk is over either way; never leave the counter stuck above zero.
                        long remaining = this.chunkCounter.decrementAndGet();
                        if (trace) {
                            log.tracef("Versions invalidated on %s, chunk counter decremented to %d", member, Long.valueOf(remaining));
                        }
                        if (remaining == 0) {
                            notifyEndOfStateTransferIfNeeded();
                        }
                    }
                });
    }

    /** Queues one entry for backup and sends a chunk when one is full. */
    private void backupEntry(InternalCacheEntry entry) {
        List<InternalCacheEntry> chunk = offerAndDrain(this.backupQueue, entry);
        if (!chunk.isEmpty()) {
            backupEntries(chunk);
        }
    }

    /** Sends the given entries to {@link #backupAddress} in a single put-map command. */
    private void backupEntries(List<InternalCacheEntry> entries) {
        long inFlight = this.chunkCounter.incrementAndGet();
        if (trace) {
            log.tracef("Backing up entries, chunk counter is %d", inFlight);
        }
        Map<Object, InternalCacheValue> values = new HashMap<>();
        for (InternalCacheEntry entry : entries) {
            values.put(entry.getKey(), entry.toInternalCacheValue());
        }
        PutMapCommand putMap = this.commandsFactory.buildPutMapCommand(values, null, STATE_TRANSFER_FLAGS);
        putMap.setTopologyId(this.rpcManager.getTopologyId());
        this.rpcManager.invokeCommand(this.backupAddress, putMap, SingleResponseCollector.validOnly(), this.rpcManager.getSyncRpcOptions())
                .whenComplete((response, throwable) -> {
                    try {
                        if (throwable != null) {
                            log.failedOutBoundTransferExecution(throwable);
                        }
                    } finally {
                        // Must run for successful backups as well, otherwise the counter never reaches zero
                        // and the end of the state transfer is never signalled.
                        long remaining = this.chunkCounter.decrementAndGet();
                        if (trace) {
                            log.tracef("Backed up entries, chunk counter is %d", remaining);
                        }
                        if (remaining == 0) {
                            notifyEndOfStateTransferIfNeeded();
                        }
                    }
                });
    }

    /** Queues one key for value retrieval from the given node and sends a chunk when one is full. */
    private void retrieveEntry(Object key, Address source) {
        BlockingQueue<Object> queue = this.retrievedEntries.computeIfAbsent(source, ignored -> new ArrayBlockingQueue<>(this.chunkSize));
        List<Object> chunk = offerAndDrain(queue, key);
        if (!chunk.isEmpty()) {
            getValuesAndApply(source, chunk);
        }
    }

    /**
     * Requests the values of the given keys from the given node and applies them locally when the response
     * arrives. Failures are logged and rethrown from the completion callback.
     */
    private void getValuesAndApply(Address address, List<Object> list) {
        long inFlight = this.chunkCounter.incrementAndGet();
        if (trace) {
            log.tracef("Retrieving values, chunk counter is %d", inFlight);
        }
        ClusteredGetAllCommand getAll = this.commandsFactory.buildClusteredGetAllCommand(list, SKIP_OWNERSHIP_FLAGS, null);
        getAll.setTopologyId(this.rpcManager.getTopologyId());
        this.rpcManager.invokeCommand(address, getAll, SingleResponseCollector.validOnly(), this.rpcManager.getSyncRpcOptions())
                .whenComplete((response, throwable) -> {
                    try {
                        if (throwable != null) {
                            throw log.exceptionProcessingEntryRetrievalValues(throwable);
                        }
                        applyValues(address, list, response);
                    } catch (Throwable t) {
                        log.failedProcessingValuesDuringRebalance(t);
                        throw t;
                    } finally {
                        long remaining = this.chunkCounter.decrementAndGet();
                        if (trace) {
                            log.tracef("Applied values, chunk counter is %d", remaining);
                        }
                        if (remaining == 0) {
                            notifyEndOfStateTransferIfNeeded();
                        }
                    }
                });
    }

    /**
     * Writes the values contained in the response into the local cache through the interceptor chain. The
     * response value is read as an array aligned with {@code list}; {@code null} elements are skipped.
     *
     * @throws CacheException        if there is no response or it is unsuccessful
     * @throws IllegalStateException if the response carries no value array
     */
    private void applyValues(Address address, List<Object> list, Response response) {
        if (response == null) {
            throw new CacheException("Did not get response from " + address);
        }
        if (!response.isSuccessful()) {
            throw new CacheException("Response from " + address + " is unsuccessful: " + response);
        }
        InternalCacheValue[] values = (InternalCacheValue[]) ((SuccessfulResponse) response).getResponseValue();
        if (values == null) {
            throw new IllegalStateException("Response from " + address + " does not contain any values");
        }
        for (int index = 0; index < list.size(); index++) {
            Object key = list.get(index);
            InternalCacheValue value = values[index];
            if (value == null) {
                continue;
            }
            try {
                this.interceptorChain.invoke(this.icf.createSingleKeyNonTxInvocationContext(),
                        this.commandsFactory.buildPutKeyValueCommand(key, value.getValue(), this.keyPartitioner.getSegment(key),
                                new InternalMetadataImpl(value), STATE_TRANSFER_FLAGS));
            } catch (Exception e) {
                if (!this.cache.wired().getStatus().allowInvocations()) {
                    log.debugf("Cache %s is shutting down, stopping state transfer", this.cacheName);
                    return;
                }
                // A failure for one key must not prevent the remaining keys from being applied.
                log.problemApplyingStateForKey(e.getMessage(), obj(key), e);
            }
        }
    }

    /** Identity helper that keeps the logged key typed as {@code Object}. */
    private static Object obj(Object key) {
        return key;
    }

    /** Tells the version manager that the value transfer is over, then delegates to the parent. */
    @Override // org.infinispan.statetransfer.StateConsumerImpl, org.infinispan.statetransfer.StateConsumer
    public void stopApplyingState(int i) {
        this.svm.notifyValueTransferFinished();
        super.stopApplyingState(i);
    }

    /** Overridden with an empty body: this consumer removes nothing here. */
    @Override // org.infinispan.statetransfer.StateConsumerImpl
    protected void removeStaleData(IntSet intSet) throws InterruptedException {
    }

    /**
     * Finds the member that follows the local node in the list of actual members, wrapping around to the first
     * member after the last one.
     *
     * @return the next member, or {@code null} when there is at most one member or the local node is not in
     *         the list
     */
    private Address getNextMember(CacheTopology cacheTopology) {
        Address self = this.rpcManager.getAddress();
        List<Address> members = cacheTopology.getActualMembers();
        int count = members.size();
        if (count <= 1) {
            return null;
        }
        for (int index = 0; index < count; index++) {
            if (members.get(index).equals(self)) {
                return members.get((index + 1) % count);
            }
        }
        return null;
    }
}
