package org.apache.activemq.store.kahadb;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.TempMessageDatabase;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;

/* loaded from: input_file:WEB-INF/lib/activemq-kahadb-store-5.11.0.redhat-621216-01.jar:org/apache/activemq/store/kahadb/TempKahaDBStore.class */
public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter, BrokerServiceAware {
    private final WireFormat wireFormat = new OpenWireFormat();
    private BrokerService brokerService;

    /* loaded from: input_file:WEB-INF/lib/activemq-kahadb-store-5.11.0.redhat-621216-01.jar:org/apache/activemq/store/kahadb/TempKahaDBStore$KahaDBMessageStore.class */
    public class KahaDBMessageStore extends AbstractMessageStore {
        protected KahaDestination dest;
        long cursorPos;

        /**
         * Creates a message store bound to the given destination.
         *
         * <p>The destination is converted once to its {@link KahaDestination} form and kept in
         * {@code dest}; the batching cursor starts at position 0.
         *
         * @param activeMQDestination destination whose messages this store manages
         */
        public KahaDBMessageStore(ActiveMQDestination activeMQDestination) {
            super(activeMQDestination);
            dest = convert(activeMQDestination);
            cursorPos = 0L;
        }

        /**
         * Returns the destination this store was created for.
         *
         * @return the value of the inherited {@code destination} field
         */
        @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.store.MessageStore
        public ActiveMQDestination getDestination() {
            return this.destination;
        }

        /**
         * Stores a message by issuing an add command for this store's destination.
         *
         * <p>The command carries the message's producer key; the message itself is marshalled with
         * the store's wire format and passed along together with the message's transaction id.
         *
         * @param connectionContext not used by this implementation
         * @param message the message to store
         * @throws IOException if marshalling or processing the add command fails
         */
        @Override
        public void addMessage(ConnectionContext connectionContext, Message message) throws IOException {
            final KahaAddMessageCommand addCommand = new KahaAddMessageCommand();
            addCommand.setDestination(dest);
            addCommand.setMessageId(message.getMessageId().toProducerKey());

            final TransactionId transactionId = message.getTransactionId();
            final ByteSequence payload = wireFormat.marshal(message);
            processAdd(addCommand, transactionId, payload);
        }

        /**
         * Removes the message identified by the ack's last message id from this destination.
         *
         * @param connectionContext not used by this implementation
         * @param messageAck ack whose last message id and transaction id drive the removal
         * @throws IOException if processing the remove command fails
         */
        @Override
        public void removeMessage(ConnectionContext connectionContext, MessageAck messageAck) throws IOException {
            final String ackedKey = messageAck.getLastMessageId().toProducerKey();

            final KahaRemoveMessageCommand removeCommand = new KahaRemoveMessageCommand();
            removeCommand.setDestination(dest);
            removeCommand.setMessageId(ackedKey);
            processRemove(removeCommand, messageAck.getTransactionId());
        }

        /**
         * Issues a remove-destination command for this store's destination.
         *
         * @param connectionContext not used by this implementation
         * @throws IOException if processing the command fails
         */
        @Override
        public void removeAllMessages(ConnectionContext connectionContext) throws IOException {
            final KahaRemoveDestinationCommand dropDestination = new KahaRemoveDestinationCommand();
            dropDestination.setDestination(dest);
            process(dropDestination);
        }

        /**
         * Looks up a stored message by its id.
         *
         * <p>The producer key is resolved to a sequence number through {@code messageIdIndex}, and
         * the sequence number to the stored payload through {@code orderIndex}. The index access runs
         * in a page-file transaction while holding {@code indexMutex}; unmarshalling happens after
         * the lock has been released.
         *
         * @param messageId id of the message to fetch
         * @return the unmarshalled message, or {@code null} if the id is not in the index
         * @throws IOException if the index lookup or unmarshalling fails
         */
        @Override
        public Message getMessage(MessageId messageId) throws IOException {
            final String key = messageId.toProducerKey();
            final ByteSequence payload;
            synchronized (indexMutex) {
                payload = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>() {
                    @Override
                    public ByteSequence execute(Transaction tx) throws IOException {
                        TempMessageDatabase.StoredDestination sd = getStoredDestination(dest, tx);
                        Long sequence = sd.messageIdIndex.get(tx, key);
                        return sequence == null ? null : sd.orderIndex.get(tx, sequence).data;
                    }
                });
            }
            return payload == null ? null : (Message) wireFormat.unmarshal(payload);
        }

        /**
         * Counts the messages stored for this destination.
         *
         * <p>The count is obtained by walking every entry of {@code messageIdIndex} inside a
         * page-file transaction while holding {@code indexMutex}.
         *
         * @return number of entries in the message-id index
         * @throws IOException if the index cannot be read
         */
        @Override
        public int getMessageCount() throws IOException {
            synchronized (indexMutex) {
                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
                    @Override
                    public Integer execute(Transaction tx) throws IOException {
                        TempMessageDatabase.StoredDestination sd = getStoredDestination(dest, tx);
                        int count = 0;
                        for (Iterator<Map.Entry<String, Long>> entries = sd.messageIdIndex.iterator(tx); entries.hasNext();) {
                            entries.next();
                            count++;
                        }
                        return count;
                    }
                });
            }
        }

        /**
         * Replays every stored message of this destination to the listener, in {@code orderIndex}
         * iteration order.
         *
         * <p>The whole replay, including the listener callbacks, runs inside a page-file
         * transaction while holding {@code indexMutex}. The boolean returned by
         * {@code recoverMessage} is ignored.
         *
         * @param messageRecoveryListener receives each unmarshalled message
         * @throws Exception if reading the index, unmarshalling or the listener fails
         */
        @Override
        public void recover(final MessageRecoveryListener messageRecoveryListener) throws Exception {
            synchronized (indexMutex) {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    @Override
                    public void execute(Transaction tx) throws Exception {
                        TempMessageDatabase.StoredDestination sd = getStoredDestination(dest, tx);
                        for (Iterator<Map.Entry<Long, TempMessageDatabase.MessageRecord>> entries = sd.orderIndex.iterator(tx); entries.hasNext();) {
                            TempMessageDatabase.MessageRecord record = entries.next().getValue();
                            messageRecoveryListener.recoverMessage((Message) wireFormat.unmarshal(record.data));
                        }
                    }
                });
            }
        }

        /**
         * Replays the next batch of messages, starting at the current batching cursor.
         *
         * <p>Iteration over {@code orderIndex} begins at {@code cursorPos} and stops once {@code i}
         * messages have been handed to the listener or the index is exhausted. The limit is checked
         * after each delivery, so one message is delivered (when available) even if {@code i} is
         * not positive. Afterwards the cursor is moved just past the last delivered entry; it is
         * left untouched when nothing was delivered.
         *
         * @param i maximum number of messages to recover in this call
         * @param messageRecoveryListener receives each unmarshalled message
         * @throws Exception if reading the index, unmarshalling or the listener fails
         */
        @Override
        public void recoverNextMessages(final int i, final MessageRecoveryListener messageRecoveryListener) throws Exception {
            synchronized (indexMutex) {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    @Override
                    public void execute(Transaction tx) throws Exception {
                        TempMessageDatabase.StoredDestination sd = getStoredDestination(dest, tx);
                        Iterator<Map.Entry<Long, TempMessageDatabase.MessageRecord>> entries =
                                sd.orderIndex.iterator(tx, Long.valueOf(cursorPos));

                        Map.Entry<Long, TempMessageDatabase.MessageRecord> lastDelivered = null;
                        int delivered = 0;
                        while (entries.hasNext()) {
                            lastDelivered = entries.next();
                            messageRecoveryListener.recoverMessage((Message) wireFormat.unmarshal(lastDelivered.getValue().data));
                            if (++delivered >= i) {
                                break;
                            }
                        }

                        if (lastDelivered == null) {
                            return;
                        }
                        cursorPos = lastDelivered.getKey().longValue() + 1;
                    }
                });
            }
        }

        /**
         * Rewinds the batching cursor to position 0, so the next
         * {@code recoverNextMessages(int, MessageRecoveryListener)} call starts from the beginning
         * of the order index.
         */
        @Override // org.apache.activemq.store.MessageStore
        public void resetBatching() {
            this.cursorPos = 0L;
        }

        /**
         * Positions the batching cursor just after the given message.
         *
         * <p>The message's sequence number is looked up in {@code messageIdIndex} under
         * {@code indexMutex}. If the id is not indexed the cursor is left unchanged.
         *
         * @param messageId id of the message after which batching should resume
         * @throws IOException if the index lookup fails
         */
        @Override
        public void setBatch(MessageId messageId) throws IOException {
            final String key = messageId.toProducerKey();
            final Long sequence;
            synchronized (indexMutex) {
                sequence = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
                    @Override
                    public Long execute(Transaction tx) throws IOException {
                        return getStoredDestination(dest, tx).messageIdIndex.get(tx, key);
                    }
                });
            }
            if (sequence == null) {
                return;
            }
            cursorPos = sequence.longValue() + 1;
        }

        /**
         * No-op: the supplied memory usage is ignored by this store.
         *
         * @param memoryUsage ignored
         */
        @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.store.MessageStore
        public void setMemoryUsage(MemoryUsage memoryUsage) {
        }

        /**
         * No-op: this store performs no work of its own on start.
         */
        @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.Service
        public void start() throws Exception {
        }

        /**
         * No-op: this store performs no work of its own on stop.
         */
        @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.Service
        public void stop() throws Exception {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/activemq-kahadb-store-5.11.0.redhat-621216-01.jar:org/apache/activemq/store/kahadb/TempKahaDBStore$KahaDBTopicMessageStore.class */
    public class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
        /**
         * Creates a topic message store for the given topic; all initialisation is delegated to
         * the {@code KahaDBMessageStore} constructor.
         *
         * @param activeMQTopic topic whose messages and subscriptions this store manages
         */
        public KahaDBTopicMessageStore(ActiveMQTopic activeMQTopic) {
            super(activeMQTopic);
        }

        /**
         * Records a subscription's acknowledgement of a message.
         *
         * <p>A remove command carrying the subscription key and the message's producer key is
         * processed with a {@code null} transaction id; the {@code messageAck} argument (and any
         * transaction it refers to) is not consulted.
         *
         * @param connectionContext not used by this implementation
         * @param str client id of the subscription
         * @param str2 subscription name
         * @param messageId id of the acknowledged message
         * @param messageAck not used by this implementation
         * @throws IOException if processing the remove command fails
         */
        @Override
        public void acknowledge(ConnectionContext connectionContext, String str, String str2, MessageId messageId, MessageAck messageAck) throws IOException {
            final KahaRemoveMessageCommand ackCommand = new KahaRemoveMessageCommand();
            ackCommand.setDestination(dest);

            final String key = subscriptionKey(str, str2);
            ackCommand.setSubscriptionKey(key);

            final String ackedMessageKey = messageId.toProducerKey();
            ackCommand.setMessageId(ackedMessageKey);

            processRemove(ackCommand, null);
        }

        /**
         * Registers a subscription on this topic.
         *
         * <p>The subscription info is marshalled with the store's wire format and attached to a
         * subscription command keyed by client id and subscription name.
         *
         * @param subscriptionInfo description of the subscription to add
         * @param z value stored as the command's retroactive flag
         * @throws IOException if marshalling or processing the command fails
         */
        @Override
        public void addSubscription(SubscriptionInfo subscriptionInfo, boolean z) throws IOException {
            final String key = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());

            final KahaSubscriptionCommand subscribe = new KahaSubscriptionCommand();
            subscribe.setDestination(dest);
            subscribe.setSubscriptionKey(key);
            subscribe.setRetroactive(z);

            final ByteSequence encoded = wireFormat.marshal(subscriptionInfo);
            final Buffer infoBuffer = new Buffer(encoded.getData(), encoded.getOffset(), encoded.getLength());
            subscribe.setSubscriptionInfo(infoBuffer);

            process(subscribe);
        }

        /**
         * Removes a subscription from this topic.
         *
         * <p>A subscription command is processed that carries only the destination and the
         * subscription key, with no subscription info attached.
         *
         * @param str client id of the subscription
         * @param str2 subscription name
         * @throws IOException if processing the command fails
         */
        @Override
        public void deleteSubscription(String str, String str2) throws IOException {
            final String key = subscriptionKey(str, str2);

            final KahaSubscriptionCommand unsubscribe = new KahaSubscriptionCommand();
            unsubscribe.setDestination(dest);
            unsubscribe.setSubscriptionKey(key);
            process(unsubscribe);
        }

        /**
         * Returns every subscription stored for this topic.
         *
         * <p>Each entry of the destination's {@code subscriptions} index is unmarshalled from its
         * stored subscription-info buffer, inside a page-file transaction under {@code indexMutex}.
         *
         * @return the stored subscriptions in index iteration order; empty if there are none
         * @throws IOException if the index cannot be read or an entry cannot be unmarshalled
         */
        @Override
        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
            final ArrayList<SubscriptionInfo> found = new ArrayList<SubscriptionInfo>();
            synchronized (indexMutex) {
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        TempMessageDatabase.StoredDestination sd = getStoredDestination(dest, tx);
                        for (Iterator<Map.Entry<String, KahaSubscriptionCommand>> entries = sd.subscriptions.iterator(tx); entries.hasNext();) {
                            KahaSubscriptionCommand stored = entries.next().getValue();
                            DataInputStream in = new DataInputStream(stored.getSubscriptionInfo().newInput());
                            found.add((SubscriptionInfo) wireFormat.unmarshal(in));
                        }
                    }
                });
            }
            return found.toArray(new SubscriptionInfo[found.size()]);
        }

        /**
         * Looks up a single subscription of this topic.
         *
         * @param str client id of the subscription
         * @param str2 subscription name
         * @return the unmarshalled subscription info, or {@code null} if no subscription is stored
         *         under that key
         * @throws IOException if the index cannot be read or the entry cannot be unmarshalled
         */
        @Override
        public SubscriptionInfo lookupSubscription(String str, String str2) throws IOException {
            final String key = subscriptionKey(str, str2);
            synchronized (indexMutex) {
                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
                    @Override
                    public SubscriptionInfo execute(Transaction tx) throws IOException {
                        TempMessageDatabase.StoredDestination sd = getStoredDestination(dest, tx);
                        KahaSubscriptionCommand stored = sd.subscriptions.get(tx, key);
                        if (stored != null) {
                            DataInputStream in = new DataInputStream(stored.getSubscriptionInfo().newInput());
                            return (SubscriptionInfo) wireFormat.unmarshal(in);
                        }
                        return null;
                    }
                });
            }
        }

        /**
         * Counts the messages a subscription has not yet acknowledged.
         *
         * <p>The count is the number of {@code orderIndex} entries from the position after the
         * subscription's entry in {@code subscriptionAcks} onwards. If the subscription has no
         * entry there, the count is 0.
         *
         * @param str client id of the subscription
         * @param str2 subscription name
         * @return number of order-index entries after the subscription's last ack position
         * @throws IOException if the indexes cannot be read
         */
        @Override
        public int getMessageCount(String str, String str2) throws IOException {
            final String key = subscriptionKey(str, str2);
            synchronized (indexMutex) {
                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
                    @Override
                    public Integer execute(Transaction tx) throws IOException {
                        TempMessageDatabase.StoredDestination sd = getStoredDestination(dest, tx);
                        Long lastAck = sd.subscriptionAcks.get(tx, key);
                        int pending = 0;
                        if (lastAck != null) {
                            Iterator<Map.Entry<Long, TempMessageDatabase.MessageRecord>> entries =
                                    sd.orderIndex.iterator(tx, Long.valueOf(lastAck.longValue() + 1));
                            while (entries.hasNext()) {
                                entries.next();
                                pending++;
                            }
                        }
                        return pending;
                    }
                });
            }
        }

        /**
         * Replays to the listener every message stored after the subscription's last acknowledged
         * position.
         *
         * <p>The replay runs inside a page-file transaction while holding {@code indexMutex}.
         * If the subscription has no entry in {@code subscriptionAcks} (for example because it
         * does not exist), nothing is recovered. This matches
         * {@link #getMessageCount(String, String)}, which reports 0 for that case, and avoids the
         * {@code NullPointerException} that unboxing the missing position would otherwise raise.
         *
         * @param str client id of the subscription
         * @param str2 subscription name
         * @param messageRecoveryListener receives each unmarshalled message
         * @throws Exception if reading the index, unmarshalling or the listener fails
         */
        @Override
        public void recoverSubscription(String str, String str2, final MessageRecoveryListener messageRecoveryListener) throws Exception {
            final String key = subscriptionKey(str, str2);
            synchronized (indexMutex) {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    @Override
                    public void execute(Transaction tx) throws Exception {
                        TempMessageDatabase.StoredDestination sd = getStoredDestination(dest, tx);
                        Long lastAck = sd.subscriptionAcks.get(tx, key);
                        if (lastAck == null) {
                            // No ack position recorded for this subscription: nothing to replay.
                            return;
                        }
                        Iterator<Map.Entry<Long, TempMessageDatabase.MessageRecord>> entries =
                                sd.orderIndex.iterator(tx, Long.valueOf(lastAck.longValue() + 1));
                        while (entries.hasNext()) {
                            messageRecoveryListener.recoverMessage((Message) wireFormat.unmarshal(entries.next().getValue().data));
                        }
                    }
                });
            }
        }

        /**
         * Replays the next batch of messages for a subscription.
         *
         * <p>The starting position is the subscription's entry in {@code subscriptionCursors} if
         * present, otherwise the position after its entry in {@code subscriptionAcks}. If neither
         * exists (for example because the subscription does not exist), nothing is recovered
         * rather than failing with a {@code NullPointerException}; this matches
         * {@link #getMessageCount(String, String)}, which reports 0 for that case.
         *
         * <p>Delivery stops once {@code i} messages have been handed to the listener or the index
         * is exhausted. The limit is checked after each delivery, so one message is delivered
         * (when available) even if {@code i} is not positive. The cursor is then stored just past
         * the last delivered entry; it is left untouched when nothing was delivered.
         *
         * @param str client id of the subscription
         * @param str2 subscription name
         * @param i maximum number of messages to recover in this call
         * @param messageRecoveryListener receives each unmarshalled message
         * @throws Exception if reading the index, unmarshalling or the listener fails
         */
        @Override
        public void recoverNextMessages(String str, String str2, final int i, final MessageRecoveryListener messageRecoveryListener) throws Exception {
            final String key = subscriptionKey(str, str2);
            synchronized (indexMutex) {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    @Override
                    public void execute(Transaction tx) throws Exception {
                        TempMessageDatabase.StoredDestination sd = getStoredDestination(dest, tx);
                        Long start = sd.subscriptionCursors.get(key);
                        if (start == null) {
                            Long lastAck = sd.subscriptionAcks.get(tx, key);
                            if (lastAck == null) {
                                // Neither a cursor nor an ack position: nothing to replay.
                                return;
                            }
                            start = Long.valueOf(lastAck.longValue() + 1);
                        }

                        Map.Entry<Long, TempMessageDatabase.MessageRecord> lastDelivered = null;
                        int delivered = 0;
                        Iterator<Map.Entry<Long, TempMessageDatabase.MessageRecord>> entries = sd.orderIndex.iterator(tx, start);
                        while (entries.hasNext()) {
                            lastDelivered = entries.next();
                            messageRecoveryListener.recoverMessage((Message) wireFormat.unmarshal(lastDelivered.getValue().data));
                            delivered++;
                            if (delivered >= i) {
                                break;
                            }
                        }
                        if (lastDelivered != null) {
                            sd.subscriptionCursors.put(key, Long.valueOf(lastDelivered.getKey().longValue() + 1));
                        }
                    }
                });
            }
        }

        /**
         * Discards the stored batching cursor of a subscription.
         *
         * <p>The subscription's entry is removed from {@code subscriptionCursors} inside a
         * page-file transaction under {@code indexMutex}.
         *
         * @param str client id of the subscription
         * @param str2 subscription name
         * @throws RuntimeException wrapping any {@link IOException} raised while doing so
         */
        @Override
        public void resetBatching(String str, String str2) {
            try {
                final String key = subscriptionKey(str, str2);
                synchronized (indexMutex) {
                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            TempMessageDatabase.StoredDestination sd = getStoredDestination(dest, tx);
                            sd.subscriptionCursors.remove(key);
                        }
                    });
                }
            } catch (IOException failure) {
                throw new RuntimeException(failure);
            }
        }
    }

    /**
     * No-op: the broker name is ignored by this store.
     *
     * @param str ignored
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void setBrokerName(String str) {
    }

    /**
     * No-op: the system usage manager is ignored by this store.
     *
     * @param systemUsage ignored
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void setUsageManager(SystemUsage systemUsage) {
    }

    /**
     * Creates a transaction store that forwards to this database's transaction processing.
     *
     * <p>Commit, prepare and rollback delegate to {@code processCommit}, {@code processPrepare}
     * and {@code processRollback}. Recovery reports every entry of {@code preparedTransactions};
     * the returned store's {@code start} and {@code stop} do nothing.
     *
     * @return a new transaction store bound to this instance
     * @throws IOException declared by the interface; not thrown by this method itself
     */
    @Override
    public TransactionStore createTransactionStore() throws IOException {
        return new TransactionStore() {
            /**
             * Runs the optional pre-commit hook, commits the transaction, then runs the optional
             * post-commit hook. The boolean flag is not used.
             */
            @Override
            public void commit(TransactionId transactionId, boolean z, Runnable runnable, Runnable runnable2) throws IOException {
                if (runnable != null) {
                    runnable.run();
                }
                processCommit(transactionId);
                if (runnable2 != null) {
                    runnable2.run();
                }
            }

            @Override
            public void prepare(TransactionId transactionId) throws IOException {
                processPrepare(transactionId);
            }

            @Override
            public void rollback(TransactionId transactionId) throws IOException {
                processRollback(transactionId);
            }

            /**
             * Reports each prepared transaction to the listener together with the messages it
             * added and the acks it recorded. Every operation that is not exactly an
             * {@code AddOpperation} is treated as a {@code RemoveOpperation}.
             */
            @Override
            public void recover(TransactionRecoveryListener transactionRecoveryListener) throws IOException {
                for (Map.Entry<TransactionId, ArrayList<TempMessageDatabase.Operation>> prepared : preparedTransactions.entrySet()) {
                    final XATransactionId xid = (XATransactionId) prepared.getKey();
                    final ArrayList<Message> addedMessages = new ArrayList<Message>();
                    final ArrayList<MessageAck> recordedAcks = new ArrayList<MessageAck>();

                    for (TempMessageDatabase.Operation operation : prepared.getValue()) {
                        if (operation.getClass() == TempMessageDatabase.AddOpperation.class) {
                            TempMessageDatabase.AddOpperation add = (TempMessageDatabase.AddOpperation) operation;
                            DataInputStream in = new DataInputStream(add.getCommand().getMessage().newInput());
                            addedMessages.add((Message) wireFormat.unmarshal(in));
                        } else {
                            TempMessageDatabase.RemoveOpperation remove = (TempMessageDatabase.RemoveOpperation) operation;
                            DataInputStream in = new DataInputStream(remove.getCommand().getAck().newInput());
                            recordedAcks.add((MessageAck) wireFormat.unmarshal(in));
                        }
                    }

                    final Message[] messages = addedMessages.toArray(new Message[addedMessages.size()]);
                    final MessageAck[] acks = recordedAcks.toArray(new MessageAck[recordedAcks.size()]);
                    transactionRecoveryListener.recover(xid, messages, acks);
                }
            }

            @Override
            public void start() throws Exception {
                // Nothing to start.
            }

            @Override
            public void stop() throws Exception {
                // Nothing to stop.
            }
        };
    }

    /**
     * Builds the index key of a subscription as {@code <str>:<str2>}.
     *
     * <p>A {@code null} argument contributes the text {@code "null"}.
     *
     * @param str client id of the subscription
     * @param str2 subscription name
     * @return the two values joined by a colon
     */
    String subscriptionKey(String str, String str2) {
        final StringBuilder key = new StringBuilder();
        key.append(str);
        key.append(":");
        key.append(str2);
        return key.toString();
    }

    /**
     * Creates a new {@link KahaDBMessageStore} for the given queue. A fresh instance is
     * returned on every call.
     *
     * @param activeMQQueue queue to create the store for
     * @return a new message store bound to the queue
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public MessageStore createQueueMessageStore(ActiveMQQueue activeMQQueue) throws IOException {
        return new KahaDBMessageStore(activeMQQueue);
    }

    /**
     * Creates a new {@link KahaDBTopicMessageStore} for the given topic. A fresh instance is
     * returned on every call.
     *
     * @param activeMQTopic topic to create the store for
     * @return a new topic message store bound to the topic
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public TopicMessageStore createTopicMessageStore(ActiveMQTopic activeMQTopic) throws IOException {
        return new KahaDBTopicMessageStore(activeMQTopic);
    }

    /**
     * No-op: nothing is done when a queue message store is removed.
     *
     * @param activeMQQueue ignored
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void removeQueueMessageStore(ActiveMQQueue activeMQQueue) {
    }

    /**
     * No-op: nothing is done when a topic message store is removed.
     *
     * @param activeMQTopic ignored
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void removeTopicMessageStore(ActiveMQTopic activeMQTopic) {
    }

    /**
     * No-op: this implementation does not delete anything.
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void deleteAllMessages() throws IOException {
    }

    /**
     * Returns the destinations currently present in the {@code destinations} index.
     *
     * <p>Each index key is turned back into an {@link ActiveMQDestination} through
     * {@code convert(String)}, inside a page-file transaction under {@code indexMutex}.
     *
     * @return a new set holding one destination per index entry
     * @throws RuntimeException wrapping any {@link IOException} raised while reading the index
     */
    @Override
    public Set<ActiveMQDestination> getDestinations() {
        final HashSet<ActiveMQDestination> found = new HashSet<ActiveMQDestination>();
        try {
            synchronized (indexMutex) {
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        for (Iterator<Map.Entry<String, TempMessageDatabase.StoredDestination>> entries = destinations.iterator(tx); entries.hasNext();) {
                            String destinationKey = entries.next().getKey();
                            found.add(convert(destinationKey));
                        }
                    }
                });
            }
        } catch (IOException failure) {
            throw new RuntimeException(failure);
        }
        return found;
    }

    /**
     * Always returns 0; this store does not report a last broker sequence id.
     *
     * @return {@code 0}
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0L;
    }

    /**
     * Returns the disk size reported by the page file.
     *
     * @return {@code 0} while the {@code started} flag is not set, otherwise
     *         {@code pageFile.getDiskSize()}
     * @throws RuntimeException wrapping any {@link IOException} raised while reading the size
     */
    @Override
    public long size() {
        if (started.get()) {
            try {
                return pageFile.getDiskSize();
            } catch (IOException failure) {
                throw new RuntimeException(failure);
            }
        }
        return 0L;
    }

    /**
     * Not supported: always fails.
     *
     * @param connectionContext ignored
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void beginTransaction(ConnectionContext connectionContext) throws IOException {
        throw new IOException("Not yet implemented.");
    }

    /**
     * Not supported: always fails.
     *
     * @param connectionContext ignored
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void commitTransaction(ConnectionContext connectionContext) throws IOException {
        throw new IOException("Not yet implemented.");
    }

    /**
     * Not supported: always fails.
     *
     * @param connectionContext ignored
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void rollbackTransaction(ConnectionContext connectionContext) throws IOException {
        throw new IOException("Not yet implemented.");
    }

    /**
     * No-op: this implementation performs no checkpoint work.
     *
     * @param z ignored
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void checkpoint(boolean z) throws IOException {
    }

    /**
     * Converts a journal location into its {@link KahaLocation} form.
     *
     * @param location journal location to convert
     * @return a new {@link KahaLocation} whose log id is the location's data file id and whose
     *         offset is the location's offset
     */
    KahaLocation convert(Location location) {
        final KahaLocation converted = new KahaLocation();
        final int dataFileId = location.getDataFileId();
        converted.setLogId(dataFileId);
        final int offset = location.getOffset();
        converted.setOffset(offset);
        return converted;
    }

    /**
     * Converts a broker destination into its {@link KahaDestination} form.
     *
     * <p>The physical name is copied over and the destination type byte is mapped to the
     * matching {@link KahaDestination.DestinationType}.
     *
     * @param activeMQDestination destination to convert
     * @return the converted destination, or {@code null} if the destination type is not queue,
     *         topic, temporary queue or temporary topic
     */
    KahaDestination convert(ActiveMQDestination activeMQDestination) {
        final KahaDestination converted = new KahaDestination();
        converted.setName(activeMQDestination.getPhysicalName());

        final KahaDestination.DestinationType kahaType;
        switch (activeMQDestination.getDestinationType()) {
            case ActiveMQDestination.QUEUE_TYPE:
                kahaType = KahaDestination.DestinationType.QUEUE;
                break;
            case ActiveMQDestination.TOPIC_TYPE:
                kahaType = KahaDestination.DestinationType.TOPIC;
                break;
            case ActiveMQDestination.TEMP_QUEUE_TYPE:
                kahaType = KahaDestination.DestinationType.TEMP_QUEUE;
                break;
            case ActiveMQDestination.TEMP_TOPIC_TYPE:
                kahaType = KahaDestination.DestinationType.TEMP_TOPIC;
                break;
            default:
                // Unknown type byte: there is no Kaha equivalent.
                return null;
        }

        converted.setType(kahaType);
        return converted;
    }

    /**
     * Converts a destination index key of the form {@code <type-number>:<name>} back into an
     * {@link ActiveMQDestination}.
     *
     * <p>Only the first colon separates the type number from the name, so the name itself may
     * contain colons.
     *
     * @param str key to parse
     * @return a queue, topic, temporary queue or temporary topic with the parsed name
     * @throws IllegalArgumentException if the key has no colon, if the type prefix is not a
     *         number ({@link NumberFormatException}), or if the number does not map to a
     *         supported destination type
     */
    ActiveMQDestination convert(String str) {
        final int separator = str.indexOf(":");
        if (separator < 0) {
            throw new IllegalArgumentException("Not in the valid destination format");
        }
        // The parsed type number must be kept: it selects the destination type below.
        final int typeNumber = Integer.parseInt(str.substring(0, separator));
        final String name = str.substring(separator + 1);

        final KahaDestination.DestinationType type = KahaDestination.DestinationType.valueOf(typeNumber);
        if (type == null) {
            // Presumably valueOf(int) yields null for an unmapped number; switching on null
            // would otherwise surface as a NullPointerException instead of a format error.
            throw new IllegalArgumentException("Not in the valid destination format");
        }
        switch (type) {
            case QUEUE:
                return new ActiveMQQueue(name);
            case TOPIC:
                return new ActiveMQTopic(name);
            case TEMP_QUEUE:
                return new ActiveMQTempQueue(name);
            case TEMP_TOPIC:
                return new ActiveMQTempTopic(name);
            default:
                throw new IllegalArgumentException("Not in the valid destination format");
        }
    }

    /**
     * Always returns -1, regardless of the producer.
     * NOTE(review): presumably -1 signals "no sequence id tracked" to callers — confirm against
     * the PersistenceAdapter contract.
     *
     * @param producerId ignored
     * @return {@code -1}
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public long getLastProducerSequenceId(ProducerId producerId) {
        return -1L;
    }

    /**
     * Stores the broker service; {@code load()} later reads its store OpenWire version.
     *
     * @param brokerService broker service to remember; may be {@code null}, in which case
     *        {@code load()} leaves the wire format version unchanged
     */
    @Override // org.apache.activemq.broker.BrokerServiceAware
    public void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
    }

    /**
     * Loads the database after aligning the wire format with the broker.
     *
     * <p>If a broker service has been set, the wire format version is set to the broker's store
     * OpenWire version before delegating to the superclass {@code load()}.
     *
     * @throws IOException if the superclass load fails
     */
    @Override // org.apache.activemq.store.kahadb.TempMessageDatabase
    public void load() throws IOException {
        if (this.brokerService != null) {
            this.wireFormat.setVersion(this.brokerService.getStoreOpenWireVersion());
        }
        super.load();
    }

    /**
     * Not supported by this store.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
        throw new UnsupportedOperationException();
    }
}
