package org.jbpm.process.longrest;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.undertow.Undertow;
import io.undertow.servlet.api.DeploymentInfo;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.core.Cookie;
import org.jboss.resteasy.plugins.server.undertow.UndertowJaxrsServer;
import org.jboss.resteasy.spi.ResteasyDeployment;
import org.jbpm.process.longrest.bpm.TestFunctions;
import org.jbpm.process.longrest.demoservices.CookieListener;
import org.jbpm.process.longrest.demoservices.EventType;
import org.jbpm.process.longrest.demoservices.Service;
import org.jbpm.process.longrest.demoservices.ServiceListener;
import org.jbpm.process.longrest.demoservices.dto.PreBuildRequest;
import org.jbpm.process.longrest.demoservices.dto.Request;
import org.jbpm.process.longrest.demoservices.dto.Scm;
import org.jbpm.process.longrest.mockserver.WorkItems;
import org.jbpm.process.longrest.util.Maps;
import org.jbpm.test.JbpmJUnitBaseTestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.kie.api.event.process.DefaultProcessEventListener;
import org.kie.api.event.process.ProcessCompletedEvent;
import org.kie.api.event.process.ProcessEventListener;
import org.kie.api.event.process.ProcessNodeTriggeredEvent;
import org.kie.api.event.process.ProcessVariableChangedEvent;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.manager.RuntimeEngine;
import org.kie.api.runtime.process.ProcessInstance;
import org.kie.internal.runtime.manager.context.ProcessInstanceIdContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/jbpm/process/longrest/RestServiceWorkitemIntegrationTest.class */
public class RestServiceWorkitemIntegrationTest extends JbpmJUnitBaseTestCase {
    private final Logger logger;
    private static int PORT = 8080;
    private static String DEFAULT_HOST = "localhost";
    private UndertowJaxrsServer server;
    private ObjectMapper objectMapper;
    private final ActiveTasks activeProcesses;
    private final ServiceListener serviceListener;
    private final CookieListener cookieListener;

    public RestServiceWorkitemIntegrationTest() {
        super(true, true);
        this.logger = LoggerFactory.getLogger(RestServiceWorkitemIntegrationTest.class);
        this.objectMapper = new ObjectMapper();
        this.activeProcesses = new ActiveTasks();
        this.serviceListener = new ServiceListener();
        this.cookieListener = new CookieListener();
    }

    @Before
    public void preTestSetup() throws Exception {
        System.setProperty("HOSTNAME_HTTP", "localhost:8080");
        setupPoolingDataSource();
        HashMap hashMap = new HashMap();
        hashMap.put("execute-rest.bpmn", ResourceType.BPMN2);
        hashMap.put("test-process.bpmn", ResourceType.BPMN2);
        this.manager = createRuntimeManager(JbpmJUnitBaseTestCase.Strategy.PROCESS_INSTANCE, hashMap);
        this.customProcessListeners.add(new RestServiceProcessEventListener(this.activeProcesses));
        this.customHandlers.put("LongRunningRestService", new LongRunningRestServiceWorkItemHandler(this.manager));
        bootUpServices();
    }

    @After
    public void postTestTeardown() throws Exception {
        this.logger.info("Stopping http server ...");
        this.server.stop();
    }

    private void bootUpServices() throws Exception {
        this.server = new UndertowJaxrsServer();
        ResteasyDeployment resteasyDeployment = new ResteasyDeployment();
        resteasyDeployment.setApplicationClass(JaxRsActivator.class.getName());
        DeploymentInfo undertowDeployment = this.server.undertowDeployment(resteasyDeployment, "/");
        undertowDeployment.setClassLoader(getClass().getClassLoader());
        undertowDeployment.setDeploymentName("TestServices");
        undertowDeployment.setContextPath("/");
        undertowDeployment.addServletContextAttribute(Service.SERVICE_LISTENER_KEY, this.serviceListener);
        undertowDeployment.addServletContextAttribute(Service.COOKIE_LISTENER_KEY, this.cookieListener);
        undertowDeployment.addServletContextAttribute(WorkItems.RUNTIME_MANAGER_KEY, this.manager);
        this.server.deploy(undertowDeployment);
        this.server.start(Undertow.builder().addHttpListener(PORT, "localhost"));
    }

