package org.apache.qpid.server;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.exchange.MessageRouter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.NoConsumersException;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.TxnBuffer;
import org.apache.qpid.server.txn.TxnOp;

/* loaded from: input_file:org/apache/qpid/server/AMQChannel.class */
public class AMQChannel {
    /** Default prefetch high-water mark; also used as the initial capacity of the unacknowledged-message map. */
    public static final int DEFAULT_PREFETCH = 5000;
    private static final Logger _log = Logger.getLogger(AMQChannel.class);
    /** Identifier of this channel. */
    private final int _channelId;
    /** Whether publishes and acks on this channel go through the transaction buffer. */
    private boolean _transactional;
    private AMQQueue _defaultQueue;
    /** Counter behind generated consumer tags. */
    private int _consumerTag;
    /** Message currently being assembled from publish/header/body frames; null when none is in progress. */
    private AMQMessage _currentMessage;
    private final MessageStore _messageStore;
    /** Delivery tag of the most recently recorded unacknowledged message. */
    private long _lastDeliveryTag;
    private final MessageRouter _exchanges;
    private final TxnBuffer _txnBuffer;
    /** Ack operation enlisted in the current transaction, created lazily on the first transactional ack. */
    private TxAck ackOp;
    /** Source of delivery tags; never reassigned, hence final. */
    private final AtomicLong _deliveryTag = new AtomicLong(0);
    private final Map<String, AMQQueue> _consumerTag2QueueMap = new TreeMap<String, AMQQueue>();
    /** Guards {@link #_unacknowledgedMessageMap} and {@link #_lastDeliveryTag}. */
    private final Object _unacknowledgedMessageMapLock = new Object();
    /** Unacknowledged messages keyed by delivery tag, in insertion order; replaced wholesale by requeue(). */
    private Map<Long, UnacknowledgedMessage> _unacknowledgedMessageMap =
            new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
    private final AtomicBoolean _suspended = new AtomicBoolean(false);
    /** Return frames collected during commit, flushed by processReturns(). */
    private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
    /** Channel is suspended once the unacknowledged count reaches this value. */
    private long _prefetch_HighWaterMark = DEFAULT_PREFETCH;
    /** A suspended channel stays suspended while the unacknowledged count is above this value. */
    private long _prefetch_LowWaterMark = this._prefetch_HighWaterMark / 2;

    /**
     * View over the channel's unacknowledged messages handed to the transactional ack operation.
     * Every call builds a fresh {@link UnacknowledgedMessageMapImpl} around the channel's lock and
     * whatever map instance the channel holds at that moment.
     */
    private class AckMap implements UnacknowledgedMessageMap {
        private AckMap() {
            // nothing to initialise
        }

        /** Delegates to a freshly built implementation over the channel's current map. */
        @Override
        public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs) {
            UnacknowledgedMessageMap delegate = impl();
            delegate.collect(deliveryTag, multiple, msgs);
        }

        /** Delegates to a freshly built implementation over the channel's current map. */
        @Override
        public void remove(List<UnacknowledgedMessage> msgs) {
            UnacknowledgedMessageMap delegate = impl();
            delegate.remove(msgs);
        }

