package io.smallrye.reactive.messaging.kafka.commit;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit.class */
public class KafkaThrottledLatestProcessedCommit implements KafkaCommitHandler {
    private static final long THROTTLE_TIME_IN_MILLIS = 5000;
    private static final Map<String, Map<Integer, TopicPartition>> TOPIC_PARTITIONS_CACHE = new ConcurrentHashMap();
    private final Map<TopicPartition, OffsetStore> offsetStores = new HashMap();
    private final KafkaConsumer<?, ?> consumer;
    private final KafkaSource<?, ?> source;
    private final int maxReceivedWithoutAckAllowed;
    private volatile Context context;
    private long nextCommitTime;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit$OffsetStore.class */
    /**
     * Per-partition bookkeeping of the offsets that were received and of those that were
     * reported as processed, used to find the largest offset up to which every received
     * record has been processed.
     */
    public static class OffsetStore {
        private final TopicPartition topicPartition;
        /** Number of unprocessed records from which {@link #received(long)} starts failing. */
        private final int maxReceivedWithoutAckAllowed;
        /** The threshold minus one, used as a bit mask in {@link #received(long)}. */
        private final int maxReceivedWithoutAckAllowedMinusOne;
        /** Received offsets in arrival order; the head is the oldest one not cleared yet. */
        private final Queue<Long> receivedOffsets = new LinkedList<>();
        /** Offsets reported as processed that have not been cleared yet. */
        private final Set<Long> processedOffsets = new HashSet<>();
        /** Incremented for each received offset, decremented for each cleared one. */
        private long unProcessedTotal;

        OffsetStore(TopicPartition topicPartition, int maxReceivedWithoutAckAllowed) {
            this.topicPartition = topicPartition;
            this.maxReceivedWithoutAckAllowed = maxReceivedWithoutAckAllowed;
            this.maxReceivedWithoutAckAllowedMinusOne = maxReceivedWithoutAckAllowed - 1;
        }

        /**
         * Records a newly received offset.
         *
         * @param offset the offset of the received record
         * @throws TooManyMessagegsWithoutAckingException when the unprocessed count has reached
         *         the threshold and has no bit in common with the mask (threshold minus one);
         *         for a power-of-two threshold that means every multiple of the threshold
         */
        void received(long offset) throws TooManyMessagegsWithoutAckingException {
            this.receivedOffsets.offer(offset);
            this.unProcessedTotal++;

            boolean thresholdReached = this.unProcessedTotal >= this.maxReceivedWithoutAckAllowed;
            boolean maskIsClear = (this.unProcessedTotal & this.maxReceivedWithoutAckAllowedMinusOne) == 0;
            if (thresholdReached && maskIsClear) {
                KafkaLogging.log.receivedTooManyMessagesWithoutAcking(this.topicPartition.toString(), this.unProcessedTotal);
                throw new TooManyMessagegsWithoutAckingException();
            }
        }

        /**
         * Marks an offset as processed. The call is ignored when nothing is pending or when the
         * offset is smaller than the oldest pending received offset.
         *
         * @param offset the offset of the processed record
         */
        void processed(long offset) {
            Long oldestPending = this.receivedOffsets.peek();
            if (oldestPending != null && oldestPending <= offset) {
                this.processedOffsets.add(offset);
            }
        }

        /**
         * Removes, from the head of the received queue, every offset that has been processed,
         * stopping at the first one that has not.
         *
         * @return the last offset removed, or empty when nothing was removed (a removed offset
         *         that is not greater than -1 is also reported as empty)
         */
        OptionalLong clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset() {
            if (this.processedOffsets.isEmpty()) {
                return OptionalLong.empty();
            }
            long largest = -1;
            while (!this.receivedOffsets.isEmpty() && this.processedOffsets.remove(this.receivedOffsets.peek())) {
                this.unProcessedTotal--;
                largest = this.receivedOffsets.poll();
            }
            return largest > -1 ? OptionalLong.of(largest) : OptionalLong.empty();
        }
    }

    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaThrottledLatestProcessedCommit$TooManyMessagegsWithoutAckingException.class */
    /** Checked exception raised when too many records were received without being acknowledged. */
    public static class TooManyMessagegsWithoutAckingException extends Exception {
        private static final String MESSAGE = "Too Many Messages without Acking";

        /** Creates the exception with its fixed message and no cause. */
        public TooManyMessagegsWithoutAckingException() {
            super(MESSAGE);
        }
    }

