package org.kie.server.services.jbpm.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.jbpm.services.api.DeploymentEvent;
import org.jbpm.services.api.model.DeployedAsset;
import org.jbpm.services.api.model.MessageDesc;
import org.jbpm.services.api.model.ProcessDefinition;
import org.jbpm.services.api.model.SignalDesc;
import org.jbpm.services.api.model.SignalDescBase;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/kie-server-services-kafka-7.60.0-SNAPSHOT.jar:org/kie/server/services/jbpm/kafka/KafkaServerRegistration.class */
/**
 * In-memory registry that maps Kafka topic names to the signal and message
 * descriptors listening on them, and each descriptor to the ids of the
 * deployments that registered it.
 *
 * <p>All methods that read or modify the registry do so while holding this
 * instance's monitor ({@code synchronized}).
 */
public class KafkaServerRegistration {
    /** topic name -> signal descriptor -> ids of the deployments registered for it. */
    private final Map<String, Map<SignalDesc, Collection<String>>> topic2Signal = new HashMap<>();
    /** topic name -> message descriptor -> ids of the deployments registered for it. */
    private final Map<String, Map<MessageDesc, Collection<String>>> topic2Message = new HashMap<>();

    /** Drops every signal and message registration. */
    public synchronized void close() {
        this.topic2Signal.clear();
        this.topic2Message.clear();
    }

    /**
     * @return {@code true} when neither signals nor messages are registered for any topic
     */
    public synchronized boolean isEmpty() {
        return this.topic2Signal.isEmpty() && this.topic2Message.isEmpty();
    }

    /**
     * Registers the signals and messages of every process definition found in
     * the deployed unit of the given event.
     *
     * @param deploymentEvent event describing the deployment being added
     * @return a fresh set with all topic names registered after the update
     */
    public synchronized Set<String> addRegistration(DeploymentEvent deploymentEvent) {
        String deploymentId = deploymentEvent.getDeploymentId();
        for (DeployedAsset asset : deploymentEvent.getDeployedUnit().getDeployedAssets()) {
            // Skip assets that are not process definitions instead of failing with a
            // ClassCastException halfway through, which would leave a partial registration.
            if (asset instanceof ProcessDefinition) {
                updateTopics(deploymentId, (ProcessDefinition) asset);
            }
        }
        return getTopicsRegistered();
    }

    /**
     * Removes the registrations made by the deployment of the given event.
     *
     * @param deploymentEvent event describing the deployment being removed
     * @param consumer        invoked with each topic name from which the deployment was removed
     * @return a fresh set with all topic names still registered after the update
     */
    public synchronized Set<String> removeRegistration(DeploymentEvent deploymentEvent, Consumer<String> consumer) {
        String deploymentId = deploymentEvent.getDeploymentId();
        for (DeployedAsset asset : deploymentEvent.getDeployedUnit().getDeployedAssets()) {
            // Same guard as addRegistration: only process definitions carry signals/messages here.
            if (asset instanceof ProcessDefinition) {
                removeTopics(deploymentId, (ProcessDefinition) asset, consumer);
            }
        }
        return getTopicsRegistered();
    }

    /** Builds a new set holding the union of the signal topics and the message topics. */
    private Set<String> getTopicsRegistered() {
        Set<String> topics = new HashSet<>(this.topic2Signal.keySet());
        topics.addAll(this.topic2Message.keySet());
        return topics;
    }

    /**
     * Registers the signals and/or messages of one process definition, depending on
     * what {@code KafkaServerUtils.processSignals()} and
     * {@code KafkaServerUtils.processMessages()} report.
     */
    private void updateTopics(String deploymentId, ProcessDefinition processDefinition) {
        if (KafkaServerUtils.processSignals()) {
            addTopics(this.topic2Signal, deploymentId, processDefinition.getSignalsDesc());
        }
        if (KafkaServerUtils.processMessages()) {
            addTopics(this.topic2Message, deploymentId, processDefinition.getMessagesDesc());
        }
    }

