package org.jgroups.tests;

import java.lang.Thread;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.jgroups.Address;
import org.jgroups.BytesMessage;
import org.jgroups.Message;
import org.jgroups.util.NonBlockingCredit;
import org.jgroups.util.Util;
import org.testng.annotations.Test;

/**
 * Tests for {@link NonBlockingCredit}: decrementing credits, queueing messages when credits run out,
 * draining the queue when credits are replenished, and releasing waiting senders on reset.
 * <p>
 * The checks use plain {@code assert} statements, so the JVM must be started with {@code -ea}
 * for them to have any effect.
 */
@Test
public class NonBlockingCreditTest {
    /** Destination address used for every message created by {@link #msg(int)}. */
    protected static final Address dest = Util.createRandomAddress("A");

    /** Credit ceiling used by most tests, both as the initial credits and as the cap on increments. */
    protected static final int max_credits = 10000;

    /**
     * Send function handed to {@link NonBlockingCredit}; it only counts how many messages it was
     * asked to send.
     */
    protected static class MessageSender implements Consumer<Message> {
        /** Number of times {@link #accept(Message)} has been invoked. */
        protected int sent_msgs;

        protected MessageSender() {
        }

        /** @return the number of messages passed to {@link #accept(Message)} so far */
        public int getSentMessages() {
            return this.sent_msgs;
        }

        /** Counts the message; the message itself is ignored. */
        @Override
        public void accept(Message message) {
            this.sent_msgs++;
        }
    }

    /** A decrement with enough credits succeeds, and an increment is capped at the given maximum. */
    public void testDecrement() {
        NonBlockingCredit cred = new NonBlockingCredit(max_credits, 500000, new ReentrantLock(), new MessageSender());
        Message msg = msg(1000);
        boolean rc = cred.decrementIfEnoughCredits(msg, msg.getLength(), 500L);
        assert rc && cred.get() == 9000;

        // 9000 + 2000 would exceed the cap, so the test expects exactly max_credits afterwards
        cred.increment(2000L, max_credits);
        assert !cred.isQueuing() && cred.get() == 10000;
    }

    /**
     * Exhausts the credits, verifies that further messages are queued rather than sent, and then
     * checks that increments drain the queue through the send function as credits become available.
     */
    public void testDecrementAndQueing() {
        MessageSender sender = new MessageSender();
        NonBlockingCredit cred = new NonBlockingCredit(max_credits, 500000, new ReentrantLock(), sender);

        Message msg = msg(1000);
        boolean rc = cred.decrementIfEnoughCredits(msg, msg.getLength(), 500L);
        assert rc && cred.get() == 9000;

        // uses up the remaining 9000 credits
        msg = msg(9000);
        rc = cred.decrementIfEnoughCredits(msg, msg.getLength(), 500L);
        assert rc && cred.get() == 0;

        // no credits left: each of these is expected to be rejected and queued
        for (int i = 0; i < 5; i++) {
            msg = msg(1000);
            rc = cred.decrementIfEnoughCredits(msg, msg.getLength(), 500L);
            assert !rc && cred.get() == 0;
        }
        assert cred.isQueuing();
        assert cred.getQueuedMessages() == 5;
        assert cred.getQueuedMessageSize() == 5000;

        // 500 credits are not enough for a queued 1000-byte message: nothing is sent yet
        cred.increment(500L, max_credits);
        assert cred.isQueuing();
        assert cred.get() == 500 && cred.getQueuedMessages() == 5 && cred.getQueuedMessageSize() == 5000;

        // now 1000 credits are available: exactly one queued message is expected to be sent
        cred.increment(500L, max_credits);
        assert cred.isQueuing();
        assert cred.get() == 0 && cred.getQueuedMessages() == 4 && cred.getQueuedMessageSize() == 4000;
        assert sender.sent_msgs == 1;

        // increment is capped at max_credits; the 4 remaining messages (4000 bytes) leave 6000 credits
        cred.increment(20000L, max_credits);
        assert !cred.isQueuing();
        assert cred.get() == 6000 && cred.getQueuedMessages() == 0 && cred.getQueuedMessageSize() == 0;
        assert sender.sent_msgs == 5;
    }

