package org.jgroups.protocols;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.jgroups.Address;
import org.jgroups.BytesMessage;
import org.jgroups.EmptyMessage;
import org.jgroups.Event;
import org.jgroups.Global;
import org.jgroups.Message;
import org.jgroups.PhysicalAddress;
import org.jgroups.View;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.protocols.pbcast.NakAckHeader2;
import org.jgroups.stack.Protocol;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.MutableDigest;
import org.jgroups.util.Table;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.TimeScheduler3;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {Global.FUNCTIONAL})
/* loaded from: input_file:org/jgroups/protocols/NAKACK_Delivery_Test.class */
public class NAKACK_Delivery_Test {
    protected NAKACK2 nak;
    protected Address a;
    protected Address b;
    protected MyReceiver receiver = new MyReceiver();
    protected Executor pool;
    protected static final short NAKACK_ID;
    protected static final int NUM_MSGS = 50;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jgroups/protocols/NAKACK_Delivery_Test$MyReceiver.class */
    public static class MyReceiver extends Protocol {
        final ConcurrentMap<Address, Collection<Message>> msgs = new ConcurrentHashMap();

        MyReceiver() {
        }

        public ConcurrentMap<Address, Collection<Message>> getMsgs() {
            return this.msgs;
        }

        public void init(Address... addressArr) {
            for (Address address : addressArr) {
                this.msgs.putIfAbsent(address, new ConcurrentLinkedQueue());
            }
        }

        @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
        public Object up(Message message) {
            Address src = message.getSrc();
            Collection<Message> collection = this.msgs.get(src);
            if (collection == null) {
                collection = new ConcurrentLinkedQueue();
                Collection<Message> putIfAbsent = this.msgs.putIfAbsent(src, collection);
                if (putIfAbsent != null) {
                    collection = putIfAbsent;
                }
            }
            collection.add(message);
            return null;
        }

        @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
        public void up(MessageBatch messageBatch) {
            Address sender = messageBatch.sender();
            Iterator<Message> it = messageBatch.iterator();
            while (it.hasNext()) {
                Message next = it.next();
                Collection<Message> collection = this.msgs.get(sender);
                if (collection == null) {
                    collection = new ConcurrentLinkedQueue();
                    Collection<Message> putIfAbsent = this.msgs.putIfAbsent(sender, collection);
                    if (putIfAbsent != null) {
                        collection = putIfAbsent;
                    }
                }
                collection.add(next);
            }
        }
    }

    /* loaded from: input_file:org/jgroups/protocols/NAKACK_Delivery_Test$Sender.class */
    class Sender implements Runnable {
        final Address sender;
        final long seqno;
        final int number;
        final boolean oob;

        public Sender(Address address, long j, int i, boolean z) {
            this.sender = address;
            this.seqno = j;
            this.number = i;
            this.oob = z;
        }

        @Override // java.lang.Runnable
        public void run() {
            NAKACK_Delivery_Test.this.send(this.sender, this.seqno, this.number, this.oob);
        }
    }

    @BeforeMethod
    protected void setUp() throws Exception {
        this.a = Util.createRandomAddress("A");
        this.b = Util.createRandomAddress("B");
        this.nak = new NAKACK2();
        TP tp = new TP() { // from class: org.jgroups.protocols.NAKACK_Delivery_Test.1
            @Override // org.jgroups.protocols.TP
            public boolean supportsMulticasting() {
                return false;
            }

            @Override // org.jgroups.protocols.TP
            public void sendUnicast(PhysicalAddress physicalAddress, byte[] bArr, int i, int i2) throws Exception {
            }

            @Override // org.jgroups.protocols.TP
            public String getInfo() {
                return null;
            }

            @Override // org.jgroups.protocols.TP, org.jgroups.stack.Protocol
            public Object down(Event event) {
                return null;
            }

            @Override // org.jgroups.protocols.TP, org.jgroups.stack.Protocol
            public Object down(Message message) {
                return null;
            }

            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.jgroups.protocols.TP
            public PhysicalAddress getPhysicalAddress() {
                return null;
            }

            @Override // org.jgroups.protocols.TP
            public TimeScheduler getTimer() {
                return new TimeScheduler3();
            }
        };
        tp.setId((short) 100);
        this.nak.setDownProtocol(tp);
        this.receiver.init(this.a, this.b);
        this.nak.setUpProtocol(this.receiver);
        this.nak.start();
        ArrayList arrayList = new ArrayList(2);
        arrayList.add(this.a);
        arrayList.add(this.b);
        View view = new View(this.a, 1L, arrayList);
        Protocol protocol = this.nak;
        while (true) {
            Protocol protocol2 = protocol;
            if (protocol2 == null) {
                MutableDigest mutableDigest = new MutableDigest(View.create(this.a, 1L, this.a, this.b).getMembersRaw());
                mutableDigest.set(this.a, 0L, 0L);
                mutableDigest.set(this.b, 0L, 0L);
                this.nak.down(new Event(41, mutableDigest));
                this.nak.down(new Event(6, view));
                this.nak.down(new Event(16));
                this.pool = new ThreadPoolExecutor(1, 100, 1000L, TimeUnit.MILLISECONDS, new SynchronousQueue());
                ((ThreadPoolExecutor) this.pool).setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
                return;
            }
            protocol2.setAddress(this.a);
            protocol = protocol2.getDownProtocol();
        }
    }

