/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import kafka.utils.Exit$;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.DoubleRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;
import scala.util.Random;

public final class EndToEndLatency$ {
    public static EndToEndLatency$ MODULE$;
    private final long timeout;
    private final short defaultReplicationFactor;
    private final int defaultNumPartitions;

    static {
        new EndToEndLatency$();
    }

    private long timeout() {
        return this.timeout;
    }

    private short defaultReplicationFactor() {
        return this.defaultReplicationFactor;
    }

    private int defaultNumPartitions() {
        return this.defaultNumPartitions;
    }

    public void main(String[] args) {
        None$ propsFile;
        if (args.length != 5 && args.length != 6) {
            System.err.println(new StringBuilder(103).append("USAGE: java ").append(this.getClass().getName()).append(" broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file").toString());
            throw Exit$.MODULE$.exit(1, Exit$.MODULE$.exit$default$2());
        }
        String brokerList = args[0];
        String topic = args[1];
        int numMessages = new StringOps(Predef$.MODULE$.augmentString(args[2])).toInt();
        String producerAcks = args[3];
        int messageLen = new StringOps(Predef$.MODULE$.augmentString(args[4])).toInt();
        Object object = propsFile = args.length > 5 ? new Some((Object)args[5]).filter((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)EndToEndLatency$.$anonfun$main$1(x$1))) : None$.MODULE$;
        if (!new .colon.colon((Object)"1", (List)new .colon.colon((Object)"all", (List)Nil$.MODULE$)).contains((Object)producerAcks)) {
            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
        }
        Properties consumerProps = EndToEndLatency$.loadPropsWithBootstrapServers$1((Option)propsFile, brokerList);
        consumerProps.put("group.id", new StringBuilder(11).append("test-group-").append(System.currentTimeMillis()).toString());
        consumerProps.put("enable.auto.commit", "false");
        consumerProps.put("auto.offset.reset", "latest");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("fetch.max.wait.ms", "0");
        KafkaConsumer consumer = new KafkaConsumer(consumerProps);
        Properties producerProps = EndToEndLatency$.loadPropsWithBootstrapServers$1((Option)propsFile, brokerList);
        producerProps.put("linger.ms", "0");
        producerProps.put("max.block.ms", ((Object)BoxesRunTime.boxToLong((long)Long.MAX_VALUE)).toString());
        producerProps.put("acks", producerAcks.toString());
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer producer = new KafkaProducer(producerProps);
        if (!consumer.listTopics().containsKey(topic)) {
            try {
                this.createTopic(topic, EndToEndLatency$.loadPropsWithBootstrapServers$1((Option)propsFile, brokerList));
            }
            catch (Throwable t) {
                EndToEndLatency$.finalise$1(consumer, producer);
                throw new RuntimeException(new StringBuilder(23).append("Failed to create topic ").append(topic).toString(), t);
            }
        }
        java.util.List topicPartitions = (java.util.List)JavaConverters$.MODULE$.bufferAsJavaListConverter((Buffer)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter(consumer.partitionsFor(topic)).asScala()).map((Function1 & Serializable & scala.Serializable)p -> new TopicPartition(p.topic(), p.partition()), Buffer$.MODULE$.canBuildFrom())).asJava();
        consumer.assign(topicPartitions);
        consumer.seekToEnd(topicPartitions);
        ((IterableLike)JavaConverters$.MODULE$.asScalaSetConverter(consumer.assignment()).asScala()).foreach((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToLong((long)consumer.position(x$1)));
        DoubleRef totalTime = DoubleRef.create((double)0.0);
        long[] latencies = new long[numMessages];
        Random random = new Random(0);
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numMessages).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            byte[] message = MODULE$.randomBytesOfLen(random, messageLen);
            long begin = System.nanoTime();
            producer.send(new ProducerRecord(topic, message)).get();
            Iterator recordIter = consumer.poll(Duration.ofMillis(MODULE$.timeout())).iterator();
            long elapsed = System.nanoTime() - begin;
            if (!recordIter.hasNext()) {
                EndToEndLatency$.finalise$1(consumer, producer);
                throw new RuntimeException(new StringBuilder(53).append("poll() timed out before finding a result (timeout:[").append(MODULE$.timeout()).append("])").toString());
            }
            String sent = new String(message, StandardCharsets.UTF_8);
            String read = new String((byte[])recordIter.next().value(), StandardCharsets.UTF_8);
            if (!read.equals(sent)) {
                EndToEndLatency$.finalise$1(consumer, producer);
                throw new RuntimeException(new StringBuilder(53).append("The message read [").append(read).append("] did not match the message sent [").append(sent).append("]").toString());
            }
            if (recordIter.hasNext()) {
                int count = 1 + ((TraversableOnce)JavaConverters$.MODULE$.asScalaIteratorConverter(recordIter).asScala()).size();
                throw new RuntimeException(new StringBuilder(58).append("Only one result was expected during this test. We found [").append(count).append("]").toString());
            }
            if (i % 1000 == 0) {
                Predef$.MODULE$.println((Object)new StringBuilder(1).append(i).append("\t").append((double)elapsed / 1000.0 / 1000.0).toString());
            }
            totalTime$1.elem += (double)elapsed;
            latencies$1[i] = elapsed / 1000L / 1000L;
        });
        Predef$.MODULE$.println((Object)new StringOps(Predef$.MODULE$.augmentString("Avg latency: %.4f ms\n")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToDouble((double)(totalTime.elem / (double)numMessages / 1000.0 / 1000.0))})));
        Arrays.sort(latencies);
        long p50 = latencies[(int)((double)latencies.length * 0.5)];
        long p99 = latencies[(int)((double)latencies.length * 0.99)];
        long p999 = latencies[(int)((double)latencies.length * 0.999)];
        Predef$.MODULE$.println((Object)new StringOps(Predef$.MODULE$.augmentString("Percentiles: 50th = %d, 99th = %d, 99.9th = %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)p50), BoxesRunTime.boxToLong((long)p99), BoxesRunTime.boxToLong((long)p999)})));
        EndToEndLatency$.finalise$1(consumer, producer);
    }

    public byte[] randomBytesOfLen(Random random, int len) {
        return (byte[])Array$.MODULE$.fill(len, (Function0)(JFunction0.mcB.sp & Serializable & scala.Serializable)() -> (byte)(random.nextInt(26) + 65), ClassTag$.MODULE$.Byte());
    }

    public void createTopic(String topic, Properties props) {
        Predef$.MODULE$.println((Object)new StringOps(Predef$.MODULE$.augmentString("Topic \"%s\" does not exist. Will create topic with %d partition(s) and replication factor = %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topic, BoxesRunTime.boxToInteger((int)this.defaultNumPartitions()), BoxesRunTime.boxToShort((short)this.defaultReplicationFactor())})));
        AdminClient adminClient = AdminClient.create(props);
        NewTopic newTopic = new NewTopic(topic, this.defaultNumPartitions(), this.defaultReplicationFactor());
        try {
            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
        }
        finally {
            Utils.closeQuietly(adminClient, "AdminClient");
        }
    }

    public static final /* synthetic */ boolean $anonfun$main$1(String x$1) {
        return new StringOps(Predef$.MODULE$.augmentString(x$1)).nonEmpty();
    }

    /*
     * WARNING - void declaration
     */
    private static final Properties loadPropsWithBootstrapServers$1(Option propsFile$1, String brokerList$1) {
        void var2_2;
        Properties props = (Properties)propsFile$1.map((Function1 & Serializable & scala.Serializable)x$1 -> Utils.loadProps(x$1)).getOrElse((Function0 & Serializable & scala.Serializable)() -> new Properties());
        props.put("bootstrap.servers", brokerList$1);
        return var2_2;
    }

    private static final void finalise$1(KafkaConsumer consumer$1, KafkaProducer producer$1) {
        consumer$1.commitSync();
        producer$1.close();
        consumer$1.close();
    }

    private EndToEndLatency$() {
        MODULE$ = this;
        this.timeout = 60000L;
        this.defaultReplicationFactor = 1;
        this.defaultNumPartitions = 1;
    }
}

