package org.kie.kogito.quarkus.workflows;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.jackson.JsonCloudEventData;
import io.cloudevents.jackson.JsonFormat;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.test.quarkus.QuarkusTestProperty;
import org.kie.kogito.test.quarkus.kafka.KafkaTestClient;
import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test for the {@code /correlation} workflow.
 *
 * <p>The test publishes a "start" CloudEvent to Kafka, waits until a workflow instance is
 * reported by {@link WorkflowTestUtils#getProcessInstance}, then publishes a second CloudEvent
 * carrying the same {@value #USER_ID} extension value and asserts that the instance finishes.
 */
@QuarkusIntegrationTest
@QuarkusTestResource(KafkaQuarkusTestResource.class)
public class CorrelationIT {
    /** Name of the CloudEvent extension attribute set to the same value on both events. */
    public static final String USER_ID = "userid";
    public static final String PROCESS_URL = "/correlation";
    public static final String PROCESS_GET_BY_ID_URL = "/correlation/{id}";
    public static final String CORRELATION_EVENT_TYPE = "correlation_event_type";
    public static final String CORRELATION_EVENT_TOPIC = "correlation_event_type";
    public static final String START_EVENT_TYPE = "correlation_start_event_type";
    public static final String START_EVENT_TOPIC = "correlation_start_event_type";
    private static final Logger LOGGER = LoggerFactory.getLogger(CorrelationIT.class);
    private KafkaTestClient kafkaClient;

    @QuarkusTestProperty(name = "kafka.bootstrap.servers")
    private String kafkaBootstrapServers;
    private ObjectMapper objectMapper;

    /** Creates a fresh Kafka client and a Jackson mapper able to serialize CloudEvents. */
    @BeforeEach
    void setup() {
        this.kafkaClient = new KafkaTestClient(this.kafkaBootstrapServers);
        this.objectMapper = new ObjectMapper()
                .registerModule(new JavaTimeModule())
                .registerModule(JsonFormat.getCloudEventJacksonModule())
                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    }

    /** Shuts the Kafka client down; tolerates a {@code setup()} that failed before creating it. */
    @AfterEach
    public void cleanup() {
        if (this.kafkaClient != null) {
            this.kafkaClient.shutdown();
        }
    }

    /**
     * Starts a workflow with a start event, then completes it with a second event that carries
     * the same {@value #USER_ID} extension value.
     *
     * @throws Exception if an event cannot be serialized or produced
     */
    @Test
    void correlationEventTest() throws Exception {
        // Both events carry this value in the USER_ID extension.
        String userId = UUID.randomUUID().toString();

        LOGGER.debug("Sending create correlation workflow event");
        produceEvent(START_EVENT_TYPE, START_EVENT_TOPIC, userId, "Starting workflow using correlation");

        // Holder is needed because the instance id is discovered inside the polling lambda.
        AtomicReference<String> processInstanceId = new AtomicReference<>();
        Awaitility.await().with()
                .pollDelay(2L, TimeUnit.SECONDS)
                .pollInterval(1L, TimeUnit.SECONDS)
                .atMost(2L, TimeUnit.MINUTES)
                .until(() -> {
                    String processInstance = WorkflowTestUtils.getProcessInstance(PROCESS_URL);
                    LOGGER.debug("Created workflow instance id = {}", processInstance);
                    processInstanceId.set(processInstance);
                    return processInstance != null;
                });
        WorkflowTestUtils.assertProcessInstanceExists(PROCESS_GET_BY_ID_URL, processInstanceId.get());

        LOGGER.debug("Sending correlation event to complete Workflow {}", processInstanceId.get());
        produceEvent(CORRELATION_EVENT_TYPE, CORRELATION_EVENT_TOPIC, userId, "Hello using correlation");

        // NOTE(review): 1L / 180L are presumably poll interval and timeout in seconds — confirm
        // against WorkflowTestUtils.
        WorkflowTestUtils.assertProcessInstanceHasFinished(PROCESS_GET_BY_ID_URL, processInstanceId.get(), 1L, 180L);
        LOGGER.debug("Workflow {} completed", processInstanceId.get());
    }

    /**
     * Builds a v1 CloudEvent with a random id, the given type and {@value #USER_ID} extension,
     * and a JSON payload of the form {@code {"message": <message>}}, then publishes its JSON
     * serialization to the given Kafka topic.
     *
     * @param eventType CloudEvent {@code type} attribute
     * @param topic Kafka topic the serialized event is produced to
     * @param userId value of the {@value #USER_ID} extension attribute
     * @param message text stored under the {@code message} key of the event data
     * @throws Exception if the event cannot be serialized or produced
     */
    private void produceEvent(String eventType, String topic, String userId, String message) throws Exception {
        String payload = this.objectMapper.writeValueAsString(CloudEventBuilder.v1()
                .withId(UUID.randomUUID().toString())
                .withSource(URI.create(""))
                .withType(eventType)
                .withTime(OffsetDateTime.now())
                .withExtension(USER_ID, userId)
                .withData(JsonCloudEventData.wrap(this.objectMapper.createObjectNode().put("message", message)))
                .build());
        this.kafkaClient.produce(payload, topic);
    }
}
