package io.smallrye.reactive.messaging.amqp;

import io.netty.handler.codec.http.HttpHeaders;
import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniCreate;
import io.smallrye.mutiny.groups.UniSubscribe;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.reactive.messaging.amqp.ce.AmqpCloudEventHelper;
import io.smallrye.reactive.messaging.amqp.fault.AmqpFailureHandler;
import io.smallrye.reactive.messaging.amqp.i18n.AMQPExceptions;
import io.smallrye.reactive.messaging.amqp.i18n.AMQPLogging;
import io.smallrye.reactive.messaging.amqp.tracing.AmqpOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import io.vertx.amqp.impl.AmqpMessageImpl;
import io.vertx.mutiny.amqp.AmqpSender;
import io.vertx.mutiny.core.Context;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.eclipse.microprofile.reactive.messaging.Message;

/* loaded from: input_file:io/smallrye/reactive/messaging/amqp/AmqpCreditBasedSender.class */
public class AmqpCreditBasedSender implements Flow.Processor<Message<?>, Message<?>>, Flow.Subscription {
    private final ConnectionHolder holder;
    private final Uni<AmqpSender> retrieveSender;
    private final AmqpConnectorOutgoingConfiguration configuration;
    private final AmqpConnector connector;
    private final boolean durable;
    private final long ttl;
    private final String configuredAddress;
    private final boolean tracingEnabled;
    private final boolean mandatoryCloudEventAttributeSet;
    private final boolean writeCloudEvents;
    private final boolean writeAsBinaryCloudEvent;
    private final int retryAttempts;
    private final int retryInterval;
    private final AmqpOpenTelemetryInstrumenter amqpInstrumenter;
    private volatile boolean isAnonymous;
    private final AtomicLong requested = new AtomicLong();
    private final AtomicReference<Flow.Subscription> upstream = new AtomicReference<>();
    private final AtomicReference<Flow.Subscriber<? super Message<?>>> downstream = new AtomicReference<>();
    private final AtomicBoolean once = new AtomicBoolean();
    private volatile boolean creditRetrievalInProgress = false;

    public AmqpCreditBasedSender(AmqpConnector amqpConnector, ConnectionHolder connectionHolder, AmqpConnectorOutgoingConfiguration amqpConnectorOutgoingConfiguration, Uni<AmqpSender> uni) {
        this.connector = amqpConnector;
        this.holder = connectionHolder;
        this.retrieveSender = uni;
        this.configuration = amqpConnectorOutgoingConfiguration;
        this.durable = amqpConnectorOutgoingConfiguration.getDurable().booleanValue();
        this.ttl = amqpConnectorOutgoingConfiguration.getTtl().longValue();
        Optional<String> address = amqpConnectorOutgoingConfiguration.getAddress();
        Objects.requireNonNull(amqpConnectorOutgoingConfiguration);
        this.configuredAddress = address.orElseGet(amqpConnectorOutgoingConfiguration::getChannel);
        this.tracingEnabled = amqpConnectorOutgoingConfiguration.getTracingEnabled().booleanValue();
        this.mandatoryCloudEventAttributeSet = amqpConnectorOutgoingConfiguration.getCloudEventsType().isPresent() && amqpConnectorOutgoingConfiguration.getCloudEventsSource().isPresent();
        this.writeCloudEvents = amqpConnectorOutgoingConfiguration.getCloudEvents().booleanValue();
        this.writeAsBinaryCloudEvent = amqpConnectorOutgoingConfiguration.getCloudEventsMode().equalsIgnoreCase(HttpHeaders.Values.BINARY);
        this.retryAttempts = amqpConnectorOutgoingConfiguration.getReconnectAttempts().intValue();
        this.retryInterval = amqpConnectorOutgoingConfiguration.getReconnectInterval().intValue();
        if (this.tracingEnabled) {
            this.amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForSender();
        } else {
            this.amqpInstrumenter = null;
        }
    }

    @Override // java.util.concurrent.Flow.Publisher
    public void subscribe(Flow.Subscriber<? super Message<?>> subscriber) {
        if (!this.downstream.compareAndSet(null, subscriber)) {
            Subscriptions.fail(subscriber, AMQPExceptions.ex.illegalStateOnlyOneSubscriberAllowed());
        } else if (this.upstream.get() != null) {
            subscriber.onSubscribe(this);
        }
    }

