package io.smallrye.reactive.messaging;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniCreate;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.helpers.ClassUtils;
import io.smallrye.reactive.messaging.helpers.IgnoringSubscriber;
import io.smallrye.reactive.messaging.helpers.MultiUtils;
import io.smallrye.reactive.messaging.i18n.ProviderExceptions;
import io.smallrye.reactive.messaging.i18n.ProviderLogging;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.CompletionSubscriber;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/smallrye/reactive/messaging/SubscriberMediator.class */
/**
 * Mediator for methods whose configured shape is {@link Shape#SUBSCRIBER}, i.e. methods that only
 * consume messages from an upstream stream.
 *
 * <p>
 * {@link #initialize(Object)} selects, from the configured consumption kind and return type, both the
 * terminal {@link Subscriber} and the function that decorates the upstream {@link Multi} with the
 * method invocation and acknowledgement handling. {@link #run()} then applies that function to the
 * connected upstream and subscribes to the result.
 */
public class SubscriberMediator extends AbstractMediator {

    /** Upstream stream, set by {@link #connectToUpstream(Multi)}; {@code null} until connected. */
    private Multi<? extends Message<?>> source;

    /** Terminal subscriber receiving the items produced by {@link #function}. */
    private Subscriber<Message<?>> subscriber;

    /** Decorates the upstream with method invocation and/or acknowledgement handling. */
    private Function<Multi<? extends Message<?>>, Multi<? extends Message<?>>> function;

    /** Holds the upstream subscription received when {@link #run()} subscribes. */
    private final AtomicReference<Subscription> subscription = new AtomicReference<>();

    /**
     * Creates the mediator.
     *
     * @param mediatorConfiguration the configuration of the mediated method; its shape must be
     *        {@link Shape#SUBSCRIBER}
     * @throws RuntimeException the exception built by
     *         {@code ProviderExceptions.ex.illegalArgumentForSubscriberShape} when the shape differs
     */
    public SubscriberMediator(MediatorConfiguration mediatorConfiguration) {
        super(mediatorConfiguration);
        if (mediatorConfiguration.shape() != Shape.SUBSCRIBER) {
            throw ProviderExceptions.ex.illegalArgumentForSubscriberShape(mediatorConfiguration.shape());
        }
    }

    /**
     * Initializes the mediator and picks the processing strategy matching the configured consumption
     * kind and, for {@code MESSAGE}/{@code PAYLOAD}, the return type of the method.
     *
     * @param obj the object handed to {@code AbstractMediator.initialize}
     */
    @Override
    public void initialize(Object obj) {
        super.initialize(obj);
        switch (this.configuration.consumption()) {
            case STREAM_OF_MESSAGE:
            case STREAM_OF_PAYLOAD:
                processMethodReturningASubscriber();
                break;
            case MESSAGE:
            case PAYLOAD:
                // CompletionStage is tested first, then Uni; anything else is handled as "void".
                if (ClassUtils.isAssignable(this.configuration.getReturnType(), CompletionStage.class)) {
                    processMethodReturningACompletionStage();
                } else if (ClassUtils.isAssignable(this.configuration.getReturnType(), Uni.class)) {
                    processMethodReturningAUni();
                } else {
                    processMethodReturningVoid();
                }
                break;
            default:
                throw ProviderExceptions.ex.illegalArgumentForUnexpectedConsumption(this.configuration.consumption());
        }
        assert this.subscriber != null;
    }

    /**
     * @return the subscriber selected by {@link #initialize(Object)}, or {@code null} before
     *         initialization
     */
    @Override
    public Subscriber<Message<?>> getComputedSubscriber() {
        return this.subscriber;
    }

    /**
     * @return {@code true} once an upstream has been provided through {@link #connectToUpstream(Multi)}
     */
    @Override
    public boolean isConnected() {
        return this.source != null;
    }

    /**
     * Stores the upstream after passing it through {@code convert}.
     *
     * @param multi the upstream stream of messages
     */
    @Override
    public void connectToUpstream(Multi<? extends Message<?>> multi) {
        this.source = convert(multi);
    }

