package io.smallrye.reactive.messaging.kafka.impl.ce;

import io.smallrye.reactive.messaging.ce.DefaultCloudEventMetadataBuilder;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import io.smallrye.reactive.messaging.ce.impl.BaseCloudEventMetadata;
import io.smallrye.reactive.messaging.ce.impl.DefaultIncomingCloudEventMetadata;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaCloudEventMetadata;
import io.smallrye.reactive.messaging.kafka.Record;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.impl.RuntimeKafkaSinkConfiguration;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.buffer.Buffer;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.kie.kogito.services.event.correlation.EventDataCorrelationResolver;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/impl/ce/KafkaCloudEventHelper.class */
public class KafkaCloudEventHelper {
    /** Kafka header holding the content type of the record. */
    public static final String KAFKA_HEADER_CONTENT_TYPE = "content-type";

    /** Content-type prefix that marks a record as a structured Cloud Event. */
    public static final String CE_CONTENT_TYPE_PREFIX = "application/cloudevents";

    /** Prefix of the Kafka headers that carry Cloud Event attributes in binary mode. */
    public static final String CE_HEADER_PREFIX = "ce_";

    /** Content type written on structured Cloud Event records. */
    public static final String STRUCTURED_CONTENT_TYPE = CE_CONTENT_TYPE_PREFIX + "+json; charset=UTF-8";

    // Binary-mode header names: the attribute name prefixed with CE_HEADER_PREFIX.
    public static final String KAFKA_HEADER_FOR_SPEC_VERSION = CE_HEADER_PREFIX + "specversion";
    public static final String KAFKA_HEADER_FOR_TYPE = CE_HEADER_PREFIX + "type";
    public static final String KAFKA_HEADER_FOR_SOURCE = CE_HEADER_PREFIX + "source";
    public static final String KAFKA_HEADER_FOR_ID = CE_HEADER_PREFIX + "id";
    public static final String KAFKA_HEADER_FOR_SCHEMA = CE_HEADER_PREFIX + "dataschema";
    public static final String KAFKA_HEADER_FOR_CONTENT_TYPE = CE_HEADER_PREFIX + "datacontenttype";
    public static final String KAFKA_HEADER_FOR_SUBJECT = CE_HEADER_PREFIX + "subject";
    public static final String KAFKA_HEADER_FOR_TIME = CE_HEADER_PREFIX + "time";

    /**
     * Formatter used to read and write the Cloud Event {@code time} attribute:
     * date and time to the second, an optional fraction of up to nine digits,
     * then a zone or offset id.
     */
    public static final DateTimeFormatter RFC3339_DATE_FORMAT = new DateTimeFormatterBuilder()
            .appendPattern("yyyy-MM-dd'T'HH:mm:ss")
            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
            .appendZoneOrOffsetId()
            .toFormatter();

    /**
     * How a Kafka record encodes a Cloud Event, as classified by
     * {@link KafkaCloudEventHelper#getCloudEventMode(ConsumerRecord)}.
     */
    public enum CloudEventMode {
        /** The {@code content-type} header starts with {@code application/cloudevents}. */
        STRUCTURED,
        /** Not structured, but the id, source, type and specversion {@code ce_} headers are all present. */
        BINARY,
        /** Neither of the above. */
        NOT_A_CLOUD_EVENT
    }

    /** Private constructor: this class only exposes constants and static helpers. */
    private KafkaCloudEventHelper() {
    }