    @Test(timeout = 15000)
    public void shouldInvokeRemoteServiceAndReceiveCallback() throws Exception {
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1000);
        ProcessEventListener processEventListener = getProcessEventListener(arrayBlockingQueue, "preBuildResult", "buildResult", "completionResult");
        this.customProcessListeners.add(processEventListener);
        RuntimeEngine runtimeEngine = getRuntimeEngine();
        KieSession kieSession = runtimeEngine.getKieSession();
        Semaphore semaphore = new Semaphore(0);
        ServiceListener.Subscription subscribe = this.serviceListener.subscribe(EventType.CALLBACK_COMPLETED, obj -> {
            semaphore.release();
        });
        HashMap hashMap = new HashMap();
        hashMap.put("A", 1);
        hashMap.put("lines", "two\nlines");
        hashMap.put("quote", "String \"literal\".");
        kieSession.startProcess("testProcess", Collections.singletonMap("input", getProcessParameters(1, 30, 1, 30, hashMap)));
        this.manager.disposeRuntimeEngine(runtimeEngine);
        arrayBlockingQueue.take();
        arrayBlockingQueue.take();
        Map map = (Map) arrayBlockingQueue.take().getNewValue();
        this.logger.info("preBuildCallbackResult: " + map);
        Assert.assertEquals("new-scm-tag", Maps.getStringObjectMap(Maps.getStringObjectMap(map, "response"), "scm").get("revision"));
        Assert.assertTrue(Maps.getStringObjectMap(map, "initialResponse").get("cancelUrl").toString().startsWith("http://localhost:8080/demo-service/cancel/"));
        this.logger.info("buildCallbackResult: " + ((Map) arrayBlockingQueue.take().getNewValue()));
        Assert.assertEquals("SUCCESS", Maps.getStringObjectMap(map, "response").get("status"));
        Map map2 = (Map) arrayBlockingQueue.take().getNewValue();
        this.logger.info("completionResult: " + map2);
        Assert.assertEquals("SUCCESS", map2.get("status"));
        Map map3 = (Map) ((Map) map2.get("response")).get("labels");
        Assert.assertEquals(hashMap.get("lines"), map3.get("lines"));
        Assert.assertEquals(hashMap.get("quote"), map3.get("quote"));
        this.logger.info("Waiting for callback to complete...");
        semaphore.acquire(2);
        this.logger.info("Callback completed.");
        this.customProcessListeners.remove(processEventListener);
        this.serviceListener.unsubscribe(subscribe);
    }

    @Test(timeout = 20000)
    public void shouldCatchException() throws Exception {
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1000);
        ProcessEventListener processEventListener = getProcessEventListener(arrayBlockingQueue, "preBuildResult");
        this.customProcessListeners.add(processEventListener);
        RuntimeEngine runtimeEngine = getRuntimeEngine();
        KieSession kieSession = runtimeEngine.getKieSession();
        Map<String, Object> processParameters = getProcessParameters(2, 10, 10, 10, Collections.emptyMap());
        processParameters.put("preBuildServiceUrl", "http://host-not-found:8080/");
        kieSession.startProcess("testProcess", Collections.singletonMap("input", processParameters));
        this.manager.disposeRuntimeEngine(runtimeEngine);
        arrayBlockingQueue.take();
        Map map = (Map) arrayBlockingQueue.take().getNewValue();
        this.logger.info("preBuildCallbackResult: " + map);
        RemoteInvocationException remoteInvocationException = (RemoteInvocationException) map.get("error");
        this.logger.info("Expected exception: {}.", remoteInvocationException.getMessage());
        Assert.assertNotNull(remoteInvocationException);
        this.customProcessListeners.remove(processEventListener);
    }

    @Test(timeout = 20000)
    public void shouldRetryFailedRequest() throws Exception {
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1000);
        ProcessEventListener processEventListener = getProcessEventListener(arrayBlockingQueue, "preBuildResult", "retryAttempt", "completionResult");
        this.customProcessListeners.add(processEventListener);
        RuntimeEngine runtimeEngine = getRuntimeEngine();
        KieSession kieSession = runtimeEngine.getKieSession();
        Map<String, Object> processParameters = getProcessParameters(2, 10, 10, 10, Collections.emptyMap());
        processParameters.put("preBuildServiceUrl", "http://host-not-found:8080/");
        processParameters.put("retryDelay", 250);
        processParameters.put("maxRetries", 3);
        kieSession.startProcess("testProcess", Collections.singletonMap("input", processParameters));
        this.manager.disposeRuntimeEngine(runtimeEngine);
        arrayBlockingQueue.take();
        arrayBlockingQueue.take();
        arrayBlockingQueue.take();
        arrayBlockingQueue.take();
        Assert.assertEquals("3", ((Integer) arrayBlockingQueue.take().getNewValue()).toString());
        Map map = (Map) arrayBlockingQueue.take().getNewValue();
        this.logger.info("preBuildCallbackResult: " + map);
        RemoteInvocationException remoteInvocationException = (RemoteInvocationException) map.get("error");
        this.logger.info("Expected exception: {}.", remoteInvocationException.getMessage());
        Assert.assertNotNull(remoteInvocationException);
        arrayBlockingQueue.take();
        arrayBlockingQueue.take();
        arrayBlockingQueue.take();
        arrayBlockingQueue.take().getNewValue();
        this.customProcessListeners.remove(processEventListener);
    }

    @Test(timeout = 15000)
    public void testTimeoutServiceDoesNotRespondCancelSuccess() throws InterruptedException {
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1000);
        ProcessEventListener processEventListener = getProcessEventListener(arrayBlockingQueue, "restResponse", "preBuildResult");
        this.customProcessListeners.add(processEventListener);
        RuntimeEngine runtimeEngine = getRuntimeEngine();
        KieSession kieSession = runtimeEngine.getKieSession();
        Semaphore semaphore = new Semaphore(0);
        AtomicInteger atomicInteger = new AtomicInteger();
        ServiceListener.Subscription subscribe = this.serviceListener.subscribe(EventType.CALLBACK_COMPLETED, obj -> {
            semaphore.release();
        });
        this.serviceListener.subscribe(EventType.BUILD_REQUESTED, obj2 -> {
            atomicInteger.incrementAndGet();
        });
        ProcessInstance startProcess = kieSession.startProcess("testProcess", Collections.singletonMap("input", getProcessParameters(10, 30, 1, 30, Collections.emptyMap())));
        this.manager.disposeRuntimeEngine(runtimeEngine);
        arrayBlockingQueue.take();
        arrayBlockingQueue.take().getNewValue();
        RuntimeEngine runtimeEngine2 = getRuntimeEngine(startProcess.getId());
        this.logger.info("Signalling cancel ...");
        runtimeEngine2.getKieSession().signalEvent("CancelAll", (Object) null);
        this.manager.disposeRuntimeEngine(runtimeEngine2);
        Map map = (Map) arrayBlockingQueue.take().getNewValue();
        this.logger.info("preBuildCallbackResult: " + map);
        Assert.assertEquals("true", Maps.getStringObjectMap(map, "response").get("cancelled"));
        this.logger.info("Waiting for all processes to complete...");
        this.activeProcesses.waitAllCompleted();
        this.logger.info("All processes completed.");
        this.logger.info("Waiting for callback to complete...");
        semaphore.acquire();
        this.logger.info("Callback completed.");
        Assert.assertEquals(0L, atomicInteger.get());
        this.customProcessListeners.remove(processEventListener);
        this.serviceListener.unsubscribe(subscribe);
    }

    @Test(timeout = 15000)
    public void serviceTimesOutInternalCancelSucceeds() throws InterruptedException {
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1000);
        ProcessEventListener processEventListener = getProcessEventListener(arrayBlockingQueue, "preBuildResult", "completionResult");
        this.customProcessListeners.add(processEventListener);
        RuntimeEngine runtimeEngine = getRuntimeEngine();
        KieSession kieSession = runtimeEngine.getKieSession();
        Semaphore semaphore = new Semaphore(0);
        ServiceListener.Subscription subscribe = this.serviceListener.subscribe(EventType.CALLBACK_COMPLETED, obj -> {
            semaphore.release();
        });
        HashMap hashMap = new HashMap();
        this.cookieListener.addConsumer(map -> {
            hashMap.putAll(map);
        });
        kieSession.startProcess("testProcess", Collections.singletonMap("input", getProcessParameters(10, 2, 1, 30, Collections.emptyMap())));
        this.manager.disposeRuntimeEngine(runtimeEngine);
        arrayBlockingQueue.take();
        arrayBlockingQueue.take().getNewValue();
        Map map2 = (Map) arrayBlockingQueue.take().getNewValue();
        this.logger.info("preBuildResult: " + map2);
        Assert.assertEquals("true", Maps.getStringObjectMap(map2, "response").get("cancelled"));
        Assert.assertEquals("TIMED_OUT", map2.get("status"));
        Map<String, Object> changedVariable = getChangedVariable(arrayBlockingQueue, "completionResult");
        this.logger.info("completionResult: " + changedVariable);
        Assert.assertEquals("SUCCESS", changedVariable.get("status"));
        this.activeProcesses.waitAllCompleted();
        semaphore.acquire();
        Assert.assertFalse(hashMap.isEmpty());
        Assert.assertEquals(Service.PRE_BUILD_COOKIE_VALUE, ((Cookie) hashMap.get(Service.PRE_BUILD_COOKIE_NAME)).getValue());
        this.customProcessListeners.remove(processEventListener);
        this.serviceListener.unsubscribe(subscribe);
    }

    @Test(timeout = 15000)
    public void serviceTimesOutInternalCancelTimesOut() throws InterruptedException {
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1000);
        ProcessEventListener processEventListener = getProcessEventListener(arrayBlockingQueue, "preBuildResult");
        this.customProcessListeners.add(processEventListener);
        RuntimeEngine runtimeEngine = getRuntimeEngine();
        KieSession kieSession = runtimeEngine.getKieSession();
        Semaphore semaphore = new Semaphore(0);
        ServiceListener.Subscription subscribe = this.serviceListener.subscribe(EventType.CALLBACK_COMPLETED, obj -> {
            semaphore.release();
        });
        kieSession.startProcess("testProcess", Collections.singletonMap("input", getProcessParameters(10, 2, 10, 2, Collections.emptyMap())));
        this.manager.disposeRuntimeEngine(runtimeEngine);
        arrayBlockingQueue.take();
        arrayBlockingQueue.take().getNewValue();
        Map map = (Map) arrayBlockingQueue.take().getNewValue();
        this.logger.info("preBuildResult: " + map);
        Maps.getStringObjectMap(map, "response");
        Assert.assertEquals("TIMED_OUT", map.get("status"));
        this.activeProcesses.waitAllCompleted();
        semaphore.acquire();
        this.customProcessListeners.remove(processEventListener);
        this.serviceListener.unsubscribe(subscribe);
    }

    @Test(timeout = 15000)
    public void shouldStartAndCompleteExecuteRestProcess() throws InterruptedException {
        final Semaphore semaphore = new Semaphore(0);
        final ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1000);
        DefaultProcessEventListener defaultProcessEventListener = new DefaultProcessEventListener() { // from class: org.jbpm.process.longrest.RestServiceWorkitemIntegrationTest.1
            public void beforeNodeTriggered(ProcessNodeTriggeredEvent processNodeTriggeredEvent) {
                RestServiceWorkitemIntegrationTest.this.logger.info("Event ID: {}, event node ID: {}, event node name: {}", new Object[]{Long.valueOf(processNodeTriggeredEvent.getNodeInstance().getId()), Long.valueOf(processNodeTriggeredEvent.getNodeInstance().getNodeId()), processNodeTriggeredEvent.getNodeInstance().getNodeName()});
            }

            public void afterProcessCompleted(ProcessCompletedEvent processCompletedEvent) {
                RestServiceWorkitemIntegrationTest.this.logger.info("Process completed, unblocking test.");
                semaphore.release();
            }

            public void afterVariableChanged(ProcessVariableChangedEvent processVariableChangedEvent) {
                String variableId = processVariableChangedEvent.getVariableId();
                RestServiceWorkitemIntegrationTest.this.logger.info("Process: {}, variable: {}, changed to: {}.", new Object[]{processVariableChangedEvent.getProcessInstance().getProcessName(), variableId, processVariableChangedEvent.getNewValue()});
                if (Arrays.asList("result").contains(variableId)) {
                    arrayBlockingQueue.add(processVariableChangedEvent);
                }
            }
        };
        this.customProcessListeners.add(defaultProcessEventListener);
        RuntimeEngine runtimeEngine = getRuntimeEngine();
        runtimeEngine.getKieSession().startProcess("org.jbpm.process.longrest.executerest", getExecuteRestParameters());
        this.manager.disposeRuntimeEngine(runtimeEngine);
        if (!semaphore.tryAcquire(15L, TimeUnit.SECONDS)) {
            Assert.fail("Failed to complete the process.");
        }
        Map map = (Map) ((ProcessVariableChangedEvent) arrayBlockingQueue.take()).getNewValue();
        this.logger.info("result: " + map);
        Assert.assertEquals("new-scm-tag", Maps.getStringObjectMap(Maps.getStringObjectMap(map, "response"), "scm").get("revision"));
        this.customProcessListeners.remove(defaultProcessEventListener);
    }

    @Test(timeout = 15000)
    public void shouldFailWhenThereIsNoHeartBeat() throws InterruptedException {
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1000);
        ProcessEventListener processEventListener = getProcessEventListener(arrayBlockingQueue, "preBuildResult");
        this.customProcessListeners.add(processEventListener);
        RuntimeEngine runtimeEngine = getRuntimeEngine();
        KieSession kieSession = runtimeEngine.getKieSession();
        Semaphore semaphore = new Semaphore(0);
        ServiceListener.Subscription subscribe = this.serviceListener.subscribe(EventType.CALLBACK_COMPLETED, obj -> {
            semaphore.release();
        });
        TestFunctions.addHeartBeatToRequest = true;
        try {
            kieSession.startProcess("testProcess", Collections.singletonMap("input", getProcessParameters(10, 10, 10, 2, 2, 4, Collections.emptyMap())));
            this.manager.disposeRuntimeEngine(runtimeEngine);
            arrayBlockingQueue.take();
            Map map = (Map) arrayBlockingQueue.take().getNewValue();
            this.logger.info("preBuildResult: " + map);
            Maps.getStringObjectMap(map, "response");
            Assert.assertEquals("DIED", map.get("status"));
            this.activeProcesses.waitAllCompleted();
            semaphore.acquire();
            this.customProcessListeners.remove(processEventListener);
            this.serviceListener.unsubscribe(subscribe);
            TestFunctions.addHeartBeatToRequest = false;
        } catch (Throwable th) {
            TestFunctions.addHeartBeatToRequest = false;
            throw th;
        }
    }

    private Map<String, Object> getExecuteRestParameters() {
        HashMap hashMap = new HashMap();
        hashMap.put("requestMethod", "POST");
        hashMap.put("requestHeaders", null);
        hashMap.put("requestUrl", "http://localhost:8080/demo-service/prebuild");
        hashMap.put("requestTemplate", getPreBuildRequestBody());
        hashMap.put("taskTimeout", "10");
        hashMap.put("cancel", false);
        hashMap.put("cancelTimeout", null);
        hashMap.put("cancelUrlJsonPointer", null);
        hashMap.put("cancelUrlTemplate", null);
        hashMap.put("cancelUrlTemplate", null);
        hashMap.put("cancelMethod", null);
        hashMap.put("cancelHeaders", null);
        hashMap.put("successEvalTemplate", null);
        return hashMap;
    }

    private String getPreBuildRequestBody() {
        PreBuildRequest preBuildRequest = new PreBuildRequest();
        Scm scm = new Scm();
        scm.setUrl("https://github.com/kiegroup/jbpm-work-items.git");
        preBuildRequest.setScm(scm);
        Request request = new Request();
        request.setMethod("POST");
        request.setUrl("@{system.callbackUrl}");
        preBuildRequest.setCallback(request);
        try {
            return this.objectMapper.writeValueAsString(preBuildRequest);
        } catch (JsonProcessingException e) {
            Assert.fail("Cannot serialize preBuildRequest: " + e.getMessage());
            return null;
        }
    }

    private Map<String, Object> getProcessParameters(int i, int i2, int i3, int i4, Map<String, Object> map) {
        return getProcessParameters(i, i2, i3, i4, 10, 0, map);
    }

    private Map<String, Object> getProcessParameters(int i, int i2, int i3, int i4, int i5, int i6, Map<String, Object> map) {
        HashMap hashMap = new HashMap();
        hashMap.put("serviceBaseUrl", "http://localhost:8080/demo-service");
        hashMap.put("preBuildServiceUrl", "http://localhost:8080/demo-service/prebuild?callbackDelay=" + i + "&cancelDelay=" + i3 + "&cancelHeartBeatAfter=" + i5);
        hashMap.put("preBuildTimeout", Integer.valueOf(i2));
        hashMap.put("preBuildCancelTimeout", Integer.valueOf(i4));
        hashMap.put("retryDelay", 0);
        hashMap.put("maxRetries", 0);
        hashMap.put("heartbeatTimeout", Integer.valueOf(i6));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("id", "1");
        hashMap2.put("scmRepoURL", "https://github.com/kiegroup/jbpm-work-items.git");
        hashMap2.put("scmRevision", "main");
        hashMap2.put("preBuildSyncEnabled", "true");
        hashMap2.put("buildScript", "true");
        hashMap2.put("labels", map);
        hashMap.put("buildConfiguration", hashMap2);
        return hashMap;
    }

    private KieSession getKieSession(long j) {
        return getRuntimeEngine(j).getKieSession();
    }

    private RuntimeEngine getRuntimeEngine(long j) {
        return this.manager.getRuntimeEngine(ProcessInstanceIdContext.get(Long.valueOf(j)));
    }

    private ProcessEventListener getProcessEventListener(final BlockingQueue<ProcessVariableChangedEvent> blockingQueue, final String... strArr) {
        return new DefaultProcessEventListener() { // from class: org.jbpm.process.longrest.RestServiceWorkitemIntegrationTest.2
            public void afterVariableChanged(ProcessVariableChangedEvent processVariableChangedEvent) {
                String variableId = processVariableChangedEvent.getVariableId();
                RestServiceWorkitemIntegrationTest.this.logger.info("Process: {}, variable: {}, changed to: {}.", new Object[]{processVariableChangedEvent.getProcessInstance().getProcessName(), variableId, processVariableChangedEvent.getNewValue()});
                if (Arrays.asList(strArr).contains(variableId)) {
                    blockingQueue.add(processVariableChangedEvent);
                }
            }
        };
    }

    private Map<String, Object> getChangedVariable(BlockingQueue<ProcessVariableChangedEvent> blockingQueue, String str) throws InterruptedException {
        ProcessVariableChangedEvent take;
        do {
            take = blockingQueue.take();
        } while (!take.getVariableId().equals(str));
        return (Map) take.getNewValue();
    }
}
