package org.infinispan.distexec.mapreduce;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.atomic.Delta;
import org.infinispan.atomic.DeltaAware;
import org.infinispan.commands.read.MapCombineCommand;
import org.infinispan.commands.read.ReduceCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.commons.util.concurrent.ParallelIterableMap;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.distexec.mapreduce.spi.MapReduceTaskLifecycleService;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.marshall.core.MarshalledValue;
import org.infinispan.persistence.CollectionKeyFilter;
import org.infinispan.persistence.CompositeFilter;
import org.infinispan.persistence.PrimaryOwnerFilter;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.spi.AdvancedCacheLoader;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.TimeService;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:WEB-INF/lib/infinispan-core-6.1.0.ER5-redhat-1.jar:org/infinispan/distexec/mapreduce/MapReduceManagerImpl.class */
public class MapReduceManagerImpl implements MapReduceManager {
    private static final Log log = LogFactory.getLog(MapReduceManagerImpl.class);
    private ClusteringDependentLogic cdl;
    private EmbeddedCacheManager cacheManager;
    private PersistenceManager persistenceManager;
    private ExecutorService executorService;
    private TimeService timeService;
    private int chunkSize;

    /* loaded from: input_file:WEB-INF/lib/infinispan-core-6.1.0.ER5-redhat-1.jar:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$CollectableCollector.class */
    private interface CollectableCollector<K, V> extends Collector<K, V> {
        Map<K, List<V>> removeCollectedValues();

        Map<K, List<V>> removeLRUEntry();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-core-6.1.0.ER5-redhat-1.jar:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$DataContainerTask.class */
    public abstract class DataContainerTask<K, V> implements ParallelIterableMap.KeyValueAction<Object, InternalCacheEntry> {
        private DataContainerTask() {
        }

