package io.smallrye.reactive.messaging;

import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.helpers.ClassUtils;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;

/* loaded from: input_file:io/smallrye/reactive/messaging/ProcessorMediator.class */
public class ProcessorMediator extends AbstractMediator {
    private Processor<Message, Message> processor;
    private PublisherBuilder<? extends Message> publisher;
    static final /* synthetic */ boolean $assertionsDisabled;

    public ProcessorMediator(MediatorConfiguration mediatorConfiguration) {
        super(mediatorConfiguration);
        if (mediatorConfiguration.shape() != Shape.PROCESSOR) {
            throw new IllegalArgumentException("Expected a Processor shape, received a " + mediatorConfiguration.shape());
        }
    }

    @Override // io.smallrye.reactive.messaging.AbstractMediator
    public void connectToUpstream(PublisherBuilder<? extends Message> publisherBuilder) {
        if (!$assertionsDisabled && this.processor == null) {
            throw new AssertionError();
        }
        this.publisher = decorate(publisherBuilder.via((Processor<? super Object, ? extends R>) this.processor));
    }

    @Override // io.smallrye.reactive.messaging.AbstractMediator
    public PublisherBuilder<? extends Message> getStream() {
        return (PublisherBuilder) Objects.requireNonNull(this.publisher);
    }

    @Override // io.smallrye.reactive.messaging.AbstractMediator
    public boolean isConnected() {
        return this.publisher != null;
    }

    @Override // io.smallrye.reactive.messaging.AbstractMediator
    public void initialize(Object obj) {
        super.initialize(obj);
        switch (this.configuration.production()) {
            case STREAM_OF_MESSAGE:
                if (isReturningAProcessorOrAProcessorBuilder()) {
                    if (this.configuration.usesBuilderTypes()) {
                        processMethodReturningAProcessorBuilderOfMessages();
                        return;
                    } else {
                        processMethodReturningAProcessorOfMessages();
                        return;
                    }
                }
                if (!isReturningAPublisherOrAPublisherBuilder()) {
                    throw new IllegalArgumentException("Invalid Processor - unsupported signature for " + this.configuration.methodAsString());
                }
                if (this.configuration.usesBuilderTypes()) {
                    processMethodReturningAPublisherBuilderOfMessageAndConsumingMessages();
                    return;
                } else {
                    processMethodReturningAPublisherOfMessageAndConsumingMessages();
                    return;
                }
            case STREAM_OF_PAYLOAD:
                if (isReturningAProcessorOrAProcessorBuilder()) {
                    if (this.configuration.usesBuilderTypes()) {
                        processMethodReturningAProcessorBuilderOfPayloads();
                        return;
                    } else {
                        processMethodReturningAProcessorOfPayloads();
                        return;
                    }
                }
                if (!isReturningAPublisherOrAPublisherBuilder()) {
                    throw new IllegalArgumentException("Invalid Processor - unsupported signature for " + this.configuration.methodAsString());
                }
                if (this.configuration.usesBuilderTypes()) {
                    processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloads();
                    return;
                } else {
                    processMethodReturningAPublisherOfPayloadsAndConsumingPayloads();
                    return;
                }
            case INDIVIDUAL_MESSAGE:
                processMethodReturningIndividualMessageAndConsumingIndividualItem();
                return;
            case INDIVIDUAL_PAYLOAD:
                processMethodReturningIndividualPayloadAndConsumingIndividualItem();
                return;
            case COMPLETION_STAGE_OF_MESSAGE:
                processMethodReturningACompletionStageOfMessageAndConsumingIndividualMessage();
                return;
            case COMPLETION_STAGE_OF_PAYLOAD:
                processMethodReturningACompletionStageOfPayloadAndConsumingIndividualPayload();
                return;
            default:
                throw new IllegalArgumentException("Unexpected production type: " + this.configuration.production());
        }
    }

    private void processMethodReturningAPublisherBuilderOfMessageAndConsumingMessages() {
        this.processor = ReactiveStreams.builder().flatMapCompletionStage((Function) managePreProcessingAck()).map(message -> {
            return (PublisherBuilder) invoke(message);
        }).flatMap(Function.identity()).buildRs();
    }

    private void processMethodReturningAPublisherOfMessageAndConsumingMessages() {
        this.processor = ReactiveStreams.builder().flatMapCompletionStage((Function) managePreProcessingAck()).map(message -> {
            return (Publisher) invoke(message);
        }).flatMapRsPublisher(Function.identity()).buildRs();
    }

    private void processMethodReturningAProcessorBuilderOfMessages() {
        this.processor = ReactiveStreams.builder().flatMapCompletionStage((Function) managePreProcessingAck()).via((ProcessorBuilder) Objects.requireNonNull(invoke(new Object[0]), "The method " + this.configuration.methodAsString() + " returned `null`")).buildRs();
    }

