package org.infinispan.persistence.async;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import net.jcip.annotations.GuardedBy;
import org.infinispan.Cache;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.configuration.cache.AsyncStoreConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.persistence.modifications.Modification;
import org.infinispan.persistence.modifications.Remove;
import org.infinispan.persistence.modifications.Store;
import org.infinispan.persistence.spi.CacheWriter;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.persistence.spi.PersistenceException;
import org.infinispan.persistence.support.DelegatingCacheWriter;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:org/infinispan/persistence/async/AsyncCacheWriter.class */
public class AsyncCacheWriter extends DelegatingCacheWriter {
    private static final Log log = LogFactory.getLog(AsyncCacheWriter.class);
    private static final boolean trace = log.isTraceEnabled();
    private static final AtomicInteger threadId = new AtomicInteger(0);
    private ExecutorService executor;
    private Thread coordinator;
    private int concurrencyLevel;
    private String cacheName;
    protected BufferLock stateLock;

    @GuardedBy("stateLock")
    protected final AtomicReference<State> state;

    @GuardedBy("stateLock")
    private boolean stopped;
    protected AsyncStoreConfiguration asyncConfiguration;

    /* loaded from: input_file:org/infinispan/persistence/async/AsyncCacheWriter$AsyncStoreCoordinator.class */
    private class AsyncStoreCoordinator implements Runnable {
        static final /* synthetic */ boolean $assertionsDisabled;

        private AsyncStoreCoordinator() {
        }

        /* JADX WARN: Code restructure failed: missing block: B:13:0x004c, code lost:
        
            throw new java.lang.AssertionError("State chain longer than 3 entries!");
         */
        @Override // java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 534
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.infinispan.persistence.async.AsyncCacheWriter.AsyncStoreCoordinator.run():void");
        }

        private List<AsyncStoreProcessor> createProcessors(State state, List<Modification> list) {
            ArrayList arrayList = new ArrayList();
            int min = Math.min(list.size(), AsyncCacheWriter.this.asyncConfiguration.threadPoolSize());
            if (min > 0) {
                int i = 0;
                int size = list.size() / min;
                int size2 = list.size() % min;
                int i2 = 0;
                while (i2 < min) {
                    int i3 = i + size + (i2 < size2 ? 1 : 0);
                    arrayList.add(new AsyncStoreProcessor(list.subList(i, i3), state));
                    i = i3;
                    i2++;
                }
                if (!$assertionsDisabled && i != list.size()) {
                    throw new AssertionError("Thread distribution is broken!");
                }
            }
            return arrayList;
        }

