/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import lombok.Generated;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.AcknowledgmentsGroupingTracker;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.LastCumulativeAck;
import org.apache.pulsar.client.impl.MessageIdAdvUtils;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Runnables;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Triple;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentAcknowledgmentsGroupingTracker
implements AcknowledgmentsGroupingTracker {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(PersistentAcknowledgmentsGroupingTracker.class);
    private final int maxAckGroupSize;
    private final ConsumerImpl<?> consumer;
    private final long acknowledgementGroupTimeMicros;
    private volatile CompletableFuture<Void> currentIndividualAckFuture;
    private volatile CompletableFuture<Void> currentCumulativeAckFuture;
    private final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ConcurrentSkipListSet<MessageIdAdv> pendingIndividualAcks;
    private final ConcurrentHashMap<MessageIdAdv, ConcurrentBitSetRecyclable> pendingIndividualBatchIndexAcks;
    private final ScheduledFuture<?> scheduledTask;
    private final boolean batchIndexAckEnabled;
    private final boolean ackReceiptEnabled;

    /**
     * Creates a tracker that groups acknowledgments for the given consumer.
     *
     * <p>The grouping time, maximum group size and the batch-index-ack / ack-receipt flags are
     * copied from {@code conf}. When the grouping time is greater than zero a periodic
     * {@link #flush()} is scheduled on one event loop, using the grouping time as both the
     * initial delay and the delay between runs; otherwise no task is scheduled.
     *
     * @param consumer the consumer whose acknowledgments are tracked
     * @param conf the consumer configuration the settings are read from
     * @param eventLoopGroup the event loop group used to schedule the periodic flush
     */
    public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, ConsumerConfigurationData<?> conf, EventLoopGroup eventLoopGroup) {
        this.consumer = consumer;
        this.pendingIndividualAcks = new ConcurrentSkipListSet<>();
        this.pendingIndividualBatchIndexAcks = new ConcurrentHashMap<>();
        this.acknowledgementGroupTimeMicros = conf.getAcknowledgementsGroupTimeMicros();
        this.maxAckGroupSize = conf.getMaxAcknowledgmentGroupSize();
        this.batchIndexAckEnabled = conf.isBatchIndexAckEnabled();
        this.ackReceiptEnabled = conf.isAckReceiptEnabled();
        this.currentIndividualAckFuture = new TimedCompletableFuture<>();
        this.currentCumulativeAckFuture = new TimedCompletableFuture<>();
        if (this.acknowledgementGroupTimeMicros > 0L) {
            // Throwables raised by flush() are caught and logged by the wrapper.
            Runnable periodicFlush = Runnables.catchingAndLoggingThrowables(this::flush);
            this.scheduledTask = eventLoopGroup.next().scheduleWithFixedDelay(
                    periodicFlush,
                    this.acknowledgementGroupTimeMicros,
                    this.acknowledgementGroupTimeMicros,
                    TimeUnit.MICROSECONDS);
        } else {
            this.scheduledTask = null;
        }
    }

    /**
     * Tells whether the given message id is already covered by an acknowledgment held in this
     * tracker.
     *
     * <p>A message counts as a duplicate when it is at or before the last cumulative ack, when
     * its entry is in the pending individual acks, or when it has a batch index whose bit is
     * already cleared in the pending batch-index bit set for its entry.
     *
     * @param messageId the message id to check; must be a {@link MessageIdAdv}
     * @return {@code true} if the message is already acknowledged from this tracker's view
     * @throws IllegalArgumentException if {@code messageId} is not a {@link MessageIdAdv}
     */
    @Override
    public boolean isDuplicate(MessageId messageId) {
        if (!(messageId instanceof MessageIdAdv)) {
            throw new IllegalArgumentException("isDuplicated cannot accept " + messageId.getClass().getName() + ": " + messageId);
        }
        final MessageIdAdv candidate = (MessageIdAdv) messageId;
        if (lastCumulativeAck.compareTo(candidate) >= 0) {
            return true;
        }
        final MessageIdAdv entryKey = MessageIdAdvUtils.discardBatch(candidate);
        if (pendingIndividualAcks.contains(entryKey)) {
            return true;
        }
        final int batchIndex = candidate.getBatchIndex();
        if (batchIndex < 0) {
            // No batch index, so there is no per-index state to consult.
            return false;
        }
        final ConcurrentBitSetRecyclable remaining = pendingIndividualBatchIndexAcks.get(entryKey);
        // A cleared bit means that index has already been acknowledged.
        return remaining != null && !remaining.get(batchIndex);
    }

    /**
     * Acknowledges a list of messages.
     *
     * <p>For cumulative acks every id is passed to
     * {@link #addAcknowledgment(MessageId, CommandAck.AckType, Map)}. When the consumer has ack
     * receipt enabled, the returned future completes once all the (de-duplicated) per-message
     * futures have completed; otherwise an already completed future is returned.
     *
     * <p>For any other ack type the ids are added to the pending individual acks under the read
     * lock (which is only taken when ack receipt is enabled). The shared individual-ack future is
     * read while that lock is still held. After the lock is released, a flush is triggered when
     * grouping is disabled (group time of zero) or the pending individual acks have reached the
     * maximum group size.
     *
     * @param messageIds the message ids to acknowledge
     * @param ackType the acknowledgment type
     * @param properties properties forwarded with cumulative acks; not used on the individual path
     * @return a future for the acknowledgment as described above
     */
    @Override
    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds, CommandAck.AckType ackType, Map<String, Long> properties) {
        if (CommandAck.AckType.Cumulative.equals(ackType)) {
            if (consumer.isAckReceiptEnabled()) {
                // Several ids may map to the same shared future, hence the set.
                HashSet<CompletableFuture<Void>> receipts = new HashSet<>();
                for (MessageId messageId : messageIds) {
                    receipts.add(addAcknowledgment(messageId, ackType, properties));
                }
                return FutureUtil.waitForAll(new ArrayList<CompletableFuture<Void>>(receipts));
            }
            for (MessageId messageId : messageIds) {
                addAcknowledgment(messageId, ackType, properties);
            }
            return CompletableFuture.completedFuture(null);
        }
        Optional<Lock> readLock = acquireReadLock();
        try {
            if (messageIds.isEmpty()) {
                return CompletableFuture.completedFuture(null);
            }
            addListAcknowledgment(messageIds);
            if (readLock.isPresent()) {
                return currentIndividualAckFuture;
            }
            return CompletableFuture.completedFuture(null);
        } finally {
            readLock.ifPresent(Lock::unlock);
            if (acknowledgementGroupTimeMicros == 0L || pendingIndividualAcks.size() >= maxAckGroupSize) {
                flush();
            }
        }
    }

    /**
     * Adds every id of the list to the pending individual acks without sending anything.
     *
     * <p>For an id in a batch, the entry-level id (batch part discarded) and the batch id are
     * both handed to the individual-ack routine; other ids are passed without a batch id.
     *
     * @param messageIds the ids to record; each element is cast to {@link MessageIdAdv}
     */
    private void addListAcknowledgment(List<MessageId> messageIds) {
        for (MessageId rawId : messageIds) {
            final MessageIdAdv advId = (MessageIdAdv) rawId;
            final boolean batched = MessageIdAdvUtils.isBatch(advId);
            final MessageIdAdv entryId = batched ? MessageIdAdvUtils.discardBatch(advId) : advId;
            final MessageIdAdv batchId = batched ? advId : null;
            addIndividualAcknowledgment(entryId, batchId, this::doIndividualAckAsync, this::doIndividualBatchAckAsync);
        }
    }

    /**
     * Acknowledges a single message.
     *
     * <p>If the id belongs to a batch, the entry-level id (batch part discarded) is acknowledged
     * and the original id is passed along as the batch id; otherwise the id is acknowledged
     * without a batch id.
     *
     * @param msgId the message id to acknowledge; cast to {@link MessageIdAdv}
     * @param ackType the acknowledgment type
     * @param properties properties to attach to the acknowledgment, may be {@code null}
     * @return the future returned by the type-specific acknowledgment routine
     */
    @Override
    public CompletableFuture<Void> addAcknowledgment(MessageId msgId, CommandAck.AckType ackType, Map<String, Long> properties) {
        final MessageIdAdv advId = (MessageIdAdv) msgId;
        if (!MessageIdAdvUtils.isBatch(advId)) {
            return addAcknowledgment(advId, ackType, properties, null);
        }
        return addAcknowledgment(MessageIdAdvUtils.discardBatch(msgId), ackType, properties, advId);
    }

    /**
     * Shared individual-ack routine used by both the single-message and the list paths.
     *
     * <p>The consumer's {@code onAcknowledge} callback is invoked first (with the batch id when
     * there is one). If there is no batch id, or {@code MessageIdAdvUtils.acknowledge} returns
     * {@code true} for it, the consumer's stats, un-acked tracker and (when present) dead-letter
     * candidates are updated and {@code individualAckFunction} is applied to the entry id.
     * Otherwise {@code batchAckFunction} is applied to the batch id when batch-index ack is
     * enabled, and nothing further is done when it is not.
     *
     * @param msgId the entry-level message id
     * @param batchMessageId the id inside a batch, or {@code null} for a non-batch message
     * @param individualAckFunction action acknowledging the whole entry
     * @param batchAckFunction action acknowledging a single batch index
     * @return the future produced by the chosen function, or a completed future
     */
    private CompletableFuture<Void> addIndividualAcknowledgment(MessageIdAdv msgId, @Nullable MessageIdAdv batchMessageId, Function<MessageIdAdv, CompletableFuture<Void>> individualAckFunction, Function<MessageIdAdv, CompletableFuture<Void>> batchAckFunction) {
        final boolean inBatch = batchMessageId != null;
        consumer.onAcknowledge(inBatch ? batchMessageId : msgId, null);

        if (inBatch && !MessageIdAdvUtils.acknowledge(batchMessageId, true)) {
            // The entry is not acknowledged as a whole: only a per-index ack is possible.
            if (batchIndexAckEnabled) {
                return batchAckFunction.apply(batchMessageId);
            }
            return CompletableFuture.completedFuture(null);
        }

        consumer.getStats().incrementNumAcksSent(inBatch ? (long) batchMessageId.getBatchSize() : 1L);
        consumer.getUnAckedMessageTracker().remove(msgId);
        if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
            consumer.getPossibleSendToDeadLetterTopicMessages().remove(msgId);
        }
        return individualAckFunction.apply(msgId);
    }

    /**
     * Dispatches an acknowledgment by type.
     *
     * <p>Individual acks are delegated to the shared individual-ack routine. For cumulative acks
     * the consumer's {@code onAcknowledgeCumulative} callback is invoked, then:
     * <ul>
     *   <li>without a batch id, or when {@code MessageIdAdvUtils.acknowledge} returns
     *       {@code true} for it, the entry id is cumulatively acknowledged;</li>
     *   <li>otherwise, with batch-index ack enabled, a cumulative batch-index ack is issued;</li>
     *   <li>otherwise the id returned by {@code MessageIdAdvUtils.prevMessageId} is cumulatively
     *       acknowledged, its future is discarded and a completed future is returned.</li>
     * </ul>
     *
     * @param msgId the entry-level message id
     * @param ackType the acknowledgment type
     * @param properties properties to attach, may be {@code null}
     * @param batchMessageId the id inside a batch, or {@code null}
     * @return the future of the issued acknowledgment
     * @throws IllegalStateException if {@code ackType} is neither Individual nor Cumulative
     */
    private CompletableFuture<Void> addAcknowledgment(MessageIdAdv msgId, CommandAck.AckType ackType, Map<String, Long> properties, @Nullable MessageIdAdv batchMessageId) {
        switch (ackType) {
            case Individual:
                return addIndividualAcknowledgment(
                        msgId,
                        batchMessageId,
                        id -> doIndividualAck(id, properties),
                        id -> doIndividualBatchAck(id, properties));
            case Cumulative:
                consumer.onAcknowledgeCumulative(batchMessageId != null ? batchMessageId : msgId, null);
                if (batchMessageId != null && !MessageIdAdvUtils.acknowledge(batchMessageId, false)) {
                    if (batchIndexAckEnabled) {
                        return doCumulativeBatchIndexAck(batchMessageId, properties);
                    }
                    // The result of this ack is intentionally not propagated to the caller.
                    doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties, null);
                    return CompletableFuture.completedFuture(null);
                }
                return doCumulativeAck(msgId, properties, null);
            default:
                throw new IllegalStateException("Unknown AckType: " + ackType);
        }
    }

    /**
     * Individually acknowledges one entry, either immediately or through the grouped path.
     *
     * <p>The ack is sent immediately when grouping is disabled (group time of zero) or when
     * non-empty properties are attached. Otherwise the id is added to the pending individual acks
     * under the read lock (only taken when ack receipt is enabled) and, once the lock is
     * released, a flush is triggered if the pending set has reached the maximum group size.
     *
     * @param messageId the entry-level message id
     * @param properties properties to attach, may be {@code null}
     * @return the immediate-ack future, the shared individual-ack future when the read lock was
     *         taken, or a completed future otherwise
     */
    private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId, Map<String, Long> properties) {
        if (acknowledgementGroupTimeMicros == 0L || (properties != null && !properties.isEmpty())) {
            return doImmediateAck(messageId, CommandAck.AckType.Individual, properties, null);
        }
        Optional<Lock> readLock = acquireReadLock();
        try {
            doIndividualAckAsync(messageId);
            // Read the shared future while the read lock is still held.
            if (readLock.isPresent()) {
                return currentIndividualAckFuture;
            }
            return CompletableFuture.completedFuture(null);
        } finally {
            readLock.ifPresent(Lock::unlock);
            if (pendingIndividualAcks.size() >= maxAckGroupSize) {
                flush();
            }
        }
    }

    /**
     * Records an entry-level individual ack for the next flush.
     *
     * <p>Any pending batch-index bit set stored under the same id is removed from the map.
     *
     * @param messageId the entry-level message id
     * @return an already completed future
     */
    private CompletableFuture<Void> doIndividualAckAsync(MessageIdAdv messageId) {
        pendingIndividualAcks.add(messageId);
        pendingIndividualBatchIndexAcks.remove(messageId);
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Individually acknowledges a single index of a batch.
     *
     * <p>The ack is sent immediately when grouping is disabled (group time of zero) or when
     * non-empty properties are attached; otherwise it goes through the grouped path.
     *
     * @param batchMessageId the message id inside a batch
     * @param properties properties to attach, may be {@code null}
     * @return the future of the issued acknowledgment
     */
    private CompletableFuture<Void> doIndividualBatchAck(MessageIdAdv batchMessageId, Map<String, Long> properties) {
        final boolean groupable = acknowledgementGroupTimeMicros != 0L && (properties == null || properties.isEmpty());
        if (groupable) {
            return doIndividualBatchAck(batchMessageId);
        }
        return doImmediateBatchIndexAck(
                batchMessageId,
                batchMessageId.getBatchIndex(),
                batchMessageId.getBatchSize(),
                CommandAck.AckType.Individual,
                properties);
    }

    /**
     * Grouped path for a batch-index individual ack.
     *
     * <p>The index is recorded under the read lock (only taken when ack receipt is enabled).
     * Once the lock is released, a flush is triggered if the number of entries with pending
     * batch-index acks has reached the maximum group size.
     *
     * @param batchMessageId the message id inside a batch
     * @return the shared individual-ack future when the read lock was taken, otherwise a
     *         completed future
     */
    private CompletableFuture<Void> doIndividualBatchAck(MessageIdAdv batchMessageId) {
        Optional<Lock> readLock = acquireReadLock();
        try {
            doIndividualBatchAckAsync(batchMessageId);
            // Read the shared future while the read lock is still held.
            if (readLock.isPresent()) {
                return currentIndividualAckFuture;
            }
            return CompletableFuture.completedFuture(null);
        } finally {
            readLock.ifPresent(Lock::unlock);
            if (pendingIndividualBatchIndexAcks.size() >= maxAckGroupSize) {
                flush();
            }
        }
    }

    /**
     * Cumulatively acknowledges up to the given id, either immediately or through the grouped
     * path.
     *
     * <p>The un-acked tracker is first trimmed up to {@code messageId} and the removed count is
     * added to the stats. The ack is sent immediately when grouping is disabled (group time of
     * zero) or non-empty properties are attached. Otherwise the last cumulative ack is updated
     * under the read lock (only taken when ack receipt is enabled).
     *
     * @param messageId the id to acknowledge up to
     * @param properties properties to attach, may be {@code null}
     * @param bitSet optional batch-index bit set for the last entry, may be {@code null}
     * @return the immediate-ack future; or, on the grouped path, a completed future when the
     *         read lock was not taken, or when ack receipt is enabled and the last cumulative
     *         ack compares equal to {@code messageId}; otherwise the shared cumulative future
     */
    private CompletableFuture<Void> doCumulativeAck(MessageIdAdv messageId, Map<String, Long> properties, BitSetRecyclable bitSet) {
        consumer.getStats().incrementNumAcksSent(consumer.getUnAckedMessageTracker().removeMessagesTill(messageId));
        if (acknowledgementGroupTimeMicros == 0L || (properties != null && !properties.isEmpty())) {
            return doImmediateAck(messageId, CommandAck.AckType.Cumulative, properties, bitSet);
        }
        Optional<Lock> readLock = acquireReadLock();
        try {
            doCumulativeAckAsync(messageId, bitSet);
            if (!readLock.isPresent()) {
                return CompletableFuture.completedFuture(null);
            }
            if (consumer.isAckReceiptEnabled() && lastCumulativeAck.compareTo(messageId) == 0) {
                return CompletableFuture.completedFuture(null);
            }
            return currentCumulativeAckFuture;
        } finally {
            readLock.ifPresent(Lock::unlock);
        }
    }

    /**
     * Records a batch-index individual ack for the next flush.
     *
     * <p>The bit set for the entry is created on first use: it is copied from the id's own ack
     * set when that is present and non-empty (read while synchronized on the ack set), otherwise
     * it starts with bits {@code [0, batchSize)} set. The bit of the acknowledged index is then
     * cleared.
     *
     * @param msgId the message id inside a batch
     * @return an already completed future
     */
    private CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv msgId) {
        ConcurrentBitSetRecyclable pendingIndexes = pendingIndividualBatchIndexAcks.computeIfAbsent(
                MessageIdAdvUtils.discardBatch(msgId), __ -> {
                    BitSet ackSet = msgId.getAckSet();
                    if (ackSet != null) {
                        synchronized (ackSet) {
                            if (!ackSet.isEmpty()) {
                                return ConcurrentBitSetRecyclable.create(ackSet);
                            }
                        }
                    }
                    // No usable ack set: start with every index of the batch still set.
                    ConcurrentBitSetRecyclable allSet = ConcurrentBitSetRecyclable.create();
                    allSet.set(0, msgId.getBatchSize());
                    return allSet;
                });
        pendingIndexes.clear(msgId.getBatchIndex());
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Records a cumulative ack for the next flush by updating the last cumulative ack.
     *
     * @param msgId the id to acknowledge up to
     * @param bitSet optional batch-index bit set that goes with the id, may be {@code null}
     */
    private void doCumulativeAckAsync(MessageIdAdv msgId, BitSetRecyclable bitSet) {
        lastCumulativeAck.update(msgId, bitSet);
    }

    /**
     * Cumulatively acknowledges up to and including one index inside a batch.
     *
     * <p>The ack is sent immediately when grouping is disabled (group time of zero) or non-empty
     * properties are attached. Otherwise a bit set with bits {@code [0, batchSize)} set and bits
     * {@code [0, batchIndex]} cleared is built and handed to the grouped cumulative path.
     *
     * @param batchMessageId the message id inside a batch
     * @param properties properties to attach, may be {@code null}
     * @return the future of the issued acknowledgment
     */
    private CompletableFuture<Void> doCumulativeBatchIndexAck(MessageIdAdv batchMessageId, Map<String, Long> properties) {
        final boolean sendNow = acknowledgementGroupTimeMicros == 0L || (properties != null && !properties.isEmpty());
        if (sendNow) {
            return doImmediateBatchIndexAck(
                    batchMessageId,
                    batchMessageId.getBatchIndex(),
                    batchMessageId.getBatchSize(),
                    CommandAck.AckType.Cumulative,
                    properties);
        }
        final BitSetRecyclable remaining = BitSetRecyclable.create();
        remaining.set(0, batchMessageId.getBatchSize());
        remaining.clear(0, batchMessageId.getBatchIndex() + 1);
        return doCumulativeAck(batchMessageId, null, remaining);
    }

    /**
     * Sends an acknowledgment right away on the consumer's current connection.
     *
     * @param msgId the id to acknowledge
     * @param ackType the acknowledgment type
     * @param properties properties to attach, may be {@code null}
     * @param bitSet optional batch-index bit set, may be {@code null}
     * @return the future of the written ack, or a future failed with a
     *         {@code PulsarClientException.ConnectException} when the consumer has no connection
     */
    private CompletableFuture<Void> doImmediateAck(MessageIdAdv msgId, CommandAck.AckType ackType, Map<String, Long> properties, BitSetRecyclable bitSet) {
        final ClientCnx connection = consumer.getClientCnx();
        if (connection != null) {
            return newImmediateAckAndFlush(consumer.consumerId, msgId, bitSet, ackType, properties, connection);
        }
        return FutureUtil.failedFuture(new PulsarClientException.ConnectException("Consumer connect fail! consumer state:" + consumer.getState()));
    }

    /**
     * Sends a batch-index acknowledgment right away on the consumer's current connection.
     *
     * <p>The bit set is copied from the id's own ack set when present (read while synchronized
     * on it), otherwise it starts with bits {@code [0, batchSize)} set. For cumulative acks bits
     * {@code [0, batchIndex]} are cleared; for other types only {@code batchIndex} is cleared.
     * The temporary bit set is recycled in a {@code finally} block so it is returned even when
     * building or writing the command throws.
     *
     * @param msgId the message id inside a batch
     * @param batchIndex the index being acknowledged
     * @param batchSize the number of messages in the batch
     * @param ackType the acknowledgment type
     * @param properties properties to attach, may be {@code null}
     * @return the future of the written ack, or a future failed with a
     *         {@code PulsarClientException.ConnectException} when the consumer has no connection
     */
    private CompletableFuture<Void> doImmediateBatchIndexAck(MessageIdAdv msgId, int batchIndex, int batchSize, CommandAck.AckType ackType, Map<String, Long> properties) {
        ClientCnx cnx = consumer.getClientCnx();
        if (cnx == null) {
            return FutureUtil.failedFuture(new PulsarClientException.ConnectException("Consumer connect fail! consumer state:" + consumer.getState()));
        }
        BitSetRecyclable bitSet;
        BitSet ackSetFromMsgId = msgId.getAckSet();
        if (ackSetFromMsgId != null) {
            synchronized (ackSetFromMsgId) {
                bitSet = BitSetRecyclable.valueOf(ackSetFromMsgId.toLongArray());
            }
        } else {
            bitSet = BitSetRecyclable.create();
            bitSet.set(0, batchSize);
        }
        try {
            if (ackType == CommandAck.AckType.Cumulative) {
                bitSet.clear(0, batchIndex + 1);
            } else {
                bitSet.clear(batchIndex);
            }
            return newMessageAckCommandAndWrite(cnx, consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(), bitSet, ackType, properties, true, null, null);
        } finally {
            bitSet.recycle();
        }
    }

    /**
     * Writes all pending acknowledgments to the broker.
     *
     * <p>Does nothing (apart from a debug log) when the consumer has no connection. Otherwise
     * the pending acks are flushed under the write lock, which is only taken when ack receipt
     * is enabled.
     */
    @Override
    public void flush() {
        final ClientCnx connection = consumer.getClientCnx();
        if (connection != null) {
            final Optional<Lock> writeLock = acquireWriteLock();
            try {
                flushAsync(connection);
            } finally {
                writeLock.ifPresent(Lock::unlock);
            }
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Cannot flush pending acks since we're not connected to broker", consumer);
        }
    }

    /**
     * Drains the pending cumulative, individual and batch-index acks and writes them to the
     * given connection.
     *
     * <ul>
     *   <li>A pending cumulative ack is written without flushing, tied to the current cumulative
     *       future, and its id is removed from the chunked-message map.</li>
     *   <li>Pending individual acks are polled from the set. When the peer supports
     *       multi-message ack they are collected into one list (expanding chunked messages with
     *       more than one chunk into their chunk ids); otherwise one ack command is written per
     *       id.</li>
     *   <li>Pending batch-index acks are moved into the same list.</li>
     *   <li>A non-empty list is sent as a single multi-message ack tied to the current
     *       individual future.</li>
     * </ul>
     * The channel is flushed only if something was written.
     *
     * @param cnx the connection to write to
     */
    private void flushAsync(ClientCnx cnx) {
        LastCumulativeAck lastCumulativeAckToFlush = lastCumulativeAck.flush();
        boolean shouldFlush = false;
        if (lastCumulativeAckToFlush != null) {
            shouldFlush = true;
            MessageIdAdv messageId = lastCumulativeAckToFlush.getMessageId();
            newMessageAckCommandAndWrite(cnx, consumer.consumerId, messageId.getLedgerId(), messageId.getEntryId(), lastCumulativeAckToFlush.getBitSetRecyclable(), CommandAck.AckType.Cumulative, Collections.emptyMap(), false, (TimedCompletableFuture<Void>) currentCumulativeAckFuture, null);
            consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId);
        }

        List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(pendingIndividualAcks.size() + pendingIndividualBatchIndexAcks.size());
        if (!pendingIndividualAcks.isEmpty()) {
            MessageIdAdv msgId;
            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
                while ((msgId = pendingIndividualAcks.pollFirst()) != null) {
                    MessageIdImpl[] chunkMsgIds = (MessageIdImpl[]) consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
                    if (chunkMsgIds != null && chunkMsgIds.length > 1) {
                        // Chunked message: acknowledge every chunk that has an id.
                        for (MessageIdImpl cMsgId : chunkMsgIds) {
                            if (cMsgId != null) {
                                entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
                            }
                        }
                        consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
                    } else {
                        entriesToAck.add(Triple.of(msgId.getLedgerId(), msgId.getEntryId(), null));
                    }
                }
            } else {
                // No multi-message ack on this peer: one command per id, flushed together below.
                while ((msgId = pendingIndividualAcks.pollFirst()) != null) {
                    newMessageAckCommandAndWrite(cnx, consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(), null, CommandAck.AckType.Individual, Collections.emptyMap(), false, null, null);
                    shouldFlush = true;
                }
            }
        }

        if (!pendingIndividualBatchIndexAcks.isEmpty()) {
            Iterator<Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable>> iterator = pendingIndividualBatchIndexAcks.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry = iterator.next();
                entriesToAck.add(Triple.of(entry.getKey().getLedgerId(), entry.getKey().getEntryId(), entry.getValue()));
                iterator.remove();
            }
        }

        if (!entriesToAck.isEmpty()) {
            newMessageAckCommandAndWrite(cnx, consumer.consumerId, 0L, 0L, null, CommandAck.AckType.Individual, null, true, (TimedCompletableFuture<Void>) currentIndividualAckFuture, entriesToAck);
            shouldFlush = true;
        }

        if (shouldFlush) {
            if (log.isDebugEnabled()) {
                // The pending individual acks have already been drained at this point.
                log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {} -- individual-batch-index-acks: {}", consumer, lastCumulativeAck, pendingIndividualAcks, entriesToAck);
            }
            cnx.ctx().flush();
        }
    }

    /**
     * Flushes whatever can be sent and then discards all pending acknowledgment state.
     *
     * <p>{@link #flush()} returns without sending anything when the consumer has no connection,
     * so every pending structure is cleared explicitly afterwards. This includes the pending
     * batch-index acks, which would otherwise survive the clean and keep influencing
     * {@link #isDuplicate(MessageId)} and later flushes.
     */
    @Override
    public void flushAndClean() {
        flush();
        lastCumulativeAck.reset();
        pendingIndividualAcks.clear();
        pendingIndividualBatchIndexAcks.clear();
    }

    /**
     * Flushes pending acknowledgments and cancels the periodic flush task, if one was scheduled
     * and has not been cancelled yet.
     */
    @Override
    public void close() {
        flush();
        final ScheduledFuture<?> periodicFlush = scheduledTask;
        if (periodicFlush == null || periodicFlush.isCancelled()) {
            return;
        }
        periodicFlush.cancel(true);
    }

    /**
     * Writes and flushes an acknowledgment for one message, expanding chunked messages.
     *
     * <p>The chunk ids registered for {@code msgId} are removed from the chunked-message map.
     * They are acknowledged one by one only for non-cumulative acks of a message with more than
     * one chunk: as a single multi-message ack when the peer supports it, otherwise as one ack
     * command per chunk (in which case an already completed future is returned). Null chunk ids
     * are skipped on both paths. In every other case, including a chunk array of length zero or
     * one, {@code msgId} itself is acknowledged, so an empty multi-message ack is never sent
     * for such an array.
     *
     * @param consumerId the consumer id used for single-message ack commands
     * @param msgId the id to acknowledge
     * @param bitSet optional batch-index bit set, may be {@code null}
     * @param ackType the acknowledgment type
     * @param map properties to attach, may be {@code null}; not sent with a multi-message ack
     * @param cnx the connection to write to
     * @return the future of the written ack
     */
    private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, MessageIdAdv msgId, BitSetRecyclable bitSet, CommandAck.AckType ackType, Map<String, Long> map, ClientCnx cnx) {
        MessageIdImpl[] chunkMsgIds = (MessageIdImpl[]) consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
        boolean ackEachChunk = chunkMsgIds != null && chunkMsgIds.length > 1 && ackType != CommandAck.AckType.Cumulative;
        if (!ackEachChunk) {
            return newMessageAckCommandAndWrite(cnx, consumerId, msgId.getLedgerId(), msgId.getEntryId(), bitSet, ackType, map, true, null, null);
        }
        if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
            List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkMsgIds.length);
            for (MessageIdImpl cMsgId : chunkMsgIds) {
                if (cMsgId != null) {
                    entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
                }
            }
            return newMessageAckCommandAndWrite(cnx, consumer.consumerId, 0L, 0L, null, ackType, null, true, null, entriesToAck);
        }
        for (MessageIdImpl cMsgId : chunkMsgIds) {
            if (cMsgId == null) {
                continue;
            }
            newMessageAckCommandAndWrite(cnx, consumerId, cMsgId.getLedgerId(), cMsgId.getEntryId(), bitSet, ackType, map, true, null, null);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Builds an ack command (single or multi-message) and hands it to the connection.
     *
     * <p>When the consumer has ack receipt enabled, the command carries a fresh request id. With
     * no future supplied, the connection's receipt future is returned. With a future supplied,
     * the tracker's current individual or cumulative future (chosen by {@code ackType}) is
     * replaced by a new one, and the supplied future is registered for the receipt and returned.
     *
     * <p>Otherwise the command is written with request id {@code -1} using a void promise,
     * flushed or not according to {@code flush}, and a completed future is returned. If ack
     * receipt was requested in the configuration, the two shared futures are completed first,
     * while synchronized on this tracker.
     *
     * @param cnx the connection to write to
     * @param consumerId the consumer id put into the command
     * @param ledgerId ledger id for a single-message ack
     * @param entryId entry id for a single-message ack
     * @param ackSet optional batch-index bit set for a single-message ack, may be {@code null}
     * @param ackType the acknowledgment type
     * @param properties properties for a single-message ack, may be {@code null}
     * @param flush whether to flush the channel after writing (non-receipt path only)
     * @param timedCompletableFuture future to bind to the ack receipt, may be {@code null}
     * @param entriesToAck entries for a multi-message ack, or {@code null} for a single ack
     * @return the future described above
     */
    private CompletableFuture<Void> newMessageAckCommandAndWrite(ClientCnx cnx, long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, CommandAck.AckType ackType, Map<String, Long> properties, boolean flush, TimedCompletableFuture<Void> timedCompletableFuture, List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck) {
        final boolean multiAck = entriesToAck != null;

        if (consumer.isAckReceiptEnabled()) {
            final long requestId = consumer.getClient().newRequestId();
            final ByteBuf receiptCmd = multiAck
                    ? Commands.newMultiMessageAck(consumerId, entriesToAck, requestId)
                    : Commands.newAck(consumerId, ledgerId, entryId, ackSet, ackType, null, properties, requestId);
            if (timedCompletableFuture == null) {
                return cnx.newAckForReceipt(receiptCmd, requestId);
            }
            // Acks added from now on are tracked by a fresh shared future.
            if (ackType == CommandAck.AckType.Individual) {
                currentIndividualAckFuture = new TimedCompletableFuture<>();
            } else {
                currentCumulativeAckFuture = new TimedCompletableFuture<>();
            }
            cnx.newAckForReceiptWithFuture(receiptCmd, requestId, timedCompletableFuture);
            return timedCompletableFuture;
        }

        if (ackReceiptEnabled) {
            synchronized (this) {
                if (!currentCumulativeAckFuture.isDone()) {
                    currentCumulativeAckFuture.complete(null);
                }
                if (!currentIndividualAckFuture.isDone()) {
                    currentIndividualAckFuture.complete(null);
                }
            }
        }

        final ByteBuf plainCmd = multiAck
                ? Commands.newMultiMessageAck(consumerId, entriesToAck, -1L)
                : Commands.newAck(consumerId, ledgerId, entryId, ackSet, ackType, null, properties, -1L);
        if (flush) {
            cnx.ctx().writeAndFlush(plainCmd, cnx.ctx().voidPromise());
        } else {
            cnx.ctx().write(plainCmd, cnx.ctx().voidPromise());
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Acquires the read lock if the consumer has ack receipt enabled.
     *
     * @return the locked read lock, which the caller must unlock, or an empty optional when ack
     *         receipt is not enabled and no lock was taken
     */
    public Optional<Lock> acquireReadLock() {
        if (!consumer.isAckReceiptEnabled()) {
            return Optional.empty();
        }
        Lock readLock = lock.readLock();
        readLock.lock();
        return Optional.of(readLock);
    }

    /**
     * Acquires the write lock if the consumer has ack receipt enabled.
     *
     * @return the locked write lock, which the caller must unlock, or an empty optional when ack
     *         receipt is not enabled and no lock was taken
     */
    public Optional<Lock> acquireWriteLock() {
        if (!consumer.isAckReceiptEnabled()) {
            return Optional.empty();
        }
        Lock writeLock = lock.writeLock();
        writeLock.lock();
        return Optional.of(writeLock);
    }
}

