package org.kie.kogito.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@Disabled("Causing issues on Quarkus Platform CI")
@ExtendWith({MockitoExtension.class})
/* loaded from: input_file:org/kie/kogito/kafka/KafkaClientTest.class */
public class KafkaClientTest {
    private static final String TOPIC = "my-topic";
    private static final String MESSAGE_TO_CONSUME = "my-message-to-consume";
    private static final String MESSAGE_TO_PRODUCE = "my-message-to-produce";

    @Mock
    private KafkaProducer<String, String> producer;

    @Mock
    private KafkaConsumer<String, String> consumer;
    private KafkaClient client;
    private List<String> messages;
    private CountDownLatch waiter;

    @BeforeEach
    public void setup() {
        this.messages = new ArrayList();
        this.waiter = new CountDownLatch(1);
    }

    @Test
    public void shouldConsumeMessage() throws InterruptedException {
        givenKafkaClient();
        whenConsume();
        thenConsumerIsSubscribed();
        thenMessageIsReceived();
    }

    @Test
    public void shouldProduceMessage() {
        givenKafkaClient();
        whenProduceMessage();
        thenProducerIsInvoked();
    }

    @Test
    public void shouldCloseWhenShutdown() {
        givenKafkaClient();
        whenShutdown();
        thenProducerIsClosed();
        thenConsumerIsClosed();
    }

    private void givenKafkaClient() {
        this.client = new KafkaClient(this.producer, this.consumer);
    }

    private void whenShutdown() {
        this.client.shutdown();
    }

    private void whenConsume() {
        ConsumerRecord consumerRecord = (ConsumerRecord) Mockito.mock(ConsumerRecord.class);
        ConsumerRecords consumerRecords = (ConsumerRecords) Mockito.mock(ConsumerRecords.class);
        Mockito.lenient().when(consumerRecords.spliterator()).thenReturn(Collections.singleton(consumerRecord).spliterator());
        Mockito.lenient().when((String) consumerRecord.value()).thenReturn(MESSAGE_TO_CONSUME);
        Mockito.lenient().when(this.consumer.poll((Duration) ArgumentMatchers.any(Duration.class))).thenReturn(consumerRecords);
        this.client.consume(TOPIC, str -> {
            this.messages.add(str);
            this.waiter.countDown();
        });
    }

    private void whenProduceMessage() {
        this.client.produce(MESSAGE_TO_PRODUCE, TOPIC);
    }

    private void thenProducerIsInvoked() {
        ((KafkaProducer) Mockito.verify(this.producer)).send((ProducerRecord) ArgumentMatchers.eq(new ProducerRecord(TOPIC, MESSAGE_TO_PRODUCE)), (Callback) ArgumentMatchers.any());
    }

    private void thenConsumerIsSubscribed() {
        ((KafkaConsumer) Mockito.verify(this.consumer)).subscribe(ArgumentMatchers.anyCollection());
    }

    private void thenProducerIsClosed() {
        ((KafkaProducer) Mockito.verify(this.producer)).close();
    }

    private void thenConsumerIsClosed() {
        ((KafkaConsumer) Mockito.verify(this.consumer)).close();
    }

    private void thenMessageIsReceived() throws InterruptedException {
        this.waiter.await(5000L, TimeUnit.MILLISECONDS);
        ((KafkaConsumer) Mockito.verify(this.consumer, Mockito.atLeastOnce())).commitSync();
        Assertions.assertTrue(this.messages.contains(MESSAGE_TO_CONSUME));
    }
}
