package io.smallrye.reactive.messaging.kafka.commit;

import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerConfig;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit.class */
public class KafkaThrottledLatestProcessedCommit extends ContextHolder implements KafkaCommitHandler {
    /**
     * Cache of {@link TopicPartition} instances shared by every instance of this class, keyed by topic name
     * and then by partition number. Emptied by {@link #clearCache()}.
     */
    private static final Map<String, Map<Integer, TopicPartition>> TOPIC_PARTITIONS_CACHE = new ConcurrentHashMap<>();

    /** Per-partition bookkeeping of received and processed offsets. */
    private final Map<TopicPartition, OffsetStore> offsetStores;

    /** Consumer group id; used when logging an acknowledgement for a partition that is no longer tracked. */
    private final String groupId;

    /** Consumer through which offsets are committed. */
    private final KafkaConsumer<?, ?> consumer;

    /** Source that is notified when a partition has gone too long without an acknowledgement. */
    private final KafkaSource<?, ?> source;

    /** Maximum age, in milliseconds, of the oldest unprocessed record; the health check is skipped when not positive. */
    private final int unprocessedRecordMaxAge;

    /** Delay, in milliseconds, of the flush timer; also used as the timeout when awaiting a commit. */
    private final int autoCommitInterval;

    /** Id of the pending flush timer, or {@code -1} when no timer is scheduled. */
    private volatile long timerId;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit$OffsetReceivedAt.class */
    /**
     * Immutable pair of a record offset and the wall-clock time (milliseconds since the epoch) at which
     * that offset was registered.
     */
    public static class OffsetReceivedAt {
        private final long offset;
        private final long receivedAt;

        private OffsetReceivedAt(long offset, long receivedAt) {
            this.offset = offset;
            this.receivedAt = receivedAt;
        }

        /**
         * Creates an entry for the given offset, stamped with the current time.
         *
         * @param j the record offset
         * @return a new entry whose reception time is {@link System#currentTimeMillis()} at the time of the call
         */
        static OffsetReceivedAt received(long j) {
            long now = System.currentTimeMillis();
            return new OffsetReceivedAt(j, now);
        }

        /** @return the record offset */
        public long getOffset() {
            return offset;
        }

