package org.kie.kogito.examples.demo;

import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Predicate;
import io.restassured.RestAssured;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.test.springboot.kafka.KafkaTestClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
/* loaded from: input_file:org/kie/kogito/examples/demo/OutboxIT.class */
public class OutboxIT {
    private static final Duration INITIAL_TIMEOUT = Duration.ofSeconds(250);
    private static final Duration TIMEOUT = Duration.ofSeconds(25);
    private static final Duration INTERVAL = Duration.ofSeconds(5);
    private static final String PROCESS_EVENTS_TOPIC = "kogito-processinstances-events";
    private static final String USERTASK_EVENTS_TOPIC = "kogito-usertaskinstances-events";

    @Value("${kogito.port}")
    private int kogitoPort;

    @Value("${debezium.port}")
    private int debeziumPort;

    @Autowired
    private KafkaTestClient kafkaClient;

    @AfterEach
    void close() {
        if (this.kafkaClient != null) {
            this.kafkaClient.shutdown();
        }
    }

    @Test
    public void testSendProcessEvents() {
        Awaitility.given().ignoreExceptions().await().atMost(INITIAL_TIMEOUT).with().pollInterval(INTERVAL).until(() -> {
            RestAssured.given().port(this.kogitoPort).when().get("/orders", new Object[0]).then().statusCode(200);
            return true;
        });
        Awaitility.given().ignoreExceptions().await().atMost(INITIAL_TIMEOUT).with().pollInterval(INTERVAL).until(() -> {
            RestAssured.given().port(this.debeziumPort).pathParam("connector", "kogito-connector").pathParam("task", 0).when().get("/connectors/{connector}/tasks/{task}/status", new Object[0]).then().statusCode(200).assertThat().body("state", Matchers.equalTo("RUNNING"), new Object[0]);
            return true;
        });
        Awaitility.given().ignoreExceptions().await().atMost(TIMEOUT).with().pollInterval(INTERVAL).until(() -> {
            RestAssured.given().port(this.debeziumPort).pathParam("connector", "kogito-connector").when().get("/connectors/{connector}/topics", new Object[0]).then().statusCode(200).assertThat().body("kogito-connector.topics", Matchers.hasSize(0), new Object[0]);
            return true;
        });
        Awaitility.given().ignoreExceptions().await().atMost(TIMEOUT).with().pollInterval(INTERVAL).until(() -> {
            RestAssured.given().port(this.kogitoPort).header("Content-Type", "application/json", new Object[0]).body("{\"approver\" : \"john\", \"order\" : {\"orderNumber\" : \"23570\", \"shipped\" : false}}").when().post("/orders", new Object[0]).then().statusCode(201).assertThat().body("approver", Matchers.equalTo("john"), new Object[0]).body("order.orderNumber", Matchers.equalTo("23570"), new Object[0]).body("order.shipped", Matchers.equalTo(false), new Object[0]);
            return true;
        });
        Awaitility.given().ignoreExceptions().await().atMost(TIMEOUT).with().pollInterval(INTERVAL).until(() -> {
            RestAssured.given().port(this.debeziumPort).pathParam("connector", "kogito-connector").when().get("/connectors/{connector}/topics", new Object[0]).then().statusCode(200).assertThat().body("kogito-connector.topics", Matchers.hasSize(2), new Object[0]).body("kogito-connector.topics", Matchers.hasItem(PROCESS_EVENTS_TOPIC), new Object[0]).body("kogito-connector.topics", Matchers.hasItem(USERTASK_EVENTS_TOPIC), new Object[0]);
            return true;
        });
        AtomicInteger atomicInteger = new AtomicInteger(0);
        this.kafkaClient.consume(PROCESS_EVENTS_TOPIC, str -> {
            String str = (String) JsonPath.read(str, "$.data.variables.order.orderNumber", new Predicate[0]);
            boolean booleanValue = ((Boolean) JsonPath.read(str, "$.data.variables.order.shipped", new Predicate[0])).booleanValue();
            if (!"23570".equals(str) || booleanValue) {
                return;
            }
            atomicInteger.incrementAndGet();
        });
        Awaitility.given().ignoreExceptions().await().atMost(TIMEOUT).with().pollInterval(INTERVAL).until(() -> {
            return Boolean.valueOf(atomicInteger.intValue() == 2);
        });
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        this.kafkaClient.consume(USERTASK_EVENTS_TOPIC, str2 -> {
            String str2 = (String) JsonPath.read(str2, "$.data.inputs.input1.orderNumber", new Predicate[0]);
            boolean booleanValue = ((Boolean) JsonPath.read(str2, "$.data.inputs.input1.shipped", new Predicate[0])).booleanValue();
            if (!"23570".equals(str2) || booleanValue) {
                return;
            }
            atomicInteger2.incrementAndGet();
        });
        Awaitility.given().ignoreExceptions().await().atMost(TIMEOUT).with().pollInterval(INTERVAL).until(() -> {
            return Boolean.valueOf(atomicInteger2.intValue() == 1);
        });
    }
}
