/*
 * Decompiled with CFR 0.152.
 */
package dev.snowdrop.vertx.kafka;

import dev.snowdrop.vertx.kafka.ConsumerRecord;
import dev.snowdrop.vertx.kafka.KafkaConsumer;
import dev.snowdrop.vertx.kafka.Partition;
import dev.snowdrop.vertx.kafka.PartitionInfo;
import dev.snowdrop.vertx.kafka.SnowdropConsumerRecord;
import dev.snowdrop.vertx.kafka.SnowdropPartition;
import dev.snowdrop.vertx.kafka.SnowdropPartitionInfo;
import io.vertx.core.streams.ReadStream;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import io.vertx.kafka.client.consumer.OffsetAndTimestamp;
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Reactor-facing {@link KafkaConsumer} implementation that forwards every operation to a Vert.x Axle
 * Kafka consumer.
 *
 * <p>{@code CompletionStage} results produced by the delegate are exposed as {@link Mono} or
 * {@link Flux}; the delegate calls are wrapped in suppliers handed to
 * {@code Mono.fromCompletionStage}. Partition values are converted between this package's
 * {@link Partition} and the Vert.x {@link TopicPartition}.
 *
 * <p>Argument validation is eager: an invalid argument makes the method itself throw instead of
 * returning a failed publisher.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
final class SnowdropKafkaConsumer<K, V> implements KafkaConsumer<K, V> {
    private final io.vertx.axle.kafka.client.consumer.KafkaConsumer<K, V> delegate;

    /**
     * @param delegate Axle consumer that performs the actual work
     */
    SnowdropKafkaConsumer(io.vertx.axle.kafka.client.consumer.KafkaConsumer<K, V> delegate) {
        this.delegate = delegate;
    }

    /**
     * Subscribes the delegate to a single topic.
     *
     * @param topic topic name
     * @return completion signal of the delegate's subscribe call
     * @throws IllegalArgumentException if {@code topic} is {@code null} or empty
     */
    @Override
    public Mono<Void> subscribe(String topic) {
        requireTopic(topic);
        return Mono.fromCompletionStage(() -> this.delegate.subscribe(topic));
    }

    /**
     * Subscribes the delegate to a set of topics; duplicates are collapsed by copying into a set.
     *
     * @param topics topic names
     * @return completion signal of the delegate's subscribe call
     * @throws NullPointerException if {@code topics} is {@code null}
     */
    @Override
    public Mono<Void> subscribe(Collection<String> topics) {
        Objects.requireNonNull(topics, "Topics cannot be null");
        // The copy is taken inside the supplier, i.e. when the supplier is invoked, not when this method returns.
        return Mono.fromCompletionStage(() -> this.delegate.subscribe(new HashSet<>(topics)));
    }

    /**
     * Assigns a single partition to the delegate.
     *
     * @param partition partition to assign
     * @return completion signal of the delegate's assign call
     * @throws NullPointerException if {@code partition} is {@code null}
     */
    @Override
    public Mono<Void> assign(Partition partition) {
        Objects.requireNonNull(partition, "Partition cannot be null");
        return Mono.fromCompletionStage(() -> this.delegate.assign(toVertxTopicPartition(partition)));
    }

    /**
     * Assigns several partitions to the delegate.
     *
     * @param partitions partitions to assign; converted to Vert.x partitions before this method returns
     * @return completion signal of the delegate's assign call
     * @throws NullPointerException if {@code partitions} is {@code null}
     */
    @Override
    public Mono<Void> assign(Collection<Partition> partitions) {
        Objects.requireNonNull(partitions, "Partitions cannot be null");
        Set<TopicPartition> vertxPartitions = toVertxTopicPartitions(partitions);
        return Mono.fromCompletionStage(() -> this.delegate.assign(vertxPartitions));
    }

