package io.smallrye.reactive.messaging.kafka.commit;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniCreate;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.mutiny.core.Vertx;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit.class */
public class KafkaThrottledLatestProcessedCommit extends ContextHolder implements KafkaCommitHandler {
    // Static cache: topic name -> partition number -> TopicPartition instance.
    // Filled lazily by getTopicPartition(...) and emptied by clearCache().
    private static final Map<String, Map<Integer, TopicPartition>> TOPIC_PARTITIONS_CACHE = new ConcurrentHashMap();
    // One OffsetStore per partition; entries are created in received(...) and
    // removed on revocation, on cleanup of unassigned partitions and on terminate.
    private final Map<TopicPartition, OffsetStore> offsetStores;
    // Consumer group id; in this class it is only used when logging in handle(...).
    private final String groupId;
    // Consumer used to read the committed position and to commit offsets.
    private final ReactiveKafkaConsumer<?, ?> consumer;
    // Source that receives the failure report when a record waits too long for its ack.
    private final KafkaSource<?, ?> source;
    // Maximum age (compared against elapsed milliseconds) of the oldest unacknowledged
    // record; values <= 0 disable the check in flushAndCheckHealth(...).
    private final int unprocessedRecordMaxAge;
    // Delay, in milliseconds, passed to the Vert.x timer that triggers flushAndCheckHealth(...).
    private final int autoCommitInterval;
    // Id of the currently scheduled flush timer, or -1 when no timer is pending.
    private volatile long timerId;
    // Partitions currently assigned to this consumer (backed by a ConcurrentHashMap).
    private final Collection<TopicPartition> assignments;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit$OffsetReceivedAt.class */
    /**
     * Immutable pair of a record offset and the wall-clock time (epoch
     * milliseconds) at which that offset was registered.
     */
    public static class OffsetReceivedAt {
        private final long offset;
        private final long receivedAt;

        private OffsetReceivedAt(long offset, long receivedAt) {
            this.offset = offset;
            this.receivedAt = receivedAt;
        }

        /**
         * Creates an entry for {@code offset} stamped with the current time.
         *
         * @param offset the record offset
         * @return a new entry whose reception time is {@link System#currentTimeMillis()}
         */
        static OffsetReceivedAt received(long offset) {
            long now = System.currentTimeMillis();
            return new OffsetReceivedAt(offset, now);
        }

        /** @return the record offset */
        public long getOffset() {
            return offset;
        }

