/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.providers;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.Shape;
import io.smallrye.reactive.messaging.providers.AbstractMediator;
import io.smallrye.reactive.messaging.providers.helpers.ClassUtils;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.providers.i18n.ProviderExceptions;
import io.smallrye.reactive.messaging.providers.i18n.ProviderMessages;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;

/**
 * Mediator for methods whose configured shape is {@link Shape#PROCESSOR}.
 *
 * <p>{@link #initialize(Object)} inspects the configured production kind and installs a
 * {@code mapper} function turning the upstream stream of messages into the downstream one.
 * {@link #connectToUpstream(Multi)} then plugs that mapper onto the upstream and keeps the
 * resulting stream, which {@link #getStream()} exposes.
 */
public class ProcessorMediator
extends AbstractMediator {
    /** Stream transformation selected by {@link #initialize(Object)}; {@code null} until then. */
    private Function<Multi<? extends Message<?>>, Multi<? extends Message<?>>> mapper;
    /** Resulting stream; {@code null} until {@link #connectToUpstream(Multi)} has been called. */
    private Multi<? extends Message<?>> publisher;

    /**
     * Creates the mediator after validating the configuration.
     *
     * @param configuration the mediator configuration; its shape must be {@link Shape#PROCESSOR},
     *        and it must not combine a stream of messages with payload consumption, nor a stream
     *        of payloads with message consumption
     */
    public ProcessorMediator(MediatorConfiguration configuration) {
        super(configuration);
        if (Shape.PROCESSOR != configuration.shape()) {
            throw ProviderExceptions.ex.illegalArgumentForProcessorShape(configuration.shape());
        }
        final boolean messageStreamFromPayloads = configuration.production() == MediatorConfiguration.Production.STREAM_OF_MESSAGE
                && configuration.consumption() == MediatorConfiguration.Consumption.PAYLOAD;
        if (messageStreamFromPayloads) {
            throw ProviderExceptions.ex.definitionProduceMessageStreamAndConsumePayload(configuration.methodAsString());
        }
        final boolean payloadStreamFromMessages = configuration.production() == MediatorConfiguration.Production.STREAM_OF_PAYLOAD
                && configuration.consumption() == MediatorConfiguration.Consumption.MESSAGE;
        if (payloadStreamFromMessages) {
            throw ProviderExceptions.ex.definitionProducePayloadStreamAndConsumeMessage(configuration.methodAsString());
        }
    }

    /**
     * Plugs the mapper onto the given upstream (after conversion), decorates the result and
     * stores it as this mediator's stream.
     *
     * @param publisher the upstream stream of messages
     */
    @Override
    public void connectToUpstream(Multi<? extends Message<?>> publisher) {
        // The mapper is installed by initialize(); only checked when assertions are enabled.
        assert mapper != null;
        this.publisher = decorate(
                publisher.plug(source -> mapper.apply(convert((Multi<? extends Message<?>>) source))));
    }

    /**
     * @return the stream built by {@link #connectToUpstream(Multi)}
     * @throws NullPointerException if the mediator is not connected yet
     */
    @Override
    public Multi<? extends Message<?>> getStream() {
        return Objects.requireNonNull(publisher);
    }

    /**
     * @return {@code true} once a stream has been stored by {@link #connectToUpstream(Multi)}
     */
    @Override
    public boolean isConnected() {
        return publisher != null;
    }

    /** Delegates unchanged to the parent implementation. */
    @Override
    protected <T> Uni<T> invokeBlocking(Object ... args) {
        return super.invokeBlocking(args);
    }

    /**
     * Initializes the parent, then selects the mapper matching the configured production kind.
     * For stream productions the choice further depends on whether the method returns a
     * processor (or processor builder) or a publisher (or publisher builder).
     *
     * @param bean the bean instance, forwarded to the parent
     */
    @Override
    public void initialize(Object bean) {
        super.initialize(bean);
        switch (configuration.production()) {
            case STREAM_OF_MESSAGE:
                if (isReturningAProcessorOrAProcessorBuilder()) {
                    if (configuration.usesBuilderTypes()) {
                        processMethodReturningAProcessorBuilderOfMessages();
                    } else {
                        processMethodReturningAProcessorOfMessages();
                    }
                } else if (isReturningAPublisherOrAPublisherBuilder()) {
                    if (configuration.usesBuilderTypes()) {
                        processMethodReturningAPublisherBuilderOfMessageAndConsumingMessages();
                    } else {
                        processMethodReturningAPublisherOfMessageAndConsumingMessages();
                    }
                } else {
                    throw ProviderExceptions.ex.illegalArgumentForInitialize(configuration.methodAsString());
                }
                break;
            case STREAM_OF_PAYLOAD:
                if (isReturningAProcessorOrAProcessorBuilder()) {
                    if (configuration.usesBuilderTypes()) {
                        processMethodReturningAProcessorBuilderOfPayloads();
                    } else {
                        processMethodReturningAProcessorOfPayloads();
                    }
                } else if (isReturningAPublisherOrAPublisherBuilder()) {
                    if (configuration.usesBuilderTypes()) {
                        processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloads();
                    } else {
                        processMethodReturningAPublisherOfPayloadsAndConsumingPayloads();
                    }
                } else {
                    throw ProviderExceptions.ex.illegalArgumentForInitialize(configuration.methodAsString());
                }
                break;
            case INDIVIDUAL_MESSAGE:
                processMethodReturningIndividualMessageAndConsumingIndividualItem();
                break;
            case INDIVIDUAL_PAYLOAD:
                processMethodReturningIndividualPayloadAndConsumingIndividualItem();
                break;
            case COMPLETION_STAGE_OF_MESSAGE:
                processMethodReturningACompletionStageOfMessageAndConsumingIndividualItem();
                break;
            case COMPLETION_STAGE_OF_PAYLOAD:
                processMethodReturningACompletionStageOfPayloadAndConsumingIndividualItem();
                break;
            case UNI_OF_MESSAGE:
                processMethodReturningAUniOfMessageAndConsumingIndividualItem();
                break;
            case UNI_OF_PAYLOAD:
                processMethodReturningAUniOfPayloadAndConsumingIndividualItem();
                break;
            default:
                throw ProviderExceptions.ex.illegalArgumentForUnexpectedProduction(configuration.production());
        }
    }

    /** Each incoming message is passed to the method; the returned builder is built and concatenated. */
    private void processMethodReturningAPublisherBuilderOfMessageAndConsumingMessages() {
        this.mapper = source -> MultiUtils.handlePreProcessingAcknowledgement(source, configuration)
                .onItem()
                .transformToMultiAndConcatenate(incoming -> ((PublisherBuilder) invoke(incoming)).buildRs());
    }

    /** Each incoming message is passed to the method; the returned publisher is concatenated. */
    private void processMethodReturningAPublisherOfMessageAndConsumingMessages() {
        this.mapper = source -> MultiUtils.handlePreProcessingAcknowledgement(source, configuration)
                .onItem()
                .transformToMultiAndConcatenate(incoming -> (Publisher) invoke(incoming));
    }

    /** Invokes the method once, without arguments, and routes the message stream through the returned builder. */
    private void processMethodReturningAProcessorBuilderOfMessages() {
        ProcessorBuilder stage = (ProcessorBuilder) invoke();
        Objects.requireNonNull(stage, ProviderMessages.msg.methodReturnedNull(configuration.methodAsString()));
        this.mapper = source -> {
            Multi<? extends Message<?>> acknowledged = MultiUtils.handlePreProcessingAcknowledgement(source, configuration);
            return Multi.createFrom().publisher(
                    ReactiveStreams.fromPublisher(acknowledged).via(stage).buildRs());
        };
    }

    /** Invokes the method once, without arguments, and routes the message stream through the returned processor. */
    private void processMethodReturningAProcessorOfMessages() {
        Processor stage = (Processor) invoke();
        Objects.requireNonNull(stage, ProviderMessages.msg.methodReturnedNull(configuration.methodAsString()));
        this.mapper = source -> {
            Multi<? extends Message<?>> acknowledged = MultiUtils.handlePreProcessingAcknowledgement(source, configuration);
            return Multi.createFrom().publisher(
                    ReactiveStreams.fromPublisher(acknowledged).via(stage).buildRs());
        };
    }

    /**
     * Invokes the method once, without arguments; payloads are extracted from the incoming
     * messages, sent through the returned processor and wrapped again with {@code Message.of}.
     */
    private void processMethodReturningAProcessorOfPayloads() {
        Processor stage = Objects.requireNonNull((Processor) invoke(),
                ProviderMessages.msg.methodReturnedNull(configuration.methodAsString()));
        this.mapper = source -> {
            Multi<Object> payloads = MultiUtils.handlePreProcessingAcknowledgement(source, configuration)
                    .onItem()
                    .transform(Message::getPayload);
            return Multi.createFrom()
                    .publisher(ReactiveStreams.fromPublisher(payloads).via(stage).buildRs())
                    .onItem()
                    .transform(Message::of);
        };
    }

    /**
     * Invokes the method once, without arguments; payloads are extracted from the incoming
     * messages, sent through the returned builder and wrapped again with {@code Message.of}.
     */
    private void processMethodReturningAProcessorBuilderOfPayloads() {
        ProcessorBuilder stage = Objects.requireNonNull((ProcessorBuilder) invoke(),
                ProviderMessages.msg.methodReturnedNull(configuration.methodAsString()));
        this.mapper = source -> {
            Multi<Object> payloads = MultiUtils.handlePreProcessingAcknowledgement(source, configuration)
                    .onItem()
                    .transform(Message::getPayload);
            return Multi.createFrom()
                    .publisher(ReactiveStreams.fromPublisher(payloads).via(stage).buildRs())
                    .onItem()
                    .transform(Message::of);
        };
    }

    /**
     * The method receives each payload and returns a publisher builder; every emitted payload
     * becomes a message carrying the metadata of the incoming message.
     */
    private void processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloads() {
        this.mapper = source -> {
            Multi<Message<?>> acknowledged = MultiUtils.handlePreProcessingAcknowledgement(source, configuration);
            return acknowledged.onItem().transformToMultiAndConcatenate(incoming -> {
                PublisherBuilder produced = (PublisherBuilder) invoke(incoming.getPayload());
                return Multi.createFrom()
                        .publisher(produced.buildRs())
                        .onItem()
                        .transform(item -> Message.of(item, incoming.getMetadata()));
            });
        };
    }

    /**
     * The method receives each payload and returns a publisher; every emitted payload becomes a
     * message carrying the metadata of the incoming message.
     */
    private void processMethodReturningAPublisherOfPayloadsAndConsumingPayloads() {
        this.mapper = source -> {
            Multi<Message<?>> acknowledged = MultiUtils.handlePreProcessingAcknowledgement(source, configuration);
            return acknowledged.onItem().transformToMultiAndConcatenate(incoming -> {
                Publisher produced = (Publisher) invoke(incoming.getPayload());
                return Multi.createFrom()
                        .publisher(produced)
                        .onItem()
                        .transform(item -> Message.of(item, incoming.getMetadata()));
            });
        };
    }

    /**
     * Method returning a single message per item. Blocking methods go through
     * {@code invokeBlocking}, concatenating results when ordered execution is configured and
     * merging them otherwise; non-blocking methods are invoked directly and concatenated.
     */
    private void processMethodReturningIndividualMessageAndConsumingIndividualItem() {
        if (configuration.isBlocking()) {
            if (configuration.isBlockingExecutionOrdered()) {
                this.mapper = source -> {
                    Multi<Message<?>> acknowledged = MultiUtils.handlePreProcessingAcknowledgement(source, configuration);
                    return acknowledged.onItem().transformToMultiAndConcatenate(
                            incoming -> invokeBlocking(withPayloadOrMessage((Message<?>) incoming))
                                    .onItemOrFailure()
                                    .transformToUni((outcome, failure) -> handlePostInvocationWithMessage((Message) outcome, (Throwable) failure))
                                    .onItem()
                                    .transformToMulti(this::handleSkip));
                };
            } else {
                this.mapper = source -> {
                    Multi<Message<?>> acknowledged = MultiUtils.handlePreProcessingAcknowledgement(source, configuration);
                    return acknowledged.onItem().transformToMultiAndMerge(
                            incoming -> invokeBlocking(withPayloadOrMessage((Message<?>) incoming))
                                    .onItemOrFailure()
                                    .transformToUni((outcome, failure) -> handlePostInvocationWithMessage((Message) outcome, (Throwable) failure))
                                    .onItem()
                                    .transformToMulti(this::handleSkip));
                };
            }
        } else {
            this.mapper = source -> {
                Multi<Message<?>> acknowledged = MultiUtils.handlePreProcessingAcknowledgement(source, configuration);
                return acknowledged.onItem().transformToMultiAndConcatenate(
                        incoming -> Uni.createFrom().item(() -> invoke(withPayloadOrMessage((Message<?>) incoming)))
                                .onItem()
                                .transform(outcome -> (Message) outcome)
                                .onItemOrFailure()
                                .transformToUni(this::handlePostInvocationWithMessage)
                                .onItem()
                                .transformToMulti(this::handleSkip));
            };
        }
    }

    /** @return whether the configured acknowledgment strategy is {@code POST_PROCESSING} */
    private boolean isPostAck() {
        return Acknowledgment.Strategy.POST_PROCESSING == configuration.getAcknowledgment();
    }

    /**
     * Method returning a single payload per item. Blocking methods go through
     * {@code invokeBlocking}, concatenating results when ordered execution is configured and
     * merging them otherwise; non-blocking methods are invoked directly and concatenated.
     */
    private void processMethodReturningIndividualPayloadAndConsumingIndividualItem() {
        if (configuration.isBlocking()) {
            if (configuration.isBlockingExecutionOrdered()) {
                this.mapper = source -> MultiUtils.handlePreProcessingAcknowledgement(source, configuration)
                        .onItem()
                        .transformToMultiAndConcatenate(
                                incoming -> invokeBlocking(withPayloadOrMessage((Message<?>) incoming))
                                        .onItemOrFailure()
                                        .transformToUni((outcome, failure) -> handlePostInvocation((Message<?>) incoming, outcome, (Throwable) failure))
                                        .onItem()
                                        .transformToMulti(this::handleSkip));
            } else {
                this.mapper = source -> MultiUtils.handlePreProcessingAcknowledgement(source, configuration)
                        .onItem()
                        .transformToMultiAndMerge(
                                incoming -> invokeBlocking(withPayloadOrMessage((Message<?>) incoming))
                                        .onItemOrFailure()
                                        .transformToUni((outcome, failure) -> handlePostInvocation((Message<?>) incoming, outcome, (Throwable) failure))
                                        .onItem()
                                        .transformToMulti(this::handleSkip));
            }
        } else {
            this.mapper = source -> MultiUtils.handlePreProcessingAcknowledgement(source, configuration)
                    .onItem()
                    .transformToMultiAndConcatenate(
                            incoming -> Uni.createFrom().item(() -> invoke(withPayloadOrMessage((Message<?>) incoming)))
                                    .onItemOrFailure()
                                    .transformToUni((outcome, failure) -> handlePostInvocation((Message<?>) incoming, outcome, (Throwable) failure))
                                    .onItem()
                                    .transformToMulti(this::handleSkip));
        }
    }

    /**
     * Turns a possibly-{@code null} result into a stream: empty for {@code null}, otherwise a
     * single-item stream.
     */
    private Publisher<? extends Message<Object>> handleSkip(Message<Object> m) {
        if (m != null) {
            return Multi.createFrom().item(m);
        }
        return Multi.createFrom().empty();
    }

    /**
     * Builds the outgoing message for a payload-returning method.
     *
     * <ul>
     * <li>On failure: nacks the incoming message under post-processing acknowledgment (yielding a
     * {@code null} item), otherwise throws a processing exception.</li>
     * <li>On a {@code null} result: acks the incoming message under post-processing
     * acknowledgment; in both cases the item is {@code null}.</li>
     * <li>On a non-null result: under post-processing acknowledgment the incoming message is
     * reused with the new payload; otherwise a new message is created with the incoming
     * metadata.</li>
     * </ul>
     *
     * @param message the incoming message
     * @param res the value returned by the method, may be {@code null}
     * @param fail the failure raised by the invocation, or {@code null}
     */
    private Uni<? extends Message<Object>> handlePostInvocation(Message<?> message, Object res, Throwable fail) {
        final boolean postAck = isPostAck();
        if (fail != null) {
            if (!postAck) {
                throw ProviderExceptions.ex.processingException(getMethodAsString(), fail);
            }
            return Uni.createFrom().completionStage(message.nack(fail).thenApply(ignored -> null));
        }
        if (res == null) {
            if (postAck) {
                return Uni.createFrom().completionStage(message.ack().thenApply(ignored -> null));
            }
            return Uni.createFrom().nullItem();
        }
        if (postAck) {
            return Uni.createFrom().item(message.withPayload(res));
        }
        return Uni.createFrom().item(Message.of(res, message.getMetadata()));
    }

    /**
     * Post-invocation handling for message-returning methods: a failure is rethrown as a
     * processing exception, a {@code null} result yields a {@code null} item, anything else is
     * passed through.
     *
     * @param res the message returned by the method, may be {@code null}
     * @param fail the failure raised by the invocation, or {@code null}
     */
    private Uni<? extends Message<Object>> handlePostInvocationWithMessage(Message<?> res, Throwable fail) {
        if (fail != null) {
            throw ProviderExceptions.ex.processingException(getMethodAsString(), fail);
        }
        if (res == null) {
            return Uni.createFrom().nullItem();
        }
        return Uni.createFrom().item(res);
    }

    /** Method returning a {@code CompletionStage} of message; results are concatenated in order. */
    private void processMethodReturningACompletionStageOfMessageAndConsumingIndividualItem() {
        // Note: the method is invoked directly while mapping the item (no supplier wrapping here).
        this.mapper = source -> MultiUtils.handlePreProcessingAcknowledgement(source, configuration)
                .onItem()
                .transformToMultiAndConcatenate(
                        incoming -> Uni.createFrom().completionStage((CompletionStage) invoke(withPayloadOrMessage((Message<?>) incoming)))
                                .onItemOrFailure()
                                .transformToUni((outcome, failure) -> handlePostInvocationWithMessage((Message) outcome, (Throwable) failure))
                                .onItem()
                                .transformToMulti(this::handleSkip));
    }

    /** Method returning a {@code Uni} of message; the invocation is deferred and results are concatenated. */
    private void processMethodReturningAUniOfMessageAndConsumingIndividualItem() {
        this.mapper = source -> MultiUtils.handlePreProcessingAcknowledgement(source, configuration)
                .onItem()
                .transformToMultiAndConcatenate(
                        incoming -> Uni.createFrom().deferred(() -> (Uni) invoke(withPayloadOrMessage((Message<?>) incoming)))
                                .onItemOrFailure()
                                .transformToUni((outcome, failure) -> handlePostInvocationWithMessage((Message) outcome, (Throwable) failure))
                                .onItem()
                                .transformToMulti(this::handleSkip));
    }

    /** Method returning a {@code CompletionStage} of payload; the stage is obtained through a supplier. */
    private void processMethodReturningACompletionStageOfPayloadAndConsumingIndividualItem() {
        this.mapper = source -> MultiUtils.handlePreProcessingAcknowledgement(source, configuration)
                .onItem()
                .transformToMultiAndConcatenate(
                        incoming -> Uni.createFrom().completionStage(() -> (CompletionStage) invoke(withPayloadOrMessage((Message<?>) incoming)))
                                .onItemOrFailure()
                                .transformToUni((outcome, failure) -> handlePostInvocation((Message<?>) incoming, outcome, (Throwable) failure))
                                .onItem()
                                .transformToMulti(this::handleSkip));
    }

    /** Method returning a {@code Uni} of payload; the invocation is deferred and results are concatenated. */
    private void processMethodReturningAUniOfPayloadAndConsumingIndividualItem() {
        this.mapper = source -> MultiUtils.handlePreProcessingAcknowledgement(source, configuration)
                .onItem()
                .transformToMultiAndConcatenate(
                        incoming -> Uni.createFrom().deferred(() -> (Uni) invoke(withPayloadOrMessage((Message<?>) incoming)))
                                .onItemOrFailure()
                                .transformToUni((outcome, failure) -> handlePostInvocation((Message<?>) incoming, outcome, (Throwable) failure))
                                .onItem()
                                .transformToMulti(this::handleSkip));
    }

    /** @return whether the configured return type is assignable to a publisher or a publisher builder */
    private boolean isReturningAPublisherOrAPublisherBuilder() {
        Class<?> declared = configuration.getReturnType();
        if (ClassUtils.isAssignable(declared, Publisher.class)) {
            return true;
        }
        return ClassUtils.isAssignable(declared, PublisherBuilder.class);
    }

    /** @return whether the configured return type is assignable to a processor or a processor builder */
    private boolean isReturningAProcessorOrAProcessorBuilder() {
        Class<?> declared = configuration.getReturnType();
        if (ClassUtils.isAssignable(declared, Processor.class)) {
            return true;
        }
        return ClassUtils.isAssignable(declared, ProcessorBuilder.class);
    }

    /**
     * @param message the incoming message
     * @return the message payload when the method consumes payloads, the message itself otherwise
     */
    private Object withPayloadOrMessage(Message<?> message) {
        if (configuration.consumption() == MediatorConfiguration.Consumption.PAYLOAD) {
            return message.getPayload();
        }
        return message;
    }
}

