package org.jgroups.tests;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.NioMessage;
import org.jgroups.Receiver;
import org.jgroups.RefcountedNioMessage;
import org.jgroups.protocols.UNICAST3;
import org.jgroups.stack.Protocol;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {Global.FUNCTIONAL})
/* loaded from: input_file:org/jgroups/tests/RefcountedNioMessageTest.class */
public class RefcountedNioMessageTest {
    protected static final int NUM_MSGS = 50;
    protected static final int NUM_SENDERS = 10;
    protected static final int MSG_SIZE = 100;
    protected static final int POOL_SIZE = 10;
    protected static final boolean DIRECT = false;
    protected static final Collection<Message> MSGS;
    protected final BlockingQueue<ByteBuffer> pool = new ArrayBlockingQueue(10);
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/jgroups/tests/RefcountedNioMessageTest$Sender.class */
    protected static final class Sender implements Runnable {
        protected final JChannel ch;
        protected final BlockingQueue<ByteBuffer> pool;
        protected final Address dest;

        public Sender(JChannel jChannel, BlockingQueue<ByteBuffer> blockingQueue, Address address) {
            this.ch = jChannel;
            this.pool = blockingQueue;
            this.dest = address;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                _run();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        protected void _run() throws InterruptedException {
            Consumer<Message> consumer = message -> {
                this.pool.add(((NioMessage) message).getBuf());
            };
            for (int i = 0; i < RefcountedNioMessageTest.NUM_MSGS; i++) {
                RefcountedNioMessage onRelease = new RefcountedNioMessage(this.dest, this.pool.take()).onRelease(consumer);
                RefcountedNioMessageTest.MSGS.add(onRelease);
                try {
                    this.ch.send(onRelease);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @BeforeMethod
    protected void setup() {
        MSGS.clear();
    }

    @AfterMethod
    protected void cleanup() {
        MSGS.clear();
        this.pool.clear();
    }

    public void testSimpleCreation() {
        RefcountedNioMessage refcountedNioMessage = new RefcountedNioMessage(null, ByteBuffer.wrap(bla3.HELLO.getBytes()));
        if (!$assertionsDisabled && refcountedNioMessage.getRefcount() != 0) {
            throw new AssertionError();
        }
        refcountedNioMessage.incr2().incr2();
        if (!$assertionsDisabled && refcountedNioMessage.getRefcount() != 2) {
            throw new AssertionError();
        }
        AtomicReference atomicReference = new AtomicReference();
        Objects.requireNonNull(atomicReference);
        refcountedNioMessage.onRelease((v1) -> {
            r1.set(v1);
        });
        if (!$assertionsDisabled && atomicReference.get() != null) {
            throw new AssertionError();
        }
        refcountedNioMessage.decr2().decr2();
        if (!$assertionsDisabled && atomicReference.get() != refcountedNioMessage) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && refcountedNioMessage.getRefcount() != 0) {
            throw new AssertionError();
        }
        refcountedNioMessage.decr2();
        if (!$assertionsDisabled && refcountedNioMessage.getRefcount() != 0) {
            throw new AssertionError();
        }
    }

    public void testUnicastRefcounting() throws Exception {
        JChannel name = new JChannel(Util.getTestStack(new Protocol[0])).name("A");
        try {
            JChannel name2 = new JChannel(Util.getTestStack(new Protocol[0])).name("B");
            try {
                ((UNICAST3) name2.getProtocolStack().findProtocol(UNICAST3.class)).setAckThreshold(1);
                name.connect("testUnicastRefcounting");
                name2.connect("testUnicastRefcounting");
                Util.waitUntilAllChannelsHaveSameView(10000L, 500L, name, name2);
                Address address = name2.getAddress();
                for (int i = 0; i < 10; i++) {
                    this.pool.put(ByteBuffer.allocate(100));
                }
                final AtomicInteger atomicInteger = new AtomicInteger();
                name2.setReceiver(new Receiver() { // from class: org.jgroups.tests.RefcountedNioMessageTest.1
                    @Override // org.jgroups.Receiver
                    public void receive(Message message) {
                        atomicInteger.incrementAndGet();
                    }

                    @Override // org.jgroups.Receiver
                    public void receive(MessageBatch messageBatch) {
                        atomicInteger.addAndGet(messageBatch.size());
                    }
                });
                Sender[] senderArr = new Sender[10];
                for (int i2 = 0; i2 < senderArr.length; i2++) {
                    senderArr[i2] = new Sender(name, this.pool, address);
                    new Thread(senderArr[i2], "sender-" + (i2 + i2)).start();
                }
                Util.waitUntil(30000L, 500L, () -> {
                    return atomicInteger.get() == 500;
                }, () -> {
                    return String.format("received=%d (expected=%d)", Integer.valueOf(atomicInteger.get()), 500);
                });
                if (!$assertionsDisabled && MSGS.size() != 500) {
                    throw new AssertionError();
                }
                if (!$assertionsDisabled && !MSGS.stream().allMatch(message -> {
                    return message instanceof RefcountedNioMessage;
                })) {
                    throw new AssertionError();
                }
                Util.waitUntil(10000L, 500L, () -> {
                    return MSGS.stream().allMatch(message2 -> {
                        return ((RefcountedNioMessage) message2).getRefcount() == 0;
                    });
                });
                if (!$assertionsDisabled && !MSGS.stream().allMatch(message2 -> {
                    return ((RefcountedNioMessage) message2).getRefcount() == 0;
                })) {
                    throw new AssertionError();
                }
                System.out.printf("\n*** pool size: %d, %d msgs\n", Integer.valueOf(this.pool.size()), Integer.valueOf(MSGS.size()));
                if (!$assertionsDisabled && this.pool.size() != 10) {
                    throw new AssertionError();
                }
                if (name2 != null) {
                    name2.close();
                }
                if (name != null) {
                    name.close();
                }
            } catch (Throwable th) {
                if (name2 != null) {
                    try {
                        name2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (name != null) {
                try {
                    name.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    static {
        $assertionsDisabled = !RefcountedNioMessageTest.class.desiredAssertionStatus();
        MSGS = new ConcurrentLinkedQueue();
    }
}
