/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.mutiny.operators.multi;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.operators.multi.AbstractMultiOperator;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.mutiny.subscription.SerializedSubscriber;
import io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * Operator that re-subscribes to its upstream after a failure, driven by a companion "trigger" stream.
 *
 * <p>Every upstream failure is pushed into a {@code Multi<Throwable>} that is handed to the
 * user-supplied factory. The publisher returned by that factory controls the retries:
 * <ul>
 * <li>each item it emits causes a new subscription to the upstream,</li>
 * <li>its failure is forwarded to the downstream as a failure,</li>
 * <li>its completion completes the downstream.</li>
 * </ul>
 *
 * @param <T> the type of the items flowing from upstream to downstream
 */
public final class MultiRetryWhenOp<T> extends AbstractMultiOperator<T, T> {
    /** Builds the trigger publisher from the stream of upstream failures. */
    private final Function<? super Multi<Throwable>, ? extends Publisher<?>> triggerStreamFactory;

    /**
     * Creates the operator.
     *
     * @param upstream the upstream to (re-)subscribe to
     * @param triggerStreamFactory receives the stream of upstream failures and returns the publisher
     *        whose signals drive the retries; returning {@code null} is reported to the downstream
     *        as a {@link NullPointerException} at subscription time
     */
    public MultiRetryWhenOp(Multi<? extends T> upstream, Function<? super Multi<Throwable>, ? extends Publisher<?>> triggerStreamFactory) {
        super(upstream);
        this.triggerStreamFactory = triggerStreamFactory;
    }

    /**
     * Wires the downstream, the retry operator and the trigger stream together, then subscribes to the
     * upstream for the first time.
     *
     * @param downstream the subscriber receiving the items
     * @param triggerStreamFactory the factory producing the trigger publisher
     * @param upstream the upstream to subscribe to
     * @param <T> the item type
     */
    private static <T> void subscribe(MultiSubscriber<? super T> downstream, Function<? super Multi<Throwable>, ? extends Publisher<?>> triggerStreamFactory, Multi<? extends T> upstream) {
        TriggerSubscriber other = new TriggerSubscriber();

        // Upstream failures are pushed into the trigger's processor through this wrapper.
        // It is handed an empty subscription up front; nothing here ever requests through it.
        SerializedSubscriber<Throwable> signaller = new SerializedSubscriber<Throwable>(other.processor);
        signaller.onSubscribe(Subscriptions.empty());

        SerializedSubscriber<T> serialized = new SerializedSubscriber<T>(downstream);
        RetryWhenOperator<T> operator = new RetryWhenOperator<T>(upstream, serialized, signaller);
        other.operator = operator;

        // The operator itself is the subscription seen by the downstream.
        serialized.onSubscribe(operator);

        Publisher<?> publisher;
        try {
            publisher = triggerStreamFactory.apply(other);
            if (publisher == null) {
                throw new NullPointerException("The stream factory returned `null`");
            }
        } catch (Throwable e) {
            // Covers both an exception thrown by the factory and the null result above.
            downstream.onFailure(e);
            return;
        }

        // The trigger must be listening before the upstream can fail for the first time.
        publisher.subscribe(other);

        // Subscribing the trigger may already have cancelled the operator (for instance if the
        // trigger publisher terminated right away); in that case the upstream is never subscribed.
        if (!operator.isCancelled()) {
            upstream.subscribe(operator);
        }
    }

    /**
     * Subscribes the given downstream to this operator.
     *
     * @param downstream the subscriber receiving the items
     */
    @Override
    public void subscribe(MultiSubscriber<? super T> downstream) {
        MultiRetryWhenOp.subscribe(downstream, this.triggerStreamFactory, this.upstream);
    }