    /**
     * Builds the Cloud Event metadata of a record whose value is a structured
     * (JSON) Cloud Event.
     *
     * <p>The record value must be a {@link JsonObject}, a JSON {@link String} or a
     * {@code byte[]} holding JSON. The Kafka key (when not null) and the topic are
     * stored as extensions, and the built metadata is validated before being wrapped.
     *
     * @param consumerRecord the incoming Kafka record
     * @param <T> the type of the record value / Cloud Event data
     * @param <K> the type of the record key
     * @return the incoming Cloud Event metadata
     * @throws IllegalArgumentException if the value has an unsupported type or the
     *         JSON document has no {@code source} attribute
     */
    public static <T, K> IncomingKafkaCloudEventMetadata<K, T> createFromStructuredCloudEvent(ConsumerRecord<K, T> consumerRecord) {
        DefaultCloudEventMetadataBuilder<T> builder = new DefaultCloudEventMetadataBuilder<>();
        JsonObject content = readStructuredContent(consumerRecord.value());

        // Mandatory attributes.
        builder.withSpecVersion(content.getString("specversion"));
        builder.withId(content.getString("id"));
        String source = content.getString("source");
        if (source == null) {
            throw new IllegalArgumentException("The JSON value must contain the source attribute");
        }
        builder.withSource(URI.create(source));
        builder.withType(content.getString("type"));

        // Optional attributes: only set when present in the JSON document.
        String dataContentType = content.getString("datacontenttype");
        if (dataContentType != null) {
            builder.withDataContentType(dataContentType);
        }
        String dataSchema = content.getString("dataschema");
        if (dataSchema != null) {
            builder.withDataSchema(URI.create(dataSchema));
        }
        String subject = content.getString("subject");
        if (subject != null) {
            builder.withSubject(subject);
        }
        String time = content.getString("time");
        if (time != null) {
            builder.withTimestamp(ZonedDateTime.parse(time, RFC3339_DATE_FORMAT));
        }

        // Kafka-specific extensions.
        K key = consumerRecord.key();
        if (key != null) {
            builder.withExtension(IncomingKafkaCloudEventMetadata.CE_KAFKA_KEY, key);
        }
        builder.withExtension(IncomingKafkaCloudEventMetadata.CE_KAFKA_TOPIC, consumerRecord.topic());

        // The payload is whatever the JSON document stores under the data key.
        @SuppressWarnings("unchecked")
        T data = (T) content.getValue(EventDataCorrelationResolver.DATA_REFERENCE_KEY);
        builder.withData(data);

        BaseCloudEventMetadata<T> metadata = builder.build();
        metadata.validate();
        return new DefaultIncomingKafkaCloudEventMetadata<>(new DefaultIncomingCloudEventMetadata<>(metadata));
    }

    /** Decodes the value of a structured Cloud Event record into a JSON object. */
    private static JsonObject readStructuredContent(Object value) {
        if (value instanceof JsonObject) {
            return (JsonObject) value;
        }
        if (value instanceof String) {
            return new JsonObject((String) value);
        }
        if (value instanceof byte[]) {
            return Buffer.buffer((byte[]) value).toJsonObject();
        }
        throw new IllegalArgumentException("Invalid value type. Structured Cloud Event can only be created from String, JsonObject and byte[], found: " + value.getClass());
    }

