package org.wildfly.clustering.server.infinispan.scheduler;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.infinispan.Cache;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.TopologyChanged;
import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.concurrent.BlockingManager;
import org.jboss.logging.Logger;
import org.wildfly.clustering.cache.infinispan.embedded.distribution.CacheStreamFilter;
import org.wildfly.clustering.cache.infinispan.embedded.listener.ListenerRegistrar;
import org.wildfly.clustering.cache.infinispan.embedded.listener.ListenerRegistration;
import org.wildfly.clustering.context.DefaultThreadFactory;

@Listener
/* loaded from: input_file:org/wildfly/clustering/server/infinispan/scheduler/SchedulerTopologyChangeListener.class */
public class SchedulerTopologyChangeListener<K, V, SE, CE> implements ListenerRegistrar {
    private static final Logger LOGGER = Logger.getLogger(SchedulerTopologyChangeListener.class);
    private static final ThreadFactory THREAD_FACTORY = new DefaultThreadFactory(SchedulerTopologyChangeListener.class, (ClassLoader) AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() { // from class: org.wildfly.clustering.server.infinispan.scheduler.SchedulerTopologyChangeListener.1
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.security.PrivilegedAction
        public ClassLoader run() {
            return SchedulerTopologyChangeListener.class.getClassLoader();
        }
    }));
    private final Cache<K, V> cache;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(THREAD_FACTORY);
    private final AtomicReference<Future<?>> scheduleTaskFuture = new AtomicReference<>();
    private final Consumer<CacheStreamFilter<SE>> scheduleTask;
    private final Consumer<CacheStreamFilter<CE>> cancelTask;
    private final BlockingManager blocking;

    public SchedulerTopologyChangeListener(Cache<K, V> cache, Consumer<CacheStreamFilter<SE>> consumer, Consumer<CacheStreamFilter<CE>> consumer2) {
        this.cache = cache;
        this.scheduleTask = consumer;
        this.cancelTask = consumer2;
        this.blocking = (BlockingManager) GlobalComponentRegistry.componentOf(this.cache.getCacheManager(), BlockingManager.class);
    }

    public ListenerRegistration register() {
        this.cache.addListener(this);
        return () -> {
            this.cache.removeListener(this);
            LOGGER.debugf("Shutting down thread pool for %s scheduler topology change listener", this.cache.getName());
            this.executor.shutdownNow();
            try {
                LOGGER.debugf("Awaiting task termination for %s scheduler topology change listener", this.cache.getName());
                this.executor.awaitTermination(this.cache.getCacheConfiguration().transaction().cacheStopTimeout(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            LOGGER.debugf("%s scheduler topology change listener shutdown complete", this.cache.getName());
        };
    }

    @TopologyChanged
    public CompletionStage<Void> topologyChanged(TopologyChangedEvent<K, V> topologyChangedEvent) {
        Future<?> andSet;
        Cache cache = topologyChangedEvent.getCache();
        Address address = cache.getCacheManager().getAddress();
        ConsistentHash writeConsistentHashAtStart = topologyChangedEvent.getWriteConsistentHashAtStart();
        Set primarySegmentsForOwner = writeConsistentHashAtStart.getMembers().contains(address) ? writeConsistentHashAtStart.getPrimarySegmentsForOwner(address) : Collections.emptySet();
        ConsistentHash writeConsistentHashAtEnd = topologyChangedEvent.getWriteConsistentHashAtEnd();
        Set primarySegmentsForOwner2 = writeConsistentHashAtEnd.getMembers().contains(address) ? writeConsistentHashAtEnd.getPrimarySegmentsForOwner(address) : Collections.emptySet();
        Logger logger = LOGGER;
        Object[] objArr = new Object[4];
        objArr[0] = cache.getName();
        objArr[1] = topologyChangedEvent.isPre() ? "pre" : "post";
        objArr[2] = writeConsistentHashAtStart.getMembers();
        objArr[3] = writeConsistentHashAtEnd.getMembers();
        logger.debugf("%s scheduler topology change listener received %s-topology changed event: %s -> %s", objArr);
        if (topologyChangedEvent.isPre()) {
            if (!primarySegmentsForOwner.isEmpty()) {
                IntSet mutableCopyFrom = IntSets.mutableCopyFrom(primarySegmentsForOwner);
                mutableCopyFrom.removeAll(IntSets.from(primarySegmentsForOwner2));
                if (!mutableCopyFrom.isEmpty()) {
                    Future<?> andSet2 = this.scheduleTaskFuture.getAndSet(null);
                    if (andSet2 != null) {
                        andSet2.cancel(true);
                    }
                    return this.blocking.runBlocking(() -> {
                        this.cancelTask.accept(CacheStreamFilter.segments(mutableCopyFrom));
                    }, getClass().getName());
                }
            }
        } else if (!primarySegmentsForOwner2.isEmpty()) {
            IntSet mutableCopyFrom2 = IntSets.mutableCopyFrom(primarySegmentsForOwner2);
            mutableCopyFrom2.removeAll(IntSets.from(primarySegmentsForOwner));
            if (!mutableCopyFrom2.isEmpty() && (andSet = this.scheduleTaskFuture.getAndSet(this.executor.submit(() -> {
                this.scheduleTask.accept(CacheStreamFilter.segments(mutableCopyFrom2));
            }, getClass().getName()))) != null) {
                andSet.cancel(true);
            }
        }
        return CompletableFuture.completedStage(null);
    }
}
