package org.jgroups.protocols;

import java.util.Objects;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiConsumer;
import org.apache.sshd.common.util.io.IoUtils;
import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.annotations.Property;
import org.jgroups.logging.Log;
import org.jgroups.util.RingBuffer;
import org.jgroups.util.Runner;
import org.jgroups.util.Util;

/* loaded from: input_file:org/jgroups/protocols/RingBufferBundler.class */
/**
 * Bundler which stores outgoing messages in a {@link RingBuffer} and sends them from a dedicated
 * bundler thread.
 *
 * <p>Senders add messages via {@link #send(Message)}. The bundler thread runs {@link #readMessages()},
 * which waits for messages to become available, marshals messages with the same destination into a
 * single buffer (up to {@code max_size} bytes per batch) and hands that buffer to the transport.
 *
 * <p>How the bundler thread waits for messages is controlled by a wait strategy, a
 * {@code BiConsumer<Integer, Integer>} that is passed to
 * {@code RingBuffer.waitForMessages(num_spins, wait_strategy)}. Judging from the built-in strategies,
 * its two arguments are presumably the current spin iteration and the configured number of spins
 * (confirm against {@code RingBuffer}).
 */
public class RingBufferBundler extends BaseBundler {
    protected static final String THREAD_NAME = "RingBufferBundler";

    /** Wait strategy that does nothing between checks (busy spinning). */
    protected static final BiConsumer<Integer, Integer> SPIN = (iteration, spins) -> {
    };

    /** Wait strategy that yields the current thread between checks. */
    protected static final BiConsumer<Integer, Integer> YIELD = (iteration, spins) -> Thread.yield();

    /** Wait strategy that parks the current thread for 1 nanosecond between checks. */
    protected static final BiConsumer<Integer, Integer> PARK = (iteration, spins) -> LockSupport.parkNanos(1L);

    /** Wait strategy that does nothing while the first argument is below a tenth of the second, then parks. */
    protected static final BiConsumer<Integer, Integer> SPIN_PARK = (iteration, spins) -> {
        if (iteration >= spins / 10) {
            LockSupport.parkNanos(1L);
        }
    };

    /** Wait strategy that does nothing while the first argument is below a tenth of the second, then yields. */
    protected static final BiConsumer<Integer, Integer> SPIN_YIELD = (iteration, spins) -> {
        if (iteration >= spins / 10) {
            Thread.yield();
        }
    };

    /** Buffer holding the messages to be sent; created in {@link #init(TP)} unless passed to the constructor. */
    protected RingBuffer<Message> rb;

    /** Runs {@link #run_function}; created in {@link #init(TP)}. */
    protected Runner bundler_thread;

    @Property(description = "Number of spins before a real lock is acquired")
    protected int num_spins = 40;

    /** Strategy handed to the ring buffer when waiting for messages; defaults to {@link #SPIN_PARK}. */
    protected BiConsumer<Integer, Integer> wait_strategy = SPIN_PARK;

    /** The function executed by the bundler thread. */
    protected final Runnable run_function = this::readMessages;

    /** Creates a bundler without a ring buffer; the buffer is created in {@link #init(TP)}. */
    public RingBufferBundler() {
    }

    /**
     * Creates a bundler on top of an existing ring buffer.
     *
     * @param ringBuffer the buffer to use; its capacity becomes the capacity of this bundler
     */
    protected RingBufferBundler(RingBuffer<Message> ringBuffer) {
        this.rb = ringBuffer;
        this.capacity = ringBuffer.capacity();
    }

    /**
     * Creates a bundler with a new ring buffer of the given capacity.
     *
     * @param capacity the requested capacity, must be positive
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public RingBufferBundler(int capacity) {
        this(new RingBuffer<>(Message.class, assertPositive(capacity, "bundler capacity cannot be " + capacity)));
    }

    /** Returns the underlying ring buffer (null until set by a constructor or {@link #init(TP)}). */
    public RingBuffer<Message> buf() {
        return this.rb;
    }

    /** Returns the thread of the bundler's runner; requires {@link #init(TP)} to have been called. */
    public Thread getThread() {
        return this.bundler_thread.getThread();
    }

    /** Returns the size reported by the ring buffer. */
    @Override
    public int size() {
        return this.rb.size();
    }

    /** Returns the size reported by the ring buffer. */
    @Override
    public int getQueueSize() {
        return this.rb.size();
    }

    /** Returns the configured number of spins. */
    public int numSpins() {
        return this.num_spins;
    }

    /**
     * Sets the number of spins.
     *
     * @param numSpins the new value
     * @return this bundler, for call chaining
     */
    public RingBufferBundler numSpins(int numSpins) {
        this.num_spins = numSpins;
        return this;
    }

