package org.kie.server.services.jbpm.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jbpm.services.api.DeploymentEvent;
import org.jbpm.services.api.DeploymentEventListener;
import org.jbpm.services.api.DeploymentService;
import org.jbpm.services.api.ListenerSupport;
import org.jbpm.services.api.ProcessService;
import org.kie.internal.runtime.manager.deploy.DeploymentDescriptorManager;
import org.kie.server.api.KieServerConstants;
import org.kie.server.services.api.KieContainerInstance;
import org.kie.server.services.api.KieServerExtension;
import org.kie.server.services.api.KieServerRegistry;
import org.kie.server.services.api.SupportedTransports;
import org.kie.server.services.impl.KieServerImpl;
import org.kie.server.services.jbpm.JbpmKieServerExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kie-server-services-kafka-7.58.0-SNAPSHOT.jar:org/kie/server/services/jbpm/kafka/KafkaServerExtension.class */
/**
 * KIE Server extension that wires Kafka consumers and producers to the jBPM
 * deployment lifecycle.
 *
 * <p>On {@link #init}, the extension looks up the deployment and process
 * services exposed by the jBPM extension, creates a {@link KafkaServerConsumer}
 * and a {@link KafkaServerProducer}, and registers itself as a
 * {@link DeploymentEventListener} so that deployment events are forwarded to
 * them.
 *
 * <p>Kafka client settings are taken from system properties whose name starts
 * with {@code org.kie.server.jbpm-kafka.ext.}; the remainder of the name must
 * be a configuration name known to {@link ConsumerConfig} and/or
 * {@link ProducerConfig}.
 */
public class KafkaServerExtension implements KieServerExtension, DeploymentEventListener {
    private static final Logger logger = LoggerFactory.getLogger(KafkaServerExtension.class);

    /** Name under which this extension is reported by {@link #getExtensionName()}. */
    public static final String EXTENSION_NAME = "Kafka";

    /** Bootstrap server used when none is configured through system properties. */
    private static final String DEFAULT_HOST = "localhost:9092";

    /** Prefix of the system properties that are mapped to Kafka client configuration. */
    private static final String PROPERTY_PREFIX = "org.kie.server.jbpm-kafka.ext.";

    /** System property holding the close timeout, in seconds, applied in {@link #destroy}. */
    private static final String CLOSE_TIMEOUT_PROPERTY = "org.kie.server.jbpm-kafka.ext.close.timeout";

    private KafkaServerConsumer kafkaServerConsumer;
    private KafkaServerProducer kafkaServerProducer;
    private KafkaEventProcessorFactory factory;
    private ListenerSupport deploymentService;
    private final AtomicBoolean initialized = new AtomicBoolean();
    private final Map<String, Object> consumerProperties = new HashMap<>();
    private final Map<String, Object> producerProperties = new HashMap<>();

    /**
     * @return {@code true} once {@link #init} has completed successfully and
     *         until {@link #destroy} is called
     */
    @Override
    public boolean isInitialized() {
        return initialized.get();
    }

    /**
     * The extension is disabled unless the system property named by
     * {@link KieServerConstants#KIE_KAFKA_SERVER_EXT_DISABLED} is explicitly
     * set to something other than {@code "true"} (the default when unset is
     * {@code "true"}, i.e. disabled).
     *
     * @return {@code true} if the extension is enabled
     */
    @Override
    public boolean isActive() {
        return !Boolean.parseBoolean(System.getProperty(KieServerConstants.KIE_KAFKA_SERVER_EXT_DISABLED, "true"));
    }

