package org.infinispan.reactive.publisher.impl;

import io.reactivex.rxjava3.core.Flowable;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.infinispan.Cache;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.PartitionStatusChanged;
import org.infinispan.notifications.cachelistener.event.PartitionStatusChangedEvent;
import org.infinispan.partitionhandling.AvailabilityMode;
import org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier;
import org.infinispan.util.logging.Log;
import org.reactivestreams.Publisher;

/**
 * A {@link ClusterPublisherManagerImpl} that takes the cache availability mode into account.
 *
 * <p>Every public operation is rejected up front when the last observed availability mode is not
 * {@link AvailabilityMode#AVAILABLE}. Operations that were already started are tracked through one
 * {@link AtomicBoolean} flag each; when the cache enters {@link AvailabilityMode#DEGRADED_MODE}
 * all tracked flags are raised and the affected operations fail with the exception created by
 * {@code Log.CLUSTER.partitionDegraded()} instead of delivering a result.
 *
 * @param <K> key type of the cache
 * @param <V> value type of the cache
 */
@Scope(Scopes.NAMED_CACHE)
public class PartitionAwareClusterPublisherManager<K, V> extends ClusterPublisherManagerImpl<K, V> {

    /** Injected cache reference; {@link #start()} registers {@link #listener} on the running cache. */
    @Inject
    protected ComponentRef<Cache<?, ?>> cache;

    /**
     * Availability mode reported by the most recent post-event. Written from the listener callback
     * and read whenever an operation is started or registered.
     */
    volatile AvailabilityMode currentMode = AvailabilityMode.AVAILABLE;

    /** Listener instance that keeps {@link #currentMode} and the pending flags up to date. */
    protected final PartitionAwareClusterPublisherManager<K, V>.PartitionListener listener = new PartitionListener();

    /** One flag per in-flight operation; a flag set to {@code true} means the operation must fail. */
    private final Set<AtomicBoolean> pendingOperations = ConcurrentHashMap.newKeySet();

    /** Cache listener reacting to partition status changes. */
    @Listener
    private class PartitionListener {
        private PartitionListener() {
        }

        /**
         * Records the new availability mode and, when the cache became degraded, raises the flag of
         * every operation that is currently pending.
         *
         * @param partitionStatusChangedEvent the event; pre-events are ignored
         */
        @PartitionStatusChanged
        public void onPartitionChange(PartitionStatusChangedEvent<K, ?> partitionStatusChangedEvent) {
            if (partitionStatusChangedEvent.isPre()) {
                return;
            }
            AvailabilityMode availabilityMode = partitionStatusChangedEvent.getAvailabilityMode();
            // Publish the mode BEFORE walking the set: registerOperation() adds its flag first and
            // reads currentMode afterwards, so an operation registering concurrently is caught
            // either by the iteration below or by its own mode check.
            currentMode = availabilityMode;
            if (availabilityMode == AvailabilityMode.DEGRADED_MODE) {
                pendingOperations.forEach(flag -> flag.set(true));
            }
        }
    }

    /**
     * Starts the superclass and then registers {@link #listener} on the running cache.
     *
     * <p>NOTE(review): no matching {@code removeListener} call is visible in this class - confirm
     * that the component/cache lifecycle takes care of unregistering it.
     */
    @Override
    public void start() {
        super.start();
        this.cache.running().addListener(this.listener);
    }

    /**
     * Delegates to the superclass key reduction, guarded by the partition checks of this class.
     * All arguments are forwarded unchanged.
     *
     * @return a stage that completes with the superclass result, or exceptionally if the partition
     *         became degraded while the operation was pending or if the superclass stage failed
     * @throws RuntimeException the exception created by {@code Log.CLUSTER.partitionDegraded()} if
     *         the cache is not available when the method is invoked
     */
    @Override
    public <R> CompletionStage<R> keyReduction(boolean z, IntSet intSet, Set<K> set, InvocationContext invocationContext, long j, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<K>, ? extends CompletionStage<R>> function, Function<? super Publisher<R>, ? extends CompletionStage<R>> function2) {
        checkPartitionStatus();
        return registerStage(super.keyReduction(z, intSet, set, invocationContext, j, deliveryGuarantee, function, function2));
    }

    /**
     * Delegates to the superclass entry reduction, guarded by the partition checks of this class.
     * All arguments are forwarded unchanged.
     *
     * @return a stage that completes with the superclass result, or exceptionally if the partition
     *         became degraded while the operation was pending or if the superclass stage failed
     * @throws RuntimeException the exception created by {@code Log.CLUSTER.partitionDegraded()} if
     *         the cache is not available when the method is invoked
     */
    @Override
    public <R> CompletionStage<R> entryReduction(boolean z, IntSet intSet, Set<K> set, InvocationContext invocationContext, long j, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<CacheEntry<K, V>>, ? extends CompletionStage<R>> function, Function<? super Publisher<R>, ? extends CompletionStage<R>> function2) {
        checkPartitionStatus();
        return registerStage(super.entryReduction(z, intSet, set, invocationContext, j, deliveryGuarantee, function, function2));
    }

