package org.infinispan.persistence.support;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import java.lang.invoke.MethodHandles;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.transaction.Transaction;
import org.infinispan.commons.api.Lifecycle;
import org.infinispan.commons.persistence.Store;
import org.infinispan.commons.reactive.RxJavaInterop;
import org.infinispan.commons.util.IntSet;
import org.infinispan.persistence.spi.AdvancedCacheExpirationWriter;
import org.infinispan.persistence.spi.AdvancedCacheLoader;
import org.infinispan.persistence.spi.AdvancedCacheWriter;
import org.infinispan.persistence.spi.CacheLoader;
import org.infinispan.persistence.spi.CacheWriter;
import org.infinispan.persistence.spi.FlagAffectedStore;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.persistence.spi.MarshallableEntryFactory;
import org.infinispan.persistence.spi.NonBlockingStore;
import org.infinispan.persistence.spi.SegmentedAdvancedLoadWriteStore;
import org.infinispan.persistence.spi.TransactionalCacheWriter;
import org.infinispan.util.concurrent.BlockingManager;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.web.servlet.tags.form.InputTag;
import org.wildfly.security.password.interfaces.ClearPassword;

/* loaded from: input_file:BOOT-INF/lib/infinispan-core-11.0.4.Final.jar:org/infinispan/persistence/support/NonBlockingStoreAdapter.class */
/**
 * Adapts a store written against the older blocking SPI interfaces ({@link CacheLoader},
 * {@link CacheWriter}, {@link AdvancedCacheLoader}, {@link AdvancedCacheWriter},
 * {@link SegmentedAdvancedLoadWriteStore}, {@link TransactionalCacheWriter}) to the
 * {@link NonBlockingStore} contract.
 *
 * <p>Every operation that touches the wrapped store is handed to the {@link BlockingManager}
 * obtained from the {@link InitializationContext} in {@link #start(InitializationContext)}; calling
 * any such operation before {@code start} therefore fails with a {@link NullPointerException}.
 *
 * <p>The set of {@link NonBlockingStore.Characteristic}s is derived once, in the constructor, from
 * the interfaces the wrapped store implements and from its {@link Store} annotation.
 *
 * @param <K> key type of the wrapped store
 * @param <V> value type of the wrapped store
 */
public class NonBlockingStoreAdapter<K, V> implements NonBlockingStore<K, V> {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    private static final boolean trace = log.isTraceEnabled();
    /** Counter used to build trace ids; only allocated when trace logging was enabled at class load. */
    private static final AtomicInteger id = trace ? new AtomicInteger() : null;

    private final Lifecycle oldStoreImpl;
    private final Set<Characteristic> characteristics;
    private BlockingManager blockingManager;
    private MarshallableEntryFactory<K, V> marshallableEntryFactory;

    /**
     * Wraps the given store and computes its characteristics.
     *
     * @param lifecycle the store to adapt; the interfaces it implements decide which operations are usable
     */
    public NonBlockingStoreAdapter(Lifecycle lifecycle) {
        this.oldStoreImpl = lifecycle;
        this.characteristics = determineCharacteristics(lifecycle);
    }

    /**
     * @return the wrapped store instance exactly as passed to the constructor
     */
    public Lifecycle getActualStore() {
        return oldStoreImpl;
    }

    /**
     * Builds the trace id passed to the blocking manager for one operation.
     *
     * @param operationName short name of the operation being submitted
     * @return {@code "StoreAdapter-<operationName>-<n>"} with a fresh counter value, or {@code null}
     *         when the counter was not allocated (trace logging disabled at class load)
     */
    private static String nextTraceId(String operationName) {
        if (id == null) {
            return null;
        }
        return "StoreAdapter-" + operationName + "-" + id.getAndIncrement();
    }

