/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.AbstractNoHandleSubscribeSingle;
import io.servicetalk.concurrent.api.AsyncContextProvider;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.internal.ArrayUtils;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.concurrent.internal.ThrowableUtils;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.utils.internal.PlatformDependent;
import java.lang.reflect.Array;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class CacheSingle<T>
extends AbstractNoHandleSubscribeSingle<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(CacheSingle.class);
    // Shared zero-length array; each new State starts with it as its subscriber array (read via access$100()).
    private static final SingleSource.Subscriber<?>[] EMPTY_SUBSCRIBERS = new SingleSource.Subscriber[0];
    // Atomic (CAS) access to State.subscribers; the array is replaced wholesale on every add/remove/terminate.
    private static final AtomicReferenceFieldUpdater<State, SingleSource.Subscriber[]> newSubscribersUpdater = AtomicReferenceFieldUpdater.newUpdater(State.class, SingleSource.Subscriber[].class, "subscribers");
    // Atomic increment of State.subscribeCount, performed once per addSubscriber call.
    private static final AtomicIntegerFieldUpdater<State> subscribeCountUpdater = AtomicIntegerFieldUpdater.newUpdater(State.class, "subscribeCount");
    // Upstream source; a State subscribes to it when its subscribe count equals minSubscribers.
    private final Single<T> original;
    // Invoked with (value, error) when the upstream terminates; once the returned Completable finishes,
    // the current State is replaced by a fresh one.
    private final BiFunction<T, Throwable, Completable> terminalResubscribe;
    // Subscribe count at which a State subscribes to the upstream (validated to be at least 1 in the constructor).
    private final int minSubscribers;
    // When true, removing the last subscriber of a State cancels the upstream and installs a fresh State.
    private final boolean cancelUpstream;
    // State that new subscribers are added to; swapped for a new instance on reset.
    private volatile State state = new State();

    /**
     * Creates a new instance.
     *
     * @param original the upstream {@link Single} whose result is shared between subscribers.
     * @param minSubscribers the subscribe count at which the upstream is subscribed to; must be at least 1.
     * @param cancelUpstream {@code true} to cancel the upstream (and start over with a fresh state) once every
     * subscriber has cancelled.
     * @param terminalResubscribe invoked with the terminal value and error when the upstream terminates; when the
     * returned {@link Completable} finishes the cached state is discarded.
     * @throws IllegalArgumentException if {@code minSubscribers} is less than 1.
     * @throws NullPointerException if {@code terminalResubscribe} is {@code null}.
     */
    CacheSingle(Single<T> original, int minSubscribers, boolean cancelUpstream, BiFunction<T, Throwable, Completable> terminalResubscribe) {
        if (minSubscribers < 1) {
            // The guard accepts 1, so the message must say ">=1" (it previously claimed ">1").
            throw new IllegalArgumentException("minSubscribers: " + minSubscribers + " (expected >=1)");
        }
        this.original = original;
        this.minSubscribers = minSubscribers;
        this.cancelUpstream = cancelUpstream;
        this.terminalResubscribe = Objects.requireNonNull(terminalResubscribe);
    }

    /**
     * Registers {@code subscriber} with whichever {@code State} is current at the time of the call.
     */
    @Override
    void handleSubscribe(SingleSource.Subscriber<? super T> subscriber, ContextMap contextMap, AsyncContextProvider contextProvider) {
        // Single volatile read: the subscriber joins exactly the state observed here.
        final State current = this.state;
        current.addSubscriber(subscriber, contextMap, contextProvider);
    }

    /**
     * Compiler-generated (synthetic) accessor exposing the private {@code EMPTY_SUBSCRIBERS} constant to the
     * nested {@code State} class, which uses it as the initial value of its {@code subscribers} field.
     */
    static /* synthetic */ SingleSource.Subscriber[] access$100() {
        return EMPTY_SUBSCRIBERS;
    }

    /**
     * Marker subscriber that records the upstream's terminal signal. {@code State} stores it as the sole element of
     * its subscriber array after termination, and replays the recorded signal to late subscribers through
     * {@link #safeTerminateFromSource(SingleSource.Subscriber)}.
     * <p>
     * It is never meant to receive signals itself: every {@link SingleSource.Subscriber} method throws.
     */
    private static final class TerminalSubscriber<T>
    implements SingleSource.Subscriber<T> {
        /** The cached result when {@link #isSuccess} is {@code true}, otherwise the cached {@link Throwable}. */
        @Nullable
        private final Object terminal;
        private final boolean isSuccess;

        /**
         * @param error the terminal error, or {@code null} if the upstream completed successfully.
         * @param value the terminal value; only recorded when {@code error} is {@code null}.
         */
        private TerminalSubscriber(@Nullable Throwable error, @Nullable T value) {
            if (error == null) {
                this.terminal = value;
                this.isSuccess = true;
            } else {
                this.terminal = error;
                this.isSuccess = false;
            }
        }

        /**
         * Replays the recorded terminal signal to {@code sub}.
         *
         * @param sub the subscriber that should receive the cached result or error.
         */
        @SuppressWarnings("unchecked")
        private void safeTerminateFromSource(SingleSource.Subscriber<? super T> sub) {
            if (this.isSuccess) {
                // Safe: when isSuccess is true the constructor stored a value of type T (or null).
                SubscriberUtils.deliverSuccessFromSource(sub, (T)this.terminal);
            } else {
                assert (this.terminal != null);
                SubscriberUtils.deliverErrorFromSource(sub, (Throwable)this.terminal);
            }
        }

        @Override
        public void onSubscribe(Cancellable cancellable) {
            throw new UnsupportedOperationException();
        }

        /**
         * Always throws: a terminal signal has already been recorded.
         *
         * @throws IllegalStateException always; the previously recorded error (if any) is attached as the cause.
         */
        @Override
        public void onSuccess(@Nullable T result) {
            if (this.isSuccess) {
                // The recorded terminal is a plain value here, so it can only be reported in the message.
                throw new IllegalStateException("terminal signal already received in onSuccess. old: " + this.terminal + " new: " + result);
            }
            // The recorded terminal is the Throwable stored by the constructor; chain it as the cause.
            throw new IllegalStateException("terminal signal already received in onSuccess. new: " + result, (Throwable)this.terminal);
        }

        /**
         * Always throws: a terminal signal has already been recorded.
         *
         * @throws IllegalStateException always, with {@code t} as the cause.
         */
        @Override
        public void onError(Throwable t) {
            throw new IllegalStateException("duplicate terminal signal in onError. old: " + this.terminal, t);
        }
    }

    /**
     * Wraps a delegate subscriber and orders its signals when {@code onSubscribe} and a terminal signal
     * ({@code onSuccess}/{@code onError}) may be invoked concurrently.
     * <p>
     * The {@code state} field holds one of the sentinel objects below, or a pending terminal: the success value
     * (possibly {@code null}) or a {@link TerminalNotification} wrapping the error. A pending terminal is stored
     * only while the delegate's {@code onSubscribe} is in progress, and is delivered once that call returns.
     */
    private static final class ConcurrentOnSubscribeSubscriber<T>
    implements SingleSource.Subscriber<T> {
        private static final AtomicReferenceFieldUpdater<ConcurrentOnSubscribeSubscriber, Object> stateUpdater = AtomicReferenceFieldUpdater.newUpdater(ConcurrentOnSubscribeSubscriber.class, Object.class, "state");
        // No signal has been processed yet.
        private static final Object INIT = new Object();
        // delegate.onSubscribe(..) is currently executing.
        private static final Object INVOKING_ON_SUBSCRIBE = new Object();
        // delegate.onSubscribe(..) has returned and no terminal signal has arrived yet.
        private static final Object WAITING_FOR_TERMINAL = new Object();
        // A terminal signal has been (or is being) handed to the delegate.
        private static final Object TERMINATED = new Object();
        private final SingleSource.Subscriber<? super T> delegate;
        @Nullable
        private volatile Object state = INIT;

        private ConcurrentOnSubscribeSubscriber(SingleSource.Subscriber<? super T> delegate) {
            this.delegate = delegate;
        }

        /**
         * Forwards {@code onSubscribe} to the delegate if this is the first signal. If a terminal signal was
         * stored while the delegate's {@code onSubscribe} was running, it is delivered right after that call.
         */
        @Override
        public void onSubscribe(Cancellable cancellable) {
            block7: {
                Object currState;
                do {
                    if ((currState = this.state) == INIT && stateUpdater.compareAndSet(this, INIT, INVOKING_ON_SUBSCRIBE)) {
                        try {
                            this.delegate.onSubscribe(cancellable);
                            break block7;
                        }
                        finally {
                            // A failed CAS means a terminal signal replaced INVOKING_ON_SUBSCRIBE with the
                            // pending terminal while onSubscribe was running; deliver it now.
                            if (!stateUpdater.compareAndSet(this, INVOKING_ON_SUBSCRIBE, WAITING_FOR_TERMINAL)) {
                                this.sendTerminal(this.state);
                            }
                        }
                    }
                    // onSubscribe has already been (or is being) delivered: reject the duplicate.
                    if (currState == WAITING_FOR_TERMINAL || currState == INVOKING_ON_SUBSCRIBE) {
                        this.duplicateOnSubscribe(cancellable);
                        break block7;
                    }
                    if (currState == TERMINATED) break block7;
                    // Loop again only when the state was INIT but the CAS above lost a race.
                } while (currState == INIT);
                // Any remaining state value is a pending terminal.
                this.delayedOnSubscribe(currState);
            }
        }

        /**
         * Cancels the {@link Cancellable} of a duplicate {@code onSubscribe} and logs the event.
         */
        private void duplicateOnSubscribe(Cancellable cancellable) {
            try {
                cancellable.cancel();
            }
            finally {
                // NOTE(review): a duplicate onSubscribe is reported via logDuplicateTerminal - confirm this is
                // the intended helper.
                SubscriberUtils.logDuplicateTerminal(this);
            }
        }

        /**
         * Delivers {@code onSubscribe} with a no-op {@link Cancellable}, then the pending terminal in
         * {@code currState}.
         */
        private void delayedOnSubscribe(@Nullable Object currState) {
            try {
                this.delegate.onSubscribe(Cancellable.IGNORE_CANCEL);
            }
            finally {
                this.sendTerminal(currState);
            }
        }

        /**
         * Marks this subscriber as terminated and delivers the pending terminal to the delegate: a
         * {@link TerminalNotification} is delivered as an error, anything else as the success value.
         */
        private void sendTerminal(@Nullable Object currState) {
            this.state = TERMINATED;
            if (currState instanceof TerminalNotification) {
                Throwable cause = ((TerminalNotification)currState).cause();
                assert (cause != null);
                this.delegate.onError(cause);
            } else {
                // NOTE(review): the decompiler output passes an Object here; the pre-decompilation source
                // presumably had an unchecked cast to T - confirm.
                Object t = currState;
                this.delegate.onSuccess(t);
            }
        }

        /**
         * Delivers {@code result} to the delegate, or stores it as the pending terminal when the delegate's
         * {@code onSubscribe} is still executing. A duplicate terminal signal is only logged.
         */
        @Override
        public void onSuccess(@Nullable T result) {
            block3: {
                block4: {
                    while (true) {
                        Object currState;
                        if ((currState = this.state) == WAITING_FOR_TERMINAL) {
                            this.state = TERMINATED;
                            this.delegate.onSuccess(result);
                            break block3;
                        }
                        if (currState == INIT) {
                            // Terminal arrived before any onSubscribe: claim termination, then hand both
                            // signals to the SubscriberUtils helper.
                            if (!stateUpdater.compareAndSet(this, INIT, TERMINATED)) continue;
                            SubscriberUtils.deliverSuccessFromSource(this.delegate, result);
                            break block3;
                        }
                        // Neither INIT, WAITING_FOR_TERMINAL nor INVOKING_ON_SUBSCRIBE: a terminal was
                        // already seen, so this one is a duplicate.
                        if (currState != INVOKING_ON_SUBSCRIBE) break block4;
                        // Park the result; the thread running onSubscribe delivers it from its finally block.
                        if (stateUpdater.compareAndSet(this, INVOKING_ON_SUBSCRIBE, result)) break;
                    }
                    break block3;
                }
                SubscriberUtils.logDuplicateTerminalOnSuccess(this, result);
            }
        }

        /**
         * Delivers {@code t} to the delegate, or stores it (wrapped in a {@link TerminalNotification}) as the
         * pending terminal when the delegate's {@code onSubscribe} is still executing. A duplicate terminal
         * signal is only logged.
         */
        @Override
        public void onError(Throwable t) {
            block3: {
                block4: {
                    TerminalNotification terminalNotification = TerminalNotification.error(t);
                    while (true) {
                        Object currState;
                        if ((currState = this.state) == WAITING_FOR_TERMINAL) {
                            this.state = TERMINATED;
                            this.delegate.onError(t);
                            break block3;
                        }
                        if (currState == INIT) {
                            // Terminal arrived before any onSubscribe: claim termination, then hand both
                            // signals to the SubscriberUtils helper.
                            if (!stateUpdater.compareAndSet(this, INIT, TERMINATED)) continue;
                            SubscriberUtils.deliverErrorFromSource(this.delegate, t);
                            break block3;
                        }
                        // Neither INIT, WAITING_FOR_TERMINAL nor INVOKING_ON_SUBSCRIBE: duplicate terminal.
                        if (currState != INVOKING_ON_SUBSCRIBE) break block4;
                        // Park the error; the thread running onSubscribe delivers it from its finally block.
                        if (stateUpdater.compareAndSet(this, INVOKING_ON_SUBSCRIBE, terminalNotification)) break;
                    }
                    break block3;
                }
                SubscriberUtils.logDuplicateTerminal(this, t);
            }
        }
    }

    private final class State
    implements SingleSource.Subscriber<T> {
        volatile SingleSource.Subscriber<? super T>[] subscribers = CacheSingle.access$100();
        volatile int subscribeCount;
        private final DelayedCancellable delayedCancellable = new DelayedCancellable();

        private State() {
        }

        void addSubscriber(SingleSource.Subscriber<? super T> subscriber, ContextMap contextMap, AsyncContextProvider contextProvider) {
            block2: {
                SingleSource.Subscriber[] newSubs;
                SingleSource.Subscriber<? super T>[] currSubs;
                int sCount = subscribeCountUpdater.incrementAndGet(this);
                ConcurrentOnSubscribeSubscriber multiSubscriber = new ConcurrentOnSubscribeSubscriber(subscriber);
                do {
                    if ((currSubs = this.subscribers).length == 1 && currSubs[0] instanceof TerminalSubscriber) {
                        TerminalSubscriber terminalSubscriber = (TerminalSubscriber)currSubs[0];
                        terminalSubscriber.safeTerminateFromSource(subscriber);
                        break block2;
                    }
                    newSubs = (SingleSource.Subscriber[])Array.newInstance(SingleSource.Subscriber.class, currSubs.length + 1);
                    System.arraycopy(currSubs, 0, newSubs, 0, currSubs.length);
                    newSubs[currSubs.length] = multiSubscriber;
                } while (!newSubscribersUpdater.compareAndSet(this, currSubs, newSubs));
                multiSubscriber.onSubscribe(() -> this.removeSubscriber(multiSubscriber));
                if (sCount != CacheSingle.this.minSubscribers) break block2;
                CacheSingle.this.original.delegateSubscribe(this, contextMap, contextProvider);
            }
        }

        void removeSubscriber(SingleSource.Subscriber<T> subscriber) {
            block3: {
                SingleSource.Subscriber[] newSubs;
                SingleSource.Subscriber<? super T>[] currSubs;
                do {
                    int i;
                    if ((i = ArrayUtils.indexOf(subscriber, currSubs = this.subscribers)) < 0) {
                        return;
                    }
                    newSubs = (SingleSource.Subscriber[])Array.newInstance(SingleSource.Subscriber.class, currSubs.length - 1);
                    if (i == 0) {
                        System.arraycopy(currSubs, 1, newSubs, 0, newSubs.length);
                        continue;
                    }
                    System.arraycopy(currSubs, 0, newSubs, 0, i);
                    System.arraycopy(currSubs, i + 1, newSubs, i, newSubs.length - i);
                } while (!newSubscribersUpdater.compareAndSet(this, currSubs, newSubs));
                if (!CacheSingle.this.cancelUpstream || newSubs.length != 0) break block3;
                CacheSingle.this.state = new State();
                this.delayedCancellable.cancel();
            }
        }

        @Override
        public void onSubscribe(Cancellable cancellable) {
            this.delayedCancellable.delayedCancellable(cancellable);
        }

        @Override
        public void onSuccess(@Nullable T result) {
            this.safeTerminalStateReset(result, null);
            this.terminate(new TerminalSubscriber(null, result), sub -> sub.onSuccess(result));
        }

        @Override
        public void onError(Throwable t) {
            this.safeTerminalStateReset(null, t);
            this.terminate(new TerminalSubscriber(t, null), sub -> sub.onError(t));
        }

        private void terminate(TerminalSubscriber<T> terminalSubscriber, Consumer<SingleSource.Subscriber<? super T>> terminator) {
            block4: {
                SingleSource.Subscriber<? super T>[] subs;
                SingleSource.Subscriber[] newSubs = (SingleSource.Subscriber[])Array.newInstance(SingleSource.Subscriber.class, 1);
                newSubs[0] = terminalSubscriber;
                do {
                    subs = this.subscribers;
                } while (!newSubscribersUpdater.compareAndSet(this, subs, newSubs));
                Throwable delayedCause = null;
                for (SingleSource.Subscriber subscriber : subs) {
                    try {
                        terminator.accept(subscriber);
                    }
                    catch (Throwable cause) {
                        delayedCause = ThrowableUtils.catchUnexpected(delayedCause, cause);
                    }
                }
                if (delayedCause == null) break block4;
                PlatformDependent.throwException(delayedCause);
            }
        }

        private void safeTerminalStateReset(@Nullable T value, @Nullable Throwable t) {
            Completable completable;
            try {
                completable = (Completable)CacheSingle.this.terminalResubscribe.apply(value, t);
            }
            catch (Throwable cause) {
                LOGGER.warn("terminalStateReset {} threw", (Object)CacheSingle.this.terminalResubscribe, (Object)cause);
                completable = Completable.never();
            }
            completable.whenFinally(() -> CacheSingle.this.state = new State()).subscribe();
        }
    }
}

