package org.infinispan.distexec.mapreduce;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commands.CancelCommand;
import org.infinispan.commands.CancellationService;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.CreateCacheCommand;
import org.infinispan.commands.read.MapCombineCommand;
import org.infinispan.commands.read.ReduceCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.marshall.StreamingMarshaller;
import org.infinispan.commons.util.Util;
import org.infinispan.commons.util.concurrent.AbstractInProcessFuture;
import org.infinispan.commons.util.concurrent.FutureListener;
import org.infinispan.commons.util.concurrent.NotifyingFuture;
import org.infinispan.commons.util.concurrent.NotifyingNotifiableFuture;
import org.infinispan.distexec.mapreduce.spi.MapReduceTaskLifecycleService;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptionsBuilder;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:WEB-INF/lib/infinispan-core-6.0.0.Alpha2.jar:org/infinispan/distexec/mapreduce/MapReduceTask.class */
public class MapReduceTask<KIn, VIn, KOut, VOut> {
    private static final Log log = LogFactory.getLog(MapReduceTask.class);
    public static final String DEFAULT_TMP_CACHE_CONFIGURATION_NAME = "__tmpMapReduce";
    protected Mapper<KIn, VIn, KOut, VOut> mapper;
    protected Reducer<KOut, VOut> reducer;
    protected Reducer<KOut, VOut> combiner;
    protected final boolean distributeReducePhase;
    protected final boolean useIntermediateSharedCache;
    protected final Collection<KIn> keys;
    protected final AdvancedCache<KIn, VIn> cache;
    protected final Marshaller marshaller;
    protected final MapReduceManager mapReduceManager;
    protected final CancellationService cancellationService;
    protected final List<CancellableTaskPart> cancellableTasks;
    protected final UUID taskId;
    protected final ClusteringDependentLogic clusteringDependentLogic;
    protected final boolean isLocalOnly;
    protected RpcOptionsBuilder rpcOptionsBuilder;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-core-6.0.0.Alpha2.jar:org/infinispan/distexec/mapreduce/MapReduceTask$CancellableTaskPart.class */
    public interface CancellableTaskPart {
        UUID getUUID();

        Address getExecutionTarget();
    }

    /* loaded from: input_file:WEB-INF/lib/infinispan-core-6.0.0.Alpha2.jar:org/infinispan/distexec/mapreduce/MapReduceTask$MapReduceTaskFuture.class */
    private class MapReduceTaskFuture<R> extends AbstractInProcessFuture<R> {
        private final Callable<R> call;
        private volatile boolean cancelled = false;
        private volatile boolean done = false;

        public MapReduceTaskFuture(Callable<R> callable) {
            this.call = callable;
        }

        @Override // java.util.concurrent.Future
        public R get() throws InterruptedException, ExecutionException {
            try {
                if (isCancelled()) {
                    throw new CancellationException("MapReduceTask already cancelled");
                }
                try {
                    R call = this.call.call();
                    this.done = true;
                    return call;
                } catch (Exception e) {
                    throw new ExecutionException(e);
                }
            } catch (Throwable th) {
                this.done = true;
                throw th;
            }
        }

        @Override // org.infinispan.commons.util.concurrent.AbstractInProcessFuture, java.util.concurrent.Future
        public boolean cancel(boolean z) {
            if (isCancelled()) {
                return false;
            }
            RpcManager rpcManager = MapReduceTask.this.cache.getRpcManager();
            synchronized (MapReduceTask.this.cancellableTasks) {
                for (CancellableTaskPart cancellableTaskPart : MapReduceTask.this.cancellableTasks) {
                    boolean equals = cancellableTaskPart.getExecutionTarget().equals(rpcManager.getTransport().getAddress());
                    CancelCommand buildCancelCommand = MapReduceTask.this.buildCancelCommand(cancellableTaskPart);
                    if (equals) {
                        buildCancelCommand.init(MapReduceTask.this.cancellationService);
                        try {
                            buildCancelCommand.perform(null);
                        } catch (Throwable th) {
                            MapReduceTask.log.couldNotExecuteCancellationLocally(th.getLocalizedMessage());
                        }
                    } else {
                        rpcManager.invokeRemotely(Collections.singletonList(cancellableTaskPart.getExecutionTarget()), buildCancelCommand, MapReduceTask.this.rpcOptionsBuilder.build());
                    }
                    this.cancelled = true;
                    this.done = true;
                }
            }
            return this.cancelled;
        }