    /**
     * Derives the characteristics of a store from the SPI interfaces it implements.
     *
     * <ul>
     *   <li>A {@link SegmentedAdvancedLoadWriteStore} is SEGMENTABLE, EXPIRATION and BULK_READ.</li>
     *   <li>Otherwise: {@link AdvancedCacheLoader} adds BULK_READ, and a store that is not even a
     *       {@link CacheLoader} is WRITE_ONLY; {@link AdvancedCacheWriter} adds EXPIRATION, and a
     *       store that is not even a {@link CacheWriter} is READ_ONLY.</li>
     *   <li>A {@link Store} annotation with {@code shared() == true} adds SHAREABLE.</li>
     *   <li>{@link TransactionalCacheWriter} adds TRANSACTIONAL.</li>
     * </ul>
     *
     * @param store the store instance to inspect
     * @return a mutable {@link EnumSet} holding the derived characteristics
     */
    private static Set<Characteristic> determineCharacteristics(Object store) {
        EnumSet<Characteristic> result;
        if (store instanceof SegmentedAdvancedLoadWriteStore) {
            result = EnumSet.of(Characteristic.SEGMENTABLE, Characteristic.EXPIRATION, Characteristic.BULK_READ);
        } else {
            result = EnumSet.noneOf(Characteristic.class);
            if (store instanceof AdvancedCacheLoader) {
                result.add(Characteristic.BULK_READ);
            } else if (!(store instanceof CacheLoader)) {
                result.add(Characteristic.WRITE_ONLY);
            }
            if (store instanceof AdvancedCacheWriter) {
                result.add(Characteristic.EXPIRATION);
            } else if (!(store instanceof CacheWriter)) {
                result.add(Characteristic.READ_ONLY);
            }
        }
        Store storeAnnotation = store.getClass().getAnnotation(Store.class);
        if (storeAnnotation != null && storeAnnotation.shared()) {
            result.add(Characteristic.SHAREABLE);
        }
        if (store instanceof TransactionalCacheWriter) {
            result.add(Characteristic.TRANSACTIONAL);
        }
        return result;
    }

