/*
 * Decompiled with CFR 0.152.
 */
package org.wildfly.clustering.server.infinispan.scheduler;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.infinispan.Cache;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.TopologyChanged;
import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.concurrent.BlockingManager;
import org.jboss.logging.Logger;
import org.wildfly.clustering.cache.Key;
import org.wildfly.clustering.cache.infinispan.embedded.distribution.Locality;
import org.wildfly.clustering.cache.infinispan.embedded.listener.ListenerRegistrar;
import org.wildfly.clustering.cache.infinispan.embedded.listener.ListenerRegistration;
import org.wildfly.clustering.context.DefaultThreadFactory;
import org.wildfly.clustering.server.infinispan.scheduler.CacheEntryScheduler;

/**
 * Cache listener that reacts to topology changes of a cache by cancelling and (re)scheduling tasks,
 * based on the set of segments for which the local member is the primary owner.
 * <ul>
 * <li>On a pre-topology-change event, if the local member is about to lose primary ownership of any segment,
 * any pending schedule task is cancelled and the cancel task is run via the {@link BlockingManager}.</li>
 * <li>On a post-topology-change event, if the local member gained primary ownership of any segment,
 * the schedule task is submitted to a dedicated single-thread executor, superseding any previously submitted one.</li>
 * </ul>
 * The executor is shut down when the registration returned by {@link #register()} is closed.
 *
 * @param <I> the type parameter of the cache key
 * @param <K> the cache key type
 * @param <V> the cache value type
 */
@Listener
public class SchedulerTopologyChangeListener<I, K extends Key<I>, V>
implements ListenerRegistrar {
    private static final Logger LOGGER = Logger.getLogger(SchedulerTopologyChangeListener.class);
    private static final ThreadFactory THREAD_FACTORY = new DefaultThreadFactory(SchedulerTopologyChangeListener.class, AccessController.doPrivileged(new PrivilegedAction<ClassLoader>(){

        @Override
        public ClassLoader run() {
            return SchedulerTopologyChangeListener.class.getClassLoader();
        }
    }));
    private final Cache<K, V> cache;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(THREAD_FACTORY);
    // Holds the most recently submitted schedule task, so that it can be cancelled when superseded
    private final AtomicReference<Future<?>> scheduleTaskFuture = new AtomicReference<>();
    private final Consumer<Locality> cancelTask;
    private final BiConsumer<Locality, Locality> scheduleTask;
    private final BlockingManager blocking;

    /**
     * Creates a listener whose cancel task delegates to the {@code cancel} method of the specified scheduler.
     *
     * @param cache the cache whose topology changes are to be observed
     * @param scheduler the scheduler whose {@code cancel} method is invoked with the new locality
     * @param scheduleTask a task accepting the old and new locality, run after the local member gains primary segments
     */
    public SchedulerTopologyChangeListener(Cache<K, V> cache, CacheEntryScheduler<I, ?> scheduler, BiConsumer<Locality, Locality> scheduleTask) {
        this(cache, scheduler::cancel, scheduleTask);
    }

    /**
     * Creates a listener using the specified cancel and schedule tasks.
     *
     * @param cache the cache whose topology changes are to be observed
     * @param cancelTask a task accepting the new locality, run before the local member loses primary segments
     * @param scheduleTask a task accepting the old and new locality, run after the local member gains primary segments
     */
    public SchedulerTopologyChangeListener(Cache<K, V> cache, Consumer<Locality> cancelTask, BiConsumer<Locality, Locality> scheduleTask) {
        this.cache = cache;
        this.cancelTask = cancelTask;
        this.scheduleTask = scheduleTask;
        this.blocking = GlobalComponentRegistry.componentOf(cache.getCacheManager(), BlockingManager.class);
    }

    /**
     * Registers this object as a listener of the cache.
     *
     * @return a registration that, when closed, removes this listener from the cache, shuts down the executor
     *         and waits up to the cache's configured stop timeout for any running task to terminate
     */
    @Override
    public ListenerRegistration register() {
        this.cache.addListener(this);
        return () -> {
            this.cache.removeListener(this);
            LOGGER.debugf("Shutting down thread pool for %s scheduler topology change listener", this.cache.getName());
            this.executor.shutdownNow();
            try {
                LOGGER.debugf("Awaiting task termination for %s scheduler topology change listener", this.cache.getName());
                // Best-effort wait: the result is not inspected, so the completion message below is logged even if the timeout elapses
                this.executor.awaitTermination(this.cache.getCacheConfiguration().transaction().cacheStopTimeout(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                // Restore the interrupt status for the benefit of the caller
                Thread.currentThread().interrupt();
            }
            LOGGER.debugf("%s scheduler topology change listener shutdown complete", this.cache.getName());
        };
    }

    /**
     * Handles a topology change event for the cache.
     *
     * @param event the topology change event, delivered both before and after the change
     * @return the stage of the cancel task, if one was triggered; otherwise an already completed stage
     */
    @TopologyChanged
    public CompletionStage<Void> topologyChanged(TopologyChangedEvent<K, V> event) {
        Cache<K, V> cache = event.getCache();
        Address address = cache.getCacheManager().getAddress();
        ConsistentHash oldHash = event.getWriteConsistentHashAtStart();
        // A member that is absent from a consistent hash is treated as owning no segments
        Set<Integer> oldSegments = oldHash.getMembers().contains(address) ? oldHash.getPrimarySegmentsForOwner(address) : Collections.emptySet();
        ConsistentHash newHash = event.getWriteConsistentHashAtEnd();
        Set<Integer> newSegments = newHash.getMembers().contains(address) ? newHash.getPrimarySegmentsForOwner(address) : Collections.emptySet();
        LOGGER.debugf("%s scheduler topology change listener received %s-topology changed event: %s -> %s", cache.getName(), event.isPre() ? "pre" : "post", oldHash.getMembers(), newHash.getMembers());
        if (event.isPre()) {
            // If there are segments that the local member will no longer primarily own, run the cancel task
            if (!newSegments.containsAll(oldSegments)) {
                // A pending schedule task is obsolete, so cancel it first
                Future<?> future = this.scheduleTaskFuture.getAndSet(null);
                if (future != null) {
                    future.cancel(true);
                }
                return this.blocking.runBlocking(() -> this.cancelTask.accept(Locality.forConsistentHash(cache, newHash)), this.getClass().getName());
            }
        } else if (!oldSegments.containsAll(newSegments)) {
            // The local member has newly owned primary segments, so run the schedule task
            Locality oldLocality = Locality.forConsistentHash(cache, oldHash);
            Locality newLocality = Locality.forConsistentHash(cache, newHash);
            try {
                // Submit the new task and cancel the one it supersedes, if any
                Future<?> future = this.scheduleTaskFuture.getAndSet(this.executor.submit(() -> this.scheduleTask.accept(oldLocality, newLocality)));
                if (future != null) {
                    future.cancel(true);
                }
            }
            catch (RejectedExecutionException ignored) {
                // The single-thread executor uses an unbounded queue, so a rejection means it was already shut down
                // (i.e. the listener registration was closed); there is nothing left to schedule.
                LOGGER.debugf("%s scheduler topology change listener skipped schedule task because its executor was shut down", cache.getName());
            }
        }
        return CompletableFuture.completedStage(null);
    }
}

