package org.jgroups.protocols;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.jgroups.Address;
import org.jgroups.BytesMessage;
import org.jgroups.Event;
import org.jgroups.Global;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.protocols.pbcast.NakAckHeader2;
import org.jgroups.stack.Protocol;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.MutableDigest;
import org.jgroups.util.Util;
import org.testng.annotations.Test;

@Test(groups = {Global.FUNCTIONAL}, singleThreaded = true)
/* loaded from: input_file:org/jgroups/protocols/NAKACK_StressTest.class */
public class NAKACK_StressTest {
    static final int NUM_MSGS = 1000000;
    static final int NUM_THREADS = 50;
    static final short NAKACK_ID;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jgroups/protocols/NAKACK_StressTest$Sender.class */
    public static class Sender extends Thread {
        final NAKACK2 nak;
        final CountDownLatch latch;
        final AtomicInteger num_msgs;
        final AtomicLong current_seqno;
        final boolean oob;
        final Address sender;

        public Sender(NAKACK2 nakack2, CountDownLatch countDownLatch, AtomicInteger atomicInteger, AtomicLong atomicLong, boolean z, Address address) {
            this.nak = nakack2;
            this.latch = countDownLatch;
            this.num_msgs = atomicInteger;
            this.current_seqno = atomicLong;
            this.oob = z;
            this.sender = address;
            setName("Adder");
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                this.latch.await();
                while (this.num_msgs.getAndDecrement() > 0) {
                    this.nak.up(NAKACK_StressTest.createMessage(null, this.sender, this.current_seqno.getAndIncrement(), this.oob));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void testStress() {
        start(NUM_THREADS, NUM_MSGS, false);
    }

    public static void testStressOOB() {
        start(NUM_THREADS, NUM_MSGS, true);
    }

    private static void start(int i, final int i2, boolean z) {
        NAKACK2 nakack2 = new NAKACK2();
        AtomicInteger atomicInteger = new AtomicInteger(i2);
        AtomicLong atomicLong = new AtomicLong(1L);
        final AtomicInteger atomicInteger2 = new AtomicInteger(0);
        final ReentrantLock reentrantLock = new ReentrantLock();
        final Condition newCondition = reentrantLock.newCondition();
        final ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        Address createRandomAddress = Util.createRandomAddress("A");
        Address createRandomAddress2 = Util.createRandomAddress("B");
        nakack2.setDownProtocol(new Protocol() { // from class: org.jgroups.protocols.NAKACK_StressTest.1
            @Override // org.jgroups.stack.Protocol
            public Object down(Event event) {
                return null;
            }
        });
        nakack2.setUpProtocol(new Protocol() { // from class: org.jgroups.protocols.NAKACK_StressTest.2
            @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
            public Object up(Message message) {
                atomicInteger2.incrementAndGet();
                NakAckHeader2 nakAckHeader2 = (NakAckHeader2) message.getHeader(NAKACK_StressTest.NAKACK_ID);
                if (nakAckHeader2 != null) {
                    concurrentLinkedQueue.add(Long.valueOf(nakAckHeader2.getSeqno()));
                }
                if (atomicInteger2.get() < i2) {
                    return null;
                }
                reentrantLock.lock();
                try {
                    newCondition.signalAll();
                    return null;
                } finally {
                    reentrantLock.unlock();
                }
            }

            @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
            public void up(MessageBatch messageBatch) {
                Iterator<Message> it = messageBatch.iterator();
                while (it.hasNext()) {
                    Message next = it.next();
                    atomicInteger2.incrementAndGet();
                    NakAckHeader2 nakAckHeader2 = (NakAckHeader2) next.getHeader(NAKACK_StressTest.NAKACK_ID);
                    if (nakAckHeader2 != null) {
                        concurrentLinkedQueue.add(Long.valueOf(nakAckHeader2.getSeqno()));
                    }
                    if (atomicInteger2.get() >= i2) {
                        reentrantLock.lock();
                        try {
                            newCondition.signalAll();
                            reentrantLock.unlock();
                        } catch (Throwable th) {
                            reentrantLock.unlock();
                            throw th;
                        }
                    }
                }
            }
        });
        nakack2.setDiscardDeliveredMsgs(true);
        Protocol protocol = nakack2;
        while (true) {
            Protocol protocol2 = protocol;
            if (protocol2 == null) {
                break;
            }
            protocol2.setAddress(createRandomAddress);
            protocol = protocol2.getDownProtocol();
        }
        nakack2.down(new Event(16));
        View create = View.create(createRandomAddress, 1L, createRandomAddress, createRandomAddress2);
        nakack2.down(new Event(6, create));
        MutableDigest mutableDigest = new MutableDigest(create.getMembersRaw());
        mutableDigest.set(createRandomAddress, 0L, 0L);
        mutableDigest.set(createRandomAddress2, 0L, 0L);
        nakack2.down(new Event(41, mutableDigest));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Sender[] senderArr = new Sender[i];
        for (int i3 = 0; i3 < senderArr.length; i3++) {
            senderArr[i3] = new Sender(nakack2, countDownLatch, atomicInteger, atomicLong, z, createRandomAddress2);
            senderArr[i3].start();
        }
        long currentTimeMillis = System.currentTimeMillis();
        countDownLatch.countDown();
        int i4 = 30;
        reentrantLock.lock();
        while (atomicInteger2.get() < i2) {
            try {
                int i5 = i4;
                i4--;
                if (i5 <= 0) {
                    break;
                }
                try {
                    newCondition.await(1000L, TimeUnit.MILLISECONDS);
                    System.out.println("received " + atomicInteger2.get() + " msgs");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } finally {
                reentrantLock.unlock();
            }
        }
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        System.out.printf("\nTime: %d ms, %.2f requests / sec\n", Long.valueOf(currentTimeMillis2), Double.valueOf(i2 / (currentTimeMillis2 / 1000.0d)));
        System.out.println("Delivered messages: " + concurrentLinkedQueue.size());
        if (concurrentLinkedQueue.size() < 100) {
            System.out.println("Elements: " + concurrentLinkedQueue);
        }
        nakack2.stop();
        ArrayList<Long> arrayList = new ArrayList(concurrentLinkedQueue);
        if (z) {
            Collections.sort(arrayList);
        }
        if (!$assertionsDisabled && arrayList.size() != i2) {
            throw new AssertionError("expected " + i2 + ", but got " + arrayList.size());
        }
        System.out.println("Checking results consistency");
        int i6 = 1;
        for (Long l : arrayList) {
            if (l.longValue() != i6) {
                if (!$assertionsDisabled && i6 != l.longValue()) {
                    throw new AssertionError("expected " + i6 + " but got " + l);
                }
                return;
            }
            i6++;
        }
        System.out.println("OK");
    }

    private static Message createMessage(Address address, Address address2, long j, boolean z) {
        Message src = new BytesMessage(address, "hello world").setSrc(address2);
        src.putHeader(NAKACK_ID, NakAckHeader2.createMessageHeader(j));
        if (z) {
            src.setFlag(Message.Flag.OOB);
        }
        return src;
    }

    static {
        $assertionsDisabled = !NAKACK_StressTest.class.desiredAssertionStatus();
        NAKACK_ID = ClassConfigurator.getProtocolId(NAKACK2.class);
    }
}
