package org.jgroups.protocols;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import org.jgroups.Address;
import org.jgroups.BytesMessage;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ObjectMessage;
import org.jgroups.Receiver;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Functional tests that exchange unicast messages between two channels ("A" and "B") and check the
 * order in which regular and out-of-band (OOB) messages are handed to the receiver.
 *
 * <p>All checks use plain {@code assert} statements, so they are only effective when the JVM runs
 * with assertions enabled ({@code -ea}).
 */
@Test(groups = {Global.FUNCTIONAL}, singleThreaded = true)
public class UNICAST_OOB_Test {
    protected JChannel a;
    protected JChannel b;

    /** Retransmission interval configured on UNICAST3; also used as the poll interval while waiting. */
    protected static final long XMIT_INTERVAL = 500;

    /** Number of messages sent by every test. */
    private static final int NUM_MSGS = 5;

    /** Receiver that records the {@link Long} payload of every message it is given, in arrival order. */
    public static class MyReceiver implements Receiver {
        /** Synchronized because messages may be delivered on a thread other than the test thread. */
        final List<Long> seqnos = Collections.synchronizedList(new LinkedList<>());

        /** Returns the live (synchronized) list of received payloads. */
        public List<Long> getSeqnos() {
            return seqnos;
        }

        /** Records the payload of {@code message}; a {@code null} message is ignored. */
        @Override
        public void receive(Message message) {
            if (message == null) {
                return;
            }
            Long seqno = (Long) message.getObject();
            System.out.println(">> received " + seqno);
            seqnos.add(seqno);
        }

        /** Returns the number of payloads received so far. */
        public int size() {
            return seqnos.size();
        }
    }

    /** Creates channels A and B and connects both to the cluster "UNICAST_OOB_Test". */
    @BeforeMethod
    void setup() throws Exception {
        a = createChannel("A");
        b = createChannel("B");
        a.connect("UNICAST_OOB_Test");
        b.connect("UNICAST_OOB_Test");
    }

    /** Closes both channels, B first. */
    @AfterMethod
    void tearDown() throws Exception {
        Util.close(b, a);
    }

    public void testRegularMessages() throws Exception {
        sendMessages(false);
    }

    public void testOutOfBandMessages() throws Exception {
        sendMessages(true);
    }

    public void testSecondMessageReceivedFirstRegular() throws Exception {
        _testSecondMessageReceivedFirst(false, false);
    }

    public void testSecondMessageReceivedFirstRegularBatched() throws Exception {
        _testSecondMessageReceivedFirst(false, true);
    }

    public void testSecondMessageReceivedFirstOOB() throws Exception {
        _testSecondMessageReceivedFirst(true, false);
    }

    public void testSecondMessageReceivedFirstOOBBatched() throws Exception {
        _testSecondMessageReceivedFirst(true, true);
    }

    /**
     * Removes the UNICAST3 connections between A and B, then lets B send {@value #NUM_MSGS} messages
     * to A through a REVERSE protocol inserted below UNICAST3 on A (presumably so that A sees them
     * out of order) and asserts that all of them are delivered in under one second.
     *
     * @param oob     if {@code true}, every message is flagged {@link Message.Flag#OOB}
     * @param batched if {@code true}, a MAKE_BATCH protocol (unicasts enabled) is additionally
     *                inserted below UNICAST3 on A and started
     * @throws Exception if sending fails or the messages do not all arrive within 10 seconds
     */
    protected void _testSecondMessageReceivedFirst(boolean oob, boolean batched) throws Exception {
        Address addrA = a.getAddress();
        Address addrB = b.getAddress();
        UNICAST3 unicastA = (UNICAST3) a.getProtocolStack().findProtocol(UNICAST3.class);
        UNICAST3 unicastB = (UNICAST3) b.getProtocolStack().findProtocol(UNICAST3.class);

        // Drop existing connection state on both sides before sending.
        unicastA.removeReceiveConnection(addrB);
        unicastA.removeSendConnection(addrB);
        unicastB.removeReceiveConnection(addrA);
        unicastB.removeSendConnection(addrA);
        System.out.println("=============== removed connection between A and B ===========");

        // Only unicasts from B that carry no flags at all, or the OOB flag, pass the filter.
        REVERSE reverse = new REVERSE().numMessagesToReverse(NUM_MSGS).filter(msg ->
            msg.getDest() != null
                && addrB.equals(msg.getSrc())
                && (msg.getFlags(false) == 0 || msg.isFlagSet(Message.Flag.OOB)));
        a.getProtocolStack().insertProtocol(reverse, ProtocolStack.Position.BELOW, UNICAST3.class);

        if (batched) {
            MAKE_BATCH makeBatch = (MAKE_BATCH) new MAKE_BATCH().unicasts(true).setAddress(addrA);
            a.getProtocolStack().insertProtocol(makeBatch, ProtocolStack.Position.BELOW, UNICAST3.class);
            makeBatch.start();
        }

        MyReceiver receiver = new MyReceiver();
        a.setReceiver(receiver);

        System.out.println("========== B sends messages 1-5 to A ==========");
        long start = System.currentTimeMillis();
        for (int i = 1; i <= NUM_MSGS; i++) {
            BytesMessage msg = new BytesMessage(addrA, Long.valueOf(i));
            if (oob) {
                msg.setFlag(Message.Flag.OOB);
            }
            b.send(msg);
        }
        Util.waitUntil(10000L, 10L, () -> receiver.size() == NUM_MSGS);
        long elapsed = System.currentTimeMillis() - start;
        System.out.printf("===== list: %s (in %d ms)\n", receiver.getSeqnos(), elapsed);
        assert elapsed < 1000 : "delivery took " + elapsed + " ms, expected less than 1000 ms";
    }

