package org.jgroups.tests;

import java.io.DataInput;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.blocks.cs.BaseServer;
import org.jgroups.blocks.cs.NioServer;
import org.jgroups.blocks.cs.ReceiverAdapter;
import org.jgroups.blocks.cs.TcpServer;
import org.jgroups.util.Bits;
import org.jgroups.util.CondVar;
import org.jgroups.util.Util;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = {Global.FUNCTIONAL}, singleThreaded = true)
/* loaded from: input_file:org/jgroups/tests/ServerUnitTest.class */
public class ServerUnitTest {
    /** Address every test server binds to; assigned once in the static initializer from {@code Util.getLoopback()}. */
    protected static final InetAddress bind_addr;
    /**
     * Set in the static initializer to the negation of this class' desired assertion status, i.e.
     * {@code true} when assertions are disabled. The assertion-style checks in this class are skipped
     * when it is {@code true}.
     */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/jgroups/tests/ServerUnitTest$MyReceiver.class */
    /**
     * Receiver used by the tests: counts incoming messages, records the time at which the expected
     * number of messages has been reached, signals {@link #done} at that point and, if configured,
     * answers each message with an empty response sent through {@link #server}.
     */
    public static class MyReceiver extends ReceiverAdapter {
        /** Number of messages this receiver waits for. */
        protected final long num_expected;
        /** Wall-clock time (ms) at which {@link #num_expected} was first reached; 0 until then. */
        protected long stop_time;
        /** Whether an empty response is sent back for each of the first {@link #num_expected} messages. */
        protected boolean send_response;
        /** {@code num_expected / 10}; not read anywhere inside this class. */
        protected final long modulo;
        /** Server used to send the responses. */
        protected final BaseServer server;
        /** Completion condition: true once at least {@link #num_expected} messages were received. */
        protected final BooleanSupplier cond;
        protected final AtomicLong num_received = new AtomicLong(0);
        protected final AtomicLong num_sent = new AtomicLong(0);
        /** Signalled whenever a message arrives at or beyond the expected count. */
        protected final CondVar done = new CondVar();
        /** Wall-clock time (ms) at which this receiver was constructed. */
        protected long start_time = System.currentTimeMillis();

        /**
         * @param baseServer server used for sending responses
         * @param j          number of messages to expect
         * @param z          whether to answer received messages with an empty response
         */
        MyReceiver(BaseServer baseServer, long j, boolean z) {
            this.server = baseServer;
            this.num_expected = j;
            this.send_response = z;
            this.modulo = j / 10;
            this.cond = () -> num_received.get() >= num_expected;
        }

        public synchronized long getNumReceived() {
            return num_received.get();
        }

        public synchronized long getNumExpected() {
            return num_expected;
        }

        /** Handles a message delivered as a byte range; the payload itself is ignored. */
        @Override // org.jgroups.blocks.cs.ReceiverAdapter, org.jgroups.blocks.cs.Receiver
        public synchronized void receive(Address address, byte[] bArr, int i, int i2) {
            messageReceived(address);
        }

        /**
         * Handles a message delivered as a stream: reads an int length followed by that many bytes
         * (which are discarded) before counting the message.
         */
        @Override // org.jgroups.blocks.cs.ReceiverAdapter, org.jgroups.blocks.cs.Receiver
        public synchronized void receive(Address address, DataInput dataInput) throws Exception {
            dataInput.readFully(new byte[dataInput.readInt()]);
            messageReceived(address);
        }

