/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.api;

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Processors;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.api.internal.ConnectablePayloadWriter;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.ThreadInterruptingCancellable;
import io.servicetalk.http.api.AbstractServiceAdapterHolder;
import io.servicetalk.http.api.BlockingStreamingHttpService;
import io.servicetalk.http.api.DefaultBlockingStreamingHttpServerResponse;
import io.servicetalk.http.api.DefaultHttpExecutionStrategy;
import io.servicetalk.http.api.DefaultHttpResponseMetaData;
import io.servicetalk.http.api.DefaultPayloadInfo;
import io.servicetalk.http.api.DefaultStreamingHttpResponse;
import io.servicetalk.http.api.HeaderUtils;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpHeaderNames;
import io.servicetalk.http.api.HttpHeaderValues;
import io.servicetalk.http.api.HttpHeaders;
import io.servicetalk.http.api.HttpPayloadWriter;
import io.servicetalk.http.api.HttpProtocolVersion;
import io.servicetalk.http.api.HttpRequestMethod;
import io.servicetalk.http.api.HttpResponseStatus;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.oio.api.internal.PayloadWriterUtils;
import java.io.IOException;
import java.util.Objects;
import java.util.function.Consumer;

final class BlockingStreamingToStreamingService
extends AbstractServiceAdapterHolder {
    private static final HttpExecutionStrategy DEFAULT_STRATEGY = DefaultHttpExecutionStrategy.OFFLOAD_RECEIVE_META_STRATEGY;
    private final BlockingStreamingHttpService original;

    BlockingStreamingToStreamingService(BlockingStreamingHttpService original, HttpExecutionStrategy strategy) {
        super(HttpExecutionStrategies.defaultStrategy() == strategy ? DEFAULT_STRATEGY : strategy);
        this.original = Objects.requireNonNull(original);
    }

    @Override
    public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx, final StreamingHttpRequest request, StreamingHttpResponseFactory responseFactory) {
        return new Single<StreamingHttpResponse>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            protected void handleSubscribe(SingleSource.Subscriber<? super StreamingHttpResponse> subscriber) {
                final ThreadInterruptingCancellable tiCancellable = new ThreadInterruptingCancellable(Thread.currentThread());
                try {
                    subscriber.onSubscribe(tiCancellable);
                }
                catch (Throwable cause) {
                    SubscriberUtils.handleExceptionFromOnSubscribe(subscriber, cause);
                    return;
                }
                CompletableSource.Processor exceptionProcessor = Processors.newCompletableProcessor();
                BufferHttpPayloadWriter payloadWriter = new BufferHttpPayloadWriter(ctx.headersFactory().newTrailers());
                DefaultBlockingStreamingHttpServerResponse response = null;
                try {
                    Consumer<DefaultHttpResponseMetaData> sendMeta = metaData -> {
                        DefaultStreamingHttpResponse result;
                        try {
                            boolean addTrailers;
                            HttpHeaders headers = metaData.headers();
                            HttpProtocolVersion version = metaData.version();
                            boolean bl = addTrailers = version.major() > 1 || HeaderUtils.isTransferEncodingChunked(headers);
                            if (!addTrailers && HttpProtocolVersion.h1TrailersSupported(version) && !HeaderUtils.hasContentLength(headers) && !HttpRequestMethod.HEAD.equals(request.method())) {
                                headers.add(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
                                addTrailers = true;
                            }
                            Publisher<Object> messageBody = SourceAdapters.fromSource(exceptionProcessor).merge(payloadWriter.connect());
                            if (addTrailers) {
                                messageBody = messageBody.concat(1.succeeded(payloadWriter.trailers()));
                            }
                            messageBody = messageBody.beforeSubscription(() -> new PublisherSource.Subscription(){

                                @Override
                                public void request(long n) {
                                }

                                @Override
                                public void cancel() {
                                    tiCancellable.cancel();
                                }
                            });
                            result = new DefaultStreamingHttpResponse(metaData.status(), version, headers, metaData.context0(), ctx.executionContext().bufferAllocator(), messageBody, DefaultPayloadInfo.forTransportReceive(false, version, headers), ctx.headersFactory());
                        }
                        catch (Throwable t) {
                            subscriber.onError(t);
                            throw t;
                        }
                        subscriber.onSuccess(result);
                    };
                    response = new DefaultBlockingStreamingHttpServerResponse(HttpResponseStatus.OK, request.version(), ctx.headersFactory().newHeaders(), payloadWriter, ctx.executionContext().bufferAllocator(), sendMeta);
                    BlockingStreamingToStreamingService.this.original.handle(ctx, request.toBlockingStreamingRequest(), response);
                    exceptionProcessor.onComplete();
                }
                catch (Throwable cause) {
                    tiCancellable.setDone(cause);
                    if (response == null || response.markMetaSent()) {
                        SubscriberUtils.safeOnError(subscriber, cause);
                    } else {
                        try {
                            exceptionProcessor.onError(cause);
                        }
                        finally {
                            PayloadWriterUtils.safeClose(payloadWriter, cause);
                        }
                    }
                    return;
                }
                tiCancellable.setDone();
            }
        };
    }

    @Override
    public Completable closeAsync() {
        return Completable.fromCallable(() -> {
            this.original.close();
            return null;
        });
    }

    @Override
    public Completable closeAsyncGracefully() {
        return Completable.fromCallable(() -> {
            this.original.closeGracefully();
            return null;
        });
    }

    private static final class BufferHttpPayloadWriter
    implements HttpPayloadWriter<Buffer> {
        private final ConnectablePayloadWriter<Buffer> payloadWriter = new ConnectablePayloadWriter();
        private final HttpHeaders trailers;

        BufferHttpPayloadWriter(HttpHeaders trailers) {
            this.trailers = trailers;
        }

        @Override
        public void write(Buffer object) throws IOException {
            this.payloadWriter.write(object);
        }

        @Override
        public void flush() throws IOException {
            this.payloadWriter.flush();
        }

        @Override
        public void close() throws IOException {
            this.payloadWriter.close();
        }

        @Override
        public void close(Throwable cause) throws IOException {
            this.payloadWriter.close(cause);
        }

        @Override
        public HttpHeaders trailers() {
            return this.trailers;
        }

        Publisher<Buffer> connect() {
            return this.payloadWriter.connect();
        }
    }
}

