package io.smallrye.mutiny.streams;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.streams.operators.Operator;
import io.smallrye.mutiny.streams.operators.ProcessorOperator;
import io.smallrye.mutiny.streams.operators.PublisherOperator;
import io.smallrye.mutiny.streams.operators.TerminalOperator;
import io.smallrye.mutiny.streams.spi.Transformer;
import io.smallrye.mutiny.streams.stages.Stages;
import io.smallrye.mutiny.streams.utils.ConnectableProcessor;
import io.smallrye.mutiny.streams.utils.DefaultSubscriberWithCompletionStage;
import io.smallrye.mutiny.streams.utils.WrappedProcessor;
import java.util.concurrent.CompletionStage;
import mutiny.zero.flow.adapters.AdaptersToFlow;
import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.spi.Graph;
import org.eclipse.microprofile.reactive.streams.operators.spi.ReactiveStreamsEngine;
import org.eclipse.microprofile.reactive.streams.operators.spi.Stage;
import org.eclipse.microprofile.reactive.streams.operators.spi.SubscriberWithCompletionStage;
import org.eclipse.microprofile.reactive.streams.operators.spi.UnsupportedStageException;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;

/* loaded from: input_file:io/smallrye/mutiny/streams/Engine.class */
public class Engine implements ReactiveStreamsEngine {
    @Override // org.eclipse.microprofile.reactive.streams.operators.spi.ReactiveStreamsEngine
    public <T> Publisher<T> buildPublisher(Graph graph) {
        Multi multi = null;
        for (Stage stage : graph.getStages()) {
            Operator lookup = Stages.lookup(stage);
            if (multi == null) {
                if (!(lookup instanceof PublisherOperator)) {
                    throw new IllegalArgumentException("Expecting a publisher stage, got a " + stage);
                }
                multi = createPublisher(stage, (PublisherOperator) lookup);
            } else {
                if (!(lookup instanceof ProcessorOperator)) {
                    throw new IllegalArgumentException("Expecting a processor stage, got a " + stage);
                }
                multi = applyProcessors(multi, stage, (ProcessorOperator) lookup);
            }
        }
        return AdaptersToReactiveStreams.publisher(multi);
    }

    @Override // org.eclipse.microprofile.reactive.streams.operators.spi.ReactiveStreamsEngine
    public <T, R> SubscriberWithCompletionStage<T, R> buildSubscriber(Graph graph) {
        ConnectableProcessor connectableProcessor = new ConnectableProcessor();
        Multi<T> publisher = Multi.createFrom().publisher(AdaptersToFlow.publisher(connectableProcessor));
        for (Stage stage : graph.getStages()) {
            Operator lookup = Stages.lookup(stage);
            if (!(lookup instanceof ProcessorOperator)) {
                if (lookup instanceof TerminalOperator) {
                    return new DefaultSubscriberWithCompletionStage(connectableProcessor, applySubscriber(Transformer.apply(publisher), stage, (TerminalOperator) lookup));
                }
                throw new UnsupportedStageException(stage);
            }
            publisher = applyProcessors(publisher, stage, (ProcessorOperator) lookup);
        }
        throw new IllegalArgumentException("The graph does not have a valid final stage");
    }

    @Override // org.eclipse.microprofile.reactive.streams.operators.spi.ReactiveStreamsEngine
    public <T, R> Processor<T, R> buildProcessor(Graph graph) {
        ConnectableProcessor connectableProcessor = new ConnectableProcessor();
        Multi<T> publisher = Multi.createFrom().publisher(AdaptersToFlow.publisher(connectableProcessor));
        for (Stage stage : graph.getStages()) {
            publisher = applyProcessors(publisher, stage, (ProcessorOperator) Stages.lookup(stage));
        }
        return new WrappedProcessor(connectableProcessor, AdaptersToReactiveStreams.publisher(publisher));
    }

    @Override // org.eclipse.microprofile.reactive.streams.operators.spi.ReactiveStreamsEngine
    public <T> CompletionStage<T> buildCompletion(Graph graph) {
        Multi<T> multi = null;
        for (Stage stage : graph.getStages()) {
            Operator lookup = Stages.lookup(stage);
            if (lookup instanceof PublisherOperator) {
                multi = createPublisher(stage, (PublisherOperator) lookup);
            } else {
                if (!(lookup instanceof ProcessorOperator)) {
                    return applySubscriber(multi, stage, (TerminalOperator) lookup);
                }
                multi = applyProcessors(multi, stage, (ProcessorOperator) lookup);
            }
        }
        throw new IllegalArgumentException("Graph did not have terminal stage");
    }

    private <I, O> Multi<O> applyProcessors(Multi<I> multi, Stage stage, ProcessorOperator processorOperator) {
        return Transformer.apply(processorOperator.create(this, stage).apply((Multi) multi));
    }

    private <T, R> CompletionStage<R> applySubscriber(Multi<T> multi, Stage stage, TerminalOperator terminalOperator) {
        return terminalOperator.create(this, stage).apply(Transformer.apply(multi));
    }

    private <O> Multi<O> createPublisher(Stage stage, PublisherOperator publisherOperator) {
        return Transformer.apply(publisherOperator.create(this, stage).get());
    }
}