    /**
     * Builds the Cloud Event metadata of a record encoded in binary mode, i.e.
     * whose Cloud Event attributes are carried by Kafka headers.
     *
     * <p>Header values are decoded as UTF-8. Headers with a {@code null} value are
     * ignored (treated as absent) instead of failing the decoding. The record
     * {@code content-type} header is used as the data content type. After the
     * known attributes have been consumed, every remaining header starting with
     * {@code ce_} is registered as an extension under its un-prefixed name. The
     * Kafka key (when not null) and the topic are stored as extensions too, and
     * the record value becomes the Cloud Event data.
     *
     * @param consumerRecord the incoming Kafka record
     * @param <T> the type of the record value / Cloud Event data
     * @param <K> the type of the record key
     * @return the incoming Cloud Event metadata
     * @throws IllegalArgumentException if the record has no {@code ce_source} header
     */
    public static <T, K> IncomingKafkaCloudEventMetadata<K, T> createFromBinaryCloudEvent(ConsumerRecord<?, T> consumerRecord) {
        DefaultCloudEventMetadataBuilder<T> builder = new DefaultCloudEventMetadataBuilder<>();

        // Copy the headers into a map; each known attribute is removed as it is read
        // so that only potential extension attributes remain at the end.
        Map<String, String> headers = new HashMap<>();
        for (Header header : consumerRecord.headers()) {
            byte[] raw = header.value();
            if (raw != null) {
                headers.put(header.key(), new String(raw, StandardCharsets.UTF_8));
            }
        }

        // Mandatory attributes.
        builder.withSpecVersion(headers.remove(KAFKA_HEADER_FOR_SPEC_VERSION));
        builder.withId(headers.remove(KAFKA_HEADER_FOR_ID));
        String source = headers.remove(KAFKA_HEADER_FOR_SOURCE);
        if (source == null) {
            throw new IllegalArgumentException("The Kafka record must contain the ce_source header");
        }
        builder.withSource(URI.create(source));
        builder.withType(headers.remove(KAFKA_HEADER_FOR_TYPE));

        // Optional attributes. The data content type comes from the record's content-type header.
        String contentType = headers.remove(KAFKA_HEADER_CONTENT_TYPE);
        if (contentType != null) {
            builder.withDataContentType(contentType);
        }
        String dataSchema = headers.remove(KAFKA_HEADER_FOR_SCHEMA);
        if (dataSchema != null) {
            builder.withDataSchema(URI.create(dataSchema));
        }
        String subject = headers.remove(KAFKA_HEADER_FOR_SUBJECT);
        if (subject != null) {
            builder.withSubject(subject);
        }
        String time = headers.remove(KAFKA_HEADER_FOR_TIME);
        if (time != null) {
            builder.withTimestamp(ZonedDateTime.parse(time, RFC3339_DATE_FORMAT));
        }

        // Kafka-specific extensions.
        if (consumerRecord.key() != null) {
            builder.withExtension(IncomingKafkaCloudEventMetadata.CE_KAFKA_KEY, consumerRecord.key());
        }
        builder.withExtension(IncomingKafkaCloudEventMetadata.CE_KAFKA_TOPIC, consumerRecord.topic());

        // Remaining ce_-prefixed headers are extension attributes.
        for (Map.Entry<String, String> entry : headers.entrySet()) {
            String name = entry.getKey();
            if (name.startsWith(CE_HEADER_PREFIX)) {
                builder.withExtension(name.substring(CE_HEADER_PREFIX.length()), entry.getValue());
            }
        }

        builder.withData(consumerRecord.value());
        BaseCloudEventMetadata<T> metadata = builder.build();
        return new DefaultIncomingKafkaCloudEventMetadata<>(new DefaultIncomingCloudEventMetadata<>(metadata));
    }

