/*
 * Decompiled with CFR 0.152.
 */
package org.jbpm.test.regression;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.IterableAssert;
import org.assertj.core.api.ListAssert;
import org.jbpm.executor.AsynchronousJobEvent;
import org.jbpm.executor.AsynchronousJobListener;
import org.jbpm.executor.impl.ExecutorServiceImpl;
import org.jbpm.executor.impl.wih.AsyncWorkItemHandler;
import org.jbpm.executor.test.CountDownAsyncJobListener;
import org.jbpm.test.JbpmAsyncJobTestCase;
import org.jbpm.test.persistence.util.PersistenceUtil;
import org.junit.Test;
import org.kie.api.executor.ExecutorService;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.process.ProcessInstance;
import org.kie.api.runtime.process.WorkItemHandler;
import org.kie.api.runtime.query.QueryContext;
import org.kie.test.util.db.PoolingDataSourceWrapper;
import qa.tools.ikeeper.annotation.BZ;

public class ParallelAsyncJobsTest
extends JbpmAsyncJobTestCase {
    private static final String PARENT = "org/jbpm/test/regression/ParallelAsyncJobs-parent.bpmn2";
    private static final String PARENT_ID = "org.jbpm.test.regression.ParallelAsyncJobs-parent";
    private static final String SUBPROCESS = "org/jbpm/test/regression/ParallelAsyncJobs-subprocess.bpmn2";

    @Test(timeout=30000L)
    @BZ(value={"1146829"})
    public void testRunBasicAsync() throws Exception {
        ExecutorService executorService = this.getExecutorService();
        final HashSet threadExeuctingJobs = new HashSet();
        CountDownAsyncJobListener countDownListener = new CountDownAsyncJobListener(4){

            public void afterJobExecuted(AsynchronousJobEvent event) {
                super.afterJobExecuted(event);
                threadExeuctingJobs.add(Thread.currentThread().getName());
            }
        };
        ((ExecutorServiceImpl)executorService).addAsyncJobListener((AsynchronousJobListener)countDownListener);
        KieSession ks = this.createKSession(PARENT, SUBPROCESS);
        ks.getWorkItemManager().registerWorkItemHandler("async", (WorkItemHandler)new AsyncWorkItemHandler(executorService, "org.jbpm.test.command.LongRunningCommand"));
        ArrayList<String> exceptions = new ArrayList<String>();
        exceptions.add("ADRESS_EXCEPTION");
        exceptions.add("ID_EXCEPTION");
        exceptions.add("PHONE_EXCEPTION");
        HashMap<String, ArrayList<String>> pm = new HashMap<String, ArrayList<String>>();
        pm.put("exceptions", exceptions);
        ProcessInstance pi = ks.startProcess(PARENT_ID, pm);
        this.assertProcessInstanceCompleted(pi.getId());
        countDownListener.waitTillCompleted();
        ((ListAssert)Assertions.assertThat((List)executorService.getCompletedRequests(new QueryContext())).as("All async jobs should have been executed", new Object[0])).hasSize(4);
        ((IterableAssert)Assertions.assertThat(threadExeuctingJobs).as("There should be 4 distinct threads as jobs where executed in parallel", new Object[0])).hasSize(4);
    }

    @Override
    protected PoolingDataSourceWrapper setupPoolingDataSource() {
        Properties dsProps = PersistenceUtil.getDatasourceProperties();
        dsProps.setProperty("POOL_CONNECTIONS", "false");
        PoolingDataSourceWrapper ds1 = PersistenceUtil.setupPoolingDataSource((Properties)dsProps, (String)"jdbc/jbpm-ds");
        return ds1;
    }
}

