package io.smallrye.reactive.messaging.amqp;

import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.rtsp.RtspHeaders;
import io.smallrye.config.common.utils.ConfigSourceUtil;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.MultiOnFailure;
import io.smallrye.mutiny.groups.UniCreate;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.smallrye.reactive.messaging.amqp.fault.AmqpAccept;
import io.smallrye.reactive.messaging.amqp.fault.AmqpFailStop;
import io.smallrye.reactive.messaging.amqp.fault.AmqpFailureHandler;
import io.smallrye.reactive.messaging.amqp.fault.AmqpModifiedFailed;
import io.smallrye.reactive.messaging.amqp.fault.AmqpModifiedFailedAndUndeliverableHere;
import io.smallrye.reactive.messaging.amqp.fault.AmqpReject;
import io.smallrye.reactive.messaging.amqp.fault.AmqpRelease;
import io.smallrye.reactive.messaging.amqp.i18n.AMQPExceptions;
import io.smallrye.reactive.messaging.amqp.i18n.AMQPLogging;
import io.smallrye.reactive.messaging.amqp.tracing.AmqpOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.smallrye.reactive.messaging.providers.impl.ConcurrencyConnectorConfig;
import io.smallrye.reactive.messaging.providers.impl.ConnectorConfig;
import io.vertx.amqp.AmqpClientOptions;
import io.vertx.amqp.AmqpReceiverOptions;
import io.vertx.amqp.AmqpSenderOptions;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.amqp.AmqpClient;
import io.vertx.mutiny.amqp.AmqpReceiver;
import io.vertx.mutiny.amqp.AmqpSender;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
import io.vertx.proton.ProtonSender;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.BeforeDestroyed;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.event.Reception;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.apache.commons.validator.Var;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