    /**
     * Applies the processing function to the upstream and subscribes to the resulting stream,
     * forwarding every signal to the computed subscriber.
     *
     * <p>
     * Any failure captured by the time {@code subscribe} returns (an upstream {@code onError} or an
     * exception thrown by the subscriber's {@code onNext}) is rethrown as the exception built by
     * {@code ProviderExceptions.ex.weavingForIncoming}.
     */
    @Override
    public void run() {
        assert this.source != null;
        assert this.function != null;
        assert this.subscriber != null;

        final AtomicReference<Throwable> capturedFailure = new AtomicReference<>();
        final Subscriber<Message<?>> delegate = this.subscriber;

        this.function.apply(this.source).subscribe(new Subscriber<Message<?>>() {
            @Override
            public void onSubscribe(final Subscription upstream) {
                SubscriberMediator.this.subscription.set(upstream);
                // Hand the delegate a forwarding subscription rather than the upstream instance itself.
                delegate.onSubscribe(new Subscription() {
                    @Override
                    public void request(long j) {
                        upstream.request(j);
                    }

                    @Override
                    public void cancel() {
                        upstream.cancel();
                    }
                });
            }

            @Override
            public void onNext(Message<?> message) {
                try {
                    delegate.onNext(message);
                } catch (Exception e) {
                    // Logged and recorded; not propagated from onNext.
                    ProviderLogging.log.messageProcessingException(e);
                    capturedFailure.set(e);
                }
            }

            @Override
            public void onError(Throwable th) {
                ProviderLogging.log.messageProcessingException(th);
                capturedFailure.set(th);
                delegate.onError(th);
            }

            @Override
            public void onComplete() {
                delegate.onComplete();
            }
        });

        Throwable failure = capturedFailure.get();
        if (failure != null) {
            throw ProviderExceptions.ex.weavingForIncoming(this.configuration.getIncoming(), failure);
        }
    }