    /**
     * Sends 10 messages of 1000 bytes against 2500 credits and a maximum queue size of 1500, while a
     * background thread adds 10000 credits after 2 seconds. The test passes if all 10 calls return,
     * i.e. the sending loop is not stuck forever.
     */
    public void testDecrementAndBlockingOnFullQueue() {
        NonBlockingCredit cred = new NonBlockingCredit(2500L, 1500, new ReentrantLock(), new MessageSender());
        AtomicInteger received_msgs = new AtomicInteger();

        // replenishes the credits after a delay so that the loop below can complete
        new Thread(() -> {
            Util.sleep(2000L);
            System.out.printf("[%s] adding 10000 credits\n", Thread.currentThread());
            cred.increment(10000L, max_credits);
        }).start();

        for (int i = 0; i < 10; i++) {
            Message msg = msg(1000);
            // the return value is deliberately ignored: only completion of the call is counted
            cred.decrementIfEnoughCredits(msg, msg.getLength(), 2000L);
            received_msgs.incrementAndGet();
        }
        System.out.printf("received %d msgs", received_msgs.get());
        assert received_msgs.get() == 10;
    }

    /**
     * Fills the queue to its maximum size, starts 10 more senders that are expected to end up in
     * {@link Thread.State#WAITING}, and verifies that {@link NonBlockingCredit#reset()} lets all of
     * them terminate.
     *
     * @throws TimeoutException if the sender threads do not reach the expected state in time
     */
    public void testQueueing() throws TimeoutException {
        NonBlockingCredit cred = new NonBlockingCredit(max_credits, 5000, new ReentrantLock(), null);
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            Message msg = msg(1000);
            threads[i] = new Thread(() -> {
                cred.decrementIfEnoughCredits(msg, msg.getLength(), 0L);
            });
        }

        // consume all credits with a single message
        Message big = msg(max_credits);
        boolean rc = cred.decrementIfEnoughCredits(big, big.getLength(), 500L);
        assert rc;
        assert !cred.isQueuing();

        // Fill the queue (5 x 1000 bytes == max queue size of 5000). The call is made outside the
        // assert so that the queue is populated even when assertions are disabled.
        for (int i = 0; i < 5; i++) {
            boolean sent = cred.decrementIfEnoughCredits(msg(1000), 1000, 500L);
            assert !sent;
        }
        assert cred.isQueuing();
        assert cred.getQueuedMessages() == 5;
        assert cred.getQueuedMessageSize() == 5000;

        for (Thread thread : threads) {
            thread.start();
        }

        // the queue is full, so every additional sender is expected to wait
        Util.waitUntil(10000L, 500L,
                () -> Arrays.stream(threads).allMatch(t -> t.getState() == Thread.State.WAITING),
                () -> "threads:\n" + describe(threads));
        System.out.printf("threads:\n%s\n", describe(threads));
        assert cred.isQueuing() && cred.getQueuedMessages() == 5;

        // reset is expected to release all waiting senders
        cred.reset();
        Util.waitUntil(10000L, 500L,
                () -> Arrays.stream(threads).allMatch(t -> t.getState() == Thread.State.TERMINATED),
                () -> "threads:\n" + describe(threads));
        System.out.printf("threads:\n%s\n", describe(threads));
    }

    /**
     * Creates a message addressed to {@link #dest} with a zero-filled payload.
     *
     * @param i the payload size in bytes
     * @return the new message
     */
    protected static Message msg(int i) {
        return new BytesMessage(dest, new byte[i]);
    }

    /** Renders one {@code "<id>: <state>"} line per thread, for diagnostic output. */
    private static String describe(Thread[] threads) {
        return Arrays.stream(threads)
                .map(t -> t.getId() + ": " + t.getState())
                .collect(Collectors.joining("\n"));
    }
}