@ApplicationScoped
@Connector(AmqpConnector.CONNECTOR_NAME)
@ConnectorAttributes({@ConnectorAttribute(name = "username", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The username used to authenticate to the broker", type = "string", alias = "amqp-username"), @ConnectorAttribute(name = "password", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The password used to authenticate to the broker", type = "string", alias = "amqp-password"), @ConnectorAttribute(name = "host", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The broker hostname", type = "string", alias = "amqp-host", defaultValue = "localhost"), @ConnectorAttribute(name = "port", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The broker port", type = Var.JSTYPE_INT, alias = "amqp-port", defaultValue = "5672"), @ConnectorAttribute(name = "use-ssl", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether the AMQP connection uses SSL/TLS", type = "boolean", alias = "amqp-use-ssl", defaultValue = "false"), @ConnectorAttribute(name = "virtual-host", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "If set, configure the hostname value used for the connection AMQP Open frame and TLS SNI server name (if TLS is in use)", type = "string", alias = "amqp-virtual-host"), @ConnectorAttribute(name = "sni-server-name", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "If set, explicitly override the hostname to use for the TLS SNI server name", type = "string", alias = "amqp-sni-server-name"), @ConnectorAttribute(name = "reconnect-attempts", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The number of reconnection attempts", type = Var.JSTYPE_INT, alias = "amqp-reconnect-attempts", defaultValue = ConfigSourceUtil.CONFIG_ORDINAL_100), @ConnectorAttribute(name = "reconnect-interval", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The interval in second between two reconnection attempts", type = Var.JSTYPE_INT, alias = "amqp-reconnect-interval", defaultValue = "10"), @ConnectorAttribute(name = "connect-timeout", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The connection timeout in milliseconds", type = Var.JSTYPE_INT, alias = "amqp-connect-timeout", defaultValue = "1000"), @ConnectorAttribute(name = "container-id", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The AMQP container id", type = "string"), @ConnectorAttribute(name = "address", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The AMQP address. If not set, the channel name is used", type = "string"), @ConnectorAttribute(name = "link-name", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The name of the link. If not set, the channel name is used.", type = "string"), @ConnectorAttribute(name = "client-options-name", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The name of the AMQP Client Option bean used to customize the AMQP client configuration", type = "string", alias = "amqp-client-options-name"), @ConnectorAttribute(name = "client-ssl-context-name", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The name of an SSLContext bean to use for connecting to AMQP when SSL is used", type = "string", alias = "amqp-client-ssl-context-name", hiddenFromDocumentation = true), @ConnectorAttribute(name = "tracing-enabled", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", type = "boolean", defaultValue = "true"), @ConnectorAttribute(name = "health-timeout", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The max number of seconds to wait to determine if the connection with the broker is still established for the readiness check. After that threshold, the check is considered as failed.", type = Var.JSTYPE_INT, defaultValue = "3"), @ConnectorAttribute(name = "cloud-events", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Enables (default) or disables the Cloud Event support. If enabled on an _incoming_ channel, the connector analyzes the incoming records and try to create Cloud Event metadata. If enabled on an _outgoing_, the connector sends the outgoing messages as Cloud Event if the message includes Cloud Event Metadata.", defaultValue = "true"), @ConnectorAttribute(name = "capabilities", type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = " A comma-separated list of capabilities proposed by the sender or receiver client."), @ConnectorAttribute(name = ConnectorConfig.BROADCAST_PROPERTY, direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the received AMQP messages must be dispatched to multiple _subscribers_", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "durable", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether AMQP subscription is durable", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "auto-acknowledgement", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the received AMQP messages must be acknowledged when received", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "failure-strategy", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "Specify the failure strategy to apply when a message produced from an AMQP message is nacked. Accepted values are `fail` (default), `accept`, `release`, `reject`, `modified-failed`, `modified-failed-undeliverable-here`", defaultValue = "fail"), @ConnectorAttribute(name = "selector", direction = ConnectorAttribute.Direction.INCOMING, description = "Sets a message selector. This attribute is used to define an `apache.org:selector-filter:string` filter on the source terminus, using SQL-based syntax to request the server filters which messages are delivered to the receiver (if supported by the server in question). Precise functionality supported and syntax needed can vary depending on the server.", type = "string"), @ConnectorAttribute(name = "durable", direction = ConnectorAttribute.Direction.OUTGOING, description = "Whether sent AMQP messages are marked durable", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = RtspHeaders.Values.TTL, direction = ConnectorAttribute.Direction.OUTGOING, description = "The time-to-live of the send AMQP messages. 0 to disable the TTL", type = "long", defaultValue = "0"), @ConnectorAttribute(name = "credit-retrieval-period", direction = ConnectorAttribute.Direction.OUTGOING, description = "The period (in milliseconds) between two attempts to retrieve the credits granted by the broker. This time is used when the sender run out of credits.", type = Var.JSTYPE_INT, defaultValue = "2000"), @ConnectorAttribute(name = "use-anonymous-sender", direction = ConnectorAttribute.Direction.OUTGOING, description = "Whether or not the connector should use an anonymous sender. Default value is `true` if the broker supports it, `false` otherwise. If not supported, it is not possible to dynamically change the destination address.", type = "boolean"), @ConnectorAttribute(name = ConnectorConfig.MERGE_PROPERTY, direction = ConnectorAttribute.Direction.OUTGOING, description = "Whether the connector should allow multiple upstreams", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "cloud-events-source", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Configure the default `source` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `source` attribute itself", alias = "cloud-events-default-source"), @ConnectorAttribute(name = "cloud-events-type", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Configure the default `type` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `type` attribute itself", alias = "cloud-events-default-type"), @ConnectorAttribute(name = "cloud-events-subject", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Configure the default `subject` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `subject` attribute itself", alias = "cloud-events-default-subject"), @ConnectorAttribute(name = "cloud-events-data-content-type", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Configure the default `datacontenttype` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `datacontenttype` attribute itself", alias = "cloud-events-default-data-content-type"), @ConnectorAttribute(name = "cloud-events-data-schema", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Configure the default `dataschema` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `dataschema` attribute itself", alias = "cloud-events-default-data-schema"), @ConnectorAttribute(name = "cloud-events-insert-timestamp", type = "boolean", direction = ConnectorAttribute.Direction.OUTGOING, description = "Whether or not the connector should insert automatically the `time` attribute into the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `time` attribute itself", alias = "cloud-events-default-timestamp", defaultValue = "true"), @ConnectorAttribute(name = "cloud-events-mode", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "The Cloud Event mode (`structured` or `binary` (default)). Indicates how are written the cloud events in the outgoing record", defaultValue = HttpHeaders.Values.BINARY)})
/* loaded from: input_file:io/smallrye/reactive/messaging/amqp/AmqpConnector.class */
public class AmqpConnector implements InboundConnector, OutboundConnector, HealthReporter {
    public static final String CONNECTOR_NAME = "smallrye-amqp";

    @Inject
    ExecutionHolder executionHolder;

    @Inject
    @Any
    Instance<AmqpClientOptions> clientOptions;

