package org.acme.travel;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.jackson.JsonFormat;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import javax.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.After;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.kie.kogito.kafka.KafkaClient;
import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test for the traveller messaging flow.
 *
 * <p>The test publishes a fixed number of unprocessed {@code Traveller} CloudEvents to the
 * {@link #TOPIC_PRODUCER} topic and waits for the same number of messages on the
 * {@link #TOPIC_CONSUMER} topic, checking that each received traveller is flagged as processed
 * and still carries the generated field values.
 */
@QuarkusTest
@QuarkusTestResource(KafkaQuarkusTestResource.class)
public class MessagingIT {
    /** Topic the test publishes unprocessed travellers to. */
    public static final String TOPIC_PRODUCER = "travellers";
    /** Topic the test listens on for processed travellers. */
    public static final String TOPIC_CONSUMER = "processedtravellers";

    private static final Logger LOGGER = LoggerFactory.getLogger(MessagingIT.class);

    /** Number of travellers sent, and therefore the number of replies awaited. */
    private static final int TRAVELLER_COUNT = 3;
    /** Maximum time to wait for all replies before the test is failed. */
    private static final long AWAIT_TIMEOUT_SECONDS = 5L;

    @Inject
    private ObjectMapper objectMapper;

    /** Client created per test run; shut down in {@link #stop()}. */
    public KafkaClient kafkaClient;

    @ConfigProperty(name = "kafka.bootstrap.servers")
    private String kafkaBootstrapServers;

    /**
     * Sends {@code TRAVELLER_COUNT} travellers and expects each of them back, processed, within
     * {@code AWAIT_TIMEOUT_SECONDS} seconds.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for replies
     */
    @Test
    public void testProcess() throws InterruptedException {
        // NOTE(review): this reconfigures the injected mapper instance itself, not a copy.
        objectMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
        objectMapper.registerModule(JsonFormat.getCloudEventJacksonModule());

        kafkaClient = new KafkaClient(kafkaBootstrapServers);
        CountDownLatch pendingReplies = new CountDownLatch(TRAVELLER_COUNT);

        // The latch is only decremented when a message passes every check, so a rejected
        // message surfaces as the final latch assertion failing.
        // NOTE(review): how KafkaClient treats an exception thrown by this callback is not
        // visible here - confirm.
        kafkaClient.consume(TOPIC_CONSUMER, message -> {
            LOGGER.info("Received from kafka: {}", message);
            try {
                // The traveller is carried in the "data" attribute of the CloudEvent envelope.
                JsonNode envelope = objectMapper.readValue(message, JsonNode.class);
                Traveller traveller =
                        objectMapper.readValue(envelope.get("data").toString(), Traveller.class);
                Assertions.assertTrue(traveller.isProcessed());
                Assertions.assertTrue(traveller.getFirstName().matches("Name[0-9]+"));
                Assertions.assertTrue(traveller.getLastName().matches("LastName[0-9]+"));
                Assertions.assertTrue(traveller.getEmail().matches("email[0-9]+"));
                Assertions.assertTrue(traveller.getNationality().matches("Nationality[0-9]+"));
                pendingReplies.countDown();
            } catch (JsonProcessingException e) {
                LOGGER.error("Error parsing {}", message, e);
                Assertions.fail(e);
            }
        });

        IntStream.range(0, TRAVELLER_COUNT)
                .mapToObj(i -> new Traveller("Name" + i, "LastName" + i, "email" + i, "Nationality" + i))
                .forEach(traveller -> kafkaClient.produce(generateCloudEvent(traveller), TOPIC_PRODUCER));

        pendingReplies.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        Assertions.assertEquals(0L, pendingReplies.getCount(),
                "Not all processed travellers were received within " + AWAIT_TIMEOUT_SECONDS + "s");
    }

    /**
     * Wraps the given traveller in a CloudEvent (v1) and serializes the event to a JSON string.
     *
     * @param traveller the traveller to send; must not yet be marked as processed
     * @return the serialized CloudEvent
     * @throws IllegalStateException if the traveller or the event cannot be serialized
     */
    private String generateCloudEvent(Traveller traveller) {
        Assertions.assertFalse(traveller.isProcessed());
        try {
            // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
            byte[] payload = objectMapper.writeValueAsString(traveller).getBytes(StandardCharsets.UTF_8);
            return objectMapper.writeValueAsString(CloudEventBuilder.v1()
                    .withId(UUID.randomUUID().toString())
                    .withSource(URI.create(""))
                    .withType(TOPIC_PRODUCER)
                    .withTime(OffsetDateTime.now())
                    .withData(payload)
                    .build());
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Unable to serialize traveller CloudEvent", e);
        }
    }

    /**
     * Shuts down the Kafka client after each test, if one was created.
     *
     * <p>Uses JUnit Jupiter's {@code @AfterEach}; the JUnit 4 {@code @After} annotation is not
     * honoured by the Jupiter engine, so the client would otherwise never be shut down.
     */
    @AfterEach
    public void stop() {
        Optional.ofNullable(kafkaClient).ifPresent(KafkaClient::shutdown);
    }
}
