package org.jgroups.protocols;

import java.io.DataInput;
import java.io.DataOutput;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javassist.compiler.TokenId;
import org.apache.commons.lang.time.DateUtils;
import org.infinispan.transaction.xa.recovery.RecoveryAdminOperations;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.conf.PropertyConverters;
import org.jgroups.stack.Protocol;
import org.jgroups.util.AgeOutCache;
import org.jgroups.util.Bits;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.Table;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;

@MBean(description = "Reliable unicast layer")
@Deprecated
/* loaded from: input_file:WEB-INF/lib/infinispan-embedded-7.1.0.Beta1.jar:org/jgroups/protocols/UNICAST.class */
public class UNICAST extends Protocol implements AgeOutCache.Handler<Address> {
    public static final long DEFAULT_FIRST_SEQNO = 1;
    protected Future<?> xmit_task;
    protected Future<?> connection_reaper;

    @Deprecated
    protected int[] timeout = {TokenId.Identifier, 800, 1600, 3200};

    @Property(description = "Max number of messages to be removed from a retransmit window. This property might get removed anytime, so don't use it !")
    protected int max_msg_batch_size = TokenId.BadToken;

    @Property(description = "Time (in milliseconds) after which an idle incoming or outgoing connection is closed. The connection will get re-established when used again. 0 disables connection reaping")
    protected long conn_expiry_timeout = 0;

    @Property(description = "Size (in bytes) of a Segment in the segments table. Only for experts, do not use !", deprecatedMessage = "not used anymore")
    @Deprecated
    protected int segment_capacity = 1000;

    @Property(description = "Number of rows of the matrix in the retransmission table (only for experts)", writable = false)
    protected int xmit_table_num_rows = 100;

    @Property(description = "Number of elements of a row of the matrix in the retransmission table (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row", writable = false)
    protected int xmit_table_msgs_per_row = 1000;

    @Property(description = "Resize factor of the matrix in the retransmission table (only for experts)", writable = false)
    protected double xmit_table_resize_factor = 1.2d;

    @Property(description = "Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts)", writable = false)
    protected long xmit_table_max_compaction_time = 600000;
    protected long max_retransmit_time = DateUtils.MILLIS_PER_MINUTE;

    @Property(description = "Interval (in milliseconds) at which messages in the send windows are resent")
    protected long xmit_interval = 2000;
    protected long num_msgs_sent = 0;
    protected long num_msgs_received = 0;
    protected long num_acks_sent = 0;
    protected long num_acks_received = 0;
    protected long num_xmits = 0;
    protected final ConcurrentMap<Address, SenderEntry> send_table = Util.createConcurrentMap();
    protected final ConcurrentMap<Address, ReceiverEntry> recv_table = Util.createConcurrentMap();
    protected final ReentrantLock recv_table_lock = new ReentrantLock();
    protected volatile List<Address> members = new ArrayList(11);
    protected Address local_addr = null;
    protected TimeScheduler timer = null;
    protected volatile boolean running = false;
    protected short last_conn_id = 0;
    protected AgeOutCache<Address> cache = null;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-7.1.0.Beta1.jar:org/jgroups/protocols/UNICAST$ConnectionReaper.class */
    public class ConnectionReaper implements Runnable {
        protected ConnectionReaper() {
        }

        @Override // java.lang.Runnable
        public void run() {
            UNICAST.this.reapIdleConnections();
        }

