/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.mqtt.handler.unsubscribe;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.hivemq.bootstrap.ClientConnection;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.mqtt.handler.connect.SubscribeMessageBarrier;
import com.hivemq.mqtt.message.ProtocolVersion;
import com.hivemq.mqtt.message.reason.Mqtt5UnsubAckReasonCode;
import com.hivemq.mqtt.message.unsuback.UNSUBACK;
import com.hivemq.mqtt.message.unsubscribe.UNSUBSCRIBE;
import com.hivemq.persistence.clientsession.ClientSessionSubscriptionPersistence;
import com.hivemq.persistence.clientsession.SharedSubscriptionService;
import com.hivemq.util.Exceptions;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Arrays;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@ChannelHandler.Sharable
public class UnsubscribeHandler
extends SimpleChannelInboundHandler<UNSUBSCRIBE> {
    private static final Logger log = LoggerFactory.getLogger(UnsubscribeHandler.class);
    @NotNull
    private final ClientSessionSubscriptionPersistence clientSessionSubscriptionPersistence;
    @NotNull
    private final SharedSubscriptionService sharedSubscriptionService;

    @Inject
    public UnsubscribeHandler(@NotNull ClientSessionSubscriptionPersistence clientSessionSubscriptionPersistence, @NotNull SharedSubscriptionService sharedSubscriptionService) {
        this.clientSessionSubscriptionPersistence = clientSessionSubscriptionPersistence;
        this.sharedSubscriptionService = sharedSubscriptionService;
    }

    protected void channelRead0(@NotNull ChannelHandlerContext ctx, @NotNull UNSUBSCRIBE msg) throws Exception {
        SubscribeMessageBarrier.addToPipeline(ctx);
        ClientConnection clientConnection = ClientConnection.of(ctx.channel());
        String clientId = (String)Preconditions.checkNotNull((Object)clientConnection.getClientId());
        UnsubscribeOperationCompletionCallback unsubscribeOperationCompletionCallback = new UnsubscribeOperationCompletionCallback(ctx, this.sharedSubscriptionService, clientConnection.getProtocolVersion(), clientId, msg.getTopics(), msg.getPacketIdentifier());
        if (msg.getTopics().size() == 1) {
            String topic = (String)msg.getTopics().get(0);
            ListenableFuture<Void> future = this.clientSessionSubscriptionPersistence.remove(clientId, topic);
            future.addListener(() -> {
                if (log.isTraceEnabled()) {
                    log.trace("Unsubscribed from topic [{}] for client [{}]", (Object)topic, (Object)clientId);
                }
            }, MoreExecutors.directExecutor());
            Futures.addCallback(future, (FutureCallback)unsubscribeOperationCompletionCallback, (Executor)ctx.executor());
            if (log.isTraceEnabled()) {
                log.trace("Applied all unsubscriptions for client [{}]", (Object)clientId);
            }
            return;
        }
        ListenableFuture<Void> future = this.clientSessionSubscriptionPersistence.removeSubscriptions(clientId, (ImmutableSet<String>)ImmutableSet.copyOf(msg.getTopics()));
        future.addListener(() -> msg.getTopics().forEach(topic -> {
            if (log.isTraceEnabled()) {
                log.trace("Unsubscribed from topic [{}] for client [{}]", topic, (Object)clientId);
            }
        }), MoreExecutors.directExecutor());
        Futures.addCallback(future, (FutureCallback)unsubscribeOperationCompletionCallback, (Executor)ctx.executor());
        if (log.isTraceEnabled()) {
            log.trace("Applied all unsubscriptions for client [{}]", (Object)clientId);
        }
    }

    private static class UnsubscribeOperationCompletionCallback
    implements FutureCallback<Void> {
        @NotNull
        private final ChannelHandlerContext ctx;
        @NotNull
        private final SharedSubscriptionService sharedSubscriptionService;
        @NotNull
        private final ProtocolVersion protocolVersion;
        @NotNull
        private final String clientId;
        @NotNull
        private final ImmutableList<String> topicFilters;
        private final int packetIdentifier;

        UnsubscribeOperationCompletionCallback(@NotNull ChannelHandlerContext ctx, @NotNull SharedSubscriptionService sharedSubscriptionService, @NotNull ProtocolVersion protocolVersion, @NotNull String clientId, @NotNull ImmutableList<String> topicFilters, int packetIdentifier) {
            this.ctx = ctx;
            this.sharedSubscriptionService = sharedSubscriptionService;
            this.protocolVersion = protocolVersion;
            this.clientId = clientId;
            this.topicFilters = topicFilters;
            this.packetIdentifier = packetIdentifier;
        }

        public void onSuccess(@NotNull Void aVoid) {
            for (String topicFilter : this.topicFilters) {
                SharedSubscriptionService.SharedSubscription sharedSubscription = SharedSubscriptionService.checkForSharedSubscription(topicFilter);
                if (sharedSubscription == null) continue;
                this.sharedSubscriptionService.invalidateSharedSubscriberCache(sharedSubscription.getShareName() + "/" + sharedSubscription.getTopicFilter());
                this.sharedSubscriptionService.invalidateSharedSubscriptionCache(this.clientId);
            }
            if (ProtocolVersion.MQTTv5 == this.protocolVersion) {
                Object[] reasonCodes = new Mqtt5UnsubAckReasonCode[this.topicFilters.size()];
                Arrays.fill(reasonCodes, Mqtt5UnsubAckReasonCode.SUCCESS);
                this.ctx.writeAndFlush((Object)new UNSUBACK(this.packetIdentifier, (Mqtt5UnsubAckReasonCode[])reasonCodes));
            } else {
                this.ctx.writeAndFlush((Object)new UNSUBACK(this.packetIdentifier, new Mqtt5UnsubAckReasonCode[0]));
            }
        }

        public void onFailure(@NotNull Throwable throwable) {
            if (ProtocolVersion.MQTTv5 == this.protocolVersion) {
                Object[] reasonCodes = new Mqtt5UnsubAckReasonCode[this.topicFilters.size()];
                Arrays.fill(reasonCodes, Mqtt5UnsubAckReasonCode.UNSPECIFIED_ERROR);
                this.ctx.writeAndFlush((Object)new UNSUBACK(this.packetIdentifier, (Mqtt5UnsubAckReasonCode[])reasonCodes));
            }
            Exceptions.rethrowError("Unable to unsubscribe client " + this.clientId + ".", throwable);
        }
    }
}