    /**
     * Creates the Kafka record for a message sent as a binary-mode Cloud Event:
     * the payload is the record value and the Cloud Event attributes are written
     * as UTF-8 encoded {@code ce_} headers.
     *
     * <p>The {@code ce_time} header is taken, in order of preference, from the
     * Cloud Event metadata timestamp, from the Kafka record timestamp of the
     * outgoing metadata (rendered in UTC), or from the current time when the
     * configuration asks for a timestamp to be inserted. When a data content type
     * is known it is written both as {@code ce_datacontenttype} and as the plain
     * {@code content-type} header. Extensions with a {@code null} value are skipped.
     * If the payload is a {@link Record}, its value is used as the record value.
     *
     * @param message the message being sent
     * @param str the target topic
     * @param outgoingKafkaRecordMetadata the outgoing Kafka metadata, may be {@code null}
     * @param incomingKafkaRecordMetadata the incoming Kafka metadata used for header propagation, may be {@code null}
     * @param outgoingCloudEventMetadata the Cloud Event metadata; when {@code null} a default instance is built
     * @param runtimeKafkaSinkConfiguration the sink configuration providing defaults
     * @return the record to send
     * @throws IllegalArgumentException if neither the metadata nor the configuration provides the type or the source
     */
    public static ProducerRecord<?, ?> createBinaryRecord(Message<?> message, String str, OutgoingKafkaRecordMetadata<?> outgoingKafkaRecordMetadata, IncomingKafkaRecordMetadata<?, ?> incomingKafkaRecordMetadata, OutgoingCloudEventMetadata<?> outgoingCloudEventMetadata, RuntimeKafkaSinkConfiguration runtimeKafkaSinkConfiguration) {
        if (outgoingCloudEventMetadata == null) {
            outgoingCloudEventMetadata = OutgoingCloudEventMetadata.builder().build();
        }
        Integer partition = getPartition(outgoingKafkaRecordMetadata, runtimeKafkaSinkConfiguration);
        Object key = getKey(message, outgoingKafkaRecordMetadata, outgoingCloudEventMetadata, runtimeKafkaSinkConfiguration);
        Long timestamp = getTimestamp(outgoingKafkaRecordMetadata);
        List<Header> headers = getHeaders(outgoingKafkaRecordMetadata, incomingKafkaRecordMetadata, runtimeKafkaSinkConfiguration);
        Optional<String> subject = getSubject(outgoingCloudEventMetadata, runtimeKafkaSinkConfiguration);
        Optional<String> dataContentType = getDataContentType(outgoingCloudEventMetadata, runtimeKafkaSinkConfiguration);
        Optional<URI> dataSchema = getDataSchema(outgoingCloudEventMetadata, runtimeKafkaSinkConfiguration);

        // Mandatory attributes.
        headers.add(new RecordHeader(KAFKA_HEADER_FOR_SPEC_VERSION, outgoingCloudEventMetadata.getSpecVersion().getBytes(StandardCharsets.UTF_8)));
        headers.add(new RecordHeader(KAFKA_HEADER_FOR_ID, outgoingCloudEventMetadata.getId().getBytes(StandardCharsets.UTF_8)));
        headers.add(new RecordHeader(KAFKA_HEADER_FOR_TYPE, getType(outgoingCloudEventMetadata, runtimeKafkaSinkConfiguration).getBytes(StandardCharsets.UTF_8)));
        headers.add(new RecordHeader(KAFKA_HEADER_FOR_SOURCE, getSource(outgoingCloudEventMetadata, runtimeKafkaSinkConfiguration).getBytes(StandardCharsets.UTF_8)));

        // Optional attributes.
        subject.ifPresent(value -> headers.add(new RecordHeader(KAFKA_HEADER_FOR_SUBJECT, value.getBytes(StandardCharsets.UTF_8))));
        dataContentType.ifPresent(value -> {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            headers.add(new RecordHeader(KAFKA_HEADER_FOR_CONTENT_TYPE, bytes));
            // The plain content-type header mirrors the datacontenttype attribute.
            headers.add(new RecordHeader(KAFKA_HEADER_CONTENT_TYPE, bytes));
        });
        dataSchema.ifPresent(uri -> headers.add(new RecordHeader(KAFKA_HEADER_FOR_SCHEMA, uri.toString().getBytes(StandardCharsets.UTF_8))));

        // Event time: metadata first, then the record timestamp, then "now" if configured.
        Optional<ZonedDateTime> timeStamp = outgoingCloudEventMetadata.getTimeStamp();
        ZonedDateTime eventTime = null;
        if (timeStamp.isPresent()) {
            eventTime = timeStamp.get();
        } else if (timestamp != null) {
            // An Instant has no calendar fields, so it cannot be formatted directly with
            // RFC3339_DATE_FORMAT; attach the UTC zone first.
            eventTime = Instant.ofEpochMilli(timestamp.longValue()).atZone(ZoneOffset.UTC);
        } else if (runtimeKafkaSinkConfiguration.getCloudEventsInsertTimestamp().booleanValue()) {
            eventTime = ZonedDateTime.now();
        }
        if (eventTime != null) {
            headers.add(new RecordHeader(KAFKA_HEADER_FOR_TIME, RFC3339_DATE_FORMAT.format(eventTime).getBytes(StandardCharsets.UTF_8)));
        }

        // Extension attributes, skipping those without a value.
        outgoingCloudEventMetadata.getExtensions().forEach((name, value) -> {
            if (value != null) {
                headers.add(new RecordHeader(CE_HEADER_PREFIX + name, value.toString().getBytes(StandardCharsets.UTF_8)));
            }
        });

        Object payload = message.getPayload();
        if (payload instanceof Record) {
            payload = ((Record<?, ?>) payload).value();
        }
        return new ProducerRecord<>(str, partition, timestamp, key, payload, headers);
    }