        /**
         * Shared bookkeeping for both {@code receive} overloads; callers hold this receiver's monitor.
         * Increments the received counter, records {@link #stop_time} and signals {@link #done} once
         * the expected count is reached, and sends an empty response for messages up to (and
         * including) the expected count when {@link #send_response} is set.
         */
        private void messageReceived(Address sender) {
            long count = num_received.incrementAndGet();
            if (count >= num_expected) {
                if (stop_time == 0) {
                    stop_time = System.currentTimeMillis();
                }
                done.signal(true);
            }
            // Messages beyond the expected count are counted but never answered.
            if (!send_response || count > num_expected) {
                return;
            }
            try {
                ServerUnitTest.send(new byte[0], server, sender);
                num_sent.incrementAndGet();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /** Waits on {@link #done} for {@link #cond} to hold, for at most {@code j} milliseconds. */
        public void waitForCompletion(long j) throws Exception {
            done.waitFor(cond, j, TimeUnit.MILLISECONDS);
        }

        @Override
        public String toString() {
            return String.format("expected=%d, received=%d\n", Long.valueOf(num_expected), Long.valueOf(num_received.get()));
        }
    }

    /**
     * Starts two servers on ephemeral ports, once per server flavour ({@code false} = TcpServer,
     * {@code true} = NioServer, see {@code create}), and checks that their local addresses are
     * distinct objects.
     */
    public void testSetup() throws Exception {
        for (boolean nio : new boolean[]{false, true}) {
            // Resources are closed in reverse order (b, then a), also when the assertion fails.
            try (BaseServer a = create(nio, 0); BaseServer b = create(nio, 0)) {
                Assert.assertNotSame(a.localAddress(), b.localAddress());
            }
        }
    }

    /**
     * Sends a zero-length payload from a server to its own address, for both server flavours.
     * The test passes as long as no exception escapes.
     */
    public void testSendEmptyData() throws Exception {
        for (boolean nio : new boolean[]{false, true}) {
            try (BaseServer server = create(nio, 0)) {
                Address self = server.localAddress();
                // Receiver with default (inherited) behaviour only.
                server.receiver(new ReceiverAdapter() {
                });
                send(new byte[0], server, self);
            }
        }
    }

    /**
     * Calls {@code send} with a {@code null} buffer (offset 0, length 0) addressed to the server
     * itself, for both server flavours. The test passes as long as no exception escapes.
     */
    public void testSendNullData() throws Exception {
        for (boolean nio : new boolean[]{false, true}) {
            try (BaseServer server = create(nio, 0)) {
                server.send(server.localAddress(), null, 0, 0);
            }
        }
    }

    /**
     * Sends 1000 messages from a server to its own address and verifies that its receiver saw all
     * of them, for both server flavours.
     */
    public void testSendToSelf() throws Exception {
        final int numMsgs = 1000;
        for (boolean nio : new boolean[]{false, true}) {
            try (BaseServer server = create(nio, 0)) {
                Address self = server.localAddress();
                MyReceiver receiver = new MyReceiver(server, numMsgs, false);
                byte[] payload = "hello world".getBytes();
                server.receiver(receiver);
                for (int i = 0; i < numMsgs; i++) {
                    send(payload, server, self);
                }
                log("sent " + numMsgs + " msgs");
                receiver.waitForCompletion(20000L);

                long elapsedMs = receiver.stop_time - receiver.start_time;
                long received = receiver.getNumReceived();
                // Floating-point division: an integer division would throw ArithmeticException when
                // nothing was received and hide the assertion failure below.
                double msPerMsg = (double) elapsedMs / received;
                log("number expected=" + receiver.getNumExpected() + ", number received=" + received + ", total time=" + elapsedMs + " (" + msPerMsg + " ms/msg)");
                // TestNG argument order is (actual, expected).
                Assert.assertEquals(receiver.getNumReceived(), receiver.getNumExpected());
            }
        }
    }

    /**
     * Sends 1000 messages from server A with a {@code null} destination after a connection to B has
     * been set up by an initial direct send, and verifies that B's receiver saw all of them. Runs
     * for both server flavours.
     */
    public void testSendToAll() throws Exception {
        final int numMsgs = 1000;
        for (boolean nio : new boolean[]{false, true}) {
            try (BaseServer a = create(nio, 0); BaseServer b = create(nio, 0)) {
                MyReceiver receiverA = new MyReceiver(a, numMsgs, false);
                MyReceiver receiverB = new MyReceiver(b, numMsgs, false);
                byte[] payload = "hello world".getBytes();
                // Initial direct send happens before the receivers are installed, so it is not counted.
                send(payload, a, b.localAddress());
                Util.sleep(1000L);
                a.receiver(receiverA);
                b.receiver(receiverB);
                for (int i = 0; i < numMsgs; i++) {
                    send(payload, a, null);
                }
                log("sent " + numMsgs + " msgs");
                receiverB.waitForCompletion(20000L);

                long elapsedMs = receiverB.stop_time - receiverB.start_time;
                long received = receiverB.getNumReceived();
                // Floating-point division: an integer division would throw ArithmeticException when
                // nothing was received and hide the assertion failure below.
                double msPerMsg = (double) elapsedMs / received;
                log("number expected=" + receiverB.getNumExpected() + ", number received=" + received + ", total time=" + elapsedMs + " (" + msPerMsg + " ms/msg)");
                // TestNG argument order is (actual, expected).
                Assert.assertEquals(receiverB.getNumReceived(), receiverB.getNumExpected());
                // NOTE(review): as written this check can only fail for a negative count on A;
                // confirm the intended bound.
                if (!$assertionsDisabled && receiverA.getNumReceived() != 0 && receiverA.getNumReceived() <= 0) {
                    throw new AssertionError();
                }
            }
        }
    }

    /**
     * Sends 1000 messages from server A to server B and verifies that B's receiver saw all of them,
     * for both server flavours.
     */
    public void testSendToOther() throws Exception {
        final int numMsgs = 1000;
        for (boolean nio : new boolean[]{false, true}) {
            try (BaseServer a = create(nio, 0); BaseServer b = create(nio, 0)) {
                Address target = b.localAddress();
                MyReceiver receiver = new MyReceiver(b, numMsgs, false);
                byte[] payload = "hello world".getBytes();
                b.receiver(receiver);
                for (int i = 0; i < numMsgs; i++) {
                    send(payload, a, target);
                }
                log("sent " + numMsgs + " msgs");
                receiver.waitForCompletion(20000L);

                long elapsedMs = receiver.stop_time - receiver.start_time;
                long received = receiver.getNumReceived();
                // Floating-point division: an integer division would throw ArithmeticException when
                // nothing was received and hide the assertion failure below.
                double msPerMsg = (double) elapsedMs / received;
                log("number expected=" + receiver.getNumExpected() + ", number received=" + received + ", total time=" + elapsedMs + " (" + msPerMsg + " ms/msg)");
                // TestNG argument order is (actual, expected).
                Assert.assertEquals(receiver.getNumReceived(), receiver.getNumExpected());
            }
        }
    }

    /**
     * Sends 1000 messages from A to B, where B's receiver answers each message with an empty
     * response, and verifies that A received as many responses as it expects. Runs for both server
     * flavours.
     */
    public void testSendToOtherGetResponse() throws Exception {
        final int numMsgs = 1000;
        byte[] payload = "hello world".getBytes();
        for (boolean nio : new boolean[]{false, true}) {
            try (BaseServer a = create(nio, 0); BaseServer b = create(nio, 0)) {
                Address target = b.localAddress();
                MyReceiver receiverA = new MyReceiver(a, numMsgs, false);
                MyReceiver receiverB = new MyReceiver(b, numMsgs, true);
                a.receiver(receiverA);
                b.receiver(receiverB);
                for (int i = 0; i < numMsgs; i++) {
                    send(payload, a, target);
                }
                System.out.printf("\n\n%s sent %d msgs to %s\n", "A", (long) numMsgs, "B");
                a.flushAll();
                // The boolean result is deliberately not checked here; the waitUntil() call below is
                // the one that fails the test when the responses do not arrive.
                Util.waitUntilTrue(10000L, 100L, () -> receiverB.getNumReceived() >= receiverB.getNumExpected());
                b.flushAll();
                Util.waitUntil(10000L, 100L, () -> receiverA.getNumReceived() == receiverA.getNumExpected());

                long elapsedMs = receiverA.stop_time - receiverA.start_time;
                // Floating-point division so that %.2f shows a real per-message average.
                double msPerMsg = (double) elapsedMs / receiverA.getNumReceived();
                System.out.printf("A: expected=%d, received=%d\nB: expected=%d, received=%d, sent=%d\ntotal time=%d (%.2f ms/msg)",
                        receiverA.getNumExpected(), receiverA.getNumReceived(),
                        receiverB.getNumExpected(), receiverB.getNumReceived(), receiverB.num_sent.get(),
                        elapsedMs, msPerMsg);
                Assert.assertEquals(receiverA.getNumReceived(), receiverA.getNumExpected());
            }
        }
    }

    /**
     * Has A and B send one message to each other and then waits until each server reports exactly
     * one open connection and an established connection to its peer. Runs for both server flavours.
     */
    public void testReuseOfConnection() throws Exception {
        for (boolean nio : new boolean[]{false, true}) {
            try (BaseServer a = create(nio, 0); BaseServer b = create(nio, 0)) {
                // Freshly started servers are expected to have no connections yet.
                int connsA = a.getNumConnections();
                if (!$assertionsDisabled && connsA != 0) {
                    throw new AssertionError();
                }
                int connsB = b.getNumConnections();
                if (!$assertionsDisabled && connsB != 0) {
                    throw new AssertionError();
                }
                Address addrA = a.localAddress();
                Address addrB = b.localAddress();
                byte[] payload = "hello world".getBytes();
                send(payload, a, addrB);
                send(payload, b, addrA);
                System.out.println("A: " + a + "\nB: " + b);
                waitForOpenConns(1, a, b);
                connectionEstablished(a, addrB);
                connectionEstablished(b, addrA);
            }
        }
    }

    /**
     * Checks connection counts after sends to self and to the peer (one connection per server is
     * expected), then closes both servers and waits for the open-connection count to drop to zero.
     * Runs for both server flavours.
     */
    public void testConnectionCountOnStop() throws Exception {
        for (boolean nio : new boolean[]{false, true}) {
            try (BaseServer a = create(nio, 0); BaseServer b = create(nio, 0)) {
                Address addrA = a.localAddress();
                Address addrB = b.localAddress();
                byte[] payload = "hello world".getBytes();
                // A send to self is expected not to create a connection.
                send(payload, a, addrA);
                if (!$assertionsDisabled && a.getNumConnections() != 0) {
                    throw new AssertionError();
                }
                send(payload, a, addrB);
                send(payload, b, addrB);
                send(payload, b, addrA);
                System.out.println("A:\n" + a + "\nB:\n" + b);
                int connsA = a.getNumConnections();
                int connsB = b.getNumConnections();
                if (!$assertionsDisabled && connsA != 1) {
                    throw new AssertionError("A should have 1 connection, but has " + connsA + ": " + a);
                }
                if (!$assertionsDisabled && connsB != 1) {
                    throw new AssertionError("B should have 1 connection, but has " + connsB + ": " + b);
                }
                // Explicit close is part of the scenario; the try-with-resources closes both again
                // on exit.
                Util.close(b, a);
                waitForOpenConns(0, a, b);
            }
        }
    }

    /**
     * NIO only: sends two messages from A to B back to back and verifies that B's receiver gets
     * both within the wait period.
     */
    public void testAsyncConnectThenSend() throws Exception {
        try (NioServer a = (NioServer) create(true, 0); NioServer b = (NioServer) create(true, 0)) {
            // create() has already called start(); the additional start() calls are kept as in the
            // original scenario.
            a.start();
            b.start();
            Address target = b.localAddress();
            MyReceiver receiver = new MyReceiver(b, 2L, false);
            b.receiver(receiver);
            byte[] payload = "hello world".getBytes();
            send(payload, a, target);
            send(payload, a, target);
            receiver.waitForCompletion(20000L);
            if (!$assertionsDisabled && receiver.getNumReceived() != 2) {
                throw new AssertionError();
            }
        }
    }

    /**
     * Frames {@code bArr} with a leading int holding its length (written with {@code Bits.writeInt})
     * and sends the frame through {@code baseServer} to {@code address}. A failure to send is
     * reported on standard error and otherwise ignored.
     *
     * @param bArr       payload to send; must not be null
     * @param baseServer server used for sending
     * @param address    destination passed through to {@code baseServer.send}
     */
    protected static void send(byte[] bArr, BaseServer baseServer, Address address) {
        final int headerLen = Integer.BYTES;
        final int payloadLen = bArr.length;
        final byte[] frame = new byte[payloadLen + headerLen];
        Bits.writeInt(payloadLen, frame, 0);
        System.arraycopy(bArr, 0, frame, headerLen, payloadLen);
        try {
            baseServer.send(address, frame, 0, frame.length);
        } catch (Exception e) {
            System.err.println("Failed sending a request to " + address + ": " + e);
        }
    }

    /** Prints {@code str} to standard output, prefixed with the current thread. */
    static void log(String str) {
        String line = String.format("-- [%s]: %s", Thread.currentThread(), str);
        System.out.println(line);
    }

    /**
     * Creates and starts a server bound to {@code bind_addr} on the given port, with peer
     * connections enabled.
     *
     * @param z {@code true} for a {@code NioServer} (with max send buffers set to 1024),
     *          {@code false} for a {@code TcpServer}
     * @param i port to bind to
     * @return the started server, never {@code null}
     * @throws IllegalStateException if the server cannot be created or started; the original
     *         exception is attached as the cause
     */
    protected static BaseServer create(boolean z, int i) {
        BaseServer server = null;
        try {
            server = z ? new NioServer(bind_addr, i).maxSendBuffers(1024) : new TcpServer(bind_addr, i);
            server.usePeerConnections(true);
            server.start();
            return server;
        } catch (Exception e) {
            // Don't leak a server that was constructed but failed to start.
            if (server != null) {
                try {
                    server.close();
                } catch (Exception closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            // Fail here with the real cause rather than returning null, which only produced an
            // uninformative NullPointerException in the calling test.
            throw new IllegalStateException("failed to create " + (z ? "NioServer" : "TcpServer") + " on " + bind_addr + ":" + i, e);
        }
    }

    /**
     * Polls until every given server reports exactly {@code i} open connections, sleeping one
     * second between polls, for at most 20 polls.
     *
     * @param i             expected number of open connections per server
     * @param baseServerArr servers to check
     * @throws Exception if the expected count has not been reached on all servers after the last
     *                   poll; the message lists each server's count and connections
     */
    protected static void waitForOpenConns(int i, BaseServer... baseServerArr) throws Exception {
        BooleanSupplier allMatch = () -> Arrays.stream(baseServerArr)
                .allMatch(server -> server.getNumOpenConnections() == i);

        // Stop polling as soon as the condition holds.
        for (int attempt = 0; attempt < 20 && !allMatch.getAsBoolean(); attempt++) {
            Util.sleep(1000L);
        }
        if (allMatch.getAsBoolean()) {
            return;
        }
        String actual = Stream.of(baseServerArr)
                .map(server -> String.format("%s: %s", Integer.valueOf(server.getNumOpenConnections()), server.printConnections()))
                .collect(Collectors.joining("\n"));
        throw new Exception(String.format("expected connections: %d, actual:\n%s\n", Integer.valueOf(i), actual));
    }

    /**
     * Polls {@code baseServer.connectionEstablishedTo(address)} up to 10 times, sleeping 500 ms
     * after each unsuccessful poll, then (unless assertions are disabled) fails with an
     * {@link AssertionError} if the connection is still not established.
     */
    protected static void connectionEstablished(BaseServer baseServer, Address address) {
        int polls = 0;
        while (polls < 10) {
            if (baseServer.connectionEstablishedTo(address)) {
                break;
            }
            Util.sleep(500L);
            polls++;
        }
        if ($assertionsDisabled) {
            return;
        }
        if (!baseServer.connectionEstablishedTo(address)) {
            throw new AssertionError();
        }
    }

    static {
        $assertionsDisabled = !ServerUnitTest.class.desiredAssertionStatus();
        try {
            bind_addr = Util.getLoopback();
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
    }
}
