/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.netty;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.PublisherToSingleOperator;
import io.servicetalk.concurrent.api.internal.SubscribablePublisher;
import io.servicetalk.concurrent.internal.DelayedSubscription;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.concurrent.internal.EmptySubscriptions;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.http.netty.H2ClientParentConnectionContext;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Operator that converts a flat stream of {@link Object}s into a single {@code Data} item.
 * <p>
 * The first item emitted by the stream is treated as {@code MetaData}. It is handed to the {@code packer} function
 * together with a {@link Publisher} of {@code Payload} through which every subsequent item of the same stream is
 * forwarded. The value returned by the {@code packer} is delivered to the {@link SingleSource.Subscriber}.
 *
 * @param <Data> type of the item produced by the {@code packer} and delivered to the single subscriber
 * @param <MetaData> type the first item of the stream is cast to
 * @param <Payload> type all items after the first one are cast to
 */
final class SpliceFlatStreamToMetaSingle<Data, MetaData, Payload>
        implements PublisherToSingleOperator<Object, Data> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpliceFlatStreamToMetaSingle.class);
    private final BiFunction<MetaData, Publisher<Payload>, Data> packer;

    /**
     * Creates a new operator.
     *
     * @param packer function combining the meta-data with the payload {@link Publisher} into {@code Data}
     * @throws NullPointerException if {@code packer} is {@code null}
     */
    SpliceFlatStreamToMetaSingle(BiFunction<MetaData, Publisher<Payload>, Data> packer) {
        this.packer = Objects.requireNonNull(packer);
    }

    @Override
    public PublisherSource.Subscriber<Object> apply(SingleSource.Subscriber<? super Data> subscriber) {
        // Diamond keeps the type arguments instead of falling back to a raw type.
        return new SplicingSubscriber<>(this, subscriber);
    }

    /**
     * Subscriber of the flat stream. It routes the first item to the {@code dataSubscriber} (via the packer) and
     * all following items to whoever subscribes to the payload {@link Publisher} given to the packer.
     * <p>
     * {@code maybePayloadSub} is the shared state and holds one of:
     * <ul>
     *     <li>{@code null} - initial state</li>
     *     <li>{@link #CANCELED} - set by {@link #cancelData(PublisherSource.Subscription)} while still
     *     {@code null}</li>
     *     <li>{@link #PENDING} - set when the meta-data arrives while still {@code null}</li>
     *     <li>a {@link PublisherSource.Subscriber} - set when the payload publisher is subscribed while
     *     {@link #PENDING}</li>
     *     <li>{@link #EMPTY_COMPLETED} or a {@link Throwable} - a terminal signal stored by
     *     {@link #onComplete()} / {@link #onError(Throwable)} when no payload subscriber has seen
     *     {@code onNext} yet</li>
     *     <li>{@link #EMPTY_COMPLETED_DELIVERED} - the stored terminal signal was handed to a payload
     *     subscriber</li>
     * </ul>
     */
    static final class SplicingSubscriber<Data, MetaData, Payload>
            implements PublisherSource.Subscriber<Object> {
        // A class literal is always raw, therefore the updater cannot carry type arguments.
        @SuppressWarnings("rawtypes")
        private static final AtomicReferenceFieldUpdater<SplicingSubscriber, Object> maybePayloadSubUpdater =
                AtomicReferenceFieldUpdater.newUpdater(SplicingSubscriber.class, Object.class, "maybePayloadSub");
        // State markers; they are compared by identity (==), never by equals().
        private static final String CANCELED = "CANCELED";
        private static final String PENDING = "PENDING";
        private static final String EMPTY_COMPLETED = "EMPTY_COMPLETED";
        private static final String EMPTY_COMPLETED_DELIVERED = "EMPTY_COMPLETED_DELIVERED";
        @Nullable
        private volatile Object maybePayloadSub;
        // Cached copy of the payload subscriber, populated on the first payload onNext.
        @Nullable
        private PublisherSource.Subscriber<Payload> payloadSubscriber;
        // true once the first (meta-data) item has been observed.
        private boolean metaSeenInOnNext;
        @Nullable
        private PublisherSource.Subscription rawSubscription;
        // Guards against calling dataSubscriber.onSubscribe(..) more than once.
        private boolean onSubscribeSent;
        private final SpliceFlatStreamToMetaSingle<Data, MetaData, Payload> parent;
        private final SingleSource.Subscriber<? super Data> dataSubscriber;

        private SplicingSubscriber(SpliceFlatStreamToMetaSingle<Data, MetaData, Payload> parent,
                                   SingleSource.Subscriber<? super Data> dataSubscriber) {
            this.parent = parent;
            this.dataSubscriber = dataSubscriber;
        }

        /**
         * Cancels {@code subscription} on behalf of the data subscriber, but only while the state is still
         * {@code null} (i.e. the meta-data has not claimed the state yet).
         *
         * @param subscription the subscription of the flat stream
         */
        private void cancelData(PublisherSource.Subscription subscription) {
            if (maybePayloadSubUpdater.compareAndSet(this, null, CANCELED)) {
                subscription.cancel();
            }
        }

        /**
         * Stores the subscription, requests the first item and signals {@code onSubscribe} to the data subscriber
         * (unless that already happened).
         */
        @Override
        public void onSubscribe(PublisherSource.Subscription inStreamSubscription) {
            if (!SubscriberUtils.checkDuplicateSubscription(rawSubscription, inStreamSubscription)) {
                return;
            }
            rawSubscription = inStreamSubscription;
            // Only the first item is requested here; further demand comes from the payload subscriber.
            inStreamSubscription.request(1L);
            if (!onSubscribeSent) {
                onSubscribeSent = true;
                dataSubscriber.onSubscribe(() -> cancelData(inStreamSubscription));
            }
        }

        /**
         * Treats the first item as meta-data and completes the data subscriber with the packer result; forwards
         * every later item to the payload subscriber, if one is present.
         */
        @Override
        public void onNext(@Nullable Object obj) {
            if (metaSeenInOnNext) {
                // Everything after the first item is payload by contract of this operator.
                @SuppressWarnings("unchecked")
                final Payload payload = (Payload) obj;
                if (payloadSubscriber != null) {
                    payloadSubscriber.onNext(payload);
                } else {
                    final Object subscriber = maybePayloadSub;
                    if (subscriber instanceof PublisherSource.Subscriber) {
                        // Only handleSubscribe(..) stores a Subscriber in the state, and it is a payload subscriber.
                        @SuppressWarnings("unchecked")
                        final PublisherSource.Subscriber<Payload> typedSubscriber =
                                (PublisherSource.Subscriber<Payload>) subscriber;
                        payloadSubscriber = typedSubscriber;
                        typedSubscriber.onNext(payload);
                    }
                    // Otherwise there is nobody to deliver to and the item is dropped.
                }
            } else {
                ensureResultSubscriberOnSubscribe();
                // The first item is meta-data by contract of this operator.
                @SuppressWarnings("unchecked")
                final MetaData meta = (MetaData) obj;
                metaSeenInOnNext = true;
                Data data;
                try {
                    final Publisher<Payload> payloadPublisher;
                    if (maybePayloadSubUpdater.compareAndSet(this, null, PENDING)) {
                        payloadPublisher = newPayloadPublisher();
                    } else {
                        // State was not null, i.e. cancelData(..) won the race: expose a failed payload.
                        payloadPublisher = Publisher.failed(
                                H2ClientParentConnectionContext.StacklessCancellationException.newInstance(
                                        "Canceled prematurely from Data", SplicingSubscriber.class,
                                        "cancelData(..)"));
                    }
                    data = parent.packer.apply(meta, payloadPublisher);
                    assert (data != null) : "Packer function must return non-null Data";
                } catch (Throwable t) {
                    assert (rawSubscription != null);
                    // The payload can never be consumed if packing failed, so stop the stream.
                    rawSubscription.cancel();
                    dataSubscriber.onError(t);
                    return;
                }
                dataSubscriber.onSuccess(data);
            }
        }

        /**
         * Creates the {@link Publisher} handed to the packer. Its first subscriber (while the state is
         * {@link #PENDING}) is wired to the flat stream's subscription; any other subscriber receives a stored
         * terminal signal if one is pending, or a {@link DuplicateSubscribeException} otherwise.
         *
         * @return a new payload {@link Publisher}
         */
        @Nonnull
        private Publisher<Payload> newPayloadPublisher() {
            return new SubscribablePublisher<Payload>() {

                @Override
                protected void handleSubscribe(PublisherSource.Subscriber<? super Payload> newSubscriber) {
                    final DelayedSubscription delayedSubscription = new DelayedSubscription();
                    newSubscriber.onSubscribe(delayedSubscription);
                    // The state lives in the enclosing SplicingSubscriber, so the updater must be given
                    // SplicingSubscriber.this; a bare 'this' would be this anonymous Publisher.
                    if (maybePayloadSubUpdater.compareAndSet(SplicingSubscriber.this, SplicingSubscriber.PENDING,
                            newSubscriber)) {
                        assert (SplicingSubscriber.this.rawSubscription != null);
                        delayedSubscription.delayedSubscription(SplicingSubscriber.this.rawSubscription);
                    } else {
                        final Object maybeSubscriber = SplicingSubscriber.this.maybePayloadSub;
                        delayedSubscription.delayedSubscription(EmptySubscriptions.EMPTY_SUBSCRIPTION);
                        if (maybeSubscriber == SplicingSubscriber.EMPTY_COMPLETED
                                && maybePayloadSubUpdater.compareAndSet(SplicingSubscriber.this,
                                SplicingSubscriber.EMPTY_COMPLETED, SplicingSubscriber.EMPTY_COMPLETED_DELIVERED)) {
                            // Stream completed before anyone subscribed to the payload.
                            newSubscriber.onComplete();
                        } else if (maybeSubscriber instanceof Throwable
                                && maybePayloadSubUpdater.compareAndSet(SplicingSubscriber.this, maybeSubscriber,
                                SplicingSubscriber.EMPTY_COMPLETED_DELIVERED)) {
                            // Stream failed before anyone subscribed to the payload.
                            newSubscriber.onError((Throwable) maybeSubscriber);
                        } else {
                            newSubscriber.onError(new DuplicateSubscribeException(maybeSubscriber, newSubscriber,
                                    "HTTP request/response payload can only be subscribed to once"));
                        }
                    }
                }
            };
        }

        /**
         * Delivers the error to the payload subscriber if one is known, to the data subscriber if the meta-data
         * was never delivered or the data was cancelled, or stores it in the state for a later payload subscriber.
         */
        @Override
        public void onError(Throwable t) {
            if (payloadSubscriber != null) {
                // A payload subscriber has already seen onNext; it owns the terminal signal.
                payloadSubscriber.onError(t);
            } else {
                final Object maybeSubscriber = maybePayloadSubUpdater.getAndSet(this, t);
                if (maybeSubscriber == CANCELED || !metaSeenInOnNext) {
                    // NOTE(review): when the previous state is CANCELED, metaSeenInOnNext may already be true, in
                    // which case the assertion inside ensureResultSubscriberOnSubscribe() would fail with -ea and
                    // dataSubscriber would see onError after onSuccess - confirm whether this path is reachable.
                    ensureResultSubscriberOnSubscribe();
                    dataSubscriber.onError(t);
                } else if (maybeSubscriber instanceof PublisherSource.Subscriber) {
                    final PublisherSource.Subscriber<?> subscriber = (PublisherSource.Subscriber<?>) maybeSubscriber;
                    if (maybePayloadSubUpdater.compareAndSet(this, t, EMPTY_COMPLETED_DELIVERED)) {
                        subscriber.onError(t);
                    } else {
                        // Another subscriber consumed the stored error between getAndSet and the CAS above.
                        subscriber.onError(new IllegalStateException("Duplicate Subscribers are not allowed. Existing: " + maybeSubscriber + ", failed the race with a duplicate, but neither has seen onNext()"));
                    }
                } else {
                    LOGGER.debug("Terminal error queued for delayed delivery to the payload publisher. If the payload is not subscribed, this event will not be delivered.", t);
                }
            }
        }

        /**
         * Completes the payload subscriber if one is known, stores the completion in the state for a later payload
         * subscriber, or fails the data subscriber if the stream completed without emitting any item.
         */
        @Override
        public void onComplete() {
            if (payloadSubscriber != null) {
                // A payload subscriber has already seen onNext; it owns the terminal signal.
                payloadSubscriber.onComplete();
            } else {
                final Object maybeSubscriber = maybePayloadSubUpdater.getAndSet(this, EMPTY_COMPLETED);
                if (maybeSubscriber instanceof PublisherSource.Subscriber) {
                    final PublisherSource.Subscriber<?> subscriber = (PublisherSource.Subscriber<?>) maybeSubscriber;
                    if (maybePayloadSubUpdater.compareAndSet(this, EMPTY_COMPLETED, EMPTY_COMPLETED_DELIVERED)) {
                        subscriber.onComplete();
                    } else {
                        // Another subscriber consumed the stored completion between getAndSet and the CAS above.
                        subscriber.onError(new IllegalStateException("Duplicate Subscribers are not allowed. Existing: " + maybeSubscriber + ", failed the race with a duplicate, but neither has seen onNext()"));
                    }
                } else if (!metaSeenInOnNext) {
                    ensureResultSubscriberOnSubscribe();
                    dataSubscriber.onError(new IllegalStateException("Stream unexpectedly completed without emitting any items"));
                }
            }
        }

        /**
         * Makes sure the data subscriber has received {@code onSubscribe} before any other signal is delivered
         * to it. Uses a no-op {@link Cancellable} because no upstream subscription is wired on this path.
         */
        private void ensureResultSubscriberOnSubscribe() {
            assert (!metaSeenInOnNext);
            if (!onSubscribeSent) {
                onSubscribeSent = true;
                dataSubscriber.onSubscribe(Cancellable.IGNORE_CANCEL);
            }
        }
    }
}

