package org.jboss.messaging.core.server.impl;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jboss.messaging.core.buffers.ChannelBuffer;
import org.jboss.messaging.core.buffers.ChannelBuffers;
import org.jboss.messaging.core.client.management.impl.ManagementHelper;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.ManagementService;
import org.jboss.messaging.core.management.Notification;
import org.jboss.messaging.core.management.NotificationType;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.QueueBinding;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.SessionReplicateDeliveryMessage;
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.LargeServerMessage;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerConsumer;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerSession;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.utils.TypedProperties;

/* loaded from: input_file:org/jboss/messaging/core/server/impl/ServerConsumerImpl.class */
public class ServerConsumerImpl implements ServerConsumer {
    private static final Logger log = Logger.getLogger(ServerConsumerImpl.class);
    /** Trace-enabled flag, sampled once when the class is initialised. */
    private static final boolean trace = log.isTraceEnabled();
    /** Consumer identifier; returned by {@link #getID()} and passed into every packet built by this consumer. */
    private final long id;
    /** Session identifier handed to {@code replicatingChannel.replicatePacket(...)}. */
    private final long replicatedSessionID;
    /** Queue taken from the binding at construction time; this consumer registers itself on it. */
    private final Queue messageQueue;
    /** Optional selector; {@code null} means every message matches. */
    private final Filter filter;
    /** Taken from the session; used as the maximum size of one large-message chunk. */
    private final int minLargeMessageSize;
    private final ServerSession session;
    /** Runs {@link #resumeLargeMessageRunnable} when a large delivery is resumed on a non-backup queue. */
    private final Executor executor;
    /** Delivery switch; written under {@link #lock} and forced to {@code true} for browse-only consumers. */
    private boolean started;
    /** Browse-only consumers never track, acknowledge or cancel references. */
    private final boolean browseOnly;
    /** When set, delivery counts of durable messages on durable queues are persisted. */
    private final boolean updateDeliveries;
    private final StorageManager storageManager;
    private final PagingManager pagingManager;
    /** Channel the messages are sent on. */
    private final Channel channel;
    /** When non-null, deliveries are replicated through this channel before being sent. */
    private final Channel replicatingChannel;
    /** Set by {@link #close()}; only read when building diagnostic messages. */
    private volatile boolean closed;
    /** When set, references are acknowledged at delivery time instead of being tracked. */
    private final boolean preAcknowledge;
    private final ManagementService managementService;
    private final Binding binding;
    private final Lock lock = new ReentrantLock();
    /** Flow-control credits; replaced by {@code null} when {@code receiveCredits(-1)} disables flow control. */
    private AtomicInteger availableCredits = new AtomicInteger(0);
    /** Large-message delivery currently in progress, or {@code null} when there is none. */
    private volatile LargeMessageDeliverer largeMessageDeliverer = null;
    /** Number of large messages started but not yet closed; {@code doHandle} answers BUSY while it is positive. */
    private final AtomicInteger pendingLargeMessagesCounter = new AtomicInteger(0);
    /** References delivered but not yet acknowledged, expired or cancelled, in delivery order. */
    private final java.util.Queue<MessageReference> deliveringRefs = new ConcurrentLinkedQueue<>();
    /**
     * Task that resumes a paused large-message delivery under the consumer lock.
     * If there is no delivery in progress, or the delivery completes, the session
     * is asked to prompt further delivery from the queue.
     */
    final Runnable resumeLargeMessageRunnable = new Runnable() { // from class: org.jboss.messaging.core.server.impl.ServerConsumerImpl.3
        @Override // java.lang.Runnable
        public void run() {
            ServerConsumerImpl.this.lock.lock();
            try {
                // Read the volatile field once: close() can null it without holding the lock,
                // so checking and then re-reading the field could dereference null.
                final LargeMessageDeliverer deliverer = ServerConsumerImpl.this.largeMessageDeliverer;
                if (deliverer == null || deliverer.deliver()) {
                    ServerConsumerImpl.this.session.promptDelivery(ServerConsumerImpl.this.messageQueue);
                }
            } finally {
                ServerConsumerImpl.this.lock.unlock();
            }
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jboss/messaging/core/server/impl/ServerConsumerImpl$LargeMessageDeliverer.class */
    public class LargeMessageDeliverer {
        private final long sizePendingLargeMessage;
        private final LargeServerMessage pendingLargeMessage;
        private final MessageReference ref;
        private volatile boolean sentFirstMessage = false;
        private volatile long positionPendingLargeMessage;

        public LargeMessageDeliverer(LargeServerMessage largeServerMessage, MessageReference messageReference) {
            this.pendingLargeMessage = largeServerMessage;
            this.pendingLargeMessage.incrementRefCount();
            this.sizePendingLargeMessage = this.pendingLargeMessage.getLargeBodySize();
            this.ref = messageReference;
        }

        public boolean deliver() {
            SessionReceiveMessage sessionReceiveMessage;
            ServerConsumerImpl.this.lock.lock();
            try {
                if (this.pendingLargeMessage == null) {
                    return true;
                }
                if (ServerConsumerImpl.this.availableCredits != null && ServerConsumerImpl.this.availableCredits.get() <= 0) {
                    ServerConsumerImpl.this.lock.unlock();
                    return false;
                }
                if (this.sentFirstMessage) {
                    sessionReceiveMessage = null;
                } else {
                    this.sentFirstMessage = true;
                    ChannelBuffer buffer = ChannelBuffers.buffer(this.pendingLargeMessage.getPropertiesEncodeSize());
                    this.pendingLargeMessage.encodeProperties(buffer);
                    sessionReceiveMessage = new SessionReceiveMessage(ServerConsumerImpl.this.id, buffer.array(), this.pendingLargeMessage.getLargeBodySize(), this.ref.getDeliveryCount());
                }
                int preCalculateFlowControl = ServerConsumerImpl.this.availableCredits != null ? preCalculateFlowControl(sessionReceiveMessage) : 0;
                if (sessionReceiveMessage != null) {
                    ServerConsumerImpl.this.channel.send(sessionReceiveMessage);
                    if (ServerConsumerImpl.this.availableCredits != null) {
                        preCalculateFlowControl -= sessionReceiveMessage.getRequiredBufferSize();
                    }
                }
                while (this.positionPendingLargeMessage < this.sizePendingLargeMessage) {
                    if (preCalculateFlowControl <= 0 && ServerConsumerImpl.this.availableCredits != null) {
                        if (ServerConsumerImpl.trace) {
                            ServerConsumerImpl.trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits, backup = " + ServerConsumerImpl.this.messageQueue.isBackup());
                        }
                        ServerConsumerImpl.this.lock.unlock();
                        return false;
                    }
                    SessionReceiveContinuationMessage createChunkSend = createChunkSend();
                    int length = createChunkSend.getBody().length;
                    if (ServerConsumerImpl.this.availableCredits != null) {
                        int requiredBufferSize = preCalculateFlowControl - createChunkSend.getRequiredBufferSize();
                        preCalculateFlowControl = requiredBufferSize;
                        if (requiredBufferSize < 0) {
                            ServerConsumerImpl.log.warn("Flowcontrol logic is not working properly, too many credits were taken");
                        }
                    }
                    if (ServerConsumerImpl.trace) {
                        ServerConsumerImpl.trace("deliverLargeMessage: Sending " + createChunkSend.getRequiredBufferSize() + " availableCredits now is " + ServerConsumerImpl.this.availableCredits + " isBackup = " + ServerConsumerImpl.this.messageQueue.isBackup());
                    }
                    ServerConsumerImpl.this.channel.send(createChunkSend);
                    this.positionPendingLargeMessage += length;
                }
                if (preCalculateFlowControl != 0) {
                    ServerConsumerImpl.log.warn("Flowcontrol logic is not working properly... creidts = " + preCalculateFlowControl);
                }
                if (ServerConsumerImpl.trace) {
                    ServerConsumerImpl.trace("Finished deliverLargeMessage isBackup = " + ServerConsumerImpl.this.messageQueue.isBackup());
                }
                close();
                ServerConsumerImpl.this.lock.unlock();
                return true;
            } finally {
                ServerConsumerImpl.this.lock.unlock();
            }
        }

        public void close() {
            this.pendingLargeMessage.releaseResources();
            int decrementRefCount = this.pendingLargeMessage.decrementRefCount();
            if (ServerConsumerImpl.this.preAcknowledge && !ServerConsumerImpl.this.browseOnly) {
                decrementRefCount = this.pendingLargeMessage.decrementRefCount();
            }
            if (!ServerConsumerImpl.this.browseOnly && decrementRefCount == 0) {
                try {
                    ServerConsumerImpl.this.pagingManager.getPageStore(ServerConsumerImpl.this.binding.getAddress()).addSize(-this.pendingLargeMessage.getMemoryEstimate());
                } catch (Exception e) {
                    ServerConsumerImpl.log.error("Error getting pageStore", e);
                }
            }
            ServerConsumerImpl.this.largeMessageDeliverer = null;
            ServerConsumerImpl.this.pendingLargeMessagesCounter.decrementAndGet();
        }

        private int preCalculateFlowControl(SessionReceiveMessage sessionReceiveMessage) {
            int i;
            int i2;
            do {
                i = ServerConsumerImpl.this.availableCredits.get();
                i2 = 0;
                if (sessionReceiveMessage != null) {
                    i2 = sessionReceiveMessage.getRequiredBufferSize();
                }
                long j = this.positionPendingLargeMessage;
                while (true) {
                    long j2 = j;
                    if (i2 >= i || j2 >= this.sizePendingLargeMessage) {
                        break;
                    }
                    long min = (int) Math.min(this.sizePendingLargeMessage - j2, ServerConsumerImpl.this.minLargeMessageSize);
                    i2 = (int) (i2 + min + 26);
                    j = j2 + min;
                }
            } while (!ServerConsumerImpl.this.availableCredits.compareAndSet(i, i - i2));
            if (ServerConsumerImpl.trace) {
                ServerConsumerImpl.log.trace("Taking " + i2 + " credits out on preCalculateFlowControl (largeMessage)");
            }
            return i2;
        }

        private SessionReceiveContinuationMessage createChunkSend() {
            int min = (int) Math.min(this.sizePendingLargeMessage - this.positionPendingLargeMessage, ServerConsumerImpl.this.minLargeMessageSize);
            ChannelBuffer buffer = ChannelBuffers.buffer(min);
            this.pendingLargeMessage.encodeBody(buffer, this.positionPendingLargeMessage, min);
            return new SessionReceiveContinuationMessage(ServerConsumerImpl.this.id, buffer.array(), this.positionPendingLargeMessage + ((long) min) < this.sizePendingLargeMessage, false);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes {@code str} to this class's logger at trace level.
     *
     * @param str the message to log
     */
    public static void trace(String str) {
        ServerConsumerImpl.log.trace(str);
    }

    /**
     * Creates the consumer and registers it on the binding's queue.
     *
     * @param j                 consumer id
     * @param j2                replicated session id, used when replicating deliveries
     * @param serverSession     owning session; also supplies the minimum large-message size
     * @param queueBinding      binding whose queue this consumer reads from
     * @param filter            optional message filter, may be {@code null}
     * @param z                 initial started flag (ORed with {@code z2})
     * @param z2                browse-only flag
     * @param storageManager    storage manager
     * @param pagingManager     paging manager
     * @param channel           channel messages are sent on
     * @param channel2          replicating channel, or {@code null} when deliveries are not replicated
     * @param z3                pre-acknowledge flag
     * @param z4                whether delivery counts are persisted
     * @param executor          executor used to resume large-message delivery
     * @param managementService receiver of the consumer-closed notification
     * @throws Exception declared by the original signature
     */
    public ServerConsumerImpl(long j, long j2, ServerSession serverSession, QueueBinding queueBinding, Filter filter, boolean z, boolean z2, StorageManager storageManager, PagingManager pagingManager, Channel channel, Channel channel2, boolean z3, boolean z4, Executor executor, ManagementService managementService) throws Exception {
        // Identity
        this.id = j;
        this.replicatedSessionID = j2;

        // Routing
        this.filter = filter;
        this.session = serverSession;
        this.binding = queueBinding;
        this.messageQueue = queueBinding.getQueue();

        // Delivery mode flags
        this.browseOnly = z2;
        this.started = z || z2;
        this.preAcknowledge = z3;
        this.updateDeliveries = z4;

        // Collaborators
        this.executor = executor;
        this.storageManager = storageManager;
        this.pagingManager = pagingManager;
        this.managementService = managementService;
        this.channel = channel;
        this.replicatingChannel = channel2;

        this.minLargeMessageSize = serverSession.getMinLargeMessageSize();

        // Registration comes last, once every field is assigned.
        queueBinding.getQueue().addConsumer(this);
    }

    /** @return the id this consumer was created with */
    @Override // org.jboss.messaging.core.server.ServerConsumer
    public long getID() {
        return id;
    }

    /**
     * Offers a reference to this consumer; delegates to {@code doHandle}.
     *
     * @param messageReference the reference to deliver
     * @return the status produced by {@code doHandle}
     * @throws Exception if {@code doHandle} fails
     */
    @Override // org.jboss.messaging.core.server.Consumer
    public HandleStatus handle(MessageReference messageReference) throws Exception {
        final HandleStatus status = doHandle(messageReference);
        return status;
    }

    /** @return the filter given at construction, possibly {@code null} */
    @Override // org.jboss.messaging.core.server.Consumer
    public Filter getFilter() {
        return filter;
    }

    /**
     * Stops delivery, closes any in-progress large-message delivery, unregisters the
     * consumer from its queue and session, and cancels every still-unacknowledged
     * reference back to its queue. Non-browsing consumers additionally send a
     * {@code CONSUMER_CLOSED} management notification.
     *
     * @throws Exception if cancelling references or sending the notification fails
     */
    @Override // org.jboss.messaging.core.server.ServerConsumer
    public void close() throws Exception {
        setStarted(false);
        // Read the volatile field once: a delivery finishing on another thread nulls it,
        // and an NPE here would skip the unregistration and cancellation below.
        final LargeMessageDeliverer pendingDeliverer = this.largeMessageDeliverer;
        if (pendingDeliverer != null) {
            pendingDeliverer.close();
        }
        this.messageQueue.removeConsumer(this);
        this.session.removeConsumer(this);
        LinkedList<MessageReference> cancelledRefs = cancelRefs(false, (Transaction) null);
        this.closed = true;
        // The cancellations are collected in a transaction that is then rolled back.
        TransactionImpl cancelTx = new TransactionImpl(this.storageManager);
        for (MessageReference cancelled : cancelledRefs) {
            cancelled.getQueue().cancel(cancelTx, cancelled);
        }
        cancelTx.rollback();
        if (this.browseOnly) {
            return;
        }
        TypedProperties props = new TypedProperties();
        props.putStringProperty(ManagementHelper.HDR_ADDRESS, this.binding.getAddress());
        props.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, this.binding.getClusterName());
        props.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, this.binding.getRoutingName());
        props.putStringProperty(ManagementHelper.HDR_FILTERSTRING, this.filter == null ? null : this.filter.getFilterString());
        props.putIntProperty(ManagementHelper.HDR_DISTANCE, this.binding.getDistance());
        props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, this.messageQueue.getConsumerCount());
        this.managementService.sendNotification(new Notification(null, NotificationType.CONSUMER_CLOSED, props));
    }

    /** @return how many delivered references are still tracked in {@code deliveringRefs} */
    @Override // org.jboss.messaging.core.server.ServerConsumer
    public int getCountOfPendingDeliveries() {
        final int pending = deliveringRefs.size();
        return pending;
    }

    /**
     * Empties the list of in-flight references and returns those that must be cancelled.
     * When {@code z} is set, the first reference encountered is acknowledged in
     * {@code transaction} instead of being returned; every other reference has its
     * delivery count decremented and is added to the result.
     *
     * @param z           whether the first in-flight reference is acknowledged rather than cancelled
     * @param transaction transaction used for that acknowledgement
     * @return the references to cancel, in delivery order (possibly empty)
     * @throws Exception if the acknowledgement fails
     */
    @Override // org.jboss.messaging.core.server.ServerConsumer
    public LinkedList<MessageReference> cancelRefs(boolean z, Transaction transaction) throws Exception {
        final LinkedList<MessageReference> toCancel = new LinkedList<>();
        if (this.deliveringRefs.isEmpty()) {
            return toCancel;
        }
        boolean ackPending = z;
        for (MessageReference inFlight : this.deliveringRefs) {
            if (!ackPending) {
                inFlight.decrementDeliveryCount();
                toCancel.add(inFlight);
                continue;
            }
            acknowledge(false, transaction, inFlight.getMessage().getMessageID());
            ackPending = false;
        }
        this.deliveringRefs.clear();
        return toCancel;
    }

    /**
     * Enables or disables delivery. Browse-only consumers always remain started.
     * When {@code z} is {@code true}, delivery is prompted after the flag is updated.
     *
     * @param z {@code true} to start delivery, {@code false} to stop it
     */
    @Override // org.jboss.messaging.core.server.ServerConsumer
    public void setStarted(boolean z) {
        this.lock.lock();
        try {
            this.started = this.browseOnly || z;
        } finally {
            this.lock.unlock();
        }
        // Prompt outside the guarded section: the lock is already released here, so a
        // failure in promptDelivery() must not trigger a second unlock().
        if (z) {
            promptDelivery();
        }
    }

    /**
     * Adds flow-control credits. A value of {@code -1} disables flow control by
     * dropping the credit counter. Delivery is prompted only when the counter moves
     * from non-positive to positive.
     *
     * @param i credits granted, or {@code -1} to disable flow control
     * @throws Exception declared by the interface
     */
    @Override // org.jboss.messaging.core.server.ServerConsumer
    public void receiveCredits(int i) throws Exception {
        if (i == -1) {
            this.availableCredits = null;
            return;
        }
        // NOTE(review): this dereferences availableCredits without a null check, so credits
        // arriving after a -1 would fail here — confirm clients never do that.
        final int before = this.availableCredits.getAndAdd(i);
        if (trace) {
            log.trace("Received " + i + " credits, previous value = " + before + " currentValue = " + this.availableCredits.get());
        }
        if (before <= 0 && before + i > 0) {
            promptDelivery();
        }
    }

    /** @return the queue this consumer is attached to */
    @Override // org.jboss.messaging.core.server.ServerConsumer
    public Queue getQueue() {
        return messageQueue;
    }

    /**
     * Acknowledges in-flight references in delivery order, up to and including the
     * one whose message id is {@code j}. Does nothing for browse-only consumers.
     *
     * @param z           {@code true} to acknowledge without a transaction, {@code false} to
     *                    acknowledge within {@code transaction}
     * @param transaction transaction used when {@code z} is {@code false}
     * @param j           id of the last message to acknowledge
     * @throws IllegalStateException if the in-flight list runs out before {@code j} is found
     * @throws Exception             if the queue fails to acknowledge
     */
    @Override // org.jboss.messaging.core.server.ServerConsumer
    public void acknowledge(boolean z, Transaction transaction, long j) throws Exception {
        if (this.browseOnly) {
            return;
        }
        while (true) {
            final MessageReference head = this.deliveringRefs.poll();
            if (head == null) {
                throw new IllegalStateException(System.identityHashCode(this) + " Could not find reference on consumerID=" + this.id + ", messageId = " + j + " backup = " + this.messageQueue.isBackup() + " queue = " + ((Object) this.messageQueue.getName()) + " closed = " + this.closed);
            }
            if (z) {
                head.getQueue().acknowledge(head);
            } else {
                head.getQueue().acknowledge(transaction, head);
            }
            if (head.getMessage().getMessageID() == j) {
                return;
            }
        }
    }

    /**
     * Removes the in-flight reference whose message id is {@code j} and returns it.
     *
     * @param j id of the message to remove
     * @return the removed reference, or {@code null} for browse-only consumers
     * @throws IllegalStateException if no in-flight reference has that id
     * @throws Exception             declared by the interface
     */
    @Override // org.jboss.messaging.core.server.ServerConsumer
    public MessageReference getExpired(long j) throws Exception {
        if (this.browseOnly) {
            return null;
        }
        for (Iterator<MessageReference> cursor = this.deliveringRefs.iterator(); cursor.hasNext();) {
            final MessageReference candidate = cursor.next();
            if (candidate.getMessage().getMessageID() == j) {
                cursor.remove();
                return candidate;
            }
        }
        throw new IllegalStateException("Could not find reference with id " + j + " backup " + this.messageQueue.isBackup() + " closed " + this.closed);
    }

    /**
     * Delivers the reference with id {@code j} on this node. The reference is removed
     * from the queue; if it is not there, one page is read from the address's page
     * store and the removal is retried once.
     *
     * @param j id of the message to deliver
     * @throws IllegalStateException if the reference cannot be found, or is not HANDLED
     * @throws Exception             if paging or delivery fails
     */
    @Override // org.jboss.messaging.core.server.ServerConsumer
    public void deliverReplicated(long j) throws Exception {
        MessageReference target = this.messageQueue.removeFirstReference(j);
        if (target == null) {
            final boolean depaged = this.pagingManager.getPageStore(this.binding.getAddress()).readPage();
            if (!depaged) {
                throw new IllegalStateException("Cannot find Reference[" + j + "] in queue " + ((Object) this.messageQueue.getName()));
            }
            target = this.messageQueue.removeFirstReference(j);
            if (target == null) {
                throw new IllegalStateException("Cannot find Reference[" + j + "] after depaging on Queue " + ((Object) this.messageQueue.getName()));
            }
        }
        final HandleStatus outcome = doHandle(target);
        if (outcome == HandleStatus.HANDLED) {
            return;
        }
        throw new IllegalStateException("Reference " + target + " was not handled on backup node, handleStatus = " + outcome);
    }

    /**
     * Notifies the queue that this consumer has failed over and, if the queue answers
     * {@code true} and the consumer is started, prompts delivery.
     */
    @Override // org.jboss.messaging.core.server.ServerConsumer
    public void failedOver() {
        // The queue is always notified first; 'started' is only consulted afterwards.
        if (!this.messageQueue.consumerFailedOver()) {
            return;
        }
        if (this.started) {
            promptDelivery();
        }
    }

    /** Acquires this consumer's delivery lock; pair every call with {@link #unlock()}. */
    @Override // org.jboss.messaging.core.server.ServerConsumer
    public void lock() {
        lock.lock();
    }

    /** Releases this consumer's delivery lock previously taken with {@link #lock()}. */
    @Override // org.jboss.messaging.core.server.ServerConsumer
    public void unlock() {
        lock.unlock();
    }

    /** @return the live credit counter, or {@code null} once flow control has been disabled */
    public AtomicInteger getAvailableCredits() {
        return availableCredits;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resumes delivery under the consumer lock: continues the in-progress large
     * message if there is one, otherwise asks the session to prompt the queue.
     */
    public void promptDelivery() {
        if (trace) {
            log.trace("Starting prompt delivery");
        }
        this.lock.lock();
        try {
            if (this.largeMessageDeliverer == null) {
                this.session.promptDelivery(this.messageQueue);
                return;
            }
            resumeLargeMessage();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Continues the in-progress large-message delivery: inline when the queue is a
     * backup, otherwise by submitting {@code resumeLargeMessageRunnable} to the executor.
     */
    private void resumeLargeMessage() {
        if (!this.messageQueue.isBackup()) {
            this.executor.execute(this.resumeLargeMessageRunnable);
            return;
        }
        this.largeMessageDeliverer.deliver();
    }

    /**
     * Tries to deliver one reference to the client.
     *
     * @param messageReference the reference offered by the queue
     * @return {@code BUSY} when there are no credits, the consumer is not started, or a
     *         large message is still pending; {@code NO_MATCH} when the filter rejects
     *         the message; {@code HANDLED} once the message has been handed to delivery
     * @throws Exception if filtering, storage or delivery fails
     */
    private HandleStatus doHandle(MessageReference messageReference) throws Exception {
        // Snapshot the credit counter: receiveCredits(-1) may null the field between a
        // null check and a second read.
        final AtomicInteger credits = this.availableCredits;
        if (credits != null && credits.get() <= 0) {
            return HandleStatus.BUSY;
        }
        this.lock.lock();
        try {
            if (!this.started) {
                return HandleStatus.BUSY;
            }
            if (this.pendingLargeMessagesCounter.get() > 0) {
                if (this.messageQueue.isBackup()) {
                    log.warn("doHandle: rejecting message while send is pending, ignoring reference = " + messageReference + " backup = " + this.messageQueue.isBackup());
                }
                return HandleStatus.BUSY;
            }
            ServerMessage message = messageReference.getMessage();
            if (this.filter != null && !this.filter.match(message)) {
                return HandleStatus.NO_MATCH;
            }
            if (!this.browseOnly) {
                // Only references that still need an acknowledgement are tracked.
                if (!this.preAcknowledge) {
                    this.deliveringRefs.add(messageReference);
                }
                messageReference.getQueue().referenceHandled();
                messageReference.incrementDeliveryCount();
                if (this.updateDeliveries && messageReference.getMessage().isDurable() && messageReference.getQueue().isDurable()) {
                    this.storageManager.updateDeliveryCount(messageReference);
                }
                if (this.preAcknowledge) {
                    // Extra count for large messages; LargeMessageDeliverer.close() gives it back.
                    if (message.isLargeMessage()) {
                        message.incrementRefCount();
                    }
                    messageReference.getQueue().acknowledge(messageReference);
                }
            }
            if (message.isLargeMessage()) {
                deliverLargeMessage(messageReference, message);
            } else {
                deliverStandardMessage(messageReference, message);
            }
            return HandleStatus.HANDLED;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Starts chunked delivery of a large message. Without a replicating channel the
     * delivery starts immediately; otherwise a replicate-delivery packet is sent first
     * and delivery starts from the callback handed to {@code replicatePacket}.
     *
     * @param messageReference the reference being delivered
     * @param serverMessage    the message, cast to {@code LargeServerMessage}
     */
    private void deliverLargeMessage(MessageReference messageReference, ServerMessage serverMessage) {
        this.pendingLargeMessagesCounter.incrementAndGet();
        final LargeMessageDeliverer deliverer = new LargeMessageDeliverer((LargeServerMessage) serverMessage, messageReference);
        if (this.replicatingChannel != null) {
            SessionReplicateDeliveryMessage replicationPacket = new SessionReplicateDeliveryMessage(this.id, serverMessage.getMessageID());
            replicationPacket.setChannelID(this.channel.getID());
            Runnable startDelivery = new Runnable() { // from class: org.jboss.messaging.core.server.impl.ServerConsumerImpl.1
                @Override // java.lang.Runnable
                public void run() {
                    ServerConsumerImpl.this.lock.lock();
                    try {
                        ServerConsumerImpl.this.largeMessageDeliverer = deliverer;
                        // A completed delivery frees the consumer, so prompt for more work.
                        if (ServerConsumerImpl.this.largeMessageDeliverer.deliver()) {
                            ServerConsumerImpl.this.promptDelivery();
                        }
                    } finally {
                        ServerConsumerImpl.this.lock.unlock();
                    }
                }
            };
            this.replicatingChannel.replicatePacket(replicationPacket, this.replicatedSessionID, startDelivery);
            return;
        }
        this.largeMessageDeliverer = deliverer;
        this.largeMessageDeliverer.deliver();
    }

    /**
     * Sends a regular (non-large) message to the client, charging its required buffer
     * size against the flow-control credits when flow control is active. With a
     * replicating channel, a replicate-delivery packet is sent first and the message is
     * sent from the callback handed to {@code replicatePacket}.
     *
     * @param messageReference the reference being delivered (supplies the delivery count)
     * @param serverMessage    the message to send
     */
    private void deliverStandardMessage(final MessageReference messageReference, ServerMessage serverMessage) {
        final SessionReceiveMessage sessionReceiveMessage = new SessionReceiveMessage(this.id, serverMessage, messageReference.getDeliveryCount());
        // Snapshot the credit counter: receiveCredits(-1) may null the field between a
        // null check and a second read.
        final AtomicInteger credits = this.availableCredits;
        if (credits != null) {
            credits.addAndGet(-sessionReceiveMessage.getRequiredBufferSize());
            if (trace) {
                log.trace("Taking " + sessionReceiveMessage.getRequiredBufferSize() + " out of flow control");
            }
        }
        if (this.replicatingChannel == null) {
            if (trace) {
                log.trace("delivering Message " + messageReference + " on backup");
            }
            this.channel.send(sessionReceiveMessage);
            return;
        }
        SessionReplicateDeliveryMessage replicationPacket = new SessionReplicateDeliveryMessage(this.id, serverMessage.getMessageID());
        replicationPacket.setChannelID(this.channel.getID());
        this.replicatingChannel.replicatePacket(replicationPacket, this.replicatedSessionID, new Runnable() { // from class: org.jboss.messaging.core.server.impl.ServerConsumerImpl.2
            @Override // java.lang.Runnable
            public void run() {
                if (ServerConsumerImpl.trace) {
                    ServerConsumerImpl.log.trace("delivering Message " + messageReference + " on live");
                }
                ServerConsumerImpl.this.channel.send(sessionReceiveMessage);
            }
        });
    }
}
