package org.jgroups.protocols;

import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.PriorityBlockingQueue;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.annotations.Experimental;
import org.jgroups.annotations.Property;
import org.jgroups.stack.Protocol;
import org.jgroups.util.MessageBatch;

@Experimental
/* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.0.0.Final.jar:org/jgroups/protocols/PRIO.class */
public class PRIO extends Protocol {
    private PriorityBlockingQueue<PriorityMessage> downMessageQueue;
    private PriorityBlockingQueue<PriorityMessage> upMessageQueue;
    private DownMessageThread downMessageThread;
    private UpMessageThread upMessageThread;

    @Property(description = "The number of miliseconds to sleep before after an error occurs before sending the next message")
    private int message_failure_sleep_time = 120000;

    @Property(description = "true to prioritize outgoing messages")
    private boolean prioritize_down = true;

    @Property(description = "true to prioritize incoming messages")
    private boolean prioritize_up = true;
    private Address local_addr;

    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.0.0.Final.jar:org/jgroups/protocols/PRIO$DownMessageThread.class */
    private class DownMessageThread extends MessageThread {
        private DownMessageThread(PRIO prio, PriorityBlockingQueue<PriorityMessage> priorityBlockingQueue) {
            super(prio, priorityBlockingQueue);
        }

        @Override // org.jgroups.protocols.PRIO.MessageThread
        protected void handleMessage(PriorityMessage priorityMessage) {
            PRIO.this.log.trace("%s: sending priority %d message", PRIO.this.local_addr, Byte.valueOf(priorityMessage.priority));
            PRIO.this.down_prot.down(priorityMessage.event);
        }
    }

    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.0.0.Final.jar:org/jgroups/protocols/PRIO$MessageThread.class */
    private abstract class MessageThread extends Thread {
        private PRIO prio;
        private PriorityBlockingQueue<PriorityMessage> messageQueue;
        private volatile boolean running;

        private MessageThread(PRIO prio, PriorityBlockingQueue<PriorityMessage> priorityBlockingQueue) {
            this.running = true;
            this.prio = prio;
            this.messageQueue = priorityBlockingQueue;
            setName("PRIO " + (priorityBlockingQueue == PRIO.this.downMessageQueue ? "down" : "up"));
        }

