package org.infinispan.cloudevents;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.Base64;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.cloudevents.configuration.CloudEventsGlobalConfigurationBuilder;
import org.infinispan.cloudevents.impl.KafkaEventSender;
import org.infinispan.commands.module.TestGlobalConfigurationBuilder;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.dataconversion.internal.Json;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.StorageType;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.configuration.internal.PrivateGlobalConfigurationBuilder;
import org.infinispan.context.Flag;
import org.infinispan.distribution.MagicKey;
import org.infinispan.distribution.rehash.TestWriteOperation;
import org.infinispan.encoding.DataConversion;
import org.infinispan.marshall.core.EncoderRegistry;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestDataSCI;
import org.infinispan.test.TestingUtil;
import org.testng.AssertJUnit;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Verifies that cache entry writes on a three-node {@code DIST_SYNC} cluster are reported as
 * CloudEvents records through the Kafka event sender.
 *
 * <p>Every node is configured with the same {@code MockKafkaEventSender} instance in place of the
 * {@link KafkaEventSender} global component, so the tests can inspect the records that were
 * produced and decide when a pending send completes.
 *
 * <p>The test is parameterized by storage type and by whether server mode is enabled, see
 * {@link #factory()}.
 */
@Test(groups = {"functional"}, testName = "cloudevents.impl.CacheEntryCloudEventsTest")
public class CacheEntryCloudEventsTest extends MultipleCacheManagersTest {
    public static final String CACHE_NAME = "testCache";

    /** Sender shared by all nodes; records the events and lets the test complete sends on demand. */
    private final MockKafkaEventSender mockSender = new MockKafkaEventSender();
    private StorageType storageType;
    private boolean serverMode;

    /**
     * Builds the parameterized instances of this test.
     *
     * @return one instance per storage type / server mode combination under test
     */
    public Object[] factory() {
        return new Object[]{
            new CacheEntryCloudEventsTest().m0storageType(StorageType.OBJECT),
            new CacheEntryCloudEventsTest().m0storageType(StorageType.BINARY),
            new CacheEntryCloudEventsTest().m0storageType(StorageType.HEAP).serverMode(true)
        };
    }

    /**
     * Supplies the single-key write operations exercised by
     * {@link #testSingleKeyOperations(TestWriteOperation)}.
     *
     * @return one row per operation, each row holding a single {@link TestWriteOperation}
     */
    @DataProvider
    public static Object[][] operations() {
        // Must be a two-dimensional array: one row per invocation, one column per test parameter.
        return new Object[][]{
            {TestWriteOperation.PUT_CREATE},
            {TestWriteOperation.PUT_OVERWRITE},
            {TestWriteOperation.PUT_IF_ABSENT},
            {TestWriteOperation.REPLACE},
            {TestWriteOperation.REPLACE_EXACT},
            {TestWriteOperation.REMOVE},
            {TestWriteOperation.REMOVE_EXACT},
            {TestWriteOperation.PUT_MAP_CREATE}
        };
    }

    /**
     * Sets the storage type used for the caches created by this instance.
     *
     * <p>NOTE(review): the {@code m0} prefix looks like a decompiler rename of a
     * {@code storageType(StorageType)} method; the name is kept so existing callers keep working.
     *
     * @param storageType the memory storage type to configure
     * @return this instance, for chaining
     */
    public CacheEntryCloudEventsTest m0storageType(StorageType storageType) {
        this.storageType = storageType;
        return this;
    }

    /**
     * Sets whether the cache managers are started in server mode.
     *
     * @param z {@code true} to enable server mode
     * @return this instance, for chaining
     */
    private CacheEntryCloudEventsTest serverMode(boolean z) {
        this.serverMode = z;
        return this;
    }

    /** Starts three clustered nodes and waits until they have formed a cluster. */
    protected void createCacheManagers() {
        addNode();
        addNode();
        addNode();
        waitForClusterToForm();
    }

    /** @return the names of the parameters reported for this test instance */
    protected String[] parameterNames() {
        return new String[]{"storage", "server"};
    }

    /**
     * @return the storage type and, when server mode is on, the marker {@code "y"}
     *         (otherwise {@code null}), in the same order as {@link #parameterNames()}
     */
    protected Object[] parameterValues() {
        return new Object[]{this.storageType, this.serverMode ? "y" : null};
    }

    /**
     * Adds one clustered cache manager with the CloudEvents module enabled and the mock sender
     * registered in place of the real {@link KafkaEventSender}.
     *
     * @return the address of the newly added node
     */
    private Address addNode() {
        GlobalConfigurationBuilder global = GlobalConfigurationBuilder.defaultClusteredBuilder();
        global.defaultCacheName(CACHE_NAME).serialization().addContextInitializer(TestDataSCI.INSTANCE);
        if (this.serverMode) {
            global.addModule(PrivateGlobalConfigurationBuilder.class).serverMode(true);
        }

        CloudEventsGlobalConfigurationBuilder cloudEvents = global.addModule(CloudEventsGlobalConfigurationBuilder.class);
        cloudEvents.bootstrapServers("localhost:9092");
        cloudEvents.cacheEntriesTopic("ispn");

        // Replace the real sender so the test can observe and complete the sends itself.
        global.addModule(TestGlobalConfigurationBuilder.class)
            .testGlobalComponent(KafkaEventSender.class.getName(), this.mockSender);

        ConfigurationBuilder cacheConfig = new ConfigurationBuilder();
        cacheConfig.clustering().cacheMode(CacheMode.DIST_SYNC);
        cacheConfig.memory().storage(this.storageType);
        return addClusterEnabledCacheManager(global, cacheConfig).getAddress();
    }

    /**
     * Performs the given write from node 0 on one key per cache and checks that the operation
     * only completes once the mock sender completes the send, that the matching event was
     * produced, and that the operation returned the expected value.
     *
     * @param testWriteOperation the write operation under test
     */
    @Test(dataProvider = "operations")
    public void testSingleKeyOperations(TestWriteOperation testWriteOperation) throws InterruptedException, TimeoutException, ExecutionException {
        AdvancedCache<Object, Object> originator = advancedCache(0);
        for (Cache<?, ?> owner : caches()) {
            MagicKey key = new MagicKey(owner);

            if (testWriteOperation.getPreviousValue() != null) {
                // Seed the previous value without notifying listeners: no event may be produced.
                this.mockSender.clear();
                originator.withFlags(Flag.SKIP_LISTENER_NOTIFICATION).put(key, testWriteOperation.getPreviousValue());
                AssertJUnit.assertTrue(this.mockSender.getProducer().history().isEmpty());
            }

            this.mockSender.clear();
            CompletionStage<?> stage = testWriteOperation.performAsync(originator, key);
            // The write must stay pending until the send is completed.
            AssertJUnit.assertFalse(stage.toCompletableFuture().isDone());
            this.mockSender.completeSend();
            Object result = stage.toCompletableFuture().get(30L, TimeUnit.SECONDS);

            // Operations without a new value are expected to report the previous value instead.
            Object expectedValue = testWriteOperation.getValue() != null
                ? testWriteOperation.getValue()
                : testWriteOperation.getPreviousValue();
            assertEntryEventSent(key, expectedValue, testWriteOperation);
            AssertJUnit.assertEquals(testWriteOperation.getReturnValue(), result);
        }
    }

    /**
     * Writes one key per cache with {@code putAllAsync} and removes them with {@code clearAsync},
     * from every node in turn, checking that each operation waits for the sends to complete and
     * that one event per key was produced.
     */
    public void testMultiKeyOperations() throws InterruptedException, TimeoutException, ExecutionException {
        LinkedHashMap<MagicKey, String> entries = new LinkedHashMap<>();
        for (int i = 0; i < caches().size(); i++) {
            entries.put(new MagicKey("key-" + i, cache(i)), "value-" + i);
        }

        for (Cache<Object, Object> cache : caches()) {
            this.log.tracef("Testing on %s", address(cache));

            this.mockSender.clear();
            CompletableFuture<?> putAll = cache.putAllAsync(entries);
            AssertJUnit.assertFalse(putAll.isDone());
            this.mockSender.completeSend(entries.size());
            putAll.get(30L, TimeUnit.SECONDS);
            entries.forEach((key, value) -> assertEntryEventSent(key, value, TestWriteOperation.PUT_MAP_CREATE));

            this.mockSender.clear();
            CompletableFuture<?> clear = cache.clearAsync();
            AssertJUnit.assertFalse(clear.isDone());
            this.mockSender.completeSend(entries.size());
            clear.get(30L, TimeUnit.SECONDS);
            entries.forEach((key, value) -> assertEntryEventSent(key, value, TestWriteOperation.REMOVE));
        }
    }

    /**
     * Asserts that the mock producer recorded an event for the given key and that the event's
     * JSON attributes match the key, value and operation type.
     *
     * @param obj                the cache key the event must refer to
     * @param obj2               the value expected in the event's {@code data} attribute
     * @param testWriteOperation the operation whose event type is expected
     */
    private void assertEntryEventSent(Object obj, Object obj2, TestWriteOperation testWriteOperation) {
        byte[] keyBytes = getKeyBytes(obj);
        byte[] valueBytes = getValueBytes(obj2);
        String expectedType = translateType(testWriteOperation);

        Optional<?> sent = this.mockSender.getProducer().history().stream()
            .filter(producerRecord -> Arrays.equals((byte[]) producerRecord.key(), keyBytes))
            .findFirst();
        AssertJUnit.assertTrue("No event was sent for key " + obj, sent.isPresent());

        // Decode explicitly as UTF-8 rather than relying on the platform default charset.
        byte[] payload = (byte[]) ((ProducerRecord<?, ?>) sent.get()).value();
        Json event = Json.read(new String(payload, StandardCharsets.UTF_8));

        AssertJUnit.assertEquals("1.0", event.at("specversion").asString());
        AssertJUnit.assertEquals(expectedType, event.at("type").asString());

        String source = event.at("source").asString();
        AssertJUnit.assertTrue(source.startsWith("/infinispan"));
        AssertJUnit.assertTrue(source.endsWith("/testCache"));

        // Only checks that the timestamp is parseable; parse throws otherwise.
        Instant.parse(event.at("time").asString());

        boolean subjectIsBase64 = event.at("orginfinispansubject_isbase64", false).asBoolean();
        AssertJUnit.assertEquals(expectedContentType(subjectIsBase64).toString(),
            event.at("orginfinispansubject_contenttype", "application/json").asString());
        String subject = event.at("subject").asString();
        AssertJUnit.assertEquals(keyBytes, decode(subject, subjectIsBase64));

        String data = event.at("data").asString();
        boolean dataIsBase64 = event.at("orginfinispandata_isbase64", false).asBoolean();
        AssertJUnit.assertEquals(expectedContentType(dataIsBase64).toString(),
            event.at("datacontenttype", "application/json").asString());
        AssertJUnit.assertEquals(valueBytes, decode(data, dataIsBase64));
    }

    /** Converts an event attribute back to bytes: Base64-decoded when flagged, UTF-8 otherwise. */
    private static byte[] decode(String text, boolean isBase64) {
        return isBase64 ? Base64.getDecoder().decode(text) : text.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * @param z whether the attribute was flagged as Base64-encoded
     * @return JSON for non-Base64 content; for Base64 content, unknown in server mode and
     *         protostream otherwise
     */
    private MediaType expectedContentType(boolean z) {
        if (!z) {
            return MediaType.APPLICATION_JSON;
        }
        return this.serverMode ? MediaType.APPLICATION_UNKNOWN : MediaType.APPLICATION_PROTOSTREAM;
    }

    /** @return the expected byte form of {@code obj} according to node 0's key data conversion */
    private byte[] getKeyBytes(Object obj) {
        return getBytes(obj, advancedCache(0).getKeyDataConversion());
    }

    /** @return the expected byte form of {@code obj} according to node 0's value data conversion */
    private byte[] getValueBytes(Object obj) {
        return getBytes(obj, advancedCache(0).getValueDataConversion());
    }

    /**
     * Computes the bytes expected in an event for the given object.
     *
     * <p>With object storage, strings are taken as their UTF-8 bytes and other objects are
     * transcoded object -> protostream -> JSON. With any other storage media type the stored
     * form is unwrapped and used directly.
     */
    private byte[] getBytes(Object obj, DataConversion dataConversion) {
        if (!dataConversion.getStorageMediaType().match(MediaType.APPLICATION_OBJECT)) {
            return (byte[]) dataConversion.getWrapper().unwrap(dataConversion.toStorage(obj));
        }
        if (obj instanceof String) {
            return getBytes((String) obj);
        }
        Object protostream = transcode(obj, MediaType.APPLICATION_OBJECT, MediaType.APPLICATION_PROTOSTREAM);
        return (byte[]) transcode(protostream, MediaType.APPLICATION_PROTOSTREAM, MediaType.APPLICATION_JSON);
    }

    /** Transcodes {@code obj} between the two media types using node 0's encoder registry. */
    private Object transcode(Object obj, MediaType mediaType, MediaType mediaType2) {
        EncoderRegistry registry = (EncoderRegistry) TestingUtil.extractGlobalComponent(manager(0), EncoderRegistry.class);
        return registry.getTranscoder(mediaType, mediaType2).transcode(obj, mediaType, mediaType2);
    }

    /** @return the UTF-8 bytes of {@code str} */
    private byte[] getBytes(String str) {
        return str.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Maps a write operation to the CloudEvents {@code type} attribute expected for it.
     *
     * @param testWriteOperation the operation that was performed
     * @return the created, modified or removed event type
     * @throws IllegalArgumentException if the operation has no known event type
     */
    private static String translateType(TestWriteOperation testWriteOperation) {
        switch (testWriteOperation) {
            case PUT_CREATE:
            case PUT_CREATE_FUNCTIONAL:
            case PUT_IF_ABSENT:
            case PUT_MAP_CREATE:
                return "org.infinispan.entry.created";
            case PUT_OVERWRITE:
            case PUT_OVERWRITE_FUNCTIONAL:
            case REPLACE:
            case REPLACE_EXACT:
            case REPLACE_FUNCTIONAL:
            case REPLACE_EXACT_FUNCTIONAL:
            case REPLACE_META_FUNCTIONAL:
                return "org.infinispan.entry.modified";
            case REMOVE:
            case REMOVE_EXACT:
            case REMOVE_FUNCTIONAL:
            case REMOVE_EXACT_FUNCTIONAL:
                return "org.infinispan.entry.removed";
            default:
                throw new IllegalArgumentException("Unsupported event type: " + testWriteOperation);
        }
    }
}
