/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.persistence.clientsession;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.metrics.MetricsHolder;
import com.hivemq.mqtt.handler.publish.PublishReturnCode;
import com.hivemq.mqtt.message.publish.PUBLISH;
import com.hivemq.mqtt.message.publish.PUBLISHFactory;
import com.hivemq.mqtt.services.InternalPublishService;
import com.hivemq.persistence.clientsession.ClientSession;
import com.hivemq.persistence.clientsession.ClientSessionPersistence;
import com.hivemq.persistence.clientsession.ClientSessionWill;
import com.hivemq.persistence.ioc.annotation.Persistence;
import com.hivemq.persistence.local.ClientSessionLocalPersistence;
import com.hivemq.util.Checkpoints;
import com.hivemq.util.Exceptions;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps track of will messages whose publication is delayed and publishes them once they are due.
 * <p>
 * Pending wills are held in an in-memory {@link ConcurrentHashMap} keyed by client id. A
 * {@link CheckWillsTask} is scheduled from the constructor and runs once per second to publish
 * every will whose delay has elapsed.
 */
@Singleton
public class PendingWillMessages {
    private static final Logger log = LoggerFactory.getLogger(PendingWillMessages.class);

    /** Wills waiting for their delay to elapse, keyed by client id. */
    @NotNull
    private final Map<String, PendingWill> pendingWills = new ConcurrentHashMap<>();
    @NotNull
    private final InternalPublishService publishService;
    @NotNull
    private final ListeningScheduledExecutorService executorService;
    @NotNull
    private final ClientSessionPersistence clientSessionPersistence;
    @NotNull
    private final ClientSessionLocalPersistence clientSessionLocalPersistence;
    @NotNull
    private final MetricsHolder metricsHolder;

    /**
     * Stores the collaborators and schedules the periodic {@link CheckWillsTask}
     * (initial delay one second, period one second) on the given executor.
     *
     * @param publishService                service used to publish will messages
     * @param executorService               executor that runs the periodic check and is handed to the publish service
     * @param clientSessionPersistence      persistence used to read pending wills and to delete sent wills
     * @param clientSessionLocalPersistence persistence used to look up the session of a client
     * @param metricsHolder                 holder of the published-will counter
     */
    @Inject
    public PendingWillMessages(@NotNull InternalPublishService publishService, @Persistence @NotNull ListeningScheduledExecutorService executorService, @NotNull ClientSessionPersistence clientSessionPersistence, @NotNull ClientSessionLocalPersistence clientSessionLocalPersistence, @NotNull MetricsHolder metricsHolder) {
        this.publishService = publishService;
        this.executorService = executorService;
        this.clientSessionPersistence = clientSessionPersistence;
        this.clientSessionLocalPersistence = clientSessionLocalPersistence;
        this.metricsHolder = metricsHolder;
        // Scheduled last: the task reads the fields above through the enclosing instance.
        executorService.scheduleAtFixedRate(new CheckWillsTask(), 1L, 1L, TimeUnit.SECONDS);
    }

    /**
     * Publishes the will of the given session right away, or registers it as pending.
     * <p>
     * Nothing happens when the session carries no will. The will is published immediately when
     * either the will delay interval or the session expiry interval is zero. Otherwise it is
     * registered as pending with the smaller of the two intervals as delay, starting at the
     * current time.
     *
     * @param clientId id of the client owning the session
     * @param session  session that may carry a will
     * @throws NullPointerException if {@code clientId} or {@code session} is {@code null}
     */
    public void sendOrEnqueueWillIfAvailable(@NotNull String clientId, @NotNull ClientSession session) {
        Preconditions.checkNotNull(clientId, "Client id must not be null");
        Preconditions.checkNotNull(session, "Client session must not be null");
        final ClientSessionWill sessionWill = session.getWillPublish();
        // Check the captured reference, not a second getter call: this is the value dereferenced below.
        if (sessionWill == null) {
            return;
        }
        if (sessionWill.getDelayInterval() == 0L || session.getSessionExpiryIntervalSec() == 0L) {
            sendWill(clientId, publishFromWill(sessionWill));
            return;
        }
        final long delay = Math.min(sessionWill.getDelayInterval(), session.getSessionExpiryIntervalSec());
        pendingWills.put(clientId, new PendingWill(delay, System.currentTimeMillis()));
    }

