package org.infinispan.distexec.mapreduce;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.infinispan.Cache;
import org.infinispan.CacheException;
import org.infinispan.atomic.Delta;
import org.infinispan.atomic.DeltaAware;
import org.infinispan.commands.read.MapCombineCommand;
import org.infinispan.commands.read.ReduceCommand;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.distexec.mapreduce.spi.MapReduceTaskLifecycleService;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.loaders.CacheLoader;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.marshall.MarshalledValue;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.InfinispanCollections;
import org.infinispan.util.concurrent.ConcurrentMapFactory;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:WEB-INF/lib/infinispan-core-5.2.9.Final.jar:org/infinispan/distexec/mapreduce/MapReduceManagerImpl.class */
public class MapReduceManagerImpl implements MapReduceManager {
    /** Logger for this class, obtained from {@link LogFactory}. */
    private static final Log log = LogFactory.getLog(MapReduceManagerImpl.class);
    /**
     * Presumably the number of processed keys between two checks of the thread's interrupted flag;
     * {@code checkInterrupt(int)} applies the same value of 20.
     */
    private static final int CANCELLATION_CHECK_FREQUENCY = 20;
    /** Address of this node, read from the cache manager in {@code init}. */
    private Address localAddress;
    /** Cache manager used to look up caches by name. */
    private EmbeddedCacheManager cacheManager;
    /** Source of the cache loader returned by {@code resolveCacheLoader()}; checked for {@code null} there. */
    private CacheLoaderManager cacheLoaderManager;
    /** Executor injected in {@code init} and exposed through {@code getExecutorService()}. */
    private ExecutorService executorService;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-core-5.2.9.Final.jar:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$CollectableCollector.class */
    /**
     * A {@link Collector} that can also hand back what has been collected so far.
     *
     * @param <K> type of the collected keys
     * @param <V> type of the collected values
     */
    public interface CollectableCollector<K, V> extends Collector<K, V> {
        /**
         * Returns the collected values, grouped into one list per key.
         */
        Map<K, List<V>> collectedValues();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-core-5.2.9.Final.jar:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$DefaultCollector.class */
    /**
     * Collector that groups every emitted value under its key in a map created by
     * {@link ConcurrentMapFactory#makeConcurrentMap()}.
     *
     * @param <KOut> type of the emitted keys
     * @param <VOut> type of the emitted values
     */
    public static class DefaultCollector<KOut, VOut> implements CollectableCollector<KOut, VOut> {
        /** Emitted values, one list per key, in emission order. */
        private final Map<KOut, List<VOut>> store = ConcurrentMapFactory.makeConcurrentMap();

        /** Instances are created by the enclosing class only. */
        private DefaultCollector() {
        }

        /**
         * Appends {@code vout} to the list kept for {@code kout}, creating and registering
         * that list on the first emission for the key.
         */
        @Override // org.infinispan.distexec.mapreduce.Collector
        public void emit(KOut kout, VOut vout) {
            List<VOut> bucket = this.store.get(kout);
            if (bucket != null) {
                bucket.add(vout);
                return;
            }
            bucket = new LinkedList<VOut>();
            this.store.put(kout, bucket);
            bucket.add(vout);
        }