    /**
     * Initializes the extension: loads Kafka properties, registers the default
     * Kafka deployment descriptor location, locates the jBPM services and
     * starts listening for deployment events.
     *
     * <p>Does nothing (apart from logging a warning) if the extension is
     * already initialized or if the jBPM extension is not registered.
     *
     * @param kieServerImpl     the server instance (unused)
     * @param kieServerRegistry registry used to look up the jBPM extension
     * @throws IllegalStateException if the jBPM extension does not expose a
     *                               deployment service or a process service
     */
    @Override
    public void init(KieServerImpl kieServerImpl, KieServerRegistry kieServerRegistry) {
        if (initialized.get()) {
            logger.warn("Kafka extension already initialized");
            return;
        }
        initProperties();
        DeploymentDescriptorManager.addDescriptorLocation("classpath:/META-INF/kafka-deployment-descriptor-defaults.xml");
        KieServerExtension jbpmExtension = kieServerRegistry.getServerExtension(JbpmKieServerExtension.EXTENSION_NAME);
        if (jbpmExtension == null) {
            logger.warn("Extension jBPM is required");
            return;
        }
        ProcessService processService = null;
        for (Object service : jbpmExtension.getServices()) {
            // instanceof (rather than getClass()) so a null entry is skipped instead of throwing
            if (deploymentService == null && service instanceof DeploymentService) {
                deploymentService = (ListenerSupport) service;
            } else if (processService == null && service instanceof ProcessService) {
                processService = (ProcessService) service;
            }
            if (deploymentService != null && processService != null) {
                break;
            }
        }
        if (deploymentService == null) {
            throw new IllegalStateException("Cannot find deployment service");
        }
        if (processService == null) {
            throw new IllegalStateException("Cannot find process service");
        }
        factory = KafkaServerUtils.buildEventProcessorFactory();
        kafkaServerConsumer = new KafkaServerConsumer(factory, this::getKafkaConsumer, processService);
        kafkaServerProducer = new KafkaServerProducer(factory, this::getKafkaProducer);
        deploymentService.addListener(this);
        initialized.set(true);
    }

    /**
     * Releases everything acquired in {@link #init}. Safe to call when
     * initialization never happened or did not complete: components that were
     * never created are simply skipped.
     *
     * <p>The consumer and producer are closed with a timeout read from the
     * {@code org.kie.server.jbpm-kafka.ext.close.timeout} system property
     * (seconds, default 30).
     *
     * @param kieServerImpl     the server instance (unused)
     * @param kieServerRegistry the server registry (unused)
     */
    @Override
    public void destroy(KieServerImpl kieServerImpl, KieServerRegistry kieServerRegistry) {
        if (deploymentService != null) {
            deploymentService.removeListener(this);
        }
        Duration closeTimeout = Duration.ofSeconds(Long.getLong(CLOSE_TIMEOUT_PROPERTY, 30L));
        // Each component may be null if init() returned early or failed part-way through.
        if (kafkaServerConsumer != null) {
            kafkaServerConsumer.close(closeTimeout);
            kafkaServerConsumer = null;
        }
        if (kafkaServerProducer != null) {
            kafkaServerProducer.close(closeTimeout);
            kafkaServerProducer = null;
        }
        if (factory != null) {
            factory.close();
            factory = null;
        }
        deploymentService = null;
        initialized.set(false);
        consumerProperties.clear();
        producerProperties.clear();
    }

