package org.kie.kogito.index.messaging;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.index.GraphQLUtils;
import org.kie.kogito.index.InfinispanServerTestResource;
import org.kie.kogito.index.KafkaTestResource;
import org.kie.kogito.index.TestUtils;
import org.kie.kogito.index.infinispan.protostream.ProtobufService;

@QuarkusTest
@QuarkusTestResource.List({@QuarkusTestResource(InfinispanServerTestResource.class), @QuarkusTestResource(KafkaTestResource.class)})
/* loaded from: input_file:org/kie/kogito/index/messaging/ReactiveMessagingEventConsumerKafkaIT.class */
public class ReactiveMessagingEventConsumerKafkaIT {

    @Inject
    ProtobufService protobufService;
    KafkaProducer<String, String> producer;

    @BeforeEach
    public void setup() {
        String property = System.getProperty(KafkaTestResource.KAFKA_BOOTSTRAP_SERVERS, "localhost:9092");
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", property);
        hashMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        hashMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        hashMap.put("acks", "all");
        this.producer = KafkaProducer.create(Vertx.vertx(), hashMap);
    }

    @AfterEach
    public void close() {
        if (this.producer != null) {
            this.producer.close();
        }
    }

    @Test
    public void testProcessInstanceEvent() throws Exception {
        sendProcessInstanceEvent().get(1L, TimeUnit.MINUTES);
        RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{Travels{ id } }\" }").when().post("/graphql", new Object[0]).then().log().ifValidationFails().statusCode(200).body("errors[0].message", CoreMatchers.is("Validation error of type FieldUndefined: Field 'Travels' in type 'Query' is undefined @ 'Travels'"), new Object[0]);
        RestAssured.given().contentType(ContentType.JSON).body(GraphQLUtils.getProcessInstanceById("c2fa5c5e-3002-44c7-aef7-bce82297e3fe")).when().post("/graphql", new Object[0]).then().log().ifValidationFails().statusCode(200).body("data.ProcessInstances[0].id", CoreMatchers.is("c2fa5c5e-3002-44c7-aef7-bce82297e3fe"), new Object[0]);
        this.protobufService.registerProtoBufferType(TestUtils.getTravelsProtoBufferFile());
        RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{Travels{ id } }\" }").when().post("/graphql", new Object[0]).then().log().ifValidationFails().statusCode(200).body("data.Travels", Matchers.isA(Collection.class), new Object[0]);
        sendProcessInstanceEvent().get(1L, TimeUnit.MINUTES);
        RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{ProcessInstances{ id } }\" }").when().post("/graphql", new Object[0]).then().log().ifValidationFails().statusCode(200).body("data.ProcessInstances.size()", CoreMatchers.is(1), new Object[0]).body("data.ProcessInstances[0].id", CoreMatchers.is("c2fa5c5e-3002-44c7-aef7-bce82297e3fe"), new Object[0]);
        RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{Travels{ id } }\" }").when().post("/graphql", new Object[0]).then().log().ifValidationFails().statusCode(200).body("data.Travels[0].id", CoreMatchers.is("f8868a2e-1bbb-47eb-93cf-fa46ff9dbfee"), new Object[0]);
        RestAssured.given().when().get("/metrics", new Object[0]).then().log().ifValidationFails().statusCode(200).body(Matchers.containsString("application_mp_messaging_message_count_total{channel=\"kogito-processdomain-events\"} 2.0"), new Matcher[]{Matchers.containsString("application_mp_messaging_message_count_total{channel=\"kogito-processinstances-events\"} 2.0")});
    }

    private CompletableFuture<Void> sendProcessInstanceEvent() throws Exception {
        String readFileContent = TestUtils.readFileContent("process_instance_event.json");
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.producer.write(KafkaProducerRecord.create("kogito-processinstances-events", readFileContent), asyncResult -> {
            if (asyncResult.succeeded()) {
                completableFuture.complete(null);
            } else {
                completableFuture.completeExceptionally(asyncResult.cause());
            }
        });
        return completableFuture;
    }
}
