package org.wildfly.clustering.cache.infinispan.embedded.affinity;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.infinispan.Cache;
import org.infinispan.affinity.KeyAffinityService;
import org.infinispan.affinity.KeyGenerator;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.TopologyChanged;
import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.concurrent.BlockingManager;
import org.jboss.logging.Logger;
import org.wildfly.clustering.cache.infinispan.embedded.distribution.KeyDistribution;

@Listener(observation = Listener.Observation.POST)
/* loaded from: input_file:org/wildfly/clustering/cache/infinispan/embedded/affinity/DefaultKeyAffinityService.class */
public class DefaultKeyAffinityService<K> implements KeyAffinityService<K>, Supplier<BlockingQueue<K>> {
    /** Initial value of {@link #queueSize}, i.e. the capacity of the queues created by {@link #get()}. */
    static final int DEFAULT_QUEUE_SIZE = 100;
    /** Cache this service is bound to: source of the consistent hash and target of listener registration. */
    private final Cache<? extends K, ?> cache;
    /** Source of candidate keys, both for pre-generation and for the random-key fallback. */
    private final KeyGenerator<? extends K> generator;
    /** Currently published state; {@code null} until the first consistent hash has been accepted. */
    private final AtomicReference<KeyAffinityState<K>> currentState;
    /** Selects the addresses for which this service handles keys. */
    private final Predicate<Address> filter;
    /** Executor on which the key generation tasks run. */
    private final Executor executor;
    /** Creates the key distribution for a given cache and consistent hash. */
    private final BiFunction<Cache<?, ?>, ConsistentHash, KeyDistribution> distributionFactory;
    /** Obtains the consistent hash used to build the initial state in {@link #start()}. */
    private final Function<Cache<?, ?>, ConsistentHash> currentConsistentHash;
    /** Capacity applied to queues created by {@link #get()}; volatile so that setter calls are visible to other threads. */
    private volatile int queueSize;
    /** Maximum time a lookup waits for a pre-generated key before giving up. */
    private volatile Duration timeout;
    /** Lifecycle flag toggled by {@link #start()} and {@link #stop()}. */
    private volatile boolean started;
    /** Default consistent hash lookup: the write consistent hash of the cache's current topology. */
    private static final Function<Cache<?, ?>, ConsistentHash> CURRENT_CONSISTENT_HASH = cache -> {
        return cache.getAdvancedCache().getDistributionManager().getCacheTopology().getWriteConsistentHash();
    };
    /** Default key distribution factory, delegating to {@code KeyDistribution.forConsistentHash}. */
    private static final BiFunction<Cache<?, ?>, ConsistentHash, KeyDistribution> KEY_DISTRIBUTION_FACTORY = KeyDistribution::forConsistentHash;
    private static final Logger LOGGER = Logger.getLogger(DefaultKeyAffinityService.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/wildfly/clustering/cache/infinispan/embedded/affinity/DefaultKeyAffinityService$GenerateKeysTask.class */
    /**
     * Task that keeps generating keys and queues those whose primary owner is a given address.
     * <p>
     * The loop only ends once the executing thread is interrupted; {@link BlockingQueue#put(Object)}
     * blocks while the queue is full, so the task throttles itself to the rate at which keys are consumed.
     *
     * @param <K> the key type
     */
    public static class GenerateKeysTask<K> implements Runnable {
        private final KeyGenerator<? extends K> generator;
        private final KeyDistribution distribution;
        private final Address address;
        private final BlockingQueue<K> keys;

        /**
         * @param keyGenerator source of candidate keys
         * @param keyDistribution used to determine the primary owner of each candidate
         * @param address the address whose keys are collected
         * @param blockingQueue destination of the keys owned by {@code address}
         */
        GenerateKeysTask(KeyGenerator<? extends K> keyGenerator, KeyDistribution keyDistribution, Address address, BlockingQueue<K> blockingQueue) {
            this.generator = keyGenerator;
            this.distribution = keyDistribution;
            this.address = address;
            this.keys = blockingQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                // Typed as K (not Object) so that the key can legally be placed into BlockingQueue<K>.
                K key = this.generator.getKey();
                if (this.distribution.getPrimaryOwner(key).equals(this.address)) {
                    try {
                        this.keys.put(key);
                    } catch (InterruptedException e) {
                        // Restore the flag so that the loop condition observes the interruption and exits.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/wildfly/clustering/cache/infinispan/embedded/affinity/DefaultKeyAffinityService$KeyAffinityState.class */
    /**
     * Bundle of the objects that belong to one accepted consistent hash: the key distribution,
     * the registry of pre-generated keys and the futures of the tasks that fill that registry.
     *
     * @param <K> the key type
     */
    public interface KeyAffinityState<K> {
        /** @return the distribution used to look up the primary owner of a key */
        KeyDistribution getDistribution();

        /** @return the registry holding the per-address queues of pre-generated keys */
        KeyRegistry<K> getRegistry();

        /** @return the futures of the key generation tasks started for this state */
        Iterable<Future<?>> getFutures();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a key affinity service using the default collaborators: an executor obtained from the
     * {@link BlockingManager} component of the cache manager's global component registry (named after
     * this class), {@link #CURRENT_CONSISTENT_HASH} and {@link #KEY_DISTRIBUTION_FACTORY}.
     *
     * @param cache the cache for which keys are generated
     * @param keyGenerator the source of candidate keys
     * @param predicate selects the addresses for which keys are generated
     */
    public DefaultKeyAffinityService(Cache<? extends K, ?> cache, KeyGenerator<? extends K> keyGenerator, Predicate<Address> predicate) {
        this(cache, keyGenerator, predicate, ((BlockingManager) cache.getCacheManager().getGlobalComponentRegistry().getComponent(BlockingManager.class)).asExecutor(DefaultKeyAffinityService.class.getSimpleName()), CURRENT_CONSISTENT_HASH, KEY_DISTRIBUTION_FACTORY);
    }

    /**
     * Creates a key affinity service with explicitly supplied collaborators.
     *
     * @param cache the cache for which keys are generated
     * @param keyGenerator the source of candidate keys
     * @param predicate selects the addresses for which keys are generated
     * @param executor runs the key generation tasks
     * @param function obtains the consistent hash used when the service starts
     * @param biFunction creates the key distribution for a cache and consistent hash
     */
    DefaultKeyAffinityService(Cache<? extends K, ?> cache, KeyGenerator<? extends K> keyGenerator, Predicate<Address> predicate, Executor executor, Function<Cache<?, ?>, ConsistentHash> function, BiFunction<Cache<?, ?>, ConsistentHash, KeyDistribution> biFunction) {
        // Collaborators supplied by the caller.
        this.cache = cache;
        this.generator = keyGenerator;
        this.filter = predicate;
        this.executor = executor;
        this.currentConsistentHash = function;
        this.distributionFactory = biFunction;

        // Tunable settings begin at their defaults; no state exists until a consistent hash is accepted.
        this.queueSize = DEFAULT_QUEUE_SIZE;
        this.timeout = Duration.ofMillis(100L);
        this.started = false;
        this.currentState = new AtomicReference<>();
    }

    /**
     * Sets the capacity of the key queues created by {@link #get()}.
     * <p>
     * The value is validated here because {@link ArrayBlockingQueue} rejects capacities below one;
     * without the check an invalid value would only surface later, when a queue is created.
     *
     * @param i the queue capacity; must be at least 1
     * @throws IllegalArgumentException if {@code i} is less than 1
     */
    public void setQueueSize(int i) {
        if (i < 1) {
            throw new IllegalArgumentException("Queue size must be positive: " + i);
        }
        this.queueSize = i;
    }

    /**
     * Sets the maximum time a key lookup waits for a pre-generated key.
     * <p>
     * A {@code null} value is rejected immediately; it would otherwise cause a
     * {@link NullPointerException} on every subsequent lookup that polls a queue.
     *
     * @param duration the poll timeout; must not be {@code null}
     * @throws NullPointerException if {@code duration} is {@code null}
     */
    public void setPollTimeout(Duration duration) {
        this.timeout = Objects.requireNonNull(duration, "duration");
    }

    /**
     * Creates a new, empty, bounded queue whose capacity is the currently configured queue size.
     *
     * @return a new queue for pre-generated keys
     */
    @Override // java.util.function.Supplier
    public BlockingQueue<K> get() {
        // Diamond instead of the raw type, so no unchecked conversion is needed.
        return new ArrayBlockingQueue<>(this.queueSize);
    }

    /**
     * @return {@code true} once {@link #start()} has completed and until {@link #stop()} is called
     */
    public boolean isStarted() {
        return this.started;
    }

    /**
     * Starts the service: builds the initial state from the current consistent hash,
     * registers this object as a cache listener and finally marks the service as started.
     */
    public void start() {
        ConsistentHash initialHash = this.currentConsistentHash.apply(this.cache);
        this.accept(initialHash);
        this.cache.addListener(this);
        this.started = true;
    }

    /**
     * Stops the service: marks it as stopped, unregisters the cache listener and requests
     * cancellation of the key generation tasks belonging to the current state.
     * <p>
     * Cancelling here matters because those tasks are otherwise only cancelled when a newer
     * consistent hash replaces their state, which no longer happens once the listener is removed.
     * The state itself is left in place, so keys that were already queued remain available.
     */
    public void stop() {
        this.started = false;
        this.cache.removeListener(this);
        KeyAffinityState<K> state = this.currentState.get();
        if (state != null) {
            for (Future<?> future : state.getFutures()) {
                future.cancel(true);
            }
        }
    }

    /**
     * Returns a key whose primary owner is intended to be the primary owner of the given key.
     *
     * @param k the key to collocate with
     * @return a pre-generated key if one became available in time, otherwise a freshly generated key
     * @throws IllegalStateException if no state has been established yet
     */
    public K getCollocatedKey(K k) {
        KeyAffinityState<K> state = this.currentState.get();
        if (state != null) {
            return this.getCollocatedKey(state, k);
        }
        throw new IllegalStateException();
    }

    /**
     * Polls for a key owned by the primary owner of {@code k}, retrying against the newest state
     * whenever the state was replaced while waiting.
     *
     * @param keyAffinityState the state to consult first
     * @param k the key to collocate with
     * @return a pre-generated key, or a freshly generated key if none became available
     */
    private K getCollocatedKey(KeyAffinityState<K> keyAffinityState, K k) {
        KeyAffinityState<K> state = keyAffinityState;
        while (true) {
            Address owner = state.getDistribution().getPrimaryOwner(k);
            K candidate = this.poll(state.getRegistry(), owner);
            if (candidate != null) {
                return candidate;
            }
            KeyAffinityState<K> latest = this.currentState.get();
            if (latest == state) {
                // Same state and still no key: fall through to the random-key fallback.
                break;
            }
            state = latest;
        }
        LOGGER.debugf("Could not obtain pre-generated key with same affinity as %s -- generating random key", k);
        return this.generator.getKey();
    }

    /**
     * Returns a key intended to be owned by the given address.
     *
     * @param address the target address; must be accepted by this service's filter
     * @return a pre-generated key if one became available in time, otherwise a freshly generated key
     * @throws IllegalArgumentException if the address is rejected by the filter
     * @throws IllegalStateException if no state has been established yet
     */
    public K getKeyForAddress(Address address) {
        boolean accepted = this.filter.test(address);
        if (!accepted) {
            throw new IllegalArgumentException(address.toString());
        }
        KeyAffinityState<K> state = this.currentState.get();
        if (state != null) {
            return this.getKeyForAddress(state, address);
        }
        throw new IllegalStateException();
    }

    /**
     * Polls for a key queued for {@code address}, retrying against the newest state
     * whenever the state was replaced while waiting.
     *
     * @param keyAffinityState the state to consult first
     * @param address the target address
     * @return a pre-generated key, or a freshly generated key if none became available
     */
    private K getKeyForAddress(KeyAffinityState<K> keyAffinityState, Address address) {
        KeyAffinityState<K> state = keyAffinityState;
        while (true) {
            K candidate = this.poll(state.getRegistry(), address);
            if (candidate != null) {
                return candidate;
            }
            KeyAffinityState<K> latest = this.currentState.get();
            if (latest == state) {
                // Same state and still no key: fall through to the random-key fallback.
                break;
            }
            state = latest;
        }
        LOGGER.debugf("Could not obtain pre-generated key with affinity for %s -- generating random key", address);
        return this.generator.getKey();
    }

    /**
     * Waits up to the configured timeout for a key from the queue registered for the given address.
     *
     * @param keyRegistry the registry to look the queue up in
     * @param address the address whose queue is polled
     * @return a key, or {@code null} if there is no queue for the address, the wait timed out,
     *         or the calling thread was interrupted (its interrupt flag is restored)
     */
    private K poll(KeyRegistry<K> keyRegistry, Address address) {
        BlockingQueue<K> queue = keyRegistry.getKeys(address);
        if (queue != null) {
            // Read the volatile field once so that a single consistent value is used below.
            Duration pollTimeout = this.timeout;
            long nanos;
            if (pollTimeout.getSeconds() != 0) {
                nanos = pollTimeout.toNanos();
            } else {
                // Sub-second timeout: the nanosecond part already is the whole duration.
                nanos = pollTimeout.getNano();
            }
            try {
                return queue.poll(nanos, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return null;
    }

    /**
     * Topology listener: restarts key generation when the primary segment ownership of the
     * filtered addresses differs between the write consistent hashes at the start and end of the event.
     *
     * @param topologyChangedEvent the topology change notification
     * @return an already completed stage
     */
    @TopologyChanged
    public CompletionStage<Void> topologyChanged(TopologyChangedEvent<?, ?> topologyChangedEvent) {
        Map<Address, Set<Integer>> segmentsBefore = this.getSegments(topologyChangedEvent.getWriteConsistentHashAtStart());
        Map<Address, Set<Integer>> segmentsAfter = this.getSegments(topologyChangedEvent.getWriteConsistentHashAtEnd());
        boolean unchanged = segmentsBefore.equals(segmentsAfter);
        if (!unchanged) {
            LOGGER.debugf("Restarting key generation based on new consistent hash for topology %d", topologyChangedEvent.getNewTopologyId());
            this.accept(topologyChangedEvent.getWriteConsistentHashAtEnd());
        }
        return CompletableFuture.completedStage(null);
    }

    /**
     * Collects, for every member of the consistent hash accepted by the filter, the segments
     * for which that member is the primary owner.
     *
     * @param consistentHash the consistent hash to inspect
     * @return a sorted map from accepted member address to its primary segments
     */
    private Map<Address, Set<Integer>> getSegments(ConsistentHash consistentHash) {
        // Typed map instead of a raw TreeMap, so the result needs no unchecked conversion.
        Map<Address, Set<Integer>> segments = new TreeMap<>();
        for (Address address : consistentHash.getMembers()) {
            if (this.filter.test(address)) {
                segments.put(address, consistentHash.getPrimarySegmentsForOwner(address));
            }
        }
        return segments;
    }

    /**
     * Replaces the current key affinity state with one derived from the given consistent hash.
     * <p>
     * A new registry of per-address queues is created and one {@link GenerateKeysTask} per registered
     * address is handed to the executor. After the new state has been published, the generation
     * tasks of the previous state (if any) are cancelled. If the executor rejects a task, the tasks
     * submitted so far are cancelled and the current state is left unchanged.
     * <p>
     * Tasks are wrapped in {@link FutureTask} rather than started through
     * {@code CompletableFuture.runAsync(...)}: {@code CompletableFuture.cancel(true)} never interrupts
     * the executing thread, while the generation loop only terminates on interruption, so cancelled
     * generators would otherwise keep running (or stay blocked on a full queue) indefinitely.
     *
     * @param consistentHash the consistent hash from which key ownership is derived
     */
    private void accept(ConsistentHash consistentHash) {
        final KeyDistribution distribution = this.distributionFactory.apply(this.cache, consistentHash);
        final ConsistentHashKeyRegistry<K> registry = new ConsistentHashKeyRegistry<>(consistentHash, this.filter, this);
        Set<Address> addresses = registry.getAddresses();
        final List<Future<?>> futures = !addresses.isEmpty() ? new ArrayList<>(addresses.size()) : Collections.emptyList();
        try {
            for (Address address : addresses) {
                Runnable generator = new GenerateKeysTask<K>(this.generator, distribution, address, registry.getKeys(address));
                // FutureTask.cancel(true) interrupts the thread running the task, which is what ends the generation loop.
                FutureTask<Void> task = new FutureTask<>(generator, null);
                this.executor.execute(task);
                futures.add(task);
            }
            KeyAffinityState<K> previous = this.currentState.getAndSet(new KeyAffinityState<K>() {
                @Override
                public KeyDistribution getDistribution() {
                    return distribution;
                }

                @Override
                public KeyRegistry<K> getRegistry() {
                    return registry;
                }

                @Override
                public Iterable<Future<?>> getFutures() {
                    return futures;
                }
            });
            if (previous != null) {
                // The superseded generators feed queues that are no longer reachable through the current state.
                for (Future<?> future : previous.getFutures()) {
                    future.cancel(true);
                }
            }
        } catch (RejectedExecutionException e) {
            // The executor refused a task: undo the submissions made so far.
            for (Future<?> future : futures) {
                future.cancel(true);
            }
        }
    }
}
