package org.jgroups.tests;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import org.jgroups.EmptyMessage;
import org.jgroups.Event;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.Receiver;
import org.jgroups.protocols.BARRIER;
import org.jgroups.protocols.Discovery;
import org.jgroups.protocols.SHARED_LOOPBACK;
import org.jgroups.protocols.SHARED_LOOPBACK_PING;
import org.jgroups.protocols.TP;
import org.jgroups.protocols.TpHeader;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {Global.FUNCTIONAL}, singleThreaded = true)
/* loaded from: input_file:org/jgroups/tests/BARRIERTest.class */
public class BARRIERTest {
    protected JChannel ch;
    protected Discovery discovery_prot;
    protected BARRIER barrier_prot;
    protected TP tp;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/jgroups/tests/BARRIERTest$BlockingReceiver.class */
    protected static class BlockingReceiver implements Receiver {
        protected final CyclicBarrier barrier;

        BlockingReceiver(CyclicBarrier cyclicBarrier) {
            this.barrier = cyclicBarrier;
        }

        @Override // org.jgroups.Receiver
        public void receive(Message message) {
            try {
                this.barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /* loaded from: input_file:org/jgroups/tests/BARRIERTest$MyReceiver.class */
    protected static class MyReceiver implements Receiver {
        protected final AtomicInteger num_mgs_received = new AtomicInteger(0);

        protected MyReceiver() {
        }

        @Override // org.jgroups.Receiver
        public void receive(Message message) {
            if (this.num_mgs_received.incrementAndGet() % 1000 == 0) {
                System.out.println("<== " + this.num_mgs_received.get());
            }
        }

        public int getNumberOfReceivedMessages() {
            return this.num_mgs_received.get();
        }
    }

    @BeforeMethod
    void setUp() throws Exception {
        SHARED_LOOPBACK shared_loopback = new SHARED_LOOPBACK();
        this.tp = shared_loopback;
        SHARED_LOOPBACK_PING shared_loopback_ping = new SHARED_LOOPBACK_PING();
        this.discovery_prot = shared_loopback_ping;
        BARRIER barrier = new BARRIER();
        this.barrier_prot = barrier;
        this.ch = new JChannel(shared_loopback, shared_loopback_ping, barrier).name("A");
        this.ch.connect("BARRIERTest");
    }

    @AfterMethod
    void destroy() {
        Util.close(this.ch);
    }

    public void testBlocking() {
        if (!$assertionsDisabled && this.barrier_prot.isClosed()) {
            throw new AssertionError();
        }
        this.ch.down(new Event(76));
        if (!$assertionsDisabled && !this.barrier_prot.isClosed()) {
            throw new AssertionError();
        }
        this.ch.down(new Event(77));
        if (!$assertionsDisabled && this.barrier_prot.isClosed()) {
            throw new AssertionError();
        }
    }

    public void testThreadsBlockedOnBarrier() {
        MyReceiver myReceiver = new MyReceiver();
        this.ch.setReceiver(myReceiver);
        this.ch.down(new Event(76));
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                this.discovery_prot.up(createMessage());
            }).start();
        }
        Util.sleep(2000L);
        int numberOfInFlightThreads = this.barrier_prot.getNumberOfInFlightThreads();
        if (!$assertionsDisabled && numberOfInFlightThreads != 0) {
            throw new AssertionError();
        }
        this.ch.down(new Event(77));
        Util.sleep(2000L);
        int numberOfInFlightThreads2 = this.barrier_prot.getNumberOfInFlightThreads();
        if (!$assertionsDisabled && numberOfInFlightThreads2 != 0) {
            throw new AssertionError();
        }
        int numberOfReceivedMessages = myReceiver.getNumberOfReceivedMessages();
        if (!$assertionsDisabled && numberOfReceivedMessages != 0) {
            throw new AssertionError("expected 0 messages but got " + numberOfReceivedMessages);
        }
    }

    public void testThreadsBlockedOnMutex() throws Exception {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        this.ch.setReceiver(new BlockingReceiver(cyclicBarrier));
        Thread[] threadArr = new Thread[2];
        for (int i = 1; i <= threadArr.length; i++) {
            Thread thread = new Thread(() -> {
                this.discovery_prot.up(createMessage());
            });
            thread.setName("blocker-" + i);
            thread.start();
        }
        waitUntilNumThreadsAreBlocked(2, 10000L, 500L);
        if (!$assertionsDisabled && this.barrier_prot.getNumberOfInFlightThreads() != 2) {
            throw new AssertionError();
        }
        cyclicBarrier.await();
        waitUntilNumThreadsAreBlocked(0, 10000L, 500L);
        if (!$assertionsDisabled && this.barrier_prot.getNumberOfInFlightThreads() != 0) {
            throw new AssertionError();
        }
    }

    public void testThreadFlushTimeout() throws Exception {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        this.ch.setReceiver(new BlockingReceiver(cyclicBarrier));
        this.barrier_prot.setFlushTimeout(2000L);
        Thread[] threadArr = new Thread[2];
        for (int i = 1; i <= threadArr.length; i++) {
            Thread thread = new Thread(() -> {
                this.discovery_prot.up(createMessage());
            });
            thread.setName("blocker-" + i);
            thread.start();
        }
        waitUntilNumThreadsAreBlocked(2, 10000L, 500L);
        if (!$assertionsDisabled && this.barrier_prot.getNumberOfInFlightThreads() != 2) {
            throw new AssertionError();
        }
        try {
            this.ch.down(new Event(76));
        } catch (Exception e) {
            System.out.println("got exception as expected: " + e);
        }
        if (!$assertionsDisabled) {
            throw new AssertionError("closing BARRIER should have thrown an exception as threads couldn't be flushed");
        }
        cyclicBarrier.await();
    }

    protected Message createMessage() {
        return new EmptyMessage(null).setSrc(this.ch.getAddress()).putHeader(this.tp.getId(), new TpHeader("BARRIERTest"));
    }

    protected void waitUntilNumThreadsAreBlocked(int i, long j, long j2) {
        long currentTimeMillis = System.currentTimeMillis() + j;
        while (System.currentTimeMillis() < currentTimeMillis && this.barrier_prot.getNumberOfInFlightThreads() != i) {
            Util.sleep(j2);
        }
    }

    static {
        $assertionsDisabled = !BARRIERTest.class.desiredAssertionStatus();
    }
}
