/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.cloudevents.impl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.infinispan.cloudevents.configuration.CloudEventsGlobalConfiguration;
import org.infinispan.cloudevents.impl.KafkaEventSender;
import org.infinispan.cloudevents.impl.Log;
import org.infinispan.cloudevents.impl.StructuredEventBuilder;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.dataconversion.Transcoder;
import org.infinispan.commons.dataconversion.internal.Json;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.container.versioning.EntryVersion;
import org.infinispan.encoding.impl.StorageConfigurationManager;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.marshall.core.EncoderRegistry;
import org.infinispan.marshall.persistence.PersistenceMarshaller;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
import org.infinispan.notifications.cachelistener.event.Event;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.util.logging.LogFactory;

/**
 * Cache listener that turns cache entry events into Kafka records and hands them to the
 * {@link KafkaEventSender}.
 *
 * <p>The listener registers itself with the injected {@link CacheNotifier} on start and handles
 * created, modified, removed and expired entry events. Each event is serialized through a
 * {@link StructuredEventBuilder}; keys and values are converted to JSON when a suitable
 * transcoder is available, otherwise the stored/marshalled bytes are written as they are.
 */
@Listener(primaryOnly=true, observation=Listener.Observation.POST)
@Scope(value=Scopes.NAMED_CACHE)
public class EntryEventListener {
    private static final Log log = (Log)LogFactory.getLog(EntryEventListener.class, Log.class);
    // Evaluated once at class initialization; later log level changes are not picked up.
    private static final boolean trace = log.isTraceEnabled();
    @Inject
    StorageConfigurationManager storageConfigurationManager;
    @ComponentName(value="org.infinispan.marshaller.persistence")
    @Inject
    PersistenceMarshaller persistenceMarshaller;
    @Inject
    CacheNotifier<?, ?> cacheNotifier;
    @Inject
    Transport transport;
    @Inject
    KafkaEventSender kafkaEventSender;
    /** Transcoder from the key's byte representation to JSON, or {@code null} if unsupported. */
    private Transcoder keyJsonTranscoder;
    /** Transcoder from the value's byte representation to JSON, or {@code null} if unsupported. */
    private Transcoder valueJsonTranscoder;
    private String clusterName;
    private String cacheEntriesTopic;
    private Address localAddress;

    /**
     * Resolves the optional JSON transcoders for keys and values and reads the cluster name and
     * the target topic from the global configuration.
     *
     * @param encoderRegistry registry queried for conversions to {@code application/json}
     * @param globalConfiguration source of the cluster name and the cloudevents module settings
     */
    @Inject
    void inject(EncoderRegistry encoderRegistry, GlobalConfiguration globalConfiguration) {
        MediaType persistenceMediaType = this.persistenceMarshaller.mediaType();

        MediaType keyBytesMediaType = this.bytesMediaType(this.storageConfigurationManager.getKeyStorageMediaType(), persistenceMediaType);
        if (encoderRegistry.isConversionSupported(keyBytesMediaType, MediaType.APPLICATION_JSON)) {
            this.keyJsonTranscoder = encoderRegistry.getTranscoder(keyBytesMediaType, MediaType.APPLICATION_JSON);
        }

        // The value transcoder must be derived from the VALUE storage media type and must be
        // gated on the value conversion being supported (not on the key's).
        MediaType valueBytesMediaType = this.bytesMediaType(this.storageConfigurationManager.getValueStorageMediaType(), persistenceMediaType);
        if (encoderRegistry.isConversionSupported(valueBytesMediaType, MediaType.APPLICATION_JSON)) {
            this.valueJsonTranscoder = encoderRegistry.getTranscoder(valueBytesMediaType, MediaType.APPLICATION_JSON);
        }

        this.clusterName = globalConfiguration.transport().clusterName();
        CloudEventsGlobalConfiguration cloudEventsGlobalConfiguration = (CloudEventsGlobalConfiguration)globalConfiguration.module(CloudEventsGlobalConfiguration.class);
        this.cacheEntriesTopic = cloudEventsGlobalConfiguration.cacheEntriesTopic();
    }

    /**
     * Captures the local address from the transport and registers this instance as a listener
     * on the cache notifier.
     */
    @Start
    void start() {
        // Guarded in case no transport was injected; the address then stays null.
        this.localAddress = this.transport != null ? this.transport.getAddress() : null;
        this.cacheNotifier.addListener((Object)this);
    }

