package org.kie.kogito.jobs.service.messaging;

import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import io.restassured.response.Response;
import jakarta.inject.Inject;
import java.net.URI;
import java.time.ZonedDateTime;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.kie.kogito.jobs.api.Job;
import org.kie.kogito.jobs.api.URIBuilder;
import org.kie.kogito.jobs.api.event.CancelJobRequestEvent;
import org.kie.kogito.jobs.api.event.CreateProcessInstanceJobRequestEvent;
import org.kie.kogito.jobs.api.event.serialization.JobCloudEventSerializer;
import org.kie.kogito.jobs.service.health.HealthCheckUtils;
import org.kie.kogito.jobs.service.resource.BaseKeycloakJobServiceTest;

/* loaded from: input_file:org/kie/kogito/jobs/service/messaging/BaseMessagingApiTest.class */
public abstract class BaseMessagingApiTest {
    private static final String KOGITO_JOB_SERVICE_JOB_REQUEST_EVENTS_EMITTER = "kogito-job-service-job-request-events-emitter";
    private static final String TEST_SOURCE = "http://testSource";
    private static final String JOB_ID_1 = "JOB_ID_1";
    private static final String JOB_ID_2 = "JOB_ID_2";
    private static final String PROCESS_INSTANCE_ID = "PROCESS_INSTANCE_ID";
    private static final String PROCESS_ID = "PROCESS_ID";
    private static final String ROOT_PROCESS_INSTANCE_ID = "ROOT_PROCESS_INSTANCE_ID";
    private static final String ROOT_PROCESS_ID = "ROOT_PROCESS_ID";
    private static final String KOGITO_ADDONS = "KOGITO_ADDONS";
    private static final String NODE_INSTANCE_ID_1 = "NODE_INSTANCE_ID_1";
    private static final String NODE_INSTANCE_ID_2 = "NODE_INSTANCE_ID_2";
    private static final long REPEAT_INTERVAL = 1000;
    private static final int REPEAT_LIMIT = 3;
    private static final int PRIORITY = 0;
    private static final AtomicInteger CHECK_CALLBACK_NODE_INSTANCE_ID = new AtomicInteger();
    private static final int CALLBACK_EXECUTIONS_QUERY_TIMOUT_IN_SECONDS = 120;
    private static final int CALLBACK_EXECUTIONS_QUERY_POLL_INTERVAL_IN_MILLISECONDS = 3000;
    private static final String CALLBACK_RESOURCE_PATH = "/test/callback/management/jobs";

    @Inject
    @ConfigProperty(name = "quarkus.http.test-port")
    public int port;

    @Inject
    @Channel(KOGITO_JOB_SERVICE_JOB_REQUEST_EVENTS_EMITTER)
    public Emitter<String> jobEventsEmitter;
    private final JobCloudEventSerializer serializer = new JobCloudEventSerializer();

    @BeforeEach
    void init() throws Exception {
        HealthCheckUtils.awaitReadyHealthCheck(2, TimeUnit.MINUTES);
    }

    @Timeout(value = 10, unit = TimeUnit.MINUTES)
    @Test
    protected void createJob() {
        assertCallbackResource();
        this.jobEventsEmitter.send(this.serializer.serialize(CreateProcessInstanceJobRequestEvent.builder().source(URI.create(TEST_SOURCE)).job(new Job(JOB_ID_1, ZonedDateTime.now().plusSeconds(10L), Integer.valueOf(PRIORITY), buildCallbackEndpoint(getCallbackResourceURL(), PROCESS_ID, PROCESS_INSTANCE_ID, NODE_INSTANCE_ID_1), PROCESS_INSTANCE_ID, ROOT_PROCESS_INSTANCE_ID, PROCESS_ID, ROOT_PROCESS_ID, Long.valueOf(REPEAT_INTERVAL), Integer.valueOf(REPEAT_LIMIT), NODE_INSTANCE_ID_1)).processInstanceId(PROCESS_INSTANCE_ID).processId(PROCESS_ID).rootProcessInstanceId(ROOT_PROCESS_INSTANCE_ID).rootProcessId(ROOT_PROCESS_ID).kogitoAddons(KOGITO_ADDONS).build()));
        waitUntilResult(() -> {
            return getJobCallbackExecutions(NODE_INSTANCE_ID_1);
        }, str -> {
            return Objects.equals(str, "2");
        }, CALLBACK_EXECUTIONS_QUERY_TIMOUT_IN_SECONDS, CALLBACK_EXECUTIONS_QUERY_POLL_INTERVAL_IN_MILLISECONDS);
    }