    /**
     * Configures processing for a method whose return type is neither a {@link CompletionStage} nor a
     * {@link Uni}. The method is always invoked with the message payload. Blocking methods go through
     * {@code invokeBlocking}, either concatenated (ordered) or merged (unordered).
     */
    private void processMethodReturningVoid() {
        this.subscriber = IgnoringSubscriber.INSTANCE;
        if (!this.configuration.isBlocking()) {
            this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration)
                    .onItem().transformToUniAndConcatenate(message -> Uni.createFrom()
                            .item(() -> invoke(message.getPayload()))
                            .onItemOrFailure().transformToUni(handleInvocationResult(message)))
                    .onFailure().invoke(this::reportFailureToHealth);
        } else if (this.configuration.isBlockingExecutionOrdered()) {
            this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration)
                    .onItem().transformToUniAndConcatenate(message -> invokeBlocking(message.getPayload())
                            .onItemOrFailure().transformToUni(handleInvocationResult(message)))
                    .onFailure().invoke(this::reportFailureToHealth);
        } else {
            this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration)
                    .onItem().transformToUniAndMerge(message -> invokeBlocking(message.getPayload())
                            .onItemOrFailure().transformToUni(handleInvocationResult(message)))
                    .onFailure().invoke(this::reportFailureToHealth);
        }
    }

    /**
     * Reports a stream failure to the health reporter without logging it.
     *
     * @param failure the failure to report
     */
    private void reportFailureToHealth(Throwable failure) {
        this.health.reportApplicationFailure(this.configuration.methodAsString(), failure);
    }

    /**
     * Builds the function turning the outcome of an invocation into a {@link Uni} emitting the
     * processed message.
     *
     * <p>
     * With the {@code POST_PROCESSING} strategy the message is nacked on failure or acked on success,
     * and the message is emitted once that stage completes. With any other strategy a failure is
     * propagated as a failed {@link Uni} and a success emits the message directly.
     *
     * @param message the message that was passed to the method
     * @return the outcome handler; its first argument (the invocation result) is ignored
     */
    private BiFunction<Object, Throwable, Uni<? extends Message<?>>> handleInvocationResult(Message<?> message) {
        return (ignoredResult, failure) -> {
            boolean postProcessing = this.configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING;
            if (failure != null) {
                if (postProcessing) {
                    return Uni.createFrom().completionStage(message.nack(failure).thenApply(x -> message));
                }
                return Uni.createFrom().failure(failure);
            }
            if (postProcessing) {
                return Uni.createFrom().completionStage(message.ack().thenApply(x -> message));
            }
            // Emit the message itself (it must not be cast to another type).
            return Uni.createFrom().item(message);
        };
    }

    /**
     * Configures processing for a method returning a {@link CompletionStage}. The method receives the
     * payload when the consumption kind is {@code PAYLOAD}, and the message otherwise.
     */
    private void processMethodReturningACompletionStage() {
        this.subscriber = IgnoringSubscriber.INSTANCE;
        boolean invokeWithPayload = MediatorConfiguration.Consumption.PAYLOAD == this.configuration.consumption();
        if (!this.configuration.isBlocking()) {
            this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration)
                    .onItem().transformToUniAndConcatenate(message -> {
                        Uni<?> outcome;
                        if (invokeWithPayload) {
                            outcome = Uni.createFrom().completionStage(() -> (CompletionStage<?>) invoke(message.getPayload()));
                        } else {
                            outcome = Uni.createFrom().completionStage(() -> (CompletionStage<?>) invoke(message));
                        }
                        return outcome.onItemOrFailure().transformToUni(handleInvocationResult(message));
                    })
                    .onFailure().invoke(this::reportFailure);
        } else if (this.configuration.isBlockingExecutionOrdered()) {
            this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration)
                    .onItem().transformToUniAndConcatenate(message -> invokeBlockingAndHandleOutcome(invokeWithPayload, message))
                    .onFailure().invoke(this::reportFailure);
        } else {
            this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration)
                    .onItem().transformToUniAndMerge(message -> invokeBlockingAndHandleOutcome(invokeWithPayload, message))
                    .onFailure().invoke(this::reportFailure);
        }
    }

    /**
     * Invokes the method through {@code invokeBlocking} and maps the outcome with
     * {@link #handleInvocationResult(Message)}.
     *
     * @param invokeWithPayload {@code true} to pass the payload, {@code false} to pass the message
     * @param message the message being processed
     * @return a {@link Uni} emitting the message once the outcome has been handled
     */
    private Uni<? extends Message<?>> invokeBlockingAndHandleOutcome(boolean invokeWithPayload, Message<?> message) {
        Uni<?> outcome = invokeWithPayload ? invokeBlocking(message.getPayload()) : invokeBlocking(message);
        return outcome.onItemOrFailure().transformToUni(handleInvocationResult(message));
    }

    /**
     * Logs a stream failure and reports it to the health reporter.
     *
     * @param th the failure
     */
    private void reportFailure(Throwable th) {
        ProviderLogging.log.messageProcessingException(th);
        this.health.reportApplicationFailure(this.configuration.methodAsString(), th);
    }

    /**
     * Configures processing for a method returning a {@link Uni}. The method receives the payload
     * when the consumption kind is {@code PAYLOAD}, and the message otherwise.
     */
    private void processMethodReturningAUni() {
        this.subscriber = IgnoringSubscriber.INSTANCE;
        boolean invokeWithPayload = MediatorConfiguration.Consumption.PAYLOAD == this.configuration.consumption();
        if (!this.configuration.isBlocking()) {
            this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration)
                    .onItem().transformToUniAndConcatenate(message -> {
                        Uni<?> outcome;
                        if (invokeWithPayload) {
                            outcome = (Uni<?>) invoke(message.getPayload());
                        } else {
                            outcome = (Uni<?>) invoke(message);
                        }
                        return outcome.onItemOrFailure().transformToUni(handleInvocationResult(message));
                    })
                    .onFailure().invoke(this::reportFailure);
        } else if (this.configuration.isBlockingExecutionOrdered()) {
            this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration)
                    .onItem().transformToUniAndConcatenate(message -> invokeBlockingAndHandleOutcome(invokeWithPayload, message))
                    .onFailure().invoke(this::reportFailure);
        } else {
            this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration)
                    .onItem().transformToUniAndMerge(message -> invokeBlockingAndHandleOutcome(invokeWithPayload, message))
                    .onFailure().invoke(this::reportFailure);
        }
    }

    /**
     * Configures processing for a method that returns the {@link Subscriber} (or
     * {@link SubscriberBuilder}) consuming the stream. The method is invoked once, without arguments.
     *
     * <p>
     * For {@code STREAM_OF_PAYLOAD} the returned subscriber is wrapped in a {@code SubscriberWrapper}
     * given a payload extractor and a callback that acks/nacks the message under the
     * {@code POST_PROCESSING} strategy, or otherwise returns a future already completed with the
     * outcome. For the other stream consumption kind the returned subscriber is used directly.
     *
     * @throws RuntimeException the exception built by
     *         {@code ProviderExceptions.ex.illegalStateExceptionForSubscriberOrSubscriberBuilder} when the
     *         method returns anything else, including {@code null}
     */
    // The element type of the user-provided subscriber cannot be verified at runtime.
    @SuppressWarnings("unchecked")
    private void processMethodReturningASubscriber() {
        Object result = invoke();
        if (!(result instanceof Subscriber) && !(result instanceof SubscriberBuilder)) {
            // A null result must yield the descriptive exception, not a NullPointerException.
            String actualType = result == null ? "null" : result.getClass().getName();
            throw ProviderExceptions.ex.illegalStateExceptionForSubscriberOrSubscriberBuilder(actualType);
        }

        if (this.configuration.consumption() == MediatorConfiguration.Consumption.STREAM_OF_PAYLOAD) {
            Subscriber<Object> userSubscriber;
            if (result instanceof Subscriber) {
                userSubscriber = (Subscriber<Object>) result;
            } else {
                userSubscriber = ((SubscriberBuilder<Object, Void>) result).build();
            }

            SubscriberWrapper<?, Message<?>> wrapper = new SubscriberWrapper<>(userSubscriber, Message::getPayload,
                    (message, failure) -> {
                        if (this.configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) {
                            return failure != null ? message.nack(failure) : message.ack();
                        }
                        CompletableFuture<Void> done = new CompletableFuture<>();
                        if (failure != null) {
                            done.completeExceptionally(failure);
                        } else {
                            done.complete(null);
                        }
                        return done;
                    });

            this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration);
            this.subscriber = wrapper;
        } else {
            Subscriber<Message<?>> userSubscriber;
            if (result instanceof Subscriber) {
                userSubscriber = (Subscriber<Message<?>>) result;
            } else {
                userSubscriber = ((SubscriberBuilder<Message<?>, Void>) result).build();
            }
            this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration);
            this.subscriber = userSubscriber;
        }
    }
}
