/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.persistence.kafka;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.drools.core.io.impl.ClassPathResource;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.api.io.Resource;
import org.kie.kogito.auth.IdentityProvider;
import org.kie.kogito.auth.IdentityProviders;
import org.kie.kogito.auth.SecurityPolicy;
import org.kie.kogito.persistence.KogitoProcessInstancesFactory;
import org.kie.kogito.persistence.kafka.KafkaPersistenceUtils;
import org.kie.kogito.persistence.kafka.KafkaStreamsStateListener;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstanceReadMode;
import org.kie.kogito.process.ProcessInstances;
import org.kie.kogito.process.ProcessInstancesFactory;
import org.kie.kogito.process.WorkItem;
import org.kie.kogito.process.bpmn2.BpmnProcess;
import org.kie.kogito.process.bpmn2.BpmnVariables;
import org.kie.kogito.process.workitem.Policy;
import org.kie.kogito.testcontainers.KogitoKafkaContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
public class KafkaProcessInstancesIT {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProcessInstancesIT.class);
    @Container
    KogitoKafkaContainer kafka = new KogitoKafkaContainer();
    KafkaProcessInstancesFactory factory;
    KafkaStreamsStateListener listener = new KafkaStreamsStateListener();

    @BeforeEach
    void start() {
        this.factory = new KafkaProcessInstancesFactory();
        this.factory.setKafkaConfig(Collections.singletonMap("bootstrap.servers", this.kafka.getBootstrapServers()));
        this.factory.setStateListener(this.listener);
    }

    @AfterEach
    void stop() {
        if (this.factory != null) {
            this.factory.stop();
        }
        if (this.listener.getKafkaStreams() != null) {
            this.listener.getKafkaStreams().close();
            this.listener.getKafkaStreams().cleanUp();
        }
    }

    @Test
    void testFindByIdReadMode() {
        BpmnProcess process = (BpmnProcess)BpmnProcess.from((Resource[])new Resource[]{new ClassPathResource("BPMN2-UserTask-Script.bpmn2")}).get(0);
        this.listener.setKafkaStreams(this.createStreams((Process)process));
        process.setProcessInstancesFactory((ProcessInstancesFactory)this.factory);
        process.configure();
        this.listener.getKafkaStreams().start();
        ProcessInstances instances = process.instances();
        Assertions.assertThat((Integer)instances.size()).isZero();
        ProcessInstance mutablePi = process.createInstance(BpmnVariables.create(Collections.singletonMap("var", "value")));
        mutablePi.start();
        Assertions.assertThat((int)mutablePi.status()).isEqualTo(5);
        Assertions.assertThat((Optional)mutablePi.error()).hasValueSatisfying(error -> {
            Assertions.assertThat((String)error.errorMessage()).endsWith((CharSequence)"java.lang.NullPointerException - null");
            Assertions.assertThat((String)error.failedNodeId()).isEqualTo("ScriptTask_1");
        });
        Assertions.assertThat((Map)((BpmnVariables)mutablePi.variables()).toMap()).containsExactly(new Map.Entry[]{Assertions.entry((Object)"var", (Object)"value")});
        Awaitility.await().until(() -> instances.values().size() == 1);
        ProcessInstance pi = (ProcessInstance)instances.findById(mutablePi.id(), ProcessInstanceReadMode.READ_ONLY).get();
        Assertions.assertThatExceptionOfType(UnsupportedOperationException.class).isThrownBy(() -> pi.abort());
        ProcessInstance readOnlyPi = (ProcessInstance)instances.findById(mutablePi.id(), ProcessInstanceReadMode.READ_ONLY).get();
        Assertions.assertThat((int)readOnlyPi.status()).isEqualTo(5);
        Assertions.assertThat((Optional)readOnlyPi.error()).hasValueSatisfying(error -> {
            Assertions.assertThat((String)error.errorMessage()).endsWith((CharSequence)"java.lang.NullPointerException - null");
            Assertions.assertThat((String)error.failedNodeId()).isEqualTo("ScriptTask_1");
        });
        Assertions.assertThat((Map)((BpmnVariables)readOnlyPi.variables()).toMap()).containsExactly(new Map.Entry[]{Assertions.entry((Object)"var", (Object)"value")});
        Assertions.assertThatExceptionOfType(UnsupportedOperationException.class).isThrownBy(() -> readOnlyPi.abort());
        ((ProcessInstance)instances.findById(mutablePi.id()).get()).abort();
        Assertions.assertThat((Integer)instances.size()).isZero();
    }

    @Test
    void testValuesReadMode() {
        BpmnProcess process = (BpmnProcess)BpmnProcess.from((Resource[])new Resource[]{new ClassPathResource("BPMN2-UserTask.bpmn2")}).get(0);
        this.listener.setKafkaStreams(this.createStreams((Process)process));
        process.setProcessInstancesFactory((ProcessInstancesFactory)this.factory);
        process.configure();
        this.listener.getKafkaStreams().start();
        ProcessInstances instances = process.instances();
        Assertions.assertThat((Integer)instances.size()).isZero();
        ProcessInstance processInstance = process.createInstance(BpmnVariables.create(Collections.singletonMap("test", "test")));
        processInstance.start();
        Awaitility.await().until(() -> instances.values().size() == 1);
        ProcessInstance pi = (ProcessInstance)instances.values().stream().findFirst().get();
        Assertions.assertThatExceptionOfType(UnsupportedOperationException.class).isThrownBy(() -> pi.abort());
        ((ProcessInstance)instances.values(ProcessInstanceReadMode.MUTABLE).stream().findFirst().get()).abort();
        Assertions.assertThat((Integer)instances.size()).isZero();
    }

    @Test
    void testBasicFlow() {
        BpmnProcess process = (BpmnProcess)BpmnProcess.from((Resource[])new Resource[]{new ClassPathResource("BPMN2-UserTask.bpmn2")}).get(0);
        this.listener.setKafkaStreams(this.createStreams((Process)process));
        process.setProcessInstancesFactory((ProcessInstancesFactory)this.factory);
        process.configure();
        this.listener.getKafkaStreams().start();
        ProcessInstances instances = process.instances();
        Assertions.assertThat((Integer)instances.size()).isZero();
        ProcessInstance processInstance = process.createInstance(BpmnVariables.create(Collections.singletonMap("test", "test")));
        processInstance.start();
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)processInstance.status());
        Awaitility.await().until(() -> instances.values().size() == 1);
        SecurityPolicy asJohn = SecurityPolicy.of((IdentityProvider)IdentityProviders.of((String)"john"));
        Assertions.assertThat((List)((ProcessInstance)instances.values().iterator().next()).workItems(new Policy[]{asJohn})).hasSize(1);
        List workItems = processInstance.workItems(new Policy[]{asJohn});
        Assertions.assertThat((List)workItems).hasSize(1);
        WorkItem workItem = (WorkItem)workItems.get(0);
        org.junit.jupiter.api.Assertions.assertEquals((Object)"john", workItem.getParameters().get("ActorId"));
        processInstance.completeWorkItem(workItem.getId(), null, new Policy[]{asJohn});
        org.junit.jupiter.api.Assertions.assertEquals((int)2, (int)processInstance.status());
        Assertions.assertThat((Integer)instances.size()).isZero();
    }

    KafkaStreams createStreams(Process process) {
        Topology topology = KafkaPersistenceUtils.createTopologyForProcesses(Arrays.asList(process.id()));
        KafkaStreams streams = new KafkaStreams(topology, this.getStreamsConfig());
        streams.setUncaughtExceptionHandler((thread, throwable) -> LOGGER.error("Kafka persistence error: " + throwable.getMessage(), throwable));
        streams.cleanUp();
        return streams;
    }

    Properties getStreamsConfig() {
        Properties properties = new Properties();
        properties.put("application.id", "kogito");
        properties.put("bootstrap.servers", this.kafka.getBootstrapServers());
        return properties;
    }

    private class KafkaProcessInstancesFactory
    extends KogitoProcessInstancesFactory {
        private KafkaProcessInstancesFactory() {
        }
    }
}

