package org.apache.activemq.store.jdbc;

import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.derby.iapi.sql.compile.TypeCompiler;
import org.apache.derby.impl.store.raw.log.LogCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:activemq-core-5.5.1-fuse-01-20.jar:org/apache/activemq/store/jdbc/JDBCTopicMessageStore.class */
public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
    private Map<String, LastRecovered> subscriberLastRecoveredMap;
    private final ReentrantReadWriteLock sequenceIdCacheSizeLock;
    private Map<MessageId, long[]> sequenceIdCache;
    private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class);
    public static final String PROPERTY_SEQUENCE_ID_CACHE_SIZE = "org.apache.activemq.store.jdbc.SEQUENCE_ID_CACHE_SIZE";
    private static final int SEQUENCE_ID_CACHE_SIZE = Integer.parseInt(System.getProperty(PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:activemq-core-5.5.1-fuse-01-20.jar:org/apache/activemq/store/jdbc/JDBCTopicMessageStore$LastRecovered.class */
    public class LastRecovered implements Iterable<LastRecoveredEntry> {
        LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10];

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:activemq-core-5.5.1-fuse-01-20.jar:org/apache/activemq/store/jdbc/JDBCTopicMessageStore$LastRecovered$PriorityIterator.class */
        public class PriorityIterator implements Iterator<LastRecoveredEntry> {
            int current = 9;

            PriorityIterator() {
            }

            @Override // java.util.Iterator
            public boolean hasNext() {
                for (int i = this.current; i >= 0; i--) {
                    if (LastRecovered.this.perPriority[i].hasMessages()) {
                        this.current = i;
                        return true;
                    }
                }
                return false;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public LastRecoveredEntry next() {
                return LastRecovered.this.perPriority[this.current];
            }

            @Override // java.util.Iterator
            public void remove() {
                throw new RuntimeException("not implemented");
            }
        }

        LastRecovered() {
            for (int i = 0; i < this.perPriority.length; i++) {
                this.perPriority[i] = new LastRecoveredEntry(i);
            }
        }

        public void updateStored(long j, int i) {
            this.perPriority[i].stored = j;
        }

        public LastRecoveredEntry defaultPriority() {
            return this.perPriority[4];
        }

        public String toString() {
            return Arrays.deepToString(this.perPriority);
        }

        @Override // java.lang.Iterable
        public Iterator<LastRecoveredEntry> iterator() {
            return new PriorityIterator();
        }
    }

    /* loaded from: input_file:activemq-core-5.5.1-fuse-01-20.jar:org/apache/activemq/store/jdbc/JDBCTopicMessageStore$LastRecoveredAwareListener.class */
    class LastRecoveredAwareListener implements JDBCMessageRecoveryListener {
        final MessageRecoveryListener delegate;
        final int maxMessages;
        LastRecoveredEntry lastRecovered;
        int recoveredCount;
        int recoveredMarker;

        public LastRecoveredAwareListener(MessageRecoveryListener messageRecoveryListener, int i) {
            this.delegate = messageRecoveryListener;
            this.maxMessages = i;
        }

        @Override // org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener
        public boolean recoverMessage(long j, byte[] bArr) throws Exception {
            if (!this.delegate.hasSpace() || this.recoveredCount >= this.maxMessages) {
                return false;
            }
            Message message = (Message) JDBCTopicMessageStore.this.wireFormat.unmarshal(new ByteSequence(bArr));
            message.getMessageId().setBrokerSequenceId(j);
            this.lastRecovered.recovered = j;
            if (!this.delegate.recoverMessage(message)) {
                return false;
            }
            this.recoveredCount++;
            return true;
        }

        @Override // org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener
        public boolean recoverMessageReference(String str) throws Exception {
            return this.delegate.recoverMessageReference(new MessageId(str));
        }

        public void setLastRecovered(LastRecoveredEntry lastRecoveredEntry) {
            this.lastRecovered = lastRecoveredEntry;
            this.recoveredMarker = this.recoveredCount;
        }

        public boolean complete() {
            return !this.delegate.hasSpace() || this.recoveredCount == this.maxMessages;
        }

        public boolean stalled() {
            return this.recoveredMarker == this.recoveredCount;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:activemq-core-5.5.1-fuse-01-20.jar:org/apache/activemq/store/jdbc/JDBCTopicMessageStore$LastRecoveredEntry.class */
    public class LastRecoveredEntry {
        final int priority;
        long recovered = 0;
        long stored = LogCounter.MAX_LOGFILE_NUMBER;

        public LastRecoveredEntry(int i) {
            this.priority = i;
        }

        public String toString() {
            return this.priority + TypeCompiler.MINUS_OP + this.stored + ":" + this.recovered;
        }

        public void exhausted() {
            this.stored = this.recovered;
        }

        public boolean hasMessages() {
            return this.stored > this.recovered;
        }
    }

    public JDBCTopicMessageStore(JDBCPersistenceAdapter jDBCPersistenceAdapter, JDBCAdapter jDBCAdapter, WireFormat wireFormat, ActiveMQTopic activeMQTopic, ActiveMQMessageAudit activeMQMessageAudit) {
        super(jDBCPersistenceAdapter, jDBCAdapter, wireFormat, activeMQTopic, activeMQMessageAudit);
        this.subscriberLastRecoveredMap = new ConcurrentHashMap();
        this.sequenceIdCacheSizeLock = new ReentrantReadWriteLock();
        this.sequenceIdCache = new LinkedHashMap<MessageId, long[]>() { // from class: org.apache.activemq.store.jdbc.JDBCTopicMessageStore.1
            @Override // java.util.LinkedHashMap
            protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> entry) {
                return size() > JDBCTopicMessageStore.SEQUENCE_ID_CACHE_SIZE;
            }
        };
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public void acknowledge(ConnectionContext connectionContext, String str, String str2, MessageId messageId, MessageAck messageAck) throws IOException {
        if (messageAck != null && messageAck.isUnmatchedAck()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
                return;
            }
            return;
        }
        TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext(connectionContext);
        try {
            try {
                long[] cachedStoreSequenceId = getCachedStoreSequenceId(transactionContext, this.destination, messageId);
                if (isPrioritizedMessages()) {
                    this.adapter.doSetLastAckWithPriority(transactionContext, this.destination, str, str2, cachedStoreSequenceId[0], cachedStoreSequenceId[1]);
                } else {
                    this.adapter.doSetLastAck(transactionContext, this.destination, str, str2, cachedStoreSequenceId[0], cachedStoreSequenceId[1]);
                }
                if (LOG.isTraceEnabled()) {
                    LOG.trace(str + ":" + str2 + " ack, seq: " + cachedStoreSequenceId[0] + ", priority: " + cachedStoreSequenceId[1] + " mid:" + messageId);
                }
            } catch (SQLException e) {
                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
                throw IOExceptionSupport.create("Failed to store acknowledgment for: " + str + " on message " + messageId + " in container: " + e, (Exception) e);
            }
        } finally {
            transactionContext.close();
        }
    }

    private long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination activeMQDestination, MessageId messageId) throws SQLException, IOException {
        this.sequenceIdCacheSizeLock.readLock().lock();
        try {
            long[] jArr = this.sequenceIdCache.get(messageId);
            this.sequenceIdCacheSizeLock.readLock().unlock();
            if (jArr == null) {
                jArr = this.adapter.getStoreSequenceId(transactionContext, activeMQDestination, messageId);
            }
            return jArr;
        } catch (Throwable th) {
            this.sequenceIdCacheSizeLock.readLock().unlock();
            throw th;
        }
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public void recoverSubscription(String str, String str2, final MessageRecoveryListener messageRecoveryListener) throws Exception {
        TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext();
        try {
            try {
                this.adapter.doRecoverSubscription(transactionContext, this.destination, str, str2, new JDBCMessageRecoveryListener() { // from class: org.apache.activemq.store.jdbc.JDBCTopicMessageStore.2
                    @Override // org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener
                    public boolean recoverMessage(long j, byte[] bArr) throws Exception {
                        Message message = (Message) JDBCTopicMessageStore.this.wireFormat.unmarshal(new ByteSequence(bArr));
                        message.getMessageId().setBrokerSequenceId(j);
                        return messageRecoveryListener.recoverMessage(message);
                    }

                    @Override // org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener
                    public boolean recoverMessageReference(String str3) throws Exception {
                        return messageRecoveryListener.recoverMessageReference(new MessageId(str3));
                    }
                });
                transactionContext.close();
            } catch (SQLException e) {
                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
                throw IOExceptionSupport.create("Failed to recover subscription: " + str + ". Reason: " + e, (Exception) e);
            }
        } catch (Throwable th) {
            transactionContext.close();
            throw th;
        }
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized void recoverNextMessages(String str, String str2, int i, MessageRecoveryListener messageRecoveryListener) throws Exception {
        TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext();
        String subscriptionKey = getSubscriptionKey(str, str2);
        if (!this.subscriberLastRecoveredMap.containsKey(subscriptionKey)) {
            this.subscriberLastRecoveredMap.put(subscriptionKey, new LastRecovered());
        }
        LastRecovered lastRecovered = this.subscriberLastRecoveredMap.get(subscriptionKey);
        LastRecoveredAwareListener lastRecoveredAwareListener = new LastRecoveredAwareListener(messageRecoveryListener, i);
        try {
            try {
                if (LOG.isTraceEnabled()) {
                    LOG.trace(subscriptionKey + " existing last recovered: " + lastRecovered);
                }
                if (isPrioritizedMessages()) {
                    Iterator<LastRecoveredEntry> it = lastRecovered.iterator();
                    while (it.hasNext() && !lastRecoveredAwareListener.complete()) {
                        LastRecoveredEntry next = it.next();
                        lastRecoveredAwareListener.setLastRecovered(next);
                        this.adapter.doRecoverNextMessagesWithPriority(transactionContext, this.destination, str, str2, next.recovered, next.priority, i, lastRecoveredAwareListener);
                        if (lastRecoveredAwareListener.stalled()) {
                            if (lastRecoveredAwareListener.complete()) {
                                break;
                            } else {
                                next.exhausted();
                            }
                        }
                    }
                } else {
                    LastRecoveredEntry defaultPriority = lastRecovered.defaultPriority();
                    lastRecoveredAwareListener.setLastRecovered(defaultPriority);
                    this.adapter.doRecoverNextMessages(transactionContext, this.destination, str, str2, defaultPriority.recovered, 0L, i, lastRecoveredAwareListener);
                }
                if (LOG.isTraceEnabled()) {
                    LOG.trace(subscriptionKey + " last recovered: " + lastRecovered);
                }
                transactionContext.close();
            } catch (SQLException e) {
                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
                transactionContext.close();
            }
        } catch (Throwable th) {
            transactionContext.close();
            throw th;
        }
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public void resetBatching(String str, String str2) {
        this.subscriberLastRecoveredMap.remove(getSubscriptionKey(str, str2));
    }

    @Override // org.apache.activemq.store.jdbc.JDBCMessageStore
    protected void onAdd(MessageId messageId, long j, byte b) {
        Iterator<LastRecovered> it = this.subscriberLastRecoveredMap.values().iterator();
        while (it.hasNext()) {
            it.next().updateStored(j, b);
        }
        this.sequenceIdCacheSizeLock.writeLock().lock();
        try {
            this.sequenceIdCache.put(messageId, new long[]{j, b});
            this.sequenceIdCacheSizeLock.writeLock().unlock();
        } catch (Throwable th) {
            this.sequenceIdCacheSizeLock.writeLock().unlock();
            throw th;
        }
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean z) throws IOException {
        TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext();
        try {
            try {
                transactionContext = this.persistenceAdapter.getTransactionContext();
                this.adapter.doSetSubscriberEntry(transactionContext, subscriptionInfo, z, isPrioritizedMessages());
                transactionContext.close();
            } catch (SQLException e) {
                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
                throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, (Exception) e);
            }
        } catch (Throwable th) {
            transactionContext.close();
            throw th;
        }
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public SubscriptionInfo lookupSubscription(String str, String str2) throws IOException {
        TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext();
        try {
            try {
                SubscriptionInfo doGetSubscriberEntry = this.adapter.doGetSubscriberEntry(transactionContext, this.destination, str, str2);
                transactionContext.close();
                return doGetSubscriberEntry;
            } catch (SQLException e) {
                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
                throw IOExceptionSupport.create("Failed to lookup subscription for: " + str + ". Reason: " + e, (Exception) e);
            }
        } catch (Throwable th) {
            transactionContext.close();
            throw th;
        }
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public void deleteSubscription(String str, String str2) throws IOException {
        TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext();
        try {
            try {
                this.adapter.doDeleteSubscription(transactionContext, this.destination, str, str2);
                transactionContext.close();
                resetBatching(str, str2);
            } catch (SQLException e) {
                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
                throw IOExceptionSupport.create("Failed to remove subscription for: " + str + ". Reason: " + e, (Exception) e);
            }
        } catch (Throwable th) {
            transactionContext.close();
            resetBatching(str, str2);
            throw th;
        }
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext();
        try {
            try {
                SubscriptionInfo[] doGetAllSubscriptions = this.adapter.doGetAllSubscriptions(transactionContext, this.destination);
                transactionContext.close();
                return doGetAllSubscriptions;
            } catch (SQLException e) {
                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
                throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, (Exception) e);
            }
        } catch (Throwable th) {
            transactionContext.close();
            throw th;
        }
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public int getMessageCount(String str, String str2) throws IOException {
        TransactionContext transactionContext = this.persistenceAdapter.getTransactionContext();
        try {
            try {
                int doGetDurableSubscriberMessageCount = this.adapter.doGetDurableSubscriberMessageCount(transactionContext, this.destination, str, str2, isPrioritizedMessages());
                transactionContext.close();
                if (LOG.isTraceEnabled()) {
                    LOG.trace(str + ":" + str2 + ", messageCount: " + doGetDurableSubscriberMessageCount);
                }
                return doGetDurableSubscriberMessageCount;
            } catch (SQLException e) {
                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
                throw IOExceptionSupport.create("Failed to get Message Count: " + str + ". Reason: " + e, (Exception) e);
            }
        } catch (Throwable th) {
            transactionContext.close();
            throw th;
        }
    }

    protected String getSubscriptionKey(String str, String str2) {
        return (str + ":") + (str2 != null ? str2 : "NOT_SET");
    }
}
