package org.jgroups.protocols.pbcast;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.infinispan.transaction.xa.recovery.RecoveryAdminOperations;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.conf.PropertyConverters;
import org.jgroups.protocols.TP;
import org.jgroups.stack.DiagnosticsHandler;
import org.jgroups.stack.ExponentialInterval;
import org.jgroups.stack.NakReceiverWindow;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.Retransmitter;
import org.jgroups.stack.StaticInterval;
import org.jgroups.util.BoundedList;
import org.jgroups.util.Digest;
import org.jgroups.util.SuppressLog;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;

@MBean(description = "Reliable transmission multipoint FIFO protocol")
/* loaded from: input_file:WEB-INF/lib/jgroups-3.2.0.CR1.jar:org/jgroups/protocols/pbcast/NAKACK.class */
public class NAKACK extends Protocol implements Retransmitter.RetransmitCommand, DiagnosticsHandler.ProbeHandler {
    private static final int NUM_REBROADCAST_MSGS = 3;
    private View view;
    protected BoundedList<Message> become_server_queue;
    protected SuppressLog<Address> suppress_log_non_member;

    @Property(name = "retransmit_timeout", converter = PropertyConverters.IntegerArray.class, description = "Timeout before requesting retransmissions")
    private int[] retransmit_timeouts = {600, 1200, 2400, 4800};

    @Property(description = "Max number of messages to be removed from a NakReceiverWindow. This property might get removed anytime, so don't use it !")
    private int max_msg_batch_size = 100;

    @Property(description = "Retransmit retransmit responses (messages) using multicast rather than unicast")
    private boolean use_mcast_xmit = true;

    @Property(description = "Use a multicast to request retransmission of missing messages")
    private boolean use_mcast_xmit_req = false;

    @Property(description = "Number of milliseconds to delay the sending of an XMIT request. We pick a random number in the range [1 .. xmit_req_stagger_timeout] and add this to the scheduling time of an XMIT request. When use_mcast_xmit is enabled, if a number of members drop messages from the same member, then chances are that, if staggering is enabled, somebody else already sent the XMIT request (via mcast) and we can cancel the XMIT request once we receive the missing messages. For unicast XMIT responses (use_mcast_xmit=false), we still have an advantage by not overwhelming the receiver with XMIT requests, all at the same time. 0 disabless staggering.")
    protected long xmit_stagger_timeout = 200;

    @Property(description = "Ask a random member for retransmission of a missing message. Default is false")
    private boolean xmit_from_random_member = false;

    @Property(description = "The first value (in milliseconds) to use in the exponential backoff. Enabled if greater than 0")
    private int exponential_backoff = 300;

    @Property(description = "Whether to use the old retransmitter which retransmits individual messages or the new one which uses ranges of retransmitted messages. Default is true. Note that this property will be removed in 3.0; it is only used to switch back to the old (and proven) retransmitter mechanism if issues occur")
    private boolean use_range_based_retransmitter = true;

    @Property(description = "Should messages delivered to application be discarded")
    private boolean discard_delivered_msgs = true;

    @Property(description = "Timeout to rebroadcast messages. Default is 2000 msec")
    private long max_rebroadcast_timeout = 2000;

    @Property(description = "Should stability history be printed if we fail in retransmission. Default is false")
    protected boolean print_stability_history_on_failed_xmit = false;

    @Property(description = "discards warnings about promiscuous traffic")
    private boolean log_discard_msgs = true;

    @Property(description = "If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing)")
    private boolean log_not_found_msgs = true;

    @Property(description = "Number of rows of the matrix in the retransmission table (only for experts)", writable = false)
    int xmit_table_num_rows = 5;

    @Property(description = "Number of elements of a row of the matrix in the retransmission table (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row", writable = false)
    int xmit_table_msgs_per_row = 10000;

    @Property(description = "Resize factor of the matrix in the retransmission table (only for experts)", writable = false)
    double xmit_table_resize_factor = 1.2d;

    @Property(description = "Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts)", writable = false)
    long xmit_table_max_compaction_time = 600000;

    @Property(description = "Size of the queue to hold messages received after creating the channel, but before being connected (is_server=false). After becoming the server, the messages in the queue are fed into up() and the queue is cleared. The motivation is to avoid retransmissions (see https://issues.jboss.org/browse/JGRP-1509 for details). 0 disables the queue.")
    protected int become_server_queue_size = 50;

    @Property(description = "Time during which identical warnings about messages from a non member will be suppressed. 0 disables this (every warning will be logged). Setting the log level to ERROR also disables this.")
    protected long suppress_time_non_member_warnings = HornetQClient.DEFAULT_CONNECTION_TTL;

    @ManagedAttribute(description = "Number of retransmit requests received")
    private final AtomicLong xmit_reqs_received = new AtomicLong(0);

