package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniCreate;
import io.smallrye.mutiny.groups.UniOnFailure;
import io.smallrye.mutiny.groups.UniOnItem;
import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.fault.DeserializerWrapper;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;
import io.vertx.core.Context;
import io.vertx.core.cli.UsageMessageFormatter;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaConsumer.class */
public class ReactiveKafkaConsumer<K, V> implements KafkaConsumer<K, V> {
    private final KafkaSource<K, V> source;
    private Consumer<K, V> consumer;
    private final KafkaConnectorIncomingConfiguration configuration;
    private final Duration pollTimeout;
    private ConsumerRebalanceListener rebalanceListener;
    private final ScheduledExecutorService kafkaWorker;
    private final KafkaRecordStream<K, V> stream;
    private final KafkaRecordBatchStream<K, V> batchStream;
    private final Map<String, Object> kafkaConfiguration;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicBoolean polling = new AtomicBoolean(false);
    private final AtomicBoolean paused = new AtomicBoolean();

    public ReactiveKafkaConsumer(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, KafkaSource<K, V> kafkaSource) {
        this.configuration = kafkaConnectorIncomingConfiguration;
        this.source = kafkaSource;
        this.kafkaConfiguration = getKafkaConsumerConfiguration(this.configuration, kafkaSource.getConsumerGroup(), kafkaSource.getConsumerIndex());
        Instance<DeserializationFailureHandler<?>> deserializationFailureHandlers = kafkaSource.getDeserializationFailureHandlers();
        String str = (String) this.kafkaConfiguration.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        String str2 = (String) this.kafkaConfiguration.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
        if (str2 == null) {
            throw KafkaExceptions.ex.missingValueDeserializer(kafkaConnectorIncomingConfiguration.getChannel(), kafkaConnectorIncomingConfiguration.getChannel());
        }
        DeserializerWrapper deserializerWrapper = new DeserializerWrapper(str, true, getDeserializationHandler(true, deserializationFailureHandlers), kafkaSource, kafkaConnectorIncomingConfiguration.getFailOnDeserializationFailure().booleanValue());
        DeserializerWrapper deserializerWrapper2 = new DeserializerWrapper(str2, false, getDeserializationHandler(false, deserializationFailureHandlers), kafkaSource, kafkaConnectorIncomingConfiguration.getFailOnDeserializationFailure().booleanValue());
        deserializerWrapper.configure(this.kafkaConfiguration, true);
        deserializerWrapper2.configure(this.kafkaConfiguration, false);
        this.pollTimeout = Duration.ofMillis(kafkaConnectorIncomingConfiguration.getPollTimeout().intValue());
        this.kafkaWorker = Executors.newSingleThreadScheduledExecutor(KafkaPollingThread::new);
        this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(this.kafkaConfiguration, deserializerWrapper, deserializerWrapper2);
        this.stream = new KafkaRecordStream<>(this, kafkaConnectorIncomingConfiguration, kafkaSource.getContext().getDelegate());
        this.batchStream = new KafkaRecordBatchStream<>(this, kafkaConnectorIncomingConfiguration, kafkaSource.getContext().getDelegate());
    }

    public void setRebalanceListener() {
        try {
            this.rebalanceListener = RebalanceListeners.createRebalanceListener(this, this.configuration, this.source.getConsumerGroup(), this.source.getConsumerRebalanceListeners(), this.source.getCommitHandler());
        } catch (Exception e) {
            close();
            throw e;
        }
    }

