/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.tests;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.jgroups.util.RingBuffer;
import org.jgroups.util.Util;

/**
 * Stress test that hammers a single shared {@link RingBuffer} with a configurable number of
 * concurrent adder and remover threads and reports the elapsed time and throughput.
 *
 * <p>Adders insert consecutive sequence numbers (starting at 1); removers collect whatever they
 * take out of the buffer so that missing or duplicated sequence numbers can be detected afterwards.
 * A JVM shutdown hook prints the counters and runs {@link #dump(Remover[])}, which also makes the
 * current state visible when the test is aborted (e.g. because it hangs).
 */
public class RingBufferStressTest {

    /**
     * Runs one stress test: creates the buffer and all threads, releases them simultaneously via a
     * latch, waits for them to finish and prints timing information.
     *
     * @param capacity     capacity passed to the {@link RingBuffer} constructor
     * @param num_msgs     total number of elements to be added (across all adders) and removed
     *                     (across all removers)
     * @param num_adders   number of adder threads
     * @param num_removers number of remover threads
     */
    static void start(int capacity, int num_msgs, int num_adders, int num_removers) {
        RingBuffer<Integer> buffer = new RingBuffer<Integer>(capacity);
        final AtomicInteger added = new AtomicInteger(0);
        final AtomicInteger seqno = new AtomicInteger(1);
        final AtomicInteger removed = new AtomicInteger(0);
        Adder[] adders = new Adder[num_adders];
        final Remover[] removers = new Remover[num_removers];

        // Print the counters and check the removed elements when the JVM exits, whether the test
        // completed normally or was aborted.
        Runtime.getRuntime().addShutdownHook(new Thread() {

            @Override
            public void run() {
                System.out.println("added=" + added + ", removed=" + removed + ", seqno=" + seqno);
                RingBufferStressTest.dump(removers);
            }
        });

        // All threads block on this latch so that they start working at the same time.
        CountDownLatch latch = new CountDownLatch(1);
        for (int i = 0; i < adders.length; ++i) {
            adders[i] = new Adder(buffer, latch, num_msgs, added, seqno);
            adders[i].start();
        }
        for (int i = 0; i < removers.length; ++i) {
            removers[i] = new Remover(buffer, latch, num_msgs, removed);
            removers[i].start();
        }

        long start = System.currentTimeMillis();
        latch.countDown();

        for (Adder adder : adders) {
            try {
                adder.join();
            }
            catch (InterruptedException e) {
                // Restore the interrupt status and give up: with the flag set, every further
                // join() would fail immediately and the results below would be meaningless.
                e.printStackTrace();
                Thread.currentThread().interrupt();
                return;
            }
        }

        LinkedList<Integer> all_values = new LinkedList<Integer>();
        for (Remover remover : removers) {
            try {
                remover.join();
                all_values.addAll(remover.getList());
            }
            catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
                return;
            }
        }

        long time = System.currentTimeMillis() - start;
        double requests_sec = (double)num_msgs / ((double)time / 1000.0);
        System.out.println("\nTime: " + time + " ms, " + Util.format(requests_sec) + " requests / sec\n");
        System.out.println("Total removed messages: " + removed);