    /** Returns the name of the current wait strategy, see {@link #print(BiConsumer)}. */
    @Property(description = "The wait strategy: spin, yield, park, spin-park, spin-yield", writable = false)
    public String waitStrategy() {
        return print(this.wait_strategy);
    }

    /**
     * Sets the wait strategy by name or by class name.
     *
     * @param strategy one of the built-in names or the name of a class implementing
     *                 {@code BiConsumer<Integer, Integer>}; {@link #YIELD} is used when the value is
     *                 null or the class cannot be instantiated
     * @return this bundler, for call chaining
     */
    @Property
    public RingBufferBundler waitStrategy(String strategy) {
        this.wait_strategy = createWaitStrategy(strategy, YIELD);
        return this;
    }

    /**
     * Initializes the bundler: creates the ring buffer if none was supplied and creates (but does not
     * start) the runner for the bundler thread.
     *
     * @param tp the transport this bundler belongs to
     * @throws IllegalArgumentException if a buffer has to be created and the configured capacity is not positive
     */
    @Override
    public void init(TP tp) {
        super.init(tp);
        if (this.rb == null) {
            this.rb = new RingBuffer<>(Message.class, assertPositive(this.capacity, "bundler capacity cannot be " + this.capacity));
            // Adopt the capacity the buffer actually reports; index() and advance() rely on it.
            this.capacity = this.rb.capacity();
        }
        // The last argument is the runner's second callback; it clears the buffer.
        this.bundler_thread = new Runner(tp.getThreadFactory(), THREAD_NAME, this.run_function, () -> this.rb.clear());
    }

    /** Starts the bundler thread. */
    @Override
    public void start() {
        this.bundler_thread.start();
    }

    /** Stops the bundler thread. */
    @Override
    public void stop() {
        this.bundler_thread.stop();
    }

    /**
     * Adds a message to the ring buffer; it is sent later by the bundler thread.
     *
     * @param message the message to send
     * @throws Exception if the ring buffer rejects the message
     */
    @Override
    public void send(Message message) throws Exception {
        this.rb.put(message);
    }

    /**
     * Sends the messages found in {@code buf} in the range of {@code availableMsgs} slots starting at
     * {@code readIndex} (wrapping around at the end of the buffer). For every non-null message
     * encountered, all messages in the remaining range with the same destination are marshalled into
     * one batch (limited by {@code max_size}) and sent; slots of marshalled messages are nulled.
     * Failures to send a batch are logged at trace level and do not stop processing of the remaining
     * messages.
     *
     * @param buf           the array backing the ring buffer
     * @param readIndex     the index of the first slot to read
     * @param availableMsgs the number of slots to process; nothing is done if this is not positive
     */
    public void sendBundledMessages(Message[] buf, int readIndex, int availableMsgs) {
        if (availableMsgs <= 0) {
            // Without this guard the end index would be readIndex-1 and the whole buffer would be scanned.
            return;
        }
        final byte[] clusterName = this.transport.cluster_name.chars();
        final int last = index(readIndex + availableMsgs - 1); // index of the last slot to read
        int current = readIndex;
        while (true) {
            Message msg = buf[current];
            if (msg != null) {
                Address dest = msg.getDest();
                try {
                    this.output.position(0);
                    Util.writeMessageListHeader(dest, msg.getSrc(), clusterName, 1, this.output, dest == null);
                    // Remember where the message count (an int, hence 4 bytes back) sits in the header so it
                    // can be overwritten once the real number of messages is known.
                    int countPos = this.output.position() - 4;
                    int numMsgs = marshalMessagesToSameDestination(dest, buf, current, last, this.max_size);
                    if (numMsgs > 1) {
                        int endPos = this.output.position();
                        this.output.position(countPos);
                        this.output.writeInt(numMsgs);
                        this.output.position(endPos);
                    }
                    this.transport.doSend(this.output.buffer(), 0, this.output.position(), dest);
                    if (this.transport.statsEnabled()) {
                        this.transport.getMessageStats().incrNumBatchesSent(numMsgs);
                    }
                } catch (Exception e) {
                    this.log.trace("failed to send message(s) to %s: %s", dest == null ? "group" : dest, e.getMessage());
                }
            }
            if (current == last) {
                return;
            }
            current = advance(current);
        }
    }