    private Uni<AmqpSender> getSenderAndCredits() {
        return this.retrieveSender.onItem().call(amqpSender -> {
            this.isAnonymous = this.configuration.getUseAnonymousSender().orElseGet(() -> {
                return Boolean.valueOf(ConnectionHolder.supportAnonymousRelay(amqpSender.connection()));
            }).booleanValue();
            CompletableFuture completableFuture = new CompletableFuture();
            this.holder.getContext().runOnContext(() -> {
                setCreditsAndRequest(amqpSender);
                completableFuture.complete(null);
            });
            return Uni.createFrom().completionStage(completableFuture);
        });
    }

    @CheckReturnValue
    public Uni<Boolean> isConnected() {
        return isConnected(true);
    }

    public int getHealthTimeout() {
        return this.configuration.getHealthTimeout().intValue();
    }

    private Uni<Boolean> isConnected(boolean z) {
        return this.holder.isConnected().chain(bool -> {
            return (bool.booleanValue() || !z) ? Uni.createFrom().item((UniCreate) bool) : this.holder.getOrEstablishConnection().chain(amqpConnection -> {
                return isConnected(false);
            });
        });
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        if (this.upstream.compareAndSet(null, subscription)) {
            Flow.Subscriber<? super Message<?>> subscriber = this.downstream.get();
            if (subscriber != null) {
                subscriber.onSubscribe(this);
                return;
            }
            return;
        }
        Flow.Subscriber<? super Message<?>> subscriber2 = this.downstream.get();
        if (subscriber2 != null) {
            subscriber2.onSubscribe(Subscriptions.CANCELLED);
        }
    }