    /**
     * Forwards to the delegate's {@code unsubscribe()}.
     *
     * @return completion signal of the delegate's unsubscribe call
     */
    @Override
    public Mono<Void> unsubscribe() {
        return Mono.fromCompletionStage(() -> this.delegate.unsubscribe());
    }

    /**
     * @return the elements of the collection returned by the delegate's {@code subscription()}
     */
    @Override
    public Flux<String> subscriptions() {
        return Mono.fromCompletionStage(() -> this.delegate.subscription())
            .flatMapMany(Flux::fromIterable);
    }

    /**
     * @return the delegate's {@code assignment()} result, each element wrapped in a {@link SnowdropPartition}
     */
    @Override
    public Flux<Partition> assignments() {
        return Mono.fromCompletionStage(() -> this.delegate.assignment())
            .flatMapMany(Flux::fromIterable)
            .map(SnowdropPartition::new);
    }

    /**
     * Looks up partition metadata for a topic.
     *
     * @param topic topic name
     * @return the delegate's {@code partitionsFor} result, each element wrapped in a {@link SnowdropPartitionInfo}
     * @throws IllegalArgumentException if {@code topic} is {@code null} or empty
     */
    @Override
    public Flux<PartitionInfo> partitionsFor(String topic) {
        requireTopic(topic);
        return Mono.fromCompletionStage(() -> this.delegate.partitionsFor(topic))
            .flatMapMany(Flux::fromIterable)
            .map(SnowdropPartitionInfo::new);
    }

    /**
     * Registers a handler on the delegate for revoked partitions.
     *
     * @param handler receives the revoked partitions converted to {@link Partition}
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    @Override
    public void partitionsRevokedHandler(Consumer<Set<Partition>> handler) {
        Objects.requireNonNull(handler, "Handler cannot be null");
        this.delegate.partitionsRevokedHandler(vertxPartitions -> {
            handler.accept(toPartitions(vertxPartitions));
        });
    }

    /**
     * Registers a handler on the delegate for assigned partitions.
     *
     * @param handler receives the assigned partitions converted to {@link Partition}
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    @Override
    public void partitionsAssignedHandler(Consumer<Set<Partition>> handler) {
        Objects.requireNonNull(handler, "Handler cannot be null");
        this.delegate.partitionsAssignedHandler(vertxPartitions -> {
            handler.accept(toPartitions(vertxPartitions));
        });
    }

    /**
     * Forwards a seek request for one partition to the delegate.
     *
     * @param partition partition to reposition
     * @param offset    target offset
     * @return completion signal of the delegate's seek call
     * @throws NullPointerException     if {@code partition} is {@code null}
     * @throws IllegalArgumentException if {@code offset} is negative
     */
    @Override
    public Mono<Void> seek(Partition partition, long offset) {
        Objects.requireNonNull(partition, "Partition cannot be null");
        if (offset < 0L) {
            throw new IllegalArgumentException("Offset cannot be negative");
        }
        return Mono.fromCompletionStage(() -> this.delegate.seek(toVertxTopicPartition(partition), offset));
    }

    /**
     * Forwards to the delegate's {@code seekToBeginning} for one partition.
     *
     * @param partition partition to reposition
     * @return completion signal of the delegate call
     * @throws NullPointerException if {@code partition} is {@code null}
     */
    @Override
    public Mono<Void> seekToBeginning(Partition partition) {
        Objects.requireNonNull(partition, "Partition cannot be null");
        return Mono.fromCompletionStage(() -> this.delegate.seekToBeginning(toVertxTopicPartition(partition)));
    }

    /**
     * Forwards to the delegate's {@code seekToBeginning} for several partitions.
     *
     * @param partitions partitions to reposition; converted to Vert.x partitions before this method returns
     * @return completion signal of the delegate call
     * @throws NullPointerException if {@code partitions} is {@code null}
     */
    @Override
    public Mono<Void> seekToBeginning(Collection<Partition> partitions) {
        Objects.requireNonNull(partitions, "Partitions cannot be null");
        Set<TopicPartition> vertxPartitions = toVertxTopicPartitions(partitions);
        return Mono.fromCompletionStage(() -> this.delegate.seekToBeginning(vertxPartitions));
    }

