package org.jgroups.tests;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.RequestOptions;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
import org.jgroups.protocols.DISCARD;
import org.jgroups.protocols.MERGE2;
import org.jgroups.protocols.PING;
import org.jgroups.protocols.RSVP;
import org.jgroups.protocols.SHARED_LOOPBACK;
import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.stack.DiagnosticsHandler;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.DefaultSocketFactory;
import org.jgroups.util.DefaultThreadFactory;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
import org.jgroups.util.SocketFactory;
import org.jgroups.util.ThreadFactory;
import org.jgroups.util.TimeScheduler2;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {Global.FUNCTIONAL}, sequential = true)
/* loaded from: input_file:org/jgroups/tests/MessageDispatcherRSVPTest.class */
public class MessageDispatcherRSVPTest {
    protected static final int NUM = 2;
    protected final JChannel[] channels = new JChannel[2];
    protected final MessageDispatcher[] dispatchers = new MessageDispatcher[2];
    protected MyDiagnosticsHandler handler;
    protected ThreadPoolExecutor oob_thread_pool;
    protected ThreadPoolExecutor thread_pool;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/jgroups/tests/MessageDispatcherRSVPTest$Closer.class */
    protected static class Closer extends Thread {
        protected final JChannel ch;

        public Closer(JChannel jChannel) {
            this.ch = jChannel;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Util.sleep(2000L);
            System.out.println("closing channel");
            Util.close(this.ch);
        }
    }

    /* loaded from: input_file:org/jgroups/tests/MessageDispatcherRSVPTest$MyDiagnosticsHandler.class */
    protected static class MyDiagnosticsHandler extends DiagnosticsHandler {
        protected MyDiagnosticsHandler(InetAddress inetAddress, int i, Log log, SocketFactory socketFactory, ThreadFactory threadFactory) {
            super(inetAddress, i, log, socketFactory, threadFactory);
        }

        @Override // org.jgroups.stack.DiagnosticsHandler
        public void start() throws IOException {
            super.start();
        }

        @Override // org.jgroups.stack.DiagnosticsHandler
        public void stop() {
        }

        public void destroy() {
            super.stop();
        }
    }

    @BeforeMethod
    void setUp() throws Exception {
        this.handler = new MyDiagnosticsHandler(InetAddress.getByName("224.0.75.75"), 7500, LogFactory.getLog(DiagnosticsHandler.class), new DefaultSocketFactory(), new DefaultThreadFactory("", false));
        this.handler.start();
        TimeScheduler2 timeScheduler2 = new TimeScheduler2(new DefaultThreadFactory("Timer", true, true), 5, 20, Global.THREADPOOL_SHUTDOWN_WAIT_TIME, 5000, "abort");
        this.oob_thread_pool = new ThreadPoolExecutor(5, Math.max(5, 0), Global.THREADPOOL_SHUTDOWN_WAIT_TIME, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(4));
        this.oob_thread_pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        this.thread_pool = new ThreadPoolExecutor(5, Math.max(5, 0), Global.THREADPOOL_SHUTDOWN_WAIT_TIME, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(4));
        this.thread_pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        System.out.print("Connecting channels: ");
        for (int i = 0; i < 2; i++) {
            SHARED_LOOPBACK shared_loopback = (SHARED_LOOPBACK) new SHARED_LOOPBACK().setValue("enable_bundling", false);
            shared_loopback.setLoopback(false);
            shared_loopback.setTimer(timeScheduler2);
            shared_loopback.setOOBThreadPool(this.oob_thread_pool);
            shared_loopback.setDefaultThreadPool(this.thread_pool);
            shared_loopback.setDiagnosticsHandler(this.handler);
            this.channels[i] = Util.createChannel(shared_loopback, new DISCARD(), new PING().setValue("timeout", Integer.valueOf(Event.USER_DEFINED)).setValue("num_initial_members", 2).setValue("force_sending_discovery_rsps", true), new MERGE2().setValue("min_interval", Integer.valueOf(Event.USER_DEFINED)).setValue("max_interval", 3000), new NAKACK2().setValue("use_mcast_xmit", false).setValue("discard_delivered_msgs", true).setValue("log_discard_msgs", false).setValue("log_not_found_msgs", false), new UNICAST3().setValue("xmit_table_num_rows", 5).setValue("xmit_interval", 300), new RSVP().setValue("timeout", 10000).setValue("throw_exception_on_timeout", true), new GMS().setValue("print_local_addr", false).setValue("leave_timeout", 100).setValue("log_view_warnings", false).setValue("view_ack_collection_timeout", 2000).setValue("log_collect_msgs", false));
            this.channels[i].setName(String.valueOf(i + 1));
            this.dispatchers[i] = new MessageDispatcher(this.channels[i], null, null);
            this.channels[i].connect("MessageDispatcherRSVPTest");
            System.out.print((i + 1) + " ");
            if (i == 0) {
                Util.sleep(1000L);
            }
        }
        Util.waitUntilAllChannelsHaveSameSize(30000L, 1000L, this.channels);
        System.out.println("");
    }

