/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.ConnectionFactoryFilter;
import io.servicetalk.client.api.DelegatingConnectionFactory;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Processors;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.internal.DefaultContextMap;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpContextKeys;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpHeaderNames;
import io.servicetalk.http.api.HttpHeaderValues;
import io.servicetalk.http.api.HttpResponseStatus;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.netty.ProxyResponseException;
import io.servicetalk.transport.api.TransportObserver;
import io.servicetalk.transport.netty.internal.DeferSslHandler;
import io.servicetalk.transport.netty.internal.NettyConnectionContext;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ConnectionFactoryFilter} that tunnels every new connection through a proxy.
 * <p>
 * For each connection produced by the wrapped {@link ConnectionFactory} it sends an HTTP {@code CONNECT} request
 * for the configured address, verifies that the proxy answered with a {@code 2xx} status, calls
 * {@link DeferSslHandler#ready()} on the channel pipeline and hands the connection to the caller only after a
 * successful {@link SslHandshakeCompletionEvent} has been observed.
 *
 * @param <ResolvedAddress> the type of a resolved address passed to the wrapped factory
 * @param <C> the type of connection created by the wrapped factory
 */
final class ProxyConnectConnectionFactoryFilter<ResolvedAddress, C extends FilterableStreamingHttpConnection>
        implements ConnectionFactoryFilter<ResolvedAddress, C> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProxyConnectConnectionFactoryFilter.class);

    /** Target of the {@code CONNECT} request; also stored in the connection's {@link ContextMap}. */
    private final String connectAddress;

    /**
     * Creates a new filter.
     *
     * @param connectAddress the address used as the target of the {@code CONNECT} request; must not be
     * {@code null}
     */
    ProxyConnectConnectionFactoryFilter(final CharSequence connectAddress) {
        this.connectAddress = connectAddress.toString();
    }

    /**
     * Wraps the passed factory so that every connection it creates is tunnelled through the proxy.
     *
     * @param original the factory to decorate
     * @return a factory that performs the {@code CONNECT} exchange on top of {@code original}
     */
    @Override
    public ConnectionFactory<ResolvedAddress, C> create(final ConnectionFactory<ResolvedAddress, C> original) {
        return new ProxyFilter(original);
    }

    /**
     * Logs at INFO level when a previously stored value differs from the value that replaced it.
     * Nothing is logged when there was no previous value or when both values are equal.
     *
     * @param current the value that was previously stored, or {@code null} if there was none
     * @param expected the value that has just been stored
     * @param logger the logger to report to
     */
    static void logUnexpectedAddress(@Nullable final Object current, final Object expected, final Logger logger) {
        if (current != null && !expected.equals(current)) {
            logger.info("Observed unexpected value for {}: {}, overridden with: {}",
                    HttpContextKeys.HTTP_TARGET_ADDRESS_BEHIND_PROXY, current, expected);
        }
    }

    /**
     * Reports the offloading this filter needs.
     *
     * @return {@link HttpExecutionStrategies#offloadNone()}
     */
    @Override
    public HttpExecutionStrategy requiredOffloads() {
        return HttpExecutionStrategies.offloadNone();
    }

    /**
     * Connection factory that performs the proxy {@code CONNECT} exchange on each connection created by its
     * delegate.
     */
    private final class ProxyFilter extends DelegatingConnectionFactory<ResolvedAddress, C> {
        private ProxyFilter(final ConnectionFactory<ResolvedAddress, C> delegate) {
            super(delegate);
        }

        /**
         * Creates a connection via the delegate and establishes a tunnel through the proxy on it.
         * <p>
         * The connection is closed and the error is propagated if the {@code CONNECT} exchange fails in any way.
         *
         * @param resolvedAddress the address passed through to the delegate
         * @param context the context for the new connection; a new {@link DefaultContextMap} is used when
         * {@code null}
         * @param observer the observer passed through to the delegate, may be {@code null}
         * @return a {@link Single} that yields the connection once the tunnel and TLS handshake are established
         */
        @Override
        public Single<C> newConnection(final ResolvedAddress resolvedAddress,
                                       @Nullable final ContextMap context,
                                       @Nullable final TransportObserver observer) {
            // Deferred so that the context is populated per subscribe rather than once at assembly time.
            return Single.defer(() -> {
                final ContextMap contextMap = context != null ? context : new DefaultContextMap();
                // Publish the CONNECT target through the context before the delegate creates the connection,
                // and report if a different value was already present.
                final Object previous =
                        contextMap.put(HttpContextKeys.HTTP_TARGET_ADDRESS_BEHIND_PROXY, connectAddress);
                logUnexpectedAddress(previous, connectAddress, LOGGER);

                return delegate().newConnection(resolvedAddress, contextMap, observer).flatMap(c -> {
                    try {
                        return c.request(c.connect(connectAddress)
                                        .addHeader(HttpHeaderNames.CONTENT_LENGTH, HttpHeaderValues.ZERO))
                                .flatMap(response -> handleConnectResponse(c, response))
                                // Do not leak the freshly created connection if anything goes wrong.
                                .onErrorResume(t -> c.closeAsync().concat(Single.failed(t)));
                    } catch (Throwable cause) {
                        // Building or issuing the request threw synchronously: close and propagate.
                        return c.closeAsync().concat(Single.failed(cause));
                    }
                }).shareContextOnSubscribe();
            });
        }

        /**
         * Validates the proxy response and waits for the TLS handshake on the tunnelled connection.
         *
         * @param connection the connection on which the {@code CONNECT} request was sent
         * @param response the proxy's response to the {@code CONNECT} request
         * @return a {@link Single} that fails with {@link ProxyResponseException} for a non-2xx status, fails with
         * {@link IllegalStateException} if no {@link DeferSslHandler} is in the pipeline, fails with the handshake
         * cause if the handshake is unsuccessful, and otherwise yields {@code connection} after the response
         * body has been drained and the handshake has succeeded
         */
        private Single<C> handleConnectResponse(final C connection, final StreamingHttpResponse response) {
            if (response.status().statusClass() != HttpResponseStatus.StatusClass.SUCCESSFUL_2XX) {
                return Single.failed(new ProxyResponseException("Non-successful response from proxy CONNECT " +
                        connectAddress, response.status()));
            }

            final Channel channel = ((NettyConnectionContext) connection.connectionContext()).nettyChannel();

            // Look the handler up before touching the pipeline so that a failure here leaves no stray
            // handshake listener behind.
            final DeferSslHandler deferSslHandler = channel.pipeline().get(DeferSslHandler.class);
            if (deferSslHandler == null) {
                return Single.failed(new IllegalStateException("Failed to find a handler of type " +
                        DeferSslHandler.class + " in channel pipeline."));
            }

            final SingleSource.Processor<C, C> processor = Processors.newSingleProcessor();
            // The listener must be installed before ready() is called so the completion event cannot be missed.
            channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
                    if (evt instanceof SslHandshakeCompletionEvent) {
                        final SslHandshakeCompletionEvent event = (SslHandshakeCompletionEvent) evt;
                        if (event.isSuccess()) {
                            processor.onSuccess(connection);
                        } else {
                            processor.onError(event.cause());
                        }
                    }
                    // Always forward the event so later handlers observe it as well.
                    ctx.fireUserEventTriggered(evt);
                }
            });
            deferSslHandler.ready();

            // Drain the CONNECT response body first, then wait for the handshake outcome.
            return response.messageBody().ignoreElements().concat(SourceAdapters.fromSource(processor));
        }
    }
}