        /** Wraps the channel's lock and its current unacknowledged-message map. */
        private UnacknowledgedMessageMap impl() {
            Object lock = _unacknowledgedMessageMapLock;
            Map<Long, UnacknowledgedMessage> current = _unacknowledgedMessageMap;
            return new UnacknowledgedMessageMapImpl(lock, current);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/qpid/server/AMQChannel$Cleanup.class */
    public class Cleanup implements TxnOp {
        private final AMQMessage _msg;

        Cleanup(AMQMessage aMQMessage) {
            this._msg = aMQMessage;
        }

        @Override // org.apache.qpid.server.txn.TxnOp
        public void prepare() throws AMQException {
        }

        @Override // org.apache.qpid.server.txn.TxnOp
        public void undoPrepare() {
        }

        @Override // org.apache.qpid.server.txn.TxnOp
        public void commit() {
            try {
                this._msg.decrementReference();
            } catch (AMQException e) {
                AMQChannel._log.error("On commiting transaction, failed to cleanup unused message: " + e, e);
            }
            try {
                this._msg.checkDeliveredToConsumer();
            } catch (NoConsumersException e2) {
                AMQChannel.this._returns.add(e2.getReturnMessage(AMQChannel.this._channelId));
            }
        }

        @Override // org.apache.qpid.server.txn.TxnOp
        public void rollback() {
        }
    }

    /**
     * Transaction operation enlisted for each transactionally published message. In the prepare
     * phase it stores the message and then decrements the message's reference; every other phase
     * is a no-op.
     */
    public class Store implements TxnOp {
        private final AMQMessage _msg;

        Store(AMQMessage msg) {
            _msg = msg;
        }

        /**
         * Stores the message, then decrements its reference.
         *
         * @throws AMQException if either call on the message fails
         */
        @Override
        public void prepare() throws AMQException {
            final AMQMessage message = _msg;
            message.storeMessage();
            message.decrementReference();
        }

        /** No-op. */
        @Override
        public void undoPrepare() {
            // nothing to undo
        }

        /** No-op. */
        @Override
        public void commit() {
            // nothing to do on commit
        }

        /** No-op. */
        @Override
        public void rollback() {
            // nothing to roll back
        }
    }

    /**
     * Creates a channel together with the transaction buffer used for its transactional work.
     *
     * @param i             identifier of this channel
     * @param messageStore  store passed to new messages and to the transaction buffer
     * @param messageRouter router used to route published messages
     * @throws AMQException declared by the signature; presumably raised while creating the
     *                      transaction buffer (to be confirmed against TxnBuffer)
     */
    public AMQChannel(int i, MessageStore messageStore, MessageRouter messageRouter) throws AMQException {
        _channelId = i;
        _messageStore = messageStore;
        _exchanges = messageRouter;
        _txnBuffer = new TxnBuffer(messageStore);
    }

    /**
     * @return the identifier this channel was created with
     */
    public int getChannelId() {
        return _channelId;
    }

    /**
     * @return true when publishes and acks on this channel are handled through the transaction buffer
     */
    public boolean isTransactional() {
        return _transactional;
    }

    /**
     * Switches the channel's transactional mode.
     *
     * @param z true to route publishes and acks through the transaction buffer
     */
    public void setTransactional(boolean z) {
        _transactional = z;
    }

    /**
     * @return the prefetch high-water mark (same value as {@link #getPrefetchHighMarkCount()})
     */
    public long getPrefetchCount() {
        return _prefetch_HighWaterMark;
    }

    /**
     * Sets the prefetch high-water mark. The low-water mark is left untouched.
     *
     * @param j new high-water mark
     */
    public void setPrefetchCount(long j) {
        _prefetch_HighWaterMark = j;
    }

    /**
     * @return the prefetch low-water mark
     */
    public long getPrefetchLowMarkCount() {
        return _prefetch_LowWaterMark;
    }

    /**
     * Sets the prefetch low-water mark.
     *
     * @param j new low-water mark
     */
    public void setPrefetchLowMarkCount(long j) {
        _prefetch_LowWaterMark = j;
    }

    /**
     * @return the prefetch high-water mark
     */
    public long getPrefetchHighMarkCount() {
        return _prefetch_HighWaterMark;
    }

    /**
     * Sets the prefetch high-water mark.
     *
     * @param j new high-water mark
     */
    public void setPrefetchHighMarkCount(long j) {
        _prefetch_HighWaterMark = j;
    }

    /**
     * Starts assembling a new message from a publish frame, replacing any message that was
     * previously in progress, and records the publishing session on it.
     *
     * @param basicPublishBody   publish frame the new message is built from
     * @param aMQProtocolSession session recorded as the message's publisher
     * @throws AMQException if creating the message or setting its publisher fails
     */
    public void setPublishFrame(BasicPublishBody basicPublishBody, AMQProtocolSession aMQProtocolSession) throws AMQException {
        final AMQMessage started = new AMQMessage(_messageStore, basicPublishBody);
        // The field is assigned before the publisher is set, so the message is the
        // channel's current message even when setPublisher fails.
        _currentMessage = started;
        started.setPublisher(aMQProtocolSession);
    }

    /**
     * Attaches the content header to the message currently being assembled. When the header
     * announces a body size of zero the message is complete and is routed immediately.
     *
     * @param contentHeaderBody header frame for the in-progress message
     * @throws AMQException if no publish frame has started a message on this channel, or if
     *                      setting the header or routing fails
     */
    public void publishContentHeader(ContentHeaderBody contentHeaderBody) throws AMQException {
        if (_currentMessage == null) {
            // The in-progress message is created by setPublishFrame(BasicPublishBody), so the
            // missing frame is a publish frame, not a deliver frame as previously reported.
            throw new AMQException("Received content header without previously receiving a BasicPublish frame");
        }
        _currentMessage.setContentHeaderBody(contentHeaderBody);
        if (contentHeaderBody.bodySize == 0) {
            routeCurrentMessage();
        }
    }

    /**
     * Appends a content body frame to the message currently being assembled and routes the
     * message once it reports that all content has been received.
     *
     * @param contentBody body frame to append
     * @throws AMQException if no publish frame or no content header preceded this frame, or if
     *                      appending or routing fails
     */
    public void publishContentBody(ContentBody contentBody) throws AMQException {
        if (_currentMessage == null) {
            // The in-progress message is created by setPublishFrame(BasicPublishBody); name that
            // frame in the error instead of the non-existent "JmsPublishBody".
            throw new AMQException("Received content body without previously receiving a BasicPublish frame");
        }
        if (_currentMessage.getContentHeaderBody() == null) {
            throw new AMQException("Received content body without previously receiving a content header");
        }
        _currentMessage.addContentBodyFrame(contentBody);
        if (_currentMessage.isAllContentReceived()) {
            routeCurrentMessage();
        }
    }

    /**
     * Routes the fully received current message and clears it from the channel.
     *
     * <p>Non-transactional channels route the message, run the delivered-to-consumer check and
     * then decrement the message reference exactly once, whether or not routing succeeded.
     *
     * <p>Transactional channels enlist a {@link Store} operation, route the message and then
     * enlist a {@link Cleanup} operation. If routing fails with a
     * {@link RequiredDeliveryException} the store operation is cancelled again before the
     * exception is rethrown.
     *
     * @throws AMQException if routing, the delivery check or the reference decrement fails
     */
    protected void routeCurrentMessage() throws AMQException {
        final AMQMessage message = _currentMessage;
        if (!_transactional) {
            try {
                _exchanges.routeContent(message);
                message.checkDeliveredToConsumer();
            } finally {
                // Clear the field first so the channel never keeps a half-processed message,
                // then release the reference once. The previous catch-and-rethrow form could
                // decrement twice when the decrement itself threw.
                _currentMessage = null;
                message.decrementReference();
            }
            return;
        }

        if (message.isPersistent()) {
            _txnBuffer.containsPersistentChanges();
        }
        // Order matters: the store op is enlisted before routing and the cleanup op after it.
        final Store storeOp = new Store(message);
        _txnBuffer.enlist(storeOp);
        message.setTxnBuffer(_txnBuffer);
        try {
            _exchanges.routeContent(message);
            _txnBuffer.enlist(new Cleanup(message));
        } catch (RequiredDeliveryException e) {
            // Routing was refused, so the store operation enlisted above is withdrawn.
            _txnBuffer.cancel(storeOp);
            throw e;
        } finally {
            _currentMessage = null;
        }
    }

    /**
     * Atomically advances the delivery-tag counter.
     *
     * @return the incremented delivery tag; the first value returned is 1
     */
    public long getNextDeliveryTag() {
        final long next = _deliveryTag.incrementAndGet();
        return next;
    }

    /**
     * Advances the consumer-tag counter. The increment is a plain, unsynchronised field update.
     *
     * @return the incremented counter value
     */
    public int getNextConsumerTag() {
        return ++_consumerTag;
    }

    /**
     * Registers a consumer on a queue under a consumer tag and remembers the tag-to-queue mapping.
     *
     * @param str                requested consumer tag, or null to have one generated as
     *                           {@code "sgen_" + getNextConsumerTag()}
     * @param aMQQueue           queue to subscribe to
     * @param aMQProtocolSession session the subscription is registered for
     * @param z                  flag forwarded unchanged to the queue's registration call
     * @return the consumer tag actually used
     * @throws ConsumerTagNotUniqueException if the tag is already in use on this channel
     * @throws AMQException                  if the queue rejects the registration
     */
    public String subscribeToQueue(String str, AMQQueue aMQQueue, AMQProtocolSession aMQProtocolSession, boolean z) throws AMQException, ConsumerTagNotUniqueException {
        final String tag = (str != null) ? str : "sgen_" + getNextConsumerTag();
        if (_consumerTag2QueueMap.containsKey(tag)) {
            throw new ConsumerTagNotUniqueException();
        }
        // Register with the queue first; the mapping is only recorded when that succeeds.
        aMQQueue.registerProtocolSession(aMQProtocolSession, _channelId, tag, z);
        _consumerTag2QueueMap.put(tag, aMQQueue);
        return tag;
    }

    /**
     * Forgets the consumer tag and, when it was known, unregisters the session from its queue.
     * An unknown tag is silently ignored.
     *
     * @param aMQProtocolSession session to unregister
     * @param str                consumer tag to remove
     * @throws AMQException if the queue fails to unregister the session
     */
    public void unsubscribeConsumer(AMQProtocolSession aMQProtocolSession, String str) throws AMQException {
        final AMQQueue subscribedQueue = _consumerTag2QueueMap.remove(str);
        if (subscribedQueue == null) {
            return;
        }
        subscribedQueue.unregisterProtocolSession(aMQProtocolSession, _channelId, str);
    }

    /**
     * Closes the channel: rolls back the transaction buffer when the channel is transactional,
     * unsubscribes every consumer and requeues all unacknowledged messages.
     *
     * <p>The unsubscribe and requeue steps run even when an earlier step throws, so a failing
     * rollback no longer leaves consumers registered or messages stranded on a closing channel.
     * If a later step also throws, its exception is the one that propagates.
     *
     * @param aMQProtocolSession session whose consumers are unregistered
     * @throws AMQException if rollback, unsubscription or requeueing fails
     */
    public void close(AMQProtocolSession aMQProtocolSession) throws AMQException {
        try {
            if (_transactional) {
                // rollback() takes the same monitor, keeping close and rollback mutually exclusive
                synchronized (_txnBuffer) {
                    _txnBuffer.rollback();
                }
            }
        } finally {
            try {
                unsubscribeAllConsumers(aMQProtocolSession);
            } finally {
                requeue();
            }
        }
    }

    /**
     * Unregisters the session from every queue this channel has a consumer on, then clears the
     * tag-to-queue map. If one unregistration throws, the map is left as it was.
     *
     * @param session session to unregister
     * @throws AMQException if a queue fails to unregister the session
     */
    private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException {
        _log.info("Unsubscribing all consumers on channel " + toString());
        final Iterator<Map.Entry<String, AMQQueue>> subscriptions = _consumerTag2QueueMap.entrySet().iterator();
        while (subscriptions.hasNext()) {
            final Map.Entry<String, AMQQueue> subscription = subscriptions.next();
            final AMQQueue queue = subscription.getValue();
            queue.unregisterProtocolSession(session, _channelId, subscription.getKey());
        }
        _consumerTag2QueueMap.clear();
    }

    /**
     * Records a delivered message as awaiting acknowledgement, remembers its delivery tag as the
     * latest one, and re-evaluates whether the channel must be suspended.
     *
     * @param aMQMessage delivered message
     * @param j          delivery tag the message was sent with
     * @param str        consumer tag the message was delivered to
     * @param aMQQueue   queue the message came from
     */
    public void addUnacknowledgedMessage(AMQMessage aMQMessage, long j, String str, AMQQueue aMQQueue) {
        synchronized (_unacknowledgedMessageMapLock) {
            final UnacknowledgedMessage pending = new UnacknowledgedMessage(aMQQueue, aMQMessage, str, j);
            _unacknowledgedMessageMap.put(Long.valueOf(j), pending);
            _lastDeliveryTag = j;
            checkSuspension();
        }
    }

    /**
     * Hands every unacknowledged message back to its queue. The current map is swapped for an
     * empty one while holding the lock; delivery happens afterwards, outside the lock. Entries
     * whose queue reference is null are skipped.
     *
     * @throws AMQException if a queue fails to accept a message
     */
    public void requeue() throws AMQException {
        final Map<Long, UnacknowledgedMessage> pending;
        synchronized (_unacknowledgedMessageMapLock) {
            pending = _unacknowledgedMessageMap;
            _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
        }
        final Iterator<UnacknowledgedMessage> unacked = pending.values().iterator();
        while (unacked.hasNext()) {
            final UnacknowledgedMessage entry = unacked.next();
            if (entry.queue == null) {
                continue;
            }
            entry.queue.deliver(entry.message);
        }
    }

    /**
     * Writes every unacknowledged message to the session again, using the original consumer tag
     * and delivery tag. The whole pass runs while holding the unacknowledged-message lock.
     *
     * @param aMQProtocolSession session the frames are written to
     */
    public void resend(AMQProtocolSession aMQProtocolSession) {
        synchronized (_unacknowledgedMessageMapLock) {
            for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet()) {
                final long deliveryTag = entry.getKey().longValue();
                final UnacknowledgedMessage unacked = entry.getValue();
                aMQProtocolSession.writeFrame(
                        unacked.message.getDataBlock(_channelId, unacked.consumerTag, deliveryTag));
            }
        }
    }

