package org.apache.activemq.store.journal;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:fuse-esb-7.1.0.fuse-SNAPSHOT/system/org/fusesource/fabric/fabric-rest/7.1.0.fuse-046/fabric-rest-7.1.0.fuse-046.war:WEB-INF/lib/activemq-core-5.7.0.fuse-71-046.jar:org/apache/activemq/store/journal/JournalTopicMessageStore.class
 */
/* loaded from: input_file:fuse-esb-7.1.0.fuse-SNAPSHOT/system/org/apache/activemq/activemq-core/5.7.0.fuse-71-046/activemq-core-5.7.0.fuse-71-046.jar:org/apache/activemq/store/journal/JournalTopicMessageStore.class */
/**
 * A {@link TopicMessageStore} that records subscription acknowledgements in the journal
 * (via the persistence adapter) and only applies them to a wrapped long-term
 * {@link TopicMessageStore} when {@link #checkpoint()} runs.
 *
 * <p>Between checkpoints, only the most recently acknowledged {@link MessageId} per
 * subscription is remembered. The pending-acknowledgement map and the inherited
 * {@code lastLocation} / {@code inFlightTxLocations} fields are accessed while holding
 * this instance's monitor.
 *
 * <p>Fields such as {@code peristenceAdapter}, {@code destination}, {@code transactionStore},
 * {@code transactionTemplate}, {@code lastLocation} and {@code inFlightTxLocations} are
 * inherited from {@link JournalMessageStore}.
 */