    /**
     * Tracks {@code completionStage} as a pending operation and returns a dependent stage that
     * fails when the partition became degraded before the original stage finished.
     *
     * @param completionStage the stage produced by the superclass
     * @return a stage yielding the original value, the original failure, or the degraded failure
     */
    private <R> CompletionStage<R> registerStage(CompletionStage<R> completionStage) {
        AtomicBoolean degradedFlag = registerOperation();
        return completionStage.handle((value, throwable) -> {
            // Always unregister, whatever the outcome, so the set does not grow.
            pendingOperations.remove(degradedFlag);
            // A degraded partition takes precedence over both the value and any other failure.
            if (degradedFlag.get()) {
                throw Log.CLUSTER.partitionDegraded();
            }
            CompletableFutures.rethrowExceptionIfPresent(throwable);
            return value;
        });
    }

    /**
     * Creates and registers the flag for a new pending operation.
     *
     * @return the registered flag; already {@code true} if the cache is currently not available
     */
    private AtomicBoolean registerOperation() {
        AtomicBoolean flag = new AtomicBoolean();
        // Add first, check second: see the ordering comment in PartitionListener#onPartitionChange.
        this.pendingOperations.add(flag);
        if (isPartitionDegraded()) {
            flag.set(true);
        }
        return flag;
    }

    /**
     * Delegates to the superclass key publisher and wraps the returned supplier so that its
     * publishers fail once the partition becomes degraded. All arguments are forwarded unchanged.
     *
     * @throws RuntimeException the exception created by {@code Log.CLUSTER.partitionDegraded()} if
     *         the cache is not available when the method is invoked
     */
    @Override
    public <R> SegmentPublisherSupplier<R> keyPublisher(IntSet intSet, Set<K> set, InvocationContext invocationContext, long j, DeliveryGuarantee deliveryGuarantee, int i, Function<? super Publisher<K>, ? extends Publisher<R>> function) {
        checkPartitionStatus();
        return registerPublisher(super.keyPublisher(intSet, set, invocationContext, j, deliveryGuarantee, i, function));
    }

    /**
     * Delegates to the superclass entry publisher and wraps the returned supplier so that its
     * publishers fail once the partition becomes degraded. All arguments are forwarded unchanged.
     *
     * @throws RuntimeException the exception created by {@code Log.CLUSTER.partitionDegraded()} if
     *         the cache is not available when the method is invoked
     */
    @Override
    public <R> SegmentPublisherSupplier<R> entryPublisher(IntSet intSet, Set<K> set, InvocationContext invocationContext, long j, DeliveryGuarantee deliveryGuarantee, int i, Function<? super Publisher<CacheEntry<K, V>>, ? extends Publisher<R>> function) {
        checkPartitionStatus();
        return registerPublisher(super.entryPublisher(intSet, set, invocationContext, j, deliveryGuarantee, i, function));
    }

    /**
     * Wraps {@code segmentPublisherSupplier} so that every publisher it hands out checks the
     * degraded flag on each element and on completion.
     *
     * @param segmentPublisherSupplier the supplier returned by the superclass
     * @return a supplier whose publishers signal an error once the partition is degraded
     */
    private <R> SegmentPublisherSupplier<R> registerPublisher(final SegmentPublisherSupplier<R> segmentPublisherSupplier) {
        return new SegmentPublisherSupplier<R>() {
            @Override
            public Publisher<SegmentPublisherSupplier.Notification<R>> publisherWithSegments() {
                return handleEarlyTermination(SegmentPublisherSupplier::publisherWithSegments);
            }

            @Override
            public Publisher<R> publisherWithoutSegments() {
                return handleEarlyTermination(SegmentPublisherSupplier::publisherWithoutSegments);
            }

            /**
             * Decorates the publisher selected by {@code function} with the degraded checks.
             *
             * <p>The pending-operation flag is registered per subscription (via
             * {@link Flowable#defer}) rather than when the publisher is created. Registering at
             * creation time left the flag in {@code pendingOperations} forever if the publisher
             * was never subscribed, and made repeated subscriptions share a flag that the first
             * subscription had already unregistered.
             *
             * <p>NOTE(review): a degraded event that happens and is resolved between creation and
             * subscription is consequently no longer reported; presumably acceptable because the
             * wrapper has not requested any data at that point - confirm.
             */
            private <S> Flowable<S> handleEarlyTermination(Function<SegmentPublisherSupplier<R>, Publisher<S>> function) {
                // The delegate publisher is still obtained eagerly, exactly as before.
                Publisher<S> upstream = function.apply(segmentPublisherSupplier);
                return Flowable.defer(() -> {
                    AtomicBoolean degradedFlag = registerOperation();
                    return Flowable.fromPublisher(upstream)
                            .doOnNext(item -> checkPendingOperation(degradedFlag))
                            .doOnComplete(() -> checkPendingOperation(degradedFlag))
                            // doFinally also covers errors and cancellation, so the flag is always removed.
                            .doFinally(() -> pendingOperations.remove(degradedFlag));
                });
            }
        };
    }

    /**
     * Fails if the given pending-operation flag has been raised.
     *
     * @param atomicBoolean the flag of the operation being checked
     */
    private void checkPendingOperation(AtomicBoolean atomicBoolean) {
        if (atomicBoolean.get()) {
            throw Log.CLUSTER.partitionDegraded();
        }
    }

    /** Fails fast if the cache is currently not in {@link AvailabilityMode#AVAILABLE} mode. */
    private void checkPartitionStatus() {
        if (isPartitionDegraded()) {
            throw Log.CLUSTER.partitionDegraded();
        }
    }

    /** @return {@code true} when the last observed mode is anything other than {@code AVAILABLE} */
    private boolean isPartitionDegraded() {
        return this.currentMode != AvailabilityMode.AVAILABLE;
    }
}
