package org.infinispan.server.functional;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.dataconversion.internal.Json;
import org.infinispan.server.test.core.InfinispanServerDriver;
import org.infinispan.server.test.core.InfinispanServerListener;
import org.infinispan.server.test.core.ServerRunMode;
import org.infinispan.server.test.junit4.InfinispanServerRule;
import org.infinispan.server.test.junit4.InfinispanServerRuleBuilder;
import org.infinispan.server.test.junit4.InfinispanServerTestMethodRule;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:org/infinispan/server/functional/CloudEventsIntegrationIT.class */
public class CloudEventsIntegrationIT {
    public static final String CACHE_ENTRIES_TOPIC = "cache-entries";

    @ClassRule
    public static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("quay.io/cloudservices/cp-kafka:5.4.3").asCompatibleSubstituteFor("confluentinc/cp-kafka"));

    @ClassRule
    public static final InfinispanServerRule SERVERS = InfinispanServerRuleBuilder.config("configuration/CloudEventsIntegration.xml").numServers(1).runMode(ServerRunMode.EMBEDDED).addListener(new InfinispanServerListener() { // from class: org.infinispan.server.functional.CloudEventsIntegrationIT.1
        public void before(InfinispanServerDriver infinispanServerDriver) {
            CloudEventsIntegrationIT.KAFKA.start();
            infinispanServerDriver.getConfiguration().properties().setProperty("kafka.bootstrap.servers", CloudEventsIntegrationIT.KAFKA.getBootstrapServers());
        }

        public void after(InfinispanServerDriver infinispanServerDriver) {
            CloudEventsIntegrationIT.KAFKA.stop();
        }
    }).build();

    @Rule
    public InfinispanServerTestMethodRule SERVER_TEST = new InfinispanServerTestMethodRule(SERVERS);

    @Test
    public void testSendCacheEntryEvent() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", KAFKA.getBootstrapServers());
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.setProperty("group.id", CloudEventsIntegrationIT.class.getSimpleName());
        properties.setProperty("auto.offset.reset", "earliest");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        try {
            Assert.assertTrue(kafkaConsumer.listTopics().containsKey(CACHE_ENTRIES_TOPIC));
            kafkaConsumer.subscribe(Collections.singleton(CACHE_ENTRIES_TOPIC));
            RemoteCache cache = this.SERVER_TEST.hotrod().createRemoteCacheManager().getCache("default");
            Assert.assertNotNull(cache);
            cache.put("k1", "v1");
            Assert.assertEquals("v1", cache.get("k1"));
            ConsumerRecords poll = kafkaConsumer.poll(Duration.ofSeconds(1L));
            Assert.assertEquals(1L, poll.count());
            Assert.assertEquals(Json.object().set("_type", "string").set("_value", "k1"), Json.read(new String((byte[]) ((ConsumerRecord) poll.iterator().next()).key())));
            kafkaConsumer.close();
        } catch (Throwable th) {
            kafkaConsumer.close();
            throw th;
        }
    }
}
