/*
 * Decompiled with CFR 0.152.
 */
package org.wildfly.camel.test.kafka;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import kafka.admin.AdminUtils;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import org.I0Itec.zkclient.ZkClient;

public class EmbeddedKafkaCluster {
    private final List<Integer> ports;
    private final String zkConnection;
    private final Properties baseProperties;
    private final String brokerList;
    private final List<KafkaServer> brokers;
    private final List<File> logDirs;

    public EmbeddedKafkaCluster(String zkConnection) {
        this(zkConnection, new Properties());
    }

    public EmbeddedKafkaCluster(String zkConnection, Properties baseProperties) {
        this(zkConnection, baseProperties, Collections.singletonList(-1));
    }

    public EmbeddedKafkaCluster(String zkConnection, Properties baseProperties, List<Integer> ports) {
        this.zkConnection = zkConnection;
        this.ports = this.resolvePorts(ports);
        this.baseProperties = baseProperties;
        this.brokers = new ArrayList<KafkaServer>();
        this.logDirs = new ArrayList<File>();
        this.brokerList = this.constructBrokerList(this.ports);
    }

    public ZkClient getZkClient() {
        Iterator<KafkaServer> iterator = this.brokers.iterator();
        if (iterator.hasNext()) {
            KafkaServer server = iterator.next();
            return server.zkClient();
        }
        return null;
    }

    public void createTopics(String ... topics) {
        for (String topic : topics) {
            AdminUtils.createTopic((ZkClient)this.getZkClient(), (String)topic, (int)2, (int)1, (Properties)new Properties());
        }
    }

    private List<Integer> resolvePorts(List<Integer> ports) {
        ArrayList<Integer> resolvedPorts = new ArrayList<Integer>();
        for (Integer port : ports) {
            resolvedPorts.add(this.resolvePort(port));
        }
        return resolvedPorts;
    }

    private int resolvePort(int port) {
        if (port == -1) {
            return TestUtils.getAvailablePort();
        }
        return port;
    }

    private String constructBrokerList(List<Integer> ports) {
        StringBuilder sb = new StringBuilder();
        for (Integer port : ports) {
            if (sb.length() > 0) {
                sb.append(",");
            }
            sb.append("localhost:").append(port);
        }
        return sb.toString();
    }

    public void startup() {
        for (int i = 0; i < this.ports.size(); ++i) {
            Integer port = this.ports.get(i);
            File logDir = TestUtils.constructTempDir("kafka-local");
            Properties properties = new Properties();
            properties.putAll((Map<?, ?>)this.baseProperties);
            properties.setProperty("zookeeper.connect", this.zkConnection);
            properties.setProperty("broker.id", String.valueOf(i + 1));
            properties.setProperty("host.name", "localhost");
            properties.setProperty("port", Integer.toString(port));
            properties.setProperty("log.dir", logDir.getAbsolutePath());
            properties.setProperty("num.partitions", String.valueOf(1));
            properties.setProperty("auto.create.topics.enable", String.valueOf(Boolean.TRUE));
            System.out.println("EmbeddedKafkaCluster: local directory: " + logDir.getAbsolutePath());
            properties.setProperty("log.flush.interval.messages", String.valueOf(1));
            KafkaServer broker = this.startBroker(properties);
            this.brokers.add(broker);
            this.logDirs.add(logDir);
        }
    }

    private KafkaServer startBroker(Properties props) {
        KafkaServer server = new KafkaServer(new KafkaConfig(props), (Time)new SystemTime());
        server.startup();
        return server;
    }

    public Properties getProps() {
        Properties props = new Properties();
        props.putAll((Map<?, ?>)this.baseProperties);
        props.put("metadata.broker.list", this.brokerList);
        props.put("zookeeper.connect", this.zkConnection);
        return props;
    }

    public String getBrokerList() {
        return this.brokerList;
    }

    public List<Integer> getPorts() {
        return this.ports;
    }

    public String getZkConnection() {
        return this.zkConnection;
    }

    public void shutdown() {
        for (KafkaServer broker : this.brokers) {
            try {
                broker.shutdown();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        for (File logDir : this.logDirs) {
            try {
                TestUtils.deleteFile(logDir);
            }
            catch (FileNotFoundException e) {
                e.printStackTrace();
            }
        }
    }

    public String toString() {
        StringBuilder sb = new StringBuilder("EmbeddedKafkaCluster{");
        sb.append("brokerList='").append(this.brokerList).append('\'');
        sb.append('}');
        return sb.toString();
    }

    private static final class TestUtils {
        private static final Random RANDOM = new Random();

        private TestUtils() {
        }

        public static File constructTempDir(String dirPrefix) {
            File file = new File(System.getProperty("java.io.tmpdir"), dirPrefix + RANDOM.nextInt(10000000));
            if (!file.mkdirs()) {
                throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath());
            }
            file.deleteOnExit();
            return file;
        }

        public static int getAvailablePort() {
            int n;
            ServerSocket socket = new ServerSocket(0);
            try {
                n = socket.getLocalPort();
            }
            catch (Throwable throwable) {
                try {
                    socket.close();
                    throw throwable;
                }
                catch (IOException e) {
                    throw new IllegalStateException("Cannot find available port: " + e.getMessage(), e);
                }
            }
            socket.close();
            return n;
        }

        public static boolean deleteFile(File path) throws FileNotFoundException {
            boolean ret = true;
            if (path != null && path.exists()) {
                if (path.isDirectory()) {
                    for (File f : path.listFiles()) {
                        ret = ret && TestUtils.deleteFile(f);
                    }
                }
                ret = path.delete();
            }
            return ret;
        }
    }

    private static class SystemTime
    implements Time {
        private SystemTime() {
        }

        public long milliseconds() {
            return System.currentTimeMillis();
        }

        public long nanoseconds() {
            return System.nanoTime();
        }

        public void sleep(long ms) {
            try {
                Thread.sleep(ms);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }
}