    public ConsumerRebalanceListener getRebalanceListener() {
        return this.rebalanceListener;
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaConsumer
    @CheckReturnValue
    public <T> Uni<T> runOnPollingThread(Function<Consumer<K, V>, T> function) {
        return Uni.createFrom().item((Supplier) () -> {
            return function.apply(this.consumer);
        }).runSubscriptionOn(this.kafkaWorker);
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaConsumer
    @CheckReturnValue
    public Uni<Void> runOnPollingThread(java.util.function.Consumer<Consumer<K, V>> consumer) {
        return Uni.createFrom().item(() -> {
            consumer.accept(this.consumer);
            return null;
        }).runSubscriptionOn(this.kafkaWorker);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Uni<Void> executeWithDelay(Runnable runnable, Duration duration) {
        return Uni.createFrom().emitter(uniEmitter -> {
            this.kafkaWorker.schedule(() -> {
                try {
                    runnable.run();
                    uniEmitter.complete(null);
                } catch (Exception e) {
                    uniEmitter.fail(e);
                }
            }, duration.toMillis(), TimeUnit.MILLISECONDS);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Uni<ConsumerRecords<K, V>> poll() {
        return this.polling.compareAndSet(false, true) ? runOnPollingThread(consumer -> {
            return System.getSecurityManager() == null ? this.paused.get() ? consumer.poll(Duration.ZERO) : consumer.poll(this.pollTimeout) : (ConsumerRecords) AccessController.doPrivileged(new PrivilegedAction<ConsumerRecords<K, V>>() { // from class: io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.1
                @Override // java.security.PrivilegedAction
                public ConsumerRecords<K, V> run() {
                    return ReactiveKafkaConsumer.this.paused.get() ? consumer.poll(Duration.ZERO) : consumer.poll(ReactiveKafkaConsumer.this.pollTimeout);
                }
            });
        }).eventually(() -> {
            this.polling.set(false);
        }).onFailure(WakeupException.class).recoverWithItem((UniOnFailure) ConsumerRecords.EMPTY) : Uni.createFrom().item((UniCreate) ConsumerRecords.EMPTY);
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaConsumer
    @CheckReturnValue
    public Uni<Set<TopicPartition>> pause() {
        return this.paused.compareAndSet(false, true) ? runOnPollingThread(consumer -> {
            Set<TopicPartition> assignment = this.consumer.assignment();
            this.consumer.pause(assignment);
            return assignment;
        }) : runOnPollingThread((v0) -> {
            return v0.paused();
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaConsumer
    @CheckReturnValue
    public Uni<Set<TopicPartition>> paused() {
        return runOnPollingThread((v0) -> {
            return v0.paused();
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaConsumer
    @CheckReturnValue
    public Uni<Map<TopicPartition, OffsetAndMetadata>> committed(TopicPartition... topicPartitionArr) {
        return runOnPollingThread(consumer -> {
            return consumer.committed(new LinkedHashSet(Arrays.asList(topicPartitionArr)));
        });
    }

    @CheckReturnValue
    public Multi<ConsumerRecord<K, V>> subscribe(Set<String> set) {
        return (Multi<ConsumerRecord<K, V>>) runOnPollingThread(consumer -> {
            consumer.subscribe(set, this.rebalanceListener);
        }).onItem().transformToMulti(r3 -> {
            return this.stream;
        });
    }

    @CheckReturnValue
    public Multi<ConsumerRecord<K, V>> subscribe(Pattern pattern) {
        return (Multi<ConsumerRecord<K, V>>) runOnPollingThread(consumer -> {
            consumer.subscribe(pattern, this.rebalanceListener);
        }).onItem().transformToMulti(r3 -> {
            return this.stream;
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @CheckReturnValue
    public Multi<ConsumerRecords<K, V>> subscribeBatch(Set<String> set) {
        return (Multi<ConsumerRecords<K, V>>) runOnPollingThread(consumer -> {
            consumer.subscribe(set, this.rebalanceListener);
        }).onItem().transformToMulti(r3 -> {
            return this.batchStream;
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @CheckReturnValue
    public Multi<ConsumerRecords<K, V>> subscribeBatch(Pattern pattern) {
        return (Multi<ConsumerRecords<K, V>>) runOnPollingThread(consumer -> {
            consumer.subscribe(pattern, this.rebalanceListener);
        }).onItem().transformToMulti(r3 -> {
            return this.batchStream;
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaConsumer
    @CheckReturnValue
    public Uni<Void> resume() {
        return this.paused.get() ? runOnPollingThread(consumer -> {
            this.consumer.resume(consumer.assignment());
        }).invoke(() -> {
            this.paused.set(false);
        }) : Uni.createFrom().voidItem();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v25, types: [java.lang.String] */
    /* JADX WARN: Type inference failed for: r0v33, types: [java.lang.String] */
    private Map<String, Object> getKafkaConsumerConfiguration(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, String str, int i) {
        HashMap hashMap = new HashMap();
        JsonHelper.asJsonObject(kafkaConnectorIncomingConfiguration.config()).forEach(entry -> {
            hashMap.put((String) entry.getKey(), entry.getValue().toString());
        });
        hashMap.put("group.id", str);
        if (!hashMap.containsKey("reconnect.backoff.max.ms")) {
            hashMap.put("reconnect.backoff.max.ms", "10000");
        }
        String bootstrapServers = kafkaConnectorIncomingConfiguration.getBootstrapServers();
        if (!hashMap.containsKey("bootstrap.servers")) {
            KafkaLogging.log.configServers("bootstrap.servers", bootstrapServers);
            hashMap.put("bootstrap.servers", bootstrapServers);
        }
        if (!hashMap.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
            KafkaLogging.log.keyDeserializerOmitted();
            hashMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaConnectorIncomingConfiguration.getKeyDeserializer());
        }
        if (!hashMap.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
            KafkaLogging.log.disableAutoCommit(kafkaConnectorIncomingConfiguration.getChannel());
            hashMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        }
        if (i == -1) {
            hashMap.computeIfAbsent("client.id", str2 -> {
                return "kafka-consumer-" + kafkaConnectorIncomingConfiguration.getChannel();
            });
        } else {
            String str3 = (String) hashMap.get("client.id");
            hashMap.put("client.id", str3 == null ? "kafka-consumer-" + kafkaConnectorIncomingConfiguration.getChannel() + UsageMessageFormatter.DEFAULT_OPT_PREFIX + i : str3 + UsageMessageFormatter.DEFAULT_OPT_PREFIX + i);
        }
        ConfigurationCleaner.cleanupConsumerConfiguration(hashMap);
        return hashMap;
    }

    private <T> DeserializationFailureHandler<T> getDeserializationHandler(boolean z, Instance<DeserializationFailureHandler<?>> instance) {
        return createDeserializationFailureHandler(z, instance, this.configuration);
    }

    private static <T> DeserializationFailureHandler<T> createDeserializationFailureHandler(boolean z, Instance<DeserializationFailureHandler<?>> instance, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration) {
        String orElse = z ? kafkaConnectorIncomingConfiguration.getKeyDeserializationFailureHandler().orElse(null) : kafkaConnectorIncomingConfiguration.getValueDeserializationFailureHandler().orElse(null);
        if (orElse == null) {
            return null;
        }
        Instance<DeserializationFailureHandler<?>> select = instance.select(Identifier.Literal.of(orElse));
        if (select.isUnsatisfied()) {
            select = instance.select(NamedLiteral.of(orElse));
            if (!select.isUnsatisfied()) {
                ProviderLogging.log.deprecatedNamed();
            }
        }
        if (select.isUnsatisfied()) {
            throw KafkaExceptions.ex.unableToFindDeserializationFailureHandler(orElse, kafkaConnectorIncomingConfiguration.getChannel());
        }
        if (select.stream().count() > 1) {
            throw KafkaExceptions.ex.unableToFindDeserializationFailureHandler(orElse, kafkaConnectorIncomingConfiguration.getChannel(), (int) select.stream().count());
        }
        if (select.stream().count() == 1) {
            return (DeserializationFailureHandler) select.get();
        }
        return null;
    }

    public String get(String str) {
        return (String) this.kafkaConfiguration.get(str);
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaConsumer
    public Consumer<K, V> unwrap() {
        return this.consumer;
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaConsumer
    @CheckReturnValue
    public Uni<Void> commit(Map<TopicPartition, OffsetAndMetadata> map) {
        return runOnPollingThread(consumer -> {
            consumer.commitSync((Map<TopicPartition, OffsetAndMetadata>) map);
        });
    }

    @CheckReturnValue
    public Uni<Void> commitAsync(Map<TopicPartition, OffsetAndMetadata> map) {
        return Uni.createFrom().emitter(uniEmitter -> {
            this.consumer.commitAsync(map, (map2, exc) -> {
                if (exc != null) {
                    uniEmitter.fail(exc);
                } else {
                    uniEmitter.complete(null);
                }
            });
        }).runSubscriptionOn(this.kafkaWorker);
    }

    public Map<String, ?> configuration() {
        return this.kafkaConfiguration;
    }

    public void close() {
        int intValue = ((Integer) this.configuration.config().getOptionalValue("default.api.timeout.ms", Integer.class).orElse(1000)).intValue();
        if (this.closed.compareAndSet(false, true)) {
            UniOnItem<Void> onItem = runOnPollingThread(consumer -> {
                if (System.getSecurityManager() == null) {
                    consumer.close(Duration.ofMillis(intValue));
                } else {
                    AccessController.doPrivileged(() -> {
                        consumer.close(Duration.ofMillis(intValue));
                        return null;
                    });
                }
            }).onItem();
            ScheduledExecutorService scheduledExecutorService = this.kafkaWorker;
            Objects.requireNonNull(scheduledExecutorService);
            Uni<Void> invoke = onItem.invoke(scheduledExecutorService::shutdown);
            this.consumer.wakeup();
            if (Context.isOnEventLoopThread()) {
                invoke.subscribeAsCompletionStage();
            } else {
                invoke.await().atMost(Duration.ofMillis(intValue * 2));
            }
        }
    }

    public void injectClient(MockConsumer<?, ?> mockConsumer) {
        Consumer<K, V> consumer = this.consumer;
        this.consumer = mockConsumer;
        consumer.close();
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaConsumer
    @CheckReturnValue
    public Uni<Map<TopicPartition, Long>> getPositions() {
        return runOnPollingThread(consumer -> {
            HashMap hashMap = new HashMap();
            consumer.assignment().forEach(topicPartition -> {
                hashMap.put(topicPartition, Long.valueOf(consumer.position(topicPartition)));
            });
            return hashMap;
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaConsumer
    @CheckReturnValue
    public Uni<Set<TopicPartition>> getAssignments() {
        return runOnPollingThread((v0) -> {
            return v0.assignment();
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaConsumer
    @CheckReturnValue
    public Uni<Void> seek(TopicPartition topicPartition, long j) {
        return runOnPollingThread(consumer -> {
            consumer.seek(topicPartition, j);
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaConsumer
    @CheckReturnValue
    public Uni<Void> seek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        return runOnPollingThread(consumer -> {
            consumer.seek(topicPartition, offsetAndMetadata);
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaConsumer
    @CheckReturnValue
    public Uni<Void> seekToBeginning(Collection<TopicPartition> collection) {
        return runOnPollingThread(consumer -> {
            consumer.seekToBeginning(collection);
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.KafkaConsumer
    @CheckReturnValue
    public Uni<Void> seekToEnd(Collection<TopicPartition> collection) {
        return runOnPollingThread(consumer -> {
            consumer.seekToEnd(collection);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isClosed() {
        return this.closed.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isPaused() {
        return this.paused.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeFromQueueRecordsFromTopicPartitions(Collection<TopicPartition> collection) {
        this.stream.removeFromQueueRecordsFromTopicPartitions(collection);
        this.batchStream.removeFromQueueRecordsFromTopicPartitions(collection);
    }
}
