/*
 * Decompiled with CFR 0.152.
 */
package org.jbpm.event.emitters.kafka;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import ch.qos.logback.core.read.ListAppender;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.jbpm.event.emitters.kafka.KafkaEventEmitter;
import org.jbpm.persistence.api.integration.EventEmitter;
import org.jbpm.persistence.api.integration.InstanceView;
import org.jbpm.persistence.api.integration.model.CaseInstanceView;
import org.jbpm.persistence.api.integration.model.ProcessInstanceView;
import org.jbpm.persistence.api.integration.model.TaskInstanceView;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.kie.api.runtime.process.ProcessInstance;
import org.slf4j.LoggerFactory;

/**
 * Unit tests for {@link KafkaEventEmitter} backed by an in-memory {@link MockProducer}.
 *
 * <p>Each test pushes one or more instance views through the emitter and then inspects
 * the records captured by the mock producer (topic and JSON payload), or the log output
 * of the emitter when a send fails.
 */
public class KafkaEventEmitterTest {
    /** System property used by {@link #testProducer()} to route case events to a custom topic. */
    private static final String CASES_TOPIC_PROPERTY = "org.kie.jbpm.event.emitters.kafka.topic.cases";

    /** Topic name configured through {@link #CASES_TOPIC_PROPERTY} in {@link #testProducer()}. */
    private static final String CUSTOM_CASES_TOPIC = "customer-cases";

    /** Fresh mock producer per test; records every message the emitter sends. */
    private MockProducer<String, byte[]> producer;

    /** Creates a new mock producer so that record history never leaks between tests. */
    @Before
    public void setup() {
        this.producer = new MockProducer<>();
    }