        static {
            $assertionsDisabled = !AsyncCacheWriter.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/persistence/async/AsyncCacheWriter$AsyncStoreProcessor.class */
    public class AsyncStoreProcessor implements Runnable {
        private final List<Modification> modifications;
        private final State myState;

        AsyncStoreProcessor(List<Modification> list, State state) {
            this.modifications = list;
            this.myState = state;
        }

        @Override // java.lang.Runnable
        public void run() {
            State state;
            long count;
            long j;
            State state2;
            try {
                retryWork(3);
                if (count != j) {
                    return;
                }
                if (state2 != null) {
                } else {
                    while (true) {
                        if (state == null) {
                            return;
                        }
                    }
                }
            } finally {
                this.myState.workerThreads.countDown();
                if (this.myState.workerThreads.getCount() == 0 && this.myState.next == null) {
                    State state3 = AsyncCacheWriter.this.state.get();
                    while (true) {
                        state = state3;
                        if (state == null) {
                            break;
                        }
                        if (state.next == this.myState) {
                            state.next = null;
                        }
                        state3 = state.next;
                    }
                }
            }
        }

        private void retryWork(int i) {
            for (int i2 = 0; i2 < i; i2++) {
                if (i2 > 0 && AsyncCacheWriter.log.isDebugEnabled()) {
                    AsyncCacheWriter.log.debugf("Retrying due to previous failure. %s attempts left.", i - i2);
                }
                try {
                    AsyncCacheWriter.this.applyModificationsSync(this.modifications);
                    return;
                } catch (Exception e) {
                    if (AsyncCacheWriter.log.isDebugEnabled()) {
                        AsyncCacheWriter.log.debug("Failed to process async modifications", e);
                    }
                }
            }
            AsyncCacheWriter.log.unableToProcessAsyncModifications(i);
        }
    }

    public AsyncCacheWriter(CacheWriter cacheWriter) {
        super(cacheWriter);
        this.state = new AtomicReference<>();
    }

    @Override // org.infinispan.persistence.support.DelegatingCacheWriter, org.infinispan.persistence.spi.CacheWriter
    public void init(InitializationContext initializationContext) {
        super.init(initializationContext);
        this.asyncConfiguration = initializationContext.getConfiguration().async();
        Cache cache = initializationContext.getCache();
        Configuration cacheConfiguration = cache != null ? cache.getCacheConfiguration() : null;
        this.concurrencyLevel = cacheConfiguration != null ? cacheConfiguration.locking().concurrencyLevel() : 16;
        this.cacheName = cache != null ? cache.getName() : null;
    }

    @Override // org.infinispan.persistence.support.DelegatingCacheWriter, org.infinispan.commons.api.Lifecycle
    public void start() {
        log.debugf("Async cache loader starting %s", this);
        this.state.set(newState(false, null));
        this.stopped = false;
        this.stateLock = new BufferLock(this.asyncConfiguration.modificationQueueSize());
        int threadPoolSize = this.asyncConfiguration.threadPoolSize();
        this.executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 120L, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { // from class: org.infinispan.persistence.async.AsyncCacheWriter.1
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable, "AsyncStoreProcessor-" + AsyncCacheWriter.this.cacheName + "-" + AsyncCacheWriter.threadId.getAndIncrement());
                thread.setDaemon(true);
                return thread;
            }
        });
        ((ThreadPoolExecutor) this.executor).allowCoreThreadTimeOut(true);
        this.coordinator = new Thread(new AsyncStoreCoordinator(), "AsyncStoreCoordinator-" + this.cacheName);
        this.coordinator.setDaemon(true);
        this.coordinator.start();
    }

    @Override // org.infinispan.persistence.support.DelegatingCacheWriter, org.infinispan.commons.api.Lifecycle
    public void stop() {
        if (trace) {
            log.tracef("Stop async store %s", this);
        }
        this.stateLock.writeLock(0);
        this.stopped = true;
        this.stateLock.writeUnlock();
        try {
            this.coordinator.join();
            this.executor.shutdown();
            if (!this.executor.awaitTermination(1L, TimeUnit.SECONDS)) {
                log.errorAsyncStoreNotStopped();
            }
        } catch (InterruptedException e) {
            log.interruptedWaitingAsyncStorePush(e);
            Thread.currentThread().interrupt();
        }
    }

    @Override // org.infinispan.persistence.support.DelegatingCacheWriter, org.infinispan.persistence.spi.CacheWriter
    public void write(MarshalledEntry marshalledEntry) {
        put(new Store(marshalledEntry.getKey(), marshalledEntry), 1);
    }

    @Override // org.infinispan.persistence.support.DelegatingCacheWriter, org.infinispan.persistence.spi.CacheWriter
    public boolean delete(Object obj) {
        put(new Remove(obj), 1);
        return true;
    }

    protected void applyModificationsSync(List<Modification> list) throws PersistenceException {
        for (Modification modification : list) {
            switch (modification.getType()) {
                case STORE:
                    this.actual.write(((Store) modification).getStoredValue());
                    break;
                case REMOVE:
                    this.actual.delete(((Remove) modification).getKey());
                    break;
                default:
                    throw new IllegalArgumentException("Unknown modification type " + modification.getType());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public State newState(boolean z, State state) {
        return new State(z, CollectionFactory.makeConcurrentMap(64, this.concurrencyLevel), state);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void assertNotStopped() throws CacheException {
        if (this.stopped) {
            throw new CacheException("AsyncCacheWriter stopped; no longer accepting more entries.");
        }
    }

    private void put(Modification modification, int i) {
        this.stateLock.writeLock(i);
        try {
            if (trace) {
                log.tracef("Queue modification: %s", modification);
            }
            assertNotStopped();
            this.state.get().put(modification);
        } finally {
            this.stateLock.writeUnlock();
        }
    }

    public AtomicReference<State> getState() {
        return this.state;
    }

    protected void clearStore() {
    }
}
