package org.jboss.pnc.notification.dist;

import io.apicurio.registry.utils.IoUtil;
import io.apicurio.registry.utils.kafka.AsyncProducer;
import io.apicurio.registry.utils.kafka.ConsumerContainer;
import io.apicurio.registry.utils.kafka.ConsumerSkipRecordsSerializationExceptionHandler;
import io.apicurio.registry.utils.kafka.Oneof2;
import io.apicurio.registry.utils.kafka.ProducerActions;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jboss.pnc.common.json.moduleconfig.SystemConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/jboss/pnc/notification/dist/KafkaDistributedEventHandler.class */
public class KafkaDistributedEventHandler extends AbstractDistributedEventHandler {
    private static final Logger log = LoggerFactory.getLogger(KafkaDistributedEventHandler.class);
    private ProducerActions<String, String> producer;
    private ConsumerContainer.DynamicPool<String, String> consumer;
    private final SystemConfig config;
    private final String topic;

    public KafkaDistributedEventHandler(SystemConfig systemConfig) {
        this.config = (SystemConfig) Objects.requireNonNull(systemConfig);
        this.topic = (String) Objects.requireNonNull(systemConfig.getKafkaTopic());
    }

    @Override // org.jboss.pnc.notification.dist.DistributedEventHandler
    public void sendEvent(Object obj) {
        String message = toMessage(obj);
        log.debug("Send event {}", message);
        this.producer.apply(new ProducerRecord(this.topic, message));
    }

    @Override // org.jboss.pnc.notification.dist.DistributedEventHandler
    public void start() {
        log.debug("Starting configuration of KafkaDistributedEventHandler...");
        String kafkaProperties = this.config.getKafkaProperties();
        Properties readProperties = kafkaProperties != null ? SystemConfig.readProperties(kafkaProperties) : new Properties();
        StringSerializer stringSerializer = new StringSerializer();
        Properties forProducer = forProducer(this.config);
        forProducer.putAll(readProperties);
        log.debug("Producer properties: {}", obfuscateSensibleData(forProducer));
        this.producer = new AsyncProducer(forProducer, stringSerializer, stringSerializer);
        int kafkaNumOfConsumers = this.config.getKafkaNumOfConsumers();
        StringDeserializer stringDeserializer = new StringDeserializer();
        Properties forConsumer = forConsumer(this.config);
        forConsumer.putAll(readProperties);
        log.debug("Consumer properties: {}", obfuscateSensibleData(forConsumer));
        this.consumer = new ConsumerContainer.DynamicPool<>(forConsumer, stringDeserializer, stringDeserializer, this.topic, kafkaNumOfConsumers, Oneof2.first(this::consume), new ConsumerSkipRecordsSerializationExceptionHandler());
    }

    private void consume(ConsumerRecord<String, String> consumerRecord) {
        log.debug("Consume and send message {}", consumerRecord.value());
        sendMessage(consumerRecord.value());
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        log.debug("Closing producer and consumer...");
        IoUtil.closeIgnore(this.producer);
        IoUtil.closeIgnore(this.consumer);
    }

    public Properties forConsumer(SystemConfig systemConfig) {
        String uuid = UUID.randomUUID().toString();
        Properties baseProperties = baseProperties(systemConfig);
        baseProperties.put("group.id", uuid);
        baseProperties.put("key.deserializer", StringDeserializer.class.getName());
        baseProperties.put("value.deserializer", StringDeserializer.class.getName());
        return baseProperties;
    }

    public Properties forProducer(SystemConfig systemConfig) {
        Properties baseProperties = baseProperties(systemConfig);
        baseProperties.put("retries", Integer.valueOf(systemConfig.getKafkaNumOfRetries()));
        baseProperties.put("retry.backoff.ms", Integer.valueOf(systemConfig.getKafkaRetryBackoffMillis()));
        if (systemConfig.getKafkaAcks() != null) {
            baseProperties.put("acks", systemConfig.getKafkaAcks());
        }
        baseProperties.put("key.serializer", StringSerializer.class.getName());
        baseProperties.put("value.serializer", StringSerializer.class.getName());
        return baseProperties;
    }

    private Properties baseProperties(SystemConfig systemConfig) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", (String) Objects.requireNonNull(systemConfig.getKafkaBootstrapServers()));
        properties.putAll(configureSecurity(systemConfig));
        return properties;
    }

    private Properties configureSecurity(SystemConfig systemConfig) {
        Properties properties = new Properties();
        if (systemConfig.getKafkaSecurityProtocol() == null) {
            return properties;
        }
        if (systemConfig.getKafkaSecurityProtocol() != null) {
            properties.put("security.protocol", SecurityProtocol.forName(systemConfig.getKafkaSecurityProtocol()).name);
        }
        if (isSasl(SecurityProtocol.forName(systemConfig.getKafkaSecurityProtocol()))) {
            if (systemConfig.getKafkaSecurityUser() != null && systemConfig.getKafkaSecurityPassword() != null) {
                properties.put("sasl.mechanism", systemConfig.getKafkaSecuritySaslMechanism());
                properties.put("sasl.jaas.config", String.format("%s required username='%s' password='%s';", getLoginModule(systemConfig.getKafkaSecuritySaslMechanism()).getName(), systemConfig.getKafkaSecurityUser(), systemConfig.getKafkaSecurityPassword()));
            } else if (systemConfig.getKafkaSecuritySaslJaasConf() != null) {
                properties.put("sasl.mechanism", systemConfig.getKafkaSecuritySaslMechanism());
                properties.put("sasl.jaas.config", systemConfig.getKafkaSecuritySaslJaasConf());
            }
        }
        return properties;
    }

    private boolean isSasl(SecurityProtocol securityProtocol) {
        return securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL;
    }

    private Class getLoginModule(String str) {
        String upperCase = str.toUpperCase();
        boolean z = -1;
        switch (upperCase.hashCode()) {
            case -1875511693:
                if (upperCase.equals("SCRAM-SHA-256")) {
                    z = true;
                    break;
                }
                break;
            case -1875508938:
                if (upperCase.equals("SCRAM-SHA-512")) {
                    z = 2;
                    break;
                }
                break;
            case 76210602:
                if (upperCase.equals("PLAIN")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return PlainLoginModule.class;
            case true:
            case true:
                return ScramLoginModule.class;
            default:
                throw new IllegalArgumentException("Unsupported SASL mechanism " + str);
        }
    }

    private Properties obfuscateSensibleData(Properties properties) {
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        if (properties2.containsKey("sasl.jaas.config")) {
            properties2.put("sasl.jaas.config", properties2.getProperty("sasl.jaas.config").replaceAll("(password)='[^&]+'", "$1='***'"));
        }
        return properties2;
    }
}