    /**
     * Forwards to the delegate's {@code seekToEnd} for one partition.
     *
     * @param partition partition to reposition
     * @return completion signal of the delegate call
     * @throws NullPointerException if {@code partition} is {@code null}
     */
    @Override
    public Mono<Void> seekToEnd(Partition partition) {
        Objects.requireNonNull(partition, "Partition cannot be null");
        return Mono.fromCompletionStage(() -> this.delegate.seekToEnd(toVertxTopicPartition(partition)));
    }

    /**
     * Forwards to the delegate's {@code seekToEnd} for several partitions.
     *
     * @param partitions partitions to reposition; converted to Vert.x partitions before this method returns
     * @return completion signal of the delegate call
     * @throws NullPointerException if {@code partitions} is {@code null}
     */
    @Override
    public Mono<Void> seekToEnd(Collection<Partition> partitions) {
        Objects.requireNonNull(partitions, "Partitions cannot be null");
        Set<TopicPartition> vertxPartitions = toVertxTopicPartitions(partitions);
        return Mono.fromCompletionStage(() -> this.delegate.seekToEnd(vertxPartitions));
    }

    /**
     * @param partition partition to query
     * @return result of the delegate's {@code position} call
     * @throws NullPointerException if {@code partition} is {@code null}
     */
    @Override
    public Mono<Long> position(Partition partition) {
        Objects.requireNonNull(partition, "Partition cannot be null");
        return Mono.fromCompletionStage(() -> this.delegate.position(toVertxTopicPartition(partition)));
    }

    /**
     * @param partition partition to query
     * @return the offset carried by the {@link OffsetAndMetadata} that the delegate's {@code committed} call yields
     * @throws NullPointerException if {@code partition} is {@code null}
     */
    @Override
    public Mono<Long> committed(Partition partition) {
        Objects.requireNonNull(partition, "Partition cannot be null");
        return Mono.fromCompletionStage(() -> this.delegate.committed(toVertxTopicPartition(partition)))
            .map(OffsetAndMetadata::getOffset);
    }

    /**
     * @param partition partition to query
     * @return result of the delegate's {@code beginningOffsets} call for that partition
     * @throws NullPointerException if {@code partition} is {@code null}
     */
    @Override
    public Mono<Long> beginningOffset(Partition partition) {
        Objects.requireNonNull(partition, "Partition cannot be null");
        return Mono.fromCompletionStage(() -> this.delegate.beginningOffsets(toVertxTopicPartition(partition)));
    }

    /**
     * @param partition partition to query
     * @return result of the delegate's {@code endOffsets} call for that partition
     * @throws NullPointerException if {@code partition} is {@code null}
     */
    @Override
    public Mono<Long> endOffset(Partition partition) {
        Objects.requireNonNull(partition, "Partition cannot be null");
        return Mono.fromCompletionStage(() -> this.delegate.endOffsets(toVertxTopicPartition(partition)));
    }

    /**
     * Looks up an offset by timestamp through the delegate's {@code offsetsForTimes}.
     *
     * @param partition partition to query
     * @param timestamp timestamp handed to the delegate
     * @return the offset carried by the {@link OffsetAndTimestamp} that the delegate yields
     * @throws NullPointerException     if {@code partition} is {@code null}
     * @throws IllegalArgumentException if {@code timestamp} is negative
     */
    @Override
    public Mono<Long> timeOffset(Partition partition, long timestamp) {
        Objects.requireNonNull(partition, "Partition cannot be null");
        if (timestamp < 0L) {
            throw new IllegalArgumentException("Timestamp cannot be negative");
        }
        return Mono.fromCompletionStage(
                () -> this.delegate.offsetsForTimes(toVertxTopicPartition(partition), timestamp))
            .map(OffsetAndTimestamp::getOffset);
    }

