package org.infinispan.xsite.irac;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PrimitiveIterator;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.versioning.irac.IracVersionGenerator;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.metadata.impl.IracMetadata;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.XSiteResponse;
import org.infinispan.topology.CacheTopology;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.ExponentialBackOff;
import org.infinispan.util.ExponentialBackOffImpl;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.XSiteBackup;
import org.infinispan.xsite.XSiteReplicateCommand;
import org.infinispan.xsite.status.DefaultTakeOfflineManager;
import org.infinispan.xsite.status.SiteState;
import org.infinispan.xsite.status.TakeOfflineManager;

@Scope(Scopes.NAMED_CACHE)
/* loaded from: input_file:m2repo/org/infinispan/infinispan-core/11.0.9.Final/infinispan-core-11.0.9.Final.jar:org/infinispan/xsite/irac/DefaultIracManager.class */
public class DefaultIracManager implements IracManager, Runnable {
    private static final Log log = LogFactory.getLog(DefaultIracManager.class);
    /** Cached once at class-load time; guards every trace statement in this class. */
    private static final boolean trace = log.isTraceEnabled();

    @Inject
    RpcManager rpcManager;

    @Inject
    Configuration config;

    @Inject
    TakeOfflineManager takeOfflineManager;

    @Inject
    ClusteringDependentLogic clusteringDependentLogic;

    @Inject
    CommandsFactory commandsFactory;

    @Inject
    IracVersionGenerator iracVersionGenerator;
    /** Keys waiting to be sent to the remote sites, mapped to the lock owner recorded when the key was tracked. */
    private final Map<Object, Object> updatedKeys = new ConcurrentHashMap<>();
    /** Released whenever there may be work to do; the sender thread blocks on it in {@link #run()}. */
    private final Semaphore senderNotifier = new Semaphore(0);
    /** Back-off applied after a network failure and reset otherwise; replaceable through {@link #setBackOff}. */
    private volatile ExponentialBackOff backOff = new ExponentialBackOffImpl();
    /** Set by {@link #trackClear()}; cleared once the clear command has been acknowledged by the backups. */
    private volatile boolean hasClear;
    /** Asynchronous backup sites, computed in {@link #start()}. */
    private volatile Collection<XSiteBackup> asyncBackups;
    /** The thread created in {@link #start()} that executes {@link #run()}. */
    private volatile Thread sender;
    /** Loop condition of the sender thread; switched off by {@link #stop()}. */
    private volatile boolean running;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:m2repo/org/infinispan/infinispan-core/11.0.9.Final/infinispan-core-11.0.9.Final.jar:org/infinispan/xsite/irac/DefaultIracManager$CleanupTask.class */
    public class CleanupTask implements Runnable {
        /** Key whose pending update is removed when this task runs. */
        final Object key;
        /** Segment the key belonged to when the task was created. */
        final int segmentId;
        /** Lock owner that was registered together with the key. */
        final Object lockOwner;
        /** Tombstone assigned by {@code buildRemoveCommand}; stays {@code null} if never assigned. */
        volatile IracMetadata tombstone;

        private CleanupTask(Object trackedKey, int segment, Object owner) {
            key = trackedKey;
            segmentId = segment;
            lockOwner = owner;
        }

