package io.smallrye.reactive.messaging.kafka;

import com.github.benmanes.caffeine.cache.LocalCacheFactory;
import io.netty.handler.codec.http.HttpHeaders;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.smallrye.mutiny.Multi;
import io.smallrye.openapi.runtime.io.schema.SchemaConstant;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.impl.ConnectorConfig;
import io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.ConfigHelper;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSink;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
import io.vertx.mutiny.core.Vertx;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.Priority;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.BeforeDestroyed;
import javax.enterprise.event.Observes;
import javax.enterprise.event.Reception;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import javax.resource.spi.work.WorkException;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Publisher;

/**
 * SmallRye Reactive Messaging connector for Apache Kafka, registered under the name
 * {@link #CONNECTOR_NAME}.
 *
 * <p>It builds the publisher backing each incoming channel and the subscriber backing each outgoing
 * channel, and aggregates the startup, readiness and liveness reports of the sources and sinks it
 * created. The {@link ConnectorAttribute} entries below declare the configuration attributes a channel
 * accepts; they are grouped by direction (both, incoming only, outgoing only).
 */
@ApplicationScoped
@Connector(KafkaConnector.CONNECTOR_NAME)
@ConnectorAttributes({
        // ---- Attributes accepted by both incoming and outgoing channels ----
        @ConnectorAttribute(name = "bootstrap.servers", alias = "kafka.bootstrap.servers", type = "string", defaultValue = "localhost:9092", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "A comma-separated list of host:port to use for establishing the initial connection to the Kafka cluster."),
        @ConnectorAttribute(name = "topic", type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The consumed / populated Kafka topic. If neither this property nor the `topics` properties are set, the channel name is used"),
        @ConnectorAttribute(name = "health-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true"),
        @ConnectorAttribute(name = "health-readiness-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether readiness health reporting is enabled (default) or disabled", defaultValue = "true"),
        @ConnectorAttribute(name = "health-readiness-topic-verification", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether the readiness check should verify that topics exist on the broker. Default to false. Enabling it requires an admin connection. Deprecated: Use 'health-topic-verification-enabled' instead.", deprecated = true),
        @ConnectorAttribute(name = "health-readiness-timeout", type = "long", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "During the readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready. Deprecated: Use 'health-topic-verification-timeout' instead.", deprecated = true),
        @ConnectorAttribute(name = "health-topic-verification-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether the startup and readiness check should verify that topics exist on the broker. Default to false. Enabling it requires an admin client connection.", defaultValue = "false"),
        @ConnectorAttribute(name = "health-topic-verification-timeout", type = "long", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "During the startup and readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready.", defaultValue = "2000"),
        @ConnectorAttribute(name = "tracing-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", defaultValue = "true"),
        @ConnectorAttribute(name = "cloud-events", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Enables (default) or disables the Cloud Event support. If enabled on an _incoming_ channel, the connector analyzes the incoming records and try to create Cloud Event metadata. If enabled on an _outgoing_, the connector sends the outgoing messages as Cloud Event if the message includes Cloud Event Metadata.", defaultValue = "true"),
        @ConnectorAttribute(name = ConfigHelper.KAFKA_CONFIGURATION_NAME_ATTRIBUTE, type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Identifier of a CDI bean that provides the default Kafka consumer/producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier."),

        // ---- Attributes accepted by incoming channels only ----
        @ConnectorAttribute(name = "topics", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "A comma-separating list of topics to be consumed. Cannot be used with the `topic` or `pattern` properties"),
        @ConnectorAttribute(name = "pattern", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "Indicate that the `topic` property is a regular expression. Must be used with the `topic` property. Cannot be used with the `topics` property", defaultValue = "false"),
        @ConnectorAttribute(name = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "The deserializer classname used to deserialize the record's key", defaultValue = "org.apache.kafka.common.serialization.StringDeserializer"),
        @ConnectorAttribute(name = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "The deserializer classname used to deserialize the record's value", mandatory = true),
        @ConnectorAttribute(name = ConsumerConfig.FETCH_MIN_BYTES_CONFIG, type = "int", direction = ConnectorAttribute.Direction.INCOMING, description = "The minimum amount of data the server should return for a fetch request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive.", defaultValue = "1"),
        @ConnectorAttribute(name = "group.id", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "A unique string that identifies the consumer group the application belongs to. If not set, a unique, generated id is used"),
        @ConnectorAttribute(name = ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "If enabled, consumer's offset will be periodically committed in the background by the underlying Kafka client, ignoring the actual processing outcome of the records. It is recommended to NOT enable this setting and let Reactive Messaging handles the commit.", defaultValue = "false"),
        @ConnectorAttribute(name = "retry", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether or not the connection to the broker is re-attempted in case of failure", defaultValue = "true"),
        @ConnectorAttribute(name = "retry-attempts", type = "int", direction = ConnectorAttribute.Direction.INCOMING, description = "The maximum number of reconnection before failing. -1 means infinite retry", defaultValue = "-1"),
        @ConnectorAttribute(name = "retry-max-wait", type = "int", direction = ConnectorAttribute.Direction.INCOMING, description = "The max delay (in seconds) between 2 reconnects", defaultValue = "30"),
        @ConnectorAttribute(name = ConnectorConfig.BROADCAST_PROPERTY, type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the Kafka records should be dispatched to multiple consumer", defaultValue = "false"),
        @ConnectorAttribute(name = ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "What to do when there is no initial offset in Kafka.Accepted values are earliest, latest and none", defaultValue = "latest"),
        @ConnectorAttribute(name = "failure-strategy", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). Values can be `fail` (default), `ignore`, or `dead-letter-queue`", defaultValue = "fail"),
        @ConnectorAttribute(name = "commit-strategy", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "Specify the commit strategy to apply when a message produced from a record is acknowledged. Values can be `latest`, `ignore` or `throttled`. If `enable.auto.commit` is true then the default is `ignore` otherwise it is `throttled`"),
        @ConnectorAttribute(name = "throttled.unprocessed-record-max-age.ms", type = "int", direction = ConnectorAttribute.Direction.INCOMING, description = "While using the `throttled` commit-strategy, specify the max age in milliseconds that an unprocessed message can be before the connector is marked as unhealthy. Setting this attribute to 0 disables this monitoring.", defaultValue = "60000"),
        @ConnectorAttribute(name = "dead-letter-queue.topic", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "When the `failure-strategy` is set to `dead-letter-queue` indicates on which topic the record is sent. Defaults is `dead-letter-topic-$channel`"),
        @ConnectorAttribute(name = "dead-letter-queue.key.serializer", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "When the `failure-strategy` is set to `dead-letter-queue` indicates the key serializer to use. If not set the serializer associated to the key deserializer is used"),
        @ConnectorAttribute(name = "dead-letter-queue.value.serializer", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "When the `failure-strategy` is set to `dead-letter-queue` indicates the value serializer to use. If not set the serializer associated to the value deserializer is used"),
        @ConnectorAttribute(name = "partitions", type = "int", direction = ConnectorAttribute.Direction.INCOMING, description = "The number of partitions to be consumed concurrently. The connector creates the specified amount of Kafka consumers. It should match the number of partition of the targeted topic", defaultValue = "1"),
        @ConnectorAttribute(name = "consumer-rebalance-listener.name", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "The name set in `@Identifier` of a bean that implements `io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener`. If set, this rebalance listener is applied to the consumer."),
        @ConnectorAttribute(name = "key-deserialization-failure-handler", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "The name set in `@Identifier` of a bean that implements `io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler`. If set, deserialization failure happening when deserializing keys are delegated to this handler which may provide a fallback value."),
        @ConnectorAttribute(name = "value-deserialization-failure-handler", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "The name set in `@Identifier` of a bean that implements `io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler`. If set, deserialization failure happening when deserializing values are delegated to this handler which may provide a fallback value."),
        @ConnectorAttribute(name = "fail-on-deserialization-failure", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "When no deserialization failure handler is set and a deserialization failure happens, report the failure and mark the application as unhealthy. If set to `false` and a deserialization failure happens, a `null` value is forwarded.", defaultValue = "true"),
        @ConnectorAttribute(name = "graceful-shutdown", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether or not a graceful shutdown should be attempted when the application terminates.", defaultValue = "true"),
        @ConnectorAttribute(name = "poll-timeout", type = "int", direction = ConnectorAttribute.Direction.INCOMING, description = "The polling timeout in milliseconds. When polling records, the poll will wait at most that duration before returning records. Default is 1000ms", defaultValue = "1000"),
        @ConnectorAttribute(name = "pause-if-no-requests", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the polling must be paused when the application does not request items and resume when it does. This allows implementing back-pressure based on the application capacity. Note that polling is not stopped, but will not retrieve any records when paused.", defaultValue = "true"),
        @ConnectorAttribute(name = "batch", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the Kafka records are consumed in batch. The channel injection point must consume a compatible type, such as `List<Payload>` or `KafkaRecordBatch<Payload>`.", defaultValue = "false"),
        @ConnectorAttribute(name = "max-queue-size-factor", type = "int", direction = ConnectorAttribute.Direction.INCOMING, description = "Multiplier factor to determine maximum number of records queued for processing, using `max.poll.records` * `max-queue-size-factor`. Defaults to 2. In `batch` mode `max.poll.records` is considered `1`.", defaultValue = "2"),

        // ---- Attributes accepted by outgoing channels only ----
        @ConnectorAttribute(name = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "The serializer classname used to serialize the record's key", defaultValue = "org.apache.kafka.common.serialization.StringSerializer"),
        @ConnectorAttribute(name = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "The serializer classname used to serialize the payload", mandatory = true),
        @ConnectorAttribute(name = ProducerConfig.ACKS_CONFIG, type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. Accepted values are: 0, 1, all", defaultValue = "1"),
        @ConnectorAttribute(name = ProducerConfig.BUFFER_MEMORY_CONFIG, type = "long", direction = ConnectorAttribute.Direction.OUTGOING, description = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server.", defaultValue = "33554432"),
        @ConnectorAttribute(name = "retries", type = "long", direction = ConnectorAttribute.Direction.OUTGOING, description = "If set to a positive number, the connector will try to resend any record that was not delivered successfully (with a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled. If not set, the connector tries to resend any record that failed to be delivered (because of a potentially transient error) during an amount of time configured by `delivery.timeout.ms`.", defaultValue = "2147483647"),
        @ConnectorAttribute(name = "key", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "A key to used when writing the record"),
        @ConnectorAttribute(name = "partition", type = "int", direction = ConnectorAttribute.Direction.OUTGOING, description = "The target partition id. -1 to let the client determine the partition", defaultValue = "-1"),
        @ConnectorAttribute(name = "waitForWriteCompletion", type = "boolean", direction = ConnectorAttribute.Direction.OUTGOING, description = "Whether the client waits for Kafka to acknowledge the written record before acknowledging the message", defaultValue = "true"),
        @ConnectorAttribute(name = "max-inflight-messages", type = "long", direction = ConnectorAttribute.Direction.OUTGOING, description = "The maximum number of messages to be written to Kafka concurrently. It limits the number of messages waiting to be written and acknowledged by the broker. You can set this attribute to `0` remove the limit", defaultValue = "1024"),
        @ConnectorAttribute(name = "cloud-events-source", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Configure the default `source` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `source` attribute itself", alias = "cloud-events-default-source"),
        @ConnectorAttribute(name = "cloud-events-type", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Configure the default `type` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `type` attribute itself", alias = "cloud-events-default-type"),
        @ConnectorAttribute(name = "cloud-events-subject", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Configure the default `subject` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `subject` attribute itself", alias = "cloud-events-default-subject"),
        @ConnectorAttribute(name = "cloud-events-data-content-type", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Configure the default `datacontenttype` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `datacontenttype` attribute itself", alias = "cloud-events-default-data-content-type"),
        @ConnectorAttribute(name = "cloud-events-data-schema", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Configure the default `dataschema` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `dataschema` attribute itself", alias = "cloud-events-default-data-schema"),
        @ConnectorAttribute(name = "cloud-events-insert-timestamp", type = "boolean", direction = ConnectorAttribute.Direction.OUTGOING, description = "Whether or not the connector should insert automatically the `time` attribute` into the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `time` attribute itself", alias = "cloud-events-default-timestamp", defaultValue = "true"),
        @ConnectorAttribute(name = "cloud-events-mode", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "The Cloud Event mode (`structured` or `binary` (default)). Indicates how are written the cloud events in the outgoing record", defaultValue = "binary"),
        @ConnectorAttribute(name = "close-timeout", type = "int", direction = ConnectorAttribute.Direction.OUTGOING, description = "The amount of milliseconds waiting for a graceful shutdown of the Kafka producer", defaultValue = "10000"),
        @ConnectorAttribute(name = ConnectorConfig.MERGE_PROPERTY, direction = ConnectorAttribute.Direction.OUTGOING, description = "Whether the connector should allow multiple upstreams", type = "boolean", defaultValue = "false"),
        @ConnectorAttribute(name = "propagate-record-key", direction = ConnectorAttribute.Direction.OUTGOING, description = "Propagate incoming record key to the outgoing record", type = "boolean", defaultValue = "false")
})
/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/KafkaConnector.class */
public class KafkaConnector implements IncomingConnectorFactory, OutgoingConnectorFactory, HealthReporter {
    /** Name under which this connector is registered; used as the {@link Connector} qualifier value. */
    public static final String CONNECTOR_NAME = "smallrye-kafka";

    /**
     * Tracer shared by the connector. It is assigned by {@link #init()} and is therefore
     * {@code null} until the bean has been post-constructed.
     */
    public static Tracer TRACER;

    /** Provides the Vert.x instance captured in {@link #init()}. */
    @Inject
    ExecutionHolder executionHolder;

    /** All available rebalance listener beans; handed to every source created by this connector. */
    @Inject
    @Any
    Instance<KafkaConsumerRebalanceListener> consumerRebalanceListeners;

    /** All available deserialization failure handler beans; handed to every source created by this connector. */
    @Inject
    @Any
    Instance<DeserializationFailureHandler<?>> deserializationFailureHandlers;

    /** Passed to every source and sink created by this connector. */
    @Inject
    KafkaCDIEvents kafkaCDIEvents;

    /** Sources created by {@link #getPublisherBuilder(Config)}; closed in {@code terminate} and queried for health. */
    private final List<KafkaSource<?, ?>> sources = new CopyOnWriteArrayList<>();

    /** Sinks created by {@link #getSubscriberBuilder(Config)}; closed in {@code terminate} and queried for health. */
    private final List<KafkaSink> sinks = new CopyOnWriteArrayList<>();

    /** Candidate default-configuration beans, passed to {@code ConfigHelper.retrieveChannelConfiguration}. */
    @Inject
    @Any
    Instance<Map<String, Object>> configurations;

    /** Vert.x instance obtained from {@link #executionHolder} in {@link #init()}. */
    private Vertx vertx;

    /**
     * Shutdown hook: observes the {@code @BeforeDestroyed(ApplicationScoped.class)} event (only if this
     * bean already exists) and releases everything the connector created.
     *
     * <p>Every registered source is closed first, then every registered sink, and finally the
     * {@link KafkaThrottledLatestProcessedCommit} cache is cleared.
     *
     * @param obj the observed event payload; not used
     */
    public void terminate(@Observes(notifyObserver = Reception.IF_EXISTS) @BeforeDestroyed(ApplicationScoped.class) @Priority(50) Object obj) {
        for (KafkaSource<?, ?> source : sources) {
            source.closeQuietly();
        }
        for (KafkaSink sink : sinks) {
            sink.closeQuietly();
        }
        KafkaThrottledLatestProcessedCommit.clearCache();
    }

    /**
     * Post-construction hook run once the injected fields are available.
     *
     * <p>Captures the Vert.x instance exposed by the {@link ExecutionHolder} and assigns the static
     * {@link #TRACER} from the global OpenTelemetry tracer provider, using the connector's package
     * name as instrumentation name.
     */
    @PostConstruct
    public void init() {
        vertx = executionHolder.vertx();
        TRACER = GlobalOpenTelemetry
                .getTracerProvider()
                .get("io.smallrye.reactive.messaging.kafka");
    }

    /**
     * Builds the publisher that feeds an incoming channel with records read from Kafka.
     *
     * <p>The channel configuration is resolved through {@code ConfigHelper.retrieveChannelConfiguration},
     * a warning is logged for each deprecated health option that is still set, and one
     * {@link KafkaSource} is created per configured partition (a single source, created with index
     * {@code -1}, when {@code partitions} is 1). When several sources are created their streams are
     * merged. If {@code broadcast} is enabled the resulting stream is broadcast to all subscribers.
     *
     * @param config the channel configuration
     * @return the publisher builder emitting the messages of the channel
     * @throws IllegalArgumentException if the {@code partitions} attribute is not strictly positive
     */
    @Override // org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory
    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
        KafkaConnectorIncomingConfiguration incomingConfig = new KafkaConnectorIncomingConfiguration(
                ConfigHelper.retrieveChannelConfiguration(this.configurations, config));

        if (incomingConfig.getHealthReadinessTopicVerification().isPresent()) {
            KafkaLogging.log.deprecatedConfig("health-readiness-topic-verification", "health-topic-verification-enabled");
        }
        if (incomingConfig.getHealthReadinessTimeout().isPresent()) {
            KafkaLogging.log.deprecatedConfig("health-readiness-timeout", "health-topic-verification-timeout");
        }

        int partitions = incomingConfig.getPartitions();
        if (partitions <= 0) {
            throw new IllegalArgumentException("`partitions` must be greater than 0");
        }

        // Without an explicit group id, a random one is generated and reported in the log.
        String groupId = incomingConfig.getGroupId().orElseGet(() -> {
            String generated = UUID.randomUUID().toString();
            KafkaLogging.log.noGroupId(generated);
            return generated;
        });

        // Read once, before any source is created, instead of once per branch / loop iteration.
        boolean batch = incomingConfig.getBatch();
        boolean broadcast = incomingConfig.getBroadcast();

        if (partitions == 1) {
            return toPublisherBuilder(createSourceStream(groupId, incomingConfig, -1, batch), broadcast);
        }

        List<Publisher<? extends Message<?>>> streams = new ArrayList<>(partitions);
        for (int index = 0; index < partitions; index++) {
            streams.add(createSourceStream(groupId, incomingConfig, index, batch));
        }

        // The varargs merge API needs an array, and a generic array cannot be created:
        // the raw Publisher[] is confined to this single statement.
        @SuppressWarnings("unchecked")
        Multi<? extends Message<?>> merged = Multi.createBy().merging().streams(streams.toArray(new Publisher[0]));
        return toPublisherBuilder(merged, broadcast);
    }

    /** Creates a source for the given index, registers it in {@code sources} and returns its (batch) stream. */
    private Multi<? extends Message<?>> createSourceStream(String groupId,
            KafkaConnectorIncomingConfiguration incomingConfig, int index, boolean batch) {
        KafkaSource<?, ?> source = new KafkaSource<>(this.vertx, groupId, incomingConfig,
                this.consumerRebalanceListeners, this.kafkaCDIEvents, this.deserializationFailureHandlers, index);
        this.sources.add(source);
        if (batch) {
            return source.getBatchStream();
        }
        return source.getStream();
    }

    /** Wraps the stream into a publisher builder, broadcasting it to all subscribers when requested. */
    private static PublisherBuilder<? extends Message<?>> toPublisherBuilder(Multi<? extends Message<?>> stream,
            boolean broadcast) {
        if (broadcast) {
            return ReactiveStreams.fromPublisher(stream.broadcast().toAllSubscribers());
        }
        return ReactiveStreams.fromPublisher(stream);
    }

    /**
     * Builds the subscriber that writes the messages of an outgoing channel to Kafka.
     *
     * <p>The channel configuration is resolved through {@code ConfigHelper.retrieveChannelConfiguration},
     * a warning is logged for each deprecated health option that is still set, then a {@link KafkaSink}
     * is created, registered with this connector and its subscriber returned.
     *
     * @param config the channel configuration
     * @return the subscriber builder exposed by the newly created sink
     */
    @Override // org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory
    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        KafkaConnectorOutgoingConfiguration outgoingConfig = new KafkaConnectorOutgoingConfiguration(
                ConfigHelper.retrieveChannelConfiguration(configurations, config));

        // Deprecated options still work; their presence is only reported.
        outgoingConfig.getHealthReadinessTopicVerification().ifPresent(ignored ->
                KafkaLogging.log.deprecatedConfig("health-readiness-topic-verification", "health-topic-verification-enabled"));
        outgoingConfig.getHealthReadinessTimeout().ifPresent(ignored ->
                KafkaLogging.log.deprecatedConfig("health-readiness-timeout", "health-topic-verification-timeout"));

        KafkaSink sink = new KafkaSink(outgoingConfig, kafkaCDIEvents);
        sinks.add(sink);
        return sink.getSink();
    }

    /**
     * Builds the startup health report of the connector.
     *
     * <p>A single report builder is passed to {@code isStarted} on every registered source, then on
     * every registered sink.
     *
     * @return the report built once all sources and sinks have been visited
     */
    @Override // io.smallrye.reactive.messaging.health.HealthReporter
    public HealthReport getStartup() {
        final HealthReport.HealthReportBuilder report = HealthReport.builder();
        for (KafkaSource<?, ?> source : sources) {
            source.isStarted(report);
        }
        for (KafkaSink sink : sinks) {
            sink.isStarted(report);
        }
        return report.build();
    }

    /**
     * Builds the readiness health report of the connector.
     *
     * <p>A single report builder is passed to {@code isReady} on every registered source, then on
     * every registered sink.
     *
     * @return the report built once all sources and sinks have been visited
     */
    @Override // io.smallrye.reactive.messaging.health.HealthReporter
    public HealthReport getReadiness() {
        final HealthReport.HealthReportBuilder report = HealthReport.builder();
        for (KafkaSource<?, ?> source : sources) {
            source.isReady(report);
        }
        for (KafkaSink sink : sinks) {
            sink.isReady(report);
        }
        return report.build();
    }

    /**
     * Builds the liveness health report of the connector.
     *
     * <p>A single report builder is passed to {@code isAlive} on every registered source, then on
     * every registered sink.
     *
     * @return the report built once all sources and sinks have been visited
     */
    @Override // io.smallrye.reactive.messaging.health.HealthReporter
    public HealthReport getLiveness() {
        final HealthReport.HealthReportBuilder report = HealthReport.builder();
        for (KafkaSource<?, ?> source : sources) {
            source.isAlive(report);
        }
        for (KafkaSink sink : sinks) {
            sink.isAlive(report);
        }
        return report.build();
    }

    /**
     * Looks up the Kafka consumer backing an incoming channel.
     *
     * <p>The key and value types are chosen by the caller and are not checked here.
     *
     * @param str the channel name
     * @param <K> the key type expected by the caller
     * @param <V> the value type expected by the caller
     * @return the consumer of the first registered source whose channel equals {@code str},
     *         or {@code null} when no source matches
     */
    public <K, V> KafkaConsumer<K, V> getConsumer(String str) {
        // Unchecked by nature: sources are stored as KafkaSource<?, ?>, so K and V cannot be verified.
        @SuppressWarnings("unchecked")
        KafkaConsumer<K, V> consumer = (KafkaConsumer<K, V>) this.sources.stream()
                .filter(source -> source.getChannel().equals(str))
                .map(KafkaSource::getConsumer)
                .findFirst()
                .orElse(null);
        return consumer;
    }

    /**
     * Looks up the Kafka producer backing an outgoing channel.
     *
     * <p>The key and value types are chosen by the caller and are not checked here.
     *
     * @param str the channel name
     * @param <K> the key type expected by the caller
     * @param <V> the value type expected by the caller
     * @return the producer of the first registered sink whose channel equals {@code str},
     *         or {@code null} when no sink matches
     */
    public <K, V> KafkaProducer<K, V> getProducer(String str) {
        // Unchecked by nature: the sink does not carry K and V, so they cannot be verified.
        @SuppressWarnings("unchecked")
        KafkaProducer<K, V> producer = (KafkaProducer<K, V>) this.sinks.stream()
                .filter(sink -> sink.getChannel().equals(str))
                .map(KafkaSink::getProducer)
                .findFirst()
                .orElse(null);
        return producer;
    }

    /**
     * Lists the incoming channels served by this connector.
     *
     * @return the distinct channel names of all registered sources
     */
    public Set<String> getConsumerChannels() {
        return sources.stream()
                .map(KafkaSource::getChannel)
                .collect(Collectors.toSet());
    }

    /**
     * Lists the outgoing channels served by this connector.
     *
     * @return the distinct channel names of all registered sinks
     */
    public Set<String> getProducerChannels() {
        return sinks.stream()
                .map(KafkaSink::getChannel)
                .collect(Collectors.toSet());
    }
}
