package org.jgroups.protocols;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.NullAddress;
import org.jgroups.View;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.logging.Log;
import org.jgroups.util.ByteArrayDataOutputStream;
import org.jgroups.util.Util;

/* loaded from: input_file:BOOT-INF/lib/jgroups-5.2.10.Final.jar:org/jgroups/protocols/PerDestinationBundler.class */
public class PerDestinationBundler implements Bundler {

    // Total number of messages flushed by SendBuffer.sendBatch(); incremented on both the
    // single-message path and the multi-message path.
    @ManagedAttribute(description = "Number of messages sent in BatchMessages", type = AttributeType.SCALAR)
    protected long num_msgs_sent;

    // Incremented only on the multi-message path of SendBuffer.sendBatch().
    @ManagedAttribute(description = "Number of BatchMessages sent", type = AttributeType.SCALAR)
    protected long num_batches_sent;

    // Incremented in SendBuffer.addMessage() when the queued bytes plus the new message
    // reach max_size (i.e. this is the byte-limit counter).
    @ManagedAttribute(description = "Number of BatchMessages sent because the queue was full", type = AttributeType.SCALAR)
    protected long num_batches_sent_due_to_max_size;

    // Incremented in SendBuffer.addMessage() when the message array of a buffer is full.
    @ManagedAttribute(description = "Number of BatchMessages sent because the max number of messages has been reached (max_queue_size)", type = AttributeType.SCALAR)
    protected long num_batches_sent_due_to_full_queue;

    // Incremented in SendBuffer.addMessage() when the buffer's thread counter drops to 0.
    @ManagedAttribute(description = "Number of MessageBatches sent because the last sender thread returned", type = AttributeType.SCALAR)
    protected long num_batches_sent_due_to_last_thread;
    // Transport used to send the serialized data; assigned in init().
    protected TP transport;
    // Logger obtained from the transport in init().
    protected Log log;
    // Address of the local node; assigned in start() from transport.getAddress().
    protected Address local_addr;
    // Map key standing in for a null destination (see send()); SendBuffer.sendBatch()
    // translates a NullAddress key back to a null destination before sending.
    protected static final Address NULL = new NullAddress();

    // Byte threshold checked in SendBuffer.addMessage(); also used (plus 3) to size the
    // output stream of each newly created SendBuffer.
    @Property(name = "max_size", type = AttributeType.BYTES, description = "Maximum number of bytes for messages to be queued until they are sent")
    protected int max_size = 64000;

    // Length of the message array allocated by each newly created SendBuffer.
    @Property(description = "The maximum number of queued messages per destination. When the queue is full, a new batch will be sent")
    public int max_queue_size = 128;
    // One SendBuffer per destination address; entries are added in send() and viewChange()
    // and removed in viewChange().
    protected final Map<Address, SendBuffer> dests = Util.createConcurrentMap();

    /* loaded from: input_file:BOOT-INF/lib/jgroups-5.2.10.Final.jar:org/jgroups/protocols/PerDestinationBundler$SendBuffer.class */
    protected class SendBuffer {
        private final Message[] msgs;
        private long total_bytes;
        private final ByteArrayDataOutputStream output;
        private final AtomicInteger thread_count = new AtomicInteger();
        private final Lock lock = new ReentrantLock(false);
        private int index = 0;

        protected SendBuffer() {
            this.msgs = new Message[PerDestinationBundler.this.max_queue_size];
            this.output = new ByteArrayDataOutputStream(PerDestinationBundler.this.max_size + 3);
        }

        protected void addMessage(Address address, Message message) {
            int length = message.getLength();
            this.thread_count.incrementAndGet();
            this.lock.lock();
            try {
                if (this.total_bytes + length >= PerDestinationBundler.this.max_size) {
                    PerDestinationBundler.this.num_batches_sent_due_to_max_size++;
                    sendBatch(address);
                }
                Message[] messageArr = this.msgs;
                int i = this.index;
                this.index = i + 1;
                messageArr[i] = message;
                this.total_bytes += length;
                if (this.index == this.msgs.length) {
                    PerDestinationBundler.this.num_batches_sent_due_to_full_queue++;
                    sendBatch(address);
                }
                if (this.thread_count.decrementAndGet() == 0) {
                    PerDestinationBundler.this.num_batches_sent_due_to_last_thread++;
                    sendBatch(address);
                }
            } finally {
                this.lock.unlock();
            }
        }

        protected void sendBatch(Address address) {
            if (this.index == 0) {
                return;
            }
            Address address2 = address instanceof NullAddress ? null : address;
            if (this.index == 1) {
                sendSingleMessage(address2, this.msgs[0]);
                this.msgs[0] = null;
                this.index = 0;
                this.total_bytes = 0L;
                PerDestinationBundler.this.num_msgs_sent++;
                return;
            }
            sendMessageList(address2, PerDestinationBundler.this.local_addr, this.msgs, this.index);
            PerDestinationBundler.this.num_msgs_sent += this.index;
            PerDestinationBundler.this.num_batches_sent++;
            this.index = 0;
            this.total_bytes = 0L;
        }

        protected void sendSingleMessage(Address address, Message message) {
            try {
                this.output.position(0);
                Util.writeMessage(message, this.output, address == null);
                PerDestinationBundler.this.transport.doSend(this.output.buffer(), 0, this.output.position(), address);
                if (PerDestinationBundler.this.transport.statsEnabled()) {
                    PerDestinationBundler.this.transport.getMessageStats().incrNumSingleMsgsSent(1);
                }
            } catch (Throwable th) {
                PerDestinationBundler.this.log.error("%s: failed sending message: %s", PerDestinationBundler.this.local_addr, th);
            }
        }

