/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.iteration.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.commons.util.concurrent.ParallelIterableMap;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.filter.CollectionKeyFilter;
import org.infinispan.filter.CompositeKeyFilter;
import org.infinispan.filter.CompositeKeyValueFilter;
import org.infinispan.filter.Converter;
import org.infinispan.filter.KeyFilter;
import org.infinispan.filter.KeyFilterAsKeyValueFilter;
import org.infinispan.filter.KeyValueFilter;
import org.infinispan.filter.KeyValueFilterAsKeyFilter;
import org.infinispan.filter.KeyValueFilterConverter;
import org.infinispan.iteration.impl.EntryRequestCommand;
import org.infinispan.iteration.impl.EntryResponseCommand;
import org.infinispan.iteration.impl.EntryRetriever;
import org.infinispan.iteration.impl.LocalEntryRetriever;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.DataRehashed;
import org.infinispan.notifications.cachelistener.annotation.TopologyChanged;
import org.infinispan.notifications.cachelistener.event.DataRehashedEvent;
import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.util.concurrent.ConcurrentHashSet;

@Listener
public class DistributedEntryRetriever<K, V>
extends LocalEntryRetriever<K, V> {
    // Most recently observed consistent hash: overwritten by topologyChanged() and
    // lazily seeded from the DistributionManager by getCurrentHash() while still null.
    private final AtomicReference<ConsistentHash> currentHash = new AtomicReference();
    // Assigned in initialize(); used by getCurrentHash() to obtain the read consistent hash.
    private DistributionManager distributionManager;
    // Assigned in initialize(); used when local retrieval also has to scan the stores.
    private PersistenceManager persistenceManager;
    // Assigned in initialize(); builds the entry request/response commands sent to other nodes.
    private CommandsFactory commandsFactory;
    // Address of this node; assigned in start() from rpcManager.getAddress().
    private Address localAddress;
    // Assigned in initialize(); used for all remote invocations in this class.
    private RpcManager rpcManager;
    // Assigned in initialize(); not referenced by any method in the visible part of this class.
    private ExecutorService remoteExecutorService;
    // Per-iteration bookkeeping keyed by iteration identifier; entries are added in retrieveEntries().
    private Map<UUID, IterationStatus<? extends Object>> iteratorDetails = CollectionFactory.makeConcurrentMap();
    // Keyed by iteration identifier; an entry exists only while a local retrieval pass is
    // running and is notified from dataRehashed() when this node loses primary segments.
    private ConcurrentMap<UUID, SegmentChangeListener> changeListener = CollectionFactory.makeConcurrentMap();

    /**
     * Creates the retriever by forwarding all three arguments unchanged to the
     * {@link LocalEntryRetriever} constructor; no additional state is set up here.
     *
     * @param batchSize forwarded to the superclass (presumably the number of entries
     *                  per batch; confirm against LocalEntryRetriever)
     * @param timeout   forwarded to the superclass; its meaning is defined there
     * @param unit      forwarded to the superclass (presumably the unit of {@code timeout})
     */
    public DistributedEntryRetriever(int batchSize, long timeout, TimeUnit unit) {
        super(batchSize, timeout, unit);
    }

    /**
     * Handles the pre-phase of a data rehash by informing every registered
     * {@link SegmentChangeListener} of the primary segments this node owned
     * under the starting hash but no longer owns under the ending hash.
     * <p>
     * Nothing happens for post events, when either hash is missing, or when no
     * listener is currently registered. Newly gained segments are not reported.
     *
     * @param event the rehash notification carrying the start and end hashes
     */
    @DataRehashed
    public void dataRehashed(DataRehashedEvent<K, V> event) {
        ConsistentHash hashBefore = event.getConsistentHashAtStart();
        ConsistentHash hashAfter = event.getConsistentHashAtEnd();
        boolean traceEnabled = this.log.isTraceEnabled();
        if (!event.isPre() || hashBefore == null || hashAfter == null) {
            return;
        }
        this.log.tracef("Data rehash occurring startHash: %s and endHash: %s", (Object)hashBefore, (Object)hashAfter);
        if (this.changeListener.isEmpty()) {
            // No local retrieval is in flight, so nobody needs to hear about it.
            return;
        }
        if (traceEnabled) {
            this.log.tracef("Previous segments %s ", (Object)hashBefore.getPrimarySegmentsForOwner(this.localAddress));
            this.log.tracef("After segments %s ", (Object)hashAfter.getPrimarySegmentsForOwner(this.localAddress));
        }
        // Segments owned before minus segments owned afterwards = segments lost.
        HashSet<Integer> lostSegments = new HashSet<Integer>(hashBefore.getPrimarySegmentsForOwner(this.localAddress));
        lostSegments.removeAll(hashAfter.getPrimarySegmentsForOwner(this.localAddress));
        if (lostSegments.isEmpty()) {
            if (traceEnabled) {
                this.log.tracef("No segments have been removed from data rehash, no notification required", new Object[0]);
            }
            return;
        }
        for (Map.Entry<UUID, SegmentChangeListener> registration : this.changeListener.entrySet()) {
            if (traceEnabled) {
                this.log.tracef("Notifying %s through SegmentChangeListener", (Object)registration.getKey());
            }
            registration.getValue().changedSegments(lostSegments);
        }
    }

    /**
     * Handles the pre-phase of a topology change.
     * <p>
     * The ending hash always becomes the current hash. When both hashes are
     * present, every tracked iteration is then re-evaluated:
     * <ul>
     *   <li>if remote segments are still missing, a request is (re)sent when the
     *       node being waited on has left, or when no request was pending and
     *       this thread wins the compare-and-set on the pending slot;</li>
     *   <li>if no remote segments are missing, the pending slot is cleared;</li>
     *   <li>if this node now owns segments that are still unprocessed, local
     *       retrieval is started for them.</li>
     * </ul>
     *
     * @param event the topology notification carrying the start and end hashes
     */
    @TopologyChanged
    public void topologyChanged(TopologyChangedEvent<K, V> event) {
        if (!event.isPre()) {
            return;
        }
        ConsistentHash previousHash = event.getConsistentHashAtStart();
        ConsistentHash nextHash = event.getConsistentHashAtEnd();
        this.currentHash.set(nextHash);
        boolean traceEnabled = this.log.isTraceEnabled();
        if (previousHash == null || nextHash == null) {
            return;
        }
        if (traceEnabled) {
            this.log.tracef("Rehash hashes before %s after %s", (Object)previousHash, (Object)nextHash);
        }
        // Members present before but absent afterwards have left the cluster.
        HashSet<Address> departed = new HashSet<Address>(previousHash.getMembers());
        departed.removeAll(nextHash.getMembers());
        if (traceEnabled && !departed.isEmpty()) {
            this.log.tracef("Found leavers are %s", (Object)departed);
        }
        for (Map.Entry<UUID, IterationStatus<? extends Object>> tracked : this.iteratorDetails.entrySet()) {
            UUID identifier = tracked.getKey();
            final IterationStatus<? extends Object> status = tracked.getValue();
            Set<Integer> remoteSegments = this.findMissingRemoteSegments(status.processedKeys, nextHash);
            if (remoteSegments.isEmpty()) {
                tracked.getValue().awaitingResponseFrom.set(null);
            } else {
                Map.Entry<Address, Set<Integer>> route = this.findOptimalRoute(remoteSegments, nextHash);
                AtomicReference<Address> pendingResponder = status.awaitingResponseFrom;
                Address waitingFor = pendingResponder.get();
                boolean resend;
                if (departed.contains(waitingFor)) {
                    // The node we were waiting on is gone: resend without claiming the slot first.
                    resend = true;
                    if (traceEnabled) {
                        this.log.tracef("Resending new segment request %s for identifier %s since node %s has gone down", (Object)route.getValue(), (Object)identifier, (Object)waitingFor);
                    }
                } else {
                    // Only send when nothing is pending and we win the race to claim the slot.
                    resend = waitingFor == null && pendingResponder.compareAndSet(null, route.getKey());
                    if (resend && traceEnabled) {
                        this.log.tracef("There is no pending remote request for identifier %s, sending new one for segments %s", (Object)identifier, (Object)route.getValue());
                    }
                }
                if (resend) {
                    if (status.ongoingIterator != null) {
                        this.sendRequest(false, route, identifier, status);
                    } else {
                        pendingResponder.set(null);
                        if (traceEnabled) {
                            this.log.tracef("Not sending request since iterator has been closed for identifier %s", (Object)identifier);
                        }
                    }
                }
            }
            Set<Integer> localSegments = this.findMissingLocalSegments(status.processedKeys, nextHash);
            if (!localSegments.isEmpty()) {
                if (traceEnabled) {
                    this.log.tracef("Rehash caused our local node to acquire new segments %s for iteration %s processing", (Object)localSegments, (Object)identifier);
                }
                this.startRetrievingValuesLocal(identifier, localSegments, status, new SegmentBatchHandler<K, Object>(){

                    @Override
                    public void handleBatch(UUID batchIdentifier, boolean complete, Set<Integer> completedSegments, Set<Integer> inDoubtSegments, Collection<CacheEntry<K, Object>> entries) {
                        DistributedEntryRetriever.this.processData(batchIdentifier, DistributedEntryRetriever.this.localAddress, completedSegments, inDoubtSegments, entries);
                    }

                    @Override
                    public void handleException(CacheException e) {
                        status.ongoingIterator.close(e);
                    }
                });
            }
        }
    }

    /**
     * Injection point: stores the supplied collaborators in the corresponding
     * fields. No other work is performed here.
     *
     * @param distributionManager   stored; later queried for the read consistent hash
     * @param persistenceManager    stored; later used when retrieval has to scan the stores
     * @param commandsFactory       stored; later used to build entry request/response commands
     * @param rpcManager            stored; later used for remote invocations and to obtain the local address
     * @param remoteExecutorService stored; injected under the component name
     *                              {@code org.infinispan.executors.remote}
     */
    @Inject
    public void initialize(DistributionManager distributionManager, PersistenceManager persistenceManager, CommandsFactory commandsFactory, RpcManager rpcManager, @ComponentName(value="org.infinispan.executors.remote") ExecutorService remoteExecutorService) {
        this.distributionManager = distributionManager;
        this.persistenceManager = persistenceManager;
        this.commandsFactory = commandsFactory;
        this.rpcManager = rpcManager;
        this.remoteExecutorService = remoteExecutorService;
    }

    /**
     * Starts the component: runs the superclass start logic, registers this
     * instance as a listener on the cache, and records this node's address as
     * reported by the {@link RpcManager}.
     * <p>
     * NOTE(review): the listener is registered before {@code localAddress} is
     * assigned. The callbacks in this class only read {@code localAddress}
     * when an iteration or local retrieval is already in progress, so this is
     * presumably harmless during startup - confirm before reordering.
     */
    @Override
    @Start
    public void start() {
        super.start();
        // Enables the @DataRehashed / @TopologyChanged callbacks declared on this class.
        this.cache.addListener(this);
        this.localAddress = this.rpcManager.getAddress();
    }

    /**
     * Serves an entry request received from another node: retrieves the
     * requested segments locally and ships every batch (or a failure) back to
     * {@code origin} as an entry response command sent synchronously.
     * <p>
     * If the cache is no longer running when a batch is ready, all completed
     * segments of that batch are downgraded to in-doubt before sending.
     *
     * @param identifier identifier of the iteration this request belongs to
     * @param origin     node that issued the request and receives the responses
     * @param segments   segments being requested
     * @param filter     optional filter applied to entries
     * @param converter  optional converter applied to entries
     * @param flags      flags passed through to the retrieval
     */
    @Override
    public <C> void startRetrievingValues(final UUID identifier, final Address origin, Set<Integer> segments, KeyValueFilter<? super K, ? super V> filter, Converter<? super K, ? super V, C> converter, Set<Flag> flags) {
        if (this.log.isTraceEnabled()) {
            this.log.tracef("Received entry request for %s from node %s for segments %s", (Object)identifier, (Object)origin, (Object)segments);
        }
        this.wireFilterAndConverterDependencies(filter, converter);
        SegmentBatchHandler<K, C> replyToOrigin = new SegmentBatchHandler<K, C>(){

            @Override
            public void handleBatch(UUID batchIdentifier, boolean complete, Set<Integer> completedSegments, Set<Integer> inDoubtSegments, Collection<CacheEntry<K, C>> entries) {
                boolean running = DistributedEntryRetriever.this.cache.getStatus() == ComponentStatus.RUNNING;
                if (!running) {
                    if (DistributedEntryRetriever.this.log.isTraceEnabled()) {
                        DistributedEntryRetriever.this.log.tracef("Cache status is no longer running, all segments are now suspect", new Object[0]);
                    }
                    // Nothing can be trusted as complete once the cache is stopping.
                    inDoubtSegments.addAll(completedSegments);
                    completedSegments.clear();
                }
                if (DistributedEntryRetriever.this.log.isTraceEnabled()) {
                    DistributedEntryRetriever.this.log.tracef("Sending batch response for %s to origin %s with %s completed segments, %s in doubt segments and %s values", batchIdentifier, origin, completedSegments, inDoubtSegments, entries.size());
                }
                EntryResponseCommand batchResponse = DistributedEntryRetriever.this.commandsFactory.buildEntryResponseCommand(batchIdentifier, completedSegments, inDoubtSegments, entries, null);
                this.sendToOrigin(batchResponse);
            }

            @Override
            public void handleException(CacheException e) {
                EntryResponseCommand failureResponse = DistributedEntryRetriever.this.commandsFactory.buildEntryResponseCommand(identifier, null, null, null, e);
                this.sendToOrigin(failureResponse);
            }

            // Sends the command to the requesting node only, synchronously.
            private void sendToOrigin(EntryResponseCommand command) {
                DistributedEntryRetriever.this.rpcManager.invokeRemotely(Collections.singleton(origin), command, DistributedEntryRetriever.this.rpcManager.getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS).timeout(Long.MAX_VALUE, TimeUnit.SECONDS).build());
            }
        };
        this.startRetrievingValues(identifier, segments, filter, converter, flags, replyToOrigin);
    }

    /**
     * Core retrieval routine used by both the remote-request and the local
     * iteration paths.
     * <p>
     * First splits {@code segments}: segments whose primary owner under the
     * current hash is this node stay in {@code segments}, all others are
     * removed from it and recorded as in doubt. Note that this mutates the
     * set passed in by the caller.
     * <p>
     * If at least one segment is still local, a task is submitted to
     * {@code executorService} that walks the data container (and, when loaders
     * should be used and a store is configured, the stores), feeds the
     * surviving entries to a {@code MapAction}, and finally reports completed
     * and in-doubt segments through {@code handler}. The task loops for as long
     * as {@code shouldRepeatApplication} asks for another pass. Otherwise a
     * task is submitted that immediately reports an empty batch.
     *
     * @param identifier identifier of the iteration being served
     * @param segments   requested segments; non-local ones are removed in place
     * @param filter     optional filter; may be a {@link KeyValueFilterConverter}
     * @param converter  optional converter
     * @param flags      flags consulted to decide whether stores are scanned
     * @param handler    receives the final batch or the failure
     */
    private <H, C extends H> void startRetrievingValues(final UUID identifier, final Set<Integer> segments, final KeyValueFilter<? super K, ? super V> filter, final Converter<? super K, ? super V, C> converter, final Set<Flag> flags, final SegmentBatchHandler<K, H> handler) {
        ConsistentHash hash = this.getCurrentHash();
        final HashSet<Integer> inDoubtSegments = new HashSet<Integer>(segments.size());
        boolean canTryProcess = false;
        // Keep only segments we are primary owner of; move the rest to inDoubtSegments.
        Iterator<Integer> iter = segments.iterator();
        while (iter.hasNext()) {
            Integer segment = iter.next();
            if (this.localAddress.equals(hash.locatePrimaryOwnerForSegment(segment))) {
                canTryProcess = true;
                continue;
            }
            inDoubtSegments.add(segment);
            iter.remove();
        }
        if (canTryProcess) {
            this.executorService.execute(new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    Set segmentsToUse = segments;
                    Set inDoubtSegmentsToUse = inDoubtSegments;
                    ConsistentHash hashToUse = DistributedEntryRetriever.this.getCurrentHash();
                    boolean repeat = true;
                    while (repeat) {
                        if (DistributedEntryRetriever.this.log.isTraceEnabled()) {
                            DistributedEntryRetriever.this.log.tracef("Starting retrieval of values for identifier %s", (Object)identifier);
                        }
                        // Registered for the duration of this pass so dataRehashed() can
                        // report segments this node loses while the pass is running.
                        SegmentChangeListener segmentChangeListener = new SegmentChangeListener();
                        DistributedEntryRetriever.this.changeListener.put(identifier, segmentChangeListener);
                        try {
                            final Set processedKeys = CollectionFactory.makeSet(DistributedEntryRetriever.this.keyEquivalence);
                            // Queue that additionally records the key of every entry passed to add();
                            // other insertion methods are not overridden.
                            ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<CacheEntry<K, C>>(){

                                @Override
                                public boolean add(CacheEntry<K, C> kcEntry) {
                                    processedKeys.add(kcEntry.getKey());
                                    return super.add(kcEntry);
                                }
                            };
                            // MapAction is defined outside this view; presumably it restricts entries to
                            // the given segments and batches them into the queue/handler (TODO confirm).
                            MapAction action = new MapAction(identifier, segmentsToUse, inDoubtSegmentsToUse, DistributedEntryRetriever.this.batchSize, converter, handler, queue);
                            LocalEntryRetriever.PassivationListener listener = null;
                            long currentTime = DistributedEntryRetriever.this.timeService.wallClockTime();
                            try {
                                // Pass 1: in-memory entries. Expired entries are skipped; each entry is
                                // re-created with unwrapped key/value before filtering.
                                for (InternalCacheEntry entry : DistributedEntryRetriever.this.dataContainer) {
                                    if (entry.isExpired(currentTime)) continue;
                                    InternalCacheEntry clone = DistributedEntryRetriever.this.entryFactory.create(LocalEntryRetriever.unwrapMarshalledvalue(entry.getKey()), LocalEntryRetriever.unwrapMarshalledvalue(entry.getValue()), entry);
                                    Object key = clone.getKey();
                                    if (filter != null) {
                                        // A combined filter/converter with no separate converter filters and
                                        // converts in one call; a null result means the entry is rejected.
                                        if (converter == null && filter instanceof KeyValueFilterConverter) {
                                            Object converted = ((KeyValueFilterConverter)filter).filterAndConvert(key, clone.getValue(), clone.getMetadata());
                                            if (converted == null) continue;
                                            clone.setValue(converted);
                                        } else if (!filter.accept(key, clone.getValue(), clone.getMetadata())) continue;
                                    }
                                    action.apply(key, clone);
                                }
                                // Pass 2: stores, only when loaders should be used and at least one store is reported.
                                if (DistributedEntryRetriever.this.shouldUseLoader(flags) && DistributedEntryRetriever.this.persistenceManager.getStoresAsString().size() > 0) {
                                    if (DistributedEntryRetriever.this.passivationEnabled) {
                                        // Tracks keys activated while the stores are being scanned; handled in the finally block.
                                        listener = new LocalEntryRetriever.PassivationListener();
                                        DistributedEntryRetriever.this.cache.addListener(listener);
                                    }
                                    // Store filter combines a segment filter, a filter over the keys already
                                    // processed in pass 1 (presumably excluding them - TODO confirm against
                                    // CollectionKeyFilter) and, unless a combined filter/converter is in use,
                                    // the caller's filter.
                                    CompositeKeyFilter loaderFilter = filter == null || converter == null && filter instanceof KeyValueFilterConverter ? new CompositeKeyFilter(new SegmentFilter(hashToUse, segmentsToUse), new CollectionKeyFilter(processedKeys)) : new CompositeKeyFilter(new SegmentFilter(hashToUse, segmentsToUse), new CollectionKeyFilter(processedKeys), new KeyValueFilterAsKeyFilter(filter));
                                    if (converter == null && filter instanceof KeyValueFilterConverter) {
                                        // The combined filter/converter was left out of loaderFilter above, so it is
                                        // handed to the action in the converter position instead.
                                        action = new MapAction(identifier, segmentsToUse, inDoubtSegmentsToUse, DistributedEntryRetriever.this.batchSize, (KeyValueFilterConverter)filter, handler, queue);
                                    }
                                    DistributedEntryRetriever.this.persistenceManager.processOnAllStores(DistributedEntryRetriever.this.withinThreadExecutor, loaderFilter, new LocalEntryRetriever.KeyValueActionForCacheLoaderTask(DistributedEntryRetriever.this, action), true, true);
                                }
                            }
                            finally {
                                if (listener != null) {
                                    DistributedEntryRetriever.this.cache.removeListener(listener);
                                    AdvancedCache advancedCache = DistributedEntryRetriever.this.cache.getAdvancedCache();
                                    // Entries activated during the scan that were not already processed are
                                    // read from the cache and run through the same filter logic as pass 1.
                                    for (Object key : listener.activatedKeys) {
                                        CacheEntry entry;
                                        if (processedKeys.contains(key) || (entry = advancedCache.getCacheEntry(key)) == null) continue;
                                        CacheEntry clone = entry.clone();
                                        if (filter != null) {
                                            if (converter == null && filter instanceof KeyValueFilterConverter) {
                                                Object converted = ((KeyValueFilterConverter)filter).filterAndConvert(key, clone.getValue(), clone.getMetadata());
                                                if (converted == null) continue;
                                                clone.setValue(converted);
                                            } else if (!filter.accept(key, clone.getValue(), clone.getMetadata())) continue;
                                        }
                                        action.apply(clone.getKey(), clone);
                                    }
                                }
                            }
                            // A segment counts as completed only if this node is still its primary owner
                            // under the current hash and it was not reported as changed during the pass.
                            HashSet<Integer> completedSegments = new HashSet<Integer>();
                            for (Integer segment : segmentsToUse) {
                                if (DistributedEntryRetriever.this.localAddress.equals(DistributedEntryRetriever.this.getCurrentHash().locatePrimaryOwnerForSegment(segment)) && !segmentChangeListener.changedSegments.contains(segment)) {
                                    completedSegments.add(segment);
                                    continue;
                                }
                                inDoubtSegmentsToUse.add(segment);
                            }
                            // Whatever is still in the queue is sent as the final batch (complete == true).
                            ArrayList entriesToSend = new ArrayList(queue);
                            handler.handleBatch(identifier, true, completedSegments, inDoubtSegmentsToUse, entriesToSend);
                            if (DistributedEntryRetriever.this.log.isTraceEnabled()) {
                                DistributedEntryRetriever.this.log.tracef("Completed data iteration for request %s with segments %s", (Object)identifier, (Object)segmentsToUse);
                            }
                        }
                        catch (Throwable t) {
                            // Any failure is wrapped by the logger and reported through the handler; the
                            // repeat check below still runs afterwards.
                            CacheException e = DistributedEntryRetriever.this.log.exceptionProcessingEntryRetrievalValues(t);
                            handler.handleException(e);
                        }
                        finally {
                            DistributedEntryRetriever.this.changeListener.remove(identifier);
                        }
                        if (repeat = DistributedEntryRetriever.this.shouldRepeatApplication(identifier)) {
                            // Another pass was requested: recompute the local segments still missing
                            // against the latest hash and start over with a clean in-doubt set.
                            hashToUse = DistributedEntryRetriever.this.getCurrentHash();
                            IterationStatus status = (IterationStatus)DistributedEntryRetriever.this.iteratorDetails.get(identifier);
                            if (status != null) {
                                segmentsToUse = DistributedEntryRetriever.this.findMissingLocalSegments(status.processedKeys, hashToUse);
                                inDoubtSegmentsToUse.clear();
                                if (!DistributedEntryRetriever.this.log.isTraceEnabled()) continue;
                                if (!segmentsToUse.isEmpty()) {
                                    DistributedEntryRetriever.this.log.tracef("Local retrieval found it should rerun - now finding segments %s for identifier %s", (Object)segmentsToUse, (Object)identifier);
                                    continue;
                                }
                                DistributedEntryRetriever.this.log.tracef("Local retrieval for identifier %s was told to rerun - however no new segments were found, looping around to try again", (Object)identifier);
                                continue;
                            }
                            // No status registered any more for this identifier: stop looping.
                            DistributedEntryRetriever.this.log.tracef("Not repeating local retrieval since iteration was completed", new Object[0]);
                            repeat = false;
                            continue;
                        }
                        if (DistributedEntryRetriever.this.log.isTraceEnabled()) {
                            DistributedEntryRetriever.this.log.tracef("Completed request %s for segments %s", (Object)identifier, (Object)segmentsToUse);
                        }
                        repeat = false;
                    }
                }
            });
        } else {
            if (this.log.isTraceEnabled()) {
                this.log.tracef("Our node no longer has any of the segments %s that were requested for %s", (Object)inDoubtSegments, (Object)identifier);
            }
            // Nothing is local any more: report an empty final batch. At this point the split
            // loop above has removed every element from `segments`, so it is passed as an
            // empty completed set and every requested segment is reported in doubt.
            this.executorService.execute(new Runnable(){

                @Override
                public void run() {
                    Set emptyEntries = Collections.emptySet();
                    handler.handleBatch(identifier, true, segments, inDoubtSegments, emptyEntries);
                }
            });
        }
    }

    /**
     * Starts local retrieval of the given segments for an iteration, unless
     * {@code updatedLocalAndRun} reports that this caller should not run it
     * (logged as another thread already handling the identifier).
     *
     * @param identifier identifier of the iteration
     * @param segments   segments to retrieve locally
     * @param status     iteration state supplying the filter, converter and flags
     * @param handler    receives the resulting batch or failure
     */
    private <H, C extends H> void startRetrievingValuesLocal(UUID identifier, Set<Integer> segments, IterationStatus<C> status, SegmentBatchHandler<K, H> handler) {
        if (!this.updatedLocalAndRun(identifier)) {
            if (this.log.isTraceEnabled()) {
                this.log.tracef("Not running local retrieval as another thread is handling it for identifier %s.", (Object)identifier);
            }
            return;
        }
        if (this.log.isTraceEnabled()) {
            this.log.tracef("Starting local request to retrieve segments %s for identifier %s", (Object)segments, (Object)identifier);
        }
        this.startRetrievingValues(identifier, segments, status.filter, status.converter, status.flags, handler);
    }

    /**
     * Begins a distributed iteration over the cache and returns the iterator
     * that will receive the entries.
     * <p>
     * Falls back to the local-only superclass implementation when the
     * {@code CACHE_MODE_LOCAL} flag is present or when this node is not a
     * member of the current consistent hash. Otherwise a fresh identifier is
     * generated, per-segment bookkeeping is registered, a remote request is
     * issued if any segment is primary-owned elsewhere, and local retrieval is
     * started for the segments this node primary-owns.
     *
     * @param filter    optional filter applied to entries
     * @param converter optional converter applied to entries
     * @param flags     optional flags; may be {@code null}
     * @param listener  segment listener handed to the iterator and the iteration state
     * @return the iterator over the retrieved entries
     */
    @Override
    public <C> CloseableIterator<CacheEntry<K, C>> retrieveEntries(KeyValueFilter<? super K, ? super V> filter, Converter<? super K, ? super V, ? extends C> converter, Set<Flag> flags, EntryRetriever.SegmentListener listener) {
        boolean localOnly = flags != null && flags.contains((Object)Flag.CACHE_MODE_LOCAL);
        if (localOnly) {
            this.log.trace("Skipping distributed entry retrieval and processing local only as CACHE_MODE_LOCAL flag was set");
            return super.retrieveEntries(filter, converter, flags, listener);
        }
        ConsistentHash hash = this.getCurrentHash();
        boolean memberOfHash = hash.getMembers().contains(this.localAddress);
        if (!memberOfHash) {
            this.log.trace("Skipping distributed entry retrieval and processing local since we are not part of the consistent hash");
            return super.retrieveEntries(filter, converter, flags, listener);
        }
        UUID identifier = UUID.randomUUID();
        Converter<? super K, ? super V, ? extends C> usedConverter = this.checkForKeyValueFilterConverter(filter, converter);
        if (this.log.isTraceEnabled()) {
            this.log.tracef("Processing entry retrieval request with identifier %s with filter %s and converter %s", (Object)identifier, (Object)filter, (Object)usedConverter);
        }
        DistributedItr itr = new DistributedItr(this.batchSize, identifier, listener, hash);
        this.registerIterator(itr, flags);
        // Every segment starts with an empty key set and is assumed remote
        // until the locally owned ones are subtracted below.
        HashSet<Integer> remoteSegments = new HashSet<Integer>();
        AtomicReferenceArray processedKeys = new AtomicReferenceArray(hash.getNumSegments());
        int segment = 0;
        while (segment < processedKeys.length()) {
            processedKeys.set(segment, new ConcurrentHashSet());
            remoteSegments.add(segment);
            ++segment;
        }
        final IterationStatus<? extends C> status = new IterationStatus<C>(itr, listener, filter, usedConverter, flags, processedKeys);
        this.iteratorDetails.put(identifier, status);
        Set<Integer> ourSegments = hash.getPrimarySegmentsForOwner(this.localAddress);
        remoteSegments.removeAll(ourSegments);
        if (!remoteSegments.isEmpty()) {
            this.eventuallySendRequest(identifier, status);
        }
        if (!ourSegments.isEmpty()) {
            this.wireFilterAndConverterDependencies(filter, usedConverter);
            SegmentBatchHandler<K, C> localHandler = new SegmentBatchHandler<K, C>(){

                @Override
                public void handleBatch(UUID batchIdentifier, boolean complete, Set<Integer> completedSegments, Set<Integer> inDoubtSegments, Collection<CacheEntry<K, C>> entries) {
                    DistributedEntryRetriever.this.processData(batchIdentifier, DistributedEntryRetriever.this.localAddress, completedSegments, inDoubtSegments, entries);
                }

                @Override
                public void handleException(CacheException e) {
                    status.ongoingIterator.close(e);
                }
            };
            this.startRetrievingValuesLocal(identifier, ourSegments, status, localHandler);
        }
        return itr;
    }

    /**
     * Returns the consistent hash currently in use. When none has been
     * recorded yet, the read consistent hash from the distribution manager is
     * installed via compare-and-set (so a concurrently set value is never
     * overwritten) and whatever the reference then holds is returned.
     *
     * @return the value held by {@code currentHash} after the optional seeding
     */
    private ConsistentHash getCurrentHash() {
        ConsistentHash cached = this.currentHash.get();
        if (cached != null) {
            return cached;
        }
        this.currentHash.compareAndSet(null, this.distributionManager.getReadConsistentHash());
        return this.currentHash.get();
    }

    /**
     * Keeps trying to send a synchronous remote request for the segments that
     * are still missing, re-evaluating the hash and route on every attempt.
     * <p>
     * Gives up (returning {@code false}) when the iteration is no longer
     * registered, when no remote segments remain missing, or when another
     * request already occupies the pending-response slot.
     *
     * @param identifier identifier of the iteration
     * @param status     iteration state holding processed keys and the pending-response slot
     * @return {@code true} if a request was sent successfully, {@code false} otherwise
     */
    private <C> boolean eventuallySendRequest(UUID identifier, IterationStatus<? extends Object> status) {
        while (true) {
            if (!this.iteratorDetails.containsKey(identifier)) {
                if (this.log.isTraceEnabled()) {
                    this.log.tracef("Cannot send remote request as our iterator was concurrently closed for %s", (Object)identifier);
                }
                return false;
            }
            ConsistentHash hash = this.getCurrentHash();
            Set<Integer> stillMissing = this.findMissingRemoteSegments(status.processedKeys, hash);
            if (stillMissing.isEmpty()) {
                if (this.log.isTraceEnabled()) {
                    this.log.tracef("Cannot send remote request as there are no longer any remote segments missing for %s", (Object)identifier);
                }
                return false;
            }
            Map.Entry<Address, Set<Integer>> route = this.findOptimalRoute(stillMissing, hash);
            // Only one outstanding request at a time: bail out if the slot is taken.
            if (!status.awaitingResponseFrom.compareAndSet(null, route.getKey())) {
                return false;
            }
            if (this.sendRequest(true, route, identifier, status)) {
                return true;
            }
            // Sending failed; loop around and pick a route again.
        }
    }

    /**
     * Sends an entry request for the routed segments to the routed node and
     * records that node as the one a response is awaited from.
     * <p>
     * Keys already recorded for the requested segments are collected and sent
     * along as a key filter, combined with the iteration's own filter when
     * there is one. In synchronous mode an unsuccessful response or a
     * {@link SuspectException} results in {@code atomicRemove} being invoked on
     * the pending-response slot for that node and {@code false} being
     * returned. In asynchronous mode the response is not inspected.
     *
     * @param sync       whether to invoke synchronously and check the response
     * @param route      target node mapped to the segments to request from it
     * @param identifier identifier of the iteration
     * @param status     iteration state (filter, converter, flags, processed keys)
     * @return {@code true} if the request was sent without a detected failure
     */
    private <C> boolean sendRequest(boolean sync, Map.Entry<Address, Set<Integer>> route, UUID identifier, IterationStatus<? extends Object> status) {
        if (this.log.isTraceEnabled()) {
            this.log.tracef("Sending request to %s for identifier %s", (Object)route, (Object)identifier);
        }
        Address target = route.getKey();
        status.awaitingResponseFrom.set(target);
        Set<Integer> segments = route.getValue();

        // Gather every key already seen for the segments being requested.
        HashSet keysToFilter = new HashSet();
        AtomicReferenceArray ourEntries = status.processedKeys;
        for (Integer segment : segments) {
            Set seenInSegment = (Set)ourEntries.get(segment);
            if (seenInSegment != null) {
                keysToFilter.addAll(seenInSegment);
            }
        }

        KeyValueFilter filterToSend;
        boolean nothingSeen = keysToFilter.isEmpty();
        if (status.filter == null && nothingSeen) {
            if (this.log.isTraceEnabled()) {
                this.log.trace("No filter applied");
            }
            filterToSend = null;
        } else if (status.filter == null) {
            if (this.log.isTraceEnabled()) {
                this.log.tracef("Applying filter for %s of keys", (Object)keysToFilter.size());
            }
            filterToSend = new KeyFilterAsKeyValueFilter(new CollectionKeyFilter(keysToFilter));
        } else if (nothingSeen) {
            if (this.log.isTraceEnabled()) {
                this.log.tracef("Using provided filter %s", (Object)status.filter);
            }
            filterToSend = status.filter;
        } else {
            if (this.log.isTraceEnabled()) {
                this.log.tracef("Applying filter for %s of keys with provided filter %s", (Object)keysToFilter.size(), (Object)status.filter);
            }
            filterToSend = new CompositeKeyValueFilter(new KeyFilterAsKeyValueFilter(new CollectionKeyFilter(keysToFilter)), status.filter);
        }

        EntryRequestCommand command = this.commandsFactory.buildEntryRequestCommand(identifier, segments, filterToSend, status.converter, status.flags);
        try {
            ResponseMode mode = sync ? ResponseMode.SYNCHRONOUS : ResponseMode.ASYNCHRONOUS;
            RpcOptions options = this.rpcManager.getRpcOptionsBuilder(mode).build();
            Map<Address, Response> responseMap = this.rpcManager.invokeRemotely(Collections.singleton(target), command, options);
            if (!sync) {
                return true;
            }
            Response response = responseMap.values().iterator().next();
            if (response.isSuccessful()) {
                return true;
            }
            Exception cause = response instanceof ExceptionResponse ? ((ExceptionResponse)response).getException() : null;
            if (this.log.isTraceEnabled()) {
                this.log.tracef((Throwable)cause, "Unsuccessful response received from node %s for %s, must resend to a new node!", (Object)route.getKey(), (Object)identifier);
            }
            DistributedEntryRetriever.atomicRemove(status.awaitingResponseFrom, target);
            return false;
        }
        catch (SuspectException e) {
            if (this.log.isTraceEnabled()) {
                this.log.tracef("Request to %s for %s was suspect, must resend to a new node!", (Object)route, (Object)identifier);
            }
            DistributedEntryRetriever.atomicRemove(status.awaitingResponseFrom, target);
            return false;
        }
    }

    /**
     * Collects the segments primarily owned by the local node that are still being tracked.
     * <p>
     * A segment is reported while its slot in {@code processValues} is non-null; {@code processData}
     * sets the slot to {@code null} once the segment has been completed.
     *
     * @param processValues per-segment sets of keys seen so far
     * @param hash          consistent hash used to look up the local node's primary segments
     * @return a new set holding every local primary segment whose slot is still populated
     */
    private Set<Integer> findMissingLocalSegments(AtomicReferenceArray<Set<K>> processValues, ConsistentHash hash) {
        HashSet<Integer> outstanding = new HashSet<Integer>();
        Iterator<Integer> primaries = hash.getPrimarySegmentsForOwner(this.localAddress).iterator();
        while (primaries.hasNext()) {
            Integer candidate = primaries.next();
            if (processValues.get(candidate) != null) {
                outstanding.add(candidate);
            }
        }
        return outstanding;
    }

    /**
     * Records that local retrieval has been requested for the given iteration and tells the caller whether
     * it is the one that has to perform it.
     * <p>
     * State transitions applied to {@code localRunning}:
     * <ul>
     *   <li>{@code IDLE -> RUNNING}: the caller won the race and must run (returns {@code true});</li>
     *   <li>{@code RUNNING -> REPEAT}: another run is in progress, it is flagged to go again;</li>
     *   <li>{@code REPEAT}: already flagged, nothing to change.</li>
     * </ul>
     * A failed compare-and-set simply re-reads the state and retries.
     *
     * @param identifier the iteration to update
     * @return {@code true} only when this call moved the state from {@code IDLE} to {@code RUNNING};
     *         {@code false} otherwise, including when the iteration is unknown or its state is {@code null}
     */
    private boolean updatedLocalAndRun(UUID identifier) {
        boolean shouldRun = false;
        boolean updated = false;
        IterationStatus<? extends Object> details = this.iteratorDetails.get(identifier);
        if (details != null) {
            AtomicReference<LocalStatus> localRunning = details.localRunning;
            while (!updated) {
                LocalStatus status = localRunning.get();
                if (status == null) {
                    // A null status is ignored. Leave the loop here: switching on a null enum
                    // reference would throw a NullPointerException.
                    break;
                }
                switch (status) {
                    case IDLE: {
                        updated = shouldRun = localRunning.compareAndSet(LocalStatus.IDLE, LocalStatus.RUNNING);
                        break;
                    }
                    case REPEAT: {
                        updated = true;
                        break;
                    }
                    case RUNNING: {
                        updated = localRunning.compareAndSet(LocalStatus.RUNNING, LocalStatus.REPEAT);
                        break;
                    }
                }
            }
        }
        return shouldRun;
    }

    /**
     * Called by the thread that is currently performing local retrieval to find out whether it has to
     * go around again.
     * <p>
     * {@code REPEAT} is swapped back to {@code RUNNING} (another pass is needed) and {@code RUNNING} is
     * swapped to {@code IDLE} (the work is finished). A lost compare-and-set re-reads the state and retries.
     *
     * @param identifier the iteration being processed
     * @return {@code true} when the state was moved from {@code REPEAT} to {@code RUNNING}; {@code false}
     *         when it was moved to {@code IDLE} or the iteration is no longer registered
     * @throws IllegalStateException if the state is {@code null} or {@code IDLE}
     */
    private boolean shouldRepeatApplication(UUID identifier) {
        IterationStatus<? extends Object> details = this.iteratorDetails.get(identifier);
        if (details == null) {
            return false;
        }
        AtomicReference<LocalStatus> state = details.localRunning;
        while (true) {
            LocalStatus current = state.get();
            if (current == null) {
                throw new IllegalStateException("Status should never be null");
            }
            if (current == LocalStatus.IDLE) {
                throw new IllegalStateException("This should never be seen as IDLE by the running thread");
            }
            if (current == LocalStatus.REPEAT) {
                if (state.compareAndSet(LocalStatus.REPEAT, LocalStatus.RUNNING)) {
                    return true;
                }
            } else if (state.compareAndSet(LocalStatus.RUNNING, LocalStatus.IDLE)) {
                return false;
            }
        }
    }

    /**
     * Tells whether at least one segment that is still being tracked is not a primary segment of the
     * local node.
     *
     * @param processValues per-segment sets of keys seen so far; may be {@code null}
     * @param hash          consistent hash used to look up the local node's primary segments
     * @return {@code true} if some slot is non-null for a segment not primarily owned locally;
     *         {@code false} otherwise or when {@code processValues} is {@code null}
     */
    private boolean missingRemoteSegment(AtomicReferenceArray<Set<K>> processValues, ConsistentHash hash) {
        if (processValues == null) {
            return false;
        }
        Set<Integer> ownedLocally = hash.getPrimarySegmentsForOwner(this.localAddress);
        int total = processValues.length();
        for (int segment = 0; segment < total; ++segment) {
            if (processValues.get(segment) != null && !ownedLocally.contains(segment)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Lists every segment that is still being tracked and is not a primary segment of the local node.
     *
     * @param processValues per-segment sets of keys seen so far; may be {@code null}
     * @param hash          consistent hash used to look up the local node's primary segments
     * @return a new, possibly empty set of segment ids; empty when {@code processValues} is {@code null}
     */
    private Set<Integer> findMissingRemoteSegments(AtomicReferenceArray<Set<K>> processValues, ConsistentHash hash) {
        Set<Integer> ownedLocally = hash.getPrimarySegmentsForOwner(this.localAddress);
        HashSet<Integer> outstanding = new HashSet<Integer>();
        if (processValues == null) {
            return outstanding;
        }
        int total = processValues.length();
        for (int segment = 0; segment < total; ++segment) {
            if (processValues.get(segment) != null && !ownedLocally.contains(segment)) {
                outstanding.add(segment);
            }
        }
        return outstanding;
    }

    /**
     * Groups the requested segments by their primary owner and picks the remote owner to contact next.
     * <p>
     * Among the owners other than the local node, the one holding the fewest of the requested segments
     * is chosen; on a tie the first one encountered while iterating the grouping map is kept.
     *
     * @param segmentsToFind segments of interest, or {@code null} to consider every segment of the hash
     * @param hash           consistent hash providing the segment count and primary owners
     * @return the chosen owner together with its segments, or {@code null} if no remote owner holds any
     *         of the requested segments
     */
    private Map.Entry<Address, Set<Integer>> findOptimalRoute(Set<Integer> segmentsToFind, ConsistentHash hash) {
        int total = hash.getNumSegments();
        HashMap<Address, Set<Integer>> segmentsByOwner = new HashMap<Address, Set<Integer>>();
        for (int segment = 0; segment < total; ++segment) {
            if (segmentsToFind == null || segmentsToFind.contains(segment)) {
                Address owner = hash.locatePrimaryOwnerForSegment(segment);
                Set<Integer> owned = segmentsByOwner.get(owner);
                if (owned == null) {
                    owned = new HashSet<Integer>();
                    segmentsByOwner.put(owner, owned);
                }
                owned.add(segment);
            }
        }
        Map.Entry<Address, Set<Integer>> best = null;
        for (Map.Entry<Address, Set<Integer>> candidate : segmentsByOwner.entrySet()) {
            if (candidate.getKey().equals(this.localAddress)) {
                // The local node is never a remote route.
                continue;
            }
            if (best == null || best.getValue().size() > candidate.getValue().size()) {
                best = candidate;
            }
        }
        return best;
    }

    /**
     * Entry point for a response belonging to an ongoing iteration.
     * <p>
     * When the response carries no exception its payload is handed to {@code processData}. Any failure,
     * whether delivered with the response or thrown while processing it, closes the iterator registered
     * for {@code identifier} with that exception (if the iteration is still registered).
     *
     * @param identifier        the iteration the response belongs to
     * @param origin            the node the response came from
     * @param completedSegments segments reported as completed by this response
     * @param inDoubtSegments   segments reported as in doubt by this response
     * @param entries           the entries carried by this response
     * @param e                 exception delivered with the response, or {@code null} if there was none
     */
    @Override
    public <C> void receiveResponse(UUID identifier, Address origin, Set<Integer> completedSegments, Set<Integer> inDoubtSegments, Collection<CacheEntry<K, C>> entries, CacheException e) {
        if (this.log.isTraceEnabled()) {
            this.log.tracef("Processing response for identifier %s", (Object)identifier);
        }
        if (e != null) {
            if (this.log.isTraceEnabled()) {
                // Pass the exception as the log cause; as a surplus format argument it was never rendered.
                this.log.tracef((Throwable)e, "Response for identifier %s contained exception", (Object)identifier);
            }
        } else {
            try {
                this.processData(identifier, origin, completedSegments, inDoubtSegments, entries);
            }
            catch (Throwable t) {
                // Wrap the failure that was actually thrown; 'e' is always null on this branch, so
                // passing it would discard the real cause.
                e = this.log.exceptionProcessingIteratorResponse(identifier, t);
            }
        }
        if (e != null) {
            IterationStatus<? extends Object> status = this.iteratorDetails.get(identifier);
            if (status != null) {
                status.ongoingIterator.close(e);
            }
        }
    }

    /**
     * Merges one batch of entries into the iteration registered under {@code identifier} and then works out
     * what, if anything, still has to be requested.
     * <p>
     * Steps, in order:
     * <ol>
     *   <li>drop entries whose segment is no longer tracked or whose key was already seen;</li>
     *   <li>hand the keys delivered for completed segments, then the surviving entries, to the iterator;</li>
     *   <li>clear the tracking slot of every completed segment;</li>
     *   <li>if any segment was completed or is in doubt: schedule a further remote request when remote
     *       segments are still tracked, start local retrieval for still-tracked local segments, and
     *       complete the iteration when neither is needed.</li>
     * </ol>
     * If no status is registered for {@code identifier} the batch is ignored.
     *
     * @param identifier        the iteration the batch belongs to
     * @param origin            the node that produced the batch; compared by identity with
     *                          {@code this.localAddress} to detect locally produced batches
     * @param completedSegments segments reported as completed by this batch
     * @param inDoubtSegments   segments reported as in doubt by this batch
     * @param entries           the entries of the batch
     */
    private <C> void processData(final UUID identifier, Address origin, Set<Integer> completedSegments, Set<Integer> inDoubtSegments, Collection<CacheEntry<K, C>> entries) {
        final IterationStatus<? extends Object> status = this.iteratorDetails.get(identifier);
        if (status != null) {
            final AtomicReferenceArray processedKeys = status.processedKeys;
            final DistributedItr itr = status.ongoingIterator;
            if (this.log.isTraceEnabled()) {
                this.log.tracef("Processing data for identifier %s completedSegments: %s inDoubtSegments: %s entryCount: %s", identifier, completedSegments, inDoubtSegments, entries.size());
            }
            ArrayList nonDuplicateEntries = new ArrayList(entries.size());
            // For every completed segment that is still tracked, prepare a set that will collect the keys
            // delivered for it in this batch.
            HashMap<Integer, ConcurrentHashSet<Integer>> finishedKeysForSegment = new HashMap<Integer, ConcurrentHashSet<Integer>>();
            for (int completedSegment : completedSegments) {
                if (processedKeys.get(completedSegment) == null) continue;
                finishedKeysForSegment.put(completedSegment, new ConcurrentHashSet());
            }
            ConsistentHash hash = this.getCurrentHash();
            for (CacheEntry<K, C> entry : entries) {
                K key = entry.getKey();
                int segment = hash.getSegment(key);
                Set seenSet = processedKeys.get(segment);
                // Skip entries for segments that are no longer tracked (null slot) and keys already seen.
                if (seenSet == null || !seenSet.add(key)) continue;
                ConcurrentHashSet finishedKeys = (ConcurrentHashSet)finishedKeysForSegment.get(segment);
                if (finishedKeys != null) {
                    finishedKeys.add(key);
                }
                nonDuplicateEntries.add(entry);
            }
            // The iterator receives the per-segment key sets before the entries themselves.
            itr.addKeysForSegment(finishedKeysForSegment);
            try {
                itr.addEntries(nonDuplicateEntries);
            }
            catch (InterruptedException e) {
                // NOTE(review): the interrupt status is not restored here and execution continues with the
                // bookkeeping below after completeIteration - confirm this is intended.
                if (this.log.isTraceEnabled()) {
                    this.log.tracef("Iteration thread was interrupted, stopping iteration for identifier %s", (Object)identifier);
                }
                this.completeIteration(identifier);
            }
            if (!completedSegments.isEmpty()) {
                if (this.log.isTraceEnabled()) {
                    this.log.tracef("Completing segments %s for identifier %s", (Object)completedSegments, (Object)identifier);
                }
                // A null slot is what the missing-segment helpers treat as "segment done".
                for (Integer completeSegment : completedSegments) {
                    processedKeys.set(completeSegment, null);
                }
            }
            // Follow-up work is only evaluated when this batch changed segment state.
            if (!completedSegments.isEmpty() || !inDoubtSegments.isEmpty()) {
                Set<Integer> localSegments;
                boolean complete = true;
                // Re-read the hash; the one fetched above is not reused for the decisions below.
                hash = this.getCurrentHash();
                boolean isMissingRemoteSegments = this.missingRemoteSegment(processedKeys, hash);
                if (isMissingRemoteSegments) {
                    if (this.log.isTraceEnabled()) {
                        this.log.tracef("Request %s not yet complete, remote segments %s are still missing", (Object)identifier, (Object)this.findMissingRemoteSegments(processedKeys, hash));
                    }
                    complete = false;
                    // Identity comparison: the local batch handler below passes this.localAddress itself.
                    if (origin != this.localAddress) {
                        // Only issue a new request if the iteration was still waiting on this origin.
                        if (DistributedEntryRetriever.atomicRemove(status.awaitingResponseFrom, origin)) {
                            if (this.log.isTraceEnabled()) {
                                this.log.tracef("Sending request for %s via remote transport thread", (Object)identifier);
                            }
                            this.remoteExecutorService.submit(new Runnable(){

                                @Override
                                public void run() {
                                    DistributedEntryRetriever.this.eventuallySendRequest(identifier, status);
                                }
                            });
                        } else if (this.log.isTraceEnabled()) {
                            this.log.tracef("Not sending new remote request as %s was either stopped or %s went down", (Object)identifier, (Object)origin);
                        }
                    }
                } else if (origin != this.localAddress) {
                    // No remote segment was outstanding under the hash read above. Clear the awaited
                    // address and re-check on the remote executor against the hash current at that time.
                    status.awaitingResponseFrom.set(null);
                    this.remoteExecutorService.submit(new Runnable(){

                        @Override
                        public void run() {
                            // Retry while remote segments remain, the iteration is still registered and
                            // eventuallySendRequest returns false.
                            while (DistributedEntryRetriever.this.missingRemoteSegment(processedKeys, DistributedEntryRetriever.this.getCurrentHash()) && DistributedEntryRetriever.this.iteratorDetails.containsKey(identifier) && !DistributedEntryRetriever.this.eventuallySendRequest(identifier, status)) {
                                status.awaitingResponseFrom.set(null);
                            }
                        }
                    });
                }
                if (!(localSegments = this.findMissingLocalSegments(processedKeys, hash)).isEmpty()) {
                    if (this.log.isTraceEnabled()) {
                        this.log.tracef("Request %s not yet complete, local segments %s are still missing", (Object)identifier, (Object)localSegments);
                    }
                    complete = false;
                    // Locally produced batches are fed back into processData with the local address as origin.
                    this.startRetrievingValuesLocal(identifier, localSegments, status, new SegmentBatchHandler<K, Object>(){

                        @Override
                        public void handleBatch(UUID identifier, boolean complete, Set<Integer> completedSegments, Set<Integer> inDoubtSegments, Collection<CacheEntry<K, Object>> entries) {
                            DistributedEntryRetriever.this.processData(identifier, DistributedEntryRetriever.this.localAddress, completedSegments, inDoubtSegments, entries);
                        }

                        @Override
                        public void handleException(CacheException e) {
                            itr.close(e);
                        }
                    });
                }
                // Neither remote nor local segments are outstanding.
                if (complete) {
                    this.completeIteration(identifier);
                }
            }
        } else if (this.log.isTraceEnabled()) {
            this.log.tracef("Ignoring values as identifier %s was marked as complete", (Object)identifier);
        }
    }

    /**
     * Clears {@code ref} if, and only if, it currently holds a value equal to {@code object}.
     * <p>
     * The current value is read once, compared with {@code object.equals(...)}, and then swapped for
     * {@code null} with a compare-and-set against that same instance, so a concurrent change between the
     * read and the swap makes the call return {@code false}.
     *
     * @param ref    the reference to clear
     * @param object the value expected to be held; must not be {@code null}
     * @return {@code true} if the reference held an equal value and was set to {@code null}
     * @throws NullPointerException if {@code object} is {@code null}
     */
    private static <V> boolean atomicRemove(AtomicReference<V> ref, V object) {
        V current = ref.get();
        // equals() is invoked on 'object', so an empty reference (null current) simply does not match.
        return object.equals(current) && ref.compareAndSet(current, null);
    }

    /**
     * Finishes the iteration registered under {@code identifier}: its iterator is removed from the
     * partition listener's iterator collection and then closed. Does nothing beyond tracing when no
     * status is registered for the identifier.
     *
     * @param identifier the iteration to complete
     */
    private void completeIteration(UUID identifier) {
        if (this.log.isTraceEnabled()) {
            this.log.tracef("Processing complete for identifier %s", (Object)identifier);
        }
        IterationStatus<? extends Object> status = this.iteratorDetails.get(identifier);
        if (status == null) {
            return;
        }
        DistributedItr finished = status.ongoingIterator;
        this.partitionListener.iterators.remove(finished);
        finished.close();
    }

    /**
     * Accumulates the segments reported as changed so that they can be consulted later.
     */
    private class SegmentChangeListener {
        /** Every segment reported through {@link #changedSegments(Set)} so far. */
        private final Set<Integer> changedSegments = new ConcurrentHashSet<Integer>();

        private SegmentChangeListener() {
        }

        /**
         * Records the given segments as changed.
         *
         * @param changedSegments the segments to add to the accumulated set
         */
        public void changedSegments(Set<Integer> changedSegments) {
            if (DistributedEntryRetriever.this.log.isTraceEnabled()) {
                DistributedEntryRetriever.this.log.tracef("Adding changed segments %s so iteration can properly suspect them", (Object)changedSegments);
            }
            // The parameter shadows the field of the same name, so the field must be qualified with
            // 'this'; otherwise the segments are added back into the argument and never recorded.
            // NOTE(review): kept as per-element add; confirm ConcurrentHashSet supports bulk operations
            // before switching to addAll.
            for (Integer segment : changedSegments) {
                this.changedSegments.add(segment);
            }
        }
    }

    /**
     * Key filter that accepts a key only when the segment it hashes to is one of a fixed set of segments.
     *
     * @param <K> key type
     */
    private static class SegmentFilter<K>
    implements KeyFilter<K> {
        /** Maps a key to its segment. */
        private final ConsistentHash hash;
        /** Segments whose keys are accepted. */
        private final Set<Integer> segments;

        /**
         * @param hash     consistent hash used to compute the segment of each key
         * @param segments segments whose keys should pass the filter
         */
        public SegmentFilter(ConsistentHash hash, Set<Integer> segments) {
            this.segments = segments;
            this.hash = hash;
        }

        /**
         * @param key the key to test
         * @return {@code true} if the key's segment is contained in the configured segment set
         */
        @Override
        public boolean accept(K key) {
            int owningSegment = this.hash.getSegment(key);
            return this.segments.contains(owningSegment);
        }
    }

    /**
     * Callback receiving batches of entries produced while retrieving values for an iteration.
     *
     * @param <K> key type of the delivered entries
     * @param <C> value type of the delivered entries
     */
    interface SegmentBatchHandler<K, C> {
        /**
         * Delivers one batch of entries.
         *
         * @param identifier        the iteration the batch belongs to
         * @param complete          completion flag supplied by the producer; presumably marks the final
         *                          batch (TODO confirm against the callers)
         * @param completedSegments segments reported as completed with this batch
         * @param inDoubtSegments   segments reported as in doubt with this batch
         * @param entries           the entries of the batch
         */
        void handleBatch(UUID identifier, boolean complete, Set<Integer> completedSegments, Set<Integer> inDoubtSegments, Collection<CacheEntry<K, C>> entries);

        /**
         * Reports a failure to the handler.
         *
         * @param e the failure
         */
        void handleException(CacheException e);
    }

    /**
     * Per-entry action that queues (optionally converted) copies of entries belonging to a set of segments
     * and hands them to a {@link SegmentBatchHandler} in batches of {@code batchSize}.
     *
     * @param <K> key type
     * @param <V> stored value type
     * @param <C> value type after conversion
     */
    private class MapAction<K, V, C>
    implements ParallelIterableMap.KeyValueAction<K, CacheEntry<K, V>> {
        final UUID identifier;
        final Set<Integer> segments;
        final int batchSize;
        final Converter<? super K, ? super V, C> converter;
        final SegmentBatchHandler<K, C> handler;
        final Queue<CacheEntry<K, C>> queue;
        /** Number of entries queued so far; every multiple of {@code batchSize} triggers a batch. */
        final AtomicInteger insertionCount = new AtomicInteger();

        /**
         * @param identifier      the iteration the entries belong to
         * @param segments        segments whose entries are of interest
         * @param inDoubtSegments accepted but not stored by this constructor
         * @param batchSize       number of entries per batch handed to {@code handler}
         * @param converter       optional converter applied to each value; may be {@code null}
         * @param handler         receiver of completed batches
         * @param queue           queue that buffers entries until a batch is full
         */
        public MapAction(UUID identifier, Set<Integer> segments, Set<Integer> inDoubtSegments, int batchSize, Converter<? super K, ? super V, C> converter, SegmentBatchHandler<K, C> handler, Queue<CacheEntry<K, C>> queue) {
            this.identifier = identifier;
            this.segments = segments;
            this.batchSize = batchSize;
            this.converter = converter;
            this.handler = handler;
            this.queue = queue;
        }

        /**
         * Queues a copy of the entry if its key hashes to one of the configured segments and, when the
         * running insertion count reaches a multiple of {@code batchSize}, drains that many entries from
         * the queue and passes them to the handler as a non-final batch with empty segment sets.
         */
        @Override
        public void apply(K k, CacheEntry<K, V> kvInternalCacheEntry) {
            int owningSegment = DistributedEntryRetriever.this.getCurrentHash().getSegment(k);
            if (!this.segments.contains(owningSegment)) {
                return;
            }
            CacheEntry<K, C> copy = kvInternalCacheEntry.clone();
            if (this.converter != null) {
                C converted = this.converter.convert(k, kvInternalCacheEntry.getValue(), kvInternalCacheEntry.getMetadata());
                // A filtering converter that yields null drops the entry.
                if (converted == null && this.converter instanceof KeyValueFilterConverter) {
                    return;
                }
                copy.setValue(converted);
            }
            this.queue.add(copy);
            if (this.insertionCount.incrementAndGet() % this.batchSize != 0) {
                return;
            }
            ArrayList<CacheEntry<K, C>> batch = new ArrayList<CacheEntry<K, C>>(this.batchSize);
            for (int drained = 0; drained < this.batchSize; ++drained) {
                batch.add(this.queue.poll());
            }
            Set<Integer> none = Collections.emptySet();
            this.handler.handleBatch(this.identifier, false, none, none, batch);
        }
    }

    /**
     * Iterator for a distributed iteration that additionally tracks, per segment, which keys still have to
     * be returned by {@link #next()} before the segment listener may be told that the segment is complete.
     *
     * @param <C> value type of the returned entries
     */
    protected class DistributedItr<C>
    extends LocalEntryRetriever.Itr<C> {
        private final UUID identifier;
        private final ConsistentHash hash;
        /** Keys, grouped by segment, that must still be handed out before the segment counts as completed. */
        private final ConcurrentMap<Integer, Set<K>> keysNeededToComplete;
        private final EntryRetriever.SegmentListener segmentListener;

        /**
         * @param batchSize       batch size forwarded to the superclass
         * @param identifier      the iteration this iterator serves
         * @param segmentListener listener notified of completed segments; may be {@code null}
         * @param hash            consistent hash used to map returned keys to segments
         */
        public DistributedItr(int batchSize, UUID identifier, EntryRetriever.SegmentListener segmentListener, ConsistentHash hash) {
            super(DistributedEntryRetriever.this, batchSize);
            this.identifier = identifier;
            this.segmentListener = segmentListener;
            this.hash = hash;
            this.keysNeededToComplete = new ConcurrentHashMap<Integer, Set<K>>();
        }

        /**
         * Returns the next entry and removes its key from the pending keys of its segment; when that
         * leaves the segment's pending set empty the listener is notified with {@code fromIterator = true}.
         */
        @Override
        public CacheEntry<K, C> next() {
            CacheEntry<K, C> nextEntry = super.next();
            K key = nextEntry.getKey();
            int segment = this.hash.getSegment(key);
            Set<K> pending = this.keysNeededToComplete.get(segment);
            if (pending == null) {
                return nextEntry;
            }
            pending.remove(key);
            if (pending.isEmpty()) {
                this.notifyListenerCompletedSegment(segment, true);
            }
            return nextEntry;
        }

        /** Forwards a completed segment to the listener, if one was supplied. */
        private void notifyListenerCompletedSegment(int segment, boolean fromIterator) {
            if (this.segmentListener == null) {
                return;
            }
            if (DistributedEntryRetriever.this.log.isTraceEnabled()) {
                DistributedEntryRetriever.this.log.tracef("Notifying listener of segment %s being completed for %s", (Object)segment, (Object)this.identifier);
            }
            this.segmentListener.segmentTransferred(segment, fromIterator);
        }

        /**
         * Registers the keys delivered for completed segments.
         * <p>
         * A segment without keys is reported to the listener immediately unless keys are already pending
         * for it. A segment with keys has them stored, or merged into the set that is already pending.
         *
         * @param keysForSegment keys delivered per completed segment
         */
        public void addKeysForSegment(Map<Integer, ConcurrentHashSet<K>> keysForSegment) {
            for (Map.Entry<Integer, ConcurrentHashSet<K>> entry : keysForSegment.entrySet()) {
                Integer segment = entry.getKey();
                Set<K> delivered = entry.getValue();
                if (!delivered.isEmpty()) {
                    Set<K> alreadyPending = this.keysNeededToComplete.putIfAbsent(segment, delivered);
                    if (alreadyPending != null) {
                        // Merged one key at a time, as elsewhere in this class.
                        for (K value : delivered) {
                            alreadyPending.add(value);
                        }
                    }
                } else if (!this.keysNeededToComplete.containsKey(segment)) {
                    this.notifyListenerCompletedSegment(segment, false);
                } else if (DistributedEntryRetriever.this.log.isTraceEnabled()) {
                    DistributedEntryRetriever.this.log.tracef("No keys found for segment %s, but previous response had keys - so cannot complete segment", (Object)segment);
                }
            }
        }

        /** Closes the iterator and unregisters the iteration from {@code iteratorDetails}. */
        @Override
        protected void close(CacheException e) {
            super.close(e);
            DistributedEntryRetriever.this.iteratorDetails.remove(this.identifier);
        }

        /** Closes the iterator when it is finalized. */
        @Override
        protected void finalize() throws Throwable {
            super.finalize();
            this.close();
        }
    }

    private static enum LocalStatus {
        RUNNING,
        REPEAT,
        IDLE;

    }

    /**
     * Bookkeeping for one ongoing iteration: the iterator handed to the user, the parameters the iteration
     * was started with, and the mutable progress state.
     *
     * @param <C> value type produced by the iteration
     */
    class IterationStatus<C> {
        /** Iterator that receives the retrieved entries. */
        final DistributedItr<C> ongoingIterator;
        final EntryRetriever.SegmentListener segmentListener;
        /** Filter supplied for the iteration; may be {@code null}. */
        final KeyValueFilter<? super K, ? super V> filter;
        final Converter<? super K, ? super V, ? extends C> converter;
        final Set<Flag> flags;
        /** Per segment, the keys seen so far; a slot is set to {@code null} once its segment is completed. */
        final AtomicReferenceArray<Set<K>> processedKeys;
        /** Node a response is currently awaited from, or {@code null} when none is awaited. */
        final AtomicReference<Address> awaitingResponseFrom = new AtomicReference<Address>();
        /** State of the local retrieval; starts as {@code IDLE}. */
        final AtomicReference<LocalStatus> localRunning = new AtomicReference<LocalStatus>(LocalStatus.IDLE);

        public IterationStatus(DistributedItr<C> ongoingIterator, EntryRetriever.SegmentListener segmentListener, KeyValueFilter<? super K, ? super V> filter, Converter<? super K, ? super V, ? extends C> converter, Set<Flag> flags, AtomicReferenceArray<Set<K>> processedKeys) {
            this.processedKeys = processedKeys;
            this.flags = flags;
            this.converter = converter;
            this.filter = filter;
            this.segmentListener = segmentListener;
            this.ongoingIterator = ongoingIterator;
        }
    }
}

