package org.kie.kogito.index.graphql;

import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketConnectOptions;
import io.vertx.core.http.WebsocketVersion;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.handler.graphql.ApolloWSMessageType;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject;
import net.javacrumbs.jsonunit.assertj.JsonAssertion;
import net.javacrumbs.jsonunit.assertj.JsonAssertions;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.index.DataIndexStorageService;
import org.kie.kogito.index.TestUtils;
import org.kie.kogito.index.model.ProcessInstanceState;
import org.kie.kogito.index.service.AbstractIndexingIT;
import org.kie.kogito.persistence.protobuf.ProtobufService;

/* loaded from: input_file:org/kie/kogito/index/graphql/AbstractWebSocketSubscriptionIT.class */
public abstract class AbstractWebSocketSubscriptionIT extends AbstractIndexingIT {

    @Inject
    public ProtobufService protobufService;

    @Inject
    public Vertx vertx;

    @Inject
    public DataIndexStorageService cacheService;
    private AtomicInteger counter = new AtomicInteger(0);
    private HttpClient httpClient;

    @BeforeEach
    void setup() {
        this.httpClient = this.vertx.createHttpClient(new HttpClientOptions().setDefaultPort(TestUtils.getPortFromConfig()));
    }

    @AfterEach
    void tearDown() {
        this.httpClient.close();
        this.cacheService.getJobsCache().clear();
        this.cacheService.getProcessInstancesCache().clear();
        this.cacheService.getUserTaskInstancesCache().clear();
        if (this.cacheService.getDomainModelCache("travels") != null) {
            this.cacheService.getDomainModelCache("travels").clear();
        }
        if (this.cacheService.getDomainModelCache("deals") != null) {
            this.cacheService.getDomainModelCache("deals").clear();
        }
    }

    @Test
    void testProcessInstanceSubscription() throws Exception {
        String uuid = UUID.randomUUID().toString();
        this.protobufService.registerProtoBufferType(getProcessProtobufFileContent());
        assertProcessInstanceSubscription("travels", uuid, ProcessInstanceState.ACTIVE, "subscription { ProcessInstanceAdded { id, processId, state } }", "ProcessInstanceAdded");
        assertProcessInstanceSubscription("travels", uuid, ProcessInstanceState.COMPLETED, "subscription { ProcessInstanceUpdated { id, processId, state } }", "ProcessInstanceUpdated");
    }

    @Test
    void testUserTaskInstanceSubscription() throws Exception {
        String uuid = UUID.randomUUID().toString();
        String uuid2 = UUID.randomUUID().toString();
        this.protobufService.registerProtoBufferType(getUserTaskProtobufFileContent());
        assertUserTaskInstanceSubscription(uuid, "deals", uuid2, "InProgress", "subscription { UserTaskInstanceAdded { id, processInstanceId, processId, state } }", "UserTaskInstanceAdded");
        assertUserTaskInstanceSubscription(uuid, "deals", uuid2, "Completed", "subscription { UserTaskInstanceUpdated { id, processInstanceId, processId, state } }", "UserTaskInstanceUpdated");
    }

    @Test
    void testJobSubscription() throws Exception {
        String uuid = UUID.randomUUID().toString();
        String uuid2 = UUID.randomUUID().toString();
        assertJobSubscription(uuid, "deals", uuid2, "SCHEDULED", "subscription { JobAdded { id, processInstanceId, processId, status } }", "JobAdded");
        assertJobSubscription(uuid, "deals", uuid2, "EXECUTED", "subscription { JobUpdated { id, processInstanceId, processId, status } }", "JobUpdated");
    }

    @Test
    void testDomainSubscription() throws Exception {
        String uuid = UUID.randomUUID().toString();
        this.protobufService.registerProtoBufferType(getProcessProtobufFileContent());
        assertDomainSubscription("travels", uuid, ProcessInstanceState.ACTIVE, "subscription { TravelsAdded { id, traveller { firstName }, metadata { processInstances { state } } } }", "TravelsAdded");
        assertDomainSubscription("travels", uuid, ProcessInstanceState.COMPLETED, "subscription { TravelsUpdated { id, traveller { firstName }, metadata { processInstances { state } } } }", "TravelsUpdated");
    }

