package org.kie.kogito.explainability.messaging;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.quarkus.test.junit.mockito.InjectMock;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.event.cloudevents.utils.CloudEventUtils;
import org.kie.kogito.explainability.ExplanationService;
import org.kie.kogito.explainability.api.BaseExplainabilityRequest;
import org.kie.kogito.explainability.api.BaseExplainabilityResult;
import org.kie.kogito.explainability.api.ModelIdentifier;
import org.kie.kogito.test.quarkus.kafka.KafkaTestClient;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/kogito/explainability/messaging/BaseExplainabilityMessagingHandlerIT.class */
abstract class BaseExplainabilityMessagingHandlerIT {
    protected static final String TOPIC_REQUEST = "trusty-explainability-request-test";
    protected static final String TOPIC_RESULT = "trusty-explainability-result-test";
    protected static final String EXECUTION_ID = "idException";
    protected static final String SERVICE_URL = "http://localhost:8080";

    @InjectMock
    protected ExplanationService explanationService;

    @ConfigProperty(name = "kafka.bootstrap.servers")
    protected String kafkaBootstrapServers;

    @Inject
    protected ObjectMapper objectMapper;
    KafkaTestClient kafkaClient;
    protected static Logger LOGGER = LoggerFactory.getLogger(BaseExplainabilityMessagingHandlerIT.class);
    protected static final ModelIdentifier MODEL_IDENTIFIER = new ModelIdentifier("dmn", "namespace:name");

    @BeforeEach
    public void setup() {
        this.kafkaClient = new KafkaTestClient(this.kafkaBootstrapServers);
    }

    @AfterEach
    public void tearDown() {
        if (this.kafkaClient != null) {
            this.kafkaClient.shutdown();
        }
    }

    @Test
    void explainabilityRequestIsProcessedAndAResultMessageIsSent() throws Exception {
        BaseExplainabilityRequest buildRequest = buildRequest();
        Mockito.when(this.explanationService.explainAsync((BaseExplainabilityRequest) ArgumentMatchers.any(BaseExplainabilityRequest.class), (Consumer) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(buildResult()));
        this.kafkaClient.produce(ExplainabilityCloudEventBuilder.buildCloudEventJsonString(buildRequest), TOPIC_REQUEST);
        ((ExplanationService) Mockito.verify(this.explanationService, Mockito.timeout(2000L).times(1))).explainAsync((BaseExplainabilityRequest) ArgumentMatchers.any(BaseExplainabilityRequest.class), (Consumer) ArgumentMatchers.any());
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.kafkaClient.consume(TOPIC_RESULT, str -> {
            LOGGER.info("Received from kafka: {}", str);
            CloudEventUtils.decode(str).ifPresent(cloudEvent -> {
                try {
                    BaseExplainabilityResult baseExplainabilityResult = (BaseExplainabilityResult) this.objectMapper.readValue(cloudEvent.getData().toBytes(), BaseExplainabilityResult.class);
                    Assertions.assertNotNull(baseExplainabilityResult);
                    assertResult(baseExplainabilityResult);
                    countDownLatch.countDown();
                } catch (IOException e) {
                    LOGGER.error("Error parsing {}", str, e);
                    throw new RuntimeException(e);
                }
            });
        });
        Assertions.assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
        this.kafkaClient.shutdown();
    }

    @Test
    void explainabilityRequestIsProcessedAndAnIntermediateMessageIsSent() throws Exception {
        BaseExplainabilityRequest buildRequest = buildRequest();
        BaseExplainabilityResult buildResult = buildResult();
        ((ExplanationService) Mockito.doAnswer(invocationOnMock -> {
            mockExplainAsyncInvocationWithIntermediateResults((Consumer) invocationOnMock.getArguments()[1]);
            return CompletableFuture.completedFuture(buildResult);
        }).when(this.explanationService)).explainAsync((BaseExplainabilityRequest) ArgumentMatchers.any(BaseExplainabilityRequest.class), (Consumer) ArgumentMatchers.any());
        this.kafkaClient.produce(ExplainabilityCloudEventBuilder.buildCloudEventJsonString(buildRequest), TOPIC_REQUEST);
        ((ExplanationService) Mockito.verify(this.explanationService, Mockito.timeout(2000L).times(1))).explainAsync((BaseExplainabilityRequest) ArgumentMatchers.any(BaseExplainabilityRequest.class), (Consumer) ArgumentMatchers.any());
        CountDownLatch countDownLatch = new CountDownLatch(getTotalExpectedEventCountWithIntermediateResults());
        this.kafkaClient.consume(TOPIC_RESULT, str -> {
            LOGGER.info("Received from kafka: {}", str);
            CloudEventUtils.decode(str).ifPresent(cloudEvent -> {
                try {
                    BaseExplainabilityResult baseExplainabilityResult = (BaseExplainabilityResult) this.objectMapper.readValue(cloudEvent.getData().toBytes(), BaseExplainabilityResult.class);
                    Assertions.assertNotNull(baseExplainabilityResult);
                    assertResult(baseExplainabilityResult);
                    countDownLatch.countDown();
                } catch (IOException e) {
                    LOGGER.error("Error parsing {}", str, e);
                    throw new RuntimeException(e);
                }
            });
        });
        Assertions.assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
        this.kafkaClient.shutdown();
    }

    protected abstract BaseExplainabilityRequest buildRequest();

    protected abstract BaseExplainabilityResult buildResult();

    protected abstract void assertResult(BaseExplainabilityResult baseExplainabilityResult);

    protected abstract int getTotalExpectedEventCountWithIntermediateResults();

    protected abstract void mockExplainAsyncInvocationWithIntermediateResults(Consumer<BaseExplainabilityResult> consumer);
}
