package org.kie.server.services.jbpm.kafka;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.jbpm.bpmn2.core.Message;
import org.jbpm.bpmn2.core.Signal;
import org.jbpm.kie.services.impl.KModuleDeploymentUnit;
import org.jbpm.kie.services.impl.model.MessageDescImpl;
import org.jbpm.kie.services.impl.model.SignalDescImpl;
import org.jbpm.runtime.manager.impl.SimpleRegisterableItemsFactory;
import org.jbpm.services.api.DeploymentEvent;
import org.jbpm.services.api.DeploymentService;
import org.jbpm.services.api.ListenerSupport;
import org.jbpm.services.api.ProcessService;
import org.jbpm.services.api.model.DeployedUnit;
import org.jbpm.services.api.model.ProcessDefinition;
import org.jbpm.services.api.model.SignalDesc;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.kie.api.definition.process.Node;
import org.kie.api.runtime.KieContainer;
import org.kie.internal.runtime.manager.InternalRegisterableItemsFactory;
import org.kie.internal.runtime.manager.InternalRuntimeManager;
import org.kie.internal.runtime.manager.RuntimeEnvironment;
import org.kie.server.services.api.KieServerExtension;
import org.kie.server.services.api.KieServerRegistry;
import org.kie.server.services.impl.KieServerImpl;
import org.kie.server.services.jbpm.kafka.KafkaServerUtils;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for the consumer side of {@link KafkaServerExtension}.
 *
 * <p>Each test deploys a mocked {@link DeployedUnit} whose single {@link ProcessDefinition}
 * exposes signals and/or messages, pushes a JSON record into a Kafka {@link MockConsumer}
 * and then verifies that the mocked {@link ProcessService} received the matching
 * {@code signalEvent} call.
 */
public class KafkaServerExtensionConsumerTest {
    /** Value written to the poll-interval system property in {@link #setup()}. */
    private static final long TIMEOUT = 1;

    /** Upper bound, in milliseconds, that a verification waits for the expected call. */
    private static final long VERIFY_TIMEOUT_MILLIS = 1000L;

    private static final String POLL_INTERVAL_PROPERTY = "org.kie.server.jbpm-kafka.ext.poll.interval";
    private static final String SIGNAL_MAPPING_PROPERTY = "org.kie.server.jbpm-kafka.ext.signals.mapping";
    private static final String MESSAGE_MAPPING_PROPERTY = "org.kie.server.jbpm-kafka.ext.message.mapping";

    private static final Logger logger = LoggerFactory.getLogger(KafkaServerExtensionConsumerTest.class);

    private ProcessService processService;
    private MockKafkaServerExtension extension;
    private KieServerImpl server;
    private KieServerRegistry registry;
    private DeployedUnit deployedUnit;
    private ProcessDefinition processDefinition;
    private MockConsumer<String, byte[]> mockConsumer;
    private InternalRegisterableItemsFactory itemsFactory;

    /**
     * {@link KafkaServerExtension} whose Kafka consumer is supplied by the test, so no
     * real broker is needed.
     */
    public static class MockKafkaServerExtension extends KafkaServerExtension {
        private MockConsumer<String, byte[]> consumer;

        /**
         * @param mockConsumer consumer handed out by {@link #getKafkaConsumer()}
         */
        public MockKafkaServerExtension(MockConsumer<String, byte[]> mockConsumer) {
            this.consumer = mockConsumer;
        }

        /**
         * @return the consumer most recently supplied through the constructor or
         *         {@link #setKafkaConsumer(MockConsumer)}
         */
        protected Consumer<String, byte[]> getKafkaConsumer() {
            return this.consumer;
        }

        /**
         * Replaces the consumer returned by {@link #getKafkaConsumer()}.
         *
         * @param mockConsumer the new consumer
         */
        public void setKafkaConsumer(MockConsumer<String, byte[]> mockConsumer) {
            this.consumer = mockConsumer;
        }
    }