    /**
     * Detaches unacknowledged messages from a deleted queue: each entry that refers to the given
     * queue (compared by reference) has its queue set to null and its message reference
     * decremented. A failing decrement is logged and the scan continues.
     *
     * @param aMQQueue queue that has been deleted
     */
    public void queueDeleted(AMQQueue aMQQueue) {
        synchronized (_unacknowledgedMessageMapLock) {
            for (UnacknowledgedMessage unacked : _unacknowledgedMessageMap.values()) {
                if (unacked.queue != aMQQueue) {
                    continue;
                }
                unacked.queue = null;
                try {
                    unacked.message.decrementReference();
                } catch (AMQException e) {
                    _log.error("Error decrementing ref count on message " + unacked.message.getMessageId() + ": " + e, e);
                }
            }
        }
    }

    /**
     * Acknowledges one or more delivered messages.
     *
     * <p>On a non-transactional channel the ack is applied immediately. On a transactional
     * channel it is recorded in a single {@link TxAck} operation, created and enlisted on the
     * first ack. A multiple-ack with tag 0 is recorded against the last delivery tag seen at the
     * time of the call.
     *
     * @param j delivery tag being acknowledged; 0 together with {@code z} means "everything so far"
     * @param z true for a multiple acknowledgement
     * @throws AMQException if the tag is not known to this channel
     */
    public void acknowledgeMessage(long j, boolean z) throws AMQException {
        if (_transactional) {
            final boolean ackEverything = z && j == 0;
            // fail early on an unknown tag
            if (!z || j > 0) {
                checkAck(j);
            }
            if (ackOp == null) {
                ackOp = new TxAck(new AckMap());
                _txnBuffer.enlist(ackOp);
            }
            if (ackEverything) {
                synchronized (_unacknowledgedMessageMapLock) {
                    ackOp.update(_lastDeliveryTag, z);
                }
            } else {
                ackOp.update(j, z);
            }
        } else {
            handleAcknowledgement(j, z);
        }
    }

