package org.apache.activemq.broker.region.cursors;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/activemq-broker-5.11.0.redhat-6-2-0-SNAPSHOT.jar:org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.class */
/**
 * Pending-message cursor for a durable topic subscription.
 *
 * <p>The cursor is a composite: it owns one cursor for non-persistent messages
 * ({@link #nonPersistent}) plus one {@link TopicStorePrefetch} per destination registered
 * through {@link #add(ConnectionContext, Destination)}. All of them live in
 * {@link #storePrefetches}; iteration ({@link #hasNext()} / {@link #next()} /
 * {@link #remove()}) is delegated to whichever child cursor is currently selected by
 * {@link #getNextCursor()}.
 *
 * <p>Most state-changing methods are {@code synchronized} on this instance. The
 * configuration setters and {@link #addMessageFirst(MessageReference)} are not.
 */
public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
    private static final Logger LOG = LoggerFactory.getLogger(StoreDurableSubscriberCursor.class);

    private final String clientId;
    private final String subscriberName;
    /** Store-backed cursor for each destination this subscription is attached to. */
    private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>();
    /** Every child cursor: the non-persistent cursor plus all values of {@link #topics}. */
    private final List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>();
    private final PendingMessageCursor nonPersistent;
    /** Child cursor selected by the last {@link #hasNext()}; {@code null} when none has messages. */
    private PendingMessageCursor currentCursor;
    private final DurableTopicSubscription subscription;
    private boolean immediatePriorityDispatch = true;

    /**
     * Creates the cursor and its non-persistent child cursor.
     *
     * @param broker used to decide whether messages are prioritized and whether the broker is
     *        persistent (file-backed vs. in-memory non-persistent cursor)
     * @param clientId client id of the durable subscriber
     * @param subscriberName subscription name of the durable subscriber
     * @param maxBatchSize batch size applied to the non-persistent cursor
     * @param subscription the durable subscription this cursor serves
     */
    public StoreDurableSubscriberCursor(Broker broker, String clientId, String subscriberName, int maxBatchSize,
            DurableTopicSubscription subscription) {
        super(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, subscription));
        this.subscription = subscription;
        this.clientId = clientId;
        this.subscriberName = subscriberName;
        if (broker.getBrokerService().isPersistent()) {
            this.nonPersistent = new FilePendingMessageCursor(broker, clientId + subscriberName, this.prioritizedMessages);
        } else {
            this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
        }
        this.nonPersistent.setMaxBatchSize(maxBatchSize);
        this.nonPersistent.setSystemUsage(this.systemUsage);
        this.storePrefetches.add(this.nonPersistent);
        if (this.prioritizedMessages) {
            // Prioritized subscriptions get a ten times deeper audit window.
            setMaxAuditDepth(10 * getMaxAuditDepth());
        }
    }

    /**
     * Starts this cursor and every child cursor, sharing this cursor's message audit with
     * each child. Does nothing if already started.
     */
    @Override
    public synchronized void start() throws Exception {
        if (isStarted()) {
            return;
        }
        super.start();
        for (PendingMessageCursor cursor : this.storePrefetches) {
            cursor.setMessageAudit(getMessageAudit());
            cursor.start();
        }
    }

    /**
     * Stops the cursor if it is started. When the subscription keeps durable subs active the
     * cursors are only garbage-collected ({@code gc()}) and remain started; otherwise this
     * cursor and all children are stopped and the message audit is cleared.
     */
    @Override
    public synchronized void stop() throws Exception {
        if (!isStarted()) {
            return;
        }
        if (this.subscription.isKeepDurableSubsActive()) {
            super.gc();
            for (PendingMessageCursor cursor : this.storePrefetches) {
                cursor.gc();
            }
        } else {
            super.stop();
            for (PendingMessageCursor cursor : this.storePrefetches) {
                cursor.stop();
            }
            getMessageAudit().clear();
        }
    }

    /**
     * Registers a destination by creating a {@link TopicStorePrefetch} for it, configured from
     * this cursor's current settings. {@code null} and advisory destinations are ignored. The
     * new child is started immediately if this cursor is already started.
     *
     * @param connectionContext not used by this implementation
     * @param destination the destination to add; cast to {@link Topic}
     */
    @Override
    public synchronized void add(ConnectionContext connectionContext, Destination destination) throws Exception {
        if (destination == null || AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
            return;
        }
        TopicStorePrefetch prefetch =
                new TopicStorePrefetch(this.subscription, (Topic) destination, this.clientId, this.subscriberName);
        prefetch.setMaxBatchSize(destination.getMaxPageSize());
        prefetch.setSystemUsage(this.systemUsage);
        prefetch.setMessageAudit(getMessageAudit());
        prefetch.setEnableAudit(isEnableAudit());
        prefetch.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark());
        prefetch.setUseCache(isUseCache());
        // The cache is only enabled when caching is on and the prefetch reports itself empty.
        prefetch.setCacheEnabled(isUseCache() && prefetch.isEmpty());
        this.topics.put(destination, prefetch);
        this.storePrefetches.add(prefetch);
        if (isStarted()) {
            prefetch.start();
        }
    }

    /**
     * Unregisters a destination and drops its child cursor, if one was registered.
     *
     * @param connectionContext not used by this implementation
     * @param destination the destination to remove
     * @return always an empty list
     */
    @Override
    public synchronized List<MessageReference> remove(ConnectionContext connectionContext, Destination destination)
            throws Exception {
        TopicStorePrefetch removed = this.topics.remove(destination);
        if (removed != null) {
            this.storePrefetches.remove(removed);
        }
        return Collections.<MessageReference>emptyList();
    }

    /**
     * @return {@code true} when every child cursor reports itself empty
     */
    @Override
    public synchronized boolean isEmpty() {
        for (PendingMessageCursor cursor : this.storePrefetches) {
            if (!cursor.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    /**
     * @param destination the destination to check
     * @return {@code true} when no cursor is registered for {@code destination} or that cursor
     *         is empty
     */
    @Override
    public synchronized boolean isEmpty(Destination destination) {
        TopicStorePrefetch prefetch = this.topics.get(destination);
        return prefetch == null || prefetch.isEmpty();
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean isRecoveryRequired() {
        return false;
    }

    /**
     * Routes a message to the matching child cursor.
     *
     * <p>Non-persistent messages go to the non-persistent cursor, but only while this cursor is
     * started. Persistent messages go to the {@link TopicStorePrefetch} of their region
     * destination, if one is registered. For prioritized subscriptions with immediate priority
     * dispatch enabled, a persistent message whose priority is higher than the priority last
     * recovered by a paging prefetch is additionally pushed into that prefetch via
     * {@code recoverMessage}.
     *
     * @param messageReference the message to add; {@code null} is ignored
     * @return always {@code true}; the results of the child cursors are not propagated
     */
    @Override
    public synchronized boolean addMessageLast(MessageReference messageReference) throws Exception {
        if (messageReference == null) {
            return true;
        }
        Message message = messageReference.getMessage();
        if (!message.isPersistent()) {
            if (isStarted()) {
                this.nonPersistent.addMessageLast(messageReference);
            }
            return true;
        }

        TopicStorePrefetch prefetch = this.topics.get((Destination) message.getRegionDestination());
        if (prefetch == null) {
            return true;
        }
        prefetch.addMessageLast(messageReference);

        boolean priorityDispatch = this.prioritizedMessages && this.immediatePriorityDispatch && prefetch.isPaging();
        if (priorityDispatch && message.getPriority() > prefetch.getLastRecoveredPriority()) {
            prefetch.recoverMessage(messageReference.getMessage(), true);
            // Guarded so the boxing/varargs array is only paid for when trace is on.
            if (LOG.isTraceEnabled()) {
                LOG.trace("cached high priority ({} message: {}, current paged batch priority: {}, cache size: {}",
                        message.getPriority(), message.getMessageId(), prefetch.getLastRecoveredPriority(),
                        prefetch.batchList.size());
            }
        }
        return true;
    }

    /**
     * @return whether the subscription keeps durable subs active
     */
    @Override
    public boolean isTransient() {
        return this.subscription.isKeepDurableSubsActive();
    }

    /**
     * Puts a message at the head of the matching child cursor: the non-persistent cursor for
     * non-persistent messages, otherwise the prefetch registered for the message's region
     * destination (silently ignored when none is registered).
     *
     * <p>NOTE(review): unlike {@link #addMessageLast(MessageReference)} this method is not
     * synchronized although it reads {@code topics}, which add/remove mutate under the
     * monitor; confirm callers provide the required locking.
     *
     * @param messageReference the message to add; {@code null} is ignored
     */
    @Override
    public void addMessageFirst(MessageReference messageReference) throws Exception {
        if (messageReference == null) {
            return;
        }
        Message message = messageReference.getMessage();
        if (!message.isPersistent()) {
            this.nonPersistent.addMessageFirst(messageReference);
            return;
        }
        TopicStorePrefetch prefetch = this.topics.get((Destination) message.getRegionDestination());
        if (prefetch != null) {
            prefetch.addMessageFirst(messageReference);
        }
    }

    /**
     * Appends a recovered message to the non-persistent cursor.
     *
     * @param messageReference the recovered message
     */
    @Override
    public synchronized void addRecoveredMessage(MessageReference messageReference) throws Exception {
        this.nonPersistent.addMessageLast(messageReference);
    }

    /** Clears every child cursor. */
    @Override
    public synchronized void clear() {
        for (PendingMessageCursor cursor : this.storePrefetches) {
            cursor.clear();
        }
    }

    /**
     * Selects the child cursor to read from (see {@link #getNextCursor()}) and reports whether
     * it has a message available.
     *
     * @return {@code true} if a child cursor was selected and it has a next message
     * @throws RuntimeException wrapping any exception raised while selecting or querying the
     *         child cursor
     */
    @Override
    public synchronized boolean hasNext() {
        try {
            this.currentCursor = getNextCursor();
            return this.currentCursor != null && this.currentCursor.hasNext();
        } catch (Exception e) {
            LOG.error("Failed to get current cursor ", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the next message of the currently selected child cursor, or {@code null} when no
     *         child cursor is selected
     */
    @Override
    public synchronized MessageReference next() {
        return this.currentCursor != null ? this.currentCursor.next() : null;
    }

    /** Removes the current message from the currently selected child cursor, if any. */
    @Override
    public synchronized void remove() {
        if (this.currentCursor != null) {
            this.currentCursor.remove();
        }
    }

    /**
     * Asks every child cursor to remove the given message.
     *
     * @param messageReference the message to remove
     */
    @Override
    public synchronized void remove(MessageReference messageReference) {
        for (PendingMessageCursor cursor : this.storePrefetches) {
            cursor.remove(messageReference);
        }
    }

    /** Resets every child cursor. */
    @Override
    public synchronized void reset() {
        for (PendingMessageCursor cursor : this.storePrefetches) {
            cursor.reset();
        }
    }

    /** Forgets the currently selected child cursor and releases every child cursor. */
    @Override
    public synchronized void release() {
        this.currentCursor = null;
        for (PendingMessageCursor cursor : this.storePrefetches) {
            cursor.release();
        }
    }

    /**
     * @return the sum of the sizes reported by all child cursors
     */
    @Override
    public synchronized int size() {
        int total = 0;
        for (PendingMessageCursor cursor : this.storePrefetches) {
            total += cursor.size();
        }
        return total;
    }

    /**
     * Applies the batch size to every child cursor and then to this cursor.
     *
     * @param maxBatchSize the new maximum batch size
     */
    @Override
    public void setMaxBatchSize(int maxBatchSize) {
        for (PendingMessageCursor cursor : this.storePrefetches) {
            cursor.setMaxBatchSize(maxBatchSize);
        }
        super.setMaxBatchSize(maxBatchSize);
    }

    /** Invokes {@code gc()} on every child cursor. */
    @Override
    public synchronized void gc() {
        for (PendingMessageCursor cursor : this.storePrefetches) {
            cursor.gc();
        }
    }

    /**
     * Applies the system usage to this cursor and every child cursor.
     *
     * @param systemUsage the usage manager to share
     */
    @Override
    public void setSystemUsage(SystemUsage systemUsage) {
        super.setSystemUsage(systemUsage);
        for (PendingMessageCursor cursor : this.storePrefetches) {
            cursor.setSystemUsage(systemUsage);
        }
    }

    /**
     * Applies the memory high-water mark to this cursor and every child cursor.
     *
     * @param memoryUsageHighWaterMark the new high-water mark
     */
    @Override
    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
        super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
        for (PendingMessageCursor cursor : this.storePrefetches) {
            cursor.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
        }
    }

    /**
     * Applies the audited-producer limit to this cursor and every child cursor.
     *
     * @param maxProducersToAudit the new limit
     */
    @Override
    public void setMaxProducersToAudit(int maxProducersToAudit) {
        super.setMaxProducersToAudit(maxProducersToAudit);
        for (PendingMessageCursor cursor : this.storePrefetches) {
            cursor.setMaxProducersToAudit(maxProducersToAudit);
        }
    }

    /**
     * Applies the audit depth to this cursor and every child cursor.
     *
     * @param maxAuditDepth the new audit depth
     */
    @Override
    public void setMaxAuditDepth(int maxAuditDepth) {
        super.setMaxAuditDepth(maxAuditDepth);
        for (PendingMessageCursor cursor : this.storePrefetches) {
            cursor.setMaxAuditDepth(maxAuditDepth);
        }
    }

    /**
     * Enables or disables auditing on this cursor and every child cursor.
     *
     * @param enableAudit whether auditing is enabled
     */
    @Override
    public void setEnableAudit(boolean enableAudit) {
        super.setEnableAudit(enableAudit);
        for (PendingMessageCursor cursor : this.storePrefetches) {
            cursor.setEnableAudit(enableAudit);
        }
    }

    /**
     * Enables or disables cache use on this cursor and every child cursor.
     *
     * @param useCache whether the cache is used
     */
    @Override
    public void setUseCache(boolean useCache) {
        super.setUseCache(useCache);
        for (PendingMessageCursor cursor : this.storePrefetches) {
            cursor.setUseCache(useCache);
        }
    }

    /**
     * Returns the child cursor to read from. The current selection is kept while it is
     * non-empty; otherwise the children are scanned in list order and the first one whose
     * {@code hasNext()} is true is selected. After each scan the head of the list is moved to
     * the tail, so the next scan starts from a different child.
     *
     * @return the selected child cursor, or {@code null} when no child has a message
     */
    protected synchronized PendingMessageCursor getNextCursor() throws Exception {
        if (this.currentCursor == null || this.currentCursor.isEmpty()) {
            this.currentCursor = null;
            for (PendingMessageCursor candidate : this.storePrefetches) {
                if (candidate.hasNext()) {
                    this.currentCursor = candidate;
                    break;
                }
            }
            // Rotate the list: move the first child cursor to the end.
            if (this.storePrefetches.size() > 1) {
                PendingMessageCursor first = this.storePrefetches.remove(0);
                this.storePrefetches.add(first);
            }
        }
        return this.currentCursor;
    }

    @Override
    public String toString() {
        return "StoreDurableSubscriber(" + this.clientId + ":" + this.subscriberName + ")";
    }

    /**
     * @return whether higher-priority persistent messages are pushed straight into a paging
     *         prefetch by {@link #addMessageLast(MessageReference)}
     */
    public boolean isImmediatePriorityDispatch() {
        return this.immediatePriorityDispatch;
    }

    /**
     * @param immediatePriorityDispatch whether higher-priority persistent messages are pushed
     *        straight into a paging prefetch by {@link #addMessageLast(MessageReference)}
     */
    public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) {
        this.immediatePriorityDispatch = immediatePriorityDispatch;
    }
}
