package org.jgroups.tests;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jgroups.EmptyMessage;
import org.jgroups.Global;
import org.jgroups.Message;
import org.jgroups.util.AverageMinMax;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.Util;
import org.testng.annotations.Test;

/**
 * Stress test in which several threads concurrently add single messages and message batches to a
 * shared {@link MessageBatch}. Whichever adder finds the {@link #adders} counter at zero becomes
 * the remover and drains the shared batch on behalf of all concurrent adders.
 *
 * <p>After the threads have been stopped, the test asserts that the number of removed messages
 * equals the number of added messages, that no adder is still registered and that the shared
 * batch is empty.
 */
@Test(groups = {Global.FUNCTIONAL}, singleThreaded = true)
public class MessageBatchDrainTest2 {
    /** Value passed as the second argument of {@code MessageBatch.add()} (presumably "resize if full"; confirm against MessageBatch). */
    protected static final boolean RESIZE = false;
    /** Argument passed to the constructor of the shared batch (presumably its capacity; confirm against MessageBatch). */
    protected static final int BATCH_SIZE = 500;

    /** Guards every access to {@link #batch}. */
    protected final Lock lock = new ReentrantLock();
    /** The shared batch that adders fill and the current remover drains. */
    protected final MessageBatch batch = new MessageBatch(BATCH_SIZE);
    /** Number of pending drain requests; the thread that moves it from 0 to 1 becomes the remover. */
    protected final AtomicInteger adders = new AtomicInteger(0);
    /** Total number of messages successfully added to {@link #batch}. */
    protected final LongAdder added = new LongAdder();
    /** Total number of messages transferred out of {@link #batch}. */
    protected final LongAdder removed = new LongAdder();
    /** Number of times a thread took the remover role in {@link #drain(int)}. */
    protected final LongAdder num_removers = new LongAdder();
    /** Messages removed per remover session. */
    protected final AverageMinMax avg_removed = new AverageMinMax();
    /** Loop iterations per remover session. */
    protected final AverageMinMax avg_remove_loops = new AverageMinMax();

    /**
     * Adder thread: waits on a latch so that all threads start together, then keeps adding single
     * messages or small batches to the enclosing test until cancelled.
     */
    protected class MyThread extends Thread {
        protected final CountDownLatch latch;
        protected volatile boolean running = true;

        public MyThread(CountDownLatch countDownLatch) {
            this.latch = countDownLatch;
        }

        /** Asks the thread to leave its work loop after the current iteration. */
        protected void cancel() {
            running = false;
        }

