/*
 * Decompiled with CFR 0.152.
 */
package org.wildfly.clustering.cache.infinispan.persistence.remote;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableSource;
import io.reactivex.rxjava3.core.CompletableTransformer;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import jakarta.transaction.Transaction;
import jakarta.transaction.TransactionManager;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Map;
import java.util.Objects;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.DataFormat;
import org.infinispan.client.hotrod.Flag;
import org.infinispan.client.hotrod.MetadataValue;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheContainer;
import org.infinispan.client.hotrod.impl.InternalRemoteCache;
import org.infinispan.client.hotrod.transaction.manager.RemoteTransactionManager;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.configuration.ConfiguredBy;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.marshall.persistence.PersistenceMarshaller;
import org.infinispan.metadata.Metadata;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.persistence.spi.MarshallableEntryFactory;
import org.infinispan.persistence.spi.MarshalledValue;
import org.infinispan.persistence.spi.NonBlockingStore;
import org.infinispan.persistence.spi.PersistenceException;
import org.infinispan.util.concurrent.BlockingManager;
import org.reactivestreams.Publisher;
import org.wildfly.clustering.cache.batch.Batch;
import org.wildfly.clustering.cache.batch.SuspendedBatch;
import org.wildfly.clustering.cache.infinispan.batch.TransactionalBatchFactory;
import org.wildfly.clustering.cache.infinispan.persistence.remote.RemoteCacheStoreConfiguration;
import org.wildfly.clustering.cache.infinispan.remote.ReadForUpdateRemoteCache;
import org.wildfly.clustering.context.Context;
import org.wildfly.clustering.function.Consumer;
import org.wildfly.clustering.function.UnaryOperator;