        @Override // org.infinispan.commons.util.concurrent.AbstractInProcessFuture, java.util.concurrent.Future
        public boolean isCancelled() {
            return this.cancelled;
        }

        @Override // org.infinispan.commons.util.concurrent.AbstractInProcessFuture, java.util.concurrent.Future
        public boolean isDone() {
            return this.done;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-core-6.0.0.Alpha2.jar:org/infinispan/distexec/mapreduce/MapReduceTask$MapTaskPart.class */
    public class MapTaskPart<V> extends MapReduceTask<KIn, VIn, KOut, VOut>.TaskPart<V> {
        private MapCombineCommand<KIn, VIn, KOut, VOut> mcc;
        private boolean distributedReduce;

        public MapTaskPart(Address address, MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand, boolean z) {
            super(address);
            this.mcc = mapCombineCommand;
            this.distributedReduce = z;
        }

        @Override // org.infinispan.distexec.mapreduce.MapReduceTask.TaskPart
        public void execute() {
            if (locallyExecuted()) {
                FutureTask futureTask = new FutureTask(this.distributedReduce ? new Callable<Map<Address, ? extends Response>>() { // from class: org.infinispan.distexec.mapreduce.MapReduceTask.MapTaskPart.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Map<Address, ? extends Response> call() throws Exception {
                        return Collections.singletonMap(MapTaskPart.this.getAddress(), SuccessfulResponse.create(MapTaskPart.this.invokeMapCombineLocally()));
                    }
                } : new Callable<Map<Address, ? extends Response>>() { // from class: org.infinispan.distexec.mapreduce.MapReduceTask.MapTaskPart.2
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Map<Address, ? extends Response> call() throws Exception {
                        return Collections.singletonMap(MapTaskPart.this.getAddress(), SuccessfulResponse.create(MapTaskPart.this.invokeMapCombineLocallyForLocalReduction()));
                    }
                });
                setNetworkFuture(futureTask);
                MapReduceTask.this.mapReduceManager.getExecutorService().submit(futureTask);
            } else {
                RpcManager rpcManager = MapReduceTask.this.cache.getRpcManager();
                try {
                    MapReduceTask.log.debugf("Invoking %s on %s", this.mcc, getExecutionTarget());
                    rpcManager.invokeRemotelyInFuture(Collections.singleton(getExecutionTarget()), this.mcc, MapReduceTask.this.rpcOptionsBuilder.build(), this);
                    MapReduceTask.log.debugf("Invoked %s on %s ", this.mcc, getExecutionTarget());
                } catch (Exception e) {
                    throw new CacheException("Could not invoke map phase of MapReduceTask on remote node " + getExecutionTarget(), e);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Map<KOut, List<VOut>> invokeMapCombineLocallyForLocalReduction() throws InterruptedException {
            MapReduceTask.log.debugf("Invoking %s locally", this.mcc);
            try {
                MapReduceTask.this.cancellationService.register(Thread.currentThread(), this.mcc.getUUID());
                this.mcc.init(MapReduceTask.this.mapReduceManager);
                Map<KOut, List<VOut>> mapAndCombineForLocalReduction = MapReduceTask.this.mapReduceManager.mapAndCombineForLocalReduction(this.mcc);
                MapReduceTask.this.cancellationService.unregister(this.mcc.getUUID());
                MapReduceTask.log.debugf("Invoked %s locally", this.mcc);
                return mapAndCombineForLocalReduction;
            } catch (Throwable th) {
                MapReduceTask.this.cancellationService.unregister(this.mcc.getUUID());
                MapReduceTask.log.debugf("Invoked %s locally", this.mcc);
                throw th;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Set<KOut> invokeMapCombineLocally() throws InterruptedException {
            MapReduceTask.log.debugf("Invoking %s locally", this.mcc);
            try {
                MapReduceTask.this.cancellationService.register(Thread.currentThread(), this.mcc.getUUID());
                this.mcc.init(MapReduceTask.this.mapReduceManager);
                Set<KOut> mapAndCombineForDistributedReduction = MapReduceTask.this.mapReduceManager.mapAndCombineForDistributedReduction(this.mcc);
                MapReduceTask.this.cancellationService.unregister(this.mcc.getUUID());
                MapReduceTask.log.debugf("Invoked %s locally", this.mcc);
                return mapAndCombineForDistributedReduction;
            } catch (Throwable th) {
                MapReduceTask.this.cancellationService.unregister(this.mcc.getUUID());
                MapReduceTask.log.debugf("Invoked %s locally", this.mcc);
                throw th;
            }
        }

        @Override // org.infinispan.distexec.mapreduce.MapReduceTask.CancellableTaskPart
        public UUID getUUID() {
            return this.mcc.getUUID();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-core-6.0.0.Alpha2.jar:org/infinispan/distexec/mapreduce/MapReduceTask$ReduceTaskPart.class */
    public class ReduceTaskPart<V> extends MapReduceTask<KIn, VIn, KOut, VOut>.TaskPart<V> {
        private ReduceCommand<KOut, VOut> rc;
        private String cacheName;

        public ReduceTaskPart(Address address, ReduceCommand<KOut, VOut> reduceCommand, String str) {
            super(address);
            this.rc = reduceCommand;
            this.cacheName = str;
        }

        @Override // org.infinispan.distexec.mapreduce.MapReduceTask.TaskPart
        public void execute() {
            if (locallyExecuted()) {
                FutureTask futureTask = new FutureTask(new Callable<Map<Address, ? extends Response>>() { // from class: org.infinispan.distexec.mapreduce.MapReduceTask.ReduceTaskPart.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Map<Address, ? extends Response> call() throws Exception {
                        return Collections.singletonMap(ReduceTaskPart.this.getAddress(), SuccessfulResponse.create(ReduceTaskPart.this.invokeReduceLocally(MapReduceTask.this.cache.getCacheManager().getCache(ReduceTaskPart.this.cacheName))));
                    }
                });
                setNetworkFuture(futureTask);
                MapReduceTask.this.mapReduceManager.getExecutorService().submit(futureTask);
            } else {
                RpcManager rpcManager = MapReduceTask.this.cache.getRpcManager();
                try {
                    MapReduceTask.log.debugf("Invoking %s on %s", this.rc, getExecutionTarget());
                    rpcManager.invokeRemotelyInFuture(Collections.singleton(getExecutionTarget()), this.rc, MapReduceTask.this.rpcOptionsBuilder.build(), this);
                    MapReduceTask.log.debugf("Invoked %s on %s ", this.rc, getExecutionTarget());
                } catch (Exception e) {
                    throw new CacheException("Could not invoke map phase of MapReduceTask on remote node " + getExecutionTarget(), e);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Map<KOut, VOut> invokeReduceLocally(Cache<Object, Object> cache) {
            this.rc.init(MapReduceTask.this.mapReduceManager);
            try {
                MapReduceTask.log.debugf("Invoking %s locally ", this.rc);
                Map<KOut, VOut> reduce = MapReduceTask.this.mapReduceManager.reduce(this.rc);
                MapReduceTask.log.debugf("Invoked %s locally", this.rc);
                return reduce;
            } catch (Throwable th) {
                throw new CacheException("Could not invoke MapReduce task locally ", th);
            }
        }

        @Override // org.infinispan.distexec.mapreduce.MapReduceTask.CancellableTaskPart
        public UUID getUUID() {
            return this.rc.getUUID();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-core-6.0.0.Alpha2.jar:org/infinispan/distexec/mapreduce/MapReduceTask$TaskPart.class */
    public abstract class TaskPart<V> implements NotifyingNotifiableFuture<V>, CancellableTaskPart {
        private Future<V> f;
        private final Address executionTarget;
        static final /* synthetic */ boolean $assertionsDisabled;

        public TaskPart(Address address) {
            this.executionTarget = address;
        }

        @Override // org.infinispan.distexec.mapreduce.MapReduceTask.CancellableTaskPart
        public Address getExecutionTarget() {
            return this.executionTarget;
        }

        @Override // org.infinispan.commons.util.concurrent.NotifyingFuture
        public NotifyingFuture<V> attachListener(FutureListener<V> futureListener) {
            return this;
        }

        @Override // java.util.concurrent.Future
        public boolean cancel(boolean z) {
            return false;
        }

        @Override // java.util.concurrent.Future
        public boolean isCancelled() {
            return false;
        }

        @Override // java.util.concurrent.Future
        public boolean isDone() {
            return false;
        }

        @Override // java.util.concurrent.Future
        public V get() throws InterruptedException, ExecutionException {
            return retrieveResult(this.f.get());
        }

        protected Address getAddress() {
            return MapReduceTask.this.clusteringDependentLogic.getAddress();
        }

        protected boolean locallyExecuted() {
            return getAddress().equals(getExecutionTarget());
        }

        public abstract void execute();

        private V retrieveResult(Object obj) throws ExecutionException {
            if (obj == null) {
                throw new ExecutionException("Execution returned null value", new NullPointerException());
            }
            if (obj instanceof Exception) {
                throw new ExecutionException((Exception) obj);
            }
            Map map = (Map) obj;
            if (!$assertionsDisabled && map.size() != 1) {
                throw new AssertionError();
            }
            for (Map.Entry entry : map.entrySet()) {
                if (entry.getValue() instanceof SuccessfulResponse) {
                    return (V) ((SuccessfulResponse) entry.getValue()).getResponseValue();
                }
            }
            throw new ExecutionException(new IllegalStateException("Invalid response " + obj));
        }

        @Override // java.util.concurrent.Future
        public V get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
            return retrieveResult(this.f.get(j, timeUnit));
        }

        @Override // org.infinispan.commons.util.concurrent.NotifyingNotifiableFuture
        public void notifyDone() {
        }

        @Override // org.infinispan.commons.util.concurrent.NotifyingNotifiableFuture
        public void setNetworkFuture(Future<V> future) {
            this.f = future;
        }

        static {
            $assertionsDisabled = !MapReduceTask.class.desiredAssertionStatus();
        }
    }

    public MapReduceTask(Cache<KIn, VIn> cache) {
        this(cache, false, false);
    }

    public MapReduceTask(Cache<KIn, VIn> cache, boolean z) {
        this(cache, z, true);
    }

    public MapReduceTask(Cache<KIn, VIn> cache, boolean z, boolean z2) {
        if (cache == null) {
            throw new IllegalArgumentException("Can not use null cache for MapReduceTask");
        }
        ensureProperCacheState(cache.getAdvancedCache());
        this.cache = cache.getAdvancedCache();
        this.keys = new LinkedList();
        this.marshaller = (Marshaller) this.cache.getComponentRegistry().getComponent(StreamingMarshaller.class, KnownComponentNames.CACHE_MARSHALLER);
        this.mapReduceManager = (MapReduceManager) this.cache.getComponentRegistry().getComponent(MapReduceManager.class);
        this.cancellationService = (CancellationService) this.cache.getComponentRegistry().getComponent(CancellationService.class);
        this.taskId = UUID.randomUUID();
        this.distributeReducePhase = z;
        this.useIntermediateSharedCache = z2;
        this.cancellableTasks = Collections.synchronizedList(new ArrayList());
        this.clusteringDependentLogic = (ClusteringDependentLogic) this.cache.getComponentRegistry().getComponent(ClusteringDependentLogic.class);
        this.isLocalOnly = this.cache.getRpcManager() == null;
        this.rpcOptionsBuilder = this.isLocalOnly ? null : new RpcOptionsBuilder(this.cache.getRpcManager().getDefaultRpcOptions(true));
    }

    public MapReduceTask<KIn, VIn, KOut, VOut> onKeys(KIn... kinArr) {
        Collections.addAll(this.keys, kinArr);
        return this;
    }

    public MapReduceTask<KIn, VIn, KOut, VOut> mappedWith(Mapper<KIn, VIn, KOut, VOut> mapper) {
        if (mapper == null) {
            throw new IllegalArgumentException("A valid reference of Mapper is needed");
        }
        this.mapper = mapper;
        return this;
    }

    public MapReduceTask<KIn, VIn, KOut, VOut> reducedWith(Reducer<KOut, VOut> reducer) {
        if (reducer == null) {
            throw new IllegalArgumentException("A valid reference of Reducer is needed");
        }
        this.reducer = reducer;
        return this;
    }

    public MapReduceTask<KIn, VIn, KOut, VOut> combinedWith(Reducer<KOut, VOut> reducer) {
        if (reducer == null) {
            throw new IllegalArgumentException("A valid reference of Reducer/Combiner is needed");
        }
        this.combiner = reducer;
        return this;
    }

    public final MapReduceTask<KIn, VIn, KOut, VOut> timeout(long j, TimeUnit timeUnit) {
        this.rpcOptionsBuilder.timeout(j, timeUnit);
        return this;
    }

    public final long timeout(TimeUnit timeUnit) {
        return this.rpcOptionsBuilder.timeout(timeUnit);
    }

    public Map<KOut, VOut> execute() throws CacheException {
        if (this.mapper == null) {
            throw new NullPointerException("A valid reference of Mapper is not set " + this.mapper);
        }
        if (this.reducer == null) {
            throw new NullPointerException("A valid reference of Reducer is not set " + this.reducer);
        }
        if (this.isLocalOnly || !distributeReducePhase()) {
            try {
                return executeMapPhaseWithLocalReduction();
            } catch (Exception e) {
                throw new CacheException(e);
            }
        }
        boolean useIntermediateSharedCache = useIntermediateSharedCache();
        String str = DEFAULT_TMP_CACHE_CONFIGURATION_NAME;
        if (useIntermediatePerTaskCache()) {
            str = this.taskId.toString();
        }
        try {
            try {
                executeTaskInit(str);
                Map<KOut, VOut> executeReducePhase = executeReducePhase(executeMapPhase(useIntermediateSharedCache), useIntermediateSharedCache);
                if (useIntermediatePerTaskCache()) {
                    this.cache.getCacheManager().removeCache(str);
                }
                return executeReducePhase;
            } catch (Exception e2) {
                throw new CacheException(e2);
            }
        } catch (Throwable th) {
            if (useIntermediatePerTaskCache()) {
                this.cache.getCacheManager().removeCache(str);
            }
            throw th;
        }
    }

    protected boolean distributeReducePhase() {
        return this.distributeReducePhase;
    }

    protected boolean useIntermediateSharedCache() {
        return this.useIntermediateSharedCache;
    }

    protected boolean useIntermediatePerTaskCache() {
        return !useIntermediateSharedCache();
    }

    protected void executeTaskInit(String str) {
        RpcManager rpcManager = this.cache.getRpcManager();
        final CreateCacheCommand buildCreateCacheCommand = ((CommandsFactory) this.cache.getComponentRegistry().getComponent(CommandsFactory.class)).buildCreateCacheCommand(str, DEFAULT_TMP_CACHE_CONFIGURATION_NAME, true, rpcManager.getMembers().size());
        log.debugf("Invoking %s across members %s ", buildCreateCacheCommand, this.cache.getRpcManager().getMembers());
        this.mapReduceManager.getExecutorService().submit(new Callable<Object>() { // from class: org.infinispan.distexec.mapreduce.MapReduceTask.1
            @Override // java.util.concurrent.Callable
            public Object call() throws Exception {
                buildCreateCacheCommand.init(MapReduceTask.this.cache.getCacheManager());
                try {
                    return buildCreateCacheCommand.perform(null);
                } catch (Throwable th) {
                    throw new CacheException("Could not initialize temporary caches for MapReduce task on remote nodes ", th);
                }
            }
        });
        rpcManager.invokeRemotely(this.cache.getRpcManager().getMembers(), buildCreateCacheCommand, this.rpcOptionsBuilder.build());
    }

    protected Set<KOut> executeMapPhase(boolean z) throws InterruptedException, ExecutionException {
        RpcManager rpcManager = this.cache.getRpcManager();
        HashSet hashSet = new HashSet();
        ArrayList arrayList = new ArrayList();
        if (inputTaskKeysEmpty()) {
            for (Address address : rpcManager.getMembers()) {
                TaskPart createTaskMapPart = createTaskMapPart(address.equals(rpcManager.getAddress()) ? buildMapCombineCommand(this.taskId.toString(), clone(this.mapper), clone(this.combiner), null, true, z) : buildMapCombineCommand(this.taskId.toString(), this.mapper, this.combiner, null, true, z), address, true);
                createTaskMapPart.execute();
                arrayList.add(createTaskMapPart);
            }
        } else {
            for (Map.Entry entry : mapKeysToNodes(this.keys).entrySet()) {
                Address address2 = (Address) entry.getKey();
                Collection<KIn> collection = (Collection) entry.getValue();
                TaskPart createTaskMapPart2 = createTaskMapPart(address2.equals(rpcManager.getAddress()) ? buildMapCombineCommand(this.taskId.toString(), clone(this.mapper), clone(this.combiner), collection, true, z) : buildMapCombineCommand(this.taskId.toString(), this.mapper, this.combiner, collection, true, z), address2, true);
                createTaskMapPart2.execute();
                arrayList.add(createTaskMapPart2);
            }
        }
        try {
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                hashSet.addAll((Collection) ((MapTaskPart) it.next()).get());
            }
            return hashSet;
        } finally {
            this.cancellableTasks.clear();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected Map<KOut, VOut> executeMapPhaseWithLocalReduction() throws InterruptedException, ExecutionException {
        RpcManager rpcManager = this.cache.getRpcManager();
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        Address address = this.clusteringDependentLogic.getAddress();
        if (inputTaskKeysEmpty()) {
            for (Address address2 : this.isLocalOnly ? Collections.singletonList(address) : rpcManager.getMembers()) {
                TaskPart createTaskMapPart = createTaskMapPart(address2.equals(address) ? buildMapCombineCommand(this.taskId.toString(), clone(this.mapper), clone(this.combiner), null, false, false) : buildMapCombineCommand(this.taskId.toString(), this.mapper, this.combiner, null, false, false), address2, false);
                createTaskMapPart.execute();
                arrayList.add(createTaskMapPart);
            }
        } else {
            for (Map.Entry entry : mapKeysToNodes(this.keys).entrySet()) {
                Address address3 = (Address) entry.getKey();
                Collection<KIn> collection = (Collection) entry.getValue();
                TaskPart createTaskMapPart2 = createTaskMapPart(address3.equals(address) ? buildMapCombineCommand(this.taskId.toString(), clone(this.mapper), clone(this.combiner), collection, false, false) : buildMapCombineCommand(this.taskId.toString(), this.mapper, this.combiner, collection, false, false), address3, false);
                createTaskMapPart2.execute();
                arrayList.add(createTaskMapPart2);
            }
        }
        HashMap hashMap2 = new HashMap();
        try {
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                mergeResponse(hashMap, (Map) ((MapTaskPart) it.next()).get());
            }
            MapReduceTaskLifecycleService mapReduceTaskLifecycleService = MapReduceTaskLifecycleService.getInstance();
            log.tracef("For m/r task %s invoking %s locally", this.taskId, this.reducer);
            try {
                mapReduceTaskLifecycleService.onPreExecute(this.reducer, this.cache);
                for (Map.Entry entry2 : hashMap.entrySet()) {
                    hashMap2.put(entry2.getKey(), this.reducer.reduce(entry2.getKey(), ((List) entry2.getValue()).iterator()));
                }
                return hashMap2;
            } finally {
                mapReduceTaskLifecycleService.onPostExecute(this.reducer);
            }
        } finally {
            this.cancellableTasks.clear();
        }
    }

    protected <V> MapReduceTask<KIn, VIn, KOut, VOut>.MapTaskPart<V> createTaskMapPart(MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand, Address address, boolean z) {
        MapReduceTask<KIn, VIn, KOut, VOut>.MapTaskPart<V> mapTaskPart = new MapTaskPart<>(address, mapCombineCommand, z);
        this.cancellableTasks.add(mapTaskPart);
        return mapTaskPart;
    }

    protected Map<KOut, VOut> executeReducePhase(Set<KOut> set, boolean z) throws InterruptedException, ExecutionException {
        RpcManager rpcManager = this.cache.getRpcManager();
        String uuid = z ? DEFAULT_TMP_CACHE_CONFIGURATION_NAME : this.taskId.toString();
        Map<Address, ? extends Collection<T>> mapKeysToNodes = mapKeysToNodes(this.cache.getCacheManager().getCache(uuid).getAdvancedCache().getDistributionManager(), set, z);
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        for (Map.Entry entry : mapKeysToNodes.entrySet()) {
            Address address = (Address) entry.getKey();
            Collection<KOut> collection = (Collection) entry.getValue();
            TaskPart createReducePart = createReducePart(address.equals(rpcManager.getAddress()) ? buildReduceCommand(this.taskId.toString(), uuid, clone(this.reducer), collection, z) : buildReduceCommand(this.taskId.toString(), uuid, this.reducer, collection, z), address, uuid);
            createReducePart.execute();
            arrayList.add(createReducePart);
        }
        try {
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                hashMap.putAll((Map) ((ReduceTaskPart) it.next()).get());
            }
            return hashMap;
        } finally {
            this.cancellableTasks.clear();
        }
    }

    protected <V> MapReduceTask<KIn, VIn, KOut, VOut>.ReduceTaskPart<V> createReducePart(ReduceCommand<KOut, VOut> reduceCommand, Address address, String str) {
        MapReduceTask<KIn, VIn, KOut, VOut>.ReduceTaskPart<V> reduceTaskPart = new ReduceTaskPart<>(address, reduceCommand, str);
        this.cancellableTasks.add(reduceTaskPart);
        return reduceTaskPart;
    }

    private <K, V> void mergeResponse(Map<K, List<V>> map, Map<K, List<V>> map2) {
        for (Map.Entry<K, List<V>> entry : map2.entrySet()) {
            synchronized (map) {
                List<V> list = map.get(entry.getKey());
                if (list != null) {
                    list.addAll(entry.getValue());
                } else {
                    list = new ArrayList();
                    list.addAll(entry.getValue());
                }
                map.put(entry.getKey(), list);
            }
        }
    }

    private MapCombineCommand<KIn, VIn, KOut, VOut> buildMapCombineCommand(String str, Mapper<KIn, VIn, KOut, VOut> mapper, Reducer<KOut, VOut> reducer, Collection<KIn> collection, boolean z, boolean z2) {
        MapCombineCommand<KIn, VIn, KOut, VOut> buildMapCombineCommand = ((CommandsFactory) this.cache.getComponentRegistry().getComponent(CommandsFactory.class)).buildMapCombineCommand(str, mapper, reducer, collection);
        buildMapCombineCommand.setReducePhaseDistributed(z);
        buildMapCombineCommand.setEmitCompositeIntermediateKeys(z2);
        return buildMapCombineCommand;
    }

    private ReduceCommand<KOut, VOut> buildReduceCommand(String str, String str2, Reducer<KOut, VOut> reducer, Collection<KOut> collection, boolean z) {
        ReduceCommand<KOut, VOut> buildReduceCommand = ((CommandsFactory) this.cache.getComponentRegistry().getComponent(CommandsFactory.class)).buildReduceCommand(str, str2, reducer, collection);
        buildReduceCommand.setEmitCompositeIntermediateKeys(z);
        return buildReduceCommand;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public CancelCommand buildCancelCommand(CancellableTaskPart cancellableTaskPart) {
        return ((CommandsFactory) this.cache.getComponentRegistry().getComponent(CommandsFactory.class)).buildCancelCommandCommand(cancellableTaskPart.getUUID());
    }

    public Future<Map<KOut, VOut>> executeAsynchronously() {
        return new MapReduceTaskFuture(new Callable<Map<KOut, VOut>>() { // from class: org.infinispan.distexec.mapreduce.MapReduceTask.2
            @Override // java.util.concurrent.Callable
            public Map<KOut, VOut> call() throws Exception {
                return MapReduceTask.this.execute();
            }
        });
    }

    public <R> R execute(Collator<KOut, VOut, R> collator) {
        return collator.collate(execute());
    }

    public <R> Future<R> executeAsynchronously(final Collator<KOut, VOut, R> collator) {
        return new MapReduceTaskFuture(new Callable<R>() { // from class: org.infinispan.distexec.mapreduce.MapReduceTask.3
            @Override // java.util.concurrent.Callable
            public R call() throws Exception {
                return (R) MapReduceTask.this.execute(collator);
            }
        });
    }

    protected void aggregateReducedResult(Map<KOut, List<VOut>> map, Map<KOut, VOut> map2) {
        List<VOut> list;
        for (Map.Entry<KOut, VOut> entry : map2.entrySet()) {
            if (map.containsKey(entry.getKey())) {
                list = map.get(entry.getKey());
            } else {
                list = new LinkedList();
                map.put(entry.getKey(), list);
            }
            list.add(entry.getValue());
        }
    }

    protected <T> Map<Address, ? extends Collection<T>> mapKeysToNodes(DistributionManager distributionManager, Collection<T> collection, boolean z) {
        return this.isLocalOnly ? Collections.singletonMap(this.clusteringDependentLogic.getAddress(), collection) : this.mapReduceManager.mapKeysToNodes(distributionManager, this.taskId.toString(), collection, z);
    }

    protected <T> Map<Address, ? extends Collection<T>> mapKeysToNodes(Collection<T> collection, boolean z) {
        return mapKeysToNodes(this.cache.getDistributionManager(), collection, z);
    }

    protected <T> Map<Address, ? extends Collection<T>> mapKeysToNodes(Collection<T> collection) {
        return mapKeysToNodes(collection, false);
    }

    protected Mapper<KIn, VIn, KOut, VOut> clone(Mapper<KIn, VIn, KOut, VOut> mapper) {
        return (Mapper) Util.cloneWithMarshaller(this.marshaller, mapper);
    }

    protected Reducer<KOut, VOut> clone(Reducer<KOut, VOut> reducer) {
        return (Reducer) Util.cloneWithMarshaller(this.marshaller, reducer);
    }

    private void ensureProperCacheState(AdvancedCache<KIn, VIn> advancedCache) throws NullPointerException, IllegalStateException {
        if (advancedCache.getStatus() != ComponentStatus.RUNNING) {
            throw log.invalidCacheState(advancedCache.getStatus().toString());
        }
        if (advancedCache.getRpcManager() != null && advancedCache.getDistributionManager() == null) {
            throw log.requireDistOrReplCache(advancedCache.getCacheConfiguration().clustering().cacheModeString());
        }
    }

    protected boolean inputTaskKeysEmpty() {
        return this.keys == null || this.keys.isEmpty();
    }

    public int hashCode() {
        return (31 * 1) + (this.taskId == null ? 0 : this.taskId.hashCode());
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || !(obj instanceof MapReduceTask)) {
            return false;
        }
        MapReduceTask mapReduceTask = (MapReduceTask) obj;
        return this.taskId == null ? mapReduceTask.taskId == null : this.taskId.equals(mapReduceTask.taskId);
    }

    public String toString() {
        return "MapReduceTask [mapper=" + this.mapper + ", reducer=" + this.reducer + ", combiner=" + this.combiner + ", keys=" + this.keys + ", taskId=" + this.taskId + "]";
    }
}
