/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.protocol.mqtt;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnection;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTLogger;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSubscriptionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;

/**
 * Netty inbound handler that receives decoded {@link MqttMessage}s from the pipeline and
 * dispatches each control packet type to a dedicated {@code handleXxx} method. It also offers
 * the {@code sendXxx} helpers used to write broker-originated packets back to the client.
 *
 * <p>Packet types that this handler rejects when they arrive from a client (CONNACK, SUBACK,
 * UNSUBACK, PINGRESP) cause the connection to be dropped via {@link #disconnect()}.
 */
public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
    private ConnectionEntry connectionEntry;
    private MQTTConnection connection;
    private MQTTSession session;
    private final ActiveMQServer server;
    private final MQTTProtocolManager protocolManager;
    /** Channel context captured when CONNECT is handled; used by the send helpers. */
    private ChannelHandlerContext ctx;
    private final MQTTLogger log = MQTTLogger.LOGGER;
    /** Once set, every subsequently received packet triggers a disconnect. */
    private boolean stopped = false;

    /**
     * @param server          broker instance exposed through {@link #getServer()}
     * @param protocolManager manager whose incoming/outgoing hooks are invoked for PUBLISH packets
     */
    public MQTTProtocolHandler(ActiveMQServer server, MQTTProtocolManager protocolManager) {
        this.server = server;
        this.protocolManager = protocolManager;
    }

    /**
     * Binds this handler to a connection and creates the {@link MQTTSession} for it.
     *
     * @param connection the MQTT connection this handler serves
     * @param entry      the connection entry whose {@code ttl} is updated on CONNECT
     * @throws Exception declared for callers; propagated from session creation
     */
    void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception {
        this.connectionEntry = entry;
        this.connection = connection;
        this.session = new MQTTSession(this, connection);
    }

    /**
     * Marks the handler as stopped; the next packet received will disconnect the client.
     *
     * @param error currently unused
     */
    void stop(boolean error) {
        this.stopped = true;
    }

    /**
     * Entry point for every decoded message. Disconnects if the handler is stopped or the
     * decoder reported a failure; otherwise dispatches on the MQTT message type. Any exception
     * raised while processing is logged at debug level and results in a disconnect.
     *
     * @param ctx channel context of the inbound message
     * @param msg decoded message; must be an {@link MqttMessage}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            if (this.stopped) {
                this.disconnect();
                return;
            }
            MqttMessage message = (MqttMessage) msg;
            if (message.decoderResult().isFailure()) {
                this.log.debug("Bad Message Disconnecting Client.");
                this.disconnect();
                return;
            }
            this.connection.dataReceived();
            MQTTUtil.logMessage(this.session.getState(), message, true);
            switch (message.fixedHeader().messageType()) {
                case CONNECT: {
                    this.handleConnect((MqttConnectMessage) message, ctx);
                    break;
                }
                case CONNACK: {
                    this.handleConnack((MqttConnAckMessage) message);
                    break;
                }
                case PUBLISH: {
                    this.handlePublish((MqttPublishMessage) message);
                    break;
                }
                case PUBACK: {
                    this.handlePuback((MqttPubAckMessage) message);
                    break;
                }
                case PUBREC: {
                    this.handlePubrec(message);
                    break;
                }
                case PUBREL: {
                    this.handlePubrel(message);
                    break;
                }
                case PUBCOMP: {
                    this.handlePubcomp(message);
                    break;
                }
                case SUBSCRIBE: {
                    this.handleSubscribe((MqttSubscribeMessage) message, ctx);
                    break;
                }
                case SUBACK: {
                    this.handleSuback((MqttSubAckMessage) message);
                    break;
                }
                case UNSUBSCRIBE: {
                    this.handleUnsubscribe((MqttUnsubscribeMessage) message);
                    break;
                }
                case UNSUBACK: {
                    this.handleUnsuback((MqttUnsubAckMessage) message);
                    break;
                }
                case PINGREQ: {
                    this.handlePingreq(message, ctx);
                    break;
                }
                case PINGRESP: {
                    this.handlePingresp(message);
                    break;
                }
                case DISCONNECT: {
                    this.handleDisconnect(message);
                    break;
                }
                default: {
                    // Unknown or unsupported packet type.
                    this.disconnect();
                    break;
                }
            }
        }
        catch (Exception e) {
            this.log.debug("Error processing Control Packet, Disconnecting Client", e);
            this.disconnect();
        }
    }

    /**
     * Handles CONNECT: remembers the channel context, derives the connection TTL from the
     * client's keep-alive and delegates the connect to the session's connection manager.
     *
     * @param connect the CONNECT packet
     * @param ctx     channel context stored for later outbound writes
     * @throws Exception propagated from the connection manager
     */
    void handleConnect(MqttConnectMessage connect, ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
        // Keep-alive is supplied in seconds; the 1500 factor presumably yields milliseconds with
        // a 1.5x grace period - TODO confirm against ConnectionEntry.ttl's unit.
        // NOTE(review): a keep-alive of 0 produces a ttl of 0; confirm how the connection
        // checker interprets that value.
        this.connectionEntry.ttl = connect.variableHeader().keepAliveTimeSeconds() * 1500;
        String clientId = connect.payload().clientIdentifier();
        this.session.getConnectionManager().connect(
                clientId,
                connect.payload().userName(),
                connect.payload().password(),
                connect.variableHeader().isWillFlag(),
                connect.payload().willMessage(),
                connect.payload().willTopic(),
                connect.variableHeader().isWillRetain(),
                connect.variableHeader().willQos(),
                connect.variableHeader().isCleanSession());
    }

    /** Asks the session's connection manager to disconnect the client. */
    void disconnect() {
        this.session.getConnectionManager().disconnect();
    }

    /**
     * Sends a CONNACK with the given return code. The Session Present flag is set for an
     * accepted connection and cleared for every refusal code.
     *
     * @param returnCode the CONNACK return code
     */
    void sendConnack(MqttConnectReturnCode returnCode) {
        this.sendConnack(returnCode, true);
    }

    /**
     * Sends a CONNACK with the given return code and Session Present flag.
     *
     * @param returnCode     the CONNACK return code
     * @param sessionPresent requested Session Present flag; ignored (forced to {@code false})
     *                       when {@code returnCode} is not {@code CONNECTION_ACCEPTED}
     */
    void sendConnack(MqttConnectReturnCode returnCode, boolean sessionPresent) {
        // [MQTT-3.2.2-4]: a CONNACK carrying a non-zero return code MUST have Session Present 0.
        boolean effectiveSessionPresent =
                sessionPresent && returnCode == MqttConnectReturnCode.CONNECTION_ACCEPTED;
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, effectiveSessionPresent);
        MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader);
        this.ctx.write(message);
        this.ctx.flush();
    }

    /**
     * A CONNACK received from a client is treated as invalid: it is logged and the client is
     * disconnected.
     *
     * @param message the unexpected CONNACK
     */
    void handleConnack(MqttConnAckMessage message) {
        this.log.debug("Received invalid CONNACK from client: " + this.session.getSessionState().getClientId());
        this.log.debug("Disconnecting client: " + this.session.getSessionState().getClientId());
        this.disconnect();
    }

    /**
     * Handles PUBLISH: runs the protocol manager's incoming hook, then hands the message to the
     * session's publish manager.
     *
     * @param message the PUBLISH packet
     * @throws Exception propagated from the publish manager
     */
    void handlePublish(MqttPublishMessage message) throws Exception {
        this.protocolManager.invokeIncoming(message, this.connection);
        this.session.getMqttPublishManager().handleMessage(
                message.variableHeader().messageId(),
                message.variableHeader().topicName(),
                message.fixedHeader().qosLevel().value(),
                message.payload(),
                message.fixedHeader().isRetain());
    }

    /** Sends a PUBACK for the given message id. */
    void sendPubAck(int messageId) {
        this.sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBACK);
    }

    /** Sends a PUBREL for the given message id. */
    void sendPubRel(int messageId) {
        this.sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBREL);
    }

    /** Sends a PUBREC for the given message id. */
    void sendPubRec(int messageId) {
        this.sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBREC);
    }

    /** Sends a PUBCOMP for the given message id. */
    void sendPubComp(int messageId) {
        this.sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBCOMP);
    }

    /**
     * Writes a publish-flow acknowledgement packet carrying only a message id.
     *
     * @param messageId   id of the message being acknowledged
     * @param messageType one of PUBACK, PUBREC, PUBREL or PUBCOMP
     */
    void sendPublishProtocolControlMessage(int messageId, MqttMessageType messageType) {
        // PUBREL is the only one of these packets sent with QoS 1 in its fixed header.
        MqttQoS qos = messageType == MqttMessageType.PUBREL ? MqttQoS.AT_LEAST_ONCE : MqttQoS.AT_MOST_ONCE;
        MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, qos, false, 0);
        MqttPubAckMessage ack = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId));
        this.ctx.write(ack);
        this.ctx.flush();
    }

    /**
     * Forwards a PUBACK's message id to the publish manager.
     *
     * @throws Exception propagated from the publish manager
     */
    void handlePuback(MqttPubAckMessage message) throws Exception {
        this.session.getMqttPublishManager().handlePubAck(message.variableHeader().messageId());
    }

    /**
     * Forwards a PUBREC's message id to the publish manager.
     *
     * @throws Exception propagated from the publish manager
     */
    void handlePubrec(MqttMessage message) throws Exception {
        int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
        this.session.getMqttPublishManager().handlePubRec(messageId);
    }

    /** Forwards a PUBREL's message id to the publish manager. */
    void handlePubrel(MqttMessage message) {
        int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
        this.session.getMqttPublishManager().handlePubRel(messageId);
    }

    /**
     * Forwards a PUBCOMP's message id to the publish manager.
     *
     * @throws Exception propagated from the publish manager
     */
    void handlePubcomp(MqttMessage message) throws Exception {
        int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
        this.session.getMqttPublishManager().handlePubComp(messageId);
    }

    /**
     * Handles SUBSCRIBE: registers the requested subscriptions and replies with a SUBACK that
     * carries the QoS values returned by the subscription manager.
     *
     * @param message the SUBSCRIBE packet
     * @param ctx     channel context the SUBACK is written to
     * @throws Exception propagated from the subscription manager
     */
    void handleSubscribe(MqttSubscribeMessage message, ChannelHandlerContext ctx) throws Exception {
        MQTTSubscriptionManager subscriptionManager = this.session.getSubscriptionManager();
        int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions());
        MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttSubAckMessage ack = new MqttSubAckMessage(header, message.variableHeader(), new MqttSubAckPayload(qos));
        MQTTUtil.logMessage(this.session.getSessionState(), ack, false);
        ctx.write(ack);
        ctx.flush();
    }

    /** A SUBACK received from a client causes a disconnect. */
    void handleSuback(MqttSubAckMessage message) {
        this.disconnect();
    }

    /**
     * Handles UNSUBSCRIBE: removes the listed topics and replies with an UNSUBACK that reuses
     * the request's variable header.
     *
     * @param message the UNSUBSCRIBE packet
     * @throws Exception propagated from the subscription manager
     */
    void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception {
        this.session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
        MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttUnsubAckMessage unsubAck = new MqttUnsubAckMessage(header, message.variableHeader());
        MQTTUtil.logMessage(this.session.getSessionState(), unsubAck, false);
        this.ctx.write(unsubAck);
        this.ctx.flush();
    }

    /** An UNSUBACK received from a client causes a disconnect. */
    void handleUnsuback(MqttUnsubAckMessage message) {
        this.disconnect();
    }

    /**
     * Replies to a PINGREQ with a PINGRESP.
     *
     * @param message the PINGREQ packet (not inspected)
     * @param ctx     channel context the PINGRESP is written to
     */
    void handlePingreq(MqttMessage message, ChannelHandlerContext ctx) {
        MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0));
        MQTTUtil.logMessage(this.session.getSessionState(), pingResp, false);
        ctx.write(pingResp);
        ctx.flush();
    }

    /** A PINGRESP received from a client causes a disconnect. */
    void handlePingresp(MqttMessage message) {
        this.disconnect();
    }

    /**
     * Handles DISCONNECT: deletes the will message when session state exists, then disconnects.
     *
     * @param message the DISCONNECT packet (not inspected)
     */
    void handleDisconnect(MqttMessage message) {
        // NOTE(review): the null check uses getSessionState() while the call uses getState();
        // presumably both expose the same state object - confirm in MQTTSession.
        if (this.session.getSessionState() != null) {
            this.session.getState().deleteWillMessage();
        }
        this.disconnect();
    }

    /**
     * Builds and writes a PUBLISH packet to the client, invoking the protocol manager's
     * outgoing hook first.
     *
     * @param messageId     packet identifier
     * @param topicName     topic the message is published on
     * @param qosLevel      numeric QoS level
     * @param payload       message body
     * @param deliveryCount number of previous delivery attempts; a positive value marks the
     *                      packet as a duplicate unless {@code qosLevel} is 0
     * @return always {@code 1}
     */
    protected int send(int messageId, String topicName, int qosLevel, ByteBuf payload, int deliveryCount) {
        // The DUP flag is never set for QoS 0 deliveries.
        boolean redelivery = qosLevel != 0 && deliveryCount > 0;
        MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), false, 0);
        MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
        MqttPublishMessage publish = new MqttPublishMessage(header, varHeader, payload);
        this.protocolManager.invokeOutgoing(publish, this.connection);
        MQTTUtil.logMessage(this.session.getSessionState(), publish, false);
        this.ctx.write(publish);
        this.ctx.flush();
        return 1;
    }

    /** @return the broker instance supplied at construction */
    ActiveMQServer getServer() {
        return this.server;
    }
}