    private void assertDomainSubscription(String str, String str2, ProcessInstanceState processInstanceState, String str3, String str4) throws Exception {
        CompletableFuture<JsonObject> subscribe = subscribe(str3);
        RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{ Travels{ id } }\" }").when().post("/graphql", new Object[0]).then().log().ifValidationFails().statusCode(200).body("data.Travels", CoreMatchers.isA(Collection.class), new Object[0]);
        indexProcessCloudEvent(TestUtils.getProcessCloudEvent(str, str2, processInstanceState, null, null, null));
        JsonAssertions.assertThatJson(subscribe.get(1L, TimeUnit.MINUTES).toString()).and(new JsonAssertion[]{jsonAssert -> {
            jsonAssert.node("type").isEqualTo("data");
        }, jsonAssert2 -> {
            jsonAssert2.node("payload.data." + str4 + ".id").isEqualTo(str2);
        }, jsonAssert3 -> {
            jsonAssert3.node("payload.data." + str4 + ".metadata.processInstances[0].state").isEqualTo(processInstanceState.name());
        }, jsonAssert4 -> {
            jsonAssert4.node("payload.data." + str4 + ".traveller.firstName").isEqualTo("Maciej");
        }});
    }

    private void assertProcessInstanceSubscription(String str, String str2, ProcessInstanceState processInstanceState, String str3, String str4) throws Exception {
        CompletableFuture<JsonObject> subscribe = subscribe(str3);
        RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{ Travels{ id } }\" }").when().post("/graphql", new Object[0]).then().log().ifValidationFails().statusCode(200).body("data.Travels", CoreMatchers.isA(Collection.class), new Object[0]);
        indexProcessCloudEvent(TestUtils.getProcessCloudEvent(str, str2, processInstanceState, null, null, null));
        JsonAssertions.assertThatJson(subscribe.get(1L, TimeUnit.MINUTES).toString()).and(new JsonAssertion[]{jsonAssert -> {
            jsonAssert.node("type").isEqualTo("data");
        }, jsonAssert2 -> {
            jsonAssert2.node("payload.data." + str4 + ".id").isEqualTo(str2);
        }, jsonAssert3 -> {
            jsonAssert3.node("payload.data." + str4 + ".processId").isEqualTo(str);
        }, jsonAssert4 -> {
            jsonAssert4.node("payload.data." + str4 + ".state").isEqualTo(processInstanceState.name());
        }});
    }

    private void assertUserTaskInstanceSubscription(String str, String str2, String str3, String str4, String str5, String str6) throws Exception {
        CompletableFuture<JsonObject> subscribe = subscribe(str5);
        RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{ Deals{ id } }\" }").when().post("/graphql", new Object[0]).then().log().ifValidationFails().statusCode(200).body("data.Deals", CoreMatchers.isA(Collection.class), new Object[0]);
        indexUserTaskCloudEvent(TestUtils.getUserTaskCloudEvent(str, str2, str3, null, null, str4));
        JsonAssertions.assertThatJson(subscribe.get(1L, TimeUnit.MINUTES).toString()).and(new JsonAssertion[]{jsonAssert -> {
            jsonAssert.node("type").isEqualTo("data");
        }, jsonAssert2 -> {
            jsonAssert2.node("payload.data." + str6 + ".id").isEqualTo(str);
        }, jsonAssert3 -> {
            jsonAssert3.node("payload.data." + str6 + ".processInstanceId").isEqualTo(str3);
        }, jsonAssert4 -> {
            jsonAssert4.node("payload.data." + str6 + ".processId").isEqualTo(str2);
        }, jsonAssert5 -> {
            jsonAssert5.node("payload.data." + str6 + ".state").isEqualTo(str4);
        }});
    }

    private void assertJobSubscription(String str, String str2, String str3, String str4, String str5, String str6) throws Exception {
        CompletableFuture<JsonObject> subscribe = subscribe(str5);
        RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{ Jobs{ id } }\" }").when().post("/graphql", new Object[0]).then().log().ifValidationFails().statusCode(200).body("data.Jobs", CoreMatchers.isA(Collection.class), new Object[0]);
        indexJobCloudEvent(TestUtils.getJobCloudEvent(str, str2, str3, null, null, str4));
        JsonAssertions.assertThatJson(subscribe.get(1L, TimeUnit.MINUTES).toString()).and(new JsonAssertion[]{jsonAssert -> {
            jsonAssert.node("type").isEqualTo("data");
        }, jsonAssert2 -> {
            jsonAssert2.node("payload.data." + str6 + ".id").isEqualTo(str);
        }, jsonAssert3 -> {
            jsonAssert3.node("payload.data." + str6 + ".processInstanceId").isEqualTo(str3);
        }, jsonAssert4 -> {
            jsonAssert4.node("payload.data." + str6 + ".processId").isEqualTo(str2);
        }, jsonAssert5 -> {
            jsonAssert5.node("payload.data." + str6 + ".status").isEqualTo(str4);
        }});
    }

    private CompletableFuture<JsonObject> subscribe(String str) throws Exception {
        CompletableFuture<JsonObject> completableFuture = new CompletableFuture<>();
        CompletableFuture completableFuture2 = new CompletableFuture();
        JsonObject put = new JsonObject().put("type", ApolloWSMessageType.CONNECTION_TERMINATE.getText());
        this.httpClient.webSocket(new WebSocketConnectOptions().setURI("/graphql").setVersion(WebsocketVersion.V08), asyncResult -> {
            if (!asyncResult.succeeded()) {
                asyncResult.cause().printStackTrace();
                completableFuture2.completeExceptionally(asyncResult.cause());
            } else {
                WebSocket webSocket = (WebSocket) asyncResult.result();
                webSocket.handler(buffer -> {
                    String string = buffer.toJsonObject().getString("type");
                    if (ApolloWSMessageType.COMPLETE.getText().equals(string)) {
                        webSocket.write(put.toBuffer());
                        completableFuture.complete(null);
                        return;
                    }
                    if (ApolloWSMessageType.DATA.getText().equals(string)) {
                        webSocket.write(put.toBuffer());
                        completableFuture.complete(buffer.toJsonObject());
                    } else if (ApolloWSMessageType.CONNECTION_ACK.getText().equals(string)) {
                        webSocket.write(new JsonObject().put("id", String.valueOf(this.counter.getAndIncrement())).put("type", ApolloWSMessageType.START.getText()).put("payload", new JsonObject().put("query", str)).toBuffer()).onSuccess(r4 -> {
                            completableFuture2.complete(null);
                        });
                    } else {
                        if (ApolloWSMessageType.CONNECTION_KEEP_ALIVE.getText().equals(string)) {
                            return;
                        }
                        webSocket.write(put.toBuffer());
                        completableFuture.completeExceptionally(new RuntimeException(String.format("Unexpected message type: %s\nMessage: %s", string, buffer.toString())));
                    }
                });
                webSocket.write(new JsonObject().put("type", ApolloWSMessageType.CONNECTION_INIT.getText()).toBuffer());
            }
        });
        completableFuture2.get(1L, TimeUnit.MINUTES);
        return completableFuture;
    }

    protected abstract String getProcessProtobufFileContent() throws Exception;

    protected abstract String getUserTaskProtobufFileContent() throws Exception;
}