@ConfiguredBy(value=RemoteCacheStoreConfiguration.class)
public class RemoteCacheStore<K, V>
implements NonBlockingStore<K, V> {
    private volatile RemoteCacheContainer container;
    private volatile AtomicReferenceArray<RemoteCache<K, MarshalledValue>> caches;
    private volatile BlockingManager blockingManager;
    private volatile Executor executor;
    private volatile PersistenceMarshaller marshaller;
    private volatile MarshallableEntryFactory<K, V> entryFactory;
    private volatile Function<Map.Entry<K, MetadataValue<MarshalledValue>>, MarshallableEntry<K, V>> entryMapper;
    private volatile int batchSize;
    private volatile String cacheName;
    private volatile int segments;
    private volatile UnaryOperator<RemoteCache<K, MarshalledValue>> cacheTransformer;
    private volatile org.wildfly.clustering.function.Supplier<Batch> batchFactory;
    private final Map<Transaction, SuspendedBatch> transactions = new ConcurrentHashMap<Transaction, SuspendedBatch>();

    public Set<NonBlockingStore.Characteristic> characteristics() {
        return EnumSet.of(NonBlockingStore.Characteristic.BULK_READ, NonBlockingStore.Characteristic.EXPIRATION, NonBlockingStore.Characteristic.SEGMENTABLE, NonBlockingStore.Characteristic.SHAREABLE, NonBlockingStore.Characteristic.TRANSACTIONAL);
    }

    public CompletionStage<Void> start(InitializationContext context) {
        RemoteCacheStoreConfiguration configuration = (RemoteCacheStoreConfiguration)context.getConfiguration();
        if (configuration.preload()) {
            throw new IllegalStateException();
        }
        Cache cache = context.getCache();
        this.container = configuration.container();
        this.cacheName = cache.getName();
        this.blockingManager = context.getBlockingManager();
        this.executor = context.getNonBlockingExecutor();
        this.batchSize = configuration.maxBatchSize();
        this.marshaller = context.getPersistenceMarshaller();
        this.entryFactory = context.getMarshallableEntryFactory();
        this.entryMapper = entry -> {
            MarshalledValue value = (MarshalledValue)((MetadataValue)entry.getValue()).getValue();
            return this.entryFactory.create(entry.getKey(), value.getValueBytes(), value.getMetadataBytes(), value.getInternalMetadataBytes(), value.getCreated(), value.getLastUsed());
        };
        this.cacheTransformer = configuration.transactional() ? ReadForUpdateRemoteCache::new : UnaryOperator.identity();
        this.segments = configuration.segmented() && cache.getAdvancedCache().getDistributionManager() != null ? cache.getCacheConfiguration().clustering().hash().numSegments() : 1;
        this.batchFactory = configuration.transactional() ? new TransactionalBatchFactory(this.cacheName, (TransactionManager)RemoteTransactionManager.getInstance(), CacheException::new) : null;
        this.caches = new AtomicReferenceArray(this.segments);
        for (int i = 0; i < this.segments; ++i) {
            this.container.getConfiguration().addRemoteCache(this.segmentCacheName(i), (java.util.function.Consumer)configuration);
        }
        return !configuration.segmented() || configuration.shared() ? this.addSegments(IntSets.immutableRangeSet((int)this.segments)) : CompletableFuture.completedStage(null);
    }

    public CompletionStage<Void> stop() {
        CompletableFuture<Object> result = CompletableFuture.completedFuture(null);
        for (int i = 0; i < this.caches.length(); ++i) {
            RemoteCache cache = this.caches.get(i);
            if (cache == null) continue;
            result = CompletableFuture.allOf(result, this.blockingManager.runBlocking(() -> {
                cache.stop();
                cache.getRemoteCacheContainer().getConfiguration().removeRemoteCache(cache.getName());
            }, (Object)"hotrod-store-stop").toCompletableFuture());
        }
        return result;
    }

    private String segmentCacheName(int segment) {
        return this.segments > 1 ? this.cacheName + "." + segment : this.cacheName;
    }

    private int segmentIndex(int segment) {
        return this.segments > 1 ? segment : 0;
    }

    private RemoteCache<K, MarshalledValue> segmentCache(int segment) {
        return this.caches.get(this.segmentIndex(segment));
    }

    private PrimitiveIterator.OfInt segmentIterator(IntSet segments) {
        return this.segments > 1 ? segments.iterator() : IntStream.of(0).iterator();
    }

    public CompletionStage<MarshallableEntry<K, V>> load(int segment, Object key) {
        Object typedKey = key;
        RemoteCache<K, MarshalledValue> cache = this.segmentCache(segment);
        if (cache == null) {
            return CompletableFuture.completedStage(null);
        }
        return cache.getWithMetadataAsync(typedKey).thenApplyAsync(value -> value != null ? this.entryMapper.apply(Map.entry(typedKey, value)) : null, this.executor);
    }

    public CompletionStage<Void> write(int segment, MarshallableEntry<? extends K, ? extends V> entry) {
        RemoteCache<K, MarshalledValue> cache = this.segmentCache(segment);
        if (cache == null) {
            return CompletableFuture.completedStage(null);
        }
        Object key = entry.getKey();
        MarshalledValue value = entry.getMarshalledValue();
        Metadata metadata = entry.getMetadata();
        long lifespan = metadata != null ? TimeUnit.SECONDS.convert(metadata.lifespan(), TimeUnit.MILLISECONDS) : -1L;
        long maxIdle = metadata != null ? TimeUnit.SECONDS.convert(metadata.maxIdle(), TimeUnit.MILLISECONDS) : -1L;
        return cache.withFlags(new Flag[]{Flag.SKIP_LISTENER_NOTIFICATION}).putAsync(key, (Object)value, lifespan, TimeUnit.SECONDS, maxIdle, TimeUnit.SECONDS).thenAcceptAsync((java.util.function.Consumer)Consumer.empty(), this.executor);
    }

    public CompletionStage<Boolean> delete(int segment, Object key) {
        RemoteCache<K, MarshalledValue> cache = this.segmentCache(segment);
        if (cache == null) {
            return CompletableFuture.completedStage(null);
        }
        return cache.withFlags(new Flag[]{Flag.FORCE_RETURN_VALUE, Flag.SKIP_LISTENER_NOTIFICATION}).removeAsync(key).thenApplyAsync(Objects::nonNull, this.executor);
    }

    private CompletionStage<Void> remove(int segment, Object key) {
        RemoteCache<K, MarshalledValue> cache = this.segmentCache(segment);
        if (cache == null) {
            return CompletableFuture.completedStage(null);
        }
        return cache.withFlags(new Flag[]{Flag.SKIP_LISTENER_NOTIFICATION}).removeAsync(key).thenAcceptAsync((java.util.function.Consumer)Consumer.empty(), this.executor);
    }

    public CompletionStage<Void> batch(int publisherCount, Publisher<NonBlockingStore.SegmentedPublisher<Object>> removePublisher, Publisher<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K, V>>> writePublisher) {
        return this.completableBatch(publisherCount, removePublisher, writePublisher).toCompletionStage(null);
    }

    private Completable completableBatch(int publisherCount, Publisher<NonBlockingStore.SegmentedPublisher<Object>> removePublisher, Publisher<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K, V>>> writePublisher) {
        Completable removeCompletable = Flowable.fromPublisher(removePublisher).flatMap(sp -> Flowable.fromPublisher((Publisher)sp).map(key -> Map.entry(sp.getSegment(), key)), publisherCount).flatMapCompletable(this::remove, false, this.batchSize);
        Completable writeCompletable = Flowable.fromPublisher(writePublisher).flatMap(sp -> Flowable.fromPublisher((Publisher)sp).map(entry -> Map.entry(sp.getSegment(), entry)), publisherCount).flatMapCompletable(this::write, false, this.batchSize);
        return removeCompletable.mergeWith((CompletableSource)writeCompletable).observeOn(Schedulers.from((Executor)this.executor));
    }

    private Completable write(Map.Entry<Integer, MarshallableEntry<K, V>> entry) {
        return Completable.fromCompletionStage(this.write(entry.getKey(), entry.getValue()));
    }

    private Completable remove(Map.Entry<Integer, Object> entry) {
        return Completable.fromCompletionStage(this.remove(entry.getKey(), entry.getValue()));
    }

    public Flowable<K> publishKeys(IntSet segments, Predicate<? super K> filter) {
        Stream<K> keys = Stream.empty();
        PrimitiveIterator.OfInt iterator = this.segmentIterator(segments);
        try {
            while (iterator.hasNext()) {
                int segment = iterator.nextInt();
                RemoteCache<K, MarshalledValue> cache = this.segmentCache(segment);
                if (cache == null) continue;
                keys = Stream.concat(keys, cache.keySet().stream());
            }
            Stream<K> filteredKeys = filter != null ? keys.filter(filter) : keys;
            return Flowable.fromStream(filteredKeys).doFinally(filteredKeys::close).observeOn(Schedulers.from((Executor)this.executor));
        }
        catch (PersistenceException e) {
            return Flowable.fromCompletionStage(CompletableFuture.failedStage(e));
        }
    }

    public Publisher<MarshallableEntry<K, V>> publishEntries(IntSet segments, Predicate<? super K> filter, boolean includeValues) {
        return includeValues ? this.publishEntries(segments, filter) : this.publishKeys(segments, filter).map(arg_0 -> this.entryFactory.create(arg_0));
    }

    private Flowable<MarshallableEntry<K, V>> publishEntries(IntSet segments, Predicate<? super K> filter) {
        ArrayList<Publisher> publishers = new ArrayList<Publisher>(segments.size());
        PrimitiveIterator.OfInt iterator = this.segmentIterator(segments);
        try {
            while (iterator.hasNext()) {
                int segment = iterator.nextInt();
                RemoteCache<K, MarshalledValue> cache = this.segmentCache(segment);
                if (cache == null) continue;
                publishers.add(cache.publishEntriesWithMetadata(null, this.batchSize));
            }
            return !publishers.isEmpty() ? Flowable.concat(publishers).filter(entry -> filter.test((Object)entry.getKey())).map(this.entryMapper).observeOn(Schedulers.from((Executor)this.executor)) : Flowable.empty();
        }
        catch (PersistenceException e) {
            return Flowable.fromCompletionStage(CompletableFuture.failedStage(e));
        }
    }

    public CompletionStage<Void> clear() {
        CompletableFuture<Object> result = CompletableFuture.completedFuture(null);
        for (int i = 0; i < this.caches.length(); ++i) {
            RemoteCache<K, MarshalledValue> cache = this.caches.get(i);
            if (cache == null) continue;
            result = CompletableFuture.allOf(new CompletableFuture[]{result, cache.withFlags(new Flag[]{Flag.SKIP_LISTENER_NOTIFICATION}).clearAsync().thenApplyAsync((java.util.function.Function)org.wildfly.clustering.function.Function.identity(), this.executor)});
        }
        return result;
    }

    public CompletionStage<Boolean> containsKey(int segment, Object key) {
        Object typedKey = key;
        RemoteCache<K, MarshalledValue> cache = this.segmentCache(segment);
        if (cache == null) {
            return CompletableFuture.completedStage(false);
        }
        try {
            return cache.containsKeyAsync(typedKey).thenApplyAsync((java.util.function.Function)org.wildfly.clustering.function.Function.identity(), this.executor);
        }
        catch (PersistenceException e) {
            return CompletableFuture.failedStage(e);
        }
    }

    public CompletionStage<Boolean> isAvailable() {
        InternalRemoteCache internalCache = (InternalRemoteCache)this.segmentCache(0);
        return internalCache.ping().handleAsync((v, e) -> e == null && v.isSuccess(), this.executor);
    }

    public CompletionStage<Long> size(IntSet segments) {
        CompletionStage<Long> result = CompletableFuture.completedFuture(0L);
        PrimitiveIterator.OfInt iterator = this.segmentIterator(segments);
        while (iterator.hasNext()) {
            int segment = iterator.nextInt();
            RemoteCache<K, MarshalledValue> cache = this.caches.get(segment);
            result = result.thenCombineAsync((CompletionStage)cache.sizeAsync(), Long::sum, this.executor);
        }
        return result;
    }

    public CompletionStage<Void> addSegments(IntSet segments) {
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        AtomicInteger count = new AtomicInteger(segments.size());
        PrimitiveIterator.OfInt iterator = segments.iterator();
        while (iterator.hasNext()) {
            int segment = iterator.nextInt();
            String cacheName = this.segmentCacheName(segment);
            int index = this.segmentIndex(segment);
            this.blockingManager.runBlocking(() -> {
                RemoteCache cache = this.container.getCache(cacheName);
                cache.start();
                this.caches.set(index, (RemoteCache)this.cacheTransformer.apply((Object)cache.withDataFormat(DataFormat.builder().keyMarshaller((Marshaller)this.marshaller).valueMarshaller((Marshaller)this.marshaller).build())));
            }, (Object)"hotrod-store-add-segments").whenComplete((value, e) -> {
                if (e != null) {
                    result.completeExceptionally((Throwable)e);
                } else if (count.decrementAndGet() == 0) {
                    result.complete(null);
                }
            });
        }
        return result;
    }

    public CompletionStage<Void> removeSegments(IntSet segments) {
        CompletableFuture<Object> result = CompletableFuture.completedFuture(null);
        PrimitiveIterator.OfInt iterator = segments.iterator();
        while (iterator.hasNext()) {
            int segment = iterator.nextInt();
            RemoteCache<K, MarshalledValue> cache = this.caches.get(segment);
            if (cache == null) continue;
            this.caches.set(segment, null);
            CompletableFuture[] completableFutureArray = new CompletableFuture[2];
            completableFutureArray[0] = result;
            completableFutureArray[1] = this.blockingManager.thenRunBlocking(cache.clearAsync().thenAcceptAsync((java.util.function.Consumer)Consumer.empty(), this.executor), () -> cache.stop(), (Object)"hotrod-store-remove-segments").toCompletableFuture();
            result = CompletableFuture.allOf(completableFutureArray);
        }
        return result;
    }

    public Publisher<MarshallableEntry<K, V>> purgeExpired() {
        return Flowable.empty();
    }

    public CompletionStage<Void> prepareWithModifications(Transaction transaction, int publisherCount, Publisher<NonBlockingStore.SegmentedPublisher<Object>> removePublisher, Publisher<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K, V>>> writePublisher) {
        SuspendedBatch suspended = this.transactions.computeIfAbsent(transaction, (java.util.function.Function<Transaction, SuspendedBatch>)org.wildfly.clustering.function.Function.get((Supplier)this.batchFactory.map(Batch::suspend)));
        CompletableTransformer batcher = upstream -> observer -> {
            try (Context context = suspended.resumeWithContext();){
                upstream.subscribe(observer);
            }
        };
        return this.completableBatch(publisherCount, removePublisher, writePublisher).compose(batcher).toCompletionStage(null);
    }

    public CompletionStage<Void> commit(Transaction transaction) {
        return this.close(transaction, (Consumer<Batch>)Consumer.empty(), "hotrod-store-commit");
    }

    public CompletionStage<Void> rollback(Transaction transaction) {
        return this.close(transaction, (Consumer<Batch>)((Consumer)Batch::discard), "hotrod-store-rollback");
    }

    private CompletionStage<Void> close(Transaction transaction, Consumer<Batch> consumer, String operationName) {
        SuspendedBatch suspended = this.transactions.remove(transaction);
        return suspended != null ? this.blockingManager.runBlocking(() -> {
            try (Batch batch = suspended.resume();){
                consumer.accept((Object)batch);
            }
        }, (Object)operationName) : CompletableFuture.completedStage(null);
    }

    static interface Function<T, R>
    extends org.wildfly.clustering.function.Function<T, R>,
    io.reactivex.rxjava3.functions.Function<T, R> {
        public R apply(T var1);
    }
}