    /**
     * Resolves the Cloud Event source: the value from the metadata when set,
     * otherwise the one configured on the sink.
     *
     * @throws IllegalArgumentException if neither provides a source
     */
    private static String getSource(OutgoingCloudEventMetadata<?> outgoingCloudEventMetadata, RuntimeKafkaSinkConfiguration runtimeKafkaSinkConfiguration) {
        return Optional.ofNullable(outgoingCloudEventMetadata.getSource())
                .map(Object::toString)
                .orElseGet(() -> runtimeKafkaSinkConfiguration.getCloudEventsSource()
                        .orElseThrow(() -> new IllegalArgumentException("Cannot build the Cloud Event Record - source is not set")));
    }

    /**
     * Resolves the Cloud Event type: the value from the metadata when set,
     * otherwise the one configured on the sink.
     *
     * @throws IllegalArgumentException if neither provides a type
     */
    private static String getType(OutgoingCloudEventMetadata<?> outgoingCloudEventMetadata, RuntimeKafkaSinkConfiguration runtimeKafkaSinkConfiguration) {
        String fromMetadata = outgoingCloudEventMetadata.getType();
        if (fromMetadata != null) {
            return fromMetadata;
        }
        return runtimeKafkaSinkConfiguration.getCloudEventsType()
                .orElseThrow(() -> new IllegalArgumentException("Cannot build the Cloud Event Record - type is not set"));
    }

    /** Returns the subject from the metadata when present, otherwise the configured one. */
    private static Optional<String> getSubject(OutgoingCloudEventMetadata<?> outgoingCloudEventMetadata, RuntimeKafkaSinkConfiguration runtimeKafkaSinkConfiguration) {
        Optional<String> fromMetadata = outgoingCloudEventMetadata.getSubject();
        if (fromMetadata.isPresent()) {
            return fromMetadata;
        }
        return runtimeKafkaSinkConfiguration.getCloudEventsSubject();
    }

    /**
     * Returns the data schema from the metadata when present, otherwise the
     * configured one converted to a {@link URI}.
     */
    private static Optional<URI> getDataSchema(OutgoingCloudEventMetadata<?> outgoingCloudEventMetadata, RuntimeKafkaSinkConfiguration runtimeKafkaSinkConfiguration) {
        Optional<URI> fromMetadata = outgoingCloudEventMetadata.getDataSchema();
        if (fromMetadata.isPresent()) {
            return fromMetadata;
        }
        return runtimeKafkaSinkConfiguration.getCloudEventsDataSchema().map(URI::create);
    }

    /** Returns the data content type from the metadata when present, otherwise the configured one. */
    private static Optional<String> getDataContentType(OutgoingCloudEventMetadata<?> outgoingCloudEventMetadata, RuntimeKafkaSinkConfiguration runtimeKafkaSinkConfiguration) {
        Optional<String> fromMetadata = outgoingCloudEventMetadata.getDataContentType();
        if (fromMetadata.isPresent()) {
            return fromMetadata;
        }
        return runtimeKafkaSinkConfiguration.getCloudEventsDataContentType();
    }

