package org.infinispan.server.hotrod.tx.operation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import javax.security.auth.Subject;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commons.tx.XidImpl;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.impl.VoidResponseCollector;
import org.infinispan.server.hotrod.HotRodHeader;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.server.hotrod.logging.Log;
import org.infinispan.server.hotrod.tx.table.CacheNameCollector;
import org.infinispan.server.hotrod.tx.table.CacheXid;
import org.infinispan.server.hotrod.tx.table.GlobalTxTable;
import org.infinispan.server.hotrod.tx.table.TxState;
import org.infinispan.util.ByteString;

/* loaded from: input_file:org/infinispan/server/hotrod/tx/operation/BaseCompleteTransactionOperation.class */
abstract class BaseCompleteTransactionOperation implements CacheNameCollector, Runnable {
    final XidImpl xid;
    final GlobalTxTable globalTxTable;
    final HotRodHeader header;
    final BiConsumer<HotRodHeader, Integer> reply;
    private final HotRodServer server;
    private final Subject subject;
    final ExecutorService asyncExecutor;
    final Collection<ByteString> cacheNames = new ConcurrentLinkedQueue();
    private final AtomicInteger expectedCaches = new AtomicInteger();
    volatile boolean hasErrors = false;
    volatile boolean hasCommits = false;
    volatile boolean hasRollbacks = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BaseCompleteTransactionOperation(HotRodHeader hotRodHeader, HotRodServer hotRodServer, Subject subject, XidImpl xidImpl, BiConsumer<HotRodHeader, Integer> biConsumer) {
        GlobalComponentRegistry globalComponentRegistry = SecurityActions.getGlobalComponentRegistry(hotRodServer.getCacheManager());
        this.globalTxTable = (GlobalTxTable) globalComponentRegistry.getComponent(GlobalTxTable.class);
        this.asyncExecutor = (ExecutorService) globalComponentRegistry.getComponent(ExecutorService.class, "org.infinispan.executors.async");
        this.header = hotRodHeader;
        this.server = hotRodServer;
        this.subject = subject;
        this.xid = xidImpl;
        this.reply = biConsumer;
    }

    @Override // org.infinispan.server.hotrod.tx.table.CacheNameCollector
    public final void expectedSize(int i) {
        this.expectedCaches.set(i);
    }

    @Override // org.infinispan.server.hotrod.tx.table.CacheNameCollector
    public final void noTransactionFound() {
        if (isTraceEnabled()) {
            log().tracef("[%s] No caches found.", this.xid);
        }
        this.reply.accept(this.header, -4);
    }

    abstract <T> BiFunction<T, Throwable, Void> handler();

    abstract void sendReply();

    abstract CacheRpcCommand buildRemoteCommand(Configuration configuration, CommandsFactory commandsFactory, TxState txState);

    abstract CacheRpcCommand buildForwardCommand(ByteString byteString, long j);

    abstract CompletableFuture<Void> asyncCompleteLocalTransaction(AdvancedCache<?, ?> advancedCache, long j);

    abstract Log log();

    abstract boolean isTraceEnabled();

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyCacheCollected() {
        int decrementAndGet = this.expectedCaches.decrementAndGet();
        if (isTraceEnabled()) {
            log().tracef("[%s] Cache collected. Missing=%s.", this.xid, Integer.valueOf(decrementAndGet));
        }
        if (decrementAndGet == 0) {
            onCachesCollected();
        }
    }

    private void onCachesCollected() {
        if (isTraceEnabled()) {
            log().tracef("[%s] All caches collected: %s", this.xid, this.cacheNames);
        }
        int size = this.cacheNames.size();
        if (size == 0) {
            sendReply();
            return;
        }
        ArrayList arrayList = new ArrayList(size);
        Iterator<ByteString> it = this.cacheNames.iterator();
        while (it.hasNext()) {
            try {
                arrayList.add(completeCache(it.next()));
            } catch (Throwable th) {
                this.hasErrors = true;
            }
        }
        CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).thenRun(this::sendReply);
    }

    private CompletableFuture<Void> completeCache(ByteString byteString) throws Throwable {
        TxState state = this.globalTxTable.getState(new CacheXid(byteString, this.xid));
        AdvancedCache<byte[], byte[]> cache = this.server.cache(this.server.getCacheInfo(byteString.toString(), this.header.getVersion(), this.header.getMessageId(), true), this.header, this.subject);
        RpcManager rpcManager = cache.getRpcManager();
        if (rpcManager == null || rpcManager.getAddress().equals(state.getOriginator())) {
            if (isTraceEnabled()) {
                log().tracef("[%s] Completing local executed transaction.", this.xid);
            }
            return asyncCompleteLocalTransaction(cache, state.getTimeout());
        }
        if (rpcManager.getMembers().contains(state.getOriginator())) {
            if (isTraceEnabled()) {
                log().tracef("[%s] Forward remotely executed transaction to %s.", this.xid, state.getOriginator());
            }
            return forwardCompleteCommand(byteString, rpcManager, state);
        }
        if (isTraceEnabled()) {
            log().tracef("[%s] Originator, %s, left the cluster.", this.xid, state.getOriginator());
        }
        return completeWithRemoteCommand(cache, rpcManager, state);
    }

    private CompletableFuture<Void> completeWithRemoteCommand(AdvancedCache<?, ?> advancedCache, RpcManager rpcManager, TxState txState) throws Throwable {
        CommandsFactory commandsFactory = SecurityActions.getComponentRegistry(advancedCache).getCommandsFactory();
        CacheRpcCommand buildRemoteCommand = buildRemoteCommand(advancedCache.getCacheConfiguration(), commandsFactory, txState);
        CompletableFuture completableFuture = rpcManager.invokeCommandOnAll(buildRemoteCommand, VoidResponseCollector.validOnly(), rpcManager.getSyncRpcOptions()).handle(handler()).toCompletableFuture();
        commandsFactory.initializeReplicableCommand(buildRemoteCommand, false);
        return CompletableFuture.allOf(completableFuture, buildRemoteCommand.invokeAsync().handle(handler()));
    }

    private CompletableFuture<Void> forwardCompleteCommand(ByteString byteString, RpcManager rpcManager, TxState txState) {
        return rpcManager.invokeCommand(txState.getOriginator(), buildForwardCommand(byteString, txState.getTimeout()), VoidResponseCollector.validOnly(), rpcManager.getSyncRpcOptions()).handle(handler()).toCompletableFuture();
    }
}