    @Inject
    @Any
    Instance<SSLContext> clientSslContexts;
    private final List<AmqpClient> clients = new CopyOnWriteArrayList();
    private final Map<String, AmqpCreditBasedSender> processors = new ConcurrentHashMap();
    private final Map<String, Boolean> opened = new ConcurrentHashMap();
    private final Map<String, ConnectionHolder> holders = new ConcurrentHashMap();
    private volatile AmqpOpenTelemetryInstrumenter amqpInstrumenter;

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setup(ExecutionHolder executionHolder) {
        this.executionHolder = executionHolder;
    }

    private Multi<? extends Message<?>> getStreamOfMessages(AmqpReceiver amqpReceiver, ConnectionHolder connectionHolder, String str, String str2, AmqpFailureHandler amqpFailureHandler, boolean z, Boolean bool) {
        AMQPLogging.log.receiverListeningAddress(str);
        BroadcastProcessor create = BroadcastProcessor.create();
        amqpReceiver.exceptionHandler(th -> {
            AMQPLogging.log.receiverError(th);
            create.onError(th);
        });
        Objects.requireNonNull(create);
        connectionHolder.onFailure(create::onError);
        return Multi.createFrom().deferred(() -> {
            Multi transformToUniAndConcatenate = amqpReceiver.toMulti().emitOn(runnable -> {
                VertxContext.runOnContext(connectionHolder.getContext().getDelegate(), runnable);
            }).onItem().transformToUniAndConcatenate(amqpMessage -> {
                try {
                    return Uni.createFrom().item((UniCreate) new AmqpMessage(amqpMessage, connectionHolder.getContext(), amqpFailureHandler, z, bool));
                } catch (Exception e) {
                    AMQPLogging.log.unableToCreateMessage(str2, e);
                    return Uni.createFrom().nullItem();
                }
            });
            if (bool.booleanValue()) {
                transformToUniAndConcatenate = transformToUniAndConcatenate.onItem().transform(message -> {
                    return this.amqpInstrumenter.traceIncoming(message, (AmqpMessage) message);
                });
            }
            return Multi.createBy().merging().streams(transformToUniAndConcatenate, create);
        });
    }

