package org.jgroups.tests;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.httpclient.cookie.Cookie2;
import org.jgroups.Address;
import org.jgroups.ChannelClosedException;
import org.jgroups.ChannelNotConnectedException;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.protocols.DISCARD;
import org.jgroups.protocols.TP;
import org.jgroups.protocols.UNICAST;
import org.jgroups.protocols.pbcast.NAKACK;
import org.jgroups.protocols.pbcast.STABLE;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* JADX WARN: Classes with same name are omitted:
  input_file:WEB-INF/lib/jgroups-@{artifactId}:org/jgroups/tests/OOBTest.class
 */
@Test(groups = {Global.STACK_DEPENDENT}, sequential = true)
/* loaded from: input_file:WEB-INF/lib/jgroups-2.9.0.GA.jar:org/jgroups/tests/OOBTest.class */
public class OOBTest extends ChannelTestBase {
    private JChannel c1;
    private JChannel c2;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:WEB-INF/lib/jgroups-@{artifactId}:org/jgroups/tests/OOBTest$BlockingReceiver.class
     */
    /* loaded from: input_file:WEB-INF/lib/jgroups-2.9.0.GA.jar:org/jgroups/tests/OOBTest$BlockingReceiver.class */
    public static class BlockingReceiver extends ReceiverAdapter {
        final Lock lock;
        final List<Long> msgs = Collections.synchronizedList(new LinkedList());

        public BlockingReceiver(Lock lock) {
            this.lock = lock;
        }

        public List<Long> getMsgs() {
            return this.msgs;
        }

