/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.cloudevents;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.cloudevents.MockKafkaEventSender;
import org.infinispan.cloudevents.configuration.CloudEventsGlobalConfigurationBuilder;
import org.infinispan.cloudevents.impl.KafkaEventSender;
import org.infinispan.commands.module.TestGlobalConfigurationBuilder;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.dataconversion.internal.Json;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.StorageType;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.configuration.internal.PrivateGlobalConfigurationBuilder;
import org.infinispan.context.Flag;
import org.infinispan.distribution.MagicKey;
import org.infinispan.distribution.rehash.TestWriteOperation;
import org.infinispan.encoding.DataConversion;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.marshall.core.EncoderRegistry;
import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestDataSCI;
import org.infinispan.test.TestingUtil;
import org.testng.AssertJUnit;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Functional test verifying that cache entry writes on a three-node {@code DIST_SYNC} cluster are
 * published as CloudEvents through a {@link MockKafkaEventSender}.
 *
 * <p>Instances are parameterized by storage type and by whether the cache manager is configured
 * in server mode; see {@link #factory()}.
 */
@Test(groups={"functional"}, testName="cloudevents.impl.CacheEntryCloudEventsTest")
public class CacheEntryCloudEventsTest
extends MultipleCacheManagersTest {
    public static final String CACHE_NAME = "testCache";
    /** Mock sender registered on every node in place of the real {@link KafkaEventSender}. */
    private final MockKafkaEventSender mockSender = new MockKafkaEventSender();
    private StorageType storageType;
    private boolean serverMode;

    /**
     * Builds the parameterized instances of this test: {@code OBJECT} storage, {@code BINARY}
     * storage, and {@code HEAP} storage with server mode enabled.
     *
     * @return the test instances
     */
    public Object[] factory() {
        return new Object[]{
            new CacheEntryCloudEventsTest().storageType(StorageType.OBJECT),
            new CacheEntryCloudEventsTest().storageType(StorageType.BINARY),
            new CacheEntryCloudEventsTest().storageType(StorageType.HEAP).serverMode(true),
        };
    }

    /**
     * Write operations exercised by {@link #testSingleKeyOperations(TestWriteOperation)}.
     *
     * @return one single-element row per operation
     */
    @DataProvider
    public static Object[][] operations() {
        return new Object[][]{
            {TestWriteOperation.PUT_CREATE},
            {TestWriteOperation.PUT_OVERWRITE},
            {TestWriteOperation.PUT_IF_ABSENT},
            {TestWriteOperation.REPLACE},
            {TestWriteOperation.REPLACE_EXACT},
            {TestWriteOperation.REMOVE},
            {TestWriteOperation.REMOVE_EXACT},
            {TestWriteOperation.PUT_MAP_CREATE},
        };
    }

    /**
     * Fluent setter for the storage type used by the caches of this instance.
     *
     * @param storageType the storage type to configure
     * @return this instance
     */
    public CacheEntryCloudEventsTest storageType(StorageType storageType) {
        this.storageType = storageType;
        return this;
    }

    /**
     * Fluent setter for the server-mode flag; returns the concrete type so that it chains like
     * {@link #storageType(StorageType)}.
     *
     * @param serverMode whether cache managers are configured in server mode
     * @return this instance
     */
    private CacheEntryCloudEventsTest serverMode(boolean serverMode) {
        this.serverMode = serverMode;
        return this;
    }

    /** Starts three nodes and waits for the cluster to form. */
    protected void createCacheManagers() {
        this.addNode();
        this.addNode();
        this.addNode();
        this.waitForClusterToForm();
    }

    /**
     * Names of the parameters that distinguish the instances created by {@link #factory()}.
     *
     * @return the parameter names, in the same order as {@link #parameterValues()}
     */
    protected String[] parameterNames() {
        return new String[]{"storage", "server"};
    }

    /**
     * Values of the parameters named by {@link #parameterNames()}.
     *
     * @return the storage type, followed by {@code "y"} in server mode and {@code null} otherwise
     */
    protected Object[] parameterValues() {
        return new Object[]{this.storageType, this.serverMode ? "y" : null};
    }

    /**
     * Adds one clustered cache manager whose default cache is {@link #CACHE_NAME}, configured as
     * {@code DIST_SYNC} with this instance's storage type, and with {@link #mockSender}
     * registered as a test global component under the {@link KafkaEventSender} class name.
     *
     * @return the address of the new cache manager
     */
    private Address addNode() {
        GlobalConfigurationBuilder managerBuilder = GlobalConfigurationBuilder.defaultClusteredBuilder();
        managerBuilder.defaultCacheName(CACHE_NAME).serialization().addContextInitializer(TestDataSCI.INSTANCE);
        if (this.serverMode) {
            managerBuilder.addModule(PrivateGlobalConfigurationBuilder.class).serverMode(true);
        }

        CloudEventsGlobalConfigurationBuilder cloudEventsBuilder = managerBuilder.addModule(CloudEventsGlobalConfigurationBuilder.class);
        cloudEventsBuilder.bootstrapServers("localhost:9092");
        cloudEventsBuilder.cacheEntriesTopic("ispn");

        TestGlobalConfigurationBuilder testBuilder = managerBuilder.addModule(TestGlobalConfigurationBuilder.class);
        testBuilder.testGlobalComponent(KafkaEventSender.class.getName(), this.mockSender);

        ConfigurationBuilder cacheBuilder = new ConfigurationBuilder();
        cacheBuilder.clustering().cacheMode(CacheMode.DIST_SYNC);
        cacheBuilder.memory().storage(this.storageType);

        EmbeddedCacheManager manager = this.addClusterEnabledCacheManager(managerBuilder, cacheBuilder);
        return manager.getAddress();
    }

    /**
     * Performs {@code op} from the first node on one {@link MagicKey} per cache and checks that
     * the operation only completes after the mock sender completes its send, that it returns
     * the expected value, and that the matching event was recorded.
     *
     * @param op the write operation under test
     */
    @Test(dataProvider="operations")
    public void testSingleKeyOperations(TestWriteOperation op) throws InterruptedException, TimeoutException, ExecutionException {
        AdvancedCache<Object, Object> originator = this.advancedCache(0);
        for (Cache<Object, Object> cache : this.caches()) {
            MagicKey key = new MagicKey(cache);

            if (op.getPreviousValue() != null) {
                // Seed the previous value with listener notifications skipped, and check that
                // no record was produced for it.
                this.mockSender.clear();
                originator.withFlags(Flag.SKIP_LISTENER_NOTIFICATION).put(key, op.getPreviousValue());
                AssertJUnit.assertTrue(this.mockSender.getProducer().history().isEmpty());
            }

            this.mockSender.clear();
            CompletionStage<?> stage = op.performAsync(originator, key);
            // The operation must still be pending before the mock send is completed.
            AssertJUnit.assertFalse(stage.toCompletableFuture().isDone());
            this.mockSender.completeSend();

            Object returnValue = stage.toCompletableFuture().get(30L, TimeUnit.SECONDS);
            AssertJUnit.assertEquals(op.getReturnValue(), returnValue);

            // When the operation has no new value, the event is expected to carry the previous one.
            Object eventValue = op.getValue() != null ? op.getValue() : op.getPreviousValue();
            this.assertEntryEventSent(key, eventValue, op);
        }
    }

    /**
     * Runs {@code putAllAsync} followed by {@code clearAsync} from every node, with one key per
     * cache, and checks that each operation completes only after the mock sends are completed
     * and that a created (respectively removed) event was recorded for every key.
     */
    public void testMultiKeyOperations() throws InterruptedException, TimeoutException, ExecutionException {
        LinkedHashMap<MagicKey, Object> data = new LinkedHashMap<>();
        for (int i = 0; i < this.caches().size(); ++i) {
            data.put(new MagicKey("key-" + i, this.cache(i)), "value-" + i);
        }

        for (Cache<Object, Object> cache : this.caches()) {
            this.log.tracef("Testing on %s", this.address(cache));

            this.mockSender.clear();
            CompletableFuture<Void> putAllFuture = cache.putAllAsync(data);
            AssertJUnit.assertFalse(putAllFuture.isDone());
            this.mockSender.completeSend(data.size());
            putAllFuture.get(30L, TimeUnit.SECONDS);
            data.forEach((key, value) -> this.assertEntryEventSent(key, value, TestWriteOperation.PUT_MAP_CREATE));

            this.mockSender.clear();
            CompletableFuture<Void> clearFuture = cache.clearAsync();
            AssertJUnit.assertFalse(clearFuture.isDone());
            this.mockSender.completeSend(data.size());
            clearFuture.get(30L, TimeUnit.SECONDS);
            data.forEach((key, value) -> this.assertEntryEventSent(key, value, TestWriteOperation.REMOVE));
        }
    }

    /**
     * Asserts that the mock producer's history contains a record keyed by {@code key} whose
     * payload is a CloudEvents 1.0 JSON document describing {@code op} on this cache with the
     * expected subject (key) and data (value).
     *
     * @param key   the cache key the event must refer to
     * @param value the value the event must carry
     * @param op    the operation whose event type is expected
     */
    private void assertEntryEventSent(Object key, Object value, TestWriteOperation op) {
        byte[] expectedKeyBytes = this.getKeyBytes(key);
        byte[] expectedValueBytes = this.getValueBytes(value);
        String expectedType = CacheEntryCloudEventsTest.translateType(op);

        Optional<byte[]> payload = this.mockSender.getProducer().history().stream()
                .filter(r -> Arrays.equals((byte[]) r.key(), expectedKeyBytes))
                .findFirst()
                .map(r -> (byte[]) r.value());
        AssertJUnit.assertTrue("No event with a payload was sent for key " + key, payload.isPresent());

        // Decode explicitly as UTF-8 rather than with the platform default charset.
        Json json = Json.read(new String(payload.get(), StandardCharsets.UTF_8));
        AssertJUnit.assertEquals("1.0", json.at("specversion").asString());
        AssertJUnit.assertEquals(expectedType, json.at("type").asString());

        String source = json.at("source").asString();
        AssertJUnit.assertTrue(source.startsWith("/infinispan"));
        AssertJUnit.assertTrue(source.endsWith("/testCache"));

        // Result is discarded: parsing only validates that "time" is a well-formed instant.
        Instant.parse(json.at("time").asString());

        boolean keyIsBase64 = json.at("orginfinispansubject_isbase64", false).asBoolean();
        AssertJUnit.assertEquals(this.expectedContentType(keyIsBase64).toString(), json.at("orginfinispansubject_contenttype", "application/json").asString());
        String subject = json.at("subject").asString();
        byte[] keyBytes = keyIsBase64 ? Base64.getDecoder().decode(subject) : subject.getBytes(StandardCharsets.UTF_8);
        AssertJUnit.assertEquals(expectedKeyBytes, keyBytes);

        String data = json.at("data").asString();
        boolean valueIsBase64 = json.at("orginfinispandata_isbase64", false).asBoolean();
        AssertJUnit.assertEquals(this.expectedContentType(valueIsBase64).toString(), json.at("datacontenttype", "application/json").asString());
        byte[] valueBytes = valueIsBase64 ? Base64.getDecoder().decode(data) : data.getBytes(StandardCharsets.UTF_8);
        AssertJUnit.assertEquals(expectedValueBytes, valueBytes);
    }

    /**
     * Content type expected for a key or value depending on whether the event flags it as
     * base64-encoded.
     *
     * @param isBase64 whether the event marks the content as base64
     * @return JSON when not base64; otherwise unknown in server mode and protostream otherwise
     */
    private MediaType expectedContentType(boolean isBase64) {
        if (!isBase64) {
            return MediaType.APPLICATION_JSON;
        }
        return this.serverMode ? MediaType.APPLICATION_UNKNOWN : MediaType.APPLICATION_PROTOSTREAM;
    }

    /** Converts {@code key} to expected bytes using the key data conversion of the first cache. */
    private byte[] getKeyBytes(Object key) {
        DataConversion keyDataConversion = this.advancedCache(0).getKeyDataConversion();
        return this.getBytes(key, keyDataConversion);
    }

    /** Converts {@code value} to expected bytes using the value data conversion of the first cache. */
    private byte[] getValueBytes(Object value) {
        DataConversion valueDataConversion = this.advancedCache(0).getValueDataConversion();
        return this.getBytes(value, valueDataConversion);
    }

    /**
     * Computes the bytes expected in the event for {@code o}.
     *
     * <p>When the storage media type matches {@code application/x-java-object}, strings are
     * encoded as UTF-8 and other objects are transcoded to protostream and then to JSON.
     * Otherwise the object is converted to its storage form and unwrapped.
     *
     * @param o              the key or value
     * @param dataConversion the data conversion to apply
     * @return the expected bytes
     */
    private byte[] getBytes(Object o, DataConversion dataConversion) {
        MediaType storageMediaType = dataConversion.getStorageMediaType();
        if (!storageMediaType.match(MediaType.APPLICATION_OBJECT)) {
            return (byte[]) dataConversion.getWrapper().unwrap(dataConversion.toStorage(o));
        }
        if (o instanceof String) {
            return this.getBytes((String) o);
        }
        Object protostream = this.transcode(o, MediaType.APPLICATION_OBJECT, MediaType.APPLICATION_PROTOSTREAM);
        return (byte[]) this.transcode(protostream, MediaType.APPLICATION_PROTOSTREAM, MediaType.APPLICATION_JSON);
    }

    /** Transcodes {@code o} between media types using the encoder registry of the first manager. */
    private Object transcode(Object o, MediaType sourceMediaType, MediaType targetMediaType) {
        EncoderRegistry encoderRegistry = TestingUtil.extractGlobalComponent(this.manager(0), EncoderRegistry.class);
        return encoderRegistry.getTranscoder(sourceMediaType, targetMediaType).transcode(o, sourceMediaType, targetMediaType);
    }

    /** Encodes {@code s} as UTF-8. */
    private byte[] getBytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Maps a write operation to the CloudEvents {@code type} expected for it.
     *
     * @param op the write operation
     * @return the created, modified or removed event type
     * @throws IllegalArgumentException if the operation has no mapping
     */
    private static String translateType(TestWriteOperation op) {
        switch (op) {
            case PUT_CREATE:
            case PUT_CREATE_FUNCTIONAL:
            case PUT_IF_ABSENT:
            case PUT_MAP_CREATE:
                return "org.infinispan.entry.created";
            case PUT_OVERWRITE:
            case PUT_OVERWRITE_FUNCTIONAL:
            case REPLACE:
            case REPLACE_EXACT:
            case REPLACE_FUNCTIONAL:
            case REPLACE_EXACT_FUNCTIONAL:
            case REPLACE_META_FUNCTIONAL:
                return "org.infinispan.entry.modified";
            case REMOVE:
            case REMOVE_EXACT:
            case REMOVE_FUNCTIONAL:
            case REMOVE_EXACT_FUNCTIONAL:
                return "org.infinispan.entry.removed";
            default:
                throw new IllegalArgumentException("Unsupported event type: " + op);
        }
    }
}