        // Only small runs are printed element by element.
        if (all_values.size() < 100) {
            Collections.sort(all_values);
            System.out.println("values (expected=" + num_msgs + ", removed=" + all_values.size() + "):\n" + Util.print(all_values));
        }
    }

    /**
     * Merges the elements collected by all removers, sorts them and reports gaps and duplicates in
     * the sequence, which is expected to be 1, 2, 3, ...
     *
     * <p>NOTE(review): the per-remover lists are plain, unsynchronized {@link LinkedList}s. When
     * this method runs from the shutdown hook while removers are still active, the result is a
     * best-effort snapshot only.
     *
     * @param removers the removers whose collected elements are checked; entries that have not
     *                 been created yet ({@code null}) are skipped
     */
    static void dump(Remover[] removers) {
        LinkedList<Integer> list = new LinkedList<Integer>();
        for (Remover remover : removers) {
            // The hook may fire before every remover has been created.
            if (remover == null) {
                continue;
            }
            list.addAll(remover.getList());
        }

        // getFirst()/getLast() throw NoSuchElementException on an empty list.
        if (list.isEmpty()) {
            System.out.println("\n0 elements");
            return;
        }

        Collections.sort(list);
        System.out.println("\n" + list.size() + " elements: " + list.getFirst() + " - " + list.getLast());
        System.out.println("Checking for missing elements:");

        int prev = 0;
        int missing = 0;
        int duplicates = 0;
        for (Integer element : list) {
            int current = element;
            if (current == prev) {
                // Same value twice in the sorted list: count it separately so that it neither
                // shows up as "missing" nor cancels out a real gap in the missing count.
                System.err.println(current + " is duplicated");
                ++duplicates;
                continue;
            }
            if (prev + 1 != current) {
                System.err.println((prev + 1) + " is missing, sequence: prev=" + prev + ", current=" + current);
                missing += current - prev - 1;
            }
            prev = current;
        }

        if (missing == 0) {
            System.out.println("Found no missing elements\n");
        } else {
            System.out.println("Found " + missing + " missing elements\n");
        }
        if (duplicates > 0) {
            System.out.println("Found " + duplicates + " duplicate elements\n");
        }
    }

    /**
     * Command line entry point. Recognized options (each followed by an integer value):
     * {@code -capacity} (default 100), {@code -num_msgs} (default 1000000),
     * {@code -num_adders} (default 10) and {@code -num_removers} (default 1).
     * An unknown option, a missing value or a non-numeric value prints the usage text and exits
     * without running the test.
     *
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        int capacity = 100;
        int num_adders = 10;
        int num_removers = 1;
        int num_msgs = 1000000;

        for (int i = 0; i < args.length; ++i) {
            String option = args[i];
            boolean known = option.equals("-num_adders") || option.equals("-num_removers")
                    || option.equals("-num_msgs") || option.equals("-capacity");

            // Every known option needs a value after it.
            if (!known || i + 1 >= args.length) {
                help();
                return;
            }

            int value;
            try {
                value = Integer.parseInt(args[++i]);
            }
            catch (NumberFormatException e) {
                System.out.println("Invalid value for " + option + ": " + args[i]);
                help();
                return;
            }

            if (option.equals("-num_adders")) {
                num_adders = value;
            } else if (option.equals("-num_removers")) {
                num_removers = value;
            } else if (option.equals("-num_msgs")) {
                num_msgs = value;
            } else {
                capacity = value;
            }
        }
        RingBufferStressTest.start(capacity, num_msgs, num_adders, num_removers);
    }

    /** Prints the command line usage. */
    private static void help() {
        System.out.println("RingBufferStressTest [-capacity <buffer capacity>] [-num_msgs msgs] [-num_adders adders] [-num_removers removers]");
    }

    /**
     * Thread that takes elements out of the shared buffer until the shared removal counter has
     * reached the requested number of messages, remembering every element it removed.
     */
    static class Remover
    extends Thread {
        final RingBuffer<Integer> buffer;
        final CountDownLatch latch;
        final int msgs_to_remove;
        /** Counter shared by all removers. */
        final AtomicInteger removed_msgs;
        /** Elements removed by this thread; only this thread adds to it. */
        final LinkedList<Integer> list = new LinkedList<Integer>();

        /**
         * @param buffer         the buffer to remove elements from
         * @param latch          start signal; the thread does nothing until it is released
         * @param msgs_to_remove total number of removals (across all removers) after which to stop
         * @param removed_msgs   shared counter of removed elements
         */
        public Remover(RingBuffer<Integer> buffer, CountDownLatch latch, int msgs_to_remove, AtomicInteger removed_msgs) {
            this.buffer = buffer;
            this.latch = latch;
            this.msgs_to_remove = msgs_to_remove;
            this.removed_msgs = removed_msgs;
            this.setName("Remover");
        }

        /**
         * @return the live (not copied) list of elements removed by this thread
         */
        public List<Integer> getList() {
            return this.list;
        }

        @Override
        public void run() {
            try {
                this.latch.await();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
                // Preserve the interrupt status for whoever inspects this thread afterwards.
                Thread.currentThread().interrupt();
                return;
            }
            while (this.removed_msgs.get() < this.msgs_to_remove) {
                Integer obj = this.buffer.remove();
                // A null result is not counted; presumably it means nothing was available
                // (confirm against RingBuffer.remove()).
                if (obj == null) {
                    continue;
                }
                this.removed_msgs.incrementAndGet();
                this.list.add(obj);
            }
        }
    }

    /**
     * Thread that adds consecutive sequence numbers to the shared buffer until the shared counter
     * of added messages has reached the requested total.
     */
    static class Adder
    extends Thread {
        final RingBuffer<Integer> buffer;
        final CountDownLatch latch;
        /** Counter shared by all adders. */
        final AtomicInteger added_msgs;
        /** Source of the next sequence number, shared by all adders. */
        final AtomicInteger current_seqno;
        final int num_msgs;

        /**
         * @param buffer        the buffer to add elements to
         * @param latch         start signal; the thread does nothing until it is released
         * @param num_msgs      total number of elements to add across all adders
         * @param added_msgs    shared counter of added elements
         * @param current_seqno shared generator of sequence numbers
         */
        public Adder(RingBuffer<Integer> buffer, CountDownLatch latch, int num_msgs, AtomicInteger added_msgs, AtomicInteger current_seqno) {
            this.buffer = buffer;
            this.latch = latch;
            this.added_msgs = added_msgs;
            this.current_seqno = current_seqno;
            this.num_msgs = num_msgs;
            this.setName("Adder");
        }

        @Override
        public void run() {
            try {
                this.latch.await();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
                // Preserve the interrupt status for whoever inspects this thread afterwards.
                Thread.currentThread().interrupt();
                return;
            }
            // Claim a slot first by bumping the shared counter; only a successful claim draws a
            // sequence number, so exactly num_msgs sequence numbers are handed out in total.
            while (this.added_msgs.incrementAndGet() <= this.num_msgs) {
                int seqno = this.current_seqno.getAndIncrement();
                this.buffer.add(seqno);
            }
            // Undo the increment of the failed claim that ended the loop.
            this.added_msgs.decrementAndGet();
        }
    }
}

