package org.jbpm.process.workitem.kafka;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.drools.core.process.instance.impl.WorkItemImpl;
import org.jbpm.bpmn2.handler.WorkItemHandlerRuntimeException;
import org.jbpm.process.workitem.core.TestWorkItemManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;

/**
 * Unit tests for {@link KafkaWorkItemHandler}.
 *
 * <p>Every test drives the handler against an in-memory producer (Kafka's {@link MockProducer} or a
 * Mockito mock), so no broker is required. The happy-path tests expect exactly one completed work
 * item whose {@code "Result"} entry is {@code "success"}; the failure tests expect a
 * {@link WorkItemHandlerRuntimeException} and no completed work item.
 */
public class KafkaWorkItemHandlerTest {
    private static final String TOPIC_FIELD = "Topic";
    private static final String KEY_FIELD = "Key";
    private static final String VALUE_FIELD = "Value";
    private static final String TOPIC = "myTopic";
    private static final String KEY = "1";
    private static final String VALUE = "Sample";

    /** Upper bound for tests that wait on another thread, so a missed error injection cannot hang the build. */
    private static final long ASYNC_TEST_TIMEOUT_MILLIS = 10_000L;
    /** How long the injector thread keeps retrying before it gives up. */
    private static final long ERROR_INJECTION_WINDOW_MILLIS = 5_000L;
    /** Pause between two injection attempts. */
    private static final long ERROR_INJECTION_POLL_MILLIS = 10L;

    /**
     * Kept so the public surface of the class is unchanged. No test configures it: expected
     * failures are verified explicitly in {@link #assertExceptionAfterExecuteWorkItem(WorkItemImpl)}
     * so that assertions can still run after the exception has been thrown.
     */
    @Rule
    public final ExpectedException exception = ExpectedException.none();
    private TestWorkItemManager manager;
    private WorkItemImpl workItem;
    private KafkaWorkItemHandler handler;
    private MockProducer<String, String> mockProducerString;
    private MockProducer<Integer, Integer> mockProducerInteger;

    /** Creates a fresh manager, a fully populated work item and a String-typed, auto-completing handler. */
    @Before
    public void init() {
        this.manager = new TestWorkItemManager();
        initWorkItem();
        buildKafkaWIH(true);
    }

    /** Closes whichever mock producers the test created and has not already closed itself. */
    @After
    public void cleanup() {
        closeIfOpen(this.mockProducerString);
        closeIfOpen(this.mockProducerInteger);
    }

    /** A String key/value pair sent through String serializers completes successfully. */
    @Test
    public void testSendMessage() throws Exception {
        assertResultSuccessAfterExecuteWorkItem();
    }

    /** An Integer key/value pair sent through Integer serializers completes successfully. */
    @Test
    public void testSendMessageIntegerSerializer() throws Exception {
        buildKafkaWIHInteger(true);
        this.workItem.setParameter(KEY_FIELD, 2);
        this.workItem.setParameter(VALUE_FIELD, 2);
        assertResultSuccessAfterExecuteWorkItem();
    }

    /** A work item without any parameters is rejected. */
    @Test
    public void testMissingRequiredParams() throws Exception {
        assertExceptionAfterExecuteWorkItem(new WorkItemImpl());
    }

    /** A work item whose value parameter is {@code null} is rejected. */
    @Test
    public void testNullParam() throws Exception {
        this.workItem.setParameter(VALUE_FIELD, (Object) null);
        assertExceptionAfterExecuteWorkItem(this.workItem);
    }

    /** A String key handed to an Integer key serializer is rejected. */
    @Test
    public void testWrongTypeKey() throws Exception {
        buildKafkaWIHInteger(true);
        // Make the value valid so that the String key is the only possible cause of the failure.
        this.workItem.setParameter(VALUE_FIELD, 2);
        assertExceptionAfterExecuteWorkItem(this.workItem);
    }

    /** A String value handed to an Integer value serializer is rejected. */
    @Test
    public void testWrongTypeValue() throws Exception {
        buildKafkaWIHInteger(true);
        // Make the key valid so that the String value is the only possible cause of the failure.
        this.workItem.setParameter(KEY_FIELD, 2);
        assertExceptionAfterExecuteWorkItem(this.workItem);
    }

    /** A Kafka {@link TimeoutException} raised while waiting for the send result is reported as a handler failure. */
    @Test
    @SuppressWarnings("unchecked") // Mockito.mock(Class) can only return raw mocks of generic types.
    public void testTimeoutException() throws Exception {
        Producer<String, String> producer = Mockito.mock(KafkaProducer.class);
        Future<RecordMetadata> pendingSend = Mockito.mock(Future.class);
        Mockito.when(producer.send(Mockito.<ProducerRecord<String, String>>any())).thenReturn(pendingSend);
        Mockito.when(pendingSend.get()).thenThrow(new TimeoutException("timeout"));
        this.handler = new KafkaWorkItemHandler(new Properties(), producer);
        assertExceptionAfterExecuteWorkItem(this.workItem);
    }

    /**
     * An error that completes the pending send asynchronously is reported as a handler failure.
     *
     * <p>The producer does not auto-complete, so the send stays pending until the injector thread
     * fails it. The timeout bounds the wait in case the injection never happens.
     */
    @Test(timeout = ASYNC_TEST_TIMEOUT_MILLIS)
    public void testExceptionHandlingDuringSend() throws Exception {
        buildKafkaWIH(false);
        sendErrorLater(100);
        assertExceptionAfterExecuteWorkItem(this.workItem);
    }

    /** Sending through a producer that has already been closed is reported as a handler failure. */
    @Test
    public void testExceptionHandlingWhenClosed() throws Exception {
        buildKafkaWIH(true);
        this.mockProducerString.close();
        assertExceptionAfterExecuteWorkItem(this.workItem);
    }