    /**
     * Collects the Kafka headers to write on the outgoing record.
     *
     * <p>When the sink configures a non-blank, comma-separated list of headers to
     * propagate, the incoming headers whose key is in that list are added first.
     * The headers of the outgoing metadata, if any, are appended afterwards.
     *
     * @param outgoingKafkaRecordMetadata the outgoing Kafka metadata, may be {@code null}
     * @param incomingKafkaRecordMetadata the incoming Kafka metadata, may be {@code null}
     * @param runtimeKafkaSinkConfiguration the sink configuration
     * @return a new mutable list of headers, never {@code null}
     */
    private static List<Header> getHeaders(OutgoingKafkaRecordMetadata<?> outgoingKafkaRecordMetadata, IncomingKafkaRecordMetadata<?, ?> incomingKafkaRecordMetadata, RuntimeKafkaSinkConfiguration runtimeKafkaSinkConfiguration) {
        List<Header> headers = new ArrayList<>();

        // Propagation only applies when a header list is configured; a null or blank
        // setting must skip this branch instead of being split.
        String propagated = runtimeKafkaSinkConfiguration.getPropagateHeaders();
        if (isNotBlank(propagated) && incomingKafkaRecordMetadata != null && incomingKafkaRecordMetadata.getHeaders() != null) {
            Set<String> namesToPropagate = Arrays.stream(propagated.split(","))
                    .map(String::trim)
                    .collect(Collectors.toSet());
            for (Header header : incomingKafkaRecordMetadata.getHeaders()) {
                if (namesToPropagate.contains(header.key())) {
                    headers.add(header);
                }
            }
        }

        if (outgoingKafkaRecordMetadata != null && outgoingKafkaRecordMetadata.getHeaders() != null) {
            outgoingKafkaRecordMetadata.getHeaders().forEach(headers::add);
        }
        return headers;
    }

    /**
     * Returns the record timestamp, in epoch milliseconds, from the outgoing
     * metadata, or {@code null} when the metadata or its timestamp is absent or
     * the value is not strictly positive.
     */
    private static Long getTimestamp(OutgoingKafkaRecordMetadata<?> outgoingKafkaRecordMetadata) {
        if (outgoingKafkaRecordMetadata == null || outgoingKafkaRecordMetadata.getTimestamp() == null) {
            return null;
        }
        long epochMillis = outgoingKafkaRecordMetadata.getTimestamp().toEpochMilli();
        return epochMillis > 0 ? Long.valueOf(epochMillis) : null;
    }

    /**
     * Resolves the record key. The first available value wins, in this order:
     * the key of the outgoing Kafka metadata, the key of a {@link Record}
     * payload, the Kafka key extension of the Cloud Event metadata, and finally
     * the key configured on the sink (possibly {@code null}).
     */
    private static Object getKey(Message<?> message, OutgoingKafkaRecordMetadata<?> outgoingKafkaRecordMetadata, OutgoingCloudEventMetadata<?> outgoingCloudEventMetadata, RuntimeKafkaSinkConfiguration runtimeKafkaSinkConfiguration) {
        if (outgoingKafkaRecordMetadata != null && outgoingKafkaRecordMetadata.getKey() != null) {
            return outgoingKafkaRecordMetadata.getKey();
        }
        Object payload = message.getPayload();
        if (payload instanceof Record) {
            return ((Record<?, ?>) payload).key();
        }
        return outgoingCloudEventMetadata.getExtension(IncomingKafkaCloudEventMetadata.CE_KAFKA_KEY)
                .orElse(runtimeKafkaSinkConfiguration.getKey().orElse(null));
    }

    /**
     * Resolves the target partition: the one from the outgoing metadata unless it
     * is {@code -1}, otherwise the configured one. A negative result is reported
     * as {@code null}.
     */
    private static Integer getPartition(OutgoingKafkaRecordMetadata<?> outgoingKafkaRecordMetadata, RuntimeKafkaSinkConfiguration runtimeKafkaSinkConfiguration) {
        int configured = runtimeKafkaSinkConfiguration.getPartition();
        boolean overridden = outgoingKafkaRecordMetadata != null && outgoingKafkaRecordMetadata.getPartition() != -1;
        int effective = overridden ? outgoingKafkaRecordMetadata.getPartition() : configured;
        return effective >= 0 ? Integer.valueOf(effective) : null;
    }

