/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.persistence.local.xodus.clientsession;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.hivemq.bootstrap.ioc.lazysingleton.LazySingleton;
import com.hivemq.configuration.service.InternalConfigurations;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.extension.sdk.api.annotations.ThreadSafe;
import com.hivemq.extensions.iteration.BucketChunkResult;
import com.hivemq.logging.EventLog;
import com.hivemq.metrics.MetricsHolder;
import com.hivemq.persistence.NoSessionException;
import com.hivemq.persistence.PersistenceEntry;
import com.hivemq.persistence.PersistenceStartup;
import com.hivemq.persistence.clientsession.ClientSession;
import com.hivemq.persistence.clientsession.ClientSessionWill;
import com.hivemq.persistence.clientsession.PendingWillMessages;
import com.hivemq.persistence.exception.InvalidSessionExpiryIntervalException;
import com.hivemq.persistence.local.ClientSessionLocalPersistence;
import com.hivemq.persistence.local.xodus.EnvironmentUtil;
import com.hivemq.persistence.local.xodus.TransactionCommitActions;
import com.hivemq.persistence.local.xodus.XodusLocalPersistence;
import com.hivemq.persistence.local.xodus.XodusUtils;
import com.hivemq.persistence.local.xodus.bucket.Bucket;
import com.hivemq.persistence.local.xodus.bucket.BucketUtils;
import com.hivemq.persistence.local.xodus.clientsession.ClientSessionPersistenceSerializer;
import com.hivemq.persistence.payload.PublishPayloadPersistence;
import com.hivemq.util.LocalPersistenceFileUtil;
import com.hivemq.util.ThreadPreConditions;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import jetbrains.exodus.ByteIterable;
import jetbrains.exodus.env.Cursor;
import jetbrains.exodus.env.Store;
import jetbrains.exodus.env.StoreConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@LazySingleton
@ThreadSafe
public class ClientSessionXodusLocalPersistence
extends XodusLocalPersistence
implements ClientSessionLocalPersistence {
    /** Logger of this class; also exposed to the superclass through {@link #getLogger()}. */
    @NotNull
    private static final Logger log = LoggerFactory.getLogger(ClientSessionXodusLocalPersistence.class);
    /** Persistence name reported through {@link #getName()}. */
    @NotNull
    private static final String PERSISTENCE_NAME = "client_session_store";
    /** Persistence version reported through {@link #getVersion()}. */
    @NotNull
    public static final String PERSISTENCE_VERSION = "040000";
    /** Converts client ids to keys and sessions (together with a timestamp) to stored values, and back. */
    @NotNull
    private final ClientSessionPersistenceSerializer serializer;
    /** Stores will payloads by publish id; its reference counters are adjusted when wills are added or removed here. */
    @NotNull
    private final PublishPayloadPersistence payloadPersistence;
    /** Receives a {@code clientSessionExpired} entry for every session deleted by {@link #cleanUp(int)}. */
    @NotNull
    private final EventLog eventLog;
    /** Provides the stored-will-messages counter that is kept in step with will references. */
    @NotNull
    private final MetricsHolder metricsHolder;
    /**
     * Value returned by {@link #getSessionsCount()}. It is seeded in {@link #init()} and adjusted by
     * {@code put}, {@code disconnect}, {@code removeWithTimestamp} and {@code cleanUp}.
     */
    @NotNull
    private final AtomicInteger sessionsCount = new AtomicInteger(0);

    /**
     * Creates the persistence and stores its collaborators.
     * <p>
     * The number of buckets handed to the superclass is read from
     * {@code InternalConfigurations.PERSISTENCE_BUCKET_COUNT}.
     *
     * @param localPersistenceFileUtil passed through to the superclass
     * @param environmentUtil          passed through to the superclass
     * @param payloadPersistence       persistence holding the will payloads
     * @param eventLog                 event log notified about expired sessions
     * @param persistenceStartup       passed through to the superclass
     * @param metricsHolder            holder of the stored-will-messages counter
     */
    @Inject
    ClientSessionXodusLocalPersistence(
            @NotNull LocalPersistenceFileUtil localPersistenceFileUtil,
            @NotNull EnvironmentUtil environmentUtil,
            @NotNull PublishPayloadPersistence payloadPersistence,
            @NotNull EventLog eventLog,
            @NotNull PersistenceStartup persistenceStartup,
            @NotNull MetricsHolder metricsHolder) {
        super(environmentUtil,
                localPersistenceFileUtil,
                persistenceStartup,
                InternalConfigurations.PERSISTENCE_BUCKET_COUNT.get(),
                true);
        this.serializer = new ClientSessionPersistenceSerializer();
        this.metricsHolder = metricsHolder;
        this.eventLog = eventLog;
        this.payloadPersistence = payloadPersistence;
    }

    /**
     * @return the fixed name of this persistence, {@code "client_session_store"}
     */
    @Override
    @NotNull
    protected String getName() {
        return ClientSessionXodusLocalPersistence.PERSISTENCE_NAME;
    }

    /**
     * @return the fixed version string of this persistence, {@code "040000"}
     */
    @Override
    @NotNull
    protected String getVersion() {
        return ClientSessionXodusLocalPersistence.PERSISTENCE_VERSION;
    }

    /**
     * @return {@link StoreConfig#WITHOUT_DUPLICATES}, the store configuration used by this persistence
     */
    @Override
    @NotNull
    protected StoreConfig getStoreConfig() {
        final StoreConfig config = StoreConfig.WITHOUT_DUPLICATES;
        return config;
    }

    /**
     * @return the class-level logger of this persistence
     */
    @Override
    @NotNull
    protected Logger getLogger() {
        return ClientSessionXodusLocalPersistence.log;
    }

    /**
     * Delegates to the superclass implementation without adding any logic of its own.
     * NOTE(review): presumably overridden only so that {@code @PostConstruct} is declared on this
     * concrete class - confirm against the injection setup.
     */
    @Override
    @PostConstruct
    protected void postConstruct() {
        super.postConstruct();
    }

    /**
     * Scans every bucket once, in an exclusive transaction per bucket.
     * <p>
     * For each stored session this
     * <ul>
     *   <li>counts it towards the session counter when its expiry interval is greater than zero, and</li>
     *   <li>if it still carries a will: registers a commit action that increments the payload reference
     *       counter "on bootstrap" and then decrements it again, and rewrites the entry without the will
     *       while keeping the stored timestamp.</li>
     * </ul>
     * The counted sessions of a bucket are added to the session counter after its transaction returned.
     */
    @Override
    protected void init() {
        for (int bucketIndex = 0; bucketIndex < this.bucketCount; ++bucketIndex) {
            final Bucket currentBucket = this.buckets[bucketIndex];
            final SessionCounterDelta persistentSessions = new SessionCounterDelta();
            currentBucket.getEnvironment().executeInExclusiveTransaction(txn -> {
                final Store store = currentBucket.getStore();
                try (Cursor cursor = currentBucket.getStore().openCursor(txn)) {
                    final TransactionCommitActions onCommit = TransactionCommitActions.asCommitHookFor(txn);
                    while (cursor.getNext()) {
                        final byte[] rawValue = XodusUtils.byteIterableToBytes(cursor.getValue());
                        final ClientSession session = this.serializer.deserializeValue(rawValue);
                        if (persistent(session)) {
                            persistentSessions.increment();
                        }
                        final ClientSessionWill storedWill = session.getWillPublish();
                        if (storedWill != null) {
                            // Payload bookkeeping is deferred to the commit actions of this transaction.
                            onCommit.add(() -> {
                                this.payloadPersistence.incrementReferenceCounterOnBootstrap(storedWill.getPublishId());
                                this.payloadPersistence.decrementReferenceCounter(storedWill.getPublishId());
                            });
                            // Persist the same session again, now without its will and with the old timestamp.
                            session.setWillPublish(null);
                            final long storedTimestamp = this.serializer.deserializeTimestamp(rawValue);
                            final byte[] withoutWill = this.serializer.serializeValue(session, storedTimestamp);
                            store.put(txn, cursor.getKey(), XodusUtils.bytesToByteIterable(withoutWill));
                        }
                    }
                }
            });
            persistentSessions.run();
        }
    }

    /**
     * Looks up the session of a client in the bucket derived from the client id. Expired sessions are
     * not returned and the will (including its payload) is loaded.
     *
     * @param clientId the client id, must not be {@code null}
     * @return the session, or {@code null} if none is stored or it is expired
     */
    @Override
    @Nullable
    public ClientSession getSession(@NotNull String clientId) {
        Preconditions.checkNotNull(clientId, "Client id must not be null");
        final Bucket bucket = this.getBucket(clientId);
        return this.getSession(clientId, bucket, true, true);
    }

    /**
     * Looks up the session of a client in the given bucket. Expired sessions are not returned and the
     * will (including its payload) is loaded.
     *
     * @param clientId    the client id, must not be {@code null}
     * @param bucketIndex index of the bucket to read from; validated with {@code checkBucketIndex}
     * @return the session, or {@code null} if none is stored or it is expired
     */
    @Override
    @Nullable
    public ClientSession getSession(@NotNull String clientId, int bucketIndex) {
        Preconditions.checkNotNull(clientId, "Client id must not be null");
        this.checkBucketIndex(bucketIndex);
        final Bucket bucket = this.buckets[bucketIndex];
        return this.getSession(clientId, bucket, true, true);
    }

    /**
     * Looks up the session of a client in the bucket derived from the client id; the will (including
     * its payload) is loaded.
     *
     * @param clientId     the client id, must not be {@code null}
     * @param checkExpired whether an expired session is reported as {@code null}
     * @return the session, or {@code null} if none is stored (or it is expired and {@code checkExpired} is set)
     */
    @Override
    @Nullable
    public ClientSession getSession(@NotNull String clientId, boolean checkExpired) {
        Preconditions.checkNotNull(clientId, "Client id must not be null");
        final Bucket bucket = this.getBucket(clientId);
        return this.getSession(clientId, bucket, checkExpired, true);
    }

    /**
     * Looks up the session of a client in the given bucket; the will (including its payload) is loaded.
     *
     * @param clientId     the client id, must not be {@code null}
     * @param bucketIndex  index of the bucket to read from; validated with {@code checkBucketIndex}
     * @param checkExpired whether an expired session is reported as {@code null}
     * @return the session, or {@code null} if none is stored (or it is expired and {@code checkExpired} is set)
     */
    @Override
    @Nullable
    public ClientSession getSession(@NotNull String clientId, int bucketIndex, boolean checkExpired) {
        Preconditions.checkNotNull(clientId, "Client id must not be null");
        this.checkBucketIndex(bucketIndex);
        final Bucket bucket = this.buckets[bucketIndex];
        return this.getSession(clientId, bucket, checkExpired, true);
    }

    /**
     * Looks up the session of a client in the bucket derived from the client id.
     *
     * @param clientId     the client id, must not be {@code null}
     * @param checkExpired whether an expired session is reported as {@code null}
     * @param includeWill  whether the will is deserialized and its payload loaded
     * @return the session, or {@code null} if none is stored (or it is expired and {@code checkExpired} is set)
     */
    @Override
    @Nullable
    public ClientSession getSession(@NotNull String clientId, boolean checkExpired, boolean includeWill) {
        Preconditions.checkNotNull(clientId, "Client id must not be null");
        final Bucket bucket = this.getBucket(clientId);
        return this.getSession(clientId, bucket, checkExpired, includeWill);
    }

    /**
     * Reads a session from the given bucket inside a read-only transaction.
     *
     * @param clientId     the client id whose serialized form is the lookup key
     * @param bucket       the bucket to read from
     * @param checkExpired if set, a session that is expired relative to its stored timestamp yields {@code null}
     * @param includeWill  if set, the value is deserialized with its will and the will payload is loaded;
     *                     otherwise the value is deserialized without the will
     * @return the session, or {@code null} if no entry exists or it was filtered as expired
     */
    @Nullable
    private ClientSession getSession(@NotNull String clientId, Bucket bucket, boolean checkExpired, boolean includeWill) {
        return bucket.getEnvironment().computeInReadonlyTransaction(txn -> {
            final Store store = bucket.getStore();
            final ByteIterable stored = store.get(txn, XodusUtils.bytesToByteIterable(this.serializer.serializeKey(clientId)));
            if (stored == null) {
                return null;
            }
            final byte[] rawValue = XodusUtils.byteIterableToBytes(stored);
            final ClientSession session;
            if (includeWill) {
                session = this.serializer.deserializeValue(rawValue);
            } else {
                session = this.serializer.deserializeValueWithoutWill(rawValue);
            }
            if (checkExpired) {
                // The age is measured from the timestamp stored next to the session.
                final long age = System.currentTimeMillis() - this.serializer.deserializeTimestamp(rawValue);
                if (session.isExpired(age)) {
                    return null;
                }
            }
            if (includeWill) {
                this.loadWillPayload(session);
            }
            return session;
        });
    }

    /**
     * Returns the timestamp stored with the client's session, using the bucket computed by
     * {@code BucketUtils.getBucket} for the client id.
     *
     * @param clientId the client id
     * @return the stored timestamp, or {@code null} if there is no entry for the client
     */
    @Override
    @Nullable
    public Long getTimestamp(@NotNull String clientId) {
        final int bucketIndex = BucketUtils.getBucket(clientId, this.bucketCount);
        return this.getTimestamp(clientId, bucketIndex);
    }

    /**
     * Returns the timestamp stored with the client's session in the given bucket, read inside a
     * read-only transaction.
     *
     * @param clientId    the client id
     * @param bucketIndex index of the bucket to read from
     * @return the stored timestamp, or {@code null} if there is no entry for the client
     */
    @Override
    @Nullable
    public Long getTimestamp(@NotNull String clientId, int bucketIndex) {
        final Bucket targetBucket = this.buckets[bucketIndex];
        return targetBucket.getEnvironment().computeInReadonlyTransaction(txn -> {
            final Store store = targetBucket.getStore();
            final ByteIterable stored = store.get(txn, XodusUtils.bytesToByteIterable(this.serializer.serializeKey(clientId)));
            if (stored != null) {
                final byte[] rawValue = XodusUtils.byteIterableToBytes(stored);
                return this.serializer.deserializeTimestamp(rawValue);
            }
            return null;
        });
    }

    /**
     * Stores (or overwrites) the session of a client in the given bucket inside an exclusive transaction.
     * <p>
     * A commit hook keeps the bookkeeping consistent with the write:
     * <ul>
     *   <li>a session is considered "counted" when it is persistent (expiry interval &gt; 0) or connected;
     *       the session counter is incremented when the entry becomes counted and decremented when it
     *       stops being counted,</li>
     *   <li>will references are added, removed or swapped depending on the previous and the new will.</li>
     * </ul>
     * The calling thread is checked with {@code ThreadPreConditions.startsWith("single-writer")}.
     *
     * @param clientId         the client id, must not be {@code null}
     * @param newClientSession the session to store, must not be {@code null}
     * @param timestamp        the timestamp stored with the session, must be greater than 0
     * @param bucketIndex      index of the bucket to write to
     */
    @Override
    public void put(@NotNull String clientId, @NotNull ClientSession newClientSession, long timestamp, int bucketIndex) {
        Preconditions.checkNotNull(clientId, "Client id must not be null");
        Preconditions.checkNotNull(newClientSession, "Client session must not be null");
        Preconditions.checkArgument(timestamp > 0L, "Timestamp must be greater than 0");
        ThreadPreConditions.startsWith("single-writer");
        final Bucket targetBucket = this.buckets[bucketIndex];
        targetBucket.getEnvironment().executeInExclusiveTransaction(txn -> {
            final ByteIterable key = XodusUtils.bytesToByteIterable(this.serializer.serializeKey(clientId));
            final boolean newIsPersistent = persistent(newClientSession);
            final ByteIterable previousValue = targetBucket.getStore().get(txn, key);
            txn.setCommitHook(() -> {
                if (previousValue != null) {
                    // Overwrite: reconcile wills and the counter against the previous entry.
                    final ClientSession previous = this.serializer.deserializeValue(XodusUtils.byteIterableToBytes(previousValue));
                    this.handleWillPayloads(previous.getWillPublish(), newClientSession.getWillPublish());
                    final boolean previousIsPersistent = persistent(previous);
                    final boolean newCounted = newIsPersistent || newClientSession.isConnected();
                    final boolean previousCounted = previousIsPersistent || previous.isConnected();
                    if (newCounted && !previousCounted) {
                        this.sessionsCount.incrementAndGet();
                    } else if (previousCounted && !newCounted) {
                        this.sessionsCount.decrementAndGet();
                    }
                    return;
                }
                // First entry for this client.
                if (newIsPersistent || newClientSession.isConnected()) {
                    this.sessionsCount.incrementAndGet();
                }
                final ClientSessionWill newWill = newClientSession.getWillPublish();
                if (newWill != null) {
                    this.addWillReference(newWill);
                }
            });
            final byte[] serialized = this.serializer.serializeValue(newClientSession, timestamp);
            targetBucket.getStore().put(txn, key, XodusUtils.bytesToByteIterable(serialized));
        });
    }

    /**
     * Marks the session of a client as disconnected inside an exclusive transaction and returns it.
     * <p>
     * If no entry exists, a new {@code ClientSession(false, 0L)} is stored with the given timestamp and
     * returned. Otherwise the stored session is updated: the expiry interval is overwritten unless
     * {@code sessionExpiryInterval} equals {@link Long#MAX_VALUE}, the connected flag is cleared and,
     * when the will is not to be sent, the will is removed. A commit hook decrements the session counter
     * if the session was connected and is not persistent, and releases the will reference if the will was
     * dropped. The will payload of the returned session is loaded.
     * <p>
     * The calling thread is checked with {@code ThreadPreConditions.startsWith("single-writer")}.
     *
     * @param clientId              the client id, must not be {@code null}
     * @param timestamp             the timestamp stored with the updated session
     * @param sendWill              whether the will is kept on the session
     * @param bucketIndex           index of the bucket to write to
     * @param sessionExpiryInterval new expiry interval, or {@link Long#MAX_VALUE} to keep the stored one
     * @return the disconnected session as it was stored
     */
    @Override
    @NotNull
    public ClientSession disconnect(@NotNull String clientId, long timestamp, boolean sendWill, int bucketIndex, long sessionExpiryInterval) {
        Preconditions.checkNotNull(clientId, "Client id must not be null");
        ThreadPreConditions.startsWith("single-writer");
        final Bucket targetBucket = this.buckets[bucketIndex];
        return targetBucket.getEnvironment().computeInExclusiveTransaction(txn -> {
            final ByteIterable key = XodusUtils.bytesToByteIterable(this.serializer.serializeKey(clientId));
            final ByteIterable stored = targetBucket.getStore().get(txn, key);
            if (stored == null) {
                final ClientSession placeholder = new ClientSession(false, 0L);
                final byte[] serializedPlaceholder = this.serializer.serializeValue(placeholder, timestamp);
                targetBucket.getStore().put(txn, key, XodusUtils.bytesToByteIterable(serializedPlaceholder));
                return placeholder;
            }
            final ClientSession session = this.serializer.deserializeValue(XodusUtils.byteIterableToBytes(stored));
            if (sessionExpiryInterval != Long.MAX_VALUE) {
                session.setSessionExpiryIntervalSec(sessionExpiryInterval);
            }
            // Capture the state before it is modified below; the hook needs the old connected flag.
            final boolean wasConnected = session.isConnected();
            final ClientSessionWill will = session.getWillPublish();
            final boolean dropWill = !sendWill && will != null;
            txn.setCommitHook(() -> {
                if (wasConnected && !persistent(session)) {
                    this.sessionsCount.decrementAndGet();
                }
                if (dropWill) {
                    this.removeWillReference(will);
                }
            });
            session.setConnected(false);
            if (dropWill) {
                session.setWillPublish(null);
            }
            final byte[] serialized = this.serializer.serializeValue(session, timestamp);
            targetBucket.getStore().put(txn, key, XodusUtils.bytesToByteIterable(serialized));
            this.loadWillPayload(session);
            return session;
        });
    }

    /**
     * Removes the will from the stored session of a disconnected client inside an exclusive transaction.
     * <p>
     * If the session carries a will, the entry is rewritten without it (keeping the stored timestamp) and
     * a commit hook releases the will reference. A session without a will is returned unchanged.
     * <p>
     * The calling thread is checked with {@code ThreadPreConditions.startsWith("single-writer")}.
     *
     * @param clientId    the client id, must not be {@code null}
     * @param bucketIndex index of the bucket to write to
     * @return the session (without will) together with its stored timestamp, or {@code null} if there is
     *         no entry for the client or the client is currently connected
     */
    @Override
    @Nullable
    public PersistenceEntry<ClientSession> deleteWill(@NotNull String clientId, int bucketIndex) {
        Preconditions.checkNotNull(clientId, "Client id must not be null");
        ThreadPreConditions.startsWith("single-writer");
        final Bucket bucket = this.buckets[bucketIndex];
        return bucket.getEnvironment().computeInExclusiveTransaction(txn -> {
            final ByteIterable key = XodusUtils.bytesToByteIterable(this.serializer.serializeKey(clientId));
            final ByteIterable byteIterable = bucket.getStore().get(txn, key);
            if (byteIterable == null) {
                return null;
            }
            // Convert the stored value once and reuse the array for both the session and the timestamp.
            final byte[] valueBytes = XodusUtils.byteIterableToBytes(byteIterable);
            final ClientSession clientSession = this.serializer.deserializeValue(valueBytes);
            if (clientSession.isConnected()) {
                return null;
            }
            final long timestamp = this.serializer.deserializeTimestamp(valueBytes);
            final ClientSessionWill will = clientSession.getWillPublish();
            if (will != null) {
                txn.setCommitHook(() -> this.removeWillReference(will));
                clientSession.setWillPublish(null);
                bucket.getStore().put(txn, key, XodusUtils.bytesToByteIterable(this.serializer.serializeValue(clientSession, timestamp)));
            }
            return new PersistenceEntry<ClientSession>(clientSession, timestamp);
        });
    }

    /**
     * Returns one chunk of the non-expired sessions (deserialized without will) of a bucket, read inside
     * a read-only transaction.
     * <p>
     * Iteration starts at the first entry when {@code lastClientId} is {@code null}; otherwise the cursor
     * is positioned with {@code getSearchKeyRange} on the key of {@code lastClientId} and, if that exact
     * key is found, moved one entry further. If the cursor cannot be positioned on an entry (empty bucket,
     * or nothing after {@code lastClientId}), a finished chunk is returned right away instead of entering
     * the iteration with a cursor whose {@code getNext()} already reported that there are no more entries.
     *
     * @param bucketIndex  index of the bucket to read from; validated with {@code checkBucketIndex}
     * @param lastClientId the last client id of the previous chunk, or {@code null} to start at the beginning
     * @param maxResults   the chunk is returned as soon as this many sessions were collected
     * @return the collected sessions by client id, whether the bucket is finished, the last key that was
     *         looked at (expired entries included) and the bucket index
     */
    @Override
    @NotNull
    public BucketChunkResult<Map<String, ClientSession>> getAllClientsChunk(int bucketIndex, @Nullable String lastClientId, int maxResults) {
        this.checkBucketIndex(bucketIndex);
        final Bucket bucket = this.buckets[bucketIndex];
        return bucket.getEnvironment().computeInReadonlyTransaction(txn -> {
            final Map<String, ClientSession> resultMap = Maps.newHashMap();
            try (Cursor cursor = bucket.getStore().openCursor(txn)) {
                final boolean positioned;
                if (lastClientId != null) {
                    final ByteIterable lastClientKey = XodusUtils.bytesToByteIterable(this.serializer.serializeKey(lastClientId));
                    if (cursor.getSearchKeyRange(lastClientKey) == null) {
                        return new BucketChunkResult<Map<String, ClientSession>>(resultMap, true, lastClientId, bucketIndex);
                    }
                    // Skip the entry of the previous chunk's last client if it is still present.
                    positioned = !cursor.getKey().equals(lastClientKey) || cursor.getNext();
                } else {
                    positioned = cursor.getNext();
                }
                if (!positioned) {
                    // getNext() reported no further entry: nothing left to read in this bucket.
                    return new BucketChunkResult<Map<String, ClientSession>>(resultMap, true, lastClientId, bucketIndex);
                }
                String lastKey = lastClientId;
                int counter = 0;
                do {
                    if (cursor.getKey() == ByteIterable.EMPTY) {
                        continue;
                    }
                    final String key = this.serializer.deserializeKey(XodusUtils.byteIterableToBytes(cursor.getKey()));
                    lastKey = key;
                    final byte[] valueBytes = XodusUtils.byteIterableToBytes(cursor.getValue());
                    final ClientSession clientSession = this.serializer.deserializeValueWithoutWill(valueBytes);
                    final long timestamp = this.serializer.deserializeTimestamp(valueBytes);
                    if (clientSession.isExpired(System.currentTimeMillis() - timestamp)) {
                        continue;
                    }
                    resultMap.put(key, clientSession);
                    if (++counter >= maxResults) {
                        // The chunk is full; the bucket is finished only if no entry follows.
                        return new BucketChunkResult<Map<String, ClientSession>>(resultMap, !cursor.getNext(), lastKey, bucketIndex);
                    }
                } while (cursor.getNext());
                return new BucketChunkResult<Map<String, ClientSession>>(resultMap, true, lastKey, bucketIndex);
            }
        });
    }

    /**
     * Collects the client ids of all entries of a bucket inside a read-only transaction. No expiry or
     * connection filtering is applied.
     *
     * @param bucketIndex index of the bucket to read from
     * @return an immutable set with every client id stored in the bucket
     */
    @Override
    @NotNull
    public Set<String> getAllClients(int bucketIndex) {
        final Bucket sourceBucket = this.buckets[bucketIndex];
        return sourceBucket.getEnvironment().computeInReadonlyTransaction(txn -> {
            final ImmutableSet.Builder<String> clientIds = ImmutableSet.builder();
            try (Cursor cursor = sourceBucket.getStore().openCursor(txn)) {
                boolean hasEntry = cursor.getNext();
                while (hasEntry) {
                    final byte[] rawKey = XodusUtils.byteIterableToBytes(cursor.getKey());
                    clientIds.add(this.serializer.deserializeKey(rawKey));
                    hasEntry = cursor.getNext();
                }
            }
            return clientIds.build();
        });
    }

    /**
     * Deletes the entry of a client from the given bucket inside an exclusive transaction; does nothing
     * if there is no entry.
     * <p>
     * A commit hook decrements the session counter if the deleted session was persistent or connected and
     * releases the will reference if the session carried a will.
     *
     * @param client      the client id whose entry is deleted
     * @param bucketIndex index of the bucket to delete from
     */
    @VisibleForTesting
    void removeWithTimestamp(@NotNull String client, int bucketIndex) {
        final Bucket bucket = this.buckets[bucketIndex];
        bucket.getEnvironment().executeInExclusiveTransaction(txn -> {
            // Build the key once; it is needed for both the lookup and the delete.
            final ByteIterable key = XodusUtils.bytesToByteIterable(this.serializer.serializeKey(client));
            final ByteIterable value = bucket.getStore().get(txn, key);
            if (value == null) {
                return;
            }
            final ClientSession clientSession = this.serializer.deserializeValue(XodusUtils.byteIterableToBytes(value));
            txn.setCommitHook(() -> {
                if (ClientSessionXodusLocalPersistence.persistent(clientSession) || clientSession.isConnected()) {
                    this.sessionsCount.decrementAndGet();
                }
                if (clientSession.getWillPublish() != null) {
                    this.removeWillReference(clientSession.getWillPublish());
                }
            });
            bucket.getStore().delete(txn, key);
        });
    }

    /**
     * Updates the expiry interval of a stored session inside an exclusive transaction. The entry is
     * rewritten with the current system time as its timestamp.
     *
     * @param clientId              the client id, must not be {@code null}
     * @param sessionExpiryInterval the new expiry interval, must not be negative
     * @param bucketIndex           index of the bucket to write to
     * @throws InvalidSessionExpiryIntervalException if {@code sessionExpiryInterval} is negative
     * @throws NoSessionException                    if there is no entry for the client, or the stored
     *                                               session is neither connected nor persistent
     */
    @Override
    public void setSessionExpiryInterval(@NotNull String clientId, long sessionExpiryInterval, int bucketIndex) {
        Preconditions.checkNotNull(clientId, "Client Id must not be null");
        if (sessionExpiryInterval < 0L) {
            throw new InvalidSessionExpiryIntervalException("Invalid session expiry interval " + sessionExpiryInterval);
        }
        final Bucket targetBucket = this.buckets[bucketIndex];
        targetBucket.getEnvironment().executeInExclusiveTransaction(txn -> {
            final ByteIterable key = XodusUtils.bytesToByteIterable(this.serializer.serializeKey(clientId));
            final ByteIterable stored = targetBucket.getStore().get(txn, key);
            if (stored == null) {
                throw NoSessionException.INSTANCE;
            }
            final ClientSession session = this.serializer.deserializeValue(XodusUtils.byteIterableToBytes(stored));
            final boolean alive = session.isConnected() || persistent(session);
            if (!alive) {
                throw NoSessionException.INSTANCE;
            }
            session.setSessionExpiryIntervalSec(sessionExpiryInterval);
            final byte[] serialized = this.serializer.serializeValue(session, System.currentTimeMillis());
            targetBucket.getStore().put(txn, key, XodusUtils.bytesToByteIterable(serialized));
        });
    }

    /**
     * Deletes every expired session of a bucket inside an exclusive transaction. Returns an empty set
     * without touching the bucket when the persistence is stopped.
     * <p>
     * For each deleted session a {@code clientSessionExpired} event-log entry is registered as a commit
     * action, using the stored timestamp plus the expiry interval in milliseconds. Deleted sessions with
     * an expiry interval greater than zero are subtracted from the session counter, also as a commit
     * action.
     *
     * @param bucketIndex index of the bucket to clean
     * @return an immutable set with the client ids of the deleted sessions
     */
    @Override
    @NotNull
    public Set<String> cleanUp(int bucketIndex) {
        if (this.stopped.get()) {
            return ImmutableSet.of();
        }
        final Bucket targetBucket = this.buckets[bucketIndex];
        return targetBucket.getEnvironment().computeInExclusiveTransaction(txn -> {
            final ImmutableSet.Builder<String> expiredClients = ImmutableSet.builder();
            try (Cursor cursor = targetBucket.getStore().openCursor(txn)) {
                final TransactionCommitActions onCommit = TransactionCommitActions.asCommitHookFor(txn);
                final SessionCounterDelta counterDelta = new SessionCounterDelta();
                onCommit.add(counterDelta);
                while (cursor.getNext()) {
                    final String clientId = this.serializer.deserializeKey(XodusUtils.byteIterableToBytes(cursor.getKey()));
                    final byte[] rawValue = XodusUtils.byteIterableToBytes(cursor.getValue());
                    final ClientSession session = this.serializer.deserializeValue(rawValue);
                    final long timestamp = this.serializer.deserializeTimestamp(rawValue);
                    final long sessionExpiryInterval = session.getSessionExpiryIntervalSec();
                    final long age = System.currentTimeMillis() - timestamp;
                    if (session.isExpired(age)) {
                        if (sessionExpiryInterval > 0L) {
                            counterDelta.decrement();
                        }
                        onCommit.add(() -> this.eventLog.clientSessionExpired(timestamp + sessionExpiryInterval * 1000L, clientId));
                        cursor.deleteCurrent();
                        expiredClients.add(clientId);
                    }
                }
            }
            return expiredClients.build();
        });
    }

    /**
     * Collects, inside a read-only transaction, the client ids of all sessions in a bucket that are
     * disconnected, have an expiry interval greater than zero and whose age (current time minus stored
     * timestamp) is still below that interval in milliseconds.
     *
     * @param bucketIndex index of the bucket to read from; validated with {@code checkBucketIndex}
     * @return a mutable set with the matching client ids
     */
    @Override
    @NotNull
    public Set<String> getDisconnectedClients(int bucketIndex) {
        this.checkBucketIndex(bucketIndex);
        final Bucket sourceBucket = this.buckets[bucketIndex];
        return sourceBucket.getEnvironment().computeInReadonlyTransaction(txn -> {
            final HashSet<String> disconnected = new HashSet<String>();
            try (Cursor cursor = sourceBucket.getStore().openCursor(txn)) {
                while (cursor.getNext()) {
                    final byte[] rawValue = XodusUtils.byteIterableToBytes(cursor.getValue());
                    final ClientSession session = this.serializer.deserializeValue(rawValue);
                    final String clientId = this.serializer.deserializeKey(XodusUtils.byteIterableToBytes(cursor.getKey()));
                    if (!session.isConnected() && session.getSessionExpiryIntervalSec() > 0L) {
                        final long timestamp = this.serializer.deserializeTimestamp(rawValue);
                        final long age = System.currentTimeMillis() - timestamp;
                        final long expiryMillis = session.getSessionExpiryIntervalSec() * 1000L;
                        if (age < expiryMillis) {
                            disconnected.add(clientId);
                        }
                    }
                }
            }
            return disconnected;
        });
    }

    /**
     * @return the current value of the session counter maintained by this persistence
     */
    @Override
    public int getSessionsCount() {
        return this.sessionsCount.intValue();
    }

    /**
     * Collects, inside a read-only transaction, a pending-will entry for every disconnected session in a
     * bucket that still carries a will.
     * <p>
     * Each entry is built from the smaller of the will's delay interval and the session's expiry
     * interval, plus the timestamp stored with the session.
     *
     * @param bucketIndex index of the bucket to read from
     * @return a mutable map from client id to its pending will
     */
    @Override
    @NotNull
    public Map<String, PendingWillMessages.PendingWill> getPendingWills(int bucketIndex) {
        final Bucket sourceBucket = this.buckets[bucketIndex];
        return sourceBucket.getEnvironment().computeInReadonlyTransaction(txn -> {
            final HashMap<String, PendingWillMessages.PendingWill> pendingWills = new HashMap<String, PendingWillMessages.PendingWill>();
            try (Cursor cursor = sourceBucket.getStore().openCursor(txn)) {
                while (cursor.getNext()) {
                    final byte[] rawValue = XodusUtils.byteIterableToBytes(cursor.getValue());
                    final ClientSession session = this.serializer.deserializeValue(rawValue);
                    final String clientId = this.serializer.deserializeKey(XodusUtils.byteIterableToBytes(cursor.getKey()));
                    final long timestamp = this.serializer.deserializeTimestamp(rawValue);
                    if (session.isConnected()) {
                        continue;
                    }
                    final ClientSessionWill will = session.getWillPublish();
                    if (will == null) {
                        continue;
                    }
                    pendingWills.put(clientId, new PendingWillMessages.PendingWill(Math.min(will.getDelayInterval(), session.getSessionExpiryIntervalSec()), timestamp));
                }
            }
            return pendingWills;
        });
    }

    /**
     * Ensures the will of the given session has its payload attached.
     * <p>
     * Nothing happens if the session has no will or the will already has a payload. Otherwise the payload
     * is fetched from the payload persistence by the will's publish id; if it cannot be found, the will
     * is removed from the session and a warning is logged.
     *
     * @param clientSession the session whose will payload is loaded; it may be modified
     */
    private void loadWillPayload(@NotNull ClientSession clientSession) {
        final ClientSessionWill will = clientSession.getWillPublish();
        if (will == null || will.getPayload() != null) {
            return;
        }
        final byte[] storedPayload = this.payloadPersistence.get(will.getPublishId());
        if (storedPayload != null) {
            will.getMqttWillPublish().setPayload(storedPayload);
            return;
        }
        // Without its payload the will cannot be kept on the session.
        clientSession.setWillPublish(null);
        log.warn("Will Payload for payloadId {} not found", will.getPublishId());
    }

    /**
     * Reconciles will references when a stored session is overwritten.
     * <ul>
     *   <li>Only one of the wills exists: the previous one is released and/or the current one is added,
     *       including the stored-will metric update done by the add/remove helpers.</li>
     *   <li>Both exist with different publish ids: the previous payload reference is decremented and the
     *       current payload is added; the metric is left untouched.</li>
     *   <li>Both exist with the same publish id, or neither exists: nothing is done.</li>
     * </ul>
     *
     * @param previousWill the will of the previously stored session, may be {@code null}
     * @param currentWill  the will of the new session, may be {@code null}
     */
    private void handleWillPayloads(@Nullable ClientSessionWill previousWill, @Nullable ClientSessionWill currentWill) {
        if (previousWill == null || currentWill == null) {
            if (previousWill != null) {
                this.removeWillReference(previousWill);
            }
            if (currentWill != null) {
                this.addWillReference(currentWill);
            }
            return;
        }
        if (previousWill.getPublishId() == currentWill.getPublishId()) {
            return;
        }
        this.payloadPersistence.decrementReferenceCounter(previousWill.getPublishId());
        this.payloadPersistence.add(currentWill.getPayload(), currentWill.getPublishId());
    }

    /**
     * Tells whether a session is treated as persistent by this class.
     *
     * @param clientSession the session to inspect
     * @return {@code true} if the session's expiry interval is greater than zero
     */
    private static boolean persistent(@NotNull ClientSession clientSession) {
        final long expiryIntervalSec = clientSession.getSessionExpiryIntervalSec();
        return expiryIntervalSec > 0L;
    }

    /**
     * Registers a newly stored will: increments the stored-will-messages metric and adds the will's
     * payload to the payload persistence under its publish id.
     *
     * @param will the will that is now referenced by a stored session
     */
    private void addWillReference(@NotNull ClientSessionWill will) {
        this.metricsHolder.getStoredWillMessagesCount().inc();
        this.payloadPersistence.add(
                will.getPayload(),
                will.getPublishId());
    }

    /**
     * Unregisters a will that is no longer stored: decrements the stored-will-messages metric and the
     * payload reference counter for the will's publish id.
     *
     * @param will the will that is no longer referenced by a stored session
     */
    private void removeWillReference(@NotNull ClientSessionWill will) {
        this.metricsHolder.getStoredWillMessagesCount().dec();
        this.payloadPersistence.decrementReferenceCounter(
                will.getPublishId());
    }

    /**
     * Accumulates a signed change to the enclosing persistence's session counter and applies it in one
     * step when {@link #run()} is invoked. The accumulated value is not reset by {@link #run()}.
     */
    private class SessionCounterDelta
    implements Runnable {
        /** Net number of sessions to add (positive) or subtract (negative). */
        private int delta;

        private SessionCounterDelta() {
        }

        /** Records one additional session. */
        private void increment() {
            this.delta += 1;
        }

        /** Records one removed session. */
        private void decrement() {
            this.delta -= 1;
        }

        /** Adds the accumulated change to the enclosing instance's session counter. */
        @Override
        public void run() {
            final int change = this.delta;
            ClientSessionXodusLocalPersistence.this.sessionsCount.addAndGet(change);
        }
    }
}

