package org.apache.camel.quarkus.component.kafka;

import java.math.BigInteger;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.inject.Named;
import javax.json.Json;
import javax.json.JsonObject;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response;
import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.kafka.KafkaClientFactory;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.quarkus.component.kafka.model.KafkaMessage;
import org.apache.camel.quarkus.component.kafka.model.Price;
import org.apache.camel.spi.RouteController;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeader;

/**
 * REST endpoints under {@code /kafka} that talk to Kafka both directly, through the
 * plain Kafka client API, and indirectly, through Camel producer/consumer templates.
 *
 * <p>The Camel endpoints and routes referenced here ({@code direct:idempotent},
 * {@code mock:idempotent-results}, {@code seda:*} and the route with id {@code foo})
 * are not defined in this class; they are presumably declared by a route builder
 * elsewhere in the application.
 */
@Path("/kafka")
@ApplicationScoped
public class CamelKafkaResource {

    /** Maximum time {@link #get(String)} waits for a record to arrive. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(60L);

    /** Timeout, in milliseconds, applied to every {@link ConsumerTemplate} receive call. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 10000L;

    /** Kafka endpoint used by {@link #postPrice(Integer, Double)}: Integer keys, Double values. */
    private static final String PRICE_ENDPOINT_URI = "kafka:test-serializer?autoOffsetReset=earliest"
            + "&keySerializer=org.apache.kafka.common.serialization.IntegerSerializer"
            + "&valueSerializer=org.apache.kafka.common.serialization.DoubleSerializer";

    @Inject
    @Named("kafka-consumer-properties")
    Properties consumerProperties;

    @Inject
    @Named("kafka-producer-properties")
    Properties producerProperties;

    @Inject
    CamelContext context;

    @Inject
    ProducerTemplate producerTemplate;

    @Inject
    ConsumerTemplate consumerTemplate;

    /**
     * Reports whether the Camel registry contains no {@link KafkaClientFactory}.
     *
     * @return {@code true} when no {@code KafkaClientFactory} is registered
     */
    @GET
    @Produces("text/plain")
    @Path("/custom/client/factory/missing")
    public boolean kafkaClientFactoryIsMissing() {
        return context.getRegistry().findByType(KafkaClientFactory.class).isEmpty();
    }

