/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.persistence.retained;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.hivemq.bootstrap.ioc.lazysingleton.LazySingleton;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extensions.iteration.ChunkCursor;
import com.hivemq.extensions.iteration.Chunker;
import com.hivemq.extensions.iteration.MultipleChunkResult;
import com.hivemq.persistence.AbstractPersistence;
import com.hivemq.persistence.ProducerQueues;
import com.hivemq.persistence.RetainedMessage;
import com.hivemq.persistence.SingleWriterService;
import com.hivemq.persistence.retained.RetainedMessageLocalPersistence;
import com.hivemq.persistence.retained.RetainedMessagePersistence;
import com.hivemq.persistence.util.FutureUtils;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import javax.inject.Inject;

/**
 * Asynchronous facade over a {@link RetainedMessageLocalPersistence}.
 *
 * <p>Every operation except {@link #size()} is handed to the retained-message
 * {@link ProducerQueues} obtained from the {@link SingleWriterService}; the queue supplies the
 * bucket index that is passed on to the local persistence. Results are reported through
 * {@link ListenableFuture}s. Argument validation failures are reported as failed futures rather
 * than thrown exceptions.
 */
@LazySingleton
public class RetainedMessagePersistenceImpl
extends AbstractPersistence
implements RetainedMessagePersistence {
    /**
     * Limit passed to {@link Chunker#getAllLocalChunk} for a single chunk (10 * 1024 * 1024).
     * NOTE(review): presumably a memory budget in bytes (10 MiB) - confirm against Chunker.
     */
    private static final int RETAINED_MESSAGES_CHUNK_LIMIT = 10 * 1024 * 1024;

    @NotNull
    private final RetainedMessageLocalPersistence localPersistence;
    @NotNull
    private final ProducerQueues singleWriter;
    @NotNull
    private final Chunker chunker;

    /**
     * Creates the persistence facade.
     *
     * @param localPersistence    the local store all operations are delegated to
     * @param singleWriterService provides the retained-message queue used to schedule operations
     * @param chunker             helper used for chunked iteration over all local retained messages
     */
    @Inject
    RetainedMessagePersistenceImpl(@NotNull RetainedMessageLocalPersistence localPersistence, @NotNull SingleWriterService singleWriterService, @NotNull Chunker chunker) {
        this.localPersistence = localPersistence;
        this.singleWriter = singleWriterService.getRetainedMessageQueue();
        this.chunker = chunker;
    }

    /**
     * Looks up the retained message stored for an exact topic.
     *
     * @param topic the topic to look up; must not be null and must not contain {@code +} or {@code #}
     * @return a future holding the result of the local lookup, or a failed future if the topic is
     *         null (NullPointerException), contains a wildcard (IllegalArgumentException), or the
     *         task could not be submitted
     */
    @Override
    @NotNull
    public ListenableFuture<RetainedMessage> get(@NotNull String topic) {
        try {
            Preconditions.checkNotNull(topic, "Topic must not be null");
            if (containsWildcard(topic)) {
                throw new IllegalArgumentException("Topic contains wildcard characters. Call getWithWildcards method instead.");
            }
            // Submitted inside the try block, like remove/persist/getWithWildcards, so that a
            // synchronous failure is also reported through the returned future.
            return this.singleWriter.submit(topic, bucketIndex -> this.localPersistence.get(topic, bucketIndex));
        }
        catch (Throwable throwable) {
            return Futures.immediateFailedFuture(throwable);
        }
    }

    /**
     * Returns the size reported by the local persistence. This call is made directly, without
     * going through the queue.
     *
     * @return the value of {@code localPersistence.size()}
     */
    @Override
    public long size() {
        return this.localPersistence.size();
    }

    /**
     * Removes the retained message stored for a topic.
     *
     * @param topic the topic whose retained message is removed; must not be null
     * @return a future completing with {@code null} once the removal task has run, or a failed
     *         future if the topic is null or the task could not be submitted
     */
    @Override
    @NotNull
    public ListenableFuture<Void> remove(@NotNull String topic) {
        try {
            Preconditions.checkNotNull(topic, "Topic must not be null");
            return this.singleWriter.submit(topic, bucketIndex -> {
                this.localPersistence.remove(topic, bucketIndex);
                return null;
            });
        }
        catch (Throwable throwable) {
            return Futures.immediateFailedFuture(throwable);
        }
    }

    /**
     * Stores a retained message for a topic.
     *
     * @param topic           the topic the message is stored under; must not be null
     * @param retainedMessage the message to store; must not be null
     * @return a future completing with {@code null} once the store task has run, or a failed
     *         future if an argument is null or the task could not be submitted
     */
    @Override
    @NotNull
    public ListenableFuture<Void> persist(@NotNull String topic, @NotNull RetainedMessage retainedMessage) {
        try {
            Preconditions.checkNotNull(topic, "Topic must not be null");
            Preconditions.checkNotNull(retainedMessage, "Retained message must not be null");
            return this.singleWriter.submit(topic, bucketIndex -> {
                this.localPersistence.put(retainedMessage, topic, bucketIndex);
                return null;
            });
        }
        catch (Throwable throwable) {
            return Futures.immediateFailedFuture(throwable);
        }
    }

    /**
     * Collects the topics returned by the local persistence for a wildcard subscription,
     * querying every bucket and merging the per-bucket results into one set.
     *
     * @param subscription the subscription to query with; must not be null and must contain
     *                     {@code +} or {@code #}
     * @return a future holding the union of the topics returned for each bucket, or a failed
     *         future if the subscription is null (NullPointerException), contains no wildcard
     *         (IllegalArgumentException), or a bucket task fails
     */
    @Override
    @NotNull
    public ListenableFuture<Set<String>> getWithWildcards(@NotNull String subscription) {
        try {
            Preconditions.checkNotNull(subscription, "Topic must not be null");
            if (!containsWildcard(subscription)) {
                throw new IllegalArgumentException("Topic does not contain wildcard characters. Call get method instead.");
            }
            // Each bucket's result is copied into a fresh HashSet before it leaves the task.
            List<ListenableFuture<Set<String>>> futures = this.singleWriter.submitToAllBucketsParallel(bucketIndex -> new HashSet<>(this.localPersistence.getAllTopics(subscription, bucketIndex)));
            return Futures.transform(Futures.allAsList(futures), topicSets -> topicSets.stream().flatMap(Collection::stream).collect(Collectors.toSet()), MoreExecutors.directExecutor());
        }
        catch (Throwable throwable) {
            return Futures.immediateFailedFuture(throwable);
        }
    }

    /**
     * Schedules {@code localPersistence.cleanUp} for one bucket.
     *
     * @param bucketIndex the bucket the task is submitted for
     * @return a future completing with {@code null} once the clean-up task has run
     */
    @Override
    @NotNull
    public ListenableFuture<Void> cleanUp(int bucketIndex) {
        return this.singleWriter.submit(bucketIndex, index -> {
            this.localPersistence.cleanUp(index);
            return null;
        });
    }

    /**
     * Closes the persistence by delegating to the two-argument {@code closeDB} overload with this
     * instance's local persistence and queue. That overload is not defined in this class
     * (presumably inherited from {@link AbstractPersistence}).
     *
     * @return the future returned by the delegate
     */
    @Override
    @NotNull
    public ListenableFuture<Void> closeDB() {
        return this.closeDB(this.localPersistence, this.singleWriter);
    }

    /**
     * Schedules {@code localPersistence.clear} for every bucket.
     *
     * @return a single future built from the per-bucket futures by
     *         {@link FutureUtils#voidFutureFromList}
     */
    @Override
    @NotNull
    public ListenableFuture<Void> clear() {
        List<ListenableFuture<Void>> futureList = this.singleWriter.submitToAllBucketsParallel(bucketIndex -> {
            this.localPersistence.clear(bucketIndex);
            return null;
        });
        return FutureUtils.voidFutureFromList(ImmutableList.copyOf(futureList));
    }

    /**
     * Fetches the next chunk of locally stored retained messages.
     *
     * <p>The {@link Chunker} drives the iteration; for each bucket it asks for, a task is
     * submitted that reads one chunk from the local persistence starting after {@code lastKey}.
     *
     * @param cursor the position to continue from
     * @return a future holding the chunk result produced by the chunker
     */
    @Override
    @NotNull
    public ListenableFuture<MultipleChunkResult<Map<String, @NotNull RetainedMessage>>> getAllLocalRetainedMessagesChunk(@NotNull ChunkCursor cursor) {
        return this.chunker.getAllLocalChunk(cursor, RETAINED_MESSAGES_CHUNK_LIMIT, (bucket, lastKey, maxResults) -> this.singleWriter.submit(bucket, bucketIndex -> this.localPersistence.getAllRetainedMessagesChunk(bucketIndex, lastKey, maxResults)));
    }

    /** Returns true if the given topic string contains a {@code +} or {@code #} character. */
    private static boolean containsWildcard(@NotNull String topic) {
        return topic.contains("+") || topic.contains("#");
    }
}

