package org.jgroups.tests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Stream;
import org.jgroups.Address;
import org.jgroups.BytesMessage;
import org.jgroups.EmptyMessage;
import org.jgroups.Message;
import org.jgroups.PhysicalAddress;
import org.jgroups.protocols.RingBufferBundler;
import org.jgroups.protocols.TP;
import org.jgroups.util.AsciiString;
import org.jgroups.util.DefaultThreadFactory;
import org.jgroups.util.RingBuffer;
import org.jgroups.util.Util;
import org.testng.annotations.Test;

/**
 * Exercises {@link RingBufferBundler} and its backing {@link RingBuffer} against a
 * {@link MockTransport} that merely counts how often each destination is sent to.
 *
 * <p>All checks use plain {@code assert} statements, so they are only active when the JVM
 * runs with assertions enabled ({@code -ea}).
 */
@Test(singleThreaded = true)
public class RingBundlerTest {
    protected static final Address a = Util.createRandomAddress("A");
    protected static final Address b = Util.createRandomAddress("B");
    protected static final Address c = Util.createRandomAddress("C");
    protected static final Address d = Util.createRandomAddress("D");

    /**
     * Transport stub that performs no I/O: {@link #sendToAll} and {@link #sendTo} only bump a
     * per-destination counter in {@link #map}.
     */
    public static class MockTransport extends TP {
        /** Number of send calls per destination; the {@code null} key counts {@link #sendToAll} calls. */
        protected final Map<Address, Integer> map = new HashMap<>();

        /** Sets the cluster name to {@code "mock"} and installs a {@link DefaultThreadFactory}. */
        public MockTransport() {
            this.cluster_name = new AsciiString("mock");
            this.thread_factory = new DefaultThreadFactory("", false);
        }

        /** @return always {@code false} */
        @Override
        public boolean supportsMulticasting() {
            return false;
        }

        /** Records one send under the {@code null} key; the buffer arguments are ignored. */
        @Override
        public void sendToAll(byte[] bArr, int i, int i2) throws Exception {
            incrCount(null);
        }

        /** Records one send under {@code address}; the buffer arguments are ignored. */
        @Override
        protected void sendTo(Address address, byte[] bArr, int i, int i2) throws Exception {
            incrCount(address);
        }

        /** Intentionally a no-op: unicasts to physical addresses are not counted. */
        @Override
        public void sendUnicast(PhysicalAddress physicalAddress, byte[] bArr, int i, int i2) throws Exception {
        }

        /** @return always {@code null} */
        @Override
        public String getInfo() {
            return null;
        }

        /** @return always {@code null} */
        @Override
        public PhysicalAddress getPhysicalAddress() {
            return null;
        }

        /**
         * Increments the counter kept for {@code address}, starting at 1 on first use.
         *
         * @param address the key to count under; may be {@code null} (the backing map is a
         *                {@link HashMap}, which accepts a {@code null} key)
         */
        protected void incrCount(Address address) {
            this.map.merge(address, 1, Integer::sum);
        }
    }