    /**
     * Creates the Kafka record for a message sent as a structured-mode Cloud
     * Event: the record value is a JSON document holding both the Cloud Event
     * attributes and the payload.
     *
     * <p>A {@code content-type} header announcing the structured format is added
     * unless the collected headers already contain one (case-insensitive match).
     * The {@code time} attribute comes from the Cloud Event metadata, or from the
     * current time when the configuration asks for a timestamp to be inserted.
     * Extensions are written as top-level JSON attributes. If the payload is a
     * {@link Record}, its value is used; a {@link String} payload is stored as
     * is, any other payload is mapped with {@link JsonObject#mapFrom(Object)}.
     *
     * @param message the message being sent
     * @param str the target topic
     * @param outgoingKafkaRecordMetadata the outgoing Kafka metadata, may be {@code null}
     * @param incomingKafkaRecordMetadata the incoming Kafka metadata used for header propagation, may be {@code null}
     * @param outgoingCloudEventMetadata the Cloud Event metadata; when {@code null} a default instance is built
     * @param runtimeKafkaSinkConfiguration the sink configuration providing defaults
     * @return the record to send, whose value is the encoded JSON document
     * @throws IllegalArgumentException if neither the metadata nor the configuration provides the type or the source
     */
    public static ProducerRecord<?, ?> createStructuredRecord(Message<?> message, String str, OutgoingKafkaRecordMetadata<?> outgoingKafkaRecordMetadata, IncomingKafkaRecordMetadata<?, ?> incomingKafkaRecordMetadata, OutgoingCloudEventMetadata<?> outgoingCloudEventMetadata, RuntimeKafkaSinkConfiguration runtimeKafkaSinkConfiguration) {
        if (outgoingCloudEventMetadata == null) {
            outgoingCloudEventMetadata = OutgoingCloudEventMetadata.builder().build();
        }
        Integer partition = getPartition(outgoingKafkaRecordMetadata, runtimeKafkaSinkConfiguration);
        Object key = getKey(message, outgoingKafkaRecordMetadata, outgoingCloudEventMetadata, runtimeKafkaSinkConfiguration);
        Long timestamp = getTimestamp(outgoingKafkaRecordMetadata);
        List<Header> headers = getHeaders(outgoingKafkaRecordMetadata, incomingKafkaRecordMetadata, runtimeKafkaSinkConfiguration);
        String source = getSource(outgoingCloudEventMetadata, runtimeKafkaSinkConfiguration);
        String type = getType(outgoingCloudEventMetadata, runtimeKafkaSinkConfiguration);
        Optional<String> subject = getSubject(outgoingCloudEventMetadata, runtimeKafkaSinkConfiguration);
        Optional<String> dataContentType = getDataContentType(outgoingCloudEventMetadata, runtimeKafkaSinkConfiguration);
        Optional<URI> dataSchema = getDataSchema(outgoingCloudEventMetadata, runtimeKafkaSinkConfiguration);

        // Announce the structured format unless a content-type header is already set.
        boolean hasContentType = headers.stream()
                .anyMatch(header -> header.key().equalsIgnoreCase(KAFKA_HEADER_CONTENT_TYPE));
        if (!hasContentType) {
            // Explicit charset: header bytes must not depend on the platform default.
            headers.add(new RecordHeader(KAFKA_HEADER_CONTENT_TYPE, STRUCTURED_CONTENT_TYPE.getBytes(StandardCharsets.UTF_8)));
        }

        JsonObject json = new JsonObject();
        json.put("specversion", outgoingCloudEventMetadata.getSpecVersion())
                .put("type", type)
                .put("source", source)
                .put("id", outgoingCloudEventMetadata.getId());

        ZonedDateTime eventTime = outgoingCloudEventMetadata.getTimeStamp().orElse(null);
        if (eventTime != null) {
            json.put("time", eventTime.toInstant());
        } else if (runtimeKafkaSinkConfiguration.getCloudEventsInsertTimestamp().booleanValue()) {
            json.put("time", Instant.now());
        }

        dataSchema.ifPresent(uri -> json.put("dataschema", uri));
        dataContentType.ifPresent(value -> json.put("datacontenttype", value));
        subject.ifPresent(value -> json.put("subject", value));

        // Extensions become top-level attributes of the JSON document.
        Map<String, Object> extensions = outgoingCloudEventMetadata.getExtensions();
        extensions.forEach(json::put);

        Object payload = message.getPayload();
        if (payload instanceof Record) {
            payload = ((Record<?, ?>) payload).value();
        }
        if (payload instanceof String) {
            json.put(EventDataCorrelationResolver.DATA_REFERENCE_KEY, payload);
        } else {
            json.put(EventDataCorrelationResolver.DATA_REFERENCE_KEY, JsonObject.mapFrom(payload));
        }
        return new ProducerRecord<>(str, partition, timestamp, key, json.encode(), headers);
    }