    /**
     * Inserts a DISCARD_PAYLOAD protocol below the unicast protocol of A, sends messages
     * 1..{@value #NUM_MSGS} from A to B and checks what B received.
     *
     * <p>With {@code oob == false} the messages must arrive in sending order. With
     * {@code oob == true} only message #4 is flagged OOB and it must be received before message #3.
     *
     * @param oob whether message #4 is sent out-of-band
     * @throws Exception if sending fails or the views of A and B do not converge
     */
    private void sendMessages(boolean oob) throws Exception {
        DISCARD_PAYLOAD discard = new DISCARD_PAYLOAD();
        MyReceiver receiver = new MyReceiver();
        b.setReceiver(receiver);

        ProtocolStack stack = a.getProtocolStack();
        Protocol unicast = stack.findProtocol(Util.getUnicastProtocols());
        System.out.println("Found unicast protocol " + unicast.getClass().getSimpleName());
        stack.insertProtocolInStack(discard, unicast, ProtocolStack.Position.BELOW);

        Util.waitUntilAllChannelsHaveSameView(10000L, 1000L, a, b);

        Address dest = b.getAddress();
        for (int i = 1; i <= NUM_MSGS; i++) {
            ObjectMessage msg = new ObjectMessage(dest, Long.valueOf(i));
            if (i == 4 && oob) {
                msg.setFlag(Message.Flag.OOB);
            }
            System.out.println("-- sending message #" + i);
            a.send(msg);
            Util.sleep(100L);
        }

        // Poll for up to 30 seconds until all messages have arrived.
        long deadline = System.currentTimeMillis() + 30000;
        while (receiver.size() < NUM_MSGS && System.currentTimeMillis() < deadline) {
            Util.sleep(XMIT_INTERVAL);
        }

        List<Long> seqnos = receiver.getSeqnos();
        System.out.println("-- sequence numbers: " + seqnos);
        assert seqnos.size() == NUM_MSGS
            : "expected " + NUM_MSGS + " messages but received " + seqnos.size() + ": " + seqnos;

        if (!oob) {
            for (int i = 0; i < NUM_MSGS; i++) {
                // (i + 1) must be parenthesized: i + "1" would print "01", "11", ... instead
                assert seqnos.get(i) == i + 1
                    : " seqno is " + seqnos.get(i) + ", but expected " + (i + 1);
            }
            return;
        }

        int indexOf3 = seqnos.lastIndexOf(3L);
        int indexOf4 = seqnos.lastIndexOf(4L);
        // Both must be present; otherwise a missing #4 (index -1) would trivially "come before" #3.
        assert indexOf3 >= 0 && indexOf4 >= 0 && indexOf4 < indexOf3
            : "4 must come before 3 in list " + seqnos;
    }

    /**
     * Creates (but does not connect) a channel named {@code str} with the stack SHARED_LOOPBACK,
     * SHARED_LOOPBACK_PING, NAKACK2, UNICAST3 (xmit interval {@link #XMIT_INTERVAL}) and GMS.
     */
    protected static JChannel createChannel(String str) throws Exception {
        return new JChannel(
            new SHARED_LOOPBACK(),
            new SHARED_LOOPBACK_PING(),
            new NAKACK2(),
            new UNICAST3().setXmitInterval(XMIT_INTERVAL),
            new GMS()).name(str);
    }
}
