package org.infinispan.notifications.cachelistener;

import io.reactivex.rxjava3.core.Flowable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.infinispan.commands.SegmentSpecificCommand;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.notifications.cachelistener.event.Event;
import org.infinispan.reactive.RxJavaInterop;
import org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier;
import org.infinispan.util.logging.Log;
import org.reactivestreams.Publisher;

/* loaded from: input_file:BOOT-INF/lib/infinispan-core-14.0.5.Final.jar:org/infinispan/notifications/cachelistener/BaseQueueingSegmentListener.class */
abstract class BaseQueueingSegmentListener<K, V, E extends Event<K, V>> implements QueueingSegmentListener<K, V, E> {
    protected final AtomicBoolean completed = new AtomicBoolean(false);
    protected final AtomicReferenceArray<ConcurrentMap<K, Object>> notifiedKeys;
    protected final KeyPartitioner keyPartitioner;

    /* JADX INFO: Access modifiers changed from: protected */
    public BaseQueueingSegmentListener(int i, KeyPartitioner keyPartitioner) {
        this.notifiedKeys = new AtomicReferenceArray<>(i);
        this.keyPartitioner = keyPartitioner;
        for (int i2 = 0; i2 < i; i2++) {
            this.notifiedKeys.set(i2, new ConcurrentHashMap());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int segmentFromEventWrapper(EventWrapper<K, V, E> eventWrapper) {
        return SegmentSpecificCommand.extractSegment(eventWrapper.getCommand(), eventWrapper.getKey(), this.keyPartitioner);
    }

    @Override // org.infinispan.notifications.cachelistener.QueueingSegmentListener, io.reactivex.rxjava3.functions.Function
    public Publisher<CacheEntry<K, V>> apply(SegmentPublisherSupplier.Notification<CacheEntry<K, V>> notification) throws Throwable {
        if (notification.isSegmentComplete()) {
            return segmentComplete(notification.completedSegment());
        }
        int valueSegment = notification.valueSegment();
        CacheEntry<K, V> value = notification.value();
        K key = value.getKey();
        Object put = this.notifiedKeys.get(valueSegment).put(key, NOTIFIED);
        if (put == null) {
            return Flowable.just(value);
        }
        if (getLog().isTraceEnabled()) {
            getLog().tracef("Processing key %s as a concurrent update occurred with value %s", key, put);
        }
        return put != QueueingSegmentListener.REMOVED ? Flowable.just((CacheEntry) put) : Flowable.empty();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Flowable<CacheEntry<K, V>> segmentComplete(int i) {
        ConcurrentMap<K, Object> concurrentMap = this.notifiedKeys.get(i);
        synchronized (concurrentMap) {
            this.notifiedKeys.set(i, null);
        }
        return Flowable.fromIterable(concurrentMap.entrySet()).filter(entry -> {
            return concurrentMap.remove(entry.getKey()) != null;
        }).map(RxJavaInterop.entryToValueFunction()).filter(obj -> {
            return (obj == NOTIFIED || obj == REMOVED) ? false : true;
        }).map(obj2 -> {
            return (CacheEntry) obj2;
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean addEvent(K k, int i, Object obj) {
        ConcurrentMap<K, Object> concurrentMap = this.notifiedKeys.get(i);
        if (concurrentMap == null) {
            return false;
        }
        synchronized (concurrentMap) {
            if (this.notifiedKeys.get(i) == concurrentMap) {
                return concurrentMap.compute(k, (obj2, obj3) -> {
                    return obj3 == NOTIFIED ? obj3 : obj;
                }) == obj;
            }
            return false;
        }
    }

    protected abstract Log getLog();
}
