package org.kie.server.services.jbpm.kafka;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/kie-server-services-kafka-7.49.0-SNAPSHOT.jar:org/kie/server/services/jbpm/kafka/DefaultEventProcessorFactory.class */
/**
 * Default {@link KafkaEventProcessorFactory} that hands out cached
 * {@link KafkaEventReader} and {@link KafkaEventWriter} instances per topic.
 *
 * <p>For each topic, the implementation class is looked up through
 * {@code KafkaServerUtils.getTopicProperty} (keys {@code eventReaderClass} and
 * {@code eventWriterClass}). When no class is configured, or the configured class
 * cannot be instantiated, the factory falls back to {@link #buildDefaultReader(ClassLoader)}
 * or {@link #buildDefaultWriter()}.
 *
 * <p>All caches are {@link ConcurrentHashMap}s and the default writer is published through a
 * volatile field, so a fully constructed instance is always returned to concurrent callers.
 */
public class DefaultEventProcessorFactory implements KafkaEventProcessorFactory {
    private static final Logger logger = LoggerFactory.getLogger(DefaultEventProcessorFactory.class);

    /** Topic property naming the custom reader implementation class. */
    private static final String READER_CLASS_PROPERTY = "eventReaderClass";
    /** Topic property naming the custom writer implementation class. */
    private static final String WRITER_CLASS_PROPERTY = "eventWriterClass";

    /** Resolved readers, keyed by topic and then by class loader. */
    private final Map<String, Map<ClassLoader, KafkaEventReader>> topicReaderMap = new ConcurrentHashMap<>();
    /**
     * Custom reader instances, keyed by reader class name and then by class loader, so that
     * topics configured with different reader classes never share an instance of the wrong type.
     */
    private final Map<String, Map<ClassLoader, KafkaEventReader>> readerInstanceMap = new ConcurrentHashMap<>();
    /** Resolved writers, keyed by topic. */
    private final Map<String, KafkaEventWriter> topicWriterMap = new ConcurrentHashMap<>();
    /** Custom writer instances, keyed by writer class name. */
    private final Map<String, KafkaEventWriter> writerInstanceMap = new ConcurrentHashMap<>();
    /** Guards the one-time creation of {@link #defaultWriterInstance}. */
    private final Object defaultWriterLock = new Object();
    /** Lazily created shared default writer; volatile so readers never see a stale null. */
    private volatile KafkaEventWriter defaultWriterInstance;

    /**
     * Returns the reader for the given topic and class loader, creating and caching it on
     * first use.
     *
     * @param str the topic name; must not be null (used as a {@link ConcurrentHashMap} key)
     * @param classLoader the class loader handed to the reader; must not be null
     * @return the cached or newly built reader
     */
    @Override // org.kie.server.services.jbpm.kafka.KafkaEventProcessorFactory
    public KafkaEventReader getEventReader(String str, ClassLoader classLoader) {
        Map<ClassLoader, KafkaEventReader> readersForTopic =
                this.topicReaderMap.computeIfAbsent(str, topic -> new ConcurrentHashMap<>());
        return readersForTopic.computeIfAbsent(classLoader, loader -> buildReader(str, loader));
    }

    /**
     * Returns the writer for the given topic, creating and caching it on first use.
     *
     * @param str the topic name; must not be null (used as a {@link ConcurrentHashMap} key)
     * @return the cached or newly built writer
     */
    @Override // org.kie.server.services.jbpm.kafka.KafkaEventProcessorFactory
    public KafkaEventWriter getEventWriter(String str) {
        return this.topicWriterMap.computeIfAbsent(str, this::buildWriter);
    }