        /** Delegates to {@code removeKey} of the enclosing manager with the values captured by this task. */
        @Override // java.lang.Runnable
        public void run() {
            IracMetadata currentTombstone = tombstone;
            DefaultIracManager.this.removeKey(key, segmentId, lockOwner, currentTombstone);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:m2repo/org/infinispan/infinispan-core/11.0.9.Final/infinispan-core-11.0.9.Final.jar:org/infinispan/xsite/irac/DefaultIracManager$ResponseResult.class */
    /** Outcome reported by {@code awaitResponses} after waiting for a replication stage. */
    public enum ResponseResult {
        /** The awaited stage completed without an {@code ExecutionException}. */
        OK,
        /** The stage failed and {@code DefaultTakeOfflineManager.isCommunicationError} returned false. */
        REMOTE_EXCEPTION,
        /** The stage failed and {@code DefaultTakeOfflineManager.isCommunicationError} returned true. */
        NETWORK_EXCEPTION
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:m2repo/org/infinispan/infinispan-core/11.0.9.Final/infinispan-core-11.0.9.Final.jar:org/infinispan/xsite/irac/DefaultIracManager$SendKeyTask.class */
    /**
     * One sending round over the tracked keys: {@link #accept} is invoked per tracked key and either
     * schedules a cleanup or starts the replication of that key, {@link #await()} then waits for the round.
     */
    public class SendKeyTask implements BiConsumer<Object, Object> {
        /** One stage per replicated key; it completes after the backups answered and the cleanup task ran. */
        private final List<CompletionStage<Void>> responses;
        /** Cleanups that need no replication; executed at the start of {@link #await()}. */
        private final List<CleanupTask> cleanupTasks;

        private SendKeyTask() {
            this.responses = new LinkedList<>();
            this.cleanupTasks = new LinkedList<>();
        }

        /**
         * Handles a single tracked key.
         *
         * @param obj  the tracked key
         * @param obj2 the lock owner stored with the key
         */
        @Override // java.util.function.BiConsumer
        public void accept(Object obj, Object obj2) {
            DistributionInfo distribution = DefaultIracManager.this.getDistributionInfoKey(obj);
            if (!distribution.isPrimary()) {
                // Only the primary owner replicates the key.
                return;
            }
            int segmentId = distribution.segmentId();
            if (!distribution.isWriteOwner()) {
                // NOTE(review): a primary owner that is not a write owner looks unexpected - confirm
                // whether this cleanup was meant to apply to nodes that lost ownership of the key.
                this.cleanupTasks.add(new CleanupTask(obj, segmentId, obj2));
                return;
            }
            if (!distribution.isReadOwner()) {
                // Nothing is sent in this round; the key stays tracked.
                return;
            }
            CleanupTask cleanupTask = new CleanupTask(obj, segmentId, obj2);
            // A missing entry is replicated as a removal (or not at all when no tombstone exists).
            CompletionStage<XSiteReplicateCommand> command = DefaultIracManager.this.fetchEntry(obj, segmentId)
                    .<XSiteReplicateCommand>thenApply(entry -> entry == null
                            ? DefaultIracManager.this.buildRemoveCommand(cleanupTask)
                            : DefaultIracManager.this.commandsFactory.buildIracPutKeyCommand(entry));
            // The cleanup only runs when the replication stage completes normally.
            this.responses.add(command
                    .thenCompose(DefaultIracManager.this::sendCommandToAllBackups)
                    .thenRun(cleanupTask));
        }

        /**
         * Runs the pending cleanups, waits for every replication started by {@link #accept}, then
         * sleeps on the back-off if any of them hit a network failure or resets the back-off otherwise.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        void await() throws InterruptedException {
            this.cleanupTasks.forEach(CleanupTask::run);
            boolean networkFailure = false;
            for (CompletionStage<Void> response : this.responses) {
                if (DefaultIracManager.this.awaitResponses(response) == ResponseResult.NETWORK_EXCEPTION) {
                    networkFailure = true;
                }
            }
            if (networkFailure) {
                DefaultIracManager.this.backOff.backoffSleep();
            } else {
                DefaultIracManager.this.backOff.reset();
            }
        }
    }

    /**
     * Builds the list of asynchronous backup sites, leaving out the site named {@code str}.
     *
     * @param configuration cache configuration providing the asynchronous backup definitions
     * @param str           site name to exclude (the caller passes the local site name)
     * @return one {@link XSiteBackup} per remaining asynchronous backup
     */
    private static Collection<XSiteBackup> asyncBackups(Configuration configuration, String str) {
        List<XSiteBackup> remoteBackups = configuration.sites().asyncBackupsStream()
                .filter(backup -> !str.equals(backup.site()))
                .map(backup -> new XSiteBackup(backup.site(), true, backup.replicationTimeout()))
                .collect(Collectors.toList());
        return remoteBackups;
    }

    /** Streams the keys reported by {@code getAffectedKeys()} of the given command. */
    private static Stream<?> keyStream(WriteCommand writeCommand) {
        return writeCommand.getAffectedKeys().stream();
    }

    /** Returns {@code true} unless the command carries the {@code SKIP_XSITE_BACKUP} flag. */
    private static boolean backupToRemoteSite(WriteCommand writeCommand) {
        return !writeCommand.hasAnyFlag(FlagBitSets.SKIP_XSITE_BACKUP);
    }

    /**
     * Creates an empty mutable segment set. The address is ignored; the parameter only exists so the
     * method can be used as the mapping function of {@code Map.computeIfAbsent} keyed by address.
     */
    private static IntSet newIntSet(Address address) {
        return IntSets.mutableEmptySet();
    }

    /**
     * Resolves the asynchronous backup sites, interrupts any sender thread left from a previous
     * start, resets the internal state and launches a new sender thread.
     */
    @Start
    public void start() {
        Transport transport = this.rpcManager.getTransport();
        transport.checkCrossSiteAvailable();
        this.asyncBackups = asyncBackups(this.config, transport.localSiteName());
        if (trace) {
            String siteNames = this.asyncBackups.stream()
                    .map(XSiteBackup::getSiteName)
                    .collect(Collectors.joining(", "));
            log.tracef("Async remote sites found: %s", siteNames);
        }

        Thread previousSender = this.sender;
        if (previousSender != null) {
            previousSender.interrupt();
        }

        // Discard notifications that were queued before this start.
        this.senderNotifier.drainPermits();
        this.running = true;
        this.hasClear = false;

        Thread newSender = new Thread(this, "irac-sender-thread-" + transport.getAddress());
        this.sender = newSender;
        newSender.start();
    }

    /** Clears the running flag and interrupts the sender thread, if one was started. */
    @Stop
    public void stop() {
        this.running = false;
        Thread activeSender = this.sender;
        if (activeSender == null) {
            return;
        }
        activeSender.interrupt();
    }

    /**
     * Registers a key as pending and wakes up the sender thread.
     *
     * @param obj  the updated key
     * @param obj2 the lock owner associated with the update
     */
    @Override // org.infinispan.xsite.irac.IracManager
    public void trackUpdatedKey(Object obj, Object obj2) {
        final Object key = obj;
        final Object lockOwner = obj2;
        if (trace) {
            log.tracef("Tracking key for %s: %s", lockOwner, key);
        }
        this.updatedKeys.put(key, lockOwner);
        this.senderNotifier.release();
    }

    /**
     * Registers every key of the collection as pending under the same lock owner and wakes up the
     * sender thread. An empty collection is only traced; nothing is registered or signalled.
     *
     * @param collection the updated keys
     * @param obj        the lock owner associated with the update
     */
    @Override // org.infinispan.xsite.irac.IracManager
    public <K> void trackUpdatedKeys(Collection<K> collection, Object obj) {
        if (trace) {
            log.tracef("Tracking keys for %s: %s", obj, collection);
        }
        if (!collection.isEmpty()) {
            for (K key : collection) {
                this.updatedKeys.put(key, obj);
            }
            this.senderNotifier.release();
        }
    }

    /**
     * Registers the keys selected by {@link #keysFromMods} as pending under the given transaction and
     * wakes up the sender thread (also when no key was selected).
     *
     * @param stream            the transaction's write commands
     * @param globalTransaction the transaction, stored as the lock owner of each key
     */
    @Override // org.infinispan.xsite.irac.IracManager
    public void trackKeysFromTransaction(Stream<WriteCommand> stream, GlobalTransaction globalTransaction) {
        Stream<?> keysToTrack = keysFromMods(stream);
        keysToTrack.forEach(key -> {
            if (trace) {
                log.tracef("Tracking key for %s: %s", globalTransaction, key);
            }
            this.updatedKeys.put(key, globalTransaction);
        });
        this.senderNotifier.release();
    }

    /**
     * Records a pending clear, drops every tracked key and wakes up the sender thread.
     */
    @Override // org.infinispan.xsite.irac.IracManager
    public void trackClear() {
        if (trace) {
            log.trace("Tracking clear request");
        }

        // The flag is raised before the pending keys are discarded.
        this.hasClear = true;
        this.updatedKeys.clear();

        this.senderNotifier.release();
    }

    /**
     * Stops tracking a key, but only if it is still mapped to the given lock owner, and removes the
     * given tombstone from the version generator.
     *
     * @param obj          the key
     * @param obj2         the lock owner the key must still be mapped to
     * @param iracMetadata the tombstone to remove
     */
    @Override // org.infinispan.xsite.irac.IracManager
    public void cleanupKey(Object obj, Object obj2, IracMetadata iracMetadata) {
        final boolean wasTracked = this.updatedKeys.remove(obj, obj2);
        this.iracVersionGenerator.removeTombstone(obj, iracMetadata);
        if (!trace) {
            return;
        }
        log.tracef("Removing key '%s'. LockOwner='%s', removed=%s", obj, obj2, Boolean.valueOf(wasTracked));
    }

    /**
     * Reacts to a topology change. When the local node is a member of the new topology, it computes
     * the segments it owns (for writing) in the new topology but did not own in the old one, requests
     * the pending-key state for them from the new primary owner of each segment, and wakes up the
     * sender thread. Nothing happens when the local node is not a member of the new topology.
     *
     * @param cacheTopology  the previous topology
     * @param cacheTopology2 the new topology
     */
    @Override // org.infinispan.xsite.irac.IracManager
    public void onTopologyUpdate(CacheTopology cacheTopology, CacheTopology cacheTopology2) {
        if (trace) {
            log.trace("[IRAC] Topology Updated. Checking pending keys.");
        }
        Address localAddress = this.rpcManager.getAddress();
        if (!cacheTopology2.getMembers().contains(localAddress)) {
            return;
        }
        IntSet addedSegments = IntSets.mutableCopyFrom(cacheTopology2.getWriteConsistentHash().getSegmentsForOwner(localAddress));
        if (cacheTopology.getMembers().contains(localAddress)) {
            addedSegments.removeAll(cacheTopology.getWriteConsistentHash().getSegmentsForOwner(localAddress));
        }
        if (!addedSegments.isEmpty()) {
            // Group the new segments by primary owner so each node receives a single request.
            Map<Address, IntSet> segmentsByPrimary = new HashMap<>();
            PrimitiveIterator.OfInt segments = addedSegments.iterator();
            while (segments.hasNext()) {
                int segment = segments.nextInt();
                Address primaryOwner = cacheTopology2.getWriteConsistentHash().locatePrimaryOwnerForSegment(segment);
                segmentsByPrimary.computeIfAbsent(primaryOwner, DefaultIracManager::newIntSet).add(segment);
            }
            segmentsByPrimary.forEach(this::sendStateRequest);
        }
        this.senderNotifier.release();
    }

    /**
     * Sends to {@code address} the state of every tracked key whose segment is in {@code intSet}.
     *
     * @param address the node that asked for the state
     * @param intSet  the segments the node is interested in
     */
    @Override // org.infinispan.xsite.irac.IracManager
    public void requestState(Address address, IntSet intSet) {
        for (Map.Entry<Object, Object> tracked : this.updatedKeys.entrySet()) {
            sendStateIfNeeded(address, intSet, tracked.getKey(), tracked.getValue());
        }
    }

    /**
     * Applies state received from another node: stores the tombstone if none is present, tracks the
     * key unless it is already tracked, and wakes up the sender thread.
     *
     * @param obj          the key
     * @param obj2         the lock owner to associate with the key
     * @param iracMetadata the tombstone sent with the key
     */
    @Override // org.infinispan.xsite.irac.IracManager
    public void receiveState(Object obj, Object obj2, IracMetadata iracMetadata) {
        final Object key = obj;
        this.iracVersionGenerator.storeTombstoneIfAbsent(key, iracMetadata);
        // An existing mapping wins: the local lock owner is not overwritten.
        this.updatedKeys.putIfAbsent(key, obj2);
        this.senderNotifier.release();
    }

    /**
     * Sends a state response for the key to {@code address} when the key's segment is in {@code intSet}.
     *
     * @param address destination node
     * @param intSet  requested segments
     * @param obj     the key
     * @param obj2    the lock owner tracked with the key
     */
    public void sendStateIfNeeded(Address address, IntSet intSet, Object obj, Object obj2) {
        if (!intSet.contains(getSegment(obj))) {
            return;
        }
        IracMetadata tombstone = this.iracVersionGenerator.getTombstone(obj);
        this.rpcManager.sendTo(address, this.commandsFactory.buildIracStateResponseCommand(obj, obj2, tombstone), DeliverOrder.NONE);
    }

    /**
     * Selects the keys to track from a stream of write commands: only successful commands that are not
     * flagged to skip the cross-site backup contribute, and only keys for which the local node is a
     * write owner are kept.
     *
     * @param stream the write commands
     * @return the affected keys that pass the filters
     */
    public Stream<?> keysFromMods(Stream<WriteCommand> stream) {
        Stream<WriteCommand> replicableCommands = stream
                .filter(WriteCommand::isSuccessful)
                .filter(DefaultIracManager::backupToRemoteSite);
        Stream<?> affectedKeys = replicableCommands.flatMap(DefaultIracManager::keyStream);
        return affectedKeys.filter(this::isWriteOwner);
    }

    /**
     * Sender loop: waits for a notification, collapses any further queued notifications into the
     * same round and performs one send. The loop ends when the running flag is cleared or the thread
     * is interrupted; in the latter case the interrupt status is restored.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            while (this.running) {
                this.senderNotifier.acquire();
                this.senderNotifier.drainPermits();
                periodicSend();
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Replaces the back-off used by the sender.
     *
     * @param exponentialBackOff the new back-off; a {@code null} value is rejected with a {@code NullPointerException}
     */
    public void setBackOff(ExponentialBackOff exponentialBackOff) {
        this.backOff = (ExponentialBackOff) Objects.requireNonNull(exponentialBackOff);
    }

    /** Returns {@code true} when no key is currently tracked; a pending clear is not taken into account. */
    public boolean isEmpty() {
        return this.updatedKeys.isEmpty();
    }

    /** Sends a request-state command for the given segments to {@code address}, using {@code DeliverOrder.NONE}. */
    private void sendStateRequest(Address address, IntSet intSet) {
        this.rpcManager.sendTo(address, this.commandsFactory.buildIracRequestStateCommand(intSet), DeliverOrder.NONE);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Blocks until the stage completes and classifies the outcome. On failure the sender is notified
     * again so that another round is attempted.
     *
     * @param completionStage the stage to wait for
     * @return {@code OK} on normal completion, otherwise {@code NETWORK_EXCEPTION} or
     *         {@code REMOTE_EXCEPTION} depending on {@code DefaultTakeOfflineManager.isCommunicationError}
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public ResponseResult awaitResponses(CompletionStage<Void> completionStage) throws InterruptedException {
        try {
            completionStage.toCompletableFuture().get();
        } catch (ExecutionException failure) {
            if (trace) {
                log.trace("IRAC update not successful.", failure);
            }
            this.senderNotifier.release();
            if (DefaultTakeOfflineManager.isCommunicationError(failure)) {
                return ResponseResult.NETWORK_EXCEPTION;
            }
            return ResponseResult.REMOTE_EXCEPTION;
        }
        return ResponseResult.OK;
    }

    /**
     * Performs one sending round. A pending clear is replicated first; the tracked keys are only sent
     * after the clear succeeded (or when there is no pending clear). Unexpected errors while sending
     * keys are logged and do not escape, except for interruption.
     *
     * @throws InterruptedException if the sender thread is interrupted while waiting
     */
    private void periodicSend() throws InterruptedException {
        if (trace) {
            log.tracef("[IRAC] Sending keys to remote site(s). Has clear? %s, keys: %s", Boolean.valueOf(this.hasClear), this.updatedKeys.keySet());
        }
        if (this.hasClear) {
            CompletionStage<Void> clearStage = sendCommandToAllBackups(this.commandsFactory.buildIracClearKeysCommand());
            ResponseResult clearResult = awaitResponses(clearStage);
            if (clearResult == ResponseResult.REMOTE_EXCEPTION) {
                this.backOff.reset();
                return;
            }
            if (clearResult == ResponseResult.NETWORK_EXCEPTION) {
                this.backOff.backoffSleep();
                return;
            }
            if (clearResult == ResponseResult.OK) {
                this.hasClear = false;
                this.backOff.reset();
            }
        }
        try {
            SendKeyTask round = new SendKeyTask();
            this.updatedKeys.forEach(round);
            round.await();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw interrupted;
        } catch (Throwable unexpected) {
            log.unexpectedErrorFromIrac(unexpected);
        }
    }

    /**
     * Sends the command to one backup site and registers the request with the take-offline manager.
     *
     * @param xSiteBackup           the destination site
     * @param xSiteReplicateCommand the command to send
     * @return the response handle returned by the RPC manager
     */
    private XSiteResponse sendToRemoteSite(XSiteBackup xSiteBackup, XSiteReplicateCommand xSiteReplicateCommand) {
        final XSiteResponse response = this.rpcManager.invokeXSite(xSiteBackup, xSiteReplicateCommand);
        this.takeOfflineManager.registerRequest(response);
        return response;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends a cleanup command for the key to the write owners obtained for segment {@code i} and
     * then performs the same cleanup locally.
     *
     * @param obj          the key
     * @param i            the segment id recorded for the key
     * @param obj2         the lock owner tracked with the key
     * @param iracMetadata the tombstone to remove, possibly {@code null}
     */
    public void removeKey(Object obj, int i, Object obj2, IracMetadata iracMetadata) {
        if (trace) {
            log.tracef("Replication completed for key '%s'. Lock Owner='%s'", obj, obj2);
        }
        this.rpcManager.sendToMany(
                getDistributionInfo(i).writeOwners(),
                this.commandsFactory.buildIracCleanupKeyCommand(obj, obj2, iracMetadata),
                DeliverOrder.NONE);
        cleanupKey(obj, obj2, iracMetadata);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Returns the distribution information of the given key in the current cache topology. */
    public DistributionInfo getDistributionInfoKey(Object obj) {
        return this.clusteringDependentLogic.getCacheTopology().getDistribution(obj);
    }

    /**
     * Returns the distribution information of a segment in the current cache topology.
     *
     * @param i the segment id
     * @return the ownership information of segment {@code i}
     */
    private DistributionInfo getDistributionInfo(int i) {
        // The segment must be looked up by id: passing the boxed id to getDistribution(Object) treats
        // it as a cache key, so the owners of whatever segment that Integer maps to would be returned.
        return this.clusteringDependentLogic.getCacheTopology().getSegmentDistribution(i);
    }

    /** Returns the segment of the given key according to the current cache topology. */
    private int getSegment(Object obj) {
        return this.clusteringDependentLogic.getCacheTopology().getSegment(obj);
    }

    /** Returns whether the key's distribution information reports the local node as a write owner. */
    private boolean isWriteOwner(Object obj) {
        return getDistributionInfoKey(obj).isWriteOwner();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends the command to every asynchronous backup site that is not offline.
     *
     * @param xSiteReplicateCommand the command to send; {@code null} means there is nothing to send
     * @return a stage that depends on all the requests sent, or an already completed stage when the
     *         command is {@code null}
     */
    public CompletionStage<Void> sendCommandToAllBackups(XSiteReplicateCommand xSiteReplicateCommand) {
        if (xSiteReplicateCommand == null) {
            return CompletableFutures.completedNull();
        }
        AggregateCompletionStage<Void> pendingRequests = CompletionStages.aggregateCompletionStage();
        for (XSiteBackup backup : this.asyncBackups) {
            SiteState state = this.takeOfflineManager.getSiteState(backup.getSiteName());
            if (state == SiteState.OFFLINE) {
                // Offline sites are skipped.
                continue;
            }
            pendingRequests.dependsOn(sendToRemoteSite(backup, xSiteReplicateCommand));
        }
        return pendingRequests.freeze();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the remove command for the task's key from its tombstone and stores that tombstone in
     * the task.
     *
     * @param cleanupTask the task holding the key; its {@code tombstone} field is set on success
     * @return the remove command, or {@code null} when the key has no tombstone
     */
    public XSiteReplicateCommand buildRemoveCommand(CleanupTask cleanupTask) {
        final Object key = cleanupTask.key;
        final IracMetadata currentTombstone = this.iracVersionGenerator.getTombstone(key);
        if (currentTombstone != null) {
            cleanupTask.tombstone = currentTombstone;
            return this.commandsFactory.buildIracRemoveKeyCommand(key, currentTombstone);
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Loads the entry of the given key and segment through the entry loader's
     * {@code loadAndStoreInDataContainer}. Callers in this class treat a {@code null} result as
     * "no entry".
     */
    public CompletionStage<InternalCacheEntry<Object, Object>> fetchEntry(Object obj, int i) {
        return this.clusteringDependentLogic.getEntryLoader().loadAndStoreInDataContainer(obj, i);
    }
}