        /** @return the reception time, in epoch milliseconds */
        public long getReceivedAt() {
            return receivedAt;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit$OffsetStore.class */
    /**
     * Per-partition bookkeeping of received and processed offsets.
     *
     * <p>Received offsets are queued in arrival order; processed offsets are kept in a
     * set until they can be removed sequentially from the head of the queue.
     */
    public class OffsetStore {
        private final TopicPartition topicPartition;
        private final int unprocessedRecordMaxAge;
        private long lastProcessedOffset;
        private final Queue<OffsetReceivedAt> receivedOffsets = new LinkedList<>();
        private final Set<Long> processedOffsets = new HashSet<>();
        private final AtomicLong unProcessedTotal = new AtomicLong();

        /**
         * @param partition the partition tracked by this store
         * @param maxAge maximum age, in milliseconds, of the oldest unacknowledged record
         * @param initialOffset the offset considered already processed when the store is created
         */
        OffsetStore(TopicPartition partition, int maxAge, long initialOffset) {
            this.topicPartition = partition;
            this.unprocessedRecordMaxAge = maxAge;
            KafkaLogging.log.initializeStoreAtPosition(partition, initialOffset);
            this.lastProcessedOffset = initialOffset;
        }

        /** @return the last offset recorded as sequentially processed */
        long getLastProcessedOffset() {
            return lastProcessedOffset;
        }

        /**
         * Registers a received offset. Offsets not greater than the last processed
         * offset are logged as outdated and otherwise ignored.
         */
        void received(long offset) {
            if (offset > lastProcessedOffset) {
                receivedOffsets.offer(OffsetReceivedAt.received(offset));
                unProcessedTotal.incrementAndGet();
                return;
            }
            KafkaLogging.log.receivedOutdatedOffset(topicPartition, offset, lastProcessedOffset);
        }

        /**
         * Marks an offset as processed, unless nothing is pending or the offset is
         * lower than the oldest pending one.
         */
        void processed(long offset) {
            OffsetReceivedAt oldest = receivedOffsets.peek();
            if (oldest != null && oldest.getOffset() <= offset) {
                processedOffsets.add(offset);
            }
        }

        /**
         * Drops, from the head of the received queue, every offset that has been
         * processed, stopping at the first one that has not.
         *
         * @return the largest offset removed that way, or {@code -1} if none qualified
         */
        long clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset() {
            long largest = -1L;
            if (!processedOffsets.isEmpty()) {
                while (!receivedOffsets.isEmpty()) {
                    Long head = receivedOffsets.peek().getOffset();
                    if (!processedOffsets.remove(head)) {
                        break;
                    }
                    unProcessedTotal.decrementAndGet();
                    OffsetReceivedAt polled = receivedOffsets.poll();
                    if (polled != null) {
                        largest = polled.getOffset();
                    }
                }
                if (largest > -1) {
                    lastProcessedOffset = largest;
                }
            }
            removeReceivedOffsetsFromLastProcessedOffset();
            return largest > -1 ? largest : -1L;
        }

        /** Removes every queued offset that is not greater than the last processed offset. */
        private void removeReceivedOffsetsFromLastProcessedOffset() {
            receivedOffsets.removeIf(entry -> entry.getOffset() <= lastProcessedOffset);
        }

        /**
         * Checks whether the oldest pending record has been waiting longer than the
         * configured maximum age.
         *
         * @return the elapsed time in milliseconds when the limit is exceeded (a warning
         *         is logged in that case), otherwise {@code -1}
         */
        long hasTooManyMessagesWithoutAck() {
            if (receivedOffsets.isEmpty() || !isStillAssigned()) {
                return -1L;
            }
            OffsetReceivedAt oldest = receivedOffsets.peek();
            if (oldest == null) {
                return -1L;
            }
            long elapsedMs = System.currentTimeMillis() - oldest.getReceivedAt();
            long pending = receivedOffsets.size();
            if (elapsedMs <= unprocessedRecordMaxAge) {
                return -1L;
            }
            KafkaLogging.log.waitingForAckForTooLong(oldest.getOffset(), topicPartition, elapsedMs / 1000, unprocessedRecordMaxAge, pending, lastProcessedOffset);
            return elapsedMs;
        }

        /** @return whether the enclosing handler still lists this partition as assigned */
        private boolean isStillAssigned() {
            return KafkaThrottledLatestProcessedCommit.this.assignments.contains(topicPartition);
        }

        /** @return the current value of the unprocessed-record counter */
        long getUnprocessedCount() {
            return unProcessedTotal.get();
        }
    }

    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit$TooManyMessagesWithoutAckException.class */
    /**
     * Failure reported when a record has waited too long for its acknowledgement.
     */
    public static class TooManyMessagesWithoutAckException extends NoStackTraceThrowable {
        /**
         * @param topicPartition the partition the record belongs to
         * @param offset the offset of the record awaiting acknowledgement
         * @param waitedSeconds how long the record has waited, in seconds
         * @param pendingCount number of records of the partition awaiting acknowledgement
         * @param lastCommittedOffset value reported as the last committed offset
         */
        public TooManyMessagesWithoutAckException(TopicPartition topicPartition, long offset, long waitedSeconds, long pendingCount, long lastCommittedOffset) {
            super(describe(topicPartition, offset, waitedSeconds, pendingCount, lastCommittedOffset));
        }

        /** Builds the exception message from the constructor arguments. */
        private static String describe(TopicPartition topicPartition, long offset, long waitedSeconds, long pendingCount, long lastCommittedOffset) {
            return String.format("The record %d from topic/partition '%s' has waited for %d seconds to be acknowledged. At the moment %d messages from this partition are awaiting acknowledgement. The last committed offset for this partition was %d.", offset, topicPartition, waitedSeconds, pendingCount, lastCommittedOffset);
        }
    }

    /**
     * Creates the handler. Instances are obtained through {@code create(...)}.
     *
     * @param groupId the consumer group id
     * @param vertx the Mutiny Vert.x instance; its core delegate is handed to the superclass
     * @param consumer the consumer used to read committed positions and to commit offsets
     * @param source the source that receives failure reports
     * @param unprocessedRecordMaxAge maximum age, in milliseconds, of an unacknowledged record
     * @param autoCommitInterval delay, in milliseconds, of the flush timer
     * @param defaultTimeout value forwarded as second argument to the {@code ContextHolder} constructor
     */
    private KafkaThrottledLatestProcessedCommit(String groupId, Vertx vertx, ReactiveKafkaConsumer<?, ?> consumer, KafkaSource<?, ?> source, int unprocessedRecordMaxAge, int autoCommitInterval, int defaultTimeout) {
        // getDelegate() is the real accessor of the Mutiny wrapper; the decompiler
        // had emitted a synthetic, non-existent name here.
        super(vertx.getDelegate(), defaultTimeout);
        this.offsetStores = new HashMap<>();
        this.timerId = -1L;
        this.assignments = Collections.newSetFromMap(new ConcurrentHashMap<>());
        this.groupId = groupId;
        this.consumer = consumer;
        this.source = source;
        this.unprocessedRecordMaxAge = unprocessedRecordMaxAge;
        this.autoCommitInterval = autoCommitInterval;
    }

    /**
     * Empties the static topic/partition cache ({@code TOPIC_PARTITIONS_CACHE}),
     * which is shared by all instances of this class.
     */
    public static void clearCache() {
        TOPIC_PARTITIONS_CACHE.clear();
    }

    /**
     * Builds a handler from the incoming channel configuration.
     *
     * <p>The commit interval is read from {@code auto.commit.interval.ms} (default 5000) and
     * the timeout from {@code default.api.timeout.ms} (default 60000). A non-positive
     * maximum record age is logged as disabling the health check.
     *
     * @param vertx the Vert.x instance
     * @param consumer the Kafka consumer
     * @param groupId the consumer group id
     * @param configuration the incoming channel configuration
     * @param source the Kafka source
     * @return the configured handler
     */
    public static KafkaThrottledLatestProcessedCommit create(Vertx vertx, ReactiveKafkaConsumer<?, ?> consumer, String groupId, KafkaConnectorIncomingConfiguration configuration, KafkaSource<?, ?> source) {
        int maxAgeMs = configuration.getThrottledUnprocessedRecordMaxAgeMs().intValue();
        int commitIntervalMs = configuration.config()
                .getOptionalValue(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, Integer.class)
                .orElse(5000);
        int defaultTimeoutMs = configuration.config()
                .getOptionalValue("default.api.timeout.ms", Integer.class)
                .orElse(60000);

        KafkaLogging.log.settingCommitInterval(groupId, commitIntervalMs);
        if (maxAgeMs > 0) {
            KafkaLogging.log.setThrottledCommitStrategyReceivedRecordMaxAge(groupId, maxAgeMs);
        } else {
            KafkaLogging.log.disableThrottledCommitStrategyHealthCheck(groupId);
        }
        return new KafkaThrottledLatestProcessedCommit(groupId, vertx, consumer, source, maxAgeMs, commitIntervalMs, defaultTimeoutMs);
    }

    /**
     * Returns the cached {@link TopicPartition} for the record's topic and partition,
     * creating and caching it on first use.
     *
     * @param incomingKafkaRecord the record whose topic and partition are looked up
     * @return the topic/partition instance from the static cache
     */
    private <K, V> TopicPartition getTopicPartition(IncomingKafkaRecord<K, V> incomingKafkaRecord) {
        Map<Integer, TopicPartition> byPartition = TOPIC_PARTITIONS_CACHE.computeIfAbsent(
                incomingKafkaRecord.getTopic(), topic -> new ConcurrentHashMap<>());
        return byPartition.computeIfAbsent(
                incomingKafkaRecord.getPartition(),
                partition -> new TopicPartition(incomingKafkaRecord.getTopic(), partition));
    }

    /**
     * Records newly assigned partitions and reschedules the flush timer.
     *
     * <p>The work runs on the handler's context and the caller waits for it. The timer
     * is restarted unless both the new assignment and the offset stores are empty.
     *
     * @param partitions the partitions that have just been assigned
     */
    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public void partitionsAssigned(Collection<TopicPartition> partitions) {
        runOnContextAndAwait(() -> {
            stopFlushAndCheckHealthTimer();
            this.assignments.addAll(partitions);
            boolean needsTimer = !partitions.isEmpty() || !this.offsetStores.isEmpty();
            if (needsTimer) {
                startFlushAndCheckHealthTimer();
            }
            return null;
        });
    }

    /**
     * Handles revoked partitions: forgets them, collects the offsets that can still be
     * committed for them, and commits those synchronously.
     *
     * <p>Bookkeeping runs on the handler's context; the synchronous commit is issued from
     * the calling thread afterwards. The flush timer is restarted only if offset stores
     * remain after the revocation.
     *
     * @param collection the partitions that have been revoked
     */
    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public void partitionsRevoked(Collection<TopicPartition> collection) {
        @SuppressWarnings("unchecked")
        Tuple2<Map<TopicPartition, OffsetAndMetadata>, Boolean> outcome =
                (Tuple2<Map<TopicPartition, OffsetAndMetadata>, Boolean>) runOnContextAndAwait(() -> {
                    stopFlushAndCheckHealthTimer();
                    this.assignments.removeAll(collection);

                    Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
                    // Iterate over a snapshot because stores are removed while looping.
                    for (TopicPartition partition : new HashSet<>(this.offsetStores.keySet())) {
                        if (this.assignments.contains(partition)) {
                            continue;
                        }
                        OffsetStore dropped = this.offsetStores.remove(partition);
                        if (dropped == null) {
                            continue;
                        }
                        long largest = dropped.clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset();
                        if (largest > -1) {
                            // The committed position is the offset following the last processed one.
                            toCommit.put(partition, new OffsetAndMetadata(largest + 1, null));
                            KafkaLogging.log.partitionRevokedCollectingRecordsToCommit(partition, largest + 1);
                        }
                    }
                    return Tuple2.of(toCommit, !this.offsetStores.isEmpty());
                });

        Map<TopicPartition, OffsetAndMetadata> pendingCommits = outcome.getItem1();
        if (!pendingCommits.isEmpty()) {
            this.consumer.unwrap().commitSync(pendingCommits);
        }
        boolean storesRemain = outcome.getItem2();
        if (storesRemain) {
            runOnContext(this::startFlushAndCheckHealthTimer);
        }
    }

    /** Cancels the pending flush timer, if any, and resets the stored timer id to -1. */
    private void stopFlushAndCheckHealthTimer() {
        if (this.timerId == -1) {
            return;
        }
        this.vertx.cancelTimer(this.timerId);
        this.timerId = -1L;
    }

    /**
     * Schedules a one-shot timer that, after the commit interval, runs
     * {@code flushAndCheckHealth} on the handler's context, and stores the timer id.
     */
    private void startFlushAndCheckHealthTimer() {
        long scheduledId = this.vertx.setTimer(this.autoCommitInterval,
                firedId -> runOnContext(() -> flushAndCheckHealth(firedId)));
        this.timerId = scheduledId;
    }

    /**
     * Registers a received record in the offset store of its partition.
     *
     * <p>When no store exists yet for the partition, the committed position is fetched
     * from the consumer first and a store is created from it on the handler's context
     * (position {@code -1} when nothing was committed). The flush timer is started if
     * none is currently pending.
     *
     * @param incomingKafkaRecord the record that was received
     * @return a {@link Uni} emitting the same record once it has been registered
     */
    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public <K, V> Uni<IncomingKafkaRecord<K, V>> received(IncomingKafkaRecord<K, V> incomingKafkaRecord) {
        TopicPartition topicPartition = getTopicPartition(incomingKafkaRecord);
        OffsetStore existing = this.offsetStores.get(topicPartition);

        Uni<OffsetStore> storeUni;
        if (existing != null) {
            // Pass the store itself as the item; it must not be cast to UniCreate.
            storeUni = Uni.createFrom().item(existing);
        } else {
            storeUni = this.consumer.committed(topicPartition)
                    // Switch to the handler's context before touching offsetStores.
                    .emitOn(task -> this.context.runOnContext(ignored -> task.run()))
                    .onItem().transform(committed -> {
                        OffsetAndMetadata lastCommitted = (OffsetAndMetadata) committed.get(topicPartition);
                        long lastProcessed = lastCommitted == null ? -1L : lastCommitted.offset() - 1;
                        OffsetStore created = new OffsetStore(topicPartition, this.unprocessedRecordMaxAge, lastProcessed);
                        this.offsetStores.put(topicPartition, created);
                        return created;
                    });
        }

        return storeUni
                .onItem().invoke(store -> {
                    store.received(incomingKafkaRecord.getOffset());
                    if (this.timerId < 0) {
                        startFlushAndCheckHealthTimer();
                    }
                })
                .onItem().transform(store -> incomingKafkaRecord);
    }

    /**
     * Drops stores of unassigned partitions, then asks every remaining assigned store for
     * its largest sequentially processed offset.
     *
     * @return the partitions that have something to commit, mapped to that largest offset
     */
    private Map<TopicPartition, Long> clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffsetMapping() {
        cleanupPartitionOffsetStore();
        Map<TopicPartition, Long> largestByPartition = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetStore> entry : new HashSet<>(this.offsetStores.entrySet())) {
            TopicPartition partition = entry.getKey();
            if (!this.assignments.contains(partition)) {
                continue;
            }
            long largest = entry.getValue().clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset();
            if (largest > -1) {
                largestByPartition.put(partition, largest);
            }
        }
        return largestByPartition;
    }

