package reactor.rx.action.aggregation;

import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscription;
import reactor.core.Dispatcher;
import reactor.core.dispatch.InsufficientCapacityException;
import reactor.fn.Consumer;
import reactor.fn.Pausable;
import reactor.fn.timer.Timer;
import reactor.rx.action.Action;
import reactor.rx.subscription.BatchSubscription;
import reactor.rx.subscription.PushSubscription;

/* loaded from: input_file:reactor/rx/action/aggregation/BatchAction.class */
/**
 * Base action that counts incoming signals and invokes subclass hooks at the
 * boundaries of a batch.
 *
 * <p>For every signal received in {@link #doNext(Object)}:
 * <ul>
 *   <li>{@link #firstCallback(Object)} is invoked for the first signal of a batch
 *       when {@link #first} is set;</li>
 *   <li>{@link #nextCallback(Object)} is invoked for every signal when {@link #next}
 *       is set;</li>
 *   <li>{@link #flushCallback(Object)} is invoked once {@link #batchSize} signals
 *       have been counted when {@link #flush} is set.</li>
 * </ul>
 *
 * <p>When a positive timespan and a non-null {@link Timer} are supplied, the first
 * signal of each batch also schedules a timeout that requests a flush through the
 * {@link Dispatcher}. A flush is also performed unconditionally on completion.
 *
 * @param <T> type of the incoming signals
 * @param <V> type of the outgoing signals
 */
public abstract class BatchAction<T, V> extends Action<T, V> {
    /** Whether {@link #nextCallback(Object)} is invoked for every signal. */
    protected final boolean next;
    /** Whether {@link #flushCallback(Object)} is invoked when a batch fills up. */
    protected final boolean flush;
    /** Whether {@link #firstCallback(Object)} is invoked for the first signal of a batch. */
    protected final boolean first;
    /** Number of signals that make up a full batch. */
    protected final int batchSize;
    /** Dispatcher used to hand timer-triggered flushes over to {@link #flushConsumer}. */
    protected final Dispatcher dispatcher;
    /** Timeout of a batch, or {@code -1} when no positive timespan was supplied. */
    protected final long timespan;
    /** Unit of {@link #timespan}, or {@code null} when no positive timespan was supplied. */
    protected final TimeUnit unit;
    /** Timer scheduling batch timeouts, or {@code null} when batches are not timed. */
    protected final Timer timer;
    /** Consumer that runs {@link #flushCallback(Object)} and resets {@link #index}. */
    protected final Consumer<T> flushConsumer;
    /** Number of signals counted in the current batch. */
    protected int index;
    /** Registration of the pending batch timeout, or {@code null} when none is tracked. */
    private Pausable timespanRegistration;

    /**
     * Flushes the current batch: runs the subclass flush hook, then restarts the count.
     */
    private final class FlushConsumer implements Consumer<T> {
        private FlushConsumer() {
        }

        @Override
        public void accept(T t) {
            BatchAction.this.flushCallback(t);
            BatchAction.this.index = 0;
        }
    }

    /**
     * Creates a purely size-based batch action (no timeout).
     *
     * @param dispatcher dispatcher exposed through {@link #getDispatcher()}
     * @param batchSize  number of signals per batch
     * @param next       whether to invoke {@link #nextCallback(Object)} for every signal
     * @param first      whether to invoke {@link #firstCallback(Object)} on the first signal of a batch
     * @param flush      whether to invoke {@link #flushCallback(Object)} when a batch is full
     */
    public BatchAction(Dispatcher dispatcher, int batchSize, boolean next, boolean first, boolean flush) {
        this(dispatcher, batchSize, next, first, flush, -1L, null, null);
    }

