package org.jgroups.tests;

import java.util.Arrays;
import java.util.Properties;
import org.jgroups.Address;
import org.jgroups.BytesMessage;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.Receiver;
import org.jgroups.View;
import org.jgroups.protocols.DISCARD;
import org.jgroups.protocols.MERGE3;
import org.jgroups.protocols.TCP_NIO2;
import org.jgroups.protocols.TP;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.Promise;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Sends {@link #NUM_MSGS} multicast messages from channel A and verifies that both A and B
 * receive all of them, once with a lossy {@link DISCARD} protocol inserted into both stacks and
 * once with the unmodified stack.
 */
@Test(groups = {Global.STACK_DEPENDENT}, singleThreaded = true)
public class DiscardTest extends ChannelTestBase {
    JChannel a;
    JChannel b;

    /** Number of messages sent by A and expected by each receiver. */
    static final long NUM_MSGS = 10000;

    /** Payload size in bytes of every message sent. */
    static final int MSG_SIZE = 1000;

    private static final String GROUP = "DiscardTest";

    /** Timeout (ms) used for view convergence and for each receiver to see all messages. */
    private static final long TIMEOUT_MS = 10000;

    /** Polling interval (ms) while waiting for both channels to install the same view. */
    private static final long VIEW_POLL_INTERVAL_MS = 500;

    /** Probability with which the inserted DISCARD protocol drops messages travelling down the stack. */
    private static final double DOWN_DISCARD_RATE = 0.1;

    /** A progress line is printed every time this many messages have been sent or received. */
    private static final int PROGRESS_INTERVAL = 1000;

    final Promise<Long> ch1_all_received = new Promise<>();
    final Promise<Long> ch2_all_received = new Promise<>();

    /**
     * Counts received messages and completes a promise with the count once the expected
     * number has been reached. Messages arriving after that point are ignored.
     */
    public static class MyReceiver implements Receiver {
        final Promise<Long> p;
        final long num_msgs_expected;
        String channel_name;
        long num_msgs = 0;
        boolean operational = true;

        /**
         * @param promise  completed with the number of received messages once {@code expected} is reached
         * @param expected number of messages after which the promise is completed
         * @param name     channel name, used only in the printed output
         */
        public MyReceiver(Promise<Long> promise, long expected, String name) {
            this.p = promise;
            this.num_msgs_expected = expected;
            this.channel_name = name;
        }

        @Override
        public void receive(Message message) {
            if (!this.operational) {
                return; // expected count already reached; ignore anything further
            }
            this.num_msgs++;
            if (this.num_msgs % PROGRESS_INTERVAL == 0) {
                System.out.printf("-- received %d on %s\n", this.num_msgs, this.channel_name);
            }
            if (this.num_msgs >= this.num_msgs_expected) {
                System.out.printf("SUCCESS: received all %d messages on %s\n", this.num_msgs_expected, this.channel_name);
                this.operational = false;
                this.p.setResult(this.num_msgs);
            }
        }

        @Override
        public void viewAccepted(View view) {
            System.out.printf("-- view (%s): %s\n", this.channel_name, view);
        }
    }

    /** Clears both promises so a result from a previous test method cannot satisfy the next one. */
    @BeforeMethod
    protected void setUp() throws Exception {
        this.ch1_all_received.reset();
        this.ch2_all_received.reset();
    }

    /**
     * Prints partial-write statistics when both transports are {@link TCP_NIO2}, then closes
     * both channels. The channels are closed even if a test failed before creating them or if
     * collecting the statistics throws.
     */
    @AfterMethod
    protected void tearDown() throws Exception {
        try {
            // Either channel may be null if the test method failed before creating it.
            if (this.a != null && this.b != null) {
                TP transportA = this.a.getProtocolStack().getTransport();
                TP transportB = this.b.getProtocolStack().getTransport();
                if (transportA instanceof TCP_NIO2 && transportB instanceof TCP_NIO2) {
                    System.out.printf("partial writes in A: %d, partial writes in B: %d\n",
                            ((TCP_NIO2) transportA).numPartialWrites(),
                            ((TCP_NIO2) transportB).numPartialWrites());
                }
            }
        } finally {
            Util.close(this.b, this.a);
            // Do not let the next test method's tearDown see channels from this one.
            this.a = null;
            this.b = null;
        }
    }

    /** Runs the reception test with a lossy DISCARD protocol inserted into both stacks. */
    public void testDiscardProperties() throws Exception {
        _testLosslessReception(true);
    }

    /** Runs the reception test on the unmodified stacks. */
    public void testFastProperties() throws Exception {
        _testLosslessReception(false);
    }

    /**
     * Connects A and B, optionally inserts DISCARD into both stacks, sends {@link #NUM_MSGS}
     * messages from A to all members and waits until both receivers report having received
     * all of them.
     *
     * @param discard whether to insert a DISCARD protocol (dropping down-messages with
     *                probability {@link #DOWN_DISCARD_RATE}) into both stacks
     * @throws Exception if channel setup, sending, or waiting for either receiver fails
     */
    private void _testLosslessReception(boolean discard) throws Exception {
        this.a = createChannel().name("A");
        this.a.setReceiver(new MyReceiver(this.ch1_all_received, NUM_MSGS, "A"));
        this.b = createChannel().name("B");
        this.b.setReceiver(new MyReceiver(this.ch2_all_received, NUM_MSGS, "B"));
        makeUnique(this.a, this.b);
        this.a.connect(GROUP);
        this.b.connect(GROUP);
        Util.waitUntilAllChannelsHaveSameView(TIMEOUT_MS, VIEW_POLL_INTERVAL_MS, this.a, this.b);

        if (discard) {
            insertDiscard(this.a);
            insertDiscard(this.b);
        }

        System.out.printf("sending %d %d-byte messages to all members (including myself)\n", NUM_MSGS, MSG_SIZE);
        long start = System.currentTimeMillis();
        for (int i = 0; i < NUM_MSGS; i++) {
            this.a.send(createMessage(MSG_SIZE));
            if (i % PROGRESS_INTERVAL == 0) {
                System.out.println("-- sent " + i + " messages");
            }
        }

        System.out.println("-- waiting for ch1 and ch2 to receive " + NUM_MSGS + " messages");
        Long receivedByA = this.ch1_all_received.getResultWithTimeout(TIMEOUT_MS);
        System.out.println("-- received " + receivedByA + " messages on ch1");
        Long receivedByB = this.ch2_all_received.getResultWithTimeout(TIMEOUT_MS);
        long stop = System.currentTimeMillis();
        System.out.println("-- received " + receivedByB + " messages on ch2");

        long elapsedMs = stop - start;
        double msgsPerSec = NUM_MSGS / (elapsedMs / 1000.0d);
        System.out.printf("== Sent and received %d in %d ms, %.2f msgs/sec\n", NUM_MSGS, elapsedMs, msgsPerSec);
    }

    /**
     * Inserts a DISCARD protocol below MERGE3 in the given channel's stack, configured to drop
     * down-messages with probability {@link #DOWN_DISCARD_RATE}.
     */
    private static void insertDiscard(JChannel channel) throws Exception {
        DISCARD discardProtocol = new DISCARD();
        // The rate must be set on the protocol itself; an unattached Properties object has no effect.
        discardProtocol.setDownDiscardRate(DOWN_DISCARD_RATE);
        channel.getProtocolStack().insertProtocol(discardProtocol, ProtocolStack.Position.BELOW, MERGE3.class);
    }

    /** Creates a message with a {@code null} destination and a payload of {@code size} bytes, all set to 'x'. */
    private static Message createMessage(int size) {
        byte[] payload = new byte[size];
        Arrays.fill(payload, (byte) 'x');
        return new BytesMessage((Address) null, payload);
    }
}
