/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.AbstractMediator;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.Shape;
import io.smallrye.reactive.messaging.helpers.ClassUtils;
import io.smallrye.reactive.messaging.i18n.ProviderExceptions;
import io.smallrye.reactive.messaging.i18n.ProviderMessages;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;

public class ProcessorMediator
extends AbstractMediator {
    // Processor assembled by one of the processMethod* helpers during initialize(Object);
    // connectToUpstream asserts that it has been set before wiring it in.
    private Processor<Message<?>, ? extends Message<?>> processor;
    // Resulting stream, assigned by connectToUpstream; stays null until then (see isConnected()).
    private PublisherBuilder<? extends Message<?>> publisher;

    public ProcessorMediator(MediatorConfiguration configuration) {
        super(configuration);
        if (configuration.shape() != Shape.PROCESSOR) {
            throw ProviderExceptions.ex.illegalArgumentForProcessorShape(configuration.shape());
        }
    }

    /**
     * Wires the given upstream through this mediator's processor and stores the resulting stream.
     *
     * <p>The upstream is first passed to {@code convert}, then routed through the processor, and the
     * outcome is handed to {@code decorate} before being kept in {@code this.publisher}.
     * The processor is expected to have been built already; this is only checked with an
     * {@code assert}, so the check is skipped when assertions are disabled.
     *
     * @param publisher the upstream stream to connect
     */
    @Override
    public void connectToUpstream(PublisherBuilder<? extends Message<?>> publisher) {
        assert (this.processor != null);
        PublisherBuilder<? extends Message<?>> processed =
                (PublisherBuilder<? extends Message<?>>) this.convert(publisher).via(this.processor);
        this.publisher = this.decorate(processed);
    }

    /**
     * Returns the stream produced by {@link #connectToUpstream(PublisherBuilder)}.
     *
     * @return the connected stream, never {@code null}
     * @throws NullPointerException if no upstream has been connected yet
     */
    @Override
    public PublisherBuilder<? extends Message<?>> getStream() {
        final PublisherBuilder<? extends Message<?>> connected = this.publisher;
        return Objects.requireNonNull(connected);
    }

    /**
     * Tells whether an upstream has been connected.
     *
     * @return {@code true} once {@code this.publisher} has been assigned, {@code false} otherwise
     */
    @Override
    public boolean isConnected() {
        return Objects.nonNull(this.publisher);
    }

    /**
     * Delegates unchanged to the superclass implementation of {@code invokeBlocking}.
     *
     * @param args the arguments forwarded to the superclass call
     * @param <T> the item type of the returned {@link Uni}
     * @return whatever the superclass implementation returns
     */
    @Override
    protected <T> Uni<T> invokeBlocking(Object ... args) {
        return super.invokeBlocking(args);
    }

    /**
     * Initializes the mediator for the given bean and builds the processor that matches the
     * configured production kind.
     *
     * <p>For the two stream productions the choice further depends on the method's return type
     * (processor vs. publisher) and on whether builder types are used. A stream production whose
     * return type is neither is rejected via
     * {@code ProviderExceptions.ex.illegalArgumentForInitialize(...)}; an unknown production is
     * rejected via {@code ProviderExceptions.ex.illegalArgumentForUnexpectedProduction(...)}.
     *
     * @param bean the bean instance, forwarded to the superclass initialization
     */
    @Override
    public void initialize(Object bean) {
        super.initialize(bean);
        switch (this.configuration.production()) {
            case STREAM_OF_MESSAGE:
                if (this.isReturningAProcessorOrAProcessorBuilder()) {
                    if (this.configuration.usesBuilderTypes()) {
                        this.processMethodReturningAProcessorBuilderOfMessages();
                    } else {
                        this.processMethodReturningAProcessorOfMessages();
                    }
                } else if (this.isReturningAPublisherOrAPublisherBuilder()) {
                    if (this.configuration.usesBuilderTypes()) {
                        this.processMethodReturningAPublisherBuilderOfMessageAndConsumingMessages();
                    } else {
                        this.processMethodReturningAPublisherOfMessageAndConsumingMessages();
                    }
                } else {
                    throw ProviderExceptions.ex.illegalArgumentForInitialize(this.configuration.methodAsString());
                }
                return;
            case STREAM_OF_PAYLOAD:
                if (this.isReturningAProcessorOrAProcessorBuilder()) {
                    if (this.configuration.usesBuilderTypes()) {
                        this.processMethodReturningAProcessorBuilderOfPayloads();
                    } else {
                        this.processMethodReturningAProcessorOfPayloads();
                    }
                } else if (this.isReturningAPublisherOrAPublisherBuilder()) {
                    if (this.configuration.usesBuilderTypes()) {
                        this.processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloads();
                    } else {
                        this.processMethodReturningAPublisherOfPayloadsAndConsumingPayloads();
                    }
                } else {
                    throw ProviderExceptions.ex.illegalArgumentForInitialize(this.configuration.methodAsString());
                }
                return;
            case INDIVIDUAL_MESSAGE:
                this.processMethodReturningIndividualMessageAndConsumingIndividualItem();
                return;
            case INDIVIDUAL_PAYLOAD:
                this.processMethodReturningIndividualPayloadAndConsumingIndividualItem();
                return;
            case COMPLETION_STAGE_OF_MESSAGE:
                this.processMethodReturningACompletionStageOfMessageAndConsumingIndividualItem();
                return;
            case COMPLETION_STAGE_OF_PAYLOAD:
                this.processMethodReturningACompletionStageOfPayloadAndConsumingIndividualItem();
                return;
            case UNI_OF_MESSAGE:
                this.processMethodReturningAUniOfMessageAndConsumingIndividualItem();
                return;
            case UNI_OF_PAYLOAD:
                this.processMethodReturningAUniOfPayloadAndConsumingIndividualItem();
                return;
            default:
                throw ProviderExceptions.ex.illegalArgumentForUnexpectedProduction(this.configuration.production());
        }
    }

    /**
     * Builds the processor for a method that receives each incoming message and returns a
     * {@link PublisherBuilder}; the returned builders are flattened into the output stream.
     * The stage supplied by {@code managePreProcessingAck()} runs before the method is invoked.
     */
    private void processMethodReturningAPublisherBuilderOfMessageAndConsumingMessages() {
        this.processor = ReactiveStreams.builder()
                .flatMapCompletionStage(this.managePreProcessingAck())
                .map(incoming -> (PublisherBuilder) this.invoke(incoming))
                .flatMap(Function.identity())
                .buildRs();
    }

    /**
     * Builds the processor for a method that receives each incoming message and returns a
     * Reactive Streams {@link Publisher}; the returned publishers are flattened into the output
     * stream. The stage supplied by {@code managePreProcessingAck()} runs before the method is
     * invoked.
     */
    private void processMethodReturningAPublisherOfMessageAndConsumingMessages() {
        this.processor = ReactiveStreams.builder()
                .flatMapCompletionStage(this.managePreProcessingAck())
                .map(incoming -> (Publisher) this.invoke(incoming))
                .flatMapRsPublisher(Function.identity())
                .buildRs();
    }

    /**
     * Builds the processor for a method that is invoked once, without arguments, and returns a
     * {@link ProcessorBuilder} operating on messages.
     *
     * <p>A {@code null} return value is rejected with a {@link NullPointerException} carrying the
     * {@code methodReturnedNull} message. The stage supplied by {@code managePreProcessingAck()} is
     * placed in front of the returned builder.
     */
    private void processMethodReturningAProcessorBuilderOfMessages() {
        Object returned = this.invoke(new Object[0]);
        ProcessorBuilder userBuilder = (ProcessorBuilder) Objects.requireNonNull(
                returned,
                ProviderMessages.msg.methodReturnedNull(this.configuration.methodAsString()));
        this.processor = ReactiveStreams.builder()
                .flatMapCompletionStage(this.managePreProcessingAck())
                .via(userBuilder)
                .buildRs();
    }

    /**
     * Builds the processor for a method that is invoked once, without arguments, and returns a
     * Reactive Streams {@link Processor} operating on messages.
     *
     * <p>A {@code null} return value is rejected with a {@link NullPointerException} carrying the
     * {@code methodReturnedNull} message. When the acknowledgment strategy is
     * {@code PRE_PROCESSING}, each message is acknowledged before it reaches the returned
     * processor; otherwise it is passed through untouched.
     */
    private void processMethodReturningAProcessorOfMessages() {
        Object returned = this.invoke(new Object[0]);
        Processor userProcessor = (Processor) Objects.requireNonNull(
                returned,
                ProviderMessages.msg.methodReturnedNull(this.configuration.methodAsString()));
        this.processor = ReactiveStreams.builder()
                .flatMapCompletionStage(incoming -> {
                    if (this.configuration.getAcknowledgment() != Acknowledgment.Strategy.PRE_PROCESSING) {
                        // No early acknowledgment requested: forward the message as-is.
                        return CompletableFuture.completedFuture(incoming);
                    }
                    // Acknowledge first, then continue with the same message.
                    return incoming.ack().thenApply(ignored -> incoming);
                })
                .via(userProcessor)
                .buildRs();
    }

    /**
     * Builds the processor for a method that is invoked once, without arguments, and returns a
     * Reactive Streams {@link Processor} operating on payloads.
     *
     * <p>Incoming messages are unwrapped to their payload before reaching the returned processor,
     * and every item it emits is wrapped again with {@code Message.of}. The stage supplied by
     * {@code managePreProcessingAck()} runs first.
     *
     * @throws NullPointerException if the user method returned {@code null}; the message comes from
     *         {@code ProviderMessages.msg.methodReturnedNull(...)}
     */
    private void processMethodReturningAProcessorOfPayloads() {
        // Fail fast with the same descriptive message as the sibling processor variants,
        // instead of passing a null processor on to via(...).
        Processor returnedProcessor = (Processor)Objects.requireNonNull(this.invoke(new Object[0]), ProviderMessages.msg.methodReturnedNull(this.configuration.methodAsString()));
        this.processor = ReactiveStreams.builder()
                .flatMapCompletionStage(this.managePreProcessingAck())
                .map(Message::getPayload)
                .via(returnedProcessor)
                .map(Message::of)
                .buildRs();
    }

    /**
     * Builds the processor for a method that is invoked once, without arguments, and returns a
     * {@link ProcessorBuilder} operating on payloads.
     *
     * <p>A {@code null} return value is rejected with a {@link NullPointerException} carrying the
     * {@code methodReturnedNull} message. Incoming messages are unwrapped to their payload before
     * the returned builder, and its output items are wrapped again with {@code Message.of}.
     */
    private void processMethodReturningAProcessorBuilderOfPayloads() {
        ProcessorBuilder userBuilder = (ProcessorBuilder) this.invoke(new Object[0]);
        Objects.requireNonNull(
                userBuilder,
                ProviderMessages.msg.methodReturnedNull(this.configuration.methodAsString()));
        this.processor = ReactiveStreams.builder()
                .flatMapCompletionStage(this.managePreProcessingAck())
                .map(Message::getPayload)
                .via(userBuilder)
                .map(Message::of)
                .buildRs();
    }

    /**
     * Builds the processor for a method that receives each incoming payload and returns a
     * {@link PublisherBuilder} of payloads.
     *
     * <p>Every emitted payload is wrapped in a new message that carries the metadata of the
     * incoming message that triggered the invocation.
     */
    private void processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloads() {
        this.processor = ReactiveStreams.builder()
                .flatMapCompletionStage(this.managePreProcessingAck())
                .flatMap(incoming -> {
                    PublisherBuilder produced = (PublisherBuilder) this.invoke(incoming.getPayload());
                    return produced.map(item -> Message.of(item, incoming.getMetadata()));
                })
                .buildRs();
    }

    /**
     * Builds the processor for a method that receives each incoming payload and returns a
     * Reactive Streams {@link Publisher} of payloads.
     *
     * <p>Every emitted payload is wrapped in a new message that carries the metadata of the
     * incoming message that triggered the invocation.
     */
    private void processMethodReturningAPublisherOfPayloadsAndConsumingPayloads() {
        this.processor = ReactiveStreams.builder()
                .flatMapCompletionStage(this.managePreProcessingAck())
                .flatMap(incoming -> {
                    Publisher produced = (Publisher) this.invoke(incoming.getPayload());
                    return ReactiveStreams.fromPublisher(produced)
                            .map(item -> Message.of(item, incoming.getMetadata()));
                })
                .buildRs();
    }

    /**
     * Builds the processor for a method that is called once per incoming item and returns a single
     * {@link Message}.
     *
     * <p>Per item: run {@code handlePreProcessingAck}, invoke the method with the payload or the
     * message (see {@code withPayloadOrMessage}), pass the outcome to
     * {@code handlePostInvocationWithMessage}, and drop {@code null} results via {@code handleSkip}.
     * Blocking configurations go through {@code invokeBlocking}; others call {@code invoke} directly.
     */
    private void processMethodReturningIndividualMessageAndConsumingIndividualItem() {
        if (this.configuration.isBlocking()) {
            this.processor = ReactiveStreams.builder()
                    .flatMapRsPublisher(incoming -> Uni.createFrom()
                            .completionStage(this.handlePreProcessingAck((Message<?>) incoming))
                            .onItem().transformToUni(acked -> this.invokeBlocking(this.withPayloadOrMessage((Message<?>) incoming)))
                            .onItem().transform(outcome -> (Message) outcome)
                            .onItemOrFailure().transformToUni(this::handlePostInvocationWithMessage)
                            .onItem().transformToMulti(this::handleSkip))
                    .buildRs();
        } else {
            this.processor = ReactiveStreams.builder()
                    .flatMapRsPublisher(incoming -> Uni.createFrom()
                            .completionStage(this.handlePreProcessingAck((Message<?>) incoming))
                            .onItem().transform(acked -> this.invoke(this.withPayloadOrMessage((Message<?>) incoming)))
                            .onItem().transform(outcome -> (Message) outcome)
                            .onItemOrFailure().transformToUni(this::handlePostInvocationWithMessage)
                            .onItem().transformToMulti(this::handleSkip))
                    .buildRs();
        }
    }

    /**
     * Tells whether the configured acknowledgment strategy is {@code POST_PROCESSING}.
     *
     * @return {@code true} for post-processing acknowledgment, {@code false} for any other strategy
     */
    private boolean isPostAck() {
        final Acknowledgment.Strategy strategy = this.configuration.getAcknowledgment();
        return strategy == Acknowledgment.Strategy.POST_PROCESSING;
    }

    /**
     * Builds the processor for a method that is called once per incoming item and returns a single
     * payload.
     *
     * <p>Per item: run {@code handlePreProcessingAck}, invoke the method with the payload or the
     * message (see {@code withPayloadOrMessage}), pass result and failure to
     * {@code handlePostInvocation} together with the incoming message, and drop {@code null}
     * results via {@code handleSkip}. Blocking configurations go through {@code invokeBlocking};
     * others call {@code invoke} directly.
     */
    private void processMethodReturningIndividualPayloadAndConsumingIndividualItem() {
        if (this.configuration.isBlocking()) {
            this.processor = ReactiveStreams.builder()
                    .flatMapRsPublisher(incoming -> Uni.createFrom()
                            .completionStage(this.handlePreProcessingAck((Message<?>) incoming))
                            .onItem().transformToUni(acked -> this.invokeBlocking(this.withPayloadOrMessage((Message<?>) incoming)))
                            .onItemOrFailure().transformToUni((outcome, failure) -> this.handlePostInvocation((Message<?>) incoming, outcome, (Throwable) failure))
                            .onItem().transformToMulti(this::handleSkip))
                    .buildRs();
        } else {
            this.processor = ReactiveStreams.builder()
                    .flatMapRsPublisher(incoming -> Uni.createFrom()
                            .completionStage(this.handlePreProcessingAck((Message<?>) incoming))
                            .onItem().transform(acked -> this.invoke(this.withPayloadOrMessage((Message<?>) incoming)))
                            .onItemOrFailure().transformToUni((outcome, failure) -> this.handlePostInvocation((Message<?>) incoming, outcome, (Throwable) failure))
                            .onItem().transformToMulti(this::handleSkip))
                    .buildRs();
        }
    }

    /**
     * Turns an optional message into a stream: a {@code null} message yields an empty stream,
     * anything else yields a stream emitting just that message.
     *
     * @param m the message to emit, or {@code null} to emit nothing
     * @return an empty or single-item publisher
     */
    private Publisher<? extends Message<Object>> handleSkip(Message<Object> m) {
        if (m != null) {
            return Multi.createFrom().item(m);
        }
        return Multi.createFrom().empty();
    }

    /**
     * Converts the outcome of a payload-returning invocation into the message to emit downstream.
     *
     * <ul>
     * <li>Failure with post-processing ack: the incoming message is nacked and a {@code null} item
     * is produced.</li>
     * <li>Failure otherwise: the exception built by
     * {@code ProviderExceptions.ex.processingException(...)} is thrown.</li>
     * <li>Non-null result with post-processing ack: the incoming message with its payload replaced
     * ({@code withPayload}).</li>
     * <li>Non-null result otherwise: a new message made of the result and the incoming metadata.</li>
     * <li>{@code null} result with post-processing ack: the incoming message is acked and a
     * {@code null} item is produced.</li>
     * <li>{@code null} result otherwise: a {@code null} item, without acknowledgment.</li>
     * </ul>
     *
     * @param message the incoming message that triggered the invocation
     * @param res the value returned by the invocation, possibly {@code null}
     * @param fail the failure raised by the invocation, or {@code null} on success
     * @return a {@link Uni} emitting the outgoing message or a {@code null} item
     */
    private Uni<? extends Message<Object>> handlePostInvocation(Message<?> message, Object res, Throwable fail) {
        final boolean postAck = this.isPostAck();

        if (fail != null) {
            if (!postAck) {
                throw ProviderExceptions.ex.processingException(this.getMethodAsString(), fail);
            }
            return Uni.createFrom().completionStage(message.nack(fail).thenApply(ignored -> null));
        }

        if (res == null) {
            if (!postAck) {
                return Uni.createFrom().nullItem();
            }
            return Uni.createFrom().completionStage(message.ack().thenApply(ignored -> null));
        }

        if (!postAck) {
            return Uni.createFrom().item(Message.of(res, message.getMetadata()));
        }
        return Uni.createFrom().item(message.withPayload(res));
    }

    /**
     * Converts the outcome of a message-returning invocation into the message to emit downstream.
     *
     * <p>A failure is rethrown as the exception built by
     * {@code ProviderExceptions.ex.processingException(...)}; otherwise the returned message is
     * emitted, or a {@code null} item when the invocation returned {@code null}.
     *
     * @param res the message returned by the invocation, possibly {@code null}
     * @param fail the failure raised by the invocation, or {@code null} on success
     * @return a {@link Uni} emitting {@code res} or a {@code null} item
     */
    private Uni<? extends Message<Object>> handlePostInvocationWithMessage(Message<?> res, Throwable fail) {
        if (fail != null) {
            throw ProviderExceptions.ex.processingException(this.getMethodAsString(), fail);
        }
        if (res == null) {
            return Uni.createFrom().nullItem();
        }
        return Uni.createFrom().item(res);
    }

    /**
     * Builds the processor for a method that is called once per incoming item and returns a
     * {@link CompletionStage} of {@link Message}.
     *
     * <p>Per item: run {@code handlePreProcessingAck}, invoke the method and await its stage, pass
     * the outcome to {@code handlePostInvocationWithMessage}, and drop {@code null} results via
     * {@code handleSkip}.
     */
    private void processMethodReturningACompletionStageOfMessageAndConsumingIndividualItem() {
        this.processor = ReactiveStreams.builder()
                .flatMapRsPublisher(incoming -> Uni.createFrom()
                        .completionStage(this.handlePreProcessingAck((Message<?>) incoming))
                        .onItem().transformToUni(acked -> Uni.createFrom().completionStage((CompletionStage) this.invoke(this.withPayloadOrMessage((Message<?>) incoming))))
                        .onItemOrFailure().transformToUni((outcome, failure) -> this.handlePostInvocationWithMessage((Message) outcome, (Throwable) failure))
                        .onItem().transformToMulti(this::handleSkip))
                .buildRs();
    }

    /**
     * Builds the processor for a method that is called once per incoming item and returns a
     * {@link Uni} of {@link Message}.
     *
     * <p>Per item: run {@code handlePreProcessingAck}, invoke the method and chain onto its
     * {@code Uni}, pass the outcome to {@code handlePostInvocationWithMessage}, and drop
     * {@code null} results via {@code handleSkip}.
     */
    private void processMethodReturningAUniOfMessageAndConsumingIndividualItem() {
        this.processor = ReactiveStreams.builder()
                .flatMapRsPublisher(incoming -> Uni.createFrom()
                        .completionStage(this.handlePreProcessingAck((Message<?>) incoming))
                        .onItem().transformToUni(acked -> (Uni) this.invoke(this.withPayloadOrMessage((Message<?>) incoming)))
                        .onItemOrFailure().transformToUni((outcome, failure) -> this.handlePostInvocationWithMessage((Message) outcome, (Throwable) failure))
                        .onItem().transformToMulti(this::handleSkip))
                .buildRs();
    }

    /**
     * Builds the processor for a method that is called once per incoming item and returns a
     * {@link CompletionStage} of a payload.
     *
     * <p>Per item: run {@code handlePreProcessingAck}, invoke the method and await its stage, pass
     * result and failure to {@code handlePostInvocation} together with the incoming message, and
     * drop {@code null} results via {@code handleSkip}.
     */
    private void processMethodReturningACompletionStageOfPayloadAndConsumingIndividualItem() {
        this.processor = ReactiveStreams.builder()
                .flatMapRsPublisher(incoming -> Uni.createFrom()
                        .completionStage(this.handlePreProcessingAck((Message<?>) incoming))
                        .onItem().transformToUni(acked -> Uni.createFrom().completionStage((CompletionStage) this.invoke(this.withPayloadOrMessage((Message<?>) incoming))))
                        .onItemOrFailure().transformToUni((outcome, failure) -> this.handlePostInvocation((Message<?>) incoming, outcome, (Throwable) failure))
                        .onItem().transformToMulti(this::handleSkip))
                .buildRs();
    }

    /**
     * Builds the processor for a method that is called once per incoming item and returns a
     * {@link Uni} of a payload.
     *
     * <p>Per item: run {@code handlePreProcessingAck}, invoke the method and chain onto its
     * {@code Uni}, pass result and failure to {@code handlePostInvocation} together with the
     * incoming message, and drop {@code null} results via {@code handleSkip}.
     */
    private void processMethodReturningAUniOfPayloadAndConsumingIndividualItem() {
        this.processor = ReactiveStreams.builder()
                .flatMapRsPublisher(incoming -> Uni.createFrom()
                        .completionStage(this.handlePreProcessingAck((Message<?>) incoming))
                        .onItem().transformToUni(acked -> (Uni) this.invoke(this.withPayloadOrMessage((Message<?>) incoming)))
                        .onItemOrFailure().transformToUni((outcome, failure) -> this.handlePostInvocation((Message<?>) incoming, outcome, (Throwable) failure))
                        .onItem().transformToMulti(this::handleSkip))
                .buildRs();
    }

    /**
     * Checks the configured return type against {@link Publisher} and {@link PublisherBuilder}
     * using {@code ClassUtils.isAssignable}.
     *
     * @return {@code true} if either check succeeds
     */
    private boolean isReturningAPublisherOrAPublisherBuilder() {
        final Class<?> declared = this.configuration.getReturnType();
        if (ClassUtils.isAssignable(declared, Publisher.class)) {
            return true;
        }
        return ClassUtils.isAssignable(declared, PublisherBuilder.class);
    }

    /**
     * Checks the configured return type against {@link Processor} and {@link ProcessorBuilder}
     * using {@code ClassUtils.isAssignable}.
     *
     * @return {@code true} if either check succeeds
     */
    private boolean isReturningAProcessorOrAProcessorBuilder() {
        final Class<?> declared = this.configuration.getReturnType();
        if (ClassUtils.isAssignable(declared, Processor.class)) {
            return true;
        }
        return ClassUtils.isAssignable(declared, ProcessorBuilder.class);
    }

    /**
     * Selects the argument to hand to the user method, based on the configured consumption.
     *
     * @param message the incoming message
     * @return the message's payload when the consumption is {@code PAYLOAD}, otherwise the message
     *         itself
     */
    private Object withPayloadOrMessage(Message<?> message) {
        if (this.configuration.consumption() == MediatorConfiguration.Consumption.PAYLOAD) {
            return message.getPayload();
        }
        return message;
    }
}