    /**
     * Private constructor; instances are obtained through the static {@code create} factory.
     *
     * @param consumer the consumer used to commit offsets
     * @param source the source to which failures are reported
     * @param maxReceivedWithoutAckAllowed the threshold handed to every per-partition offset store
     */
    private KafkaThrottledLatestProcessedCommit(KafkaConsumer<?, ?> consumer, KafkaSource<?, ?> source, int maxReceivedWithoutAckAllowed) {
        this.maxReceivedWithoutAckAllowed = maxReceivedWithoutAckAllowed;
        this.source = source;
        this.consumer = consumer;
    }

    /**
     * Rounds a value up to the nearest power of two.
     *
     * <p>Values of zero or less yield 1. Values above 2^30 cannot be rounded up within
     * {@code int}, so they yield 2^30, the largest power of two an {@code int} can hold,
     * instead of overflowing to a negative number.
     *
     * @param i the value to round up
     * @return the smallest power of two that is {@code >= i}, clamped to the range [1, 2^30]
     */
    private static int getNextPowerOfTwoEqualOrGreater(int i) {
        final int largestPowerOfTwo = 1 << 30;
        if (i <= 1) {
            return 1;
        }
        if (i >= largestPowerOfTwo) {
            return largestPowerOfTwo;
        }
        // For 1 < i < 2^30: the highest set bit of (i - 1), doubled, is the next power of two >= i.
        return Integer.highestOneBit(i - 1) << 1;
    }

    /**
     * Empties the static topic/partition cache, which is shared by all instances of this class.
     */
    public static void clearCache() {
        TOPIC_PARTITIONS_CACHE.clear();
    }