        @Override
        public void run() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                // Interrupted before the start signal: keep the interrupt status visible to
                // whoever inspects this thread and do not start adding messages.
                Thread.currentThread().interrupt();
                return;
            }
            while (running) {
                // Weighted coin (0.3) picks a single message; otherwise a batch built from create(10).
                if (Util.tossWeightedCoin(0.3d)) {
                    add(new EmptyMessage());
                } else {
                    add(new MessageBatch(Arrays.asList(create(10))));
                }
            }
        }
    }

    /**
     * Runs 10 adder threads for 5 seconds, stops and joins them, prints the collected statistics
     * and asserts that everything that was added has been drained.
     *
     * @throws InterruptedException if the calling thread is interrupted while joining the adders
     */
    public void testDraining() throws InterruptedException {
        MyThread[] threads = new MyThread[10];
        CountDownLatch startSignal = new CountDownLatch(1);
        for (int idx = 0; idx < threads.length; idx++) {
            threads[idx] = new MyThread(startSignal);
            threads[idx].start();
        }
        // Release all adders at once.
        startSignal.countDown();
        Util.sleep(5000L);

        System.out.print("\nStopping threads\n");
        for (MyThread thread : threads) {
            thread.cancel();
        }
        System.out.print("done, joining threads\n");
        for (MyThread thread : threads) {
            thread.join();
        }

        System.out.printf("\ncounter=%d, added=%d, removed=%d, avg_removed=%s, avg_remove_loops=%s (removers=%d)\n",
                adders.get(), added.sum(), removed.sum(), avg_removed, avg_remove_loops, num_removers.sum());

        assert added.sum() == removed.sum()
                : String.format("added=%d, removed=%d", added.sum(), removed.sum());
        assert adders.get() == 0 : "adders=" + adders.get();
        assert batch.isEmpty() : "batch is not empty";
    }

    /**
     * Adds a single message to the shared batch and, if the add reported a positive count,
     * records that count and triggers a drain.
     *
     * @param message the message to add
     */
    protected void add(Message message) {
        int numAdded = _add(message);
        if (numAdded > 0) {
            added.add(numAdded);
            drain(numAdded);
        }
    }

    /**
     * Adds a batch of messages to the shared batch and, if the add reported a positive count,
     * records that count and triggers a drain.
     *
     * @param messageBatch the messages to add
     */
    protected void add(MessageBatch messageBatch) {
        int numAdded = _add(messageBatch);
        if (numAdded > 0) {
            added.add(numAdded);
            drain(numAdded);
        }
    }

    /**
     * Registers a drain request. The caller that moves {@link #adders} from 0 to 1 becomes the
     * remover and keeps transferring messages out of the shared batch until
     * {@link #adders} drops back to 0; every other caller only increments the counter, which
     * makes the current remover perform one more loop iteration on its behalf.
     *
     * @param i argument for the constructor of the temporary batch the messages are transferred
     *          into (callers pass the number of messages they just added)
     */
    protected void drain(int i) {
        if (adders.getAndIncrement() != 0) {
            return; // another thread is currently the remover and will pick up our messages
        }
        num_removers.increment();
        int loops = 0;
        int removedBySession = 0;
        MessageBatch target = new MessageBatch(i);
        do {
            target.reset();
            int transferred = _transfer(target);
            if (transferred > 0) {
                removedBySession += transferred;
                removed.add(transferred);
            }
            loops++;
        } while (adders.decrementAndGet() != 0);
        avg_remove_loops.add(loops);
        avg_removed.add(removedBySession);
    }

    /**
     * Transfers the contents of the shared batch into {@code messageBatch} while holding the lock.
     *
     * @param messageBatch the batch receiving the messages
     * @return the value returned by {@code transferFrom}, which {@link #drain(int)} counts as the
     *         number of messages removed
     */
    protected int _transfer(MessageBatch messageBatch) {
        lock.lock();
        try {
            // NOTE(review): the 'true' flag presumably clears the source batch - confirm against MessageBatch.transferFrom
            return messageBatch.transferFrom(batch, true);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds a single message to the shared batch while holding the lock.
     *
     * @param message the message to add
     * @return the value returned by {@code MessageBatch.add}, which callers count as the number
     *         of messages added
     */
    protected int _add(Message message) {
        lock.lock();
        try {
            return batch.add(message, RESIZE);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds all messages of {@code messageBatch} to the shared batch while holding the lock.
     *
     * @param messageBatch the messages to add
     * @return the value returned by {@code MessageBatch.add}, which callers count as the number
     *         of messages added
     */
    protected int _add(MessageBatch messageBatch) {
        lock.lock();
        try {
            return batch.add(messageBatch, RESIZE);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Clears the shared batch while holding the lock.
     *
     * @return the size of the shared batch immediately before it was cleared
     */
    protected int _clear() {
        lock.lock();
        try {
            int sizeBefore = batch.size();
            batch.clear();
            return sizeBefore;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates an array of empty messages whose length is {@code (int) Util.random(i)}.
     *
     * @param i the argument passed to {@code Util.random}
     * @return a fully populated array of {@link EmptyMessage} instances
     */
    protected static Message[] create(int i) {
        Message[] messages = new Message[(int) Util.random(i)];
        for (int idx = 0; idx < messages.length; idx++) {
            messages[idx] = new EmptyMessage();
        }
        return messages;
    }
}
