/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.persistence.clientsession;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.hivemq.bootstrap.ClientConnection;
import com.hivemq.bootstrap.ioc.lazysingleton.LazySingleton;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extensions.iteration.ChunkCursor;
import com.hivemq.extensions.iteration.Chunker;
import com.hivemq.extensions.iteration.MultipleChunkResult;
import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnector;
import com.hivemq.mqtt.message.QoS;
import com.hivemq.mqtt.message.reason.Mqtt5DisconnectReasonCode;
import com.hivemq.mqtt.message.subscribe.Topic;
import com.hivemq.mqtt.services.PublishPollService;
import com.hivemq.mqtt.topic.SubscriptionFlag;
import com.hivemq.mqtt.topic.TopicFilter;
import com.hivemq.mqtt.topic.tree.LocalTopicTree;
import com.hivemq.persistence.AbstractPersistence;
import com.hivemq.persistence.ProducerQueues;
import com.hivemq.persistence.SingleWriterService;
import com.hivemq.persistence.clientsession.ClientSession;
import com.hivemq.persistence.clientsession.ClientSessionSubscriptionPersistence;
import com.hivemq.persistence.clientsession.SharedSubscriptionService;
import com.hivemq.persistence.clientsession.Subscription;
import com.hivemq.persistence.clientsession.callback.SubscriptionResult;
import com.hivemq.persistence.connection.ConnectionPersistence;
import com.hivemq.persistence.local.ClientSessionLocalPersistence;
import com.hivemq.persistence.local.ClientSessionSubscriptionLocalPersistence;
import java.util.HashSet;
import java.util.Map;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@LazySingleton
public class ClientSessionSubscriptionPersistenceImpl
extends AbstractPersistence
implements ClientSessionSubscriptionPersistence {
    private static final Logger log = LoggerFactory.getLogger(ClientSessionSubscriptionPersistenceImpl.class);
    @NotNull
    private final ClientSessionSubscriptionLocalPersistence localPersistence;
    @NotNull
    private final LocalTopicTree topicTree;
    @NotNull
    private final SharedSubscriptionService sharedSubscriptionService;
    @NotNull
    private final ConnectionPersistence connectionPersistence;
    @NotNull
    private final ProducerQueues singleWriter;
    @NotNull
    private final ClientSessionLocalPersistence clientSessionLocalPersistence;
    @NotNull
    private final PublishPollService publishPollService;
    @NotNull
    private final Chunker chunker;
    @NotNull
    private final MqttServerDisconnector mqttServerDisconnector;

    @Inject
    ClientSessionSubscriptionPersistenceImpl(@NotNull ClientSessionSubscriptionLocalPersistence localPersistence, @NotNull LocalTopicTree topicTree, @NotNull SharedSubscriptionService sharedSubscriptionService, @NotNull SingleWriterService singleWriterService, @NotNull ConnectionPersistence connectionPersistence, @NotNull ClientSessionLocalPersistence clientSessionLocalPersistence, @NotNull PublishPollService publishPollService, @NotNull Chunker chunker, @NotNull MqttServerDisconnector mqttServerDisconnector) {
        this.localPersistence = localPersistence;
        this.topicTree = topicTree;
        this.sharedSubscriptionService = sharedSubscriptionService;
        this.connectionPersistence = connectionPersistence;
        this.clientSessionLocalPersistence = clientSessionLocalPersistence;
        this.publishPollService = publishPollService;
        this.chunker = chunker;
        this.mqttServerDisconnector = mqttServerDisconnector;
        this.singleWriter = singleWriterService.getSubscriptionQueue();
    }

    @Override
    @NotNull
    public ImmutableSet<Topic> getSubscriptions(@NotNull String client) {
        Preconditions.checkNotNull((Object)client, (Object)"Client id must not be null");
        return this.localPersistence.getSubscriptions(client);
    }

    @Override
    @NotNull
    public ListenableFuture<SubscriptionResult> addSubscription(@NotNull String client, @NotNull Topic topic) {
        try {
            ListenableFuture<Void> persistFuture;
            boolean subscriberExisted;
            Preconditions.checkNotNull((Object)client, (Object)"Client id must not be null");
            Preconditions.checkNotNull((Object)topic, (Object)"Topic must not be null");
            long timestamp = System.currentTimeMillis();
            ClientSession session = this.clientSessionLocalPersistence.getSession(client);
            if (session == null) {
                return Futures.immediateFuture(null);
            }
            SharedSubscriptionService.SharedSubscription sharedSubscription = SharedSubscriptionService.checkForSharedSubscription(topic.getTopic());
            if (sharedSubscription == null) {
                subscriberExisted = this.topicTree.addTopic(client, topic, SubscriptionFlag.getDefaultFlags(false, topic.isRetainAsPublished(), topic.isNoLocal()), null);
                persistFuture = this.singleWriter.submit(client, bucketIndex -> {
                    this.localPersistence.addSubscription(client, topic, timestamp, bucketIndex);
                    return null;
                });
            } else {
                if (sharedSubscription.getTopicFilter().isEmpty()) {
                    this.disconnectSharedSubscriberWithEmptyTopic(client);
                    return Futures.immediateFuture(null);
                }
                if (topic.getQoS() == QoS.EXACTLY_ONCE) {
                    topic.setQoS(QoS.AT_LEAST_ONCE);
                }
                Topic sharedTopic = new Topic(sharedSubscription.getTopicFilter(), topic.getQoS(), topic.isNoLocal(), topic.isRetainAsPublished(), topic.getRetainHandling(), topic.getSubscriptionIdentifier());
                subscriberExisted = this.topicTree.addTopic(client, sharedTopic, SubscriptionFlag.getDefaultFlags(true, topic.isRetainAsPublished(), topic.isNoLocal()), sharedSubscription.getShareName());
                Subscription subscription = new Subscription(sharedTopic, SubscriptionFlag.getDefaultFlags(true, topic.isRetainAsPublished(), topic.isNoLocal()), sharedSubscription.getShareName());
                persistFuture = this.singleWriter.submit(client, bucketIndex -> {
                    this.localPersistence.addSubscription(client, topic, timestamp, bucketIndex);
                    this.invalidateSharedSubscriptionCacheAndPoll(client, (ImmutableSet<Subscription>)ImmutableSet.of((Object)subscription));
                    return null;
                });
            }
            return Futures.whenAllComplete((ListenableFuture[])new ListenableFuture[]{persistFuture}).call(() -> new SubscriptionResult(topic, subscriberExisted, sharedSubscription == null ? null : sharedSubscription.getShareName()), MoreExecutors.directExecutor());
        }
        catch (Throwable throwable) {
            return Futures.immediateFailedFuture((Throwable)throwable);
        }
    }

    @Override
    @NotNull
    public ListenableFuture<ImmutableList<SubscriptionResult>> addSubscriptions(@NotNull String client, @NotNull ImmutableSet<Topic> topics) {
        try {
            Preconditions.checkNotNull((Object)client, (Object)"Client id must not be null");
            Preconditions.checkNotNull(topics, (Object)"Topics must not be null");
            return this.addBatchedTopics(client, topics);
        }
        catch (Throwable throwable) {
            return Futures.immediateFailedFuture((Throwable)throwable);
        }
    }

    @Override
    @NotNull
    public ListenableFuture<Void> removeSubscriptions(@NotNull String client, @NotNull ImmutableSet<String> topics) {
        try {
            Preconditions.checkNotNull((Object)client, (Object)"Client id must not be null");
            Preconditions.checkNotNull(topics, (Object)"Topics must not be null");
            return this.removeBatchedTopics(client, topics);
        }
        catch (Throwable throwable) {
            return Futures.immediateFailedFuture((Throwable)throwable);
        }
    }

    @Override
    @NotNull
    public ListenableFuture<Void> remove(@NotNull String client, @NotNull String topic) {
        try {
            Preconditions.checkNotNull((Object)client, (Object)"Client id must not be null");
            Preconditions.checkNotNull((Object)topic, (Object)"Topic must not be null");
            long timestamp = System.currentTimeMillis();
            SharedSubscriptionService.SharedSubscription sharedSubscription = SharedSubscriptionService.checkForSharedSubscription(topic);
            if (sharedSubscription == null) {
                this.topicTree.removeSubscriber(client, topic, null);
            } else {
                if (sharedSubscription.getTopicFilter().isEmpty()) {
                    this.disconnectSharedSubscriberWithEmptyTopic(client);
                    return Futures.immediateFuture(null);
                }
                this.topicTree.removeSubscriber(client, sharedSubscription.getTopicFilter(), sharedSubscription.getShareName());
            }
            ListenableFuture<Void> persistFuture = this.singleWriter.submit(client, bucketIndex -> {
                this.localPersistence.remove(client, topic, timestamp, bucketIndex);
                return null;
            });
            return Futures.whenAllComplete((ListenableFuture[])new ListenableFuture[]{persistFuture}).call(() -> (Void)persistFuture.get(), MoreExecutors.directExecutor());
        }
        catch (Throwable throwable) {
            return Futures.immediateFailedFuture((Throwable)throwable);
        }
    }

    @Override
    @NotNull
    public ListenableFuture<Void> removeAll(@NotNull String clientId) {
        try {
            Preconditions.checkNotNull((Object)clientId, (Object)"Client id must not be null");
            ImmutableSet<Topic> topics = this.localPersistence.getSubscriptions(clientId);
            HashSet<TopicFilter> subscriptions = new HashSet<TopicFilter>();
            for (Topic topic : topics) {
                SharedSubscriptionService.SharedSubscription sharedSubscription = SharedSubscriptionService.checkForSharedSubscription(topic.getTopic());
                if (sharedSubscription == null) {
                    subscriptions.add(new TopicFilter(topic.getTopic(), null));
                    continue;
                }
                subscriptions.add(new TopicFilter(sharedSubscription.getTopicFilter(), sharedSubscription.getShareName()));
            }
            for (TopicFilter subscription : subscriptions) {
                this.topicTree.removeSubscriber(clientId, subscription.getTopic(), subscription.getSharedName());
            }
            return this.removeAllLocally(clientId);
        }
        catch (Throwable throwable) {
            return Futures.immediateFailedFuture((Throwable)throwable);
        }
    }

    @Override
    @NotNull
    public ListenableFuture<Void> removeAllLocally(@NotNull String clientId) {
        return this.singleWriter.submit(clientId, bucketIndex -> {
            this.localPersistence.removeAll(clientId, System.currentTimeMillis(), bucketIndex);
            return null;
        });
    }

    @NotNull
    private ListenableFuture<ImmutableList<SubscriptionResult>> addBatchedTopics(@NotNull String clientId, @NotNull ImmutableSet<Topic> topics) {
        long timestamp = System.currentTimeMillis();
        ClientSession session = this.clientSessionLocalPersistence.getSession(clientId);
        if (session == null) {
            return Futures.immediateFuture(null);
        }
        ImmutableSet.Builder sharedSubs = new ImmutableSet.Builder();
        HashSet<Subscription> subscriptions = new HashSet<Subscription>();
        for (Object topic : topics) {
            SharedSubscriptionService.SharedSubscription sharedSubscription = SharedSubscriptionService.checkForSharedSubscription(((Topic)topic).getTopic());
            if (sharedSubscription == null) {
                subscriptions.add(new Subscription((Topic)topic, SubscriptionFlag.getDefaultFlags(false, ((Topic)topic).isRetainAsPublished(), ((Topic)topic).isNoLocal()), null));
                continue;
            }
            if (sharedSubscription.getTopicFilter().isEmpty()) {
                this.disconnectSharedSubscriberWithEmptyTopic(clientId);
                return Futures.immediateFuture(null);
            }
            if (((Topic)topic).getQoS() == QoS.EXACTLY_ONCE) {
                ((Topic)topic).setQoS(QoS.AT_LEAST_ONCE);
            }
            Subscription sharedSub = new Subscription(new Topic(sharedSubscription.getTopicFilter(), ((Topic)topic).getQoS(), ((Topic)topic).isNoLocal(), ((Topic)topic).isRetainAsPublished(), ((Topic)topic).getRetainHandling(), ((Topic)topic).getSubscriptionIdentifier()), SubscriptionFlag.getDefaultFlags(true, ((Topic)topic).isRetainAsPublished(), ((Topic)topic).isNoLocal()), sharedSubscription.getShareName());
            sharedSubs.add((Object)sharedSub);
            subscriptions.add(sharedSub);
        }
        ImmutableList.Builder subscriptionResultBuilder = ImmutableList.builder();
        for (Subscription subscription : subscriptions) {
            boolean subscriberExisted = this.topicTree.addTopic(clientId, subscription.getTopic(), subscription.getFlags(), subscription.getSharedGroup());
            subscriptionResultBuilder.add((Object)new SubscriptionResult(subscription.getTopic(), subscriberExisted, subscription.getSharedGroup()));
        }
        ListenableFuture<Void> persistFuture = this.singleWriter.submit(clientId, bucketIndex -> {
            this.localPersistence.addSubscriptions(clientId, topics, timestamp, bucketIndex);
            return null;
        });
        this.invalidateSharedSubscriptionCacheAndPoll(clientId, (ImmutableSet<Subscription>)sharedSubs.build());
        return Futures.whenAllComplete((ListenableFuture[])new ListenableFuture[]{persistFuture}).call(() -> subscriptionResultBuilder.build(), MoreExecutors.directExecutor());
    }

    @Override
    public void invalidateSharedSubscriptionCacheAndPoll(@NotNull String clientId, @NotNull ImmutableSet<Subscription> sharedSubs) {
        Preconditions.checkNotNull((Object)clientId, (Object)"Client id must never be null");
        Preconditions.checkNotNull(sharedSubs, (Object)"Subscriptions must never be null");
        ClientSession session = this.clientSessionLocalPersistence.getSession(clientId);
        if (session != null && !session.isConnected() || sharedSubs.isEmpty()) {
            return;
        }
        ClientConnection clientConnection = this.connectionPersistence.get(clientId);
        if (clientConnection != null && clientConnection.getChannel().isActive()) {
            for (Subscription sharedSub : sharedSubs) {
                Topic topic = sharedSub.getTopic();
                String sharedSubId = sharedSub.getSharedGroup() + "/" + topic.getTopic();
                this.publishPollService.pollSharedPublishesForClient(clientId, sharedSubId, topic.getQoS().getQosNumber(), topic.isRetainAsPublished(), topic.getSubscriptionIdentifier(), clientConnection.getChannel());
                this.sharedSubscriptionService.invalidateSharedSubscriptionCache(clientId);
                this.sharedSubscriptionService.invalidateSharedSubscriberCache(sharedSubId);
                clientConnection.setNoSharedSubscription(false);
                log.trace("Invalidated cache and polled for shared subscription '{}' and client '{}'", (Object)sharedSubId, (Object)clientId);
            }
        }
    }

    @Override
    @NotNull
    public ListenableFuture<MultipleChunkResult<Map<String, ImmutableSet<Topic>>>> getAllLocalSubscribersChunk(@NotNull ChunkCursor cursor) {
        return this.chunker.getAllLocalChunk(cursor, 2000, (bucket, lastKey, maxResults) -> this.singleWriter.submit(bucket, bucketIndex -> this.localPersistence.getAllSubscribersChunk(bucketIndex, lastKey, maxResults)));
    }

    @NotNull
    private ListenableFuture<Void> removeBatchedTopics(@NotNull String clientId, @NotNull ImmutableSet<String> topics) {
        long timestamp = System.currentTimeMillis();
        ImmutableSet.Builder topicsToRemoveBuilder = new ImmutableSet.Builder();
        for (String topic : topics) {
            SharedSubscriptionService.SharedSubscription sharedSubscription = SharedSubscriptionService.checkForSharedSubscription(topic);
            if (sharedSubscription == null) {
                topicsToRemoveBuilder.add((Object)new TopicFilter(topic, null));
                continue;
            }
            topicsToRemoveBuilder.add((Object)new TopicFilter(sharedSubscription.getTopicFilter(), sharedSubscription.getShareName()));
        }
        ImmutableSet topicsToRemove = topicsToRemoveBuilder.build();
        for (TopicFilter topicFilter : topicsToRemove) {
            this.topicTree.removeSubscriber(clientId, topicFilter.getTopic(), topicFilter.getSharedName());
        }
        ListenableFuture<Void> persistFuture = this.singleWriter.submit(clientId, bucketIndex -> {
            this.localPersistence.removeSubscriptions(clientId, topics, timestamp, bucketIndex);
            return null;
        });
        return persistFuture;
    }

    private void disconnectSharedSubscriberWithEmptyTopic(@NotNull String clientId) {
        ClientConnection clientConnection = this.connectionPersistence.get(clientId);
        if (clientConnection != null) {
            clientConnection.getChannel().eventLoop().execute(() -> this.mqttServerDisconnector.disconnect(clientConnection.getChannel(), "A client (IP: {}) sent a shared subscription with an empty topic. Disconnecting client.", "Sent shared subscription with empty topic", Mqtt5DisconnectReasonCode.TOPIC_FILTER_INVALID, "Shared subscription with empty topic."));
        } else {
            log.debug("Client {} sent a shared subscription with empty topic.", (Object)clientId);
        }
    }

    @Override
    @NotNull
    public ImmutableSet<Topic> getSharedSubscriptions(@NotNull String client) {
        Preconditions.checkNotNull((Object)client, (Object)"Client id must not be null");
        ImmutableSet<Topic> subscriptions = this.getSubscriptions(client);
        ImmutableSet.Builder sharedSubscriptions = ImmutableSet.builder();
        for (Topic subscription : subscriptions) {
            boolean isSharedSubscription = SharedSubscriptionService.checkForSharedSubscription(subscription.getTopic()) != null;
            if (!isSharedSubscription) continue;
            sharedSubscriptions.add((Object)subscription);
        }
        return sharedSubscriptions.build();
    }

    @Override
    @NotNull
    public ListenableFuture<Void> cleanUp(int bucketIndex) {
        return this.singleWriter.submit(bucketIndex, bucketIndex1 -> {
            this.localPersistence.cleanUp(bucketIndex1);
            return null;
        });
    }

    @Override
    @NotNull
    public ListenableFuture<Void> closeDB() {
        return this.closeDB(this.localPersistence, this.singleWriter);
    }
}

