package org.apache.activemq.store.kahadb;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.MessageDatabase;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.propertyeditors.CustomBooleanEditor;

/* loaded from: input_file:WEB-INF/lib/activemq-core-5.5.1-fuse-01-13.jar:org/apache/activemq/store/kahadb/KahaDBStore.class */
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
    private static final int MAX_ASYNC_JOBS = 10000;
    protected ExecutorService queueExecutor;
    protected ExecutorService topicExecutor;
    private SystemUsage usageManager;
    private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
    private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
    Semaphore globalQueueSemaphore;
    Semaphore globalTopicSemaphore;
    static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
    public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
    public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
    public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
    private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, CustomBooleanEditor.VALUE_1), 10);
    protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList();
    protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList();
    final WireFormat wireFormat = new OpenWireFormat();
    private boolean concurrentStoreAndDispatchQueues = true;
    private boolean concurrentStoreAndDispatchTopics = false;
    private boolean concurrentStoreAndDispatchTransactions = false;
    private int maxAsyncJobs = 10000;
    private final KahaDBTransactionStore transactionStore = new KahaDBTransactionStore(this);
    private TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() { // from class: org.apache.activemq.store.kahadb.KahaDBStore.1
        @Override // org.apache.activemq.store.kahadb.TransactionIdTransformer
        public KahaTransactionInfo transform(TransactionId transactionId) {
            return TransactionIdConversion.convert(transactionId);
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/activemq-core-5.5.1-fuse-01-13.jar:org/apache/activemq/store/kahadb/KahaDBStore$AsyncJobKey.class */
    public static class AsyncJobKey {
        MessageId id;
        ActiveMQDestination destination;

        AsyncJobKey(MessageId messageId, ActiveMQDestination activeMQDestination) {
            this.id = messageId;
            this.destination = activeMQDestination;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            return (obj instanceof AsyncJobKey) && this.id.equals(((AsyncJobKey) obj).id) && this.destination.equals(((AsyncJobKey) obj).destination);
        }

        public int hashCode() {
            return this.id.hashCode() + this.destination.hashCode();
        }

        public String toString() {
            return this.destination.getPhysicalName() + "-" + this.id;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/activemq-core-5.5.1-fuse-01-13.jar:org/apache/activemq/store/kahadb/KahaDBStore$KahaDBMessageStore.class */
    public class KahaDBMessageStore extends AbstractMessageStore {
        protected final Map<AsyncJobKey, StoreTask> asyncTaskMap;
        protected KahaDestination dest;
        private final int maxAsyncJobs;
        private final Semaphore localDestinationSemaphore;
        double doneTasks;
        double canceledTasks;

        public KahaDBMessageStore(ActiveMQDestination activeMQDestination) {
            super(activeMQDestination);
            this.asyncTaskMap = new HashMap();
            this.canceledTasks = 0.0d;
            this.dest = KahaDBStore.this.convert(activeMQDestination);
            this.maxAsyncJobs = KahaDBStore.this.getMaxAsyncJobs();
            this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
        }

        @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.store.MessageStore
        public ActiveMQDestination getDestination() {
            return this.destination;
        }

        @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.store.MessageStore
        public Future<Object> asyncAddQueueMessage(ConnectionContext connectionContext, Message message) throws IOException {
            if (!KahaDBStore.this.isConcurrentStoreAndDispatchQueues()) {
                return super.asyncAddQueueMessage(connectionContext, message);
            }
            StoreQueueTask storeQueueTask = new StoreQueueTask(this, connectionContext, message);
            storeQueueTask.aquireLocks();
            KahaDBStore.this.addQueueTask(this, storeQueueTask);
            return storeQueueTask.getFuture();
        }

        @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.store.MessageStore
        public void removeAsyncMessage(ConnectionContext connectionContext, MessageAck messageAck) throws IOException {
            StoreQueueTask storeQueueTask;
            if (!KahaDBStore.this.isConcurrentStoreAndDispatchQueues()) {
                removeMessage(connectionContext, messageAck);
                return;
            }
            AsyncJobKey asyncJobKey = new AsyncJobKey(messageAck.getLastMessageId(), getDestination());
            synchronized (this.asyncTaskMap) {
                storeQueueTask = (StoreQueueTask) this.asyncTaskMap.get(asyncJobKey);
            }
            if (storeQueueTask == null) {
                removeMessage(connectionContext, messageAck);
                return;
            }
            if (storeQueueTask.cancel()) {
                synchronized (this.asyncTaskMap) {
                    this.asyncTaskMap.remove(asyncJobKey);
                }
            } else {
                try {
                    storeQueueTask.future.get();
                } catch (InterruptedException e) {
                    throw new InterruptedIOException(e.toString());
                } catch (Exception e2) {
                    KahaDBStore.LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", (Throwable) e2);
                }
                removeMessage(connectionContext, messageAck);
            }
        }

        @Override // org.apache.activemq.store.MessageStore
        public void addMessage(ConnectionContext connectionContext, Message message) throws IOException {
            KahaAddMessageCommand kahaAddMessageCommand = new KahaAddMessageCommand();
            kahaAddMessageCommand.setDestination(this.dest);
            kahaAddMessageCommand.setMessageId(message.getMessageId().toString());
            kahaAddMessageCommand.setTransactionInfo(KahaDBStore.this.transactionIdTransformer.transform(message.getTransactionId()));
            kahaAddMessageCommand.setPriority(message.getPriority());
            kahaAddMessageCommand.setPrioritySupported(isPrioritizedMessages());
            ByteSequence marshal = KahaDBStore.this.wireFormat.marshal(message);
            kahaAddMessageCommand.setMessage(new Buffer(marshal.getData(), marshal.getOffset(), marshal.getLength()));
            KahaDBStore.this.store(kahaAddMessageCommand, KahaDBStore.this.isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
        }

        @Override // org.apache.activemq.store.MessageStore
        public void removeMessage(ConnectionContext connectionContext, MessageAck messageAck) throws IOException {
            KahaRemoveMessageCommand kahaRemoveMessageCommand = new KahaRemoveMessageCommand();
            kahaRemoveMessageCommand.setDestination(this.dest);
            kahaRemoveMessageCommand.setMessageId(messageAck.getLastMessageId().toString());
            kahaRemoveMessageCommand.setTransactionInfo(KahaDBStore.this.transactionIdTransformer.transform(messageAck.getTransactionId()));
            ByteSequence marshal = KahaDBStore.this.wireFormat.marshal(messageAck);
            kahaRemoveMessageCommand.setAck(new Buffer(marshal.getData(), marshal.getOffset(), marshal.getLength()));
            KahaDBStore.this.store(kahaRemoveMessageCommand, KahaDBStore.this.isEnableJournalDiskSyncs() && messageAck.isResponseRequired(), null, null);
        }

        @Override // org.apache.activemq.store.MessageStore
        public void removeAllMessages(ConnectionContext connectionContext) throws IOException {
            KahaRemoveDestinationCommand kahaRemoveDestinationCommand = new KahaRemoveDestinationCommand();
            kahaRemoveDestinationCommand.setDestination(this.dest);
            KahaDBStore.this.store(kahaRemoveDestinationCommand, true, null, null);
        }

        @Override // org.apache.activemq.store.MessageStore
        public Message getMessage(MessageId messageId) throws IOException {
            String messageId2 = messageId.toString();
            KahaDBStore.this.indexLock.readLock().lock();
            try {
                Location findMessageLocation = KahaDBStore.this.findMessageLocation(messageId2, this.dest);
                KahaDBStore.this.indexLock.readLock().unlock();
                if (findMessageLocation == null) {
                    return null;
                }
                return KahaDBStore.this.loadMessage(findMessageLocation);
            } catch (Throwable th) {
                KahaDBStore.this.indexLock.readLock().unlock();
                throw th;
            }
        }

        @Override // org.apache.activemq.store.MessageStore
        public int getMessageCount() throws IOException {
            try {
                lockAsyncJobQueue();
                KahaDBStore.this.indexLock.readLock().lock();
                try {
                    int intValue = ((Integer) KahaDBStore.this.pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { // from class: org.apache.activemq.store.kahadb.KahaDBStore.KahaDBMessageStore.1
                        /* JADX WARN: Can't rename method to resolve collision */
                        @Override // org.apache.kahadb.page.Transaction.CallableClosure
                        public Integer execute(Transaction transaction) throws IOException {
                            int i = 0;
                            Iterator<Map.Entry<Location, Long>> it = KahaDBStore.this.getStoredDestination(KahaDBMessageStore.this.dest, transaction).locationIndex.iterator(transaction);
                            while (it.hasNext()) {
                                it.next();
                                i++;
                            }
                            return Integer.valueOf(i);
                        }
                    })).intValue();
                    KahaDBStore.this.indexLock.readLock().unlock();
                    unlockAsyncJobQueue();
                    return intValue;
                } catch (Throwable th) {
                    KahaDBStore.this.indexLock.readLock().unlock();
                    throw th;
                }
            } catch (Throwable th2) {
                unlockAsyncJobQueue();
                throw th2;
            }
        }

        @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.store.MessageStore
        public boolean isEmpty() throws IOException {
            KahaDBStore.this.indexLock.readLock().lock();
            try {
                boolean booleanValue = ((Boolean) KahaDBStore.this.pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() { // from class: org.apache.activemq.store.kahadb.KahaDBStore.KahaDBMessageStore.2
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // org.apache.kahadb.page.Transaction.CallableClosure
                    public Boolean execute(Transaction transaction) throws IOException {
                        return Boolean.valueOf(KahaDBStore.this.getStoredDestination(KahaDBMessageStore.this.dest, transaction).locationIndex.isEmpty(transaction));
                    }
                })).booleanValue();
                KahaDBStore.this.indexLock.readLock().unlock();
                return booleanValue;
            } catch (Throwable th) {
                KahaDBStore.this.indexLock.readLock().unlock();
                throw th;
            }
        }

        @Override // org.apache.activemq.store.MessageStore
        public void recover(final MessageRecoveryListener messageRecoveryListener) throws Exception {
            KahaDBStore.this.indexLock.writeLock().lock();
            try {
                KahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<Exception>() { // from class: org.apache.activemq.store.kahadb.KahaDBStore.KahaDBMessageStore.3
                    @Override // org.apache.kahadb.page.Transaction.Closure
                    public void execute(Transaction transaction) throws Exception {
                        MessageDatabase.StoredDestination storedDestination = KahaDBStore.this.getStoredDestination(KahaDBMessageStore.this.dest, transaction);
                        storedDestination.orderIndex.resetCursorPosition();
                        Iterator<Map.Entry<Long, MessageDatabase.MessageKeys>> it = storedDestination.orderIndex.iterator(transaction);
                        while (messageRecoveryListener.hasSpace() && it.hasNext()) {
                            Map.Entry<Long, MessageDatabase.MessageKeys> next = it.next();
                            if (!KahaDBStore.this.ackedAndPrepared.contains(next.getValue().messageId)) {
                                messageRecoveryListener.recoverMessage(KahaDBStore.this.loadMessage(next.getValue().location));
                            }
                        }
                    }
                });
                KahaDBStore.this.indexLock.writeLock().unlock();
            } catch (Throwable th) {
                KahaDBStore.this.indexLock.writeLock().unlock();
                throw th;
            }
        }

        @Override // org.apache.activemq.store.MessageStore
        public void recoverNextMessages(final int i, final MessageRecoveryListener messageRecoveryListener) throws Exception {
            KahaDBStore.this.indexLock.readLock().lock();
            try {
                KahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<Exception>() { // from class: org.apache.activemq.store.kahadb.KahaDBStore.KahaDBMessageStore.4
                    @Override // org.apache.kahadb.page.Transaction.Closure
                    public void execute(Transaction transaction) throws Exception {
                        MessageDatabase.StoredDestination storedDestination = KahaDBStore.this.getStoredDestination(KahaDBMessageStore.this.dest, transaction);
                        int i2 = 0;
                        Iterator<Map.Entry<Long, MessageDatabase.MessageKeys>> it = storedDestination.orderIndex.iterator(transaction);
                        while (messageRecoveryListener.hasSpace() && it.hasNext()) {
                            Map.Entry<Long, MessageDatabase.MessageKeys> next = it.next();
                            if (!KahaDBStore.this.ackedAndPrepared.contains(next.getValue().messageId)) {
                                messageRecoveryListener.recoverMessage(KahaDBStore.this.loadMessage(next.getValue().location));
                                i2++;
                                if (i2 >= i) {
                                    break;
                                }
                            }
                        }
                        storedDestination.orderIndex.stoppedIterating();
                    }
                });
                KahaDBStore.this.indexLock.readLock().unlock();
            } catch (Throwable th) {
                KahaDBStore.this.indexLock.readLock().unlock();
                throw th;
            }
        }

        @Override // org.apache.activemq.store.MessageStore
        public void resetBatching() {
            if (KahaDBStore.this.pageFile.isLoaded()) {
                try {
                    KahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<Exception>() { // from class: org.apache.activemq.store.kahadb.KahaDBStore.KahaDBMessageStore.5
                        @Override // org.apache.kahadb.page.Transaction.Closure
                        public void execute(Transaction transaction) throws Exception {
                            MessageDatabase.StoredDestination existingStoredDestination = KahaDBStore.this.getExistingStoredDestination(KahaDBMessageStore.this.dest, transaction);
                            if (existingStoredDestination != null) {
                                existingStoredDestination.orderIndex.resetCursorPosition();
                            }
                        }
                    });
                } catch (Exception e) {
                    KahaDBStore.LOG.error("Failed to reset batching", (Throwable) e);
                }
            }
        }

        @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.store.MessageStore
        public void setBatch(MessageId messageId) throws IOException {
            try {
                final String messageId2 = messageId.toString();
                lockAsyncJobQueue();
                KahaDBStore.this.indexLock.writeLock().lock();
                try {
                    KahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<IOException>() { // from class: org.apache.activemq.store.kahadb.KahaDBStore.KahaDBMessageStore.6
                        @Override // org.apache.kahadb.page.Transaction.Closure
                        public void execute(Transaction transaction) throws IOException {
                            MessageDatabase.StoredDestination storedDestination = KahaDBStore.this.getStoredDestination(KahaDBMessageStore.this.dest, transaction);
                            Long l = storedDestination.messageIdIndex.get(transaction, messageId2);
                            if (l != null) {
                                storedDestination.orderIndex.setBatch(transaction, l);
                            }
                        }
                    });
                    KahaDBStore.this.indexLock.writeLock().unlock();
                } catch (Throwable th) {
                    KahaDBStore.this.indexLock.writeLock().unlock();
                    throw th;
                }
            } finally {
                unlockAsyncJobQueue();
            }
        }

        @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.store.MessageStore
        public void setMemoryUsage(MemoryUsage memoryUsage) {
        }

        @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.Service
        public void start() throws Exception {
            super.start();
        }

        @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.Service
        public void stop() throws Exception {
            super.stop();
        }

        protected void lockAsyncJobQueue() {
            try {
                this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60L, TimeUnit.SECONDS);
            } catch (Exception e) {
                KahaDBStore.LOG.error("Failed to lock async jobs for " + this.destination, (Throwable) e);
            }
        }

        protected void unlockAsyncJobQueue() {
            this.localDestinationSemaphore.release(this.maxAsyncJobs);
        }

        protected void acquireLocalAsyncLock() {
            try {
                this.localDestinationSemaphore.acquire();
            } catch (InterruptedException e) {
                KahaDBStore.LOG.error("Failed to aquire async lock for " + this.destination, (Throwable) e);
            }
        }

        protected void releaseLocalAsyncLock() {
            this.localDestinationSemaphore.release();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/activemq-core-5.5.1-fuse-01-13.jar:org/apache/activemq/store/kahadb/KahaDBStore$KahaDBTopicMessageStore.class */
    public class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
        private final AtomicInteger subscriptionCount;

        public KahaDBTopicMessageStore(ActiveMQTopic activeMQTopic) throws IOException {
            super(activeMQTopic);
            this.subscriptionCount = new AtomicInteger();
            this.subscriptionCount.set(getAllSubscriptions().length);
            KahaDBStore.this.asyncTopicMaps.add(this.asyncTaskMap);
        }

        @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.store.MessageStore
        public Future<Object> asyncAddTopicMessage(ConnectionContext connectionContext, Message message) throws IOException {
            if (!KahaDBStore.this.isConcurrentStoreAndDispatchTopics()) {
                return super.asyncAddTopicMessage(connectionContext, message);
            }
            StoreTopicTask storeTopicTask = new StoreTopicTask(this, connectionContext, message, this.subscriptionCount.get());
            storeTopicTask.aquireLocks();
            KahaDBStore.this.addTopicTask(this, storeTopicTask);
            return storeTopicTask.getFuture();
        }

        @Override // org.apache.activemq.store.TopicMessageStore
        public void acknowledge(ConnectionContext connectionContext, String str, String str2, MessageId messageId, MessageAck messageAck) throws IOException {
            StoreTopicTask storeTopicTask;
            String str3 = KahaDBStore.this.subscriptionKey(str, str2).toString();
            if (!KahaDBStore.this.isConcurrentStoreAndDispatchTopics()) {
                doAcknowledge(connectionContext, str3, messageId, messageAck);
                return;
            }
            AsyncJobKey asyncJobKey = new AsyncJobKey(messageId, getDestination());
            synchronized (this.asyncTaskMap) {
                storeTopicTask = (StoreTopicTask) this.asyncTaskMap.get(asyncJobKey);
            }
            if (storeTopicTask == null) {
                doAcknowledge(connectionContext, str3, messageId, messageAck);
                return;
            }
            if (storeTopicTask.addSubscriptionKey(str3)) {
                KahaDBStore.this.removeTopicTask(this, messageId);
                if (storeTopicTask.cancel()) {
                    synchronized (this.asyncTaskMap) {
                        this.asyncTaskMap.remove(asyncJobKey);
                    }
                }
            }
        }

        protected void doAcknowledge(ConnectionContext connectionContext, String str, MessageId messageId, MessageAck messageAck) throws IOException {
            KahaRemoveMessageCommand kahaRemoveMessageCommand = new KahaRemoveMessageCommand();
            kahaRemoveMessageCommand.setDestination(this.dest);
            kahaRemoveMessageCommand.setSubscriptionKey(str);
            kahaRemoveMessageCommand.setMessageId(messageId.toString());
            kahaRemoveMessageCommand.setTransactionInfo(KahaDBStore.this.transactionIdTransformer.transform(messageAck.getTransactionId()));
            if (messageAck != null && messageAck.isUnmatchedAck()) {
                kahaRemoveMessageCommand.setAck(MessageDatabase.UNMATCHED);
            }
            KahaDBStore.this.store(kahaRemoveMessageCommand, false, null, null);
        }

        @Override // org.apache.activemq.store.TopicMessageStore
        public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean z) throws IOException {
            String subscriptionKey = KahaDBStore.this.subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
            KahaSubscriptionCommand kahaSubscriptionCommand = new KahaSubscriptionCommand();
            kahaSubscriptionCommand.setDestination(this.dest);
            kahaSubscriptionCommand.setSubscriptionKey(subscriptionKey.toString());
            kahaSubscriptionCommand.setRetroactive(z);
            ByteSequence marshal = KahaDBStore.this.wireFormat.marshal(subscriptionInfo);
            kahaSubscriptionCommand.setSubscriptionInfo(new Buffer(marshal.getData(), marshal.getOffset(), marshal.getLength()));
            KahaDBStore.this.store(kahaSubscriptionCommand, KahaDBStore.this.isEnableJournalDiskSyncs(), null, null);
            this.subscriptionCount.incrementAndGet();
        }

        @Override // org.apache.activemq.store.TopicMessageStore
        public void deleteSubscription(String str, String str2) throws IOException {
            KahaSubscriptionCommand kahaSubscriptionCommand = new KahaSubscriptionCommand();
            kahaSubscriptionCommand.setDestination(this.dest);
            kahaSubscriptionCommand.setSubscriptionKey(KahaDBStore.this.subscriptionKey(str, str2).toString());
            KahaDBStore.this.store(kahaSubscriptionCommand, KahaDBStore.this.isEnableJournalDiskSyncs(), null, null);
            this.subscriptionCount.decrementAndGet();
        }

        @Override // org.apache.activemq.store.TopicMessageStore
        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
            final ArrayList arrayList = new ArrayList();
            KahaDBStore.this.indexLock.readLock().lock();
            try {
                KahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<IOException>() { // from class: org.apache.activemq.store.kahadb.KahaDBStore.KahaDBTopicMessageStore.1
                    @Override // org.apache.kahadb.page.Transaction.Closure
                    public void execute(Transaction transaction) throws IOException {
                        Iterator<Map.Entry<String, KahaSubscriptionCommand>> it = KahaDBStore.this.getStoredDestination(KahaDBTopicMessageStore.this.dest, transaction).subscriptions.iterator(transaction);
                        while (it.hasNext()) {
                            arrayList.add((SubscriptionInfo) KahaDBStore.this.wireFormat.unmarshal(new DataInputStream(it.next().getValue().getSubscriptionInfo().newInput())));
                        }
                    }
                });
                KahaDBStore.this.indexLock.readLock().unlock();
                SubscriptionInfo[] subscriptionInfoArr = new SubscriptionInfo[arrayList.size()];
                arrayList.toArray(subscriptionInfoArr);
                return subscriptionInfoArr;
            } catch (Throwable th) {
                KahaDBStore.this.indexLock.readLock().unlock();
                throw th;
            }
        }

        @Override // org.apache.activemq.store.TopicMessageStore
        public SubscriptionInfo lookupSubscription(String str, String str2) throws IOException {
            final String subscriptionKey = KahaDBStore.this.subscriptionKey(str, str2);
            KahaDBStore.this.indexLock.readLock().lock();
            try {
                SubscriptionInfo subscriptionInfo = (SubscriptionInfo) KahaDBStore.this.pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() { // from class: org.apache.activemq.store.kahadb.KahaDBStore.KahaDBTopicMessageStore.2
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // org.apache.kahadb.page.Transaction.CallableClosure
                    public SubscriptionInfo execute(Transaction transaction) throws IOException {
                        KahaSubscriptionCommand kahaSubscriptionCommand = KahaDBStore.this.getStoredDestination(KahaDBTopicMessageStore.this.dest, transaction).subscriptions.get(transaction, subscriptionKey);
                        if (kahaSubscriptionCommand == null) {
                            return null;
                        }
                        return (SubscriptionInfo) KahaDBStore.this.wireFormat.unmarshal(new DataInputStream(kahaSubscriptionCommand.getSubscriptionInfo().newInput()));
                    }
                });
                KahaDBStore.this.indexLock.readLock().unlock();
                return subscriptionInfo;
            } catch (Throwable th) {
                KahaDBStore.this.indexLock.readLock().unlock();
                throw th;
            }
        }

        @Override // org.apache.activemq.store.TopicMessageStore
        public int getMessageCount(String str, String str2) throws IOException {
            final String subscriptionKey = KahaDBStore.this.subscriptionKey(str, str2);
            KahaDBStore.this.indexLock.writeLock().lock();
            try {
                int intValue = ((Integer) KahaDBStore.this.pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { // from class: org.apache.activemq.store.kahadb.KahaDBStore.KahaDBTopicMessageStore.3
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // org.apache.kahadb.page.Transaction.CallableClosure
                    public Integer execute(Transaction transaction) throws IOException {
                        MessageDatabase.StoredDestination storedDestination = KahaDBStore.this.getStoredDestination(KahaDBTopicMessageStore.this.dest, transaction);
                        if (KahaDBStore.this.getLastAck(transaction, storedDestination, subscriptionKey) == null) {
                            return 0;
                        }
                        return Integer.valueOf((int) KahaDBStore.this.getStoredMessageCount(transaction, storedDestination, subscriptionKey));
                    }
                })).intValue();
                KahaDBStore.this.indexLock.writeLock().unlock();
                return intValue;
            } catch (Throwable th) {
                KahaDBStore.this.indexLock.writeLock().unlock();
                throw th;
            }
        }

        @Override // org.apache.activemq.store.TopicMessageStore
        public void recoverSubscription(String str, String str2, final MessageRecoveryListener messageRecoveryListener) throws Exception {
            final String subscriptionKey = KahaDBStore.this.subscriptionKey(str, str2);
            lookupSubscription(str, str2);
            KahaDBStore.this.indexLock.writeLock().lock();
            try {
                KahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<Exception>() { // from class: org.apache.activemq.store.kahadb.KahaDBStore.KahaDBTopicMessageStore.4
                    @Override // org.apache.kahadb.page.Transaction.Closure
                    public void execute(Transaction transaction) throws Exception {
                        MessageDatabase.StoredDestination storedDestination = KahaDBStore.this.getStoredDestination(KahaDBTopicMessageStore.this.dest, transaction);
                        storedDestination.orderIndex.setBatch(transaction, KahaDBStore.this.getLastAck(transaction, storedDestination, subscriptionKey));
                        Iterator<Map.Entry<Long, MessageDatabase.MessageKeys>> it = storedDestination.orderIndex.iterator(transaction);
                        while (it.hasNext()) {
                            messageRecoveryListener.recoverMessage(KahaDBStore.this.loadMessage(it.next().getValue().location));
                        }
                        storedDestination.orderIndex.resetCursorPosition();
                    }
                });
                KahaDBStore.this.indexLock.writeLock().unlock();
            } catch (Throwable th) {
                KahaDBStore.this.indexLock.writeLock().unlock();
                throw th;
            }
        }

        @Override // org.apache.activemq.store.TopicMessageStore
        public void recoverNextMessages(String str, String str2, final int i, final MessageRecoveryListener messageRecoveryListener) throws Exception {
            final String subscriptionKey = KahaDBStore.this.subscriptionKey(str, str2);
            lookupSubscription(str, str2);
            KahaDBStore.this.indexLock.writeLock().lock();
            try {
                KahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<Exception>() { // from class: org.apache.activemq.store.kahadb.KahaDBStore.KahaDBTopicMessageStore.5
                    @Override // org.apache.kahadb.page.Transaction.Closure
                    public void execute(Transaction transaction) throws Exception {
                        MessageDatabase.StoredDestination storedDestination = KahaDBStore.this.getStoredDestination(KahaDBTopicMessageStore.this.dest, transaction);
                        storedDestination.orderIndex.resetCursorPosition();
                        MessageDatabase.MessageOrderCursor messageOrderCursor = storedDestination.subscriptionCursors.get(subscriptionKey);
                        if (messageOrderCursor == null) {
                            MessageDatabase.LastAck lastAck = KahaDBStore.this.getLastAck(transaction, storedDestination, subscriptionKey);
                            if (lastAck == null) {
                                return;
                            }
                            storedDestination.orderIndex.setBatch(transaction, lastAck);
                            messageOrderCursor = storedDestination.orderIndex.cursor;
                        } else {
                            storedDestination.orderIndex.cursor.sync(messageOrderCursor);
                        }
                        Map.Entry<Long, MessageDatabase.MessageKeys> entry = null;
                        int i2 = 0;
                        Iterator<Map.Entry<Long, MessageDatabase.MessageKeys>> it = storedDestination.orderIndex.iterator(transaction, messageOrderCursor);
                        while (it.hasNext()) {
                            entry = it.next();
                            if (messageRecoveryListener.recoverMessage(KahaDBStore.this.loadMessage(entry.getValue().location))) {
                                i2++;
                            }
                            if (i2 >= i || !messageRecoveryListener.hasSpace()) {
                                break;
                            }
                        }
                        storedDestination.orderIndex.stoppedIterating();
                        if (entry != null) {
                            storedDestination.subscriptionCursors.put(subscriptionKey, storedDestination.orderIndex.cursor.copy());
                        }
                    }
                });
                KahaDBStore.this.indexLock.writeLock().unlock();
            } catch (Throwable th) {
                KahaDBStore.this.indexLock.writeLock().unlock();
                throw th;
            }
        }

        @Override // org.apache.activemq.store.TopicMessageStore
        public void resetBatching(String str, String str2) {
            try {
                final String subscriptionKey = KahaDBStore.this.subscriptionKey(str, str2);
                KahaDBStore.this.indexLock.writeLock().lock();
                try {
                    KahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<IOException>() { // from class: org.apache.activemq.store.kahadb.KahaDBStore.KahaDBTopicMessageStore.6
                        @Override // org.apache.kahadb.page.Transaction.Closure
                        public void execute(Transaction transaction) throws IOException {
                            KahaDBStore.this.getStoredDestination(KahaDBTopicMessageStore.this.dest, transaction).subscriptionCursors.remove(subscriptionKey);
                        }
                    });
                    KahaDBStore.this.indexLock.writeLock().unlock();
                } catch (Throwable th) {
                    KahaDBStore.this.indexLock.writeLock().unlock();
                    throw th;
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/activemq-core-5.5.1-fuse-01-13.jar:org/apache/activemq/store/kahadb/KahaDBStore$StoreQueueTask.class */
    public class StoreQueueTask implements Runnable, StoreTask {
        protected final Message message;
        protected final ConnectionContext context;
        protected final KahaDBMessageStore store;
        protected final AtomicBoolean done = new AtomicBoolean();
        protected final AtomicBoolean locked = new AtomicBoolean();
        protected final InnerFutureTask future = new InnerFutureTask(this);

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:WEB-INF/lib/activemq-core-5.5.1-fuse-01-13.jar:org/apache/activemq/store/kahadb/KahaDBStore$StoreQueueTask$InnerFutureTask.class */
        public class InnerFutureTask extends FutureTask<Object> {
            public InnerFutureTask(Runnable runnable) {
                super(runnable, null);
            }

            public void setException(Exception exc) {
                super.setException((Throwable) exc);
            }

            public void complete() {
                super.set(null);
            }
        }

        public StoreQueueTask(KahaDBMessageStore kahaDBMessageStore, ConnectionContext connectionContext, Message message) {
            this.store = kahaDBMessageStore;
            this.context = connectionContext;
            this.message = message;
        }

        public Future<Object> getFuture() {
            return this.future;
        }

        @Override // org.apache.activemq.store.kahadb.KahaDBStore.StoreTask
        public boolean cancel() {
            if (this.done.compareAndSet(false, true)) {
                return this.future.cancel(false);
            }
            return false;
        }

        @Override // org.apache.activemq.store.kahadb.KahaDBStore.StoreTask
        public void aquireLocks() {
            if (this.locked.compareAndSet(false, true)) {
                try {
                    KahaDBStore.this.globalQueueSemaphore.acquire();
                    this.store.acquireLocalAsyncLock();
                    this.message.incrementReferenceCount();
                } catch (InterruptedException e) {
                    KahaDBStore.LOG.warn("Failed to aquire lock", (Throwable) e);
                }
            }
        }

        @Override // org.apache.activemq.store.kahadb.KahaDBStore.StoreTask
        public void releaseLocks() {
            if (this.locked.compareAndSet(true, false)) {
                this.store.releaseLocalAsyncLock();
                KahaDBStore.this.globalQueueSemaphore.release();
                this.message.decrementReferenceCount();
            }
        }

        /*  JADX ERROR: Failed to decode insn: 0x0053: MOVE_MULTI, method: org.apache.activemq.store.kahadb.KahaDBStore.StoreQueueTask.run():void
            java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[8]
            	at java.base/java.lang.System.arraycopy(Native Method)
            	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
            	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
            	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
            	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
            	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
            	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
            	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
            	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:449)
            	at jadx.core.ProcessClass.process(ProcessClass.java:70)
            	at jadx.core.ProcessClass.generateCode(ProcessClass.java:110)
            	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
            	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
            	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
            */
        /*  JADX ERROR: Failed to decode insn: 0x00A4: MOVE_MULTI, method: org.apache.activemq.store.kahadb.KahaDBStore.StoreQueueTask.run():void
            java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[8]
            	at java.base/java.lang.System.arraycopy(Native Method)
            	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
            	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
            	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
            	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
            	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
            	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
            	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
            	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:449)
            	at jadx.core.ProcessClass.process(ProcessClass.java:70)
            	at jadx.core.ProcessClass.generateCode(ProcessClass.java:110)
            	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
            	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
            	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
            */
        @Override // java.lang.Runnable
        public void run() {
            /*
                r8 = this;
                r0 = r8
                org.apache.activemq.store.kahadb.KahaDBStore$KahaDBMessageStore r0 = r0.store
                r1 = r0
                double r1 = r1.doneTasks
                r2 = 4607182418800017408(0x3ff0000000000000, double:1.0)
                double r1 = r1 + r2
                r0.doneTasks = r1
                r0 = r8
                java.util.concurrent.atomic.AtomicBoolean r0 = r0.done
                r1 = 0
                r2 = 1
                boolean r0 = r0.compareAndSet(r1, r2)
                if (r0 == 0) goto L45
                r0 = r8
                org.apache.activemq.store.kahadb.KahaDBStore$KahaDBMessageStore r0 = r0.store
                r1 = r8
                org.apache.activemq.broker.ConnectionContext r1 = r1.context
                r2 = r8
                org.apache.activemq.command.Message r2 = r2.message
                r0.addMessage(r1, r2)
                r0 = r8
                org.apache.activemq.store.kahadb.KahaDBStore r0 = org.apache.activemq.store.kahadb.KahaDBStore.this
                r1 = r8
                org.apache.activemq.store.kahadb.KahaDBStore$KahaDBMessageStore r1 = r1.store
                r2 = r8
                org.apache.activemq.command.Message r2 = r2.message
                org.apache.activemq.command.MessageId r2 = r2.getMessageId()
                org.apache.activemq.store.kahadb.KahaDBStore$StoreQueueTask r0 = r0.removeQueueTask(r1, r2)
                r0 = r8
                org.apache.activemq.store.kahadb.KahaDBStore$StoreQueueTask$InnerFutureTask r0 = r0.future
                r0.complete()
                goto Lab
                int r0 = org.apache.activemq.store.kahadb.KahaDBStore.cancelledTaskModMetric
                if (r0 <= 0) goto Lab
                r0 = r8
                org.apache.activemq.store.kahadb.KahaDBStore$KahaDBMessageStore r0 = r0.store
                r1 = r0
                double r1 = r1.canceledTasks
                // decode failed: arraycopy: source index -1 out of bounds for object array[8]
                r2 = 4607182418800017408(0x3ff0000000000000, double:1.0)
                double r1 = r1 + r2
                r0.canceledTasks = r1
                int r0 = org.apache.activemq.store.kahadb.KahaDBStore.cancelledTaskModMetric
                double r0 = (double) r0
                double r-1 = r-1 % r0
                r0 = 0
                int r-1 = (r-1 > r0 ? 1 : (r-1 == r0 ? 0 : -1))
                if (r-1 != 0) goto Lab
                java.io.PrintStream r-1 = java.lang.System.err
                java.lang.StringBuilder r0 = new java.lang.StringBuilder
                r1 = r0
                r1.<init>()
                r1 = r8
                org.apache.activemq.store.kahadb.KahaDBStore$KahaDBMessageStore r1 = r1.store
                org.apache.activemq.store.kahadb.data.KahaDestination r1 = r1.dest
                java.lang.String r1 = r1.getName()
                java.lang.StringBuilder r0 = r0.append(r1)
                java.lang.String r1 = " cancelled: "
                java.lang.StringBuilder r0 = r0.append(r1)
                r1 = r8
                org.apache.activemq.store.kahadb.KahaDBStore$KahaDBMessageStore r1 = r1.store
                double r1 = r1.canceledTasks
                r2 = r8
                org.apache.activemq.store.kahadb.KahaDBStore$KahaDBMessageStore r2 = r2.store
                double r2 = r2.doneTasks
                double r1 = r1 / r2
                r2 = 4636737291354636288(0x4059000000000000, double:100.0)
                double r1 = r1 * r2
                java.lang.StringBuilder r0 = r0.append(r1)
                java.lang.String r0 = r0.toString()
                r-1.println(r0)
                r-1 = r8
                org.apache.activemq.store.kahadb.KahaDBStore$KahaDBMessageStore r-1 = r-1.store
                r0 = r8
                org.apache.activemq.store.kahadb.KahaDBStore$KahaDBMessageStore r0 = r0.store
                r1 = 0
                // decode failed: arraycopy: source index -1 out of bounds for object array[8]
                r0.doneTasks = r1
                r-2.canceledTasks = r-1
                goto Lb7
                r9 = move-exception
                r0 = r8
                org.apache.activemq.store.kahadb.KahaDBStore$StoreQueueTask$InnerFutureTask r0 = r0.future
                r1 = r9
                r0.setException(r1)
                return
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.activemq.store.kahadb.KahaDBStore.StoreQueueTask.run():void");
        }

        protected Message getMessage() {
            return this.message;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/activemq-core-5.5.1-fuse-01-13.jar:org/apache/activemq/store/kahadb/KahaDBStore$StoreTask.class */
    public interface StoreTask {
        boolean cancel();

        void aquireLocks();

        void releaseLocks();
    }

    /* loaded from: input_file:WEB-INF/lib/activemq-core-5.5.1-fuse-01-13.jar:org/apache/activemq/store/kahadb/KahaDBStore$StoreTaskExecutor.class */
    public class StoreTaskExecutor extends ThreadPoolExecutor {
        public StoreTaskExecutor(int i, int i2, long j, TimeUnit timeUnit, BlockingQueue<Runnable> blockingQueue, ThreadFactory threadFactory) {
            super(i, i2, j, timeUnit, blockingQueue, threadFactory);
        }

        @Override // java.util.concurrent.ThreadPoolExecutor
        protected void afterExecute(Runnable runnable, Throwable th) {
            super.afterExecute(runnable, th);
            if (runnable instanceof StoreTask) {
                ((StoreTask) runnable).releaseLocks();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/activemq-core-5.5.1-fuse-01-13.jar:org/apache/activemq/store/kahadb/KahaDBStore$StoreTopicTask.class */
    public class StoreTopicTask extends StoreQueueTask {
        private final int subscriptionCount;
        private final List<String> subscriptionKeys;
        private final KahaDBTopicMessageStore topicStore;

        public StoreTopicTask(KahaDBTopicMessageStore kahaDBTopicMessageStore, ConnectionContext connectionContext, Message message, int i) {
            super(kahaDBTopicMessageStore, connectionContext, message);
            this.subscriptionKeys = new ArrayList(1);
            this.topicStore = kahaDBTopicMessageStore;
            this.subscriptionCount = i;
        }

        @Override // org.apache.activemq.store.kahadb.KahaDBStore.StoreQueueTask, org.apache.activemq.store.kahadb.KahaDBStore.StoreTask
        public void aquireLocks() {
            if (this.locked.compareAndSet(false, true)) {
                try {
                    KahaDBStore.this.globalTopicSemaphore.acquire();
                    this.store.acquireLocalAsyncLock();
                    this.message.incrementReferenceCount();
                } catch (InterruptedException e) {
                    KahaDBStore.LOG.warn("Failed to aquire lock", (Throwable) e);
                }
            }
        }

        @Override // org.apache.activemq.store.kahadb.KahaDBStore.StoreQueueTask, org.apache.activemq.store.kahadb.KahaDBStore.StoreTask
        public void releaseLocks() {
            if (this.locked.compareAndSet(true, false)) {
                this.message.decrementReferenceCount();
                this.store.releaseLocalAsyncLock();
                KahaDBStore.this.globalTopicSemaphore.release();
            }
        }

        public boolean addSubscriptionKey(String str) {
            synchronized (this.subscriptionKeys) {
                this.subscriptionKeys.add(str);
            }
            return this.subscriptionKeys.size() >= this.subscriptionCount;
        }

        /*  JADX ERROR: Failed to decode insn: 0x009A: MOVE_MULTI, method: org.apache.activemq.store.kahadb.KahaDBStore.StoreTopicTask.run():void
            java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[8]
            	at java.base/java.lang.System.arraycopy(Native Method)
            	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
            	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
            	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
            	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
            	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
            	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
            	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
            	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:449)
            	at jadx.core.ProcessClass.process(ProcessClass.java:70)
            	at jadx.core.ProcessClass.generateCode(ProcessClass.java:110)
            	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
            	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
            	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
            */
        /*  JADX ERROR: Failed to decode insn: 0x00EB: MOVE_MULTI, method: org.apache.activemq.store.kahadb.KahaDBStore.StoreTopicTask.run():void
            java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[8]
            	at java.base/java.lang.System.arraycopy(Native Method)
            	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
            	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
            	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
            	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
            	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
            	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
            	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
            	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:449)
            	at jadx.core.ProcessClass.process(ProcessClass.java:70)
            	at jadx.core.ProcessClass.generateCode(ProcessClass.java:110)
            	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
            	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
            	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
            */
        @Override // org.apache.activemq.store.kahadb.KahaDBStore.StoreQueueTask, java.lang.Runnable
        public void run() {
            /*
                Method dump skipped, instructions count: 255
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.activemq.store.kahadb.KahaDBStore.StoreTopicTask.run():void");
        }
    }

    @Override // org.apache.activemq.store.PersistenceAdapter
    public void setBrokerName(String str) {
    }

    @Override // org.apache.activemq.store.PersistenceAdapter
    public void setUsageManager(SystemUsage systemUsage) {
        this.usageManager = systemUsage;
    }

    public SystemUsage getUsageManager() {
        return this.usageManager;
    }

    public boolean isConcurrentStoreAndDispatchQueues() {
        return this.concurrentStoreAndDispatchQueues;
    }

    public void setConcurrentStoreAndDispatchQueues(boolean z) {
        this.concurrentStoreAndDispatchQueues = z;
    }

    public boolean isConcurrentStoreAndDispatchTopics() {
        return this.concurrentStoreAndDispatchTopics;
    }

    public void setConcurrentStoreAndDispatchTopics(boolean z) {
        this.concurrentStoreAndDispatchTopics = z;
    }

    public boolean isConcurrentStoreAndDispatchTransactions() {
        return this.concurrentStoreAndDispatchTransactions;
    }

    public int getMaxAsyncJobs() {
        return this.maxAsyncJobs;
    }

    public void setMaxAsyncJobs(int i) {
        this.maxAsyncJobs = i;
    }

    @Override // org.apache.activemq.store.kahadb.MessageDatabase, org.apache.activemq.util.ServiceSupport
    public void doStart() throws Exception {
        super.doStart();
        this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
        this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
        this.asyncQueueJobQueue = new LinkedBlockingQueue<>(getMaxAsyncJobs());
        this.asyncTopicJobQueue = new LinkedBlockingQueue<>(getMaxAsyncJobs());
        this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, this.asyncQueueJobQueue, new ThreadFactory() { // from class: org.apache.activemq.store.kahadb.KahaDBStore.2
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
                thread.setDaemon(true);
                return thread;
            }
        });
        this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, this.asyncTopicJobQueue, new ThreadFactory() { // from class: org.apache.activemq.store.kahadb.KahaDBStore.3
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
                thread.setDaemon(true);
                return thread;
            }
        });
    }

    @Override // org.apache.activemq.store.kahadb.MessageDatabase, org.apache.activemq.util.ServiceSupport
    public void doStop(ServiceStopper serviceStopper) throws Exception {
        LOG.info("Stopping async queue tasks");
        if (this.globalQueueSemaphore != null) {
            this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60L, TimeUnit.SECONDS);
        }
        synchronized (this.asyncQueueMaps) {
            for (Map<AsyncJobKey, StoreTask> map : this.asyncQueueMaps) {
                synchronized (map) {
                    Iterator<StoreTask> it = map.values().iterator();
                    while (it.hasNext()) {
                        it.next().cancel();
                    }
                }
            }
            this.asyncQueueMaps.clear();
        }
        LOG.info("Stopping async topic tasks");
        if (this.globalTopicSemaphore != null) {
            this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60L, TimeUnit.SECONDS);
        }
        synchronized (this.asyncTopicMaps) {
            for (Map<AsyncJobKey, StoreTask> map2 : this.asyncTopicMaps) {
                synchronized (map2) {
                    Iterator<StoreTask> it2 = map2.values().iterator();
                    while (it2.hasNext()) {
                        it2.next().cancel();
                    }
                }
            }
            this.asyncTopicMaps.clear();
        }
        if (this.globalQueueSemaphore != null) {
            this.globalQueueSemaphore.drainPermits();
        }
        if (this.globalTopicSemaphore != null) {
            this.globalTopicSemaphore.drainPermits();
        }
        if (this.queueExecutor != null) {
            this.queueExecutor.shutdownNow();
        }
        if (this.topicExecutor != null) {
            this.topicExecutor.shutdownNow();
        }
        LOG.info("Stopped KahaDB");
        super.doStop(serviceStopper);
    }

    @Override // org.apache.activemq.store.kahadb.MessageDatabase
    void incrementRedeliveryAndReWrite(final String str, final KahaDestination kahaDestination) throws IOException {
        this.indexLock.writeLock().lock();
        try {
            Location findMessageLocation = findMessageLocation(str, kahaDestination);
            this.indexLock.writeLock().unlock();
            if (findMessageLocation != null) {
                KahaAddMessageCommand kahaAddMessageCommand = (KahaAddMessageCommand) load(findMessageLocation);
                Message message = (Message) this.wireFormat.unmarshal(new DataInputStream(kahaAddMessageCommand.getMessage().newInput()));
                message.incrementRedeliveryCounter();
                if (LOG.isTraceEnabled()) {
                    LOG.trace("rewriting: " + str + " with deliveryCount: " + message.getRedeliveryCounter());
                }
                ByteSequence marshal = this.wireFormat.marshal(message);
                kahaAddMessageCommand.setMessage(new Buffer(marshal.getData(), marshal.getOffset(), marshal.getLength()));
                final Location write = this.journal.write(toByteSequence(kahaAddMessageCommand), true);
                this.indexLock.writeLock().lock();
                try {
                    this.pageFile.tx().execute(new Transaction.Closure<IOException>() { // from class: org.apache.activemq.store.kahadb.KahaDBStore.4
                        @Override // org.apache.kahadb.page.Transaction.Closure
                        public void execute(Transaction transaction) throws IOException {
                            MessageDatabase.StoredDestination storedDestination = KahaDBStore.this.getStoredDestination(kahaDestination, transaction);
                            Long l = storedDestination.messageIdIndex.get(transaction, str);
                            storedDestination.orderIndex.put(transaction, storedDestination.orderIndex.lastGetPriority(), l, new MessageDatabase.MessageKeys(storedDestination.orderIndex.get(transaction, l).messageId, write));
                        }
                    });
                    this.indexLock.writeLock().unlock();
                } finally {
                }
            }
        } finally {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Location findMessageLocation(final String str, final KahaDestination kahaDestination) throws IOException {
        return (Location) this.pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() { // from class: org.apache.activemq.store.kahadb.KahaDBStore.5
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.apache.kahadb.page.Transaction.CallableClosure
            public Location execute(Transaction transaction) throws IOException {
                MessageDatabase.StoredDestination storedDestination = KahaDBStore.this.getStoredDestination(kahaDestination, transaction);
                Long l = storedDestination.messageIdIndex.get(transaction, str);
                if (l == null) {
                    return null;
                }
                return storedDestination.orderIndex.get(transaction, l).location;
            }
        });
    }

    protected StoreQueueTask removeQueueTask(KahaDBMessageStore kahaDBMessageStore, MessageId messageId) {
        StoreQueueTask storeQueueTask;
        synchronized (kahaDBMessageStore.asyncTaskMap) {
            storeQueueTask = (StoreQueueTask) kahaDBMessageStore.asyncTaskMap.remove(new AsyncJobKey(messageId, kahaDBMessageStore.getDestination()));
        }
        return storeQueueTask;
    }

    protected void addQueueTask(KahaDBMessageStore kahaDBMessageStore, StoreQueueTask storeQueueTask) throws IOException {
        synchronized (kahaDBMessageStore.asyncTaskMap) {
            kahaDBMessageStore.asyncTaskMap.put(new AsyncJobKey(storeQueueTask.getMessage().getMessageId(), kahaDBMessageStore.getDestination()), storeQueueTask);
        }
        this.queueExecutor.execute(storeQueueTask);
    }

    protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore kahaDBTopicMessageStore, MessageId messageId) {
        StoreTopicTask storeTopicTask;
        synchronized (kahaDBTopicMessageStore.asyncTaskMap) {
            storeTopicTask = (StoreTopicTask) kahaDBTopicMessageStore.asyncTaskMap.remove(new AsyncJobKey(messageId, kahaDBTopicMessageStore.getDestination()));
        }
        return storeTopicTask;
    }

    protected void addTopicTask(KahaDBTopicMessageStore kahaDBTopicMessageStore, StoreTopicTask storeTopicTask) throws IOException {
        synchronized (kahaDBTopicMessageStore.asyncTaskMap) {
            kahaDBTopicMessageStore.asyncTaskMap.put(new AsyncJobKey(storeTopicTask.getMessage().getMessageId(), kahaDBTopicMessageStore.getDestination()), storeTopicTask);
        }
        this.topicExecutor.execute(storeTopicTask);
    }

    @Override // org.apache.activemq.store.PersistenceAdapter
    public TransactionStore createTransactionStore() throws IOException {
        return this.transactionStore;
    }

    public boolean getForceRecoverIndex() {
        return this.forceRecoverIndex;
    }

    public void setForceRecoverIndex(boolean z) {
        this.forceRecoverIndex = z;
    }

    String subscriptionKey(String str, String str2) {
        return str + ":" + str2;
    }

    @Override // org.apache.activemq.store.PersistenceAdapter
    public MessageStore createQueueMessageStore(ActiveMQQueue activeMQQueue) throws IOException {
        return this.transactionStore.proxy(new KahaDBMessageStore(activeMQQueue));
    }

    @Override // org.apache.activemq.store.PersistenceAdapter
    public TopicMessageStore createTopicMessageStore(ActiveMQTopic activeMQTopic) throws IOException {
        return this.transactionStore.proxy((TopicMessageStore) new KahaDBTopicMessageStore(activeMQTopic));
    }

    @Override // org.apache.activemq.store.PersistenceAdapter
    public void removeQueueMessageStore(ActiveMQQueue activeMQQueue) {
    }

    @Override // org.apache.activemq.store.PersistenceAdapter
    public void removeTopicMessageStore(ActiveMQTopic activeMQTopic) {
    }

    @Override // org.apache.activemq.store.PersistenceAdapter
    public void deleteAllMessages() throws IOException {
        this.deleteAllMessages = true;
    }

    @Override // org.apache.activemq.store.PersistenceAdapter
    public Set<ActiveMQDestination> getDestinations() {
        try {
            final HashSet hashSet = new HashSet();
            this.indexLock.readLock().lock();
            try {
                this.pageFile.tx().execute(new Transaction.Closure<IOException>() { // from class: org.apache.activemq.store.kahadb.KahaDBStore.6
                    @Override // org.apache.kahadb.page.Transaction.Closure
                    public void execute(Transaction transaction) throws IOException {
                        Iterator<Map.Entry<String, MessageDatabase.StoredDestination>> it = KahaDBStore.this.metadata.destinations.iterator(transaction);
                        while (it.hasNext()) {
                            Map.Entry<String, MessageDatabase.StoredDestination> next = it.next();
                            if (!isEmptyTopic(next, transaction)) {
                                hashSet.add(KahaDBStore.this.convert(next.getKey()));
                            }
                        }
                    }

                    private boolean isEmptyTopic(Map.Entry<String, MessageDatabase.StoredDestination> entry, Transaction transaction) throws IOException {
                        boolean z = false;
                        ActiveMQDestination convert = KahaDBStore.this.convert(entry.getKey());
                        if (convert.isTopic() && KahaDBStore.this.getStoredDestination(KahaDBStore.this.convert(convert), transaction).subscriptionAcks.isEmpty(transaction)) {
                            z = true;
                        }
                        return z;
                    }
                });
                this.indexLock.readLock().unlock();
                return hashSet;
            } catch (Throwable th) {
                this.indexLock.readLock().unlock();
                throw th;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.activemq.store.PersistenceAdapter
    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0L;
    }

    @Override // org.apache.activemq.store.PersistenceAdapter
    public long getLastProducerSequenceId(ProducerId producerId) {
        this.indexLock.readLock().lock();
        try {
            long lastSeqId = this.metadata.producerSequenceIdTracker.getLastSeqId(producerId);
            this.indexLock.readLock().unlock();
            return lastSeqId;
        } catch (Throwable th) {
            this.indexLock.readLock().unlock();
            throw th;
        }
    }

    @Override // org.apache.activemq.store.PersistenceAdapter
    public long size() {
        return this.storeSize.get();
    }

    @Override // org.apache.activemq.store.PersistenceAdapter
    public void beginTransaction(ConnectionContext connectionContext) throws IOException {
        throw new IOException("Not yet implemented.");
    }

    @Override // org.apache.activemq.store.PersistenceAdapter
    public void commitTransaction(ConnectionContext connectionContext) throws IOException {
        throw new IOException("Not yet implemented.");
    }

    @Override // org.apache.activemq.store.PersistenceAdapter
    public void rollbackTransaction(ConnectionContext connectionContext) throws IOException {
        throw new IOException("Not yet implemented.");
    }

    @Override // org.apache.activemq.store.PersistenceAdapter
    public void checkpoint(boolean z) throws IOException {
        super.checkpointCleanup(false);
    }

    Message loadMessage(Location location) throws IOException {
        return (Message) this.wireFormat.unmarshal(new DataInputStream(((KahaAddMessageCommand) load(location)).getMessage().newInput()));
    }

    KahaLocation convert(Location location) {
        KahaLocation kahaLocation = new KahaLocation();
        kahaLocation.setLogId(location.getDataFileId());
        kahaLocation.setOffset(location.getOffset());
        return kahaLocation;
    }

    KahaDestination convert(ActiveMQDestination activeMQDestination) {
        KahaDestination kahaDestination = new KahaDestination();
        kahaDestination.setName(activeMQDestination.getPhysicalName());
        switch (activeMQDestination.getDestinationType()) {
            case 1:
                kahaDestination.setType(KahaDestination.DestinationType.QUEUE);
                return kahaDestination;
            case 2:
                kahaDestination.setType(KahaDestination.DestinationType.TOPIC);
                return kahaDestination;
            case 3:
            case 4:
            default:
                return null;
            case 5:
                kahaDestination.setType(KahaDestination.DestinationType.TEMP_QUEUE);
                return kahaDestination;
            case 6:
                kahaDestination.setType(KahaDestination.DestinationType.TEMP_TOPIC);
                return kahaDestination;
        }
    }

    ActiveMQDestination convert(String str) {
        int indexOf = str.indexOf(":");
        if (indexOf < 0) {
            throw new IllegalArgumentException("Not in the valid destination format");
        }
        Integer.parseInt(str.substring(0, indexOf));
        String substring = str.substring(indexOf + 1);
        switch (KahaDestination.DestinationType.valueOf(r0)) {
            case QUEUE:
                return new ActiveMQQueue(substring);
            case TOPIC:
                return new ActiveMQTopic(substring);
            case TEMP_QUEUE:
                return new ActiveMQTempQueue(substring);
            case TEMP_TOPIC:
                return new ActiveMQTempTopic(substring);
            default:
                throw new IllegalArgumentException("Not in the valid destination format");
        }
    }

    public TransactionIdTransformer getTransactionIdTransformer() {
        return this.transactionIdTransformer;
    }

    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
        this.transactionIdTransformer = transactionIdTransformer;
    }
}
