package org.kie.kogito.quarkus.workflows;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.cloudevents.jackson.JsonFormat;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import java.io.IOException;
import java.time.Duration;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.event.CloudEventMarshaller;
import org.kie.kogito.event.impl.ByteArrayCloudEventMarshaller;
import org.kie.kogito.test.quarkus.QuarkusTestProperty;
import org.kie.kogito.test.quarkus.kafka.KafkaTypedTestClient;
import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource;

@QuarkusIntegrationTest
@QuarkusTestResource(KafkaQuarkusTestResource.class)
/* loaded from: input_file:org/kie/kogito/quarkus/workflows/AsyncAPIIT.class */
class AsyncAPIIT extends AbstractCallbackStateIT {

    @QuarkusTestProperty(name = "kafka.bootstrap.servers")
    String kafkaBootstrapServers;
    private CloudEventMarshaller<byte[]> marshaller;
    private KafkaTypedTestClient<byte[], ByteArraySerializer, ByteArrayDeserializer> kafkaClient;

    AsyncAPIIT() {
    }

    @Override // org.kie.kogito.quarkus.workflows.AbstractCallbackStateIT
    @BeforeEach
    void setup() {
        this.kafkaClient = new KafkaTypedTestClient<>(this.kafkaBootstrapServers, ByteArraySerializer.class, ByteArrayDeserializer.class);
        this.marshaller = new ByteArrayCloudEventMarshaller(new ObjectMapper().registerModule(new JavaTimeModule()).registerModule(JsonFormat.getCloudEventJacksonModule()).disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS));
    }

    @Override // org.kie.kogito.quarkus.workflows.AbstractCallbackStateIT
    @AfterEach
    void cleanUp() {
        if (this.kafkaClient != null) {
            this.kafkaClient.shutdown();
        }
    }

    @Test
    void testConsumer() throws IOException {
        String startProcess = AssuredTestUtils.startProcess("asyncEventConsumer");
        this.kafkaClient.produce((byte[]) this.marshaller.marshall(AssuredTestUtils.buildCloudEvent(startProcess, "wait", this.marshaller)), "wait");
        AssuredTestUtils.waitForFinish("asyncEventConsumer", startProcess, Duration.ofSeconds(5L));
    }
}