    /**
     * Converts the event to a Kafka record and sends it.
     *
     * @param e the cache entry event to publish
     * @return the stage returned by the sender, or {@code null} when the event could not be
     *         converted (I/O failure) or the thread was interrupted
     */
    @CacheEntryCreated
    @CacheEntryModified
    @CacheEntryRemoved
    @CacheEntryExpired
    public CompletionStage<Void> handleCacheEntryEvent(CacheEntryEvent<?, ?> e) {
        try {
            ProducerRecord<byte[], byte[]> record = this.entryEventToKafkaMessage(e);
            if (trace) {
                log.tracef("Sending cloudevents message for %s %s %s event", e.getType(), e.getKey(), e.getSource());
            }
            return this.kafkaEventSender.send(record);
        }
        catch (IOException ioException) {
            log.sendError(e.getType(), e.getKey(), e.getSource());
            return null;
        }
        catch (InterruptedException interruptedException) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            if (trace) {
                log.tracef("Cache manager is shutting down, skipping event", new Object[0]);
            }
            return null;
        }
    }

    /**
     * Picks the media type of the byte representation: the persistence marshaller's type when
     * entries are stored as Java objects, the storage type otherwise.
     */
    private MediaType bytesMediaType(MediaType storageMediaType, MediaType persistenceMediaType) {
        if (storageMediaType.match(MediaType.APPLICATION_OBJECT)) {
            return persistenceMediaType;
        }
        return storageMediaType;
    }

    /**
     * Builds the Kafka record for a cache entry event.
     *
     * <p>The event source is {@code /infinispan/<cluster>/<cache>}, the id is
     * {@code <key as string>:<event source>} (a random long replaces a {@code null} event
     * source), and the time is the current instant. For removed events the old value is used
     * as the data.
     *
     * @param event the cache entry event
     * @return the record addressed to the configured cache entries topic
     * @throws IOException if marshalling the key, value or version fails
     * @throws InterruptedException if marshalling is interrupted
     */
    public ProducerRecord<byte[], byte[]> entryEventToKafkaMessage(CacheEntryEvent<?, ?> event) throws IOException, InterruptedException {
        StructuredEventBuilder writer = new StructuredEventBuilder();
        Object key = this.storageConfigurationManager.getKeyWrapper().unwrap(event.getKey());
        // A removed entry no longer has a current value, so publish the value it had before.
        Object wrappedValue = event.getType() != Event.Type.CACHE_ENTRY_REMOVED ? event.getValue() : ((CacheEntryRemovedEvent)event).getOldValue();
        Object value = this.storageConfigurationManager.getValueWrapper().unwrap(wrappedValue);
        String cacheName = event.getCache().getName();
        writer.setSource("/infinispan/" + this.clusterName + "/" + cacheName);
        writer.setType(EntryEventListener.translateType(event.getType()));
        writer.setTime(Instant.now().toString());
        String stringKey = this.writeKey(writer, key, this.storageConfigurationManager.getKeyStorageMediaType());
        Object source = event.getSource();
        if (source == null) {
            source = ThreadLocalRandom.current().nextLong();
        }
        writer.setId(stringKey + ":" + source);
        MediaType storageMediaType = this.storageConfigurationManager.getValueStorageMediaType();
        this.writeValue(writer, value, storageMediaType);
        this.writeVersion(event, writer);
        return writer.toKafkaRecord(this.cacheEntriesTopic);
    }

    /**
     * Writes the key bytes and the subject of the event.
     *
     * <p>Object-stored keys that are JSON primitives are rendered directly as JSON; other
     * object-stored keys are marshalled and, when a key transcoder exists, converted to JSON.
     * Keys stored in any other media type are expected to already be {@code byte[]}.
     *
     * @return the string form of the key (UTF-8 text, or Base64 when the bytes are not valid UTF-8)
     */
    private String writeKey(StructuredEventBuilder writer, Object key, MediaType storageMediaType) throws IOException, InterruptedException {
        MediaType mediaType;
        boolean validUtf8;
        byte[] keyBytes;
        String keyString = null;
        if (storageMediaType.match(MediaType.APPLICATION_OBJECT)) {
            if (StructuredEventBuilder.isJsonPrimitive(key.getClass())) {
                keyString = Json.make((Object)key).toString();
                keyBytes = keyString.getBytes(StandardCharsets.UTF_8);
                validUtf8 = true;
                mediaType = MediaType.APPLICATION_JSON;
            } else {
                byte[] storageBytes = this.persistenceMarshaller.getUserMarshaller().objectToByteBuffer(key);
                // Check the transcoder that is actually used below (the key one, not the value one).
                if (this.keyJsonTranscoder != null) {
                    Object jsonBytes = this.keyJsonTranscoder.transcode((Object)storageBytes, storageMediaType, MediaType.APPLICATION_JSON);
                    keyBytes = (byte[])jsonBytes;
                    mediaType = MediaType.APPLICATION_JSON;
                    validUtf8 = true;
                } else {
                    keyBytes = storageBytes;
                    mediaType = storageMediaType;
                    validUtf8 = EntryEventListener.isValidUtf8(storageBytes);
                }
            }
        } else {
            keyBytes = (byte[])key;
            mediaType = storageMediaType;
            validUtf8 = EntryEventListener.isValidUtf8(keyBytes);
        }
        if (keyString == null) {
            keyString = this.bytesToString(keyBytes, validUtf8);
        }
        writer.setKey(keyBytes);
        // NOTE(review): setSubject receives validUtf8 while setData (see writeValue) receives
        // !isValidUtf8(...). Confirm the meaning of the boolean parameter on both builder methods.
        writer.setSubject(keyString, mediaType, validUtf8);
        return keyString;
    }

    /**
     * Writes the event data from the entry value.
     *
     * <p>A {@code null} value and JSON primitives are written as primitive data. Other
     * object-stored values are marshalled and converted to JSON when a value transcoder exists.
     * Values stored in any other media type are expected to already be {@code byte[]}.
     */
    private void writeValue(StructuredEventBuilder writer, Object value, MediaType storageMediaType) throws IOException, InterruptedException {
        if (value == null) {
            writer.setPrimitiveData(null);
            return;
        }
        if (!storageMediaType.match(MediaType.APPLICATION_OBJECT)) {
            // Assign before use: the bytes are needed both as the data and for the UTF-8 check.
            byte[] valueBytes = (byte[])value;
            writer.setData(valueBytes, storageMediaType, !EntryEventListener.isValidUtf8(valueBytes));
            return;
        }
        if (StructuredEventBuilder.isJsonPrimitive(value.getClass())) {
            writer.setPrimitiveData(value);
            return;
        }
        byte[] storageBytes = this.persistenceMarshaller.getUserMarshaller().objectToByteBuffer(value);
        if (this.valueJsonTranscoder != null) {
            byte[] jsonBytes = (byte[])this.valueJsonTranscoder.transcode((Object)storageBytes, storageMediaType, MediaType.APPLICATION_JSON);
            writer.setData(jsonBytes, MediaType.APPLICATION_JSON, !EntryEventListener.isValidUtf8(jsonBytes));
        } else {
            writer.setData(storageBytes, storageMediaType, !EntryEventListener.isValidUtf8(storageBytes));
        }
    }

    /**
     * Renders bytes as text: decoded as UTF-8 when {@code validUtf8} is true, Base64 otherwise.
     */
    private String bytesToString(byte[] valueBytes, boolean validUtf8) {
        if (!validUtf8) {
            return Base64.getEncoder().encodeToString(valueBytes);
        }
        return new String(valueBytes, StandardCharsets.UTF_8);
    }

    /**
     * Writes the marshalled entry version when the event carries metadata with a version.
     */
    private void writeVersion(CacheEntryEvent<?, ?> event, StructuredEventBuilder writer) throws IOException, InterruptedException {
        if (event.getMetadata() == null) {
            return;
        }
        EntryVersion version = event.getMetadata().version();
        if (version != null) {
            byte[] versionBytes = this.persistenceMarshaller.objectToByteBuffer((Object)version);
            writer.setEntryVersion(versionBytes);
        }
    }

    /**
     * Maps a cache entry event type to its event type string.
     *
     * @throws IllegalArgumentException for event types without a mapping
     */
    private static String translateType(Event.Type type) {
        switch (type) {
            case CACHE_ENTRY_CREATED: {
                return "org.infinispan.entry.created";
            }
            case CACHE_ENTRY_EVICTED: {
                return "org.infinispan.entry.evicted";
            }
            case CACHE_ENTRY_EXPIRED: {
                return "org.infinispan.entry.expired";
            }
            case CACHE_ENTRY_MODIFIED: {
                return "org.infinispan.entry.modified";
            }
            case CACHE_ENTRY_REMOVED: {
                return "org.infinispan.entry.removed";
            }
        }
        throw new IllegalArgumentException("Unsupported event type: " + type);
    }

    /**
     * Checks whether the bytes form a well-formed UTF-8 sequence.
     *
     * <p>The input is decoded in chunks through a fixed-size scratch buffer; the decoded
     * characters themselves are discarded.
     *
     * @return {@code true} for empty input or valid UTF-8, {@code false} on malformed input
     */
    private static boolean isValidUtf8(byte[] bytes) {
        if (bytes.length == 0) {
            return true;
        }
        CharsetDecoder utf8Decoder = StandardCharsets.UTF_8.newDecoder().onMalformedInput(CodingErrorAction.REPORT).onUnmappableCharacter(CodingErrorAction.REPORT);
        ByteBuffer in = ByteBuffer.wrap(bytes);
        CharBuffer out = CharBuffer.allocate(512);
        while (true) {
            CoderResult cr = utf8Decoder.decode(in, out, true);
            if (cr.isUnderflow()) {
                // All input consumed without error (endOfInput=true also rejects truncated sequences).
                return true;
            }
            if (!cr.isOverflow()) {
                // Malformed or unmappable input.
                return false;
            }
            // The scratch buffer is full: discard its content, otherwise the decoder can never
            // make progress and the loop would spin forever on inputs longer than the buffer.
            out.clear();
        }
    }
}

