/*
 * Decompiled with CFR 0.152.
 */
package dev.snowdrop.vertx.http.common;

import io.vertx.core.streams.ReadStream;
import java.util.Objects;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;
import reactor.core.publisher.Flux;

/**
 * Builder that adapts a Vert.x {@link ReadStream} of {@code T} into a Reactor {@link Flux} of {@code R}.
 *
 * <p>Each item received from the read stream is passed through the configured data converter before
 * being emitted. Stream exceptions and stream end are forwarded to the flux as error and completion
 * signals, and downstream requests are translated into {@link ReadStream#fetch(long)} calls.
 *
 * @param <T> type of the items produced by the read stream
 * @param <R> type of the items emitted by the resulting flux
 */
public class ReadStreamFluxBuilder<T, R> {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private ReadStream<T> readStream;
    private Function<T, R> dataConverter;

    /**
     * Sets the read stream that the flux will consume.
     *
     * @param readStream source stream; must be non-null by the time {@link #build()} is called
     * @return this builder
     */
    public ReadStreamFluxBuilder<T, R> readStream(ReadStream<T> readStream) {
        this.readStream = readStream;
        return this;
    }

    /**
     * Sets the function used to convert each stream item before it is emitted.
     *
     * @param dataConverter converter; must be non-null by the time {@link #build()} is called
     * @return this builder
     */
    public ReadStreamFluxBuilder<T, R> dataConverter(Function<T, R> dataConverter) {
        this.dataConverter = dataConverter;
        return this;
    }

    /**
     * Pauses the configured read stream and returns a flux backed by it.
     *
     * <p>The read stream and the data converter are captured when this method is called, so
     * reconfiguring the builder afterwards does not affect a flux that was already built.
     *
     * @return flux emitting the converted items of the read stream
     * @throws NullPointerException if the read stream or the data converter has not been set
     */
    public Flux<R> build() {
        // Snapshot the mutable builder fields: the callback below runs later than this method,
        // and it must operate on the same stream that is validated and paused here.
        final ReadStream<T> stream = Objects.requireNonNull(this.readStream, "Read stream is required");
        final Function<T, R> converter = Objects.requireNonNull(this.dataConverter, "Data converter is required");

        // Paused up front; items are then pulled explicitly via fetch() in the request callback.
        stream.pause();

        return Flux.create(sink -> {
            final String logPrefix = "[" + ObjectUtils.getIdentityHexString(stream) + "] ";

            stream.handler(data -> {
                this.logger.debug("{}Received '{}'", logPrefix, data);
                sink.next(converter.apply(data));
            }).exceptionHandler(throwable -> {
                this.logger.debug("{}Received exception '{}'", logPrefix, throwable.toString());
                sink.error(throwable);
            }).endHandler(v -> {
                this.logger.debug("{}Read stream ended", logPrefix);
                sink.complete();
            });

            // Forward the requested amount to the read stream as a fetch() call.
            sink.onRequest(i -> {
                this.logger.debug("{} Fetching '{}' entries from a read stream", logPrefix, i);
                stream.fetch(i);
            });
        });
    }
}