    @AfterMethod
    protected void tearDown() {
        this.nak.stop();
        if (this.pool instanceof ThreadPoolExecutor) {
            ((ThreadPoolExecutor) this.pool).shutdownNow();
        }
    }

    public void testSendingOfRandomMessages() {
        List<Integer> generateRandomNumbers = generateRandomNumbers(1, NUM_MSGS);
        generateRandomNumbers.addAll(generateRandomNumbers(1, NUM_MSGS));
        generateRandomNumbers.addAll(generateRandomNumbers(Math.min(15, 25), 25));
        generateRandomNumbers.addAll(generateRandomNumbers(2, NUM_MSGS));
        generateRandomNumbers.addAll(generateRandomNumbers(5, Math.max(5, 40)));
        System.out.println("sending " + generateRandomNumbers.size() + " msgs (including duplicates); size excluding duplicates=" + new HashSet(generateRandomNumbers).size());
        Table<Message> window = this.nak.getWindow(this.a);
        for (int i = 1; i <= NUM_MSGS; i++) {
            window.add(i, (long) msg(this.a, i, i, true));
        }
        Iterator<Integer> it = generateRandomNumbers.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            boolean z = Util.tossWeightedCoin(0.5d);
            this.pool.execute(new Sender(this.b, intValue, intValue, z));
            this.pool.execute(new Sender(this.a, intValue, intValue, z));
        }
        ConcurrentMap<Address, Collection<Message>> msgs = this.receiver.getMsgs();
        Collection<Message> collection = msgs.get(this.a);
        Collection<Message> collection2 = msgs.get(this.b);
        long currentTimeMillis = System.currentTimeMillis() + 10000;
        while (System.currentTimeMillis() < currentTimeMillis) {
            int size = collection.size();
            int size2 = collection2.size();
            System.out.println("size A = " + size + ", size B=" + size2);
            if (size == NUM_MSGS && size2 == NUM_MSGS) {
                break;
            } else {
                Util.sleep(1000L);
            }
        }
        System.out.println("A received " + collection.size() + " messages (expected=50)");
        System.out.println("B received " + collection2.size() + " messages (expected=50)");
        if (!$assertionsDisabled && collection.size() != NUM_MSGS) {
            throw new AssertionError("[A] expected 50 messages, but got " + collection.size());
        }
        if (!$assertionsDisabled && collection2.size() != NUM_MSGS) {
            throw new AssertionError("[B] expected 50 messages, but got " + collection2.size());
        }
    }

    public void testBatchDeliveredWithTrace() {
        doBatchDeliverTest(true);
    }

    public void testBatchDeliveredWithoutTrace() {
        doBatchDeliverTest(false);
    }

    private void doBatchDeliverTest(boolean z) {
        try {
            this.nak.isTrace(z);
            this.receiver.getMsgs().get(this.b).clear();
            if (!$assertionsDisabled && !this.receiver.getMsgs().get(this.b).isEmpty()) {
                throw new AssertionError();
            }
            MessageBatch dest = new MessageBatch(2).setMode(MessageBatch.Mode.OOB).setSender(this.b).setDest(null);
            dest.add(new EmptyMessage().setFlag(Message.Flag.NO_RELIABILITY, Message.Flag.OOB).src(this.b));
            dest.add(msg(this.b, 1L, 1, true));
            this.nak.up(dest);
            System.out.println(this.receiver.getMsgs().get(this.b));
            if (!$assertionsDisabled && this.receiver.getMsgs().get(this.b).size() != 2) {
                throw new AssertionError();
            }
            this.receiver.getMsgs().get(this.b).clear();
            if (!$assertionsDisabled && !this.receiver.getMsgs().get(this.b).isEmpty()) {
                throw new AssertionError();
            }
            MessageBatch dest2 = new MessageBatch(2).setMode(MessageBatch.Mode.OOB).setSender(this.b).setDest(null);
            dest2.add(msg(this.b, 2L, 1, true));
            dest2.add(new EmptyMessage().setFlag(Message.Flag.NO_RELIABILITY, Message.Flag.OOB).src(this.b));
            this.nak.up(dest2);
            System.out.println(this.receiver.getMsgs().get(this.b));
            if (!$assertionsDisabled && this.receiver.getMsgs().get(this.b).size() != 2) {
                throw new AssertionError();
            }
        } finally {
            this.nak.isTrace(false);
        }
    }

    private static List<Integer> generateRandomNumbers(int i, int i2) {
        ArrayList arrayList = new ArrayList(20);
        for (int i3 = i; i3 <= i2; i3++) {
            arrayList.add(Integer.valueOf(i3));
        }
        Collections.shuffle(arrayList);
        return arrayList;
    }

    private void send(Address address, long j, int i, boolean z) {
        if (!$assertionsDisabled && address == null) {
            throw new AssertionError();
        }
        this.nak.up(msg(address, j, i, z));
    }

    private static Message msg(Address address, long j, int i, boolean z) {
        Message src = new BytesMessage((Address) null, Integer.valueOf(i)).setSrc(address);
        if (z) {
            src.setFlag(Message.Flag.OOB);
        }
        if (j != -1) {
            src.putHeader(NAKACK_ID, NakAckHeader2.createMessageHeader(j));
        }
        return src;
    }

    static {
        $assertionsDisabled = !NAKACK_Delivery_Test.class.desiredAssertionStatus();
        NAKACK_ID = ClassConfigurator.getProtocolId(NAKACK2.class);
    }
}
