/*
 * Decompiled with CFR 0.152.
 */
package org.kie.server.springboot.samples;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import ch.qos.logback.core.read.ListAppender;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.jbpm.event.emitters.kafka.KafkaEventEmitter;
import org.jbpm.kie.services.impl.KModuleDeploymentUnit;
import org.jbpm.runtime.manager.impl.jpa.EntityManagerFactoryManager;
import org.jbpm.services.api.DeploymentService;
import org.jbpm.services.api.model.DeploymentUnit;
import org.jbpm.services.task.impl.model.UserImpl;
import org.junit.Assert;
import org.kie.api.task.model.OrganizationalEntity;
import org.kie.server.api.model.cases.CaseFile;
import org.kie.server.springboot.utils.KieJarBuildHelper;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;

public class KafkaFixture {
    protected static final int LARGE_SIZE = 50000;
    protected static final int TOO_LARGE_SIZE = 500000;
    protected static final String GROUP_ID = "org.kie.server.testing";
    protected static final String VERSION = "1.0.0";
    protected static final String PROCESSES_TOPIC = "jbpm-processes-events";
    protected static final String TASKS_TOPIC = "jbpm-tasks-events";
    protected static final String CASES_TOPIC = "jbpm-cases-events";
    protected static final String CUSTOM_PROCESSES_TOPIC = "custom-processes";
    protected static final String PROCESS_TYPE = "process";
    protected static final String CASE_TYPE = "case";
    protected static final String EVALUATION_PROCESS_ID = "evaluation";
    protected static final String EVALUATION_DESC = "Evaluation";
    protected static final String JOHN = "john";
    protected static final String YODA = "yoda";
    protected static final String CONTACT = "contact";
    protected static final String OWNER = "owner";
    protected static final String VAR_KEY = "s";
    protected static final String VAR_VALUE = "first case started";
    protected static final String CHINESE_INITIATOR = "\u53d1\u8d77\u8005";
    protected static final String LARGE_VAR = "large_var";
    protected static final String NULL_VAR = "null_var";
    protected static final String INITIATOR = "initiator";
    protected static final String CASE_VARIABLES = "caseVariables";
    protected static final String VARIABLES = "variables";
    protected static final String USER_TASK_CASE = "user-task-case";
    protected static final String PATH = "src/test/resources/kjars/";
    protected static final String BOOTSTRAP_SERVERS = "org.kie.jbpm.event.emitters.kafka.bootstrap.servers";
    protected static final String CLIENT_ID = "org.kie.jbpm.event.emitters.kafka.client.id";
    protected static final String TOPIC_PROCESSES = "org.kie.jbpm.event.emitters.kafka.topic.processes";
    protected static String bootstrapServers;
    protected KModuleDeploymentUnit unit = null;

    public static boolean isDockerAvailable() {
        try {
            DockerClientFactory.instance().client();
            return true;
        }
        catch (Throwable ex) {
            return false;
        }
    }

    public static void generalSetup(boolean configure) {
        EntityManagerFactoryManager.get().clear();
        if (!configure) {
            return;
        }
        System.setProperty(BOOTSTRAP_SERVERS, bootstrapServers);
        System.setProperty(CLIENT_ID, "test_jbpm");
        KafkaFixture.createTopics();
    }