    /**
     * Verifies that a delivery tag refers to a currently unacknowledged message.
     *
     * @param deliveryTag tag to look up
     * @throws AMQException if the tag is not present in the unacknowledged-message map
     */
    private void checkAck(long deliveryTag) throws AMQException {
        final boolean known;
        synchronized (_unacknowledgedMessageMapLock) {
            known = _unacknowledgedMessageMap.containsKey(Long.valueOf(deliveryTag));
            if (!known) {
                throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
            }
        }
    }

    /**
     * Applies an acknowledgement immediately (non-transactional path).
     *
     * <p>For a multiple ack, tag 0 removes every unacknowledged message. Any other tag removes
     * entries in map iteration order up to and including that tag, and fails if a larger tag is
     * met first. For a single ack only the named entry is removed. Removed entries are discarded
     * outside the lock, after which channel suspension is re-evaluated.
     *
     * @param deliveryTag tag being acknowledged
     * @param multiple    true for a multiple acknowledgement
     * @throws AMQException if the tag is unknown, the map is out of order, or a discard fails
     */
    private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException {
        if (multiple) {
            final List<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
            synchronized (_unacknowledgedMessageMapLock) {
                if (deliveryTag == 0) {
                    // Guarded so the message string is only built when trace logging is on.
                    if (_log.isTraceEnabled()) {
                        _log.trace("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + _unacknowledgedMessageMap.size());
                    }
                    acked.addAll(_unacknowledgedMessageMap.values());
                    _unacknowledgedMessageMap.clear();
                } else {
                    if (!_unacknowledgedMessageMap.containsKey(Long.valueOf(deliveryTag))) {
                        throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
                    }
                    final Iterator<Map.Entry<Long, UnacknowledgedMessage>> entries =
                            _unacknowledgedMessageMap.entrySet().iterator();
                    while (entries.hasNext()) {
                        final Map.Entry<Long, UnacknowledgedMessage> entry = entries.next();
                        // Read key and value before the entry is removed from the map.
                        final Long tag = entry.getKey();
                        final UnacknowledgedMessage unacked = entry.getValue();
                        if (tag.longValue() > deliveryTag) {
                            throw new AMQException("UnacknowledgedMessageMap is out of order:" + tag + " When deliveryTag is:" + deliveryTag + "ES:" + _unacknowledgedMessageMap.entrySet().toString());
                        }
                        entries.remove();
                        acked.add(unacked);
                        if (tag.longValue() == deliveryTag) {
                            break;
                        }
                    }
                }
            }
            if (_log.isTraceEnabled()) {
                _log.trace("Received multiple ack for delivery tag " + deliveryTag + ". Removing " + acked.size() + " items.");
            }
            // Discard outside the lock.
            for (UnacknowledgedMessage unacked : acked) {
                unacked.discard();
            }
        } else {
            final UnacknowledgedMessage removed;
            synchronized (_unacknowledgedMessageMapLock) {
                removed = _unacknowledgedMessageMap.remove(Long.valueOf(deliveryTag));
            }
            if (removed == null) {
                final String problem = "Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId;
                _log.trace(problem);
                throw new AMQException(problem);
            }
            removed.discard();
            if (_log.isTraceEnabled()) {
                _log.trace("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
            }
        }
        checkSuspension();
    }

    /**
     * Exposes the live unacknowledged-message map. No copy is made and the lock is not taken,
     * and the instance returned may later be replaced by {@link #requeue()}.
     *
     * @return the map instance currently held by the channel
     */
    public Map<Long, UnacknowledgedMessage> getUnacknowledgedMessageMap() {
        return _unacknowledgedMessageMap;
    }

    /**
     * Requests suspension when the number of unacknowledged messages has reached the high-water
     * mark, and requests un-suspension otherwise. The final decision is made by
     * {@link #setSuspended(boolean)}.
     */
    private void checkSuspension() {
        final boolean atOrAboveHighMark;
        synchronized (_unacknowledgedMessageMapLock) {
            final long outstanding = _unacknowledgedMessageMap.size();
            atOrAboveHighMark = outstanding >= _prefetch_HighWaterMark;
        }
        setSuspended(atOrAboveHighMark);
    }

    /**
     * Suspends or resumes the channel.
     *
     * <p>A request to resume a suspended channel is only honoured once the number of
     * unacknowledged messages is no longer above the low-water mark. When the channel changes
     * from suspended to running, every subscribed queue is asked to deliver asynchronously.
     *
     * @param z true to suspend, false to resume
     */
    public void setSuspended(boolean z) {
        boolean target = z;
        final boolean currentlySuspended = _suspended.get();
        if (currentlySuspended && !target) {
            synchronized (_unacknowledgedMessageMapLock) {
                // stay suspended while still above the low-water mark
                target = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark;
            }
        }

        final boolean previous = _suspended.getAndSet(target);
        if (previous == target) {
            return;
        }
        if (previous) {
            _log.debug("Unsuspending channel " + this);
            for (AMQQueue queue : _consumerTag2QueueMap.values()) {
                queue.deliverAsync();
            }
        } else {
            _log.debug("Suspending channel " + this);
        }
    }

    /**
     * @return true while the channel is suspended
     */
    public boolean isSuspended() {
        final boolean suspended = _suspended.get();
        return suspended;
    }

    /**
     * Commits the transaction buffer. If acks were recorded in this transaction, the ack
     * operation is consolidated first, the buffer is told about persistent changes when the
     * operation reports any, and the channel's reference to the operation is dropped so that the
     * next ack creates a new one.
     *
     * @throws AMQException if the commit fails
     */
    public void commit() throws AMQException {
        final TxAck pendingAcks = ackOp;
        if (pendingAcks != null) {
            pendingAcks.consolidate();
            final boolean persistent = pendingAcks.checkPersistent();
            if (persistent) {
                _txnBuffer.containsPersistentChanges();
            }
            ackOp = null;
        }
        _txnBuffer.commit();
    }

    /**
     * Rolls back the transaction buffer while holding its monitor (the same monitor
     * {@link #close(AMQProtocolSession)} uses), then forgets the ack operation of the
     * rolled-back transaction.
     *
     * <p>{@link #commit()} drops its reference to the ack operation so the next transactional
     * ack creates and enlists a new one; rollback now does the same. Without this, acks received
     * after a rollback would be added to the old operation.
     * NOTE(review): this assumes the buffer discards its enlisted operations on rollback —
     * confirm against TxnBuffer.
     *
     * @throws AMQException if the rollback fails; the ack operation is then left untouched
     */
    public void rollback() throws AMQException {
        synchronized (_txnBuffer) {
            _txnBuffer.rollback();
            ackOp = null;
        }
    }

    /**
     * @return a description containing the channel id, the transactional flag and the
     *         low/high prefetch marks
     */
    @Override
    public String toString() {
        final String identity = "Channel: id " + _channelId + ", transaction mode: " + _transactional;
        final String marks = ", prefetch marks: " + _prefetch_LowWaterMark + "/" + _prefetch_HighWaterMark;
        return identity + marks;
    }

    /**
     * Stores the channel's default queue.
     *
     * @param aMQQueue queue to remember as the default; stored as given, without validation
     */
    public void setDefaultQueue(AMQQueue aMQQueue) {
        _defaultQueue = aMQQueue;
    }

    /**
     * @return the default queue last set on this channel, or null if none has been set
     */
    public AMQQueue getDefaultQueue() {
        return _defaultQueue;
    }

    /**
     * Writes every pending return frame to the session, in the order the frames were collected,
     * and then empties the pending list.
     *
     * @param aMQProtocolSession session the return frames are written to
     */
    public void processReturns(AMQProtocolSession aMQProtocolSession) {
        for (AMQDataBlock returned : _returns) {
            aMQProtocolSession.writeFrame(returned);
        }
        _returns.clear();
    }
}
