package org.kie.hacep;

import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.NotRunning;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.SystemTime;
import org.kie.hacep.core.Bootstrap;
import org.kie.hacep.sample.kjar.StockTickEvent;
import org.kie.remote.RemoteKieSession;
import org.kie.remote.RemoteStreamingKieSession;
import org.kie.remote.TopicsConfig;
import org.kie.remote.command.SnapshotOnDemandCommand;
import org.kie.remote.impl.RemoteKieSessionImpl;
import org.kie.remote.impl.RemoteStreamingKieSessionImpl;
import org.kie.remote.impl.producer.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/hacep/KafkaUtilTest.class */
/**
 * Test helper that starts an embedded ZooKeeper plus a single Kafka broker listening on
 * {@code 127.0.0.1:9092}, hands out pre-configured consumers/producers, and tears everything
 * down again (including the broker's temporary log directory).
 *
 * <p>Implements {@link AutoCloseable}; {@link #close()} delegates to {@link #shutdownServer()}.
 */
public class KafkaUtilTest implements AutoCloseable {
    private static final String ZOOKEEPER_HOST = "127.0.0.1";
    private static final String BROKER_HOST = "127.0.0.1";
    private static final String BROKER_PORT = "9092";
    /** {@code host:port} of the embedded broker, i.e. {@code 127.0.0.1:9092}. */
    private static final String BROKER_ADDRESS = BROKER_HOST + ":" + BROKER_PORT;
    /** Name prefix of the temporary broker log directories created by {@link #startServer()}. */
    private static final String TMP_DIR_PREFIX = "kafkatest-";
    private static final String STRING_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
    private static final Logger logger = LoggerFactory.getLogger(KafkaUtilTest.class);
    private KafkaServer kafkaServer;
    private EmbeddedZookeeper zkServer;
    private String tmpDir;
    // AdminClient.create(...) is declared to return AdminClient, so the field uses that type
    // rather than the concrete KafkaAdminClient.
    private AdminClient adminClient;
    private final Logger kafkaLogger = LoggerFactory.getLogger("org.hacep");

