/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.extensions.services.subscription;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.hivemq.bootstrap.ioc.lazysingleton.LazySingleton;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.extension.sdk.api.services.exception.DoNotImplementException;
import com.hivemq.extension.sdk.api.services.exception.InvalidTopicException;
import com.hivemq.extension.sdk.api.services.exception.NoSuchClientIdException;
import com.hivemq.extension.sdk.api.services.general.IterationCallback;
import com.hivemq.extension.sdk.api.services.general.IterationContext;
import com.hivemq.extension.sdk.api.services.subscription.SubscriberForTopicResult;
import com.hivemq.extension.sdk.api.services.subscription.SubscriberWithFilterResult;
import com.hivemq.extension.sdk.api.services.subscription.SubscriptionStore;
import com.hivemq.extension.sdk.api.services.subscription.SubscriptionType;
import com.hivemq.extension.sdk.api.services.subscription.SubscriptionsForClientResult;
import com.hivemq.extension.sdk.api.services.subscription.TopicSubscription;
import com.hivemq.extensions.ListenableFutureConverter;
import com.hivemq.extensions.iteration.AllItemsFetchCallback;
import com.hivemq.extensions.iteration.AllItemsItemCallback;
import com.hivemq.extensions.iteration.AsyncIterator;
import com.hivemq.extensions.iteration.AsyncIteratorFactory;
import com.hivemq.extensions.iteration.ChunkCursor;
import com.hivemq.extensions.iteration.MultipleChunkResult;
import com.hivemq.extensions.services.PluginServiceRateLimitService;
import com.hivemq.extensions.services.executor.GlobalManagedExtensionExecutorService;
import com.hivemq.extensions.services.general.IterationContextImpl;
import com.hivemq.extensions.services.subscription.SubscriberForTopicResultImpl;
import com.hivemq.extensions.services.subscription.SubscriberWithFilterResultImpl;
import com.hivemq.extensions.services.subscription.SubscriptionsForClientResultImpl;
import com.hivemq.extensions.services.subscription.TopicSubscriptionImpl;
import com.hivemq.mqtt.message.subscribe.Topic;
import com.hivemq.mqtt.topic.tree.LocalTopicTree;
import com.hivemq.mqtt.topic.tree.SubscriptionTypeItemFilter;
import com.hivemq.persistence.clientsession.ClientSessionSubscriptionPersistence;
import com.hivemq.persistence.clientsession.callback.SubscriptionResult;
import com.hivemq.util.Topics;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.inject.Inject;

