package org.kie.dmn.kogito.springboot.tracing;

import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.kafka.KafkaClient;
import org.kie.kogito.tracing.decision.event.CloudEventUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

/**
 * Integration test for the {@code /LoanEligibility} REST endpoint and the decision tracing
 * events it is expected to publish to Kafka.
 *
 * <p>A Kafka broker is started through Testcontainers and its bootstrap address is injected into
 * the Spring context via {@link #kafkaProperties(DynamicPropertyRegistry)}. The application itself
 * is started on a random port, which REST Assured is pointed at before every test.
 */
@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = {KogitoSpringbootApplication.class})
public class LoanEligibilityIT {
    /** Kafka topic the test listens on for tracing events. */
    public static final String TRACING_TOPIC_NAME = "kogito-tracing-decision";
    private static final Logger LOGGER = LoggerFactory.getLogger(LoanEligibilityIT.class);

    /** Maximum time, in seconds, to wait for a tracing event after the REST call returns. */
    private static final long TRACING_EVENT_TIMEOUT_SECONDS = 5L;

    /** JSON request body posted to the {@code /LoanEligibility} endpoint. */
    private static final String LOAN_REQUEST_BODY = "{    \"Client\": {        \"age\": 43,        \"salary\": 1950,        \"existing payments\": 100    },    \"Loan\": {        \"duration\": 15,        \"installment\": 180    },    \"SupremeDirector\" : \"Yes\",    \"Bribe\": 1000}";

    /** Kafka broker managed by the Testcontainers JUnit extension. */
    @Container
    public static KafkaContainer kafkaContainer = new KafkaContainer();

    /** Random HTTP port assigned to the application under test. */
    @LocalServerPort
    private int port;

    /**
     * Registers the container's bootstrap servers as the tracing addon's Kafka address.
     *
     * @param dynamicPropertyRegistry registry the property supplier is added to
     */
    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
        // A bound method reference null-checks its receiver when it is created, so no explicit
        // requireNonNull is needed; the supplier itself is only invoked when the property is read.
        dynamicPropertyRegistry.add("kogito.addon.tracing.decision.kafka.bootstrapAddress", kafkaContainer::getBootstrapServers);
    }

    /** Points REST Assured at the port the application was started on. */
    @BeforeEach
    public void setUp() {
        RestAssured.port = this.port;
    }

    /**
     * Posts a loan request, checks the decision result, and then verifies that a decodable
     * tracing event shows up on {@link #TRACING_TOPIC_NAME} within the timeout.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the event
     */
    @Test
    public void testEvaluateLoanEligibility() throws InterruptedException {
        KafkaClient kafkaClient = new KafkaClient(kafkaContainer.getBootstrapServers());
        CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            kafkaClient.consume(TRACING_TOPIC_NAME, message -> {
                LOGGER.info("Received from kafka: {}", message);
                // NOTE(review): if CloudEventUtils.decode already returns an Optional, wrapping it
                // in Optional.ofNullable means an empty result still counts as "present" and the
                // error branch is never taken - confirm against the decode signature.
                Optional.ofNullable(CloudEventUtils.decode(message)).ifPresentOrElse(
                        decoded -> countDownLatch.countDown(),
                        () -> LOGGER.error("Error parsing {}", message));
            });

            RestAssured.given()
                    .body(LOAN_REQUEST_BODY)
                    .contentType(ContentType.JSON)
                    .when()
                    .post("/LoanEligibility")
                    .then()
                    .statusCode(200)
                    .body("'Decide'", Matchers.is(true));

            // await() returns false on timeout; assert on it directly instead of discarding the
            // result and re-reading the latch count afterwards.
            boolean eventReceived = countDownLatch.await(TRACING_EVENT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            Assertions.assertTrue(eventReceived,
                    "No tracing event received on topic " + TRACING_TOPIC_NAME + " within " + TRACING_EVENT_TIMEOUT_SECONDS + " seconds");
        } finally {
            // Always release the consumer, even when the REST assertions fail.
            kafkaClient.shutdown();
        }
    }
}