        protected void sendMessageList(Address address, Address address2, Message[] messageArr, int i) {
            try {
                this.output.position(0);
                Util.writeMessageList(address, address2, PerDestinationBundler.this.transport.cluster_name.chars(), messageArr, 0, i, this.output, address == null, PerDestinationBundler.this.transport.getId());
                PerDestinationBundler.this.transport.doSend(this.output.buffer(), 0, this.output.position(), address);
                if (PerDestinationBundler.this.transport.statsEnabled()) {
                    PerDestinationBundler.this.transport.getMessageStats().incrNumBatchesSent(1);
                }
            } catch (Throwable th) {
                PerDestinationBundler.this.log.trace(Util.getMessage("FailureSendingMsgBundle"), PerDestinationBundler.this.transport.getAddress(), th);
            }
        }

        protected int size() {
            this.lock.lock();
            try {
                return this.index;
            } finally {
                this.lock.unlock();
            }
        }
    }

    /**
     * Returns the number of messages currently queued, summed over the buffers of all
     * destinations.
     *
     * @return the total number of queued messages
     */
    @Override
    public int size() {
        int total = 0;
        for (SendBuffer buffer : dests.values()) {
            total += buffer.size();
        }
        return total;
    }

    /**
     * Returns the constant -1; this bundler holds one buffer per destination rather than a
     * single queue (presumably -1 signals "not applicable" to callers).
     *
     * @return always -1
     */
    @Override
    public int getQueueSize() {
        final int notApplicable = -1;
        return notApplicable;
    }

    /**
     * Returns the current value of {@code max_size}.
     *
     * @return the byte threshold used by the send buffers
     */
    @Override
    public int getMaxSize() {
        return max_size;
    }

    /**
     * Sets {@code max_size}.
     *
     * @param i the new byte threshold
     * @return this bundler, to allow call chaining
     */
    @Override
    public Bundler setMaxSize(int i) {
        max_size = i;
        return this;
    }

    /**
     * Returns the current value of {@code max_queue_size}.
     *
     * @return the number of message slots given to newly created send buffers
     */
    public int getMaxQueueSize() {
        return max_queue_size;
    }

    /**
     * Sets {@code max_queue_size}. Send buffers read this value when they are constructed, so
     * buffers that already exist keep the array size they were created with.
     *
     * @param i the new number of message slots per send buffer
     * @return this bundler, to allow call chaining
     */
    public Bundler setMaxQueueSize(int i) {
        max_queue_size = i;
        return this;
    }

    /**
     * Returns the ratio of {@code num_msgs_sent} to {@code num_batches_sent}, or 0 if either
     * counter is 0.
     * <p>
     * NOTE(review): {@code num_msgs_sent} also counts messages that were sent individually,
     * whereas {@code num_batches_sent} counts multi-message sends only, so the ratio can exceed
     * the real average batch size - confirm whether that is intended.
     *
     * @return the average as a floating-point value
     */
    @ManagedAttribute(description = "Average number of messages in an BatchMessage")
    public double avgBatchSize() {
        if (num_batches_sent == 0 || num_msgs_sent == 0) {
            return 0.0d;
        }
        // Widen to double before dividing: long / long would truncate the fractional part.
        return (double) num_msgs_sent / num_batches_sent;
    }

    /**
     * Resets all five statistics counters of this bundler to 0.
     */
    @Override
    public void resetStats() {
        num_msgs_sent = num_batches_sent = 0L;
        num_batches_sent_due_to_max_size
            = num_batches_sent_due_to_full_queue
            = num_batches_sent_due_to_last_thread
            = 0L;
    }

    /**
     * Stores the transport and takes over its logger.
     *
     * @param tp the transport to send through; must not be null
     * @throws NullPointerException if {@code tp} is null
     */
    @Override
    public void init(TP tp) {
        Objects.requireNonNull(tp);
        transport = tp;
        log = tp.getLog();
    }

    /**
     * Reads the local address from the transport and stores it in {@code local_addr}.
     *
     * @throws NullPointerException if the transport returns a null address
     */
    @Override
    public void start() {
        final Address self = transport.getAddress();
        local_addr = Objects.requireNonNull(self);
    }

    /**
     * Does nothing.
     */
    @Override
    public void stop() {
        // intentionally empty
    }

    /**
     * Queues a message in the send buffer of its destination, creating the buffer on first use.
     * A message without a source gets {@code local_addr} as its source; a message without a
     * destination is filed under the {@code NULL} key.
     *
     * @param message the message to send
     * @throws Exception declared by the overridden method
     */
    @Override
    public void send(Message message) throws Exception {
        final boolean sourceMissing = message.getSrc() == null;
        if (sourceMissing) {
            message.setSrc(local_addr);
        }
        final Address key;
        if (message.dest() == null) {
            key = NULL;
        } else {
            key = message.dest();
        }
        final SendBuffer buffer = dests.computeIfAbsent(key, unused -> new SendBuffer());
        buffer.addMessage(key, message);
    }

    /**
     * Aligns the destination map with a new view: creates a send buffer for every member that
     * has none yet and drops the buffers of addresses that are no longer members. The entry
     * keyed by a {@link NullAddress} (null destination) is always kept.
     *
     * @param view the new view; nothing happens if its member list is null
     */
    @Override
    public void viewChange(View view) {
        List<Address> members = view.getMembers();
        if (members == null) {
            return;
        }
        // computeIfAbsent only constructs a buffer when the member has no entry yet.
        for (Address member : members) {
            dests.computeIfAbsent(member, unused -> new SendBuffer());
        }
        // Remove destinations that left the view; removeIf removes through the key-set
        // iterator instead of mutating the map from within a stream over its own keys.
        dests.keySet().removeIf(dest -> !(dest instanceof NullAddress) && !members.contains(dest));
    }
}
