package org.jgroups.protocols;

import java.util.Iterator;
import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.Receiver;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {Global.FUNCTIONAL}, singleThreaded = true)
/* loaded from: input_file:org/jgroups/protocols/UNICAST3_Test.class */
public class UNICAST3_Test {
    protected JChannel a;
    protected JChannel b;
    protected Address a_addr;
    protected Address b_addr;
    protected UNICAST3 uni_a;
    protected UNICAST3 uni_b;
    protected static final short UNICAST3_ID;
    protected static final int CONN_CLOSE_TIMEOUT = 60000;
    static final /* synthetic */ boolean $assertionsDisabled;
    protected MyReceiver receiver = new MyReceiver();
    protected DropUnicastAck drop_ack = new DropUnicastAck(499);

    /* loaded from: input_file:org/jgroups/protocols/UNICAST3_Test$DropUnicastAck.class */
    protected static class DropUnicastAck extends Protocol {
        protected final short start_drop_ack;
        protected volatile boolean discarding;

        public DropUnicastAck(short s) {
            this.start_drop_ack = s;
        }

        @Override // org.jgroups.stack.Protocol
        public Object down(Message message) {
            UnicastHeader3 unicastHeader3 = (UnicastHeader3) message.getHeader(UNICAST3_Test.UNICAST3_ID);
            if (unicastHeader3 != null && unicastHeader3.type() == 1 && unicastHeader3.seqno() == this.start_drop_ack) {
                this.discarding = true;
                unicastHeader3.seqno = this.start_drop_ack - 1;
            }
            return this.down_prot.down(message);
        }

        @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
        public Object up(Message message) {
            if (this.discarding) {
                return null;
            }
            return this.up_prot.up(message);
        }

        @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
        public void up(MessageBatch messageBatch) {
            if (this.discarding) {
                return;
            }
            this.up_prot.up(messageBatch);
        }
    }

    /* loaded from: input_file:org/jgroups/protocols/UNICAST3_Test$MyReceiver.class */
    protected static class MyReceiver implements Receiver {
        protected int count = 0;

        protected MyReceiver() {
        }

        @Override // org.jgroups.Receiver
        public void receive(Message message) {
            this.count++;
        }

        @Override // org.jgroups.Receiver
        public void receive(MessageBatch messageBatch) {
            Iterator<Message> it = messageBatch.iterator();
            while (it.hasNext()) {
                it.next();
                this.count++;
            }
        }
    }

    @BeforeMethod
    protected void setup() throws Exception {
        this.a = create("A").connect(getClass().getSimpleName());
        this.b = create("B").connect(getClass().getSimpleName());
        this.a_addr = this.a.getAddress();
        this.b_addr = this.b.getAddress();
        this.uni_a = (UNICAST3) this.a.getProtocolStack().findProtocol(UNICAST3.class);
        this.uni_b = (UNICAST3) this.b.getProtocolStack().findProtocol(UNICAST3.class);
        this.b.getProtocolStack().insertProtocol(this.drop_ack, ProtocolStack.Position.BELOW, UNICAST3.class);
    }

    @AfterMethod
    protected void destroy() {
        Util.close(this.b, this.a);
    }

    public void testDuplicateMessageDelivery() throws Exception {
        this.b.setReceiver(this.receiver);
        for (int i = 1; i < 500; i++) {
            this.a.send(this.b_addr, Integer.valueOf(i));
        }
        for (int i2 = 0; i2 < 10 && this.receiver.count < 499; i2++) {
            Util.sleep(50L);
        }
        System.out.printf("B: received %d messages from A\n", Integer.valueOf(this.receiver.count));
        if (!$assertionsDisabled && this.receiver.count != 499) {
            throw new AssertionError();
        }
        System.out.printf("-- closing the receive-window for %s:\n", this.a_addr);
        this.uni_b.closeReceiveConnection(this.a_addr);
        this.uni_a.setLevel("trace");
        this.uni_b.setLevel("trace");
        this.uni_b.setConnCloseTimeout(60000L);
        for (int i3 = 0; i3 < 10 && this.uni_b.getNumReceiveConnections() != 0; i3++) {
            Util.sleep(500L);
        }
        System.out.printf("-- removing the %s protocol\n", DropUnicastAck.class.getSimpleName());
        this.b.getProtocolStack().removeProtocol(DropUnicastAck.class);
        for (int i4 = 0; i4 < 20 && this.receiver.count < 500; i4++) {
            Util.sleep(100L);
        }
        System.out.printf("B: received %d messages from A\n", Integer.valueOf(this.receiver.count));
        if (!$assertionsDisabled && this.receiver.count != 499) {
            throw new AssertionError(String.format("received %d messages, but should only have received 499", Integer.valueOf(this.receiver.count)));
        }
    }

    protected static JChannel create(String str) throws Exception {
        return new JChannel(new SHARED_LOOPBACK(), new SHARED_LOOPBACK_PING(), new UNICAST3()).name(str);
    }

    static {
        $assertionsDisabled = !UNICAST3_Test.class.desiredAssertionStatus();
        UNICAST3_ID = ClassConfigurator.getProtocolId(UNICAST3.class);
    }
}
