/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.tests;

import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.protocols.SHUFFLE;
import org.jgroups.protocols.pbcast.NAKACK;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"stack-independent"}, sequential=true)
public class SequencerOrderTest {
    private JChannel c1;
    private JChannel c2;
    private JChannel c3;
    private MyReceiver r1;
    private MyReceiver r2;
    private MyReceiver r3;
    static final String GROUP = "SequencerOrderTest";
    static final int NUM_MSGS = 50;
    static final int NUM_THREADS = 10;
    static final int EXPECTED_MSGS = 500;
    static final String props = "sequencer.xml";
    private Sender[] senders = new Sender[10];

    @BeforeMethod
    void setUp() throws Exception {
        this.c1 = new JChannel(props);
        this.c1.setName("A");
        this.c1.connect(GROUP);
        this.r1 = new MyReceiver("A");
        this.c1.setReceiver(this.r1);
        this.c2 = new JChannel(props);
        this.c2.setName("B");
        this.c2.connect(GROUP);
        this.r2 = new MyReceiver("B");
        this.c2.setReceiver(this.r2);
        this.c3 = new JChannel(props);
        this.c3.setName("C");
        this.c3.connect(GROUP);
        this.r3 = new MyReceiver("C");
        this.c3.setReceiver(this.r3);
        AtomicInteger num = new AtomicInteger(1);
        for (int i = 0; i < this.senders.length; ++i) {
            this.senders[i] = new Sender(50, num, this.c1, this.c2, this.c3);
        }
    }

    @AfterMethod
    void tearDown() throws Exception {
        Util.close(this.c3, this.c2, this.c1);
    }

    @Test
    public void testBroadcastSequence() throws Exception {
        SequencerOrderTest.insertShuffle(this.c1, this.c2, this.c3);
        System.out.println("Starting " + this.senders.length + " sender threads (each sends " + 50 + " messages)");
        for (Sender sender : this.senders) {
            sender.start();
        }
        for (Sender sender : this.senders) {
            sender.join(20000L);
        }
        System.out.println("Ok, senders have completed");
        List<String> l1 = this.r1.getMsgs();
        List<String> l2 = this.r2.getMsgs();
        List<String> l3 = this.r3.getMsgs();
        System.out.println("-- verifying messages on A and B");
        SequencerOrderTest.verifyNumberOfMessages(500, l1, l2, l3);
        SequencerOrderTest.verifySameOrder(500, l1, l2, l3);
    }

    private static void insertShuffle(JChannel ... channels) throws Exception {
        for (JChannel ch : channels) {
            SHUFFLE shuffle = new SHUFFLE();
            shuffle.setDown(false);
            shuffle.setUp(true);
            shuffle.setMaxSize(10);
            shuffle.setMaxTime(1000L);
            ch.getProtocolStack().insertProtocol((Protocol)shuffle, 2, (Class<? extends Protocol>)NAKACK.class);
            shuffle.init();
        }
    }

    private static void verifyNumberOfMessages(int num_msgs, List<String> ... lists) throws Exception {
        int i;
        long end_time = System.currentTimeMillis() + 10000L;
        while (System.currentTimeMillis() < end_time) {
            boolean all_correct = true;
            for (List<String> list : lists) {
                if (list.size() == num_msgs) continue;
                all_correct = false;
                break;
            }
            if (all_correct) break;
            Util.sleep(1000L);
        }
        for (i = 0; i < lists.length; ++i) {
            System.out.println("list #" + (i + 1) + ": " + lists[i]);
        }
        for (i = 0; i < lists.length; ++i) {
            assert (lists[i].size() == num_msgs) : "list #" + (i + 1) + " should have " + num_msgs + " elements";
        }
        System.out.println("OK, all lists have the same size (" + num_msgs + ")\n");
    }

    private static void verifySameOrder(int expected_msgs, List<String> ... lists) throws Exception {
        for (int index = 0; index < expected_msgs; ++index) {
            String val = null;
            for (List<String> list : lists) {
                if (val == null) {
                    val = list.get(index);
                    continue;
                }
                String val2 = list.get(index);
                assert (val.equals(val2)) : "found different values at index " + index + ": " + val + " != " + val2;
            }
        }
        System.out.println("OK, all lists have the same order");
    }

    private static class MyReceiver
    extends ReceiverAdapter {
        final String name;
        final List<String> msgs = new LinkedList<String>();

        private MyReceiver(String name) {
            this.name = name;
        }

        public List<String> getMsgs() {
            return this.msgs;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void receive(Message msg) {
            String val = (String)msg.getObject();
            if (val != null) {
                List<String> list = this.msgs;
                synchronized (list) {
                    this.msgs.add(val);
                }
            }
        }
    }

    private static class Sender
    extends Thread {
        final int num_msgs;
        final JChannel[] channels;
        final AtomicInteger num;

        public Sender(int num_msgs, AtomicInteger num, JChannel ... channels) {
            this.num_msgs = num_msgs;
            this.num = num;
            this.channels = channels;
        }

        @Override
        public void run() {
            for (int i = 1; i <= this.num_msgs; ++i) {
                try {
                    JChannel ch = (JChannel)Util.pickRandomElement(this.channels);
                    String channel_name = ch.getName();
                    ch.send(null, null, (Serializable)((Object)(channel_name + ":" + this.num.getAndIncrement())));
                    continue;
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }
    }
}