        @Override // org.jgroups.ReceiverAdapter, org.jgroups.MessageListener
        public void receive(Message message) {
            if (!message.isFlagSet((byte) 1)) {
                this.lock.lock();
                this.lock.unlock();
            }
            this.msgs.add((Long) message.getObject());
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:WEB-INF/lib/jgroups-@{artifactId}:org/jgroups/tests/OOBTest$MyReceiver.class
     */
    /* loaded from: input_file:WEB-INF/lib/jgroups-2.9.0.GA.jar:org/jgroups/tests/OOBTest$MyReceiver.class */
    private static class MyReceiver extends ReceiverAdapter {
        private final Collection<Integer> msgs = new ConcurrentLinkedQueue();
        final String name;

        public MyReceiver(String str) {
            this.name = str;
        }

        public Collection<Integer> getMsgs() {
            return this.msgs;
        }

        @Override // org.jgroups.ReceiverAdapter, org.jgroups.MessageListener
        public void receive(Message message) {
            this.msgs.add((Integer) message.getObject());
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:WEB-INF/lib/jgroups-@{artifactId}:org/jgroups/tests/OOBTest$MySleepingReceiver.class
     */
    /* loaded from: input_file:WEB-INF/lib/jgroups-2.9.0.GA.jar:org/jgroups/tests/OOBTest$MySleepingReceiver.class */
    private static class MySleepingReceiver extends MyReceiver {
        final long sleep_time;

        public MySleepingReceiver(String str, long j) {
            super(str);
            this.sleep_time = j;
        }

        @Override // org.jgroups.tests.OOBTest.MyReceiver, org.jgroups.ReceiverAdapter, org.jgroups.MessageListener
        public void receive(Message message) {
            super.receive(message);
            System.out.println("-- received " + message.getObject());
            Util.sleep(this.sleep_time);
        }
    }

    @BeforeMethod
    public void init() throws Exception {
        this.c1 = createChannel(true, 2);
        this.c1.setName("C1");
        this.c2 = createChannel(this.c1);
        this.c2.setName("C2");
        setOOBPoolSize(this.c1, this.c2);
        setStableGossip(this.c1, this.c2);
        this.c1.connect("OOBMcastTest");
        this.c2.connect("OOBMcastTest");
        View view = this.c2.getView();
        this.log.info("view = " + view);
        if (!$assertionsDisabled && view.size() != 2) {
            throw new AssertionError("view is " + view);
        }
    }

    @AfterMethod
    public void cleanup() {
        Util.sleep(1000L);
        Util.close(this.c2, this.c1);
    }

    public void testNonBlockingUnicastOOBMessage() throws ChannelNotConnectedException, ChannelClosedException {
        send(this.c2.getAddress());
    }

    public void testNonBlockingMulticastOOBMessage() throws ChannelNotConnectedException, ChannelClosedException {
        send(null);
    }

    public void testRegularAndOOBUnicasts() throws Exception {
        DISCARD discard = new DISCARD();
        this.c1.getProtocolStack().insertProtocol(discard, 2, UNICAST.class);
        Address address = this.c2.getAddress();
        Message message = new Message(address, (Address) null, (Serializable) 1);
        Message message2 = new Message(address, (Address) null, (Serializable) 2);
        message2.setFlag((byte) 1);
        Message message3 = new Message(address, (Address) null, (Serializable) 3);
        MyReceiver myReceiver = new MyReceiver("C2");
        this.c2.setReceiver(myReceiver);
        this.c1.send(message);
        discard.setDropDownUnicasts(1);
        this.c1.send(message2);
        this.c1.send(message3);
        sendStableMessages(this.c1, this.c2);
        Util.sleep(1000L);
        Collection<Integer> msgs = myReceiver.getMsgs();
        if (!$assertionsDisabled && msgs.size() != 3) {
            throw new AssertionError("list is " + msgs);
        }
        if ($assertionsDisabled) {
            return;
        }
        if (!msgs.contains(1) || !msgs.contains(2) || !msgs.contains(3)) {
            throw new AssertionError();
        }
    }

    public void testRegularAndOOBUnicasts2() throws Exception {
        DISCARD discard = new DISCARD();
        this.c1.getProtocolStack().insertProtocol(discard, 2, UNICAST.class);
        Address address = this.c2.getAddress();
        Message message = new Message(address, (Address) null, (Serializable) 1);
        Message message2 = new Message(address, (Address) null, (Serializable) 2);
        message2.setFlag((byte) 1);
        Message message3 = new Message(address, (Address) null, (Serializable) 3);
        message3.setFlag((byte) 1);
        Message message4 = new Message(address, (Address) null, (Serializable) 4);
        MyReceiver myReceiver = new MyReceiver("C2");
        this.c2.setReceiver(myReceiver);
        this.c1.send(message);
        discard.setDropDownUnicasts(1);
        this.c1.send(message3);
        discard.setDropDownUnicasts(1);
        this.c1.send(message2);
        this.c1.send(message4);
        Util.sleep(1000L);
        Collection<Integer> msgs = myReceiver.getMsgs();
        int i = 10;
        while (msgs.size() < 4) {
            i--;
            if (i <= 0) {
                break;
            }
            Util.sleep(500L);
            sendStableMessages(this.c1, this.c2);
        }
        this.log.info("list = " + msgs);
        if (!$assertionsDisabled && msgs.size() != 4) {
            throw new AssertionError("list is " + msgs);
        }
        if ($assertionsDisabled) {
            return;
        }
        if (!msgs.contains(1) || !msgs.contains(2) || !msgs.contains(3) || !msgs.contains(4)) {
            throw new AssertionError();
        }
    }

    public void testRegularAndOOBMulticasts() throws Exception {
        DISCARD discard = new DISCARD();
        this.c1.getProtocolStack().insertProtocol(discard, 2, NAKACK.class);
        this.c1.setOpt(3, false);
        Message message = new Message((Address) null, (Address) null, (Serializable) 1);
        Message message2 = new Message((Address) null, (Address) null, (Serializable) 2);
        message2.setFlag((byte) 1);
        Message message3 = new Message((Address) null, (Address) null, (Serializable) 3);
        MyReceiver myReceiver = new MyReceiver("C2");
        this.c2.setReceiver(myReceiver);
        this.c1.send(message);
        discard.setDropDownMulticasts(1);
        this.c1.send(message2);
        this.c1.send(message3);
        Util.sleep(500L);
        Collection<Integer> msgs = myReceiver.getMsgs();
        for (int i = 0; i < 10; i++) {
            this.log.info("list = " + msgs);
            if (msgs.size() == 3) {
                break;
            }
            Util.sleep(1000L);
            sendStableMessages(this.c1, this.c2);
        }
        if (!$assertionsDisabled && msgs.size() != 3) {
            throw new AssertionError("list is " + msgs);
        }
        if ($assertionsDisabled) {
            return;
        }
        if (!msgs.contains(1) || !msgs.contains(2) || !msgs.contains(3)) {
            throw new AssertionError();
        }
    }

    @Test(invocationCount = 5)
    public void testRandomRegularAndOOBMulticasts() throws Exception {
        DISCARD discard = new DISCARD();
        discard.setLocalAddress(this.c1.getAddress());
        discard.setDownDiscardRate(0.5d);
        ProtocolStack protocolStack = this.c1.getProtocolStack();
        protocolStack.insertProtocol(discard, 2, NAKACK.class);
        MyReceiver myReceiver = new MyReceiver("C1");
        MyReceiver myReceiver2 = new MyReceiver("C2");
        this.c1.setReceiver(myReceiver);
        this.c2.setReceiver(myReceiver2);
        send(null, 20, 10, 0.5d);
        Collection<Integer> msgs = myReceiver.getMsgs();
        Collection<Integer> msgs2 = myReceiver2.getMsgs();
        for (int i = 0; i < 10 && (msgs.size() != 20 || msgs2.size() != 20); i++) {
            this.log.info("one size " + msgs.size() + ", two size " + msgs2.size());
            Util.sleep(1000L);
            sendStableMessages(this.c1, this.c2);
        }
        this.log.info("one size " + msgs.size() + ", two size " + msgs2.size());
        protocolStack.removeProtocol("DISCARD");
        for (int i2 = 0; i2 < 5 && (msgs.size() != 20 || msgs2.size() != 20); i2++) {
            sendStableMessages(this.c1, this.c2);
            Util.sleep(500L);
        }
        System.out.println("C1 received " + msgs.size() + " messages (20 expected)\nC2 received " + msgs2.size() + " messages (20 expected)");
        check(20, msgs, msgs2);
    }

    public void testOOBMessageLoss() throws ChannelNotConnectedException, ChannelClosedException {
        Util.close(this.c2);
        MySleepingReceiver mySleepingReceiver = new MySleepingReceiver("C1", 1000L);
        this.c1.setReceiver(mySleepingReceiver);
        this.c1.getProtocolStack().getTransport().setOOBRejectionPolicy(Cookie2.DISCARD);
        for (int i = 1; i <= 10; i++) {
            Message message = new Message((Address) null, (Address) null, Integer.valueOf(i));
            message.setFlag((byte) 1);
            this.c1.send(message);
        }
        STABLE stable = (STABLE) this.c1.getProtocolStack().findProtocol(STABLE.class);
        if (stable != null) {
            stable.runMessageGarbageCollection();
        }
        Collection<Integer> msgs = mySleepingReceiver.getMsgs();
        for (int i2 = 0; i2 < 20 && msgs.size() != 10; i2++) {
            Util.sleep(1000L);
            sendStableMessages(this.c1, this.c2);
        }
        System.out.println("msgs = " + Util.print(msgs));
        if (!$assertionsDisabled && msgs.size() != 10) {
            throw new AssertionError("expected 10 messages but got " + msgs.size() + ", msgs=" + Util.print(msgs));
        }
        for (int i3 = 1; i3 <= 10; i3++) {
            if (!$assertionsDisabled && !msgs.contains(Integer.valueOf(i3))) {
                throw new AssertionError();
            }
        }
    }

    public void testOOBUnicastMessageLoss() throws ChannelNotConnectedException, ChannelClosedException {
        MySleepingReceiver mySleepingReceiver = new MySleepingReceiver("C2", 1000L);
        this.c2.setReceiver(mySleepingReceiver);
        this.c1.getProtocolStack().getTransport().setOOBRejectionPolicy(Cookie2.DISCARD);
        Address address = this.c2.getAddress();
        for (int i = 1; i <= 10; i++) {
            Message message = new Message(address, (Address) null, Integer.valueOf(i));
            message.setFlag((byte) 1);
            this.c1.send(message);
        }
        Collection<Integer> msgs = mySleepingReceiver.getMsgs();
        for (int i2 = 0; i2 < 20 && msgs.size() != 10; i2++) {
            Util.sleep(1000L);
        }
        if (!$assertionsDisabled && msgs.size() != 10) {
            throw new AssertionError("expected 10 messages but got " + msgs.size() + ", msgs=" + Util.print(msgs));
        }
        for (int i3 = 1; i3 <= 10; i3++) {
            if (!$assertionsDisabled && !msgs.contains(Integer.valueOf(i3))) {
                throw new AssertionError();
            }
        }
    }

    private void send(final Address address, int i, int i2, final double d) throws Exception {
        if (i2 <= 0) {
            throw new IllegalArgumentException("number of threads <= 0");
        }
        if (i % i2 != 0) {
            throw new IllegalArgumentException("number of messages ( " + i + ") needs to be divisible by the number o threads (" + i2 + ")");
        }
        if (i2 <= 1) {
            for (int i3 = 0; i3 < i; i3++) {
                JChannel jChannel = Util.tossWeightedCoin(0.5d) ? this.c1 : this.c2;
                boolean z = Util.tossWeightedCoin(d);
                Message message = new Message(address, (Address) null, Integer.valueOf(i3));
                if (z) {
                    message.setFlag((byte) 1);
                }
                jChannel.send(message);
            }
            return;
        }
        final int i4 = i / i2;
        Thread[] threadArr = new Thread[i2];
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        for (int i5 = 0; i5 < threadArr.length; i5++) {
            threadArr[i5] = new Thread() { // from class: org.jgroups.tests.OOBTest.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    for (int i6 = 0; i6 < i4; i6++) {
                        JChannel jChannel2 = Util.tossWeightedCoin(0.5d) ? OOBTest.this.c1 : OOBTest.this.c2;
                        boolean z2 = Util.tossWeightedCoin(d);
                        Message message2 = new Message(address, (Address) null, Integer.valueOf(atomicInteger.incrementAndGet()));
                        if (z2) {
                            message2.setFlag((byte) 1);
                        }
                        try {
                            jChannel2.send(message2);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            };
            threadArr[i5].start();
        }
        for (Thread thread : threadArr) {
            thread.join(20000L);
        }
    }

    private void send(Address address) throws ChannelNotConnectedException, ChannelClosedException {
        ReentrantLock reentrantLock = new ReentrantLock();
        BlockingReceiver blockingReceiver = new BlockingReceiver(reentrantLock);
        this.c2.setReceiver(blockingReceiver);
        reentrantLock.lock();
        this.c1.send(new Message(address, (Address) null, (Serializable) 1L));
        Util.sleep(1000L);
        for (int i = 2; i <= 10; i++) {
            Message message = new Message(address, (Address) null, Long.valueOf(i));
            message.setFlag((byte) 1);
            this.c1.send(message);
        }
        sendStableMessages(this.c1, this.c2);
        Util.sleep(500L);
        List<Long> msgs = blockingReceiver.getMsgs();
        for (int i2 = 0; i2 < 10; i2++) {
            this.log.info("list = " + msgs);
            if (msgs.size() == 9) {
                break;
            }
            Util.sleep(1000L);
        }
        System.out.println("list = " + msgs);
        if (!$assertionsDisabled && msgs.size() != 9) {
            throw new AssertionError("list is " + msgs);
        }
        if (!$assertionsDisabled && !msgs.contains(2L)) {
            throw new AssertionError();
        }
        this.log.info("[" + Thread.currentThread().getName() + "]: unlocking lock");
        if (reentrantLock.isHeldByCurrentThread()) {
            reentrantLock.unlock();
        }
        Util.sleep(10L);
        List<Long> msgs2 = blockingReceiver.getMsgs();
        System.out.println("list = " + msgs2);
        if (!$assertionsDisabled && msgs2.size() != 10) {
            throw new AssertionError("list is " + msgs2);
        }
        long j = 1;
        while (true) {
            long j2 = j;
            if (j2 > 10) {
                return;
            }
            if (!$assertionsDisabled && !msgs2.contains(Long.valueOf(j2))) {
                throw new AssertionError();
            }
            j = j2 + 1;
        }
    }

    private void check(int i, Collection<Integer>... collectionArr) {
        for (Collection<Integer> collection : collectionArr) {
            this.log.info("list: " + collection);
        }
        for (Collection<Integer> collection2 : collectionArr) {
            TreeSet treeSet = new TreeSet();
            if (collection2.size() != i) {
                for (int i2 = 0; i2 < i; i2++) {
                    treeSet.add(Integer.valueOf(i2));
                }
                treeSet.removeAll(collection2);
                if (!$assertionsDisabled && collection2.size() != i) {
                    throw new AssertionError("expected " + i + " elements, but got " + collection2.size() + " (list=" + collection2 + "), missing=" + treeSet);
                }
            }
        }
    }

    private static void setOOBPoolSize(JChannel... jChannelArr) {
        for (JChannel jChannel : jChannelArr) {
            TP transport = jChannel.getProtocolStack().getTransport();
            transport.setOOBThreadPoolMinThreads(1);
            transport.setOOBThreadPoolMaxThreads(2);
        }
    }

    private static void setStableGossip(JChannel... jChannelArr) {
        for (JChannel jChannel : jChannelArr) {
            ((STABLE) jChannel.getProtocolStack().findProtocol(STABLE.class)).setDesiredAverageGossip(2000L);
        }
    }

    private static void sendStableMessages(JChannel... jChannelArr) {
        for (JChannel jChannel : jChannelArr) {
            STABLE stable = (STABLE) jChannel.getProtocolStack().findProtocol(STABLE.class);
            if (stable != null) {
                stable.runMessageGarbageCollection();
            }
        }
    }

    static {
        $assertionsDisabled = !OOBTest.class.desiredAssertionStatus();
    }
}