    @Timeout(value = 10, unit = TimeUnit.MINUTES)
    @Test
    protected void cancelJob() {
        assertCallbackResource();
        this.jobEventsEmitter.send(this.serializer.serialize(CreateProcessInstanceJobRequestEvent.builder().source(URI.create(TEST_SOURCE)).job(new Job(JOB_ID_2, ZonedDateTime.now().plusDays(1L), Integer.valueOf(PRIORITY), buildCallbackEndpoint(getCallbackResourceURL(), PROCESS_ID, PROCESS_INSTANCE_ID, NODE_INSTANCE_ID_2), PROCESS_INSTANCE_ID, ROOT_PROCESS_INSTANCE_ID, PROCESS_ID, ROOT_PROCESS_ID, Long.valueOf(REPEAT_INTERVAL), Integer.valueOf(REPEAT_LIMIT), NODE_INSTANCE_ID_2)).processInstanceId(PROCESS_INSTANCE_ID).processId(PROCESS_ID).rootProcessInstanceId(ROOT_PROCESS_INSTANCE_ID).rootProcessId(ROOT_PROCESS_ID).kogitoAddons(KOGITO_ADDONS).build()));
        String str = "/jobs/JOB_ID_2";
        waitUntilResult(() -> {
            return getJob(str);
        }, response -> {
            return expectedJobExists(JOB_ID_2, response);
        }, CALLBACK_EXECUTIONS_QUERY_TIMOUT_IN_SECONDS, CALLBACK_EXECUTIONS_QUERY_POLL_INTERVAL_IN_MILLISECONDS);
        this.jobEventsEmitter.send(this.serializer.serialize(CancelJobRequestEvent.builder().source(URI.create(TEST_SOURCE)).jobId(JOB_ID_2).build()));
        waitUntilResult(() -> {
            return getJob(str);
        }, response2 -> {
            return expectedJobDontExists(JOB_ID_2, response2);
        }, CALLBACK_EXECUTIONS_QUERY_TIMOUT_IN_SECONDS, CALLBACK_EXECUTIONS_QUERY_POLL_INTERVAL_IN_MILLISECONDS);
    }

    private String getCallbackResourceURL() {
        return "http://localhost:" + this.port;
    }

    private void assertCallbackResource() {
        String str = "chekCallbackNode_" + CHECK_CALLBACK_NODE_INSTANCE_ID.getAndIncrement();
        RestAssured.given().contentType(ContentType.JSON).accept(ContentType.JSON).body("{}").when().post(buildCallbackEndpoint(getCallbackResourceURL(), PROCESS_ID, PROCESS_INSTANCE_ID, str), new Object[PRIORITY]).then().statusCode(BaseKeycloakJobServiceTest.OK_CODE).extract().statusCode();
        Assertions.assertThat(getJobCallbackExecutions(str)).isEqualTo("1");
    }

    private String getJobCallbackExecutions(String str) {
        return RestAssured.given().contentType(ContentType.JSON).accept(ContentType.JSON).when().get(String.format("/test/callback/management/jobs/executions/%s", str), new Object[PRIORITY]).then().statusCode(BaseKeycloakJobServiceTest.OK_CODE).extract().body().asString();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Response getJob(String str) {
        return RestAssured.given().contentType(ContentType.JSON).response().contentType(ContentType.JSON).when().get(str, new Object[PRIORITY]).then().extract().response();
    }

    private static String buildCallbackEndpoint(String str, String str2, String str3, String str4) {
        return URIBuilder.toURI(str + "/test/callback/management/jobs/" + str2 + "/instances/" + str3 + "/timers/" + str4).toString();
    }

    private static <T> void waitUntilResult(Supplier<T> supplier, Predicate<T> predicate, int i, int i2) {
        Awaitility.await().pollInterval(i, TimeUnit.MILLISECONDS).atMost(i2, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(predicate.test(supplier.get()));
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean expectedJobExists(String str, Response response) {
        if (response.statusCode() == 404 || "".equals(response.asString())) {
            return false;
        }
        return str.equals(response.jsonPath().getString("id"));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean expectedJobDontExists(String str, Response response) {
        return response.statusCode() == 404 && ("Job not found id " + str).equals(response.jsonPath().getString("message"));
    }
}
