package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.ResourceAllocationException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.broker.util.InsertionCountList;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.store.ListenableFuture;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-329-09.zip:modules/system/layers/fuse/org/apache/activemq/main/activemq-broker-5.11.0.redhat-630329-09.jar:org/apache/activemq/broker/region/Topic.class */
public class Topic extends BaseDestination implements Task {
    // Class-wide logger; also returned by getLog().
    protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
    // Message store handed to the constructor. The uses visible in this class are all null-checked.
    private final TopicMessageStore topicStore;
    // Subscriptions that currently receive dispatches. Compound check-then-modify
    // sequences in this class additionally synchronize on the list itself.
    protected final CopyOnWriteArrayList<Subscription> consumers;
    // Read side is taken by dispatch(); write side by activate() and by the
    // retroactive branch of addSubscription(), which run subscription recovery.
    private final ReentrantReadWriteLock dispatchLock;
    // Strategy that hands a message to the current consumers (see dispatch()).
    private DispatchPolicy dispatchPolicy;
    // Strategy that is offered every dispatched message and replays to retroactive subscriptions.
    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
    // Durable subscriptions registered through addSubscription(), keyed by subscription key.
    private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers;
    // Runner created with this topic as its Task; waking it up leads to iterate().
    private final TaskRunner taskRunner;
    // Sends deferred by send() while memory usage is full; guarded by its own monitor.
    private final LinkedList<Runnable> messagesWaitingForSpace;
    // Callback registered with memoryUsage; wakes up taskRunner.
    private final Runnable sendMessagesWaitingForSpaceTask;
    // Periodic job scheduled in start() and cancelled in stop(); browses the store to expire messages.
    private final Runnable expireMessagesTask;

