package org.jbpm.workitem.springboot.samples;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.jbpm.services.api.RuntimeDataService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.kie.api.executor.ExecutorService;
import org.kie.api.runtime.query.QueryContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;

@SpringBootTest(classes = {JBPMApplication.class, TestAutoConfiguration.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@TestPropertySource(locations = {"classpath:application-test-async.properties"})
@RunWith(Parameterized.class)
/* loaded from: input_file:org/jbpm/workitem/springboot/samples/KafkaProxyAsyncSampleTest.class */
public class KafkaProxyAsyncSampleTest extends KafkaProxyBase {
    private static final String KEY_ASYNC = "PAM_Producer";
    private static final String VALUE_ASYNC = "That's no moon. It's a space station.";
    private static final Map<String, Object> VARIABLES_MAP = Collections.singletonMap("msg", VALUE_ASYNC);
    private static final String HELLO_PROCESS = "kafka-event-emitter.HelloWorld";
    private static final String KAFKA_WIH_PROCESS = "kafka-event-emitter.KafkaEventEmitter";
    protected static final String ASYNC_TEMPLATE_FILE = "src/test/resources/templates/kie-deployment-descriptor.async_template";

    @Autowired
    private ExecutorService executorService;

    @Autowired
    private RuntimeDataService runtimeDataService;

    @BeforeClass
    public static void beforeClass() {
        kafkaFixture.setTemplateFile(ASYNC_TEMPLATE_FILE);
    }

    @Override // org.jbpm.workitem.springboot.samples.KafkaProxyBase
    @Before
    public void setup() throws IOException, InterruptedException {
        super.setup();
        this.executorService.init();
    }

    @Override // org.jbpm.workitem.springboot.samples.KafkaProxyBase
    @After
    public void cleanup() {
        super.cleanup();
        this.executorService.clearAllErrors();
        this.executorService.clearAllRequests();
        this.executorService.destroy();
    }

    @Test(timeout = 30000)
    public void testAsyncKafkaWIH() throws Exception {
        this.countDownLatchEventListener.configureNode(KAFKA_WIH_PROCESS, "End", 2);
        Assert.assertTrue(this.processService.startProcess(this.deploymentId, KAFKA_WIH_PROCESS, VARIABLES_MAP).longValue() > 0);
        this.countDownLatchEventListener.getCountDown().await();
        kafkaFixture.assertConsumerMessages(this.proxyBootstrap, KEY_ASYNC, VALUE_ASYNC);
        assertRequestsAndProcesses(0, 1, 0);
    }

    @Test(timeout = 30000)
    public void testAsyncKafkaWIHConnectedLater() throws Exception {
        this.countDownLatchEventListener.configureNode(KAFKA_WIH_PROCESS, "End", 2);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.kafkaProxy.setConnectionCut(true);
        Assert.assertTrue(this.processService.startProcess(this.deploymentId, KAFKA_WIH_PROCESS, VARIABLES_MAP).longValue() > 0);
        countDownLatch.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals(1L, this.runtimeDataService.getProcessInstances(Collections.singletonList(1), (String) null, (QueryContext) null).size());
        this.kafkaProxy.setConnectionCut(false);
        this.countDownLatchEventListener.getCountDown().await();
        kafkaFixture.assertConsumerMessages(this.proxyBootstrap, KEY_ASYNC, VALUE_ASYNC);
        assertRequestsAndProcesses(0, 1, 0);
    }

    @Test(timeout = 80000)
    public void testAsyncKafkaWIHNoConnection() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.kafkaProxy.setConnectionCut(true);
        Long startProcess = this.processService.startProcess(this.deploymentId, KAFKA_WIH_PROCESS, VARIABLES_MAP);
        Assert.assertTrue(startProcess.longValue() > 0);
        try {
            startOtherProcess();
            countDownLatch.await(65L, TimeUnit.SECONDS);
            assertRequestsAndProcesses(1, 0, 1);
            startOtherProcess();
        } finally {
            this.processService.abortProcessInstance(startProcess);
        }
    }

    private void assertRequestsAndProcesses(int i, int i2, int i3) {
        Assert.assertEquals(i, this.executorService.getInErrorRequests(new QueryContext()).size());
        Assert.assertEquals(i2, this.executorService.getCompletedRequests(new QueryContext()).size());
        Assert.assertEquals(i3, this.runtimeDataService.getProcessInstances(Collections.singletonList(1), (String) null, (QueryContext) null).size());
    }

    private void startOtherProcess() {
        Assert.assertEquals("Hello Grogu", this.processService.computeProcessOutcome(this.deploymentId, HELLO_PROCESS, Collections.singletonMap("name", "Grogu")).get("outcome"));
    }
}