    /**
     * Creates a batch action bounded by size and, optionally, by time.
     *
     * @param dispatcher dispatcher used for timer-triggered flushes
     * @param batchSize  number of signals per batch
     * @param next       whether to invoke {@link #nextCallback(Object)} for every signal
     * @param first      whether to invoke {@link #firstCallback(Object)} on the first signal of a batch
     * @param flush      whether to invoke {@link #flushCallback(Object)} when a batch is full
     * @param timespan   batch timeout; values {@code <= 0} disable the timeout entirely
     * @param unit       unit of {@code timespan}; defaults to {@link TimeUnit#SECONDS} when {@code null}
     * @param timer      timer used to schedule the timeout; {@code null} disables the timeout
     */
    public BatchAction(Dispatcher dispatcher, int batchSize, boolean next, boolean first, boolean flush,
                       long timespan, TimeUnit unit, Timer timer) {
        super(batchSize);
        this.flushConsumer = new FlushConsumer();
        this.index = 0;
        this.dispatcher = dispatcher;
        if (timespan > 0) {
            this.unit = unit != null ? unit : TimeUnit.SECONDS;
            this.timespan = timespan;
            this.timer = timer;
        } else {
            // A non-positive timespan means "not timed": ignore the supplied unit and timer.
            this.timespan = -1L;
            this.timer = null;
            this.unit = null;
        }
        this.first = first;
        this.flush = flush;
        this.next = next;
        this.batchSize = batchSize;
    }

    /**
     * Wraps the upstream subscription in a {@link BatchSubscription} sized to {@link #batchSize}.
     */
    @Override // reactor.rx.action.Action
    protected PushSubscription<T> createTrackingSubscription(Subscription subscription) {
        return new BatchSubscription<T>(subscription, this, this.batchSize);
    }

    /**
     * Hook invoked for every signal when {@link #next} is set. No-op by default.
     *
     * @param t the signal just received
     */
    protected void nextCallback(T t) {
    }

    /**
     * Hook invoked when a batch is flushed. No-op by default.
     *
     * @param t the signal that filled the batch, or {@code null} when the flush is
     *          triggered by the timeout or by completion
     */
    protected void flushCallback(T t) {
    }

    /**
     * Hook invoked for the first signal of a batch when {@link #first} is set. No-op by default.
     *
     * @param t the first signal of the batch
     */
    protected void firstCallback(T t) {
    }

    /**
     * Counts the signal, runs the enabled hooks and flushes once the batch is full.
     *
     * @param t the signal just received
     */
    @Override // reactor.rx.action.Action
    protected void doNext(T t) {
        if (++this.index == 1) {
            if (this.timer != null) {
                // First signal of a batch: arm the timeout that will request a flush.
                this.timespanRegistration = this.timer.submit(new Consumer<Long>() {
                    @Override
                    public void accept(Long l) {
                        try {
                            if (BatchAction.this.isPublishing()) {
                                // Hand the flush to the dispatcher instead of running it on the timer thread.
                                BatchAction.this.dispatcher.tryDispatch(null, BatchAction.this.flushConsumer, null);
                            }
                        } catch (InsufficientCapacityException ignored) {
                            // Best effort: the dispatcher has no capacity right now, so this timed flush is
                            // skipped; the batch is still flushed when it fills up or on completion.
                        }
                    }
                }, this.timespan, this.unit);
            }
            if (this.first) {
                firstCallback(t);
            }
        }
        if (this.next) {
            nextCallback(t);
        }
        if (this.index % this.batchSize == 0) {
            // The batch filled up before the timeout: the pending timer is no longer needed.
            cancelTimespanRegistration();
            this.index = 0;
            if (this.flush) {
                this.flushConsumer.accept(t);
            }
        }
    }

    /**
     * Flushes whatever has been counted so far (with a {@code null} signal), then completes.
     * Any pending batch timeout is cancelled first so it cannot fire after completion.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.rx.action.Action
    public void doComplete() {
        cancelTimespanRegistration();
        this.flushConsumer.accept(null);
        super.doComplete();
    }

    /**
     * Describes the action, including the timeout (when timed) and how full the
     * current batch is, both as a count and as a percentage.
     */
    @Override // reactor.rx.action.Action, reactor.rx.Stream
    public String toString() {
        // Scale in floating point BEFORE dividing: int/int would truncate every partial batch to 0%.
        final int percent = this.batchSize > 0 ? (int) (this.index * 100.0f / this.batchSize) : 0;
        final String timing = this.timer != null ? "timed - " + this.timespan + " " + this.unit : "";
        return super.toString() + "{" + timing + " batchSize=" + this.index + "/" + this.batchSize
                + " [" + percent + "%]";
    }

    /**
     * Returns the dispatcher supplied at construction time.
     */
    @Override // reactor.rx.Stream
    public final Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    /** Cancels and forgets the pending batch timeout, if one is tracked. */
    private void cancelTimespanRegistration() {
        if (this.timespanRegistration != null) {
            this.timespanRegistration.cancel();
            this.timespanRegistration = null;
        }
    }
}
