/*
 * Decompiled with CFR 0.152.
 */
package org.jbpm.process.workitem.kafka;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.drools.core.process.instance.impl.WorkItemImpl;
import org.jbpm.bpmn2.handler.WorkItemHandlerRuntimeException;
import org.jbpm.process.workitem.core.TestWorkItemManager;
import org.jbpm.process.workitem.kafka.KafkaWorkItemHandler;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.kie.api.runtime.process.WorkItem;
import org.kie.api.runtime.process.WorkItemManager;
import org.mockito.Mockito;

public class KafkaWorkItemHandlerTest {
    private static final String TOPIC_FIELD = "Topic";
    private static final String KEY_FIELD = "Key";
    private static final String VALUE_FIELD = "Value";
    private static final String TOPIC = "myTopic";
    private static final String KEY = "1";
    private static final String VALUE = "Sample";
    @Rule
    public final ExpectedException exception = ExpectedException.none();
    private TestWorkItemManager manager;
    private WorkItemImpl workItem;
    private KafkaWorkItemHandler handler;
    private MockProducer<String, String> mockProducerString;
    private MockProducer<Integer, Integer> mockProducerInteger;

    @Before
    public void init() {
        this.manager = new TestWorkItemManager();
        this.initWorkItem();
        this.buildKafkaWIH(true);
    }

    @After
    public void cleanup() {
        if (this.mockProducerString != null) {
            this.mockProducerString.close();
        }
        if (this.mockProducerInteger != null) {
            this.mockProducerInteger.close();
        }
    }

    @Test
    public void testSendMessage() throws Exception {
        this.assertResultSuccessAfterExecuteWorkItem();
    }

    @Test
    public void testSendMessageIntegerSerializer() throws Exception {
        this.buildKafkaWIHInteger(true);
        this.workItem.setParameter(KEY_FIELD, (Object)2);
        this.workItem.setParameter(VALUE_FIELD, (Object)2);
        this.assertResultSuccessAfterExecuteWorkItem();
    }

    @Test
    public void testMissingRequiredParams() throws Exception {
        WorkItemImpl emptyWorkItem = new WorkItemImpl();
        this.assertExceptionAfterExecuteWorkItem(emptyWorkItem);
    }

    @Test
    public void testNullParam() throws Exception {
        this.workItem.setParameter(VALUE_FIELD, null);
        this.assertExceptionAfterExecuteWorkItem(this.workItem);
    }

    @Test
    public void testWrongTypeKey() throws Exception {
        this.buildKafkaWIHInteger(true);
        this.assertExceptionAfterExecuteWorkItem(this.workItem);
    }

    @Test
    public void testWrongTypeValue() throws Exception {
        this.buildKafkaWIHInteger(true);
        this.assertExceptionAfterExecuteWorkItem(this.workItem);
    }

    @Test
    public void testTimeoutException() throws Exception {
        Producer mockProducerKafka = (Producer)Mockito.mock(KafkaProducer.class);
        Future future = (Future)Mockito.mock(Future.class);
        Mockito.when((Object)mockProducerKafka.send((ProducerRecord)Mockito.any())).thenReturn((Object)future);
        Mockito.when(future.get()).thenThrow(new Throwable[]{new TimeoutException("timeout")});
        this.handler = new KafkaWorkItemHandler(mockProducerKafka);
        this.assertExceptionAfterExecuteWorkItem(this.workItem);
    }

    @Test
    public void testExceptionHandlingDuringSend() throws Exception {
        this.buildKafkaWIH(false);
        this.sendErrorLater(100);
        this.assertExceptionAfterExecuteWorkItem(this.workItem);
    }

    @Test
    public void testExceptionHandlingWhenClosed() throws Exception {
        this.buildKafkaWIH(true);
        this.mockProducerString.close();
        this.assertExceptionAfterExecuteWorkItem(this.workItem);
    }

    @Test
    public void testExceptionHandlingWhenFenced() throws Exception {
        this.buildKafkaWIH(true);
        this.mockProducerString.initTransactions();
        this.mockProducerString.fenceProducer();
        this.assertExceptionAfterExecuteWorkItem(this.workItem);
    }

    private void initWorkItem() {
        this.workItem = new WorkItemImpl();
        this.workItem.setParameter(TOPIC_FIELD, (Object)TOPIC);
        this.workItem.setParameter(KEY_FIELD, (Object)KEY);
        this.workItem.setParameter(VALUE_FIELD, (Object)VALUE);
    }

    private void buildKafkaWIH(boolean autocomplete) {
        this.mockProducerString = new MockProducer(autocomplete, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
        this.handler = new KafkaWorkItemHandler(this.mockProducerString);
    }

    private void buildKafkaWIHInteger(boolean autocomplete) {
        PartitionInfo partitionInfo = new PartitionInfo(TOPIC, 0, null, null, null);
        Cluster cluster = new Cluster(null, Collections.emptyList(), Arrays.asList(partitionInfo), Collections.emptySet(), Collections.emptySet());
        this.mockProducerInteger = new MockProducer(cluster, autocomplete, (Partitioner)new DefaultPartitioner(), (Serializer)new IntegerSerializer(), (Serializer)new IntegerSerializer());
        this.handler = new KafkaWorkItemHandler(this.mockProducerInteger);
    }

    private void assertResultSuccessAfterExecuteWorkItem() {
        this.handler.executeWorkItem((WorkItem)this.workItem, (WorkItemManager)this.manager);
        this.assertWorkItemResults(1);
        Assert.assertEquals((Object)"success", this.manager.getResults().entrySet().stream().map(Map.Entry::getValue).findFirst().get().get("Result"));
    }

    private void assertExceptionAfterExecuteWorkItem(WorkItemImpl workItem) {
        this.exception.expect(WorkItemHandlerRuntimeException.class);
        this.handler.executeWorkItem((WorkItem)workItem, (WorkItemManager)this.manager);
        this.assertWorkItemResults(0);
    }

    private void assertWorkItemResults(int expectedResultItems) {
        Assert.assertNotNull((Object)this.manager.getResults());
        Assert.assertEquals((long)expectedResultItems, (long)this.manager.getResults().size());
    }

    private void sendErrorLater(int time) {
        new Thread(() -> {
            CountDownLatch lock = new CountDownLatch(1);
            try {
                lock.await(time, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            this.mockProducerString.errorNext(new RuntimeException("Error during send"));
        }).start();
    }
}

