package org.kie.server.springboot.samples;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import com.fasterxml.jackson.core.io.JsonEOFException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.jbpm.kie.services.impl.KModuleDeploymentUnit;
import org.jbpm.services.api.DeploymentService;
import org.jbpm.services.api.ProcessService;
import org.jbpm.services.api.RuntimeDataService;
import org.jbpm.services.api.UserTaskService;
import org.jbpm.services.task.deadlines.notifications.impl.NotificationListenerManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.junit.runner.RunWith;
import org.kie.api.task.model.TaskSummary;
import org.kie.internal.query.QueryFilter;
import org.kie.server.api.marshalling.MarshallingException;
import org.kie.server.springboot.samples.listeners.CountDownLatchEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.testcontainers.containers.KafkaContainer;

@SpringBootTest(classes = {KieServerApplication.class, TestAutoConfiguration.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@TestPropertySource(locations = {"classpath:application-kafka.properties"})
@RunWith(SpringJUnit4ClassRunner.class)
/* loaded from: input_file:org/kie/server/springboot/samples/KafkaConsumerHappyPathTest.class */
public class KafkaConsumerHappyPathTest extends KafkaFixture {
    private static final String USELESS_DATA_EVENT = "useless-data-event.json";
    private static final String MALFORMED_EVENT = "malformed-event.json";
    private static final String MONEY_DATA_EVENT = "money-data-event.json";
    private static final String MONEY_DATA_NULL_AMOUNT_EVENT = "money-data-null-amount-event.json";
    private static final String MONEY_DATA_NULL_CURRENCY_EVENT = "money-data-null-currency-event.json";
    private static final String MONEY_DATA_NULL_BOTH_EVENT = "money-data-null-both-event.json";
    private static final String MONEY_DATA_WRONG_TYPE_EVENT = "money-data-wrong-type-event.json";
    private static final String MONEY_DATA_WRONG_DATE_EVENT = "money-data-wrong-date-event.json";
    private static final String REIMBURSEMENT_DATA_EVENT = "reimbursement-data-event.json";
    private static final String CURRENCY_EUR = "EUR";
    private static final String AMOUNT_294 = "294";
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerHappyPathTest.class);
    protected static KafkaContainer kafka = new KafkaContainer();

    /**
     * JUnit rule that writes the name of each test method to the log right before
     * the method starts.
     */
    @Rule
    public TestRule watcher = new TestWatcher() {
        @Override
        protected void starting(Description description) {
            // Parameterized message: no string is built when INFO logging is disabled.
            KafkaConsumerHappyPathTest.logger.info(">>> Starting test: {}", description.getMethodName());
        }
    };

    @Autowired
    protected DeploymentService deploymentService;

    @Autowired
    protected ProcessService processService;

    @Autowired
    protected UserTaskService userTaskService;

    @Autowired
    protected RuntimeDataService runtimeDataService;

    @Autowired
    protected CountDownLatchEventListener countDownListener;
    protected String deploymentId;
    protected KModuleDeploymentUnit unit;
    protected ListAppender<ILoggingEvent> listAppender;

    /**
     * One-time initialisation for the whole test class; delegates to the
     * {@code generalSetup()} routine inherited through {@link KafkaFixture}.
     */
    @BeforeClass
    public static void beforeClass() {
        KafkaFixture.generalSetup();
    }

    /**
     * Per-test preparation: obtains the deployment unit for {@code signalling-project}
     * from the fixture, stores its identifier, waits until the consumer group is ready
     * and attaches a log appender whose captured events are inspected by the tests.
     */
    @Before
    public void setup() {
        unit = setup(deploymentService, "signalling-project");
        deploymentId = unit.getIdentifier();
        waitForConsumerGroupToBeReady();
        listAppender = addLogAppender();
    }

    /**
     * Per-test cleanup: resets the notification listener manager, aborts every
     * remaining process instance and hands the deployment unit back to the fixture's
     * {@code cleanup(...)} routine.
     */
    @After
    public void cleanup() {
        NotificationListenerManager.get().reset();
        abortAllProcesses(runtimeDataService, processService);
        cleanup(deploymentService, unit);
    }

    /**
     * Class-level teardown: stops the Kafka container and removes the bootstrap-servers
     * system property.
     */
    @AfterClass
    public static void teardown() {
        try {
            kafka.stop();
        } finally {
            // Clear the property even when stop() fails, so the setting cannot leak
            // into test classes that run later in the same JVM.
            System.clearProperty("org.kie.server.jbpm-kafka.ext.bootstrap.servers");
        }
    }

    /**
     * Sends one {@code startSignal} event and expects exactly one
     * {@code StartSignalProcess} instance to be recorded by the listener, in state 1
     * (the state the file's "active" helper also checks for). The instance is aborted
     * afterwards.
     */
    @Test(timeout = 60000)
    public void testStartSignal() throws InterruptedException {
        countDownListener.configure("StartSignalProcess", 1);
        sendEvent("startSignal", USELESS_DATA_EVENT);
        countDownListener.getCountDown().await();

        List<Long> startedIds = countDownListener.getIds();
        Assert.assertEquals(1L, startedIds.size());

        Long processInstanceId = startedIds.get(0);
        int state = processService.getProcessInstance(deploymentId, processInstanceId).getState();
        MatcherAssert.assertThat(Integer.valueOf(state), CoreMatchers.is(1));
        processService.abortProcessInstance(processInstanceId);
    }

    /**
     * Sends one {@code startMessage} event and expects exactly one
     * {@code StartMessageProcess} instance to be recorded by the listener, in state 1
     * (the state the file's "active" helper also checks for). The instance is aborted
     * afterwards.
     */
    @Test(timeout = 60000)
    public void testStartMessage() throws InterruptedException {
        countDownListener.configure("StartMessageProcess", 1);
        sendEvent("startMessage", USELESS_DATA_EVENT);
        countDownListener.getCountDown().await();

        List<Long> startedIds = countDownListener.getIds();
        Assert.assertEquals(1L, startedIds.size());

        Long processInstanceId = startedIds.get(0);
        int state = processService.getProcessInstance(deploymentId, processInstanceId).getState();
        MatcherAssert.assertThat(Integer.valueOf(state), CoreMatchers.is(1));
        processService.abortProcessInstance(processInstanceId);
    }

    /**
     * Starts a {@code BoundarySignalProcess} instance, sends a {@code boundarySignal}
     * event and, once the listener latch is released, expects the instance to no
     * longer be retrievable from the process service.
     */
    @Test(timeout = 60000)
    public void testBoundarySignal() throws InterruptedException {
        Long processInstanceId = startAndAssertProcess("BoundarySignalProcess");
        countDownListener.configure("BoundarySignalProcess", 1);

        sendEvent("boundarySignal", USELESS_DATA_EVENT);
        countDownListener.getCountDown().await();

        Assert.assertNull(processService.getProcessInstance(processInstanceId));
    }

    /**
     * Starts a {@code BoundaryMessageProcess} instance, sends a {@code boundaryMessage}
     * event and, once the listener latch is released, expects the instance to no
     * longer be retrievable from the process service.
     */
    @Test(timeout = 60000)
    public void testBoundaryMessage() throws InterruptedException {
        Long processInstanceId = startAndAssertProcess("BoundaryMessageProcess");
        countDownListener.configure("BoundaryMessageProcess", 1);

        sendEvent("boundaryMessage", USELESS_DATA_EVENT);
        countDownListener.getCountDown().await();

        Assert.assertNull(processService.getProcessInstance(processInstanceId));
    }

    /**
     * Starts an {@code IntermediateCatchEventSignal} instance, completes john's first
     * task, sends an {@code intermediateSignal} event and expects the instance to be
     * gone once the listener latch is released.
     */
    @Test(timeout = 60000)
    public void testIntermediateCatchEventSignal() throws Exception {
        Long processInstanceId = startAndAssertProcess("IntermediateCatchEventSignal");
        countDownListener.configure("IntermediateCatchEventSignal", 1);

        autocompleteSingleTask();
        sendEvent("intermediateSignal", USELESS_DATA_EVENT);
        countDownListener.getCountDown().await();

        Assert.assertNull(processService.getProcessInstance(processInstanceId));
    }

    /**
     * Starts an {@code IntermediateCatchEventMessage} instance, completes john's first
     * task, sends an {@code intermediateMessage} event and expects the instance to be
     * gone once the listener latch is released.
     */
    @Test(timeout = 60000)
    public void testIntermediateCatchEventMessage() throws Exception {
        Long processInstanceId = startAndAssertProcess("IntermediateCatchEventMessage");
        countDownListener.configure("IntermediateCatchEventMessage", 1);

        autocompleteSingleTask();
        sendEvent("intermediateMessage", USELESS_DATA_EVENT);
        countDownListener.getCountDown().await();

        Assert.assertNull(processService.getProcessInstance(processInstanceId));
    }

    /**
     * Starts a {@code SubprocessSignalProcess} instance, sends four
     * {@code subprocessSignal} events, waits until the listener has counted four hits
     * on node {@code sub-script}, completes john's first task and expects the instance
     * to be gone.
     */
    @Test(timeout = 60000)
    public void testSubprocessSignal() throws Exception {
        final int eventCount = 4;

        Long processInstanceId = startAndAssertProcess("SubprocessSignalProcess");
        countDownListener.configureNode("SubprocessSignalProcess", "sub-script", eventCount);

        int sent = 0;
        while (sent < eventCount) {
            sendEvent("subprocessSignal", USELESS_DATA_EVENT);
            sent++;
        }

        countDownListener.getCountDown().await();
        autocompleteSingleTask();
        Assert.assertNull(processService.getProcessInstance(processInstanceId));
    }

    /**
     * Starts a {@code SubprocessMessageProcess} instance, sends four
     * {@code subprocessMessage} events, waits until the listener has counted four hits
     * on node {@code sub-script}, completes john's first task and expects the instance
     * to be gone.
     */
    @Test(timeout = 60000)
    public void testSubprocessMessage() throws Exception {
        final int eventCount = 4;

        Long processInstanceId = startAndAssertProcess("SubprocessMessageProcess");
        countDownListener.configureNode("SubprocessMessageProcess", "sub-script", eventCount);

        int sent = 0;
        while (sent < eventCount) {
            sendEvent("subprocessMessage", USELESS_DATA_EVENT);
            sent++;
        }

        countDownListener.getCountDown().await();
        autocompleteSingleTask();
        Assert.assertNull(processService.getProcessInstance(processInstanceId));
    }

    /**
     * Starts five {@code BoundarySignalProcess} instances, sends a single
     * {@code boundarySignal} event and expects all five instances to be gone once the
     * listener has counted five completions.
     */
    @Test(timeout = 60000)
    public void testBoundarySignalMultipleProcesses() throws InterruptedException {
        final int instanceCount = 5;

        Long[] processInstanceIds = startAndAssertProcesses("BoundarySignalProcess", instanceCount);
        countDownListener.configure("BoundarySignalProcess", instanceCount);

        sendEvent("boundarySignal", USELESS_DATA_EVENT);
        countDownListener.getCountDown().await();

        assertNullProcesses(instanceCount, processInstanceIds);
    }

    /**
     * Starts five {@code BoundaryMessageProcess} instances, sends a single
     * {@code boundaryMessage} event and expects all five instances to be gone once the
     * listener has counted five completions.
     */
    @Test(timeout = 60000)
    public void testBoundaryMessageMultipleProcesses() throws InterruptedException {
        final int instanceCount = 5;

        Long[] processInstanceIds = startAndAssertProcesses("BoundaryMessageProcess", instanceCount);
        countDownListener.configure("BoundaryMessageProcess", instanceCount);

        sendEvent("boundaryMessage", USELESS_DATA_EVENT);
        countDownListener.getCountDown().await();

        assertNullProcesses(instanceCount, processInstanceIds);
    }

    /**
     * Starts the POJO message process from {@code money-data-event.json} and expects
     * the assigned money to carry amount 294 and currency EUR.
     */
    @Test(timeout = 60000)
    public void testStartMessagePojo() throws InterruptedException {
        startMessagePojoParam(MONEY_DATA_EVENT, AMOUNT_294, CURRENCY_EUR);
    }

    /**
     * Starts the POJO message process from the null-amount event. A {@code null}
     * expected amount is checked as 0 by {@code assertAssignedMoney}; the currency is
     * expected to be EUR.
     */
    @Test(timeout = 60000)
    public void testStartMessagePojoFirstParamNull() throws InterruptedException {
        startMessagePojoParam(MONEY_DATA_NULL_AMOUNT_EVENT, null, CURRENCY_EUR);
    }

    /**
     * Starts the POJO message process from the null-currency event and expects amount
     * 294 with a {@code null} currency.
     */
    @Test(timeout = 60000)
    public void testStartMessagePojoSecondParamNull() throws InterruptedException {
        startMessagePojoParam(MONEY_DATA_NULL_CURRENCY_EVENT, AMOUNT_294, null);
    }

    /**
     * Starts the POJO message process from the event with both fields null. The amount
     * is checked as 0 (the {@code null} mapping in {@code assertAssignedMoney}) and the
     * currency as {@code null}.
     */
    @Test(timeout = 60000)
    public void testStartMessagePojoBothParamsNull() throws InterruptedException {
        startMessagePojoParam(MONEY_DATA_NULL_BOTH_EVENT, null, null);
    }

    /**
     * Sends the reimbursement event to {@code startMessageComplexPojo} and checks that
     * the {@code assignedReimbursement} variable of the started instance contains an
     * {@code expenses} list with two entries. The instance is aborted afterwards.
     */
    @Test(timeout = 60000)
    public void testStartMessageComplexPojo() throws InterruptedException {
        countDownListener.configure("StartMessageComplexPojoProcess", 1);
        sendEvent("startMessageComplexPojo", REIMBURSEMENT_DATA_EVENT);
        countDownListener.getCountDown().await();

        Long processInstanceId = countDownListener.getIds().get(0);
        Object expenses = getVariableMap(processInstanceId, "assignedReimbursement").get("expenses");
        // Fail with a readable assertion instead of an NPE/ClassCastException when the
        // variable has an unexpected shape.
        Assert.assertTrue("Expected 'expenses' to be a list but was: " + expenses, expenses instanceof List);
        Assert.assertEquals(2L, ((List<?>) expenses).size());
        processService.abortProcessInstance(processInstanceId);
    }

    /**
     * Sends four {@code startSignal} events through {@code sendTransactionalRecords}
     * and expects four {@code StartSignalProcess} instances, each in state 1; every
     * instance is aborted by the assertion helper.
     */
    @Test(timeout = 60000)
    public void testStartSignalsTransactional() throws InterruptedException {
        final int eventCount = 4;
        countDownListener.configure("StartSignalProcess", eventCount);

        List<String> events = Collections.nCopies(eventCount, USELESS_DATA_EVENT);
        sendTransactionalRecords(createTopicEventsMap("startSignal", events));
        countDownListener.getCountDown().await();

        List<Long> startedIds = countDownListener.getIds();
        Assert.assertEquals(4L, startedIds.size());
        assertActiveProcessesAndAbort(startedIds);
    }

    /**
     * Sends five {@code startMessage} events through {@code sendTransactionalRecords},
     * three of them malformed, and expects exactly the two well-formed ones to start a
     * {@code StartMessageProcess} instance.
     */
    @Test(timeout = 60000)
    public void testStartMessagesTransactionalMalformedEvents() throws InterruptedException {
        countDownListener.configure("StartMessageProcess", 2);

        List<String> events = Arrays.asList(
                MALFORMED_EVENT,
                USELESS_DATA_EVENT,
                MALFORMED_EVENT,
                USELESS_DATA_EVENT,
                MALFORMED_EVENT);
        Map<String, List<String>> eventsByTopic = createTopicEventsMap("startMessage", events);
        sendTransactionalRecords(eventsByTopic);
        countDownListener.getCountDown().await();

        List<Long> startedIds = countDownListener.getIds();
        Assert.assertEquals(2L, startedIds.size());
        assertActiveProcessesAndAbort(startedIds);
    }

    /**
     * Sends twelve {@code startSignal} and twelve {@code startMessage} events through a
     * single {@code sendTransactionalRecords} call and expects twenty-four started
     * instances, each in state 1; every instance is aborted by the assertion helper.
     */
    @Test(timeout = 60000)
    public void testStartSignalsAndMessagesTransactional() throws InterruptedException {
        final int eventsPerTopic = 12;
        final int expectedStarts = eventsPerTopic * 2;
        countDownListener.configure("StartSignalProcess", expectedStarts);

        // Typed list replaces the raw ArrayList that was passed to a List<String> parameter.
        List<String> events = new ArrayList<>(eventsPerTopic);
        for (int i = 0; i < eventsPerTopic; i++) {
            events.add(USELESS_DATA_EVENT);
        }

        Map<String, List<String>> eventsByTopic = createTopicEventsMap("startSignal", events);
        eventsByTopic.putAll(createTopicEventsMap("startMessage", events));
        sendTransactionalRecords(eventsByTopic);
        countDownListener.getCountDown().await();

        List<Long> startedIds = countDownListener.getIds();
        Assert.assertEquals(expectedStarts, startedIds.size());
        assertActiveProcessesAndAbort(startedIds);
    }

    /**
     * Sends an event whose data does not match the POJO expected by
     * {@code startMessagePojo}; a {@link MarshallingException} must show up in the
     * captured error log and no process may be started.
     */
    @Test(timeout = 60000)
    public void testStartMessagePojoMismatchedInput() throws InterruptedException {
        countDownListener.configure("StartMessagePojoProcess", 1);
        sendEvent("startMessagePojo", USELESS_DATA_EVENT);
        assertExceptionInLogs(listAppender, MarshallingException.class);
    }

    /**
     * Sends the malformed event to {@code startMessagePojo}; a
     * {@link JsonEOFException} must show up in the captured error log and no process
     * may be started.
     */
    @Test(timeout = 60000)
    public void testStartMessagePojoMalformedData() throws InterruptedException {
        countDownListener.configure("StartMessagePojoProcess", 1);
        sendEvent("startMessagePojo", MALFORMED_EVENT);
        assertExceptionInLogs(listAppender, JsonEOFException.class);
    }

    /**
     * Sends the wrong-type money event to {@code startMessagePojo}; a
     * {@link MarshallingException} must show up in the captured error log and no
     * process may be started.
     */
    @Test(timeout = 60000)
    public void testStartMessagePojoWrongType() throws InterruptedException {
        countDownListener.configure("StartMessagePojoProcess", 1);
        sendEvent("startMessagePojo", MONEY_DATA_WRONG_TYPE_EVENT);
        assertExceptionInLogs(listAppender, MarshallingException.class);
    }

    /**
     * Sends a money event to {@code startMessagePojoClassNotFound}; a
     * {@link ClassNotFoundException} must show up in the captured error log and no
     * process may be started.
     */
    @Test(timeout = 60000)
    public void testStartMessagePojoClassNotFound() throws InterruptedException {
        countDownListener.configure("StartMessagePojoClassNotFoundProcess", 1);
        sendEvent("startMessagePojoClassNotFound", MONEY_DATA_EVENT);
        assertExceptionInLogs(listAppender, ClassNotFoundException.class);
    }

    /**
     * Currently disabled (see the {@code @Ignore} reason). Sends the wrong-date money
     * event to {@code startMessagePojo} and would expect a {@link ParseException} in
     * the captured error log.
     */
    @Test(timeout = 60000)
    @Ignore("Wrong dates are ignored since only data field is used in cloud event for now")
    public void testStartMessageParseException() throws InterruptedException {
        countDownListener.configure("StartMessagePojoProcess", 1);
        sendEvent("startMessagePojo", MONEY_DATA_WRONG_DATE_EVENT);
        assertExceptionInLogs(listAppender, ParseException.class);
    }

    /**
     * Runs the two-container scenario of {@code startInDifferentContainers} for
     * {@code StartMessageProcess}, triggered through {@code startMessage}.
     */
    @Test(timeout = 60000)
    public void testStartMessageInDifferentContainers() throws InterruptedException {
        startInDifferentContainers("StartMessageProcess", "startMessage");
    }

    /**
     * Runs the two-container scenario of {@code startInDifferentContainers} for
     * {@code StartSignalProcess}, triggered through {@code startSignal}.
     */
    @Test(timeout = 60000)
    public void testStartSignalInDifferentContainers() throws InterruptedException {
        startInDifferentContainers("StartSignalProcess", "startSignal");
    }

    /**
     * Sets up a second deployment unit ({@code alt-project}) next to the default one,
     * sends a single event and expects two process instances to be recorded. Each
     * recorded instance is aborted, first against the alternative unit and, if that
     * call throws, against the default unit. The alternative unit is undeployed in all
     * cases.
     *
     * @param str  process id the count-down listener is configured for
     * @param str2 first argument handed to {@code sendEvent}
     * @throws InterruptedException if waiting on the listener latch is interrupted
     */
    protected void startInDifferentContainers(String str, String str2) throws InterruptedException {
        countDownListener.configure(str, 2);
        KModuleDeploymentUnit altUnit = setup(deploymentService, "alt-project");
        try {
            sendEvent(str2, USELESS_DATA_EVENT);
            countDownListener.getCountDown().await();

            List<Long> startedIds = countDownListener.getIds();
            Assert.assertEquals(2L, startedIds.size());

            for (Long processInstanceId : startedIds) {
                try {
                    processService.abortProcessInstance(altUnit.getIdentifier(), processInstanceId);
                } catch (Exception ignored) {
                    // Best effort: the abort failed against the alternative unit,
                    // so retry against the default unit.
                    processService.abortProcessInstance(unit.getIdentifier(), processInstanceId);
                }
            }
        } finally {
            if (deploymentService != null) {
                deploymentService.undeploy(altUnit);
            }
        }
    }

    /**
     * Sends the given event resource to {@code startMessagePojo}, waits for exactly one
     * {@code StartMessagePojoProcess} instance, verifies its assigned money and aborts
     * the instance.
     *
     * @param str  name of the event resource to send
     * @param str2 expected amount as a decimal string, or {@code null} (checked as 0)
     * @param str3 expected currency, may be {@code null}
     * @throws InterruptedException if waiting on the listener latch is interrupted
     */
    protected void startMessagePojoParam(String str, String str2, String str3) throws InterruptedException {
        countDownListener.configure("StartMessagePojoProcess", 1);
        sendEvent("startMessagePojo", str);
        countDownListener.getCountDown().await();

        List<Long> startedIds = countDownListener.getIds();
        Assert.assertEquals(1L, startedIds.size());

        Long processInstanceId = startedIds.get(0);
        assertAssignedMoney(processInstanceId, str2, str3);
        processService.abortProcessInstance(processInstanceId);
    }

    /**
     * Waits up to two seconds on the listener latch, then asserts that the error log
     * entry found by {@code getErrorLog} carries a throwable of the given class and
     * that no process instance was recorded.
     *
     * @param listAppender appender whose captured events are searched
     * @param cls          exception class expected on the logged throwable
     * @throws InterruptedException if the wait is interrupted
     */
    protected void assertExceptionInLogs(ListAppender<ILoggingEvent> listAppender, Class<?> cls) throws InterruptedException {
        // The result is deliberately ignored: this only gives the consumer time to
        // process the event before the log and the recorded ids are inspected.
        countDownListener.getCountDown().await(2L, TimeUnit.SECONDS);

        // NOTE(review): getErrorLog is assumed to return java.util.Optional (the
        // original code called get() on its result) - confirm against KafkaFixture.
        ILoggingEvent errorEvent = getErrorLog(listAppender).orElseThrow(() -> new AssertionError(
                "Expected an error log entry caused by " + cls.getCanonicalName() + " but none was captured"));
        Assert.assertNotNull("Error log entry has no attached throwable: " + errorEvent.getFormattedMessage(),
                errorEvent.getThrowableProxy());
        Assert.assertEquals(cls.getCanonicalName(), errorEvent.getThrowableProxy().getClassName());
        Assert.assertEquals(0L, countDownListener.getIds().size());
    }

    /**
     * Starts a process in the current deployment and checks that a positive,
     * non-null instance id came back.
     *
     * @param str id of the process definition to start
     * @return the id of the started process instance
     */
    protected Long startAndAssertProcess(String str) {
        Long processInstanceId = processService.startProcess(deploymentId, str);
        Assert.assertNotNull(processInstanceId);
        Assert.assertTrue(processInstanceId > 0L);
        return processInstanceId;
    }

    /**
     * Completes, via {@code completeAutoProgress} and with no output data, the first
     * task for which user {@code john} is a potential owner. Fails with an assertion
     * error when john has no such task.
     */
    protected void autocompleteSingleTask() {
        List<TaskSummary> tasks = runtimeDataService.getTasksAssignedAsPotentialOwner("john", new QueryFilter());
        // Guard against an empty result so the failure is a readable assertion rather
        // than an IndexOutOfBoundsException.
        Assert.assertFalse("No task found with 'john' as potential owner", tasks.isEmpty());
        userTaskService.completeAutoProgress(tasks.get(0).getId(), "john", Collections.emptyMap());
    }

    /**
     * Starts {@code i} instances of the given process, one after another, validating
     * each through {@code startAndAssertProcess}.
     *
     * @param str id of the process definition to start
     * @param i   number of instances to start
     * @return the ids of the started instances, in start order
     */
    protected Long[] startAndAssertProcesses(String str, int i) {
        Long[] processInstanceIds = new Long[i];
        // Arrays.setAll fills the slots sequentially from index 0 upwards.
        Arrays.setAll(processInstanceIds, index -> startAndAssertProcess(str));
        return processInstanceIds;
    }

    /**
     * Loads the named event resource and hands its content to {@code sendRecord}.
     *
     * @param str  first argument passed on to {@code sendRecord}
     * @param str2 name of the event resource to load with {@code readData}
     */
    protected void sendEvent(String str, String str2) {
        String payload = readData(str2);
        sendRecord(str, payload);
    }

    /**
     * Builds a mutable single-entry map from the given key to the loaded contents of
     * the named event resources, in the order given.
     *
     * @param str  map key (callers pass the name the events are sent under)
     * @param list names of the event resources to load with {@code readData}
     * @return a new mutable map holding one entry
     */
    protected Map<String, List<String>> createTopicEventsMap(String str, List<String> list) {
        // Properly typed collections replace the raw HashMap and raw List cast.
        List<String> payloads = list.stream().map(this::readData).collect(Collectors.toList());
        Map<String, List<String>> eventsByTopic = new HashMap<>();
        eventsByTopic.put(str, payloads);
        return eventsByTopic;
    }

    /**
     * Reads the classpath resource {@code /producer/data/<str>} as a UTF-8 string.
     *
     * @param str file name of the resource below {@code /producer/data/}
     * @return the full content of the resource
     * @throws AssertionError if the resource does not exist or cannot be read
     */
    protected String readData(String str) {
        String resourcePath = "/producer/data/" + str;
        // try-with-resources closes the stream, which was previously leaked.
        try (InputStream in = getClass().getResourceAsStream(resourcePath)) {
            // getResourceAsStream returns null for a missing resource; report that
            // explicitly instead of failing later with a NullPointerException.
            Assert.assertNotNull("Test resource not found on classpath: " + resourcePath, in);
            return IOUtils.toString(in, StandardCharsets.UTF_8);
        } catch (IOException e) {
            // Same failure message as before, now with the cause preserved.
            throw new AssertionError("Not expected exception: " + e.getMessage(), e);
        }
    }

    /**
     * Checks the {@code assignedMoney} variable of a process instance.
     *
     * @param l    process instance id
     * @param str  expected amount as a decimal string; {@code null} is checked as 0
     * @param str2 expected currency, may be {@code null}
     */
    protected void assertAssignedMoney(Long l, String str, String str2) {
        Map<String, Object> assignedMoney = getVariableMap(l, "assignedMoney");

        int expectedAmount = 0;
        if (str != null) {
            expectedAmount = Integer.parseInt(str);
        }

        Assert.assertEquals(Integer.valueOf(expectedAmount), assignedMoney.get("amount"));
        Assert.assertEquals(str2, assignedMoney.get("currency"));
    }

    /**
     * Fetches a process variable from the current deployment and converts it with
     * Jackson into a generic property map.
     *
     * @param l   process instance id
     * @param str name of the process variable
     * @return the variable converted to a {@code Map<String, Object>}
     * @throws AssertionError if the variable is not set
     */
    protected Map<String, Object> getVariableMap(Long l, String str) {
        Object value = processService.getProcessInstanceVariable(deploymentId, l, str);
        Assert.assertNotNull("Process variable '" + str + "' is not set on process instance " + l, value);
        // The typed TypeReference makes the former raw (Map) cast unnecessary.
        TypeReference<Map<String, Object>> mapType = new TypeReference<Map<String, Object>>() {
        };
        return new ObjectMapper().convertValue(value, mapType);
    }

    /**
     * Asserts that the process service returns {@code null} for each of the first
     * {@code i} ids in the array.
     *
     * @param i    number of leading array entries to check
     * @param lArr process instance ids
     */
    protected void assertNullProcesses(int i, Long[] lArr) {
        int index = 0;
        while (index < i) {
            Long processInstanceId = lArr[index];
            Assert.assertNull(processService.getProcessInstance(processInstanceId));
            index++;
        }
    }

    /**
     * For every id, in list order: asserts that the instance in the current deployment
     * reports state 1 and then aborts it.
     *
     * @param list process instance ids to verify and abort
     */
    protected void assertActiveProcessesAndAbort(List<Long> list) {
        list.forEach(processInstanceId -> {
            int state = processService.getProcessInstance(deploymentId, processInstanceId).getState();
            MatcherAssert.assertThat(Integer.valueOf(state), CoreMatchers.is(1));
            processService.abortProcessInstance(processInstanceId);
        });
    }
}
