package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/impl/KafkaSenderProcessor.class */
class KafkaSenderProcessor implements Processor<Message<?>, Message<?>>, Subscription {
    private final long inflights;
    private final boolean waitForCompletion;
    private final Function<Message<?>, Uni<Void>> send;
    private final AtomicReference<Subscription> subscription = new AtomicReference<>();
    private final AtomicReference<Subscriber<? super Message<?>>> downstream = new AtomicReference<>();

    public KafkaSenderProcessor(long j, boolean z, Function<Message<?>, Uni<Void>> function) {
        this.inflights = j;
        this.waitForCompletion = z;
        this.send = function;
    }

    public void subscribe(Subscriber<? super Message<?>> subscriber) {
        if (!this.downstream.compareAndSet(null, subscriber)) {
            Subscriptions.fail(subscriber, KafkaExceptions.ex.illegalStateOnlyOneSubscriber());
        } else if (this.subscription.get() != null) {
            subscriber.onSubscribe(this);
        }
    }

    public void onSubscribe(Subscription subscription) {
        if (this.subscription.compareAndSet(null, subscription)) {
            Subscriber<? super Message<?>> subscriber = this.downstream.get();
            if (subscriber != null) {
                subscriber.onSubscribe(this);
                return;
            }
            return;
        }
        Subscriber<? super Message<?>> subscriber2 = this.downstream.get();
        if (subscriber2 != null) {
            subscriber2.onSubscribe(Subscriptions.CANCELLED);
        }
    }

    public void onNext(Message<?> message) {
        if (this.waitForCompletion) {
            this.send.apply(message).subscribe().with(r5 -> {
                requestNext(message);
            }, this::onError);
        } else {
            this.send.apply(message).subscribe().with(r1 -> {
            }, this::onError);
            requestNext(message);
        }
    }

    public void request(long j) {
        if (j != Long.MAX_VALUE) {
            throw KafkaExceptions.ex.illegalStateConsumeWithoutBackPressure();
        }
        this.subscription.get().request(this.inflights);
    }

    public void cancel() {
        Subscription andSet = this.subscription.getAndSet(Subscriptions.CANCELLED);
        if (andSet != null) {
            andSet.cancel();
        }
    }

    private void requestNext(Message<?> message) {
        Subscriber<? super Message<?>> subscriber = this.downstream.get();
        if (subscriber != null) {
            subscriber.onNext(message);
        }
        Subscription subscription = this.subscription.get();
        if (subscription == null || this.inflights == Long.MAX_VALUE) {
            return;
        }
        subscription.request(1L);
    }

    public void onError(Throwable th) {
        Subscriber<? super Message<?>> andSet = this.downstream.getAndSet(null);
        if (andSet != null) {
            andSet.onError(th);
        }
    }

    public void onComplete() {
        Subscriber<? super Message<?>> andSet = this.downstream.getAndSet(null);
        if (andSet != null) {
            andSet.onComplete();
        }
    }
}
