package org.jgroups.protocols;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Global;
import org.jgroups.Message;
import org.jgroups.PhysicalAddress;
import org.jgroups.View;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.pbcast.NAKACK;
import org.jgroups.protocols.pbcast.NakAckHeader;
import org.jgroups.stack.NakReceiverWindow;
import org.jgroups.stack.Protocol;
import org.jgroups.util.DefaultTimeScheduler;
import org.jgroups.util.MutableDigest;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {Global.FUNCTIONAL})
/* loaded from: input_file:WEB-INF/lib/jgroups-2.12.0.Alpha1.jar:org/jgroups/protocols/NAKACK_Delivery_Test.class */
public class NAKACK_Delivery_Test {
    private NAKACK nak;
    private Address c1;
    private Address c2;
    static final short NAKACK_ID;
    MyReceiver receiver = new MyReceiver();
    Executor pool;
    static final int NUM_MSGS = 50;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:WEB-INF/lib/jgroups-2.12.0.Alpha1.jar:org/jgroups/protocols/NAKACK_Delivery_Test$MyReceiver.class */
    static class MyReceiver extends Protocol {
        final ConcurrentMap<Address, Collection<Message>> msgs = new ConcurrentHashMap();

        MyReceiver() {
        }

        public ConcurrentMap<Address, Collection<Message>> getMsgs() {
            return this.msgs;
        }

        public void init(Address... addressArr) {
            for (Address address : addressArr) {
                this.msgs.putIfAbsent(address, new ConcurrentLinkedQueue());
            }
        }

