package org.kie.kogito.index.messaging;

import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import java.time.Duration;
import javax.inject.Inject;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.index.DataIndexStorageService;
import org.kie.kogito.index.TestUtils;
import org.kie.kogito.kafka.KafkaClient;

/* loaded from: input_file:org/kie/kogito/index/messaging/AbstractMessagingConsumerKafkaIT.class */
public abstract class AbstractMessagingConsumerKafkaIT {
    Duration timeout = Duration.ofSeconds(30);

    @ConfigProperty(name = "kafka.bootstrap.servers", defaultValue = "localhost:9092")
    public String kafkaBootstrapServers;

    @Inject
    public DataIndexStorageService cacheService;
    KafkaClient kafkaClient;

    @BeforeEach
    void setup() {
        this.kafkaClient = new KafkaClient(this.kafkaBootstrapServers);
        this.cacheService.getJobsCache().clear();
        this.cacheService.getProcessInstancesCache().clear();
        this.cacheService.getUserTaskInstancesCache().clear();
    }

    @AfterEach
    void close() {
        if (this.kafkaClient != null) {
            this.kafkaClient.shutdown();
        }
        this.cacheService.getJobsCache().clear();
        this.cacheService.getProcessInstancesCache().clear();
        this.cacheService.getUserTaskInstancesCache().clear();
    }

    @Test
    void testProcessInstanceEvent() throws Exception {
        sendProcessInstanceEvent();
        String str = "c2fa5c5e-3002-44c7-aef7-bce82297e3fe";
        Awaitility.await().atMost(this.timeout).untilAsserted(() -> {
            RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{ ProcessInstances { id } }\" }").when().post("/graphql", new Object[0]).then().log().ifValidationFails().statusCode(200).body("data.ProcessInstances.size()", CoreMatchers.is(1), new Object[0]).body("data.ProcessInstances[0].id", CoreMatchers.is(str), new Object[0]);
        });
    }

    @Test
    void testUserTaskInstanceEvent() throws Exception {
        sendUserTaskInstanceEvent();
        String str = "228d5922-5e88-4bfa-8329-7116a5cbe58b";
        Awaitility.await().atMost(this.timeout).untilAsserted(() -> {
            RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{ UserTaskInstances { id } }\" }").when().post("/graphql", new Object[0]).then().log().ifValidationFails().statusCode(200).body("data.UserTaskInstances.size()", CoreMatchers.is(1), new Object[0]).body("data.UserTaskInstances[0].id", CoreMatchers.is(str), new Object[0]);
        });
    }

    @Test
    void testJobEvent() throws Exception {
        sendJobEvent();
        String str = "8350b8b6-c5d9-432d-a339-a9fc85f642d4_0";
        Awaitility.await().atMost(this.timeout).untilAsserted(() -> {
            RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{ Jobs { id } }\" }").when().post("/graphql", new Object[0]).then().log().ifValidationFails().statusCode(200).body("data.Jobs.size()", CoreMatchers.is(1), new Object[0]).body("data.Jobs[0].id", CoreMatchers.is(str), new Object[0]);
        });
    }

    private void sendUserTaskInstanceEvent() throws Exception {
        send("user_task_instance_event.json", "kogito-usertaskinstances-events");
    }

    private void sendProcessInstanceEvent() throws Exception {
        send("process_instance_event.json", "kogito-processinstances-events");
    }

    private void sendJobEvent() throws Exception {
        send("job_event.json", "kogito-jobs-events");
    }

    private void send(String str, String str2) throws Exception {
        this.kafkaClient.produce(TestUtils.readFileContent(str), str2);
    }
}