        V getValue(InternalCacheEntry internalCacheEntry) {
            if (internalCacheEntry == null) {
                return null;
            }
            Object value = internalCacheEntry.getValue();
            if (value instanceof MarshalledValue) {
                value = ((MarshalledValue) value).get();
            }
            return (V) value;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-core-6.1.0.ER5-redhat-1.jar:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$DefaultCollector.class */
    public static final class DefaultCollector<KOut, VOut> implements CollectableCollector<KOut, VOut> {
        private final boolean atomicEmit;
        private Map<KOut, List<VOut>> store;
        private final AtomicInteger emitCount;

        public DefaultCollector(int i, boolean z) {
            this.atomicEmit = z;
            this.store = new LinkedHashMap(i, 0.75f, true);
            this.emitCount = new AtomicInteger();
        }

        public DefaultCollector(boolean z) {
            this(128, z);
        }

        @Override // org.infinispan.distexec.mapreduce.Collector
        public void emit(KOut kout, VOut vout) {
            if (!this.atomicEmit) {
                emitHelper(kout, vout);
            } else {
                synchronized (this) {
                    emitHelper(kout, vout);
                }
            }
        }

        protected void emitHelper(KOut kout, VOut vout) {
            List<VOut> list = this.store.get(kout);
            if (list == null) {
                list = new LinkedList();
                this.store.put(kout, list);
            }
            list.add(vout);
            this.emitCount.incrementAndGet();
        }

        @Override // org.infinispan.distexec.mapreduce.MapReduceManagerImpl.CollectableCollector
        public Map<KOut, List<VOut>> removeCollectedValues() {
            HashMap hashMap;
            synchronized (this) {
                hashMap = new HashMap(this.store);
                this.store.clear();
                this.emitCount.set(0);
            }
            return hashMap;
        }

        @Override // org.infinispan.distexec.mapreduce.MapReduceManagerImpl.CollectableCollector
        public Map<KOut, List<VOut>> removeLRUEntry() {
            Map<KOut, List<VOut>> emptyMap = Collections.emptyMap();
            synchronized (this) {
                Iterator<KOut> it = this.store.keySet().iterator();
                if (it.hasNext()) {
                    KOut next = it.next();
                    List<VOut> remove = this.store.remove(next);
                    emptyMap = Collections.singletonMap(next, remove);
                    this.emitCount.addAndGet(-remove.size());
                }
            }
            return emptyMap;
        }

        public int size() {
            return this.emitCount.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-core-6.1.0.ER5-redhat-1.jar:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$DeltaAwareList.class */
    public static final class DeltaAwareList<E> extends LinkedList<E> implements DeltaAware, Delta {
        private static final long serialVersionUID = 2176345973026460708L;

        public DeltaAwareList(Collection<? extends E> collection) {
            super(collection);
        }

        @Override // org.infinispan.atomic.DeltaAware
        public Delta delta() {
            return new DeltaAwareList(this);
        }

        @Override // org.infinispan.atomic.DeltaAware
        public void commit() {
            clear();
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // org.infinispan.atomic.Delta
        public DeltaAware merge(DeltaAware deltaAware) {
            if (deltaAware == null || !(deltaAware instanceof DeltaAwareList)) {
                return this;
            }
            List list = (List) deltaAware;
            Iterator it = iterator();
            while (it.hasNext()) {
                list.add(it.next());
            }
            return (DeltaAware) list;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/infinispan-core-6.1.0.ER5-redhat-1.jar:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$IntermediateCompositeKey.class */
    public static final class IntermediateCompositeKey<V> implements Serializable {
        private static final long serialVersionUID = 4434717760740027918L;
        private final String taskId;
        private final V key;

        public IntermediateCompositeKey(String str, V v) {
            this.taskId = str;
            this.key = v;
        }

        public String getTaskId() {
            return this.taskId;
        }

        public V getKey() {
            return this.key;
        }

        public int hashCode() {
            return (31 * ((31 * 1) + (this.key == null ? 0 : this.key.hashCode()))) + (this.taskId == null ? 0 : this.taskId.hashCode());
        }

        public boolean equals(Object obj) {
            if (obj == null || !(obj instanceof IntermediateCompositeKey)) {
                return false;
            }
            IntermediateCompositeKey intermediateCompositeKey = (IntermediateCompositeKey) obj;
            if (this.key == null) {
                if (intermediateCompositeKey.key != null) {
                    return false;
                }
            } else if (!this.key.equals(intermediateCompositeKey.key)) {
                return false;
            }
            return this.taskId == null ? intermediateCompositeKey.taskId == null : this.taskId.equals(intermediateCompositeKey.taskId);
        }

        public String toString() {
            return "IntermediateCompositeKey [taskId=" + this.taskId + ", key=" + this.key + "]";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-core-6.1.0.ER5-redhat-1.jar:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$IntermediateKeyFilter.class */
    public static final class IntermediateKeyFilter<T> implements AdvancedCacheLoader.KeyFilter<IntermediateCompositeKey<T>> {
        private final String taskId;

        public IntermediateKeyFilter(String str) {
            if (str == null || str.isEmpty()) {
                throw new IllegalArgumentException("Invalid task Id " + str);
            }
            this.taskId = str;
        }

        @Override // org.infinispan.persistence.spi.AdvancedCacheLoader.KeyFilter
        public boolean shouldLoadKey(IntermediateCompositeKey<T> intermediateCompositeKey) {
            if (intermediateCompositeKey != null) {
                return this.taskId.equals(intermediateCompositeKey.getTaskId());
            }
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-core-6.1.0.ER5-redhat-1.jar:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$MapCombineTask.class */
    public final class MapCombineTask<K, V, KOut, VOut> extends DataContainerTask<K, V> implements AdvancedCacheLoader.CacheLoaderTask {
        DefaultCollector<KOut, VOut> collector;
        MapCombineCommand<K, V, KOut, VOut> mcc;
        Set<KOut> intermediateKeys;
        int maxCollectorSize;

        public MapCombineTask(DefaultCollector<KOut, VOut> defaultCollector, MapCombineCommand<K, V, KOut, VOut> mapCombineCommand, int i) {
            super();
            this.collector = defaultCollector;
            this.mcc = mapCombineCommand;
            this.intermediateKeys = Collections.synchronizedSet(new HashSet());
            this.maxCollectorSize = i;
        }

        public Set<KOut> getIntermediateKeys() {
            return this.intermediateKeys;
        }

        @Override // org.infinispan.distexec.mapreduce.MapReduceManagerImpl.DataContainerTask
        V getValue(InternalCacheEntry internalCacheEntry) {
            if (internalCacheEntry == null) {
                return null;
            }
            Object value = internalCacheEntry.getValue();
            if (value instanceof MarshalledValue) {
                value = ((MarshalledValue) value).get();
            }
            return (V) value;
        }

        @Override // org.infinispan.commons.util.concurrent.ParallelIterableMap.KeyValueAction
        public void apply(Object obj, InternalCacheEntry internalCacheEntry) {
            V value = getValue(internalCacheEntry);
            if (value != null) {
                this.mcc.getMapper().map(obj, value, this.collector);
            }
            combineAndMigrate();
        }

        protected void combineAndMigrate() throws CacheException {
            if (this.collector.size() > this.maxCollectorSize) {
                Map<KOut, List<VOut>> removeLRUEntry = this.collector.removeLRUEntry();
                if (removeLRUEntry.isEmpty()) {
                    return;
                }
                this.intermediateKeys.addAll(MapReduceManagerImpl.this.migrateIntermediateKeysAndValues(this.mcc, MapReduceManagerImpl.this.combine(this.mcc, removeLRUEntry)));
            }
        }

        @Override // org.infinispan.persistence.spi.AdvancedCacheLoader.CacheLoaderTask
        public void processEntry(MarshalledEntry marshalledEntry, AdvancedCacheLoader.TaskContext taskContext) throws InterruptedException {
            this.mcc.getMapper().map(marshalledEntry.getKey(), getValue(marshalledEntry), this.collector);
            combineAndMigrate();
        }

        private Object getValue(MarshalledEntry marshalledEntry) {
            Object value = marshalledEntry.getValue();
            return value instanceof MarshalledValue ? ((MarshalledValue) value).get() : value;
        }
    }

    @Inject
    public void init(EmbeddedCacheManager embeddedCacheManager, PersistenceManager persistenceManager, @ComponentName("org.infinispan.executors.transport") ExecutorService executorService, ClusteringDependentLogic clusteringDependentLogic, TimeService timeService, Configuration configuration) {
        this.cacheManager = embeddedCacheManager;
        this.persistenceManager = persistenceManager;
        this.cdl = clusteringDependentLogic;
        this.executorService = executorService;
        this.timeService = timeService;
        this.chunkSize = configuration.clustering().stateTransfer().chunkSize();
    }

    @Override // org.infinispan.distexec.mapreduce.MapReduceManager
    public ExecutorService getExecutorService() {
        return this.executorService;
    }

    @Override // org.infinispan.distexec.mapreduce.MapReduceManager
    public <KIn, VIn, KOut, VOut> Map<KOut, List<VOut>> mapAndCombineForLocalReduction(MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand) throws InterruptedException {
        return combine(mapCombineCommand, map(mapCombineCommand).removeCollectedValues());
    }

    @Override // org.infinispan.distexec.mapreduce.MapReduceManager
    public <KIn, VIn, KOut, VOut> Set<KOut> mapAndCombineForDistributedReduction(MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand) throws InterruptedException {
        try {
            return mapAndCombine(mapCombineCommand);
        } catch (Exception e) {
            throw new CacheException(e);
        }
    }

    @Override // org.infinispan.distexec.mapreduce.MapReduceManager
    public <KOut, VOut> Map<KOut, VOut> reduce(ReduceCommand<KOut, VOut> reduceCommand) throws InterruptedException {
        ConcurrentMap makeConcurrentMap = CollectionFactory.makeConcurrentMap(256);
        reduce(reduceCommand, makeConcurrentMap);
        return makeConcurrentMap;
    }

    @Override // org.infinispan.distexec.mapreduce.MapReduceManager
    public <KOut, VOut> void reduce(ReduceCommand<KOut, VOut> reduceCommand, String str) throws InterruptedException {
        reduce(reduceCommand, this.cacheManager.getCache(str));
    }

    protected <KOut, VOut> void reduce(ReduceCommand<KOut, VOut> reduceCommand, final Map<KOut, VOut> map) throws InterruptedException {
        Set<KOut> keys = reduceCommand.getKeys();
        final String taskId = reduceCommand.getTaskId();
        if (keys == null || keys.isEmpty()) {
            throw new IllegalStateException("Reduce phase of MapReduceTask " + taskId + " on node " + this.cdl.getAddress() + " executed with empty input keys");
        }
        final Reducer<KOut, VOut> reducer = reduceCommand.getReducer();
        final boolean isEmitCompositeIntermediateKeys = reduceCommand.isEmitCompositeIntermediateKeys();
        MapReduceTaskLifecycleService mapReduceTaskLifecycleService = MapReduceTaskLifecycleService.getInstance();
        log.tracef("For m/r task %s invoking %s at %s", taskId, reduceCommand, this.cdl.getAddress());
        long time = log.isTraceEnabled() ? this.timeService.time() : 0L;
        try {
            Cache<?, ?> cache = this.cacheManager.getCache(reduceCommand.getCacheName());
            mapReduceTaskLifecycleService.onPreExecute(reducer, cache);
            AdvancedCacheLoader.KeyFilter keyFilter = AdvancedCacheLoader.KeyFilter.LOAD_ALL_FILTER;
            if (isEmitCompositeIntermediateKeys) {
                keyFilter = new IntermediateKeyFilter(taskId);
            }
            cache.getAdvancedCache().getDataContainer().executeTask(keyFilter, new DataContainerTask<KOut, List<VOut>>() { // from class: org.infinispan.distexec.mapreduce.MapReduceManagerImpl.1
                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super();
                }

                /* JADX WARN: Multi-variable type inference failed */
                @Override // org.infinispan.commons.util.concurrent.ParallelIterableMap.KeyValueAction
                public void apply(Object obj, InternalCacheEntry internalCacheEntry) {
                    Object key = isEmitCompositeIntermediateKeys ? ((IntermediateCompositeKey) obj).getKey() : obj;
                    List<VOut> value = getValue(internalCacheEntry);
                    if (value == null) {
                        throw new IllegalStateException("Found invalid value in intermediate cache, for key " + key + " during reduce phase execution on " + MapReduceManagerImpl.this.cacheManager.getAddress() + " for M/R task " + taskId);
                    }
                    Object reduce = reducer.reduce(key, value.iterator());
                    map.put(key, reduce);
                    MapReduceManagerImpl.log.tracef("For m/r task %s reduced %s to %s at %s ", taskId, key, reduce, MapReduceManagerImpl.this.cdl.getAddress());
                }
            });
            if (log.isTraceEnabled()) {
                log.tracef("Reduce for task %s took %s milliseconds", reduceCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)));
            }
            mapReduceTaskLifecycleService.onPostExecute(reducer);
        } catch (Throwable th) {
            if (log.isTraceEnabled()) {
                log.tracef("Reduce for task %s took %s milliseconds", reduceCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)));
            }
            mapReduceTaskLifecycleService.onPostExecute(reducer);
            throw th;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected <KIn, VIn, KOut, VOut> CollectableCollector<KOut, VOut> map(MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand) throws InterruptedException {
        Cache cache = this.cacheManager.getCache(mapCombineCommand.getCacheName());
        Set<KIn> keys = mapCombineCommand.getKeys();
        final Mapper<KIn, VIn, KOut, VOut> mapper = mapCombineCommand.getMapper();
        boolean z = (keys == null || keys.isEmpty()) ? false : true;
        MapReduceTaskLifecycleService mapReduceTaskLifecycleService = MapReduceTaskLifecycleService.getInstance();
        final DefaultCollector defaultCollector = new DefaultCollector(!z);
        DataContainer dataContainer = cache.getAdvancedCache().getDataContainer();
        log.tracef("For m/r task %s invoking %s with input keys %s", mapCombineCommand.getTaskId(), mapCombineCommand, keys);
        long time = log.isTraceEnabled() ? this.timeService.time() : 0L;
        try {
            mapReduceTaskLifecycleService.onPreExecute(mapper, cache);
            if (z) {
                for (KIn kin : keys) {
                    mapper.map(kin, cache.get(kin), defaultCollector);
                }
            } else {
                dataContainer.executeTask(new PrimaryOwnerFilter(this.cdl), new DataContainerTask<KIn, VIn>() { // from class: org.infinispan.distexec.mapreduce.MapReduceManagerImpl.2
                    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                    {
                        super();
                    }

                    @Override // org.infinispan.commons.util.concurrent.ParallelIterableMap.KeyValueAction
                    public void apply(Object obj, InternalCacheEntry internalCacheEntry) {
                        VIn value = getValue(internalCacheEntry);
                        if (value != null) {
                            mapper.map(obj, value, defaultCollector);
                        }
                    }
                });
            }
            if (this.persistenceManager != null && !z) {
                this.persistenceManager.processOnAllStores(new CompositeFilter(new PrimaryOwnerFilter(this.cdl), new CollectionKeyFilter(dataContainer.keySet())), new MapReduceCacheLoaderTask(mapper, defaultCollector), true, false);
            }
            if (log.isTraceEnabled()) {
                log.tracef("Map phase for task %s took %s milliseconds", mapCombineCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)));
            }
            mapReduceTaskLifecycleService.onPostExecute(mapper);
            return defaultCollector;
        } catch (Throwable th) {
            if (log.isTraceEnabled()) {
                log.tracef("Map phase for task %s took %s milliseconds", mapCombineCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)));
            }
            mapReduceTaskLifecycleService.onPostExecute(mapper);
            throw th;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected <KIn, VIn, KOut, VOut> Set<KOut> mapAndCombine(MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand) throws Exception {
        Cache cache = this.cacheManager.getCache(mapCombineCommand.getCacheName());
        Set<KIn> keys = mapCombineCommand.getKeys();
        int maxCollectorSize = mapCombineCommand.getMaxCollectorSize();
        Mapper<KIn, VIn, KOut, VOut> mapper = mapCombineCommand.getMapper();
        boolean z = (keys == null || keys.isEmpty()) ? false : true;
        MapReduceTaskLifecycleService mapReduceTaskLifecycleService = MapReduceTaskLifecycleService.getInstance();
        DefaultCollector defaultCollector = new DefaultCollector(maxCollectorSize, !z);
        DataContainer dataContainer = cache.getAdvancedCache().getDataContainer();
        log.tracef("For m/r task %s invoking %s with input keys %s", mapCombineCommand.getTaskId(), mapCombineCommand, mapCombineCommand.getKeys());
        long time = log.isTraceEnabled() ? this.timeService.time() : 0L;
        Set<KOut> synchronizedSet = Collections.synchronizedSet(new HashSet());
        try {
            mapReduceTaskLifecycleService.onPreExecute(mapper, cache);
            if (z) {
                for (KIn kin : keys) {
                    mapper.map(kin, cache.get(kin), defaultCollector);
                }
                synchronizedSet.addAll(migrateIntermediateKeysAndValues(mapCombineCommand, combine(mapCombineCommand, defaultCollector.removeCollectedValues())));
            } else {
                MapCombineTask mapCombineTask = new MapCombineTask(defaultCollector, mapCombineCommand, maxCollectorSize);
                dataContainer.executeTask(new PrimaryOwnerFilter(this.cdl), mapCombineTask);
                synchronizedSet.addAll(mapCombineTask.getIntermediateKeys());
                synchronizedSet.addAll(migrateIntermediateKeysAndValues(mapCombineCommand, combine(mapCombineCommand, defaultCollector.removeCollectedValues())));
            }
            if (this.persistenceManager != null && !z) {
                DefaultCollector defaultCollector2 = new DefaultCollector(maxCollectorSize, true);
                CompositeFilter compositeFilter = new CompositeFilter(new PrimaryOwnerFilter(this.cdl), new CollectionKeyFilter(dataContainer.keySet()));
                MapCombineTask mapCombineTask2 = new MapCombineTask(defaultCollector2, mapCombineCommand, maxCollectorSize);
                this.persistenceManager.processOnAllStores(compositeFilter, mapCombineTask2, true, false);
                synchronizedSet.addAll(mapCombineTask2.getIntermediateKeys());
                synchronizedSet.addAll(migrateIntermediateKeysAndValues(mapCombineCommand, combine(mapCombineCommand, defaultCollector2.removeCollectedValues())));
            }
            if (log.isTraceEnabled()) {
                log.tracef("Map phase for task %s took %s milliseconds", mapCombineCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)));
            }
            mapReduceTaskLifecycleService.onPostExecute(mapper);
            return synchronizedSet;
        } catch (Throwable th) {
            if (log.isTraceEnabled()) {
                log.tracef("Map phase for task %s took %s milliseconds", mapCombineCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)));
            }
            mapReduceTaskLifecycleService.onPostExecute(mapper);
            throw th;
        }
    }

    protected <KIn, VIn, KOut, VOut> Map<KOut, List<VOut>> combine(MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand, Map<KOut, List<VOut>> map) {
        Map<KOut, List<VOut>> map2;
        if (mapCombineCommand.hasCombiner()) {
            map2 = new HashMap();
            Reducer<KOut, VOut> combiner = mapCombineCommand.getCombiner();
            Cache<?, ?> cache = this.cacheManager.getCache(mapCombineCommand.getCacheName());
            log.tracef("For m/r task %s invoking combiner %s at %s", mapCombineCommand.getTaskId(), mapCombineCommand, this.cdl.getAddress());
            MapReduceTaskLifecycleService mapReduceTaskLifecycleService = MapReduceTaskLifecycleService.getInstance();
            long time = log.isTraceEnabled() ? this.timeService.time() : 0L;
            try {
                mapReduceTaskLifecycleService.onPreExecute(combiner, cache);
                for (Map.Entry<KOut, List<VOut>> entry : map.entrySet()) {
                    List<VOut> value = entry.getValue();
                    List<VOut> asList = value.size() == 1 ? value : Arrays.asList(combiner.reduce(entry.getKey(), value.iterator()));
                    map2.put(entry.getKey(), asList);
                    log.tracef("For m/r task %s combined %s to %s at %s", mapCombineCommand.getTaskId(), entry.getKey(), asList, this.cdl.getAddress());
                }
                if (log.isTraceEnabled()) {
                    log.tracef("Combine for task %s took %s milliseconds", mapCombineCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)));
                }
                mapReduceTaskLifecycleService.onPostExecute(combiner);
            } catch (Throwable th) {
                if (log.isTraceEnabled()) {
                    log.tracef("Combine for task %s took %s milliseconds", mapCombineCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)));
                }
                mapReduceTaskLifecycleService.onPostExecute(combiner);
                throw th;
            }
        } else {
            map2 = map;
        }
        return map2;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <KIn, VIn, KOut, VOut> Set<KOut> migrateIntermediateKeysAndValues(MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand, Map<KOut, List<VOut>> map) {
        String taskId = mapCombineCommand.getTaskId();
        String intermediateCacheName = mapCombineCommand.getIntermediateCacheName();
        Cache cache = this.cacheManager.getCache(intermediateCacheName);
        if (cache == null) {
            throw new IllegalStateException("Temporary cache for MapReduceTask " + taskId + " named " + intermediateCacheName + " not found on " + this.cdl.getAddress());
        }
        HashSet hashSet = new HashSet();
        DistributionManager distributionManager = cache.getAdvancedCache().getDistributionManager();
        boolean isEmitCompositeIntermediateKeys = mapCombineCommand.isEmitCompositeIntermediateKeys();
        Map mapKeysToNodes = mapKeysToNodes(distributionManager, taskId, map.keySet(), isEmitCompositeIntermediateKeys);
        long time = log.isTraceEnabled() ? this.timeService.time() : 0L;
        AdvancedCache withFlags = cache.getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES);
        try {
            for (Map.Entry entry : mapKeysToNodes.entrySet()) {
                List list = (List) entry.getValue();
                try {
                    log.tracef("For m/r task %s migrating intermediate keys %s to %s", taskId, list, entry.getKey());
                    for (Object obj : list) {
                        List<VOut> list2 = map.get(obj);
                        int i = this.chunkSize;
                        for (int i2 = 0; i2 < list2.size(); i2 += i) {
                            DeltaAwareList deltaAwareList = new DeltaAwareList(list2.subList(i2, Math.min(list2.size(), i2 + i)));
                            if (isEmitCompositeIntermediateKeys) {
                                withFlags.put(new IntermediateCompositeKey(taskId, obj), deltaAwareList);
                            } else {
                                withFlags.put(obj, deltaAwareList);
                            }
                        }
                        hashSet.add(obj);
                    }
                } catch (Exception e) {
                    throw new CacheException("Could not move intermediate keys/values for M/R task " + taskId, e);
                }
            }
            if (log.isTraceEnabled()) {
                log.tracef("Migrating keys for task %s took %s milliseconds (Migrated %s keys)", mapCombineCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)), Integer.valueOf(hashSet.size()));
            }
            return hashSet;
        } catch (Throwable th) {
            if (log.isTraceEnabled()) {
                log.tracef("Migrating keys for task %s took %s milliseconds (Migrated %s keys)", mapCombineCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)), Integer.valueOf(hashSet.size()));
            }
            throw th;
        }
    }

    @Override // org.infinispan.distexec.mapreduce.MapReduceManager
    public <T> Map<Address, List<T>> mapKeysToNodes(DistributionManager distributionManager, String str, Collection<T> collection, boolean z) {
        HashMap hashMap = new HashMap();
        for (T t : collection) {
            Address primaryLocation = z ? distributionManager.getPrimaryLocation(new IntermediateCompositeKey(str, t)) : distributionManager.getPrimaryLocation(t);
            List list = (List) hashMap.get(primaryLocation);
            if (list == null) {
                list = new ArrayList();
                hashMap.put(primaryLocation, list);
            }
            list.add(t);
        }
        return hashMap;
    }

    protected <KIn> Set<KIn> filterLocalPrimaryOwner(Set<KIn> set, DistributionManager distributionManager) {
        HashSet hashSet = new HashSet();
        for (KIn kin : set) {
            Address primaryLocation = distributionManager != null ? distributionManager.getPrimaryLocation(kin) : this.cdl.getAddress();
            if (primaryLocation != null && primaryLocation.equals(this.cdl.getAddress())) {
                hashSet.add(kin);
            }
        }
        return hashSet;
    }
}