    /**
     * Captures the blocking manager and entry factory from the context, then initialises and starts
     * the wrapped store through the blocking manager.
     *
     * <p>A READ_ONLY store is initialised through its loader view, any other store through its
     * writer view.
     *
     * @param initializationContext context supplying the {@link BlockingManager} and
     *                              {@link MarshallableEntryFactory}, and forwarded to the store's {@code init}
     * @return the stage returned by the blocking manager for the init-and-start task
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public CompletionStage<Void> start(InitializationContext initializationContext) {
        this.blockingManager = initializationContext.getBlockingManager();
        this.marshallableEntryFactory = initializationContext.getMarshallableEntryFactory();
        return blockingManager.runBlocking(() -> {
            if (isReadOnly()) {
                loader().init(initializationContext);
            } else {
                writer().init(initializationContext);
            }
            oldStoreImpl.start();
        }, nextTraceId("start"));
    }

    /**
     * Stops the wrapped store through the blocking manager.
     *
     * @return the stage returned by the blocking manager for the stop task
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public CompletionStage<Void> stop() {
        return blockingManager.runBlocking(oldStoreImpl::stop, nextTraceId("stop"));
    }

    /**
     * @return the characteristics computed in the constructor (the same set instance on every call)
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public Set<Characteristic> characteristics() {
        return characteristics;
    }

    /**
     * Asks the wrapped store for its size and widens the {@code int} result to {@code Long}.
     *
     * @param intSet segments to count; only forwarded when the store is segmented, ignored otherwise
     * @return a stage completing with the store-reported size
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public CompletionStage<Long> size(IntSet intSet) {
        return blockingManager
                .supplyBlocking(() -> isSegmented() ? segmentedStore().size(intSet) : advancedLoader().size(),
                        nextTraceId("size"))
                .thenApply(Integer::longValue);
    }

    /**
     * Delegates to {@link #size(IntSet)}; this adapter has no cheaper estimate.
     *
     * @param intSet segments to count
     * @return the same stage {@link #size(IntSet)} would return
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public CompletionStage<Long> approximateSize(IntSet intSet) {
        return size(intSet);
    }

    /**
     * Publishes entries of the wrapped store, wrapped by {@code BlockingManager.blockingPublisher}.
     *
     * @param intSet    segments to publish; only forwarded when the store is segmented
     * @param predicate key filter forwarded to the store
     * @param z         forwarded as the store's "fetch value" argument; the "fetch metadata"
     *                  argument is always {@code true}
     * @return the publisher returned by the blocking manager
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public Publisher<MarshallableEntry<K, V>> publishEntries(IntSet intSet, Predicate<? super K> predicate, boolean z) {
        Publisher<MarshallableEntry<K, V>> storePublisher;
        if (isSegmented()) {
            storePublisher = segmentedStore().entryPublisher(intSet, predicate, z, true);
        } else {
            storePublisher = advancedLoader().entryPublisher(predicate, z, true);
        }
        return blockingManager.blockingPublisher(storePublisher);
    }

    /**
     * Publishes keys of the wrapped store, wrapped by {@code BlockingManager.blockingPublisher}.
     *
     * @param intSet    segments to publish; only forwarded when the store is segmented
     * @param predicate key filter forwarded to the store
     * @return the publisher returned by the blocking manager
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public Publisher<K> publishKeys(IntSet intSet, Predicate<? super K> predicate) {
        Publisher<K> storePublisher;
        if (isSegmented()) {
            storePublisher = segmentedStore().publishKeys(intSet, predicate);
        } else {
            storePublisher = advancedLoader().publishKeys(predicate);
        }
        return blockingManager.blockingPublisher(storePublisher);
    }

    /**
     * Purges expired entries from the wrapped store and publishes what was purged.
     *
     * <p>Nothing happens until the returned publisher is subscribed to; each subscription creates
     * its own processor and submits its own purge task. Entries reported by key only are converted
     * with {@code marshallableEntryFactory.create(key)}. The processor is completed, or failed,
     * when the purge task's stage completes.
     *
     * <p>The wrapped store is cast to {@link AdvancedCacheWriter} unconditionally, so subscribing
     * fails with a {@link ClassCastException} for stores that do not implement it.
     *
     * @return a deferred publisher of purged entries
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public Publisher<MarshallableEntry<K, V>> purgeExpired() {
        return Flowable.defer(() -> {
            final UnicastProcessor<MarshallableEntry<K, V>> processor = UnicastProcessor.create();
            AdvancedCacheExpirationWriter.ExpirationPurgeListener<K, V> purgeListener =
                    new AdvancedCacheExpirationWriter.ExpirationPurgeListener<K, V>() {
                @Override // org.infinispan.persistence.spi.AdvancedCacheExpirationWriter.ExpirationPurgeListener
                public void marshalledEntryPurged(MarshallableEntry<K, V> marshallableEntry) {
                    processor.onNext(marshallableEntry);
                }

                @Override // org.infinispan.persistence.spi.AdvancedCacheWriter.PurgeListener
                public void entryPurged(K k) {
                    processor.onNext(marshallableEntryFactory.create(k));
                }
            };
            AdvancedCacheWriter<K, V> advancedWriter = advancedWriter();
            // Runnable::run is passed as the executor so the store's purge work runs inline in the
            // task submitted to the blocking manager.
            Runnable purgeTask;
            if (advancedWriter instanceof AdvancedCacheExpirationWriter) {
                AdvancedCacheExpirationWriter<K, V> expirationWriter =
                        (AdvancedCacheExpirationWriter<K, V>) advancedWriter;
                purgeTask = () -> expirationWriter.purge(Runnable::run, purgeListener);
            } else {
                purgeTask = () -> advancedWriter.purge(Runnable::run, purgeListener);
            }
            blockingManager.runBlocking(purgeTask, nextTraceId("purgeExpired")).whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    processor.onError(throwable);
                } else {
                    processor.onComplete();
                }
            });
            return processor;
        });
    }

    /**
     * Queries availability through the loader view for READ_ONLY stores and through the writer view
     * otherwise.
     *
     * @return a stage completing with the store-reported availability
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public CompletionStage<Boolean> isAvailable() {
        return blockingManager.supplyBlocking(
                () -> isReadOnly() ? loader().isAvailable() : writer().isAvailable(),
                nextTraceId("isAvailable"));
    }

    /**
     * Loads a single entry.
     *
     * @param i   segment of the key; only forwarded when the store is segmented
     * @param obj key to load
     * @return a stage completing with whatever the wrapped store returns for the key
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public CompletionStage<MarshallableEntry<K, V>> load(int i, Object obj) {
        return blockingManager.supplyBlocking(
                () -> isSegmented() ? segmentedStore().get(i, obj) : loader().loadEntry(obj),
                nextTraceId("load"));
    }

    /**
     * Checks whether the wrapped store contains a key.
     *
     * @param i   segment of the key; only forwarded when the store is segmented
     * @param obj key to look up
     * @return a stage completing with the store's answer
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public CompletionStage<Boolean> containsKey(int i, Object obj) {
        return blockingManager.supplyBlocking(
                () -> isSegmented() ? segmentedStore().contains(i, obj) : loader().contains(obj),
                nextTraceId("containsKey"));
    }

    /**
     * Writes a single entry.
     *
     * @param i                 segment of the entry; only forwarded when the store is segmented
     * @param marshallableEntry entry to write
     * @return the stage returned by the blocking manager for the write task
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public CompletionStage<Void> write(int i, MarshallableEntry<? extends K, ? extends V> marshallableEntry) {
        return blockingManager.runBlocking(() -> {
            if (isSegmented()) {
                segmentedStore().write(i, marshallableEntry);
            } else {
                writer().write(marshallableEntry);
            }
        }, nextTraceId("write"));
    }

    /**
     * Deletes a single key.
     *
     * @param i   segment of the key; only forwarded when the store is segmented
     * @param obj key to delete
     * @return a stage completing with the boolean returned by the wrapped store's {@code delete}
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public CompletionStage<Boolean> delete(int i, Object obj) {
        return blockingManager.supplyBlocking(
                () -> isSegmented() ? segmentedStore().delete(i, obj) : writer().delete(obj),
                nextTraceId("delete"));
    }

    /**
     * Forwards to the segmented store's {@code addSegments}. The store is cast to
     * {@link SegmentedAdvancedLoadWriteStore} without a prior check.
     *
     * @param intSet segments to add
     * @return the stage returned by the blocking manager
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public CompletionStage<Void> addSegments(IntSet intSet) {
        return blockingManager.runBlocking(() -> segmentedStore().addSegments(intSet), nextTraceId("addSegments"));
    }

    /**
     * Forwards to the segmented store's {@code removeSegments}. The store is cast to
     * {@link SegmentedAdvancedLoadWriteStore} without a prior check.
     *
     * @param intSet segments to remove
     * @return the stage returned by the blocking manager
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public CompletionStage<Void> removeSegments(IntSet intSet) {
        return blockingManager.runBlocking(() -> segmentedStore().removeSegments(intSet),
                nextTraceId("removeSegments"));
    }

    /**
     * Clears the wrapped store when it implements {@link AdvancedCacheWriter}; otherwise does
     * nothing and returns an already completed stage.
     *
     * @return the stage for the clear task, or a completed stage when clear is unsupported
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public CompletionStage<Void> clear() {
        // No characteristic corresponds to "supports clear", so the implementation type is checked directly.
        if (!(oldStoreImpl instanceof AdvancedCacheWriter)) {
            return CompletableFutures.completedNull();
        }
        return blockingManager.runBlocking(advancedWriter()::clear, nextTraceId("clear"));
    }

    /**
     * Applies a batch of removals followed by a bulk update.
     *
     * <p>Both publishers of segmented publishers are flattened with {@code i} as the flatMap
     * concurrency limit. Inside the submitted task the removed keys are collected into a set and
     * handed to {@code deleteBatch}, then the flattened write publisher is handed to
     * {@code bulkUpdate}, whose stage becomes the result.
     *
     * @param i          maximum number of inner publishers subscribed to concurrently
     * @param publisher  per-segment publishers of keys to remove
     * @param publisher2 per-segment publishers of entries to write
     * @return a stage that completes when the stage returned by {@code bulkUpdate} completes
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public CompletionStage<Void> batch(int i, Publisher<SegmentedPublisher<Object>> publisher, Publisher<SegmentedPublisher<MarshallableEntry<K, V>>> publisher2) {
        Flowable<Object> removedKeys = Flowable.fromPublisher(publisher)
                .flatMap(RxJavaInterop.identityFunction(), false, i);
        Flowable<MarshallableEntry<? extends K, ? extends V>> writtenEntries = Flowable.fromPublisher(publisher2)
                .flatMap(RxJavaInterop.identityFunction(), false, i);
        return blockingManager.supplyBlocking(() -> {
            CacheWriter<K, V> cacheWriter = writer();
            Single<Set<Object>> keysToDelete = removedKeys.collect(Collectors.toSet());
            keysToDelete.subscribe(cacheWriter::deleteBatch);
            return cacheWriter.bulkUpdate(writtenEntries);
        }, nextTraceId("batch-update")).thenCompose(Function.identity());
    }

    /**
     * Collects the removals and writes into a {@link BatchModification} and passes it to the
     * transactional store's {@code prepareWithModifications}.
     *
     * <p>Both publishers are subscribed to on the calling thread before the prepare task is
     * submitted. NOTE(review): the batch is only fully populated at submission time if the
     * publishers emit synchronously during {@code subscribe} - confirm against callers.
     *
     * @param transaction transaction being prepared
     * @param i           publisher count; not used by this implementation
     * @param publisher   per-segment publishers of keys to remove
     * @param publisher2  per-segment publishers of entries to write
     * @return the stage returned by the blocking manager for the prepare task
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public CompletionStage<Void> prepareWithModifications(Transaction transaction, int i, Publisher<SegmentedPublisher<Object>> publisher, Publisher<SegmentedPublisher<MarshallableEntry<K, V>>> publisher2) {
        Set<Object> affectedKeys = new HashSet<>();
        BatchModification batchModification = new BatchModification(affectedKeys);
        Flowable.fromPublisher(publisher).subscribe(segmentedPublisher -> {
            Flowable.fromPublisher(segmentedPublisher).subscribe(key -> {
                affectedKeys.add(key);
                batchModification.removeEntry(key);
            });
        });
        Flowable.fromPublisher(publisher2).subscribe(segmentedPublisher -> {
            Flowable.fromPublisher(segmentedPublisher).subscribe(marshallableEntry -> {
                Object key = marshallableEntry.getKey();
                affectedKeys.add(key);
                batchModification.addMarshalledEntry(key, marshallableEntry);
            });
        });
        return blockingManager.runBlocking(
                () -> transactionalStore().prepareWithModifications(transaction, batchModification),
                nextTraceId("prepareWithModifications"));
    }

    /**
     * Commits the transaction on the wrapped transactional store.
     *
     * @param transaction transaction to commit
     * @return the stage returned by the blocking manager
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public CompletionStage<Void> commit(Transaction transaction) {
        return blockingManager.runBlocking(() -> transactionalStore().commit(transaction), nextTraceId("commit"));
    }

    /**
     * Rolls the transaction back on the wrapped transactional store.
     *
     * @param transaction transaction to roll back
     * @return the stage returned by the blocking manager
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public CompletionStage<Void> rollback(Transaction transaction) {
        return blockingManager.runBlocking(() -> transactionalStore().rollback(transaction), nextTraceId("rollback"));
    }

    /**
     * @param j flag bitset of the command
     * @return {@code true} only when the wrapped store is a {@link FlagAffectedStore} whose
     *         {@code shouldWrite(j)} returns {@code false}
     */
    @Override // org.infinispan.persistence.spi.NonBlockingStore
    public boolean ignoreCommandWithFlags(long j) {
        if (!(oldStoreImpl instanceof FlagAffectedStore)) {
            return false;
        }
        return !((FlagAffectedStore) oldStoreImpl).shouldWrite(j);
    }

