package org.infinispan.remoting;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.configuration.cache.AsyncConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.0.0.CR1.jar:org/infinispan/remoting/ReplicationQueueImpl.class */
/**
 * Default {@link ReplicationQueue} implementation.
 *
 * <p>Commands handed to {@link #add(ReplicableCommand)} are buffered in an in-memory queue and
 * sent out in a single batch by {@link #flush()}. A flush happens when the number of buffered
 * commands reaches the configured maximum, when the periodic task scheduled in {@link #start()}
 * fires, and once more from {@link #stop()}.
 */
public class ReplicationQueueImpl implements ReplicationQueue {
    private static final Log log = LogFactory.getLog(ReplicationQueue.class);

    /** Queue size at which {@link #add(ReplicableCommand)} triggers a flush; overwritten from configuration in {@link #start()}. */
    private long maxElements = 500;

    /** Commands waiting to be sent by the next flush. */
    private final BlockingQueue<ReplicableCommand> elements = new LinkedBlockingQueue<ReplicableCommand>();

    /** Executor that runs the periodic flush task; cleared in {@link #stop()}. */
    private ScheduledExecutorService scheduledExecutor = null;
    private RpcManager rpcManager;
    private Configuration configuration;

    /** Mirrors the {@code useReplQueue} configuration flag once {@link #start()} has run. */
    private boolean enabled;
    private CommandsFactory commandsFactory;

    /** Handle of the periodic flush task, or {@code null} when none has been scheduled. */
    private volatile ScheduledFuture<?> scheduledFuture;

    /** Trace-level flag captured once in {@link #start()}. */
    private boolean trace;

    /** Cache name pushed onto the logging NDC by the periodic flush task. */
    private String cacheName;

    /**
     * @return the value of the {@code useReplQueue} setting read during {@link #start()}
     */
    @Override // org.infinispan.remoting.ReplicationQueue
    public boolean isEnabled() {
        return this.enabled;
    }

    /**
     * Injection entry point; delegates to the {@code String}-based overload using
     * {@code cache.getName()} as the cache name.
     */
    @Inject
    public void injectDependencies(@ComponentName("org.infinispan.executors.replicationQueue") ScheduledExecutorService scheduledExecutorService, RpcManager rpcManager, Configuration configuration, CommandsFactory commandsFactory, Cache cache) {
        injectDependencies(scheduledExecutorService, rpcManager, configuration, commandsFactory, cache.getName());
    }

    /**
     * Stores the collaborators this queue needs.
     *
     * @param scheduledExecutorService executor used to schedule the periodic flush
     * @param rpcManager               used by {@link #flush()} to send the batched commands
     * @param configuration            source of the async replication-queue settings
     * @param commandsFactory          used to wrap the drained commands into one replicate command
     * @param str                      name of the owning cache
     */
    public void injectDependencies(ScheduledExecutorService scheduledExecutorService, RpcManager rpcManager, Configuration configuration, CommandsFactory commandsFactory, String str) {
        this.rpcManager = rpcManager;
        this.configuration = configuration;
        this.commandsFactory = commandsFactory;
        this.scheduledExecutor = scheduledExecutorService;
        this.cacheName = str;
    }

