package org.jgroups.tests;

import java.io.PrintStream;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.jgroups.Address;
import org.jgroups.BytesMessage;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.Receiver;
import org.jgroups.protocols.FRAG2;
import org.jgroups.protocols.MERGE3;
import org.jgroups.protocols.MFC;
import org.jgroups.protocols.SHARED_LOOPBACK;
import org.jgroups.protocols.SHARED_LOOPBACK_PING;
import org.jgroups.protocols.UFC;
import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.protocols.pbcast.STABLE;
import org.jgroups.util.Util;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {Global.FUNCTIONAL}, singleThreaded = true)
/* loaded from: input_file:org/jgroups/tests/NakackTest.class */
public class NakackTest {
    static final int NUM_PEERS = 3;
    static final int NUM_SENDERS = 2;
    static final int NUM_MSGS = 1000;
    static final int MSGS_PER_STATUS_LINE = 500;
    static final int TOT_MSGS_FOR_ALL_RECEIVERS = 6000;
    static boolean notFIFO = false;
    JChannel[] channels = new JChannel[3];
    Thread[] threads = new Thread[3];
    boolean[] isSender = {false, true, true};
    protected final AtomicInteger received_msgs = new AtomicInteger(0);

    /* loaded from: input_file:org/jgroups/tests/NakackTest$MyReceiver.class */
    protected class MyReceiver implements Receiver {
        final JChannel channel;
        ConcurrentMap<Address, Long> senders = new ConcurrentHashMap();

        public MyReceiver(JChannel jChannel) {
            this.channel = jChannel;
        }

        @Override // org.jgroups.Receiver
        public void receive(Message message) {
            Address src = message.getSrc();
            NakackTest.this.received_msgs.incrementAndGet();
            Long l = this.senders.get(src);
            if (l == null) {
                l = 1L;
                this.senders.putIfAbsent(src, 1);
            }
            long longValue = l.longValue();
            try {
                long longValue2 = ((Long) message.getObject()).longValue();
                if (longValue2 == longValue) {
                    this.senders.put(src, Long.valueOf(longValue + 1));
                } else {
                    NakackTest.notFIFO = true;
                    Assert.fail("FAIL: received msg #" + longValue2 + ", expected " + longValue2);
                }
                Address address = this.channel.getAddress();
                if (longValue2 % 500 == 0 && longValue2 > 0) {
                    PrintStream printStream = System.out;
                    printStream.println("<" + address + ">:PASS: received msg #" + longValue2 + " from " + printStream);
                }
            } catch (Exception e) {
                System.err.println(e.toString());
            }
        }
    }

    /* loaded from: input_file:org/jgroups/tests/NakackTest$Sender.class */
    static class Sender extends Thread {
        JChannel ch;
        boolean sender;

        public Sender(JChannel jChannel, boolean z) {
            this.ch = null;
            this.sender = false;
            this.ch = jChannel;
            this.sender = z;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            if (this.sender) {
                Address address = this.ch.getAddress();
                for (int i = 1; i <= 1000; i++) {
                    try {
                        this.ch.send(new BytesMessage((Address) null, Long.valueOf(i)).setSrc(address));
                        if (i % NakackTest.MSGS_PER_STATUS_LINE == 0) {
                            System.out.println("<" + address + ">: ==> " + i);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    @BeforeMethod
    protected void setUp() throws Exception {
        for (int i = 0; i < 3; i++) {
            this.channels[i] = createChannel().name(Character.toString((char) (i + 65)));
            this.channels[i].connect("NakackTest");
        }
        Receiver[] receiverArr = new Receiver[3];
        for (int i2 = 0; i2 < 3; i2++) {
            receiverArr[i2] = new MyReceiver(this.channels[i2]);
            this.channels[i2].setReceiver(receiverArr[i2]);
        }
        Util.waitUntilAllChannelsHaveSameView(10000L, 1000L, this.channels);
    }

    @AfterMethod
    void tearDown() throws Exception {
        Util.close(this.channels);
    }

    public void testReceptionOfAllMessages() throws TimeoutException {
        for (int i = 0; i < 3; i++) {
            this.threads[i] = new Sender(this.channels[i], this.isSender[i]);
            this.threads[i].start();
        }
        for (int i2 = 0; i2 < 3; i2++) {
            try {
                this.threads[i2].join();
            } catch (InterruptedException e) {
            }
        }
        for (int i3 = 0; i3 < 20 && this.received_msgs.get() < TOT_MSGS_FOR_ALL_RECEIVERS; i3++) {
            Util.sleep(500L);
        }
        Assert.assertEquals(TOT_MSGS_FOR_ALL_RECEIVERS, this.received_msgs.get(), "Incorrect number of messages received by the receiver thread");
        Assert.assertFalse(notFIFO, "Sequenece numbers for a peer not in correct order");
    }

    protected static JChannel createChannel() throws Exception {
        return new JChannel(new SHARED_LOOPBACK(), new SHARED_LOOPBACK_PING(), new MERGE3().setMinInterval(1000L).setMaxInterval(Global.THREADPOOL_SHUTDOWN_WAIT_TIME), new NAKACK2().useMcastXmit(false), new UNICAST3(), new STABLE().setMaxBytes(50000L), new GMS().printLocalAddress(false), new UFC(), new MFC(), new FRAG2());
    }
}