    /** Sending through a transactional producer that has been fenced is reported as a handler failure. */
    @Test
    public void testExceptionHandlingWhenFenced() throws Exception {
        buildKafkaWIH(true);
        this.mockProducerString.initTransactions();
        this.mockProducerString.fenceProducer();
        assertExceptionAfterExecuteWorkItem(this.workItem);
    }

    /** Builds the default work item: topic, String key and String value all present. */
    private void initWorkItem() {
        this.workItem = new WorkItemImpl();
        this.workItem.setParameter(TOPIC_FIELD, TOPIC);
        this.workItem.setParameter(KEY_FIELD, KEY);
        this.workItem.setParameter(VALUE_FIELD, VALUE);
    }

    /**
     * Installs a handler backed by a String/String mock producer.
     *
     * @param autoComplete passed to {@link MockProducer}: whether sends complete immediately or stay
     *                     pending until completed or failed explicitly
     */
    private void buildKafkaWIH(boolean autoComplete) {
        // init() already created a producer; release it before it becomes unreachable.
        closeIfOpen(this.mockProducerString);
        this.mockProducerString = new MockProducer<>(autoComplete, new StringSerializer(), new StringSerializer());
        this.handler = new KafkaWorkItemHandler(new Properties(), this.mockProducerString);
    }

    /**
     * Installs a handler backed by an Integer/Integer mock producer.
     *
     * <p>The producer is given a cluster that contains a single partition of {@link #TOPIC} together
     * with a {@link DefaultPartitioner}.
     *
     * @param autoComplete passed to {@link MockProducer}: whether sends complete immediately or stay
     *                     pending until completed or failed explicitly
     */
    private void buildKafkaWIHInteger(boolean autoComplete) {
        closeIfOpen(this.mockProducerInteger);
        PartitionInfo onlyPartition = new PartitionInfo(TOPIC, 0, (Node) null, (Node[]) null, (Node[]) null);
        Cluster cluster = new Cluster(
                (String) null,
                Collections.emptyList(),
                Arrays.asList(onlyPartition),
                Collections.emptySet(),
                Collections.emptySet());
        this.mockProducerInteger = new MockProducer<>(
                cluster, autoComplete, new DefaultPartitioner(), new IntegerSerializer(), new IntegerSerializer());
        this.handler = new KafkaWorkItemHandler(new Properties(), this.mockProducerInteger);
    }

    /** Executes the current work item and asserts that exactly one result with {@code Result=success} was recorded. */
    private void assertResultSuccessAfterExecuteWorkItem() {
        this.handler.executeWorkItem(this.workItem, this.manager);
        assertWorkItemResults(1);
        // assertWorkItemResults(1) guarantees that there is a first element to read.
        Map<?, ?> firstResult = (Map<?, ?>) this.manager.getResults().values().iterator().next();
        Assert.assertEquals("success", firstResult.get("Result"));
    }

    /**
     * Executes {@code workItemImpl} and asserts that the handler throws a
     * {@link WorkItemHandlerRuntimeException} without completing the work item.
     *
     * <p>The exception is caught here rather than through an {@link ExpectedException} rule because
     * the rule would end the test at the throwing statement and the result-count assertion would
     * never be evaluated.
     *
     * @param workItemImpl the work item to execute
     */
    private void assertExceptionAfterExecuteWorkItem(WorkItemImpl workItemImpl) {
        try {
            this.handler.executeWorkItem(workItemImpl, this.manager);
        } catch (WorkItemHandlerRuntimeException expected) {
            assertWorkItemResults(0);
            return;
        }
        Assert.fail("Expected a WorkItemHandlerRuntimeException but the work item was executed without error");
    }

    /**
     * Asserts that the manager recorded exactly {@code expectedCount} results.
     *
     * @param expectedCount the number of results the manager must hold
     */
    private void assertWorkItemResults(int expectedCount) {
        Assert.assertNotNull(this.manager.getResults());
        Assert.assertEquals(expectedCount, this.manager.getResults().size());
    }

    /**
     * Starts a daemon thread that fails the producer's earliest pending send after a delay.
     *
     * <p>{@link MockProducer#errorNext(RuntimeException)} returns {@code false} when nothing is
     * pending, so the thread keeps retrying for a bounded time instead of firing exactly once. A
     * single shot that arrives before the handler has sent would leave the handler blocked on a
     * send that nobody completes.
     *
     * @param delayMillis how long to wait before the first injection attempt, in milliseconds
     */
    private void sendErrorLater(int delayMillis) {
        // Bind the producer now so the thread always targets the one this test configured.
        final MockProducer<String, String> producer = this.mockProducerString;
        Thread injector = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(delayMillis);
                long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(ERROR_INJECTION_WINDOW_MILLIS);
                while (!producer.errorNext(new RuntimeException("Error during send"))
                        && System.nanoTime() - deadline < 0) {
                    TimeUnit.MILLISECONDS.sleep(ERROR_INJECTION_POLL_MILLIS);
                }
            } catch (InterruptedException interrupted) {
                // Nobody is expected to interrupt this thread; keep the flag and stop retrying.
                Thread.currentThread().interrupt();
            }
        }, "kafka-wih-test-error-injector");
        injector.setDaemon(true);
        injector.start();
    }

    /**
     * Closes {@code producer} unless it is absent or already closed.
     *
     * <p>The guard matters because one test closes its producer itself, and some kafka-clients
     * versions are believed to reject a second {@code close()} on a {@link MockProducer}.
     */
    private static void closeIfOpen(MockProducer<?, ?> producer) {
        if (producer != null && !producer.closed()) {
            producer.close();
        }
    }
}
