package org.jgroups.protocols;

import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Stream;
import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.AverageMinMax;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {Global.FUNCTIONAL}, singleThreaded = true)
/* loaded from: input_file:org/jgroups/protocols/RED_Test.class */
public class RED_Test {
    protected JChannel ch;
    protected DelayBundler bundler;
    protected RED red;
    protected TP transport;
    protected static final Address TARGET;
    protected static final int NUM_SENDERS = 10;
    protected static final int NUM_MSGS = 2000;
    protected static final int TOT_MSGS = 20000;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/jgroups/protocols/RED_Test$DelayBundler.class */
    public static class DelayBundler extends TransferQueueBundler {
        protected final LongAdder sent = new LongAdder();
        protected final LongAdder single = new LongAdder();
        protected final LongAdder batches = new LongAdder();
        protected final AverageMinMax avg_batch_size = new AverageMinMax();

        protected DelayBundler() {
        }

        protected long getSentMessages() {
            return this.sent.sum();
        }

        protected long getSingle() {
            return this.single.sum();
        }

        protected long getBatches() {
            return this.batches.sum();
        }

        protected double getAvgBatchSize() {
            return this.avg_batch_size.getAverage();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.jgroups.protocols.BaseBundler
        public void sendSingleMessage(Message message) {
            this.sent.increment();
            this.single.increment();
            Util.sleepRandom(5L, 100L);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.jgroups.protocols.BaseBundler
        public void sendMessageList(Address address, Address address2, List<Message> list) {
            if (list != null) {
                int size = list.size();
                this.batches.increment();
                this.sent.add(size);
                this.avg_batch_size.add(size);
            }
            Util.sleepRandom(2L, 100L);
        }

        public String toString() {
            return String.format("sent=%d (single=%d, batches=%d) avg-batch=%s", Long.valueOf(getSentMessages()), Long.valueOf(getSingle()), Long.valueOf(getBatches()), this.avg_batch_size);
        }
    }

    @BeforeMethod
    protected void setup() throws Exception {
        this.ch = create("A").connect(RED_Test.class.getSimpleName());
    }

    @AfterMethod
    protected void destroy() {
        Util.close(this.ch);
    }

    public void testNoMessageDrops() throws Exception {
        for (int i = 1; i <= 10; i++) {
            this.ch.send(TARGET, Integer.valueOf(i));
        }
        System.out.printf("red: %s\nbundler: %s\n", this.red, this.bundler);
        Util.waitUntil(10000L, 500L, () -> {
            return this.bundler.getSentMessages() + this.red.getDroppedMessages() >= 10;
        }, () -> {
            return String.format("sent msgs (%d) and dropped msgs (%d) need to be >= %d", Long.valueOf(this.bundler.getSentMessages()), Long.valueOf(this.red.getDroppedMessages()), 10);
        });
    }

    public void testMessageDrops() throws TimeoutException {
        Thread[] threadArr = new Thread[10];
        for (int i = 0; i < threadArr.length; i++) {
            threadArr[i] = new Thread(() -> {
                long currentTimeMillis = System.currentTimeMillis();
                for (int i2 = 0; i2 < NUM_MSGS; i2++) {
                    try {
                        JChannel jChannel = this.ch;
                        jChannel.send(TARGET, Thread.currentThread().getId() + "-" + jChannel);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                System.out.printf("%s: sent %d messages in %d ms\n", Thread.currentThread(), Integer.valueOf(NUM_MSGS), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            });
        }
        ((Stream) Stream.of((Object[]) threadArr).parallel()).forEach((v0) -> {
            v0.start();
        });
        Stream.of((Object[]) threadArr).forEach(thread -> {
            try {
                thread.join(30000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        if (!$assertionsDisabled && !Stream.of((Object[]) threadArr).noneMatch((v0) -> {
            return v0.isAlive();
        })) {
            throw new AssertionError();
        }
        Util.waitUntil(10000L, 500L, () -> {
            return this.bundler.getSentMessages() + this.red.getDroppedMessages() >= 20000;
        }, () -> {
            return String.format("sent msgs (%d) and dropped msgs (%d) need to be >= %d", Long.valueOf(this.bundler.getSentMessages()), Long.valueOf(this.red.getDroppedMessages()), Integer.valueOf(TOT_MSGS));
        });
        System.out.printf("red: %s\nbundler: %s\n", this.red, this.bundler);
        if (!$assertionsDisabled && this.red.getDroppedMessages() <= 0) {
            throw new AssertionError();
        }
    }

    protected JChannel create(String str) throws Exception {
        JChannel name = new JChannel(Util.getTestStack(new Protocol[0])).name(str);
        this.red = new RED();
        this.transport = name.getProtocolStack().getTransport();
        ((BaseBundler) this.transport.getBundler()).setCapacity(1024);
        this.transport.getProtocolStack().removeProtocol(UNICAST3.class);
        name.getProtocolStack().insertProtocolInStack(this.red, this.transport, ProtocolStack.Position.ABOVE);
        this.bundler = new DelayBundler();
        this.bundler.init(this.transport);
        this.transport.setBundler(this.bundler);
        ((GMS) name.getProtocolStack().findProtocol(GMS.class)).setJoinTimeout(5L);
        return name;
    }

    static {
        $assertionsDisabled = !RED_Test.class.desiredAssertionStatus();
        TARGET = Util.createRandomAddress("B");
    }
}