    private void processMethodReturningAProcessorOfMessages() {
        this.processor = ReactiveStreams.builder().flatMapCompletionStage(message -> {
            return this.configuration.getAcknowledgment() == Acknowledgment.Strategy.PRE_PROCESSING ? message.ack().thenApply(obj -> {
                return message;
            }) : CompletableFuture.completedFuture(message);
        }).via((Processor) Objects.requireNonNull(invoke(new Object[0]), "The method " + this.configuration.methodAsString() + " returned `null`")).buildRs();
    }

    private void processMethodReturningAProcessorOfPayloads() {
        this.processor = ReactiveStreams.builder().flatMapCompletionStage((Function) managePreProcessingAck()).map(message -> {
            return message.getPayload();
        }).via((Processor) invoke(new Object[0])).map(Message::of).buildRs();
    }

    private void processMethodReturningAProcessorBuilderOfPayloads() {
        ProcessorBuilder processorBuilder = (ProcessorBuilder) invoke(new Object[0]);
        Objects.requireNonNull(processorBuilder, "The method " + this.configuration.methodAsString() + " has returned an invalid value: null");
        this.processor = ReactiveStreams.builder().flatMapCompletionStage((Function) managePreProcessingAck()).map(message -> {
            return message.getPayload();
        }).via(processorBuilder).map(Message::of).buildRs();
    }

    private void processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloads() {
        this.processor = ReactiveStreams.builder().flatMapCompletionStage((Function) managePreProcessingAck()).flatMap(message -> {
            return ((PublisherBuilder) invoke(message.getPayload())).map(obj -> {
                return Message.of(obj, message.getMetadata());
            });
        }).buildRs();
    }

    private void processMethodReturningAPublisherOfPayloadsAndConsumingPayloads() {
        this.processor = ReactiveStreams.builder().flatMapCompletionStage((Function) managePreProcessingAck()).flatMap(message -> {
            return ReactiveStreams.fromPublisher((Publisher) invoke(message.getPayload())).map(obj -> {
                return Message.of(obj, message.getMetadata());
            });
        }).buildRs();
    }

    private void processMethodReturningIndividualMessageAndConsumingIndividualItem() {
        if (this.configuration.consumption() == MediatorConfiguration.Consumption.PAYLOAD) {
            this.processor = ReactiveStreams.builder().flatMapCompletionStage((Function) managePreProcessingAck()).map(message -> {
                return (Message) invoke(message.getPayload());
            }).buildRs();
        } else {
            this.processor = ReactiveStreams.builder().flatMapCompletionStage((Function) managePreProcessingAck()).map(message2 -> {
                return (Message) invoke(message2);
            }).buildRs();
        }
    }

    private void processMethodReturningIndividualPayloadAndConsumingIndividualItem() {
        if (this.configuration.consumption() == MediatorConfiguration.Consumption.PAYLOAD) {
            this.processor = ReactiveStreams.builder().flatMapCompletionStage((Function) managePreProcessingAck()).map(message -> {
                Object invoke = invoke(message.getPayload());
                return this.configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING ? message.withPayload(invoke) : Message.of(invoke, message.getMetadata());
            }).buildRs();
        } else {
            this.processor = ReactiveStreams.builder().flatMapCompletionStage((Function) managePreProcessingAck()).map(message2 -> {
                Object invoke = invoke(message2);
                return this.configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING ? Message.of(invoke, (Supplier<CompletionStage<Void>>) () -> {
                    return message2.ack();
                }) : Message.of(invoke);
            }).buildRs();
        }
    }

    private void processMethodReturningACompletionStageOfMessageAndConsumingIndividualMessage() {
        this.processor = ReactiveStreams.builder().flatMapCompletionStage((Function) managePreProcessingAck()).flatMapCompletionStage(message -> {
            return (CompletionStage) invoke(message);
        }).buildRs();
    }

    private void processMethodReturningACompletionStageOfPayloadAndConsumingIndividualPayload() {
        this.processor = ReactiveStreams.builder().flatMapCompletionStage((Function) managePreProcessingAck()).flatMapCompletionStage(message -> {
            return ((CompletionStage) invoke(message.getPayload())).thenApply(obj -> {
                return Message.of(obj, message.getMetadata(), (Supplier<CompletionStage<Void>>) () -> {
                    return this.configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING ? message.ack() : CompletableFuture.completedFuture(null);
                });
            });
        }).buildRs();
    }

    private boolean isReturningAPublisherOrAPublisherBuilder() {
        Class<?> returnType = this.configuration.getReturnType();
        return ClassUtils.isAssignable(returnType, Publisher.class) || ClassUtils.isAssignable(returnType, PublisherBuilder.class);
    }

    private boolean isReturningAProcessorOrAProcessorBuilder() {
        Class<?> returnType = this.configuration.getReturnType();
        return ClassUtils.isAssignable(returnType, Processor.class) || ClassUtils.isAssignable(returnType, ProcessorBuilder.class);
    }

    static {
        $assertionsDisabled = !ProcessorMediator.class.desiredAssertionStatus();
    }
}
