/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.index.messaging;

import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import io.restassured.response.Response;
import io.restassured.response.ValidatableResponse;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.index.TestUtils;
import org.kie.kogito.kafka.KafkaClient;
import org.kie.kogito.persistence.protobuf.ProtobufService;

public abstract class AbstractReactiveMessagingEventConsumerKafkaIT {
    @ConfigProperty(name="kafka.bootstrap.servers")
    String kafkaBootstrapServers;
    @Inject
    ProtobufService protobufService;
    KafkaClient kafkaClient;

    @BeforeEach
    void setup() {
        this.kafkaClient = new KafkaClient(this.kafkaBootstrapServers);
    }

    @AfterEach
    void close() {
        if (this.kafkaClient != null) {
            this.kafkaClient.shutdown();
        }
    }

    @Test
    void testProcessInstanceEvent() throws Exception {
        this.sendProcessInstanceEvent();
        String processInstanceId = "c2fa5c5e-3002-44c7-aef7-bce82297e3fe";
        this.protobufService.registerProtoBufferType(this.getTestProtobufFileContent());
        ((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((Response)RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{Travels{ id } }\" }").when().post("/graphql", new Object[0])).then()).log().ifValidationFails()).statusCode(200)).body("data.Travels", Matchers.isA(Collection.class), new Object[0]);
        this.sendProcessInstanceEvent();
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).untilAsserted(() -> ((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((Response)RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{ProcessInstances{ id } }\" }").when().post("/graphql", new Object[0])).then()).log().ifValidationFails()).statusCode(200)).body("data.ProcessInstances.size()", CoreMatchers.is((Object)1), new Object[0])).body("data.ProcessInstances[0].id", CoreMatchers.is((Object)processInstanceId), new Object[0]));
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).untilAsserted(() -> ((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((Response)RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{Travels{ id } }\" }").when().post("/graphql", new Object[0])).then()).log().ifValidationFails()).statusCode(200)).body("data.Travels[0].id", CoreMatchers.is((Object)"f8868a2e-1bbb-47eb-93cf-fa46ff9dbfee"), new Object[0]));
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).untilAsserted(() -> ((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((Response)RestAssured.given().when().get("/metrics", new Object[0])).then()).log().ifValidationFails()).statusCode(200)).body(Matchers.containsString((String)"application_mp_messaging_message_count_total{channel=\"kogito-processdomain-events\"} 2.0"), new Matcher[]{Matchers.containsString((String)"application_mp_messaging_message_count_total{channel=\"kogito-processinstances-events\"} 2.0")}));
    }

    private void sendProcessInstanceEvent() throws Exception {
        String json = TestUtils.readFileContent("process_instance_event.json");
        this.kafkaClient.produce(json, "kogito-processinstances-events");
    }

    protected abstract String getTestProtobufFileContent() throws Exception;
}

