package io.smallrye.reactive.messaging.kafka.commit;

import io.smallrye.mutiny.groups.UniSubscribe;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer;
import io.vertx.mutiny.core.Vertx;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaLatestCommit.class */
public class KafkaLatestCommit extends ContextHolder implements KafkaCommitHandler {
    private final ReactiveKafkaConsumer<?, ?> consumer;
    private final Map<TopicPartition, Long> offsets;

    public KafkaLatestCommit(Vertx vertx, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, ReactiveKafkaConsumer<?, ?> reactiveKafkaConsumer) {
        super(vertx.mo3841getDelegate(), ((Integer) kafkaConnectorIncomingConfiguration.config().getOptionalValue("default.api.timeout.ms", Integer.class).orElse(60000)).intValue());
        this.offsets = new HashMap();
        this.consumer = reactiveKafkaConsumer;
    }

    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public <K, V> CompletionStage<Void> handle(IncomingKafkaRecord<K, V> incomingKafkaRecord) {
        CompletableFuture completableFuture = new CompletableFuture();
        runOnContext(() -> {
            HashMap hashMap = new HashMap();
            TopicPartition topicPartition = new TopicPartition(incomingKafkaRecord.getTopic(), incomingKafkaRecord.getPartition());
            Long l = this.offsets.get(topicPartition);
            if (l != null && l.longValue() >= incomingKafkaRecord.getOffset() + 1) {
                completableFuture.complete(null);
                return;
            }
            this.offsets.put(topicPartition, Long.valueOf(incomingKafkaRecord.getOffset() + 1));
            hashMap.put(topicPartition, new OffsetAndMetadata(incomingKafkaRecord.getOffset() + 1, null));
            UniSubscribe<Void> subscribe = this.consumer.commit(hashMap).subscribe();
            Consumer<? super Void> consumer = r4 -> {
                completableFuture.complete(null);
            };
            Objects.requireNonNull(completableFuture);
            subscribe.with(consumer, completableFuture::completeExceptionally);
        });
        return completableFuture;
    }
}