    public static void createTopics() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        try (AdminClient adminClient = AdminClient.create((Properties)props);){
            adminClient.createTopics(Arrays.asList(new NewTopic(PROCESSES_TOPIC, 1, 1), new NewTopic(TASKS_TOPIC, 1, 1), new NewTopic(CASES_TOPIC, 1, 1), new NewTopic(CUSTOM_PROCESSES_TOPIC, 1, 1))).all().get(1L, TimeUnit.MINUTES);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            Assert.fail((String)("Exception when createTopics: " + e.getMessage()));
        }
    }

    public String setup(DeploymentService ds, String artifactId) {
        KieJarBuildHelper.createKieJar(PATH + artifactId);
        this.unit = new KModuleDeploymentUnit(GROUP_ID, artifactId, VERSION);
        ds.deploy((DeploymentUnit)this.unit);
        return this.unit.getIdentifier();
    }

    protected void cleanup(DeploymentService ds) {
        if (ds != null) {
            ds.undeploy((DeploymentUnit)this.unit);
        }
    }

    protected Map<String, Object> initParameters() {
        HashMap<String, Object> parameters = new HashMap<String, Object>();
        parameters.put(INITIATOR, CHINESE_INITIATOR);
        parameters.put(NULL_VAR, null);
        return parameters;
    }

    protected ListAppender<ILoggingEvent> addLogAppender() {
        Logger logger = (Logger)LoggerFactory.getLogger(KafkaEventEmitter.class);
        ListAppender listAppender = new ListAppender();
        listAppender.start();
        logger.addAppender((Appender)listAppender);
        return listAppender;
    }

    protected Optional<ILoggingEvent> getLog(ListAppender<ILoggingEvent> listAppender) {
        Optional<ILoggingEvent> logEvent = listAppender.list.stream().filter(log -> log.getLevel() == Level.ERROR).findAny();
        Assert.assertTrue((String)"no trace printed when failed", (boolean)logEvent.isPresent());
        return logEvent;
    }

    protected CaseFile caseFile(int valueSize) {
        HashMap<String, String> data = new HashMap<String, String>();
        data.put(VAR_KEY, VAR_VALUE);
        data.put(LARGE_VAR, RandomStringUtils.random((int)valueSize));
        return CaseFile.builder().data(data).build();
    }

    protected Map<String, OrganizationalEntity> roleAssignments() {
        HashMap<String, OrganizationalEntity> roleAssignments = new HashMap<String, OrganizationalEntity>();
        roleAssignments.put(OWNER, (OrganizationalEntity)new UserImpl(YODA));
        roleAssignments.put(CONTACT, (OrganizationalEntity)new UserImpl(JOHN));
        return roleAssignments;
    }

    protected void consumeAndAssertRecords(String topic, String type, int state, int expectedProcesses) {
        ConsumerRecords records = this.consumeMessages(topic);
        Assert.assertEquals((long)expectedProcesses, (long)records.count());
        if (records.iterator().hasNext()) {
            this.assertRecord((ConsumerRecord<String, byte[]>)((ConsumerRecord)records.iterator().next()), topic, type, state);
        }
    }

    protected <T> ConsumerRecords<String, T> consumeMessages(String topic) {
        try (KafkaConsumer<String, T> consumer = this.createConsumer(topic);){
            ConsumerRecords records = consumer.poll(Duration.ofSeconds(10L));
            consumer.commitSync();
            ConsumerRecords consumerRecords = records;
            return consumerRecords;
        }
    }

    protected <T> KafkaConsumer<String, T> createConsumer(String topic) {
        KafkaConsumer consumer = new KafkaConsumer(this.consumerProperties());
        consumer.subscribe(Collections.singletonList(topic));
        return consumer;
    }

    protected Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", "jbpm_group");
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    protected void assertRecord(ConsumerRecord<String, byte[]> record, String topic, String type, int status) {
        String artifactId = PROCESS_TYPE.equals(type) ? EVALUATION_PROCESS_ID : USER_TASK_CASE;
        Map<String, Object> event = this.getJsonObject(record);
        Assert.assertNull((Object)record.key());
        Assert.assertEquals((Object)topic, (Object)record.topic());
        Assert.assertEquals((Object)type, (Object)event.get("type"));
        Assert.assertTrue((boolean)event.get("source").toString().contains("/process/" + artifactId));
        Assert.assertTrue((boolean)(event.get("data") instanceof Map));
        Map data = (Map)event.get("data");
        Assert.assertTrue((boolean)data.get("compositeId").toString().contains("SpringBoot_"));
        Assert.assertEquals((Object)("org.kie.server.testing:" + artifactId + ":1.0.0"), data.get("containerId"));
        Assert.assertNotNull(data.get("id"));
        Assert.assertEquals((Object)-1, data.get("parentId"));
        Assert.assertTrue((boolean)(data.get(VARIABLES) instanceof Map));
        Map variables = (Map)data.get(VARIABLES);
        Assert.assertFalse((boolean)variables.isEmpty());
        if (PROCESS_TYPE.equals(type)) {
            Assert.assertEquals((Object)status, data.get("state"));
            Assert.assertEquals((Object)EVALUATION_PROCESS_ID, data.get("processId"));
            Assert.assertEquals((Object)CHINESE_INITIATOR, data.get(INITIATOR));
            Assert.assertEquals((Object)"1", data.get("processVersion"));
            Assert.assertNotNull(data.get("correlationKey"));
            Assert.assertEquals((Object)EVALUATION_DESC, data.get("processInstanceDescription"));
            Assert.assertEquals((Object)EVALUATION_DESC, data.get("processName"));
            Assert.assertEquals((long)1L, (long)variables.size());
            Assert.assertEquals((Object)CHINESE_INITIATOR, variables.get(INITIATOR));
            Assert.assertNull(variables.get(NULL_VAR));
        } else {
            Assert.assertEquals((Object)status, data.get("caseStatus"));
            Assert.assertEquals((Object)USER_TASK_CASE, data.get("caseDefinitionId"));
            Assert.assertEquals((Object)"Simple Case with User Tasks", data.get("caseDefinitionName"));
            Assert.assertEquals((Object)YODA, data.get(OWNER));
            Assert.assertTrue((boolean)data.get("caseId").toString().contains("HR-0000000"));
            Assert.assertEquals((Object)"Case first case started", data.get("caseDescription"));
            Assert.assertTrue((boolean)(data.get(CASE_VARIABLES) instanceof Map));
            Map caseVariables = (Map)data.get(CASE_VARIABLES);
            Assert.assertTrue((!caseVariables.isEmpty() ? 1 : 0) != 0);
            Assert.assertEquals((Object)VAR_VALUE, caseVariables.get(VAR_KEY));
            Assert.assertEquals((long)50000L, (long)caseVariables.get(LARGE_VAR).toString().length());
            MatcherAssert.assertThat((Object)((List)data.get("participants")), (Matcher)IsIterableContainingInAnyOrder.containsInAnyOrder((Object[])new String[]{YODA, JOHN}));
        }
    }

    protected Map<String, Object> getJsonObject(ConsumerRecord<String, byte[]> record) {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
        ObjectMapper mapper = new ObjectMapper().setDateFormat((DateFormat)dateFormat).configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        Map jsonEvent = null;
        try {
            jsonEvent = (Map)mapper.readValue((byte[])record.value(), Map.class);
        }
        catch (IOException e) {
            Assert.fail((String)("Exception when reading value: " + e.getMessage()));
        }
        return jsonEvent;
    }

    protected Map<String, Long> groupRecordsByField(ConsumerRecords<String, byte[]> records, String field) {
        return StreamSupport.stream(records.spliterator(), true).map(r -> (Map)this.getJsonObject((ConsumerRecord<String, byte[]>)r).get("data")).collect(Collectors.groupingBy(map -> map.get(field).toString(), Collectors.counting()));
    }
}

