/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaReadStream;
import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;

public class RebalanceListeners {
    static ConsumerRebalanceListener createRebalanceListener(KafkaConnectorIncomingConfiguration config, final String consumerGroup, Instance<KafkaConsumerRebalanceListener> instances, final KafkaConsumer<?, ?> consumer, final KafkaCommitHandler commitHandler) {
        Optional<KafkaConsumerRebalanceListener> rebalanceListener = RebalanceListeners.findMatchingListener(config, consumerGroup, instances);
        if (rebalanceListener.isPresent()) {
            final KafkaConsumerRebalanceListener listener = rebalanceListener.get();
            return new ConsumerRebalanceListener(){

                @Override
                public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions) {
                    long demand = consumer.demand();
                    consumer.pause();
                    KafkaLogging.log.executingConsumerRevokedRebalanceListener(consumerGroup);
                    try {
                        listener.onPartitionsRevoked(consumer.getDelegate().unwrap(), partitions);
                        KafkaLogging.log.executedConsumerRevokedRebalanceListener(consumerGroup);
                    }
                    catch (RuntimeException e) {
                        KafkaLogging.log.unableToExecuteConsumerRevokedRebalanceListener(consumerGroup, e);
                        throw e;
                    }
                    finally {
                        consumer.fetch(demand);
                    }
                }

                @Override
                public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> partitions) {
                    long demand = consumer.demand();
                    consumer.pause();
                    Collection tps = RebalanceListeners.wrap(partitions);
                    commitHandler.partitionsAssigned(tps);
                    try {
                        listener.onPartitionsAssigned(consumer.getDelegate().unwrap(), partitions);
                        KafkaLogging.log.executedConsumerAssignedRebalanceListener(consumerGroup);
                    }
                    catch (RuntimeException e) {
                        KafkaLogging.log.reEnablingConsumerForGroup(consumerGroup);
                        throw e;
                    }
                    finally {
                        consumer.fetch(demand);
                    }
                }
            };
        }
        return new ConsumerRebalanceListener(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions) {
                Collection tps = RebalanceListeners.wrap(partitions);
                long demand = consumer.demand();
                consumer.pause();
                try {
                    commitHandler.partitionsRevoked(tps);
                }
                finally {
                    consumer.fetch(demand);
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> partitions) {
                Collection tps = RebalanceListeners.wrap(partitions);
                long demand = consumer.demand();
                consumer.pause();
                try {
                    commitHandler.partitionsAssigned(tps);
                }
                finally {
                    consumer.fetch(demand);
                }
            }
        };
    }

    private static Collection<TopicPartition> wrap(Collection<org.apache.kafka.common.TopicPartition> partitions) {
        ArrayList<TopicPartition> tps = new ArrayList<TopicPartition>(partitions.size());
        for (org.apache.kafka.common.TopicPartition partition : partitions) {
            tps.add(new TopicPartition(partition.topic(), partition.partition()));
        }
        return tps;
    }

    private static Optional<KafkaConsumerRebalanceListener> findMatchingListener(KafkaConnectorIncomingConfiguration config, String consumerGroup, Instance<KafkaConsumerRebalanceListener> instances) {
        return config.getConsumerRebalanceListenerName().map(name -> {
            KafkaLogging.log.loadingConsumerRebalanceListenerFromConfiguredName((String)name);
            Instance matching = instances.select(NamedLiteral.of(name));
            if (matching.isUnsatisfied()) {
                throw KafkaExceptions.ex.unableToFindRebalanceListener((String)name, config.getChannel());
            }
            if (matching.stream().count() > 1L) {
                throw KafkaExceptions.ex.unableToFindRebalanceListener((String)name, config.getChannel(), (int)matching.stream().count());
            }
            if (matching.stream().count() == 1L) {
                return Optional.of(matching.get());
            }
            return Optional.empty();
        }).orElseGet(() -> {
            Instance matching = instances.select(NamedLiteral.of(consumerGroup));
            if (!matching.isUnsatisfied()) {
                KafkaLogging.log.loadingConsumerRebalanceListenerFromGroupId(consumerGroup);
                return Optional.of(matching.get());
            }
            return Optional.empty();
        });
    }

    public static void inject(KafkaConsumer<?, ?> consumer, ConsumerRebalanceListener listener) {
        KafkaReadStream readStream = consumer.getDelegate().asStream();
        if (readStream instanceof KafkaReadStreamImpl) {
            try {
                Field field = readStream.getClass().getDeclaredField("rebalanceListener");
                field.setAccessible(true);
                field.set(readStream, listener);
            }
            catch (Exception e) {
                throw new IllegalArgumentException("Cannot inject rebalance listener", e);
            }
        } else {
            throw new IllegalArgumentException("Cannot inject rebalance listener - not a Kafka Read Stream");
        }
    }
}

