package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.vertx.kafka.client.consumer.KafkaReadStream;
import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/impl/RebalanceListeners.class */
public class RebalanceListeners {
    /* JADX INFO: Access modifiers changed from: package-private */
    public static ConsumerRebalanceListener createRebalanceListener(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, final String str, Instance<KafkaConsumerRebalanceListener> instance, final KafkaConsumer<?, ?> kafkaConsumer, final KafkaCommitHandler kafkaCommitHandler) {
        Optional<KafkaConsumerRebalanceListener> findMatchingListener = findMatchingListener(kafkaConnectorIncomingConfiguration, str, instance);
        if (!findMatchingListener.isPresent()) {
            return new ConsumerRebalanceListener() { // from class: io.smallrye.reactive.messaging.kafka.impl.RebalanceListeners.2
                @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    Collection<io.vertx.kafka.client.common.TopicPartition> wrap = RebalanceListeners.wrap(collection);
                    long demand = KafkaConsumer.this.demand();
                    KafkaConsumer.this.pause2();
                    try {
                        kafkaCommitHandler.partitionsRevoked(wrap);
                        KafkaConsumer.this.fetch2(demand);
                    } catch (Throwable th) {
                        KafkaConsumer.this.fetch2(demand);
                        throw th;
                    }
                }

                @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    Collection<io.vertx.kafka.client.common.TopicPartition> wrap = RebalanceListeners.wrap(collection);
                    long demand = KafkaConsumer.this.demand();
                    KafkaConsumer.this.pause2();
                    try {
                        kafkaCommitHandler.partitionsAssigned(wrap);
                        KafkaConsumer.this.fetch2(demand);
                    } catch (Throwable th) {
                        KafkaConsumer.this.fetch2(demand);
                        throw th;
                    }
                }
            };
        }
        final KafkaConsumerRebalanceListener kafkaConsumerRebalanceListener = findMatchingListener.get();
        return new ConsumerRebalanceListener() { // from class: io.smallrye.reactive.messaging.kafka.impl.RebalanceListeners.1
            @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                long demand = KafkaConsumer.this.demand();
                KafkaConsumer.this.pause2();
                KafkaLogging.log.executingConsumerRevokedRebalanceListener(str);
                try {
                    try {
                        kafkaConsumerRebalanceListener.onPartitionsRevoked(KafkaConsumer.this.mo3401getDelegate().unwrap(), collection);
                        KafkaLogging.log.executedConsumerRevokedRebalanceListener(str);
                        KafkaConsumer.this.fetch2(demand);
                    } catch (RuntimeException e) {
                        KafkaLogging.log.unableToExecuteConsumerRevokedRebalanceListener(str, e);
                        throw e;
                    }
                } catch (Throwable th) {
                    KafkaConsumer.this.fetch2(demand);
                    throw th;
                }
            }

            @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                long demand = KafkaConsumer.this.demand();
                KafkaConsumer.this.pause2();
                kafkaCommitHandler.partitionsAssigned(RebalanceListeners.wrap(collection));
                try {
                    try {
                        kafkaConsumerRebalanceListener.onPartitionsAssigned(KafkaConsumer.this.mo3401getDelegate().unwrap(), collection);
                        KafkaLogging.log.executedConsumerAssignedRebalanceListener(str);
                        KafkaConsumer.this.fetch2(demand);
                    } catch (RuntimeException e) {
                        KafkaLogging.log.reEnablingConsumerForGroup(str);
                        throw e;
                    }
                } catch (Throwable th) {
                    KafkaConsumer.this.fetch2(demand);
                    throw th;
                }
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Collection<io.vertx.kafka.client.common.TopicPartition> wrap(Collection<TopicPartition> collection) {
        ArrayList arrayList = new ArrayList(collection.size());
        for (TopicPartition topicPartition : collection) {
            arrayList.add(new io.vertx.kafka.client.common.TopicPartition(topicPartition.topic(), topicPartition.partition()));
        }
        return arrayList;
    }

    private static Optional<KafkaConsumerRebalanceListener> findMatchingListener(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, String str, Instance<KafkaConsumerRebalanceListener> instance) {
        return (Optional) kafkaConnectorIncomingConfiguration.getConsumerRebalanceListenerName().map(str2 -> {
            KafkaLogging.log.loadingConsumerRebalanceListenerFromConfiguredName(str2);
            Instance select = instance.select(NamedLiteral.of(str2));
            if (select.isUnsatisfied()) {
                throw KafkaExceptions.ex.unableToFindRebalanceListener(str2, kafkaConnectorIncomingConfiguration.getChannel());
            }
            if (select.stream().count() > 1) {
                throw KafkaExceptions.ex.unableToFindRebalanceListener(str2, kafkaConnectorIncomingConfiguration.getChannel(), (int) select.stream().count());
            }
            return select.stream().count() == 1 ? Optional.of(select.get()) : Optional.empty();
        }).orElseGet(() -> {
            Instance select = instance.select(NamedLiteral.of(str));
            if (select.isUnsatisfied()) {
                return Optional.empty();
            }
            KafkaLogging.log.loadingConsumerRebalanceListenerFromGroupId(str);
            return Optional.of(select.get());
        });
    }

    public static void inject(KafkaConsumer<?, ?> kafkaConsumer, ConsumerRebalanceListener consumerRebalanceListener) {
        KafkaReadStream asStream = kafkaConsumer.mo3401getDelegate().asStream();
        if (!(asStream instanceof KafkaReadStreamImpl)) {
            throw new IllegalArgumentException("Cannot inject rebalance listener - not a Kafka Read Stream");
        }
        try {
            Field declaredField = asStream.getClass().getDeclaredField("rebalanceListener");
            declaredField.setAccessible(true);
            declaredField.set(asStream, consumerRebalanceListener);
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot inject rebalance listener", e);
        }
    }
}