    /**
     * Plays two roles at once: it is the {@code Multi<Throwable>} handed to the user factory (backed
     * by {@link #processor}) and it is the subscriber of the publisher that the factory returns,
     * relaying that publisher's signals to the {@link RetryWhenOperator}.
     */
    static final class TriggerSubscriber
    extends AbstractMulti<Throwable>
    implements Multi<Throwable>,
    Subscriber<Object> {
        /** The operator to notify; assigned by the enclosing class right after construction. */
        RetryWhenOperator<?> operator;

        // The explicit <Throwable> witness is required: a generic call used as the receiver of another
        // call gets no target type, so without it the element type would be inferred as Object and the
        // result would not be assignable to Processor<Throwable, Throwable>.
        private final Processor<Throwable, Throwable> processor = UnicastProcessor.<Throwable>create().serialized();

        TriggerSubscriber() {
        }

        /** Hands the trigger publisher's subscription to the operator. */
        @Override
        public void onSubscribe(Subscription s) {
            this.operator.setWhen(s);
        }

        /** Any item from the trigger publisher, whatever its value, requests a re-subscription. */
        @Override
        public void onNext(Object t) {
            this.operator.resubscribe();
        }

        /** A failure of the trigger publisher is forwarded to the downstream through the operator. */
        @Override
        public void onError(Throwable t) {
            this.operator.whenFailure(t);
        }

        /** Completion of the trigger publisher completes the downstream through the operator. */
        @Override
        public void onComplete() {
            this.operator.whenComplete();
        }

        /** Subscribing to this {@code Multi} means subscribing to the processor carrying the failures. */
        @Override
        public void subscribe(Subscriber<? super Throwable> actual) {
            this.processor.subscribe(actual);
        }
    }

    /**
     * Subscriber placed between upstream and downstream. Items are passed through; an upstream failure
     * is diverted to the trigger stream instead of the downstream.
     *
     * @param <T> the item type
     */
    static final class RetryWhenOperator<T>
    extends SwitchableSubscriptionSubscriber<T> {
        private final Publisher<? extends T> upstream;
        /** Work-in-progress counter guarding the re-subscription loop in {@link #resubscribe()}. */
        private final AtomicInteger wip = new AtomicInteger();
        /** Receives the upstream failures. */
        private final Subscriber<Throwable> signaller;
        /** Holds the subscription to the trigger publisher once {@link #setWhen(Subscription)} is called. */
        private final Subscriptions.DeferredSubscription arbiter = new Subscriptions.DeferredSubscription();
        /** Number of items forwarded downstream since the last upstream failure. */
        long produced;

        RetryWhenOperator(Publisher<? extends T> upstream, MultiSubscriber<? super T> downstream, Subscriber<Throwable> signaller) {
            super(downstream);
            this.upstream = upstream;
            this.signaller = signaller;
        }

        /** Cancels the trigger subscription, then this operator; does nothing if already cancelled. */
        @Override
        public void cancel() {
            if (!this.isCancelled()) {
                this.arbiter.cancel();
                super.cancel();
            }
        }

        /**
         * Stores the subscription to the trigger publisher.
         *
         * @param w the trigger publisher's subscription
         */
        public void setWhen(Subscription w) {
            this.arbiter.set(w);
        }

        /** Forwards the item downstream and counts it. */
        @Override
        public void onItem(T t) {
            this.downstream.onItem(t);
            ++this.produced;
        }

        /**
         * Diverts an upstream failure to the trigger stream instead of the downstream.
         *
         * @param t the upstream failure
         */
        @Override
        public void onFailure(Throwable t) {
            long p = this.produced;
            if (p != 0L) {
                this.produced = 0L;
                // Reports the items delivered so far to the parent class; presumably this keeps the
                // outstanding demand accurate for the next upstream subscription - confirm against
                // SwitchableSubscriptionSubscriber.
                this.emitted(p);
            }
            // Ask the trigger publisher for one signal, then hand it the failure to react to.
            this.arbiter.request(1L);
            this.signaller.onNext(t);
        }

        /** Upstream completed: the trigger is no longer needed, so cancel it and complete downstream. */
        @Override
        public void onCompletion() {
            this.arbiter.cancel();
            this.downstream.onComplete();
        }

        /**
         * Subscribes to the upstream again. The {@link #wip} counter ensures only one caller runs the
         * loop; calls made while the loop is running only increment the counter and are served by
         * further iterations of that loop.
         */
        void resubscribe() {
            if (this.wip.getAndIncrement() == 0) {
                do {
                    if (this.isCancelled()) {
                        return;
                    }
                    this.upstream.subscribe(this);
                } while (this.wip.decrementAndGet() != 0);
            }
        }

        /**
         * Called when the trigger publisher fails. Uses {@code super.cancel()} so that the trigger
         * subscription held by {@link #arbiter} is not cancelled, unlike {@link #cancel()}.
         *
         * @param failure the failure emitted by the trigger publisher
         */
        void whenFailure(Throwable failure) {
            super.cancel();
            this.downstream.onFailure(failure);
        }

        /**
         * Called when the trigger publisher completes. Uses {@code super.cancel()} so that the trigger
         * subscription held by {@link #arbiter} is not cancelled, unlike {@link #cancel()}.
         */
        void whenComplete() {
            super.cancel();
            this.downstream.onComplete();
        }
    }
}

