package org.acme.travel.tests.avro.quarkus;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.reactive.messaging.kafka.Record;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import javax.inject.Inject;
import org.acme.travel.AvroUtils;
import org.acme.travel.Traveller;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@QuarkusTest
@QuarkusTestResource(KafkaQuarkusTestResource.class)
/* loaded from: input_file:org/acme/travel/tests/avro/quarkus/MultiMessagingIT.class */
/**
 * Integration test that emits Avro-encoded {@link Traveller} payloads on the
 * {@code travellers-out} channel and expects one event per emitted traveller to
 * arrive on either the {@code processedtravellers-in} or the {@code cancelled-in}
 * channel.
 *
 * <p>The incoming handlers are invoked outside the test method, so any failure they
 * raise is also recorded in {@link #consumerFailure} and reported by
 * {@link #testProcess()} instead of only showing up as a timeout.
 */
public class MultiMessagingIT {
    /** Number of travellers emitted, and therefore the number of events awaited. */
    private static final int EXPECTED_EVENTS = 3;
    /** Counted down once for every incoming event that passes verification. */
    private static final CountDownLatch countDownLatch = new CountDownLatch(EXPECTED_EVENTS);
    /** First failure raised by an incoming handler, or {@code null} when none occurred. */
    private static final AtomicReference<Throwable> consumerFailure = new AtomicReference<>();
    private static final Logger logger = LoggerFactory.getLogger(MultiMessagingIT.class);

    @Inject
    @Channel("travellers-out")
    Emitter<byte[]> emitter;

    @Inject
    AvroUtils utils;

    /**
     * Handles an event from the {@code processedtravellers-in} channel: checks the
     * record key, then verifies the payload and counts the latch down.
     *
     * @param record incoming record whose value is the serialized traveller
     * @return an already completed stage
     * @throws IOException if the payload cannot be deserialized
     */
    @Incoming("processedtravellers-in")
    public CompletionStage<?> onProcessedEvent(Record<String, byte[]> record) throws IOException {
        logger.info("Event received for processed travellers");
        try {
            Assertions.assertEquals("Real Betis Balompie", record.key());
            verifyAndCountDown(record.value());
        } catch (IOException | RuntimeException | AssertionError e) {
            // Remember the failure for the test method, then propagate it unchanged.
            consumerFailure.compareAndSet(null, e);
            throw e;
        }
        return CompletableFuture.completedStage(null);
    }

    /**
     * Handles an event from the {@code cancelled-in} channel: verifies the payload
     * and counts the latch down.
     *
     * @param message incoming message whose payload is the serialized traveller
     * @return an already completed stage
     * @throws IOException if the payload cannot be deserialized
     */
    @Incoming("cancelled-in")
    public CompletionStage<?> onCancelledEvent(Message<byte[]> message) throws IOException {
        logger.info("Event received for cancelled travellers");
        try {
            verifyAndCountDown(message.getPayload());
        } catch (IOException | RuntimeException | AssertionError e) {
            // Remember the failure for the test method, then propagate it unchanged.
            consumerFailure.compareAndSet(null, e);
            throw e;
        }
        return CompletableFuture.completedStage(null);
    }

    /**
     * Sends {@link #EXPECTED_EVENTS} travellers and waits up to ten seconds for the
     * same number of verified events to come back. A failure recorded by a handler
     * takes precedence over the timeout so that the real cause is reported.
     *
     * @throws InterruptedException if the wait on the latch is interrupted
     */
    @Test
    public void testProcess() throws InterruptedException {
        IntStream.range(0, EXPECTED_EVENTS)
                .mapToObj(i -> new Traveller("Name" + i, "LastName" + i, "email" + i, getNationality(i)))
                .forEach(traveller -> this.emitter.send(this.utils.writeObject(traveller)));

        boolean allReceived = countDownLatch.await(10L, TimeUnit.SECONDS);

        Throwable failure = consumerFailure.get();
        if (failure != null) {
            Assertions.fail("An incoming event handler failed: " + failure, failure);
        }
        Assertions.assertTrue(allReceived,
                "Timed out waiting for events; still pending: " + countDownLatch.getCount());
        Assertions.assertEquals(0L, countDownLatch.getCount());
    }

    /** Deserializes the payload, verifies the traveller and counts the latch down. */
    private void verifyAndCountDown(byte[] payload) throws IOException {
        Traveller traveller = (Traveller) this.utils.readObject(payload, Traveller.class, new Class[0]);
        logger.info("Event deserialized sucessfully {}", traveller);
        assertTraveller(traveller);
        countDownLatch.countDown();
        logger.info("Count down is {}", countDownLatch.getCount());
    }

    /**
     * Checks that the traveller is flagged as processed exactly when its nationality
     * is not {@code "American"}, and that its name and email fields match the values
     * generated by {@link #testProcess()}.
     */
    private void assertTraveller(Traveller traveller) {
        boolean expectedProcessed = !traveller.getNationality().equals("American");
        Assertions.assertEquals(expectedProcessed, traveller.isProcessed());
        Assertions.assertTrue(traveller.getFirstName().matches("Name[0-9]+"),
                "Unexpected first name: " + traveller.getFirstName());
        Assertions.assertTrue(traveller.getLastName().matches("LastName[0-9]+"),
                "Unexpected last name: " + traveller.getLastName());
        Assertions.assertTrue(traveller.getEmail().matches("email[0-9]+"),
                "Unexpected email: " + traveller.getEmail());
    }

    /** Returns {@code "American"} for even indexes and {@code "Spanish"} for odd ones. */
    private String getNationality(int i) {
        return i % 2 == 0 ? "American" : "Spanish";
    }
}