        @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
        public Object up(Event event) {
            if (event.getType() != 1) {
                return null;
            }
            Message message = (Message) event.getArg();
            Address src = message.getSrc();
            Collection<Message> collection = this.msgs.get(src);
            if (collection == null) {
                collection = new ConcurrentLinkedQueue();
                Collection<Message> putIfAbsent = this.msgs.putIfAbsent(src, collection);
                if (putIfAbsent != null) {
                    collection = putIfAbsent;
                }
            }
            collection.add(message);
            return null;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/jgroups-2.12.0.Alpha1.jar:org/jgroups/protocols/NAKACK_Delivery_Test$Sender.class */
    class Sender implements Runnable {
        final Address sender;
        final long seqno;
        final int number;
        final boolean oob;

        public Sender(Address address, long j, int i, boolean z) {
            this.sender = address;
            this.seqno = j;
            this.number = i;
            this.oob = z;
        }

        @Override // java.lang.Runnable
        public void run() {
            NAKACK_Delivery_Test.this.send(this.sender, this.seqno, this.number, this.oob);
        }
    }

    @BeforeMethod
    protected void setUp() throws Exception {
        this.c1 = Util.createRandomAddress("C1");
        this.c2 = Util.createRandomAddress("C2");
        this.nak = new NAKACK();
        TP tp = new TP() { // from class: org.jgroups.protocols.NAKACK_Delivery_Test.1
            @Override // org.jgroups.protocols.TP
            public boolean supportsMulticasting() {
                return false;
            }

            @Override // org.jgroups.protocols.TP
            public void sendMulticast(byte[] bArr, int i, int i2) throws Exception {
            }

            @Override // org.jgroups.protocols.TP
            public void sendUnicast(PhysicalAddress physicalAddress, byte[] bArr, int i, int i2) throws Exception {
            }

            @Override // org.jgroups.protocols.TP
            public String getInfo() {
                return null;
            }

            @Override // org.jgroups.protocols.TP, org.jgroups.stack.Protocol
            public Object down(Event event) {
                return null;
            }

            @Override // org.jgroups.protocols.TP
            protected PhysicalAddress getPhysicalAddress() {
                return null;
            }

            @Override // org.jgroups.protocols.TP
            public TimeScheduler getTimer() {
                return new DefaultTimeScheduler(1);
            }
        };
        tp.setId((short) 100);
        this.nak.setDownProtocol(tp);
        this.receiver.init(this.c1, this.c2);
        this.nak.setUpProtocol(this.receiver);
        this.nak.start();
        Vector vector = new Vector(2);
        vector.add(this.c1);
        vector.add(this.c2);
        View view = new View(this.c1, 1L, vector);
        this.nak.down(new Event(8, this.c1));
        MutableDigest mutableDigest = new MutableDigest(2);
        mutableDigest.add(this.c1, 0L, 0L, 0L);
        mutableDigest.add(this.c2, 0L, 0L, 0L);
        this.nak.down(new Event(41, mutableDigest));
        this.nak.down(new Event(6, view));
        this.pool = new ThreadPoolExecutor(1, 100, 1000L, TimeUnit.MILLISECONDS, new SynchronousQueue());
        ((ThreadPoolExecutor) this.pool).setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    }

    @AfterMethod
    protected void tearDown() {
        this.nak.stop();
        if (this.pool instanceof ThreadPoolExecutor) {
            ((ThreadPoolExecutor) this.pool).shutdownNow();
        }
    }

    public void testSendingOfRandomMessages() {
        List<Integer> generateRandomNumbers = generateRandomNumbers(1, 50);
        generateRandomNumbers.addAll(generateRandomNumbers(1, 50));
        generateRandomNumbers.addAll(generateRandomNumbers(Math.min(15, 25), 25));
        generateRandomNumbers.addAll(generateRandomNumbers(2, 50));
        generateRandomNumbers.addAll(generateRandomNumbers(5, Math.max(5, 40)));
        System.out.println("sending " + generateRandomNumbers.size() + " msgs (including duplicates); size excluding duplicates=" + new HashSet(generateRandomNumbers).size());
        NakReceiverWindow window = this.nak.getWindow(this.c1);
        for (int i = 1; i <= 50; i++) {
            window.add(i, msg(this.c1, i, i, true));
        }
        Iterator<Integer> it = generateRandomNumbers.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            boolean z = Util.tossWeightedCoin(0.5d);
            this.pool.execute(new Sender(this.c2, intValue, intValue, z));
            this.pool.execute(new Sender(this.c1, intValue, intValue, z));
        }
        ConcurrentMap<Address, Collection<Message>> msgs = this.receiver.getMsgs();
        Collection<Message> collection = msgs.get(this.c1);
        Collection<Message> collection2 = msgs.get(this.c2);
        long currentTimeMillis = System.currentTimeMillis() + 10000;
        while (System.currentTimeMillis() < currentTimeMillis) {
            int size = collection.size();
            int size2 = collection2.size();
            System.out.println("size C1 = " + size + ", size C2=" + size2);
            if (size == 50 && size2 == 50) {
                break;
            } else {
                Util.sleep(1000L);
            }
        }
        if (!$assertionsDisabled && collection.size() != 50) {
            throw new AssertionError("[C1] expected 50 messages, but got " + collection.size());
        }
        if (!$assertionsDisabled && collection2.size() != 50) {
            throw new AssertionError("[C2] expected 50 messages, but got " + collection2.size());
        }
    }

    private static List<Integer> generateRandomNumbers(int i, int i2) {
        ArrayList arrayList = new ArrayList(20);
        for (int i3 = i; i3 <= i2; i3++) {
            arrayList.add(Integer.valueOf(i3));
        }
        Collections.shuffle(arrayList);
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void send(Address address, long j, int i, boolean z) {
        if (!$assertionsDisabled && address == null) {
            throw new AssertionError();
        }
        this.nak.up(new Event(1, msg(address, j, i, z)));
    }

    private static Message msg(Address address, long j, int i, boolean z) {
        Message message = new Message((Address) null, address, Integer.valueOf(i));
        if (z) {
            message.setFlag((byte) 1);
        }
        if (j != -1) {
            message.putHeader(NAKACK_ID, NakAckHeader.createMessageHeader(j));
        }
        return message;
    }

    static {
        $assertionsDisabled = !NAKACK_Delivery_Test.class.desiredAssertionStatus();
        NAKACK_ID = ClassConfigurator.getProtocolId(NAKACK.class);
    }
}