        /** @return the reception time, in milliseconds since the epoch */
        public long getReceivedAt() {
            return receivedAt;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit$OffsetStore.class */
    /**
     * Tracks, for a single topic-partition, the offsets that were received (in arrival order) and the
     * offsets that were reported as processed, so that the largest offset up to which every received
     * record has been processed can be determined.
     */
    public static class OffsetStore {
        private final TopicPartition topicPartition;
        private final int unprocessedRecordMaxAge;
        /** Received offsets, oldest first. */
        private final Queue<OffsetReceivedAt> receivedOffsets = new LinkedList<>();
        /** Offsets reported as processed but not yet drained from {@link #receivedOffsets}. */
        private final Set<Long> processedOffsets = new HashSet<>();
        private long unProcessedTotal = 0;
        private long lastCommitted = -1;

        /**
         * @param topicPartition the partition this store belongs to
         * @param i the maximum age, in milliseconds, of the oldest received offset before
         *        {@link #hasTooManyMessagesWithoutAck()} reports a problem
         */
        OffsetStore(TopicPartition topicPartition, int i) {
            this.topicPartition = topicPartition;
            this.unprocessedRecordMaxAge = i;
        }

        /** @return the last offset returned by the drain method, or {@code -1} if none yet */
        long getLastCommittedOffset() {
            return lastCommitted;
        }

        /**
         * Registers a newly received offset, stamped with the current time.
         *
         * @param j the received offset
         */
        void received(long j) {
            receivedOffsets.offer(OffsetReceivedAt.received(j));
            unProcessedTotal++;
        }

        /**
         * Marks an offset as processed. The call is ignored when nothing is pending or when the offset
         * is smaller than the oldest pending offset.
         *
         * @param j the processed offset
         */
        void processed(long j) {
            OffsetReceivedAt oldest = receivedOffsets.peek();
            if (oldest != null && oldest.getOffset() <= j) {
                processedOffsets.add(j);
            }
        }

        /**
         * Drains the head of the received queue for as long as the head offset has been processed.
         *
         * @return the largest drained offset (also remembered as the last committed offset), or
         *         {@code -1} when nothing could be drained
         */
        long clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset() {
            if (processedOffsets.isEmpty()) {
                return -1L;
            }
            long largest = -1;
            OffsetReceivedAt head = receivedOffsets.peek();
            // Stop at the first received offset that has not been processed yet.
            while (head != null && processedOffsets.remove(head.getOffset())) {
                unProcessedTotal--;
                receivedOffsets.poll();
                largest = head.getOffset();
                head = receivedOffsets.peek();
            }
            if (largest > -1) {
                lastCommitted = largest;
                return largest;
            }
            return -1L;
        }

        /**
         * Checks whether the oldest pending offset has been waiting for longer than the configured
         * maximum age, logging the situation when it has.
         *
         * @return {@code true} if the oldest pending offset is older than the maximum age
         */
        boolean hasTooManyMessagesWithoutAck() {
            OffsetReceivedAt oldest = receivedOffsets.peek();
            if (oldest == null) {
                return false;
            }
            long waitedMs = System.currentTimeMillis() - oldest.getReceivedAt();
            if (waitedMs <= unprocessedRecordMaxAge) {
                return false;
            }
            KafkaLogging.log.receivedTooManyMessagesWithoutAcking(topicPartition.toString(), unProcessedTotal, lastCommitted);
            return true;
        }
    }

    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit$TooManyMessagesWithoutAckException.class */
    /**
     * Failure describing a topic-partition that has pending messages without acknowledgement.
     * Extends {@link NoStackTraceThrowable}.
     */
    public static class TooManyMessagesWithoutAckException extends NoStackTraceThrowable {
        /**
         * @param str the topic name
         * @param i the partition number
         * @param j the last committed offset, included in the message
         */
        public TooManyMessagesWithoutAckException(String str, int i, long j) {
            super("Too Many Messages without acknowledgement in topic " + str + " (partition:" + i + "), last committed offset is " + j);
        }
    }

    /**
     * Creates a handler with no tracked partitions and no scheduled timer.
     *
     * @param groupId the consumer group id
     * @param vertx the Vert.x instance, passed to the parent class
     * @param consumer the consumer used to commit offsets
     * @param source the source notified of health failures
     * @param unprocessedRecordMaxAge the maximum age, in milliseconds, of an unprocessed record
     * @param autoCommitInterval the flush timer delay, in milliseconds
     * @param defaultTimeout the timeout value passed to the parent class
     */
    private KafkaThrottledLatestProcessedCommit(String groupId, Vertx vertx, KafkaConsumer<?, ?> consumer, KafkaSource<?, ?> source, int unprocessedRecordMaxAge, int autoCommitInterval, int defaultTimeout) {
        super(vertx, defaultTimeout);
        this.groupId = groupId;
        this.consumer = consumer;
        this.source = source;
        this.unprocessedRecordMaxAge = unprocessedRecordMaxAge;
        this.autoCommitInterval = autoCommitInterval;
        this.offsetStores = new HashMap<>();
        this.timerId = -1L;
    }

    /**
     * Removes every entry from the static topic-partition cache.
     */
    public static void clearCache() {
        TOPIC_PARTITIONS_CACHE.clear();
    }

    /**
     * Builds a handler from the connector configuration.
     * <p>
     * The commit interval is read from {@code auto.commit.interval.ms} (5000 when absent) and the
     * timeout from {@code default.api.timeout.ms} (60000 when absent). The chosen commit interval is
     * logged, as is whether the unprocessed-record health check is enabled.
     *
     * @param vertx the Vert.x instance
     * @param kafkaConsumer the consumer used to commit offsets
     * @param str the consumer group id
     * @param kafkaConnectorIncomingConfiguration the incoming channel configuration
     * @param kafkaSource the source notified of health failures
     * @return a new handler
     */
    public static KafkaThrottledLatestProcessedCommit create(Vertx vertx, KafkaConsumer<?, ?> kafkaConsumer, String str, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, KafkaSource<?, ?> kafkaSource) {
        int maxAgeMs = kafkaConnectorIncomingConfiguration.getThrottledUnprocessedRecordMaxAgeMs().intValue();
        int commitIntervalMs = (Integer) kafkaConnectorIncomingConfiguration.config()
                .getOptionalValue(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, Integer.class)
                .orElse(5000);
        int apiTimeoutMs = (Integer) kafkaConnectorIncomingConfiguration.config()
                .getOptionalValue("default.api.timeout.ms", Integer.class)
                .orElse(60000);

        KafkaLogging.log.settingCommitInterval(str, commitIntervalMs);
        if (maxAgeMs > 0) {
            KafkaLogging.log.setThrottledCommitStrategyReceivedRecordMaxAge(str, maxAgeMs);
        } else {
            // A non-positive max age means the health check never runs.
            KafkaLogging.log.disableThrottledCommitStrategyHealthCheck(str);
        }
        return new KafkaThrottledLatestProcessedCommit(str, vertx, kafkaConsumer, kafkaSource, maxAgeMs, commitIntervalMs, apiTimeoutMs);
    }

    /**
     * Returns the cached {@link TopicPartition} for the record's topic and partition, creating and
     * caching it on first use.
     *
     * @param kafkaRecord the record whose topic and partition are looked up
     * @return the cached topic-partition instance
     */
    private <K, V> TopicPartition getTopicPartition(IncomingKafkaRecord<K, V> kafkaRecord) {
        Map<Integer, TopicPartition> byPartition = TOPIC_PARTITIONS_CACHE.computeIfAbsent(
                kafkaRecord.getTopic(), topic -> new ConcurrentHashMap<>());
        return byPartition.computeIfAbsent(
                kafkaRecord.getPartition(),
                partition -> new TopicPartition(kafkaRecord.getTopic(), partition));
    }

    /**
     * Cancels the flush timer and, unless both the assigned collection and the tracked stores are
     * empty, schedules it again. Runs on the context and waits for completion.
     *
     * @param collection the newly assigned partitions
     */
    @Override
    public void partitionsAssigned(Collection<TopicPartition> collection) {
        runOnContextAndAwait(() -> {
            stopFlushAndCheckHealthTimer();
            boolean nothingToWatch = collection.isEmpty() && this.offsetStores.isEmpty();
            if (!nothingToWatch) {
                startFlushAndCheckHealthTimer();
            }
            return null;
        });
    }

    /**
     * Handles revoked partitions: cancels the flush timer, drops the store of every tracked partition
     * contained in the given collection, synchronously commits the offset following the largest
     * sequentially processed offset of each dropped store, and reschedules the timer when other
     * partitions remain tracked.
     *
     * @param collection the revoked partitions
     */
    @Override
    public void partitionsRevoked(Collection<TopicPartition> collection) {
        runOnContextAndAwait(() -> {
            stopFlushAndCheckHealthTimer();
            return null;
        });

        Tuple2 outcome = (Tuple2) runOnContextAndAwait(() -> {
            Map<TopicPartition, OffsetAndMetadata> pending = new HashMap<>();
            // Iterate over a snapshot of the keys because stores are removed inside the loop.
            for (TopicPartition partition : new HashSet<>(this.offsetStores.keySet())) {
                if (!collection.contains(partition)) {
                    continue;
                }
                long largest = this.offsetStores.remove(partition).clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset();
                if (largest > -1) {
                    pending.put(partition, new OffsetAndMetadata(largest + 1, null));
                }
            }
            return Tuple2.of(pending, Boolean.valueOf(!this.offsetStores.isEmpty()));
        });

        @SuppressWarnings("unchecked")
        Map<TopicPartition, OffsetAndMetadata> toCommit = (Map<TopicPartition, OffsetAndMetadata>) outcome.getItem1();
        boolean storesRemain = ((Boolean) outcome.getItem2()).booleanValue();

        if (!toCommit.isEmpty()) {
            this.consumer.mo3815getDelegate().unwrap().commitSync(unwrap(toCommit));
        }
        if (storesRemain) {
            runOnContext(this::startFlushAndCheckHealthTimer);
        }
    }

    /**
     * Converts a map of Vert.x topic-partitions and offsets into the equivalent map of Apache Kafka
     * client types, copying topic, partition, offset and metadata.
     *
     * @param map the Vert.x typed offsets
     * @return a new map holding the Kafka client typed offsets
     */
    private static Map<org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> unwrap(Map<TopicPartition, OffsetAndMetadata> map) {
        Map<org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> converted = new HashMap<>();
        map.forEach((partition, offset) -> converted.put(
                new org.apache.kafka.common.TopicPartition(partition.getTopic(), partition.getPartition()),
                new org.apache.kafka.clients.consumer.OffsetAndMetadata(offset.getOffset(), offset.getMetadata())));
        return converted;
    }

    /**
     * Cancels the pending flush timer, if any, and records that none is scheduled.
     */
    private void stopFlushAndCheckHealthTimer() {
        if (this.timerId == -1) {
            return;
        }
        this.vertx.cancelTimer(this.timerId);
        this.timerId = -1L;
    }

    /**
     * Schedules a one-shot timer that calls the flush-and-health-check routine after the commit
     * interval, and remembers the timer id.
     */
    private void startFlushAndCheckHealthTimer() {
        this.timerId = this.vertx.setTimer(this.autoCommitInterval, this::flushAndCheckHealth);
    }

    /**
     * Registers the record's offset in the store of its topic-partition (creating the store on first
     * use) and schedules the flush timer when none is pending.
     *
     * @param incomingKafkaRecord the received record
     * @return the same record
     */
    @Override
    public <K, V> IncomingKafkaRecord<K, V> received(IncomingKafkaRecord<K, V> incomingKafkaRecord) {
        TopicPartition partition = getTopicPartition(incomingKafkaRecord);
        OffsetStore store = this.offsetStores.computeIfAbsent(
                partition, key -> new OffsetStore(key, this.unprocessedRecordMaxAge));
        store.received(incomingKafkaRecord.getOffset());

        if (this.timerId < 0) {
            startFlushAndCheckHealthTimer();
        }
        return incomingKafkaRecord;
    }

    /**
     * Drains every tracked store and collects, per partition, the largest sequentially processed
     * offset. Partitions for which nothing could be drained are omitted.
     *
     * @return a new map from partition to its largest drained offset
     */
    private Map<TopicPartition, Long> clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffsetMapping() {
        Map<TopicPartition, Long> largestByPartition = new HashMap<>();
        this.offsetStores.forEach((partition, store) -> {
            long largest = store.clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset();
            if (largest > -1) {
                largestByPartition.put(partition, largest);
            }
        });
        return largestByPartition;
    }

    /**
     * Acknowledges a record: on the context, marks its offset as processed in the store of its
     * topic-partition, or logs that the partition is no longer tracked when no store exists.
     * <p>
     * The returned stage completes with {@code null} once the bookkeeping is done. If the bookkeeping
     * throws, the stage is completed exceptionally with that failure instead of being left pending.
     *
     * @param incomingKafkaRecord the record being acknowledged
     * @return a stage completed when the acknowledgement has been recorded
     */
    @Override
    public <K, V> CompletionStage<Void> handle(IncomingKafkaRecord<K, V> incomingKafkaRecord) {
        CompletableFuture<Void> acknowledged = new CompletableFuture<>();
        runOnContext(() -> {
            try {
                TopicPartition topicPartition = getTopicPartition(incomingKafkaRecord);
                OffsetStore offsetStore = this.offsetStores.get(topicPartition);
                if (offsetStore != null) {
                    offsetStore.processed(incomingKafkaRecord.getOffset());
                } else {
                    KafkaLogging.log.messageAckedForRevokedTopicPartition(incomingKafkaRecord.getOffset(), this.groupId, topicPartition.toString());
                }
                acknowledged.complete(null);
            } catch (RuntimeException e) {
                // Never leave the caller waiting on a stage that cannot complete.
                acknowledged.completeExceptionally(e);
            }
        });
        return acknowledged;
    }

    /**
     * Timer callback: commits the drained offsets of every tracked partition and reschedules the
     * timer (immediately when there is nothing to commit, otherwise when the commit callback fires),
     * then, if the health check is enabled, reports a failure to the source for every partition
     * whose oldest pending record is too old.
     *
     * @param firedTimerId the id of the timer that fired; not used
     */
    private void flushAndCheckHealth(long firedTimerId) {
        Map<TopicPartition, Long> committable = clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffsetMapping();
        if (!committable.isEmpty()) {
            // The commit result is not inspected; the timer is rescheduled either way.
            this.consumer.mo3815getDelegate().commit(getOffsets(committable), ignored -> startFlushAndCheckHealthTimer());
        } else {
            startFlushAndCheckHealthTimer();
        }

        if (this.unprocessedRecordMaxAge <= 0) {
            return;
        }
        for (OffsetStore store : this.offsetStores.values()) {
            if (!store.hasTooManyMessagesWithoutAck()) {
                continue;
            }
            TopicPartition stalled = store.topicPartition;
            this.source.reportFailure(new TooManyMessagesWithoutAckException(stalled.getTopic(), stalled.getPartition(), store.getLastCommittedOffset()), true);
        }
    }

    /**
     * Shuts the handler down: commits the drained offsets of every tracked partition and waits for
     * the result, then clears all stores and cancels the flush timer.
     * <p>
     * The cleanup runs in a {@code finally} block so that a failed or timed-out final commit cannot
     * leave the timer scheduled or the stores populated; the commit failure still propagates to the
     * caller.
     */
    @Override
    public void terminate() {
        try {
            commitAllAndAwait();
        } finally {
            runOnContextAndAwait(() -> {
                this.offsetStores.clear();
                stopFlushAndCheckHealthTimer();
                return null;
            });
        }
    }

    /**
     * Drains every tracked store on the context, then commits the resulting offsets and waits for
     * the commit to finish.
     */
    private void commitAllAndAwait() {
        @SuppressWarnings("unchecked")
        Map<TopicPartition, Long> committable = (Map<TopicPartition, Long>) runOnContextAndAwait(
                this::clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffsetMapping);
        commitAndAwait(committable);
    }

    /**
     * Commits the offset following each given offset and blocks until the commit callback fires or
     * the commit interval elapses. Does nothing for an empty map.
     *
     * @param map the largest processed offset per partition
     * @throws RuntimeException wrapping the cause when the commit fails or the wait times out
     */
    private void commitAndAwait(Map<TopicPartition, Long> map) {
        if (map.isEmpty()) {
            return;
        }
        final CompletableFuture<Void> committed = new CompletableFuture<>();
        Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>> onCommit = new Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>>() {
            @Override
            public void handle(AsyncResult<Map<TopicPartition, OffsetAndMetadata>> asyncResult) {
                if (asyncResult.failed()) {
                    committed.completeExceptionally(asyncResult.cause());
                    return;
                }
                committed.complete(null);
            }
        };
        this.consumer.mo3815getDelegate().commit(getOffsets(map), onCommit);
        try {
            committed.get(this.autoCommitInterval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag and return without waiting any further.
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Builds the offsets to commit: for each partition, the given offset plus one, with no metadata.
     *
     * @param map the largest processed offset per partition
     * @return a new map of commit positions per partition
     */
    private Map<TopicPartition, OffsetAndMetadata> getOffsets(Map<TopicPartition, Long> map) {
        Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>();
        map.forEach((partition, processed) ->
                positions.put(partition, new OffsetAndMetadata(processed.longValue() + 1, null)));
        return positions;
    }
}
