package org.jgroups.tests;

import java.io.PrintStream;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.blocks.MethodCall;
import org.jgroups.blocks.RequestHandler;
import org.jgroups.blocks.RequestOptions;
import org.jgroups.blocks.Response;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.protocols.SHARED_LOOPBACK;
import org.jgroups.protocols.SHARED_LOOPBACK_PING;
import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {Global.TIME_SENSITIVE}, singleThreaded = true)
/* loaded from: input_file:org/jgroups/tests/RpcDispatcherAsyncInvocationTest.class */
public class RpcDispatcherAsyncInvocationTest {
    protected JChannel a;
    protected JChannel b;
    protected RpcDispatcher disp1;
    protected RpcDispatcher disp2;
    protected final AtomicInteger count = new AtomicInteger(0);
    protected Method incr_method;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/jgroups/tests/RpcDispatcherAsyncInvocationTest$MyRequestHandler.class */
    protected class MyRequestHandler implements RequestHandler {
        protected MyRequestHandler() {
        }

        @Override // org.jgroups.blocks.RequestHandler
        public void handle(Message message, Response response) throws Exception {
            if (message.isFlagSet(Message.Flag.OOB)) {
                new Thread(() -> {
                    int incr = RpcDispatcherAsyncInvocationTest.this.incr();
                    if (response != null) {
                        response.send((Object) Integer.valueOf(incr), false);
                    }
                }).start();
                return;
            }
            int incr = RpcDispatcherAsyncInvocationTest.this.incr();
            if (response != null) {
                response.send((Object) Integer.valueOf(incr), false);
            }
        }

        @Override // org.jgroups.blocks.RequestHandler
        public Object handle(Message message) throws Exception {
            return Integer.valueOf(RpcDispatcherAsyncInvocationTest.this.incr());
        }
    }

    @Test(enabled = false)
    public int incr() {
        Util.sleep(500L);
        return this.count.incrementAndGet();
    }

    @AfterMethod
    protected void destroy() {
        this.disp2.stop();
        this.disp1.stop();
        Util.close(this.b, this.a);
    }

    @BeforeMethod
    protected void init() throws Exception {
        this.incr_method = RpcDispatcherAsyncInvocationTest.class.getMethod("incr", new Class[0]);
        this.a = createChannel("A");
        this.b = createChannel("B");
        this.disp1 = new RpcDispatcher(this.a, this);
        this.disp2 = new RpcDispatcher(this.b, this);
        this.a.connect("RpcDispatcherAsyncInvocationTest");
        this.b.connect("RpcDispatcherAsyncInvocationTest");
        this.disp2.setRequestHandler(new MyRequestHandler());
        this.disp2.asyncDispatching(true);
        this.count.set(0);
    }

    public void testRegularInvocation() throws Exception {
        invoke(10, false, 5000L, 8000L);
    }

    public void testRegularAsyncInvocation() throws Exception {
        invoke(10, false, 5000L, 8000L);
    }

    public void testOOBAsyncInvocation() throws Exception {
        invoke(10, true, 500L, 1000L);
    }

    protected void invoke(int i, boolean z, long j, long j2) throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        invokeRpc(i, z);
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        PrintStream printStream = System.out;
        printStream.println("took " + currentTimeMillis2 + " ms: " + printStream);
        if ($assertionsDisabled) {
            return;
        }
        if (currentTimeMillis2 < j || currentTimeMillis2 > j2) {
            AssertionError assertionError = new AssertionError("time was expected to be in range [" + j + " .. " + assertionError + "] but was " + j2);
            throw assertionError;
        }
    }

    protected Collection<Integer> invokeRpc(int i, boolean z) throws Exception {
        RequestOptions SYNC = RequestOptions.SYNC();
        if (z) {
            SYNC.flags(Message.Flag.OOB);
        }
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        MethodCall methodCall = new MethodCall(this.incr_method, new Object[0]);
        for (int i2 = 0; i2 < i; i2++) {
            this.disp1.callRemoteMethodWithFuture(this.b.getAddress(), methodCall, SYNC).whenComplete((obj, th) -> {
                try {
                    concurrentLinkedQueue.add((Integer) obj);
                    System.out.println("<-- " + obj);
                } catch (Exception e) {
                }
            });
        }
        Util.waitUntil(10000L, 50L, () -> {
            return concurrentLinkedQueue.size() == i;
        }, () -> {
            return String.format("expected %d but size is %d: %s", Integer.valueOf(i), Integer.valueOf(concurrentLinkedQueue.size()), concurrentLinkedQueue);
        });
        return concurrentLinkedQueue;
    }

    protected static JChannel createChannel(String str) throws Exception {
        SHARED_LOOPBACK shared_loopback = new SHARED_LOOPBACK();
        shared_loopback.getThreadPool().setMinThreads(10).setMaxThreads(20);
        return new JChannel(shared_loopback, new SHARED_LOOPBACK_PING(), new NAKACK2(), new UNICAST3(), new GMS()).name(str);
    }

    static {
        $assertionsDisabled = !RpcDispatcherAsyncInvocationTest.class.desiredAssertionStatus();
    }
}