    @ManagedAttribute(description = "Number of retransmit requests sent")
    private final AtomicLong xmit_reqs_sent = new AtomicLong(0);

    @ManagedAttribute(description = "Number of retransmit responses received")
    private final AtomicLong xmit_rsps_received = new AtomicLong(0);

    @ManagedAttribute(description = "Number of retransmit responses sent")
    private final AtomicLong xmit_rsps_sent = new AtomicLong(0);

    @ManagedAttribute(description = "Number of messages sent")
    protected int num_messages_sent = 0;

    @ManagedAttribute(description = "Number of messages received")
    protected int num_messages_received = 0;
    private volatile boolean is_server = false;
    private Address local_addr = null;
    private final List<Address> members = new CopyOnWriteArrayList();
    private final AtomicLong seqno = new AtomicLong(0);
    private final ConcurrentMap<Address, NakReceiverWindow> xmit_table = Util.createConcurrentMap();
    private volatile boolean leaving = false;
    private volatile boolean running = false;
    private TimeScheduler timer = null;
    private final Lock rebroadcast_lock = new ReentrantLock();
    private final Condition rebroadcast_done = this.rebroadcast_lock.newCondition();
    private volatile boolean rebroadcasting = false;
    private final Lock rebroadcast_digest_lock = new ReentrantLock();
    private Digest rebroadcast_digest = null;
    protected final BoundedList<Digest> stability_msgs = new BoundedList<>(10);
    protected final BoundedList<String> digest_history = new BoundedList<>(10);

    @ManagedAttribute(description = "Number of messages from non-members")
    public int getNonMemberMessages() {
        if (this.suppress_log_non_member != null) {
            return this.suppress_log_non_member.getCache().size();
        }
        return 0;
    }

    @ManagedOperation(description = "Clears the cache for messages from non-members")
    public void clearNonMemberCache() {
        if (this.suppress_log_non_member != null) {
            this.suppress_log_non_member.getCache().clear();
        }
    }

    public long getXmitRequestsReceived() {
        return this.xmit_reqs_received.get();
    }

    public long getXmitRequestsSent() {
        return this.xmit_reqs_sent.get();
    }

    public long getXmitResponsesReceived() {
        return this.xmit_rsps_received.get();
    }

    public long getXmitResponsesSent() {
        return this.xmit_rsps_sent.get();
    }

    @ManagedAttribute(description = "Total number of missing messages")
    public int getPendingXmitRequests() {
        int i = 0;
        Iterator<NakReceiverWindow> it = this.xmit_table.values().iterator();
        while (it.hasNext()) {
            i += it.next().getPendingXmits();
        }
        return i;
    }

    @ManagedAttribute
    public int getXmitTableSize() {
        int i = 0;
        Iterator<NakReceiverWindow> it = this.xmit_table.values().iterator();
        while (it.hasNext()) {
            i += it.next().size();
        }
        return i;
    }

    @ManagedAttribute
    public int getXmitTableMissingMessages() {
        int i = 0;
        Iterator<NakReceiverWindow> it = this.xmit_table.values().iterator();
        while (it.hasNext()) {
            i += it.next().getMissingMessages();
        }
        return i;
    }

    @ManagedAttribute(description = "Returns the number of bytes of all messages in all NakReceiverWindows. To compute the size, Message.getLength() is used")
    public long getSizeOfAllMessages() {
        long j = 0;
        Iterator<NakReceiverWindow> it = this.xmit_table.values().iterator();
        while (it.hasNext()) {
            j += it.next().sizeOfAllMessages(false);
        }
        return j;
    }

    @ManagedAttribute(description = "Returns the number of bytes of all messages in all NakReceiverWindows. To compute the size, Message.size() is used")
    public long getSizeOfAllMessagesInclHeaders() {
        long j = 0;
        Iterator<NakReceiverWindow> it = this.xmit_table.values().iterator();
        while (it.hasNext()) {
            j += it.next().sizeOfAllMessages(true);
        }
        return j;
    }

    @ManagedAttribute
    public long getCurrentSeqno() {
        return this.seqno.get();
    }