    /**
     * Forwards to the delegate's {@code commit()}.
     *
     * @return completion signal of the delegate's commit call
     */
    @Override
    public Mono<Void> commit() {
        return Mono.fromCompletionStage(() -> this.delegate.commit());
    }

    /**
     * Forwards to the delegate's {@code close()}.
     *
     * @return completion signal of the delegate's close call
     */
    @Override
    public Mono<Void> close() {
        return Mono.fromCompletionStage(() -> this.delegate.close());
    }

    /**
     * Applies a function to the underlying Vert.x consumer and publishes its result.
     *
     * <p>Anything the function throws is routed to the sink as an error signal instead of
     * propagating to the caller of the sink callback.
     *
     * @param function function to apply to the core Vert.x consumer
     * @param <T>      result type
     * @return a {@link Mono} created from a sink that receives the function result or its failure
     * @throws NullPointerException if {@code function} is {@code null}
     */
    @Override
    public <T> Mono<T> doOnVertxConsumer(Function<io.vertx.kafka.client.consumer.KafkaConsumer<K, V>, T> function) {
        Objects.requireNonNull(function, "Function cannot be null");
        return Mono.create(sink -> {
            try {
                // NOTE(review): getDelegate() may be declared without type arguments — confirm.
                // The explicit cast keeps apply() returning T rather than an erased Object in that case.
                @SuppressWarnings("unchecked")
                io.vertx.kafka.client.consumer.KafkaConsumer<K, V> vertxConsumer =
                    (io.vertx.kafka.client.consumer.KafkaConsumer<K, V>) this.delegate.getDelegate();
                T result = function.apply(vertxConsumer);
                sink.success(result);
            } catch (Throwable t) {
                sink.error(t);
            }
        });
    }

    /**
     * @return a {@link Mono} built from the delegate's publisher, with the record wrapped in a
     *     {@link SnowdropConsumerRecord}
     */
    public Mono<ConsumerRecord<K, V>> mono() {
        return Mono.from(this.delegate.toPublisher())
            .map(SnowdropConsumerRecord::new);
    }

    /**
     * @return a {@link Flux} built from the delegate's publisher, with every record wrapped in a
     *     {@link SnowdropConsumerRecord}
     */
    public Flux<ConsumerRecord<K, V>> flux() {
        return Flux.from(this.delegate.toPublisher())
            .map(SnowdropConsumerRecord::new);
    }

    /**
     * NOTE(review): the raw return type is kept as found; the declaring interface is not visible
     * from this file, so the signature is left untouched.
     *
     * @return the object returned by the delegate's {@code getDelegate()}
     */
    public ReadStream vertxReadStream() {
        return this.delegate.getDelegate();
    }

    /** Rejects a {@code null} or empty topic name. */
    private static void requireTopic(String topic) {
        if (!StringUtils.hasLength(topic)) {
            throw new IllegalArgumentException("Topic cannot be empty");
        }
    }

    /** Wraps each Vert.x partition in a {@link SnowdropPartition} and collects the result into a set. */
    private static Set<Partition> toPartitions(Collection<TopicPartition> vertxPartitions) {
        return vertxPartitions.stream()
            .<Partition>map(SnowdropPartition::new)
            .collect(Collectors.toSet());
    }

    /** Converts every partition of the collection to its Vert.x counterpart, dropping duplicates. */
    private Set<TopicPartition> toVertxTopicPartitions(Collection<Partition> partitions) {
        return partitions.stream()
            .map(this::toVertxTopicPartition)
            .collect(Collectors.toSet());
    }

    /** Builds a Vert.x {@link TopicPartition} from the partition's topic name and partition number. */
    private TopicPartition toVertxTopicPartition(Partition partition) {
        return new TopicPartition(partition.topic(), partition.partition());
    }
}