    /**
     * Creates a topic destination backed by the given (possibly {@code null}) message store.
     *
     * <p>The topic starts with a {@link SimpleDispatchPolicy} and a
     * {@link RetainedMessageSubscriptionRecoveryPolicy} wrapping nothing, and obtains a
     * task runner (named after the physical destination) that drives {@link #iterate()}.
     *
     * @param brokerService         broker that owns this destination
     * @param activeMQDestination   destination represented by this topic
     * @param topicMessageStore     store used for persistent messages and durable subscriptions
     * @param destinationStatistics statistics holder passed on to the base class
     * @param taskRunnerFactory     factory used to create this topic's task runner
     * @throws Exception if the base class constructor or the task runner creation fails
     */
    public Topic(BrokerService brokerService, ActiveMQDestination activeMQDestination, TopicMessageStore topicMessageStore, DestinationStatistics destinationStatistics, TaskRunnerFactory taskRunnerFactory) throws Exception {
        super(brokerService, topicMessageStore, activeMQDestination, destinationStatistics);
        this.consumers = new CopyOnWriteArrayList<>();
        this.dispatchLock = new ReentrantReadWriteLock();
        this.dispatchPolicy = new SimpleDispatchPolicy();
        this.durableSubscribers = new ConcurrentHashMap<>();
        this.messagesWaitingForSpace = new LinkedList<>();
        // Kept as anonymous classes: they read final fields that are assigned further down.
        this.sendMessagesWaitingForSpaceTask = new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                try {
                    Topic.this.taskRunner.wakeup();
                } catch (InterruptedException e) {
                    // Nothing useful can be done here, but do not lose the interrupt:
                    // restore the flag so the calling thread can still observe it.
                    Thread.currentThread().interrupt();
                }
            }
        };
        this.expireMessagesTask = new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                // Browsing the store is what triggers expiry processing; the list content itself is discarded.
                Topic.this.doBrowse(new InsertionCountList(), Topic.this.getMaxExpirePageSize());
            }
        };
        this.topicStore = topicMessageStore;
        this.subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
        this.taskRunner = taskRunnerFactory.createTaskRunner(this, "Topic  " + activeMQDestination.getPhysicalName());
    }

    /**
     * Runs the base initialization and then special-cases the master-broker advisory
     * topic: it gets a last-image recovery policy and is marked always retroactive.
     *
     * @throws Exception if the base class initialization fails
     */
    @Override // org.apache.activemq.broker.region.BaseDestination
    public void initialize() throws Exception {
        super.initialize();
        if (!AdvisorySupport.isMasterBrokerAdvisoryTopic(this.destination)) {
            return;
        }
        this.subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
        setAlwaysRetroactive(true);
    }

    /**
     * Returns a snapshot of the current consumers.
     *
     * @return a new list, independent of the internal consumer list
     */
    @Override // org.apache.activemq.broker.region.BaseDestination, org.apache.activemq.broker.region.Destination
    public List<Subscription> getConsumers() {
        synchronized (this.consumers) {
            return new ArrayList<>(this.consumers);
        }
    }

    /**
     * Reports that the message may be locked; neither argument is inspected.
     *
     * @param messageReference message the caller wants to lock (unused)
     * @param lockOwner        prospective owner of the lock (unused)
     * @return always {@code true}
     */
    public boolean lock(MessageReference messageReference, LockOwner lockOwner) {
        // The answer is unconditional.
        return true;
    }

    /**
     * Registers a subscription with this topic.
     *
     * <ul>
     *   <li>Non-durable, non-retroactive: added to the consumer list if not already present.</li>
     *   <li>Non-durable, retroactive (or the topic is always retroactive): same, but done under
     *       the dispatch write lock and followed by a recovery run when the subscription was
     *       newly added.</li>
     *   <li>Durable: always recorded in {@code durableSubscribers}; added to the consumer list
     *       only when active and no listed durable subscription has the same key.</li>
     * </ul>
     *
     * @param connectionContext context of the connection adding the subscription
     * @param subscription      subscription to register
     * @throws Exception if any of the delegated calls fails
     */
    @Override // org.apache.activemq.broker.region.BaseDestination, org.apache.activemq.broker.region.Destination
    public void addSubscription(ConnectionContext connectionContext, Subscription subscription) throws Exception {
        if (!subscription.getConsumerInfo().isDurable()) {
            boolean needsRecovery = subscription.getConsumerInfo().isRetroactive() || isAlwaysRetroactive();
            if (!needsRecovery) {
                synchronized (this.consumers) {
                    if (!this.consumers.contains(subscription)) {
                        subscription.add(connectionContext, this);
                        this.consumers.add(subscription);
                        super.addSubscription(connectionContext, subscription);
                    }
                }
                return;
            }

            // Hold the write lock so dispatch() (read lock) cannot run during recovery.
            this.dispatchLock.writeLock().lock();
            try {
                boolean newlyAdded;
                synchronized (this.consumers) {
                    newlyAdded = !this.consumers.contains(subscription);
                    if (newlyAdded) {
                        subscription.add(connectionContext, this);
                        this.consumers.add(subscription);
                        super.addSubscription(connectionContext, subscription);
                    }
                }
                if (newlyAdded) {
                    this.subscriptionRecoveryPolicy.recover(connectionContext, this, subscription);
                }
            } finally {
                this.dispatchLock.writeLock().unlock();
            }
            return;
        }

        DurableTopicSubscription durable = (DurableTopicSubscription) subscription;
        super.addSubscription(connectionContext, subscription);
        subscription.add(connectionContext, this);
        if (durable.isActive()) {
            synchronized (this.consumers) {
                boolean alreadyListed = false;
                for (Subscription existing : this.consumers) {
                    if (existing.getConsumerInfo().isDurable()
                            && ((DurableTopicSubscription) existing).getSubscriptionKey().equals(durable.getSubscriptionKey())) {
                        alreadyListed = true;
                        break;
                    }
                }
                if (!alreadyListed) {
                    this.consumers.add(subscription);
                }
            }
        }
        this.durableSubscribers.put(durable.getSubscriptionKey(), durable);
    }

    /**
     * Detaches a subscription from this topic. A non-durable subscription is removed from
     * the consumer list (and from the base class bookkeeping if it was listed); in every case
     * the subscription itself is told to remove this destination.
     *
     * @param connectionContext context of the connection removing the subscription
     * @param subscription      subscription to detach
     * @param j                 value forwarded unchanged to the base class implementation
     * @throws Exception if any of the delegated calls fails
     */
    @Override // org.apache.activemq.broker.region.BaseDestination, org.apache.activemq.broker.region.Destination
    public void removeSubscription(ConnectionContext connectionContext, Subscription subscription, long j) throws Exception {
        boolean durable = subscription.getConsumerInfo().isDurable();
        if (!durable) {
            boolean wasListed;
            synchronized (this.consumers) {
                wasListed = this.consumers.remove(subscription);
            }
            if (wasListed) {
                super.removeSubscription(connectionContext, subscription, j);
            }
        }
        subscription.remove(connectionContext, this);
    }

    /**
     * Deletes a durable subscription from the store and, if this topic knows it, deactivates
     * it, drops it from the consumer list and decrements the consumer statistic.
     * Does nothing when the topic has no store.
     *
     * @param connectionContext context of the requesting connection (not used here)
     * @param subscriptionKey   client id / subscription name pair identifying the subscription
     * @throws Exception if the store or the subscription deactivation fails
     */
    public void deleteSubscription(ConnectionContext connectionContext, SubscriptionKey subscriptionKey) throws Exception {
        if (this.topicStore == null) {
            return;
        }
        this.topicStore.deleteSubscription(subscriptionKey.clientId, subscriptionKey.subscriptionName);
        DurableTopicSubscription known = this.durableSubscribers.remove(subscriptionKey);
        if (known == null) {
            return;
        }
        this.destinationStatistics.getConsumers().decrement();
        known.deactivate(false, 0L);
        this.consumers.remove(known);
    }

    /**
     * Activates a durable subscription on this topic.
     *
     * <p>Runs under the dispatch write lock so {@link #dispatch} (read lock) cannot run while
     * the subscription is being set up and recovered. Steps:
     * <ol>
     *   <li>Look the subscription up in the store. If it exists with a different selector it
     *       is deleted from the store and removed from the consumer list; otherwise it is
     *       added to the consumer list if missing.</li>
     *   <li>If there is no (matching) stored subscription, create one in the store and list
     *       the subscription as a consumer.</li>
     *   <li>If the subscription requires recovery, replay stored messages that match it.</li>
     * </ol>
     * Does nothing when the topic has no store.
     *
     * <p>The write lock is released exactly once, in {@code finally}. Releasing it a second
     * time would throw {@link IllegalMonitorStateException} on every normal completion.
     *
     * @param connectionContext        context of the activating connection (not used here)
     * @param durableTopicSubscription subscription being activated
     * @throws Exception if the store or the subscription fails
     */
    public void activate(ConnectionContext connectionContext, final DurableTopicSubscription durableTopicSubscription) throws Exception {
        this.dispatchLock.writeLock().lock();
        try {
            if (this.topicStore == null) {
                return;
            }
            String clientId = durableTopicSubscription.getSubscriptionKey().getClientId();
            String subscriptionName = durableTopicSubscription.getSubscriptionKey().getSubscriptionName();
            String selector = durableTopicSubscription.getConsumerInfo().getSelector();
            SubscriptionInfo lookupSubscription = this.topicStore.lookupSubscription(clientId, subscriptionName);
            if (lookupSubscription != null) {
                String storedSelector = lookupSubscription.getSelector();
                // Null-safe inequality: differs if exactly one is null or both are set and unequal.
                boolean selectorChanged = storedSelector == null ? selector != null : !storedSelector.equals(selector);
                if (selectorChanged) {
                    // The stored subscription is stale; drop it so it is recreated below.
                    this.topicStore.deleteSubscription(clientId, subscriptionName);
                    lookupSubscription = null;
                    synchronized (this.consumers) {
                        this.consumers.remove(durableTopicSubscription);
                    }
                } else {
                    synchronized (this.consumers) {
                        if (!this.consumers.contains(durableTopicSubscription)) {
                            this.consumers.add(durableTopicSubscription);
                        }
                    }
                }
            }
            if (lookupSubscription == null) {
                SubscriptionInfo subscriptionInfo = new SubscriptionInfo();
                subscriptionInfo.setClientId(clientId);
                subscriptionInfo.setSelector(selector);
                subscriptionInfo.setSubscriptionName(subscriptionName);
                subscriptionInfo.setDestination(getActiveMQDestination());
                subscriptionInfo.setSubscribedDestination(durableTopicSubscription.getConsumerInfo().getDestination());
                synchronized (this.consumers) {
                    this.consumers.add(durableTopicSubscription);
                    this.topicStore.addSubscription(subscriptionInfo, durableTopicSubscription.getConsumerInfo().isRetroactive());
                }
            }
            final NonCachedMessageEvaluationContext nonCachedMessageEvaluationContext = new NonCachedMessageEvaluationContext();
            nonCachedMessageEvaluationContext.setDestination(this.destination);
            if (durableTopicSubscription.isRecoveryRequired()) {
                this.topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
                    @Override // org.apache.activemq.store.MessageRecoveryListener
                    public boolean recoverMessage(Message message) throws Exception {
                        message.setRegionDestination(Topic.this);
                        try {
                            nonCachedMessageEvaluationContext.setMessageReference(message);
                            if (durableTopicSubscription.matches(message, nonCachedMessageEvaluationContext)) {
                                durableTopicSubscription.add(message);
                            }
                            return true;
                        } catch (IOException e) {
                            // A single unreadable message must not abort the recovery run.
                            Topic.LOG.error("Failed to recover this message {}", message, e);
                            return true;
                        }
                    }

                    @Override // org.apache.activemq.store.MessageRecoveryListener
                    public boolean recoverMessageReference(MessageId messageId) throws Exception {
                        throw new RuntimeException("Should not be called.");
                    }

                    @Override // org.apache.activemq.store.MessageRecoveryListener
                    public boolean hasSpace() {
                        return true;
                    }

                    @Override // org.apache.activemq.store.MessageRecoveryListener
                    public boolean isDuplicate(MessageId messageId) {
                        return false;
                    }
                });
            }
        } finally {
            this.dispatchLock.writeLock().unlock();
        }
    }

    /**
     * Takes a durable subscription out of the consumer list and then tells the
     * subscription to remove this destination, handing over the given message references.
     *
     * @param connectionContext        context of the deactivating connection
     * @param durableTopicSubscription subscription being deactivated
     * @param list                     message references forwarded unchanged to the subscription
     * @throws Exception if the subscription's remove call fails
     */
    public void deactivate(ConnectionContext connectionContext, DurableTopicSubscription durableTopicSubscription, List<MessageReference> list) throws Exception {
        synchronized (consumers) {
            consumers.remove(durableTopicSubscription);
        }
        durableTopicSubscription.remove(connectionContext, this, list);
    }

    /**
     * Runs the subscription recovery policy for the given subscription, but only when its
     * consumer info marks it as retroactive.
     *
     * @param connectionContext context used for the recovery
     * @param subscription      subscription that may need recovered messages
     * @throws Exception if the recovery policy fails
     */
    public void recoverRetroactiveMessages(ConnectionContext connectionContext, Subscription subscription) throws Exception {
        boolean retroactive = subscription.getConsumerInfo().isRetroactive();
        if (!retroactive) {
            return;
        }
        this.subscriptionRecoveryPolicy.recover(connectionContext, this, subscription);
    }

    /**
     * Accepts a message from a producer and either expires it, defers it, blocks for
     * memory, rejects it, or sends it on via {@link #doMessageSend}.
     *
     * <p>Outline:
     * <ol>
     *   <li>An already expired message is reported to the broker and counted, optionally
     *       acknowledged to the producer, and not sent.</li>
     *   <li>If memory usage is full and producer flow control is enabled on both the
     *       destination and the connection, the send is rejected with a
     *       {@link ResourceAllocationException} (send-fail-if-no-space, non-network
     *       connections), queued in {@code messagesWaitingForSpace} (producer has a window or
     *       wants a response), or the calling thread waits for space.</li>
     *   <li>Otherwise the message is sent, {@code messageDelivered} is invoked and a
     *       {@link ProducerAck} is dispatched when applicable.</li>
     * </ol>
     *
     * @param producerBrokerExchange exchange describing the sending producer
     * @param message                message to send
     * @throws Exception on rejection, on an aborted wait, or if the send itself fails
     */
    @Override // org.apache.activemq.broker.region.Destination
    public void send(final ProducerBrokerExchange producerBrokerExchange, final Message message) throws Exception {
        final ConnectionContext connectionContext = producerBrokerExchange.getConnectionContext();
        final ProducerInfo info = producerBrokerExchange.getProducerState().getInfo();
        producerBrokerExchange.incrementSend();
        // z == "reply with a ProducerAck": true only when the message wants no response,
        // the producer has a positive window size, and the connection is not in recovery mode.
        final boolean z = (message.isResponseRequired() || info.getWindowSize() <= 0 || connectionContext.isInRecoveryMode()) ? false : true;
        message.setRegionDestination(this);
        if (message.isExpired()) {
            // Expired before it was ever sent: report and count it, ack if requested, do not send.
            this.broker.messageExpired(connectionContext, message, null);
            getDestinationStatistics().getExpired().increment();
            if (z) {
                connectionContext.getConnection().dispatchAsync(new ProducerAck(info.getProducerId(), message.getSize()));
                return;
            }
            return;
        }
        if (this.memoryUsage.isFull()) {
            isFull(connectionContext, this.memoryUsage);
            fastProducer(connectionContext, info);
            if (isProducerFlowControl() && connectionContext.isProducerFlowControl()) {
                if (isFlowControlLogRequired()) {
                    LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", getActiveMQDestination().getQualifiedName(), Long.valueOf(this.memoryUsage.getLimit()));
                }
                // Configured to fail fast: reject instead of blocking (network connections are exempt).
                if (!connectionContext.isNetworkConnection() && this.systemUsage.isSendFailIfNoSpace()) {
                    throw new ResourceAllocationException("Usage Manager memory limit (" + this.memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + ". See http://activemq.apache.org/producer-flow-control.html for more info");
                }
                // The producer either has a window or waits for a response, so the send can be
                // parked and completed later from iterate() without blocking this thread.
                if (info.getWindowSize() > 0 || message.isResponseRequired()) {
                    synchronized (this.messagesWaitingForSpace) {
                        this.messagesWaitingForSpace.add(new Runnable() { // from class: org.apache.activemq.broker.region.Topic.3
                            @Override // java.lang.Runnable
                            public void run() {
                                try {
                                    // The message may have expired while it was parked.
                                    if (message.isExpired()) {
                                        Topic.this.broker.messageExpired(connectionContext, message, null);
                                        Topic.this.getDestinationStatistics().getExpired().increment();
                                    } else {
                                        Topic.this.doMessageSend(producerBrokerExchange, message);
                                    }
                                    if (z) {
                                        connectionContext.getConnection().dispatchAsync(new ProducerAck(info.getProducerId(), message.getSize()));
                                    } else {
                                        // The normal response was suppressed below, so send it here.
                                        Response response = new Response();
                                        response.setCorrelationId(message.getCommandId());
                                        connectionContext.getConnection().dispatchAsync(response);
                                    }
                                } catch (Exception e) {
                                    // Report the failure to the producer unless it only expects
                                    // ProducerAcks or the connection is in recovery mode.
                                    if (z || connectionContext.isInRecoveryMode()) {
                                        return;
                                    }
                                    ExceptionResponse exceptionResponse = new ExceptionResponse(e);
                                    exceptionResponse.setCorrelationId(message.getCommandId());
                                    connectionContext.getConnection().dispatchAsync(exceptionResponse);
                                }
                            }
                        });
                        registerCallbackForNotFullNotification();
                        // The parked runnable dispatches the response itself.
                        connectionContext.setDontSendReponse(true);
                    }
                    return;
                }
                // No window and no response expected: block the calling thread until there is space.
                if (this.memoryUsage.isFull()) {
                    if (connectionContext.isInTransaction()) {
                        // i counts consecutive one-second waits; a warning is logged on every fourth one.
                        int i = 0;
                        while (!this.memoryUsage.waitForSpace(1000L)) {
                            if (connectionContext.getStopping().get()) {
                                throw new IOException("Connection closed, send aborted.");
                            }
                            if (i > 2 && connectionContext.isInTransaction()) {
                                i = 0;
                                LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", Integer.valueOf(connectionContext.getTransaction().size()), message);
                            }
                            i++;
                        }
                    } else {
                        waitForSpace(connectionContext, producerBrokerExchange, this.memoryUsage, "Usage Manager Memory Usage limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + ". See http://activemq.apache.org/producer-flow-control.html for more info");
                    }
                }
                // The message may have expired while this thread was waiting.
                if (message.isExpired()) {
                    getDestinationStatistics().getExpired().increment();
                    LOG.debug("Expired message: {}", message);
                    return;
                }
            }
        }
        doMessageSend(producerBrokerExchange, message);
        messageDelivered(connectionContext, message);
        if (z) {
            connectionContext.getConnection().dispatchAsync(new ProducerAck(info.getProducerId(), message.getSize()));
        }
    }

    /**
     * Stores (when required) and dispatches a message.
     *
     * <p>A persistent message is written to the store asynchronously unless there are no
     * durable subscribers; if the store is above its high-water mark the send is rejected or
     * the caller waits for space. Within a transaction the dispatch is postponed to
     * {@code afterCommit}; otherwise it happens immediately. Finally the method waits for the
     * asynchronous store write, if any, to complete.
     *
     * <p>Reference counting: the count is incremented once before dispatch and must be
     * decremented exactly once on every path (immediate dispatch, commit with or without
     * expiry, rollback). The transactional commit path therefore uses a single
     * {@code finally}, which covers both the expired branch and the dispatch branch.
     *
     * @param producerBrokerExchange exchange describing the sending producer
     * @param message                message to store and dispatch
     * @throws IOException if the store fails
     * @throws Exception   if the send is rejected or the dispatch fails
     */
    synchronized void doMessageSend(ProducerBrokerExchange producerBrokerExchange, final Message message) throws IOException, Exception {
        final ConnectionContext connectionContext = producerBrokerExchange.getConnectionContext();
        message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
        ListenableFuture<Object> listenableFuture = null;
        if (this.topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
            if (this.systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
                String str = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of " + this.systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + ". See http://activemq.apache.org/producer-flow-control.html for more info";
                if (!connectionContext.isNetworkConnection() && this.systemUsage.isSendFailIfNoSpace()) {
                    throw new ResourceAllocationException(str);
                }
                waitForSpace(connectionContext, producerBrokerExchange, this.systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), str);
            }
            listenableFuture = this.topicStore.asyncAddTopicMessage(connectionContext, message, isOptimizeStorage());
        }
        message.incrementReferenceCount();
        if (connectionContext.isInTransaction()) {
            connectionContext.getTransaction().addSynchronization(new Synchronization() {
                @Override // org.apache.activemq.transaction.Synchronization
                public void afterCommit() throws Exception {
                    try {
                        // The commit may arrive long after the send; the message may have expired meanwhile.
                        if (message.isExpired()) {
                            if (Topic.this.broker.isExpired(message)) {
                                Topic.this.getDestinationStatistics().getExpired().increment();
                                Topic.this.broker.messageExpired(connectionContext, message, null);
                            }
                        } else {
                            Topic.this.dispatch(connectionContext, message);
                        }
                    } finally {
                        // Balances the increment above exactly once, whichever branch ran.
                        message.decrementReferenceCount();
                    }
                }

                @Override // org.apache.activemq.transaction.Synchronization
                public void afterRollback() throws Exception {
                    message.decrementReferenceCount();
                }
            });
        } else {
            try {
                dispatch(connectionContext, message);
            } finally {
                message.decrementReferenceCount();
            }
        }
        if (listenableFuture != null && !listenableFuture.isCancelled()) {
            try {
                listenableFuture.get();
            } catch (CancellationException ignored) {
                // The store write was cancelled between the check and get(); there is nothing to wait for.
            }
        }
    }

    /**
     * Tells whether storing a message can be skipped.
     *
     * @return {@code true} when no durable subscriber is registered
     */
    private boolean canOptimizeOutPersistence() {
        return this.durableSubscribers.isEmpty();
    }

    /**
     * Describes this topic by its physical destination name and current consumer count.
     *
     * @return a short diagnostic string
     */
    @Override
    public String toString() {
        String physicalName = this.destination.getPhysicalName();
        int subscriptionCount = this.consumers.size();
        return "Topic: destination=" + physicalName + ", subscriptions=" + subscriptionCount;
    }

    /**
     * Acknowledges a message for a subscription. When the topic has a store and the message
     * is persistent, the acknowledgement is recorded in the store under the subscription's
     * key (the subscription is cast to {@link DurableTopicSubscription}); in every case
     * {@code messageConsumed} is invoked afterwards.
     *
     * @param connectionContext context of the acknowledging connection
     * @param subscription      subscription that consumed the message
     * @param messageAck        acknowledgement received
     * @param messageReference  message being acknowledged
     * @throws IOException if the store fails
     */
    @Override // org.apache.activemq.broker.region.Destination
    public void acknowledge(ConnectionContext connectionContext, Subscription subscription, MessageAck messageAck, MessageReference messageReference) throws IOException {
        boolean recordInStore = this.topicStore != null && messageReference.isPersistent();
        if (recordInStore) {
            DurableTopicSubscription durable = (DurableTopicSubscription) subscription;
            SubscriptionKey key = durable.getSubscriptionKey();
            this.topicStore.acknowledge(
                    connectionContext,
                    key.getClientId(),
                    key.getSubscriptionName(),
                    messageReference.getMessageId(),
                    convertToNonRangedAck(messageAck, messageReference));
        }
        messageConsumed(connectionContext, messageReference);
    }

    /**
     * Garbage-collection hook of the destination contract; this implementation does nothing.
     */
    @Override // org.apache.activemq.broker.region.Destination
    public void gc() {
        // Intentionally empty.
    }

    /**
     * Loads a message from the store.
     *
     * @param messageId id of the message to load
     * @return whatever the store returns for the id, or {@code null} when this topic has no store
     * @throws IOException if the store fails
     */
    public Message loadMessage(MessageId messageId) throws IOException {
        return this.topicStore == null ? null : this.topicStore.getMessage(messageId);
    }

    /**
     * Starts the recovery policy and the memory usage tracker, then schedules the periodic
     * expiry task unless the expiry period is not positive or this is an advisory topic.
     *
     * @throws Exception if a component fails to start
     */
    @Override // org.apache.activemq.Service
    public void start() throws Exception {
        this.subscriptionRecoveryPolicy.start();
        if (this.memoryUsage != null) {
            this.memoryUsage.start();
        }
        boolean expiryEnabled = getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination());
        if (expiryEnabled) {
            this.scheduler.executePeriodically(this.expireMessagesTask, getExpireMessagesPeriod());
        }
    }

    /**
     * Shuts this topic down: task runner, recovery policy, memory usage tracker and store
     * are stopped in that order (each optional component only when present), and the
     * periodic expiry task is cancelled last.
     *
     * @throws Exception if a component fails to stop
     */
    @Override // org.apache.activemq.Service
    public void stop() throws Exception {
        if (taskRunner != null) {
            taskRunner.shutdown();
        }
        subscriptionRecoveryPolicy.stop();
        if (memoryUsage != null) {
            memoryUsage.stop();
        }
        if (topicStore != null) {
            topicStore.stop();
        }
        scheduler.cancel(expireMessagesTask);
    }

    /**
     * Browses this topic, collecting at most the maximum browse page size of messages.
     *
     * @return the collected messages as an array (empty when nothing was found)
     */
    @Override // org.apache.activemq.broker.region.Destination
    public Message[] browse() {
        List<Message> collected = new ArrayList<>();
        doBrowse(collected, getMaxBrowsePageSize());
        return collected.toArray(new Message[collected.size()]);
    }

    /**
     * Fills {@code list} with messages from the store and from the recovery policy, up to
     * {@code i} entries, and expires stored messages that have run out of time.
     *
     * <p>Every expired message found in the store is expired on behalf of each inactive
     * durable subscription. Nothing happens when the topic has no store. Failures are logged
     * and never propagated.
     *
     * @param list receives the browsed messages
     * @param i    upper bound on the size of {@code list}
     */
    public void doBrowse(final List<Message> list, final int i) {
        try {
            if (this.topicStore == null) {
                return;
            }
            final List<Message> expired = new ArrayList<>();
            this.topicStore.recover(new MessageRecoveryListener() {
                @Override // org.apache.activemq.store.MessageRecoveryListener
                public boolean recoverMessage(Message message) throws Exception {
                    if (message.isExpired()) {
                        expired.add(message);
                    }
                    list.add(message);
                    return true;
                }

                @Override // org.apache.activemq.store.MessageRecoveryListener
                public boolean recoverMessageReference(MessageId messageId) throws Exception {
                    return true;
                }

                @Override // org.apache.activemq.store.MessageRecoveryListener
                public boolean hasSpace() {
                    return list.size() < i;
                }

                @Override // org.apache.activemq.store.MessageRecoveryListener
                public boolean isDuplicate(MessageId messageId) {
                    return false;
                }
            });

            ConnectionContext expiryContext = createConnectionContext();
            for (Message expiredMessage : expired) {
                for (DurableTopicSubscription durable : this.durableSubscribers.values()) {
                    if (durable.isActive()) {
                        continue;
                    }
                    expiredMessage.setRegionDestination(this);
                    messageExpired(expiryContext, durable, expiredMessage);
                }
            }

            Message[] retained = this.subscriptionRecoveryPolicy.browse(getActiveMQDestination());
            if (retained != null) {
                for (Message candidate : retained) {
                    if (list.size() >= i) {
                        break;
                    }
                    list.add(candidate);
                }
            }
        } catch (Throwable th) {
            LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), th);
        }
    }

    /**
     * Task body run by the task runner: executes deferred sends, oldest first, for as long
     * as memory usage is not full. If sends remain afterwards, the not-full callback is
     * registered again.
     *
     * @return always {@code false}
     */
    @Override // org.apache.activemq.thread.Task
    public boolean iterate() {
        synchronized (this.messagesWaitingForSpace) {
            while (!this.memoryUsage.isFull() && !this.messagesWaitingForSpace.isEmpty()) {
                Runnable deferredSend = this.messagesWaitingForSpace.removeFirst();
                deferredSend.run();
            }
            boolean backlogRemains = !this.messagesWaitingForSpace.isEmpty();
            if (backlogRemains) {
                registerCallbackForNotFullNotification();
            }
        }
        return false;
    }

    /**
     * Asks the memory usage tracker to run the wake-up task once it is no longer full.
     * If the tracker declines the registration (returns {@code false}), the task is run
     * right away on the calling thread.
     */
    private void registerCallbackForNotFullNotification() {
        boolean registered = this.memoryUsage.notifyCallbackWhenNotFull(this.sendMessagesWaitingForSpaceTask);
        if (!registered) {
            this.sendMessagesWaitingForSpaceTask.run();
        }
    }

    /**
     * @return the policy currently used to dispatch messages to consumers
     */
    public DispatchPolicy getDispatchPolicy() {
        return dispatchPolicy;
    }

    /**
     * Replaces the policy used to dispatch messages to consumers.
     *
     * @param dispatchPolicy new dispatch policy
     */
    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
        // The parameter shadows the field, so the qualifier is required here.
        this.dispatchPolicy = dispatchPolicy;
    }

    /**
     * @return the recovery policy currently held by this topic (the wrapper itself when a
     *         retained-message wrapper is installed)
     */
    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
        return subscriptionRecoveryPolicy;
    }

    /**
     * Installs a recovery policy. When the current policy is a
     * {@link RetainedMessageSubscriptionRecoveryPolicy}, the new policy is set as its wrapped
     * delegate; otherwise the current policy is replaced.
     *
     * @param subscriptionRecoveryPolicy policy to install
     */
    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
        // instanceof is false for null, so no separate null check is needed.
        if (this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) {
            RetainedMessageSubscriptionRecoveryPolicy wrapper = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy;
            wrapper.setWrapped(subscriptionRecoveryPolicy);
        } else {
            this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
        }
    }

    /**
     * Wake-up hook of the destination contract; this implementation does nothing.
     */
    @Override // org.apache.activemq.broker.region.Destination
    public final void wakeup() {
        // Intentionally empty.
    }

    /**
     * Dispatches a message to the current consumers.
     *
     * <p>Updates the enqueue and message-size statistics, then, under the dispatch read lock,
     * offers the message to the recovery policy. If the policy refuses it, nothing further
     * happens. Otherwise the message goes to the dispatch policy; when there are no
     * consumers, or the policy reports that nobody took the message,
     * {@code onMessageWithNoConsumers} is invoked.
     *
     * <p>The read lock is released exactly once, in {@code finally}; a second unlock would
     * throw {@link IllegalMonitorStateException}. The connection's evaluation context is
     * cleared in {@code finally} as well, so it is also reset when dispatching throws.
     *
     * @param connectionContext context of the sending connection
     * @param message           message to dispatch
     * @throws Exception if the recovery policy, the dispatch policy or the no-consumer hook fails
     */
    protected void dispatch(ConnectionContext connectionContext, Message message) throws Exception {
        this.destinationStatistics.getEnqueues().increment();
        this.destinationStatistics.getMessageSize().addSize(message.getSize());
        MessageEvaluationContext evaluationContext = null;
        this.dispatchLock.readLock().lock();
        try {
            if (!this.subscriptionRecoveryPolicy.add(connectionContext, message)) {
                return;
            }
            synchronized (this.consumers) {
                if (this.consumers.isEmpty()) {
                    onMessageWithNoConsumers(connectionContext, message);
                    return;
                }
                evaluationContext = connectionContext.getMessageEvaluationContext();
                evaluationContext.setDestination(this.destination);
                evaluationContext.setMessageReference(message);
                if (!this.dispatchPolicy.dispatch(message, evaluationContext, this.consumers)) {
                    onMessageWithNoConsumers(connectionContext, message);
                }
            }
        } finally {
            this.dispatchLock.readLock().unlock();
            if (evaluationContext != null) {
                evaluationContext.clear();
            }
        }
    }

    /**
     * Handles expiry of a message for the given subscription.
     *
     * <p>Notifies the broker, bumps the expired counter, then acknowledges the
     * message on behalf of the subscription. For a {@link DurableTopicSubscription}
     * the message is removed from its pending set before the acknowledgement.
     * Any exception raised while doing so is logged and not propagated.
     *
     * @param connectionContext context used for the broker notification and the ack
     * @param subscription the subscription the message expired for
     * @param messageReference the expired message
     */
    @Override // org.apache.activemq.broker.region.Destination
    public void messageExpired(ConnectionContext connectionContext, Subscription subscription, MessageReference messageReference) {
        this.broker.messageExpired(connectionContext, messageReference, subscription);
        this.destinationStatistics.getExpired().increment();

        final MessageAck expiryAck = new MessageAck();
        // NOTE(review): raw ack-type code 2; presumably a named MessageAck constant - confirm.
        expiryAck.setAckType((byte) 2);
        expiryAck.setDestination(this.destination);
        expiryAck.setMessageID(messageReference.getMessageId());

        try {
            if (subscription instanceof DurableTopicSubscription) {
                final DurableTopicSubscription durable = (DurableTopicSubscription) subscription;
                durable.removePending(messageReference);
            }
            acknowledge(connectionContext, subscription, expiryAck, messageReference);
        } catch (Exception failure) {
            LOG.error("Failed to remove expired Message from the store ", failure);
        }
    }

    /**
     * Returns the logger used by this class.
     *
     * <p>Decompiler note (JADX): access modifiers changed from: protected.
     *
     * @return the shared {@code LOG} instance
     */
    @Override // org.apache.activemq.broker.region.BaseDestination
    public Logger getLog() {
        return LOG;
    }

    /**
     * Decides whether message storage may be optimized for this destination.
     *
     * <p>The answer is {@code true} only when {@code isDoOptimzeMessageStorage()} is
     * enabled, at least one durable subscriber exists, and every durable subscriber
     * visited is active, has a non-zero prefetch size, is not flagged as a slow
     * consumer, and has an in-flight usage not exceeding
     * {@code getOptimizeMessageStoreInFlightLimit()}.
     *
     * @return {@code true} if all of the above conditions hold, {@code false} otherwise
     */
    protected boolean isOptimizeStorage() {
        if (!isDoOptimzeMessageStorage() || this.durableSubscribers.isEmpty()) {
            return false;
        }
        for (DurableTopicSubscription subscriber : this.durableSubscribers.values()) {
            // Checks are evaluated in this order and stop at the first one that disqualifies.
            final boolean disqualified = !subscriber.isActive()
                    || subscriber.getPrefetchSize() == 0
                    || subscriber.isSlowConsumer()
                    || subscriber.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit();
            if (disqualified) {
                return false;
            }
        }
        return true;
    }

    /**
     * Clears the pending messages of every durable subscriber and re-triggers
     * dispatch for each of them, while holding the dispatch read lock.
     */
    @Override // org.apache.activemq.broker.region.Destination
    public void clearPendingMessages() {
        this.dispatchLock.readLock().lock();
        try {
            for (DurableTopicSubscription subscriber : this.durableSubscribers.values()) {
                clearPendingAndDispatch(subscriber);
            }
        } finally {
            this.dispatchLock.readLock().unlock();
        }
    }

    /**
     * Clears the pending cursor of one durable subscription and asks it to dispatch
     * again, all under that subscription's {@code pendingLock}.
     *
     * <p>An {@link IOException} from {@code dispatchPending()} is logged as a warning
     * and not propagated.
     *
     * @param durableTopicSubscription the subscription whose pending messages are cleared
     */
    private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
        synchronized (durableTopicSubscription.pendingLock) {
            durableTopicSubscription.pending.clear();
            try {
                durableTopicSubscription.dispatchPending();
            } catch (IOException e) {
                // Pass ONE flat argument array with the exception last. The previous call
                // passed (Object[], Throwable) as two separate arguments, so the whole
                // array was substituted into the first "{}" and the remaining two
                // placeholders were left unfilled.
                // NOTE(review): assumes LOG is an SLF4J-style logger that treats a trailing
                // Throwable in the argument array as the exception - confirm.
                LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}", new Object[]{durableTopicSubscription, this.destination, durableTopicSubscription.pending, e});
            }
        }
    }

    /**
     * Rolls back the given message id on the pending cursor of every durable
     * subscriber, while holding the dispatch read lock.
     *
     * @param messageId id of the message to roll back
     */
    private void rollback(MessageId messageId) {
        this.dispatchLock.readLock().lock();
        try {
            for (DurableTopicSubscription subscriber : this.durableSubscribers.values()) {
                subscriber.getPending().rollback(messageId);
            }
        } finally {
            this.dispatchLock.readLock().unlock();
        }
    }

    /**
     * Returns the durable subscribers of this destination, keyed by subscription key.
     *
     * <p>The internal map itself is returned, not a copy.
     *
     * @return the {@code durableSubscribers} map held by this instance
     */
    public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
        return this.durableSubscribers;
    }
}