    /**
     * Marks a record as processed in the offset store of its partition.
     *
     * <p>The work runs on the handler's context. If the partition has no store, the
     * acknowledgement is only logged. The returned stage completes in both cases.
     *
     * @param incomingKafkaRecord the acknowledged record
     * @return a stage completed with {@code null} once the acknowledgement was handled
     */
    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public <K, V> CompletionStage<Void> handle(IncomingKafkaRecord<K, V> incomingKafkaRecord) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        runOnContext(() -> {
            TopicPartition partition = getTopicPartition(incomingKafkaRecord);
            OffsetStore store = this.offsetStores.get(partition);
            if (store == null) {
                KafkaLogging.log.acknowledgementFromRevokedTopicPartition(incomingKafkaRecord.getOffset(), partition, this.groupId, this.assignments);
            } else {
                store.processed(incomingKafkaRecord.getOffset());
            }
            done.complete(null);
        });
        return done;
    }

    /**
     * Timer callback: commits what can be committed, reschedules the timer, and reports
     * a failure for every store whose oldest record has waited too long.
     *
     * <p>When there is nothing to commit the timer is restarted immediately; otherwise it
     * is restarted once the commit has succeeded or failed.
     *
     * @param j the id of the timer that fired (not used)
     */
    private void flushAndCheckHealth(long j) {
        Map<TopicPartition, Long> largestProcessed = clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffsetMapping();
        if (!largestProcessed.isEmpty()) {
            Map<TopicPartition, OffsetAndMetadata> offsets = getOffsets(largestProcessed);
            this.consumer.commit(offsets).subscribe().with(
                    ignored -> {
                        KafkaLogging.log.committed(offsets);
                        startFlushAndCheckHealthTimer();
                    },
                    failure -> {
                        KafkaLogging.log.failedToCommit(offsets, failure);
                        startFlushAndCheckHealthTimer();
                    });
        } else {
            startFlushAndCheckHealthTimer();
        }

        if (this.unprocessedRecordMaxAge <= 0) {
            // Health check disabled.
            return;
        }
        for (OffsetStore store : this.offsetStores.values()) {
            long waitedMs = store.hasTooManyMessagesWithoutAck();
            if (waitedMs == -1) {
                continue;
            }
            OffsetReceivedAt oldest = store.receivedOffsets.peek();
            if (oldest == null) {
                continue;
            }
            TooManyMessagesWithoutAckException failure = new TooManyMessagesWithoutAckException(
                    store.topicPartition, oldest.offset, waitedMs / 1000, store.receivedOffsets.size(), store.getLastProcessedOffset());
            this.source.reportFailure(failure, true);
        }
    }

