/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols;

import java.io.DataInput;
import java.io.DataOutput;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Bits;
import org.jgroups.util.BoundedHashMap;
import org.jgroups.util.Buffer;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.Promise;
import org.jgroups.util.Util;

@MBean(description="Implementation of total order protocol using a sequencer")
public class SEQUENCER
extends Protocol {
    /** Address of this member; assigned in {@link #down(Event)} from the argument of event type 8. */
    protected Address local_addr;
    /** Member that messages are currently forwarded to; replaced by {@link #flush(Address)}. */
    protected volatile Address coord;
    /** Most recent view accepted by {@link #handleViewChange(View)}. */
    protected volatile View view;
    /** True when {@link #local_addr} equals {@link #coord}; recomputed by {@link #flush(Address)}. */
    @ManagedAttribute
    protected volatile boolean is_coord;
    /** Counter incremented once for every message sent through {@link #down(Message)}. */
    protected final AtomicLong seqno = new AtomicLong(0L);
    /**
     * Messages sent by this member that have not yet been delivered back to it, keyed by seqno.
     * Entries are added in {@link #forwardToCoord(long, Message)} and removed in
     * {@link #deliver(Message, SequencerHeader)}.
     */
    protected final NavigableMap<Long, Message> forward_table = new ConcurrentSkipListMap<>();
    /** Guards ack-mode sends and the coordinator switch-over performed by {@link #flush(Address)}. */
    protected final Lock send_lock = new ReentrantLock();
    /** Senders parked in {@link #block()} wait on this condition until flushing ends. */
    protected final Condition send_cond = this.send_lock.newCondition();
    /** When true, {@link #forwardToCoord(long, Message)} waits for each message to be acked before returning. */
    @ManagedAttribute(description="is ack-mode enabled or not")
    protected volatile boolean ack_mode = true;
    /** True while a coordinator change is being processed; set by {@link #startFlusher(Address)}. */
    protected volatile boolean flushing;
    /** Set to true in {@link #start()} and to false in {@link #stop()}. */
    protected volatile boolean running = true;
    /** Number of threads currently between seqno allocation and completion of the forward in {@link #down(Message)}. */
    protected final AtomicInteger in_flight_sends = new AtomicInteger(0);
    /** Per-sender record of seqnos already seen, used by {@link #canDeliver(Address, long)} to reject duplicates. */
    protected final ConcurrentMap<Address, BoundedHashMap<Long, Long>> delivery_table = Util.createConcurrentMap();
    /** Thread running the current (or last) flush; see {@link #startFlusher(Address)} and {@link #stopFlusher()}. */
    protected volatile Flusher flusher;
    /** Completed by {@link #deliver(Message, SequencerHeader)} with the seqno of an own message carrying a flush ack. */
    protected final Promise<Long> ack_promise = new Promise<>();
    @Property(description="Size of the set to store received seqnos (for duplicate checking)")
    protected int delivery_table_max_size = 2000;
    @Property(description="Number of acks needed before going from ack-mode to normal mode. 0 disables this, which means that ack-mode is always on")
    protected int threshold = 10;
    @Property(description="If true, all messages in the forward-table are sent to the new coord, else thye're dropped (https://issues.jboss.org/browse/JGRP-2268)")
    protected boolean flush_forward_table = true;
    /** Acks counted so far while in ack-mode; reset when ack-mode is left or a flush completes. */
    @ManagedAttribute
    protected int num_acks;
    /** Number of messages handed to the transport by {@link #forward(Message, long, boolean)}. */
    @ManagedAttribute
    protected long forwarded_msgs;
    /** Number of messages sent by {@link #broadcast(Message, boolean, Address, long, boolean)}. */
    @ManagedAttribute
    protected long bcast_msgs;
    /** Number of forward/flush requests this member has re-broadcast in {@link #up(Message)}. */
    @ManagedAttribute
    protected long received_forwards;
    /** Number of broadcast messages received in {@link #up(Message)}. */
    @ManagedAttribute
    protected long received_bcasts;
    /** Number of messages passed up the stack by {@link #deliver(Message, SequencerHeader)}. */
    @ManagedAttribute
    protected long delivered_bcasts;

    /**
     * Reports whether this member currently considers itself the coordinator.
     *
     * @return the current value of {@code is_coord}
     */
    @ManagedAttribute
    public boolean isCoordinator() {
        return is_coord;
    }

    /**
     * Returns the member that messages are currently forwarded to.
     *
     * @return the current coordinator, or {@code null} if none has been set yet
     */
    public Address getCoordinator() {
        return coord;
    }

    /**
     * Returns the address of this member.
     *
     * @return the local address, or {@code null} if it has not been assigned yet
     */
    public Address getLocalAddress() {
        return local_addr;
    }

    /**
     * Returns how many messages are currently held in the forward-table.
     *
     * @return the number of entries in {@code forward_table}
     */
    @ManagedAttribute(description="Number of messages in the forward-table")
    public int getForwardTableSize() {
        return forward_table.size();
    }

    /**
     * Sets the number of acks required before ack-mode is switched off.
     *
     * @param new_threshold the new threshold; values of 0 or less keep ack-mode on
     */
    public void setThreshold(int new_threshold) {
        threshold = new_threshold;
    }

    /**
     * Sets the capacity used for per-sender seqno sets created from now on.
     *
     * @param size maximum number of seqnos remembered per sender for duplicate detection
     */
    public void setDeliveryTableMaxSize(int size) {
        delivery_table_max_size = size;
    }

    /**
     * Zeroes the five message counters (forwarded, broadcast, received forwards,
     * received broadcasts, delivered broadcasts). {@code num_acks} is not touched.
     */
    @Override
    @ManagedOperation
    public void resetStats() {
        delivered_bcasts = received_bcasts = received_forwards = bcast_msgs = forwarded_msgs = 0L;
    }

    /**
     * Starts the protocol: delegates to the superclass, then marks the protocol
     * as running and re-enables ack-mode.
     *
     * @throws Exception if the superclass start fails
     */
    @Override
    public void start() throws Exception {
        super.start();
        running = true;
        ack_mode = true;
    }

    /**
     * Stops the protocol: clears the running flag, releases any blocked senders,
     * terminates the flusher thread and finally delegates to the superclass.
     */
    @Override
    public void stop() {
        running = false;
        unblockAll();   // wake senders parked in block() and anyone waiting on ack_promise
        stopFlusher();
        super.stop();
    }

    /**
     * Inspects an event travelling down the stack and then always passes it on to
     * the protocol below.
     * <ul>
     *   <li>type 6: the view argument is handed to {@code handleViewChange}</li>
     *   <li>type 15: the view argument is handed to {@code handleTmpView}</li>
     *   <li>type 8: the argument becomes the local address</li>
     * </ul>
     * NOTE(review): the numeric codes presumably correspond to the view-change,
     * temporary-view and set-local-address constants of {@code Event} - confirm.
     *
     * @param evt the event to process
     * @return whatever the protocol below returns for the event
     */
    @Override
    public Object down(Event evt) {
        final int type = evt.getType();
        if (type == 6) {
            handleViewChange((View)evt.getArg());
        } else if (type == 15) {
            handleTmpView((View)evt.getArg());
        } else if (type == 8) {
            local_addr = (Address)evt.getArg();
        }
        return down_prot.down(evt);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends a message down the stack. Messages with a destination, or flagged
     * {@code NO_TOTAL_ORDER} or {@code OOB}, bypass this protocol. Every other
     * message gets a fresh seqno and a {@link SequencerHeader} and is handed to
     * {@link #forwardToCoord(long, Message)}.
     * <p>
     * If a flush is in progress the calling thread first waits in {@link #block()}.
     * Failures while forwarding are logged, not propagated.
     *
     * @param msg the message to send
     * @return the result of the protocol below for bypassed messages, otherwise {@code null}
     */
    @Override
    public Object down(Message msg) {
        if (msg.getDest() != null || msg.isFlagSet(Message.Flag.NO_TOTAL_ORDER) || msg.isFlagSet(Message.Flag.OOB)) {
            return down_prot.down(msg);
        }
        if (msg.getSrc() == null) {
            msg.setSrc(local_addr);
        }
        if (flushing) {
            block();
        }
        long next_seqno = seqno.incrementAndGet();
        // flush() waits for this counter to drop to zero before switching coordinators
        in_flight_sends.incrementAndGet();
        try {
            byte type = is_coord ? SequencerHeader.BCAST : SequencerHeader.WRAPPED_BCAST;
            msg.putHeader(id, new SequencerHeader(type, next_seqno));
            if (log.isTraceEnabled()) {
                // log the seqno allocated for THIS message; the shared counter may already
                // have been advanced by a concurrent sender
                log.trace("[" + local_addr + "]: forwarding " + local_addr + "::" + next_seqno + " to coord " + coord);
            }
            forwardToCoord(next_seqno, msg);
        }
        catch (Exception ex) {
            log.error(Util.getMessage("FailedSendingMessage"), ex);
        }
        finally {
            in_flight_sends.decrementAndGet();
        }
        return null;
    }

    /**
     * Inspects an event travelling up the stack.
     * <ul>
     *   <li>type 6: the event is passed up first, then the view is processed by
     *       {@code handleViewChange}; the value from the protocol above is returned</li>
     *   <li>type 15: the view is processed by {@code handleTmpView}, then the event is passed up</li>
     *   <li>anything else is passed up unchanged</li>
     * </ul>
     * NOTE(review): 6 and 15 presumably are the view-change and temporary-view
     * event codes - confirm against {@code Event}.
     *
     * @param evt the event to process
     * @return whatever the protocol above returns for the event
     */
    @Override
    public Object up(Event evt) {
        final int type = evt.getType();
        if (type == 6) {
            Object result = up_prot.up(evt);
            handleViewChange((View)evt.getArg());
            return result;
        }
        if (type == 15) {
            handleTmpView((View)evt.getArg());
        }
        return up_prot.up(evt);
    }

    /**
     * Handles a single received message. Messages flagged {@code NO_TOTAL_ORDER}
     * or {@code OOB}, and messages without a {@link SequencerHeader}, are passed
     * up untouched. Otherwise the header type decides:
     * <ul>
     *   <li>FORWARD / FLUSH: only processed when this member is the coordinator and
     *       the sender is in the current view; the payload is then re-broadcast</li>
     *   <li>BCAST: delivered directly</li>
     *   <li>WRAPPED_BCAST: the wrapped message is unmarshalled and delivered</li>
     * </ul>
     *
     * @param msg the received message
     * @return the result of the protocol above for pass-through messages, otherwise {@code null}
     */
    @Override
    public Object up(Message msg) {
        boolean unordered = msg.isFlagSet(Message.Flag.NO_TOTAL_ORDER) || msg.isFlagSet(Message.Flag.OOB);
        SequencerHeader hdr = unordered ? null : (SequencerHeader)msg.getHeader(id);
        if (hdr == null) {
            return up_prot.up(msg);
        }

        final byte type = hdr.type;
        if (type == SequencerHeader.FORWARD || type == SequencerHeader.FLUSH) {
            if (!is_coord) {
                if (log.isErrorEnabled()) {
                    log.error(local_addr + ": non-coord; dropping FORWARD request from " + msg.getSrc());
                }
                return null;
            }
            Address sender = msg.getSrc();
            if (view != null && !view.containsMember(sender)) {
                log.error("%s: dropping FORWARD request from non-member %s; view=%s", local_addr, sender, view);
                return null;
            }
            // a FLUSH request is re-broadcast with the flush-ack marker set
            broadcast(msg, true, msg.getSrc(), hdr.seqno, type == SequencerHeader.FLUSH);
            ++received_forwards;
        } else if (type == SequencerHeader.BCAST) {
            deliver(msg, hdr);
            ++received_bcasts;
        } else if (type == SequencerHeader.WRAPPED_BCAST) {
            unwrapAndDeliver(msg, hdr.flush_ack);
            ++received_bcasts;
        }
        return null;
    }

    /**
     * Handles a batch of received messages. Every message that carries this
     * protocol's header (and is neither {@code NO_TOTAL_ORDER} nor {@code OOB})
     * is removed from the batch and processed individually via {@link #up(Message)};
     * failures are logged per message. Whatever remains is passed up as a batch.
     *
     * @param batch the received batch; modified in place
     */
    @Override
    public void up(MessageBatch batch) {
        for (Message msg : batch) {
            boolean ordered = !msg.isFlagSet(Message.Flag.NO_TOTAL_ORDER)
              && !msg.isFlagSet(Message.Flag.OOB)
              && msg.getHeader(id) != null;
            if (!ordered) {
                continue;
            }
            batch.remove(msg);
            try {
                up(msg);
            }
            catch (Throwable t) {
                log.error(Util.getMessage("FailedPassingUpMessage"), t);
            }
        }
        if (batch.isEmpty()) {
            return;
        }
        up_prot.up(batch);
    }

    /**
     * Installs a new view. Empty views and views that do not compare greater than
     * the current one are ignored. Senders that are no longer members are purged
     * from the delivery table, and if the first member of the view differs from the
     * current coordinator, a running flusher is stopped and a new one is started
     * for the new coordinator.
     *
     * @param v the view to install
     */
    protected void handleViewChange(View v) {
        List<Address> members = v.getMembers();
        if (members.isEmpty() || (view != null && view.compareTo(v) >= 0)) {
            return;
        }
        view = v;
        delivery_table.keySet().retainAll(members);

        Address candidate = members.get(0);
        if (candidate == null || Objects.equals(coord, candidate)) {
            return;   // coordinator unchanged: nothing to flush
        }
        stopFlusher();
        startFlusher(candidate);
    }

    /**
     * Performs the switch-over to a new coordinator. Runs on the {@link Flusher} thread.
     * <ol>
     *   <li>Waits (polling every 100 ms) until no sender is inside the send section of
     *       {@code down(Message)}, or until flushing is cancelled or the protocol stops.</li>
     *   <li>Under {@code send_lock}: records the new coordinator, recomputes {@code is_coord}
     *       and, if {@code flush_forward_table} is set, re-sends the pending messages.</li>
     *   <li>In all cases after the lock was acquired: clears {@code flushing}, re-enables
     *       ack-mode, resets the ack counter and wakes all senders waiting on {@code send_cond}.</li>
     * </ol>
     *
     * @param new_coord the member that becomes the coordinator
     * @throws InterruptedException if the thread is interrupted while sleeping or while
     *         acquiring the lock; in that case none of the steps after the wait are executed
     */
    protected void flush(Address new_coord) throws InterruptedException {
        // wait for in-flight sends to drain before touching coord
        while (this.flushing && this.running && this.in_flight_sends.get() != 0) {
            Thread.sleep(100L);
        }
        // acquired outside the try: if this throws, the finally block below must not unlock
        this.send_lock.lockInterruptibly();
        try {
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + ": coord changed from " + this.coord + " to " + new_coord);
            }
            this.coord = new_coord;
            this.is_coord = Objects.equals(this.local_addr, this.coord);
            if (this.flush_forward_table) {
                this.flushMessagesInForwardTable();
            }
        }
        finally {
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + ": flushing completed");
            }
            this.flushing = false;
            this.ack_mode = true;
            this.num_acks = 0;
            // release senders parked in block()
            this.send_cond.signalAll();
            this.send_lock.unlock();
        }
    }

    /**
     * Processes a temporary view. The view is only acted upon when its coordinator
     * differs from the current one and is this member itself; in that case it is
     * treated like a regular view change.
     *
     * @param v the temporary view
     */
    private void handleTmpView(View v) {
        Address tmp_coord = v.getCoord();
        if (tmp_coord == null || tmp_coord.equals(coord)) {
            return;
        }
        if (local_addr != null && local_addr.equals(tmp_coord)) {
            handleViewChange(v);
        }
    }

    /**
     * Re-sends every message still held in the forward-table after a coordinator change.
     * <p>
     * If this member is the coordinator, each pending message is broadcast once as a
     * {@code WRAPPED_BCAST}; messages that cannot be serialized are logged and skipped.
     * <p>
     * Otherwise the pending messages are forwarded to the coordinator one at a time,
     * lowest seqno first, as {@code FLUSH} requests. A message is re-sent every 500 ms
     * until its ack arrives or it has been removed from the table (which
     * {@code deliver()} does when the message comes back). The loop ends when the table
     * is empty, flushing is cancelled or the protocol stops.
     * <p>
     * A message that cannot be serialized is logged and dropped from the table:
     * it can never be sent, and leaving it as the first entry would make this loop spin forever.
     */
    protected void flushMessagesInForwardTable() {
        if (is_coord) {
            for (Map.Entry<Long, Message> entry : forward_table.entrySet()) {
                Long key = entry.getKey();
                Message msg = entry.getValue();
                Buffer buf;
                try {
                    buf = Util.streamableToBuffer(msg);
                }
                catch (Exception e) {
                    log.error(Util.getMessage("FlushingBroadcastingFailed"), e);
                    continue;
                }
                SequencerHeader hdr = new SequencerHeader(SequencerHeader.WRAPPED_BCAST, key);
                Message forward_msg = new Message(null, buf).putHeader(id, hdr);
                if (log.isTraceEnabled()) {
                    log.trace(local_addr + ": flushing (broadcasting) " + local_addr + "::" + key);
                }
                down_prot.down(forward_msg);
            }
            return;
        }

        while (flushing && running) {
            // firstEntry() is null once the table is empty; checking the entry itself (instead of
            // isEmpty() followed by firstEntry()) is safe against deliver() removing the last
            // entry concurrently
            Map.Entry<Long, Message> entry = forward_table.firstEntry();
            if (entry == null) {
                break;
            }
            Long key = entry.getKey();
            Message msg = entry.getValue();
            Buffer buf;
            try {
                buf = Util.streamableToBuffer(msg);
            }
            catch (Exception e) {
                log.error(Util.getMessage("FlushingBroadcastingFailed"), e);
                // drop the unsendable message, otherwise it stays the first entry and is retried forever
                forward_table.remove(key);
                continue;
            }

            // re-send until acked or delivered
            while (flushing && running && !forward_table.isEmpty()) {
                SequencerHeader hdr = new SequencerHeader(SequencerHeader.FLUSH, key);
                Message forward_msg = new Message(coord, buf).putHeader(id, hdr).setFlag(Message.Flag.DONT_BUNDLE);
                if (log.isTraceEnabled()) {
                    log.trace(local_addr + ": flushing (forwarding) " + local_addr + "::" + key + " to coord " + coord);
                }
                ack_promise.reset();
                down_prot.down(forward_msg);
                Long ack = ack_promise.getResult(500L);
                if (Objects.equals(ack, key) || !forward_table.containsKey(key)) {
                    break;
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Hands a message to the coordinator, choosing the strategy from the current state:
     * <ul>
     *   <li>this member is the coordinator: forwarded once, not stored</li>
     *   <li>not running or flushing: only stored in the forward-table (the flush re-sends it)</li>
     *   <li>ack-mode off: stored and forwarded once</li>
     *   <li>ack-mode on: stored, then forwarded as a flush request under {@code send_lock}
     *       and re-sent every 500 ms until the matching ack arrives, the message has left
     *       the forward-table, or the state changes (ack-mode off, stopped, flushing)</li>
     * </ul>
     *
     * @param seqno the seqno assigned to the message
     * @param msg   the message to forward
     */
    protected void forwardToCoord(long seqno, Message msg) {
        if (is_coord) {
            forward(msg, seqno, false);
            return;
        }
        if (!running || flushing) {
            forward_table.put(seqno, msg);
            return;
        }
        if (!ack_mode) {
            forward_table.put(seqno, msg);
            forward(msg, seqno, false);
            return;
        }

        send_lock.lock();
        try {
            forward_table.put(seqno, msg);
            while (running && !flushing) {
                ack_promise.reset();
                forward(msg, seqno, true);
                if (!ack_mode || !running || flushing) {
                    break;   // no longer appropriate to wait for an ack
                }
                Long ack = ack_promise.getResult(500L);
                if (Objects.equals(ack, seqno) || !forward_table.containsKey(seqno)) {
                    break;   // acked, or already delivered
                }
            }
        }
        finally {
            send_lock.unlock();
        }
    }

    /**
     * Serializes {@code msg} and sends it wrapped in a new unicast message to the
     * current coordinator. Does nothing when no coordinator is known. Failures are
     * logged, not propagated.
     *
     * @param msg   the message to wrap and forward
     * @param seqno the seqno placed in the header of the wrapping message
     * @param flush if true the request is sent as {@code FLUSH}, otherwise as {@code FORWARD}
     */
    protected void forward(Message msg, long seqno, boolean flush) {
        // read the volatile field once so the null check and the send use the same address
        Address target = coord;
        if (target == null) {
            return;
        }
        byte type = flush ? SequencerHeader.FLUSH : SequencerHeader.FORWARD;
        try {
            SequencerHeader hdr = new SequencerHeader(type, seqno);
            Message forward_msg = new Message(target, Util.streamableToBuffer(msg)).putHeader(id, hdr);
            down_prot.down(forward_msg);
            ++forwarded_msgs;
        }
        catch (Exception ex) {
            // report the address the wrapper was sent to, not the destination of the wrapped message
            log.error(Util.getMessage("FailedForwardingMessageTo") + target, ex);
        }
    }

    /**
     * Sends a message to all members via the protocol below.
     *
     * @param msg             the message to broadcast, or the forward request whose payload is re-broadcast
     * @param copy            if false {@code msg} itself is sent; if true a new message with a null
     *                        destination is created around the payload of {@code msg} and tagged
     *                        {@code WRAPPED_BCAST}
     * @param original_sender the member the message originated from (used for tracing only)
     * @param seqno           the seqno placed in the header of a copied message
     * @param resend          only relevant when {@code copy} is true: marks the copy as a flush ack
     *                        and sets {@code DONT_BUNDLE} on it
     */
    protected void broadcast(Message msg, boolean copy, Address original_sender, long seqno, boolean resend) {
        Message bcast_msg;
        if (copy) {
            // named constant instead of a bare int literal, which is not narrowed to byte in a constructor call
            SequencerHeader new_hdr = new SequencerHeader(SequencerHeader.WRAPPED_BCAST, seqno);
            bcast_msg = new Message(null, msg.getRawBuffer(), msg.getOffset(), msg.getLength()).putHeader(id, new_hdr);
            if (resend) {
                new_hdr.flush_ack = true;
                bcast_msg.setFlag(Message.Flag.DONT_BUNDLE);
            }
        } else {
            bcast_msg = msg;
        }
        if (log.isTraceEnabled()) {
            log.trace(local_addr + ": broadcasting " + original_sender + "::" + seqno);
        }
        down_prot.down(bcast_msg);
        ++bcast_msgs;
    }

    /**
     * Unmarshals the message wrapped in the payload of {@code msg} and delivers it.
     * Any exception raised while unmarshalling or delivering is logged, not propagated.
     *
     * @param msg       the wrapper whose payload holds the serialized original message
     * @param flush_ack if true, the header of the unwrapped message is marked as a flush ack
     */
    protected void unwrapAndDeliver(Message msg, boolean flush_ack) {
        try {
            Message inner = Util.streamableFromBuffer(Message::new, msg.getRawBuffer(), msg.getOffset(), msg.getLength());
            SequencerHeader inner_hdr = (SequencerHeader)inner.getHeader(id);
            if (flush_ack) {
                // the marker travels on the wrapper's header; carry it over to the inner one
                inner_hdr.flush_ack = true;
            }
            deliver(inner, inner_hdr);
        }
        catch (Exception ex) {
            log.error(Util.getMessage("FailureUnmarshallingBuffer"), ex);
        }
    }

    /**
     * Delivers a broadcast message to the protocol above unless it is a duplicate.
     * <p>
     * For messages that originated from this member, the message is first removed from
     * the forward-table. If the header carries a flush ack, waiters on {@code ack_promise}
     * are completed with the seqno and, while in ack-mode outside a flush with a positive
     * threshold, the ack counter is advanced; reaching the threshold switches ack-mode off.
     *
     * @param msg the message to deliver; a message without a sender is logged and dropped
     * @param hdr the sequencer header belonging to {@code msg}
     */
    protected void deliver(Message msg, SequencerHeader hdr) {
        Address sender = msg.getSrc();
        if (sender == null) {
            log.error("%s: sender is null, cannot deliver ::%d", local_addr, hdr.getSeqno());
            return;
        }

        long msg_seqno = hdr.getSeqno();
        boolean own_message = sender.equals(local_addr);
        if (own_message) {
            forward_table.remove(msg_seqno);
        }
        if (own_message && hdr.flush_ack) {
            ack_promise.setResult(msg_seqno);
            boolean counting_acks = ack_mode && !flushing && threshold > 0;
            // the counter is only advanced while acks are being counted
            if (counting_acks && ++num_acks >= threshold) {
                ack_mode = false;
                num_acks = 0;
            }
        }

        if (!canDeliver(sender, msg_seqno)) {
            log.warn("%s: dropped duplicate message %s::%d", local_addr, sender, msg_seqno);
            return;
        }
        if (log.isTraceEnabled()) {
            log.trace(local_addr + ": delivering " + sender + "::" + msg_seqno);
        }
        up_prot.up(msg);
        ++delivered_bcasts;
    }

    /**
     * Records {@code seqno} for {@code sender} and reports whether the message may be
     * delivered, i.e. whether the seqno was not already recorded for that sender.
     * <p>
     * The per-sender set is created atomically on first use, so two threads racing on
     * the first message of a sender share one set instead of one replacing the other's.
     *
     * @param sender the member the message originated from
     * @param seqno  the seqno of the message
     * @return the result of adding the seqno to the sender's set; {@code deliver()}
     *         treats {@code false} as a duplicate
     */
    protected boolean canDeliver(Address sender, long seqno) {
        BoundedHashMap<Long, Long> seqno_set =
          delivery_table.computeIfAbsent(sender, k -> new BoundedHashMap<>(delivery_table_max_size));
        return seqno_set.add(seqno, seqno);
    }

    /**
     * Parks the calling sender until the current flush is over or the protocol stops.
     * <p>
     * An interrupt does not end the wait early: the condition is re-checked and the
     * thread keeps waiting if a flush is still in progress. The interrupt is not lost,
     * though; the thread's interrupt status is restored before this method returns.
     */
    protected void block() {
        boolean interrupted = false;
        send_lock.lock();
        try {
            while (flushing && running) {
                try {
                    send_cond.await();
                }
                catch (InterruptedException e) {
                    // remember it; re-asserting the flag here would make await() throw again immediately
                    interrupted = true;
                }
            }
        }
        finally {
            send_lock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Cancels flushing and releases every waiting thread: senders parked on
     * {@code send_cond} are signalled and waiters on {@code ack_promise} are
     * completed with a {@code null} result.
     */
    protected void unblockAll() {
        flushing = false;
        final Lock lock = send_lock;
        lock.lock();
        try {
            send_cond.signalAll();
            ack_promise.setResult(null);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Starts a flusher thread for the given coordinator unless one is still alive.
     * Sets {@code flushing} before the thread is started.
     *
     * @param new_coord the member the flusher switches over to
     */
    protected synchronized void startFlusher(Address new_coord) {
        if (flusher != null && flusher.isAlive()) {
            return;   // a flush is already in progress
        }
        if (log.isTraceEnabled()) {
            log.trace(local_addr + ": flushing started");
        }
        flushing = true;
        flusher = new Flusher(new_coord);
        flusher.setName("Flusher");
        flusher.start();
    }

    /**
     * Cancels flushing and waits until the flusher thread, if any, has terminated.
     * The flusher is interrupted and {@code ack_promise} is completed with {@code null}
     * so that it wakes up from either a sleep/lock wait or an ack wait.
     * <p>
     * If the calling thread is interrupted while waiting, it keeps waiting for the
     * flusher to die, and its interrupt status is restored before this method returns.
     */
    protected void stopFlusher() {
        flushing = false;
        Flusher tmp = flusher;
        boolean interrupted = false;
        try {
            while (tmp != null && tmp.isAlive()) {
                tmp.interrupt();
                ack_promise.setResult(null);
                try {
                    tmp.join();
                }
                catch (InterruptedException e) {
                    // remember it; join() cleared the flag, so the next iteration can wait again
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Header attached to every message handled by the enclosing protocol. It carries
     * the request type, the sender-assigned seqno and a flag marking the message as
     * the acknowledgement of a flush request.
     * <p>
     * Wire format: one byte for the type, the seqno as written by {@code Bits.writeLong},
     * one byte for the flag.
     */
    public static class SequencerHeader
    extends Header {
        /** A member asks the coordinator to broadcast a message. */
        protected static final byte FORWARD = 1;
        /** Like {@link #FORWARD}, but the resulting broadcast is marked as a flush ack. */
        protected static final byte FLUSH = 2;
        /** A message that is delivered as is. */
        protected static final byte BCAST = 3;
        /** A message whose payload is a serialized message that must be unwrapped before delivery. */
        protected static final byte WRAPPED_BCAST = 4;

        /** One of the type constants; -1 until set. */
        protected byte type = (byte)-1;
        /** Seqno assigned by the original sender; -1 until set. */
        protected long seqno = -1L;
        /** True when the message acknowledges a flush request. */
        protected boolean flush_ack;

        /** Creates an empty header; the fields are expected to be filled in by {@link #readFrom(DataInput)}. */
        public SequencerHeader() {
        }

        /**
         * Creates a header of the given type without a seqno.
         *
         * @param type one of the type constants
         */
        public SequencerHeader(byte type) {
            this.type = type;
        }

        /**
         * Creates a header of the given type and seqno.
         *
         * @param type  one of the type constants
         * @param seqno the seqno of the message
         */
        public SequencerHeader(byte type, long seqno) {
            this(type);
            this.seqno = seqno;
        }

        /** @return the magic id of this header type (61) */
        @Override
        public short getMagicId() {
            return 61;
        }

        /** @return the seqno carried by this header */
        public long getSeqno() {
            return seqno;
        }

        /** @return a supplier creating empty instances of this header */
        @Override
        public Supplier<? extends Header> create() {
            return SequencerHeader::new;
        }

        /** @return the type name, followed by the seqno if it is non-negative and a flush-ack marker if set */
        @Override
        public String toString() {
            StringBuilder out = new StringBuilder(64).append(printType());
            if (seqno >= 0L) {
                out.append(" seqno=").append(seqno);
            }
            if (flush_ack) {
                out.append(" (flush_ack)");
            }
            return out.toString();
        }

        /** @return the symbolic name of {@link #type}, or {@code "n/a"} for an unknown value */
        protected final String printType() {
            switch (type) {
                case FORWARD:
                    return "FORWARD";
                case FLUSH:
                    return "FLUSH";
                case BCAST:
                    return "BCAST";
                case WRAPPED_BCAST:
                    return "WRAPPED_BCAST";
                default:
                    return "n/a";
            }
        }

        /**
         * Writes type, seqno and flush-ack flag, in that order.
         *
         * @param out the stream to write to
         * @throws Exception if writing fails
         */
        @Override
        public void writeTo(DataOutput out) throws Exception {
            out.writeByte(type);
            Bits.writeLong(seqno, out);
            out.writeBoolean(flush_ack);
        }

        /**
         * Reads type, seqno and flush-ack flag, in the order written by {@link #writeTo(DataOutput)}.
         *
         * @param in the stream to read from
         * @throws Exception if reading fails
         */
        @Override
        public void readFrom(DataInput in) throws Exception {
            type = in.readByte();
            seqno = Bits.readLong(in);
            flush_ack = in.readBoolean();
        }

        /** @return the number of bytes {@link #writeTo(DataOutput)} produces for the current seqno */
        @Override
        public int serializedSize() {
            final int type_size = 1;
            final int flag_size = 1;
            return type_size + Bits.size(seqno) + flag_size;
        }
    }

    /**
     * Thread that runs {@code flush()} of the enclosing protocol for one coordinator change.
     */
    protected class Flusher
    extends Thread {
        /** The member that becomes the coordinator once the flush has run. */
        protected final Address new_coord;

        /**
         * @param new_coord the member to switch over to
         */
        public Flusher(Address new_coord) {
            this.new_coord = new_coord;
        }

        /** Runs the flush once; an interrupt simply ends the thread. */
        @Override
        public void run() {
            try {
                flush(new_coord);
            }
            catch (InterruptedException ignored) {
                // the thread terminates right here, so there is nothing left to do
            }
        }
    }
}

