package org.acme.travel.tests.multimessaging.quarkus;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.jackson.JsonFormat;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.acme.travel.Traveller;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.test.quarkus.kafka.KafkaTestClient;
import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@QuarkusIntegrationTest
@QuarkusTestResource(KafkaQuarkusTestResource.class)
/* loaded from: input_file:org/acme/travel/tests/multimessaging/quarkus/MultiMessagingIT.class */
public class MultiMessagingIT {
    public static final String TOPIC_PRODUCER = "travellers";
    public static final String TOPIC_PROCESSED_CONSUMER = "processedtravellers";
    public static final String TOPIC_CANCEL_CONSUMER = "cancelledtravellers";
    private static Logger LOGGER = LoggerFactory.getLogger(MultiMessagingIT.class);
    private ObjectMapper objectMapper;
    public KafkaTestClient kafkaClient;

    @ConfigProperty(name = "kafka.bootstrap.servers")
    private String kafkaBootstrapServers;

    @BeforeEach
    public void setup() {
        this.objectMapper = new ObjectMapper();
        this.objectMapper.registerModule(JsonFormat.getCloudEventJacksonModule());
        this.kafkaClient = new KafkaTestClient(this.kafkaBootstrapServers);
    }

    @Test
    public void testProcess() throws InterruptedException {
        this.objectMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
        CountDownLatch countDownLatch = new CountDownLatch(3);
        this.kafkaClient.consume(Set.of(TOPIC_PROCESSED_CONSUMER, TOPIC_CANCEL_CONSUMER), str -> {
            LOGGER.info("Received from kafka: {}", str);
            try {
                Traveller traveller = (Traveller) this.objectMapper.readValue(((JsonNode) this.objectMapper.readValue(str, JsonNode.class)).get("data").toString(), Traveller.class);
                Assertions.assertEquals(Boolean.valueOf(!traveller.getNationality().equals("American")), Boolean.valueOf(traveller.isProcessed()));
                Assertions.assertTrue(traveller.getFirstName().matches("Name[0-9]+"));
                Assertions.assertTrue(traveller.getLastName().matches("LastName[0-9]+"));
                Assertions.assertTrue(traveller.getEmail().matches("email[0-9]+"));
                countDownLatch.countDown();
            } catch (JsonProcessingException e) {
                LOGGER.error("Error parsing {}", str, e);
                Assertions.fail(e);
            }
        });
        IntStream.range(0, 3).mapToObj(i -> {
            return new Traveller("Name" + i, "LastName" + i, "email" + i, getNationality(i));
        }).forEach(traveller -> {
            this.kafkaClient.produce(generateCloudEvent(traveller), TOPIC_PRODUCER);
        });
        countDownLatch.await(10L, TimeUnit.SECONDS);
        Assertions.assertEquals(0L, countDownLatch.getCount());
    }

    private String getNationality(int i) {
        return i % 2 == 0 ? "American" : "Spanish";
    }

    private String generateCloudEvent(Traveller traveller) {
        Assertions.assertFalse(traveller.isProcessed());
        try {
            return this.objectMapper.writeValueAsString(CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create("")).withType(TOPIC_PRODUCER).withTime(OffsetDateTime.now()).withData(this.objectMapper.writeValueAsString(traveller).getBytes()).build());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @AfterEach
    public void stop() {
        Optional.ofNullable(this.kafkaClient).ifPresent((v0) -> {
            v0.shutdown();
        });
    }
}
