/*
 * Decompiled with CFR 0.152.
 */
package org.kie.hacep;

import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.NotRunning;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.kie.hacep.Config;
import org.kie.hacep.EnvConfig;
import org.kie.hacep.PrinterKafkaImpl;
import org.kie.hacep.core.Bootstrap;
import org.kie.hacep.sample.kjar.StockTickEvent;
import org.kie.remote.RemoteKieSession;
import org.kie.remote.RemoteStreamingKieSession;
import org.kie.remote.TopicsConfig;
import org.kie.remote.command.RemoteCommand;
import org.kie.remote.command.SnapshotOnDemandCommand;
import org.kie.remote.impl.RemoteKieSessionImpl;
import org.kie.remote.impl.RemoteStreamingKieSessionImpl;
import org.kie.remote.impl.producer.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaUtilTest
implements AutoCloseable {
    private static final String ZOOKEEPER_HOST = "127.0.0.1";
    private static final String BROKER_HOST = "127.0.0.1";
    private static final String BROKER_PORT = "9092";
    private static final Logger logger = LoggerFactory.getLogger(KafkaUtilTest.class);
    private KafkaServer kafkaServer;
    private EmbeddedZookeeper zkServer;
    private String tmpDir;
    private KafkaAdminClient adminClient;
    private Logger kafkaLogger = LoggerFactory.getLogger((String)"org.hacep");

    public Map<String, Object> getKafkaProps() {
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 0x2000000);
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        return props;
    }

    public KafkaServer startServer() throws IOException {
        this.tmpDir = Files.createTempDirectory(Paths.get(System.getProperty("user.dir"), File.separator, "target"), "kafkatest-", new FileAttribute[0]).toAbsolutePath().toString();
        this.zkServer = new EmbeddedZookeeper();
        String zkConnect = "127.0.0.1:" + this.zkServer.port();
        Properties brokerProps = new Properties();
        brokerProps.setProperty("zookeeper.connect", zkConnect);
        brokerProps.setProperty("broker.id", "0");
        brokerProps.setProperty("log.dirs", this.tmpDir);
        brokerProps.setProperty("listeners", "PLAINTEXT://127.0.0.1:9092");
        brokerProps.setProperty("offsets.topic.replication.factor", "1");
        brokerProps.setProperty("auto.create.topics.enable", "true");
        KafkaConfig config = new KafkaConfig((Map)brokerProps);
        SystemTime mock = new SystemTime();
        this.kafkaServer = TestUtils.createServer((KafkaConfig)config, (Time)mock);
        Map<String, Object> props = this.getKafkaProps();
        this.adminClient = (KafkaAdminClient)AdminClient.create(props);
        return this.kafkaServer;
    }

    public void shutdownServer() {
        if (this.adminClient != null) {
            this.adminClient.close();
        }
        logger.warn("Shutdown kafka server");
        Path tmp = Paths.get(this.tmpDir, new String[0]);
        try {
            if (this.kafkaServer.brokerState().currentState() != NotRunning.state()) {
                this.kafkaServer.shutdown();
                this.kafkaServer.awaitShutdown();
            }
        }
        catch (Exception e) {
            logger.error(e.getMessage(), (Throwable)e);
        }
        this.kafkaServer = null;
        try {
            this.zkServer.shutdown();
        }
        catch (Exception e) {
            logger.error(e.getMessage(), (Throwable)e);
        }
        this.zkServer = null;
        try {
            logger.warn("Deleting kafka temp dir:{}", (Object)tmp.toString());
            Files.walk(tmp, new FileVisitOption[0]).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
        }
        catch (Exception e) {
            logger.error(e.getMessage(), (Throwable)e);
        }
        try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(tmp.getParent());){
            for (Path path : directoryStream) {
                if (!path.toString().startsWith("kafkatest-")) continue;
                logger.warn("Deleting kafkatest folder:{}", (Object)path.toString());
                Files.walk(path, new FileVisitOption[0]).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
            }
        }
        catch (IOException e) {
            logger.error(e.getMessage(), (Throwable)e);
        }
    }

    @Override
    public void close() {
        this.shutdownServer();
    }

    public <K, V> void sendSingleMsg(KafkaProducer<K, V> producer, ProducerRecord<K, V> data) {
        producer.send(data);
        producer.close();
    }

    private Properties getConsumerConfig() {
        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", "127.0.0.1:9092");
        consumerProps.setProperty("group.id", "group0");
        consumerProps.setProperty("client.id", "consumer0");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return consumerProps;
    }

    private Properties getProducerConfig() {
        Properties producerProps = new Properties();
        producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.setProperty("bootstrap.servers", "127.0.0.1:9092");
        return producerProps;
    }

    public <K, V> KafkaConsumer<K, V> getStringConsumer(String topic) {
        Properties consumerProps = this.getConsumerConfig();
        consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer consumer = new KafkaConsumer(consumerProps);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    public <K, V> KafkaConsumer<K, V> getByteArrayConsumer(String topic) {
        Properties consumerProps = this.getConsumerConfig();
        consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer consumer = new KafkaConsumer(consumerProps);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    public <K, V> KafkaProducer<K, V> getByteArrayProducer() {
        Properties producerProps = this.getProducerConfig();
        producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer(producerProps);
    }

    public KafkaConsumer getConsumer(String topic, Properties props) {
        KafkaConsumer consumer = new KafkaConsumer(props);
        List infos = consumer.partitionsFor(topic);
        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
        if (infos != null) {
            for (PartitionInfo partition : infos) {
                partitions.add(new TopicPartition(partition.topic(), partition.partition()));
            }
        }
        consumer.assign(partitions);
        Set assignments = consumer.assignment();
        assignments.forEach(topicPartition -> consumer.seekToBeginning((Collection)assignments));
        return consumer;
    }

    public void insertBatchStockTicketEvent(int items, TopicsConfig topicsConfig, Class sessionType) {
        this.insertBatchStockTicketEvent(items, topicsConfig, sessionType, Config.getProducerConfig((String)"InsertBatchStockTicketEvent"));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void insertBatchStockTicketEvent(int items, TopicsConfig topicsConfig, Class sessionType, Properties props) {
        StockTickEvent ticket;
        int i;
        RemoteKieSessionImpl producer;
        if (sessionType.equals(RemoteKieSession.class)) {
            producer = new RemoteKieSessionImpl(props, topicsConfig);
            producer.fireUntilHalt();
            try {
                for (i = 0; i < items; ++i) {
                    ticket = new StockTickEvent("RHT", (double)ThreadLocalRandom.current().nextLong(80L, 100L));
                    producer.insert((Object)ticket);
                }
            }
            finally {
                producer.close();
            }
        }
        if (sessionType.equals(RemoteStreamingKieSession.class)) {
            producer = new RemoteStreamingKieSessionImpl(props, topicsConfig);
            producer.fireUntilHalt();
            try {
                for (i = 0; i < items; ++i) {
                    ticket = new StockTickEvent("RHT", (double)ThreadLocalRandom.current().nextLong(80L, 100L));
                    producer.insert((Object)ticket);
                }
            }
            finally {
                producer.close();
            }
        }
    }

    public static void insertSnapshotOnDemandCommand() {
        Properties props = Config.getProducerConfig((String)"insertSnapshotOnDemandCommand");
        Sender sender = new Sender(props);
        sender.start();
        SnapshotOnDemandCommand command = new SnapshotOnDemandCommand();
        sender.sendCommand((RemoteCommand)command, TopicsConfig.getDefaultTopicsConfig().getEventsTopicName());
        sender.stop();
    }

    public static EnvConfig getEnvConfig() {
        return EnvConfig.anEnvConfig().withNamespace("default").withControlTopicName("control").withEventsTopicName("events").withSnapshotTopicName("snapshot").withKieSessionInfosTopicName("kiesessioninfos").withPrinterType(PrinterKafkaImpl.class.getName()).withPollTimeUnit("millisec").withPollTimeout("1000").withIterationBetweenSnapshot("10").skipOnDemandSnapshot("true").withMaxSnapshotAgeSeconds("60000").withPollSnapshotTimeUnit("sec").withPollSnapshotTimeout("10").withUpdatableKJar("false").underTest(true);
    }

    public void tearDown() {
        this.kafkaLogger.warn("tearDown");
        try {
            Bootstrap.stopEngine();
        }
        catch (ConcurrentModificationException ex) {
            throw new RuntimeException(ex.getMessage(), ex);
        }
        this.kafkaLogger.warn("shutdownServer");
        this.shutdownServer();
    }
}

