/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.mqtt.handler.publish;

import com.hivemq.configuration.service.MqttConfigurationService;
import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnector;
import com.hivemq.mqtt.message.QoS;
import com.hivemq.mqtt.message.puback.PUBACK;
import com.hivemq.mqtt.message.pubcomp.PUBCOMP;
import com.hivemq.mqtt.message.publish.PUBLISH;
import com.hivemq.mqtt.message.pubrec.PUBREC;
import com.hivemq.mqtt.message.reason.Mqtt5DisconnectReasonCode;
import com.hivemq.mqtt.message.reason.Mqtt5PubRecReasonCode;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject;

public class FlowControlHandler
extends ChannelDuplexHandler {
    private final AtomicInteger serverSendQuota;
    private final int serverReceiveMaximum;
    private final MqttServerDisconnector serverDisconnector;

    @Inject
    public FlowControlHandler(MqttConfigurationService mqttConfigurationService, MqttServerDisconnector serverDisconnector) {
        this.serverReceiveMaximum = mqttConfigurationService.serverReceiveMaximum();
        this.serverDisconnector = serverDisconnector;
        this.serverSendQuota = new AtomicInteger(this.serverReceiveMaximum);
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof PUBLISH) {
            if (this.serverSendQuota.get() < 0) {
                return;
            }
            PUBLISH publish = (PUBLISH)msg;
            if (QoS.AT_MOST_ONCE != publish.getQoS() && this.serverSendQuota.getAndDecrement() == 0) {
                this.serverDisconnector.disconnect(ctx.channel(), "A client (IP: {}) sent too many concurrent PUBLISH messages. Disconnecting client.", "Sent too many concurrent PUBLISH messages", Mqtt5DisconnectReasonCode.RECEIVE_MAXIMUM_EXCEEDED, "Too many concurrent PUBLISH messages sent.");
                return;
            }
        }
        super.channelRead(ctx, msg);
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        PUBREC pubrec;
        if (this.serverSendQuota.get() == this.serverReceiveMaximum) {
            super.write(ctx, msg, promise);
            return;
        }
        if (msg instanceof PUBACK) {
            this.serverSendQuota.incrementAndGet();
        }
        if (msg instanceof PUBCOMP) {
            this.serverSendQuota.incrementAndGet();
        }
        if (msg instanceof PUBREC && ((Mqtt5PubRecReasonCode)(pubrec = (PUBREC)msg).getReasonCode()).getCode() >= 128) {
            this.serverSendQuota.incrementAndGet();
        }
        super.write(ctx, msg, promise);
    }

    public int getServerSendQuota() {
        return this.serverSendQuota.get();
    }
}

