/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.mqtt.handler.publish;

import com.codahale.metrics.Counter;
import com.hivemq.configuration.service.InternalConfigurations;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.metrics.MetricsHolder;
import com.hivemq.mqtt.handler.publish.PublishStatus;
import com.hivemq.mqtt.handler.publish.PublishWriteFailedListener;
import com.hivemq.mqtt.message.publish.PublishWithFuture;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.LinkedList;
import java.util.List;

public class PublishFlushHandler
extends ChannelInboundHandlerAdapter
implements Runnable {
    @Nullable
    private ChannelHandlerContext ctx;
    @NotNull
    private final LinkedList<PublishWithFuture> messagesToWrite = new LinkedList();
    @NotNull
    private final Counter channelNotWritable;
    private final int maxWritesBeforeFlush;
    private boolean wasWritable = true;

    public PublishFlushHandler(@NotNull MetricsHolder metricsHolder) {
        this.channelNotWritable = metricsHolder.getChannelNotWritableCounter();
        this.maxWritesBeforeFlush = InternalConfigurations.COUNT_OF_PUBLISHES_WRITTEN_TO_CHANNEL_TO_TRIGGER_FLUSH.get();
    }

    public void handlerAdded(@NotNull ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    public void channelWritabilityChanged(@NotNull ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        if (channel.isWritable() && !this.wasWritable) {
            this.wasWritable = true;
            this.channelNotWritable.dec();
            channel.eventLoop().execute((Runnable)this);
        }
        ctx.fireChannelWritabilityChanged();
    }

    public void channelInactive(@NotNull ChannelHandlerContext ctx) throws Exception {
        this.handleChannelInactiveState();
        super.channelInactive(ctx);
    }

    private void handleChannelInactiveState() {
        while (!this.messagesToWrite.isEmpty()) {
            this.messagesToWrite.poll().getFuture().set((Object)PublishStatus.NOT_CONNECTED);
        }
    }

    public void sendPublishes(@NotNull List<PublishWithFuture> publishes) {
        assert (this.ctx != null) : "ctx can not be null because sendPublishes is called after handlerAdded";
        this.ctx.channel().eventLoop().execute(() -> {
            this.messagesToWrite.addAll(publishes);
            if (this.ctx.channel().isActive()) {
                this.consumeQueue();
            } else {
                this.handleChannelInactiveState();
            }
        });
    }

    @Override
    public void run() {
        this.consumeQueue();
    }

    private void consumeQueue() {
        assert (this.ctx != null) : "ctx can not be null because consumeQueue is called after handlerAdded";
        int written = 0;
        while (!this.messagesToWrite.isEmpty()) {
            if (!this.ctx.channel().isWritable()) {
                if (!this.wasWritable) break;
                this.wasWritable = false;
                this.channelNotWritable.inc();
                break;
            }
            PublishWithFuture publish = this.messagesToWrite.poll();
            this.ctx.write((Object)publish).addListener((GenericFutureListener)new PublishWriteFailedListener(publish.getFuture()));
            if (++written < this.maxWritesBeforeFlush) continue;
            this.ctx.flush();
            written = 0;
        }
        if (written > 0) {
            this.ctx.flush();
        }
    }
}

