/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.companion;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.companion.ConsumerTask;
import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;
import io.smallrye.reactive.messaging.kafka.companion.KafkaTask;
import io.smallrye.reactive.messaging.kafka.companion.RecordQualifiers;
import java.io.Closeable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Deserializer;
import org.jboss.logging.Logger;

/**
 * Fluent builder that configures, lazily creates and drives a single {@link KafkaConsumer}.
 *
 * <p>The consumer and a dedicated single-thread executor are both created on first use. The
 * polling pipelines and most of the blocking accessors in this class hop onto that executor
 * before touching the consumer. Only one polling pipeline may be active at a time; a second
 * subscription fails with {@link IllegalStateException} ("Consumer already in use").
 *
 * <p>The instance also registers itself as the {@link ConsumerRebalanceListener} when subscribing
 * to topics, and mirrors the assigned partitions into {@link #currentAssignment()}.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class ConsumerBuilder<K, V> implements ConsumerRebalanceListener, Closeable {
    private static final Logger LOGGER = Logger.getLogger(ConsumerBuilder.class);

    /** Consumer configuration; mutated in place by the {@code with*} methods. */
    private final Map<String, Object> props;
    /** Factory used to build the consumer from {@link #props} on first use. */
    private final Function<Map<String, Object>, KafkaConsumer<K, V>> consumerCreator;
    /** Lazily created consumer; {@code null} before first use and after close. */
    private KafkaConsumer<K, V> kafkaConsumer;
    /** Lazily created single-thread executor on which consumer operations are scheduled. */
    private ScheduledExecutorService consumerExecutor;
    private Function<ConsumerRecord<K, V>, Map<TopicPartition, OffsetAndMetadata>> commitAsyncFunction;
    private Function<ConsumerRecord<K, V>, Map<TopicPartition, OffsetAndMetadata>> commitSyncFunction;
    private OffsetCommitCallback commitAsyncCallback;
    private Consumer<Collection<TopicPartition>> onPartitionsAssigned;
    private Consumer<Collection<TopicPartition>> onPartitionsRevoked;
    /** Invoked when a polling pipeline terminates; defaults to {@link #terminate}. */
    private BiConsumer<KafkaConsumer<K, V>, Throwable> onTermination = this::terminate;
    /** Upper bound applied to the blocking accessors of this class. */
    private final Duration kafkaApiTimeout;
    private Duration pollTimeout = Duration.ofMillis(10L);
    /** {@code true} while a polling pipeline owns the consumer. */
    private final AtomicBoolean polling = new AtomicBoolean(false);
    /** Partitions reported through the rebalance callbacks. */
    private final Set<TopicPartition> assignment = new HashSet<>();

    /**
     * Creates a builder whose consumer uses the given deserializer instances.
     *
     * @param props consumer configuration (kept by reference, not copied)
     * @param kafkaApiTimeout timeout applied to blocking operations
     * @param keyDeser key deserializer
     * @param valueDeser value deserializer
     */
    public ConsumerBuilder(Map<String, Object> props, Duration kafkaApiTimeout, Deserializer<K> keyDeser, Deserializer<V> valueDeser) {
        this.props = props;
        this.kafkaApiTimeout = kafkaApiTimeout;
        this.consumerCreator = p -> new KafkaConsumer<>(p, keyDeser, valueDeser);
    }

    /**
     * Creates a builder whose consumer resolves its deserializers from configuration.
     * The two deserializer type names are written into {@code props}.
     *
     * @param props consumer configuration (kept by reference and modified)
     * @param kafkaApiTimeout timeout applied to blocking operations
     * @param keyDeserializerType value stored under {@code key.deserializer}
     * @param valueDeserializerType value stored under {@code value.deserializer}
     */
    public ConsumerBuilder(Map<String, Object> props, Duration kafkaApiTimeout, String keyDeserializerType, String valueDeserializerType) {
        this.props = props;
        this.kafkaApiTimeout = kafkaApiTimeout;
        props.put("key.deserializer", keyDeserializerType);
        props.put("value.deserializer", valueDeserializerType);
        this.consumerCreator = KafkaConsumer::new;
    }

    /** Returns the consumer, creating it from the current properties if needed. */
    private synchronized KafkaConsumer<K, V> getOrCreateConsumer() {
        if (this.kafkaConsumer == null) {
            this.kafkaConsumer = this.consumerCreator.apply(this.props);
        }
        return this.kafkaConsumer;
    }

    /** Returns the consumer executor, creating a single-thread scheduled executor if needed. */
    private synchronized ScheduledExecutorService getOrCreateExecutor() {
        if (this.consumerExecutor == null) {
            this.consumerExecutor = Executors.newSingleThreadScheduledExecutor(
                    task -> new Thread(task, "consumer-" + this.clientId()));
        }
        return this.consumerExecutor;
    }

    /**
     * @return the underlying consumer, or {@code null} if it has not been created yet
     *         (or has been closed)
     */
    public KafkaConsumer<K, V> unwrap() {
        return this.kafkaConsumer;
    }

    /** Removes the revoked partitions from the tracked assignment and notifies the user callback. */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        LOGGER.infof("%s revoked partitions %s", this.clientId(), partitions);
        this.assignment.removeAll(partitions);
        if (this.onPartitionsRevoked != null) {
            this.onPartitionsRevoked.accept(partitions);
        }
    }

    /** Adds the assigned partitions to the tracked assignment and notifies the user callback. */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        LOGGER.infof("%s assigned partitions %s", this.clientId(), partitions);
        this.assignment.addAll(partitions);
        if (this.onPartitionsAssigned != null) {
            this.onPartitionsAssigned.accept(partitions);
        }
    }

    /**
     * Wakes up the consumer and schedules its shutdown on the consumer executor.
     * The scheduled task closes the consumer, shuts the executor down and clears the polling
     * flag. This method returns once the task is submitted, not once it has run.
     * Does nothing if no consumer has been created.
     */
    @Override
    public synchronized void close() {
        if (this.kafkaConsumer != null) {
            // Interrupt a poll that may currently be blocking the executor thread.
            this.kafkaConsumer.wakeup();
            Uni.createFrom().voidItem().invoke(() -> {
                LOGGER.infof("Closing consumer %s", this.clientId());
                if (this.kafkaConsumer != null) {
                    this.kafkaConsumer.close(this.kafkaApiTimeout);
                    this.kafkaConsumer = null;
                }
                if (this.consumerExecutor != null) {
                    this.consumerExecutor.shutdown();
                    this.consumerExecutor = null;
                }
                this.polling.compareAndSet(true, false);
            }).runSubscriptionOn(this.getOrCreateExecutor()).subscribeAsCompletionStage();
        }
    }

    /**
     * Default termination handler: closes this builder. Both arguments are ignored.
     *
     * @param consumer the consumer that was polling
     * @param throwable the failure that terminated the pipeline, if any
     */
    public void terminate(KafkaConsumer<K, V> consumer, Throwable throwable) {
        this.close();
    }

    /** @return the configured {@code client.id} property */
    public String clientId() {
        return (String) this.props.get("client.id");
    }

    /** @return the configured {@code group.id} property */
    public String groupId() {
        return (String) this.props.get("group.id");
    }

    /** Sets a single consumer property. */
    public ConsumerBuilder<K, V> withProp(String key, Object value) {
        this.props.put(key, value);
        return this;
    }

    /** Adds all the given consumer properties. */
    public ConsumerBuilder<K, V> withProps(Map<String, Object> properties) {
        this.props.putAll(properties);
        return this;
    }

    /** Sets the {@code client.id} property. */
    public ConsumerBuilder<K, V> withClientId(String clientId) {
        return this.withProp("client.id", clientId);
    }

    /** Sets the {@code group.id} property. */
    public ConsumerBuilder<K, V> withGroupId(String groupId) {
        return this.withProp("group.id", groupId);
    }

    /** Sets {@code auto.offset.reset} to the lower-cased strategy name. */
    public ConsumerBuilder<K, V> withOffsetReset(OffsetResetStrategy offsetResetStrategy) {
        // Locale.ROOT: the default locale must not alter the config value (e.g. Turkish dotless i).
        return this.withProp("auto.offset.reset", offsetResetStrategy.name().toLowerCase(Locale.ROOT));
    }

    /** Sets {@code isolation.level} to the lower-cased level name. */
    public ConsumerBuilder<K, V> withIsolationLevel(IsolationLevel isolationLevel) {
        // Locale.ROOT: the default locale must not alter the config value (e.g. Turkish dotless i).
        return this.withProp("isolation.level", isolationLevel.name().toLowerCase(Locale.ROOT));
    }

    /** Sets the timeout passed to each {@code poll} call. */
    public ConsumerBuilder<K, V> withPollTimeout(Duration timeout) {
        this.pollTimeout = timeout;
        return this;
    }

    /** Sets {@code enable.auto.commit} to {@code "true"}. */
    public ConsumerBuilder<K, V> withAutoCommit() {
        return this.withProp("enable.auto.commit", "true");
    }

    /**
     * Commits asynchronously the offset following each record accepted by the predicate.
     * The commit result is ignored.
     */
    public ConsumerBuilder<K, V> withCommitAsyncWhen(Predicate<ConsumerRecord<K, V>> commitPredicate) {
        return this.withCommitAsync(this.offsetsWhen(commitPredicate), (offsets, exception) -> {});
    }

    /**
     * Commits asynchronously the offsets computed per record by {@code commitFunction};
     * an empty map means "do not commit for this record".
     */
    public ConsumerBuilder<K, V> withCommitAsync(Function<ConsumerRecord<K, V>, Map<TopicPartition, OffsetAndMetadata>> commitFunction, OffsetCommitCallback callback) {
        this.commitAsyncFunction = commitFunction;
        this.commitAsyncCallback = callback;
        return this;
    }

    /** Commits synchronously the offset following each record accepted by the predicate. */
    public ConsumerBuilder<K, V> withCommitSyncWhen(Predicate<ConsumerRecord<K, V>> commitPredicate) {
        this.commitSyncFunction = this.offsetsWhen(commitPredicate);
        return this;
    }

    /**
     * Commits synchronously the offsets computed per record by {@code commitFunction};
     * an empty map means "do not commit for this record".
     */
    public ConsumerBuilder<K, V> withCommitSync(Function<ConsumerRecord<K, V>, Map<TopicPartition, OffsetAndMetadata>> commitFunction) {
        this.commitSyncFunction = commitFunction;
        return this;
    }

    /** Registers a callback invoked after partitions are assigned. */
    public ConsumerBuilder<K, V> withOnPartitionsAssigned(Consumer<Collection<TopicPartition>> onPartitionsAssigned) {
        this.onPartitionsAssigned = onPartitionsAssigned;
        return this;
    }

    /** Registers a callback invoked after partitions are revoked. */
    public ConsumerBuilder<K, V> withOnPartitionsRevoked(Consumer<Collection<TopicPartition>> onPartitionsRevoked) {
        this.onPartitionsRevoked = onPartitionsRevoked;
        return this;
    }

    /** Replaces the handler invoked when a polling pipeline terminates. */
    public ConsumerBuilder<K, V> withOnTermination(BiConsumer<KafkaConsumer<K, V>, Throwable> onTermination) {
        this.onTermination = onTermination;
        return this;
    }

    /** Builds a commit function yielding the record's next offset when the predicate matches, else an empty map. */
    private Function<ConsumerRecord<K, V>, Map<TopicPartition, OffsetAndMetadata>> offsetsWhen(Predicate<ConsumerRecord<K, V>> commitPredicate) {
        return cr -> commitPredicate.test(cr)
                ? this.getOffsetMapFromConsumerRecord(cr)
                : Collections.<TopicPartition, OffsetAndMetadata>emptyMap();
    }

    /** Maps the record's partition to the offset immediately after the record. */
    private Map<TopicPartition, OffsetAndMetadata> getOffsetMapFromConsumerRecord(ConsumerRecord<?, ?> consumerRecord) {
        Map<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
        map.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()),
                new OffsetAndMetadata(consumerRecord.offset() + 1L));
        return map;
    }

    /** @return the consumer's group metadata (creates the consumer if needed) */
    public ConsumerGroupMetadata groupMetadata() {
        return this.getOrCreateConsumer().groupMetadata();
    }

    /** @return a read-only view of the partitions tracked through the rebalance callbacks */
    public Set<TopicPartition> currentAssignment() {
        return Collections.unmodifiableSet(this.assignment);
    }

    /** Reads the consumer's assignment on the consumer executor. */
    private Uni<Set<TopicPartition>> assignmentUni() {
        return Uni.createFrom().item(() -> this.getOrCreateConsumer().assignment())
                .runSubscriptionOn(this.getOrCreateExecutor());
    }

    /** @return the consumer's assignment, waiting at most the Kafka API timeout */
    public Set<TopicPartition> assignment() {
        return this.assignmentUni().await().atMost(this.kafkaApiTimeout);
    }

    /** @return a Uni built by {@code KafkaCompanion.waitFor} with a non-null, non-empty assignment predicate */
    public Uni<Set<TopicPartition>> waitForAssignment() {
        return KafkaCompanion.waitFor(this.assignmentUni(), partitions -> partitions != null && !partitions.isEmpty(), this.pollTimeout);
    }

    /** @return the consumer position for the given partition */
    public long position(TopicPartition partition) {
        return Uni.createFrom().item(() -> this.getOrCreateConsumer().position(partition))
                .runSubscriptionOn(this.getOrCreateExecutor())
                .await().atMost(this.kafkaApiTimeout);
    }

    /** @return the consumer position of every currently assigned partition */
    public Map<TopicPartition, OffsetAndMetadata> position() {
        return this.assignmentUni().onItem().transform(partitions -> {
            KafkaConsumer<K, V> consumer = this.getOrCreateConsumer();
            Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>();
            for (TopicPartition tp : partitions) {
                positions.put(tp, new OffsetAndMetadata(consumer.position(tp)));
            }
            return positions;
        }).await().atMost(this.kafkaApiTimeout);
    }

    /**
     * Seeks every assigned partition back to its last committed offset, or to the beginning
     * when nothing has been committed for it.
     *
     * <p>The whole operation runs as one task on the consumer executor, so the seeks are issued
     * from the same thread as the other consumer calls made by this class's blocking accessors.
     */
    public void resetToLastCommittedPositions() {
        Uni.createFrom().voidItem().invoke(() -> {
            KafkaConsumer<K, V> consumer = this.getOrCreateConsumer();
            Set<TopicPartition> partitions = consumer.assignment();
            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(partitions);
            for (TopicPartition tp : partitions) {
                OffsetAndMetadata offsetAndMetadata = committed.get(tp);
                if (offsetAndMetadata != null) {
                    consumer.seek(tp, offsetAndMetadata.offset());
                } else {
                    consumer.seekToBeginning(Collections.singleton(tp));
                }
            }
        }).runSubscriptionOn(this.getOrCreateExecutor()).await().atMost(this.kafkaApiTimeout);
    }

    /** @return the committed offset for the partition, or {@code null} if there is none */
    public OffsetAndMetadata committed(TopicPartition partition) {
        return Uni.createFrom().item(() -> this.getOrCreateConsumer().committed(Collections.singleton(partition)))
                .onItem().transform(map -> map.get(partition))
                .runSubscriptionOn(this.getOrCreateExecutor())
                .await().atMost(this.kafkaApiTimeout);
    }

    /** @return the committed offsets for the given partitions */
    public Map<TopicPartition, OffsetAndMetadata> committed(TopicPartition... partitions) {
        return Uni.createFrom().item(() -> this.getOrCreateConsumer().committed(new HashSet<TopicPartition>(Arrays.asList(partitions))))
                .runSubscriptionOn(this.getOrCreateExecutor())
                .await().atMost(this.kafkaApiTimeout);
    }

    /** @return the committed offsets for the currently assigned partitions */
    public Map<TopicPartition, OffsetAndMetadata> committed() {
        return this.assignmentUni()
                .onItem().transform(partitions -> this.getOrCreateConsumer().committed(partitions))
                .runSubscriptionOn(this.getOrCreateExecutor())
                .await().atMost(this.kafkaApiTimeout);
    }

    /** Pauses fetching from all currently assigned partitions, waiting at most the Kafka API timeout. */
    public void pause() {
        this.assignmentUni()
                .onItem().invoke(partitions -> this.getOrCreateConsumer().pause(partitions))
                .replaceWithVoid()
                // Bounded like resume() and the other blocking accessors, so a stuck executor cannot hang the caller forever.
                .await().atMost(this.kafkaApiTimeout);
    }

    /** Resumes fetching from all currently assigned partitions, waiting at most the Kafka API timeout. */
    public void resume() {
        this.assignmentUni()
                .onItem().invoke(partitions -> this.getOrCreateConsumer().resume(partitions))
                .replaceWithVoid()
                .await().atMost(this.kafkaApiTimeout);
    }

    /** Polls once; a {@link WakeupException} is translated into an empty batch. */
    private ConsumerRecords<K, V> pollOnce() {
        try {
            return this.getOrCreateConsumer().poll(this.pollTimeout);
        } catch (WakeupException e) {
            return ConsumerRecords.<K, V>empty();
        }
    }

    /** Emits the result of one poll; an empty batch is delayed by 2 ms on the consumer executor. */
    private Uni<ConsumerRecords<K, V>> poll() {
        return Uni.createFrom().item(() -> this.pollOnce())
                .onItem().transformToUni(cr -> {
                    if (cr.isEmpty()) {
                        return Uni.createFrom().item(cr)
                                .onItem().delayIt().onExecutor(this.getOrCreateExecutor()).by(Duration.ofMillis(2L));
                    }
                    return Uni.createFrom().item(cr);
                });
    }

    /**
     * Defers {@code starter} until subscription and runs it only if no other polling pipeline
     * is active; otherwise the resulting Multi fails with "Consumer already in use".
     */
    private <T> Multi<T> startPolling(Supplier<Multi<T>> starter) {
        return Multi.createFrom().deferred(() -> {
            if (!this.polling.compareAndSet(false, true)) {
                return Multi.createFrom().failure(new IllegalStateException("Consumer already in use"));
            }
            return starter.get();
        });
    }

    /** Subscribes to the topics and exposes the record stream transformed by {@code plugFunction}. */
    <T> Multi<T> process(Set<String> topics, Function<Multi<ConsumerRecord<K, V>>, Multi<T>> plugFunction) {
        return this.startPolling(() -> {
            this.getOrCreateConsumer().subscribe(topics, this);
            return this.getConsumeMulti().plug(plugFunction);
        });
    }

    /** Subscribes to the topics and maps each polled batch through {@code consumerRecordFunction}. */
    <T> Multi<T> processBatch(Set<String> topics, Function<ConsumerRecords<K, V>, Multi<T>> consumerRecordFunction) {
        return this.startPolling(() -> {
            this.getOrCreateConsumer().subscribe(topics, this);
            return this.getProcessingMulti(consumerRecordFunction);
        });
    }

    /** Polls repeatedly, mapping each batch to a Multi and concatenating the results in order. */
    private <T> Multi<T> getProcessingMulti(Function<ConsumerRecords<K, V>, Multi<T>> consumerRecordFunction) {
        return this.poll().repeat().indefinitely()
                .onItem().transformToMulti(consumerRecordFunction).concatenate()
                .runSubscriptionOn(this.getOrCreateExecutor())
                .onTermination().invoke((throwable, cancelled) -> this.onTermination.accept(this.kafkaConsumer, throwable));
    }

    /**
     * Polls repeatedly and emits records one by one, applying the configured asynchronous and
     * synchronous commit functions (in that order) to each record.
     */
    private Multi<ConsumerRecord<K, V>> getConsumeMulti() {
        return this.poll().repeat().indefinitely()
                .onItem().transformToMulti(cr -> Multi.createFrom().items(StreamSupport.stream(cr.spliterator(), false)))
                .concatenate()
                .runSubscriptionOn(this.getOrCreateExecutor())
                .plug(m -> {
                    Multi<ConsumerRecord<K, V>> multi = m;
                    if (this.commitAsyncFunction != null) {
                        multi = multi.invoke(cr -> {
                            Map<TopicPartition, OffsetAndMetadata> offsetMap = this.commitAsyncFunction.apply(cr);
                            if (!offsetMap.isEmpty()) {
                                this.getOrCreateConsumer().commitAsync(offsetMap, this.commitAsyncCallback);
                            }
                        });
                    }
                    if (this.commitSyncFunction != null) {
                        multi = multi.onItem().transformToUniAndConcatenate(cr -> {
                            Map<TopicPartition, OffsetAndMetadata> offsetMap = this.commitSyncFunction.apply(cr);
                            if (!offsetMap.isEmpty()) {
                                return Uni.createFrom().item(cr).invoke(() -> this.getOrCreateConsumer().commitSync(offsetMap));
                            }
                            return Uni.createFrom().item(cr);
                        });
                    }
                    return multi.onTermination().invoke((throwable, cancelled) -> this.onTermination.accept(this.kafkaConsumer, throwable));
                });
    }

    /**
     * Creates a task that manually assigns the given partitions and consumes from the given
     * offsets.
     *
     * @param offsets start offset per partition; a non-negative value is an absolute offset, a
     *        negative value {@code -n} starts {@code n} records before the current end of the
     *        partition (clamped to offset 0)
     * @param plugFunction transformation applied to the record stream
     * @return the consumer task
     */
    public ConsumerTask<K, V> fromOffsets(Map<TopicPartition, Long> offsets, Function<Multi<ConsumerRecord<K, V>>, Multi<ConsumerRecord<K, V>>> plugFunction) {
        Multi<ConsumerRecord<K, V>> records = this.startPolling(() -> {
            KafkaConsumer<K, V> consumer = this.getOrCreateConsumer();
            consumer.unsubscribe();
            consumer.assign(offsets.keySet());
            for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
                TopicPartition tp = entry.getKey();
                long offset = entry.getValue();
                if (offset > -1L) {
                    consumer.seek(tp, offset);
                    continue;
                }
                consumer.seekToEnd(Collections.singleton(tp));
                long lastOffset = consumer.position(tp);
                // seek rejects negative offsets; start from 0 when fewer records exist than requested.
                consumer.seek(tp, Math.max(0L, lastOffset + offset));
            }
            return this.getConsumeMulti().plug(plugFunction);
        });
        return new ConsumerTask<>(records);
    }

    /** Consumes from the given offsets without transforming the stream. */
    public ConsumerTask<K, V> fromOffsets(Map<TopicPartition, Long> offsets) {
        return this.fromOffsets(offsets, Function.identity());
    }

    /** Consumes from the given offsets, bounded by {@code RecordQualifiers.until(during)}. */
    public ConsumerTask<K, V> fromOffsets(Map<TopicPartition, Long> offsets, Duration during) {
        return this.fromOffsets(offsets, RecordQualifiers.until(during));
    }

    /** Consumes from the given offsets, bounded by {@code RecordQualifiers.until(numberOfRecords)}. */
    public ConsumerTask<K, V> fromOffsets(Map<TopicPartition, Long> offsets, long numberOfRecords) {
        return this.fromOffsets(offsets, RecordQualifiers.until(numberOfRecords));
    }

    /** Creates a task subscribing to the topics, with the record stream transformed by {@code plugFunction}. */
    public ConsumerTask<K, V> fromTopics(Set<String> topics, Function<Multi<ConsumerRecord<K, V>>, Multi<ConsumerRecord<K, V>>> plugFunction) {
        return new ConsumerTask<>(this.process(topics, plugFunction));
    }

    /** Creates a task subscribing to topics matching the pattern, with the record stream transformed by {@code plugFunction}. */
    public ConsumerTask<K, V> fromTopics(Pattern topicsPattern, Function<Multi<ConsumerRecord<K, V>>, Multi<ConsumerRecord<K, V>>> plugFunction) {
        Multi<ConsumerRecord<K, V>> records = this.startPolling(() -> {
            this.getOrCreateConsumer().subscribe(topicsPattern, this);
            return this.getConsumeMulti().plug(plugFunction);
        });
        return new ConsumerTask<>(records);
    }

    /** Single-topic variant of {@link #fromTopics(Set, Function)}. */
    public ConsumerTask<K, V> fromTopics(String topic, Function<Multi<ConsumerRecord<K, V>>, Multi<ConsumerRecord<K, V>>> plugFunction) {
        return this.fromTopics(Collections.singleton(topic), plugFunction);
    }

    /** Subscribes to the given topics without transforming the stream. */
    public ConsumerTask<K, V> fromTopics(String... topics) {
        return this.fromTopics(new HashSet<String>(Arrays.asList(topics)));
    }

    /** Subscribes to topics matching the pattern without transforming the stream. */
    public ConsumerTask<K, V> fromTopics(Pattern topicsPattern) {
        return this.fromTopics(topicsPattern, Function.identity());
    }

    /** Subscribes to the given topics without transforming the stream. */
    public ConsumerTask<K, V> fromTopics(Set<String> topics) {
        return this.fromTopics(topics, Function.identity());
    }

    /** Subscribes to the topics, bounded by {@code RecordQualifiers.until(numberOfRecords)}. */
    public ConsumerTask<K, V> fromTopics(Set<String> topics, long numberOfRecords) {
        return this.fromTopics(topics, RecordQualifiers.until(numberOfRecords));
    }

    /** Subscribes to topics matching the pattern, bounded by {@code RecordQualifiers.until(numberOfRecords)}. */
    public ConsumerTask<K, V> fromTopics(Pattern topicsPattern, long numberOfRecords) {
        return this.fromTopics(topicsPattern, RecordQualifiers.until(numberOfRecords));
    }

    /** Subscribes to the topics, bounded by {@code RecordQualifiers.until(during)}. */
    public ConsumerTask<K, V> fromTopics(Set<String> topics, Duration during) {
        return this.fromTopics(topics, RecordQualifiers.until(during));
    }

    /** Subscribes to topics matching the pattern, bounded by {@code RecordQualifiers.until(during)}. */
    public ConsumerTask<K, V> fromTopics(Pattern topicsPattern, Duration during) {
        return this.fromTopics(topicsPattern, RecordQualifiers.until(during));
    }

    /** Subscribes to one topic, bounded by {@code RecordQualifiers.until(numberOfRecords)}. */
    public ConsumerTask<K, V> fromTopics(String topic, long numberOfRecords) {
        return this.fromTopics(Collections.singleton(topic), RecordQualifiers.until(numberOfRecords));
    }

    /** Subscribes to one topic, bounded by {@code RecordQualifiers.until(during)}. */
    public ConsumerTask<K, V> fromTopics(String topic, Duration during) {
        return this.fromTopics(Collections.singleton(topic), RecordQualifiers.until(during));
    }

    /** Subscribes to one topic, bounded by {@code RecordQualifiers.until(numberOfRecords, during, null)}. */
    public ConsumerTask<K, V> fromTopics(String topic, long numberOfRecords, Duration during) {
        return this.fromTopics(Collections.singleton(topic), RecordQualifiers.until(numberOfRecords, during, null));
    }

    /**
     * Creates a task that continues after a previous task: each partition starts at the
     * previous task's latest offset plus one.
     */
    public ConsumerTask<K, V> fromPrevious(KafkaTask<?, ?> task, Function<Multi<ConsumerRecord<K, V>>, Multi<ConsumerRecord<K, V>>> plugFunction) {
        Map<TopicPartition, Long> offsets = task.latestOffsets().entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() + 1L));
        return this.fromOffsets(offsets, plugFunction);
    }

    /** Continues after the previous task without transforming the stream. */
    public ConsumerTask<K, V> fromPrevious(KafkaTask<?, ?> task) {
        return this.fromPrevious(task, Function.identity());
    }

    /** Continues after the previous task, bounded by {@code RecordQualifiers.until(numberOfRecords)}. */
    public ConsumerTask<K, V> fromPrevious(KafkaTask<?, ?> task, long numberOfRecords) {
        return this.fromPrevious(task, RecordQualifiers.until(numberOfRecords));
    }

    /** Continues after the previous task, bounded by {@code RecordQualifiers.until(during)}. */
    public ConsumerTask<K, V> fromPrevious(KafkaTask<?, ?> task, Duration during) {
        return this.fromPrevious(task, RecordQualifiers.until(during));
    }

    /** Continues after the previous task, bounded by {@code RecordQualifiers.until(numberOfRecords, during, null)}. */
    public ConsumerTask<K, V> fromPrevious(KafkaTask<?, ?> task, long numberOfRecords, Duration during) {
        return this.fromPrevious(task, RecordQualifiers.until(numberOfRecords, during, null));
    }

    /** Synchronously commits the given offsets on the consumer executor, then closes this builder. */
    public void commitAndClose(Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.commitThenClose(() -> this.getOrCreateConsumer().commitSync(offsets));
    }

    /** Synchronously commits the consumer's current offsets on the consumer executor, then closes this builder. */
    public void commitAndClose() {
        this.commitThenClose(() -> this.getOrCreateConsumer().commitSync());
    }

    /** Runs {@code commit} followed by {@link #close()} on the consumer executor, waiting at most the Kafka API timeout. */
    private void commitThenClose(Runnable commit) {
        Uni.createFrom().voidItem()
                .onItem().invoke(commit)
                .onItem().invoke(this::close)
                .runSubscriptionOn(this.getOrCreateExecutor())
                .await().atMost(this.kafkaApiTimeout);
    }
}