    /**
     * Creates a new Kafka consumer from the collected consumer properties,
     * filling in defaults for bootstrap servers, isolation level
     * ({@code read_committed}) and group id ({@code jbpm-consumer}) when they
     * are not already set.
     *
     * @return a new consumer with {@code String} keys and {@code byte[]} values
     */
    protected Consumer<String, byte[]> getKafkaConsumer() {
        consumerProperties.putIfAbsent("bootstrap.servers", DEFAULT_HOST);
        // Locale.ROOT: default-locale lower-casing can corrupt the value (e.g. dotless i under tr_TR).
        consumerProperties.putIfAbsent(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
                IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
        consumerProperties.putIfAbsent("group.id", "jbpm-consumer");
        return new KafkaConsumer<>(consumerProperties, new StringDeserializer(), new ByteArrayDeserializer());
    }

    /**
     * Creates a new Kafka producer from the collected producer properties,
     * filling in defaults for bootstrap servers and {@code max.block.ms}
     * (2000) when they are not already set.
     *
     * @return a new producer with {@code String} keys and {@code byte[]} values
     */
    protected Producer<String, byte[]> getKafkaProducer() {
        producerProperties.putIfAbsent("bootstrap.servers", DEFAULT_HOST);
        producerProperties.putIfAbsent(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000L);
        return new KafkaProducer<>(producerProperties, new StringSerializer(), new ByteArraySerializer());
    }

    /** Activates the producer for the deployment and registers it with the consumer. */
    @Override
    public void onDeploy(DeploymentEvent deploymentEvent) {
        kafkaServerProducer.activate(deploymentEvent);
        kafkaServerConsumer.addRegistration(deploymentEvent);
    }

    /** Deactivates the producer for the deployment and removes its consumer registration. */
    @Override
    public void onUnDeploy(DeploymentEvent deploymentEvent) {
        kafkaServerProducer.deactivate(deploymentEvent);
        kafkaServerConsumer.removeRegistration(deploymentEvent);
    }

    /** Registers the activated deployment with the consumer. */
    @Override
    public void onActivate(DeploymentEvent deploymentEvent) {
        kafkaServerConsumer.addRegistration(deploymentEvent);
    }

    /** Removes the deactivated deployment's registration from the consumer. */
    @Override
    public void onDeactivate(DeploymentEvent deploymentEvent) {
        kafkaServerConsumer.removeRegistration(deploymentEvent);
    }

    /** Container updates are always allowed by this extension. */
    @Override
    public boolean isUpdateContainerAllowed(String str, KieContainerInstance kieContainerInstance, Map<String, Object> map) {
        return true;
    }

    /** @return {@link KieServerConstants#CAPABILITY_BPM_KAFKA} */
    @Override
    public String getImplementedCapability() {
        return KieServerConstants.CAPABILITY_BPM_KAFKA;
    }

    /** This extension exposes no services of its own. */
    @Override
    public List<Object> getServices() {
        return Collections.emptyList();
    }

    /** @return {@link #EXTENSION_NAME} */
    @Override
    public String getExtensionName() {
        return EXTENSION_NAME;
    }

    /** @return the start order of this extension, {@code 20} */
    @Override
    public Integer getStartOrder() {
        return 20;
    }

    @Override
    public String toString() {
        return "Kafka KIE Server extension";
    }

    /** No-op. */
    @Override
    public void serverStarted() {
    }

    /** No-op; this extension reacts to deployment events instead. */
    @Override
    public void createContainer(String str, KieContainerInstance kieContainerInstance, Map<String, Object> map) {
    }

    /** No-op. */
    @Override
    public void prepareContainerUpdate(String str, KieContainerInstance kieContainerInstance, Map<String, Object> map) {
    }

    /** No-op. */
    @Override
    public void updateContainer(String str, KieContainerInstance kieContainerInstance, Map<String, Object> map) {
    }

    /** No-op; this extension reacts to deployment events instead. */
    @Override
    public void disposeContainer(String str, KieContainerInstance kieContainerInstance, Map<String, Object> map) {
    }

    /** This extension contributes no transport-specific application components. */
    @Override
    public List<Object> getAppComponents(SupportedTransports supportedTransports) {
        return Collections.emptyList();
    }

    /** This extension contributes no application components; always returns {@code null}. */
    @Override
    public <T> T getAppComponents(Class<T> cls) {
        return null;
    }

    /** @return a read-only view of the consumer properties collected so far */
    protected final Map<String, Object> getConsumerProperties() {
        return Collections.unmodifiableMap(consumerProperties);
    }

    /** @return a read-only view of the producer properties collected so far */
    protected final Map<String, Object> getProducerProperties() {
        return Collections.unmodifiableMap(producerProperties);
    }

    /**
     * Scans the system properties for keys starting with
     * {@code org.kie.server.jbpm-kafka.ext.} and copies each one, with the
     * prefix stripped, into the consumer and/or producer property maps when
     * the stripped name is a configuration name known to
     * {@link ConsumerConfig} or {@link ProducerConfig} respectively.
     */
    protected final void initProperties() {
        for (Map.Entry<Object, Object> entry : System.getProperties().entrySet()) {
            String key = entry.getKey().toString();
            if (!key.startsWith(PROPERTY_PREFIX)) {
                continue;
            }
            String configName = key.substring(PROPERTY_PREFIX.length());
            if (ConsumerConfig.configNames().contains(configName)) {
                consumerProperties.put(configName, entry.getValue());
            }
            if (ProducerConfig.configNames().contains(configName)) {
                producerProperties.put(configName, entry.getValue());
            }
        }
    }
}
