package org.apache.camel.component.vertx.kafka.operations;

import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.vertx.kafka.VertxKafkaConfigurationOptionsProxy;
import org.apache.camel.component.vertx.kafka.VertxKafkaConstants;
import org.apache.camel.component.vertx.kafka.VertxKafkaHeadersPropagation;
import org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration;
import org.apache.camel.component.vertx.kafka.serde.VertxKafkaTypeSerializer;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperations.class */
public class VertxKafkaProducerOperations {
    private static final Logger LOG = LoggerFactory.getLogger(VertxKafkaProducerOperations.class);
    private final KafkaProducer<Object, Object> kafkaProducer;
    private final VertxKafkaConfigurationOptionsProxy configurationOptionsProxy;

    public VertxKafkaProducerOperations(KafkaProducer<Object, Object> kafkaProducer, VertxKafkaConfiguration vertxKafkaConfiguration) {
        ObjectHelper.notNull(kafkaProducer, "kafkaProducer");
        ObjectHelper.notNull(vertxKafkaConfiguration, "configuration");
        this.kafkaProducer = kafkaProducer;
        this.configurationOptionsProxy = new VertxKafkaConfigurationOptionsProxy(vertxKafkaConfiguration);
    }

    public boolean sendEvents(Message message, AsyncCallback asyncCallback) {
        ObjectHelper.notNull(message, "exchange cannot be null");
        ObjectHelper.notNull(asyncCallback, "callback cannot be null");
        return sendEvents(message, list -> {
            LOG.debug("Processed one event...");
        }, asyncCallback);
    }

    public boolean sendEvents(Message message, Consumer<List<RecordMetadata>> consumer, AsyncCallback asyncCallback) {
        ObjectHelper.notNull(message, "inMessage cannot be null");
        ObjectHelper.notNull(asyncCallback, "callback cannot be null");
        sendAsyncEvents(message).subscribe(consumer, th -> {
            LOG.debug("Error processing async exchange with error:" + th.getMessage());
            message.getExchange().setException(th);
            asyncCallback.done(false);
        }, () -> {
            LOG.debug("All events with exchange have been sent successfully.");
            asyncCallback.done(false);
        });
        return false;
    }

    private Mono<List<RecordMetadata>> sendAsyncEvents(Message message) {
        return Flux.fromIterable(createKafkaProducerRecords(message)).flatMap(this::sendDataToKafka).collectList().doOnError(th -> {
            LOG.error(th.getMessage());
        });
    }

    private Mono<RecordMetadata> sendDataToKafka(KafkaProducerRecord<Object, Object> kafkaProducerRecord) {
        return Mono.create(monoSink -> {
            this.kafkaProducer.send(kafkaProducerRecord, asyncResult -> {
                if (asyncResult.failed()) {
                    monoSink.error(asyncResult.cause());
                } else {
                    monoSink.success((RecordMetadata) asyncResult.result());
                }
            });
        });
    }

    private Iterable<KafkaProducerRecord<Object, Object>> createKafkaProducerRecords(Message message) {
        return message.getBody() instanceof Iterable ? createProducerRecordFromIterable((Iterable) message.getBody(), message) : Collections.singletonList(createProducerRecordFromMessage(message, null));
    }

    private Iterable<KafkaProducerRecord<Object, Object>> createProducerRecordFromIterable(Iterable<Object> iterable, Message message) {
        LinkedList linkedList = new LinkedList();
        String topic = getTopic(message, null);
        iterable.forEach(obj -> {
            if (obj instanceof Exchange) {
                linkedList.add(createProducerRecordFromExchange((Exchange) obj, topic));
            } else if (obj instanceof Message) {
                linkedList.add(createProducerRecordFromMessage((Message) obj, topic));
            } else {
                linkedList.add(createProducerRecordFromObject(obj, message, topic));
            }
        });
        return linkedList;
    }

    private KafkaProducerRecord<Object, Object> createProducerRecordFromExchange(Exchange exchange, String str) {
        return createProducerRecordFromMessage(exchange.getIn(), str);
    }

    private KafkaProducerRecord<Object, Object> createProducerRecordFromMessage(Message message, String str) {
        return createProducerRecordFromObject(message.getBody(), message, str);
    }

    private KafkaProducerRecord<Object, Object> createProducerRecordFromObject(Object obj, Message message, String str) {
        String topic = getTopic(message, str);
        Object messageKey = getMessageKey(message);
        Object messageValue = getMessageValue(message, obj);
        Integer partitionId = getPartitionId(message);
        return KafkaProducerRecord.create(topic, messageKey, messageValue, partitionId).addHeaders(new VertxKafkaHeadersPropagation(this.configurationOptionsProxy.getConfiguration().getHeaderFilterStrategy()).getPropagatedHeaders(message));
    }

    private String getTopic(Message message, String str) {
        String topic = getTopic(message, this.configurationOptionsProxy.getOverrideTopic(message), (String) message.getHeader(VertxKafkaConstants.TOPIC, String.class), str);
        if (ObjectHelper.isEmpty(topic)) {
            throw new IllegalArgumentException("Topic cannot be empty, provide a topic in the config or in the headers.");
        }
        return topic;
    }

    private String getTopic(Message message, String str, String str2, String str3) {
        String str4 = ObjectHelper.isEmpty(str) ? str2 : str;
        String str5 = ObjectHelper.isEmpty(str4) ? str3 : str4;
        return ObjectHelper.isEmpty(str5) ? this.configurationOptionsProxy.getTopic(message) : str5;
    }

    private Object getMessageKey(Message message) {
        return VertxKafkaTypeSerializer.tryConvertToSerializedType(message, this.configurationOptionsProxy.getMessageKey(message), this.configurationOptionsProxy.getKeySerializer(message));
    }

    private Integer getPartitionId(Message message) {
        return this.configurationOptionsProxy.getPartitionId(message);
    }

    private Object getMessageValue(Message message, Object obj) {
        return VertxKafkaTypeSerializer.tryConvertToSerializedType(message, obj, this.configurationOptionsProxy.getValueSerializer(message));
    }
}