    /**
     * Emits a process, a task and a case view and verifies that one record per view is
     * produced, each on the expected topic and carrying the expected JSON payload.
     *
     * <p>The case topic is overridden through {@link #CASES_TOPIC_PROPERTY}; the previous
     * value of that property is restored once the test finishes.
     *
     * @throws IOException if a produced payload cannot be parsed as JSON or closing the emitter fails
     * @throws ParseException if the event {@code time} attribute does not match the configured date format
     */
    @Test
    public void testProducer() throws IOException, ParseException {
        // Same property lookup chain the test has always used to pick the date pattern.
        // NOTE(review): presumably mirrors the emitter's own date-format resolution - confirm.
        SimpleDateFormat dateFormat = new SimpleDateFormat(System.getProperty(
                "org.kie.jbpm.event.emitters.kafka.date_format",
                System.getProperty("org.kie.server.json.date_format", "yyyy-MM-dd'T'HH:mm:ss.SSSZ")));
        ObjectMapper mapper = new ObjectMapper()
                .setDateFormat(dateFormat)
                .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
                .configure(MapperFeature.PROPAGATE_TRANSIENT_MARKER, true);
        Date date = new Date();

        ProcessInstanceView piView = new ProcessInstanceView();
        piView.setCompositeId("server-1");
        piView.setContainerId("container-1");
        piView.setCorrelationKey("process-1");
        piView.setDate(date);
        piView.setId(1L);
        piView.setInitiator("pepe");
        piView.setParentId(0L);
        piView.setProcessId("Test");
        piView.setProcessInstanceDescription("a process");
        piView.setProcessName("Test");
        piView.setProcessVersion("1_0");
        piView.setState(1);
        piView.setVariables(Collections.emptyMap());

        TaskInstanceView taskView = new TaskInstanceView();
        taskView.setProcessId("Test");
        taskView.setProcessInstanceId(1L);
        taskView.setActualOwner("pepe");

        CaseInstanceView caseView = new CaseInstanceView();
        caseView.setProcessId("Test");
        caseView.setId(1L);
        caseView.setInitiator("pepe");

        // Remember whatever was configured before so the test leaves global state untouched.
        String previousCasesTopic = System.setProperty(CASES_TOPIC_PROPERTY, CUSTOM_CASES_TOPIC);
        try (KafkaEventEmitter emitter = new KafkaEventEmitter(this.producer)) {
            emit(emitter, Arrays.asList(piView, taskView, caseView));

            List<ProducerRecord<String, byte[]>> producedEvents = this.producer.history();
            Assert.assertEquals(3, producedEvents.size());

            // Process instance event.
            ProducerRecord<String, byte[]> record = producedEvents.get(0);
            Assert.assertEquals("jbpm-processes-events", record.topic());
            Map<?, ?> piEvent = mapper.readValue(record.value(), Map.class);
            Assert.assertEquals("process", piEvent.get("type"));
            Assert.assertEquals("/process/Test/1", piEvent.get("source"));
            Assert.assertTrue(dateFormat.parse(piEvent.get("time").toString()).compareTo(date) >= 0);
            Map<?, ?> pi = payloadOf(piEvent);
            Assert.assertEquals("server-1", pi.get("compositeId"));
            Assert.assertEquals("container-1", pi.get("containerId"));
            Assert.assertEquals("process-1", pi.get("correlationKey"));
            Assert.assertEquals(1, pi.get("id"));
            Assert.assertEquals(0, pi.get("parentId"));
            Assert.assertEquals("Test", pi.get("processId"));
            Assert.assertEquals("pepe", pi.get("initiator"));
            Assert.assertEquals(1, pi.get("state"));
            Assert.assertEquals("a process", pi.get("processInstanceDescription"));
            Assert.assertEquals("Test", pi.get("processName"));
            Assert.assertEquals("1_0", pi.get("processVersion"));
            Object variables = pi.get("variables");
            Assert.assertTrue(variables instanceof Map);
            Assert.assertTrue(((Map<?, ?>) variables).isEmpty());

            // Task instance event.
            record = producedEvents.get(1);
            Assert.assertEquals("jbpm-tasks-events", record.topic());
            Map<?, ?> taskEvent = mapper.readValue(record.value(), Map.class);
            Assert.assertEquals("task", taskEvent.get("type"));
            Assert.assertEquals("/process/Test/1", taskEvent.get("source"));
            Map<?, ?> task = payloadOf(taskEvent);
            Assert.assertEquals("Test", task.get("processId"));
            Assert.assertEquals(1, task.get("processInstanceId"));
            Assert.assertEquals("pepe", task.get("actualOwner"));

            // Case instance event, expected on the topic configured via the system property.
            record = producedEvents.get(2);
            Assert.assertEquals(CUSTOM_CASES_TOPIC, record.topic());
            Map<?, ?> caseEvent = mapper.readValue(record.value(), Map.class);
            Assert.assertEquals("case", caseEvent.get("type"));
            Assert.assertEquals("/process/Test/1", caseEvent.get("source"));
            Map<?, ?> case1 = payloadOf(caseEvent);
            Assert.assertEquals("Test", case1.get("caseDefinitionId"));
            Assert.assertEquals(1, case1.get("id"));
            Assert.assertEquals("pepe", case1.get("owner"));
        } finally {
            restoreProperty(CASES_TOPIC_PROPERTY, previousCasesTopic);
        }
    }

    /**
     * Emits a view type that is none of the process/task/case views used elsewhere in this
     * class and verifies that nothing is sent to the producer.
     *
     * @throws IOException if closing the emitter fails
     * @throws ParseException never thrown by this test; kept for signature compatibility
     */
    @Test
    public void testProducerWrongView() throws IOException, ParseException {
        InstanceView<ProcessInstance> piView = new InstanceView<ProcessInstance>() {
            private static final long serialVersionUID = 1L;

            @Override
            public ProcessInstance getSource() {
                return null;
            }

            @Override
            public void copyFromSource() {
                // Nothing to copy: the view has no backing source.
            }
        };
        try (KafkaEventEmitter emitter = new KafkaEventEmitter(this.producer)) {
            emit(emitter, Collections.singletonList(piView));
            Assert.assertEquals(0, this.producer.history().size());
        }
    }

    /**
     * Emits a valid task view followed by a process view holding a variable whose getter
     * throws, and verifies that exactly one record reaches the producer.
     *
     * @throws IOException if closing the emitter fails
     * @throws ParseException never thrown by this test; kept for signature compatibility
     */
    @Test
    public void testProducerWrongData() throws IOException, ParseException {
        TaskInstanceView taskView = new TaskInstanceView();
        taskView.setProcessId("Test");
        taskView.setProcessInstanceId(1L);
        taskView.setActualOwner("pepe");

        ProcessInstanceView processView = new ProcessInstanceView();
        processView.setProcessId("Test");
        processView.setId(1L);
        processView.setVariables(Collections.singletonMap("tryIt", new NotSerializable()));
        processView.setInitiator("pepe");

        try (KafkaEventEmitter emitter = new KafkaEventEmitter(this.producer)) {
            emit(emitter, Arrays.asList(taskView, processView));
            Assert.assertEquals(1, this.producer.history().size());
        }
    }

