/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.mqtt.handler.publish;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.hivemq.bootstrap.ClientConnection;
import com.hivemq.bootstrap.ClientConnectionContext;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extensions.handler.IncomingPublishHandler;
import com.hivemq.mqtt.event.PublishDroppedEvent;
import com.hivemq.mqtt.event.PubrelDroppedEvent;
import com.hivemq.mqtt.handler.publish.DropOutgoingPublishesHandler;
import com.hivemq.mqtt.handler.publish.OrderedTopicService;
import com.hivemq.mqtt.message.MessageWithID;
import com.hivemq.mqtt.message.QoS;
import com.hivemq.mqtt.message.puback.PUBACK;
import com.hivemq.mqtt.message.pubcomp.PUBCOMP;
import com.hivemq.mqtt.message.publish.PUBLISH;
import com.hivemq.mqtt.message.pubrec.PUBREC;
import com.hivemq.mqtt.message.pubrel.PUBREL;
import com.hivemq.mqtt.message.reason.Mqtt5PubRecReasonCode;
import com.hivemq.mqtt.services.PublishPollService;
import com.hivemq.persistence.qos.IncomingMessageFlowPersistence;
import com.hivemq.persistence.util.FutureUtils;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PublishFlowHandler
extends ChannelDuplexHandler {
    @NotNull
    private static final Logger log = LoggerFactory.getLogger(PublishFlowHandler.class);
    @NotNull
    private static final AtomicLong UNACKNOWLEDGED_PUBLISHES_COUNTER = new AtomicLong();
    @NotNull
    private final IncomingMessageFlowPersistence persistence;
    @NotNull
    private final OrderedTopicService orderedTopicService;
    @NotNull
    private final PublishPollService publishPollService;
    @NotNull
    private final IncomingPublishHandler incomingPublishHandler;
    @NotNull
    private final DropOutgoingPublishesHandler dropOutgoingPublishesHandler;
    @NotNull
    private final Map<Integer, Boolean> qos1AlreadySentMap;

    @Inject
    @VisibleForTesting
    public PublishFlowHandler(@NotNull PublishPollService publishPollService, @NotNull IncomingMessageFlowPersistence persistence, @NotNull OrderedTopicService orderedTopicService, @NotNull IncomingPublishHandler incomingPublishHandler, @NotNull DropOutgoingPublishesHandler dropOutgoingPublishesHandler) {
        this.publishPollService = publishPollService;
        this.persistence = persistence;
        this.orderedTopicService = orderedTopicService;
        this.qos1AlreadySentMap = new HashMap<Integer, Boolean>();
        this.incomingPublishHandler = incomingPublishHandler;
        this.dropOutgoingPublishesHandler = dropOutgoingPublishesHandler;
    }

    public void channelRead(ChannelHandlerContext ctx, @NotNull Object msg) throws Exception {
        if (msg instanceof PUBLISH) {
            this.handlePublish(ctx, (PUBLISH)msg);
        } else if (msg instanceof PUBACK) {
            this.handlePuback(ctx, (PUBACK)msg);
        } else if (msg instanceof PUBREC) {
            this.handlePubrec(ctx, (PUBREC)msg);
        } else if (msg instanceof PUBREL) {
            this.handlePubrel(ctx, (PUBREL)msg);
        } else if (msg instanceof PUBCOMP) {
            this.handlePubcomp(ctx, (PUBCOMP)msg);
        } else {
            super.channelRead(ctx, msg);
        }
    }

    public void write(@NotNull ChannelHandlerContext ctx, @NotNull Object msg, @NotNull ChannelPromise promise) throws Exception {
        boolean flowComplete;
        if (!(msg instanceof PUBLISH || msg instanceof PUBACK || msg instanceof PUBREL)) {
            super.write(ctx, msg, promise);
            return;
        }
        if (msg instanceof PUBACK) {
            PUBACK puback = (PUBACK)msg;
            int messageId = puback.getPacketIdentifier();
            promise.addListener((GenericFutureListener)((ChannelFutureListener)future -> {
                if (future.isSuccess()) {
                    this.qos1AlreadySentMap.remove(messageId);
                    if (log.isTraceEnabled()) {
                        log.trace("Client '{}' completed a PUBLISH flow with QoS 1 for packet identifier '{}'", (Object)ctx, (Object)messageId);
                    }
                }
            }));
        }
        if (flowComplete = this.orderedTopicService.handlePublish(ctx.channel(), msg, promise)) {
            return;
        }
        boolean messageDropped = this.dropOutgoingPublishesHandler.checkChannelNotWritable(ctx, msg, promise);
        if (messageDropped) {
            return;
        }
        super.write(ctx, msg, promise);
    }

    public void userEventTriggered(@NotNull ChannelHandlerContext ctx, @NotNull Object evt) throws Exception {
        if (evt instanceof PublishDroppedEvent) {
            PublishDroppedEvent publishDroppedEvent = (PublishDroppedEvent)evt;
            this.orderedTopicService.messageFlowComplete(ctx, publishDroppedEvent.getMessage().getPacketIdentifier());
            return;
        }
        if (evt instanceof PubrelDroppedEvent) {
            PubrelDroppedEvent pubrelDroppedEvent = (PubrelDroppedEvent)evt;
            this.orderedTopicService.messageFlowComplete(ctx, pubrelDroppedEvent.getMessage().getPacketIdentifier());
            return;
        }
        super.userEventTriggered(ctx, evt);
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        String clientId;
        this.orderedTopicService.handleInactive();
        ClientConnectionContext clientConnectionContext = ClientConnectionContext.of(ctx.channel());
        Long sessionExpiryInterval = clientConnectionContext.getClientSessionExpiryInterval();
        if (sessionExpiryInterval != null && sessionExpiryInterval == 0L && (clientId = clientConnectionContext.getClientId()) != null) {
            this.persistence.delete(clientId);
        }
        super.channelInactive(ctx);
    }

    private void handlePublish(@NotNull ChannelHandlerContext ctx, @NotNull PUBLISH publish) throws Exception {
        String clientId = ClientConnection.of(ctx.channel()).getClientId();
        if (publish.getQoS() == QoS.AT_MOST_ONCE) {
            this.incomingPublishHandler.interceptOrDelegate(ctx, publish, clientId);
        } else if (publish.getQoS() == QoS.AT_LEAST_ONCE) {
            UNACKNOWLEDGED_PUBLISHES_COUNTER.incrementAndGet();
            if (publish.isDuplicateDelivery() && this.qos1AlreadySentMap.get(publish.getPacketIdentifier()) != null) {
                log.debug("Client {} sent a duplicate publish message with id {}. This message is ignored", (Object)clientId, (Object)publish.getPacketIdentifier());
            } else {
                int packetId = publish.getPacketIdentifier();
                this.qos1AlreadySentMap.put(publish.getPacketIdentifier(), true);
                this.firstPublishForMessageIdReceived(ctx, publish, clientId, packetId);
            }
        } else {
            int messageId = publish.getPacketIdentifier();
            MessageWithID savedMessage = this.persistence.get(clientId, messageId);
            if (!(savedMessage instanceof PUBLISH)) {
                this.persistence.addOrReplace(clientId, messageId, publish);
                this.firstPublishForMessageIdReceived(ctx, publish, clientId, messageId);
            } else {
                ctx.writeAndFlush((Object)new PUBREC(messageId));
            }
        }
    }

    private void firstPublishForMessageIdReceived(@NotNull ChannelHandlerContext ctx, @NotNull PUBLISH publish, @NotNull String client, int messageId) throws Exception {
        this.incomingPublishHandler.interceptOrDelegate(ctx, publish, client);
        log.trace("Client {} sent a publish message with id {} which was not forwarded before. This message is processed normally", (Object)client, (Object)messageId);
    }

    private void handlePuback(@NotNull ChannelHandlerContext ctx, PUBACK msg) {
        String clientId = ClientConnection.of(ctx.channel()).getClientId();
        log.trace("Client {}: Received PUBACK", (Object)clientId);
        int messageId = msg.getPacketIdentifier();
        this.orderedTopicService.messageFlowComplete(ctx, messageId);
        if (log.isTraceEnabled()) {
            log.trace("Client {}: Received PUBACK remove message id:[{}] ", (Object)clientId, (Object)messageId);
        }
    }

    private void handlePubrec(@NotNull ChannelHandlerContext ctx, @NotNull PUBREC msg) {
        String clientId = ClientConnection.of(ctx.channel()).getClientId();
        log.trace("Client {}: Received pubrec", (Object)clientId);
        if (msg.getReasonCode() != Mqtt5PubRecReasonCode.SUCCESS && msg.getReasonCode() != Mqtt5PubRecReasonCode.NO_MATCHING_SUBSCRIBERS) {
            this.orderedTopicService.messageFlowComplete(ctx, ((MessageWithID)msg).getPacketIdentifier());
        }
        ListenableFuture<Void> future = this.publishPollService.putPubrelInQueue(clientId, msg.getPacketIdentifier());
        FutureUtils.addExceptionLogger(future);
        if (log.isTraceEnabled()) {
            log.trace("Client {}: Received PUBREC remove message id:[{}]", (Object)clientId, (Object)msg.getPacketIdentifier());
        }
        ctx.channel().writeAndFlush((Object)new PUBREL(msg.getPacketIdentifier()));
    }

    private void handlePubrel(ChannelHandlerContext ctx, PUBREL pubrel) {
        String client = ClientConnection.of(ctx.channel()).getClientId();
        int messageId = pubrel.getPacketIdentifier();
        this.persistence.addOrReplace(client, messageId, pubrel);
        ctx.writeAndFlush((Object)new PUBCOMP(messageId)).addListener((GenericFutureListener)new PubcompSentListener(messageId, client, this.persistence));
    }

    private void handlePubcomp(@NotNull ChannelHandlerContext ctx, @NotNull PUBCOMP msg) {
        String clientId = ClientConnection.of(ctx.channel()).getClientId();
        log.trace("Client {}: Received PUBCOMP", (Object)clientId);
        this.orderedTopicService.messageFlowComplete(ctx, msg.getPacketIdentifier());
        if (log.isTraceEnabled()) {
            log.trace("Client {}: Received PUBCOMP remove message id:[{}]", (Object)clientId, (Object)msg.getPacketIdentifier());
        }
    }

    private static class PubcompSentListener
    implements ChannelFutureListener {
        private final int messageId;
        @NotNull
        private final String client;
        @NotNull
        private final IncomingMessageFlowPersistence persistence;

        PubcompSentListener(int messageId, @NotNull String client, @NotNull IncomingMessageFlowPersistence persistence) {
            this.messageId = messageId;
            this.client = client;
            this.persistence = persistence;
        }

        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                UNACKNOWLEDGED_PUBLISHES_COUNTER.decrementAndGet();
                this.persistence.remove(this.client, this.messageId);
                log.trace("Client '{}' completed a PUBLISH flow with QoS 2 for packet identifier '{}'", (Object)this.client, (Object)this.messageId);
            }
        }
    }
}

