/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.kahadb;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.IndexListener;
import org.apache.activemq.store.ListenableFuture;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionIdTransformer;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.JournalCommand;
import org.apache.activemq.store.kahadb.KahaDBTransactionStore;
import org.apache.activemq.store.kahadb.MessageDatabase;
import org.apache.activemq.store.kahadb.TransactionIdConversion;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KahaDBStore
extends MessageDatabase
implements PersistenceAdapter {
    static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
    private static final int MAX_ASYNC_JOBS = 10000;
    public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
    public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty("org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC", "0"), 10);
    public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
    private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty("org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS", "1"), 10);
    protected ExecutorService queueExecutor;
    protected ExecutorService topicExecutor;
    protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
    protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
    final WireFormat wireFormat = new OpenWireFormat();
    private SystemUsage usageManager;
    private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
    private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
    Semaphore globalQueueSemaphore;
    Semaphore globalTopicSemaphore;
    private boolean concurrentStoreAndDispatchQueues = true;
    private boolean concurrentStoreAndDispatchTopics = false;
    private final boolean concurrentStoreAndDispatchTransactions = false;
    private int maxAsyncJobs = 10000;
    private final KahaDBTransactionStore transactionStore = new KahaDBTransactionStore(this);
    private TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer(){

        public TransactionId transform(TransactionId txid) {
            return txid;
        }
    };

    public String toString() {
        return "KahaDB:[" + this.directory.getAbsolutePath() + "]";
    }

    public void setBrokerName(String brokerName) {
    }

    public void setUsageManager(SystemUsage usageManager) {
        this.usageManager = usageManager;
    }

    public SystemUsage getUsageManager() {
        return this.usageManager;
    }

    public boolean isConcurrentStoreAndDispatchQueues() {
        return this.concurrentStoreAndDispatchQueues;
    }

    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
        this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
    }

    public boolean isConcurrentStoreAndDispatchTopics() {
        return this.concurrentStoreAndDispatchTopics;
    }

    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
        this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
    }

    public boolean isConcurrentStoreAndDispatchTransactions() {
        return this.concurrentStoreAndDispatchTransactions;
    }

    public int getMaxAsyncJobs() {
        return this.maxAsyncJobs;
    }

    public void setMaxAsyncJobs(int maxAsyncJobs) {
        this.maxAsyncJobs = maxAsyncJobs;
    }

    @Override
    public void doStart() throws Exception {
        if (this.brokerService != null) {
            this.metadata.openwireVersion = this.brokerService.getStoreOpenWireVersion();
            this.wireFormat.setVersion(this.metadata.openwireVersion);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Store OpenWire version configured as: {}", (Object)this.metadata.openwireVersion);
            }
        }
        super.doStart();
        if (this.brokerService != null && this.metadata.openwireVersion != this.brokerService.getStoreOpenWireVersion()) {
            LOG.warn("Recovered Store uses a different OpenWire version[{}] than the version configured[{}].", (Object)this.metadata.openwireVersion, (Object)this.brokerService.getStoreOpenWireVersion());
        }
        this.globalQueueSemaphore = new Semaphore(this.getMaxAsyncJobs());
        this.globalTopicSemaphore = new Semaphore(this.getMaxAsyncJobs());
        this.asyncQueueJobQueue = new LinkedBlockingQueue(this.getMaxAsyncJobs());
        this.asyncTopicJobQueue = new LinkedBlockingQueue(this.getMaxAsyncJobs());
        this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, this.asyncQueueJobQueue, new ThreadFactory(){

            @Override
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
                thread.setDaemon(true);
                return thread;
            }
        });
        this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, this.asyncTopicJobQueue, new ThreadFactory(){

            @Override
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
                thread.setDaemon(true);
                return thread;
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void doStop(ServiceStopper stopper) throws Exception {
        Map<AsyncJobKey, StoreTask> map;
        Map<AsyncJobKey, StoreTask> m;
        Iterator<Map<AsyncJobKey, StoreTask>> i$;
        LOG.info("Stopping async queue tasks");
        if (this.globalQueueSemaphore != null) {
            this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60L, TimeUnit.SECONDS);
        }
        List<Map<AsyncJobKey, StoreTask>> list = this.asyncQueueMaps;
        synchronized (list) {
            i$ = this.asyncQueueMaps.iterator();
            while (i$.hasNext()) {
                map = m = i$.next();
                synchronized (map) {
                    for (StoreTask task : m.values()) {
                        task.cancel();
                    }
                }
            }
            this.asyncQueueMaps.clear();
        }
        LOG.info("Stopping async topic tasks");
        if (this.globalTopicSemaphore != null) {
            this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60L, TimeUnit.SECONDS);
        }
        list = this.asyncTopicMaps;
        synchronized (list) {
            i$ = this.asyncTopicMaps.iterator();
            while (i$.hasNext()) {
                map = m = i$.next();
                synchronized (map) {
                    for (StoreTask task : m.values()) {
                        task.cancel();
                    }
                }
            }
            this.asyncTopicMaps.clear();
        }
        if (this.globalQueueSemaphore != null) {
            this.globalQueueSemaphore.drainPermits();
        }
        if (this.globalTopicSemaphore != null) {
            this.globalTopicSemaphore.drainPermits();
        }
        if (this.queueExecutor != null) {
            ThreadPoolUtils.shutdownNow((ExecutorService)this.queueExecutor);
            this.queueExecutor = null;
        }
        if (this.topicExecutor != null) {
            ThreadPoolUtils.shutdownNow((ExecutorService)this.topicExecutor);
            this.topicExecutor = null;
        }
        LOG.info("Stopped KahaDB");
        super.doStop(stopper);
    }

    private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
        return this.pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>(){

            @Override
            public Location execute(Transaction tx) throws IOException {
                MessageDatabase.StoredDestination sd = KahaDBStore.this.getStoredDestination(destination, tx);
                Long sequence = sd.messageIdIndex.get(tx, key);
                if (sequence == null) {
                    return null;
                }
                return sd.orderIndex.get((Transaction)tx, (Long)sequence).location;
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
        StoreQueueTask task = null;
        Map<AsyncJobKey, StoreTask> map = store.asyncTaskMap;
        synchronized (map) {
            task = (StoreQueueTask)store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
        }
        return task;
    }

    protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
        store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
        this.queueExecutor.execute(task);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
        StoreTopicTask task = null;
        Map map = store.asyncTaskMap;
        synchronized (map) {
            task = (StoreTopicTask)store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
        }
        return task;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
        Map map = store.asyncTaskMap;
        synchronized (map) {
            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
        }
        this.topicExecutor.execute(task);
    }

    public TransactionStore createTransactionStore() throws IOException {
        return this.transactionStore;
    }

    public boolean getForceRecoverIndex() {
        return this.forceRecoverIndex;
    }

    public void setForceRecoverIndex(boolean forceRecoverIndex) {
        this.forceRecoverIndex = forceRecoverIndex;
    }

    String subscriptionKey(String clientId, String subscriptionName) {
        return clientId + ":" + subscriptionName;
    }

    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
        return this.transactionStore.proxy((MessageStore)new KahaDBMessageStore((ActiveMQDestination)destination));
    }

    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
        return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
    }

    public void removeQueueMessageStore(ActiveMQQueue destination) {
    }

    public void removeTopicMessageStore(ActiveMQTopic destination) {
    }

    public void deleteAllMessages() throws IOException {
        this.deleteAllMessages = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Set<ActiveMQDestination> getDestinations() {
        try {
            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
            this.indexLock.writeLock().lock();
            try {
                this.pageFile.tx().execute(new Transaction.Closure<IOException>(){

                    @Override
                    public void execute(Transaction tx) throws IOException {
                        Iterator<Map.Entry<String, MessageDatabase.StoredDestination>> iterator = KahaDBStore.this.metadata.destinations.iterator(tx);
                        while (iterator.hasNext()) {
                            Map.Entry<String, MessageDatabase.StoredDestination> entry = iterator.next();
                            if (this.isEmptyTopic(entry, tx)) continue;
                            rc.add(KahaDBStore.this.convert(entry.getKey()));
                        }
                    }

                    private boolean isEmptyTopic(Map.Entry<String, MessageDatabase.StoredDestination> entry, Transaction tx) throws IOException {
                        boolean isEmptyTopic = false;
                        ActiveMQDestination dest = KahaDBStore.this.convert(entry.getKey());
                        if (dest.isTopic()) {
                            MessageDatabase.StoredDestination loadedStore = KahaDBStore.this.getStoredDestination(KahaDBStore.this.convert(dest), tx);
                            if (loadedStore.subscriptionAcks.isEmpty(tx)) {
                                isEmptyTopic = true;
                            }
                        }
                        return isEmptyTopic;
                    }
                });
            }
            finally {
                this.indexLock.writeLock().unlock();
            }
            return rc;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0L;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long getLastProducerSequenceId(ProducerId id) {
        this.indexLock.writeLock().lock();
        try {
            long l = this.metadata.producerSequenceIdTracker.getLastSeqId(id);
            return l;
        }
        finally {
            this.indexLock.writeLock().unlock();
        }
    }

    public long size() {
        try {
            return this.journalSize.get() + this.getPageFile().getDiskSize();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void beginTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }

    public void commitTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }

    public void rollbackTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }

    public void checkpoint(boolean sync) throws IOException {
        super.checkpointCleanup(sync);
    }

    Message loadMessage(Location location) throws IOException {
        try {
            JournalCommand<?> command = this.load(location);
            KahaAddMessageCommand addMessage = null;
            switch (command.type()) {
                case KAHA_UPDATE_MESSAGE_COMMAND: {
                    addMessage = ((KahaUpdateMessageCommand)command).getMessage();
                    break;
                }
                default: {
                    addMessage = (KahaAddMessageCommand)command;
                }
            }
            Message msg = (Message)this.wireFormat.unmarshal((DataInput)new DataInputStream((InputStream)addMessage.getMessage().newInput()));
            return msg;
        }
        catch (IOException ioe) {
            LOG.error("Failed to load message at: {}", (Object)location, (Object)ioe);
            this.brokerService.handleIOException(ioe);
            throw ioe;
        }
    }

    KahaLocation convert(Location location) {
        KahaLocation rc = new KahaLocation();
        rc.setLogId(location.getDataFileId());
        rc.setOffset(location.getOffset());
        return rc;
    }

    KahaDestination convert(ActiveMQDestination dest) {
        KahaDestination rc = new KahaDestination();
        rc.setName(dest.getPhysicalName());
        switch (dest.getDestinationType()) {
            case 1: {
                rc.setType(KahaDestination.DestinationType.QUEUE);
                return rc;
            }
            case 2: {
                rc.setType(KahaDestination.DestinationType.TOPIC);
                return rc;
            }
            case 5: {
                rc.setType(KahaDestination.DestinationType.TEMP_QUEUE);
                return rc;
            }
            case 6: {
                rc.setType(KahaDestination.DestinationType.TEMP_TOPIC);
                return rc;
            }
        }
        return null;
    }

    ActiveMQDestination convert(String dest) {
        int p = dest.indexOf(":");
        if (p < 0) {
            throw new IllegalArgumentException("Not in the valid destination format");
        }
        int type = Integer.parseInt(dest.substring(0, p));
        String name = dest.substring(p + 1);
        return this.convert(type, name);
    }

    private ActiveMQDestination convert(KahaDestination commandDestination) {
        return this.convert(commandDestination.getType().getNumber(), commandDestination.getName());
    }

    private ActiveMQDestination convert(int type, String name) {
        switch (KahaDestination.DestinationType.valueOf(type)) {
            case QUEUE: {
                return new ActiveMQQueue(name);
            }
            case TOPIC: {
                return new ActiveMQTopic(name);
            }
            case TEMP_QUEUE: {
                return new ActiveMQTempQueue(name);
            }
            case TEMP_TOPIC: {
                return new ActiveMQTempTopic(name);
            }
        }
        throw new IllegalArgumentException("Not in the valid destination format");
    }

    public TransactionIdTransformer getTransactionIdTransformer() {
        return this.transactionIdTransformer;
    }

    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
        this.transactionIdTransformer = transactionIdTransformer;
    }

    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
        return new JobSchedulerStoreImpl();
    }

    public class StoreTaskExecutor
    extends ThreadPoolExecutor {
        public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
        }

        @Override
        protected void afterExecute(Runnable runnable, Throwable throwable) {
            super.afterExecute(runnable, throwable);
            if (runnable instanceof StoreTask) {
                ((StoreTask)((Object)runnable)).releaseLocks();
            }
        }
    }

    class StoreTopicTask
    extends StoreQueueTask {
        private final int subscriptionCount;
        private final List<String> subscriptionKeys;
        private final KahaDBTopicMessageStore topicStore;

        public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message, int subscriptionCount) {
            super(store, context, message);
            this.subscriptionKeys = new ArrayList<String>(1);
            this.topicStore = store;
            this.subscriptionCount = subscriptionCount;
        }

        @Override
        public void aquireLocks() {
            if (this.locked.compareAndSet(false, true)) {
                try {
                    KahaDBStore.this.globalTopicSemaphore.acquire();
                    this.store.acquireLocalAsyncLock();
                    this.message.incrementReferenceCount();
                }
                catch (InterruptedException e) {
                    LOG.warn("Failed to aquire lock", (Throwable)e);
                }
            }
        }

        @Override
        public void releaseLocks() {
            if (this.locked.compareAndSet(true, false)) {
                this.message.decrementReferenceCount();
                this.store.releaseLocalAsyncLock();
                KahaDBStore.this.globalTopicSemaphore.release();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean addSubscriptionKey(String key) {
            List<String> list = this.subscriptionKeys;
            synchronized (list) {
                this.subscriptionKeys.add(key);
            }
            return this.subscriptionKeys.size() >= this.subscriptionCount;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            block9: {
                this.store.doneTasks += 1.0;
                try {
                    if (this.done.compareAndSet(false, true)) {
                        this.topicStore.addMessage(this.context, this.message);
                        List<String> list = this.subscriptionKeys;
                        synchronized (list) {
                            for (String key : this.subscriptionKeys) {
                                this.topicStore.doAcknowledge(this.context, key, this.message.getMessageId(), null);
                            }
                        }
                        KahaDBStore.this.removeTopicTask(this.topicStore, this.message.getMessageId());
                        this.future.complete();
                        break block9;
                    }
                    if (cancelledTaskModMetric > 0) {
                        double d = this.store.canceledTasks;
                        this.store.canceledTasks = d + 1.0;
                        if (d % (double)cancelledTaskModMetric == 0.0) {
                            System.err.println(this.store.dest.getName() + " cancelled: " + this.store.canceledTasks / this.store.doneTasks * 100.0);
                            this.store.doneTasks = 0.0;
                            this.store.canceledTasks = 0.0;
                        }
                    }
                }
                catch (Throwable t) {
                    this.future.setException(t);
                    KahaDBStore.this.removeTopicTask(this.topicStore, this.message.getMessageId());
                }
            }
        }
    }

    class StoreQueueTask
    implements Runnable,
    StoreTask {
        protected final Message message;
        protected final ConnectionContext context;
        protected final KahaDBMessageStore store;
        protected final InnerFutureTask future;
        protected final AtomicBoolean done = new AtomicBoolean();
        protected final AtomicBoolean locked = new AtomicBoolean();

        public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
            this.store = store;
            this.context = context;
            this.message = message;
            this.future = new InnerFutureTask(this);
        }

        public ListenableFuture<Object> getFuture() {
            return this.future;
        }

        @Override
        public boolean cancel() {
            if (this.done.compareAndSet(false, true)) {
                return this.future.cancel(false);
            }
            return false;
        }

        @Override
        public void aquireLocks() {
            if (this.locked.compareAndSet(false, true)) {
                try {
                    KahaDBStore.this.globalQueueSemaphore.acquire();
                    this.store.acquireLocalAsyncLock();
                    this.message.incrementReferenceCount();
                }
                catch (InterruptedException e) {
                    LOG.warn("Failed to aquire lock", (Throwable)e);
                }
            }
        }

        @Override
        public void releaseLocks() {
            if (this.locked.compareAndSet(true, false)) {
                this.store.releaseLocalAsyncLock();
                KahaDBStore.this.globalQueueSemaphore.release();
                this.message.decrementReferenceCount();
            }
        }

        @Override
        public void run() {
            this.store.doneTasks += 1.0;
            try {
                if (this.done.compareAndSet(false, true)) {
                    this.store.addMessage(this.context, this.message);
                    KahaDBStore.this.removeQueueTask(this.store, this.message.getMessageId());
                    this.future.complete();
                } else if (cancelledTaskModMetric > 0) {
                    double d = this.store.canceledTasks;
                    this.store.canceledTasks = d + 1.0;
                    if (d % (double)cancelledTaskModMetric == 0.0) {
                        System.err.println(this.store.dest.getName() + " cancelled: " + this.store.canceledTasks / this.store.doneTasks * 100.0);
                        this.store.doneTasks = 0.0;
                        this.store.canceledTasks = 0.0;
                    }
                }
            }
            catch (Throwable t) {
                this.future.setException(t);
                KahaDBStore.this.removeQueueTask(this.store, this.message.getMessageId());
            }
        }

        protected Message getMessage() {
            return this.message;
        }

        private class InnerFutureTask
        extends FutureTask<Object>
        implements ListenableFuture<Object> {
            private Runnable listener;

            public InnerFutureTask(Runnable runnable) {
                super(runnable, null);
            }

            @Override
            public void setException(Throwable e) {
                super.setException(e);
            }

            public void complete() {
                super.set(null);
            }

            @Override
            public void done() {
                this.fireListener();
            }

            public void addListener(Runnable listener) {
                this.listener = listener;
                if (this.isDone()) {
                    this.fireListener();
                }
            }

            private void fireListener() {
                if (this.listener != null) {
                    try {
                        this.listener.run();
                    }
                    catch (Exception ignored) {
                        LOG.warn("Unexpected exception from future {} listener callback {}", new Object[]{this, this.listener, ignored});
                    }
                }
            }
        }
    }

    public static interface StoreTask {
        public boolean cancel();

        public void aquireLocks();

        public void releaseLocks();
    }

    static class AsyncJobKey {
        MessageId id;
        ActiveMQDestination destination;

        AsyncJobKey(MessageId id, ActiveMQDestination destination) {
            this.id = id;
            this.destination = destination;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            return obj instanceof AsyncJobKey && this.id.equals((Object)((AsyncJobKey)obj).id) && this.destination.equals((Object)((AsyncJobKey)obj).destination);
        }

        public int hashCode() {
            return this.id.hashCode() + this.destination.hashCode();
        }

        public String toString() {
            return this.destination.getPhysicalName() + "-" + this.id;
        }
    }

    class KahaDBTopicMessageStore
    extends KahaDBMessageStore
    implements TopicMessageStore {
        private final AtomicInteger subscriptionCount;

        public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
            super((ActiveMQDestination)destination);
            this.subscriptionCount = new AtomicInteger();
            this.subscriptionCount.set(this.getAllSubscriptions().length);
            if (KahaDBStore.this.isConcurrentStoreAndDispatchTopics()) {
                KahaDBStore.this.asyncTopicMaps.add(this.asyncTaskMap);
            }
        }

        public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
            if (KahaDBStore.this.isConcurrentStoreAndDispatchTopics()) {
                StoreTopicTask result = new StoreTopicTask(this, context, message, this.subscriptionCount.get());
                result.aquireLocks();
                KahaDBStore.this.addTopicTask(this, result);
                return result.getFuture();
            }
            return super.asyncAddTopicMessage(context, message);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
            String subscriptionKey = KahaDBStore.this.subscriptionKey(clientId, subscriptionName).toString();
            if (KahaDBStore.this.isConcurrentStoreAndDispatchTopics()) {
                AsyncJobKey key = new AsyncJobKey(messageId, this.getDestination());
                StoreTopicTask task = null;
                Map map = this.asyncTaskMap;
                synchronized (map) {
                    task = (StoreTopicTask)this.asyncTaskMap.get(key);
                }
                if (task != null) {
                    if (task.addSubscriptionKey(subscriptionKey)) {
                        KahaDBStore.this.removeTopicTask(this, messageId);
                        if (task.cancel()) {
                            map = this.asyncTaskMap;
                            synchronized (map) {
                                this.asyncTaskMap.remove(key);
                            }
                        }
                    }
                } else {
                    this.doAcknowledge(context, subscriptionKey, messageId, ack);
                }
            } else {
                this.doAcknowledge(context, subscriptionKey, messageId, ack);
            }
        }

        protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack) throws IOException {
            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
            command.setDestination(this.dest);
            command.setSubscriptionKey(subscriptionKey);
            command.setMessageId(messageId.toProducerKey());
            command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(KahaDBStore.this.transactionIdTransformer.transform(ack.getTransactionId())) : null);
            if (ack != null && ack.isUnmatchedAck()) {
                command.setAck(MessageDatabase.UNMATCHED);
            } else {
                ByteSequence packet = KahaDBStore.this.wireFormat.marshal((Object)ack);
                command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
            }
            KahaDBStore.this.store(command, false, null, null);
        }

        public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
            String subscriptionKey = KahaDBStore.this.subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
            command.setDestination(this.dest);
            command.setSubscriptionKey(subscriptionKey.toString());
            command.setRetroactive(retroactive);
            ByteSequence packet = KahaDBStore.this.wireFormat.marshal((Object)subscriptionInfo);
            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
            KahaDBStore.this.store(command, KahaDBStore.this.isEnableJournalDiskSyncs(), null, null);
            this.subscriptionCount.incrementAndGet();
        }

        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
            command.setDestination(this.dest);
            command.setSubscriptionKey(KahaDBStore.this.subscriptionKey(clientId, subscriptionName).toString());
            KahaDBStore.this.store(command, KahaDBStore.this.isEnableJournalDiskSyncs(), null, null);
            this.subscriptionCount.decrementAndGet();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
            final ArrayList subscriptions = new ArrayList();
            KahaDBStore.this.indexLock.writeLock().lock();
            try {
                KahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<IOException>(){

                    @Override
                    public void execute(Transaction tx) throws IOException {
                        MessageDatabase.StoredDestination sd = KahaDBStore.this.getStoredDestination(KahaDBTopicMessageStore.this.dest, tx);
                        Iterator<Map.Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx);
                        while (iterator.hasNext()) {
                            Map.Entry<String, KahaSubscriptionCommand> entry = iterator.next();
                            SubscriptionInfo info = (SubscriptionInfo)KahaDBStore.this.wireFormat.unmarshal((DataInput)new DataInputStream((InputStream)entry.getValue().getSubscriptionInfo().newInput()));
                            subscriptions.add(info);
                        }
                    }
                });
            }
            finally {
                KahaDBStore.this.indexLock.writeLock().unlock();
            }
            SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
            subscriptions.toArray(rc);
            return rc;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
            final String subscriptionKey = KahaDBStore.this.subscriptionKey(clientId, subscriptionName);
            KahaDBStore.this.indexLock.writeLock().lock();
            try {
                SubscriptionInfo subscriptionInfo = KahaDBStore.this.pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){

                    @Override
                    public SubscriptionInfo execute(Transaction tx) throws IOException {
                        MessageDatabase.StoredDestination sd = KahaDBStore.this.getStoredDestination(KahaDBTopicMessageStore.this.dest, tx);
                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
                        if (command == null) {
                            return null;
                        }
                        return (SubscriptionInfo)KahaDBStore.this.wireFormat.unmarshal((DataInput)new DataInputStream((InputStream)command.getSubscriptionInfo().newInput()));
                    }
                });
                return subscriptionInfo;
            }
            finally {
                KahaDBStore.this.indexLock.writeLock().unlock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
            final String subscriptionKey = KahaDBStore.this.subscriptionKey(clientId, subscriptionName);
            KahaDBStore.this.indexLock.writeLock().lock();
            try {
                int n = KahaDBStore.this.pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){

                    @Override
                    public Integer execute(Transaction tx) throws IOException {
                        MessageDatabase.StoredDestination sd = KahaDBStore.this.getStoredDestination(KahaDBTopicMessageStore.this.dest, tx);
                        MessageDatabase.LastAck cursorPos = KahaDBStore.this.getLastAck(tx, sd, subscriptionKey);
                        if (cursorPos == null) {
                            return 0;
                        }
                        return (int)KahaDBStore.this.getStoredMessageCount(tx, sd, subscriptionKey);
                    }
                });
                return n;
            }
            finally {
                KahaDBStore.this.indexLock.writeLock().unlock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
            final String subscriptionKey = KahaDBStore.this.subscriptionKey(clientId, subscriptionName);
            SubscriptionInfo info = this.lookupSubscription(clientId, subscriptionName);
            KahaDBStore.this.indexLock.writeLock().lock();
            try {
                KahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<Exception>(){

                    @Override
                    public void execute(Transaction tx) throws Exception {
                        MessageDatabase.StoredDestination sd = KahaDBStore.this.getStoredDestination(KahaDBTopicMessageStore.this.dest, tx);
                        MessageDatabase.LastAck cursorPos = KahaDBStore.this.getLastAck(tx, sd, subscriptionKey);
                        sd.orderIndex.setBatch(tx, cursorPos);
                        KahaDBTopicMessageStore.this.recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
                        Iterator<Map.Entry<Long, MessageDatabase.MessageKeys>> iterator = sd.orderIndex.iterator(tx);
                        while (iterator.hasNext()) {
                            Map.Entry<Long, MessageDatabase.MessageKeys> entry = iterator.next();
                            if (KahaDBStore.this.ackedAndPrepared.contains(entry.getValue().messageId)) continue;
                            listener.recoverMessage(KahaDBStore.this.loadMessage(entry.getValue().location));
                        }
                        sd.orderIndex.resetCursorPosition();
                    }
                });
            }
            finally {
                KahaDBStore.this.indexLock.writeLock().unlock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
            final String subscriptionKey = KahaDBStore.this.subscriptionKey(clientId, subscriptionName);
            SubscriptionInfo info = this.lookupSubscription(clientId, subscriptionName);
            KahaDBStore.this.indexLock.writeLock().lock();
            try {
                KahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<Exception>(){

                    @Override
                    public void execute(Transaction tx) throws Exception {
                        MessageDatabase.StoredDestination sd = KahaDBStore.this.getStoredDestination(KahaDBTopicMessageStore.this.dest, tx);
                        sd.orderIndex.resetCursorPosition();
                        MessageDatabase.MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
                        if (moc == null) {
                            MessageDatabase.LastAck pos = KahaDBStore.this.getLastAck(tx, sd, subscriptionKey);
                            if (pos == null) {
                                return;
                            }
                            sd.orderIndex.setBatch(tx, pos);
                            moc = sd.orderIndex.cursor;
                        } else {
                            sd.orderIndex.cursor.sync(moc);
                        }
                        Map.Entry<Long, MessageDatabase.MessageKeys> entry = null;
                        int counter = KahaDBTopicMessageStore.this.recoverRolledBackAcks(sd, tx, maxReturned, listener);
                        Iterator<Map.Entry<Long, MessageDatabase.MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc);
                        while (iterator.hasNext()) {
                            entry = iterator.next();
                            if (KahaDBStore.this.ackedAndPrepared.contains(entry.getValue().messageId)) continue;
                            if (listener.recoverMessage(KahaDBStore.this.loadMessage(entry.getValue().location))) {
                                ++counter;
                            }
                            if (counter < maxReturned && listener.hasSpace()) continue;
                        }
                        sd.orderIndex.stoppedIterating();
                        if (entry != null) {
                            MessageDatabase.MessageOrderCursor copy = sd.orderIndex.cursor.copy();
                            sd.subscriptionCursors.put(subscriptionKey, copy);
                        }
                    }
                });
            }
            finally {
                KahaDBStore.this.indexLock.writeLock().unlock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void resetBatching(String clientId, String subscriptionName) {
            try {
                final String subscriptionKey = KahaDBStore.this.subscriptionKey(clientId, subscriptionName);
                KahaDBStore.this.indexLock.writeLock().lock();
                try {
                    KahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<IOException>(){

                        @Override
                        public void execute(Transaction tx) throws IOException {
                            MessageDatabase.StoredDestination sd = KahaDBStore.this.getStoredDestination(KahaDBTopicMessageStore.this.dest, tx);
                            sd.subscriptionCursors.remove(subscriptionKey);
                        }
                    });
                }
                finally {
                    KahaDBStore.this.indexLock.writeLock().unlock();
                }
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public class KahaDBMessageStore
    extends AbstractMessageStore {
        protected final Map<AsyncJobKey, StoreTask> asyncTaskMap;
        protected KahaDestination dest;
        private final int maxAsyncJobs;
        private final Semaphore localDestinationSemaphore;
        double doneTasks;
        double canceledTasks;

        public KahaDBMessageStore(ActiveMQDestination destination) {
            super(destination);
            this.asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
            this.canceledTasks = 0.0;
            this.dest = KahaDBStore.this.convert(destination);
            this.maxAsyncJobs = KahaDBStore.this.getMaxAsyncJobs();
            this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
        }

        public ActiveMQDestination getDestination() {
            return this.destination;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
            if (KahaDBStore.this.isConcurrentStoreAndDispatchQueues()) {
                StoreQueueTask result = new StoreQueueTask(this, context, message);
                ListenableFuture<Object> future = result.getFuture();
                message.getMessageId().setFutureOrSequenceLong(future);
                message.setRecievedByDFBridge(true);
                result.aquireLocks();
                Map<AsyncJobKey, StoreTask> map = this.asyncTaskMap;
                synchronized (map) {
                    KahaDBStore.this.addQueueTask(this, result);
                    if (this.indexListener != null) {
                        this.indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
                    }
                }
                return future;
            }
            return super.asyncAddQueueMessage(context, message);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
            if (KahaDBStore.this.isConcurrentStoreAndDispatchQueues()) {
                AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), this.getDestination());
                StoreQueueTask task = null;
                Map<AsyncJobKey, StoreTask> map = this.asyncTaskMap;
                synchronized (map) {
                    task = (StoreQueueTask)this.asyncTaskMap.get(key);
                }
                if (task != null) {
                    if (ack.isInTransaction() || !task.cancel()) {
                        try {
                            task.future.get();
                        }
                        catch (InterruptedException e) {
                            throw new InterruptedIOException(e.toString());
                        }
                        catch (Exception ignored) {
                            LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", (Throwable)ignored);
                        }
                        this.removeMessage(context, ack);
                        return;
                    }
                    KahaDBStore.this.indexLock.writeLock().lock();
                    try {
                        KahaDBStore.this.metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId());
                    }
                    finally {
                        KahaDBStore.this.indexLock.writeLock().unlock();
                    }
                    map = this.asyncTaskMap;
                    synchronized (map) {
                        this.asyncTaskMap.remove(key);
                        return;
                    }
                }
                this.removeMessage(context, ack);
                return;
            }
            this.removeMessage(context, ack);
        }

        public void addMessage(final ConnectionContext context, final Message message) throws IOException {
            KahaAddMessageCommand command = new KahaAddMessageCommand();
            command.setDestination(this.dest);
            command.setMessageId(message.getMessageId().toProducerKey());
            command.setTransactionInfo(TransactionIdConversion.convert(KahaDBStore.this.transactionIdTransformer.transform(message.getTransactionId())));
            command.setPriority(message.getPriority());
            command.setPrioritySupported(this.isPrioritizedMessages());
            ByteSequence packet = KahaDBStore.this.wireFormat.marshal((Object)message);
            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
            KahaDBStore.this.store(command, KahaDBStore.this.isEnableJournalDiskSyncs() && message.isResponseRequired(), new MessageDatabase.IndexAware(){
                Object possibleFuture;
                {
                    this.possibleFuture = message.getMessageId().getFutureOrSequenceLong();
                }

                @Override
                public void sequenceAssignedWithIndexLocked(final long sequence) {
                    message.getMessageId().setFutureOrSequenceLong((Object)sequence);
                    if (KahaDBMessageStore.this.indexListener != null && this.possibleFuture == null) {
                        KahaDBStore.this.trackPendingAdd(KahaDBMessageStore.this.dest, sequence);
                        KahaDBMessageStore.this.indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable(){

                            @Override
                            public void run() {
                                KahaDBStore.this.trackPendingAddComplete(KahaDBMessageStore.this.dest, sequence);
                            }
                        }));
                    }
                }
            }, null);
        }

        public void updateMessage(Message message) throws IOException {
            if (LOG.isTraceEnabled()) {
                LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
            }
            KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand();
            KahaAddMessageCommand command = new KahaAddMessageCommand();
            command.setDestination(this.dest);
            command.setMessageId(message.getMessageId().toProducerKey());
            command.setPriority(message.getPriority());
            command.setPrioritySupported(this.prioritizedMessages);
            ByteSequence packet = KahaDBStore.this.wireFormat.marshal((Object)message);
            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
            updateMessageCommand.setMessage(command);
            KahaDBStore.this.store(updateMessageCommand, KahaDBStore.this.isEnableJournalDiskSyncs(), null, null);
        }

        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
            command.setDestination(this.dest);
            command.setMessageId(ack.getLastMessageId().toProducerKey());
            command.setTransactionInfo(TransactionIdConversion.convert(KahaDBStore.this.transactionIdTransformer.transform(ack.getTransactionId())));
            ByteSequence packet = KahaDBStore.this.wireFormat.marshal((Object)ack);
            command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
            KahaDBStore.this.store(command, KahaDBStore.this.isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
        }

        public void removeAllMessages(ConnectionContext context) throws IOException {
            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
            command.setDestination(this.dest);
            KahaDBStore.this.store(command, true, null, null);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Message getMessage(MessageId identity) throws IOException {
            Location location;
            String key = identity.toProducerKey();
            KahaDBStore.this.indexLock.writeLock().lock();
            try {
                location = KahaDBStore.this.findMessageLocation(key, this.dest);
            }
            finally {
                KahaDBStore.this.indexLock.writeLock().unlock();
            }
            if (location == null) {
                return null;
            }
            return KahaDBStore.this.loadMessage(location);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public int getMessageCount() throws IOException {
            try {
                this.lockAsyncJobQueue();
                KahaDBStore.this.indexLock.writeLock().lock();
                try {
                    int n = KahaDBStore.this.pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){

                        @Override
                        public Integer execute(Transaction tx) throws IOException {
                            MessageDatabase.StoredDestination sd = KahaDBStore.this.getStoredDestination(KahaDBMessageStore.this.dest, tx);
                            int rc = 0;
                            Iterator<Map.Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx);
                            while (iterator.hasNext()) {
                                iterator.next();
                                ++rc;
                            }
                            return rc;
                        }
                    });
                    KahaDBStore.this.indexLock.writeLock().unlock();
                    return n;
                }
                catch (Throwable throwable) {
                    KahaDBStore.this.indexLock.writeLock().unlock();
                    throw throwable;
                }
            }
            finally {
                this.unlockAsyncJobQueue();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean isEmpty() throws IOException {
            KahaDBStore.this.indexLock.writeLock().lock();
            try {
                boolean bl = KahaDBStore.this.pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>(){

                    @Override
                    public Boolean execute(Transaction tx) throws IOException {
                        MessageDatabase.StoredDestination sd = KahaDBStore.this.getStoredDestination(KahaDBMessageStore.this.dest, tx);
                        return sd.locationIndex.isEmpty(tx);
                    }
                });
                return bl;
            }
            finally {
                KahaDBStore.this.indexLock.writeLock().unlock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void recover(final MessageRecoveryListener listener) throws Exception {
            KahaDBStore.this.indexLock.writeLock().lock();
            try {
                KahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<Exception>(){

                    @Override
                    public void execute(Transaction tx) throws Exception {
                        MessageDatabase.StoredDestination sd = KahaDBStore.this.getStoredDestination(KahaDBMessageStore.this.dest, tx);
                        KahaDBMessageStore.this.recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
                        sd.orderIndex.resetCursorPosition();
                        Iterator<Map.Entry<Long, MessageDatabase.MessageKeys>> iterator = sd.orderIndex.iterator(tx);
                        while (listener.hasSpace() && iterator.hasNext()) {
                            Map.Entry<Long, MessageDatabase.MessageKeys> entry = iterator.next();
                            if (KahaDBStore.this.ackedAndPrepared.contains(entry.getValue().messageId)) continue;
                            Message msg = KahaDBStore.this.loadMessage(entry.getValue().location);
                            listener.recoverMessage(msg);
                        }
                    }
                });
            }
            finally {
                KahaDBStore.this.indexLock.writeLock().unlock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
            KahaDBStore.this.indexLock.writeLock().lock();
            try {
                KahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<Exception>(){

                    @Override
                    public void execute(Transaction tx) throws Exception {
                        MessageDatabase.StoredDestination sd = KahaDBStore.this.getStoredDestination(KahaDBMessageStore.this.dest, tx);
                        Map.Entry<Long, MessageDatabase.MessageKeys> entry = null;
                        int counter = KahaDBMessageStore.this.recoverRolledBackAcks(sd, tx, maxReturned, listener);
                        Iterator<Map.Entry<Long, MessageDatabase.MessageKeys>> iterator = sd.orderIndex.iterator(tx);
                        while (iterator.hasNext()) {
                            entry = iterator.next();
                            if (KahaDBStore.this.ackedAndPrepared.contains(entry.getValue().messageId)) continue;
                            Message msg = KahaDBStore.this.loadMessage(entry.getValue().location);
                            msg.getMessageId().setFutureOrSequenceLong((Object)entry.getKey());
                            listener.recoverMessage(msg);
                            if (++counter < maxReturned) continue;
                            break;
                        }
                        sd.orderIndex.stoppedIterating();
                    }
                });
            }
            finally {
                KahaDBStore.this.indexLock.writeLock().unlock();
            }
        }

        protected int recoverRolledBackAcks(MessageDatabase.StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
            int counter = 0;
            Iterator iterator = KahaDBStore.this.rolledBackAcks.iterator();
            while (iterator.hasNext()) {
                String id = (String)iterator.next();
                iterator.remove();
                Long sequence = sd.messageIdIndex.get(tx, id);
                if (sequence != null) {
                    if (sd.orderIndex.alreadyDispatched(sequence)) {
                        listener.recoverMessage(KahaDBStore.this.loadMessage(sd.orderIndex.get((Transaction)tx, (Long)sequence).location));
                        if (++counter < maxReturned) continue;
                        break;
                    }
                    LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", new Object[]{id, sequence, sd.orderIndex.cursor});
                    continue;
                }
                LOG.warn("Failed to locate rolled back ack message {} in {}", (Object)id, (Object)sd);
            }
            return counter;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void resetBatching() {
            if (KahaDBStore.this.pageFile.isLoaded()) {
                KahaDBStore.this.indexLock.writeLock().lock();
                try {
                    KahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<Exception>(){

                        @Override
                        public void execute(Transaction tx) throws Exception {
                            MessageDatabase.StoredDestination sd = KahaDBStore.this.getExistingStoredDestination(KahaDBMessageStore.this.dest, tx);
                            if (sd != null) {
                                sd.orderIndex.resetCursorPosition();
                            }
                        }
                    });
                }
                catch (Exception e) {
                    LOG.error("Failed to reset batching", (Throwable)e);
                }
                finally {
                    KahaDBStore.this.indexLock.writeLock().unlock();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void setBatch(final MessageId identity) throws IOException {
            KahaDBStore.this.indexLock.writeLock().lock();
            try {
                KahaDBStore.this.pageFile.tx().execute(new Transaction.Closure<IOException>(){

                    @Override
                    public void execute(Transaction tx) throws IOException {
                        MessageDatabase.StoredDestination sd = KahaDBStore.this.getStoredDestination(KahaDBMessageStore.this.dest, tx);
                        Long location = (Long)identity.getFutureOrSequenceLong();
                        Long pending = sd.orderIndex.minPendingAdd();
                        if (pending != null) {
                            location = Math.min(location, pending - 1L);
                        }
                        sd.orderIndex.setBatch(tx, location);
                    }
                });
            }
            finally {
                KahaDBStore.this.indexLock.writeLock().unlock();
            }
        }

        public void setMemoryUsage(MemoryUsage memoryUsage) {
        }

        public void start() throws Exception {
            super.start();
        }

        public void stop() throws Exception {
            super.stop();
        }

        protected void lockAsyncJobQueue() {
            try {
                if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60L, TimeUnit.SECONDS)) {
                    throw new TimeoutException((Object)((Object)this) + " timeout waiting for localDestSem:" + this.localDestinationSemaphore);
                }
            }
            catch (Exception e) {
                LOG.error("Failed to lock async jobs for " + this.destination, (Throwable)e);
            }
        }

        protected void unlockAsyncJobQueue() {
            this.localDestinationSemaphore.release(this.maxAsyncJobs);
        }

        protected void acquireLocalAsyncLock() {
            try {
                this.localDestinationSemaphore.acquire();
            }
            catch (InterruptedException e) {
                LOG.error("Failed to aquire async lock for " + this.destination, (Throwable)e);
            }
        }

        protected void releaseLocalAsyncLock() {
            this.localDestinationSemaphore.release();
        }

        public String toString() {
            return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + KahaDBStore.this.storedDestinations.get(KahaDBStore.this.key(this.dest));
        }
    }
}