    /**
     * Fails the send through {@link MockProducer#errorNext(RuntimeException)} and verifies
     * that the emitter logs an ERROR event carrying both the affected view and the exception.
     *
     * <p>The capturing appender is detached again in a {@code finally} block so it does not
     * keep collecting events from tests that run later in the same JVM.
     *
     * @throws IOException if closing the emitter fails
     * @throws ParseException never thrown by this test; kept for signature compatibility
     */
    @Test
    public void testProducerExceptionLoggerAtCallback() throws IOException, ParseException {
        Logger logger = (Logger) LoggerFactory.getLogger(KafkaEventEmitter.class);
        // The element type is required: with a raw ListAppender the stream below is raw
        // and the lambda parameter would be typed Object, so getLevel() would not resolve.
        ListAppender<ILoggingEvent> listAppender = new ListAppender<>();
        listAppender.start();
        logger.addAppender(listAppender);
        try {
            ProcessInstanceView processView = new ProcessInstanceView();
            processView.setProcessId("Test");
            processView.setId(1L);
            IllegalStateException ex = new IllegalStateException("Something bad has happened!!");

            try (KafkaEventEmitter emitter = new KafkaEventEmitter(this.producer)) {
                emit(emitter, Collections.singletonList(processView));
                // NOTE(review): relies on the no-arg MockProducer leaving sends pending until
                // they are completed explicitly - confirm against the Kafka version in use.
                this.producer.errorNext(ex);
                Assert.assertEquals(1, this.producer.history().size());
            }

            Optional<ILoggingEvent> logEvent = listAppender.list.stream()
                    .filter(log -> log.getLevel() == Level.ERROR)
                    .findAny();
            Assert.assertTrue("no trace printed when failed", logEvent.isPresent());
            ILoggingEvent errorEvent = logEvent.get();
            Assert.assertTrue(Arrays.asList(errorEvent.getArgumentArray()).contains(processView));
            Assert.assertEquals(ex.getClass().getCanonicalName(), errorEvent.getThrowableProxy().getClassName());
            Assert.assertEquals(ex.getMessage(), errorEvent.getThrowableProxy().getMessage());
        } finally {
            logger.detachAppender(listAppender);
            listAppender.stop();
        }
    }

    /**
     * Drives the emitter through {@code deliver} followed by {@code apply}; if either call
     * throws, the views are handed to {@code drop} instead.
     *
     * @param emitter emitter under test
     * @param views views to push through the emitter
     */
    private void emit(EventEmitter emitter, Collection<InstanceView<?>> views) {
        try {
            emitter.deliver(views);
            emitter.apply(views);
        } catch (Exception ex) {
            // Deliberately not rethrown: the tests assert on what reached the producer.
            emitter.drop(views);
        }
    }

    /**
     * Returns the {@code data} attribute of a decoded event after asserting it is a JSON object.
     *
     * @param event decoded event envelope
     * @return the nested {@code data} map
     */
    private static Map<?, ?> payloadOf(Map<?, ?> event) {
        Object data = event.get("data");
        Assert.assertTrue(data instanceof Map);
        return (Map<?, ?>) data;
    }

    /**
     * Puts a system property back to the value it had before a test changed it.
     *
     * @param key property name
     * @param previousValue value to restore, or {@code null} if the property was unset
     */
    private static void restoreProperty(String key, String previousValue) {
        if (previousValue == null) {
            System.clearProperty(key);
        } else {
            System.setProperty(key, previousValue);
        }
    }

    /** Bean whose only getter always throws; used as a process variable that cannot be read. */
    private static class NotSerializable {
        private int tryMe;

        private NotSerializable() {
        }

        public int getTryMe() {
            throw new IllegalStateException("NOOOO!!!!");
        }
    }
}

