package org.infinispan.distexec.mapreduce;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.CacheException;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.read.MapReduceCommand;
import org.infinispan.context.InvocationContextContainer;
import org.infinispan.distexec.mapreduce.spi.MapReduceTaskLifecycleService;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.interceptors.InterceptorChain;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.marshall.Marshaller;
import org.infinispan.marshall.StreamingMarshaller;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.Util;
import org.infinispan.util.concurrent.AbstractInProcessFuture;
import org.infinispan.util.concurrent.FutureListener;
import org.infinispan.util.concurrent.NotifyingFuture;
import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:org/infinispan/distexec/mapreduce/MapReduceTask.class */
public class MapReduceTask<KIn, VIn, KOut, VOut> {
    private static final Log log = LogFactory.getLog(MapReduceTask.class);
    private Mapper<KIn, VIn, KOut, VOut> mapper;
    private Reducer<KOut, VOut> reducer;
    private final Collection<KIn> keys;
    private final AdvancedCache<KIn, VIn> cache;
    protected final Marshaller marshaller;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/distexec/mapreduce/MapReduceTask$MapReduceFuture.class */
    public static class MapReduceFuture implements NotifyingNotifiableFuture<Object> {
        private Future<Object> futureResult;

        private MapReduceFuture() {
        }

        @Override // org.infinispan.util.concurrent.NotifyingFuture
        public NotifyingFuture<Object> attachListener(FutureListener<Object> futureListener) {
            return this;
        }

        @Override // java.util.concurrent.Future
        public boolean cancel(boolean z) {
            return false;
        }

        @Override // java.util.concurrent.Future
        public boolean isCancelled() {
            return false;
        }

        @Override // java.util.concurrent.Future
        public boolean isDone() {
            return false;
        }

        @Override // java.util.concurrent.Future
        public Object get() throws InterruptedException, ExecutionException {
            return this.futureResult.get();
        }

        @Override // java.util.concurrent.Future
        public Object get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
            return this.futureResult.get(j, timeUnit);
        }

        @Override // org.infinispan.util.concurrent.NotifyingNotifiableFuture
        public void notifyDone() {
        }

