package org.infinispan.persistence.async;

import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.functions.Consumer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import net.jcip.annotations.GuardedBy;
import org.infinispan.Cache;
import org.infinispan.commons.CacheException;
import org.infinispan.configuration.cache.AsyncStoreConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.PersistenceConfiguration;
import org.infinispan.factories.threads.DefaultThreadFactory;
import org.infinispan.persistence.modifications.Modification;
import org.infinispan.persistence.modifications.ModificationsList;
import org.infinispan.persistence.modifications.Remove;
import org.infinispan.persistence.modifications.Store;
import org.infinispan.persistence.spi.CacheWriter;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.persistence.spi.PersistenceException;
import org.infinispan.persistence.support.DelegatingCacheWriter;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

/* loaded from: input_file:org/infinispan/persistence/async/AsyncCacheWriter.class */
public class AsyncCacheWriter extends DelegatingCacheWriter {
    private static final Log log = LogFactory.getLog(AsyncCacheWriter.class);
    private static final boolean trace = log.isTraceEnabled();
    private ExecutorService executor;
    private Thread coordinator;
    private int concurrencyLevel;
    private String cacheName;
    private String nodeName;
    protected BufferLock stateLock;

    @GuardedBy("stateLock")
    protected final AtomicReference<State> state;

    @GuardedBy("stateLock")
    private boolean stopped;
    private final Lock availabilityLock;
    private final Condition availability;

    @GuardedBy("availabilityLock")
    private volatile boolean delegateAvailable;
    protected AsyncStoreConfiguration asyncConfiguration;

    /* loaded from: input_file:org/infinispan/persistence/async/AsyncCacheWriter$AsyncStoreCoordinator.class */
    private class AsyncStoreCoordinator implements Runnable {
        final boolean failSilently;
        static final /* synthetic */ boolean $assertionsDisabled;

        AsyncStoreCoordinator(boolean z) {
            this.failSilently = z;
        }

        /* JADX WARN: Code restructure failed: missing block: B:37:0x00d7, code lost:
        
            throw new java.lang.AssertionError("State chain longer than 3 entries!");
         */
        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 667
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.infinispan.persistence.async.AsyncCacheWriter.AsyncStoreCoordinator.run():void");
        }

        private List<AsyncStoreProcessor> createProcessors(State state, List<Modification> list) {
            ArrayList arrayList = new ArrayList();
            int min = Math.min(list.size(), AsyncCacheWriter.this.asyncConfiguration.threadPoolSize());
            if (min > 0) {
                int i = 0;
                int size = list.size() / min;
                int size2 = list.size() % min;
                int i2 = 0;
                while (i2 < min) {
                    int i3 = i + size + (i2 < size2 ? 1 : 0);
                    arrayList.add(new AsyncStoreProcessor(list.subList(i, i3), state, this.failSilently));
                    i = i3;
                    i2++;
                }
                if (!$assertionsDisabled && i != list.size()) {
                    throw new AssertionError("Thread distribution is broken!");
                }
            }
            return arrayList;
        }