    @Override // io.smallrye.reactive.messaging.connector.InboundConnector
    public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
        AmqpConnectorIncomingConfiguration amqpConnectorIncomingConfiguration = new AmqpConnectorIncomingConfiguration(config);
        Optional<String> address = amqpConnectorIncomingConfiguration.getAddress();
        Objects.requireNonNull(amqpConnectorIncomingConfiguration);
        String orElseGet = address.orElseGet(amqpConnectorIncomingConfiguration::getChannel);
        this.opened.put(amqpConnectorIncomingConfiguration.getChannel(), false);
        boolean booleanValue = amqpConnectorIncomingConfiguration.getBroadcast().booleanValue();
        String channel = amqpConnectorIncomingConfiguration.getChannel();
        String orElse = amqpConnectorIncomingConfiguration.getLinkName().orElse(channel);
        boolean booleanValue2 = amqpConnectorIncomingConfiguration.getCloudEvents().booleanValue();
        boolean booleanValue3 = amqpConnectorIncomingConfiguration.getTracingEnabled().booleanValue();
        AmqpReceiverOptions selector = new AmqpReceiverOptions().setAutoAcknowledgement(amqpConnectorIncomingConfiguration.getAutoAcknowledgement().booleanValue()).setDurable(amqpConnectorIncomingConfiguration.getDurable().booleanValue()).setLinkName(orElse).setCapabilities(getClientCapabilities(amqpConnectorIncomingConfiguration)).setSelector(amqpConnectorIncomingConfiguration.getSelector().orElse(null));
        AmqpClient createClient = AmqpClientHelper.createClient(this, amqpConnectorIncomingConfiguration, this.clientOptions, this.clientSslContexts);
        Context context = null;
        if (ConcurrencyConnectorConfig.getConcurrency(config).filter(num -> {
            return num.intValue() > 1;
        }).isPresent()) {
            context = Context.newInstance(((VertxInternal) getVertx().mo3711getDelegate()).createEventLoopContext());
        }
        ConnectionHolder connectionHolder = new ConnectionHolder(createClient, amqpConnectorIncomingConfiguration, getVertx(), context);
        this.holders.put(amqpConnectorIncomingConfiguration.getChannel(), connectionHolder);
        AmqpFailureHandler createFailureHandler = createFailureHandler(amqpConnectorIncomingConfiguration);
        if (booleanValue3 && this.amqpInstrumenter == null) {
            this.amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForConnector();
        }
        Multi transformToMulti = connectionHolder.getOrEstablishConnection().onItem().transformToUni(amqpConnection -> {
            return amqpConnection.createReceiver(orElseGet, selector);
        }).onItem().invoke(amqpReceiver -> {
            this.opened.put(channel, true);
        }).onItem().transformToMulti(amqpReceiver2 -> {
            return getStreamOfMessages(amqpReceiver2, connectionHolder, orElseGet, channel, createFailureHandler, booleanValue2, Boolean.valueOf(booleanValue3));
        });
        Integer reconnectInterval = amqpConnectorIncomingConfiguration.getReconnectInterval();
        Integer reconnectAttempts = amqpConnectorIncomingConfiguration.getReconnectAttempts();
        MultiOnFailure onFailure = transformToMulti.onFailure();
        AMQPLogging aMQPLogging = AMQPLogging.log;
        Objects.requireNonNull(aMQPLogging);
        Multi invoke = onFailure.invoke(aMQPLogging::retrieveMessagesRetrying).onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(reconnectInterval.intValue())).atMost(reconnectAttempts.intValue()).onFailure().invoke(th -> {
            this.opened.put(channel, false);
            AMQPLogging.log.retrieveMessagesNoMoreRetrying(th);
        });
        if (booleanValue) {
            invoke = invoke.broadcast().toAllSubscribers();
        }
        return invoke;
    }

    @Override // io.smallrye.reactive.messaging.connector.OutboundConnector
    public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
        AmqpConnectorOutgoingConfiguration amqpConnectorOutgoingConfiguration = new AmqpConnectorOutgoingConfiguration(config);
        Optional<String> address = amqpConnectorOutgoingConfiguration.getAddress();
        Objects.requireNonNull(amqpConnectorOutgoingConfiguration);
        String orElseGet = address.orElseGet(amqpConnectorOutgoingConfiguration::getChannel);
        this.opened.put(amqpConnectorOutgoingConfiguration.getChannel(), false);
        AtomicReference atomicReference = new AtomicReference();
        AmqpClient createClient = AmqpClientHelper.createClient(this, amqpConnectorOutgoingConfiguration, this.clientOptions, this.clientSslContexts);
        Optional<String> linkName = amqpConnectorOutgoingConfiguration.getLinkName();
        Objects.requireNonNull(amqpConnectorOutgoingConfiguration);
        String orElseGet2 = linkName.orElseGet(amqpConnectorOutgoingConfiguration::getChannel);
        ConnectionHolder connectionHolder = new ConnectionHolder(createClient, amqpConnectorOutgoingConfiguration, getVertx(), null);
        AmqpCreditBasedSender amqpCreditBasedSender = new AmqpCreditBasedSender(this, connectionHolder, amqpConnectorOutgoingConfiguration, Uni.createFrom().deferred(() -> {
            AmqpSender amqpSender = (AmqpSender) atomicReference.get();
            if (amqpSender != null && !amqpSender.connection().isDisconnected()) {
                if (isLinkOpen(amqpSender)) {
                    return Uni.createFrom().item((UniCreate) amqpSender);
                }
                amqpSender.closeAndForget();
            }
            return connectionHolder.getOrEstablishConnection().onItem().transformToUni(amqpConnection -> {
                return amqpConnectorOutgoingConfiguration.getUseAnonymousSender().orElseGet(() -> {
                    return Boolean.valueOf(ConnectionHolder.supportAnonymousRelay(amqpConnection));
                }).booleanValue() ? amqpConnection.createAnonymousSender() : amqpConnection.createSender(orElseGet, new AmqpSenderOptions().setLinkName(orElseGet2).setCapabilities(getClientCapabilities(amqpConnectorOutgoingConfiguration)));
            }).onItem().invoke(amqpSender2 -> {
                AmqpSender amqpSender2 = (AmqpSender) atomicReference.getAndSet(amqpSender2);
                if (amqpSender2 != null) {
                    amqpSender2.closeAndForget();
                }
                this.opened.put(amqpConnectorOutgoingConfiguration.getChannel(), true);
            });
        }).onFailure().invoke(th -> {
            atomicReference.set(null);
            this.opened.put(amqpConnectorOutgoingConfiguration.getChannel(), false);
        }).onCancellation().invoke(() -> {
            atomicReference.set(null);
            this.opened.put(amqpConnectorOutgoingConfiguration.getChannel(), false);
        }));
        this.processors.put(amqpConnectorOutgoingConfiguration.getChannel(), amqpCreditBasedSender);
        return MultiUtils.via(amqpCreditBasedSender, multi -> {
            return multi.onFailure().invoke(th2 -> {
                AMQPLogging.log.failureReported(amqpConnectorOutgoingConfiguration.getChannel(), th2);
                this.opened.put(amqpConnectorOutgoingConfiguration.getChannel(), false);
            });
        });
    }

    private boolean isLinkOpen(AmqpSender amqpSender) {
        ProtonSender unwrap = amqpSender.mo3711getDelegate().unwrap();
        if (unwrap == null) {
            return false;
        }
        return unwrap.isOpen();
    }

    public List<String> getClientCapabilities(AmqpConnectorCommonConfiguration amqpConnectorCommonConfiguration) {
        return amqpConnectorCommonConfiguration.getCapabilities().isPresent() ? (List) Arrays.stream(amqpConnectorCommonConfiguration.getCapabilities().get().split(",")).map((v0) -> {
            return v0.trim();
        }).filter(str -> {
            return !str.isEmpty();
        }).collect(Collectors.toList()) : Collections.emptyList();
    }

    public void terminate(@Priority(50) @Observes(notifyObserver = Reception.IF_EXISTS) @BeforeDestroyed(ApplicationScoped.class) Object obj) {
        this.processors.values().forEach((v0) -> {
            v0.cancel();
        });
        this.clients.forEach((v0) -> {
            v0.closeAndForget();
        });
        this.clients.clear();
    }

    public Vertx getVertx() {
        return this.executionHolder.vertx();
    }

    public void addClient(AmqpClient amqpClient) {
        this.clients.add(amqpClient);
    }

    private AmqpFailureHandler createFailureHandler(AmqpConnectorIncomingConfiguration amqpConnectorIncomingConfiguration) {
        String failureStrategy = amqpConnectorIncomingConfiguration.getFailureStrategy();
        switch (AmqpFailureHandler.Strategy.from(failureStrategy)) {
            case FAIL:
                return new AmqpFailStop(this, amqpConnectorIncomingConfiguration.getChannel());
            case ACCEPT:
                return new AmqpAccept(amqpConnectorIncomingConfiguration.getChannel());
            case REJECT:
                return new AmqpReject(amqpConnectorIncomingConfiguration.getChannel());
            case RELEASE:
                return new AmqpRelease(amqpConnectorIncomingConfiguration.getChannel());
            case MODIFIED_FAILED:
                return new AmqpModifiedFailed(amqpConnectorIncomingConfiguration.getChannel());
            case MODIFIED_FAILED_UNDELIVERABLE_HERE:
                return new AmqpModifiedFailedAndUndeliverableHere(amqpConnectorIncomingConfiguration.getChannel());
            default:
                throw AMQPExceptions.ex.illegalArgumentInvalidFailureStrategy(failureStrategy);
        }
    }

    public List<AmqpClient> getClients() {
        return this.clients;
    }

    @Override // io.smallrye.reactive.messaging.health.HealthReporter
    public HealthReport getReadiness() {
        HealthReport.HealthReportBuilder builder = HealthReport.builder();
        for (Map.Entry<String, ConnectionHolder> entry : this.holders.entrySet()) {
            try {
                builder.add(entry.getKey(), entry.getValue().isConnected().await().atMost(Duration.ofSeconds(entry.getValue().getHealthTimeout())).booleanValue());
            } catch (Exception e) {
                builder.add(entry.getKey(), false, e.getMessage());
            }
        }
        for (Map.Entry<String, AmqpCreditBasedSender> entry2 : this.processors.entrySet()) {
            try {
                builder.add(entry2.getKey(), entry2.getValue().isConnected().await().atMost(Duration.ofSeconds(entry2.getValue().getHealthTimeout())).booleanValue());
            } catch (Exception e2) {
                builder.add(entry2.getKey(), false, e2.getMessage());
            }
        }
        return builder.build();
    }

    @Override // io.smallrye.reactive.messaging.health.HealthReporter
    public HealthReport getLiveness() {
        HealthReport.HealthReportBuilder builder = HealthReport.builder();
        for (Map.Entry<String, Boolean> entry : this.opened.entrySet()) {
            builder.add(entry.getKey(), entry.getValue().booleanValue());
        }
        return builder.build();
    }

    @Override // io.smallrye.reactive.messaging.health.HealthReporter
    public HealthReport getStartup() {
        return getLiveness();
    }

    public void reportFailure(String str, Throwable th) {
        AMQPLogging.log.failureReported(str, th);
        this.opened.put(str, false);
        terminate(null);
    }
}