public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore {
    private static final Logger LOG = LoggerFactory.getLogger(JournalTopicMessageStore.class);

    /** The wrapped store that subscription operations are delegated to. */
    private final TopicMessageStore longTermStore;

    /**
     * Last acknowledged message id per subscription since the previous checkpoint.
     * Read and replaced only while synchronized on {@code this}.
     */
    private Map<SubscriptionKey, MessageId> ackedLastAckLocations;

    /**
     * Creates the store.
     *
     * @param journalPersistenceAdapter the adapter passed to the superclass constructor
     * @param topicMessageStore the long-term store that subscription calls are delegated to
     * @param activeMQTopic the topic passed to the superclass constructor
     */
    public JournalTopicMessageStore(JournalPersistenceAdapter journalPersistenceAdapter, TopicMessageStore topicMessageStore, ActiveMQTopic activeMQTopic) {
        super(journalPersistenceAdapter, topicMessageStore, activeMQTopic);
        this.ackedLastAckLocations = new HashMap<>();
        this.longTermStore = topicMessageStore;
    }

    /**
     * Runs {@code peristenceAdapter.checkpoint(true, true)} and then delegates recovery to the
     * long-term store with the same arguments.
     *
     * <p>NOTE(review): the checkpoint presumably makes journalled changes visible in the
     * long-term store before it is read; confirm against {@code JournalPersistenceAdapter}.
     */
    @Override
    public void recoverSubscription(String str, String str2, MessageRecoveryListener messageRecoveryListener) throws Exception {
        this.peristenceAdapter.checkpoint(true, true);
        this.longTermStore.recoverSubscription(str, str2, messageRecoveryListener);
    }

    /**
     * Runs {@code peristenceAdapter.checkpoint(true, true)} and then delegates to the
     * long-term store with the same arguments.
     */
    @Override
    public void recoverNextMessages(String str, String str2, int i, MessageRecoveryListener messageRecoveryListener) throws Exception {
        this.peristenceAdapter.checkpoint(true, true);
        this.longTermStore.recoverNextMessages(str, str2, i, messageRecoveryListener);
    }

    /**
     * Delegates directly to the long-term store; no checkpoint is performed first.
     *
     * @return whatever the long-term store returns for the given arguments
     */
    @Override
    public SubscriptionInfo lookupSubscription(String str, String str2) throws IOException {
        return this.longTermStore.lookupSubscription(str, str2);
    }

    /**
     * Runs {@code peristenceAdapter.checkpoint(true, true)} and then adds the subscription to
     * the long-term store.
     *
     * @param subscriptionInfo the subscription, forwarded unchanged
     * @param z flag forwarded unchanged to the long-term store
     */
    @Override
    public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean z) throws IOException {
        this.peristenceAdapter.checkpoint(true, true);
        this.longTermStore.addSubsciption(subscriptionInfo, z);
    }

    /** Delegates unchanged to the superclass implementation. */
    @Override
    public void addMessage(ConnectionContext connectionContext, Message message) throws IOException {
        super.addMessage(connectionContext, message);
    }

    /**
     * Writes a {@link JournalTopicAck} for the given subscription to the journal and records
     * the acknowledgement so the next {@link #checkpoint()} can apply it to the long-term store.
     *
     * <p>Outside a transaction the acknowledgement is recorded immediately. Inside a
     * transaction the journal location is tracked in {@code inFlightTxLocations}, the ack is
     * handed to the transaction store, and the acknowledgement is recorded only after commit;
     * on rollback the location is simply dropped from {@code inFlightTxLocations}.
     *
     * @param connectionContext supplies the current transaction, if any
     * @param str the client id set on the journal record
     * @param str2 the subscription name set on the journal record
     * @param messageId the id of the message being acknowledged
     * @param messageAck not used by this implementation
     * @throws IOException if a delegated call fails with it
     */
    @Override
    public void acknowledge(ConnectionContext connectionContext, String str, String str2, final MessageId messageId, MessageAck messageAck) throws IOException {
        JournalTopicAck journalTopicAck = new JournalTopicAck();
        journalTopicAck.setDestination(this.destination);
        journalTopicAck.setMessageId(messageId);
        journalTopicAck.setMessageSequenceId(messageId.getBrokerSequenceId());
        journalTopicAck.setSubscritionName(str2);
        journalTopicAck.setClientId(str);
        journalTopicAck.setTransactionId(connectionContext.getTransaction() != null ? connectionContext.getTransaction().getTransactionId() : null);

        final RecordLocation writeCommand = this.peristenceAdapter.writeCommand(journalTopicAck, false);
        final SubscriptionKey subscriptionKey = new SubscriptionKey(str, str2);

        if (!connectionContext.isInTransaction()) {
            LOG.debug("Journalled acknowledge for: {}, at: {}", messageId, writeCommand);
            acknowledge(messageId, writeCommand, subscriptionKey);
            return;
        }

        LOG.debug("Journalled transacted acknowledge for: {}, at: {}", messageId, writeCommand);
        synchronized (this) {
            this.inFlightTxLocations.add(writeCommand);
        }
        this.transactionStore.acknowledge(this, journalTopicAck, writeCommand);
        connectionContext.getTransaction().addSynchronization(new Synchronization() {
            @Override
            public void afterCommit() throws Exception {
                LOG.debug("Transacted acknowledge commit for: {}, at: {}", messageId, writeCommand);
                // Remove the in-flight marker and record the ack under one lock acquisition.
                synchronized (JournalTopicMessageStore.this) {
                    JournalTopicMessageStore.this.inFlightTxLocations.remove(writeCommand);
                    JournalTopicMessageStore.this.acknowledge(messageId, writeCommand, subscriptionKey);
                }
            }

            @Override
            public void afterRollback() throws Exception {
                LOG.debug("Transacted acknowledge rollback for: {}, at: {}", messageId, writeCommand);
                synchronized (JournalTopicMessageStore.this) {
                    JournalTopicMessageStore.this.inFlightTxLocations.remove(writeCommand);
                }
            }
        });
    }

    /**
     * Applies an acknowledgement directly to the long-term store, but only if the long-term
     * store still knows the subscription.
     *
     * <p>This is best-effort: any failure is logged at debug level (including the stack
     * trace) and otherwise ignored; nothing is thrown to the caller.
     *
     * @param connectionContext forwarded to the long-term store
     * @param str the client id
     * @param str2 the subscription name
     * @param messageId the id of the message to acknowledge
     */
    public void replayAcknowledge(ConnectionContext connectionContext, String str, String str2, MessageId messageId) {
        try {
            if (this.longTermStore.lookupSubscription(str, str2) != null) {
                this.longTermStore.acknowledge(connectionContext, str, str2, messageId, null);
            }
        } catch (Throwable th) {
            // Deliberately swallowed: the message may already have been acknowledged.
            // Pass the throwable to the logger so the stack trace is not lost.
            if (LOG.isDebugEnabled()) {
                LOG.debug("Could not replay acknowledge for message '" + messageId + "'.  Message may have already been acknowledged. reason: " + th, th);
            }
        }
    }

    /**
     * Records an acknowledgement in memory: updates {@code lastLocation} and remembers
     * {@code messageId} as the latest ack for the subscription, replacing any earlier entry.
     *
     * @param messageId the acknowledged message id
     * @param recordLocation the journal location of the ack record
     * @param subscriptionKey identifies the subscription
     */
    protected void acknowledge(MessageId messageId, RecordLocation recordLocation, SubscriptionKey subscriptionKey) {
        synchronized (this) {
            this.lastLocation = recordLocation;
            this.ackedLastAckLocations.put(subscriptionKey, messageId);
        }
    }

    /**
     * Swaps out the pending-acknowledgement map for a fresh one and passes a callback to
     * {@code super.checkpoint(Callback)} that applies each pending acknowledgement to the
     * long-term store.
     *
     * @return the value returned by {@code super.checkpoint(Callback)}
     * @throws IOException if the superclass checkpoint fails with it
     */
    @Override
    public RecordLocation checkpoint() throws IOException {
        final Map<SubscriptionKey, MessageId> pendingAcks;
        // Take ownership of the current map so new acks accumulate in a fresh one.
        synchronized (this) {
            pendingAcks = this.ackedLastAckLocations;
            this.ackedLastAckLocations = new HashMap<>();
        }
        return super.checkpoint(new Callback() {
            @Override
            public void execute() throws Exception {
                for (Map.Entry<SubscriptionKey, MessageId> pending : pendingAcks.entrySet()) {
                    SubscriptionKey subscriptionKey = pending.getKey();
                    JournalTopicMessageStore.this.longTermStore.acknowledge(JournalTopicMessageStore.this.transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, pending.getValue(), null);
                }
            }
        });
    }

    /**
     * @return the wrapped long-term store supplied at construction
     */
    public TopicMessageStore getLongTermTopicMessageStore() {
        return this.longTermStore;
    }

    /** Delegates directly to the long-term store; no checkpoint is performed first. */
    @Override
    public void deleteSubscription(String str, String str2) throws IOException {
        this.longTermStore.deleteSubscription(str, str2);
    }

    /**
     * Delegates directly to the long-term store.
     *
     * @return whatever the long-term store returns
     */
    @Override
    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        return this.longTermStore.getAllSubscriptions();
    }

    /**
     * Runs {@code peristenceAdapter.checkpoint(true, true)} and then asks the long-term store
     * for the message count.
     *
     * @return the count reported by the long-term store for the given arguments
     */
    @Override
    public int getMessageCount(String str, String str2) throws IOException {
        this.peristenceAdapter.checkpoint(true, true);
        return this.longTermStore.getMessageCount(str, str2);
    }

    /** Delegates directly to the long-term store. */
    @Override
    public void resetBatching(String str, String str2) {
        this.longTermStore.resetBatching(str, str2);
    }
}