        static {
            $assertionsDisabled = !AsyncCacheWriter.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/persistence/async/AsyncCacheWriter$AsyncStoreProcessor.class */
    public class AsyncStoreProcessor implements Runnable {
        private final List<Modification> modifications;
        private final State myState;
        private final boolean failSilently;
        private final PersistenceConfiguration configuration;

        AsyncStoreProcessor(List<Modification> list, State state, boolean z) {
            this.modifications = list;
            this.myState = state;
            this.failSilently = z;
            this.configuration = AsyncCacheWriter.this.ctx.getCache().getCacheConfiguration().persistence();
        }

        @Override // java.lang.Runnable
        public void run() {
            State state;
            long count;
            long j;
            State state2;
            try {
                retryWork(this.configuration.connectionAttempts());
                if (count != j) {
                    return;
                }
                if (state2 != null) {
                } else {
                    while (true) {
                        if (state == null) {
                            return;
                        }
                    }
                }
            } finally {
                this.myState.workerThreads.countDown();
                if (this.myState.workerThreads.getCount() == 0 && this.myState.next == null) {
                    State state3 = AsyncCacheWriter.this.state.get();
                    while (true) {
                        state = state3;
                        if (state == null) {
                            break;
                        }
                        if (state.next == this.myState) {
                            state.next = null;
                        }
                        state3 = state.next;
                    }
                }
            }
        }

        /* JADX WARN: Finally extract failed */
        private void retryWork(int i) {
            for (int i2 = 0; i2 < i; i2++) {
                if (i2 > 0 && AsyncCacheWriter.log.isDebugEnabled()) {
                    AsyncCacheWriter.log.debugf("Retrying due to previous failure. %s attempts left.", i - i2);
                }
                try {
                    if (!this.failSilently) {
                        try {
                            AsyncCacheWriter.this.availabilityLock.lock();
                            try {
                                if (!AsyncCacheWriter.this.delegateAvailable) {
                                    if (AsyncCacheWriter.this.stopped) {
                                        AsyncCacheWriter.log.debugf("Failed to write async modifications to %s as the store is unavailable and stop() was called", AsyncCacheWriter.this.actual);
                                        AsyncCacheWriter.this.availabilityLock.unlock();
                                        return;
                                    }
                                    AsyncCacheWriter.this.availability.await();
                                }
                                AsyncCacheWriter.this.availabilityLock.unlock();
                            } catch (InterruptedException e) {
                                AsyncCacheWriter.log.debugf("%s interrupted: %s", this, e);
                                Thread.currentThread().interrupt();
                                AsyncCacheWriter.this.availabilityLock.unlock();
                            }
                        } catch (Throwable th) {
                            AsyncCacheWriter.this.availabilityLock.unlock();
                            throw th;
                        }
                    }
                    AsyncCacheWriter.this.applyModificationsSync(this.modifications);
                    return;
                } catch (Exception e2) {
                    if (AsyncCacheWriter.log.isDebugEnabled()) {
                        AsyncCacheWriter.log.debug("Failed to process async modifications", e2);
                    }
                    if (!this.failSilently) {
                        try {
                            Thread.sleep(this.configuration.availabilityInterval());
                        } catch (InterruptedException e3) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
            AsyncCacheWriter.log.unableToProcessAsyncModifications(i);
        }
    }

    public AsyncCacheWriter(CacheWriter cacheWriter) {
        super(cacheWriter);
        this.state = new AtomicReference<>();
        this.availabilityLock = new ReentrantLock();
        this.availability = this.availabilityLock.newCondition();
        this.delegateAvailable = true;
    }

    @Override // org.infinispan.persistence.support.DelegatingCacheWriter, org.infinispan.persistence.spi.CacheWriter
    public void init(InitializationContext initializationContext) {
        super.init(initializationContext);
        this.asyncConfiguration = initializationContext.getConfiguration().async();
        Cache cache = initializationContext.getCache();
        Configuration cacheConfiguration = cache != null ? cache.getCacheConfiguration() : null;
        this.concurrencyLevel = cacheConfiguration != null ? cacheConfiguration.locking().concurrencyLevel() : 16;
        this.cacheName = cache != null ? cache.getName() : null;
        this.nodeName = cache != null ? cache.getCacheManager().getCacheManagerConfiguration().transport().nodeName() : null;
    }

    @Override // org.infinispan.persistence.support.DelegatingCacheWriter, org.infinispan.commons.api.Lifecycle
    public void start() {
        log.debugf("Async cache loader starting %s", this);
        this.state.set(newState(false, null));
        this.stopped = false;
        this.stateLock = new BufferLock(this.asyncConfiguration.modificationQueueSize());
        int threadPoolSize = this.asyncConfiguration.threadPoolSize();
        this.executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 120L, TimeUnit.SECONDS, new LinkedBlockingQueue(), new DefaultThreadFactory(null, 5, DefaultThreadFactory.DEFAULT_PATTERN, this.nodeName, "AsyncStoreProcessor"));
        ((ThreadPoolExecutor) this.executor).allowCoreThreadTimeOut(true);
        this.coordinator = new DefaultThreadFactory(null, 5, DefaultThreadFactory.DEFAULT_PATTERN, this.nodeName, "AsyncStoreCoordinator").newThread(new AsyncStoreCoordinator(this.asyncConfiguration.failSilently()));
        this.coordinator.start();
    }

    @Override // org.infinispan.persistence.support.DelegatingCacheWriter, org.infinispan.commons.api.Lifecycle
    public void stop() {
        if (trace) {
            log.tracef("Stop async store %s", this);
        }
        this.stateLock.writeLock(0);
        this.stopped = true;
        this.stateLock.writeUnlock();
        try {
            if (this.asyncConfiguration.failSilently() || this.delegateAvailable) {
                this.coordinator.join();
                this.executor.shutdown();
            } else {
                this.coordinator.interrupt();
                this.executor.shutdownNow();
            }
            if (!this.executor.awaitTermination(1L, TimeUnit.SECONDS)) {
                log.errorAsyncStoreNotStopped();
            }
        } catch (InterruptedException e) {
            log.interruptedWaitingAsyncStorePush(e);
            Thread.currentThread().interrupt();
        }
    }

    @Override // org.infinispan.persistence.spi.CacheWriter
    public boolean isAvailable() {
        if (this.stopped) {
            return false;
        }
        if (this.asyncConfiguration.failSilently()) {
            return true;
        }
        boolean z = false;
        try {
            z = this.actual.isAvailable();
        } catch (Throwable th) {
            log.debugf("Error encountered when calling isAvailable on %s: %s", this.actual, th);
        }
        this.availabilityLock.lock();
        try {
            if (z != this.delegateAvailable) {
                this.delegateAvailable = z;
                if (this.delegateAvailable) {
                    this.availability.signalAll();
                }
            }
            return this.delegateAvailable || this.stateLock.hasCapacity();
        } finally {
            this.availabilityLock.unlock();
        }
    }

    @Override // org.infinispan.persistence.support.DelegatingCacheWriter, org.infinispan.persistence.spi.CacheWriter
    public void write(MarshallableEntry marshallableEntry) {
        put(new Store(marshallableEntry.getKey(), marshallableEntry), 1);
    }

    @Override // org.infinispan.persistence.spi.CacheWriter
    public CompletionStage<Void> bulkUpdate(Publisher publisher) {
        CompletableFuture completableFuture = new CompletableFuture();
        Single list = Flowable.fromPublisher(publisher).map(marshallableEntry -> {
            return new Store(marshallableEntry.getKey(), marshallableEntry);
        }).cast(Modification.class).toList();
        Consumer consumer = list2 -> {
            putAll(list2);
            completableFuture.complete(null);
        };
        Objects.requireNonNull(completableFuture);
        list.subscribe(consumer, completableFuture::completeExceptionally);
        return completableFuture;
    }

    @Override // org.infinispan.persistence.spi.CacheWriter
    public void deleteBatch(Iterable iterable) {
        putAll((List) StreamSupport.stream(iterable.spliterator(), false).map(Remove::new).collect(Collectors.toList()));
    }

    @Override // org.infinispan.persistence.support.DelegatingCacheWriter, org.infinispan.persistence.spi.CacheWriter
    public boolean delete(Object obj) {
        put(new Remove(obj), 1);
        return true;
    }

    protected void applyModificationsSync(List<Modification> list) throws PersistenceException {
        CacheWriter<K, V> cacheWriter = this.actual;
        Flowable filter = Flowable.fromIterable(list).filter(modification -> {
            return modification.getType() == Modification.Type.STORE;
        });
        Class<Store> cls = Store.class;
        Objects.requireNonNull(Store.class);
        cacheWriter.bulkUpdate(filter.map((v1) -> {
            return r2.cast(v1);
        }).map((v0) -> {
            return v0.getStoredValue();
        }));
        this.actual.deleteBatch(() -> {
            Stream filter2 = StreamSupport.stream(Spliterators.spliterator(list, 256), false).filter(modification2 -> {
                return modification2.getType() == Modification.Type.REMOVE;
            });
            Class<Remove> cls2 = Remove.class;
            Objects.requireNonNull(Remove.class);
            return filter2.map((v1) -> {
                return r1.cast(v1);
            }).map((v0) -> {
                return v0.getKey();
            }).iterator();
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public State newState(boolean z, State state) {
        return new State(z, new ConcurrentHashMap(64, 0.75f, this.concurrencyLevel), state);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void assertNotStopped() throws CacheException {
        if (this.stopped) {
            throw new CacheException("AsyncCacheWriter stopped; no longer accepting more entries.");
        }
    }

    private void put(Modification modification, int i) {
        this.stateLock.writeLock(i);
        try {
            if (trace) {
                log.tracef("Queue modification: %s", modification);
            }
            assertNotStopped();
            this.state.get().put(modification);
        } finally {
            this.stateLock.writeUnlock();
        }
    }

    private void putAll(List<Modification> list) {
        this.stateLock.writeLock(list.size());
        try {
            this.state.get().put(new ModificationsList(list));
        } finally {
            this.stateLock.writeUnlock();
        }
    }

    public AtomicReference<State> getState() {
        return this.state;
    }

    protected void clearStore() {
    }
}
