package org.jgroups.protocols.pbcast;

import java.io.DataInput;
import java.io.DataOutput;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.io.IOUtils;
import org.hsqldb.persist.LockFile;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Digest;
import org.jgroups.util.MutableDigest;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;

@MBean(description = "Computes the broadcast messages that are stable")
/* loaded from: input_file:WEB-INF/lib/jgroups-3.2.10.Final.jar:org/jgroups/protocols/pbcast/STABLE.class */
public class STABLE extends Protocol {
    // Suspension time in milliseconds (200 s). Same value as the 200000 literals used in this
    // class when a suspend request carries no usable timeout.
    private static final long MAX_SUSPEND_TIME = 200000;

    // Scales the randomized interval of the periodic stable task; start() only schedules
    // that task when this value is > 0.
    @Property(description = "Average time to send a STABLE message. Default is 20000 msec")
    private long desired_avg_gossip = 20000;

    // Upper bound passed to Util.random() when picking the delay before a STABILITY message is sent.
    @Property(description = "Delay before stability message is sent. Default is 6000 msec")
    private long stability_delay = 6000;

    // Byte threshold that triggers a STABLE message; a value <= 0 disables byte counting.
    // NOTE(review): the description below says max(...), but handleViewChange() computes
    // Math.min(...) of the two terms - confirm which one is intended.
    @Property(description = "Maximum number of bytes received in all messages before sending a STABLE message is triggered .If ergonomics is enabled, this value is computed as max(MAX_HEAP * cap, N * max_bytes) where N = number of members")
    protected long max_bytes = 2000000;
    // Configured max_bytes, kept so the ergonomics computation can scale it by the member count.
    protected long original_max_bytes = this.max_bytes;

    // Fraction of the maximum heap used as an upper bound for max_bytes when ergonomics is on.
    @Property(description = "Max percentage of the max heap (-Xmx) to be used for max_bytes. Only used if ergonomics is enabled. 0 disables setting max_bytes dynamically.")
    protected double cap = 0.1d;
    // Statistics counters; cleared by resetStats(). They are plain ints updated without atomics.
    private int num_stable_msgs_sent = 0;
    private int num_stable_msgs_received = 0;
    private int num_stability_msgs_sent = 0;
    private int num_stability_msgs_received = 0;
    // Set from down() event type 8; only used in log output within this class.
    private Address local_addr = null;
    // Current members; replaced on every view change and accessed under synchronized(mbrs)
    // in handleViewChange() and allVotesReceived().
    private final Set<Address> mbrs = new LinkedHashSet();
    // Local digest merged with the digests received in STABLE messages; modified while holding 'lock'.
    private final MutableDigest digest = new MutableDigest(10);
    // Senders whose STABLE message has already been merged into 'digest' in the current round.
    private final Set<Address> votes = new HashSet();
    // Held while 'digest' and 'votes' are reset or updated.
    private final Lock lock = new ReentrantLock();
    // Pending delayed STABILITY send (if any), guarded by stability_lock.
    private Future<?> stability_task_future = null;
    private final Lock stability_lock = new ReentrantLock();
    // Periodic stable task (if started), guarded by stable_task_lock.
    private Future<?> stable_task_future = null;
    private final Lock stable_task_lock = new ReentrantLock();
    // Scheduler obtained from the transport in start().
    private TimeScheduler timer = null;

    // Bytes of multicast messages counted since the last reset; incremented under the 'received' lock.
    @ManagedAttribute(description = "Bytes accumulated so far")
    private long num_bytes_received = 0;
    private final Lock received = new ReentrantLock();

    // While true, STABLE/STABILITY messages are neither sent nor handled.
    @ManagedAttribute
    protected volatile boolean suspended = false;
    // Set on the first view change; incoming STABLE/STABILITY messages are ignored before that.
    private boolean initialized = false;
    // Timer task that resumes after a suspension; guarded by resume_task_mutex.
    private Future<?> resume_task_future = null;
    private final Object resume_task_mutex = new Object();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/jgroups-3.2.10.Final.jar:org/jgroups/protocols/pbcast/STABLE$ResumeTask.class */
    public class ResumeTask implements Runnable {
        ResumeTask() {
        }