    /**
     * Removes the pending will of the client, if any, and publishes it using the session
     * read from the local session persistence.
     *
     * @param clientId id of the client whose pending will should be sent
     */
    public void sendWillIfPending(@NotNull String clientId) {
        if (pendingWills.remove(clientId) != null) {
            getAndSendPendingWill(clientId);
        }
    }

    /**
     * Removes the pending will of the client, if any, and publishes the will carried by the given session.
     *
     * @param clientId id of the client whose pending will should be sent
     * @param session  session to take the will from
     */
    public void sendWillIfPending(@NotNull String clientId, @NotNull ClientSession session) {
        if (pendingWills.remove(clientId) != null) {
            getAndSendPendingWill(clientId, session);
        }
    }

    /**
     * Looks up the client's session in the local persistence and publishes its will.
     * A missing session trips the assertion when assertions are enabled; otherwise it is
     * logged as a warning and the call returns without publishing.
     */
    private void getAndSendPendingWill(@NotNull String clientId) {
        final ClientSession session = clientSessionLocalPersistence.getSession(clientId, false);
        assert (session != null) : "Missing expected session to get will message from for client: " + clientId;
        if (session == null) {
            log.warn("Unable to send pending will for client {} because the session is gone.", clientId);
            return;
        }
        getAndSendPendingWill(clientId, session);
    }

    /**
     * Publishes the will carried by the given session.
     * A missing will trips the assertion when assertions are enabled; otherwise it is
     * logged as a warning and the call returns without publishing.
     */
    private void getAndSendPendingWill(@NotNull String clientId, @NotNull ClientSession session) {
        final ClientSessionWill sessionWill = session.getWillPublish();
        assert (sessionWill != null) : "Missing expected will message in session of client: " + clientId;
        if (sessionWill == null) {
            log.warn("Unable to send pending will for client {} because the session's will is gone.", clientId);
            return;
        }
        sendWill(clientId, publishFromWill(sessionWill));
    }

    /**
     * Drops the pending will of the client, if any, without publishing it.
     *
     * @param clientId id of the client whose pending will should be discarded
     */
    public void cancelWillIfPending(@NotNull String clientId) {
        pendingWills.remove(clientId);
    }

    /**
     * Clears the in-memory pending wills and refills them from
     * {@link ClientSessionPersistence#pendingWills()} once that future completes.
     * The callback runs on a direct executor, i.e. on the thread that completes the future.
     */
    public void reset() {
        pendingWills.clear();
        final ListenableFuture<Map<String, PendingWill>> future = clientSessionPersistence.pendingWills();
        Futures.addCallback(future, new FutureCallback<Map<String, PendingWill>>() {

            @Override
            public void onSuccess(@NotNull Map<String, PendingWill> result) {
                PendingWillMessages.this.pendingWills.putAll(result);
            }

            @Override
            public void onFailure(@NotNull Throwable t) {
                Exceptions.rethrowError("Exception when reading pending will messages", t);
            }
        }, MoreExecutors.directExecutor());
    }

    /**
     * Publishes the will and requests deletion of the stored will of the client.
     * <p>
     * The published-will counter is incremented when the publish future succeeds; a failure is
     * logged and passed to {@code Exceptions.rethrowError}. The deletion is requested right after
     * the publish has been submitted, not from the publish callback, so it does not depend on
     * the publish outcome.
     */
    private void sendWill(@NotNull String clientId, @NotNull PUBLISH willPublish) {
        Futures.addCallback(publishService.publish(willPublish, executorService, clientId), new FutureCallback<PublishReturnCode>() {

            @Override
            public void onSuccess(@NotNull PublishReturnCode result) {
                PendingWillMessages.this.metricsHolder.getPublishedWillMessagesCount().inc();
            }

            @Override
            public void onFailure(@NotNull Throwable t) {
                log.error("Publish of Will message failed.", t);
                Exceptions.rethrowError(t);
            }
        }, MoreExecutors.directExecutor());
        final ListenableFuture<Void> future = clientSessionPersistence.deleteWill(clientId);
        if (Checkpoints.enabled()) {
            future.addListener(() -> Checkpoints.checkpoint("pending-will-removed"), MoreExecutors.directExecutor());
        }
    }