        /**
         * Returns the backing map itself (not a copy).
         */
        @Override // org.infinispan.distexec.mapreduce.MapReduceManagerImpl.CollectableCollector
        public Map<KOut, List<VOut>> collectedValues() {
            return this.store;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-core-5.2.9.Final.jar:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$DeltaAwareList.class */
    /**
     * Linked list that acts both as a {@link DeltaAware} value and as its own {@link Delta}:
     * the delta is a copy of the current elements, and merging appends those elements to
     * another {@code DeltaAwareList}.
     *
     * @param <E> element type
     */
    public static class DeltaAwareList<E> extends LinkedList<E> implements DeltaAware, Delta {
        private static final long serialVersionUID = 2176345973026460708L;

        /** Creates a list holding all elements of {@code collection}. */
        public DeltaAwareList(Collection<? extends E> collection) {
            super(collection);
        }

        /** Creates a list holding the single element {@code e}. */
        public DeltaAwareList(E e) {
            add(e);
        }

        /** Returns a new list containing a copy of the current elements. */
        @Override // org.infinispan.atomic.DeltaAware
        public Delta delta() {
            // Typed as a collection so the copy constructor is chosen, not the single-element one.
            Collection<E> current = this;
            return new DeltaAwareList<E>(current);
        }

        /** Removes all elements from this list. */
        @Override // org.infinispan.atomic.DeltaAware
        public void commit() {
            clear();
        }

        /**
         * Appends this list's elements to {@code deltaAware} and returns it; when
         * {@code deltaAware} is {@code null} or not a {@code DeltaAwareList}, this list is
         * returned unchanged instead.
         */
        @Override // org.infinispan.atomic.Delta
        public DeltaAware merge(DeltaAware deltaAware) {
            // instanceof is false for null, so this also covers the null case.
            if (!(deltaAware instanceof DeltaAwareList)) {
                return this;
            }
            @SuppressWarnings("unchecked")
            List<E> target = (List<E>) deltaAware;
            for (E element : this) {
                target.add(element);
            }
            return (DeltaAware) target;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/infinispan-core-5.2.9.Final.jar:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$IntermediateCompositeKey.class */
    /**
     * Serializable key that pairs a task id with an intermediate key. Two instances are
     * equal when both their keys and their task ids are equal (or both {@code null}).
     *
     * @param <V> type of the wrapped intermediate key
     */
    public static final class IntermediateCompositeKey<V> implements Serializable {
        private static final long serialVersionUID = 4434717760740027918L;
        private final String taskId;
        private final V key;

        /**
         * @param str task id
         * @param v   wrapped intermediate key
         */
        public IntermediateCompositeKey(String str, V v) {
            this.taskId = str;
            this.key = v;
        }

        /** Combines the hash of the key and of the task id with the usual 31 multiplier. */
        @Override
        public int hashCode() {
            int result = 1;
            result = 31 * result + hashOrZero(this.key);
            result = 31 * result + hashOrZero(this.taskId);
            return result;
        }

        /** Compares the wrapped key first and the task id second. */
        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof IntermediateCompositeKey)) {
                return false;
            }
            IntermediateCompositeKey<?> other = (IntermediateCompositeKey<?>) obj;
            return sameOrBothNull(this.key, other.key) && sameOrBothNull(this.taskId, other.taskId);
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("IntermediateCompositeKey [taskId=");
            text.append(this.taskId).append(", key=").append(this.key).append("]");
            return text.toString();
        }

        /** Hash code of {@code value}, or 0 for {@code null}. */
        private static int hashOrZero(Object value) {
            return value == null ? 0 : value.hashCode();
        }

        /** Null-safe equality; {@code equals} is invoked on {@code mine}. */
        private static boolean sameOrBothNull(Object mine, Object theirs) {
            return mine == null ? theirs == null : mine.equals(theirs);
        }
    }

    /**
     * Injection entry point: stores the collaborators and caches the local node address
     * reported by the cache manager.
     *
     * @param embeddedCacheManager cache manager used for cache lookups and the local address
     * @param cacheLoaderManager   manager consulted for the cache loader
     * @param executorService      executor registered as "org.infinispan.executors.transport"
     */
    @Inject
    public void init(
            EmbeddedCacheManager embeddedCacheManager,
            CacheLoaderManager cacheLoaderManager,
            @ComponentName("org.infinispan.executors.transport") ExecutorService executorService) {
        cacheManager = embeddedCacheManager;
        this.cacheLoaderManager = cacheLoaderManager;
        localAddress = embeddedCacheManager.getAddress();
        this.executorService = executorService;
    }

    /**
     * Returns the executor service that was injected through {@code init}.
     */
    @Override // org.infinispan.distexec.mapreduce.MapReduceManager
    public ExecutorService getExecutorService() {
        return executorService;
    }

    /**
     * Runs the map phase for the command and then combines the collected values locally.
     *
     * @return the (optionally combined) values per intermediate key
     * @throws InterruptedException if the map phase detects that the thread was interrupted
     */
    @Override // org.infinispan.distexec.mapreduce.MapReduceManager
    public <KIn, VIn, KOut, VOut> Map<KOut, List<VOut>> mapAndCombineForLocalReduction(MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand) throws InterruptedException {
        CollectableCollector<KOut, VOut> mapped = map(mapCombineCommand);
        return combineForLocalReduction(mapCombineCommand, mapped);
    }

