/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.kie.kogito.kafka.KafkaClient;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.verification.VerificationMode;

/**
 * Unit tests for {@link KafkaClient} that run against Mockito mocks of the Kafka
 * producer and consumer, so no broker is needed.
 *
 * <p>Each test is composed from small {@code given}/{@code when}/{@code then} helper
 * methods. The whole class is currently disabled (see the {@link Disabled} reason).
 */
@Disabled("Causing issues on Quarkus Platform CI")
@ExtendWith(MockitoExtension.class)
public class KafkaClientTest {
    private static final String TOPIC = "my-topic";
    private static final String MESSAGE_TO_CONSUME = "my-message-to-consume";
    private static final String MESSAGE_TO_PRODUCE = "my-message-to-produce";

    /** Upper bound, in milliseconds, for waiting on work triggered by {@code consume}. */
    private static final long CONSUME_TIMEOUT_MILLIS = 5000L;

    @Mock
    private KafkaProducer<String, String> producer;
    @Mock
    private KafkaConsumer<String, String> consumer;

    /** Object under test; built from the two mocks by {@link #givenKafkaClient()}. */
    private KafkaClient client;
    /** Payloads handed to the consume callback. */
    private List<String> messages;
    /** Released by the consume callback once a payload has been recorded. */
    private CountDownLatch waiter;

    /** Resets the per-test message sink and latch. */
    @BeforeEach
    public void setup() {
        this.messages = new ArrayList<>();
        this.waiter = new CountDownLatch(1);
    }

    /** Consuming a topic subscribes the consumer and delivers the polled payload to the callback. */
    @Test
    public void shouldConsumeMessage() throws InterruptedException {
        this.givenKafkaClient();
        this.whenConsume();
        this.thenConsumerIsSubscribed();
        this.thenMessageIsReceived();
    }

    /** Producing a message sends a record for the topic through the producer. */
    @Test
    public void shouldProduceMessage() {
        this.givenKafkaClient();
        this.whenProduceMessage();
        this.thenProducerIsInvoked();
    }

    /** Shutting the client down closes both the producer and the consumer. */
    @Test
    public void shouldCloseWhenShutdown() {
        this.givenKafkaClient();
        this.whenShutdown();
        this.thenProducerIsClosed();
        this.thenConsumerIsClosed();
    }

    /** Creates the client under test on top of the mocked producer and consumer. */
    private void givenKafkaClient() {
        this.client = new KafkaClient(this.producer, this.consumer);
    }

    /** Invokes {@code shutdown()} on the client. */
    private void whenShutdown() {
        this.client.shutdown();
    }

    /**
     * Stubs the consumer so that polling yields a single record carrying
     * {@link #MESSAGE_TO_CONSUME}, then starts consuming {@link #TOPIC} with a callback
     * that records the payload and releases {@link #waiter}.
     */
    private void whenConsume() {
        // Mockito.mock(Class) cannot express the type arguments, hence the narrowly
        // scoped unchecked suppressions.
        @SuppressWarnings("unchecked")
        ConsumerRecord<String, String> polledRecord = Mockito.mock(ConsumerRecord.class);
        @SuppressWarnings("unchecked")
        ConsumerRecords<String, String> polledRecords = Mockito.mock(ConsumerRecords.class);

        // Lenient stubbing: under the Mockito extension an unused strict stub fails the
        // test, and these stubs are only exercised if the client actually polls.
        Mockito.lenient().when(polledRecords.spliterator())
                .thenReturn(Collections.singleton(polledRecord).spliterator());
        Mockito.lenient().when(polledRecord.value()).thenReturn(MESSAGE_TO_CONSUME);
        Mockito.lenient().when(this.consumer.poll(ArgumentMatchers.any(Duration.class)))
                .thenReturn(polledRecords);

        this.client.consume(TOPIC, message -> {
            // Cast kept from the original: the callback's parameter type is declared
            // by KafkaClient, which is not part of this file.
            this.messages.add((String) message);
            this.waiter.countDown();
        });
    }

    /** Asks the client to produce {@link #MESSAGE_TO_PRODUCE} on {@link #TOPIC}. */
    private void whenProduceMessage() {
        this.client.produce(MESSAGE_TO_PRODUCE, TOPIC);
    }

    /** Verifies that a record equal to (topic, message) was sent, with any callback argument. */
    private void thenProducerIsInvoked() {
        ProducerRecord<String, String> expectedRecord =
                new ProducerRecord<String, String>(TOPIC, MESSAGE_TO_PRODUCE);
        Mockito.verify(this.producer)
                .send(ArgumentMatchers.eq(expectedRecord), ArgumentMatchers.<Callback>any());
    }

    /** Verifies that the consumer was subscribed to some collection of topics. */
    private void thenConsumerIsSubscribed() {
        Mockito.verify(this.consumer).subscribe(ArgumentMatchers.<String>anyCollection());
    }

    /** Verifies that the producer was closed. */
    private void thenProducerIsClosed() {
        Mockito.verify(this.producer).close();
    }

    /** Verifies that the consumer was closed. */
    private void thenConsumerIsClosed() {
        Mockito.verify(this.consumer).close();
    }

    /**
     * Waits for the consume callback to fire, then checks that the payload was recorded
     * and that the consumer committed at least once.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    private void thenMessageIsReceived() throws InterruptedException {
        // await() returns false on timeout; fail right here with a clear message instead
        // of carrying on and reading 'messages' without the latch's ordering guarantee.
        boolean callbackInvoked = this.waiter.await(CONSUME_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        Assertions.assertTrue(callbackInvoked,
                "consume callback was not invoked within " + CONSUME_TIMEOUT_MILLIS + " ms");
        Assertions.assertTrue(this.messages.contains(MESSAGE_TO_CONSUME));

        // NOTE(review): the commit presumably happens on the client's polling thread and
        // may come after the callback returns - confirm against KafkaClient. A timeout
        // verification passes immediately if the call was already made and otherwise
        // waits, so it is safe either way.
        Mockito.verify(this.consumer, Mockito.timeout(CONSUME_TIMEOUT_MILLIS).atLeastOnce())
                .commitSync();
    }
}