    @ManagedOperation
    public String printRetransmitStats() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Address, NakReceiverWindow> entry : this.xmit_table.entrySet()) {
            sb.append(entry.getKey()).append(": ").append(entry.getValue().printRetransmitStats()).append("\n");
        }
        return sb.toString();
    }

    public NakReceiverWindow getWindow(Address address) {
        return this.xmit_table.get(address);
    }

    public void setTimer(TimeScheduler timeScheduler) {
        this.timer = timeScheduler;
    }

    @Override // org.jgroups.stack.Protocol
    public void resetStats() {
        this.num_messages_received = 0;
        this.num_messages_sent = 0;
        this.xmit_reqs_received.set(0L);
        this.xmit_reqs_sent.set(0L);
        this.xmit_rsps_received.set(0L);
        this.xmit_rsps_sent.set(0L);
        this.stability_msgs.clear();
        this.digest_history.clear();
    }

    @Override // org.jgroups.stack.Protocol
    public void init() throws Exception {
        if (this.xmit_from_random_member && this.discard_delivered_msgs) {
            this.discard_delivered_msgs = false;
            this.log.debug("xmit_from_random_member set to true: changed discard_delivered_msgs to false");
        }
        TP transport = getTransport();
        if (transport != null) {
            transport.registerProbeHandler(this);
            if (!transport.supportsMulticasting()) {
                if (this.use_mcast_xmit) {
                    this.log.warn("use_mcast_xmit should not be used because the transport (" + transport.getName() + ") does not support IP multicasting; setting use_mcast_xmit to false");
                    this.use_mcast_xmit = false;
                }
                if (this.use_mcast_xmit_req) {
                    this.log.warn("use_mcast_xmit_req should not be used because the transport (" + transport.getName() + ") does not support IP multicasting; setting use_mcast_xmit_req to false");
                    this.use_mcast_xmit_req = false;
                }
            }
        }
        if (this.become_server_queue_size > 0) {
            this.become_server_queue = new BoundedList<>(this.become_server_queue_size);
        }
        if (this.suppress_time_non_member_warnings > 0) {
            this.suppress_log_non_member = new SuppressLog<>(this.log, "MsgDroppedNak", "SuppressMsg");
        }
    }

    public boolean isUseMcastXmit() {
        return this.use_mcast_xmit;
    }

    public void setUseMcastXmit(boolean z) {
        this.use_mcast_xmit = z;
    }

    public boolean isXmitFromRandomMember() {
        return this.xmit_from_random_member;
    }

    public void setXmitFromRandomMember(boolean z) {
        this.xmit_from_random_member = z;
    }

    public boolean isDiscardDeliveredMsgs() {
        return this.discard_delivered_msgs;
    }

    public void setDiscardDeliveredMsgs(boolean z) {
        this.discard_delivered_msgs = z;
    }

    public void setLogDiscardMessages(boolean z) {
        this.log_discard_msgs = z;
    }

    public void setLogDiscardMsgs(boolean z) {
        setLogDiscardMessages(z);
    }

    public boolean getLogDiscardMessages() {
        return this.log_discard_msgs;
    }

    @Override // org.jgroups.stack.Protocol
    public Map<String, Object> dumpStats() {
        Map<String, Object> dumpStats = super.dumpStats();
        dumpStats.put("msgs", printMessages());
        return dumpStats;
    }

    @Override // org.jgroups.stack.Protocol
    public String printStats() {
        StringBuilder sb = new StringBuilder();
        sb.append("\nStability messages received\n");
        sb.append(printStabilityMessages()).append("\n");
        return sb.toString();
    }

    @ManagedOperation(description = "TODO")
    public String printStabilityMessages() {
        return Util.printListWithDelimiter(this.stability_msgs, "\n");
    }

    public String printStabilityHistory() {
        StringBuilder sb = new StringBuilder();
        int i = 1;
        Iterator<Digest> it = this.stability_msgs.iterator();
        while (it.hasNext()) {
            int i2 = i;
            i++;
            sb.append(i2).append(": ").append(it.next()).append("\n");
        }
        return sb.toString();
    }

    @ManagedOperation(description = "Keeps information about the last N times a digest was set or merged")
    public String printDigestHistory() {
        StringBuilder sb = new StringBuilder(this.local_addr + ":\n");
        Iterator<String> it = this.digest_history.iterator();
        while (it.hasNext()) {
            sb.append(it.next()).append("\n");
        }
        return sb.toString();
    }

    @ManagedOperation(description = "TODO")
    public String printLossRates() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Address, NakReceiverWindow> entry : this.xmit_table.entrySet()) {
            sb.append(entry.getKey()).append(": ").append(entry.getValue().printLossRate()).append("\n");
        }
        return sb.toString();
    }

    @ManagedOperation(description = "Returns the sizes of all NakReceiverWindow.RetransmitTables")
    public String printRetransmitTableSizes() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Address, NakReceiverWindow> entry : this.xmit_table.entrySet()) {
            NakReceiverWindow value = entry.getValue();
            sb.append(entry.getKey() + ": ").append(value.getRetransmitTableSize()).append(", offset=").append(value.getRetransmitTableOffset()).append(" (capacity=" + value.getRetransmitTableCapacity()).append(", fill factor=" + value.getRetransmitTableFillFactor() + "%)\n");
        }
        return sb.toString();
    }

    @ManagedOperation(description = "Compacts the retransmission tables")
    public void compact() {
        Iterator<Map.Entry<Address, NakReceiverWindow>> it = this.xmit_table.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().compact();
        }
    }

    @Override // org.jgroups.stack.Protocol
    public List<Integer> providedUpServices() {
        ArrayList arrayList = new ArrayList(5);
        arrayList.add(39);
        arrayList.add(41);
        arrayList.add(42);
        arrayList.add(53);
        return arrayList;
    }

    @Override // org.jgroups.stack.Protocol
    public void start() throws Exception {
        this.timer = getTransport().getTimer();
        if (this.timer == null) {
            throw new Exception("timer is null");
        }
        this.running = true;
        this.leaving = false;
    }

    @Override // org.jgroups.stack.Protocol
    public void stop() {
        this.running = false;
        this.is_server = false;
        if (this.become_server_queue != null) {
            this.become_server_queue.clear();
        }
        reset();
    }

    @Override // org.jgroups.stack.Protocol
    public Object down(Event event) {
        switch (event.getType()) {
            case 1:
                Message message = (Message) event.getArg();
                if (message.getDest() == null && !message.isFlagSet(Message.NO_RELIABILITY)) {
                    send(event, message);
                    return null;
                }
                break;
            case 4:
                this.leaving = true;
                reset();
                break;
            case 6:
                View view = (View) event.getArg();
                List<Address> members = view.getMembers();
                this.members.clear();
                this.members.addAll(members);
                this.view = view;
                adjustReceivers(this.members);
                this.is_server = true;
                if (this.suppress_log_non_member != null) {
                    this.suppress_log_non_member.removeExpired(this.suppress_time_non_member_warnings);
                    break;
                }
                break;
            case 8:
                this.local_addr = (Address) event.getArg();
                break;
            case 15:
                List<Address> members2 = ((View) event.getArg()).getMembers();
                this.members.clear();
                this.members.addAll(members2);
                break;
            case 16:
                this.is_server = true;
                flushBecomeServerQueue();
                break;
            case 30:
                stable((Digest) event.getArg());
                return null;
            case 39:
                return getDigest((Address) event.getArg());
            case 41:
                setDigest((Digest) event.getArg());
                return null;
            case 42:
                overwriteDigest((Digest) event.getArg());
                return null;
            case 53:
                mergeDigest((Digest) event.getArg());
                return null;
            case 78:
                this.rebroadcasting = true;
                this.rebroadcast_digest = (Digest) event.getArg();
                try {
                    rebroadcastMessages();
                    this.rebroadcasting = false;
                    this.rebroadcast_digest_lock.lock();
                    try {
                        this.rebroadcast_digest = null;
                        this.rebroadcast_digest_lock.unlock();
                        return null;
                    } finally {
                    }
                } catch (Throwable th) {
                    this.rebroadcasting = false;
                    this.rebroadcast_digest_lock.lock();
                    try {
                        this.rebroadcast_digest = null;
                        this.rebroadcast_digest_lock.unlock();
                        throw th;
                    } finally {
                    }
                }
        }
        return this.down_prot.down(event);
    }

    @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
    public Object up(Event event) {
        NakAckHeader nakAckHeader;
        switch (event.getType()) {
            case 1:
                Message message = (Message) event.getArg();
                if (!message.isFlagSet(Message.NO_RELIABILITY) && (nakAckHeader = (NakAckHeader) message.getHeader(this.id)) != null) {
                    if (!this.is_server) {
                        if (this.become_server_queue == null) {
                            if (!this.log.isTraceEnabled()) {
                                return null;
                            }
                            this.log.trace(this.local_addr + ": message " + message.getSrc() + "::" + nakAckHeader.seqno + " was discarded (not yet server)");
                            return null;
                        }
                        this.become_server_queue.add(message);
                        if (!this.log.isTraceEnabled()) {
                            return null;
                        }
                        this.log.trace(this.local_addr + ": message " + message.getSrc() + "::" + nakAckHeader.seqno + " was added to queue (not yet server)");
                        return null;
                    }
                    switch (nakAckHeader.type) {
                        case 1:
                            handleMessage(message, nakAckHeader);
                            return null;
                        case 2:
                            if (nakAckHeader.range != null) {
                                handleXmitReq(message.getSrc(), nakAckHeader.range.low, nakAckHeader.range.high, nakAckHeader.sender);
                                return null;
                            }
                            if (!this.log.isErrorEnabled()) {
                                return null;
                            }
                            this.log.error("XMIT_REQ: range of xmit msg is null; discarding request from " + message.getSrc());
                            return null;
                        case 3:
                            handleXmitRsp(message, nakAckHeader);
                            return null;
                        default:
                            if (!this.log.isErrorEnabled()) {
                                return null;
                            }
                            this.log.error("NakAck header type " + ((int) nakAckHeader.type) + " not known !");
                            return null;
                    }
                }
                break;
            case 9:
                if (this.rebroadcasting) {
                    cancelRebroadcasting();
                    break;
                }
                break;
            case 30:
                stable((Digest) event.getArg());
                return null;
        }
        return this.up_prot.up(event);
    }

    protected void send(Event event, Message message) {
        if (message == null) {
            throw new NullPointerException("msg is null; event is " + event);
        }
        if (!this.running) {
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + ": discarded message as we're not in the 'running' state, message: " + message);
                return;
            }
            return;
        }
        NakReceiverWindow nakReceiverWindow = this.xmit_table.get(this.local_addr);
        if (nakReceiverWindow == null) {
            if (this.log.isWarnEnabled() && this.log_discard_msgs) {
                this.log.warn(this.local_addr + ": discarded message to " + this.local_addr + " with no window, my view is " + this.view);
                return;
            }
            return;
        }
        if (message.getSrc() == null) {
            message.setSrc(this.local_addr);
        }
        long incrementAndGet = this.seqno.incrementAndGet();
        long j = 10;
        while (true) {
            long j2 = j;
            if (!this.running) {
                break;
            }
            try {
                message.putHeader(this.id, NakAckHeader.createMessageHeader(incrementAndGet));
                nakReceiverWindow.add(incrementAndGet, message);
                break;
            } catch (Throwable th) {
                if (this.running) {
                    Util.sleep(j2);
                }
                j = Math.min(TransportConstants.DEFAULT_HTTP_SERVER_SCAN_PERIOD, j2 * 2);
            }
        }
        if (this.log.isTraceEnabled()) {
            this.log.trace("sending " + this.local_addr + "#" + incrementAndGet);
        }
        this.down_prot.down(event);
        this.num_messages_sent++;
    }

    /* JADX WARN: Removed duplicated region for block: B:108:0x02e6  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void handleMessage(org.jgroups.Message r12, org.jgroups.protocols.pbcast.NakAckHeader r13) {
        /*
            Method dump skipped, instructions count: 751
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.jgroups.protocols.pbcast.NAKACK.handleMessage(org.jgroups.Message, org.jgroups.protocols.pbcast.NakAckHeader):void");
    }

    private void handleXmitReq(Address address, long j, long j2, Address address2) {
        if (j > j2) {
            return;
        }
        if (this.log.isTraceEnabled()) {
            StringBuilder sb = new StringBuilder();
            sb.append(this.local_addr).append(": received xmit request from ").append(address).append(" for ");
            sb.append(address2).append(" [").append(j).append(" - ").append(j2).append("]");
            this.log.trace(sb.toString());
        }
        if (this.stats) {
            this.xmit_reqs_received.addAndGet((j2 - j) + 1);
        }
        NakReceiverWindow nakReceiverWindow = this.xmit_table.get(address2);
        if (nakReceiverWindow == null) {
            if (this.log.isErrorEnabled()) {
                StringBuilder sb2 = new StringBuilder();
                sb2.append("(requester=").append(address).append(", local_addr=").append(this.local_addr);
                sb2.append(") ").append(address2).append(" not found in retransmission table");
                if (this.log.isTraceEnabled()) {
                    sb2.append(":\n").append(printMessages());
                }
                if (this.print_stability_history_on_failed_xmit) {
                    sb2.append(" (stability history:\n").append(printStabilityHistory());
                }
                this.log.error(sb2.toString());
                return;
            }
            return;
        }
        if ((j2 - j) + 1 >= 10) {
            List<Message> list = nakReceiverWindow.get(j, j2);
            if (list != null) {
                Iterator<Message> it = list.iterator();
                while (it.hasNext()) {
                    sendXmitRsp(address, it.next());
                }
                return;
            }
            return;
        }
        long j3 = j;
        while (true) {
            long j4 = j3;
            if (j4 > j2) {
                return;
            }
            Message message = nakReceiverWindow.get(j4);
            if (message != null) {
                sendXmitRsp(address, message);
            } else if (this.log.isWarnEnabled() && this.log_not_found_msgs && !this.local_addr.equals(address)) {
                StringBuilder sb3 = new StringBuilder();
                sb3.append("(requester=").append(address).append(", local_addr=").append(this.local_addr);
                sb3.append(") message ").append(address2).append("::").append(j4);
                sb3.append(" not found in retransmission table of ").append(address2).append(":\n").append(nakReceiverWindow);
                if (this.print_stability_history_on_failed_xmit) {
                    sb3.append(" (stability history:\n").append(printStabilityHistory());
                }
                this.log.warn(sb3.toString());
            }
            j3 = j4 + 1;
        }
    }

    private void cancelRebroadcasting() {
        this.rebroadcast_lock.lock();
        try {
            this.rebroadcasting = false;
            this.rebroadcast_done.signalAll();
            this.rebroadcast_lock.unlock();
        } catch (Throwable th) {
            this.rebroadcast_lock.unlock();
            throw th;
        }
    }

    protected void flushBecomeServerQueue() {
        if (this.become_server_queue == null || this.become_server_queue.isEmpty()) {
            return;
        }
        if (this.log.isTraceEnabled()) {
            this.log.trace(this.local_addr + ": flushing become_server_queue (" + this.become_server_queue.size() + " elements)");
        }
        TP transport = getTransport();
        Executor defaultThreadPool = transport.getDefaultThreadPool();
        Executor oOBThreadPool = transport.getOOBThreadPool();
        Iterator<Message> it = this.become_server_queue.iterator();
        while (it.hasNext()) {
            final Message next = it.next();
            (next.isFlagSet(Message.Flag.OOB) ? oOBThreadPool : defaultThreadPool).execute(new Runnable() { // from class: org.jgroups.protocols.pbcast.NAKACK.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        NAKACK.this.up(new Event(1, next));
                        NAKACK.this.become_server_queue.remove(next);
                    } catch (Throwable th) {
                        NAKACK.this.become_server_queue.remove(next);
                        throw th;
                    }
                }
            });
        }
    }

    private void sendXmitRsp(Address address, Message message) {
        if (message == null) {
            if (this.log.isErrorEnabled()) {
                this.log.error("message is null, cannot send retransmission");
                return;
            }
            return;
        }
        if (this.stats) {
            this.xmit_rsps_sent.incrementAndGet();
        }
        if (message.getSrc() == null) {
            message.setSrc(this.local_addr);
        }
        if (this.use_mcast_xmit) {
            this.down_prot.down(new Event(1, message));
            return;
        }
        Message copy = message.copy(true, true);
        copy.setDest(address);
        NakAckHeader copy2 = ((NakAckHeader) copy.getHeader(this.id)).copy();
        copy2.type = (byte) 3;
        copy.putHeader(this.id, copy2);
        this.down_prot.down(new Event(1, copy));
    }

    private void handleXmitRsp(Message message, NakAckHeader nakAckHeader) {
        if (message == null) {
            return;
        }
        try {
            if (this.stats) {
                this.xmit_rsps_received.incrementAndGet();
            }
            message.setDest(null);
            NakAckHeader copy = nakAckHeader.copy();
            copy.type = (byte) 1;
            message.putHeader(this.id, copy);
            up(new Event(1, message));
            if (this.rebroadcasting) {
                checkForRebroadcasts();
            }
        } catch (Exception e) {
            if (this.log.isErrorEnabled()) {
                this.log.error("failed reading retransmitted message", e);
            }
        }
    }

    private void rebroadcastMessages() {
        Digest digest;
        long j = this.max_rebroadcast_timeout / 3;
        long j2 = this.max_rebroadcast_timeout;
        long currentTimeMillis = System.currentTimeMillis();
        while (j2 > 0) {
            this.rebroadcast_digest_lock.lock();
            try {
                if (this.rebroadcast_digest == null) {
                    this.rebroadcast_digest_lock.unlock();
                    return;
                }
                Digest copy = this.rebroadcast_digest.copy();
                this.rebroadcast_digest_lock.unlock();
                Digest digest2 = getDigest();
                boolean z = false;
                Iterator<Digest.DigestEntry> it = copy.iterator();
                while (it.hasNext()) {
                    Digest.DigestEntry next = it.next();
                    Address member = next.getMember();
                    long[] jArr = digest2.get(member);
                    if (jArr != null) {
                        long highest = next.getHighest();
                        long max = Math.max(jArr[0], jArr[1]);
                        if (highest > max) {
                            if (this.log.isTraceEnabled()) {
                                this.log.trace("[" + this.local_addr + "] fetching " + max + "-" + highest + " from " + member);
                            }
                            retransmit(max + 1, highest, member, true);
                            z = true;
                        }
                    }
                }
                if (!z) {
                    return;
                }
                this.rebroadcast_lock.lock();
                try {
                    digest = getDigest();
                    this.rebroadcast_digest_lock.lock();
                    try {
                    } finally {
                        this.rebroadcast_digest_lock.unlock();
                    }
                } catch (InterruptedException e) {
                } catch (Throwable th) {
                    this.rebroadcast_lock.unlock();
                    throw th;
                }
                if (!this.rebroadcasting || digest.isGreaterThanOrEqual(this.rebroadcast_digest)) {
                    this.rebroadcast_lock.unlock();
                    return;
                }
                this.rebroadcast_digest_lock.unlock();
                this.rebroadcast_done.await(j, TimeUnit.MILLISECONDS);
                j2 -= System.currentTimeMillis() - currentTimeMillis;
                this.rebroadcast_lock.unlock();
            } finally {
                this.rebroadcast_digest_lock.unlock();
            }
        }
    }

    protected void checkForRebroadcasts() {
        Digest digest = getDigest();
        this.rebroadcast_digest_lock.lock();
        try {
            boolean isGreaterThanOrEqual = digest.isGreaterThanOrEqual(this.rebroadcast_digest);
            this.rebroadcast_digest_lock.unlock();
            if (isGreaterThanOrEqual) {
                cancelRebroadcasting();
            }
        } catch (Throwable th) {
            this.rebroadcast_digest_lock.unlock();
            throw th;
        }
    }

    private void adjustReceivers(List<Address> list) {
        for (Address address : this.xmit_table.keySet()) {
            if (!list.contains(address) && (this.local_addr == null || !this.local_addr.equals(address))) {
                NakReceiverWindow remove = this.xmit_table.remove(address);
                if (remove != null) {
                    remove.destroy();
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("removed " + address + " from xmit_table (not member anymore)");
                    }
                }
            }
        }
    }

    public Digest getDigest() {
        HashMap hashMap = new HashMap();
        for (Map.Entry<Address, NakReceiverWindow> entry : this.xmit_table.entrySet()) {
            hashMap.put(entry.getKey(), entry.getValue().getDigest());
        }
        return new Digest(hashMap);
    }

    public Digest getDigest(Address address) {
        if (address == null) {
            return getDigest();
        }
        NakReceiverWindow nakReceiverWindow = this.xmit_table.get(address);
        if (nakReceiverWindow == null) {
            return null;
        }
        long[] digest = nakReceiverWindow.getDigest();
        return new Digest(address, digest[0], digest[1]);
    }

    private void setDigest(Digest digest) {
        setDigest(digest, false);
    }

    private void mergeDigest(Digest digest) {
        setDigest(digest, true);
    }

    private void overwriteDigest(Digest digest) {
        if (digest == null) {
            return;
        }
        StringBuilder sb = new StringBuilder("\n[overwriteDigest()]\n");
        sb.append("existing digest:  " + getDigest()).append("\nnew digest:       " + digest);
        Iterator<Digest.DigestEntry> it = digest.iterator();
        while (it.hasNext()) {
            Digest.DigestEntry next = it.next();
            Address member = next.getMember();
            if (member != null) {
                long highestDeliveredSeqno = next.getHighestDeliveredSeqno();
                NakReceiverWindow nakReceiverWindow = this.xmit_table.get(member);
                if (nakReceiverWindow != null) {
                    if (this.local_addr.equals(member)) {
                        nakReceiverWindow.setHighestDelivered(highestDeliveredSeqno);
                    } else {
                        this.xmit_table.remove(member);
                        nakReceiverWindow.destroy();
                    }
                }
                this.xmit_table.put(member, createNakReceiverWindow(member, highestDeliveredSeqno));
            }
        }
        sb.append("\n").append("resulting digest: " + getDigest());
        this.digest_history.add(sb.toString());
        if (this.log.isDebugEnabled()) {
            this.log.debug(sb.toString());
        }
    }

    private void setDigest(Digest digest, boolean z) {
        if (digest == null) {
            return;
        }
        StringBuilder sb = new StringBuilder(z ? "\n[" + this.local_addr + " mergeDigest()]\n" : "\n[" + this.local_addr + " setDigest()]\n");
        sb.append("existing digest:  " + getDigest()).append("\nnew digest:       " + digest);
        boolean z2 = false;
        Iterator<Digest.DigestEntry> it = digest.iterator();
        while (it.hasNext()) {
            Digest.DigestEntry next = it.next();
            Address member = next.getMember();
            if (member != null) {
                long highestDeliveredSeqno = next.getHighestDeliveredSeqno();
                NakReceiverWindow nakReceiverWindow = this.xmit_table.get(member);
                if (nakReceiverWindow != null) {
                    if (z && (this.local_addr == null || !this.local_addr.equals(member))) {
                        if (nakReceiverWindow.getHighestDelivered() < highestDeliveredSeqno) {
                            this.xmit_table.remove(member);
                            nakReceiverWindow.destroy();
                            if (member.equals(this.local_addr)) {
                                this.seqno.set(highestDeliveredSeqno);
                                z2 = true;
                            }
                        }
                    }
                }
                this.xmit_table.put(member, createNakReceiverWindow(member, highestDeliveredSeqno));
            }
        }
        sb.append("\n").append("resulting digest: " + getDigest());
        if (z2) {
            sb.append("\nnew seqno for " + this.local_addr + ": " + this.seqno);
        }
        this.digest_history.add(sb.toString());
        if (this.log.isDebugEnabled()) {
            this.log.debug(sb.toString());
        }
    }

    private NakReceiverWindow createNakReceiverWindow(Address address, long j) {
        NakReceiverWindow nakReceiverWindow = new NakReceiverWindow(address, this, j, this.timer, this.use_range_based_retransmitter, this.xmit_table_num_rows, this.xmit_table_msgs_per_row, this.xmit_table_resize_factor, this.xmit_table_max_compaction_time, false);
        if (this.exponential_backoff > 0) {
            nakReceiverWindow.setRetransmitTimeouts(new ExponentialInterval(this.exponential_backoff));
        } else {
            nakReceiverWindow.setRetransmitTimeouts(new StaticInterval(this.retransmit_timeouts));
        }
        if (this.xmit_stagger_timeout > 0) {
            nakReceiverWindow.setXmitStaggerTimeout(this.xmit_stagger_timeout);
        }
        return nakReceiverWindow;
    }

    private void stable(Digest digest) {
        if (this.members == null || this.local_addr == null || digest == null) {
            if (this.log.isWarnEnabled()) {
                this.log.warn("members, local_addr or digest are null !");
                return;
            }
            return;
        }
        if (this.log.isTraceEnabled()) {
            this.log.trace("received stable digest " + digest);
        }
        this.stability_msgs.add(digest);
        Iterator<Digest.DigestEntry> it = digest.iterator();
        while (it.hasNext()) {
            Digest.DigestEntry next = it.next();
            Address member = next.getMember();
            if (member != null) {
                long highestDeliveredSeqno = next.getHighestDeliveredSeqno();
                long highestReceivedSeqno = next.getHighestReceivedSeqno();
                NakReceiverWindow nakReceiverWindow = this.xmit_table.get(member);
                if (nakReceiverWindow != null) {
                    long highestReceived = nakReceiverWindow.getHighestReceived();
                    if (highestReceivedSeqno >= 0 && highestReceivedSeqno > highestReceived) {
                        if (this.log.isTraceEnabled()) {
                            this.log.trace("my_highest_rcvd (" + highestReceived + ") < stability_highest_rcvd (" + highestReceivedSeqno + "): requesting retransmission of " + member + '#' + highestReceivedSeqno);
                        }
                        retransmit(highestReceivedSeqno, highestReceivedSeqno, member);
                    }
                }
                if (highestDeliveredSeqno >= 0) {
                    if (this.log.isTraceEnabled()) {
                        this.log.trace("deleting msgs <= " + highestDeliveredSeqno + " from " + member);
                    }
                    if (nakReceiverWindow != null) {
                        nakReceiverWindow.stable(highestDeliveredSeqno);
                    }
                }
            }
        }
    }

    @Override // org.jgroups.stack.Retransmitter.RetransmitCommand
    public void retransmit(long j, long j2, Address address) {
        if (j <= j2) {
            retransmit(j, j2, address, false);
        }
    }

    protected void retransmit(long j, long j2, Address address, boolean z) {
        Address address2;
        Address address3 = address;
        if (z || this.use_mcast_xmit_req) {
            address3 = null;
        } else if (this.xmit_from_random_member && !this.local_addr.equals(address) && (address2 = (Address) Util.pickRandomElement(this.members)) != null && !this.local_addr.equals(address2)) {
            address3 = address2;
            if (this.log.isTraceEnabled()) {
                this.log.trace("picked random member " + address3 + " to send XMIT request to");
            }
        }
        NakAckHeader createXmitRequestHeader = NakAckHeader.createXmitRequestHeader(j, j2, address);
        Message message = new Message(address3, (Address) null, (byte[]) null);
        message.setFlag(Message.OOB);
        if (this.log.isTraceEnabled()) {
            this.log.trace(this.local_addr + ": sending XMIT_REQ ([" + j + RecoveryAdminOperations.SEPARATOR + j2 + "]) to " + address3);
        }
        message.putHeader(this.id, createXmitRequestHeader);
        this.down_prot.down(new Event(1, message));
        if (this.stats) {
            this.xmit_reqs_sent.addAndGet((j2 - j) + 1);
        }
    }

    private void reset() {
        this.seqno.set(0L);
        Iterator<NakReceiverWindow> it = this.xmit_table.values().iterator();
        while (it.hasNext()) {
            it.next().destroy();
        }
        this.xmit_table.clear();
    }

    @ManagedOperation(description = "Prints the contents of the receiver windows for all members")
    public String printMessages() {
        StringBuilder sb = new StringBuilder(this.local_addr + ":\n");
        for (Map.Entry<Address, NakReceiverWindow> entry : this.xmit_table.entrySet()) {
            sb.append(entry.getKey()).append(": ").append(entry.getValue().toString()).append('\n');
        }
        return sb.toString();
    }

    @Override // org.jgroups.stack.DiagnosticsHandler.ProbeHandler
    public Map<String, String> handleProbe(String... strArr) {
        HashMap hashMap = new HashMap();
        for (String str : strArr) {
            if (str.equals("digest-history")) {
                hashMap.put(str, printDigestHistory());
            }
            if (str.equals("dump-digest")) {
                hashMap.put(str, "\n" + printMessages());
            }
        }
        return hashMap;
    }

    @Override // org.jgroups.stack.DiagnosticsHandler.ProbeHandler
    public String[] supportedKeys() {
        return new String[]{"digest-history", "dump-digest"};
    }
}