    /**
     * Builds the client properties used to create the admin client.
     *
     * @return a fresh, mutable map with bootstrap server, batching and String serializer settings
     */
    public Map<String, Object> getKafkaProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        return props;
    }

    /**
     * Starts an embedded ZooKeeper and a Kafka broker whose log directory is a fresh
     * {@code kafkatest-*} directory under {@code <user.dir>/target}, then creates the admin client.
     *
     * @return the started broker
     * @throws IOException if the temporary log directory cannot be created
     */
    public KafkaServer startServer() throws IOException {
        Path targetDir = Paths.get(System.getProperty("user.dir"), "target");
        // createTempDirectory fails if the parent is missing, so make sure it exists first.
        Files.createDirectories(targetDir);
        this.tmpDir = Files.createTempDirectory(targetDir, TMP_DIR_PREFIX).toAbsolutePath().toString();
        this.zkServer = new EmbeddedZookeeper();
        String zkConnect = ZOOKEEPER_HOST + ":" + this.zkServer.port();

        Properties brokerProps = new Properties();
        brokerProps.setProperty("zookeeper.connect", zkConnect);
        brokerProps.setProperty("broker.id", "0");
        brokerProps.setProperty("log.dirs", this.tmpDir);
        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKER_ADDRESS);
        brokerProps.setProperty("offsets.topic.replication.factor", "1");
        brokerProps.setProperty("auto.create.topics.enable", "true");

        this.kafkaServer = TestUtils.createServer(new KafkaConfig(brokerProps), new SystemTime());
        this.adminClient = AdminClient.create(getKafkaProps());
        return this.kafkaServer;
    }

    /**
     * Closes the admin client, stops the broker and ZooKeeper, and deletes the broker's temporary
     * log directory plus any other {@code kafkatest-*} directory left in the same parent folder.
     *
     * <p>Every step is best effort: failures are logged and the remaining steps still run.
     * Calling this before {@link #startServer()} or more than once is safe.
     */
    public void shutdownServer() {
        if (this.adminClient != null) {
            this.adminClient.close();
            this.adminClient = null;
        }
        logger.warn("Shutdown kafka server");
        stopBroker();
        stopZookeeper();

        if (this.tmpDir == null) {
            // Server was never started (or was already shut down): nothing on disk to clean.
            return;
        }
        Path logDir = Paths.get(this.tmpDir);
        this.tmpDir = null;
        logger.warn("Deleting kafka temp dir:{}", logDir);
        deleteRecursively(logDir);
        deleteLeftoverTempDirs(logDir.getParent());
    }

    /** Shuts the broker down if it is running; errors are logged, never propagated. */
    private void stopBroker() {
        if (this.kafkaServer == null) {
            return;
        }
        try {
            if (this.kafkaServer.brokerState().currentState() != NotRunning.state()) {
                this.kafkaServer.shutdown();
                this.kafkaServer.awaitShutdown();
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            this.kafkaServer = null;
        }
    }

    /** Shuts the embedded ZooKeeper down; errors are logged, never propagated. */
    private void stopZookeeper() {
        if (this.zkServer == null) {
            return;
        }
        try {
            this.zkServer.shutdown();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            this.zkServer = null;
        }
    }

    /** Deletes {@code root} and everything below it, children first; failures are only logged. */
    private static void deleteRecursively(Path root) {
        // Files.walk holds open directory handles until the stream is closed.
        try (Stream<Path> tree = Files.walk(root)) {
            tree.sorted(Comparator.reverseOrder())
                .map(Path::toFile)
                .forEach(file -> {
                    if (!file.delete()) {
                        logger.warn("Could not delete {}", file);
                    }
                });
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /** Removes every {@code kafkatest-*} entry found directly inside {@code parent}. */
    private static void deleteLeftoverTempDirs(Path parent) {
        if (parent == null) {
            return;
        }
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(parent)) {
            for (Path entry : entries) {
                // Entries are resolved against the parent, so the prefix must be checked on the
                // last name element, not on the full path string.
                Path name = entry.getFileName();
                if (name != null && name.toString().startsWith(TMP_DIR_PREFIX)) {
                    logger.warn("Deleting kafkatest folder:{}", entry);
                    deleteRecursively(entry);
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /** Equivalent to {@link #shutdownServer()}. */
    @Override // java.lang.AutoCloseable
    public void close() {
        shutdownServer();
    }

    /**
     * Sends one record and closes the producer, even when sending throws.
     *
     * @param kafkaProducer  producer to send with; it is closed before this method returns
     * @param producerRecord record to send
     */
    public <K, V> void sendSingleMsg(KafkaProducer<K, V> kafkaProducer, ProducerRecord<K, V> producerRecord) {
        try {
            kafkaProducer.send(producerRecord);
        } finally {
            kafkaProducer.close();
        }
    }

    /** Base consumer settings: embedded broker, fixed group/client id, earliest offset, String keys. */
    private Properties getConsumerConfig() {
        Properties config = new Properties();
        config.setProperty("bootstrap.servers", BROKER_ADDRESS);
        config.setProperty("group.id", "group0");
        config.setProperty("client.id", "consumer0");
        config.put("auto.offset.reset", "earliest");
        config.setProperty("key.deserializer", STRING_DESERIALIZER);
        return config;
    }

    /** Base producer settings: embedded broker and String key serializer. */
    private Properties getProducerConfig() {
        Properties config = new Properties();
        config.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.setProperty("bootstrap.servers", BROKER_ADDRESS);
        return config;
    }

    /** Creates a consumer with the given value deserializer and subscribes it to {@code topic}. */
    private <K, V> KafkaConsumer<K, V> newSubscribedConsumer(String topic, String valueDeserializer) {
        Properties config = getConsumerConfig();
        config.setProperty("value.deserializer", valueDeserializer);
        KafkaConsumer<K, V> consumer = new KafkaConsumer<>(config);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    /**
     * Creates a consumer that deserializes values as Strings and subscribes it to the topic.
     *
     * @param str topic name
     * @return the subscribed consumer; the caller is responsible for closing it
     */
    public <K, V> KafkaConsumer<K, V> getStringConsumer(String str) {
        return newSubscribedConsumer(str, STRING_DESERIALIZER);
    }

    /**
     * Creates a consumer that deserializes values as byte arrays and subscribes it to the topic.
     *
     * @param str topic name
     * @return the subscribed consumer; the caller is responsible for closing it
     */
    public <K, V> KafkaConsumer<K, V> getByteArrayConsumer(String str) {
        return newSubscribedConsumer(str, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    }

    /**
     * Creates a producer with String keys and byte-array values targeting the embedded broker.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    public <K, V> KafkaProducer<K, V> getByteArrayProducer() {
        Properties config = getProducerConfig();
        config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<>(config);
    }

    /**
     * Creates a consumer from the given properties, assigns it every partition reported for the
     * topic and rewinds those partitions to their beginning.
     *
     * @param str        topic name
     * @param properties consumer configuration
     * @return the assigned consumer; the caller is responsible for closing it
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw return type kept for existing callers
    public KafkaConsumer getConsumer(String str, Properties properties) {
        KafkaConsumer consumer = new KafkaConsumer(properties);
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(str);
        List<TopicPartition> partitions = new ArrayList<>();
        if (partitionInfos != null) {
            for (PartitionInfo info : partitionInfos) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
        }
        consumer.assign(partitions);
        Set<TopicPartition> assignment = consumer.assignment();
        // One seek covers the whole assignment; skip it entirely when nothing is assigned.
        if (!assignment.isEmpty()) {
            consumer.seekToBeginning(assignment);
        }
        return consumer;
    }

    /**
     * Same as {@link #insertBatchStockTicketEvent(int, TopicsConfig, Class, Properties)} using
     * {@code Config.getProducerConfig("InsertBatchStockTicketEvent")} as producer properties.
     */
    public void insertBatchStockTicketEvent(int i, TopicsConfig topicsConfig, Class<?> cls) {
        insertBatchStockTicketEvent(i, topicsConfig, cls, Config.getProducerConfig("InsertBatchStockTicketEvent"));
    }

    /**
     * Inserts {@code i} {@code "RHT"} stock tick events with a random price in {@code [80, 100)}
     * through a remote session of the requested kind, then closes that session.
     *
     * @param i            number of events to insert
     * @param topicsConfig topics the remote session uses
     * @param cls          {@code RemoteKieSession.class} or {@code RemoteStreamingKieSession.class};
     *                     any other value makes this method a no-op
     * @param properties   producer properties handed to the remote session
     */
    public void insertBatchStockTicketEvent(int i, TopicsConfig topicsConfig, Class<?> cls, Properties properties) {
        if (cls.equals(RemoteKieSession.class)) {
            RemoteKieSessionImpl session = new RemoteKieSessionImpl(properties, topicsConfig);
            // The session is closed once, after the whole batch, even if an insert fails.
            try {
                session.fireUntilHalt();
                for (int n = 0; n < i; n++) {
                    session.insert(new StockTickEvent("RHT", ThreadLocalRandom.current().nextLong(80L, 100L)));
                }
            } finally {
                session.close();
            }
        }
        if (cls.equals(RemoteStreamingKieSession.class)) {
            RemoteStreamingKieSessionImpl session = new RemoteStreamingKieSessionImpl(properties, topicsConfig);
            try {
                session.fireUntilHalt();
                for (int n = 0; n < i; n++) {
                    session.insert(new StockTickEvent("RHT", ThreadLocalRandom.current().nextLong(80L, 100L)));
                }
            } finally {
                session.close();
            }
        }
    }

    /**
     * Sends a {@link SnapshotOnDemandCommand} to the default events topic through a short-lived
     * {@link Sender}, which is stopped even when sending throws.
     */
    public static void insertSnapshotOnDemandCommand() {
        Sender sender = new Sender(Config.getProducerConfig("insertSnapshotOnDemandCommand"));
        sender.start();
        try {
            sender.sendCommand(new SnapshotOnDemandCommand(), TopicsConfig.getDefaultTopicsConfig().getEventsTopicName());
        } finally {
            sender.stop();
        }
    }

    /**
     * Builds the environment configuration used by the tests.
     *
     * @return a new {@code EnvConfig} with the fixed topic names, polling and snapshot settings below
     */
    public static EnvConfig getEnvConfig() {
        return EnvConfig.anEnvConfig()
                .withNamespace("default")
                .withControlTopicName("control")
                .withEventsTopicName("events")
                .withSnapshotTopicName("snapshot")
                .withKieSessionInfosTopicName("kiesessioninfos")
                .withPrinterType(PrinterKafkaImpl.class.getName())
                .withPollTimeUnit("millisec")
                .withPollTimeout("1000")
                .withIterationBetweenSnapshot("10")
                .skipOnDemandSnapshot("true")
                .withMaxSnapshotAgeSeconds("60000")
                .withPollSnapshotTimeUnit("sec")
                .withPollSnapshotTimeout("10")
                .withUpdatableKJar("false")
                .underTest(true);
    }

    /**
     * Stops the engine and then shuts the embedded server down. The server shutdown runs even
     * when stopping the engine fails.
     *
     * @throws RuntimeException wrapping a {@link ConcurrentModificationException} raised while
     *                          stopping the engine
     */
    public void tearDown() {
        this.kafkaLogger.warn("tearDown");
        try {
            Bootstrap.stopEngine();
        } catch (ConcurrentModificationException e) {
            throw new RuntimeException(e.getMessage(), e);
        } finally {
            this.kafkaLogger.warn("shutdownServer");
            shutdownServer();
        }
    }
}
