package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.vertx.core.Context;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/impl/KafkaRecordStream.class */
/**
 * A {@code Multi} of Kafka {@link ConsumerRecord}s. Every call to
 * {@link #subscribe(MultiSubscriber)} creates a new {@code KafkaRecordStreamSubscription},
 * records it in an internal concurrent set and hands it to the subscriber.
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
public class KafkaRecordStream<K, V> extends AbstractMulti<ConsumerRecord<K, V>> {
    /**
     * Fallback used when {@code max.poll.records} is not present in the channel configuration.
     * The value equals the default documented by Kafka for that consumer property.
     */
    private static final int DEFAULT_MAX_POLL_RECORDS = 500;

    private final ReactiveKafkaConsumer<K, V> client;
    private final KafkaConnectorIncomingConfiguration config;
    private final Context context;

    /** Every subscription created by {@link #subscribe(MultiSubscriber)}; backed by a concurrent map. */
    private final Set<KafkaRecordStreamSubscription<K, V, ConsumerRecord<K, V>>> subscriptions =
            Collections.newSetFromMap(new ConcurrentHashMap<>());

    /**
     * Creates the stream; the three arguments are stored and passed on to each subscription.
     *
     * @param reactiveKafkaConsumer the consumer handed to every subscription
     * @param kafkaConnectorIncomingConfiguration the incoming channel configuration
     * @param context the Vert.x context handed to every subscription
     */
    public KafkaRecordStream(ReactiveKafkaConsumer<K, V> reactiveKafkaConsumer, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Context context) {
        this.config = kafkaConnectorIncomingConfiguration;
        this.client = reactiveKafkaConsumer;
        this.context = context;
    }

    /**
     * Builds a subscription for {@code multiSubscriber}, registers it in {@link #subscriptions}
     * and then calls {@code onSubscribe} with it.
     *
     * @param multiSubscriber the downstream subscriber
     */
    @Override // io.smallrye.mutiny.operators.AbstractMulti
    public void subscribe(MultiSubscriber<? super ConsumerRecord<K, V>> multiSubscriber) {
        int maxPollRecords = this.config.config()
                .getOptionalValue(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.class)
                .orElse(DEFAULT_MAX_POLL_RECORDS);
        // The callback appends everything it is given to the queue it is given.
        // NOTE(review): presumably invoked with each polled batch and the subscription's
        // internal queue - confirm against KafkaRecordStreamSubscription.
        KafkaRecordStreamSubscription<K, V, ConsumerRecord<K, V>> subscription = new KafkaRecordStreamSubscription<>(
                this.client, this.config, multiSubscriber, this.context, maxPollRecords,
                (consumerRecords, recordQueue) -> recordQueue.addAll(consumerRecords));
        this.subscriptions.add(subscription);
        multiSubscriber.onSubscribe(subscription);
    }

    /**
     * Asks every registered subscription to rewrite its queue, mapping to {@code null} each
     * record whose topic and partition match one of the given topic-partitions. Does nothing
     * when {@code collection} is empty.
     *
     * @param collection the topic-partitions whose queued records are targeted
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeFromQueueRecordsFromTopicPartitions(Collection<TopicPartition> collection) {
        if (collection.isEmpty()) {
            return;
        }
        // Group partition numbers by topic so each queued record needs a single map lookup.
        Map<String, Set<Integer>> partitionsByTopic = new HashMap<>();
        for (TopicPartition topicPartition : collection) {
            partitionsByTopic
                    .computeIfAbsent(topicPartition.topic(), key -> new HashSet<>())
                    .add(topicPartition.partition());
        }
        this.subscriptions.forEach(subscription -> removeFromQueue(subscription, partitionsByTopic));
    }

    /**
     * Rewrites the queue of one subscription: records whose (topic, partition) appears in
     * {@code map} are mapped to {@code null}, all others are returned unchanged.
     *
     * @param kafkaRecordStreamSubscription the subscription whose queue is rewritten
     * @param map partition numbers to match, keyed by topic name
     */
    private void removeFromQueue(KafkaRecordStreamSubscription<K, V, ConsumerRecord<K, V>> kafkaRecordStreamSubscription, Map<String, Set<Integer>> map) {
        kafkaRecordStreamSubscription.rewriteQueue(queued -> {
            Set<Integer> partitions = map.get(queued.topic());
            if (partitions == null || !partitions.contains(queued.partition())) {
                return queued;
            }
            // NOTE(review): presumably a null result makes rewriteQueue drop the record - confirm.
            return null;
        });
    }
}
