package org.infinispan.cloudevents.impl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.infinispan.cloudevents.configuration.CloudEventsGlobalConfiguration;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.dataconversion.Transcoder;
import org.infinispan.commons.dataconversion.internal.Json;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.container.versioning.EntryVersion;
import org.infinispan.encoding.impl.StorageConfigurationManager;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.marshall.core.EncoderRegistry;
import org.infinispan.marshall.persistence.PersistenceMarshaller;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
import org.infinispan.notifications.cachelistener.event.Event;
import org.infinispan.util.logging.LogFactory;

@Scope(Scopes.NAMED_CACHE)
@Listener(primaryOnly = true, observation = Listener.Observation.POST)
/* loaded from: input_file:org/infinispan/cloudevents/impl/EntryEventListener.class */
public class EntryEventListener {
    // Logger for this listener.
    private static final Log log = (Log) LogFactory.getLog(EntryEventListener.class, Log.class);
    // Trace flag, read from the logger once when this class is initialized.
    private static final boolean trace = log.isTraceEnabled();

    // Supplies the key/value storage media types and the key/value wrappers used to unwrap event payloads.
    @Inject
    StorageConfigurationManager storageConfigurationManager;

    // Persistence marshaller: its user marshaller serializes non-primitive keys and values,
    // and the marshaller itself serializes entry versions.
    @ComponentName("org.infinispan.marshaller.persistence")
    @Inject
    PersistenceMarshaller persistenceMarshaller;

    // Notifier this listener registers itself with in start().
    @Inject
    CacheNotifier<?, ?> cacheNotifier;

    // Sends the Kafka records built from cache entry events.
    @Inject
    KafkaEventSender kafkaEventSender;
    // Transcoder to JSON for keys; assigned in inject() only when the encoder registry
    // reports the conversion as supported, otherwise left null.
    private Transcoder keyJsonTranscoder;
    // Transcoder to JSON for values; assigned in inject() only when the encoder registry
    // reports the conversion as supported, otherwise left null.
    private Transcoder valueJsonTranscoder;
    // Cluster name taken from the global transport configuration; part of the event source path.
    private String clusterName;
    // Topic name taken from CloudEventsGlobalConfiguration; passed to the builder when creating the Kafka record.
    private String cacheEntriesTopic;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.infinispan.cloudevents.impl.EntryEventListener$1, reason: invalid class name */
    /* loaded from: input_file:org/infinispan/cloudevents/impl/EntryEventListener$1.class */
    /*
     * Decompiler-recovered synthetic holder for a switch lookup table over Event.Type.
     * The table is indexed by Event.Type.ordinal() and holds a small case number (1-5)
     * for each event type listed below; entries that are never assigned keep the int
     * default of 0.
     * NOTE(review): this looks like the helper a Java compiler emits for a switch over
     * an enum - it is not hand-written logic.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per Event.Type constant; index = ordinal, value = case number (0 = unmapped).
        static final /* synthetic */ int[] $SwitchMap$org$infinispan$notifications$cachelistener$event$Event$Type = new int[Event.Type.values().length];