    /**
     * Runs the map phase for the command, then combines the collected values and stores them
     * in the intermediate cache for a distributed reduce phase.
     *
     * @return the intermediate keys that were stored
     * @throws InterruptedException if the map phase detects that the thread was interrupted
     * @throws CacheException       wrapping any other failure of the map or combine phase
     */
    @Override // org.infinispan.distexec.mapreduce.MapReduceManager
    public <KIn, VIn, KOut, VOut> Set<KOut> mapAndCombineForDistributedReduction(MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand) throws InterruptedException {
        try {
            return combine(mapCombineCommand, map(mapCombineCommand));
        } catch (InterruptedException e) {
            // Propagate cancellation as declared instead of hiding it inside a CacheException,
            // matching mapAndCombineForLocalReduction.
            throw e;
        } catch (Exception e) {
            throw new CacheException(e);
        }
    }

    /**
     * Executes the reduce phase on this node: for every key of the command the intermediate
     * values are read from the command's cache and handed to the reducer.
     *
     * @param reduceCommand command carrying the cache name, task id, keys and reducer
     * @return the reduced value per key
     * @throws IllegalStateException if the command carries no keys
     * @throws InterruptedException  if the thread is found interrupted during one of the
     *                               periodic checks made while iterating over the keys
     */
    @Override // org.infinispan.distexec.mapreduce.MapReduceManager
    public <KOut, VOut> Map<KOut, VOut> reduce(ReduceCommand<KOut, VOut> reduceCommand) throws InterruptedException {
        // A single lookup serves both the lifecycle hook and the reads of intermediate values.
        Cache<Object, List<VOut>> cache = this.cacheManager.getCache(reduceCommand.getCacheName());
        Set<KOut> keys = reduceCommand.getKeys();
        String taskId = reduceCommand.getTaskId();
        Reducer<KOut, VOut> reducer = reduceCommand.getReducer();
        boolean useCompositeKeys = reduceCommand.isEmitCompositeIntermediateKeys();
        if (keys == null || keys.isEmpty()) {
            throw new IllegalStateException("Reduce phase of MapReduceTask " + taskId + " on node " + this.localAddress + " executed with empty input keys");
        }
        Map<KOut, VOut> result = new HashMap<KOut, VOut>();
        MapReduceTaskLifecycleService lifecycle = MapReduceTaskLifecycleService.getInstance();
        log.tracef("For m/r task %s invoking %s at %s", taskId, reduceCommand, this.localAddress);
        int processed = 0;
        try {
            lifecycle.onPreExecute(reducer, cache);
            for (KOut key : keys) {
                // Increment exactly once per key. Incrementing twice left the tested value
                // always odd, so the periodic interruption check could never trigger.
                if (checkInterrupt(processed++) && Thread.currentThread().isInterrupted()) {
                    throw new InterruptedException();
                }
                Object lookupKey = useCompositeKeys ? new IntermediateCompositeKey<KOut>(taskId, key) : key;
                // NOTE(review): presumably get() yields null for a missing entry, in which case
                // the iterator() call below fails with a NullPointerException - confirm.
                List<VOut> values = cache.get(lookupKey);
                VOut reduced = reducer.reduce(key, values.iterator());
                result.put(key, reduced);
                log.tracef("For m/r task %s reduced %s to %s at %s ", taskId, key, reduced, this.localAddress);
            }
            return result;
        } finally {
            lifecycle.onPostExecute(reducer);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Executes the map phase on this node and returns the collector holding everything the
     * mapper emitted.
     * <p>
     * When the command carries keys, those keys are mapped with the value read from the
     * cache; otherwise the cache keys whose primary location is this node are used. After
     * that pass, keys pinned to this node are read through the cache loader and mapped when a
     * value is found: the keys not removed from the copy of the requested keys, or the keys
     * returned by {@code loadAllKeysFromCacheLoaderUsingFilter}.
     *
     * @throws InterruptedException if the thread is found interrupted during one of the
     *                              periodic checks
     */
    protected <KIn, VIn, KOut, VOut> CollectableCollector<KOut, VOut> map(MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand) throws InterruptedException {
        Cache<KIn, VIn> inputCache = this.cacheManager.getCache(mapCombineCommand.getCacheName());
        Set<KIn> requestedKeys = mapCombineCommand.getKeys();
        Mapper<KIn, VIn, KOut, VOut> mapper = mapCombineCommand.getMapper();
        DistributionManager dm = inputCache.getAdvancedCache().getDistributionManager();
        boolean keysSpecified = requestedKeys != null && !requestedKeys.isEmpty();

        Set<KIn> inputKeys;
        Set<KIn> remainingKeys;
        if (keysSpecified) {
            inputKeys = requestedKeys;
            // Copy that is shrunk as keys get mapped; the command's own set is left alone.
            remainingKeys = new HashSet<KIn>(requestedKeys);
        } else {
            inputKeys = filterLocalPrimaryOwner(inputCache.keySet(), dm);
            remainingKeys = null;
        }

        MapReduceTaskLifecycleService lifecycle = MapReduceTaskLifecycleService.getInstance();
        DefaultCollector<KOut, VOut> collector = new DefaultCollector<KOut, VOut>();
        log.tracef("For m/r task %s invoking %s with input keys %s", mapCombineCommand.getTaskId(), mapCombineCommand, inputKeys);
        try {
            lifecycle.onPreExecute(mapper, inputCache);

            int visited = 0;
            for (KIn key : inputKeys) {
                if (checkInterrupt(visited++) && Thread.currentThread().isInterrupted()) {
                    throw new InterruptedException();
                }
                mapper.map(key, inputCache.get(key), collector);
                if (keysSpecified) {
                    remainingKeys.remove(key);
                }
            }

            Set<KIn> loaderKeys;
            if (keysSpecified) {
                loaderKeys = filterLocalPrimaryOwner(remainingKeys, dm);
            } else {
                loaderKeys = filterLocalPrimaryOwner(loadAllKeysFromCacheLoaderUsingFilter(inputKeys), dm);
            }
            log.tracef("For m/r task %s cache loader input keys %s", mapCombineCommand.getTaskId(), loaderKeys);

            int loaded = 0;
            for (KIn key : loaderKeys) {
                if (checkInterrupt(loaded++) && Thread.currentThread().isInterrupted()) {
                    throw new InterruptedException();
                }
                VIn value = loadValueFromCacheLoader(key);
                if (value != null) {
                    mapper.map(key, value, collector);
                }
            }
            return collector;
        } finally {
            lifecycle.onPostExecute(mapper);
        }
    }

    /**
     * Optionally combines the values collected by the map phase and stores the result in the
     * intermediate cache, one {@link DeltaAwareList} per key.
     * <p>
     * The intermediate cache is the one named
     * {@code MapReduceTask.DEFAULT_TMP_CACHE_CONFIGURATION_NAME} when composite intermediate
     * keys are emitted, otherwise the cache named after the task id. With a combiner, keys
     * having more than one value are reduced to a single value first; without one, the
     * collected lists are stored as they are.
     *
     * @return the intermediate keys that were stored
     * @throws IllegalStateException if the intermediate cache lookup returns {@code null}
     * @throws CacheException        if storing the intermediate values fails
     */
    protected <KIn, VIn, KOut, VOut> Set<KOut> combine(MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand, CollectableCollector<KOut, VOut> collectableCollector) throws Exception {
        String taskId = mapCombineCommand.getTaskId();
        boolean compositeKeys = mapCombineCommand.isEmitCompositeIntermediateKeys();
        Reducer<KOut, VOut> combiner = mapCombineCommand.getCombiner();
        Set<KOut> storedKeys = new HashSet<KOut>();

        String tmpCacheName = compositeKeys ? MapReduceTask.DEFAULT_TMP_CACHE_CONFIGURATION_NAME : taskId;
        Cache<Object, DeltaAwareList<VOut>> tmpCache = this.cacheManager.getCache(tmpCacheName);
        if (tmpCache == null) {
            throw new IllegalStateException("Temporary cache for MapReduceTask " + taskId + " not found on " + this.localAddress);
        }
        DistributionManager dm = tmpCache.getAdvancedCache().getDistributionManager();

        if (combiner == null) {
            Map<KOut, List<VOut>> collected = collectableCollector.collectedValues();
            Map<Address, List<KOut>> keysByNode = mapKeysToNodes(dm, taskId, collected.keySet(), compositeKeys);
            for (Map.Entry<Address, List<KOut>> nodeEntry : keysByNode.entrySet()) {
                List<KOut> nodeKeys = nodeEntry.getValue();
                try {
                    log.tracef("For m/r task %s migrating intermediate keys %s to %s", taskId, nodeKeys, nodeEntry.getKey());
                    for (KOut key : nodeKeys) {
                        // Typed as a collection so all collected values are copied into the list.
                        Collection<VOut> values = collected.get(key);
                        storeIntermediate(tmpCache, taskId, compositeKeys, key, new DeltaAwareList<VOut>(values));
                        storedKeys.add(key);
                    }
                } catch (Exception e) {
                    throw new CacheException("Could not move intermediate keys/values for M/R task " + taskId, e);
                }
            }
            return storedKeys;
        }

        Cache<?, ?> inputCache = this.cacheManager.getCache(mapCombineCommand.getCacheName());
        log.tracef("For m/r task %s invoking combiner %s at %s", taskId, mapCombineCommand, this.localAddress);
        MapReduceTaskLifecycleService lifecycle = MapReduceTaskLifecycleService.getInstance();
        Map<KOut, VOut> combined = new ConcurrentHashMap<KOut, VOut>();
        try {
            lifecycle.onPreExecute(combiner, inputCache);
            for (Map.Entry<KOut, List<VOut>> collectedEntry : collectableCollector.collectedValues().entrySet()) {
                KOut key = collectedEntry.getKey();
                List<VOut> emitted = collectedEntry.getValue();
                // A single emitted value is passed through without invoking the combiner.
                VOut combinedValue = emitted.size() > 1 ? combiner.reduce(key, emitted.iterator()) : emitted.get(0);
                combined.put(key, combinedValue);
                log.tracef("For m/r task %s combined %s to %s at %s", taskId, key, combinedValue, this.localAddress);
            }
            Map<Address, List<KOut>> keysByNode = mapKeysToNodes(dm, taskId, combined.keySet(), compositeKeys);
            for (Map.Entry<Address, List<KOut>> nodeEntry : keysByNode.entrySet()) {
                List<KOut> nodeKeys = nodeEntry.getValue();
                try {
                    log.tracef("For m/r task %s migrating intermediate keys %s to %s", taskId, nodeKeys, nodeEntry.getKey());
                    for (KOut key : nodeKeys) {
                        // The combined value is wrapped as the single element of the list.
                        VOut single = combined.get(key);
                        storeIntermediate(tmpCache, taskId, compositeKeys, key, new DeltaAwareList<VOut>(single));
                        storedKeys.add(key);
                    }
                } catch (Exception e) {
                    throw new CacheException("Could not move intermediate keys/values for M/R task " + taskId, e);
                }
            }
        } finally {
            lifecycle.onPostExecute(combiner);
        }
        return storedKeys;
    }

    /** Puts {@code values} into the intermediate cache under the plain or the composite key. */
    private <K, V> void storeIntermediate(Cache<Object, DeltaAwareList<V>> tmpCache, String taskId, boolean compositeKeys, K key, DeltaAwareList<V> values) {
        if (compositeKeys) {
            tmpCache.put(new IntermediateCompositeKey<K>(taskId, key), values);
        } else {
            tmpCache.put(key, values);
        }
    }

    /**
     * Prepares the collected values for a reduce phase on the local node.
     * <p>
     * Without a combiner the collector's values are returned as they are. With a combiner,
     * every key is mapped to a one-element list holding either the combined value (more than
     * one collected value) or the only collected value.
     */
    private <KIn, VIn, KOut, VOut> Map<KOut, List<VOut>> combineForLocalReduction(MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand, CollectableCollector<KOut, VOut> collectableCollector) {
        String taskId = mapCombineCommand.getTaskId();
        Reducer<KOut, VOut> combiner = mapCombineCommand.getCombiner();
        if (combiner == null) {
            return collectableCollector.collectedValues();
        }

        Map<KOut, List<VOut>> combined = new HashMap<KOut, List<VOut>>();
        log.tracef("For m/r task %s invoking combiner %s at %s", taskId, mapCombineCommand, this.localAddress);
        MapReduceTaskLifecycleService lifecycle = MapReduceTaskLifecycleService.getInstance();
        try {
            Cache<?, ?> inputCache = this.cacheManager.getCache(mapCombineCommand.getCacheName());
            lifecycle.onPreExecute(combiner, inputCache);
            for (Map.Entry<KOut, List<VOut>> collectedEntry : collectableCollector.collectedValues().entrySet()) {
                KOut key = collectedEntry.getKey();
                List<VOut> emitted = collectedEntry.getValue();
                VOut combinedValue;
                if (emitted.size() > 1) {
                    combinedValue = combiner.reduce(key, emitted.iterator());
                } else {
                    combinedValue = emitted.get(0);
                }
                List<VOut> single = new LinkedList<VOut>();
                single.add(combinedValue);
                combined.put(key, single);
                log.tracef("For m/r task %s combined %s to %s at %s", taskId, key, combinedValue, this.localAddress);
            }
        } finally {
            lifecycle.onPostExecute(combiner);
        }
        return combined;
    }

    /**
     * Tells whether the interrupted flag should be inspected for the given iteration count.
     *
     * @param i number of keys processed so far
     * @return {@code true} when {@code i} is a multiple of {@code CANCELLATION_CHECK_FREQUENCY}
     */
    private boolean checkInterrupt(int i) {
        // Use the declared constant rather than repeating its value as a magic number.
        return i % CANCELLATION_CHECK_FREQUENCY == 0;
    }

    /**
     * Asks the resolved cache loader for its keys, passing {@code set} to
     * {@code loadAllKeys}.
     *
     * @return the keys reported by the loader, or an empty set when no loader is resolved
     * @throws CacheException if the loader fails
     */
    protected <KIn> Set<KIn> loadAllKeysFromCacheLoaderUsingFilter(Set<KIn> set) {
        CacheLoader loader = resolveCacheLoader();
        if (loader == null) {
            return InfinispanCollections.emptySet();
        }
        try {
            @SuppressWarnings("unchecked")
            Set<Object> filter = (Set<Object>) set;
            @SuppressWarnings("unchecked")
            Set<KIn> loaded = (Set<KIn>) loader.loadAllKeys(filter);
            return loaded;
        } catch (CacheLoaderException e) {
            throw new CacheException("Could not load key/value entries from cacheloader", e);
        }
    }

    /**
     * Loads the value stored for {@code kin} through the resolved cache loader, unwrapping it
     * when it is a {@link MarshalledValue}.
     *
     * @return the value, or {@code null} when no loader is resolved or the loader has no entry
     * @throws CacheException if the loader fails
     */
    protected <KIn, KOut> KOut loadValueFromCacheLoader(KIn kin) {
        CacheLoader loader = resolveCacheLoader();
        if (loader == null) {
            return null;
        }
        Object loaded = null;
        try {
            InternalCacheEntry entry = loader.load(kin);
            if (entry != null) {
                loaded = entry.getValue();
                if (loaded instanceof MarshalledValue) {
                    loaded = ((MarshalledValue) loaded).get();
                }
            }
        } catch (CacheLoaderException e) {
            throw new CacheException("Could not load key/value entries from cacheloader", e);
        }
        @SuppressWarnings("unchecked")
        KOut result = (KOut) loaded;
        return result;
    }

    /**
     * Returns the cache loader of the injected {@link CacheLoaderManager}.
     *
     * @return the loader, or {@code null} when no manager was injected or it is not enabled
     */
    protected CacheLoader resolveCacheLoader() {
        CacheLoaderManager manager = this.cacheLoaderManager;
        if (manager == null || !manager.isEnabled()) {
            return null;
        }
        return manager.getCacheLoader();
    }

    /**
     * Groups the given keys by the address the distribution manager reports as their primary
     * location.
     *
     * @param distributionManager used to look up the primary location of every key
     * @param str                 task id, used only to build composite keys
     * @param collection          keys to group
     * @param z                   when {@code true}, the location is looked up for an
     *                            {@link IntermediateCompositeKey} built from the task id and
     *                            the key instead of for the key itself
     * @return the original keys, grouped per address
     */
    @Override // org.infinispan.distexec.mapreduce.MapReduceManager
    public <T> Map<Address, List<T>> mapKeysToNodes(DistributionManager distributionManager, String str, Collection<T> collection, boolean z) {
        Map<Address, List<T>> keysByOwner = new HashMap<Address, List<T>>();
        for (T key : collection) {
            Object lookupKey = z ? new IntermediateCompositeKey<T>(str, key) : key;
            Address owner = distributionManager.getPrimaryLocation(lookupKey);
            List<T> ownedKeys = keysByOwner.get(owner);
            if (ownedKeys == null) {
                ownedKeys = new ArrayList<T>();
                keysByOwner.put(owner, ownedKeys);
            }
            ownedKeys.add(key);
        }
        return keysByOwner;
    }

    /**
     * Returns a new set with those keys of {@code set} whose primary location, as reported by
     * the distribution manager, equals this node's address.
     */
    protected <KIn> Set<KIn> filterLocalPrimaryOwner(Set<KIn> set, DistributionManager distributionManager) {
        Set<KIn> locallyOwned = new HashSet<KIn>();
        for (KIn candidate : set) {
            Address owner = distributionManager.getPrimaryLocation(candidate);
            if (owner == null || !owner.equals(this.localAddress)) {
                continue;
            }
            locallyOwned.add(candidate);
        }
        return locallyOwned;
    }
}
