/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.functional;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.commons.dataconversion.internal.Json;
import org.infinispan.server.test.core.InfinispanServerDriver;
import org.infinispan.server.test.core.InfinispanServerListener;
import org.infinispan.server.test.core.ServerRunMode;
import org.infinispan.server.test.junit4.InfinispanServerRule;
import org.infinispan.server.test.junit4.InfinispanServerRuleBuilder;
import org.infinispan.server.test.junit4.InfinispanServerTestMethodRule;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class CloudEventsIntegrationIT {
    public static final String CACHE_ENTRIES_TOPIC = "cache-entries";
    public static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse((String)"quay.io/cloudservices/cp-kafka:5.4.3").asCompatibleSubstituteFor("confluentinc/cp-kafka"));
    @ClassRule
    public static final InfinispanServerRule SERVERS = ((InfinispanServerRuleBuilder)((InfinispanServerRuleBuilder)((InfinispanServerRuleBuilder)InfinispanServerRuleBuilder.config((String)"configuration/CloudEventsIntegration.xml").numServers(1)).runMode(ServerRunMode.EMBEDDED)).addListener(new InfinispanServerListener(){

        public void before(InfinispanServerDriver driver) {
            KAFKA.start();
            driver.getConfiguration().properties().setProperty("kafka.bootstrap.servers", KAFKA.getBootstrapServers());
        }

        public void after(InfinispanServerDriver driver) {
            KAFKA.stop();
        }
    })).build();
    @Rule
    public InfinispanServerTestMethodRule SERVER_TEST = new InfinispanServerTestMethodRule(SERVERS);

    @Test
    public void testSendCacheEntryEvent() {
        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", KAFKA.getBootstrapServers());
        kafkaProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        kafkaProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        kafkaProperties.setProperty("group.id", CloudEventsIntegrationIT.class.getSimpleName());
        kafkaProperties.setProperty("auto.offset.reset", "earliest");
        try (KafkaConsumer kafkaConsumer = new KafkaConsumer(kafkaProperties);){
            Assert.assertTrue((boolean)kafkaConsumer.listTopics().containsKey(CACHE_ENTRIES_TOPIC));
            kafkaConsumer.subscribe(Collections.singleton(CACHE_ENTRIES_TOPIC));
            RemoteCacheManager rcm = this.SERVER_TEST.hotrod().createRemoteCacheManager();
            RemoteCache cache = rcm.getCache("default");
            Assert.assertNotNull((Object)cache);
            cache.put((Object)"k1", (Object)"v1");
            Assert.assertEquals((Object)"v1", (Object)cache.get((Object)"k1"));
            ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(1L));
            Assert.assertEquals((long)1L, (long)records.count());
            ConsumerRecord eventRecord = (ConsumerRecord)records.iterator().next();
            Json expectedKeyJson = Json.object().set("_type", (Object)"string").set("_value", (Object)"k1");
            Json actualKeyJson = Json.read((String)new String((byte[])eventRecord.key()));
            Assert.assertEquals((Object)expectedKeyJson, (Object)actualKeyJson);
        }
    }
}