    /**
     * Builds the mock object graph (server, registry, services, deployed unit, process
     * definition), then initialises and starts the extension under test.
     */
    @Before
    public void setup() {
        System.setProperty(POLL_INTERVAL_PROPERTY, Long.toString(TIMEOUT));
        System.setProperty(SIGNAL_MAPPING_PROPERTY, KafkaServerUtils.Mapping.AUTO.toString());

        this.mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        this.extension = new MockKafkaServerExtension(this.mockConsumer);
        this.server = Mockito.mock(KieServerImpl.class);
        this.registry = Mockito.mock(KieServerRegistry.class);
        this.itemsFactory = new SimpleRegisterableItemsFactory();

        // Any extension lookup on the registry yields a mock exposing the two services below.
        KieServerExtension kieServerExtension = Mockito.mock(KieServerExtension.class);
        Mockito.when(this.registry.getServerExtension(Mockito.anyString())).thenReturn(kieServerExtension);
        // One mock implements both ListenerSupport and DeploymentService.
        ListenerSupport listenerSupport = Mockito.mock(ListenerSupport.class,
                Mockito.withSettings().extraInterfaces(DeploymentService.class));
        this.processService = Mockito.mock(ProcessService.class);
        Mockito.when(kieServerExtension.getServices()).thenReturn(Arrays.asList(listenerSupport, this.processService));

        // deployedUnit -> runtimeManager -> environment -> itemsFactory
        this.deployedUnit = Mockito.mock(DeployedUnit.class);
        InternalRuntimeManager internalRuntimeManager = Mockito.mock(InternalRuntimeManager.class);
        Mockito.when(this.deployedUnit.getRuntimeManager()).thenReturn(internalRuntimeManager);
        RuntimeEnvironment runtimeEnvironment = Mockito.mock(RuntimeEnvironment.class);
        Mockito.when(internalRuntimeManager.getEnvironment()).thenReturn(runtimeEnvironment);
        Mockito.when(runtimeEnvironment.getRegisterableItemsFactory()).thenReturn(this.itemsFactory);

        this.processDefinition = Mockito.mock(ProcessDefinition.class);
        Mockito.when(this.deployedUnit.getDeployedAssets()).thenReturn(Collections.singletonList(this.processDefinition));

        this.extension.init(this.server, this.registry);

        // deployedUnit -> deploymentUnit -> kieContainer -> class loader of the test thread
        KModuleDeploymentUnit kModuleDeploymentUnit = Mockito.mock(KModuleDeploymentUnit.class);
        Mockito.when(this.deployedUnit.getDeploymentUnit()).thenReturn(kModuleDeploymentUnit);
        KieContainer kieContainer = Mockito.mock(KieContainer.class);
        Mockito.when(kModuleDeploymentUnit.getKieContainer()).thenReturn(kieContainer);
        Mockito.when(kieContainer.getClassLoader()).thenReturn(Thread.currentThread().getContextClassLoader());

        this.extension.serverStarted();
    }

    /**
     * Destroys the extension and removes every system property a test may have set,
     * including the poll interval written by {@link #setup()}, so nothing leaks into
     * other tests running in the same JVM.
     */
    @After
    public void close() {
        try {
            this.extension.destroy(this.server, this.registry);
        } finally {
            System.clearProperty(POLL_INTERVAL_PROPERTY);
            System.clearProperty(SIGNAL_MAPPING_PROPERTY);
            System.clearProperty(MESSAGE_MAPPING_PROPERTY);
        }
    }

    /**
     * Builds a signal description with one (mocked) incoming node.
     *
     * @param str  used as both the id and the name of the signal
     * @param str2 the signal's structure reference (payload type)
     * @return the description derived from the signal
     */
    private SignalDesc createSignal(String str, String str2) {
        Signal signal = new Signal(str, str, str2);
        signal.addIncomingNode(Mockito.mock(Node.class));
        return SignalDescImpl.from(signal);
    }

    /**
     * Builds a message with id {@code "MyMessage"}, type {@code "String"} and one
     * (mocked) incoming node.
     *
     * @param name the message name
     * @return the configured message
     */
    private static Message createMessage(String name) {
        Message message = new Message("MyMessage");
        message.setName(name);
        message.setType("String");
        message.addIncomingNode(Mockito.mock(Node.class));
        return message;
    }

    /**
     * Wraps a JSON value in the fixed event envelope used by all tests.
     *
     * @param dataJson JSON text placed verbatim as the value of the {@code data} field
     * @return the complete event as JSON text
     */
    private static String eventJson(String dataJson) {
        return "{\"id\":\"javi\",\"type\":\"one\",\"source\":\"pepe\",\"data\":" + dataJson + "}";
    }

    /** Makes the process definition report exactly one String-typed signal. */
    private void stubSingleStringSignal(String signalName) {
        Mockito.when(this.processDefinition.getSignalsDesc())
                .thenReturn(Collections.singletonList(createSignal(signalName, "String")));
    }

    /** Creates a deployment event for the shared mocked deployed unit. */
    private DeploymentEvent deployment(String deploymentId) {
        return new DeploymentEvent(deploymentId, this.deployedUnit);
    }