    /**
     * Sends the request body to the given topic with the fixed record key {@code 1}
     * and waits for the broker acknowledgement.
     *
     * @param topicName topic to publish to
     * @param message   record value (the request body)
     * @return JSON object with the {@code topicName}, {@code partition} and {@code offset}
     *         reported in the record metadata
     * @throws Exception if the send fails or the wait for its result is interrupted
     */
    @POST
    @Produces("application/json")
    @Path("/{topicName}")
    public JsonObject post(@PathParam("topicName") String topicName, String message) throws Exception {
        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProperties)) {
            // Block on the returned future so that the metadata is available for the response.
            RecordMetadata metadata = producer.send(new ProducerRecord<>(topicName, 1, message)).get();
            return Json.createObjectBuilder()
                    .add("topicName", metadata.topic())
                    .add("partition", metadata.partition())
                    .add("offset", metadata.offset())
                    .build();
        }
    }

    /**
     * Subscribes to the given topic and returns the first record delivered by a single poll.
     *
     * @param topicName topic to read from
     * @return JSON object with the record's {@code topicName}, {@code partition},
     *         {@code offset}, {@code key} and {@code body}
     * @throws NoSuchElementException if no record arrives within {@link #POLL_TIMEOUT}
     */
    @GET
    @Produces("application/json")
    @Path("/{topicName}")
    public JsonObject get(@PathParam("topicName") String topicName) {
        try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Collections.singletonList(topicName));
            ConsumerRecords<Integer, String> records = consumer.poll(POLL_TIMEOUT);
            if (records.isEmpty()) {
                // Same exception type an empty iterator would raise, but with a usable message.
                throw new NoSuchElementException("No record received from topic '" + topicName
                        + "' within " + POLL_TIMEOUT.getSeconds() + " seconds");
            }
            ConsumerRecord<Integer, String> received = records.iterator().next();
            return Json.createObjectBuilder()
                    .add("topicName", received.topic())
                    .add("partition", received.partition())
                    .add("offset", received.offset())
                    .add("key", received.key().intValue())
                    .add("body", received.value())
                    .build();
        }
    }

    /**
     * Sends the request body to {@code direct:idempotent} with the header {@code id}
     * set to the given identifier.
     *
     * @param id      value of the {@code id} header
     * @param message message body (the request body)
     * @return a {@code 202 Accepted} response
     */
    @Path("idempotent/{id}")
    @PUT
    @Consumes("application/json")
    public Response addMessage(@PathParam("id") Integer id, String message) {
        producerTemplate.sendBodyAndHeader("direct:idempotent", message, "id", id);
        return Response.accepted().build();
    }

    /**
     * Lists the bodies of all exchanges received so far by {@code mock:idempotent-results}.
     *
     * @return the message bodies converted to {@code String}, in the order reported by the mock endpoint
     */
    @GET
    @Produces("application/json")
    @Path("idempotent")
    public List<String> getIdempotentResults() {
        MockEndpoint results = context.getEndpoint("mock:idempotent-results", MockEndpoint.class);
        return results.getReceivedExchanges().stream()
                .map(Exchange::getMessage)
                .map(message -> message.getBody(String.class))
                .collect(Collectors.toList());
    }

    /**
     * Starts or stops the route with id {@code foo}.
     *
     * @param action either {@code start} or {@code stop}
     * @return a {@code 200 OK} response once the route controller call returns
     * @throws IllegalArgumentException if {@code action} is neither {@code start} nor {@code stop}
     * @throws Exception                if the route controller fails to start or stop the route
     */
    @POST
    @Path("/foo/{action}")
    public Response modifyFooConsumerState(@PathParam("action") String action) throws Exception {
        RouteController routeController = context.getRouteController();
        switch (action) {
            case "start":
                routeController.startRoute("foo");
                break;
            case "stop":
                routeController.stopRoute("foo");
                break;
            default:
                throw new IllegalArgumentException("Unknown action: " + action);
        }
        return Response.ok().build();
    }

    /**
     * Receives one message body from the named SEDA queue.
     *
     * @param queue name of the SEDA queue, appended to {@code seda:}
     * @return the body converted to {@code String}, or whatever {@code receiveBody} yields
     *         when nothing arrives within {@link #RECEIVE_TIMEOUT_MILLIS} (per the Camel
     *         API documentation this is {@code null})
     */
    @GET
    @Path("/seda/{queue}")
    public String getSedaMessage(@PathParam("queue") String queue) {
        return consumerTemplate.receiveBody("seda:" + queue, RECEIVE_TIMEOUT_MILLIS, String.class);
    }

    /**
     * Publishes a price to the {@code test-serializer} topic via Camel, passing the key
     * in the {@code kafka.KEY} header.
     *
     * @param key   record key
     * @param price record value (the request body)
     * @return a {@code 200 OK} response
     */
    @POST
    @Path("price/{key}")
    public Response postPrice(@PathParam("key") Integer key, Double price) {
        producerTemplate.sendBodyAndHeader(PRICE_ENDPOINT_URI, price, "kafka.KEY", key);
        return Response.ok().build();
    }

    /**
     * Receives one exchange from {@code seda:serializer} and maps it to a {@link Price}.
     *
     * @return a price built from the {@code kafka.KEY} header and the message body
     * @throws IllegalStateException if no exchange arrives within {@link #RECEIVE_TIMEOUT_MILLIS}
     */
    @GET
    @Produces("application/json")
    @Path("price")
    public Price getPrice() {
        Exchange exchange = receiveOrFail("seda:serializer");
        return new Price(
                exchange.getMessage().getHeader("kafka.KEY", Integer.class),
                exchange.getMessage().getBody(Double.class));
    }

    /**
     * Publishes the request body to the {@code test-propagation} topic with a Kafka record
     * header {@code id} holding the identifier's {@link BigInteger} two's-complement bytes.
     *
     * @param id      record key, also written to the {@code id} record header
     * @param message record value (the request body)
     * @return a {@code 200 OK} response
     */
    @POST
    @Path("propagate/{id}")
    public Response postMessageWithHeader(@PathParam("id") Integer id, String message) {
        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProperties)) {
            ProducerRecord<Integer, String> outgoing = new ProducerRecord<>("test-propagation", id, message);
            outgoing.headers().add(new RecordHeader("id", BigInteger.valueOf(id.intValue()).toByteArray()));
            // The send result is not inspected here. Closing the producer on exit waits for
            // outstanding sends to complete (per the Kafka client documentation).
            producer.send(outgoing);
            return Response.ok().build();
        }
    }

    /**
     * Receives one exchange from {@code seda:propagation} and maps it to a {@link KafkaMessage}.
     *
     * @return a message built from the {@code id} header and the body, both converted to {@code String}
     * @throws IllegalStateException if no exchange arrives within {@link #RECEIVE_TIMEOUT_MILLIS}
     */
    @GET
    @Produces("application/json")
    @Path("propagate")
    public KafkaMessage getKafkaMessage() {
        Exchange exchange = receiveOrFail("seda:propagation");
        return new KafkaMessage(
                exchange.getMessage().getHeader("id", String.class),
                exchange.getMessage().getBody(String.class));
    }

    /** Receives one exchange from the endpoint, failing with a clear message instead of returning null. */
    private Exchange receiveOrFail(String endpointUri) {
        Exchange exchange = consumerTemplate.receive(endpointUri, RECEIVE_TIMEOUT_MILLIS);
        if (exchange == null) {
            throw new IllegalStateException("No exchange received from " + endpointUri
                    + " within " + RECEIVE_TIMEOUT_MILLIS + " ms");
        }
        return exchange;
    }
}
