package org.jgroups.protocols;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.jgroups.Address;
import org.jgroups.BatchMessage;
import org.jgroups.Event;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.NullAddress;
import org.jgroups.View;
import org.jgroups.annotations.Experimental;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.stack.Protocol;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;

@MBean(description = "Protocol just below flow control that wraps messages to improve throughput with small messages.")
@Experimental
/* loaded from: input_file:BOOT-INF/lib/jgroups-5.2.10.Final.jar:org/jgroups/protocols/BATCH.class */
public class BATCH extends Protocol {

    // ---- statistics (cleared by resetStats()) ----

    // Incremented by Buffer.sendBatch() for every message it forwards, including a lone
    // message that is passed down unwrapped rather than inside a BatchMessage.
    @ManagedAttribute(description = "Number of messages sent in BatchMessages", type = AttributeType.SCALAR)
    protected long num_msgs_sent;

    // Incremented by Buffer.sendBatch() only when an actual BatchMessage is built and sent.
    @ManagedAttribute(description = "Number of BatchMessages sent", type = AttributeType.SCALAR)
    protected long num_ebs_sent;

    // Updated by Buffer.addMessage() when the queued bytes would exceed the bundler's max size.
    @ManagedAttribute(description = "Number of BatchMessages sent because the queue was full", type = AttributeType.SCALAR)
    protected long num_ebs_sent_due_to_full_queue;

    // Updated by Buffer.addMessage() when the buffer's message array becomes full.
    @ManagedAttribute(description = "Number of BatchMessages sent because the max number of messages has been reached (max_batch_size)", type = AttributeType.SCALAR)
    protected long num_ebs_sent_due_to_max_number_of_msgs;

    // Updated by Buffer.sendBatch(true), i.e. when the periodic flush sends a BatchMessage.
    @ManagedAttribute(description = "Number of BatchMessages sent because the timeout kicked in", type = AttributeType.SCALAR)
    protected long num_ebs_sent_due_to_timeout;

    // ---- runtime state ----

    /** Scheduler fetched from the transport in start(); used to schedule the FlushTask. */
    protected TimeScheduler timer;
    /** Set to true by start() and to false by stop(); not read anywhere else in this class. */
    protected volatile boolean running;
    /** Handle of the scheduled FlushTask; null while no task is scheduled (see stopFlushTask()). */
    protected Future<?> flush_task;
    /** Shared (stateless) header that Buffer.sendBatch() attaches to every BatchMessage; up(Message) looks for it. */
    protected static BatchHeader HEADER = new BatchHeader();

    // ---- configuration ----

    // Delay, in milliseconds, passed to the timer when scheduling the FlushTask.
    @Property(description = "Max interval (millis) at which the queued messages are sent")
    protected long flush_interval = 100;

    // Also the length of each Buffer's message array; read whenever a Buffer (re)allocates it.
    @Property(description = "The maximum number of messages per batch")
    public int max_batch_size = 100;
    /** Map key standing in for a null destination (see down(Message)); its buffer is created in init(). */
    protected final NullAddress nullAddress = new NullAddress();
    /** One Buffer per destination address; entries are added in down(Message)/handleViewChange() and removed in handleViewChange(). */
    protected Map<Address, Buffer> msgMap = Util.createConcurrentMap();

    /* loaded from: input_file:BOOT-INF/lib/jgroups-5.2.10.Final.jar:org/jgroups/protocols/BATCH$BatchHeader.class */
    /**
     * Marker header put on every BatchMessage built by this protocol. It carries no fields:
     * its serialized size is zero and reading/writing it touches no bytes.
     */
    public static class BatchHeader extends Header {
        /** Magic id reported by {@link #getMagicId()}. */
        private static final short MAGIC_ID = 95;

        /** @return the magic id of this header type (95) */
        @Override // org.jgroups.Header
        public short getMagicId() {
            return MAGIC_ID;
        }

        /** @return a supplier producing fresh BatchHeader instances */
        @Override // org.jgroups.Constructable
        public Supplier<? extends Header> create() {
            return () -> new BatchHeader();
        }

        /** @return 0, because the header has no payload */
        @Override // org.jgroups.util.SizeStreamable
        public int serializedSize() {
            return 0;
        }

        /** Writes nothing: the header has no state. */
        @Override // org.jgroups.util.Streamable
        public void writeTo(DataOutput dataOutput) throws IOException {
            // intentionally empty
        }

        /** Reads nothing: the header has no state. */
        @Override // org.jgroups.util.Streamable
        public void readFrom(DataInput dataInput) throws IOException {
            // intentionally empty
        }