        static {
            // Each assignment is guarded separately: a NoSuchFieldError for one constant is
            // ignored and simply leaves that slot at 0 without affecting the other entries.
            try {
                $SwitchMap$org$infinispan$notifications$cachelistener$event$Event$Type[Event.Type.CACHE_ENTRY_CREATED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$infinispan$notifications$cachelistener$event$Event$Type[Event.Type.CACHE_ENTRY_EVICTED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$infinispan$notifications$cachelistener$event$Event$Type[Event.Type.CACHE_ENTRY_EXPIRED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$infinispan$notifications$cachelistener$event$Event$Type[Event.Type.CACHE_ENTRY_MODIFIED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$infinispan$notifications$cachelistener$event$Event$Type[Event.Type.CACHE_ENTRY_REMOVED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Resolves the optional JSON transcoders and caches the configuration values
     * needed to build events.
     *
     * <p>For keys and values independently, the media type of the serialized bytes is
     * the storage media type, or the persistence marshaller's media type when the
     * storage type is {@code application/x-java-object}. A transcoder to JSON is stored
     * only if the registry reports that conversion as supported; otherwise the
     * corresponding field stays {@code null}.
     *
     * @param encoderRegistry     registry queried for transcoders to {@code application/json}
     * @param globalConfiguration source of the cluster name and the cache entries topic
     */
    @Inject
    public void inject(EncoderRegistry encoderRegistry, GlobalConfiguration globalConfiguration) {
        MediaType keyBytesMediaType = bytesMediaType(this.storageConfigurationManager.getKeyStorageMediaType(), this.persistenceMarshaller.mediaType());
        if (encoderRegistry.isConversionSupported(keyBytesMediaType, MediaType.APPLICATION_JSON)) {
            this.keyJsonTranscoder = encoderRegistry.getTranscoder(keyBytesMediaType, MediaType.APPLICATION_JSON);
        }

        // The value transcoder must be derived from the VALUE storage media type and
        // checked against its own media type, not the key's.
        MediaType valueBytesMediaType = bytesMediaType(this.storageConfigurationManager.getValueStorageMediaType(), this.persistenceMarshaller.mediaType());
        if (encoderRegistry.isConversionSupported(valueBytesMediaType, MediaType.APPLICATION_JSON)) {
            this.valueJsonTranscoder = encoderRegistry.getTranscoder(valueBytesMediaType, MediaType.APPLICATION_JSON);
        }

        this.clusterName = globalConfiguration.transport().clusterName();
        this.cacheEntriesTopic = ((CloudEventsGlobalConfiguration) globalConfiguration.module(CloudEventsGlobalConfiguration.class)).cacheEntriesTopic();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers this instance as a listener on the injected cache notifier.
     */
    @Start
    public void start() {
        cacheNotifier.addListener(this);
    }

    /**
     * Converts a created/modified/removed/expired cache entry event into a Kafka
     * record and hands it to the Kafka sender.
     *
     * @param cacheEntryEvent the cache entry event to publish
     * @return the stage returned by the Kafka sender, or {@code null} when the event
     *         could not be converted (I/O failure or interruption) and was skipped
     */
    @CacheEntryCreated
    @CacheEntryModified
    @CacheEntryRemoved
    @CacheEntryExpired
    public CompletionStage<Void> handleCacheEntryEvent(CacheEntryEvent<?, ?> cacheEntryEvent) {
        try {
            ProducerRecord<byte[], byte[]> record = entryEventToKafkaMessage(cacheEntryEvent);
            if (trace) {
                log.tracef("Sending cloudevents message for %s %s %s event", cacheEntryEvent.getType(), cacheEntryEvent.getKey(), cacheEntryEvent.getSource());
            }
            return this.kafkaEventSender.send(record);
        } catch (IOException e) {
            // Conversion failed: report the event and skip it rather than failing the cache operation.
            log.sendError(cacheEntryEvent.getType(), cacheEntryEvent.getKey(), cacheEntryEvent.getSource());
            return null;
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe
            // the interruption; swallowing it here would hide it from the caller.
            Thread.currentThread().interrupt();
            if (trace) {
                log.tracef("Cache manager is shutting down, skipping event", new Object[0]);
            }
            return null;
        }
    }

    /**
     * Picks the media type that describes the serialized bytes of an entry part.
     *
     * @param mediaType  the storage media type
     * @param mediaType2 the fallback, returned when the storage media type matches
     *                   {@link MediaType#APPLICATION_OBJECT}
     * @return {@code mediaType2} if {@code mediaType} matches the object media type,
     *         otherwise {@code mediaType}
     */
    private MediaType bytesMediaType(MediaType mediaType, MediaType mediaType2) {
        if (mediaType.match(MediaType.APPLICATION_OBJECT)) {
            return mediaType2;
        }
        return mediaType;
    }

    /**
     * Builds the Kafka record for a cache entry event.
     *
     * <p>The event source is {@code /infinispan/<cluster>/<cache>}, the time is the
     * current instant, and the id is the key's string form followed by {@code ':'} and
     * the event's source object (or a random long when the event has no source).
     * For removed events the old value is used as the payload; for every other type
     * the event's current value is used.
     *
     * @param cacheEntryEvent the event to convert
     * @return the record addressed to the configured cache entries topic
     * @throws IOException          declared by the key/value/version writers this method calls
     * @throws InterruptedException declared by the key/value/version writers this method calls
     */
    public ProducerRecord<byte[], byte[]> entryEventToKafkaMessage(CacheEntryEvent<?, ?> cacheEntryEvent) throws IOException, InterruptedException {
        StructuredEventBuilder builder = new StructuredEventBuilder();

        Object key = this.storageConfigurationManager.getKeyWrapper().unwrap(cacheEntryEvent.getKey());

        // Removed events carry the payload in the old value.
        Object wrappedValue;
        if (cacheEntryEvent.getType() == Event.Type.CACHE_ENTRY_REMOVED) {
            wrappedValue = ((CacheEntryRemovedEvent<?, ?>) cacheEntryEvent).getOldValue();
        } else {
            wrappedValue = cacheEntryEvent.getValue();
        }
        Object value = this.storageConfigurationManager.getValueWrapper().unwrap(wrappedValue);

        builder.setSource("/infinispan/" + this.clusterName + "/" + cacheEntryEvent.getCache().getName());
        builder.setType(translateType(cacheEntryEvent.getType()));
        builder.setTime(Instant.now().toString());

        String keyAsString = writeKey(builder, key, this.storageConfigurationManager.getKeyStorageMediaType());

        // Fall back to a random number when the event does not report a source.
        Object eventSource = cacheEntryEvent.getSource();
        Object idSuffix = eventSource != null ? eventSource : Long.valueOf(ThreadLocalRandom.current().nextLong());
        builder.setId(keyAsString + ":" + idSuffix);

        writeValue(builder, value, this.storageConfigurationManager.getValueStorageMediaType());
        writeVersion(cacheEntryEvent, builder);
        return builder.toKafkaRecord(this.cacheEntriesTopic);
    }

    /**
     * Writes the event key and subject into the builder and returns the key's string form.
     *
     * <ul>
     *   <li>Storage media type is not {@code application/x-java-object}: the key is
     *       already a {@code byte[]} and is used as-is.</li>
     *   <li>The key is a JSON primitive: its JSON text is used, encoded as UTF-8.</li>
     *   <li>Otherwise the key is serialized with the persistence user marshaller and,
     *       when a key JSON transcoder is available, transcoded to JSON.</li>
     * </ul>
     *
     * @param structuredEventBuilder builder receiving the key bytes and the subject
     * @param obj                    the unwrapped key
     * @param mediaType              the key storage media type
     * @return the subject string: the JSON text, the UTF-8 decoded bytes, or the
     *         Base64 form of the bytes when they are not valid UTF-8
     * @throws IOException          if marshalling the key fails
     * @throws InterruptedException if marshalling the key is interrupted
     */
    private String writeKey(StructuredEventBuilder structuredEventBuilder, Object obj, MediaType mediaType) throws IOException, InterruptedException {
        final byte[] keyBytes;
        final MediaType subjectMediaType;
        final boolean validUtf8;
        String subject = null;

        if (!mediaType.match(MediaType.APPLICATION_OBJECT)) {
            keyBytes = (byte[]) obj;
            subjectMediaType = mediaType;
            validUtf8 = isValidUtf8(keyBytes);
        } else if (StructuredEventBuilder.isJsonPrimitive(obj.getClass())) {
            subject = Json.make(obj).toString();
            keyBytes = subject.getBytes(StandardCharsets.UTF_8);
            validUtf8 = true;
            subjectMediaType = MediaType.APPLICATION_JSON;
        } else {
            byte[] marshalled = this.persistenceMarshaller.getUserMarshaller().objectToByteBuffer(obj);
            // Guard on the KEY transcoder, which is the one dereferenced below; checking the
            // value transcoder could NPE here or skip an available key conversion.
            if (this.keyJsonTranscoder != null) {
                keyBytes = (byte[]) this.keyJsonTranscoder.transcode(marshalled, mediaType, MediaType.APPLICATION_JSON);
                subjectMediaType = MediaType.APPLICATION_JSON;
                validUtf8 = true;
            } else {
                keyBytes = marshalled;
                subjectMediaType = mediaType;
                validUtf8 = isValidUtf8(marshalled);
            }
        }

        // Only the JSON-primitive branch produces the string directly.
        if (subject == null) {
            subject = bytesToString(keyBytes, validUtf8);
        }
        structuredEventBuilder.setKey(keyBytes);
        structuredEventBuilder.setSubject(subject, subjectMediaType, validUtf8);
        return subject;
    }

    /**
     * Writes the event payload into the builder.
     *
     * <ul>
     *   <li>{@code null} value: written as primitive data {@code null}.</li>
     *   <li>Storage media type is not {@code application/x-java-object}: the value is
     *       already a {@code byte[]} and is written with the storage media type.</li>
     *   <li>JSON primitive: written as primitive data.</li>
     *   <li>Otherwise the value is serialized with the persistence user marshaller and,
     *       when a value JSON transcoder is available, transcoded to JSON first.</li>
     * </ul>
     * The last argument passed to {@code setData} is {@code true} when the bytes are
     * not valid UTF-8.
     *
     * @param structuredEventBuilder builder receiving the data
     * @param obj                    the unwrapped value, may be {@code null}
     * @param mediaType              the value storage media type
     * @throws IOException          if marshalling the value fails
     * @throws InterruptedException if marshalling the value is interrupted
     */
    private void writeValue(StructuredEventBuilder structuredEventBuilder, Object obj, MediaType mediaType) throws IOException, InterruptedException {
        if (obj == null) {
            structuredEventBuilder.setPrimitiveData(null);
            return;
        }

        if (!mediaType.match(MediaType.APPLICATION_OBJECT)) {
            byte[] storedBytes = (byte[]) obj;
            boolean notUtf8 = !isValidUtf8(storedBytes);
            structuredEventBuilder.setData(storedBytes, mediaType, notUtf8);
            return;
        }

        if (StructuredEventBuilder.isJsonPrimitive(obj.getClass())) {
            structuredEventBuilder.setPrimitiveData(obj);
            return;
        }

        byte[] marshalled = this.persistenceMarshaller.getUserMarshaller().objectToByteBuffer(obj);
        if (this.valueJsonTranscoder != null) {
            byte[] jsonBytes = (byte[]) this.valueJsonTranscoder.transcode(marshalled, mediaType, MediaType.APPLICATION_JSON);
            boolean notUtf8 = !isValidUtf8(jsonBytes);
            structuredEventBuilder.setData(jsonBytes, MediaType.APPLICATION_JSON, notUtf8);
            return;
        }

        boolean notUtf8 = !isValidUtf8(marshalled);
        structuredEventBuilder.setData(marshalled, mediaType, notUtf8);
    }

    /**
     * Renders bytes as text.
     *
     * @param bArr the bytes to render
     * @param z    {@code true} to decode the bytes as UTF-8, {@code false} to Base64-encode them
     * @return the UTF-8 decoded string, or the Base64 encoding of the bytes
     */
    private String bytesToString(byte[] bArr, boolean z) {
        if (z) {
            return new String(bArr, StandardCharsets.UTF_8);
        }
        return Base64.getEncoder().encodeToString(bArr);
    }

    /**
     * Writes the entry version into the builder when the event carries one.
     * Nothing is written if the event has no metadata or the metadata has no version.
     *
     * @param cacheEntryEvent        the event whose metadata is inspected
     * @param structuredEventBuilder builder receiving the marshalled version
     * @throws IOException          if marshalling the version fails
     * @throws InterruptedException if marshalling the version is interrupted
     */
    private void writeVersion(CacheEntryEvent<?, ?> cacheEntryEvent, StructuredEventBuilder structuredEventBuilder) throws IOException, InterruptedException {
        if (cacheEntryEvent.getMetadata() == null) {
            return;
        }
        EntryVersion entryVersion = cacheEntryEvent.getMetadata().version();
        if (entryVersion == null) {
            return;
        }
        byte[] versionBytes = this.persistenceMarshaller.objectToByteBuffer(entryVersion);
        structuredEventBuilder.setEntryVersion(versionBytes);
    }

    /**
     * Maps a cache event type to its CloudEvents type string.
     *
     * @param type the cache event type
     * @return the {@code org.infinispan.entry.*} type name for created, evicted,
     *         expired, modified and removed events
     * @throws IllegalArgumentException for any other event type
     */
    private static String translateType(Event.Type type) {
        switch (type) {
            case CACHE_ENTRY_CREATED:
                return "org.infinispan.entry.created";
            case CACHE_ENTRY_EVICTED:
                return "org.infinispan.entry.evicted";
            case CACHE_ENTRY_EXPIRED:
                return "org.infinispan.entry.expired";
            case CACHE_ENTRY_MODIFIED:
                return "org.infinispan.entry.modified";
            case CACHE_ENTRY_REMOVED:
                return "org.infinispan.entry.removed";
            default:
                throw new IllegalArgumentException("Unsupported event type: " + type);
        }
    }

    /**
     * Checks whether a byte array is well-formed UTF-8.
     *
     * <p>The bytes are decoded in chunks into a fixed-size scratch buffer of
     * {@code StructuredEventBuilder.VALIDATION_BUFFER_SIZE} chars; the decoded
     * characters themselves are discarded.
     *
     * @param bArr the bytes to validate
     * @return {@code true} if the array is empty or decodes without malformed or
     *         unmappable input, {@code false} otherwise
     */
    private static boolean isValidUtf8(byte[] bArr) {
        if (bArr.length == 0) {
            return true;
        }
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT);
        ByteBuffer input = ByteBuffer.wrap(bArr);
        CharBuffer scratch = CharBuffer.allocate(StructuredEventBuilder.VALIDATION_BUFFER_SIZE);
        while (input.hasRemaining()) {
            CoderResult result = decoder.decode(input, scratch, true);
            if (result.isUnderflow()) {
                // All input was consumed without an error.
                return true;
            }
            if (!result.isOverflow()) {
                // Malformed or unmappable input.
                return false;
            }
            // The scratch buffer is full: discard its contents so the next decode call can
            // make progress. Without this, the decoder reports overflow forever.
            // NOTE(review): assumes VALIDATION_BUFFER_SIZE is at least 2 so a surrogate pair fits.
            scratch.clear();
        }
        return true;
    }
}