    /** Verifies, within the timeout, that the process service was signalled exactly once. */
    private void verifySignalled(String deploymentId, String signalName, Object event) {
        Mockito.verify(this.processService, getTimeout()).signalEvent(deploymentId, signalName, event);
    }

    /** A String-typed signal is delivered with the event's {@code data} value as payload. */
    @Test
    public void testKafkaServerExecutorSignal() {
        stubSingleStringSignal("MySignal");
        this.extension.onDeploy(deployment("MyDeploy1"));
        publishEvent("MySignal", eventJson("\"javierito\""));
        verifySignalled("MyDeploy1", "MySignal", "javierito");
    }

    /**
     * Deploys a signal whose structure reference is {@code str} and checks that an object
     * payload is delivered as a {@code Person}.
     *
     * @param str the structure reference to declare on the signal
     */
    private void testStructRefEvent(String str) {
        Mockito.when(this.processDefinition.getSignalsDesc())
                .thenReturn(Collections.singletonList(createSignal("MySignal", str)));
        this.extension.onDeploy(deployment("MyDeploy1"));
        publishEvent("MySignal", eventJson("{\"name\":\"javierito\"}"));
        verifySignalled("MyDeploy1", "MySignal", new Person("javierito"));
    }

    /** Structure reference given as {@link Class#getTypeName()}. */
    @Test
    public void testKafkaServerExecutorSignalWithClassType() {
        testStructRefEvent(Person.class.getTypeName());
    }

    /** Structure reference given as {@link Class#getName()}. */
    @Test
    public void testKafkaServerExecutorSignalWithClassName() {
        testStructRefEvent(Person.class.getName());
    }

    /** After re-activation with a different signal, events on the new topic are delivered. */
    @Test
    public void testKafkaSubscriptionChange() {
        stubSingleStringSignal("MySignal");
        this.extension.onDeploy(deployment("MyDeploy1"));
        publishEvent("MySignal", eventJson("\"javierito\""));
        verifySignalled("MyDeploy1", "MySignal", "javierito");

        stubSingleStringSignal("ChangedSignal");
        this.extension.onActivate(deployment("MyDeploy1"));
        publishEvent("ChangedSignal", eventJson("\"javierito\""));
        verifySignalled("MyDeploy1", "ChangedSignal", "javierito");
    }

    /**
     * Undeploying and re-activating with no signals leaves the consumer without any
     * assignment; a later deployment with a new signal is served again.
     */
    @Test
    public void testKafkaSubscriptionEmpty() {
        stubSingleStringSignal("MySignal");
        this.extension.onDeploy(deployment("MyDeploy1"));
        publishEvent("MySignal", eventJson("\"javierito\""));
        verifySignalled("MyDeploy1", "MySignal", "javierito");

        this.extension.onUnDeploy(deployment("MyDeploy1"));
        Mockito.when(this.processDefinition.getSignalsDesc()).thenReturn(Collections.emptyList());
        this.extension.onActivate(deployment("MyDeploy1"));
        Assert.assertTrue(this.mockConsumer.assignment().isEmpty());

        stubSingleStringSignal("NewSignal");
        this.extension.onDeploy(deployment("MyDeploy1"));
        publishEvent("NewSignal", eventJson("\"javierito\""));
        verifySignalled("MyDeploy1", "NewSignal", "javierito");
    }

    /** A message named {@code Hello} is delivered as signal {@code Message-Hello}. */
    @Test
    public void testKafkaServerExecutorMessage() {
        Message message = createMessage("Hello");
        Mockito.when(this.processDefinition.getMessagesDesc())
                .thenReturn(Collections.singletonList(MessageDescImpl.from(message)));
        this.extension.onDeploy(deployment("MyDeploy2"));
        publishEvent("Hello", eventJson("\"pepe\""));
        verifySignalled("MyDeploy2", "Message-Hello", "pepe");
    }

    /**
     * With the {@code topics.Hello} property set to {@code MyTopic}, the message is read
     * from topic {@code MyTopic} and still delivered as {@code Message-Hello}.
     */
    @Test
    public void testKafkaServerExecutorMessageTopic() {
        final String topicProperty = "org.kie.server.jbpm-kafka.ext.topics.Hello";
        System.setProperty(topicProperty, "MyTopic");
        try {
            Message message = createMessage("Hello");
            Mockito.when(this.processDefinition.getMessagesDesc())
                    .thenReturn(Collections.singletonList(MessageDescImpl.from(message)));
            this.extension.onDeploy(deployment("MyDeploy3"));
            publishEvent("MyTopic", eventJson("\"pepe\""));
            verifySignalled("MyDeploy3", "Message-Hello", "pepe");
        } finally {
            // Always restore global state, whether the assertions passed or not.
            System.clearProperty(topicProperty);
        }
    }