        /** @return the fixed string "BatchHeader" */
        @Override // org.jgroups.Header
        public String toString() {
            return "BatchHeader";
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:BOOT-INF/lib/jgroups-5.2.10.Final.jar:org/jgroups/protocols/BATCH$Buffer.class */
    /**
     * Queue of messages that share one destination and are waiting to be wrapped into a
     * BatchMessage. All three operations are synchronized on the buffer instance.
     */
    public class Buffer {
        /** Destination shared by all queued messages; a NullAddress stands in for a null destination. */
        private final Address dest;
        /** Queued messages; slots {@code [0, index)} are in use. */
        private Message[] msgs;
        /** Number of queued messages, i.e. the next free slot in {@link #msgs}. */
        private int index = 0;
        /** Once true, {@link #addMessage(Message)} rejects every message. */
        private boolean closed;
        /** Sum of {@code getLength()} over the queued messages. */
        private long total_bytes;

        /**
         * Creates an empty buffer sized to the current {@code max_batch_size}.
         *
         * @param address destination of the messages that will be queued here
         */
        protected Buffer(Address address) {
            this.dest = address;
            this.msgs = new Message[BATCH.this.max_batch_size];
        }

        /**
         * Queues a message, sending the pending messages first if adding it would exceed the
         * bundler's max size, and sending again afterwards if the array has become full.
         * <p>
         * The "due to" counters are only incremented when the triggered send really produces a
         * BatchMessage (more than one queued message). An empty buffer sends nothing and a lone
         * message is forwarded unwrapped, so neither counts as a BatchMessage.
         *
         * @param message the message to queue
         * @return false if the buffer is closed (the message was not queued), true otherwise
         */
        protected synchronized boolean addMessage(Message message) {
            if (this.closed) {
                return false;
            }
            int length = message.getLength();
            if (this.total_bytes + length > BATCH.this.getTransport().getBundler().getMaxSize()) {
                if (holdsBatch()) {
                    BATCH.this.num_ebs_sent_due_to_full_queue++;
                }
                sendBatch(false);
            }
            this.msgs[this.index++] = message;
            this.total_bytes += length;
            if (this.index == this.msgs.length) {
                if (holdsBatch()) {
                    BATCH.this.num_ebs_sent_due_to_max_number_of_msgs++;
                }
                sendBatch(false);
            }
            return true;
        }

        /** Returns true if sendBatch() would currently build a BatchMessage (two or more queued messages). */
        private boolean holdsBatch() {
            return this.index > 1;
        }

        /**
         * Sends whatever is queued and resets the buffer. Nothing is sent when the buffer is
         * empty; a single queued message is passed down as is; two or more are wrapped into one
         * BatchMessage carrying the BatchHeader. The protocol below is invoked while this
         * buffer's monitor is held.
         *
         * @param z true when called from the periodic flush, in which case a sent BatchMessage
         *          is also counted in {@code num_ebs_sent_due_to_timeout}
         */
        public synchronized void sendBatch(boolean z) {
            if (this.index == 0) {
                return;
            }
            if (this.index == 1) {
                // No point in wrapping a single message: forward it directly.
                BATCH.this.down_prot.down(this.msgs[0]);
                this.msgs[0] = null;
                this.index = 0;
                this.total_bytes = 0L;
                BATCH.this.num_msgs_sent++;
                return;
            }
            // The NullAddress placeholder is translated back into a null destination.
            Message src = new BatchMessage(this.dest instanceof NullAddress ? null : this.dest, BATCH.this.local_addr, this.msgs, this.index).putHeader(BATCH.this.id, BATCH.HEADER).setSrc(BATCH.this.local_addr);
            // The BatchMessage was handed the old array, so continue with a fresh one.
            this.msgs = new Message[BATCH.this.max_batch_size];
            BATCH.this.num_msgs_sent += this.index;
            BATCH.this.num_ebs_sent++;
            if (z) {
                BATCH.this.num_ebs_sent_due_to_timeout++;
            }
            this.index = 0;
            this.total_bytes = 0L;
            BATCH.this.down_prot.down(src);
        }

        /** Marks the buffer closed, so later addMessage() calls return false, and sends what is still queued. */
        protected synchronized void close() {
            this.closed = true;
            sendBatch(false);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:BOOT-INF/lib/jgroups-5.2.10.Final.jar:org/jgroups/protocols/BATCH$FlushTask.class */
    /** Timer task that flushes all buffers of the enclosing protocol each time it runs. */
    public class FlushTask implements Runnable {
        protected FlushTask() {
        }

        /** Delegates to the enclosing protocol's flush(). */
        @Override // java.lang.Runnable
        public void run() {
            flush();
        }

        /** @return the protocol's simple class name followed by the configured flush interval */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder(BATCH.class.getSimpleName());
            text.append(": FlushTask (interval=");
            text.append(flush_interval);
            text.append(" ms)");
            return text.toString();
        }
    }

    /**
     * Computes the average number of messages per BatchMessage sent so far.
     * <p>
     * The division is carried out in floating point. Dividing the two {@code long} counters
     * directly would truncate the quotient before it is widened to {@code double}.
     *
     * @return {@code num_msgs_sent / num_ebs_sent} as a fractional value, or 0.0 while either
     *         counter is still zero
     */
    @ManagedAttribute(description = "Average number of messages in an BatchMessage")
    public double avgBatchSize() {
        // Read each counter once so the zero check and the division see the same values.
        long batches = this.num_ebs_sent;
        long messages = this.num_msgs_sent;
        if (batches == 0 || messages == 0) {
            return 0.0d;
        }
        return (double) messages / batches;
    }

    /**
     * Registers the buffer used for messages without a destination (keyed by the NullAddress
     * placeholder), unless one is already present.
     */
    @Override // org.jgroups.stack.Protocol, org.jgroups.Lifecycle
    public void init() throws Exception {
        Buffer noDestBuffer = new Buffer(nullAddress);
        msgMap.putIfAbsent(nullAddress, noDestBuffer);
    }

    /** Resets the superclass statistics and zeroes all five counters kept by this protocol. */
    @Override // org.jgroups.stack.Protocol
    public void resetStats() {
        super.resetStats();
        // overall counters
        num_msgs_sent = num_ebs_sent = 0L;
        // per-cause counters
        num_ebs_sent_due_to_full_queue = 0L;
        num_ebs_sent_due_to_timeout = 0L;
        num_ebs_sent_due_to_max_number_of_msgs = 0L;
    }

    /**
     * Handles an event travelling down the stack. A view change first updates the
     * per-destination buffers; every event, view changes included, is then passed to the
     * protocol below.
     *
     * @param event the event to handle
     * @return whatever the protocol below returns for the event
     */
    @Override // org.jgroups.stack.Protocol
    public Object down(Event event) {
        // Named constant instead of the bare literal 6 left behind by the decompiler.
        if (event.getType() == Event.VIEW_CHANGE) {
            handleViewChange(((View) event.getArg()).getMembers());
        }
        return this.down_prot.down(event);
    }

    /**
     * Handles an event travelling up the stack. A view change first updates the
     * per-destination buffers; every event, view changes included, is then passed to the
     * protocol above.
     *
     * @param event the event to handle
     * @return whatever the protocol above returns for the event
     */
    @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
    public Object up(Event event) {
        // Named constant instead of the bare literal 6 left behind by the decompiler.
        if (event.getType() == Event.VIEW_CHANGE) {
            handleViewChange(((View) event.getArg()).getMembers());
        }
        return this.up_prot.up(event);
    }

    /**
     * Aligns the per-destination buffers with a new membership: every member gets a buffer if
     * it has none, and the buffers of addresses no longer in the view are removed and closed
     * (closing sends whatever they still hold). The buffer keyed by the NullAddress
     * placeholder is never removed.
     * <p>
     * This method is reachable from both up(Event) and down(Event), so two invocations may
     * overlap. A key selected for removal can therefore already be gone when remove() runs,
     * and the result is checked for null before close() is called.
     *
     * @param list the members of the new view; a null list is ignored
     */
    protected void handleViewChange(List<Address> list) {
        if (list == null) {
            return;
        }
        // Atomically create the missing buffers (replaces the containsKey + putIfAbsent pair).
        for (Address member : list) {
            this.msgMap.computeIfAbsent(member, key -> new Buffer(key));
        }
        this.msgMap.keySet().stream().filter(dest -> {
            return !(dest instanceof NullAddress) && !list.contains(dest);
        }).forEach(dest -> {
            Buffer removed = this.msgMap.remove(dest);
            if (removed != null) {
                removed.close();
            }
        });
    }

    /**
     * Queues a regular message sent by the local address in the buffer of its destination.
     * OOB messages and messages whose source differs from the local address bypass batching
     * and go straight to the protocol below. So does a message that its buffer refuses
     * because the buffer has been closed.
     *
     * @param message the message travelling down the stack
     * @return the message itself when it was queued, otherwise the result of passing it down
     */
    @Override // org.jgroups.stack.Protocol
    public Object down(Message message) {
        if (message.isFlagSet(Message.Flag.OOB)) {
            return down_prot.down(message);
        }
        // A message without a source is treated as sent by us.
        if (message.getSrc() == null) {
            message.setSrc(local_addr);
        }
        boolean sentByUs = Objects.equals(message.getSrc(), local_addr);
        if (!sentByUs) {
            return down_prot.down(message);
        }
        // A null destination is represented in the map by the NullAddress placeholder.
        Address dest;
        if (message.dest() == null) {
            dest = nullAddress;
        } else {
            dest = message.dest();
        }
        Buffer buffer = msgMap.computeIfAbsent(dest, key -> new Buffer(dest));
        if (buffer.addMessage(message)) {
            return message;
        }
        return down_prot.down(message);
    }

    /**
     * Unwraps a received BatchMessage. A message without this protocol's header is passed up
     * unchanged. Otherwise each contained message gets the wrapper's destination (and the
     * wrapper's source if it has none), and the contained messages are passed up together as
     * one MessageBatch, unless that batch turns out to be empty.
     *
     * @param message the message travelling up the stack
     * @return the result of the protocol above for a non-batch message, otherwise null
     */
    @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
    public Object up(Message message) {
        boolean isWrapped = message.getHeader(getId()) != null;
        if (!isWrapped) {
            return up_prot.up(message);
        }
        BatchMessage wrapper = (BatchMessage) message;
        for (Iterator<Message> inner = wrapper.iterator(); inner.hasNext(); ) {
            Message contained = inner.next();
            contained.setDest(wrapper.getDest());
            if (contained.getSrc() == null) {
                contained.setSrc(wrapper.getSrc());
            }
        }
        MessageBatch unwrapped = new MessageBatch();
        unwrapped.set(wrapper.getDest(), wrapper.getSrc(), wrapper.getMessages());
        if (!unwrapped.isEmpty()) {
            up_prot.up(unwrapped);
        }
        return null;
    }

    /**
     * Unwraps all BatchMessages found in a received batch. Their contained messages are moved
     * into a new batch with the same destination and sender, which is passed up first; the
     * BatchMessages themselves are removed from the incoming batch, and whatever remains in it
     * is passed up afterwards.
     *
     * @param messageBatch the batch travelling up the stack; BatchMessages are removed from it
     */
    @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
    public void up(MessageBatch messageBatch) {
        // First pass: count the contained messages so the new batch can be sized up front.
        int containedCount = 0;
        for (Message candidate : messageBatch) {
            if (candidate instanceof BatchMessage) {
                containedCount += ((BatchMessage) candidate).getNumberOfMessages();
            }
        }

        if (containedCount > 0) {
            MessageBatch unwrapped = new MessageBatch(containedCount + 1).setDest(messageBatch.dest()).setSender(messageBatch.getSender());
            // Second pass: move the contents of each BatchMessage over and drop the wrapper.
            // NOTE(review): unlike up(Message), dest/src of the contained messages are not
            // (re)set here - confirm that BatchMessage already restores them when read.
            for (Iterator<Message> cursor = messageBatch.iterator(); cursor.hasNext(); ) {
                Message candidate = cursor.next();
                if (!(candidate instanceof BatchMessage)) {
                    continue;
                }
                BatchMessage wrapper = (BatchMessage) candidate;
                cursor.remove();
                unwrapped.add(wrapper.getMessages(), wrapper.getNumberOfMessages());
            }
            if (!unwrapped.isEmpty()) {
                up_prot.up(unwrapped);
            }
        }

        if (!messageBatch.isEmpty()) {
            up_prot.up(messageBatch);
        }
    }

    /**
     * Fetches the timer from the transport, marks the protocol as running and schedules the
     * periodic flush task.
     *
     * @throws Exception if the transport has no timer
     */
    @Override // org.jgroups.stack.Protocol, org.jgroups.Lifecycle
    public void start() throws Exception {
        TimeScheduler transportTimer = getTransport().getTimer();
        timer = transportTimer;
        if (transportTimer == null) {
            throw new Exception("timer is null");
        }
        running = true;
        startFlushTask();
    }

    /** Marks the protocol as no longer running and cancels the periodic flush task. */
    @Override // org.jgroups.stack.Protocol, org.jgroups.Lifecycle
    public void stop() {
        running = false;
        stopFlushTask();
    }

    /**
     * Schedules a FlushTask with a fixed delay of {@code flush_interval} milliseconds, starting
     * immediately. Does nothing while a previously scheduled task is still pending.
     */
    public void startFlushTask() {
        boolean stillScheduled = flush_task != null && !flush_task.isDone();
        if (stillScheduled) {
            return;
        }
        flush_task = timer.scheduleWithFixedDelay(new FlushTask(), 0L, flush_interval, TimeUnit.MILLISECONDS, true);
    }

    /** Cancels the scheduled flush task (interrupting it if running) and forgets it; a no-op if none is scheduled. */
    public void stopFlushTask() {
        if (flush_task == null) {
            return;
        }
        flush_task.cancel(true);
        flush_task = null;
    }

    /**
     * Sends the pending messages of every buffer. Each buffer is told that the send was
     * triggered by the timeout, so a resulting BatchMessage is counted accordingly.
     */
    public void flush() {
        for (Buffer pending : msgMap.values()) {
            pending.sendBatch(true);
        }
    }
}
