/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.pnc.notification.dist;

import io.apicurio.registry.utils.IoUtil;
import io.apicurio.registry.utils.kafka.AsyncProducer;
import io.apicurio.registry.utils.kafka.ConsumerContainer;
import io.apicurio.registry.utils.kafka.ConsumerSkipRecordsSerializationExceptionHandler;
import io.apicurio.registry.utils.kafka.Oneof2;
import io.apicurio.registry.utils.kafka.ProducerActions;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jboss.pnc.common.json.moduleconfig.SystemConfig;
import org.jboss.pnc.notification.dist.AbstractDistributedEventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Distributed event handler that publishes events to a Kafka topic and forwards every record
 * consumed from that same topic to the inherited {@code sendMessage} method.
 *
 * <p>Lifecycle: construct with a {@link SystemConfig}, call {@link #start()} to create the
 * producer and the consumer pool, and call {@link #close()} to release both.
 *
 * <p>Each call to {@link #forConsumer(SystemConfig)} generates a random consumer group id
 * (presumably so that every handler instance receives every event - confirm against the
 * deployment model).
 */
public class KafkaDistributedEventHandler extends AbstractDistributedEventHandler {
    private static final Logger log = LoggerFactory.getLogger(KafkaDistributedEventHandler.class);

    private static final String SASL_MECHANISM = "sasl.mechanism";
    private static final String SASL_JAAS_CONFIG = "sasl.jaas.config";
    private static final String MASK = "***";

    /**
     * Matches {@code password=<quote>...<quote>} in a JAAS configuration string, for single or
     * double quotes. The match is deliberately greedy (up to the last quote of the same kind) so
     * that a password containing a quote character is over-masked rather than partially logged.
     */
    private static final String JAAS_PASSWORD_REGEX = "(?s)(password)\\s*=\\s*(['\"]).*\\2";

    private ProducerActions<String, String> producer;
    private ConsumerContainer.DynamicPool<String, String> consumer;
    private final SystemConfig config;
    private final String topic;

    /**
     * Creates a handler bound to the Kafka topic named in the given configuration.
     *
     * @param config system configuration; must not be {@code null} and must provide a Kafka topic
     * @throws NullPointerException if {@code config} or its Kafka topic is {@code null}
     */
    public KafkaDistributedEventHandler(SystemConfig config) {
        this.config = Objects.requireNonNull(config, "config");
        this.topic = Objects.requireNonNull(config.getKafkaTopic(), "Kafka topic must be configured");
    }

    /**
     * Serializes the event with the inherited {@code toMessage} and sends it to the Kafka topic.
     * The send is asynchronous; a failed send is logged at error level.
     *
     * @param event the event to publish
     * @throws IllegalStateException if {@link #start()} has not been called yet
     */
    @Override
    public void sendEvent(Object event) {
        ProducerActions<String, String> activeProducer = this.producer;
        if (activeProducer == null) {
            throw new IllegalStateException("KafkaDistributedEventHandler has not been started");
        }
        String message = this.toMessage(event);
        log.debug("Send event {}", message);
        // Observe the asynchronous result so that delivery failures are not silently lost.
        activeProducer.apply(new ProducerRecord<>(this.topic, message)).whenComplete((metadata, failure) -> {
            if (failure != null) {
                log.error("Failed to send event to Kafka topic {}", this.topic, failure);
            }
        });
    }

    /**
     * Builds the producer and consumer properties, then creates the producer and the consumer
     * pool. Entries from the optional extra Kafka properties of the configuration are applied
     * last and therefore override the values computed by this class.
     */
    @Override
    public void start() {
        log.debug("Starting configuration of KafkaDistributedEventHandler...");
        String kafkaProperties = this.config.getKafkaProperties();
        Properties overrides = kafkaProperties != null ? SystemConfig.readProperties(kafkaProperties) : new Properties();

        Serializer<String> serializer = new StringSerializer();
        Properties producerProperties = this.forProducer(this.config);
        producerProperties.putAll(overrides);
        log.debug("Producer properties: {}", this.obfuscateSensibleData(producerProperties));
        this.producer = new AsyncProducer<>(producerProperties, serializer, serializer);

        int numOfConsumers = this.config.getKafkaNumOfConsumers();
        Deserializer<String> deserializer = new StringDeserializer();
        Properties consumerProperties = this.forConsumer(this.config);
        consumerProperties.putAll(overrides);
        log.debug("Consumer properties: {}", this.obfuscateSensibleData(consumerProperties));
        this.consumer = new ConsumerContainer.DynamicPool<>(
                consumerProperties,
                deserializer,
                deserializer,
                this.topic,
                numOfConsumers,
                Oneof2.first(this::consume),
                new ConsumerSkipRecordsSerializationExceptionHandler());
    }

    /** Forwards the value of a consumed record to the inherited {@code sendMessage}. */
    private void consume(ConsumerRecord<String, String> cr) {
        log.debug("Consume and send message {}", cr.value());
        this.sendMessage(cr.value());
    }

    /** Closes the producer and the consumer pool via {@code IoUtil.closeIgnore}. */
    @Override
    public void close() {
        log.debug("Closing producer and consumer...");
        IoUtil.closeIgnore(this.producer);
        IoUtil.closeIgnore(this.consumer);
    }

    /**
     * Builds the consumer properties: bootstrap servers, security settings, a freshly generated
     * random {@code group.id} and String key/value deserializers.
     *
     * @param config system configuration to read the Kafka settings from
     * @return a new {@link Properties} instance
     */
    public Properties forConsumer(SystemConfig config) {
        Properties consumerProps = this.baseProperties(config);
        consumerProps.put("group.id", UUID.randomUUID().toString());
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());
        return consumerProps;
    }

    /**
     * Builds the producer properties: bootstrap servers, security settings, retry settings,
     * {@code acks} (only when configured) and String key/value serializers.
     *
     * @param config system configuration to read the Kafka settings from
     * @return a new {@link Properties} instance
     */
    public Properties forProducer(SystemConfig config) {
        Properties producerProps = this.baseProperties(config);
        producerProps.put("retries", config.getKafkaNumOfRetries());
        producerProps.put("retry.backoff.ms", config.getKafkaRetryBackoffMillis());
        if (config.getKafkaAcks() != null) {
            producerProps.put("acks", config.getKafkaAcks());
        }
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());
        return producerProps;
    }

    /** Properties shared by producer and consumer: bootstrap servers plus security settings. */
    private Properties baseProperties(SystemConfig config) {
        Properties baseProperties = new Properties();
        String bootstrapServers = Objects.requireNonNull(
                config.getKafkaBootstrapServers(), "Kafka bootstrap servers must be configured");
        baseProperties.put("bootstrap.servers", bootstrapServers);
        baseProperties.putAll(this.configureSecurity(config));
        return baseProperties;
    }

    /**
     * Derives the security related properties. Returns an empty set when no security protocol is
     * configured. For SASL protocols, a configured user and password take precedence over an
     * explicitly configured JAAS configuration string.
     *
     * @throws NullPointerException if SASL credentials or a JAAS configuration are present but no
     *         SASL mechanism is configured
     */
    private Properties configureSecurity(SystemConfig config) {
        Properties securityProperties = new Properties();
        String protocolName = config.getKafkaSecurityProtocol();
        if (protocolName == null) {
            return securityProperties;
        }
        SecurityProtocol protocol = SecurityProtocol.forName(protocolName);
        securityProperties.put("security.protocol", protocol.name);
        if (!this.isSasl(protocol)) {
            return securityProperties;
        }
        if (config.getKafkaSecurityUser() != null && config.getKafkaSecurityPassword() != null) {
            String mechanism = this.requireSaslMechanism(config);
            securityProperties.put(SASL_MECHANISM, mechanism);
            securityProperties.put(
                    SASL_JAAS_CONFIG,
                    String.format(
                            "%s required username='%s' password='%s';",
                            this.getLoginModule(mechanism).getName(),
                            config.getKafkaSecurityUser(),
                            config.getKafkaSecurityPassword()));
        } else if (config.getKafkaSecuritySaslJaasConf() != null) {
            securityProperties.put(SASL_MECHANISM, this.requireSaslMechanism(config));
            securityProperties.put(SASL_JAAS_CONFIG, config.getKafkaSecuritySaslJaasConf());
        }
        return securityProperties;
    }

    /** Returns the configured SASL mechanism, failing with a descriptive message when absent. */
    private String requireSaslMechanism(SystemConfig config) {
        return Objects.requireNonNull(
                config.getKafkaSecuritySaslMechanism(),
                "Kafka SASL mechanism must be configured when a SASL security protocol is used");
    }

    private boolean isSasl(SecurityProtocol securityProtocol) {
        return securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL;
    }

    /**
     * Maps a SASL mechanism name (case-insensitive) to the matching login module class.
     *
     * @throws IllegalArgumentException if the mechanism is not PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512
     */
    private Class<?> getLoginModule(String saslMechanism) {
        // Locale.ROOT keeps the mapping independent of the JVM default locale (e.g. Turkish 'i').
        switch (saslMechanism.toUpperCase(Locale.ROOT)) {
            case "PLAIN":
                return PlainLoginModule.class;
            case "SCRAM-SHA-256":
            case "SCRAM-SHA-512":
                return ScramLoginModule.class;
            default:
                throw new IllegalArgumentException("Unsupported SASL mechanism " + saslMechanism);
        }
    }

    /**
     * Returns a copy of the given properties that is safe to log: the password inside
     * {@code sasl.jaas.config} and the value of every key ending in {@code password} are replaced
     * by {@code ***}. The input is not modified.
     */
    private Properties obfuscateSensibleData(Properties properties) {
        Properties obfuscatedProps = new Properties();
        obfuscatedProps.putAll(properties);
        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
            Object key = entry.getKey();
            if (!(key instanceof String)) {
                continue;
            }
            String name = (String) key;
            if (SASL_JAAS_CONFIG.equals(name)) {
                if (entry.getValue() instanceof String) {
                    String clearJaasConfig = (String) entry.getValue();
                    obfuscatedProps.put(name, clearJaasConfig.replaceAll(JAAS_PASSWORD_REGEX, "$1=$2" + MASK + "$2"));
                } else {
                    // Unknown value type: hide it entirely rather than risk logging a secret.
                    obfuscatedProps.put(name, MASK);
                }
            } else if (name.toLowerCase(Locale.ROOT).endsWith("password")) {
                // e.g. ssl.keystore.password supplied through the extra Kafka properties.
                obfuscatedProps.put(name, MASK);
            }
        }
        return obfuscatedProps;
    }
}