        public String toString() {
            return UNICAST.class.getSimpleName() + ": ConnectionReaper (interval=" + UNICAST.this.conn_expiry_timeout + " ms)";
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-7.1.0.Beta1.jar:org/jgroups/protocols/UNICAST$ReceiverEntry.class */
    public static final class ReceiverEntry {
        protected final Table<Message> received_msgs;
        protected final short recv_conn_id;
        protected final AtomicLong timestamp = new AtomicLong(0);

        public ReceiverEntry(Table<Message> table, short s) {
            this.received_msgs = table;
            this.recv_conn_id = s;
            update();
        }

        void update() {
            this.timestamp.set(System.currentTimeMillis());
        }

        long age() {
            return System.currentTimeMillis() - this.timestamp.longValue();
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            if (this.received_msgs != null) {
                sb.append(this.received_msgs).append(RecoveryAdminOperations.SEPARATOR);
            }
            sb.append("recv_conn_id=" + ((int) this.recv_conn_id));
            sb.append(" (" + age() + " ms old)");
            return sb.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-7.1.0.Beta1.jar:org/jgroups/protocols/UNICAST$RetransmitTask.class */
    public class RetransmitTask implements Runnable {
        protected RetransmitTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            List<Message> list;
            Iterator<SenderEntry> it = UNICAST.this.send_table.values().iterator();
            while (it.hasNext()) {
                SenderEntry next = it.next();
                Table<Message> table = next != null ? next.sent_msgs : null;
                if (table != null && !table.isEmpty() && (list = table.get(table.getHighestDelivered() + 1, table.getHighestReceived())) != null) {
                    Iterator<Message> it2 = list.iterator();
                    while (it2.hasNext()) {
                        UNICAST.this.retransmit(it2.next());
                    }
                }
            }
        }

        public String toString() {
            return UNICAST.class.getSimpleName() + ": RetransmitTask (interval=" + UNICAST.this.xmit_interval + " ms)";
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-7.1.0.Beta1.jar:org/jgroups/protocols/UNICAST$SenderEntry.class */
    public final class SenderEntry {
        final Table<Message> sent_msgs;
        final short send_conn_id;
        final AtomicLong sent_msgs_seqno = new AtomicLong(1);
        protected final AtomicLong timestamp = new AtomicLong(0);
        final Lock lock = new ReentrantLock();

        public SenderEntry(short s) {
            this.send_conn_id = s;
            this.sent_msgs = new Table<>(UNICAST.this.xmit_table_num_rows, UNICAST.this.xmit_table_msgs_per_row, 0L, UNICAST.this.xmit_table_resize_factor, UNICAST.this.xmit_table_max_compaction_time);
            update();
        }

        void update() {
            this.timestamp.set(System.currentTimeMillis());
        }

        long age() {
            return System.currentTimeMillis() - this.timestamp.longValue();
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            if (this.sent_msgs != null) {
                sb.append(this.sent_msgs).append(RecoveryAdminOperations.SEPARATOR);
            }
            sb.append("send_conn_id=" + ((int) this.send_conn_id)).append(" (" + age() + " ms old)");
            return sb.toString();
        }
    }

    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-7.1.0.Beta1.jar:org/jgroups/protocols/UNICAST$UnicastHeader.class */
    public static class UnicastHeader extends Header {
        public static final byte DATA = 0;
        public static final byte ACK = 1;
        public static final byte SEND_FIRST_SEQNO = 2;
        byte type;
        long seqno;
        short conn_id;
        boolean first;

        public UnicastHeader() {
        }

        public static UnicastHeader createDataHeader(long j, short s, boolean z) {
            return new UnicastHeader((byte) 0, j, s, z);
        }

        public static UnicastHeader createAckHeader(long j, short s) {
            return new UnicastHeader((byte) 1, j, s, false);
        }

        public static UnicastHeader createSendFirstSeqnoHeader(long j) {
            return new UnicastHeader((byte) 2, j);
        }

        protected UnicastHeader(byte b, long j) {
            this.type = b;
            this.seqno = j;
        }

        protected UnicastHeader(byte b, long j, short s, boolean z) {
            this.type = b;
            this.seqno = j;
            this.conn_id = s;
            this.first = z;
        }

        public long getSeqno() {
            return this.seqno;
        }

        @Override // org.jgroups.Header
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append(type2Str(this.type)).append(", seqno=").append(this.seqno);
            if (this.conn_id != 0) {
                sb.append(", conn_id=").append((int) this.conn_id);
            }
            if (this.first) {
                sb.append(", first");
            }
            return sb.toString();
        }

        public static String type2Str(byte b) {
            switch (b) {
                case 0:
                    return "DATA";
                case 1:
                    return "ACK";
                case 2:
                    return "SEND_FIRST_SEQNO";
                default:
                    return "<unknown>";
            }
        }

        @Override // org.jgroups.Header
        public final int size() {
            int i = 1;
            switch (this.type) {
                case 0:
                    i = 1 + Bits.size(this.seqno) + 2 + 1;
                    break;
                case 1:
                    i = 1 + Bits.size(this.seqno) + 2;
                    break;
                case 2:
                    i = 1 + Bits.size(this.seqno);
                    break;
            }
            return i;
        }

        public UnicastHeader copy() {
            return new UnicastHeader(this.type, this.seqno, this.conn_id, this.first);
        }

        @Override // org.jgroups.util.Streamable
        public void writeTo(DataOutput dataOutput) throws Exception {
            dataOutput.writeByte(this.type);
            switch (this.type) {
                case 0:
                    Bits.writeLong(this.seqno, dataOutput);
                    dataOutput.writeShort(this.conn_id);
                    dataOutput.writeBoolean(this.first);
                    return;
                case 1:
                    Bits.writeLong(this.seqno, dataOutput);
                    dataOutput.writeShort(this.conn_id);
                    return;
                case 2:
                    Bits.writeLong(this.seqno, dataOutput);
                    return;
                default:
                    return;
            }
        }

        @Override // org.jgroups.util.Streamable
        public void readFrom(DataInput dataInput) throws Exception {
            this.type = dataInput.readByte();
            switch (this.type) {
                case 0:
                    this.seqno = Bits.readLong(dataInput);
                    this.conn_id = dataInput.readShort();
                    this.first = dataInput.readBoolean();
                    return;
                case 1:
                    this.seqno = Bits.readLong(dataInput);
                    this.conn_id = dataInput.readShort();
                    return;
                case 2:
                    this.seqno = Bits.readLong(dataInput);
                    return;
                default:
                    return;
            }
        }
    }

    public int[] getTimeout() {
        return this.timeout;
    }

    @Property(name = "timeout", converter = PropertyConverters.IntegerArray.class, deprecatedMessage = "not used anymore")
    @Deprecated
    public void setTimeout(int[] iArr) {
        if (iArr != null) {
            this.timeout = iArr;
        }
    }

    public void setMaxMessageBatchSize(int i) {
        if (i >= 1) {
            this.max_msg_batch_size = i;
        }
    }

    @ManagedAttribute
    public String getLocalAddress() {
        return this.local_addr != null ? this.local_addr.toString() : "null";
    }

    @ManagedAttribute
    public String getMembers() {
        return this.members.toString();
    }

    @ManagedAttribute(description = "Whether the ConnectionReaper task is running")
    public boolean isConnectionReaperRunning() {
        return (this.connection_reaper == null || this.connection_reaper.isDone()) ? false : true;
    }

    @ManagedAttribute(description = "Returns the number of outgoing (send) connections")
    public int getNumSendConnections() {
        return this.send_table.size();
    }

    @ManagedAttribute(description = "Returns the number of incoming (receive) connections")
    public int getNumReceiveConnections() {
        return this.recv_table.size();
    }

    @ManagedAttribute(description = "Returns the total number of outgoing (send) and incoming (receive) connections")
    public int getNumConnections() {
        return getNumReceiveConnections() + getNumSendConnections();
    }

    @ManagedOperation
    public String printConnections() {
        StringBuilder sb = new StringBuilder();
        if (!this.send_table.isEmpty()) {
            sb.append("\nsend connections:\n");
            for (Map.Entry<Address, SenderEntry> entry : this.send_table.entrySet()) {
                sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
            }
        }
        if (!this.recv_table.isEmpty()) {
            sb.append("\nreceive connections:\n");
            for (Map.Entry<Address, ReceiverEntry> entry2 : this.recv_table.entrySet()) {
                sb.append(entry2.getKey()).append(": ").append(entry2.getValue()).append("\n");
            }
        }
        return sb.toString();
    }

    @ManagedAttribute
    public long getNumMessagesSent() {
        return this.num_msgs_sent;
    }

    @ManagedAttribute
    public long getNumMessagesReceived() {
        return this.num_msgs_received;
    }

    @ManagedAttribute
    public long getNumAcksSent() {
        return this.num_acks_sent;
    }

    @ManagedAttribute
    public long getNumAcksReceived() {
        return this.num_acks_received;
    }

    @ManagedAttribute
    public long getNumXmits() {
        return this.num_xmits;
    }

    public long getMaxRetransmitTime() {
        return this.max_retransmit_time;
    }

    @Property(description = "Max number of milliseconds we try to retransmit a message to any given member. After that, the connection is removed. Any new connection to that member will start with seqno #1 again. 0 disables this")
    public void setMaxRetransmitTime(long j) {
        this.max_retransmit_time = j;
        if (this.cache == null || j <= 0) {
            return;
        }
        this.cache.setTimeout(j);
    }

    @ManagedAttribute(description = "Is the retransmit task running")
    public boolean isXmitTaskRunning() {
        return (this.xmit_task == null || this.xmit_task.isDone()) ? false : true;
    }

    @ManagedAttribute
    public int getAgeOutCacheSize() {
        if (this.cache != null) {
            return this.cache.size();
        }
        return 0;
    }

    @ManagedOperation
    public String printAgeOutCache() {
        return this.cache != null ? this.cache.toString() : "n/a";
    }

    public AgeOutCache<Address> getAgeOutCache() {
        return this.cache;
    }

    public boolean hasSendConnectionTo(Address address) {
        return this.send_table.containsKey(address);
    }

    @ManagedAttribute
    public int getNumUnackedMessages() {
        int i = 0;
        for (SenderEntry senderEntry : this.send_table.values()) {
            if (senderEntry.sent_msgs != null) {
                i += senderEntry.sent_msgs.size();
            }
        }
        return i;
    }

    @ManagedAttribute
    public int getNumberOfMessagesInReceiveWindows() {
        int i = 0;
        for (ReceiverEntry receiverEntry : this.recv_table.values()) {
            if (receiverEntry.received_msgs != null) {
                i += receiverEntry.received_msgs.size();
            }
        }
        return i;
    }

    @ManagedAttribute(description = "Total number of undelivered messages in all receive windows")
    public long getXmitTableUndeliveredMessages() {
        long j = 0;
        Iterator<ReceiverEntry> it = this.recv_table.values().iterator();
        while (it.hasNext()) {
            if (it.next().received_msgs != null) {
                j += r0.received_msgs.size();
            }
        }
        return j;
    }

    @ManagedAttribute(description = "Total number of missing messages in all receive windows")
    public long getXmitTableMissingMessages() {
        long j = 0;
        Iterator<ReceiverEntry> it = this.recv_table.values().iterator();
        while (it.hasNext()) {
            if (it.next().received_msgs != null) {
                j += r0.received_msgs.getNumMissing();
            }
        }
        return j;
    }

    @ManagedAttribute(description = "Number of compactions in all (receive and send) windows")
    public int getXmitTableNumCompactions() {
        int i = 0;
        for (ReceiverEntry receiverEntry : this.recv_table.values()) {
            if (receiverEntry.received_msgs != null) {
                i += receiverEntry.received_msgs.getNumCompactions();
            }
        }
        for (SenderEntry senderEntry : this.send_table.values()) {
            if (senderEntry.sent_msgs != null) {
                i += senderEntry.sent_msgs.getNumCompactions();
            }
        }
        return i;
    }

    @ManagedAttribute(description = "Number of moves in all (receive and send) windows")
    public int getXmitTableNumMoves() {
        int i = 0;
        for (ReceiverEntry receiverEntry : this.recv_table.values()) {
            if (receiverEntry.received_msgs != null) {
                i += receiverEntry.received_msgs.getNumMoves();
            }
        }
        for (SenderEntry senderEntry : this.send_table.values()) {
            if (senderEntry.sent_msgs != null) {
                i += senderEntry.sent_msgs.getNumMoves();
            }
        }
        return i;
    }

    @ManagedAttribute(description = "Number of resizes in all (receive and send) windows")
    public int getXmitTableNumResizes() {
        int i = 0;
        for (ReceiverEntry receiverEntry : this.recv_table.values()) {
            if (receiverEntry.received_msgs != null) {
                i += receiverEntry.received_msgs.getNumResizes();
            }
        }
        for (SenderEntry senderEntry : this.send_table.values()) {
            if (senderEntry.sent_msgs != null) {
                i += senderEntry.sent_msgs.getNumResizes();
            }
        }
        return i;
    }

    @ManagedAttribute(description = "Number of purges in all (receive and send) windows")
    public int getXmitTableNumPurges() {
        int i = 0;
        for (ReceiverEntry receiverEntry : this.recv_table.values()) {
            if (receiverEntry.received_msgs != null) {
                i += receiverEntry.received_msgs.getNumPurges();
            }
        }
        for (SenderEntry senderEntry : this.send_table.values()) {
            if (senderEntry.sent_msgs != null) {
                i += senderEntry.sent_msgs.getNumPurges();
            }
        }
        return i;
    }

    @ManagedOperation(description = "Prints the contents of the receive windows for all members")
    public String printReceiveWindowMessages() {
        StringBuilder sb = new StringBuilder(this.local_addr + ":\n");
        for (Map.Entry<Address, ReceiverEntry> entry : this.recv_table.entrySet()) {
            sb.append(entry.getKey()).append(": ").append(entry.getValue().received_msgs.toString()).append('\n');
        }
        return sb.toString();
    }

    @ManagedOperation(description = "Prints the contents of the send windows for all members")
    public String printSendWindowMessages() {
        StringBuilder sb = new StringBuilder(this.local_addr + ":\n");
        for (Map.Entry<Address, SenderEntry> entry : this.send_table.entrySet()) {
            sb.append(entry.getKey()).append(": ").append(entry.getValue().sent_msgs.toString()).append('\n');
        }
        return sb.toString();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r4v0, types: [org.jgroups.protocols.UNICAST] */
    @Override // org.jgroups.stack.Protocol
    public void resetStats() {
        ?? r4 = 0;
        this.num_acks_received = 0L;
        this.num_acks_sent = 0L;
        r4.num_msgs_received = this;
        this.num_msgs_sent = this;
        this.num_xmits = 0L;
    }

    @Override // org.jgroups.stack.Protocol
    public Map<String, Object> dumpStats() {
        Map<String, Object> dumpStats = super.dumpStats();
        dumpStats.put("num_unacked_msgs", Integer.valueOf(getNumUnackedMessages()));
        dumpStats.put("num_msgs_in_recv_windows", Integer.valueOf(getNumberOfMessagesInReceiveWindows()));
        return dumpStats;
    }

    @Override // org.jgroups.stack.Protocol
    public void start() throws Exception {
        this.timer = getTransport().getTimer();
        if (this.timer == null) {
            throw new Exception("timer is null");
        }
        if (this.max_retransmit_time > 0) {
            this.cache = new AgeOutCache<>(this.timer, this.max_retransmit_time, this);
        }
        this.running = true;
        if (this.conn_expiry_timeout > 0) {
            startConnectionReaper();
        }
        startRetransmitTask();
    }

    @Override // org.jgroups.stack.Protocol
    public void stop() {
        this.running = false;
        stopRetransmitTask();
        stopConnectionReaper();
        removeAllConnections();
    }

    @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
    public Object up(Event event) {
        UnicastHeader unicastHeader;
        switch (event.getType()) {
            case 1:
                Message message = (Message) event.getArg();
                if (message.getDest() != null && !message.isFlagSet(Message.Flag.NO_RELIABILITY) && (unicastHeader = (UnicastHeader) message.getHeader(this.id)) != null) {
                    Address src = message.getSrc();
                    switch (unicastHeader.type) {
                        case 0:
                            handleDataReceived(src, unicastHeader.seqno, unicastHeader.conn_id, unicastHeader.first, message, event);
                            return null;
                        default:
                            handleUpEvent(src, unicastHeader);
                            return null;
                    }
                }
                break;
        }
        return this.up_prot.up(event);
    }

    protected void handleUpEvent(Address address, UnicastHeader unicastHeader) {
        switch (unicastHeader.type) {
            case 0:
                throw new IllegalStateException("header of type DATA is not supposed to be handled by this method");
            case 1:
                handleAckReceived(address, unicastHeader.seqno, unicastHeader.conn_id);
                return;
            case 2:
                handleResendingOfFirstMessage(address, unicastHeader.seqno);
                return;
            default:
                this.log.error("UnicastHeader type " + ((int) unicastHeader.type) + " not known !");
                return;
        }
    }

    @Override // org.jgroups.stack.Protocol
    public void up(MessageBatch messageBatch) {
        UnicastHeader unicastHeader;
        if (messageBatch.dest() == null) {
            this.up_prot.up(messageBatch);
            return;
        }
        int size = messageBatch.size();
        TreeMap treeMap = new TreeMap();
        Iterator<Message> it = messageBatch.iterator();
        while (it.hasNext()) {
            Message next = it.next();
            if (next != null && !next.isFlagSet(Message.Flag.NO_RELIABILITY) && (unicastHeader = (UnicastHeader) next.getHeader(this.id)) != null) {
                messageBatch.remove(next);
                if (unicastHeader.type != 0) {
                    try {
                        handleUpEvent(next.getSrc(), unicastHeader);
                    } catch (Throwable th) {
                        this.log.error(this.local_addr + ": failed handling event", th);
                    }
                } else {
                    List<Message> list = treeMap.get(Short.valueOf(unicastHeader.conn_id));
                    if (list == null) {
                        Short valueOf = Short.valueOf(unicastHeader.conn_id);
                        ArrayList arrayList = new ArrayList(size);
                        list = arrayList;
                        treeMap.put(valueOf, arrayList);
                    }
                    list.add(next);
                }
            }
        }
        if (!treeMap.isEmpty()) {
            handleBatchReceived(messageBatch.sender(), treeMap);
        }
        if (messageBatch.isEmpty()) {
            return;
        }
        this.up_prot.up(messageBatch);
    }

    /* JADX WARN: Removed duplicated region for block: B:41:0x0198  */
    @Override // org.jgroups.stack.Protocol
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public java.lang.Object down(org.jgroups.Event r11) {
        /*
            Method dump skipped, instructions count: 711
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.jgroups.protocols.UNICAST.down(org.jgroups.Event):java.lang.Object");
    }

    public void removeConnection(Address address) {
        removeSendConnection(address);
        removeReceiveConnection(address);
    }

    public void removeSendConnection(Address address) {
        this.send_table.remove(address);
    }

    public void removeReceiveConnection(Address address) {
        this.recv_table.remove(address);
    }

    @ManagedOperation(description = "Trashes all connections to other nodes. This is only used for testing")
    public void removeAllConnections() {
        this.send_table.clear();
        this.recv_table.clear();
    }

    public void retransmit(Message message) {
        if (this.log.isTraceEnabled()) {
            UnicastHeader unicastHeader = (UnicastHeader) message.getHeader(this.id);
            this.log.trace(this.local_addr + " --> XMIT(" + message.getDest() + ": #" + (unicastHeader != null ? unicastHeader.seqno : -1L) + ')');
        }
        this.down_prot.down(new Event(1, message));
        this.num_xmits++;
    }

    @Override // org.jgroups.util.AgeOutCache.Handler
    public void expired(Address address) {
        if (address != null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("removing connection to " + address + " because it expired");
            }
            removeConnection(address);
        }
    }

    protected void handleDataReceived(Address address, long j, short s, boolean z, Message message, Event event) {
        if (this.log.isTraceEnabled()) {
            StringBuilder sb = new StringBuilder();
            sb.append(this.local_addr).append(" <-- DATA(").append(address).append(": #").append(j);
            if (s != 0) {
                sb.append(", conn_id=").append((int) s);
            }
            if (z) {
                sb.append(", first");
            }
            sb.append(')');
            this.log.trace(sb);
        }
        ReceiverEntry receiverEntry = getReceiverEntry(address, j, z, s);
        if (receiverEntry == null) {
            return;
        }
        if (this.conn_expiry_timeout > 0) {
            receiverEntry.update();
        }
        Table<Message> table = receiverEntry.received_msgs;
        boolean add = table.add(j, (long) message);
        this.num_msgs_received++;
        if (message.isFlagSet(Message.Flag.OOB) && add) {
            try {
                this.up_prot.up(event);
            } catch (Throwable th) {
                this.log.error("couldn't deliver OOB message " + message, th);
            }
        }
        AtomicBoolean processing = table.getProcessing();
        if (processing.compareAndSet(false, true)) {
            removeAndDeliver(processing, table, address);
            sendAck(address, table.getHighestDelivered(), s);
        }
    }

    protected void handleBatchReceived(Address address, Map<Short, List<Message>> map) {
        for (Map.Entry<Short, List<Message>> entry : map.entrySet()) {
            List<Message> value = entry.getValue();
            if (this.log.isTraceEnabled()) {
                StringBuilder sb = new StringBuilder();
                sb.append(this.local_addr).append(" <-- DATA(").append(address).append(": " + printMessageList(value)).append(')');
                this.log.trace(sb);
            }
            short shortValue = entry.getKey().shortValue();
            ReceiverEntry receiverEntry = null;
            for (Message message : value) {
                UnicastHeader unicastHeader = (UnicastHeader) message.getHeader(this.id);
                receiverEntry = getReceiverEntry(address, unicastHeader.seqno, unicastHeader.first, shortValue);
                if (receiverEntry != null) {
                    boolean add = receiverEntry.received_msgs.add(unicastHeader.seqno, (long) message);
                    this.num_msgs_received++;
                    if (unicastHeader.first && add) {
                        sendAck(address, unicastHeader.seqno, shortValue);
                    }
                    if (message.isFlagSet(Message.Flag.OOB) && add) {
                        try {
                            this.up_prot.up(new Event(1, message));
                        } catch (Throwable th) {
                            this.log.error("couldn't deliver OOB message " + message, th);
                        }
                    }
                }
            }
            if (receiverEntry != null && this.conn_expiry_timeout > 0) {
                receiverEntry.update();
            }
        }
        ReceiverEntry receiverEntry2 = this.recv_table.get(address);
        Table<Message> table = receiverEntry2 != null ? receiverEntry2.received_msgs : null;
        if (table != null) {
            AtomicBoolean processing = table.getProcessing();
            if (processing.compareAndSet(false, true)) {
                removeAndDeliver(processing, table, address);
                sendAck(address, table.getHighestDeliverable(), receiverEntry2.recv_conn_id);
            }
        }
    }

    protected int removeAndDeliver(AtomicBoolean atomicBoolean, Table<Message> table, Address address) {
        while (true) {
            try {
                List<Message> removeMany = table.removeMany(atomicBoolean, true, this.max_msg_batch_size);
                if (removeMany == null) {
                    break;
                }
                MessageBatch messageBatch = new MessageBatch(this.local_addr, address, null, false, removeMany);
                Iterator<Message> it = messageBatch.iterator();
                while (it.hasNext()) {
                    Message next = it.next();
                    if (next.isFlagSet(Message.Flag.OOB)) {
                        messageBatch.remove(next);
                    }
                }
                try {
                    if (this.log.isTraceEnabled()) {
                        Message first = messageBatch.first();
                        Message last = messageBatch.last();
                        StringBuilder sb = new StringBuilder(this.local_addr + ": delivering");
                        if (first != null && last != null) {
                            sb.append(" #").append(((UnicastHeader) first.getHeader(this.id)).seqno).append(" - #").append(((UnicastHeader) last.getHeader(this.id)).seqno);
                        }
                        sb.append(" (" + messageBatch.size()).append(" messages)");
                        this.log.trace(sb);
                    }
                    this.up_prot.up(messageBatch);
                } catch (Throwable th) {
                    this.log.error("failed to deliver batch " + messageBatch, th);
                }
            } catch (Throwable th2) {
                if (0 == 0) {
                    atomicBoolean.set(false);
                }
                throw th2;
            }
        }
        if (1 == 0) {
            atomicBoolean.set(false);
        }
        return 0;
    }

    protected ReceiverEntry getReceiverEntry(Address address, long j, boolean z, short s) {
        ReceiverEntry receiverEntry = this.recv_table.get(address);
        if (receiverEntry != null && receiverEntry.recv_conn_id == s) {
            return receiverEntry;
        }
        this.recv_table_lock.lock();
        try {
            ReceiverEntry receiverEntry2 = this.recv_table.get(address);
            if (z) {
                if (receiverEntry2 == null) {
                    receiverEntry2 = getOrCreateReceiverEntry(address, j, s);
                } else if (s != receiverEntry2.recv_conn_id) {
                    if (this.log.isTraceEnabled()) {
                        this.log.trace(this.local_addr + ": conn_id=" + ((int) s) + " != " + ((int) receiverEntry2.recv_conn_id) + "; resetting receiver window");
                    }
                    this.recv_table.remove(address);
                    receiverEntry2 = getOrCreateReceiverEntry(address, j, s);
                }
            } else if (receiverEntry2 == null || receiverEntry2.recv_conn_id != s) {
                this.recv_table_lock.unlock();
                sendRequestForFirstSeqno(address, j);
                if (this.recv_table_lock.isHeldByCurrentThread()) {
                    this.recv_table_lock.unlock();
                }
                return null;
            }
            return receiverEntry2;
        } finally {
            if (this.recv_table_lock.isHeldByCurrentThread()) {
                this.recv_table_lock.unlock();
            }
        }
    }

    protected ReceiverEntry getOrCreateReceiverEntry(Address address, long j, short s) {
        ReceiverEntry receiverEntry = new ReceiverEntry(new Table(this.xmit_table_num_rows, this.xmit_table_msgs_per_row, j - 1, this.xmit_table_resize_factor, this.xmit_table_max_compaction_time), s);
        ReceiverEntry putIfAbsent = this.recv_table.putIfAbsent(address, receiverEntry);
        if (putIfAbsent != null) {
            return putIfAbsent;
        }
        if (this.log.isTraceEnabled()) {
            this.log.trace(this.local_addr + ": created receiver window for " + address + " at seqno=#" + j + " for conn-id=" + ((int) s));
        }
        return receiverEntry;
    }

    protected void handleAckReceived(Address address, long j, short s) {
        if (this.log.isTraceEnabled()) {
            this.log.trace(new StringBuilder().append(this.local_addr).append(" <-- ACK(").append(address).append(": #").append(j).append(", conn-id=").append((int) s).append(')'));
        }
        SenderEntry senderEntry = this.send_table.get(address);
        if (senderEntry != null && senderEntry.send_conn_id != s) {
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + ": my conn_id (" + ((int) senderEntry.send_conn_id) + ") != received conn_id (" + ((int) s) + "); discarding ACK");
            }
        } else {
            Table<Message> table = senderEntry != null ? senderEntry.sent_msgs : null;
            if (table != null) {
                table.purge(j, true);
                this.num_acks_received++;
            }
        }
    }

    protected void handleResendingOfFirstMessage(Address address, long j) {
        if (this.log.isTraceEnabled()) {
            this.log.trace(this.local_addr + " <-- SEND_FIRST_SEQNO(" + address + "," + j + ")");
        }
        SenderEntry senderEntry = this.send_table.get(address);
        Table<Message> table = senderEntry != null ? senderEntry.sent_msgs : null;
        if (table == null) {
            if (this.log.isWarnEnabled()) {
                this.log.warn(this.local_addr + ": sender window for " + address + " not found");
                return;
            }
            return;
        }
        boolean z = false;
        long low = table.getLow();
        while (true) {
            long j2 = low + 1;
            if (j2 > j) {
                return;
            }
            Message message = table.get(j2);
            if (message != null) {
                if (z) {
                    this.down_prot.down(new Event(1, message));
                } else {
                    z = true;
                    Message copy = message.copy();
                    UnicastHeader copy2 = ((UnicastHeader) copy.getHeader(this.id)).copy();
                    copy2.first = true;
                    copy.putHeader(this.id, copy2);
                    this.down_prot.down(new Event(1, copy));
                }
            }
            low = j2;
        }
    }

    protected void startRetransmitTask() {
        if (this.xmit_task == null || this.xmit_task.isDone()) {
            this.xmit_task = this.timer.scheduleWithFixedDelay(new RetransmitTask(), 0L, this.xmit_interval, TimeUnit.MILLISECONDS);
        }
    }

    protected void stopRetransmitTask() {
        if (this.xmit_task != null) {
            this.xmit_task.cancel(true);
            this.xmit_task = null;
        }
    }

    protected void sendAck(Address address, long j, short s) {
        if (this.running) {
            Message putHeader = new Message(address).setFlag(Message.Flag.INTERNAL).putHeader(this.id, UnicastHeader.createAckHeader(j, s));
            if (this.log.isTraceEnabled()) {
                this.log.trace(new StringBuilder().append(this.local_addr).append(" --> ACK(").append(address).append(": #").append(j).append(')'));
            }
            try {
                this.down_prot.down(new Event(1, putHeader));
                this.num_acks_sent++;
            } catch (Throwable th) {
                this.log.error("failed sending ACK(" + j + ") to " + address, th);
            }
        }
    }

    protected synchronized void startConnectionReaper() {
        if (this.connection_reaper == null || this.connection_reaper.isDone()) {
            this.connection_reaper = this.timer.scheduleWithFixedDelay(new ConnectionReaper(), this.conn_expiry_timeout, this.conn_expiry_timeout, TimeUnit.MILLISECONDS);
        }
    }

    protected synchronized void stopConnectionReaper() {
        if (this.connection_reaper != null) {
            this.connection_reaper.cancel(false);
        }
    }

    protected synchronized short getNewConnectionId() {
        short s = this.last_conn_id;
        if (this.last_conn_id >= Short.MAX_VALUE || this.last_conn_id < 0) {
            this.last_conn_id = (short) 0;
        } else {
            this.last_conn_id = (short) (this.last_conn_id + 1);
        }
        return s;
    }

    protected void sendRequestForFirstSeqno(Address address, long j) {
        Message flag = new Message(address).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL);
        flag.putHeader(this.id, UnicastHeader.createSendFirstSeqnoHeader(j));
        if (this.log.isTraceEnabled()) {
            this.log.trace(this.local_addr + " --> SEND_FIRST_SEQNO(" + address + "," + j + ")");
        }
        this.down_prot.down(new Event(1, flag));
    }

    @ManagedOperation(description = "Closes connections that have been idle for more than conn_expiry_timeout ms")
    public void reapIdleConnections() {
        for (Map.Entry<Address, SenderEntry> entry : this.send_table.entrySet()) {
            long age = entry.getValue().age();
            if (age >= this.conn_expiry_timeout) {
                removeSendConnection(entry.getKey());
                if (this.log.isDebugEnabled()) {
                    this.log.debug(this.local_addr + ": removed expired connection for " + entry.getKey() + " (" + age + " ms old) from send_table");
                }
            }
        }
        for (Map.Entry<Address, ReceiverEntry> entry2 : this.recv_table.entrySet()) {
            long age2 = entry2.getValue().age();
            if (age2 >= this.conn_expiry_timeout) {
                removeReceiveConnection(entry2.getKey());
                if (this.log.isDebugEnabled()) {
                    this.log.debug(this.local_addr + ": removed expired connection for " + entry2.getKey() + " (" + age2 + " ms old) from recv_table");
                }
            }
        }
    }

    protected String printMessageList(List<Message> list) {
        UnicastHeader unicastHeader;
        UnicastHeader unicastHeader2;
        StringBuilder sb = new StringBuilder();
        int size = list.size();
        Message message = size > 0 ? list.get(0) : null;
        Message message2 = size > 1 ? list.get(size - 1) : message;
        if (message != null && (unicastHeader2 = (UnicastHeader) message.getHeader(this.id)) != null) {
            sb.append("#" + unicastHeader2.seqno);
        }
        if (message2 != null && (unicastHeader = (UnicastHeader) message2.getHeader(this.id)) != null) {
            sb.append(" - #" + unicastHeader.seqno);
        }
        return sb.toString();
    }
}
