package org.kie.kogito.serverless.workflow.executor;

import io.cloudevents.CloudEvent;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.kie.kogito.event.EventReceiver;
import org.kie.kogito.event.EventReceiverFactory;
import org.kie.kogito.serverless.workflow.utils.ConfigResolverHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/kogito/serverless/workflow/executor/KafkaEventReceiverFactory.class */
/**
 * {@link EventReceiverFactory} that hands out one {@link KafkaEventReceiver} per Kafka topic and
 * feeds all of them from a single shared Kafka {@link Consumer} polled on a dedicated thread.
 *
 * <p>Life cycle, as implemented here:
 * <ol>
 * <li>{@link #apply(String)} registers (or returns the already registered) receiver for a trigger.</li>
 * <li>{@link #ready()} creates the consumer on first use, subscribes it to every registered topic and
 * starts the polling thread.</li>
 * <li>{@link #close()} closes the consumer and waits for the polling thread to finish.</li>
 * </ol>
 *
 * <p>Every access to {@code consumer} and {@code consumerThread} happens while holding
 * {@code consumerLock}, so the consumer is never used by two threads at the same time.
 */
public class KafkaEventReceiverFactory implements EventReceiverFactory {
    private static final Logger logger = LoggerFactory.getLogger(KafkaEventReceiverFactory.class);

    /** Prefix of the configuration properties that map a trigger name to a Kafka topic. */
    private static final String TRIGGER_PROPERTY_PREFIX = "kogito.addon.messaging.incoming.trigger.";
    /** Configuration property holding the poll timeout, expressed in seconds. */
    private static final String POLL_INTERVAL_PROPERTY = "kogito.sw.executor.event.pollInterval";
    /** Poll timeout, in seconds, used when {@link #POLL_INTERVAL_PROPERTY} is not set. */
    private static final int DEFAULT_POLL_INTERVAL_SECONDS = 10;

    private final Map<String, String> trigger2Topic = KafkaPropertiesFactory.get().triggerToTopicMap(TRIGGER_PROPERTY_PREFIX);
    /** Receivers keyed by topic name. */
    private final Map<String, KafkaEventReceiver> receivers = new ConcurrentHashMap<>();
    /** Guards {@link #consumer} and {@link #consumerThread}. */
    private final Lock consumerLock = new ReentrantLock();
    private Consumer<byte[], CloudEvent> consumer;
    private Thread consumerThread;

    /**
     * Returns the receiver associated with the given trigger, creating it if needed.
     *
     * <p>The trigger is translated to a topic through the configured trigger-to-topic map; when no
     * mapping exists the trigger name itself is used as the topic name.
     *
     * @param str trigger name
     * @return the receiver registered for the resolved topic
     */
    public EventReceiver apply(String str) {
        return this.receivers.computeIfAbsent(this.trigger2Topic.getOrDefault(str, str), topic -> new KafkaEventReceiver());
    }

    /**
     * Drops all registered receivers, closes the Kafka consumer (if one was created) and waits for
     * the polling thread to terminate.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *         polling thread to finish
     */
    public void close() throws InterruptedException {
        this.receivers.clear();
        Thread threadToJoin = null;
        this.consumerLock.lock();
        try {
            if (this.consumer != null) {
                this.consumer.close();
                this.consumer = null;
                threadToJoin = this.consumerThread;
                this.consumerThread = null;
            }
        } finally {
            this.consumerLock.unlock();
        }
        // Join only after releasing the lock: the polling thread has to acquire it to notice that
        // the consumer is gone, so joining while holding the lock would block forever.
        if (threadToJoin != null) {
            threadToJoin.join();
        }
    }

    /**
     * Subscribes the consumer to the topics of all receivers registered so far.
     *
     * <p>Does nothing when no receiver has been registered. On the first effective call the consumer
     * is created and the polling thread is started; later calls only refresh the subscription.
     */
    public void ready() {
        Set<String> topics = this.receivers.keySet();
        if (topics.isEmpty()) {
            return;
        }
        this.consumerLock.lock();
        try {
            boolean firstStart = this.consumer == null;
            if (firstStart) {
                this.consumer = createKafkaConsumer();
            }
            this.consumer.subscribe(topics);
            if (firstStart) {
                this.consumerThread = new Thread(this::eventLoop);
                this.consumerThread.start();
            }
        } finally {
            this.consumerLock.unlock();
        }
    }

    /**
     * Creates the Kafka consumer used by this factory. Subclasses may override this method to
     * supply a different {@link Consumer} implementation.
     *
     * @return a new consumer built from the configuration exposed by {@code KafkaPropertiesFactory}
     */
    protected Consumer<byte[], CloudEvent> createKafkaConsumer() {
        return new KafkaConsumer<>(KafkaPropertiesFactory.get().getKafkaConsumerConfig());
    }

    /**
     * Body of the polling thread: polls the consumer, dispatches each record to the receiver
     * registered for its topic, then commits asynchronously. The loop ends as soon as it observes
     * that the consumer has been cleared by {@link #close()}.
     *
     * <p>The lock is held only around consumer calls and is released while records are dispatched,
     * so receiver callbacks never run under the lock.
     */
    private void eventLoop() {
        while (true) {
            // Read on every iteration so that a changed configuration value is picked up.
            int pollIntervalSeconds = pollIntervalSeconds();
            ConsumerRecords<byte[], CloudEvent> records;
            this.consumerLock.lock();
            try {
                if (this.consumer == null) {
                    return;
                }
                records = this.consumer.poll(Duration.ofSeconds(pollIntervalSeconds));
            } finally {
                this.consumerLock.unlock();
            }

            for (ConsumerRecord<byte[], CloudEvent> consumerRecord : records) {
                String topic = consumerRecord.topic();
                KafkaEventReceiver receiver = this.receivers.get(topic);
                if (receiver == null) {
                    logger.info("No subscription for topic {}", topic);
                } else {
                    receiver.onEvent(consumerRecord.value());
                }
            }

            this.consumerLock.lock();
            try {
                if (this.consumer == null) {
                    return;
                }
                this.consumer.commitAsync();
            } finally {
                this.consumerLock.unlock();
            }
        }
    }

    /** Resolves the poll timeout in seconds, falling back to the default when it is not configured. */
    private static int pollIntervalSeconds() {
        return ConfigResolverHolder.getConfigResolver()
                .getConfigProperty(POLL_INTERVAL_PROPERTY, Integer.TYPE)
                .orElse(DEFAULT_POLL_INTERVAL_SECONDS);
    }
}