    @AfterMethod
    void tearDown() throws Exception {
        for (int i = 1; i >= 0; i--) {
            ProtocolStack protocolStack = this.channels[i].getProtocolStack();
            protocolStack.stopStack(this.channels[i].getClusterName());
            protocolStack.destroy();
        }
        this.handler.destroy();
    }

    public void testCancellationByClosingChannel() throws Exception {
        testCancellationByClosing(false, new Closer(this.channels[0]));
    }

    public void testCancellationByClosingChannelUnicast() throws Exception {
        testCancellationByClosing(true, new Closer(this.channels[0]));
    }

    public void testSendingMessageOnClosedChannel() throws Exception {
        sendMessageOnClosedChannel(new Message(this.channels[1].getAddress(), "bla"));
        sendMessageOnClosedChannel(new Message((Address) null, "bla"));
    }

    public void testSendingMessageOnClosedChannelRSVP() throws Exception {
        Message message = new Message(this.channels[1].getAddress(), (Address) null, "bla");
        message.setFlag(Message.Flag.RSVP);
        sendMessageOnClosedChannel(message);
        Message message2 = new Message((Address) null, "bla");
        message2.setFlag(Message.Flag.RSVP);
        sendMessageOnClosedChannel(message2);
    }

    protected void testCancellationByClosing(boolean z, Thread thread) throws Exception {
        Address address;
        ((DISCARD) this.channels[0].getProtocolStack().findProtocol(DISCARD.class)).setDiscardAll(true);
        if (z) {
            try {
                address = this.channels[1].getAddress();
            } catch (IllegalStateException e) {
                System.out.println("received \"" + e + "\" as expected");
                return;
            }
        } else {
            address = null;
        }
        Address address2 = address;
        Message message = new Message(address2, "bla");
        message.setFlag(Message.Flag.RSVP);
        thread.start();
        if (z) {
            System.out.println("sending unicast message to " + address2);
            this.dispatchers[0].sendMessage(message, RequestOptions.SYNC());
            if (!$assertionsDisabled) {
                throw new AssertionError("sending the message on a closed channel should have thrown an exception");
            }
        } else {
            System.out.println("sending multicast message");
            RspList castMessage = this.dispatchers[0].castMessage(Collections.singleton(this.channels[1].getAddress()), message, RequestOptions.SYNC());
            System.out.println("rsps = " + castMessage);
            if (!$assertionsDisabled && castMessage.size() != 1) {
                throw new AssertionError();
            }
            Rsp rsp = (Rsp) castMessage.iterator().next();
            System.out.println("rsp = " + rsp);
            if (!$assertionsDisabled && !rsp.hasException()) {
                throw new AssertionError();
            }
            Throwable exception = rsp.getException();
            if (!$assertionsDisabled && !(exception instanceof IllegalStateException)) {
                throw new AssertionError();
            }
        }
    }

    protected void sendMessageOnClosedChannel(Message message) throws Exception {
        this.channels[0].close();
        try {
            if (message.getDest() == null) {
                this.dispatchers[0].castMessage(Collections.singleton(this.channels[1].getAddress()), message, RequestOptions.SYNC());
            } else {
                this.dispatchers[0].sendMessage(message, RequestOptions.SYNC());
            }
            if ($assertionsDisabled) {
            } else {
                throw new AssertionError("sending the message on a closed channel should have thrown an exception");
            }
        } catch (IllegalStateException e) {
            System.out.println("received \"" + e + "\" as expected");
        }
    }

    static {
        $assertionsDisabled = !MessageDispatcherRSVPTest.class.desiredAssertionStatus();
    }
}