    /** @return whether the SEGMENTABLE characteristic was derived for the wrapped store */
    boolean isSegmented() {
        return characteristics.contains(Characteristic.SEGMENTABLE);
    }

    /** @return whether the READ_ONLY characteristic was derived for the wrapped store */
    boolean isReadOnly() {
        return characteristics.contains(Characteristic.READ_ONLY);
    }

    /**
     * @return the wrapped store viewed as a {@link TransactionalCacheWriter}
     * @throws ClassCastException if the wrapped store does not implement that interface
     */
    @SuppressWarnings("unchecked") // type arguments of the wrapped store cannot be checked at run time
    public TransactionalCacheWriter<K, V> transactionalStore() {
        return (TransactionalCacheWriter<K, V>) oldStoreImpl;
    }

    /**
     * @return the wrapped store viewed as a {@link SegmentedAdvancedLoadWriteStore}
     * @throws ClassCastException if the wrapped store does not implement that interface
     */
    @SuppressWarnings("unchecked") // type arguments of the wrapped store cannot be checked at run time
    public SegmentedAdvancedLoadWriteStore<K, V> segmentedStore() {
        return (SegmentedAdvancedLoadWriteStore<K, V>) oldStoreImpl;
    }

    /**
     * @return the wrapped store viewed as an {@link AdvancedCacheLoader}
     * @throws ClassCastException if the wrapped store does not implement that interface
     */
    @SuppressWarnings("unchecked") // type arguments of the wrapped store cannot be checked at run time
    public AdvancedCacheLoader<K, V> advancedLoader() {
        return (AdvancedCacheLoader<K, V>) oldStoreImpl;
    }

    /**
     * @return the wrapped store viewed as an {@link AdvancedCacheWriter}
     * @throws ClassCastException if the wrapped store does not implement that interface
     */
    @SuppressWarnings("unchecked") // type arguments of the wrapped store cannot be checked at run time
    public AdvancedCacheWriter<K, V> advancedWriter() {
        return (AdvancedCacheWriter<K, V>) oldStoreImpl;
    }

    /**
     * @return the wrapped store viewed as a {@link CacheLoader}
     * @throws ClassCastException if the wrapped store does not implement that interface
     */
    @SuppressWarnings("unchecked") // type arguments of the wrapped store cannot be checked at run time
    public CacheLoader<K, V> loader() {
        return (CacheLoader<K, V>) oldStoreImpl;
    }

    /**
     * @return the wrapped store viewed as a {@link CacheWriter}
     * @throws ClassCastException if the wrapped store does not implement that interface
     */
    @SuppressWarnings("unchecked") // type arguments of the wrapped store cannot be checked at run time
    public CacheWriter<K, V> writer() {
        return (CacheWriter<K, V>) oldStoreImpl;
    }
}