    /** A deployment with neither signals nor messages never asks for a Kafka consumer. */
    @Test
    public void testNoSignals() {
        this.extension = Mockito.spy(this.extension);
        Mockito.when(this.processDefinition.getMessagesDesc()).thenReturn(Collections.emptyList());
        Mockito.when(this.processDefinition.getSignalsDesc()).thenReturn(Collections.emptyList());
        this.extension.onDeploy(deployment("EmptyDeploy"));
        Mockito.verify(this.extension, Mockito.never()).getKafkaConsumer();
    }

    /**
     * The extension can be destroyed, given a fresh consumer, re-initialised and then
     * serve a new deployment.
     */
    @Test
    public void testWithDestroy() {
        stubSingleStringSignal("MySignal2");
        Message message = createMessage("Hello2");
        this.extension.onDeploy(deployment("MyDeploy4"));
        publishEvent("MySignal2", eventJson("\"javierito\""));
        verifySignalled("MyDeploy4", "MySignal2", "javierito");

        this.extension.destroy(this.server, this.registry);

        this.mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        Mockito.when(this.processDefinition.getSignalsDesc()).thenReturn(Collections.emptyList());
        Mockito.when(this.processDefinition.getMessagesDesc())
                .thenReturn(Collections.singletonList(MessageDescImpl.from(message)));
        this.extension.setKafkaConsumer(this.mockConsumer);
        this.extension.init(this.server, this.registry);
        this.extension.onDeploy(deployment("MyDeploy5"));
        publishEvent("Hello2", eventJson("\"pepe\""));
        verifySignalled("MyDeploy5", "Message-Hello2", "pepe");
    }

    /** With the signal-mapping property cleared, signal descriptions are never read. */
    @Test
    public void testSignalDisable() {
        stubSingleStringSignal("MySignal2");
        System.clearProperty(SIGNAL_MAPPING_PROPERTY);
        this.extension.onDeploy(deployment("MyDeploy4"));
        Mockito.verify(this.processDefinition, Mockito.never()).getSignalsDesc();
    }

    /** With message mapping set to {@code NONE}, message descriptions are never read. */
    @Test
    public void testMessageDisable() {
        Mockito.when(this.processDefinition.getMessagesDesc())
                .thenReturn(Collections.singletonList(MessageDescImpl.from(new Message("MyMessage"))));
        System.setProperty(MESSAGE_MAPPING_PROPERTY, KafkaServerUtils.Mapping.NONE.toString());
        this.extension.onDeploy(deployment("MyDeploy4"));
        Mockito.verify(this.processDefinition, Mockito.never()).getMessagesDesc();
    }

    /**
     * @return a verification mode expecting exactly one invocation within the timeout
     */
    private VerificationMode getTimeout() {
        return getTimeout(1);
    }

    /**
     * @param i the exact number of invocations expected
     * @return a verification mode that waits up to {@link #VERIFY_TIMEOUT_MILLIS} ms
     */
    private VerificationMode getTimeout(int i) {
        return Mockito.timeout(VERIFY_TIMEOUT_MILLIS).times(i);
    }

    /**
     * Feeds one record into the mock consumer on partition 0 of the given topic.
     *
     * <p>Fails the test if the consumer is not currently subscribed to the topic.
     *
     * @param str  the topic name
     * @param str2 the record value as JSON text; encoded as UTF-8 so the bytes do not
     *             depend on the platform default charset
     */
    private void publishEvent(String str, String str2) {
        Assert.assertTrue("Topic " + str + " not found", this.mockConsumer.subscription().contains(str));

        TopicPartition partition = new TopicPartition(str, 0);
        List<TopicPartition> partitions = Collections.singletonList(partition);

        // Assign the partition and declare its offset range before adding the record.
        this.mockConsumer.rebalance(partitions);
        this.mockConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
        this.mockConsumer.updateEndOffsets(Collections.singletonMap(partition, 10L));

        logger.debug("Publishing event {} to topic {}", str2, str);
        this.mockConsumer.addRecord(
                new ConsumerRecord<>(str, 0, 0L, "", str2.getBytes(StandardCharsets.UTF_8)));
    }
}