    /** Removes, and logs, the offset store of every partition that is no longer assigned. */
    private void cleanupPartitionOffsetStore() {
        // Iterate over a snapshot of the keys because the map is modified in the loop.
        for (TopicPartition partition : new HashSet<>(this.offsetStores.keySet())) {
            if (this.assignments.contains(partition)) {
                continue;
            }
            KafkaLogging.log.removingPartitionFromStore(partition, this.assignments);
            this.offsetStores.remove(partition);
        }
    }

    /**
     * Shuts the handler down: optionally waits for in-flight records, commits what can
     * be committed, then clears the offset stores and cancels the flush timer.
     *
     * <p>The cleanup runs in a {@code finally} block so that a failing final commit
     * (reported as a {@link RuntimeException}) no longer leaves the timer scheduled and
     * the stores populated; the exception is still propagated to the caller.
     *
     * @param z when {@code true}, wait for unprocessed records before committing
     */
    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public void terminate(boolean z) {
        try {
            if (z) {
                long stillUnprocessed = waitForProcessing();
                if (stillUnprocessed > 0) {
                    KafkaLogging.log.messageStillUnprocessedAfterTimeout(stillUnprocessed);
                }
            }
            commitAllAndAwait();
        } finally {
            runOnContextAndAwait(() -> {
                this.offsetStores.clear();
                stopFlushAndCheckHealthTimer();
                return null;
            });
        }
    }

