package org.jgroups.tests;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import net.sf.webdav.WebdavStatus;
import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.Message;
import org.jgroups.protocols.pbcast.NakAckHeader;
import org.jgroups.stack.NakReceiverWindow;
import org.jgroups.stack.Retransmitter;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Stress tests that add messages to a single {@link NakReceiverWindow} from many threads at once
 * and verify that every sequence number is reported as successfully added exactly once.
 *
 * <p>Each test starts {@link #NUM_THREADS} sender threads which block on a shared
 * {@link CyclicBarrier}; the test thread is the final party that releases them simultaneously.
 */
@Test(groups = {Global.FUNCTIONAL}, sequential = true)
public class NakReceiverWindowTest2 {
    /** Number of concurrent sender threads per test. */
    static final int NUM_THREADS = 200;
    /** Highest sequence number (and number of add attempts) each sender works through. */
    static final int NUM_MSGS = 5000;
    /** Header ID under which the {@link NakAckHeader} is attached to each message. */
    static final short NAKACK_ID = 100;

    final Address self = Util.createRandomAddress();
    final Address sender = Util.createRandomAddress();
    /** One party per sender thread plus one for the test thread that releases them. */
    final CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS + 1);
    NakReceiverWindow win;
    /** Scheduler handed to the window; kept so it can be stopped after each test method. */
    TimeScheduler timer;

    /**
     * Adds seqnos {@code 1..num} in ascending order once the barrier opens, and records every
     * add the window reports as successful in the shared counter map.
     */
    static class Sender extends Thread {
        final int num;
        final NakReceiverWindow win;
        final Address sender;
        final CyclicBarrier barrier;
        final ConcurrentMap<Long, AtomicInteger> map;

        /**
         * @param i                 number of add attempts (and highest seqno for the ordered/random senders)
         * @param nakReceiverWindow window that messages are added to
         * @param address           source address set on every created message
         * @param cyclicBarrier     barrier the thread waits on before adding
         * @param concurrentMap     per-seqno counters of successful adds; must already contain an
         *                          entry for every seqno this sender adds
         */
        public Sender(int i, NakReceiverWindow nakReceiverWindow, Address address, CyclicBarrier cyclicBarrier, ConcurrentMap<Long, AtomicInteger> concurrentMap) {
            this.num = i;
            this.win = nakReceiverWindow;
            this.sender = address;
            this.barrier = cyclicBarrier;
            this.map = concurrentMap;
        }

        @Override
        public void run() {
            waitForBarrier();
            for (int seqno = 1; seqno <= num; seqno++) {
                add(seqno);
            }
        }

        /**
         * Builds a message carrying a {@link NakAckHeader} for seqno {@code j}, adds it to the
         * window and, if the window returns {@code true}, bumps the counter for that seqno.
         */
        protected void add(long j) {
            NakAckHeader header = NakAckHeader.createMessageHeader(j);
            Message message = new Message((Address) null, sender, "hello");
            message.putHeader(NAKACK_ID, header);
            if (win.add(j, message)) {
                map.get(Long.valueOf(j)).incrementAndGet();
            }
        }

        /**
         * Blocks until all barrier parties have arrived. Failures are printed and the sender
         * carries on regardless, so one broken wait does not abort the run.
         */
        protected void waitForBarrier() {
            try {
                barrier.await();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever inspects this thread afterwards.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    /** Adds seqnos {@code 1..num} in a per-thread random order. */
    static class RandomSender extends Sender {
        public RandomSender(int i, NakReceiverWindow nakReceiverWindow, Address address, CyclicBarrier cyclicBarrier, ConcurrentMap<Long, AtomicInteger> concurrentMap) {
            super(i, nakReceiverWindow, address, cyclicBarrier, concurrentMap);
        }

        @Override
        public void run() {
            // Shuffle before waiting on the barrier so only the adds happen after the release.
            ArrayList<Long> seqnos = new ArrayList<Long>(num);
            for (long seqno = 1; seqno <= num; seqno++) {
                seqnos.add(Long.valueOf(seqno));
            }
            Collections.shuffle(seqnos);
            waitForBarrier();
            for (Long seqno : seqnos) {
                add(seqno.longValue());
            }
        }
    }

    /** Adds seqno 1 {@code num} times. */
    static class SameSeqnoSender extends Sender {
        public SameSeqnoSender(int i, NakReceiverWindow nakReceiverWindow, Address address, CyclicBarrier cyclicBarrier, ConcurrentMap<Long, AtomicInteger> concurrentMap) {
            super(i, nakReceiverWindow, address, cyclicBarrier, concurrentMap);
        }

        @Override
        public void run() {
            waitForBarrier();
            for (int attempt = 1; attempt <= num; attempt++) {
                add(1L);
            }
        }
    }

    /** Creates a fresh window (with a no-op retransmit command) and scheduler for each test method. */
    @BeforeMethod
    void init() {
        timer = new TimeScheduler(2);
        win = new NakReceiverWindow(self, new Retransmitter.RetransmitCommand() {
            @Override
            public void retransmit(long j, long j2, Address address) {
                // Retransmission requests are irrelevant to these tests.
            }
        }, 0L, 0L, timer);
    }

    /**
     * Resets the window and stops the scheduler created in {@link #init()}, so scheduler
     * threads do not accumulate across the repeated test invocations.
     */
    @AfterMethod
    void cleanup() throws InterruptedException {
        win.reset();
        timer.stop();
    }

    /** All senders add seqnos 1..{@link #NUM_MSGS} in ascending order. */
    @Test(invocationCount = 10)
    public void testConcurrentInsertions() throws BrokenBarrierException, InterruptedException {
        ConcurrentMap<Long, AtomicInteger> successfulAdds = createCounters();
        Sender[] senders = new Sender[NUM_THREADS];
        for (int i = 0; i < senders.length; i++) {
            senders[i] = new Sender(NUM_MSGS, win, sender, barrier, successfulAdds);
        }
        runSenders(senders, "Concurrently inserting " + NUM_MSGS + " messages with " + NUM_THREADS + " threads");
        System.out.println("OK: " + NUM_MSGS + " were added to the NakReceiverWindow concurrently by " + NUM_THREADS + " threads");
        verifyEachSeqnoAddedOnce(successfulAdds);
    }

    /** All senders add seqnos 1..{@link #NUM_MSGS}, each in its own random order. */
    @Test(invocationCount = 5)
    public void testConcurrentRandomInsertions() throws BrokenBarrierException, InterruptedException {
        ConcurrentMap<Long, AtomicInteger> successfulAdds = createCounters();
        Sender[] senders = new Sender[NUM_THREADS];
        for (int i = 0; i < senders.length; i++) {
            senders[i] = new RandomSender(NUM_MSGS, win, sender, barrier, successfulAdds);
        }
        runSenders(senders, "Concurrently inserting " + NUM_MSGS + " messages with " + NUM_THREADS + " threads");
        System.out.println("OK: " + NUM_MSGS + " were added to the NakReceiverWindow concurrently by " + NUM_THREADS + " threads");
        verifyEachSeqnoAddedOnce(successfulAdds);
    }

    /** All senders repeatedly add seqno 1; only one add overall may be reported as successful. */
    @Test(invocationCount = 5)
    public void testConcurrentInsertionOfSameSeqno() throws BrokenBarrierException, InterruptedException {
        ConcurrentMap<Long, AtomicInteger> successfulAdds = createCounters();
        Sender[] senders = new Sender[NUM_THREADS];
        for (int i = 0; i < senders.length; i++) {
            senders[i] = new SameSeqnoSender(NUM_MSGS, win, sender, barrier, successfulAdds);
        }
        runSenders(senders, "Concurrently inserting 1 message with " + NUM_THREADS + " threads");
        System.out.println("OK: 1 message was added to the NakReceiverWindow concurrently by " + NUM_THREADS + " threads");
        System.out.println("checking for missing or duplicate seqnos in " + successfulAdds.keySet().size() + " seqnos:");
        AtomicInteger addsOfFirst = successfulAdds.get(Long.valueOf(1L));
        if (addsOfFirst.get() != 1) {
            System.err.println("1 was not added exactly once (successful insertions=" + addsOfFirst.get() + ")");
        }
        assert addsOfFirst.get() == 1 : "1 was not added exactly once (successful insertions=" + addsOfFirst.get() + ")";
        System.out.println("OK: 1 seqno was added exactly once");
    }

    /** Returns a map holding a zeroed counter for every seqno in 1..{@link #NUM_MSGS}. */
    private static ConcurrentMap<Long, AtomicInteger> createCounters() {
        ConcurrentMap<Long, AtomicInteger> counters = new ConcurrentHashMap<Long, AtomicInteger>();
        for (long seqno = 1; seqno <= NUM_MSGS; seqno++) {
            counters.put(Long.valueOf(seqno), new AtomicInteger(0));
        }
        return counters;
    }

    /**
     * Starts all senders, gives them time to reach the barrier, prints {@code banner}, releases
     * them by joining the barrier as the last party, and waits up to 20s per thread for completion.
     */
    private void runSenders(Sender[] senders, String banner) throws BrokenBarrierException, InterruptedException {
        for (Sender thread : senders) {
            thread.start();
        }
        Util.sleep(2000L);
        System.out.println(banner);
        barrier.await();
        for (Sender thread : senders) {
            thread.join(20000L);
        }
    }

    /**
     * Reports every seqno whose success counter differs from 1 on stderr, then asserts that each
     * seqno in 1..{@link #NUM_MSGS} has a counter and that it equals 1.
     */
    private static void verifyEachSeqnoAddedOnce(ConcurrentMap<Long, AtomicInteger> counters) {
        int total = counters.keySet().size();
        System.out.println("checking for missing or duplicate seqnos in " + total + " seqnos:");
        // First pass lists all offenders so a failure shows the full picture, not just the first one.
        for (int seqno = 1; seqno <= NUM_MSGS; seqno++) {
            AtomicInteger count = counters.get(Long.valueOf(seqno));
            if (count != null && count.get() != 1) {
                System.err.println(seqno + " was not added exactly once (successful insertions=" + count.get() + ")");
            }
        }
        for (int seqno = 1; seqno <= NUM_MSGS; seqno++) {
            AtomicInteger count = counters.get(Long.valueOf(seqno));
            assert count != null : seqno + " is missing in " + counters.keySet();
            assert count.get() == 1 : seqno + " was not added exactly once (successful insertions=" + count.get() + ")";
        }
        System.out.println("OK: " + total + " seqnos were added exactly once");
    }
}