    /**
     * Marshals all messages between {@code startIndex} and {@code endIndex} (inclusive, wrapping) whose
     * destination equals {@code dest} into {@code output}, until adding the next matching message
     * would exceed {@code maxBundleSize}. Slots of marshalled messages are set to null.
     *
     * @param dest          the destination to match (may be null)
     * @param buf           the array backing the ring buffer
     * @param startIndex    the index of the first slot to inspect
     * @param endIndex      the index of the last slot to inspect
     * @param maxBundleSize the maximum number of bytes (message size plus 2 per message) to marshal
     * @return the number of messages marshalled
     * @throws Exception if writing a message to {@code output} fails
     */
    protected int marshalMessagesToSameDestination(Address dest, Message[] buf, int startIndex, int endIndex, int maxBundleSize) throws Exception {
        int numMsgs = 0;
        int bytes = 0;
        int current = startIndex;
        while (true) {
            Message msg = buf[current];
            if (msg != null && Objects.equals(dest, msg.getDest())) {
                int size = msg.size() + 2; // 2 bytes for the type written as a short below
                if (bytes + size > maxBundleSize) {
                    break;
                }
                bytes += size;
                numMsgs++;
                buf[current] = null;
                this.output.writeShort(msg.getType());
                msg.writeToNoAddrs(msg.getSrc(), this.output, this.transport.getId());
            }
            if (current == endIndex) {
                break;
            }
            current = advance(current);
        }
        return numMsgs;
    }

    /**
     * Function executed by the bundler thread: waits for messages, sends them and then publishes the
     * new read index. Never throws; failures are logged at trace level so that they are not lost
     * silently.
     */
    protected void readMessages() {
        try {
            int availableMsgs = this.rb.waitForMessages(this.num_spins, this.wait_strategy);
            sendBundledMessages(this.rb.buf(), this.rb.readIndexLockless(), availableMsgs);
            this.rb.publishReadIndex(availableMsgs);
        } catch (Throwable t) {
            // Deliberately best-effort: never propagate to the runner, but leave a trace of the failure.
            if (this.log != null) {
                this.log.trace("%s: failed reading or sending messages: %s", THREAD_NAME, t);
            }
        }
    }

    /** Returns the index following {@code index}, wrapping to 0 at {@code capacity}. */
    protected final int advance(int index) {
        int next = index + 1;
        return next == this.capacity ? 0 : next;
    }

    /** Maps {@code idx} into the buffer by masking with {@code capacity - 1} (assumes a power-of-2 capacity). */
    protected final int index(int idx) {
        return idx & (this.capacity - 1);
    }

    /**
     * Returns the name of a wait strategy.
     *
     * @param strategy the strategy, may be null
     * @return null for null, the built-in name for a built-in strategy, otherwise the simple class name
     */
    protected static String print(BiConsumer<Integer, Integer> strategy) {
        if (strategy == null) {
            return null;
        }
        if (strategy == SPIN) {
            return "spin";
        }
        if (strategy == YIELD) {
            return "yield";
        }
        if (strategy == PARK) {
            return "park";
        }
        if (strategy == SPIN_PARK) {
            return "spin-park";
        }
        if (strategy == SPIN_YIELD) {
            return "spin-yield";
        }
        return strategy.getClass().getSimpleName();
    }

    /**
     * Resolves a wait strategy from its name. Built-in names are {@code spin}, {@code yield},
     * {@code park}, {@code spin-park} (or {@code spin_park}) and {@code spin-yield} (or
     * {@code spin_yield}); resolving one of them also assigns it to {@link #wait_strategy}. Any other
     * value is treated as the name of a class with a no-arg constructor, which is instantiated.
     *
     * @param name            the strategy name or class name, may be null
     * @param defaultStrategy returned when {@code name} is null or the class cannot be instantiated
     * @return the resolved strategy or {@code defaultStrategy}
     */
    protected BiConsumer<Integer, Integer> createWaitStrategy(String name, BiConsumer<Integer, Integer> defaultStrategy) {
        if (name == null) {
            return defaultStrategy;
        }
        switch (name) {
            case "spin":
                return this.wait_strategy = SPIN;
            case "yield":
                return this.wait_strategy = YIELD;
            case "park":
                return this.wait_strategy = PARK;
            case "spin_park":
            case "spin-park":
                return this.wait_strategy = SPIN_PARK;
            case "spin_yield":
            case "spin-yield":
                return this.wait_strategy = SPIN_YIELD;
            default:
                try {
                    // Unchecked: the configured class is trusted to implement BiConsumer<Integer, Integer>.
                    @SuppressWarnings("unchecked")
                    BiConsumer<Integer, Integer> custom =
                        (BiConsumer<Integer, Integer>) Util.loadClass(name, getClass()).getDeclaredConstructor().newInstance();
                    return custom;
                } catch (Throwable t) {
                    this.log.error("failed creating wait_strategy " + name, t);
                    return defaultStrategy;
                }
        }
    }

    /**
     * Validates that a value is positive.
     *
     * @param value   the value to check
     * @param message the exception message used when the check fails
     * @return {@code value}
     * @throws IllegalArgumentException if {@code value} is zero or negative
     */
    protected static int assertPositive(int value, String message) {
        if (value <= 0) {
            throw new IllegalArgumentException(message);
        }
        return value;
    }
}
