package org.jgroups.protocols;

import java.io.DataInput;
import java.io.DataOutput;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.infinispan.transaction.xa.recovery.RecoveryAdminOperations;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.logging.Log;
import org.jgroups.stack.Protocol;
import org.jgroups.util.AgeOutCache;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.SeqnoList;
import org.jgroups.util.Table;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Tuple;
import org.jgroups.util.Util;
import org.modeshape.webdav.WebdavStatus;

@MBean(description = "Reliable unicast layer")
/* loaded from: input_file:WEB-INF/lib/jgroups-3.4.3.Final.jar:org/jgroups/protocols/UNICAST3.class */
public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address> {
    /** Default first sequence number; sender entries in this class start numbering at 1. */
    public static final long DEFAULT_FIRST_SEQNO = 1;

    /** Handle of the retransmit task; inspected by {@link #isXmitTaskRunning()}. */
    protected Future<?> xmit_task;

    /** Address of the local node; used as a prefix in log and print output. */
    protected Address local_addr;

    /** Timer obtained from the transport in {@link #start()}. */
    protected TimeScheduler timer;

    // NOTE(review): presumably the most recently issued connection id - confirm against getNewConnectionId()
    protected short last_conn_id;

    /** Age-out cache created in {@link #start()} when {@code max_retransmit_time > 0}; may be null. */
    protected AgeOutCache<Address> cache;

    // Plain literal 500: the decompiler had substituted an unrelated library's HTTP-status constant of the same value.
    @Property(description = "Max number of messages to be removed from a retransmit window. This property might get removed anytime, so don't use it !")
    protected int max_msg_batch_size = 500;

    @Property(description = "Time (in milliseconds) after which an idle incoming or outgoing connection is closed. The connection will get re-established when used again. 0 disables connection reaping")
    protected long conn_expiry_timeout = 120000;

    @Property(description = "Time (in ms) until a connection marked to be closed will get removed. 0 disables this")
    protected long conn_close_timeout = 60000;

    @Property(description = "Number of rows of the matrix in the retransmission table (only for experts)", writable = false)
    protected int xmit_table_num_rows = 100;

    @Property(description = "Number of elements of a row of the matrix in the retransmission table (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row", writable = false)
    protected int xmit_table_msgs_per_row = 1000;

    @Property(description = "Resize factor of the matrix in the retransmission table (only for experts)", writable = false)
    protected double xmit_table_resize_factor = 1.2d;

    @Property(description = "Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts)", writable = false)
    protected long xmit_table_max_compaction_time = 600000;

    /** Configured through {@link #setMaxRetransmitTime(long)}, which carries the property annotation. */
    protected long max_retransmit_time = 60000;

    @Property(description = "Interval (in milliseconds) at which messages in the send windows are resent")
    protected long xmit_interval = 500;

    @Property(description = "If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing)")
    protected boolean log_not_found_msgs = true;

    @Property(description = "Send an ack for a batch immediately instead of using a delayed ack", deprecatedMessage = "replaced by ack_threshold")
    @Deprecated
    protected boolean ack_batches_immediately = true;

    @Property(description = "Send an ack immediately when a batch of ack_threshold (or more) messages is received. Otherwise send delayed acks. If 1, ack single messages (similar to UNICAST)")
    protected int ack_threshold = 5;

    // Plain (non-atomic) statistics counters, zeroed by resetStats().
    protected long num_msgs_sent = 0;
    protected long num_msgs_received = 0;
    protected long num_acks_sent = 0;
    protected long num_acks_received = 0;
    protected long num_xmits = 0;

    @ManagedAttribute(description = "Number of retransmit requests received")
    protected final AtomicLong xmit_reqs_received = new AtomicLong(0);

    @ManagedAttribute(description = "Number of retransmit requests sent")
    protected final AtomicLong xmit_reqs_sent = new AtomicLong(0);

    @ManagedAttribute(description = "Number of retransmit responses sent")
    protected final AtomicLong xmit_rsps_sent = new AtomicLong(0);

    /** Outgoing connections, keyed by destination address. */
    protected final ConcurrentMap<Address, SenderEntry> send_table = Util.createConcurrentMap();

    /** Incoming connections, keyed by sender address. */
    protected final ConcurrentMap<Address, ReceiverEntry> recv_table = Util.createConcurrentMap();

    // NOTE(review): presumably guards creation of entries in recv_table - its users are outside this view
    protected final ReentrantLock recv_table_lock = new ReentrantLock();

    // NOTE(review): per-member bookkeeping for the retransmit task; cleared in stop() - value semantics not visible here
    protected final Map<Address, Long> xmit_task_map = new HashMap<>();

    /** Current members; initially an empty list. */
    protected volatile List<Address> members = new ArrayList<>(11);

    /** Set to true at the end of {@link #start()} and to false in {@link #stop()}. */
    protected volatile boolean running = false;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/jgroups-3.4.3.Final.jar:org/jgroups/protocols/UNICAST3$Entry.class */
    /**
     * Common state shared by sender and receiver connection entries: the connection id,
     * the time of the last {@link #update()} and the current {@link State}.
     */
    public abstract class Entry {
        final short conn_id;
        protected final AtomicLong timestamp = new AtomicLong(0);
        protected volatile State state = State.OPEN;

        /**
         * Creates an entry for the given connection id and stamps it with the current time.
         *
         * @param s the connection id
         */
        protected Entry(short s) {
            conn_id = s;
            update();
        }

        /** @return the connection id this entry was created with */
        short connId() {
            return conn_id;
        }

        /** Records the current wall-clock time as the moment of last use. */
        void update() {
            long now = System.currentTimeMillis();
            timestamp.set(now);
        }

        /** @return milliseconds elapsed since the last {@link #update()} */
        long age() {
            long now = System.currentTimeMillis();
            return now - timestamp.get();
        }

        /** @return the current state */
        State state() {
            return state;
        }

        /**
         * Sets the state.
         *
         * @param state the new state
         * @return this entry, to allow chaining
         */
        Entry state(State state) {
            this.state = state;
            return this;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/jgroups-3.4.3.Final.jar:org/jgroups/protocols/UNICAST3$Header.class */
    /**
     * Header attached to messages handled by this protocol. The first serialized byte is the
     * type; the remaining fields written depend on that type (see {@link #writeTo(DataOutput)}).
     */
    public static class Header extends org.jgroups.Header {
        public static final byte DATA = 0;
        public static final byte ACK = 1;
        public static final byte SEND_FIRST_SEQNO = 2;
        public static final byte XMIT_REQ = 3;
        public static final byte CLOSE = 4;

        byte type;
        long seqno;
        short conn_id;
        boolean first;

        /** No-arg constructor; fields are populated later by {@link #readFrom(DataInput)}. */
        public Header() {
        }

        protected Header(byte b) {
            type = b;
        }

        protected Header(byte b, long j) {
            type = b;
            seqno = j;
        }

        protected Header(byte b, long j, short s, boolean z) {
            type = b;
            seqno = j;
            conn_id = s;
            first = z;
        }

        /**
         * @param j the sequence number
         * @param s the connection id
         * @param z whether this is the first message of the connection
         * @return a new DATA header
         */
        public static Header createDataHeader(long j, short s, boolean z) {
            return new Header(DATA, j, s, z);
        }

        /**
         * @param j the acknowledged sequence number
         * @param s the connection id
         * @return a new ACK header
         */
        public static Header createAckHeader(long j, short s) {
            return new Header(ACK, j, s, false);
        }

        /** @return a new SEND_FIRST_SEQNO header (type only) */
        public static Header createSendFirstSeqnoHeader() {
            return new Header(SEND_FIRST_SEQNO);
        }

        /** @return a new XMIT_REQ header (type only) */
        public static Header createXmitReqHeader() {
            return new Header(XMIT_REQ);
        }

        /**
         * @param s the connection id to close
         * @return a new CLOSE header
         */
        public static Header createCloseHeader(short s) {
            return new Header(CLOSE, 0L, s, false);
        }

        public long seqno() {
            return seqno;
        }

        public short connId() {
            return conn_id;
        }

        @Override // org.jgroups.Header
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append(type2Str(type));
            text.append(", seqno=").append(seqno);
            if (conn_id != 0) {
                text.append(", conn_id=").append((int) conn_id);
            }
            if (first) {
                text.append(", first");
            }
            return text.toString();
        }

        /**
         * @param b a header type byte
         * @return the symbolic name of the type, or {@code "<unknown>"}
         */
        public static String type2Str(byte b) {
            switch (b) {
                case DATA:
                    return "DATA";
                case ACK:
                    return "ACK";
                case SEND_FIRST_SEQNO:
                    return "SEND_FIRST_SEQNO";
                case XMIT_REQ:
                    return "XMIT_REQ";
                case CLOSE:
                    return "CLOSE";
                default:
                    return "<unknown>";
            }
        }

        /**
         * @return the serialized size: one type byte plus the type-specific fields
         *         written by {@link #writeTo(DataOutput)}
         */
        @Override // org.jgroups.Header
        public final int size() {
            int total = 1; // the type byte
            switch (type) {
                case DATA:
                    total += Util.size(seqno) + 2 + 1; // seqno + short conn_id + boolean first
                    break;
                case ACK:
                    total += Util.size(seqno) + 2; // seqno + short conn_id
                    break;
                case CLOSE:
                    total += 2; // short conn_id
                    break;
                default:
                    // all other types consist of the type byte only
                    break;
            }
            return total;
        }

        /** @return a new header carrying the same four field values */
        public Header copy() {
            return new Header(type, seqno, conn_id, first);
        }

        @Override // org.jgroups.util.Streamable
        public void writeTo(DataOutput dataOutput) throws Exception {
            dataOutput.writeByte(type);
            switch (type) {
                case DATA:
                    Util.writeLong(seqno, dataOutput);
                    dataOutput.writeShort(conn_id);
                    dataOutput.writeBoolean(first);
                    break;
                case ACK:
                    Util.writeLong(seqno, dataOutput);
                    dataOutput.writeShort(conn_id);
                    break;
                case CLOSE:
                    dataOutput.writeShort(conn_id);
                    break;
                default:
                    // SEND_FIRST_SEQNO, XMIT_REQ and unknown types: nothing beyond the type byte
                    break;
            }
        }

        @Override // org.jgroups.util.Streamable
        public void readFrom(DataInput dataInput) throws Exception {
            type = dataInput.readByte();
            switch (type) {
                case DATA:
                    seqno = Util.readLong(dataInput);
                    conn_id = dataInput.readShort();
                    first = dataInput.readBoolean();
                    break;
                case ACK:
                    seqno = Util.readLong(dataInput);
                    conn_id = dataInput.readShort();
                    break;
                case CLOSE:
                    conn_id = dataInput.readShort();
                    break;
                default:
                    // SEND_FIRST_SEQNO, XMIT_REQ and unknown types: nothing beyond the type byte
                    break;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/jgroups-3.4.3.Final.jar:org/jgroups/protocols/UNICAST3$ReceiverEntry.class */
    public final class ReceiverEntry extends Entry {
        protected final Table<Message> received_msgs;
        protected volatile boolean send_ack;

        public ReceiverEntry(Table<Message> table, short s) {
            super(s);
            this.received_msgs = table;
        }

        ReceiverEntry sendAck(boolean z) {
            this.send_ack = z;
            return this;
        }

        boolean sendAck() {
            boolean z = this.send_ack;
            this.send_ack = false;
            return z;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            if (this.received_msgs != null) {
                sb.append(this.received_msgs).append(RecoveryAdminOperations.SEPARATOR);
            }
            sb.append("recv_conn_id=" + ((int) this.conn_id));
            sb.append(" (" + age() + " ms old) - " + this.state);
            if (this.send_ack) {
                sb.append(" [ack pending]");
            }
            return sb.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/jgroups-3.4.3.Final.jar:org/jgroups/protocols/UNICAST3$RetransmitTask.class */
    /** Runnable that delegates each execution to the enclosing protocol's {@code triggerXmit()}. */
    public class RetransmitTask implements Runnable {
        protected RetransmitTask() {
        }

        /** Invokes {@code triggerXmit()} on the enclosing instance. */
        @Override // java.lang.Runnable
        public void run() {
            triggerXmit();
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder(UNICAST3.class.getSimpleName());
            text.append(": RetransmitTask (interval=").append(xmit_interval).append(" ms)");
            return text.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/jgroups-3.4.3.Final.jar:org/jgroups/protocols/UNICAST3$SenderEntry.class */
    public final class SenderEntry extends Entry {
        final Table<Message> sent_msgs;
        final AtomicLong sent_msgs_seqno;
        protected final long[] watermark;

        public SenderEntry(short s) {
            super(s);
            this.sent_msgs_seqno = new AtomicLong(1L);
            this.watermark = new long[]{0, 0};
            this.sent_msgs = new Table<>(UNICAST3.this.xmit_table_num_rows, UNICAST3.this.xmit_table_msgs_per_row, 0L, UNICAST3.this.xmit_table_resize_factor, UNICAST3.this.xmit_table_max_compaction_time);
        }

        long[] watermark() {
            return this.watermark;
        }

        SenderEntry watermark(long j, long j2) {
            this.watermark[0] = j;
            this.watermark[1] = j2;
            return this;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            if (this.sent_msgs != null) {
                sb.append(this.sent_msgs).append(RecoveryAdminOperations.SEPARATOR);
            }
            sb.append("send_conn_id=" + ((int) this.conn_id)).append(" (" + age() + " ms old) - " + this.state);
            return sb.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/jgroups-3.4.3.Final.jar:org/jgroups/protocols/UNICAST3$State.class */
    /** Lifecycle state of a connection entry (see {@link Entry#state()}). */
    public enum State {
        /** Initial state of every newly created entry. */
        OPEN,
        /** Intermediate state; down(Event) switches such entries back to OPEN when they are used again or still in the view. */
        CLOSING,
        /** A sender entry found in this state by down(Event) is removed and replaced by a fresh one. */
        CLOSED
    }

    /**
     * Sets the maximum message batch size; values below 1 are silently ignored.
     *
     * @param i the new batch size
     */
    public void setMaxMessageBatchSize(int i) {
        if (i < 1) {
            return;
        }
        max_msg_batch_size = i;
    }

    /** @return the string form of the local address, or {@code "null"} when it is not set */
    @ManagedAttribute
    public String getLocalAddress() {
        if (local_addr == null) {
            return "null";
        }
        return local_addr.toString();
    }

    /** @return the string form of the current member list */
    @ManagedAttribute
    public String getMembers() {
        List<Address> current = members;
        return current.toString();
    }

    /** @return the number of entries in the send table */
    @ManagedAttribute(description = "Returns the number of outgoing (send) connections")
    public int getNumSendConnections() {
        int outgoing = send_table.size();
        return outgoing;
    }

    /** @return the number of entries in the receive table */
    @ManagedAttribute(description = "Returns the number of incoming (receive) connections")
    public int getNumReceiveConnections() {
        int incoming = recv_table.size();
        return incoming;
    }

    /** @return the sum of receive and send connection counts */
    @ManagedAttribute(description = "Returns the total number of outgoing (send) and incoming (receive) connections")
    public int getNumConnections() {
        int incoming = getNumReceiveConnections();
        int outgoing = getNumSendConnections();
        return incoming + outgoing;
    }

    /**
     * Lists all send connections followed by all receive connections, one
     * {@code address: entry} line each. A section is omitted when its table is empty.
     *
     * @return the formatted listing (empty when there are no connections)
     */
    @ManagedOperation
    public String printConnections() {
        StringBuilder out = new StringBuilder();
        if (!send_table.isEmpty()) {
            out.append("\nsend connections:\n");
            Iterator<Map.Entry<Address, SenderEntry>> senders = send_table.entrySet().iterator();
            while (senders.hasNext()) {
                Map.Entry<Address, SenderEntry> conn = senders.next();
                out.append(conn.getKey()).append(": ").append(conn.getValue()).append("\n");
            }
        }
        if (!recv_table.isEmpty()) {
            out.append("\nreceive connections:\n");
            Iterator<Map.Entry<Address, ReceiverEntry>> receivers = recv_table.entrySet().iterator();
            while (receivers.hasNext()) {
                Map.Entry<Address, ReceiverEntry> conn = receivers.next();
                out.append(conn.getKey()).append(": ").append(conn.getValue()).append("\n");
            }
        }
        return out.toString();
    }

    /** @return the value of the sent-messages counter */
    @ManagedAttribute
    public long getNumMessagesSent() {
        return num_msgs_sent;
    }

    /** @return the value of the received-messages counter */
    @ManagedAttribute
    public long getNumMessagesReceived() {
        return num_msgs_received;
    }

    /** @return the value of the sent-acks counter */
    @ManagedAttribute
    public long getNumAcksSent() {
        return num_acks_sent;
    }

    /** @return the value of the received-acks counter */
    @ManagedAttribute
    public long getNumAcksReceived() {
        return num_acks_received;
    }

    /** @return the value of the retransmissions counter */
    @ManagedAttribute
    public long getNumXmits() {
        return num_xmits;
    }

    /** @return the configured maximum retransmit time in milliseconds */
    public long getMaxRetransmitTime() {
        return max_retransmit_time;
    }

    /**
     * Stores the new maximum retransmit time and, when the age-out cache exists and the
     * value is positive, applies it as the cache timeout.
     *
     * @param j the new maximum retransmit time in milliseconds
     */
    @Property(description = "Max number of milliseconds we try to retransmit a message to any given member. After that, the connection is removed. Any new connection to that member will start with seqno #1 again. 0 disables this")
    public void setMaxRetransmitTime(long j) {
        max_retransmit_time = j;
        if (cache != null && j > 0) {
            cache.setTimeout(j);
        }
    }

    /** @return true when a retransmit task handle exists and is not done */
    @ManagedAttribute(description = "Is the retransmit task running")
    public boolean isXmitTaskRunning() {
        return xmit_task != null && !xmit_task.isDone();
    }

    /** @return the size of the age-out cache, or 0 when no cache exists */
    @ManagedAttribute
    public int getAgeOutCacheSize() {
        return cache == null ? 0 : cache.size();
    }

    /** @return the string form of the age-out cache, or {@code "n/a"} when no cache exists */
    @ManagedOperation
    public String printAgeOutCache() {
        if (cache == null) {
            return "n/a";
        }
        return cache.toString();
    }

    /** @return the age-out cache; null when none has been created */
    public AgeOutCache<Address> getAgeOutCache() {
        return cache;
    }

    /**
     * @param address the destination to look up
     * @return true when the send table holds an entry for {@code address} whose state is OPEN
     */
    public boolean hasSendConnectionTo(Address address) {
        SenderEntry entry = send_table.get(address);
        if (entry == null) {
            return false;
        }
        return entry.state() == State.OPEN;
    }

    /** @return the summed size of the sent-message tables of all send connections */
    @ManagedAttribute
    public int getNumUnackedMessages() {
        int total = 0;
        Iterator<SenderEntry> entries = send_table.values().iterator();
        while (entries.hasNext()) {
            SenderEntry entry = entries.next();
            if (entry.sent_msgs == null) {
                continue;
            }
            total += entry.sent_msgs.size();
        }
        return total;
    }

    /** @return the summed size of the received-message tables of all receive connections */
    @ManagedAttribute
    public int getNumberOfMessagesInReceiveWindows() {
        int total = 0;
        Iterator<ReceiverEntry> entries = recv_table.values().iterator();
        while (entries.hasNext()) {
            ReceiverEntry entry = entries.next();
            if (entry.received_msgs == null) {
                continue;
            }
            total += entry.received_msgs.size();
        }
        return total;
    }

    /**
     * Sums {@code received_msgs.size()} over all receive connections, skipping entries
     * without a table.
     *
     * @return the total as a long
     */
    @ManagedAttribute(description = "Total number of undelivered messages in all receive windows")
    public long getXmitTableUndeliveredMessages() {
        long total = 0;
        // Bind each entry to a local: the decompiled loop dereferenced an undeclared temporary.
        for (ReceiverEntry entry : this.recv_table.values()) {
            if (entry.received_msgs != null) {
                total += entry.received_msgs.size();
            }
        }
        return total;
    }

    /**
     * Sums {@code received_msgs.getNumMissing()} over all receive connections, skipping
     * entries without a table.
     *
     * @return the total as a long
     */
    @ManagedAttribute(description = "Total number of missing messages in all receive windows")
    public long getXmitTableMissingMessages() {
        long total = 0;
        // Bind each entry to a local: the decompiled loop dereferenced an undeclared temporary.
        for (ReceiverEntry entry : this.recv_table.values()) {
            if (entry.received_msgs != null) {
                total += entry.received_msgs.getNumMissing();
            }
        }
        return total;
    }

    /** @return the compaction counts of all receive tables plus those of all send tables */
    @ManagedAttribute(description = "Number of compactions in all (receive and send) windows")
    public int getXmitTableNumCompactions() {
        int total = 0;
        for (ReceiverEntry receiver : recv_table.values()) {
            if (receiver.received_msgs == null) {
                continue;
            }
            total += receiver.received_msgs.getNumCompactions();
        }
        for (SenderEntry sender : send_table.values()) {
            if (sender.sent_msgs == null) {
                continue;
            }
            total += sender.sent_msgs.getNumCompactions();
        }
        return total;
    }

    /** @return the move counts of all receive tables plus those of all send tables */
    @ManagedAttribute(description = "Number of moves in all (receive and send) windows")
    public int getXmitTableNumMoves() {
        int total = 0;
        for (ReceiverEntry receiver : recv_table.values()) {
            if (receiver.received_msgs == null) {
                continue;
            }
            total += receiver.received_msgs.getNumMoves();
        }
        for (SenderEntry sender : send_table.values()) {
            if (sender.sent_msgs == null) {
                continue;
            }
            total += sender.sent_msgs.getNumMoves();
        }
        return total;
    }

    /** @return the resize counts of all receive tables plus those of all send tables */
    @ManagedAttribute(description = "Number of resizes in all (receive and send) windows")
    public int getXmitTableNumResizes() {
        int total = 0;
        for (ReceiverEntry receiver : recv_table.values()) {
            if (receiver.received_msgs == null) {
                continue;
            }
            total += receiver.received_msgs.getNumResizes();
        }
        for (SenderEntry sender : send_table.values()) {
            if (sender.sent_msgs == null) {
                continue;
            }
            total += sender.sent_msgs.getNumResizes();
        }
        return total;
    }

    /** @return the purge counts of all receive tables plus those of all send tables */
    @ManagedAttribute(description = "Number of purges in all (receive and send) windows")
    public int getXmitTableNumPurges() {
        int total = 0;
        for (ReceiverEntry receiver : recv_table.values()) {
            if (receiver.received_msgs == null) {
                continue;
            }
            total += receiver.received_msgs.getNumPurges();
        }
        for (SenderEntry sender : send_table.values()) {
            if (sender.sent_msgs == null) {
                continue;
            }
            total += sender.sent_msgs.getNumPurges();
        }
        return total;
    }

    /**
     * Prints the local address followed by one {@code sender: table} line per receive
     * connection. An entry without a table is printed as {@code null} instead of raising
     * a NullPointerException, matching the null checks used by the other accessors.
     *
     * @return the formatted listing
     */
    @ManagedOperation(description = "Prints the contents of the receive windows for all members")
    public String printReceiveWindowMessages() {
        StringBuilder sb = new StringBuilder(this.local_addr + ":\n");
        for (Map.Entry<Address, ReceiverEntry> entry : this.recv_table.entrySet()) {
            // append(Object) goes through String.valueOf and therefore tolerates a null table
            sb.append(entry.getKey()).append(": ").append(entry.getValue().received_msgs).append('\n');
        }
        return sb.toString();
    }

    /**
     * Prints the local address followed by one {@code destination: table} line per send
     * connection.
     *
     * @return the formatted listing
     */
    @ManagedOperation(description = "Prints the contents of the send windows for all members")
    public String printSendWindowMessages() {
        StringBuilder out = new StringBuilder(local_addr + ":\n");
        Iterator<Map.Entry<Address, SenderEntry>> conns = send_table.entrySet().iterator();
        while (conns.hasNext()) {
            Map.Entry<Address, SenderEntry> conn = conns.next();
            String window = conn.getValue().sent_msgs.toString();
            out.append(conn.getKey()).append(": ").append(window).append('\n');
        }
        return out.toString();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r4v0, types: [org.jgroups.protocols.UNICAST3] */
    @Override // org.jgroups.stack.Protocol
    public void resetStats() {
        ?? r4 = 0;
        this.num_acks_received = 0L;
        this.num_acks_sent = 0L;
        r4.num_msgs_received = this;
        this.num_msgs_sent = this;
        this.num_xmits = 0L;
    }

    /**
     * Extends the superclass statistics with the number of unacked messages and the number
     * of messages held in receive windows.
     *
     * @return the map returned by the superclass, with the two extra keys added
     */
    @Override // org.jgroups.stack.Protocol
    public Map<String, Object> dumpStats() {
        Map<String, Object> stats = super.dumpStats();
        Integer unacked = Integer.valueOf(getNumUnackedMessages());
        stats.put("num_unacked_msgs", unacked);
        Integer inRecvWindows = Integer.valueOf(getNumberOfMessagesInReceiveWindows());
        stats.put("num_msgs_in_recv_windows", inRecvWindows);
        return stats;
    }

    /**
     * Fetches the timer from the transport, creates the age-out cache when
     * {@code max_retransmit_time} is positive, marks the protocol as running and starts
     * the retransmit task.
     *
     * @throws Exception if the transport provides no timer
     */
    @Override // org.jgroups.stack.Protocol
    public void start() throws Exception {
        timer = getTransport().getTimer();
        if (timer == null) {
            throw new Exception("timer is null");
        }
        boolean ageOutEnabled = max_retransmit_time > 0;
        if (ageOutEnabled) {
            cache = new AgeOutCache<>(timer, max_retransmit_time, this);
        }
        running = true;
        startRetransmitTask();
    }

    /**
     * Stops the protocol: sends pending acks, clears the running flag, stops the
     * retransmit task, clears the retransmit bookkeeping map and removes all connections,
     * in that order.
     */
    @Override // org.jgroups.stack.Protocol
    public void stop() {
        sendPendingAcks(); // invoked before the running flag is cleared
        this.running = false;
        stopRetransmitTask();
        this.xmit_task_map.clear();
        removeAllConnections();
    }

    /**
     * Handles an event travelling up the stack. A type-1 event whose message has a
     * destination, is not flagged NO_RELIABILITY and carries this protocol's header is
     * consumed here: DATA headers go to {@code handleDataReceived}, all others to
     * {@link #handleUpEvent}. Everything else is passed to the protocol above.
     *
     * @param event the incoming event
     * @return null when the event was consumed, otherwise the result of the protocol above
     */
    @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
    public Object up(Event event) {
        // NOTE(review): event type 1 is presumably Event.MSG - confirm against org.jgroups.Event
        if (event.getType() == 1) {
            Message message = (Message) event.getArg();
            boolean reliable = message.getDest() != null && !message.isFlagSet(Message.Flag.NO_RELIABILITY);
            Header header = reliable ? (Header) message.getHeader(this.id) : null;
            if (header != null) {
                Address src = message.getSrc();
                if (header.type == Header.DATA) {
                    handleDataReceived(src, header.seqno, header.conn_id, header.first, message, event);
                } else {
                    handleUpEvent(src, message, header);
                }
                return null;
            }
        }
        return up_prot.up(event);
    }

    /**
     * Dispatches a non-DATA header: ACK, SEND_FIRST_SEQNO and XMIT_REQ are forwarded to
     * their handlers; CLOSE removes the receive connection for {@code address} when its
     * connection id matches the header's. Unknown types are logged as errors.
     *
     * @param address the sender of the message
     * @param message the received message (its payload is read for XMIT_REQ)
     * @param header  this protocol's header taken from the message
     * @throws IllegalStateException if the header is of type DATA
     */
    protected void handleUpEvent(Address address, Message message, Header header) {
        switch (header.type) {
            case Header.DATA:
                throw new IllegalStateException("header of type DATA is not supposed to be handled by this method");
            case Header.ACK:
                handleAckReceived(address, header.seqno, header.conn_id);
                return;
            case Header.SEND_FIRST_SEQNO:
                handleResendingOfFirstMessage(address);
                return;
            case Header.XMIT_REQ:
                handleXmitRequest(address, (SeqnoList) message.getObject());
                return;
            case Header.CLOSE:
                // The local address is passed only as a format argument: prefixing it to the format string
                // printed it twice and would break formatting if the address text contained '%'.
                this.log.trace("%s <-- CLOSE(%s: conn-id=%s)", this.local_addr, address, Short.valueOf(header.conn_id));
                ReceiverEntry receiverEntry = this.recv_table.get(address);
                if (receiverEntry == null || receiverEntry.connId() != header.conn_id) {
                    return;
                }
                // two-argument remove: only removes if the mapping still points to this entry
                this.recv_table.remove(address, receiverEntry);
                this.log.trace("%s: removed receive connection for %s", this.local_addr, address);
                return;
            default:
                this.log.error(Util.getMessage("TypeNotKnown"), this.local_addr, Byte.valueOf(header.type));
                return;
        }
    }

    /**
     * Handles a message batch travelling up the stack. Batches without a destination are
     * passed up untouched. Otherwise every message carrying this protocol's header (and not
     * flagged NO_RELIABILITY) is removed from the batch: non-DATA messages are dispatched
     * to {@link #handleUpEvent} immediately, DATA messages are grouped by connection id.
     * Afterwards only the group matching the receiver entry's connection id is handed to
     * {@code handleBatchReceived}; a missing entry or the presence of other connection ids
     * triggers a request for the first seqno. Whatever is left in the batch is passed up.
     *
     * @param messageBatch the incoming batch
     */
    @Override // org.jgroups.stack.Protocol
    public void up(MessageBatch messageBatch) {
        if (messageBatch.dest() == null) {
            up_prot.up(messageBatch);
            return;
        }

        int size = messageBatch.size();
        Map<Short, List<Tuple<Long, Message>>> msgsByConnId = new LinkedHashMap<>();
        ReceiverEntry entry = recv_table.get(messageBatch.sender());

        for (Iterator<Message> it = messageBatch.iterator(); it.hasNext();) {
            Message msg = it.next();
            if (msg == null || msg.isFlagSet(Message.Flag.NO_RELIABILITY)) {
                continue;
            }
            Header hdr = (Header) msg.getHeader(this.id);
            if (hdr == null) {
                continue;
            }
            messageBatch.remove(msg);

            if (hdr.type != Header.DATA) {
                try {
                    handleUpEvent(msg.getSrc(), msg, hdr);
                } catch (Throwable t) {
                    log.error(Util.getMessage("FailedHandlingEvent"), local_addr, t);
                }
                continue;
            }

            Short connId = Short.valueOf(hdr.conn_id);
            List<Tuple<Long, Message>> msgs = msgsByConnId.get(connId);
            if (msgs == null) {
                msgs = new ArrayList<>(size);
                msgsByConnId.put(connId, msgs);
            }
            msgs.add(new Tuple<>(Long.valueOf(hdr.seqno()), msg));
            if (hdr.first) {
                // a 'first' message (re)establishes the receiver entry used for the rest of the batch
                entry = getReceiverEntry(messageBatch.sender(), hdr.seqno(), hdr.first, hdr.connId());
            }
        }

        if (!msgsByConnId.isEmpty()) {
            if (entry == null) {
                sendRequestForFirstSeqno(messageBatch.sender());
            } else {
                Short currentConnId = Short.valueOf(entry.connId());
                // retainAll returns true when groups for other connection ids had to be dropped
                boolean droppedOtherConns = msgsByConnId.keySet().retainAll(Arrays.asList(currentConnId));
                if (droppedOtherConns) {
                    sendRequestForFirstSeqno(messageBatch.sender());
                }
                List<Tuple<Long, Message>> current = msgsByConnId.get(currentConnId);
                if (current != null && !current.isEmpty()) {
                    boolean oob = messageBatch.mode() == MessageBatch.Mode.OOB;
                    handleBatchReceived(entry, messageBatch.sender(), current, oob);
                }
            }
        }

        if (!messageBatch.isEmpty()) {
            up_prot.up(messageBatch);
        }
    }

    @Override // org.jgroups.stack.Protocol
    public Object down(Event event) {
        switch (event.getType()) {
            case 1:
                Message message = (Message) event.getArg();
                Address dest = message.getDest();
                if (dest != null && !message.isFlagSet(Message.Flag.NO_RELIABILITY)) {
                    if (!this.running) {
                        this.log.trace("%s: discarded message as start() has not yet been called, message: %s", this.local_addr, message);
                        return null;
                    }
                    SenderEntry senderEntry = this.send_table.get(dest);
                    if (senderEntry == null || senderEntry.state() == State.CLOSED) {
                        if (senderEntry != null) {
                            this.send_table.remove(dest, senderEntry);
                        }
                        senderEntry = new SenderEntry(getNewConnectionId());
                        SenderEntry putIfAbsent = this.send_table.putIfAbsent(dest, senderEntry);
                        if (putIfAbsent != null) {
                            senderEntry = putIfAbsent;
                        } else {
                            this.log.trace("%s: created sender window for %s (conn-id=%s)", this.local_addr, dest, Short.valueOf(senderEntry.connId()));
                            if (this.cache != null && !this.members.contains(dest)) {
                                this.cache.add(dest);
                            }
                        }
                    }
                    if (senderEntry.state() == State.CLOSING) {
                        senderEntry.state(State.OPEN);
                    }
                    short connId = senderEntry.connId();
                    long andIncrement = senderEntry.sent_msgs_seqno.getAndIncrement();
                    long j = 10;
                    while (true) {
                        long j2 = j;
                        if (this.running) {
                            try {
                                message.putHeader(this.id, Header.createDataHeader(andIncrement, connId, andIncrement == 1));
                                senderEntry.sent_msgs.add(andIncrement, (long) message);
                                if (this.conn_expiry_timeout > 0) {
                                    senderEntry.update();
                                    break;
                                } else {
                                    break;
                                }
                            } catch (Throwable th) {
                                if (this.running) {
                                    Util.sleep(j2);
                                    j = Math.min(5000L, j2 * 2);
                                }
                            }
                        }
                    }
                    if (this.log.isTraceEnabled()) {
                        StringBuilder sb = new StringBuilder();
                        sb.append(this.local_addr).append(" --> DATA(").append(dest).append(": #").append(andIncrement).append(", conn_id=").append((int) connId);
                        if (andIncrement == 1) {
                            sb.append(", first");
                        }
                        sb.append(')');
                        this.log.trace(sb);
                    }
                    this.num_msgs_sent++;
                    return this.down_prot.down(event);
                }
                break;
            case 6:
                List<Address> members = ((View) event.getArg()).getMembers();
                HashSet hashSet = new HashSet(this.send_table.keySet());
                hashSet.addAll(this.recv_table.keySet());
                this.members = members;
                hashSet.removeAll(members);
                if (this.cache != null) {
                    this.cache.removeAll(members);
                }
                if (!hashSet.isEmpty()) {
                    this.log.trace("%s: removing non members %s", this.local_addr, hashSet);
                    Iterator it = hashSet.iterator();
                    while (it.hasNext()) {
                        closeConnection((Address) it.next());
                    }
                }
                if (!members.isEmpty()) {
                    for (Address address : members) {
                        SenderEntry senderEntry2 = this.send_table.get(address);
                        if (senderEntry2 != null && senderEntry2.state() == State.CLOSING) {
                            senderEntry2.state(State.OPEN);
                        }
                        ReceiverEntry receiverEntry = this.recv_table.get(address);
                        if (receiverEntry != null && receiverEntry.state() == State.CLOSING) {
                            receiverEntry.state(State.OPEN);
                        }
                    }
                }
                this.xmit_task_map.keySet().retainAll(members);
                break;
            case 8:
                this.local_addr = (Address) event.getArg();
                break;
        }
        return this.down_prot.down(event);
    }

    /**
     * Closes both directions of the connection to a peer by delegating to
     * {@link #closeSendConnection(Address)} followed by {@link #closeReceiveConnection(Address)}.
     *
     * @param address the peer whose send and receive entries should be closed
     */
    public void closeConnection(Address address) {
        // Send side first, then receive side (same order as before).
        closeSendConnection(address);
        closeReceiveConnection(address);
    }

    /**
     * Moves the sender entry for {@code address} (if one exists in {@code send_table}) to
     * {@link State#CLOSING} and calls {@code update()} on it. The entry stays in the table.
     *
     * @param address the destination whose sender entry should be marked as closing
     */
    public void closeSendConnection(Address address) {
        SenderEntry entry = send_table.get(address);
        if (entry == null) {
            return; // no connection to that destination, nothing to close
        }
        entry.state(State.CLOSING).update();
    }

    /**
     * Removes the sender entry for {@code address} from {@code send_table}. If an entry was
     * present it is marked {@link State#CLOSED} and a CLOSE message carrying its connection id
     * is sent to the peer.
     *
     * @param address the destination whose sender entry is removed
     */
    protected void removeSendConnection(Address address) {
        SenderEntry removed = send_table.remove(address);
        if (removed == null) {
            return;
        }
        removed.state(State.CLOSED);
        sendClose(address, removed.connId());
    }

    /**
     * Moves the receiver entry for {@code address} (if one exists in {@code recv_table}) to
     * {@link State#CLOSING} and calls {@code update()} on it. The entry stays in the table.
     *
     * @param address the sender whose receiver entry should be marked as closing
     */
    public void closeReceiveConnection(Address address) {
        ReceiverEntry entry = recv_table.get(address);
        if (entry == null) {
            return; // nothing received from that sender, nothing to close
        }
        entry.state(State.CLOSING).update();
    }

    /**
     * Removes the receiver entry for {@code address} from {@code recv_table} and, if one was
     * present, marks it {@link State#CLOSED}.
     *
     * @param address the sender whose receiver entry is removed
     */
    protected void removeReceiveConnection(Address address) {
        ReceiverEntry removed = recv_table.remove(address);
        if (removed == null) {
            return;
        }
        removed.state(State.CLOSED);
    }

    /**
     * Clears {@code send_table} and then {@code recv_table}. Entries are simply dropped: no state
     * change is applied to them and no CLOSE message is sent.
     */
    @ManagedOperation(description = "Trashes all connections to other nodes. This is only used for testing")
    public void removeAllConnections() {
        send_table.clear();
        recv_table.clear();
    }

    /**
     * Asks {@code address} to retransmit the given sequence numbers by sending it an XMIT_REQ
     * message, then adds the number of requested seqnos to {@code xmit_reqs_sent}.
     *
     * @param seqnoList the missing sequence numbers; used as the message payload
     * @param address   the peer that should resend the messages
     */
    protected void retransmit(SeqnoList seqnoList, Address address) {
        // The request is flagged OOB and INTERNAL and tagged with this protocol's XMIT_REQ header.
        Message request = new Message(address, seqnoList)
                .setFlag(Message.Flag.OOB, Message.Flag.INTERNAL)
                .putHeader(id, Header.createXmitReqHeader());
        log.trace("%s: sending XMIT_REQ (%s) to %s", local_addr, seqnoList, address);
        // Event type 1 is presumably Event.MSG -- confirm against org.jgroups.Event.
        Event msgEvent = new Event(1, request);
        down_prot.down(msgEvent);
        xmit_reqs_sent.addAndGet(seqnoList.size());
    }

    /**
     * Passes an already-sent message down the stack again and increments {@code num_xmits}.
     *
     * @param message the message to resend; when tracing is enabled its header for this protocol
     *                is used only for logging (seqno {@code -1} is logged if the header is absent)
     */
    protected void retransmit(Message message) {
        if (log.isTraceEnabled()) {
            Header hdr = (Header) message.getHeader(id);
            long loggedSeqno = (hdr == null) ? -1L : hdr.seqno;
            log.trace("%s --> XMIT(%s: #%d)", local_addr, message.getDest(), Long.valueOf(loggedSeqno));
        }
        Event msgEvent = new Event(1, message);
        down_prot.down(msgEvent);
        num_xmits++;
    }

    /**
     * {@link AgeOutCache.Handler} callback: closes the connection to the expired address.
     * A {@code null} address is ignored.
     *
     * @param address the address reported as expired
     */
    @Override // org.jgroups.util.AgeOutCache.Handler
    public void expired(Address address) {
        if (address == null) {
            return;
        }
        log.debug("%s: removing connection to %s because it expired", local_addr, address);
        closeConnection(address);
    }

    /**
     * Handles a single DATA message received from {@code address}.
     * <p>
     * The message is added to the sender's receive window, an ACK is sent (or scheduled), an OOB
     * message that was newly added is passed up immediately, and finally the calling thread tries
     * to become the one that removes and delivers messages from the window.
     *
     * @param address the sender of the message
     * @param j       the sequence number of the message
     * @param s       the connection id carried in the message header
     * @param z       whether the header marks this as the first message of the connection
     * @param message the received message
     * @param event   the event wrapping {@code message}; passed up unchanged for OOB delivery
     */
    protected void handleDataReceived(Address address, long j, short s, boolean z, Message message, Event event) {
        if (log.isTraceEnabled()) {
            log.trace("%s <-- DATA(%s: #%d, conn_id=%d%s)",
                      local_addr, address, Long.valueOf(j), Short.valueOf(s), z ? ", first" : "");
        }

        // getReceiverEntry returns null when the message cannot be accepted; it is then dropped.
        ReceiverEntry receiverEntry = getReceiverEntry(address, j, z, s);
        if (receiverEntry == null) {
            return;
        }
        if (conn_expiry_timeout > 0) {
            receiverEntry.update();
        }
        // Traffic on a connection that was being closed re-opens it.
        if (receiverEntry.state() == State.CLOSING) {
            receiverEntry.state(State.OPEN);
        }

        boolean oob = message.isFlagSet(Message.Flag.OOB);
        if (oob) {
            // Mark before adding to the window, so the window drain (removeAndDeliver) skips it.
            message.setTransientFlag(Message.TransientFlag.OOB_DELIVERED);
        }

        Table<Message> table = receiverEntry.received_msgs;
        // Fix: the element is the Message itself; the former "(long) message" cast was invalid.
        boolean added = table.add(j, message);
        num_msgs_received++;

        if (ack_threshold <= 1) {
            sendAck(address, table.getHighestDeliverable(), receiverEntry.connId());
        } else {
            receiverEntry.sendAck(true); // flag the entry; the ACK itself is sent later
        }

        // Only an OOB message that was actually added is delivered out of band.
        if (oob && added) {
            if (log.isTraceEnabled()) {
                log.trace("%s: delivering %s#%s", local_addr, address, Long.valueOf(j));
            }
            try {
                up_prot.up(event);
            } catch (Throwable th) {
                log.error(Util.getMessage("FailedToDeliverMsg"), local_addr, "OOB message", message, th);
            }
        }

        // Only the thread that wins the compare-and-set drains the window.
        AtomicBoolean processing = table.getProcessing();
        if (processing.compareAndSet(false, true)) {
            removeAndDeliver(processing, table, address);
        }
    }

    /**
     * Handles a list of DATA messages that all come from the same sender.
     * <p>
     * The messages are added to the receive window in one call, the entry is refreshed and
     * re-opened if it was closing, an ACK is sent or scheduled depending on the list size, OOB
     * messages are passed up immediately when {@code z} is set and the add reported success, and
     * finally the calling thread tries to become the one that drains the window.
     *
     * @param receiverEntry the receiver entry of the sender
     * @param address       the sender of the messages
     * @param list          (seqno, message) tuples to add
     * @param z             selects the {@code add(list, true)} variant of the window and enables
     *                      immediate delivery of OOB messages (presumably "batch is OOB" -- confirm
     *                      against the caller)
     */
    protected void handleBatchReceived(ReceiverEntry receiverEntry, Address address, List<Tuple<Long, Message>> list, boolean z) {
        if (log.isTraceEnabled()) {
            log.trace("%s <-- DATA(%s: %s)", local_addr, address, printMessageList(list));
        }

        final int batchSize = list.size();
        final Table<Message> window = receiverEntry.received_msgs;
        num_msgs_received += batchSize;

        final boolean added;
        if (z) {
            added = window.add(list, true);
        } else {
            added = window.add(list);
        }

        if (conn_expiry_timeout > 0) {
            receiverEntry.update();
        }
        if (receiverEntry.state() == State.CLOSING) {
            receiverEntry.state(State.OPEN);
        }

        // Small lists only flag the entry for a later ACK; larger ones are acknowledged right away.
        if (batchSize < ack_threshold) {
            receiverEntry.sendAck(true);
        } else {
            sendAck(address, window.getHighestDeliverable(), receiverEntry.connId());
        }

        if (z && added) {
            for (Tuple<Long, Message> tuple : list) {
                long seqno = tuple.getVal1().longValue();
                Message msg = tuple.getVal2();
                // Skip non-OOB messages and OOB messages that were already marked as delivered.
                if (!msg.isFlagSet(Message.Flag.OOB)
                        || !msg.setTransientFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED)) {
                    continue;
                }
                if (log.isTraceEnabled()) {
                    log.trace("%s: delivering %s#%s", local_addr, address, Long.valueOf(seqno));
                }
                try {
                    up_prot.up(new Event(1, msg));
                } catch (Throwable th) {
                    log.error(Util.getMessage("FailedToDeliverMsg"), local_addr, "OOB message", msg, th);
                }
            }
        }

        AtomicBoolean processing = window.getProcessing();
        if (processing.compareAndSet(false, true)) {
            removeAndDeliver(processing, window, address);
        }
    }

    /**
     * Repeatedly removes deliverable messages from {@code table} (at most
     * {@code max_msg_batch_size} per round) and passes them up as a {@link MessageBatch}, until
     * {@code removeMany} returns {@code null}.
     * <p>
     * OOB messages that already carry the {@code OOB_DELIVERED} transient flag are dropped from
     * the batch before delivery. Exceptions thrown while delivering a batch are logged and do not
     * stop the loop.
     *
     * @param atomicBoolean the window's "processing" flag, handed to {@code removeMany}
     * @param table         the receive window to drain
     * @param address       the sender the window belongs to
     * @return always {@code 0}
     */
    protected int removeAndDeliver(AtomicBoolean atomicBoolean, Table<Message> table, Address address) {
        // Becomes true only on the regular exit path (removeMany returned null).
        boolean releasedProcessing = false;
        try {
            while (true) {
                List<Message> removed = table.removeMany(atomicBoolean, true, max_msg_batch_size);
                if (removed == null) {
                    releasedProcessing = true;
                    return 0;
                }

                MessageBatch batch = new MessageBatch(local_addr, address, null, false, removed);
                for (Iterator<Message> it = batch.iterator(); it.hasNext();) {
                    Message msg = it.next();
                    // An OOB message whose flag was already set has been delivered out of band.
                    if (msg.isFlagSet(Message.Flag.OOB)
                            && !msg.setTransientFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED)) {
                        batch.remove(msg);
                    }
                }
                if (batch.isEmpty()) {
                    continue;
                }

                try {
                    if (log.isTraceEnabled()) {
                        Message first = batch.first();
                        Message last = batch.last();
                        StringBuilder sb = new StringBuilder().append(local_addr).append(": delivering");
                        if (first != null && last != null) {
                            sb.append(" #").append(((Header) first.getHeader(id)).seqno)
                              .append(" - #").append(((Header) last.getHeader(id)).seqno);
                        }
                        sb.append(" (").append(batch.size()).append(" messages)");
                        log.trace(sb);
                    }
                    up_prot.up(batch);
                } catch (Throwable th) {
                    log.error(Util.getMessage("FailedToDeliverMsg"), local_addr, "batch", batch, th);
                }
            }
        } finally {
            // Safety net: reset the flag only when leaving through an exception. On the regular
            // path it is left untouched here -- presumably removeMany() has already cleared it
            // when it returned null; confirm against Table.removeMany.
            if (!releasedProcessing) {
                atomicBoolean.set(false);
            }
        }
    }

    /**
     * Builds a short "#first - #last" description of the sequence numbers in a message list,
     * for logging.
     * <p>
     * The seqnos are read from this protocol's header on the first and last message. A part is
     * omitted when the corresponding message or header is missing. For a single-element list the
     * same message serves as both first and last.
     *
     * @param list (seqno, message) tuples; only the messages are inspected
     * @return the description, or an empty string for an empty list
     */
    protected String printMessageList(List<Tuple<Long, Message>> list) {
        final int count = list.size();
        Message firstMsg = null;
        if (count > 0) {
            firstMsg = list.get(0).getVal2();
        }
        Message lastMsg = firstMsg;
        if (count > 1) {
            lastMsg = list.get(count - 1).getVal2();
        }

        StringBuilder out = new StringBuilder();
        if (firstMsg != null) {
            Header firstHdr = (Header) firstMsg.getHeader(id);
            if (firstHdr != null) {
                out.append("#").append(firstHdr.seqno);
            }
        }
        if (lastMsg != null) {
            Header lastHdr = (Header) lastMsg.getHeader(id);
            if (lastHdr != null) {
                out.append(" - #").append(lastHdr.seqno);
            }
        }
        return out.toString();
    }

    /**
     * Looks up, and if necessary creates, the receiver entry for a sender.
     * <ul>
     *   <li>If an entry with a matching connection id exists, it is returned without locking.</li>
     *   <li>If the message is marked as first ({@code z}), a missing entry is created, and an
     *       entry with a different connection id is replaced by a fresh one.</li>
     *   <li>Otherwise, when there is no entry or the connection id differs, the sender is asked to
     *       resend its first message and {@code null} is returned.</li>
     * </ul>
     * The lock is acquired and released exactly once; the request for the first seqno is sent
     * after the lock has been released.
     *
     * @param address the sender
     * @param j       the seqno of the received message; used as the start of a new window
     * @param z       whether the received message is marked as the first of its connection
     * @param s       the connection id carried by the received message
     * @return the receiver entry to use, or {@code null} if the message has to be dropped
     */
    protected ReceiverEntry getReceiverEntry(Address address, long j, boolean z, short s) {
        // Fast path: no locking when the entry is already there and the connection ids agree.
        ReceiverEntry existing = recv_table.get(address);
        if (existing != null && existing.connId() == s) {
            return existing;
        }

        recv_table_lock.lock();
        try {
            ReceiverEntry entry = recv_table.get(address); // re-read under the lock
            if (z) {
                if (entry == null) {
                    entry = createReceiverEntry(address, j, s);
                } else if (s != entry.connId()) {
                    log.trace("%s: conn_id=%d != %d; resetting receiver window", local_addr, Short.valueOf(s), Short.valueOf(entry.connId()));
                    recv_table.remove(address);
                    entry = createReceiverEntry(address, j, s);
                }
                return entry;
            }
            if (entry != null && entry.connId() == s) {
                return entry;
            }
        } finally {
            // Single unlock paired with the single lock() above. The previous code unlocked by
            // hand and then conditionally unlocked again, which could release holds owned by a
            // caller that already held this lock.
            recv_table_lock.unlock();
        }

        // Not the first message and no usable entry: ask for the first message again, outside
        // the lock, and signal the caller to drop this one.
        sendRequestForFirstSeqno(address);
        return null;
    }

    /**
     * Creates a receiver entry with a new window that starts at {@code j - 1} and registers it in
     * {@code recv_table} unless another entry for the same sender is already present.
     *
     * @param address the sender the entry is for
     * @param j       the seqno of the first expected message
     * @param s       the connection id of the new entry
     * @return the entry now mapped to {@code address}: the existing one if there was one,
     *         otherwise the newly created entry
     */
    protected ReceiverEntry createReceiverEntry(Address address, long j, short s) {
        // Parameterized instead of a raw Table, matching the Table<Message> type of received_msgs.
        Table<Message> window = new Table<Message>(xmit_table_num_rows, xmit_table_msgs_per_row, j - 1,
                                                   xmit_table_resize_factor, xmit_table_max_compaction_time);
        ReceiverEntry created = new ReceiverEntry(window, s);
        ReceiverEntry existing = recv_table.putIfAbsent(address, created);
        if (existing != null) {
            return existing; // an entry was already registered; ours is discarded
        }
        log.trace("%s: created receiver window for %s at seqno=#%d for conn-id=%d", local_addr, address, Long.valueOf(j), Short.valueOf(s));
        return created;
    }

    /**
     * Handles an ACK from {@code address}: purges the send window up to {@code j} and increments
     * {@code num_acks_received}. The ACK is ignored when there is no sender entry (or window) for
     * the peer, and discarded when its connection id differs from the entry's.
     *
     * @param address the peer that sent the ACK
     * @param j       the acknowledged seqno
     * @param s       the connection id carried by the ACK
     */
    protected void handleAckReceived(Address address, long j, short s) {
        if (log.isTraceEnabled()) {
            log.trace("%s <-- ACK(%s: #%d, conn-id=%d)", local_addr, address, Long.valueOf(j), Short.valueOf(s));
        }

        SenderEntry entry = send_table.get(address);
        if (entry == null) {
            return;
        }
        if (entry.connId() != s) {
            log.trace("%s: my conn_id (%d) != received conn_id (%d); discarding ACK", local_addr, Short.valueOf(entry.connId()), Short.valueOf(s));
            return;
        }

        Table<Message> window = entry.sent_msgs;
        if (window == null) {
            return;
        }
        window.purge(j, true);
        num_acks_received++;
    }

    /**
     * Handles a SEND_FIRST_SEQNO request from {@code address}: resends a copy of the message at
     * position {@code low + 1} of the send window, with the {@code first} field of its header set.
     * A warning is logged when there is no send window for the peer; nothing is sent when the
     * window has no message at that position.
     *
     * @param address the peer that asked for the first message
     */
    protected void handleResendingOfFirstMessage(Address address) {
        log.trace("%s <-- SEND_FIRST_SEQNO(%s)", local_addr, address);

        SenderEntry entry = send_table.get(address);
        Table<Message> window = (entry == null) ? null : entry.sent_msgs;
        if (window == null) {
            log.warn(Util.getMessage("SenderNotFound"), local_addr, address);
            return;
        }

        Message oldest = window.get(window.getLow() + 1);
        if (oldest == null) {
            return;
        }

        // Work on copies so the message and header kept in the window are not modified.
        Message resend = oldest.copy();
        Header firstHdr = ((Header) resend.getHeader(id)).copy();
        firstHdr.first = true;
        resend.putHeader(id, firstHdr);
        down_prot.down(new Event(1, resend));
    }

    /**
     * Handles a retransmission request from {@code address}: every requested message still held in
     * the send window is passed down again, and {@code xmit_rsps_sent} is incremented per message.
     * The number of requested seqnos is added to {@code xmit_reqs_received} even when there is no
     * send window for the peer.
     *
     * @param address   the peer asking for retransmission
     * @param seqnoList the requested sequence numbers
     */
    protected void handleXmitRequest(Address address, SeqnoList seqnoList) {
        log.trace("%s <-- XMIT(%s: #%s)", local_addr, address, seqnoList);

        SenderEntry entry = send_table.get(address);
        xmit_reqs_received.addAndGet(seqnoList.size());
        if (entry == null) {
            return;
        }
        Table<Message> window = entry.sent_msgs;
        if (window == null) {
            return;
        }

        for (Iterator<Long> it = seqnoList.iterator(); it.hasNext();) {
            long seqno = it.next().longValue();
            Message msg = window.get(seqno);
            if (msg != null) {
                down_prot.down(new Event(1, msg));
                xmit_rsps_sent.incrementAndGet();
                continue;
            }
            // Warn only if enabled, the requester is not ourselves, and the seqno lies above the
            // window's low mark.
            boolean warn = log.isWarnEnabled() && log_not_found_msgs
                    && !local_addr.equals(address) && seqno > window.getLow();
            if (warn) {
                log.warn(Util.getMessage("MessageNotFound"), local_addr, address, Long.valueOf(seqno));
            }
        }
    }

    /**
     * Schedules a new {@code RetransmitTask} on the timer with a fixed delay of
     * {@code xmit_interval} ms and no initial delay, unless a task is already scheduled and not
     * yet done.
     */
    protected void startRetransmitTask() {
        boolean alreadyRunning = xmit_task != null && !xmit_task.isDone();
        if (alreadyRunning) {
            return;
        }
        xmit_task = timer.scheduleWithFixedDelay(new RetransmitTask(), 0L, xmit_interval, TimeUnit.MILLISECONDS);
    }

    /**
     * Cancels the retransmit task (interrupting it if it is running) and clears the reference.
     * Does nothing when no task is set.
     */
    protected void stopRetransmitTask() {
        if (xmit_task == null) {
            return;
        }
        xmit_task.cancel(true);
        xmit_task = null;
    }

    /**
     * Sends an ACK for seqno {@code j} to {@code address} and increments {@code num_acks_sent}.
     * Nothing is sent while the protocol is not running. A failure to send is logged, not
     * propagated.
     *
     * @param address the peer to acknowledge to
     * @param j       the seqno being acknowledged
     * @param s       the connection id placed in the ACK header
     */
    protected void sendAck(Address address, long j, short s) {
        if (!running) {
            return;
        }
        Message ack = new Message(address)
                .setFlag(Message.Flag.INTERNAL)
                .putHeader(id, Header.createAckHeader(j, s));
        if (log.isTraceEnabled()) {
            log.trace("%s --> ACK(%s: #%d)", local_addr, address, Long.valueOf(j));
        }
        try {
            down_prot.down(new Event(1, ack));
            num_acks_sent++; // counted only when the send did not throw
        } catch (Throwable th) {
            log.error(Util.getMessage("FailedSendingAck"), local_addr, Long.valueOf(j), address, th);
        }
    }

    /**
     * Returns the current value of {@code last_conn_id} and advances the counter. The counter
     * wraps to {@code 0} once it has reached {@link Short#MAX_VALUE} (or if it is negative).
     *
     * @return the connection id to use, i.e. the counter value before it was advanced
     */
    protected synchronized short getNewConnectionId() {
        final short current = last_conn_id;
        final boolean wrapAround = current >= Short.MAX_VALUE || current < 0;
        last_conn_id = wrapAround ? (short) 0 : (short) (current + 1);
        return current;
    }

    /**
     * Sends an OOB message with a SEND_FIRST_SEQNO header to {@code address}, asking the peer to
     * resend the first message of its connection.
     *
     * @param address the peer to ask
     */
    protected void sendRequestForFirstSeqno(Address address) {
        Message request = new Message(address)
                .setFlag(Message.Flag.OOB)
                .putHeader(id, Header.createSendFirstSeqnoHeader());
        log.trace("%s --> SEND_FIRST_SEQNO(%s)", local_addr, address);
        Event msgEvent = new Event(1, request);
        down_prot.down(msgEvent);
    }

    /**
     * Sends an INTERNAL message with a CLOSE header for connection id {@code s} to
     * {@code address}.
     *
     * @param address the peer to notify
     * @param s       the id of the connection being closed
     */
    public void sendClose(Address address, short s) {
        Message close = new Message(address)
                .setFlag(Message.Flag.INTERNAL)
                .putHeader(id, Header.createCloseHeader(s));
        log.trace("%s --> CLOSE(%s, conn-id=%d)", local_addr, address, Short.valueOf(s));
        Event msgEvent = new Event(1, close);
        down_prot.down(msgEvent);
    }

    /**
     * Marks as closing every OPEN connection, in {@code send_table} and then in
     * {@code recv_table}, whose age is at least {@code conn_expiry_timeout} ms.
     */
    @ManagedOperation(description = "Closes connections that have been idle for more than conn_expiry_timeout ms")
    public void closeIdleConnections() {
        // Outgoing connections.
        for (Map.Entry<Address, SenderEntry> sendEntry : send_table.entrySet()) {
            SenderEntry sender = sendEntry.getValue();
            if (sender.state() != State.OPEN) {
                continue;
            }
            long idleMs = sender.age();
            if (idleMs < conn_expiry_timeout) {
                continue;
            }
            log.debug("%s: closing expired connection for %s (%d ms old) in send_table", local_addr, sendEntry.getKey(), Long.valueOf(idleMs));
            closeSendConnection(sendEntry.getKey());
        }

        // Incoming connections.
        for (Map.Entry<Address, ReceiverEntry> recvEntry : recv_table.entrySet()) {
            ReceiverEntry receiver = recvEntry.getValue();
            if (receiver.state() != State.OPEN) {
                continue;
            }
            long idleMs = receiver.age();
            if (idleMs < conn_expiry_timeout) {
                continue;
            }
            log.debug("%s: closing expired connection for %s (%d ms old) in recv_table", local_addr, recvEntry.getKey(), Long.valueOf(idleMs));
            closeReceiveConnection(recvEntry.getKey());
        }
    }

    /**
     * Removes every connection that is not OPEN and whose age is at least
     * {@code conn_close_timeout} ms, first from {@code send_table} and then from
     * {@code recv_table}.
     */
    @ManagedOperation(description = "Removes connections that have been closed for more than conn_close_timeout ms")
    public void removeExpiredConnections() {
        // Outgoing connections.
        for (Map.Entry<Address, SenderEntry> sendEntry : send_table.entrySet()) {
            SenderEntry sender = sendEntry.getValue();
            if (sender.state() == State.OPEN) {
                continue;
            }
            long ageMs = sender.age();
            if (ageMs < conn_close_timeout) {
                continue;
            }
            log.debug("%s: removing expired connection for %s (%d ms old) from send_table", local_addr, sendEntry.getKey(), Long.valueOf(ageMs));
            removeSendConnection(sendEntry.getKey());
        }

        // Incoming connections.
        for (Map.Entry<Address, ReceiverEntry> recvEntry : recv_table.entrySet()) {
            ReceiverEntry receiver = recvEntry.getValue();
            if (receiver.state() == State.OPEN) {
                continue;
            }
            long ageMs = receiver.age();
            if (ageMs < conn_close_timeout) {
                continue;
            }
            log.debug("%s: removing expired connection for %s (%d ms old) from recv_table", local_addr, recvEntry.getKey(), Long.valueOf(ageMs));
            removeReceiveConnection(recvEntry.getKey());
        }
    }

    /**
     * Runs one round of retransmission housekeeping:
     * <ol>
     *   <li>For every receiver entry: sends a pending ACK, and requests retransmission of missing
     *       seqnos. A gap is only requested once it has been seen in a previous round;
     *       {@code xmit_task_map} stores, per sender, the highest missing seqno seen so far.</li>
     *   <li>For every sender entry: resends the highest sent message if neither the highest
     *       delivered nor the highest received seqno of the send window changed since the
     *       previous round (tracked in the entry's watermark).</li>
     *   <li>Closes idle connections and removes expired ones when the corresponding timeouts are
     *       greater than zero.</li>
     * </ol>
     */
    @ManagedOperation(description = "Triggers the retransmission task")
    public void triggerXmit() {
        SeqnoList missing;
        for (Map.Entry<Address, ReceiverEntry> entry : this.recv_table.entrySet()) {
            Address key = entry.getKey();
            ReceiverEntry value = entry.getValue();
            Table<Message> table = value != null ? value.received_msgs : null;
            // Send an ACK if the entry has one pending.
            if (table != null && value.sendAck()) {
                sendAck(key, table.getHighestDeliverable(), value.connId());
            }
            if (table != null && table.getNumMissing() > 0 && (missing = table.getMissing()) != null) {
                long last = missing.getLast();
                Long l = this.xmit_task_map.get(key);
                if (l == null) {
                    // First round in which this sender has gaps: only remember the highest missing
                    // seqno, do not request anything yet.
                    this.xmit_task_map.put(key, Long.valueOf(last));
                } else {
                    // Request only seqnos up to the highest one recorded in an earlier round.
                    missing.removeHigherThan(l.longValue());
                    if (last > l.longValue()) {
                        this.xmit_task_map.put(key, Long.valueOf(last));
                    }
                    if (missing.size() > 0) {
                        retransmit(missing, key);
                    }
                }
            } else if (!this.xmit_task_map.isEmpty()) {
                // Nothing missing (or no window) for this sender: drop its bookkeeping entry.
                this.xmit_task_map.remove(key);
            }
        }
        Iterator<SenderEntry> it = this.send_table.values().iterator();
        while (it.hasNext()) {
            SenderEntry next = it.next();
            Table<Message> table2 = next != null ? next.sent_msgs : null;
            if (table2 != null) {
                long highestDelivered = table2.getHighestDelivered();
                long highestReceived = table2.getHighestReceived();
                // Resend the highest sent message only if the window still holds messages above
                // highestDelivered and both marks equal the watermark recorded last round.
                if (highestDelivered < highestReceived && next.watermark[0] == highestDelivered && next.watermark[1] == highestReceived) {
                    Message message = table2.get(highestReceived);
                    if (message != null) {
                        retransmit(message);
                    }
                } else {
                    // Marks moved (or nothing outstanding): record them for the next round.
                    next.watermark(highestDelivered, highestReceived);
                }
            }
        }
        if (this.conn_expiry_timeout > 0) {
            closeIdleConnections();
        }
        if (this.conn_close_timeout > 0) {
            removeExpiredConnections();
        }
    }

    /**
     * Walks all receiver entries and sends an ACK, carrying the window's highest deliverable
     * seqno, for each entry that has a window and reports a pending ACK via {@code sendAck()}.
     */
    protected void sendPendingAcks() {
        for (Map.Entry<Address, ReceiverEntry> mapping : recv_table.entrySet()) {
            ReceiverEntry receiver = mapping.getValue();
            if (receiver == null) {
                continue;
            }
            Table<Message> window = receiver.received_msgs;
            if (window == null || !receiver.sendAck()) {
                continue;
            }
            sendAck(mapping.getKey(), window.getHighestDeliverable(), receiver.connId());
        }
    }
}