    /**
     * Sends two batches through the bundler, flushing each one manually with
     * {@code sendBundledMessages} followed by {@code publishReadIndex}, and checks the ring
     * buffer indices and the transport's per-destination counters after each flush.
     *
     * <p>The second batch (12 messages after 6 already consumed, bundler created with 16) is
     * expected to leave the write index at 2, i.e. behind the read index of 6.
     */
    public void testReceiveAndSend() throws Exception {
        RingBufferBundler bundler = new RingBufferBundler(16);
        RingBuffer<Message> buf = bundler.buf();
        MockTransport transport = new MockTransport();
        bundler.init(transport);

        // First batch: 6 messages, all with a null destination.
        for (int i = 0; i < 6; i++) {
            bundler.send(new EmptyMessage(null));
        }
        System.out.println("rb = " + buf);
        int firstCount = buf.countLockLockless();
        assert firstCount == 6;

        bundler.sendBundledMessages(buf.buf(), buf.readIndexLockless(), firstCount);
        buf.publishReadIndex(firstCount);
        System.out.println("rb = " + buf);
        assert buf.readIndex() == 6;
        assert buf.writeIndex() == 6;
        assert buf.isEmpty();
        // All 6 null-destination messages must have resulted in exactly one counted send.
        assert transport.map.get(null) == 1;
        transport.map.clear();

        // Second batch: 12 messages spread over the destinations null, a, b, c and d.
        for (Message msg : create(10000, null, a, a, a, b, c, d, d, a, null, null, a)) {
            bundler.send(msg);
        }
        System.out.println("rb = " + buf);
        int secondCount = buf.countLockLockless();
        assert secondCount == 12;
        assert buf.readIndex() == 6;
        assert buf.writeIndex() == 2;

        bundler.sendBundledMessages(buf.buf(), buf.readIndexLockless(), buf.countLockLockless());
        buf.publishReadIndex(secondCount);
        assert buf.readIndex() == 2;
        assert buf.writeIndex() == 2;
        assert buf.isEmpty();
        // Every distinct destination must have been counted exactly once.
        Stream.of(null, a, b, c, d).forEach(dest -> {
            assert transport.map.get(dest) == 1;
        });
    }

    /**
     * Puts 16 messages into the buffer from the test thread, then starts 16 threads that each
     * send one more message, and drains the buffer from the test thread until it is empty and
     * every sender thread has terminated.
     *
     * <p>The bundler is stopped right after {@code init}; presumably this keeps the bundler
     * from draining the buffer itself so the test thread is the only reader (TODO confirm).
     */
    public void testFullBufferAndRead() throws Exception {
        RingBufferBundler bundler = new RingBufferBundler(16);
        RingBuffer<Message> buf = bundler.buf();
        bundler.init(new MockTransport());
        bundler.stop();

        for (int i = 0; i < 16; i++) {
            bundler.send(new EmptyMessage(a));
        }
        System.out.println("rb = " + buf);

        // Each adder thread sends a single message to b.
        Thread[] adders = new Thread[16];
        for (int i = 0; i < 16; i++) {
            adders[i] = new Thread(() -> {
                try {
                    bundler.send(new EmptyMessage(b));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "Adder-" + i);
            adders[i].start();
        }

        // The first read must see the 16 messages sent by the test thread above.
        int available = buf.waitForMessages(5, (num, num2) -> {
            LockSupport.park();
        });
        System.out.println("available = " + available);
        assert available == 16;
        buf.publishReadIndex(available);

        // Keep consuming until nothing is left and no adder can add anything anymore.
        while (!buf.isEmpty() || !allDone(adders)) {
            int more = buf.waitForMessages(5, (num, num2) -> {
                LockSupport.parkNanos(1L);
            });
            System.out.println("available = " + more);
            buf.publishReadIndex(more);
        }
        assert buf.isEmpty();
        assert allDone(adders);
    }

    /**
     * Tells whether all given threads have terminated.
     *
     * @param threadArr the threads to inspect
     * @return {@code true} if none of the threads is alive (also for an empty array)
     */
    protected static boolean allDone(Thread[] threadArr) {
        for (Thread thread : threadArr) {
            if (thread.isAlive()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Creates one {@link BytesMessage} per given address, each with its own zero-filled payload.
     *
     * @param i          length in bytes of every message's payload array
     * @param addressArr addresses handed to the {@link BytesMessage} constructor, in order;
     *                   individual entries may be {@code null}
     * @return a mutable list with one message per entry of {@code addressArr}
     */
    protected static List<Message> create(int i, Address... addressArr) {
        List<Message> messages = new ArrayList<>(addressArr.length);
        for (Address address : addressArr) {
            messages.add(new BytesMessage(address, new byte[i]));
        }
        return messages;
    }
}
