/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.providers;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.Shape;
import io.smallrye.reactive.messaging.providers.AbstractMediator;
import io.smallrye.reactive.messaging.providers.SubscriberWrapper;
import io.smallrye.reactive.messaging.providers.helpers.ClassUtils;
import io.smallrye.reactive.messaging.providers.helpers.IgnoringSubscriber;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.providers.i18n.ProviderExceptions;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.CompletionSubscriber;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class SubscriberMediator
extends AbstractMediator {
    private Multi<? extends Message<?>> source;
    private Subscriber<Message<?>> subscriber;
    private Function<Multi<? extends Message<?>>, Multi<? extends Message<?>>> function;
    private final AtomicReference<Subscription> subscription = new AtomicReference();

    public SubscriberMediator(MediatorConfiguration configuration) {
        super(configuration);
        if (configuration.shape() != Shape.SUBSCRIBER) {
            throw ProviderExceptions.ex.illegalArgumentForSubscriberShape(configuration.shape());
        }
    }

    @Override
    public void initialize(Object bean) {
        super.initialize(bean);
        switch (this.configuration.consumption()) {
            case STREAM_OF_MESSAGE: 
            case STREAM_OF_PAYLOAD: {
                this.processMethodReturningASubscriber();
                break;
            }
            case MESSAGE: 
            case PAYLOAD: {
                if (ClassUtils.isAssignable(this.configuration.getReturnType(), CompletionStage.class)) {
                    this.processMethodReturningACompletionStage();
                    break;
                }
                if (ClassUtils.isAssignable(this.configuration.getReturnType(), Uni.class)) {
                    this.processMethodReturningAUni();
                    break;
                }
                this.processMethodReturningVoid();
                break;
            }
            default: {
                throw ProviderExceptions.ex.illegalArgumentForUnexpectedConsumption(this.configuration.consumption());
            }
        }
        assert (this.subscriber != null);
    }

    @Override
    public Subscriber<Message<?>> getComputedSubscriber() {
        return this.subscriber;
    }

    @Override
    public boolean isConnected() {
        return this.source != null;
    }

    @Override
    public void connectToUpstream(Multi<? extends Message<?>> publisher) {
        this.source = this.convert(publisher);
    }

    @Override
    public void run() {
        assert (this.source != null);
        assert (this.function != null);
        assert (this.subscriber != null);
        final AtomicReference syncErrorCatcher = new AtomicReference();
        final Subscriber<Message<?>> delegate = this.subscriber;
        Subscriber delegating = new Subscriber<Message<?>>(){

            @Override
            public void onSubscribe(final Subscription s) {
                SubscriberMediator.this.subscription.set(s);
                delegate.onSubscribe(new Subscription(){

                    @Override
                    public void request(long n) {
                        s.request(n);
                    }

                    @Override
                    public void cancel() {
                        s.cancel();
                    }
                });
            }

            @Override
            public void onNext(Message<?> o) {
                try {
                    delegate.onNext(o);
                }
                catch (Exception e) {
                    ProviderLogging.log.messageProcessingException(e);
                    syncErrorCatcher.set(e);
                }
            }

            @Override
            public void onError(Throwable t) {
                ProviderLogging.log.messageProcessingException(t);
                syncErrorCatcher.set(t);
                delegate.onError(t);
            }

            @Override
            public void onComplete() {
                delegate.onComplete();
            }
        };
        this.function.apply(this.source).subscribe(delegating);
        Throwable throwable = (Throwable)syncErrorCatcher.get();
        if (throwable != null) {
            throw ProviderExceptions.ex.weavingForIncoming(this.configuration.getIncoming(), throwable);
        }
    }

    private void processMethodReturningVoid() {
        this.subscriber = IgnoringSubscriber.INSTANCE;
        this.function = this.configuration.isBlocking() ? (this.configuration.isBlockingExecutionOrdered() ? upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration).onItem().transformToUniAndConcatenate(msg -> this.invokeBlocking(msg.getPayload()).onItemOrFailure().transformToUni(this.handleInvocationResult((Message<?>)msg))).onFailure().invoke((Throwable failure) -> this.health.reportApplicationFailure(this.configuration.methodAsString(), (Throwable)failure)) : upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration).onItem().transformToUniAndMerge(msg -> this.invokeBlocking(msg.getPayload()).onItemOrFailure().transformToUni(this.handleInvocationResult((Message<?>)msg))).onFailure().invoke((Throwable failure) -> this.health.reportApplicationFailure(this.configuration.methodAsString(), (Throwable)failure))) : upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration).onItem().transformToUniAndConcatenate(msg -> Uni.createFrom().item(() -> this.invoke(msg.getPayload())).onItemOrFailure().transformToUni(this.handleInvocationResult((Message<?>)msg))).onFailure().invoke((Throwable failure) -> this.health.reportApplicationFailure(this.configuration.methodAsString(), (Throwable)failure));
    }

    private BiFunction<Object, Throwable, Uni<? extends Message<?>>> handleInvocationResult(Message<?> m) {
        return (success, failure) -> {
            if (failure != null) {
                if (this.configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) {
                    return Uni.createFrom().completionStage(m.nack((Throwable)failure).thenApply(x -> m));
                }
                return Uni.createFrom().failure((Throwable)failure);
            }
            if (this.configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) {
                return Uni.createFrom().completionStage(m.ack().thenApply(x -> m));
            }
            return Uni.createFrom().item(m);
        };
    }

    private void processMethodReturningACompletionStage() {
        boolean invokeWithPayload;
        this.subscriber = IgnoringSubscriber.INSTANCE;
        boolean bl = invokeWithPayload = MediatorConfiguration.Consumption.PAYLOAD == this.configuration.consumption();
        this.function = this.configuration.isBlocking() ? (this.configuration.isBlockingExecutionOrdered() ? upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration).onItem().transformToUniAndConcatenate(msg -> this.invokeBlockingAndHandleOutcome(invokeWithPayload, (Message<?>)msg)).onFailure().invoke(this::reportFailure) : upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration).onItem().transformToUniAndMerge(msg -> this.invokeBlockingAndHandleOutcome(invokeWithPayload, (Message<?>)msg)).onFailure().invoke(this::reportFailure)) : upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration).onItem().transformToUniAndConcatenate(msg -> {
            Uni uni = invokeWithPayload ? Uni.createFrom().completionStage(() -> (CompletionStage)this.invoke(msg.getPayload())) : Uni.createFrom().completionStage(() -> (CompletionStage)this.invoke(msg));
            return uni.onItemOrFailure().transformToUni(this.handleInvocationResult((Message<?>)msg));
        }).onFailure().invoke(this::reportFailure);
    }

    private Uni<? extends Message<?>> invokeBlockingAndHandleOutcome(boolean invokeWithPayload, Message<?> msg) {
        Uni uni = invokeWithPayload ? this.invokeBlocking(msg.getPayload()) : this.invokeBlocking(msg);
        return uni.onItemOrFailure().transformToUni(this.handleInvocationResult(msg));
    }

    private void reportFailure(Throwable failure) {
        ProviderLogging.log.messageProcessingException(failure);
        this.health.reportApplicationFailure(this.configuration.methodAsString(), failure);
    }

    private void processMethodReturningAUni() {
        boolean invokeWithPayload;
        this.subscriber = IgnoringSubscriber.INSTANCE;
        boolean bl = invokeWithPayload = MediatorConfiguration.Consumption.PAYLOAD == this.configuration.consumption();
        this.function = this.configuration.isBlocking() ? (this.configuration.isBlockingExecutionOrdered() ? upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration).onItem().transformToUniAndConcatenate(msg -> this.invokeBlockingAndHandleOutcome(invokeWithPayload, (Message<?>)msg)).onFailure().invoke(this::reportFailure) : upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration).onItem().transformToUniAndMerge(msg -> this.invokeBlockingAndHandleOutcome(invokeWithPayload, (Message<?>)msg)).onFailure().invoke(this::reportFailure)) : upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration).onItem().transformToUniAndConcatenate(msg -> {
            Uni uni = invokeWithPayload ? (Uni)this.invoke(msg.getPayload()) : (Uni)this.invoke(msg);
            return uni.onItemOrFailure().transformToUni(this.handleInvocationResult((Message<?>)msg));
        }).onFailure().invoke(this::reportFailure);
    }

    private void processMethodReturningASubscriber() {
        Object result2 = this.invoke(new Object[0]);
        if (!(result2 instanceof Subscriber) && !(result2 instanceof SubscriberBuilder)) {
            throw ProviderExceptions.ex.illegalStateExceptionForSubscriberOrSubscriberBuilder(result2.getClass().getName());
        }
        if (this.configuration.consumption() == MediatorConfiguration.Consumption.STREAM_OF_PAYLOAD) {
            CompletionSubscriber userSubscriber = result2 instanceof Subscriber ? (CompletionSubscriber)result2 : ((SubscriberBuilder)result2).build();
            SubscriberWrapper<Object, Message> wrapper = new SubscriberWrapper<Object, Message>(userSubscriber, Message::getPayload, (m, t) -> {
                if (this.configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) {
                    if (t != null) {
                        return m.nack((Throwable)t);
                    }
                    return m.ack();
                }
                CompletableFuture future = new CompletableFuture();
                if (t != null) {
                    future.completeExceptionally((Throwable)t);
                } else {
                    future.complete(null);
                }
                return future;
            });
            this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration);
            this.subscriber = wrapper;
        } else {
            CompletionSubscriber sub = result2 instanceof Subscriber ? (CompletionSubscriber)result2 : ((SubscriberBuilder)result2).build();
            this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration);
            this.subscriber = sub;
        }
    }
}