    /**
     * Reads the async configuration and, when the queue is enabled and the configured interval is
     * positive, schedules a task that calls {@link #flush()} with a fixed delay of that interval
     * (in milliseconds).
     */
    @Override // org.infinispan.commons.api.Lifecycle
    @Start
    public void start() {
        AsyncConfiguration async = this.configuration.clustering().async();
        long replQueueInterval = async.replQueueInterval();
        this.maxElements = async.replQueueMaxElements();
        this.trace = log.isTraceEnabled();
        if (this.trace) {
            log.tracef("Starting replication queue, with interval %d and maxElements %s", replQueueInterval, this.maxElements);
        }
        this.enabled = async.useReplQueue();
        if (!this.enabled || replQueueInterval <= 0) {
            // Nothing to schedule: flushes then only happen from add() and stop().
            return;
        }
        this.scheduledFuture = this.scheduledExecutor.scheduleWithFixedDelay(new Runnable() { // from class: org.infinispan.remoting.ReplicationQueueImpl.1
            @Override // java.lang.Runnable
            public void run() {
                LogFactory.pushNDC(ReplicationQueueImpl.this.cacheName, ReplicationQueueImpl.this.trace);
                try {
                    ReplicationQueueImpl.this.flush();
                } finally {
                    LogFactory.popNDC(ReplicationQueueImpl.this.trace);
                }
            }
        }, replQueueInterval, replQueueInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * Cancels the periodic flush task if one was scheduled, attempts one last flush and drops the
     * reference to the executor. A failure of the last flush is logged at debug level only.
     */
    @Override // org.infinispan.commons.api.Lifecycle
    @Stop(priority = 9)
    public void stop() {
        // Read the volatile field once so the null check and the cancel act on the same future.
        ScheduledFuture<?> future = this.scheduledFuture;
        if (future != null) {
            future.cancel(true);
        }
        try {
            flush();
        } catch (Exception e) {
            log.debug("Unable to perform final flush before shutting down", e);
        }
        this.scheduledExecutor = null;
    }

    /**
     * Buffers a command and flushes the queue when it has reached {@code maxElements}.
     *
     * <p>If the calling thread is interrupted while enqueuing, the command is not added and the
     * thread's interrupt status is restored so that callers can observe the interruption.
     *
     * @param replicableCommand command to buffer; must not be {@code null}
     * @throws NullPointerException if {@code replicableCommand} is {@code null}
     */
    @Override // org.infinispan.remoting.ReplicationQueue
    public void add(ReplicableCommand replicableCommand) {
        if (replicableCommand == null) {
            throw new NullPointerException("job is null");
        }
        try {
            this.elements.put(replicableCommand);
            if (this.elements.size() >= this.maxElements) {
                flush();
            }
        } catch (InterruptedException e) {
            // The flag is already cleared when the exception is thrown; set it again instead of
            // swallowing the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Drains all buffered commands and, if there are any, sends them as one replicate command
     * using {@link ResponseMode#ASYNCHRONOUS} with the replication queue skipped. Any
     * {@link Throwable} raised while sending is logged and not propagated; the drained commands
     * are not put back.
     *
     * @return the number of commands that were drained from the queue
     */
    @Override // org.infinispan.remoting.ReplicationQueue
    public synchronized int flush() {
        List<ReplicableCommand> drained = drainReplQueue();
        int size = drained.size();
        if (this.trace) {
            log.tracef("flush(): flushing repl queue (num elements=%s)", size);
        }
        if (size > 0) {
            try {
                log.tracef("Flushing %s elements", size);
                this.rpcManager.invokeRemotely(null, this.commandsFactory.buildReplicateCommand(drained), this.rpcManager.getRpcOptionsBuilder(ResponseMode.ASYNCHRONOUS).skipReplicationQueue(true).build());
            } catch (Throwable th) {
                log.failedReplicatingQueue(size, th);
            }
        }
        return size;
    }

    /**
     * Moves every currently buffered command into a new list.
     *
     * @return the drained commands, possibly empty
     */
    protected List<ReplicableCommand> drainReplQueue() {
        LinkedList<ReplicableCommand> drained = new LinkedList<ReplicableCommand>();
        this.elements.drainTo(drained);
        return drained;
    }

    /**
     * @return the configuration supplied through {@code injectDependencies}
     */
    protected Configuration getConfiguration() {
        return this.configuration;
    }

    /**
     * @return the number of commands currently buffered
     */
    @Override // org.infinispan.remoting.ReplicationQueue
    public int getElementsCount() {
        return this.elements.size();
    }

    /**
     * Discards all buffered commands without sending them.
     */
    @Override // org.infinispan.remoting.ReplicationQueue
    public void reset() {
        this.elements.clear();
    }
}