    /**
     * Classifies how (and whether) a Kafka record carries a Cloud Event.
     *
     * @param consumerRecord the incoming Kafka record
     * @return {@link CloudEventMode#STRUCTURED} when the {@code content-type}
     *         header starts with {@code application/cloudevents};
     *         {@link CloudEventMode#BINARY} when all mandatory {@code ce_} headers
     *         are present; {@link CloudEventMode#NOT_A_CLOUD_EVENT} otherwise
     */
    public static CloudEventMode getCloudEventMode(ConsumerRecord<?, ?> consumerRecord) {
        String contentType = getHeader(KAFKA_HEADER_CONTENT_TYPE, consumerRecord);
        if (contentType != null && contentType.startsWith(CE_CONTENT_TYPE_PREFIX)) {
            return CloudEventMode.STRUCTURED;
        }
        if (containsAllMandatoryAttributes(consumerRecord)) {
            return CloudEventMode.BINARY;
        }
        return CloudEventMode.NOT_A_CLOUD_EVENT;
    }

    /** Tells whether the record has the id, source, type and specversion {@code ce_} headers. */
    private static boolean containsAllMandatoryAttributes(ConsumerRecord<?, ?> consumerRecord) {
        return getHeader(KAFKA_HEADER_FOR_ID, consumerRecord) != null
                && getHeader(KAFKA_HEADER_FOR_SOURCE, consumerRecord) != null
                && getHeader(KAFKA_HEADER_FOR_TYPE, consumerRecord) != null
                && getHeader(KAFKA_HEADER_FOR_SPEC_VERSION, consumerRecord) != null;
    }

    /**
     * Returns the UTF-8 decoded value of the first header with the given key
     * that has a non-null value.
     *
     * <p>A header whose value is {@code null} is treated as absent rather than
     * failing the decoding.
     *
     * @param str the header key to look for
     * @param consumerRecord the record whose headers are searched
     * @return the decoded value, or {@code null} if no such header exists
     */
    private static String getHeader(String str, ConsumerRecord<?, ?> consumerRecord) {
        for (Header header : consumerRecord.headers()) {
            if (header.key().equals(str)) {
                byte[] raw = header.value();
                if (raw != null) {
                    return new String(raw, StandardCharsets.UTF_8);
                }
            }
        }
        return null;
    }

    /**
     * Tells whether the given string has content once trimmed.
     *
     * @param str the string to check, may be {@code null}
     * @return {@code true} if {@code str} is neither {@code null}, empty, nor
     *         made only of characters removed by {@link String#trim()}
     */
    public static boolean isNotBlank(String str) {
        return str != null && !str.trim().isEmpty();
    }
}
