package org.jgroups.protocols;

import java.io.DataInput;
import java.io.DataOutput;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Bits;
import org.jgroups.util.BoundedHashMap;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.Promise;
import org.jgroups.util.Util;

@MBean(description = "Implementation of total order protocol using a sequencer")
/* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.2.0-SNAPSHOT.jar:org/jgroups/protocols/SEQUENCER.class */
public class SEQUENCER extends Protocol {
    protected Address local_addr;
    protected volatile Address coord;
    protected volatile View view;
    protected volatile Flusher flusher;
    protected volatile boolean is_coord = false;
    protected final AtomicLong seqno = new AtomicLong(0);
    protected final NavigableMap<Long, Message> forward_table = new ConcurrentSkipListMap();
    protected final Lock send_lock = new ReentrantLock();
    protected final Condition send_cond = this.send_lock.newCondition();
    protected volatile boolean ack_mode = true;
    protected volatile boolean flushing = false;
    protected volatile boolean running = true;
    protected final AtomicInteger in_flight_sends = new AtomicInteger(0);
    protected final ConcurrentMap<Address, BoundedHashMap<Long, Long>> delivery_table = Util.createConcurrentMap();
    protected final Promise<Long> ack_promise = new Promise<>();

    @Property(description = "Size of the set to store received seqnos (for duplicate checking)")
    protected int delivery_table_max_size = 2000;

    @Property(description = "Number of acks needed before going from ack-mode to normal mode. 0 disables this, which means that ack-mode is always on")
    protected int threshold = 10;
    protected int num_acks = 0;
    protected long forwarded_msgs = 0;
    protected long bcast_msgs = 0;
    protected long received_forwards = 0;
    protected long received_bcasts = 0;
    protected long delivered_bcasts = 0;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.2.0-SNAPSHOT.jar:org/jgroups/protocols/SEQUENCER$Flusher.class */
    public class Flusher extends Thread {
        protected final Address new_coord;