    /**
     * Builds the reader for a topic: the configured custom class if present and instantiable,
     * otherwise the default reader.
     */
    private KafkaEventReader buildReader(String topic, ClassLoader classLoader) {
        String readerClassName = KafkaServerUtils.getTopicProperty(topic, READER_CLASS_PROPERTY);
        KafkaEventReader customReader = null;
        if (readerClassName != null) {
            // Key by class name first: keying by class loader alone would hand a reader of the
            // wrong class to a second topic that shares the loader but configures another class.
            customReader = this.readerInstanceMap
                    .computeIfAbsent(readerClassName, name -> new ConcurrentHashMap<>())
                    .computeIfAbsent(classLoader, loader -> newReaderInstance(readerClassName, loader));
        }
        return customReader != null ? customReader : buildDefaultReader(classLoader);
    }

    /**
     * Builds the writer for a topic: the configured custom class if present and instantiable,
     * otherwise the shared default writer.
     */
    private KafkaEventWriter buildWriter(String topic) {
        String writerClassName = KafkaServerUtils.getTopicProperty(topic, WRITER_CLASS_PROPERTY);
        if (writerClassName != null) {
            KafkaEventWriter customWriter =
                    this.writerInstanceMap.computeIfAbsent(writerClassName, this::newWriterInstance);
            if (customWriter != null) {
                return customWriter;
            }
        }
        // The default writer is only created when it is actually needed as a fallback.
        return defaultWriter();
    }

    /**
     * Returns the shared default writer, creating it at most once.
     *
     * <p>Uses double-checked locking on a volatile field: a caller either observes the fully
     * assigned instance or waits on the lock while another thread finishes building it. This
     * avoids the window where an "initialized" flag is already set but the instance is not.
     */
    private KafkaEventWriter defaultWriter() {
        KafkaEventWriter writer = this.defaultWriterInstance;
        if (writer == null) {
            synchronized (this.defaultWriterLock) {
                writer = this.defaultWriterInstance;
                if (writer == null) {
                    writer = buildDefaultWriter();
                    this.defaultWriterInstance = writer;
                }
            }
        }
        return writer;
    }

    /**
     * Reflectively instantiates a custom reader through its {@code (ClassLoader)} constructor.
     *
     * @return the new reader, or {@code null} (after logging) if the class cannot be loaded,
     *         is not a {@link KafkaEventReader}, or cannot be constructed
     */
    private KafkaEventReader newReaderInstance(String className, ClassLoader classLoader) {
        try {
            // NOTE(review): Class.forName(String) resolves through this factory's own defining
            // class loader, not the supplied classLoader - confirm that is intended.
            return Class.forName(className)
                    .asSubclass(KafkaEventReader.class)
                    .getConstructor(ClassLoader.class)
                    .newInstance(classLoader);
        } catch (ClassCastException | ReflectiveOperationException e) {
            logger.error("Error instantiating class {}", className, e);
            return null;
        }
    }

    /**
     * Reflectively instantiates a custom writer through its no-argument constructor.
     *
     * @return the new writer, or {@code null} (after logging) if the class cannot be loaded,
     *         is not a {@link KafkaEventWriter}, or cannot be constructed
     */
    private KafkaEventWriter newWriterInstance(String className) {
        try {
            return Class.forName(className)
                    .asSubclass(KafkaEventWriter.class)
                    .getConstructor()
                    .newInstance();
        } catch (ClassCastException | ReflectiveOperationException e) {
            logger.error("Error instantiating class {}", className, e);
            return null;
        }
    }

    /**
     * Creates the fallback reader used when no custom reader is available.
     *
     * @param classLoader the class loader passed to the reader's constructor
     * @return a new {@link CloudEventReader}
     */
    protected KafkaEventReader buildDefaultReader(ClassLoader classLoader) {
        return new CloudEventReader(classLoader);
    }

    /**
     * Creates the fallback writer used when no custom writer is available. Invoked lazily,
     * the first time a topic needs the default writer.
     *
     * @return a new {@link CloudEventWriter}
     */
    protected KafkaEventWriter buildDefaultWriter() {
        return new CloudEventWriter();
    }
}
