package org.kie.kogito.persistence.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Stream;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.drools.io.ClassPathResource;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.api.io.Resource;
import org.kie.kogito.auth.IdentityProviders;
import org.kie.kogito.auth.SecurityPolicy;
import org.kie.kogito.persistence.KafkaProcessInstancesFactory;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstanceReadMode;
import org.kie.kogito.process.ProcessInstances;
import org.kie.kogito.process.WorkItem;
import org.kie.kogito.process.bpmn2.BpmnProcess;
import org.kie.kogito.process.bpmn2.BpmnVariables;
import org.kie.kogito.process.workitem.Policy;
import org.kie.kogito.test.utils.ProcessInstancesTestUtils;
import org.kie.kogito.testcontainers.KogitoKafkaContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
/* loaded from: input_file:org/kie/kogito/persistence/kafka/KafkaProcessInstancesIT.class */
public class KafkaProcessInstancesIT {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProcessInstancesIT.class);
    private static final Duration TIMEOUT = Duration.ofMinutes(1);

    @Container
    KogitoKafkaContainer kafka = new KogitoKafkaContainer();
    KafkaProcessInstancesFactory factory;
    KafkaStreamsStateListener listener;

    @BeforeEach
    void start() {
        this.listener = new KafkaStreamsStateListener();
        this.factory = new KafkaProcessInstancesFactory();
        this.factory.setKafkaConfig(Collections.singletonMap("bootstrap.servers", this.kafka.getBootstrapServers()));
        this.factory.setStateListener(this.listener);
    }

    @AfterEach
    void stop() {
        if (this.factory != null) {
            this.factory.stop();
        }
        if (this.listener.getKafkaStreams() != null) {
            this.listener.getKafkaStreams().close();
            this.listener.getKafkaStreams().cleanUp();
        }
        this.listener.close();
    }

    private <T> void awaitTillOne(ProcessInstances<T> processInstances) {
        awaitTillSize(processInstances, 1);
    }

    private <T> void awaitTillEmpty(ProcessInstances<T> processInstances) {
        awaitTillSize(processInstances, 0);
    }

    private <T> void awaitTillSize(ProcessInstances<T> processInstances, int i) {
        Awaitility.await().atMost(TIMEOUT).until(() -> {
            Stream stream = processInstances.stream();
            try {
                Boolean valueOf = Boolean.valueOf(stream.count() == ((long) i));
                if (stream != null) {
                    stream.close();
                }
                return valueOf;
            } catch (Throwable th) {
                if (stream != null) {
                    try {
                        stream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        });
    }

    @Test
    void testFindByIdReadMode() {
        BpmnProcess bpmnProcess = (BpmnProcess) BpmnProcess.from(new Resource[]{new ClassPathResource("BPMN2-UserTask-Script.bpmn2")}).get(0);
        this.listener.setKafkaStreams(createStreams());
        bpmnProcess.setProcessInstancesFactory(this.factory);
        bpmnProcess.configure();
        this.listener.getKafkaStreams().start();
        ProcessInstances instances = bpmnProcess.instances();
        ProcessInstancesTestUtils.assertEmpty(instances);
        ProcessInstance createInstance = bpmnProcess.createInstance(BpmnVariables.create(Collections.singletonMap("var", "value")));
        createInstance.start();
        Assertions.assertThat(createInstance.status()).isEqualTo(5);
        Assertions.assertThat(createInstance.error()).hasValueSatisfying(processError -> {
            Assertions.assertThat(processError.errorMessage()).contains(new CharSequence[]{"java.lang.NullPointerException"});
            Assertions.assertThat(processError.failedNodeId()).isEqualTo("ScriptTask_1");
        });
        Assertions.assertThat(((BpmnVariables) createInstance.variables()).toMap()).containsExactly(new Map.Entry[]{Assertions.entry("var", "value")});
        ProcessInstance processInstance = (ProcessInstance) instances.findById(createInstance.id(), ProcessInstanceReadMode.READ_ONLY).get();
        Assertions.assertThatExceptionOfType(UnsupportedOperationException.class).isThrownBy(() -> {
            processInstance.abort();
        });
        ProcessInstance processInstance2 = (ProcessInstance) instances.findById(createInstance.id(), ProcessInstanceReadMode.READ_ONLY).get();
        Assertions.assertThat(processInstance2.status()).isEqualTo(5);
        Assertions.assertThat(processInstance2.error()).hasValueSatisfying(processError2 -> {
            Assertions.assertThat(processError2.errorMessage()).contains(new CharSequence[]{"java.lang.NullPointerException"});
            Assertions.assertThat(processError2.failedNodeId()).isEqualTo("ScriptTask_1");
        });
        Assertions.assertThat(((BpmnVariables) processInstance2.variables()).toMap()).containsExactly(new Map.Entry[]{Assertions.entry("var", "value")});
        Assertions.assertThatExceptionOfType(UnsupportedOperationException.class).isThrownBy(() -> {
            processInstance2.abort();
        });
        ((ProcessInstance) instances.findById(createInstance.id()).get()).abort();
        awaitTillEmpty(instances);
    }

    @Test
    void testValuesReadMode() {
        BpmnProcess bpmnProcess = (BpmnProcess) BpmnProcess.from(new Resource[]{new ClassPathResource("BPMN2-UserTask.bpmn2")}).get(0);
        this.listener.setKafkaStreams(createStreams());
        bpmnProcess.setProcessInstancesFactory(this.factory);
        bpmnProcess.configure();
        this.listener.getKafkaStreams().start();
        ProcessInstances instances = bpmnProcess.instances();
        ProcessInstancesTestUtils.assertEmpty(instances);
        bpmnProcess.createInstance(BpmnVariables.create(Collections.singletonMap("test", "test"))).start();
        awaitTillOne(instances);
        ProcessInstance first = ProcessInstancesTestUtils.getFirst(instances);
        Assertions.assertThatExceptionOfType(UnsupportedOperationException.class).isThrownBy(() -> {
            first.abort();
        });
        ProcessInstancesTestUtils.abortFirst(instances);
        awaitTillEmpty(instances);
    }

    @Test
    void testBasicFlow() {
        BpmnProcess bpmnProcess = (BpmnProcess) BpmnProcess.from(new Resource[]{new ClassPathResource("BPMN2-UserTask.bpmn2")}).get(0);
        this.listener.setKafkaStreams(createStreams());
        bpmnProcess.setProcessInstancesFactory(this.factory);
        bpmnProcess.configure();
        this.listener.getKafkaStreams().start();
        ProcessInstances instances = bpmnProcess.instances();
        ProcessInstancesTestUtils.assertEmpty(instances);
        ProcessInstance createInstance = bpmnProcess.createInstance(BpmnVariables.create(Collections.singletonMap("test", "test")));
        createInstance.start();
        org.junit.jupiter.api.Assertions.assertEquals(1, createInstance.status());
        awaitTillOne(instances);
        Policy of = SecurityPolicy.of(IdentityProviders.of("john"));
        Assertions.assertThat(ProcessInstancesTestUtils.getFirst(instances).workItems(new Policy[]{of})).hasSize(1);
        List workItems = createInstance.workItems(new Policy[]{of});
        Assertions.assertThat(workItems).hasSize(1);
        WorkItem workItem = (WorkItem) workItems.get(0);
        org.junit.jupiter.api.Assertions.assertEquals("john", workItem.getParameters().get("ActorId"));
        createInstance.completeWorkItem(workItem.getId(), (Map) null, new Policy[]{of});
        org.junit.jupiter.api.Assertions.assertEquals(2, createInstance.status());
        awaitTillEmpty(instances);
    }

    KafkaStreams createStreams() {
        KafkaStreams kafkaStreams = new KafkaStreams(KafkaPersistenceUtils.createTopologyForProcesses(), getStreamsConfig());
        kafkaStreams.setUncaughtExceptionHandler(th -> {
            LOGGER.error("Kafka persistence error: " + th.getMessage(), th);
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
        });
        kafkaStreams.cleanUp();
        return kafkaStreams;
    }

    Properties getStreamsConfig() {
        Properties properties = new Properties();
        properties.put("application.id", "kogito");
        properties.put("bootstrap.servers", this.kafka.getBootstrapServers());
        return properties;
    }
}