    /**
     * Creates a commit handler whose "received without ack" threshold is derived from the
     * consumer configuration: twice the value stored under
     * {@link ConsumerConfig#MAX_POLL_RECORDS_CONFIG} (default {@code "500"}), rounded up to a
     * power of two.
     *
     * <p>The doubling is done in {@code long} and clamped to [0, 2^30] so that a very large
     * configured value cannot wrap around and produce a tiny or negative threshold.
     *
     * @param kafkaConsumer the consumer used to commit offsets
     * @param map the consumer configuration
     * @param kafkaSource the source to which failures are reported
     * @return the new handler
     * @throws NumberFormatException if the configured value is not a valid integer
     */
    public static KafkaThrottledLatestProcessedCommit create(KafkaConsumer<?, ?> kafkaConsumer, Map<String, String> map, KafkaSource<?, ?> kafkaSource) {
        int maxPollRecords = Integer.parseInt(map.getOrDefault(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500"));
        // 2^30 is the largest power of two representable as an int; the lower bound keeps
        // non-positive inputs non-positive after the narrowing cast.
        int doubled = (int) Math.max(0L, Math.min(2L * maxPollRecords, 1L << 30));
        int nextPowerOfTwoEqualOrGreater = getNextPowerOfTwoEqualOrGreater(doubled);
        KafkaLogging.log.settingMaxReceivedWithoutAckAllowed(map.get("group.id"), nextPowerOfTwoEqualOrGreater);
        return new KafkaThrottledLatestProcessedCommit(kafkaConsumer, kafkaSource, nextPowerOfTwoEqualOrGreater);
    }

    /**
     * Returns the {@link TopicPartition} for the record's topic and partition, creating and
     * caching it in the static cache on first use.
     *
     * @param incomingKafkaRecord the record whose topic and partition are looked up
     * @return the cached instance for that topic and partition
     */
    private <K, V> TopicPartition getTopicPartition(IncomingKafkaRecord<K, V> incomingKafkaRecord) {
        Map<Integer, TopicPartition> partitionsOfTopic = TOPIC_PARTITIONS_CACHE
                .computeIfAbsent(incomingKafkaRecord.getTopic(), topic -> new ConcurrentHashMap<>());
        return partitionsOfTopic.computeIfAbsent(incomingKafkaRecord.getPartition(),
                partition -> new TopicPartition(incomingKafkaRecord.getTopic(), partition));
    }

    /**
     * Returns the offset store of the given partition, creating it with this handler's
     * threshold when none exists yet.
     *
     * @param topicPartition the partition whose store is requested
     * @return the existing or newly created store
     */
    private OffsetStore getOffsetStore(TopicPartition topicPartition) {
        return this.offsetStores.computeIfAbsent(topicPartition,
                partition -> new OffsetStore(partition, this.maxReceivedWithoutAckAllowed));
    }

    /**
     * Resets this handler's state: stores the given context, discards all per-partition
     * offset stores and restarts the commit throttle timer.
     *
     * @param context the context on which {@code handle} later runs its work
     * @param set the assigned partitions (not used by this implementation)
     */
    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public void partitionsAssigned(Context context, Set<TopicPartition> set) {
        // Written to a volatile field; handle() reads it to schedule its callback.
        this.context = context;
        // Drop every store; new ones are created lazily by getOffsetStore().
        this.offsetStores.clear();
        resetNextCommitTime();
    }

    /**
     * Registers the record's offset in the offset store of its partition. When the store
     * signals that too many records are unacknowledged, the failure is reported to the
     * source; the record is returned to the caller in either case.
     *
     * @param incomingKafkaRecord the record that was just received
     * @return the same record instance
     */
    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public <K, V> IncomingKafkaRecord<K, V> received(IncomingKafkaRecord<K, V> incomingKafkaRecord) {
        try {
            getOffsetStore(getTopicPartition(incomingKafkaRecord))
                    .received(incomingKafkaRecord.getOffset());
        } catch (TooManyMessagegsWithoutAckingException tooManyUnacked) {
            // Reported rather than rethrown, so the record is still handed back below.
            this.source.reportFailure(tooManyUnacked);
        }
        return incomingKafkaRecord;
    }

    /**
     * Schedules the next allowed commit 5000 ms from now (the same value as
     * {@code THROTTLE_TIME_IN_MILLIS}).
     */
    private void resetNextCommitTime() {
        final long throttleMillis = 5000L;
        final long now = System.currentTimeMillis();
        this.nextCommitTime = now + throttleMillis;
    }

    /**
     * Asks every offset store to clear its sequentially processed offsets and collects the
     * largest cleared offset per partition.
     *
     * @return a map from partition to its largest cleared offset; partitions whose store
     *         cleared nothing are absent, so the map is empty when there is nothing to commit
     */
    private Map<TopicPartition, Long> clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffsetMapping() {
        Map<TopicPartition, Long> largestOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetStore> entry : this.offsetStores.entrySet()) {
            // Record the offset: without this put the result is always empty and the caller never commits.
            entry.getValue()
                    .clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset()
                    .ifPresent(offset -> largestOffsets.put(entry.getKey(), offset));
        }
        return largestOffsets;
    }

    /**
     * Marks the record as processed and, at most once per throttle interval, commits the
     * largest sequentially processed offset (plus one) of every partition that has one.
     *
     * <p>All work runs on the context stored by {@code partitionsAssigned}. The returned stage
     * completes with {@code null} once the commit callback fires, or immediately when no
     * commit is issued. The commit result is not inspected: the stage completes normally
     * whether or not the commit succeeded.
     *
     * <p>A record whose partition has no offset store (the stores are cleared by
     * {@code partitionsAssigned}) is skipped instead of failing with a
     * {@link NullPointerException}, which would leave the returned stage never completed.
     *
     * @param incomingKafkaRecord the record that has been processed
     * @return a stage completed when the acknowledgement has been handled
     */
    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public <K, V> CompletionStage<Void> handle(IncomingKafkaRecord<K, V> incomingKafkaRecord) {
        CompletableFuture<Void> acked = new CompletableFuture<>();
        this.context.runOnContext(ignored -> {
            OffsetStore store = this.offsetStores.get(getTopicPartition(incomingKafkaRecord));
            if (store != null) {
                store.processed(incomingKafkaRecord.getOffset());
            }
            if (System.currentTimeMillis() > this.nextCommitTime) {
                resetNextCommitTime();
                Map<TopicPartition, Long> largestProcessed = clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffsetMapping();
                if (!largestProcessed.isEmpty()) {
                    // The committed offset is the one after the last processed offset.
                    Map<TopicPartition, OffsetAndMetadata> toCommit = largestProcessed.entrySet().stream()
                            .collect(Collectors.toMap(Map.Entry::getKey,
                                    entry -> new OffsetAndMetadata().setOffset(entry.getValue() + 1)));
                    this.consumer.mo3543getDelegate().commit(toCommit, result -> acked.complete(null));
                    return;
                }
            }
            acked.complete(null);
        });
        return acked;
    }
}