    /**
     * Builds an MQTT 5 PUBLISH from the stored will, copying topic, QoS (also used as onward QoS),
     * payload, retain flag, HiveMQ id, user properties, response topic, correlation data,
     * content type, payload format indicator and message expiry interval.
     */
    @NotNull
    private static PUBLISH publishFromWill(@NotNull ClientSessionWill sessionWill) {
        return new PUBLISHFactory.Mqtt5Builder()
                .withTopic(sessionWill.getTopic())
                .withQoS(sessionWill.getQos())
                .withOnwardQos(sessionWill.getQos())
                .withPayload(sessionWill.getPayload())
                .withRetain(sessionWill.isRetain())
                .withHivemqId(sessionWill.getHivemqId())
                .withUserProperties(sessionWill.getUserProperties())
                .withResponseTopic(sessionWill.getResponseTopic())
                .withCorrelationData(sessionWill.getCorrelationData())
                .withContentType(sessionWill.getContentType())
                .withPayloadFormatIndicator(sessionWill.getPayloadFormatIndicator())
                .withMessageExpiryInterval(sessionWill.getMessageExpiryInterval())
                .build();
    }

    /**
     * Exposes the live (not copied) map of pending wills.
     *
     * @return the internal map of pending wills keyed by client id
     */
    @VisibleForTesting
    @NotNull
    public Map<String, PendingWill> getPendingWills() {
        return pendingWills;
    }

    /**
     * Immutable record of a delayed will: how long to wait and when the waiting started.
     * Within this class the delay is treated as seconds and the start time as milliseconds
     * (it is compared against {@link System#currentTimeMillis()}).
     */
    public static class PendingWill {
        private final long delayInterval;
        private final long startTime;

        /**
         * @param delayInterval delay before the will becomes due
         * @param startTime     point in time at which the delay started
         */
        public PendingWill(long delayInterval, long startTime) {
            this.delayInterval = delayInterval;
            this.startTime = startTime;
        }

        /** @return the delay before the will becomes due */
        public long getDelayInterval() {
            return delayInterval;
        }

        /** @return the point in time at which the delay started */
        public long getStartTime() {
            return startTime;
        }
    }

    /**
     * Periodic task that publishes every pending will whose delay has elapsed and removes it
     * from the pending map.
     */
    class CheckWillsTask
    implements Runnable {

        /**
         * Checks every pending will once. A failure for one client is logged and does not keep
         * the remaining clients from being checked in the same run.
         * <p>
         * NOTE(review): only {@link Exception}s are caught. An {@link Error} (for instance an
         * {@code AssertionError} from the assertions in {@code getAndSendPendingWill} when
         * assertions are enabled) still escapes; per the {@code scheduleAtFixedRate} contract that
         * suppresses all further executions of this task - confirm this is intended.
         */
        @Override
        public void run() {
            for (final String clientId : pendingWills.keySet()) {
                try {
                    sendIfDue(clientId);
                }
                catch (Exception e) {
                    // Caught per client: a single entry that keeps failing must not starve the others.
                    log.error("Exception while checking pending will messages", e);
                }
            }
        }

        /**
         * Publishes and removes the pending will of the client if its delay has elapsed.
         * The check and the removal happen atomically inside {@code computeIfPresent}: returning
         * {@code null} removes the entry, and if sending throws, the entry stays in the map and is
         * tried again on the next run.
         */
        private void sendIfDue(final String clientId) {
            pendingWills.computeIfPresent(clientId, (key, pendingWill) -> {
                // The delay is multiplied by 1000 to compare it against a millisecond clock.
                final long dueAt = pendingWill.getStartTime() + pendingWill.getDelayInterval() * 1000L;
                if (dueAt < System.currentTimeMillis()) {
                    getAndSendPendingWill(key);
                    return null;
                }
                return pendingWill;
            });
        }
    }
}