        @Override // org.infinispan.util.concurrent.NotifyingNotifiableFuture
        public void setNetworkFuture(Future<Object> future) {
            this.futureResult = future;
        }
    }

    public MapReduceTask(Cache<KIn, VIn> cache) {
        if (cache == null) {
            throw new IllegalArgumentException("Can not use null cache for MapReduceTask");
        }
        ensureProperCacheState(cache.getAdvancedCache());
        this.cache = cache.getAdvancedCache();
        this.keys = new LinkedList();
        this.marshaller = (Marshaller) this.cache.getComponentRegistry().getComponent(StreamingMarshaller.class, KnownComponentNames.CACHE_MARSHALLER);
    }

    public MapReduceTask<KIn, VIn, KOut, VOut> onKeys(KIn... kinArr) {
        Collections.addAll(this.keys, kinArr);
        return this;
    }

    public MapReduceTask<KIn, VIn, KOut, VOut> mappedWith(Mapper<KIn, VIn, KOut, VOut> mapper) {
        if (mapper == null) {
            throw new IllegalArgumentException("A valid reference of Mapper is needed");
        }
        this.mapper = mapper;
        return this;
    }

    public MapReduceTask<KIn, VIn, KOut, VOut> reducedWith(Reducer<KOut, VOut> reducer) {
        if (reducer == null) {
            throw new IllegalArgumentException("A valid reference of Mapper is needed");
        }
        this.reducer = reducer;
        return this;
    }

    public Map<KOut, VOut> execute() throws CacheException {
        if (this.mapper == null) {
            throw new NullPointerException("A valid reference of Mapper is not set " + this.mapper);
        }
        if (this.reducer == null) {
            throw new NullPointerException("A valid reference of Reducer is not set " + this.reducer);
        }
        ComponentRegistry componentRegistry = this.cache.getComponentRegistry();
        RpcManager rpcManager = this.cache.getRpcManager();
        InvocationContextContainer invocationContextContainer = this.cache.getInvocationContextContainer();
        DistributionManager distributionManager = this.cache.getDistributionManager();
        InterceptorChain interceptorChain = (InterceptorChain) componentRegistry.getComponent(InterceptorChain.class);
        CommandsFactory commandsFactory = (CommandsFactory) componentRegistry.getComponent(CommandsFactory.class);
        MapReduceCommand mapReduceCommand = null;
        MapReduceCommand mapReduceCommand2 = null;
        HashMap hashMap = new HashMap();
        if (inputTaskKeysEmpty()) {
            mapReduceCommand = commandsFactory.buildMapReduceCommand(this.mapper, this.reducer, rpcManager.getAddress(), this.keys);
            mapReduceCommand2 = mapReduceCommand;
            try {
                log.debugf("Invoking %s across entire cluster ", mapReduceCommand);
                Map<Address, Response> invokeRemotely = rpcManager.invokeRemotely((Collection<Address>) null, (ReplicableCommand) mapReduceCommand, true, false);
                log.debugf("Invoked %s across entire cluster, results are %s", mapReduceCommand, invokeRemotely);
                hashMap.putAll(invokeRemotely);
            } catch (Throwable th) {
                throw new CacheException("Could not invoke MapReduce task on remote nodes ", th);
            }
        } else {
            Map<Address, List<KIn>> mapKeysToNodes = mapKeysToNodes();
            log.debugf("Keys to nodes mapping is " + mapKeysToNodes, new Object[0]);
            ArrayList arrayList = new ArrayList();
            for (Map.Entry<Address, List<KIn>> entry : mapKeysToNodes.entrySet()) {
                Address key = entry.getKey();
                List<KIn> value = entry.getValue();
                if (key.equals(rpcManager.getAddress())) {
                    mapReduceCommand2 = commandsFactory.buildMapReduceCommand(clone(this.mapper), clone(this.reducer), rpcManager.getAddress(), value);
                } else {
                    mapReduceCommand = commandsFactory.buildMapReduceCommand(this.mapper, this.reducer, rpcManager.getAddress(), value);
                    try {
                        log.debugf("Invoking %s on %s", mapReduceCommand, key);
                        MapReduceFuture mapReduceFuture = new MapReduceFuture();
                        arrayList.add(mapReduceFuture);
                        rpcManager.invokeRemotelyInFuture(Collections.singleton(key), mapReduceCommand, mapReduceFuture);
                        log.debugf("Invoked %s on %s ", mapReduceCommand, key);
                    } catch (Exception e) {
                        throw new CacheException("Could not invoke MapReduceTask on remote node " + key, e);
                    }
                }
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                try {
                    Map map = (Map) ((MapReduceFuture) it.next()).get();
                    hashMap.putAll(map);
                    log.debugf("Received result from future %s", map);
                } catch (Exception e2) {
                    throw new CacheException("Could not retrieve MapReduceTask result from remote node", e2);
                }
            }
        }
        boolean z = mapReduceCommand2 != null;
        Object obj = null;
        if (z) {
            log.debugf("Invoking %s locally", mapReduceCommand);
            mapReduceCommand2.init(commandsFactory, interceptorChain, invocationContextContainer, distributionManager, rpcManager.getAddress());
            try {
                obj = mapReduceCommand2.perform(null);
                log.debugf("Invoked %s locally", mapReduceCommand);
            } catch (Throwable th2) {
                throw new CacheException("Could not invoke MapReduce task locally ", th2);
            }
        }
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry2 : hashMap.entrySet()) {
            Response response = (Response) entry2.getValue();
            if (!response.isSuccessful() || !response.isValid()) {
                if (response instanceof ExceptionResponse) {
                    throw new CacheException("MapReduce task on remote node " + entry2.getKey() + " threw Exception", ((ExceptionResponse) response).getException());
                }
                throw new CacheException("MapReduce task on remote node " + entry2.getKey() + " failed ");
            }
            groupKeys(hashMap2, (Map) ((SuccessfulResponse) response).getResponseValue());
        }
        if (z) {
            groupKeys(hashMap2, (Map) obj);
        }
        HashMap hashMap3 = new HashMap();
        MapReduceTaskLifecycleService mapReduceTaskLifecycleService = MapReduceTaskLifecycleService.getInstance();
        try {
            mapReduceTaskLifecycleService.onPreExecute(this.reducer);
            for (Map.Entry<KOut, List<VOut>> entry3 : hashMap2.entrySet()) {
                hashMap3.put(entry3.getKey(), this.reducer.reduce(entry3.getKey(), entry3.getValue().iterator()));
            }
            return hashMap3;
        } finally {
            mapReduceTaskLifecycleService.onPostExecute(this.reducer);
        }
    }

    public Future<Map<KOut, VOut>> executeAsynchronously() {
        final Callable<Map<KOut, VOut>> callable = new Callable<Map<KOut, VOut>>() { // from class: org.infinispan.distexec.mapreduce.MapReduceTask.1
            @Override // java.util.concurrent.Callable
            public Map<KOut, VOut> call() throws Exception {
                return MapReduceTask.this.execute();
            }
        };
        return new AbstractInProcessFuture<Map<KOut, VOut>>() { // from class: org.infinispan.distexec.mapreduce.MapReduceTask.2
            @Override // java.util.concurrent.Future
            public Map<KOut, VOut> get() throws InterruptedException, ExecutionException {
                try {
                    return (Map) callable.call();
                } catch (Exception e) {
                    throw new ExecutionException(e);
                }
            }
        };
    }

    public <R> R execute(Collator<KOut, VOut, R> collator) {
        return collator.collate(execute());
    }

    public <R> Future<R> executeAsynchronously(final Collator<KOut, VOut, R> collator) {
        final Callable<R> callable = new Callable<R>() { // from class: org.infinispan.distexec.mapreduce.MapReduceTask.3
            @Override // java.util.concurrent.Callable
            public R call() throws Exception {
                return (R) MapReduceTask.this.execute(collator);
            }
        };
        return new AbstractInProcessFuture<R>() { // from class: org.infinispan.distexec.mapreduce.MapReduceTask.4
            @Override // java.util.concurrent.Future
            public R get() throws InterruptedException, ExecutionException {
                try {
                    return (R) callable.call();
                } catch (Exception e) {
                    throw new ExecutionException(e);
                }
            }
        };
    }

    protected void groupKeys(Map<KOut, List<VOut>> map, Map<KOut, VOut> map2) {
        List<VOut> list;
        for (Map.Entry<KOut, VOut> entry : map2.entrySet()) {
            if (map.containsKey(entry.getKey())) {
                list = map.get(entry.getKey());
            } else {
                list = new LinkedList();
                map.put(entry.getKey(), list);
            }
            list.add(entry.getValue());
        }
    }

    protected Map<Address, List<KIn>> mapKeysToNodes() {
        DistributionManager distributionManager = this.cache.getDistributionManager();
        HashMap hashMap = new HashMap();
        for (KIn kin : this.keys) {
            Address primaryLocation = distributionManager.getPrimaryLocation(kin);
            List list = (List) hashMap.get(primaryLocation);
            if (list == null) {
                list = new ArrayList();
                hashMap.put(primaryLocation, list);
            }
            list.add(kin);
        }
        return hashMap;
    }

    protected Mapper<KIn, VIn, KOut, VOut> clone(Mapper<KIn, VIn, KOut, VOut> mapper) {
        return (Mapper) Util.cloneWithMarshaller(this.marshaller, mapper);
    }

    protected Reducer<KOut, VOut> clone(Reducer<KOut, VOut> reducer) {
        return (Reducer) Util.cloneWithMarshaller(this.marshaller, reducer);
    }

    private void ensureProperCacheState(AdvancedCache<KIn, VIn> advancedCache) throws NullPointerException, IllegalStateException {
        if (advancedCache.getRpcManager() == null) {
            throw new IllegalStateException("Can not use non-clustered cache for MapReduceTask");
        }
        if (advancedCache.getStatus() != ComponentStatus.RUNNING) {
            throw new IllegalStateException("Invalid cache state " + advancedCache.getStatus());
        }
        if (advancedCache.getDistributionManager() == null) {
            throw new IllegalStateException("Cache mode should be DIST, rather than " + advancedCache.getConfiguration().getCacheModeString());
        }
    }

    private boolean inputTaskKeysEmpty() {
        return this.keys == null || this.keys.isEmpty();
    }
}
