package org.jgroups.protocols;

import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Global;
import org.jgroups.Message;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.UNICAST;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Util;
import org.testng.annotations.Test;

/**
 * Stress test for {@link UNICAST}: many {@link Adder} threads concurrently pass messages with
 * increasing sequence numbers up into a single UNICAST instance. A stub protocol placed above it
 * records the sequence number of every message that is delivered, and the test then checks that
 * exactly the expected number of messages arrived and that their sequence numbers are 1, 2, 3, ...
 * (after sorting, when the messages were flagged as out-of-band).
 */
@Test(groups = {Global.FUNCTIONAL}, sequential = true)
public class UNICAST_StressTest {
    static final int NUM_MSGS = 1000000;
    static final int NUM_THREADS = 50;
    static final int MAX_MSG_BATCH_SIZE = 50000;
    static final short UNICAST_ID = ClassConfigurator.getProtocolId(UNICAST.class);

    // Event type of message events: the stub up-protocol casts the argument of such events to Message.
    // NOTE(review): presumably Event.MSG - confirm and switch to the named constant.
    private static final int EVENT_MSG = 1;

    // Event type passed down with the destination address before any message is sent.
    // NOTE(review): presumably Event.SET_LOCAL_ADDRESS - confirm and switch to the named constant.
    private static final int EVENT_SET_LOCAL_ADDRESS = 8;

    // Flag set on a message when the oob option is enabled.
    // NOTE(review): presumably Message.OOB - confirm and switch to the named constant.
    private static final byte FLAG_OOB = 1;

    private static final String USAGE =
            "UNICAST_StressTest [-num_msgs msgs] [-num_threads threads] [-oob <true | false>] + [-max <batch size>]";

    /**
     * Worker thread that waits on a shared latch and then keeps passing messages up into the
     * UNICAST protocol until the shared message budget is used up.
     */
    public static class Adder extends Thread {
        final UNICAST unicast;
        final CountDownLatch latch;
        final AtomicInteger num_msgs;
        final AtomicLong current_seqno;
        final boolean oob;
        final Address dest;
        final Address sender;

        /**
         * @param unicast       protocol instance that receives the messages
         * @param latch         start signal shared by all adders
         * @param num_msgs      shared count of messages still to be sent
         * @param current_seqno shared source of the next sequence number
         * @param oob           whether the created messages get the OOB flag
         * @param dest          destination address of the created messages
         * @param sender        source address of the created messages
         */
        public Adder(UNICAST unicast, CountDownLatch latch, AtomicInteger num_msgs, AtomicLong current_seqno,
                     boolean oob, Address dest, Address sender) {
            this.unicast = unicast;
            this.latch = latch;
            this.num_msgs = num_msgs;
            this.current_seqno = current_seqno;
            this.oob = oob;
            this.dest = dest;
            this.sender = sender;
        }