        public Flusher(Address address) {
            this.new_coord = address;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                SEQUENCER.this.flush(this.new_coord);
            } catch (InterruptedException e) {
            }
        }
    }

    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.2.0-SNAPSHOT.jar:org/jgroups/protocols/SEQUENCER$SequencerHeader.class */
    public static class SequencerHeader extends Header {
        protected static final byte FORWARD = 1;
        protected static final byte FLUSH = 2;
        protected static final byte BCAST = 3;
        protected static final byte WRAPPED_BCAST = 4;
        protected byte type;
        protected long seqno;
        protected boolean flush_ack;

        public SequencerHeader() {
            this.type = (byte) -1;
            this.seqno = -1L;
        }

        public SequencerHeader(byte b) {
            this.type = (byte) -1;
            this.seqno = -1L;
            this.type = b;
        }

        public SequencerHeader(byte b, long j) {
            this(b);
            this.seqno = j;
        }

        public long getSeqno() {
            return this.seqno;
        }

        @Override // org.jgroups.Header
        public String toString() {
            StringBuilder sb = new StringBuilder(64);
            sb.append(printType());
            if (this.seqno >= 0) {
                sb.append(" seqno=" + this.seqno);
            }
            if (this.flush_ack) {
                sb.append(" (flush_ack)");
            }
            return sb.toString();
        }

        protected final String printType() {
            switch (this.type) {
                case 1:
                    return "FORWARD";
                case 2:
                    return "FLUSH";
                case 3:
                    return "BCAST";
                case 4:
                    return "WRAPPED_BCAST";
                default:
                    return "n/a";
            }
        }

        @Override // org.jgroups.util.Streamable
        public void writeTo(DataOutput dataOutput) throws Exception {
            dataOutput.writeByte(this.type);
            Bits.writeLong(this.seqno, dataOutput);
            dataOutput.writeBoolean(this.flush_ack);
        }

        @Override // org.jgroups.util.Streamable
        public void readFrom(DataInput dataInput) throws Exception {
            this.type = dataInput.readByte();
            this.seqno = Bits.readLong(dataInput);
            this.flush_ack = dataInput.readBoolean();
        }

        @Override // org.jgroups.Header
        public int size() {
            return 1 + Bits.size(this.seqno) + 1;
        }
    }

    @ManagedAttribute
    public boolean isCoordinator() {
        return this.is_coord;
    }

    public Address getCoordinator() {
        return this.coord;
    }

    public Address getLocalAddress() {
        return this.local_addr;
    }

    @ManagedAttribute
    public long getForwarded() {
        return this.forwarded_msgs;
    }

    @ManagedAttribute
    public long getBroadcast() {
        return this.bcast_msgs;
    }

    @ManagedAttribute
    public long getReceivedForwards() {
        return this.received_forwards;
    }

    @ManagedAttribute
    public long getReceivedBroadcasts() {
        return this.received_bcasts;
    }

    @ManagedAttribute(description = "Number of messages in the forward-table")
    public int getForwardTableSize() {
        return this.forward_table.size();
    }

    public void setThreshold(int i) {
        this.threshold = i;
    }

    public void setDeliveryTableMaxSize(int i) {
        this.delivery_table_max_size = i;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r5v0, types: [org.jgroups.protocols.SEQUENCER] */
    @Override // org.jgroups.stack.Protocol
    @ManagedOperation
    public void resetStats() {
        ?? r5 = 0;
        this.delivered_bcasts = 0L;
        this.received_bcasts = 0L;
        r5.received_forwards = this;
        this.bcast_msgs = this;
        this.forwarded_msgs = 0L;
    }

    @Override // org.jgroups.stack.Protocol
    @ManagedOperation
    public Map<String, Object> dumpStats() {
        Map<String, Object> dumpStats = super.dumpStats();
        dumpStats.put("forwarded", Long.valueOf(this.forwarded_msgs));
        dumpStats.put("broadcast", Long.valueOf(this.bcast_msgs));
        dumpStats.put("received_forwards", Long.valueOf(this.received_forwards));
        dumpStats.put("received_bcasts", Long.valueOf(this.received_bcasts));
        dumpStats.put("delivered_bcasts", Long.valueOf(this.delivered_bcasts));
        return dumpStats;
    }

    @Override // org.jgroups.stack.Protocol
    @ManagedOperation
    public String printStats() {
        return dumpStats().toString();
    }

    @Override // org.jgroups.stack.Protocol
    public void start() throws Exception {
        super.start();
        this.running = true;
        this.ack_mode = true;
    }

    @Override // org.jgroups.stack.Protocol
    public void stop() {
        this.running = false;
        unblockAll();
        stopFlusher();
        super.stop();
    }

    @Override // org.jgroups.stack.Protocol
    public Object down(Event event) {
        switch (event.getType()) {
            case 1:
                Message message = (Message) event.getArg();
                if (message.getDest() == null && !message.isFlagSet(Message.Flag.NO_TOTAL_ORDER) && !message.isFlagSet(Message.Flag.OOB)) {
                    if (message.getSrc() == null) {
                        message.setSrc(this.local_addr);
                    }
                    if (this.flushing) {
                        block();
                    }
                    long incrementAndGet = this.seqno.incrementAndGet();
                    this.in_flight_sends.incrementAndGet();
                    try {
                        try {
                            message.putHeader(this.id, new SequencerHeader(this.is_coord ? (byte) 3 : (byte) 4, incrementAndGet));
                            if (this.log.isTraceEnabled()) {
                                this.log.trace("[" + this.local_addr + "]: forwarding " + this.local_addr + "::" + this.seqno + " to coord " + this.coord);
                            }
                            forwardToCoord(incrementAndGet, message);
                            this.in_flight_sends.decrementAndGet();
                            return null;
                        } catch (Exception e) {
                            this.log.error("failed sending message", e);
                            this.in_flight_sends.decrementAndGet();
                            return null;
                        }
                    } catch (Throwable th) {
                        this.in_flight_sends.decrementAndGet();
                        throw th;
                    }
                }
                break;
            case 6:
                handleViewChange((View) event.getArg());
                break;
            case 8:
                this.local_addr = (Address) event.getArg();
                break;
            case 15:
                handleTmpView((View) event.getArg());
                break;
        }
        return this.down_prot.down(event);
    }

    @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
    public Object up(Event event) {
        SequencerHeader sequencerHeader;
        switch (event.getType()) {
            case 1:
                Message message = (Message) event.getArg();
                if (!message.isFlagSet(Message.Flag.NO_TOTAL_ORDER) && !message.isFlagSet(Message.Flag.OOB) && (sequencerHeader = (SequencerHeader) message.getHeader(this.id)) != null) {
                    switch (sequencerHeader.type) {
                        case 1:
                        case 2:
                            if (!this.is_coord) {
                                if (!this.log.isErrorEnabled()) {
                                    return null;
                                }
                                this.log.error(this.local_addr + ": non-coord; dropping FORWARD request from " + message.getSrc());
                                return null;
                            }
                            Address src = message.getSrc();
                            if (this.view == null || this.view.containsMember(src)) {
                                broadcast(message, true, message.getSrc(), sequencerHeader.seqno, sequencerHeader.type == 2);
                                this.received_forwards++;
                                return null;
                            }
                            if (!this.log.isErrorEnabled()) {
                                return null;
                            }
                            this.log.error(this.local_addr + ": dropping FORWARD request from non-member " + src + "; view=" + this.view);
                            return null;
                        case 3:
                            deliver(message, event, sequencerHeader);
                            this.received_bcasts++;
                            return null;
                        case 4:
                            unwrapAndDeliver(message, sequencerHeader.flush_ack);
                            this.received_bcasts++;
                            return null;
                        default:
                            return null;
                    }
                }
                break;
            case 6:
                Object up = this.up_prot.up(event);
                handleViewChange((View) event.getArg());
                return up;
            case 15:
                handleTmpView((View) event.getArg());
                break;
        }
        return this.up_prot.up(event);
    }

    @Override // org.jgroups.stack.Protocol
    public void up(MessageBatch messageBatch) {
        Iterator<Message> it = messageBatch.iterator();
        while (it.hasNext()) {
            Message next = it.next();
            if (!next.isFlagSet(Message.Flag.NO_TOTAL_ORDER) && !next.isFlagSet(Message.Flag.OOB) && next.getHeader(this.id) != null) {
                messageBatch.remove(next);
                try {
                    up(new Event(1, next));
                } catch (Throwable th) {
                    this.log.error("failed passing up message", th);
                }
            }
        }
        if (messageBatch.isEmpty()) {
            return;
        }
        this.up_prot.up(messageBatch);
    }

    protected void handleViewChange(View view) {
        List<Address> members = view.getMembers();
        if (members.isEmpty()) {
            return;
        }
        if (this.view == null || this.view.compareTo(view) < 0) {
            this.view = view;
            this.delivery_table.keySet().retainAll(members);
            Address address = this.coord;
            Address address2 = members.get(0);
            if (!(address == null || !address.equals(address2)) || address2 == null) {
                return;
            }
            stopFlusher();
            startFlusher(address2);
        }
    }

    protected void flush(Address address) throws InterruptedException {
        while (this.flushing && this.running && this.in_flight_sends.get() != 0) {
            Thread.sleep(100L);
        }
        this.send_lock.lockInterruptibly();
        try {
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + ": coord changed from " + this.coord + " to " + address);
            }
            this.coord = address;
            this.is_coord = this.local_addr != null && this.local_addr.equals(this.coord);
            flushMessagesInForwardTable();
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + ": flushing completed");
            }
            this.flushing = false;
            this.ack_mode = true;
            this.num_acks = 0;
            this.send_cond.signalAll();
            this.send_lock.unlock();
        } catch (Throwable th) {
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + ": flushing completed");
            }
            this.flushing = false;
            this.ack_mode = true;
            this.num_acks = 0;
            this.send_cond.signalAll();
            this.send_lock.unlock();
            throw th;
        }
    }

    private void handleTmpView(View view) {
        List<Address> members = view.getMembers();
        if (members.isEmpty()) {
            return;
        }
        Address address = members.get(0);
        if (address.equals(this.coord) || this.local_addr == null || !this.local_addr.equals(address)) {
            return;
        }
        handleViewChange(view);
    }

    protected void flushMessagesInForwardTable() {
        if (this.is_coord) {
            for (Map.Entry<Long, Message> entry : this.forward_table.entrySet()) {
                Long key = entry.getKey();
                try {
                    Message putHeader = new Message((Address) null, Util.objectToByteBuffer(entry.getValue())).putHeader(this.id, new SequencerHeader((byte) 4, key.longValue()));
                    if (this.log.isTraceEnabled()) {
                        this.log.trace(this.local_addr + ": flushing (broadcasting) " + this.local_addr + "::" + key);
                    }
                    this.down_prot.down(new Event(1, putHeader));
                } catch (Exception e) {
                    this.log.error("flushing (broadcasting) failed", e);
                }
            }
            return;
        }
        while (this.flushing && this.running && !this.forward_table.isEmpty()) {
            Map.Entry<Long, Message> firstEntry = this.forward_table.firstEntry();
            Long key2 = firstEntry.getKey();
            try {
                byte[] objectToByteBuffer = Util.objectToByteBuffer(firstEntry.getValue());
                while (this.flushing && this.running && !this.forward_table.isEmpty()) {
                    Message flag = new Message(this.coord, objectToByteBuffer).putHeader(this.id, new SequencerHeader((byte) 2, key2.longValue())).setFlag(Message.Flag.DONT_BUNDLE);
                    if (this.log.isTraceEnabled()) {
                        this.log.trace(this.local_addr + ": flushing (forwarding) " + this.local_addr + "::" + key2 + " to coord " + this.coord);
                    }
                    this.ack_promise.reset();
                    this.down_prot.down(new Event(1, flag));
                    Long result = this.ack_promise.getResult(500L);
                    if ((result == null || !result.equals(key2)) && this.forward_table.containsKey(key2)) {
                    }
                }
            } catch (Exception e2) {
                this.log.error("flushing (broadcasting) failed", e2);
            }
        }
    }

    protected void forwardToCoord(long j, Message message) {
        Long result;
        if (this.is_coord) {
            forward(message, j, false);
            return;
        }
        if (!this.running || this.flushing) {
            this.forward_table.put(Long.valueOf(j), message);
            return;
        }
        if (!this.ack_mode) {
            this.forward_table.put(Long.valueOf(j), message);
            forward(message, j, false);
            return;
        }
        this.send_lock.lock();
        try {
            this.forward_table.put(Long.valueOf(j), message);
            while (this.running && !this.flushing) {
                this.ack_promise.reset();
                forward(message, j, true);
                if (!this.ack_mode || !this.running || this.flushing || (((result = this.ack_promise.getResult(500L)) != null && result.equals(Long.valueOf(j))) || !this.forward_table.containsKey(Long.valueOf(j)))) {
                    break;
                }
            }
        } finally {
            this.send_lock.unlock();
        }
    }

    protected void forward(Message message, long j, boolean z) {
        Address address = this.coord;
        if (address == null) {
            return;
        }
        try {
            this.down_prot.down(new Event(1, new Message(address, Util.objectToByteBuffer(message)).putHeader(this.id, new SequencerHeader(z ? (byte) 2 : (byte) 1, j))));
            this.forwarded_msgs++;
        } catch (Exception e) {
            this.log.error("failed forwarding message to " + message.getDest(), e);
        }
    }

    protected void broadcast(Message message, boolean z, Address address, long j, boolean z2) {
        Message putHeader;
        if (z) {
            SequencerHeader sequencerHeader = new SequencerHeader((byte) 4, j);
            putHeader = new Message(null, message.getRawBuffer(), message.getOffset(), message.getLength()).putHeader(this.id, sequencerHeader);
            if (z2) {
                sequencerHeader.flush_ack = true;
                putHeader.setFlag(Message.Flag.DONT_BUNDLE);
            }
        } else {
            putHeader = message;
        }
        if (this.log.isTraceEnabled()) {
            this.log.trace(this.local_addr + ": broadcasting " + address + "::" + j);
        }
        this.down_prot.down(new Event(1, putHeader));
        this.bcast_msgs++;
    }

    protected void unwrapAndDeliver(Message message, boolean z) {
        try {
            Message message2 = (Message) Util.objectFromByteBuffer(message.getRawBuffer(), message.getOffset(), message.getLength());
            SequencerHeader sequencerHeader = (SequencerHeader) message2.getHeader(this.id);
            if (z) {
                sequencerHeader.flush_ack = true;
            }
            deliver(message2, new Event(1, message2), sequencerHeader);
        } catch (Exception e) {
            this.log.error("failure unmarshalling buffer", e);
        }
    }

    protected void deliver(Message message, Event event, SequencerHeader sequencerHeader) {
        Address src = message.getSrc();
        if (src == null) {
            if (this.log.isErrorEnabled()) {
                this.log.error(this.local_addr + ": sender is null, cannot deliver ::" + sequencerHeader.getSeqno());
                return;
            }
            return;
        }
        long seqno = sequencerHeader.getSeqno();
        if (src.equals(this.local_addr)) {
            this.forward_table.remove(Long.valueOf(seqno));
            if (sequencerHeader.flush_ack) {
                this.ack_promise.setResult(Long.valueOf(seqno));
                if (this.ack_mode && !this.flushing && this.threshold > 0) {
                    int i = this.num_acks + 1;
                    this.num_acks = i;
                    if (i >= this.threshold) {
                        this.ack_mode = false;
                        this.num_acks = 0;
                    }
                }
            }
        }
        if (!canDeliver(src, seqno)) {
            if (this.log.isWarnEnabled()) {
                this.log.warn(this.local_addr + ": dropped duplicate message " + src + "::" + seqno);
            }
        } else {
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + ": delivering " + src + "::" + seqno);
            }
            this.up_prot.up(event);
            this.delivered_bcasts++;
        }
    }

    protected boolean canDeliver(Address address, long j) {
        BoundedHashMap<Long, Long> boundedHashMap = this.delivery_table.get(address);
        if (boundedHashMap == null) {
            boundedHashMap = new BoundedHashMap<>(this.delivery_table_max_size);
            BoundedHashMap<Long, Long> put = this.delivery_table.put(address, boundedHashMap);
            if (put != null) {
                boundedHashMap = put;
            }
        }
        return boundedHashMap.add(Long.valueOf(j), Long.valueOf(j));
    }

    protected void block() {
        this.send_lock.lock();
        while (this.flushing && this.running) {
            try {
                try {
                    this.send_cond.await();
                } catch (InterruptedException e) {
                }
            } finally {
                this.send_lock.unlock();
            }
        }
    }

    protected void unblockAll() {
        this.flushing = false;
        this.send_lock.lock();
        try {
            this.send_cond.signalAll();
            this.ack_promise.setResult(null);
            this.send_lock.unlock();
        } catch (Throwable th) {
            this.send_lock.unlock();
            throw th;
        }
    }

    protected synchronized void startFlusher(Address address) {
        if (this.flusher == null || !this.flusher.isAlive()) {
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + ": flushing started");
            }
            this.flushing = true;
            this.flusher = new Flusher(address);
            this.flusher.setName("Flusher");
            this.flusher.start();
        }
    }

    protected void stopFlusher() {
        this.flushing = false;
        Flusher flusher = this.flusher;
        while (flusher != null && flusher.isAlive()) {
            flusher.interrupt();
            this.ack_promise.setResult(null);
            try {
                flusher.join();
            } catch (InterruptedException e) {
            }
        }
    }
}