    private long setCreditsAndRequest(AmqpSender amqpSender) {
        long remainingCredits = amqpSender.remainingCredits();
        Flow.Subscription subscription = this.upstream.get();
        if (remainingCredits != 0 && subscription != Subscriptions.CANCELLED) {
            this.requested.set(remainingCredits);
            AMQPLogging.log.retrievedCreditsForChannel(this.configuration.getChannel(), remainingCredits);
            subscription.request(remainingCredits);
            return remainingCredits;
        }
        if (remainingCredits != 0 || subscription == Subscriptions.CANCELLED) {
            return 0L;
        }
        onNoMoreCredit(amqpSender);
        return 0L;
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(Message<?> message) {
        if (isCancelled()) {
            return;
        }
        Flow.Subscriber<? super Message<?>> subscriber = this.downstream.get();
        UniSubscribe subscribe = this.retrieveSender.onItem().transformToUni(amqpSender -> {
            try {
                return send(amqpSender, message, this.durable, this.ttl, this.configuredAddress, this.isAnonymous).onItem().transform(message2 -> {
                    return Tuple2.of(amqpSender, message2);
                });
            } catch (Exception e) {
                message.nack(e);
                AMQPLogging.log.serializationFailure(this.configuration.getChannel(), e);
                return Uni.createFrom().nullItem();
            }
        }).subscribe();
        Consumer consumer = tuple2 -> {
            if (tuple2 != null) {
                subscriber.onNext(tuple2.getItem2());
                if (this.requested.decrementAndGet() == 0) {
                    onNoMoreCredit((AmqpSender) tuple2.getItem1());
                }
            }
        };
        Objects.requireNonNull(subscriber);
        subscribe.with(consumer, subscriber::onError);
    }

    private void onNoMoreCredit(AmqpSender amqpSender) {
        if (this.creditRetrievalInProgress) {
            return;
        }
        this.creditRetrievalInProgress = true;
        AMQPLogging.log.noMoreCreditsForChannel(this.configuration.getChannel());
        this.holder.getContext().runOnContext(() -> {
            if (isCancelled()) {
                return;
            }
            if (setCreditsAndRequest(amqpSender) == 0) {
                this.holder.getVertx().setPeriodic(this.configuration.getCreditRetrievalPeriod().intValue(), l -> {
                    if (setCreditsAndRequest(amqpSender) != 0 || isCancelled()) {
                        this.holder.getVertx().cancelTimer(l.longValue());
                        this.creditRetrievalInProgress = false;
                    }
                });
            } else {
                this.creditRetrievalInProgress = false;
            }
        });
    }

    private boolean isCancelled() {
        Flow.Subscription subscription = this.upstream.get();
        return subscription == Subscriptions.CANCELLED || subscription == null;
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        Flow.Subscription andSet = this.upstream.getAndSet(Subscriptions.CANCELLED);
        Flow.Subscriber<? super Message<?>> subscriber = this.downstream.get();
        if (andSet == null || andSet == Subscriptions.CANCELLED || subscriber == null) {
            return;
        }
        subscriber.onError(th);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        Flow.Subscription andSet = this.upstream.getAndSet(Subscriptions.CANCELLED);
        Flow.Subscriber<? super Message<?>> subscriber = this.downstream.get();
        if (andSet == null || andSet == Subscriptions.CANCELLED || subscriber == null) {
            return;
        }
        subscriber.onComplete();
    }

    @Override // java.util.concurrent.Flow.Subscription
    public void request(long j) {
        if (this.once.getAndSet(true)) {
            return;
        }
        getSenderAndCredits().onItem().ignore().andContinueWithNull().subscribe().with(r1 -> {
        }, th -> {
            this.downstream.get().onError(th);
        });
    }

    @Override // java.util.concurrent.Flow.Subscription
    public void cancel() {
        Flow.Subscription andSet = this.upstream.getAndSet(Subscriptions.CANCELLED);
        if (andSet == null || andSet == Subscriptions.CANCELLED) {
            return;
        }
        andSet.cancel();
    }

    private Uni<Message<?>> send(AmqpSender amqpSender, Message<?> message, boolean z, long j, String str, boolean z2) {
        OutgoingCloudEventMetadata outgoingCloudEventMetadata = (OutgoingCloudEventMetadata) message.getMetadata(OutgoingCloudEventMetadata.class).orElse(null);
        io.vertx.mutiny.amqp.AmqpMessage amqpMessage = message instanceof AmqpMessage ? ((AmqpMessage) message).getAmqpMessage() : message.getPayload() instanceof io.vertx.mutiny.amqp.AmqpMessage ? (io.vertx.mutiny.amqp.AmqpMessage) message.getPayload() : message.getPayload() instanceof io.vertx.amqp.AmqpMessage ? new io.vertx.mutiny.amqp.AmqpMessage((io.vertx.amqp.AmqpMessage) message.getPayload()) : message.getPayload() instanceof org.apache.qpid.proton.message.Message ? new io.vertx.mutiny.amqp.AmqpMessage((io.vertx.amqp.AmqpMessage) new AmqpMessageImpl((org.apache.qpid.proton.message.Message) message.getPayload())) : AmqpMessageConverter.convertToAmqpMessage(message, z, j);
        if (this.writeCloudEvents && (outgoingCloudEventMetadata != null || this.mandatoryCloudEventAttributeSet)) {
            amqpMessage = this.writeAsBinaryCloudEvent ? AmqpCloudEventHelper.createBinaryCloudEventMessage(amqpMessage, outgoingCloudEventMetadata, this.configuration) : AmqpCloudEventHelper.createStructuredEventMessage(amqpMessage, outgoingCloudEventMetadata, this.configuration);
        }
        String actualAddress = getActualAddress(message, amqpMessage, str, z2);
        if (this.connector.getClients().isEmpty()) {
            AMQPLogging.log.messageNoSend(actualAddress);
            return Uni.createFrom().item((UniCreate) message);
        }
        if (!actualAddress.equals(amqpMessage.address())) {
            amqpMessage.getDelegate().unwrap().setAddress(actualAddress);
        }
        if (this.tracingEnabled) {
            this.amqpInstrumenter.traceOutgoing(message, new AmqpMessage<>(amqpMessage, (Context) null, (AmqpFailureHandler) null, false, (Boolean) true));
        }
        AMQPLogging.log.sendingMessageToAddress(actualAddress);
        return amqpSender.sendWithAck(amqpMessage).onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(this.retryInterval)).atMost(this.retryAttempts).onItemOrFailure().transformToUni((r5, th) -> {
            return th != null ? Uni.createFrom().completionStage(message.nack(th)) : Uni.createFrom().completionStage(message.ack());
        }).onItem().transform(r3 -> {
            return message;
        });
    }

    private String getActualAddress(Message<?> message, io.vertx.mutiny.amqp.AmqpMessage amqpMessage, String str, boolean z) {
        String address = amqpMessage.address();
        if (address == null) {
            return (String) message.getMetadata(OutgoingAmqpMetadata.class).flatMap(outgoingAmqpMetadata -> {
                String address2 = outgoingAmqpMetadata.getAddress();
                if (address2 == null || z) {
                    return Optional.ofNullable(address2);
                }
                AMQPLogging.log.unableToUseAddress(address2, str);
                return Optional.empty();
            }).orElse(str);
        }
        if (z) {
            return address;
        }
        AMQPLogging.log.unableToUseAddress(address, str);
        return str;
    }
}
