package org.apache.activemq.store.kahadaptor;

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.MessageIdMarshaller;
import org.apache.activemq.kaha.MessageMarshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.kaha.impl.StoreLockedExcpetion;
import org.apache.activemq.kaha.impl.data.DataManagerImpl;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:activemq-core-5.5.0-fuse-00-39.jar:org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.class */
/**
 * A {@link PersistenceAdapter} that keeps queue and topic messages in a Kaha {@link Store}.
 *
 * <p>The backing store is opened lazily on the first call to {@link #getStore()}. When no
 * directory has been configured, one is derived from the default data directory and the
 * broker name (suffix {@code -kahastore}) the first time the adapter is initialized.
 */
public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
    /** Milliseconds to wait between attempts to open a store that is locked. */
    private static final int STORE_LOCKED_WAIT_DELAY = 10000;
    private static final Logger LOG = LoggerFactory.getLogger(KahaPersistenceAdapter.class);
    private static final String PREPARED_TRANSACTIONS_NAME = "PreparedTransactions";
    protected OpenWireFormat wireFormat;
    protected KahaTransactionStore transactionStore;
    /** Topic stores handed out to callers (wrapped by the transaction store when one exists). */
    protected ConcurrentHashMap<ActiveMQTopic, TopicMessageStore> topics;
    /** Queue stores handed out to callers (wrapped by the transaction store when one exists). */
    protected ConcurrentHashMap<ActiveMQQueue, MessageStore> queues;
    /** The unwrapped stores, keyed by destination; see {@link #retrieveMessageStore(Object)}. */
    protected ConcurrentHashMap<ActiveMQDestination, MessageStore> messageStores;
    private long maxDataFileLength;
    private File directory;
    private String brokerName;
    private Store theStore;
    private boolean initialized;
    private final AtomicLong storeSize;
    private boolean persistentIndex;
    private BrokerService brokerService;

    /**
     * Creates an adapter that reports its size through the supplied counter.
     *
     * @param atomicLong counter passed to the store when it is opened and read by {@link #size()}
     */
    public KahaPersistenceAdapter(AtomicLong atomicLong) {
        this.wireFormat = new OpenWireFormat();
        this.topics = new ConcurrentHashMap<>();
        this.queues = new ConcurrentHashMap<>();
        this.messageStores = new ConcurrentHashMap<>();
        this.maxDataFileLength = DataManagerImpl.MAX_FILE_LENGTH;
        this.persistentIndex = true;
        this.storeSize = atomicLong;
    }

    /** Creates an adapter with its own, freshly created size counter. */
    public KahaPersistenceAdapter() {
        this(new AtomicLong());
    }

    /**
     * Collects every map-container key in the store that is an {@link ActiveMQDestination}.
     *
     * @return the destinations found; if reading the store fails the error is logged and
     *         whatever was collected so far (possibly nothing) is returned
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public Set<ActiveMQDestination> getDestinations() {
        Set<ActiveMQDestination> destinations = new HashSet<>();
        try {
            for (ContainerId containerId : getStore().getMapContainerIds()) {
                Object key = containerId.getKey();
                if (key instanceof ActiveMQDestination) {
                    destinations.add((ActiveMQDestination) key);
                }
            }
        } catch (IOException e) {
            LOG.error("Failed to get destinations ", e);
        }
        return destinations;
    }

    /**
     * Returns the message store for a queue, creating and caching it on first use.
     *
     * @param activeMQQueue the queue whose store is requested
     * @return the cached store, proxied by the transaction store if one has been created
     * @throws IOException if the backing container cannot be obtained or loaded
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public synchronized MessageStore createQueueMessageStore(ActiveMQQueue activeMQQueue) throws IOException {
        MessageStore messageStore = this.queues.get(activeMQQueue);
        if (messageStore == null) {
            messageStore = new KahaMessageStore(getMapContainer(activeMQQueue, "queue-data"), activeMQQueue);
            // The raw store is registered before it is (optionally) wrapped below.
            this.messageStores.put(activeMQQueue, messageStore);
            if (this.transactionStore != null) {
                messageStore = this.transactionStore.proxy(messageStore);
            }
            this.queues.put(activeMQQueue, messageStore);
        }
        return messageStore;
    }

    /**
     * Returns the message store for a topic, creating and caching it on first use.
     *
     * @param activeMQTopic the topic whose store is requested
     * @return the cached store, proxied by the transaction store if one has been created
     * @throws IOException if any of the backing containers cannot be obtained or loaded
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic activeMQTopic) throws IOException {
        TopicMessageStore topicMessageStore = this.topics.get(activeMQTopic);
        if (topicMessageStore == null) {
            Store store = getStore();
            MapContainer<MessageId, Message> mapContainer = getMapContainer(activeMQTopic, "topic-data");
            MapContainer subsMapContainer = getSubsMapContainer(activeMQTopic.toString() + "-Subscriptions", "topic-subs");
            ListContainer listContainer = store.getListContainer(activeMQTopic.toString(), "topic-acks");
            listContainer.setMarshaller(new TopicSubAckMarshaller());
            topicMessageStore = new KahaTopicMessageStore(store, mapContainer, listContainer, subsMapContainer, activeMQTopic);
            // The raw store is registered before it is (optionally) wrapped below.
            this.messageStores.put(activeMQTopic, topicMessageStore);
            if (this.transactionStore != null) {
                topicMessageStore = this.transactionStore.proxy(topicMessageStore);
            }
            this.topics.put(activeMQTopic, topicMessageStore);
        }
        return topicMessageStore;
    }

    /**
     * Forgets the cached store for a queue and, if the backing store has been opened,
     * deletes the queue's {@code queue-data} map container. A failure to delete is logged,
     * not propagated.
     *
     * @param activeMQQueue the queue to remove
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void removeQueueMessageStore(ActiveMQQueue activeMQQueue) {
        this.queues.remove(activeMQQueue);
        try {
            if (this.theStore != null) {
                this.theStore.deleteMapContainer(activeMQQueue, "queue-data");
            }
        } catch (IOException e) {
            LOG.error("Failed to remove store map container for queue:" + activeMQQueue, e);
        }
    }

    /**
     * Forgets the cached store for a topic. No container is deleted here.
     *
     * @param activeMQTopic the topic to remove
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void removeTopicMessageStore(ActiveMQTopic activeMQTopic) {
        this.topics.remove(activeMQTopic);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Looks up the unwrapped message store registered for a destination.
     *
     * @param obj the key to look up in {@link #messageStores}
     * @return the registered store, or {@code null} if there is none
     */
    public MessageStore retrieveMessageStore(Object obj) {
        return this.messageStores.get(obj);
    }

    /**
     * Returns the transaction store, creating it on first use.
     *
     * <p>While the underlying store reports that it is locked, this method sleeps for
     * {@link #STORE_LOCKED_WAIT_DELAY} milliseconds and tries again, indefinitely. An interrupt
     * received while sleeping does not abort the wait, but the thread's interrupt status is
     * restored before this method returns or throws so callers can still observe it.
     *
     * @return the transaction store
     * @throws IOException if the container cannot be opened for a reason other than the lock
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public TransactionStore createTransactionStore() throws IOException {
        if (this.transactionStore == null) {
            boolean interrupted = false;
            try {
                while (true) {
                    try {
                        MapContainer mapContainer = getStore().getMapContainer(PREPARED_TRANSACTIONS_NAME, "transactions");
                        mapContainer.setKeyMarshaller(new CommandMarshaller(this.wireFormat));
                        mapContainer.setValueMarshaller(new TransactionMarshaller(this.wireFormat));
                        mapContainer.load();
                        this.transactionStore = new KahaTransactionStore(this, mapContainer);
                        this.transactionStore.setBrokerService(this.brokerService);
                        break;
                    } catch (StoreLockedExcpetion e) {
                        LOG.info("Store is locked... waiting " + (STORE_LOCKED_WAIT_DELAY / 1000)
                                + " seconds for the Store to be unlocked.");
                        try {
                            Thread.sleep(STORE_LOCKED_WAIT_DELAY);
                        } catch (InterruptedException ignored) {
                            // Keep waiting for the lock, but remember the request so the
                            // interrupt status is not lost to the caller.
                            interrupted = true;
                        }
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        return this.transactionStore;
    }

    /** Does nothing. */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void beginTransaction(ConnectionContext connectionContext) {
    }

    /**
     * Calls {@code force()} on the store if it has been opened; otherwise does nothing.
     *
     * @throws IOException if forcing the store fails
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void commitTransaction(ConnectionContext connectionContext) throws IOException {
        if (this.theStore != null) {
            this.theStore.force();
        }
    }

    /** Does nothing. */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void rollbackTransaction(ConnectionContext connectionContext) {
    }

    /** Runs the one-time initialization (directory creation and wire-format setup). */
    @Override // org.apache.activemq.Service
    public void start() throws Exception {
        initialize();
    }

    /** Closes the store if it has been opened. */
    @Override // org.apache.activemq.Service
    public void stop() throws Exception {
        if (this.theStore != null) {
            this.theStore.close();
        }
    }

    /**
     * @return always {@code 0}
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0L;
    }

    /**
     * Removes all stored data: deletes the store directory via {@link StoreFactory} when the
     * store was never opened, clears the store when it reports itself initialized, and
     * otherwise deletes the store.
     *
     * @throws IOException if the underlying delete or clear operation fails
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void deleteAllMessages() throws IOException {
        if (this.theStore == null) {
            StoreFactory.delete(getStoreDirectory());
        } else if (this.theStore.isInitialized()) {
            this.theStore.clear();
        } else {
            this.theStore.delete();
        }
    }

    /**
     * Obtains and loads a map container keyed by {@link MessageId} with {@link Message} values.
     *
     * @param obj the container id
     * @param str the container name
     * @return the loaded container
     * @throws IOException if the store cannot be opened or the container cannot be obtained
     */
    protected MapContainer<MessageId, Message> getMapContainer(Object obj, String str) throws IOException {
        MapContainer<MessageId, Message> mapContainer = getStore().getMapContainer(obj, str);
        mapContainer.setKeyMarshaller(new MessageIdMarshaller());
        mapContainer.setValueMarshaller(new MessageMarshaller(this.wireFormat));
        mapContainer.load();
        return mapContainer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Obtains and loads a map container with string keys and values marshalled by
     * {@link #createMessageMarshaller()}.
     *
     * @param obj the container id
     * @param str the container name
     * @return the loaded container
     * @throws IOException if the store cannot be opened or the container cannot be obtained
     */
    public MapContainer getSubsMapContainer(Object obj, String str) throws IOException {
        MapContainer mapContainer = getStore().getMapContainer(obj, str);
        mapContainer.setKeyMarshaller(Store.STRING_MARSHALLER);
        mapContainer.setValueMarshaller(createMessageMarshaller());
        mapContainer.load();
        return mapContainer;
    }

    /**
     * @return a new {@link CommandMarshaller} bound to this adapter's wire format
     */
    protected Marshaller<Object> createMessageMarshaller() {
        return new CommandMarshaller(this.wireFormat);
    }

    /**
     * Obtains and loads a list container whose marshaller comes from
     * {@link #createMessageMarshaller()}.
     *
     * @param obj the container id
     * @param str the container name
     * @return the loaded container
     * @throws IOException if the store cannot be opened or the container cannot be obtained
     */
    protected ListContainer<TopicSubAck> getListContainer(Object obj, String str) throws IOException {
        ListContainer<TopicSubAck> listContainer = getStore().getListContainer(obj, str);
        listContainer.setMarshaller(createMessageMarshaller());
        listContainer.load();
        return listContainer;
    }

    /** Does nothing; the supplied usage manager is ignored. */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void setUsageManager(SystemUsage systemUsage) {
    }

    /**
     * @return the maximum data file length applied to the store when it is opened
     */
    public long getMaxDataFileLength() {
        return this.maxDataFileLength;
    }

    /**
     * @return the persistent-index flag applied to the store when it is opened
     */
    public boolean isPersistentIndex() {
        return this.persistentIndex;
    }

    /**
     * @param z the persistent-index flag to apply when the store is opened
     */
    public void setPersistentIndex(boolean z) {
        this.persistentIndex = z;
    }

    /**
     * @param j the maximum data file length to apply when the store is opened
     */
    public void setMaxDataFileLength(long j) {
        this.maxDataFileLength = j;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the backing store, opening it on first use.
     *
     * @return the store
     * @throws IOException if the store cannot be opened
     */
    public final synchronized Store getStore() throws IOException {
        if (this.theStore == null) {
            this.theStore = createStore();
        }
        return this.theStore;
    }

    /**
     * Opens a store in the store directory and applies this adapter's settings to it.
     *
     * @return the newly opened store
     * @throws IOException if the store cannot be opened
     */
    protected final Store createStore() throws IOException {
        Store open = StoreFactory.open(getStoreDirectory(), "rw", this.storeSize);
        open.setMaxDataFileLength(this.maxDataFileLength);
        open.setPersistentIndex(isPersistentIndex());
        open.setDefaultContainerName("container-roots");
        return open;
    }

    /** Returns the absolute path of the store directory, initializing the adapter if needed. */
    private String getStoreName() {
        initialize();
        return this.directory.getAbsolutePath();
    }

    /** Returns the store directory, initializing the adapter if needed. */
    private File getStoreDirectory() {
        initialize();
        return this.directory;
    }

    /**
     * Note: building the string triggers the one-time initialization, so this may create the
     * store directory and may throw if that fails.
     */
    public String toString() {
        return "KahaPersistenceAdapter(" + getStoreName() + ")";
    }

    /**
     * @param str the broker name, used to derive a default directory when none is set
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void setBrokerName(String str) {
        this.brokerName = str;
    }

    /**
     * @return the broker name previously set, or {@code null}
     */
    public String getBrokerName() {
        return this.brokerName;
    }

    /**
     * @return the configured directory; {@code null} if none was set and the adapter has not
     *         been initialized yet
     */
    public File getDirectory() {
        return this.directory;
    }

    /**
     * @param file the directory in which the store is kept
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void setDirectory(File file) {
        this.directory = file;
    }

    /**
     * Forces the store when {@code z} is {@code true}; otherwise does nothing.
     *
     * @param z whether to force the store
     * @throws IOException if the store cannot be opened or forced
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public void checkpoint(boolean z) throws IOException {
        if (z) {
            getStore().force();
        }
    }

    /**
     * @return the current value of the size counter shared with the store
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public long size() {
        return this.storeSize.get();
    }

    /**
     * One-time setup: picks a default directory if none was configured, creates it, and
     * configures the wire format (cache disabled, tight encoding enabled).
     *
     * <p>The adapter is marked initialized only after all of that succeeded, so a failed
     * attempt is retried on the next call instead of leaving the adapter half set up.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the directory cannot be created
     */
    private void initialize() {
        if (this.initialized) {
            return;
        }
        if (this.directory == null) {
            setDirectory(new File(new File(IOHelper.getDefaultDataDirectory()), IOHelper.toFileSystemSafeName(this.brokerName) + "-kahastore"));
        }
        try {
            IOHelper.mkdirs(this.directory);
            this.wireFormat.setCacheEnabled(false);
            this.wireFormat.setTightEncodingEnabled(true);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        this.initialized = true;
    }

    /**
     * @param brokerService the broker service handed to the transaction store when it is created
     */
    @Override // org.apache.activemq.broker.BrokerServiceAware
    public void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
    }

    /**
     * @return always {@code -1}
     */
    @Override // org.apache.activemq.store.PersistenceAdapter
    public long getLastProducerSequenceId(ProducerId producerId) {
        return -1L;
    }
}