        /** Blocks until the latch opens, then sends messages while the shared budget is positive. */
        @Override
        public void run() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Restore the interrupt status instead of swallowing it; the thread ends here.
                Thread.currentThread().interrupt();
                return;
            }
            while (num_msgs.getAndDecrement() > 0) {
                long seqno = current_seqno.getAndIncrement();
                unicast.up(new Event(EVENT_MSG, UNICAST_StressTest.createMessage(dest, sender, seqno, oob, false)));
            }
        }
    }

    /** Runs the stress test with regular (non-OOB) messages. */
    @Test
    public static void stressTest() {
        start(NUM_THREADS, NUM_MSGS, false, MAX_MSG_BATCH_SIZE);
    }

    /** Runs the stress test with messages flagged as OOB. */
    @Test
    public static void stressTestOOB() {
        start(NUM_THREADS, NUM_MSGS, true, MAX_MSG_BATCH_SIZE);
    }

    /**
     * Wires a UNICAST instance between two stub protocols, feeds it {@code num_msgs} messages from
     * {@code num_threads} concurrent adders, waits until all of them have been delivered and then
     * verifies the delivered sequence numbers.
     *
     * @param num_threads        number of {@link Adder} threads to start
     * @param num_msgs           total number of messages the adders send
     * @param oob                whether messages are flagged as OOB (results are sorted before checking)
     * @param max_msg_batch_size value passed to {@code UNICAST.setMaxMessageBatchSize}
     */
    private static void start(int num_threads, final int num_msgs, boolean oob, int max_msg_batch_size) {
        final UNICAST unicast = new UNICAST();
        final AtomicInteger msgs_to_send = new AtomicInteger(num_msgs);
        final AtomicLong next_seqno = new AtomicLong(1L);
        final AtomicInteger delivered = new AtomicInteger(0);
        final ReentrantLock lock = new ReentrantLock();
        final Condition all_delivered = lock.newCondition();
        final ConcurrentLinkedQueue<Long> delivered_seqnos = new ConcurrentLinkedQueue<Long>();
        final Address dest = Util.createRandomAddress();
        final Address sender = Util.createRandomAddress();

        // Stub below UNICAST: discards everything sent down.
        unicast.setDownProtocol(new Protocol() {
            @Override
            public Object down(Event evt) {
                return null;
            }
        });

        // Stub above UNICAST: counts delivered messages, records their seqnos and wakes up the
        // waiting test thread once the expected number has been reached.
        unicast.setUpProtocol(new Protocol() {
            @Override
            public Object up(Event evt) {
                if (evt.getType() != EVENT_MSG) {
                    return null;
                }
                delivered.incrementAndGet();
                Message msg = (Message) evt.getArg();
                UNICAST.UnicastHeader hdr = (UNICAST.UnicastHeader) msg.getHeader(UNICAST_ID);
                if (hdr != null) {
                    delivered_seqnos.add(Long.valueOf(hdr.getSeqno()));
                }
                if (delivered.get() < num_msgs) {
                    return null;
                }
                lock.lock();
                try {
                    all_delivered.signalAll();
                } finally {
                    lock.unlock();
                }
                return null;
            }
        });

        unicast.down(new Event(EVENT_SET_LOCAL_ADDRESS, dest));
        unicast.setMaxMessageBatchSize(max_msg_batch_size);

        // Send the first message (seqno 1, 'first' flag set) before the adders start.
        unicast.up(new Event(EVENT_MSG, createMessage(dest, sender, 1L, oob, true)));
        Util.sleep(500L);

        CountDownLatch start_signal = new CountDownLatch(1);
        Adder[] adders = new Adder[num_threads];
        for (int i = 0; i < adders.length; i++) {
            adders[i] = new Adder(unicast, start_signal, msgs_to_send, next_seqno, oob, dest, sender);
            adders[i].start();
        }

        long start_time = System.currentTimeMillis();
        start_signal.countDown(); // releases all adders at once

        // The lock must be held for every await() call and released exactly once, so the
        // try/finally wraps the whole loop rather than a single iteration.
        lock.lock();
        try {
            while (delivered.get() < num_msgs) {
                try {
                    all_delivered.await(1000L, TimeUnit.MILLISECONDS);
                    System.out.println("received " + delivered.get() + " msgs");
                    // Re-inject a message with seqno 1 after every wait.
                    // NOTE(review): presumably this nudges UNICAST into delivering pending messages - confirm.
                    unicast.up(new Event(EVENT_MSG, createMessage(dest, sender, 1L, oob, false)));
                } catch (InterruptedException e) {
                    // Kept as in the original: report the interruption and keep waiting.
                    e.printStackTrace();
                }
            }
        } finally {
            lock.unlock();
        }

        long elapsed_ms = System.currentTimeMillis() - start_time;
        System.out.println("\nTime: " + elapsed_ms + " ms, " + Util.format(num_msgs / (elapsed_ms / 1000.0d))
                + " requests / sec\n");
        System.out.println("Delivered messages: " + delivered_seqnos.size());
        if (delivered_seqnos.size() < 100) {
            System.out.println("Elements: " + delivered_seqnos);
        }

        checkResults(delivered_seqnos, num_msgs, oob);
    }

    /**
     * Asserts that exactly {@code num_msgs} seqnos were delivered and that they form the sequence
     * 1, 2, 3, ... When {@code oob} is set, the seqnos are sorted first. Prints "OK" on success;
     * with assertions disabled a mismatch only suppresses the "OK" line.
     */
    private static void checkResults(ConcurrentLinkedQueue<Long> delivered_seqnos, int num_msgs, boolean oob) {
        ArrayList<Long> results = new ArrayList<Long>(delivered_seqnos);
        if (oob) {
            Collections.sort(results);
        }
        assert results.size() == num_msgs : "expected " + num_msgs + ", but got " + results.size();

        System.out.println("Checking results consistency");
        int expected = 1;
        for (Long seqno : results) {
            if (seqno.longValue() != expected) {
                assert expected == seqno.longValue() : "expected " + expected + " but got " + seqno;
                return;
            }
            expected++;
        }
        System.out.println("OK");
    }

    /**
     * Creates a "hello world" message carrying a UNICAST data header.
     *
     * @param dest   first address passed to the {@link Message} constructor
     * @param src    second address passed to the {@link Message} constructor
     * @param seqno  sequence number stored in the header
     * @param oob    whether to set the OOB flag on the message
     * @param first  value of the last argument of {@code UnicastHeader.createDataHeader}
     * @return the new message
     */
    public static Message createMessage(Address dest, Address src, long seqno, boolean oob, boolean first) {
        Message msg = new Message(dest, src, "hello world");
        msg.putHeader(UNICAST_ID, UNICAST.UnicastHeader.createDataHeader(seqno, (short) 1, first));
        if (oob) {
            msg.setFlag(FLAG_OOB);
        }
        return msg;
    }

    /**
     * Command-line entry point. Recognized options (each followed by a value): {@code -num_threads},
     * {@code -num_msgs}, {@code -oob} and {@code -max}. An unknown option, or an option without a
     * value, prints the usage line and returns without running the test.
     */
    @Test(enabled = false)
    public static void main(String[] args) {
        int num_threads = 10;
        int num_msgs = NUM_MSGS;
        int max = 20000;
        boolean oob = false;

        for (int i = 0; i < args.length; i++) {
            String option = args[i];
            boolean known = option.equals("-num_threads") || option.equals("-num_msgs")
                    || option.equals("-oob") || option.equals("-max");
            // Every option needs a value; bail out instead of indexing past the end of args.
            if (!known || i + 1 >= args.length) {
                System.out.println(USAGE);
                return;
            }
            String value = args[++i];
            if (option.equals("-num_threads")) {
                num_threads = Integer.parseInt(value);
            } else if (option.equals("-num_msgs")) {
                num_msgs = Integer.parseInt(value);
            } else if (option.equals("-oob")) {
                oob = Boolean.parseBoolean(value);
            } else {
                max = Integer.parseInt(value);
            }
        }
        start(num_threads, num_msgs, oob, max);
    }
}