        protected abstract void handleMessage(PriorityMessage priorityMessage);

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.running) {
                PriorityMessage priorityMessage = null;
                try {
                    priorityMessage = this.messageQueue.take();
                    handleMessage(priorityMessage);
                } catch (InterruptedException e) {
                    return;
                } catch (Exception e2) {
                    PRIO.this.log.error("Error handling message.  Sleeping " + (this.prio.message_failure_sleep_time / 1000) + " seconds", e2);
                    try {
                        sleep(this.prio.message_failure_sleep_time);
                        this.messageQueue.add(priorityMessage);
                    } catch (InterruptedException e3) {
                        return;
                    }
                }
            }
        }

        public void setRunning(boolean z) {
            this.running = z;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.0.0.Final.jar:org/jgroups/protocols/PRIO$PriorityCompare.class */
    private static class PriorityCompare implements Comparator<PriorityMessage> {
        private PriorityCompare() {
        }

        @Override // java.util.Comparator
        public int compare(PriorityMessage priorityMessage, PriorityMessage priorityMessage2) {
            if (priorityMessage.priority > priorityMessage2.priority) {
                return 1;
            }
            if (priorityMessage.priority < priorityMessage2.priority) {
                return -1;
            }
            if (priorityMessage.timestamp > priorityMessage2.timestamp) {
                return 1;
            }
            return priorityMessage.timestamp < priorityMessage2.timestamp ? -1 : 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.0.0.Final.jar:org/jgroups/protocols/PRIO$PriorityMessage.class */
    public static class PriorityMessage {
        Event event;
        long timestamp = System.currentTimeMillis();
        byte priority;

        protected PriorityMessage(Event event, byte b) {
            this.event = event;
            this.priority = b;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.0.0.Final.jar:org/jgroups/protocols/PRIO$UpMessageThread.class */
    private class UpMessageThread extends MessageThread {
        private UpMessageThread(PRIO prio, PriorityBlockingQueue<PriorityMessage> priorityBlockingQueue) {
            super(prio, priorityBlockingQueue);
        }

        @Override // org.jgroups.protocols.PRIO.MessageThread
        protected void handleMessage(PriorityMessage priorityMessage) {
            PRIO.this.log.trace("%s: delivering priority %d message", PRIO.this.local_addr, Byte.valueOf(priorityMessage.priority));
            PRIO.this.up_prot.up(priorityMessage.event);
        }
    }

    @Override // org.jgroups.stack.Protocol
    public void start() throws Exception {
        if (this.prioritize_down) {
            this.downMessageQueue = new PriorityBlockingQueue<>(100, new PriorityCompare());
            this.downMessageThread = new DownMessageThread(this, this.downMessageQueue);
            this.downMessageThread.start();
        }
        if (this.prioritize_up) {
            this.upMessageQueue = new PriorityBlockingQueue<>(100, new PriorityCompare());
            this.upMessageThread = new UpMessageThread(this, this.upMessageQueue);
            this.upMessageThread.start();
        }
    }

    @Override // org.jgroups.stack.Protocol
    public void stop() {
        if (this.downMessageThread != null) {
            this.downMessageThread.setRunning(false);
            this.downMessageThread.interrupt();
        }
        if (this.upMessageThread != null) {
            this.upMessageThread.setRunning(false);
            this.upMessageThread.interrupt();
        }
    }

    @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
    public Object up(Event event) {
        PrioHeader prioHeader;
        switch (event.getType()) {
            case 1:
                Message message = (Message) event.getArg();
                if (!message.isFlagSet(Message.Flag.OOB) && (prioHeader = (PrioHeader) message.getHeader(this.id)) != null) {
                    this.log.trace("%s: adding priority message %d to UP queue", this.local_addr, Byte.valueOf(prioHeader.getPriority()));
                    this.upMessageQueue.add(new PriorityMessage(event, prioHeader.getPriority()));
                    return null;
                }
                return this.up_prot.up(event);
            default:
                return this.up_prot.up(event);
        }
    }

    @Override // org.jgroups.stack.Protocol
    public void up(MessageBatch messageBatch) {
        PrioHeader prioHeader;
        Iterator<Message> it = messageBatch.iterator();
        while (it.hasNext()) {
            Message next = it.next();
            if (!next.isFlagSet(Message.Flag.OOB) && (prioHeader = (PrioHeader) next.getHeader(this.id)) != null) {
                this.log.trace("%s: adding priority message %d to UP queue", this.local_addr, Byte.valueOf(prioHeader.getPriority()));
                this.upMessageQueue.add(new PriorityMessage(new Event(1, next), prioHeader.getPriority()));
                messageBatch.remove(next);
            }
        }
        if (messageBatch.isEmpty()) {
            return;
        }
        this.up_prot.up(messageBatch);
    }

    @Override // org.jgroups.stack.Protocol
    public Object down(Event event) {
        PrioHeader prioHeader;
        switch (event.getType()) {
            case 1:
                Message message = (Message) event.getArg();
                if (!message.isFlagSet(Message.Flag.OOB) && (prioHeader = (PrioHeader) message.getHeader(this.id)) != null) {
                    this.log.trace("%s: adding priority message %d to DOWN queue", this.local_addr, Byte.valueOf(prioHeader.getPriority()));
                    this.downMessageQueue.add(new PriorityMessage(event, prioHeader.getPriority()));
                    return null;
                }
                return this.down_prot.down(event);
            case 8:
                this.local_addr = (Address) event.getArg();
                return this.down_prot.down(event);
            default:
                return this.down_prot.down(event);
        }
    }
}