@LazySingleton
public class SubscriptionStoreImpl
implements SubscriptionStore {
    @NotNull
    private final ClientSessionSubscriptionPersistence subscriptionPersistence;
    @NotNull
    private final PluginServiceRateLimitService rateLimitService;
    @NotNull
    private final LocalTopicTree topicTree;
    @NotNull
    private final GlobalManagedExtensionExecutorService managedExtensionExecutorService;
    @NotNull
    private final AsyncIteratorFactory asyncIteratorFactory;

    @Inject
    public SubscriptionStoreImpl(@NotNull ClientSessionSubscriptionPersistence subscriptionPersistence, @NotNull PluginServiceRateLimitService rateLimitService, @NotNull LocalTopicTree topicTree, @NotNull GlobalManagedExtensionExecutorService managedExtensionExecutorService, @NotNull AsyncIteratorFactory asyncIteratorFactory) {
        this.subscriptionPersistence = subscriptionPersistence;
        this.rateLimitService = rateLimitService;
        this.topicTree = topicTree;
        this.managedExtensionExecutorService = managedExtensionExecutorService;
        this.asyncIteratorFactory = asyncIteratorFactory;
    }

    @NotNull
    public CompletableFuture<Void> addSubscription(final @NotNull String clientID, @NotNull TopicSubscription subscription) {
        Preconditions.checkNotNull((Object)clientID, (Object)"Client id must never be null");
        Preconditions.checkArgument((!clientID.isEmpty() ? 1 : 0) != 0, (Object)"Client id must never be empty");
        Preconditions.checkNotNull((Object)subscription, (Object)"Topic subscription must never be null");
        if (this.rateLimitService.rateLimitExceeded()) {
            return CompletableFuture.failedFuture((Throwable)PluginServiceRateLimitService.RATE_LIMIT_EXCEEDED_EXCEPTION);
        }
        if (!(subscription instanceof TopicSubscriptionImpl)) {
            return CompletableFuture.failedFuture((Throwable)new DoNotImplementException(TopicSubscription.class.getSimpleName()));
        }
        ListenableFuture<SubscriptionResult> addSubscriptionFuture = this.subscriptionPersistence.addSubscription(clientID, TopicSubscriptionImpl.convertToTopic(subscription));
        final SettableFuture settableFuture = SettableFuture.create();
        Futures.addCallback(addSubscriptionFuture, (FutureCallback)new FutureCallback<SubscriptionResult>(){

            public void onSuccess(@Nullable SubscriptionResult result) {
                if (result == null) {
                    settableFuture.setException((Throwable)new NoSuchClientIdException(clientID));
                    return;
                }
                settableFuture.set(null);
            }

            public void onFailure(@NotNull Throwable t) {
                settableFuture.setException(t);
            }
        }, (Executor)this.managedExtensionExecutorService);
        return ListenableFutureConverter.toCompletable(settableFuture, this.managedExtensionExecutorService);
    }

    @NotNull
    public CompletableFuture<Void> addSubscriptions(@NotNull String clientID, @NotNull Set<TopicSubscription> subscriptions) {
        Preconditions.checkNotNull((Object)clientID, (Object)"Client id must never be null");
        Preconditions.checkArgument((!clientID.isEmpty() ? 1 : 0) != 0, (Object)"Client id must never be empty");
        Preconditions.checkNotNull(subscriptions, (Object)"Subscriptions must never be null");
        Preconditions.checkArgument((!subscriptions.isEmpty() ? 1 : 0) != 0, (Object)"Subscriptions must never be empty");
        if (this.rateLimitService.rateLimitExceeded()) {
            return CompletableFuture.failedFuture((Throwable)PluginServiceRateLimitService.RATE_LIMIT_EXCEEDED_EXCEPTION);
        }
        ImmutableSet.Builder topicsToProcess = new ImmutableSet.Builder();
        for (TopicSubscription topicSubscription : subscriptions) {
            Preconditions.checkNotNull((Object)topicSubscription, (Object)"Topic subscription must never be null");
            if (!(topicSubscription instanceof TopicSubscriptionImpl)) {
                return CompletableFuture.failedFuture((Throwable)new DoNotImplementException(TopicSubscription.class.getSimpleName()));
            }
            topicsToProcess.add((Object)TopicSubscriptionImpl.convertToTopic(topicSubscription));
        }
        return this.processAddSubscriptions(clientID, (ImmutableSet<Topic>)topicsToProcess.build());
    }

    @NotNull
    private CompletableFuture<Void> processAddSubscriptions(final @NotNull String clientID, @NotNull ImmutableSet<Topic> successTopics) {
        ListenableFuture<ImmutableList<SubscriptionResult>> addSubscriptionFuture = this.subscriptionPersistence.addSubscriptions(clientID, successTopics);
        final SettableFuture settableFuture = SettableFuture.create();
        Futures.addCallback(addSubscriptionFuture, (FutureCallback)new FutureCallback<ImmutableList<SubscriptionResult>>(){

            public void onSuccess(@Nullable ImmutableList<SubscriptionResult> result) {
                if (result == null) {
                    settableFuture.setException((Throwable)new NoSuchClientIdException(clientID));
                    return;
                }
                settableFuture.set(null);
            }

            public void onFailure(@NotNull Throwable t) {
                settableFuture.setException(t);
            }
        }, (Executor)this.managedExtensionExecutorService);
        return ListenableFutureConverter.toCompletable(settableFuture, this.managedExtensionExecutorService);
    }

    @NotNull
    public CompletableFuture<Void> removeSubscription(@NotNull String clientID, @NotNull String topicFilter) {
        Preconditions.checkNotNull((Object)clientID, (Object)"Client id must never be null");
        Preconditions.checkArgument((!clientID.isEmpty() ? 1 : 0) != 0, (Object)"Client id must never be empty");
        Preconditions.checkNotNull((Object)topicFilter, (Object)"Topic filter must never be null");
        if (this.rateLimitService.rateLimitExceeded()) {
            return CompletableFuture.failedFuture((Throwable)PluginServiceRateLimitService.RATE_LIMIT_EXCEEDED_EXCEPTION);
        }
        if (!Topics.isValidToSubscribe(topicFilter)) {
            return CompletableFuture.failedFuture((Throwable)new InvalidTopicException(topicFilter));
        }
        return ListenableFutureConverter.toCompletable(this.subscriptionPersistence.remove(clientID, topicFilter), this.managedExtensionExecutorService);
    }

    @NotNull
    public CompletableFuture<Void> removeSubscriptions(@NotNull String clientID, @NotNull Set<String> topicFilters) {
        Preconditions.checkNotNull((Object)clientID, (Object)"Client id must never be null");
        Preconditions.checkArgument((!clientID.isEmpty() ? 1 : 0) != 0, (Object)"Client id must never be empty");
        Preconditions.checkNotNull(topicFilters, (Object)"Topic-filters must never be null");
        Preconditions.checkArgument((!topicFilters.isEmpty() ? 1 : 0) != 0, (Object)"Topics-filters must never be empty");
        if (this.rateLimitService.rateLimitExceeded()) {
            return CompletableFuture.failedFuture((Throwable)PluginServiceRateLimitService.RATE_LIMIT_EXCEEDED_EXCEPTION);
        }
        ArrayList<String> failedTopics = new ArrayList<String>();
        for (String topicFilter : topicFilters) {
            Preconditions.checkNotNull((Object)topicFilter, (Object)"Topic filter must never be null");
            if (Topics.isValidToSubscribe(topicFilter)) continue;
            failedTopics.add(topicFilter);
        }
        if (failedTopics.isEmpty()) {
            return ListenableFutureConverter.toVoidCompletable(this.subscriptionPersistence.removeSubscriptions(clientID, (ImmutableSet<String>)ImmutableSet.copyOf(topicFilters)), this.managedExtensionExecutorService);
        }
        return CompletableFuture.failedFuture((Throwable)new InvalidTopicException("Topics not valid: " + String.valueOf(failedTopics)));
    }

    @NotNull
    public CompletableFuture<Set<TopicSubscription>> getSubscriptions(@NotNull String clientID) {
        Preconditions.checkNotNull((Object)clientID, (Object)"Client id must never be null");
        Preconditions.checkArgument((!clientID.isEmpty() ? 1 : 0) != 0, (Object)"Client id must never be empty");
        if (this.rateLimitService.rateLimitExceeded()) {
            return CompletableFuture.failedFuture((Throwable)PluginServiceRateLimitService.RATE_LIMIT_EXCEEDED_EXCEPTION);
        }
        return CompletableFuture.completedFuture(ClientSubscriptionsToTopicSubscriptions.INSTANCE.apply(this.subscriptionPersistence.getSubscriptions(clientID)));
    }

    @NotNull
    public CompletableFuture<Void> iterateAllSubscribersForTopic(@NotNull String topic, @NotNull IterationCallback<SubscriberForTopicResult> callback) {
        return this.iterateAllSubscribersForTopic(topic, SubscriptionType.ALL, callback, this.managedExtensionExecutorService);
    }

    @NotNull
    public CompletableFuture<Void> iterateAllSubscribersForTopic(@NotNull String topic, @NotNull SubscriptionType subscriptionType, @NotNull IterationCallback<SubscriberForTopicResult> callback) {
        return this.iterateAllSubscribersForTopic(topic, subscriptionType, callback, this.managedExtensionExecutorService);
    }

    @NotNull
    public CompletableFuture<Void> iterateAllSubscribersForTopic(@NotNull String topic, @NotNull IterationCallback<SubscriberForTopicResult> callback, @NotNull Executor callbackExecutor) {
        return this.iterateAllSubscribersForTopic(topic, SubscriptionType.ALL, callback, callbackExecutor);
    }

    @NotNull
    public CompletableFuture<Void> iterateAllSubscribersForTopic(@NotNull String topic, @NotNull SubscriptionType subscriptionType, @NotNull IterationCallback<SubscriberForTopicResult> callback, @NotNull Executor callbackExecutor) {
        Preconditions.checkNotNull((Object)topic, (Object)"Topic cannot be null");
        Preconditions.checkNotNull(callback, (Object)"Callback cannot be null");
        Preconditions.checkNotNull((Object)callbackExecutor, (Object)"Executor cannot be null");
        Preconditions.checkArgument((boolean)Topics.isValidTopicToPublish(topic), (Object)("Topic must be a valid topic and cannot contain wildcard characters, got '" + topic + "'"));
        if (this.rateLimitService.rateLimitExceeded()) {
            return CompletableFuture.failedFuture((Throwable)PluginServiceRateLimitService.RATE_LIMIT_EXCEEDED_EXCEPTION);
        }
        ImmutableSet<String> subscribers = this.topicTree.getSubscribersForTopic(topic, new SubscriptionTypeItemFilter(subscriptionType), false);
        SettableFuture iterationFinishedFuture = SettableFuture.create();
        callbackExecutor.execute(() -> {
            ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
            IterationContextImpl iterationContext = new IterationContextImpl();
            try {
                Thread.currentThread().setContextClassLoader(callback.getClass().getClassLoader());
                for (String subscriber : subscribers) {
                    try {
                        callback.iterate((IterationContext)iterationContext, (Object)new SubscriberForTopicResultImpl(subscriber));
                        if (!iterationContext.isAborted()) continue;
                        iterationFinishedFuture.set(null);
                        return;
                    }
                    catch (Exception e) {
                        iterationFinishedFuture.setException((Throwable)e);
                        return;
                    }
                }
                iterationFinishedFuture.set(null);
            }
            finally {
                Thread.currentThread().setContextClassLoader(previousClassLoader);
            }
        });
        return ListenableFutureConverter.toCompletable(iterationFinishedFuture, this.managedExtensionExecutorService);
    }

    @NotNull
    public CompletableFuture<Void> iterateAllSubscribersWithTopicFilter(@NotNull String topicFilter, @NotNull IterationCallback<SubscriberWithFilterResult> callback) {
        return this.iterateAllSubscribersWithTopicFilter(topicFilter, SubscriptionType.ALL, callback, this.managedExtensionExecutorService);
    }

    @NotNull
    public CompletableFuture<Void> iterateAllSubscribersWithTopicFilter(@NotNull String topicFilter, @NotNull SubscriptionType subscriptionType, @NotNull IterationCallback<SubscriberWithFilterResult> callback) {
        return this.iterateAllSubscribersWithTopicFilter(topicFilter, subscriptionType, callback, this.managedExtensionExecutorService);
    }

    @NotNull
    public CompletableFuture<Void> iterateAllSubscribersWithTopicFilter(@NotNull String topicFilter, @NotNull IterationCallback<SubscriberWithFilterResult> callback, @NotNull Executor callbackExecutor) {
        return this.iterateAllSubscribersWithTopicFilter(topicFilter, SubscriptionType.ALL, callback, callbackExecutor);
    }

    @NotNull
    public CompletableFuture<Void> iterateAllSubscribersWithTopicFilter(@NotNull String topicFilter, @NotNull SubscriptionType subscriptionType, @NotNull IterationCallback<SubscriberWithFilterResult> callback, @NotNull Executor callbackExecutor) {
        Preconditions.checkNotNull((Object)topicFilter, (Object)"Topic filter cannot be null");
        Preconditions.checkNotNull(callback, (Object)"Callback cannot be null");
        Preconditions.checkNotNull((Object)callbackExecutor, (Object)"Executor cannot be null");
        Preconditions.checkNotNull((Object)subscriptionType, (Object)"SubscriptionType cannot be null");
        Preconditions.checkArgument((boolean)Topics.isValidToSubscribe(topicFilter), (Object)("Topic filter must be a valid MQTT topic filter, got '" + topicFilter + "'"));
        if (this.rateLimitService.rateLimitExceeded()) {
            return CompletableFuture.failedFuture((Throwable)PluginServiceRateLimitService.RATE_LIMIT_EXCEEDED_EXCEPTION);
        }
        ImmutableSet<String> subscribers = this.topicTree.getSubscribersWithFilter(topicFilter, new SubscriptionTypeItemFilter(subscriptionType));
        SettableFuture iterationFinishedFuture = SettableFuture.create();
        callbackExecutor.execute(() -> {
            ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
            IterationContextImpl iterationContext = new IterationContextImpl();
            try {
                Thread.currentThread().setContextClassLoader(callback.getClass().getClassLoader());
                for (String subscriber : subscribers) {
                    try {
                        callback.iterate((IterationContext)iterationContext, (Object)new SubscriberWithFilterResultImpl(subscriber));
                        if (!iterationContext.isAborted()) continue;
                        iterationFinishedFuture.set(null);
                        return;
                    }
                    catch (Exception e) {
                        iterationFinishedFuture.setException((Throwable)e);
                        return;
                    }
                }
                iterationFinishedFuture.set(null);
            }
            finally {
                Thread.currentThread().setContextClassLoader(previousClassLoader);
            }
        });
        return ListenableFutureConverter.toCompletable(iterationFinishedFuture, this.managedExtensionExecutorService);
    }

    @NotNull
    public CompletableFuture<Void> iterateAllSubscriptions(@NotNull IterationCallback<SubscriptionsForClientResult> callback) {
        return this.iterateAllSubscriptions(callback, this.managedExtensionExecutorService);
    }

    @NotNull
    public CompletableFuture<Void> iterateAllSubscriptions(@NotNull IterationCallback<SubscriptionsForClientResult> callback, @NotNull Executor callbackExecutor) {
        Preconditions.checkNotNull(callback, (Object)"Callback cannot be null");
        Preconditions.checkNotNull(callback, (Object)"Callback executor cannot be null");
        if (this.rateLimitService.rateLimitExceeded()) {
            return CompletableFuture.failedFuture((Throwable)PluginServiceRateLimitService.RATE_LIMIT_EXCEEDED_EXCEPTION);
        }
        AllSubscribersFetchCallback fetchCallback = new AllSubscribersFetchCallback(this.subscriptionPersistence);
        AsyncIterator<SubscriptionsForClientResult> asyncIterator = this.asyncIteratorFactory.createIterator(fetchCallback, new AllItemsItemCallback<SubscriptionsForClientResult>(callbackExecutor, callback));
        asyncIterator.fetchAndIterate();
        SettableFuture iterationFinishedFuture = SettableFuture.create();
        asyncIterator.getFinishedFuture().whenComplete((aVoid, throwable) -> {
            if (throwable != null) {
                iterationFinishedFuture.setException(throwable);
            } else {
                iterationFinishedFuture.set(null);
            }
        });
        return ListenableFutureConverter.toCompletable(iterationFinishedFuture, this.managedExtensionExecutorService);
    }

    static class AllSubscribersFetchCallback
    extends AllItemsFetchCallback<SubscriptionsForClientResult, Map<String, ImmutableSet<Topic>>> {
        @NotNull
        private final ClientSessionSubscriptionPersistence subscriptionPersistence;

        AllSubscribersFetchCallback(@NotNull ClientSessionSubscriptionPersistence subscriptionPersistence) {
            this.subscriptionPersistence = subscriptionPersistence;
        }

        @Override
        @NotNull
        protected ListenableFuture<MultipleChunkResult<Map<String, ImmutableSet<Topic>>>> persistenceCall(@NotNull ChunkCursor chunkCursor) {
            return this.subscriptionPersistence.getAllLocalSubscribersChunk(chunkCursor);
        }

        @Override
        @NotNull
        protected Collection<SubscriptionsForClientResult> transform(@NotNull Map<String, ImmutableSet<Topic>> stringSetMap) {
            return stringSetMap.entrySet().stream().map(entry -> new SubscriptionsForClientResultImpl((String)entry.getKey(), ((ImmutableSet)entry.getValue()).stream().map(TopicSubscriptionImpl::new).collect(Collectors.toSet()))).collect(Collectors.toUnmodifiableList());
        }
    }

    private static class ClientSubscriptionsToTopicSubscriptions
    implements Function<ImmutableSet<Topic>, Set<TopicSubscription>> {
        private static final ClientSubscriptionsToTopicSubscriptions INSTANCE = new ClientSubscriptionsToTopicSubscriptions();

        private ClientSubscriptionsToTopicSubscriptions() {
        }

        @Override
        @NotNull
        public Set<TopicSubscription> apply(@NotNull ImmutableSet<Topic> clientSubscriptions) {
            return clientSubscriptions.stream().map(TopicSubscriptionImpl::new).collect(Collectors.toUnmodifiableSet());
        }
    }
}