    /**
     * Waits, in 100 ms steps and for at most roughly the commit interval, until no
     * record is left unprocessed.
     *
     * <p>If the waiting thread is interrupted, the interrupt flag is restored and the
     * wait stops at once: with the flag set every further {@link Thread#sleep(long)}
     * would throw immediately, so continuing would only spin and log.
     *
     * @return the number of records still unprocessed when the wait ended
     */
    private long waitForProcessing() {
        int attempts = this.autoCommitInterval / 100;
        for (int attempt = 0; attempt < attempts; attempt++) {
            long unprocessed = countUnprocessedRecords();
            if (unprocessed == 0) {
                return unprocessed;
            }
            KafkaLogging.log.waitingForMessageProcessing(unprocessed);
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return countUnprocessedRecords();
    }

    /** Sums the unprocessed-record counters of all offset stores. */
    private long countUnprocessedRecords() {
        return this.offsetStores.values().stream()
                .mapToLong(OffsetStore::getUnprocessedCount)
                .sum();
    }

    /**
     * Computes, on the handler's context, the largest committable offset per partition
     * and commits the result, waiting for the commit to finish.
     */
    private void commitAllAndAwait() {
        @SuppressWarnings("unchecked")
        Map<TopicPartition, Long> largestProcessed = (Map<TopicPartition, Long>) runOnContextAndAwait(
                this::clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffsetMapping);
        commitAndAwait(largestProcessed);
    }

    /**
     * Commits the given offsets and blocks until the commit completes, for at most the
     * commit interval.
     *
     * <p>An interruption restores the interrupt flag and returns silently.
     *
     * @param map largest processed offset per partition; nothing happens when empty
     * @throws RuntimeException wrapping the cause when the commit fails or times out
     */
    private void commitAndAwait(Map<TopicPartition, Long> map) {
        if (!map.isEmpty()) {
            Map<TopicPartition, OffsetAndMetadata> offsets = getOffsets(map);
            try {
                this.consumer.commit(offsets)
                        .subscribeAsCompletionStage()
                        .get(this.autoCommitInterval, TimeUnit.MILLISECONDS);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException | TimeoutException failure) {
                throw new RuntimeException(failure);
            }
        }
    }

    /**
     * Converts "largest processed offset" values into commit positions.
     *
     * @param map largest processed offset per partition
     * @return per partition, an {@link OffsetAndMetadata} holding that offset plus one
     *         and {@code null} metadata
     */
    private Map<TopicPartition, OffsetAndMetadata> getOffsets(Map<TopicPartition, Long> map) {
        Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>();
        map.forEach((partition, lastProcessed) ->
                positions.put(partition, new OffsetAndMetadata(lastProcessed + 1, null)));
        return positions;
    }
}