        /**
         * Resumes message garbage collection. If the protocol is still suspended when this
         * task fires, a warning is logged first, because the resumption was expected to be
         * triggered by a RESUME_STABLE event rather than by this timer.
         */
        @Override // java.lang.Runnable
        public void run() {
            final STABLE owner = STABLE.this;
            final boolean stillSuspended = owner.suspended;
            if (stillSuspended) {
                owner.log.warn("ResumeTask resumed message garbage collection - this should be done by a RESUME_STABLE event; check why this event was not received (or increase max_suspend_time for large state transfers)");
            }
            owner.resume();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/jgroups-3.2.10.Final.jar:org/jgroups/protocols/pbcast/STABLE$StabilitySendTask.class */
    public class StabilitySendTask implements Runnable {
        /** Digest to send in the STABILITY message; when null, {@link #run()} does nothing. */
        Digest stability_digest;

        /**
         * @param digest the digest to send when the task runs (may be null)
         */
        StabilitySendTask(Digest digest) {
            this.stability_digest = digest;
        }

        /**
         * Sends a STABILITY message carrying {@link #stability_digest} down the stack, unless
         * the protocol is suspended or the digest is null. Increments the sent-STABILITY counter
         * before passing the message down.
         */
        @Override // java.lang.Runnable
        public void run() {
            if (STABLE.this.suspended) {
                if (STABLE.this.log.isDebugEnabled()) {
                    STABLE.this.log.debug("STABILITY message will not be sent as suspended=" + STABLE.this.suspended);
                }
                return;
            }
            if (this.stability_digest == null) {
                return;
            }
            Message message = new Message();
            message.setFlag(Message.OOB, Message.Flag.NO_RELIABILITY);
            // Header type 2 is StableHeader.STABILITY.
            message.putHeader(STABLE.this.id, new StableHeader(2, this.stability_digest));
            if (STABLE.this.log.isTraceEnabled()) {
                STABLE.this.log.trace(STABLE.this.local_addr + ": sending stability msg " + this.stability_digest.printHighestDeliveredSeqnos());
            }
            // Direct field update instead of going through the decompiler-generated accessor.
            STABLE.this.num_stability_msgs_sent++;
            // Event type 1 carries a Message (presumably Event.MSG - confirm against Event).
            STABLE.this.down_prot.down(new Event(1, message));
        }
    }

    /* loaded from: input_file:WEB-INF/lib/jgroups-3.2.10.Final.jar:org/jgroups/protocols/pbcast/STABLE$StableHeader.class */
    public static class StableHeader extends Header {
        public static final int STABLE_GOSSIP = 1;
        public static final int STABILITY = 2;
        /** One of {@link #STABLE_GOSSIP} or {@link #STABILITY}; 0 until set. */
        int type;
        /** Digest carried by this header; may be null. */
        Digest stableDigest;

        /** Creates an empty header (type 0, no digest), to be populated via {@link #readFrom}. */
        public StableHeader() {
        }

        /**
         * @param i      header type ({@link #STABLE_GOSSIP} or {@link #STABILITY})
         * @param digest digest to carry (may be null)
         */
        public StableHeader(int i, Digest digest) {
            this.type = i;
            this.stableDigest = digest;
        }

        /** Returns the symbolic name of a header type, or {@code "<unknown>"} for any other value. */
        static String type2String(int i) {
            if (i == STABLE_GOSSIP) {
                return "STABLE_GOSSIP";
            }
            if (i == STABILITY) {
                return "STABILITY";
            }
            return "<unknown>";
        }

        @Override // org.jgroups.Header
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append('[').append(type2String(this.type)).append("]: digest is ").append(this.stableDigest);
            return text.toString();
        }

        /**
         * Returns 5 when no digest is set, otherwise 5 plus the digest's serialized size.
         * The 5 presumably covers the 4-byte type plus a 1-byte presence marker written by
         * Util.writeStreamable - TODO confirm.
         */
        @Override // org.jgroups.Header
        public int size() {
            if (this.stableDigest == null) {
                return 5;
            }
            return (int) (5 + this.stableDigest.serializedSize());
        }

        /** Writes the type as an int followed by the digest (via Util.writeStreamable). */
        @Override // org.jgroups.util.Streamable
        public void writeTo(DataOutput dataOutput) throws Exception {
            dataOutput.writeInt(this.type);
            Util.writeStreamable(this.stableDigest, dataOutput);
        }

        /** Reads the fields in the same order in which {@link #writeTo} writes them. */
        @Override // org.jgroups.util.Streamable
        public void readFrom(DataInput dataInput) throws Exception {
            this.type = dataInput.readInt();
            Object streamed = Util.readStreamable(Digest.class, dataInput);
            this.stableDigest = (Digest) streamed;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/jgroups-3.2.10.Final.jar:org/jgroups/protocols/pbcast/STABLE$StableTask.class */
    public class StableTask implements TimeScheduler.Task {
        private StableTask() {
        }

        /**
         * Returns the delay (msec) until the next run: a random value from
         * {@link #computeSleepTime()}, or 10000 when that value is not positive
         * (for example when the member set is empty).
         */
        @Override // org.jgroups.util.TimeScheduler.Task
        public long nextInterval() {
            long computeSleepTime = computeSleepTime();
            // Plain literal: the decompiler had substituted an unrelated HSQLDB constant
            // (LockFile.HEARTBEAT_INTERVAL) that merely has the same value.
            return computeSleepTime <= 0 ? 10000L : computeSleepTime;
        }

        /**
         * Fetches the current digest and sends it in a STABLE message. Does nothing while the
         * protocol is suspended; logs a warning and skips sending when the digest is null.
         */
        @Override // java.lang.Runnable
        public void run() {
            if (STABLE.this.suspended) {
                if (STABLE.this.log.isTraceEnabled()) {
                    STABLE.this.log.trace("stable task will not run as suspended=" + STABLE.this.suspended);
                }
                return;
            }
            Digest digest = STABLE.this.getDigest();
            if (digest == null) {
                if (STABLE.this.log.isWarnEnabled()) {
                    STABLE.this.log.warn("received null digest, skipped sending of stable message");
                }
                return;
            }
            if (STABLE.this.log.isTraceEnabled()) {
                STABLE.this.log.trace(STABLE.this.local_addr + ": setting latest_local_digest from NAKACK: " + digest.printHighestDeliveredSeqnos());
            }
            STABLE.this.sendStableMessage(digest);
        }

        /** Returns a random value in [0, 2 * members * desired_avg_gossip). */
        long computeSleepTime() {
            return getRandom(STABLE.this.mbrs.size() * STABLE.this.desired_avg_gossip * 2);
        }

        /**
         * Returns a random long derived from {@code Math.random() * j}; yields 0 when
         * {@code j} is 0 (the NaN produced by the modulo is cast to 0).
         */
        long getRandom(long j) {
            return (long) ((Math.random() * j) % j);
        }
    }

    /** @return the configured average gossip time ({@code desired_avg_gossip}) in msec */
    public long getDesiredAverageGossip() {
        final long avgGossip = desired_avg_gossip;
        return avgGossip;
    }

    /**
     * Sets {@code desired_avg_gossip}. This only stores the value; an already scheduled
     * stable task picks it up when it next computes its interval.
     *
     * @param j the new average gossip time in msec
     */
    public void setDesiredAverageGossip(long j) {
        desired_avg_gossip = j;
    }

    /** @return the byte threshold currently in effect ({@code max_bytes}) */
    public long getMaxBytes() {
        final long threshold = max_bytes;
        return threshold;
    }

    /**
     * Sets both the effective threshold and the remembered configured value, so that a later
     * ergonomics computation scales from {@code j}.
     *
     * @param j the new byte threshold
     */
    public void setMaxBytes(long j) {
        original_max_bytes = j;
        max_bytes = j;
    }

    /** @return the bytes counted so far ({@code num_bytes_received}); read without taking the 'received' lock */
    @ManagedAttribute(name = "BytesReceived")
    public long getBytes() {
        final long bytes = num_bytes_received;
        return bytes;
    }

    /** @return the number of STABLE messages sent since the last {@code resetStats()} */
    @ManagedAttribute
    public int getStableSent() {
        final int sent = num_stable_msgs_sent;
        return sent;
    }

    /** @return the number of STABLE messages counted as received since the last {@code resetStats()} */
    @ManagedAttribute
    public int getStableReceived() {
        final int received = num_stable_msgs_received;
        return received;
    }

    /** @return the number of STABILITY messages sent since the last {@code resetStats()} */
    @ManagedAttribute
    public int getStabilitySent() {
        final int sent = num_stability_msgs_sent;
        return sent;
    }

    /** @return the number of STABILITY messages accepted since the last {@code resetStats()} */
    @ManagedAttribute
    public int getStabilityReceived() {
        final int received = num_stability_msgs_received;
        return received;
    }

    /**
     * Reports whether the periodic stable task is scheduled and still active.
     *
     * @return true if a task future exists and is neither done nor cancelled
     */
    @ManagedAttribute
    public boolean getStableTaskRunning() {
        stable_task_lock.lock();
        try {
            final Future<?> task = stable_task_future;
            return task != null && !task.isDone() && !task.isCancelled();
        } finally {
            stable_task_lock.unlock();
        }
    }

    /** Resets the superclass statistics and zeroes the four STABLE/STABILITY message counters. */
    @Override // org.jgroups.stack.Protocol
    public void resetStats() {
        super.resetStats();
        num_stable_msgs_sent = 0;
        num_stable_msgs_received = 0;
        num_stability_msgs_received = 0;
        num_stability_msgs_sent = 0;
    }

    /**
     * Lists the event types that a protocol below this one must be able to handle.
     *
     * @return a new mutable list containing the single value 39 (presumably Event.GET_DIGEST,
     *         the request issued by getDigest() - confirm against Event)
     */
    @Override // org.jgroups.stack.Protocol
    public List<Integer> requiredDownServices() {
        // Parameterized instead of the raw ArrayList, so the element type is checked.
        List<Integer> services = new ArrayList<Integer>(1);
        services.add(Integer.valueOf(39));
        return services;
    }

    /**
     * Marks the protocol as suspended (logging the transition once) and makes sure a resume
     * task is scheduled.
     *
     * @param j the suspension timeout in msec, handed to startResumeTask()
     */
    private void suspend(long j) {
        final boolean alreadySuspended = suspended;
        if (!alreadySuspended) {
            suspended = true;
            if (log.isDebugEnabled()) {
                log.debug("suspending message garbage collection");
            }
        }
        startResumeTask(j);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resets the local digest and votes, clears the suspended flag and cancels a pending
     * resume task.
     * <p>
     * The lock is released exactly once, in {@code finally}. Logging and cancelling the task
     * happen after the lock has been released, so a failure there cannot trigger a second
     * {@code unlock()} (which would throw IllegalMonitorStateException and hide the real error).
     */
    public void resume() {
        this.lock.lock();
        try {
            resetDigest();
            this.suspended = false;
        } finally {
            this.lock.unlock();
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("resuming message garbage collection");
        }
        stopResumeTask();
    }

    /**
     * Runs the superclass initialization and then records the configured {@code max_bytes}
     * as {@code original_max_bytes}.
     *
     * @throws Exception if the superclass initialization fails
     */
    @Override // org.jgroups.stack.Protocol
    public void init() throws Exception {
        super.init();
        final long configured = max_bytes;
        original_max_bytes = configured;
    }

    /**
     * Obtains the timer from the transport and, if {@code desired_avg_gossip} is positive,
     * starts the periodic stable task.
     *
     * @throws Exception if the transport provides no timer
     */
    @Override // org.jgroups.stack.Protocol
    public void start() throws Exception {
        timer = getTransport().getTimer();
        if (timer == null) {
            throw new Exception("timer cannot be retrieved");
        }
        final boolean gossipEnabled = desired_avg_gossip > 0;
        if (gossipEnabled) {
            startStableTask();
        }
    }

    /** Cancels the periodic stable task; other pending tasks are not touched here. */
    @Override // org.jgroups.stack.Protocol
    public void stop() {
        this.stopStableTask();
    }

    /**
     * Handles events travelling up the stack.
     * <ul>
     * <li>Type 1 (argument is a Message): messages without a StableHeader are counted and
     * passed up. Messages with one are consumed here (null is returned) and dispatched by
     * header type.</li>
     * <li>Type 6 (argument is a View): passed up first, then applied locally.</li>
     * <li>Everything else is passed up unchanged.</li>
     * </ul>
     */
    @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
    public Object up(Event event) {
        final int eventType = event.getType();

        if (eventType == 6) {
            Object result = up_prot.up(event);
            handleViewChange((View) event.getArg());
            return result;
        }
        if (eventType != 1) {
            return up_prot.up(event);
        }

        Message message = (Message) event.getArg();
        StableHeader stableHeader = (StableHeader) message.getHeader(id);
        if (stableHeader == null) {
            handleRegularMessage(message);
            return up_prot.up(event);
        }

        final int headerType = stableHeader.type;
        if (headerType == 1) {
            // StableHeader.STABLE_GOSSIP
            handleStableMessage(message.getSrc(), stableHeader.stableDigest);
        } else if (headerType == 2) {
            // StableHeader.STABILITY
            handleStabilityMessage(stableHeader.stableDigest, message.getSrc());
        } else if (log.isErrorEnabled()) {
            log.error("StableHeader type " + stableHeader.type + " not known");
        }
        return null;
    }

    /**
     * Counts the payload of a message that has no destination address and sends a STABLE
     * message once the accumulated byte count reaches {@code max_bytes}.
     * <p>
     * Only the counter update runs under the {@code received} lock. Fetching the digest and
     * sending the STABLE message call into other protocols and are therefore done after the
     * lock has been released. A null digest is logged and skipped instead of being
     * dereferenced.
     *
     * @param message the message to account for; ignored when {@code max_bytes <= 0} or when
     *                it has a destination address
     */
    private void handleRegularMessage(Message message) {
        if (this.max_bytes <= 0 || message.getDest() != null) {
            return;
        }
        boolean thresholdReached = false;
        this.received.lock();
        try {
            this.num_bytes_received += message.getLength();
            if (this.num_bytes_received >= this.max_bytes) {
                if (this.log.isTraceEnabled()) {
                    this.log.trace(new StringBuilder("max_bytes has been reached (").append(this.max_bytes).append(", bytes received=").append(this.num_bytes_received).append("): triggers stable msg"));
                }
                this.num_bytes_received = 0L;
                thresholdReached = true;
            }
        } finally {
            this.received.unlock();
        }
        if (!thresholdReached) {
            return;
        }
        Digest digest = getDigest();
        if (digest == null) {
            // Same handling as the periodic stable task: nothing to send without a digest.
            if (this.log.isWarnEnabled()) {
                this.log.warn("received null digest, skipped sending of stable message");
            }
            return;
        }
        if (this.log.isTraceEnabled()) {
            this.log.trace("setting latest_local_digest from NAKACK: " + digest.printHighestDeliveredSeqnos());
        }
        sendStableMessage(digest);
    }

    /**
     * Handles events travelling down the stack.
     * <ul>
     * <li>Type 6 (argument is a View): passed down first, then applied locally.</li>
     * <li>Type 8: the argument is stored as the local address.</li>
     * <li>Type 65: suspends, using the Long argument as timeout or 200000 msec when the
     * argument is not a Long.</li>
     * <li>Type 66: resumes.</li>
     * </ul>
     * Every event is passed down the stack exactly once.
     */
    @Override // org.jgroups.stack.Protocol
    public Object down(Event event) {
        final int eventType = event.getType();

        if (eventType == 6) {
            Object result = down_prot.down(event);
            handleViewChange((View) event.getArg());
            return result;
        }

        if (eventType == 8) {
            local_addr = (Address) event.getArg();
        } else if (eventType == 65) {
            Object arg = event.getArg();
            // instanceof is false for null, so no separate null check is required.
            long timeout = (arg instanceof Long) ? ((Long) arg).longValue() : 200000;
            suspend(timeout);
        } else if (eventType == 66) {
            resume();
        }
        return down_prot.down(event);
    }

    /** Fetches the current digest and sends it in a STABLE message. */
    @ManagedOperation
    public void runMessageGarbageCollection() {
        Digest current = getDigest();
        sendStableMessage(current);
    }

    /** Alias for {@link #runMessageGarbageCollection()}. */
    @ManagedOperation(description = "Sends a STABLE message; when every member has received a STABLE message from everybody else, a STABILITY message will be sent")
    public void gc() {
        this.runMessageGarbageCollection();
    }

    /**
     * Applies a new view: replaces the member set, resets the local digest and votes, marks
     * the protocol as initialized and, when ergonomics is enabled and {@code cap > 0},
     * recomputes {@code max_bytes}.
     * <p>
     * The new {@code max_bytes} is the smaller of {@code maxHeap * cap} and
     * {@code members * original_max_bytes}. {@code MemoryUsage.getMax()} returns -1 when the
     * maximum heap size is undefined. In that case only the per-member term is used, because
     * the heap term would otherwise force {@code max_bytes} to 0 and silently disable
     * byte-triggered STABLE messages.
     *
     * @param view the new view
     */
    private void handleViewChange(View view) {
        List<Address> members = view.getMembers();
        synchronized (this.mbrs) {
            this.mbrs.clear();
            this.mbrs.addAll(members);
        }
        this.lock.lock();
        try {
            resetDigest();
            this.initialized = true;
            if (this.ergonomics && this.cap > 0.0d) {
                long maxHeap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
                long perMemberLimit = members.size() * this.original_max_bytes;
                this.max_bytes = maxHeap > 0
                        ? Math.min((long) (maxHeap * this.cap), perMemberLimit)
                        : perMemberLimit;
                if (this.log.isDebugEnabled()) {
                    this.log.debug("[ergonomics] setting max_bytes to " + Util.printBytes(this.max_bytes) + " (" + members.size() + " members)");
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Merges a received digest into the local digest. For every member present in both, the
     * local entry becomes the minimum of the two highest-delivered seqnos and the maximum of
     * the two highest-received seqnos.
     * <p>
     * The caller in this class holds {@code lock} while invoking this method.
     *
     * @param digest  the received digest
     * @param address the sender, used for trace output only
     * @return false if the digest is null or empty, if no view has been installed yet, or if
     *         its senders differ from the local digest's (in which case the local digest is
     *         reset); true once the merge has been applied
     */
    private boolean updateLocalDigest(Digest digest, Address address) {
        if (digest == null || digest.size() == 0) {
            return false;
        }
        if (!this.initialized) {
            if (this.log.isTraceEnabled()) {
                this.log.trace("STABLE message will not be handled as I'm not yet initialized");
            }
            return false;
        }
        if (!this.digest.sameSenders(digest)) {
            resetDigest();
            return false;
        }
        // The trace text is only assembled when trace logging is on.
        StringBuilder sb = null;
        if (this.log.isTraceEnabled()) {
            sb = new StringBuilder().append(this.local_addr).append(": handling digest from ").append(address).append(" (").append(this.votes.size()).append(" votes):\nmine:   ").append(this.digest.printHighestDeliveredSeqnos()).append("\nother:  ").append(digest.printHighestDeliveredSeqnos());
        }
        Iterator<Digest.DigestEntry> it = digest.iterator();
        while (it.hasNext()) {
            Digest.DigestEntry next = it.next();
            Address member = next.getMember();
            long highestDeliveredSeqno = next.getHighestDeliveredSeqno();
            long highestReceivedSeqno = next.getHighestReceivedSeqno();
            // jArr[0] is used as the local highest delivered seqno, jArr[1] as the highest received.
            long[] jArr = this.digest.get(member);
            if (jArr != null) {
                this.digest.setHighestDeliveredAndSeenSeqnos(member, Math.min(jArr[0], highestDeliveredSeqno), Math.max(jArr[1], highestReceivedSeqno));
            }
        }
        if (sb != null) {
            // Plain "\n": the decompiler had substituted commons-io's IOUtils.LINE_SEPARATOR_UNIX,
            // an unrelated constant with the same value.
            sb.append("\nresult: ").append(this.digest.printHighestDeliveredSeqnos()).append("\n");
            this.log.trace(sb);
        }
        return true;
    }

    /**
     * Replaces the local digest with a freshly fetched one and clears all collected votes.
     * Callers in this class hold {@code lock} when invoking it.
     */
    private void resetDigest() {
        Digest fresh = getDigest();
        digest.replace(fresh);
        if (log.isTraceEnabled()) {
            log.trace(local_addr + ": resetting digest from NAKACK: " + digest.printHighestDeliveredSeqnos());
        }
        votes.clear();
    }

    /**
     * Records a vote from {@code address}.
     *
     * @return true only if the vote was new and the vote set now equals the member set
     */
    private boolean addVote(Address address) {
        final boolean newlyAdded = votes.add(address);
        if (!newlyAdded) {
            return false;
        }
        return allVotesReceived(votes);
    }

    /**
     * Checks, under the {@code mbrs} monitor, whether {@code set} equals the current member set.
     *
     * @param set the votes collected so far
     * @return true if both sets contain exactly the same addresses
     */
    private boolean allVotesReceived(Set<Address> set) {
        synchronized (mbrs) {
            return set.equals(mbrs);
        }
    }

    /**
     * Schedules the periodic stable task with a dynamic interval, unless a previously
     * scheduled one has not completed yet.
     */
    private void startStableTask() {
        stable_task_lock.lock();
        try {
            final boolean stillActive = stable_task_future != null && !stable_task_future.isDone();
            if (stillActive) {
                return;
            }
            stable_task_future = timer.scheduleWithDynamicInterval(new StableTask());
            if (log.isTraceEnabled()) {
                log.trace("stable task started");
            }
        } finally {
            stable_task_lock.unlock();
        }
    }

    /** Cancels the periodic stable task (without interrupting a run in progress) and forgets it. */
    private void stopStableTask() {
        stable_task_lock.lock();
        try {
            final Future<?> task = stable_task_future;
            if (task == null) {
                return;
            }
            task.cancel(false);
            stable_task_future = null;
        } finally {
            stable_task_lock.unlock();
        }
    }

    /**
     * Schedules a ResumeTask after 110% of the requested timeout (200000 msec when that
     * product is not positive), unless a resume task is already pending.
     *
     * @param j the requested suspension time in msec
     */
    private void startResumeTask(long j) {
        final long padded = (long) (j * 1.1d);
        final long delay = padded > 0 ? padded : 200000;
        synchronized (resume_task_mutex) {
            final boolean pending = resume_task_future != null && !resume_task_future.isDone();
            if (pending) {
                return;
            }
            resume_task_future = timer.schedule(new ResumeTask(), delay, TimeUnit.MILLISECONDS);
            if (log.isDebugEnabled()) {
                log.debug("resume task started, max_suspend_time=" + delay);
            }
        }
    }

    /** Cancels a pending resume task (without interrupting it) and forgets it. */
    private void stopResumeTask() {
        synchronized (resume_task_mutex) {
            final Future<?> task = resume_task_future;
            if (task == null) {
                return;
            }
            task.cancel(false);
            resume_task_future = null;
        }
    }

    /**
     * Schedules a one-shot StabilitySendTask, unless one is already pending.
     *
     * @param digest the digest the task will send
     * @param j      the delay in msec
     */
    private void startStabilityTask(Digest digest, long j) {
        stability_lock.lock();
        try {
            final boolean pending = stability_task_future != null && !stability_task_future.isDone();
            if (!pending) {
                StabilitySendTask task = new StabilitySendTask(digest);
                stability_task_future = timer.schedule(task, j, TimeUnit.MILLISECONDS);
            }
        } finally {
            stability_lock.unlock();
        }
    }

    /** Cancels a pending STABILITY send task (without interrupting it) and forgets it. */
    private void stopStabilityTask() {
        stability_lock.lock();
        try {
            final Future<?> task = stability_task_future;
            if (task == null) {
                return;
            }
            task.cancel(false);
            stability_task_future = null;
        } finally {
            stability_lock.unlock();
        }
    }

    /**
     * Processes a STABLE message: merges the sender's digest into the local one, records the
     * sender's vote and, once every member has voted, schedules a STABILITY message with a
     * copy of the merged digest.
     * <p>
     * Messages are ignored when the digest or sender is null, before the first view has been
     * installed, while suspended, or when the sender has already voted in this round.
     * <p>
     * {@code lock} is released exactly once, in {@code finally}. Calling {@code unlock()} on
     * a ReentrantLock that the current thread no longer holds throws
     * IllegalMonitorStateException. The STABILITY message is scheduled after the lock has
     * been released.
     *
     * @param address the sender of the STABLE message
     * @param digest  the digest carried by the message
     */
    private void handleStableMessage(Address address, Digest digest) {
        if (digest == null || address == null) {
            if (this.log.isErrorEnabled()) {
                this.log.error("digest or sender is null");
            }
            return;
        }
        if (!this.initialized) {
            if (this.log.isTraceEnabled()) {
                this.log.trace("STABLE message will not be handled as I'm not yet initialized");
            }
            return;
        }
        if (this.suspended) {
            if (this.log.isTraceEnabled()) {
                this.log.trace("STABLE message will not be handled as I'm suspended");
            }
            return;
        }
        MutableDigest mutableDigest = null;
        this.lock.lock();
        try {
            // A second STABLE message from the same sender within one round is discarded.
            if (this.votes.contains(address)) {
                return;
            }
            this.num_stable_msgs_received++;
            // The sender only counts as a vote when its digest could be merged.
            if (!updateLocalDigest(digest, address)) {
                return;
            }
            if (addVote(address)) {
                mutableDigest = this.digest.copy();
            }
        } finally {
            this.lock.unlock();
        }
        if (mutableDigest != null) {
            sendStabilityMessage(mutableDigest);
        }
    }

    /**
     * Processes a STABILITY message: cancels a pending local STABILITY send, resets the local
     * digest and votes and, if the received digest has the same senders as the local one,
     * passes it down the stack in an event of type 30 and clears the received-bytes counter.
     * <p>
     * Messages are ignored when the digest is null, before the first view has been installed,
     * or while suspended.
     * <p>
     * {@code lock} is released exactly once, in {@code finally}, and the event is passed down
     * after the lock has been released. A failure in the lower protocols can therefore no
     * longer trigger a second {@code unlock()} that would replace the original exception with
     * an IllegalMonitorStateException.
     *
     * @param digest  the stability digest carried by the message
     * @param address the sender, used for log output only
     */
    private void handleStabilityMessage(Digest digest, Address address) {
        if (digest == null) {
            if (this.log.isErrorEnabled()) {
                this.log.error("stability digest is null");
            }
            return;
        }
        if (!this.initialized) {
            if (this.log.isTraceEnabled()) {
                this.log.trace("STABLE message will not be handled as I'm not yet initialized");
            }
            return;
        }
        if (this.suspended) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("stability message will not be handled as I'm suspended");
            }
            return;
        }
        if (this.log.isTraceEnabled()) {
            this.log.trace(new StringBuilder(this.local_addr + ": received stability msg from ").append(address).append(": ").append(digest.printHighestDeliveredSeqnos()));
        }
        stopStabilityTask();
        this.lock.lock();
        try {
            if (!this.digest.sameSenders(digest)) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug(this.local_addr + ": received digest from " + address + " (digest=" + digest + ") which does not match my own digest (" + this.digest + "): ignoring digest and re-initializing own digest");
                }
                resetDigest();
                return;
            }
            this.num_stability_msgs_received++;
            resetDigest();
        } finally {
            this.lock.unlock();
        }
        // Event type 30 carries the stable digest to the protocols below (presumably
        // Event.STABLE - confirm against Event).
        this.down_prot.down(new Event(30, digest));
        this.num_bytes_received = 0L;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends {@code digest} in a STABLE message (header type 1). The message is passed down
     * the stack from a task handed to the timer, not from the calling thread.
     * <p>
     * Nothing is sent while suspended, or when the digest is null or empty.
     *
     * @param digest the digest to send
     */
    public void sendStableMessage(Digest digest) {
        if (suspended) {
            if (log.isTraceEnabled()) {
                log.trace("will not send STABLE message as I'm suspended");
            }
            return;
        }
        final boolean nothingToSend = digest == null || digest.size() <= 0;
        if (nothingToSend) {
            return;
        }
        if (log.isTraceEnabled()) {
            log.trace(local_addr + ": sending stable msg " + digest.printHighestDeliveredSeqnos());
        }
        num_stable_msgs_sent++;

        final Message message = new Message();
        message.setFlag(Message.OOB, Message.Flag.NO_RELIABILITY);
        message.putHeader(id, new StableHeader(1, digest));

        Runnable sender = new Runnable() { // from class: org.jgroups.protocols.pbcast.STABLE.1
            @Override // java.lang.Runnable
            public void run() {
                STABLE.this.down_prot.down(new Event(1, message));
            }
        };
        timer.execute(sender);
    }

    /**
     * Schedules the sending of a STABILITY message after a random delay obtained from
     * {@code Util.random(stability_delay)}. Does nothing while suspended.
     *
     * @param digest the digest to send
     */
    private void sendStabilityMessage(Digest digest) {
        final boolean traceOn = log.isTraceEnabled();
        if (suspended) {
            if (traceOn) {
                log.trace("STABILITY message will not be sent as I'm suspended");
            }
            return;
        }
        final long delay = Util.random(stability_delay);
        if (traceOn) {
            log.trace(local_addr + ": sending stability msg (in " + delay + " ms) " + digest.printHighestDeliveredSeqnos());
        }
        startStabilityTask(digest, delay);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Requests the current digest from the protocols below by passing
     * {@code Event.GET_DIGEST_EVT} down the stack.
     *
     * @return the reply cast to Digest; null if the lower protocols return null
     */
    public Digest getDigest() {
        Object reply = down_prot.down(Event.GET_DIGEST_EVT);
        return (Digest) reply;
    }

    /**
     * Decompiler-recovered accessor: post-increments {@code num_stability_msgs_sent} on the
     * given instance.
     *
     * @return the counter value before the increment
     */
    static /* synthetic */ int access$1808(STABLE stable) {
        return stable.num_stability_msgs_sent++;
    }
}