    /** Unregisters both the signals and the messages of one process definition. */
    private void removeTopics(String deploymentId, ProcessDefinition processDefinition, Consumer<String> consumer) {
        removeTopics(this.topic2Signal, deploymentId, processDefinition.getSignalsDesc(), consumer);
        removeTopics(this.topic2Message, deploymentId, processDefinition.getMessagesDesc(), consumer);
    }

    /**
     * Invokes the processor once per (signal, deployment id) pair registered for the
     * topic of the given record.
     *
     * @param consumerRecord            record whose topic selects the registrations
     * @param kafkaServerEventProcessor callback receiving the record, deployment id and signal
     */
    public void forEachSignal(ConsumerRecord<String, byte[]> consumerRecord, KafkaServerEventProcessor<SignalDesc> kafkaServerEventProcessor) {
        forEach(this.topic2Signal, consumerRecord, kafkaServerEventProcessor);
    }

    /**
     * Invokes the processor once per (message, deployment id) pair registered for the
     * topic of the given record.
     *
     * @param consumerRecord            record whose topic selects the registrations
     * @param kafkaServerEventProcessor callback receiving the record, deployment id and message
     */
    public void forEachMessage(ConsumerRecord<String, byte[]> consumerRecord, KafkaServerEventProcessor<MessageDesc> kafkaServerEventProcessor) {
        forEach(this.topic2Message, consumerRecord, kafkaServerEventProcessor);
    }

    /**
     * Looks up the descriptors registered for the record's topic and hands every
     * registered deployment id of every descriptor to the processor. Does nothing
     * when the topic has no registrations.
     */
    private synchronized <T extends SignalDescBase> void forEach(Map<String, Map<T, Collection<String>>> registry, ConsumerRecord<String, byte[]> consumerRecord, KafkaServerEventProcessor<T> kafkaServerEventProcessor) {
        Map<T, Collection<String>> descriptors = registry.get(consumerRecord.topic());
        if (descriptors == null) {
            return;
        }
        for (Map.Entry<T, Collection<String>> entry : descriptors.entrySet()) {
            T descriptor = entry.getKey();
            for (String deploymentId : entry.getValue()) {
                kafkaServerEventProcessor.accept(consumerRecord, deploymentId, descriptor);
            }
        }
    }

    /**
     * Adds the deployment id under each descriptor that has at least one incoming
     * node, keyed by the topic {@code KafkaServerUtils.topicFromSignal} returns for it.
     */
    private <T extends SignalDescBase> void addTopics(Map<String, Map<T, Collection<String>>> registry, String deploymentId, Collection<T> descriptors) {
        for (T descriptor : descriptors) {
            if (!descriptor.getIncomingNodes().isEmpty()) {
                registry.computeIfAbsent(KafkaServerUtils.topicFromSignal(descriptor), topic -> new HashMap<>())
                        .computeIfAbsent(descriptor, key -> new ArrayList<>())
                        .add(deploymentId);
            }
        }
    }

    /**
     * Removes the deployment id from each given descriptor, pruning descriptors and
     * topics that become empty. The consumer is then called once per distinct topic
     * from which the id was actually removed, whether or not that topic still has
     * other registrations.
     */
    private <T extends SignalDescBase> void removeTopics(Map<String, Map<T, Collection<String>>> registry, String deploymentId, Collection<T> descriptors, Consumer<String> consumer) {
        Set<String> touchedTopics = new HashSet<>();
        for (T descriptor : descriptors) {
            String topic = KafkaServerUtils.topicFromSignal(descriptor);
            Map<T, Collection<String>> byDescriptor = registry.get(topic);
            if (byDescriptor == null) {
                continue;
            }
            Collection<String> deploymentIds = byDescriptor.get(descriptor);
            if (deploymentIds == null || !deploymentIds.remove(deploymentId)) {
                continue;
            }
            touchedTopics.add(topic);
            if (deploymentIds.isEmpty()) {
                byDescriptor.remove(descriptor);
                if (byDescriptor.isEmpty()) {
                    registry.remove(topic);
                }
            }
        }
        // Notify only after all removals are done, once per topic.
        for (String topic : touchedTopics) {
            consumer.accept(topic);
        }
    }
}
