/*
 * Decompiled with CFR 0.152.
 */
package org.hawkular.metrics.clients.ptrans.backend;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.List;
import org.hawkular.metrics.client.common.Batcher;
import org.hawkular.metrics.client.common.BoundMetricFifo;
import org.hawkular.metrics.client.common.SingleMetric;
import org.hawkular.metrics.clients.ptrans.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
public class RestForwardingHandler
extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(RestForwardingHandler.class);
    public static final String TENANT_HEADER_NAME = "Hawkular-Tenant";
    private final String restHost;
    private final int restPort;
    private final String restUri;
    private final String tenant;
    private final int restCloseAfterRequests;
    BoundMetricFifo fifo;
    AttributeKey<List<SingleMetric>> listKey = AttributeKey.valueOf((String)"listToSend");
    private Channel senderChannel;
    private int sendCounter = 0;
    private String localHostName;
    boolean isConnecting = false;
    final Object connectingMutex;
    private long numberOfMetrics = 0L;

    public RestForwardingHandler(Configuration configuration) {
        LOG.debug("RestForwardingHandler init");
        URI restUrl = configuration.getRestUrl();
        this.restHost = restUrl.getHost();
        this.restPort = restUrl.getPort();
        this.restUri = restUrl.getPath();
        this.tenant = configuration.getTenant();
        this.restCloseAfterRequests = configuration.getRestCloseAfterRequests();
        this.connectingMutex = this;
        this.fifo = new BoundMetricFifo(10, configuration.getSpoolSize());
        try {
            this.localHostName = InetAddress.getLocalHost().getCanonicalHostName();
        }
        catch (UnknownHostException e) {
            LOG.error(e.getLocalizedMessage());
            this.localHostName = "- unknown host -";
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        List in = (List)msg;
        LOG.trace("Received some metrics: {}", (Object)in);
        this.fifo.addAll((Collection)in);
        Object object = this.connectingMutex;
        synchronized (object) {
            if (this.isConnecting) {
                return;
            }
        }
        if (this.senderChannel != null) {
            this.sendToChannel(this.senderChannel);
            return;
        }
        ChannelFuture cf = this.connectRestServer(ctx.channel().eventLoop().parent());
        cf.addListener((GenericFutureListener)new ChannelFutureListener(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void operationComplete(ChannelFuture future) throws Exception {
                Object object = RestForwardingHandler.this.connectingMutex;
                synchronized (object) {
                    RestForwardingHandler.this.isConnecting = false;
                }
                if (!future.isSuccess()) {
                    Throwable cause = future.cause();
                    if (cause instanceof ConnectException) {
                        LOG.warn("Sending failed: " + cause.getLocalizedMessage());
                    } else {
                        LOG.warn("Something went wrong: " + cause);
                    }
                } else {
                    RestForwardingHandler.this.senderChannel = future.channel();
                    RestForwardingHandler.this.sendToChannel(RestForwardingHandler.this.senderChannel);
                }
            }
        });
    }

    private void sendToChannel(final Channel ch) {
        LOG.trace("Sending to channel {}", (Object)ch);
        final List metricsToSend = this.fifo.getList();
        String payload = Batcher.metricListToJson((Collection)metricsToSend);
        ByteBuf content = Unpooled.copiedBuffer((CharSequence)payload, (Charset)CharsetUtil.UTF_8);
        DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, this.restUri, content);
        HttpHeaders.setHeader((HttpMessage)request, (String)"Content-Type", (Object)"application/json;charset=utf-8");
        HttpHeaders.setContentLength((HttpMessage)request, (long)content.readableBytes());
        HttpHeaders.setKeepAlive((HttpMessage)request, (boolean)true);
        HttpHeaders.setHeader((HttpMessage)request, (String)TENANT_HEADER_NAME, (Object)this.tenant);
        ch.attr(this.listKey).set((Object)metricsToSend);
        ch.writeAndFlush((Object)request).addListener((GenericFutureListener)new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    ch.close();
                    RestForwardingHandler.this.senderChannel = null;
                    LOG.error("Sending to the hawkular-metrics server failed: " + future.cause());
                } else {
                    RestForwardingHandler.this.sendCounter++;
                    if (RestForwardingHandler.this.sendCounter >= RestForwardingHandler.this.restCloseAfterRequests) {
                        SingleMetric perfMetric = new SingleMetric(RestForwardingHandler.this.localHostName + ".ptrans.counter", System.currentTimeMillis(), Double.valueOf(RestForwardingHandler.this.numberOfMetrics + (long)metricsToSend.size()));
                        RestForwardingHandler.this.fifo.offer(perfMetric);
                        LOG.trace("Doing a periodic close after {} requests and {} items", (Object)RestForwardingHandler.this.restCloseAfterRequests, (Object)RestForwardingHandler.this.numberOfMetrics);
                        RestForwardingHandler.this.numberOfMetrics = 0L;
                        ch.close();
                        RestForwardingHandler.this.senderChannel = null;
                        RestForwardingHandler.this.sendCounter = 0;
                    }
                }
            }
        });
    }

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    ChannelFuture connectRestServer(EventLoopGroup group) throws Exception {
        Object object = this.connectingMutex;
        synchronized (object) {
            this.isConnecting = true;
        }
        Bootstrap clientBootstrap = new Bootstrap();
        ((Bootstrap)((Bootstrap)clientBootstrap.group(group)).channel(NioSocketChannel.class)).remoteAddress(this.restHost, this.restPort).handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new ChannelHandler[]{new HttpRequestEncoder()});
                pipeline.addLast(new ChannelHandler[]{new HttpResponseDecoder()});
                pipeline.addLast(new ChannelHandler[]{new HttpObjectAggregator(1024)});
                pipeline.addLast(new ChannelHandler[]{new HttpStatusWatcher()});
            }
        });
        return clientBootstrap.connect();
    }

    class HttpStatusWatcher
    extends ChannelInboundHandlerAdapter {
        private final Logger logger = LoggerFactory.getLogger(HttpStatusWatcher.class);

        HttpStatusWatcher() {
        }

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof FullHttpResponse) {
                FullHttpResponse response = (FullHttpResponse)msg;
                HttpResponseStatus status = response.getStatus();
                if (status.equals((Object)HttpResponseStatus.NO_CONTENT) || status.equals((Object)HttpResponseStatus.OK)) {
                    List metricsSent = (List)ctx.channel().attr(RestForwardingHandler.this.listKey).getAndRemove();
                    if (metricsSent != null) {
                        RestForwardingHandler.this.fifo.cleanout((Collection)metricsSent);
                        RestForwardingHandler.this.numberOfMetrics = RestForwardingHandler.this.numberOfMetrics + (long)metricsSent.size();
                        this.logger.debug("sent {} items", (Object)metricsSent.size());
                    }
                } else {
                    this.logger.warn("Send to rest-server failed:" + status);
                }
            } else {
                this.logger.error("msg " + msg);
            }
        }
    }
}

