package org.apache.activemq.store.amq;

import java.io.IOException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/activemq-core-5.5.1-fuse-cluster-SNAPSHOT.jar:org/apache/activemq/store/amq/AMQTopicMessageStore.class */
/**
 * {@link TopicMessageStore} built on top of {@link AMQMessageStore}.
 *
 * <p>Subscription and acknowledgement commands are written through the persistence adapter
 * ({@code writeCommand}), while subscription bookkeeping and message recovery are delegated to the
 * wrapped {@link TopicReferenceStore}.
 */
public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessageStore {
    private static final Logger LOG = LoggerFactory.getLogger(AMQTopicMessageStore.class);
    private final TopicReferenceStore topicReferenceStore;

    /**
     * Recovery listener that only counts the messages it is offered. When the subscription carries a
     * selector, a message is counted only if the selector matches it.
     */
    private static final class MessageCounter implements MessageRecoveryListener {
        int count = 0;
        SubscriptionInfo info;
        BooleanExpression selectorExpression;
        TopicMessageStore store;

        /**
         * @param subscriptionInfo subscription whose selector (if any) filters the count; may be {@code null}
         * @param topicMessageStore store used to load messages for selector evaluation
         * @throws Exception if the subscription's selector cannot be parsed
         */
        public MessageCounter(SubscriptionInfo subscriptionInfo, TopicMessageStore topicMessageStore) throws Exception {
            this.info = subscriptionInfo;
            if (subscriptionInfo != null) {
                String selector = subscriptionInfo.getSelector();
                if (selector != null) {
                    this.selectorExpression = SelectorParser.parse(selector);
                }
            }
            this.store = topicMessageStore;
        }

        /** Counts the referenced message if it passes the selector; always asks for recovery to continue. */
        @Override // org.apache.activemq.store.MessageRecoveryListener
        public boolean recoverMessageReference(MessageId messageId) throws Exception {
            countIfSelected(messageId);
            return true;
        }

        /** Counts the message if it passes the selector; always asks for recovery to continue. */
        @Override // org.apache.activemq.store.MessageRecoveryListener
        public boolean recoverMessage(Message message) throws Exception {
            if (this.selectorExpression == null) {
                // No selector: count without touching the message at all.
                this.count++;
                return true;
            }
            // The selector is evaluated against the copy loaded from the store, not the passed instance.
            countIfSelected(message.getMessageId());
            return true;
        }

        /** Increments the count when there is no selector or the stored message matches it. */
        private void countIfSelected(MessageId messageId) throws Exception {
            if (this.selectorExpression == null) {
                this.count++;
                return;
            }
            MessageEvaluationContext evaluationContext = new MessageEvaluationContext();
            evaluationContext.setMessageReference(this.store.getMessage(messageId));
            if (this.selectorExpression.matches(evaluationContext)) {
                this.count++;
            }
        }

        /** Never reports duplicates. */
        @Override // org.apache.activemq.store.MessageRecoveryListener
        public boolean isDuplicate(MessageId messageId) {
            return false;
        }

        /** Always reports available space so that recovery is never cut short. */
        @Override // org.apache.activemq.store.MessageRecoveryListener
        public boolean hasSpace() {
            return true;
        }
    }

    /**
     * @param aMQPersistenceAdapter adapter handed to the superclass
     * @param topicReferenceStore reference store that all subscription operations are delegated to
     * @param activeMQTopic topic handed to the superclass as this store's destination
     */
    public AMQTopicMessageStore(AMQPersistenceAdapter aMQPersistenceAdapter, TopicReferenceStore topicReferenceStore, ActiveMQTopic activeMQTopic) {
        super(aMQPersistenceAdapter, topicReferenceStore, activeMQTopic);
        this.topicReferenceStore = topicReferenceStore;
    }

    /**
     * Flushes this store, then replays the subscription's messages from the reference store into
     * {@code messageRecoveryListener} (wrapped in a {@link RecoveryListenerAdapter}).
     */
    @Override // org.apache.activemq.store.TopicMessageStore
    public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener messageRecoveryListener) throws Exception {
        flush();
        this.topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, messageRecoveryListener));
    }

    /**
     * Recovers the next batch of messages for a subscription. If the first attempt delivers nothing,
     * the store is flushed and the recovery is tried once more.
     *
     * @param maxReturned forwarded unchanged to the reference store (presumably the batch size limit;
     *        confirm against {@link TopicMessageStore})
     */
    @Override // org.apache.activemq.store.TopicMessageStore
    public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener messageRecoveryListener) throws Exception {
        RecoveryListenerAdapter adapter = new RecoveryListenerAdapter(this, messageRecoveryListener);
        this.topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, adapter);
        if (adapter.size() == 0) {
            flush();
            this.topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, adapter);
        }
    }

    /** Delegates the subscription lookup to the reference store. */
    @Override // org.apache.activemq.store.TopicMessageStore
    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
        return this.topicReferenceStore.lookupSubscription(clientId, subscriptionName);
    }

    /**
     * Writes the subscription through the persistence adapter, then registers it with the reference store.
     *
     * @param retroactive forwarded unchanged to the reference store (meaning defined by
     *        {@link TopicReferenceStore}; confirm there)
     */
    @Override // org.apache.activemq.store.TopicMessageStore
    public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
        this.peristenceAdapter.writeCommand(subscriptionInfo, false);
        this.topicReferenceStore.addSubsciption(subscriptionInfo, retroactive);
    }

    /**
     * Writes a {@link JournalTopicAck} for the message and then applies the acknowledgement.
     *
     * <p>Outside a transaction the acknowledgement is applied immediately. Inside a transaction the
     * written location is tracked in {@code inFlightTxLocations}, the ack is registered with the
     * transaction store, and a {@link Synchronization} applies it after commit or just drops the
     * in-flight location after rollback.
     *
     * @param messageAck not used by this implementation
     * @throws IOException if writing the command or applying the acknowledgement fails
     */
    @Override // org.apache.activemq.store.TopicMessageStore
    public void acknowledge(final ConnectionContext connectionContext, final String clientId, final String subscriptionName, final MessageId messageId, MessageAck messageAck) throws IOException {
        final boolean isDebugEnabled = LOG.isDebugEnabled();
        JournalTopicAck journalTopicAck = new JournalTopicAck();
        journalTopicAck.setDestination(this.destination);
        journalTopicAck.setMessageId(messageId);
        journalTopicAck.setMessageSequenceId(messageId.getBrokerSequenceId());
        journalTopicAck.setSubscritionName(subscriptionName);
        journalTopicAck.setClientId(clientId);
        journalTopicAck.setTransactionId(connectionContext.getTransaction() != null ? connectionContext.getTransaction().getTransactionId() : null);
        final Location location = this.peristenceAdapter.writeCommand(journalTopicAck, false);

        if (!connectionContext.isInTransaction()) {
            if (isDebugEnabled) {
                LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
            }
            acknowledge(connectionContext, messageId, location, clientId, subscriptionName);
            return;
        }

        if (isDebugEnabled) {
            LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
        }
        // Only the in-flight bookkeeping is guarded; the lock is released exactly once via finally so
        // that a failure in the calls below cannot trigger a second unlock().
        this.lock.lock();
        try {
            this.inFlightTxLocations.add(location);
        } finally {
            this.lock.unlock();
        }
        this.transactionStore.acknowledge(this, journalTopicAck, location);
        connectionContext.getTransaction().addSynchronization(new Synchronization() { // from class: org.apache.activemq.store.amq.AMQTopicMessageStore.1
            @Override // org.apache.activemq.transaction.Synchronization
            public void afterCommit() throws Exception {
                if (isDebugEnabled) {
                    AMQTopicMessageStore.LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
                }
                // The lock stays held across the nested acknowledge(...) call, which locks again itself.
                AMQTopicMessageStore.this.lock.lock();
                try {
                    AMQTopicMessageStore.this.inFlightTxLocations.remove(location);
                    AMQTopicMessageStore.this.acknowledge(connectionContext, messageId, location, clientId, subscriptionName);
                } finally {
                    AMQTopicMessageStore.this.lock.unlock();
                }
            }

            @Override // org.apache.activemq.transaction.Synchronization
            public void afterRollback() throws Exception {
                if (isDebugEnabled) {
                    AMQTopicMessageStore.LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
                }
                AMQTopicMessageStore.this.lock.lock();
                try {
                    AMQTopicMessageStore.this.inFlightTxLocations.remove(location);
                } finally {
                    AMQTopicMessageStore.this.lock.unlock();
                }
            }
        });
    }

    /**
     * Re-applies an acknowledgement directly against the reference store.
     *
     * @return {@code true} if the subscription exists and the acknowledgement was applied;
     *         {@code false} if the subscription is unknown or the attempt failed for any reason
     */
    public boolean replayAcknowledge(ConnectionContext connectionContext, String clientId, String subscriptionName, MessageId messageId) {
        try {
            if (this.topicReferenceStore.lookupSubscription(clientId, subscriptionName) == null) {
                return false;
            }
            this.topicReferenceStore.acknowledge(connectionContext, clientId, subscriptionName, messageId, null);
            return true;
        } catch (Throwable th) {
            // Best effort: any failure is reported as "not replayed" rather than propagated.
            if (LOG.isDebugEnabled()) {
                LOG.debug("Could not replay acknowledge for message '" + messageId + "'.  Message may have already been acknowledged. reason: " + th);
            }
            return false;
        }
    }

    /**
     * Records {@code location} as the last location and acknowledges the message reference for the
     * subscription. When the reference store's {@code acknowledgeReference} returns {@code true},
     * the message is also removed from this store.
     *
     * @throws IOException if the reference store or the removal fails
     */
    protected void acknowledge(ConnectionContext connectionContext, MessageId messageId, Location location, String clientId, String subscriptionName) throws IOException {
        // Guard only the lastLocation update and release exactly once; the store calls below run
        // outside this acquisition, and a failure there must not cause a second unlock().
        this.lock.lock();
        try {
            this.lastLocation = location;
        } finally {
            this.lock.unlock();
        }
        if (this.topicReferenceStore.acknowledgeReference(connectionContext, clientId, subscriptionName, messageId)) {
            MessageAck removalAck = new MessageAck();
            removalAck.setLastMessageId(messageId);
            removeMessage(connectionContext, removalAck);
        }
    }

    /** @return the reference store this message store delegates to */
    public TopicReferenceStore getTopicReferenceStore() {
        return this.topicReferenceStore;
    }

    /** Delegates the subscription removal to the reference store. */
    @Override // org.apache.activemq.store.TopicMessageStore
    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
        this.topicReferenceStore.deleteSubscription(clientId, subscriptionName);
    }

    /** @return all subscriptions known to the reference store */
    @Override // org.apache.activemq.store.TopicMessageStore
    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        return this.topicReferenceStore.getAllSubscriptions();
    }

    /**
     * Counts the subscription's messages by flushing the store and replaying the subscription into a
     * counting listener; messages rejected by the subscription's selector are not counted.
     *
     * @throws IOException if flushing fails, or wrapping any exception raised during the count
     */
    @Override // org.apache.activemq.store.TopicMessageStore
    public int getMessageCount(String clientId, String subscriptionName) throws IOException {
        flush();
        try {
            MessageCounter counter = new MessageCounter(lookupSubscription(clientId, subscriptionName), this);
            this.topicReferenceStore.recoverSubscription(clientId, subscriptionName, counter);
            return counter.count;
        } catch (Exception e) {
            throw IOExceptionSupport.create(e);
        }
    }

    /** Delegates the batching reset for the subscription to the reference store. */
    @Override // org.apache.activemq.store.TopicMessageStore
    public void resetBatching(String clientId, String subscriptionName) {
        this.topicReferenceStore.resetBatching(clientId, subscriptionName);
    }
}
