package org.apache.activemq.store.memory;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.SubscriptionKey;

/* loaded from: input_file:activemq-broker-5.11.0.redhat-630291.jar:org/apache/activemq/store/memory/MemoryTopicMessageStore.class */
public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
    private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase;
    private Map<SubscriptionKey, MemoryTopicSub> topicSubMap;

    public MemoryTopicMessageStore(ActiveMQDestination activeMQDestination) {
        this(activeMQDestination, new LRUCache(100, 100, 0.75f, false), makeSubscriptionInfoMap());
    }

    public MemoryTopicMessageStore(ActiveMQDestination activeMQDestination, Map<MessageId, Message> map, Map<SubscriptionKey, SubscriptionInfo> map2) {
        super(activeMQDestination, map);
        this.subscriberDatabase = map2;
        this.topicSubMap = makeSubMap();
    }

    protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() {
        return Collections.synchronizedMap(new HashMap());
    }

    protected static Map<SubscriptionKey, MemoryTopicSub> makeSubMap() {
        return Collections.synchronizedMap(new HashMap());
    }

    @Override // org.apache.activemq.store.memory.MemoryMessageStore, org.apache.activemq.store.MessageStore
    public synchronized void addMessage(ConnectionContext connectionContext, Message message) throws IOException {
        super.addMessage(connectionContext, message);
        Iterator<MemoryTopicSub> it = this.topicSubMap.values().iterator();
        while (it.hasNext()) {
            it.next().addMessage(message.getMessageId(), message);
        }
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized void acknowledge(ConnectionContext connectionContext, String str, String str2, MessageId messageId, MessageAck messageAck) throws IOException {
        MemoryTopicSub memoryTopicSub = this.topicSubMap.get(new SubscriptionKey(str, str2));
        if (memoryTopicSub != null) {
            memoryTopicSub.removeMessage(messageId);
        }
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized SubscriptionInfo lookupSubscription(String str, String str2) throws IOException {
        return this.subscriberDatabase.get(new SubscriptionKey(str, str2));
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized void addSubscription(SubscriptionInfo subscriptionInfo, boolean z) throws IOException {
        SubscriptionKey subscriptionKey = new SubscriptionKey(subscriptionInfo);
        MemoryTopicSub memoryTopicSub = new MemoryTopicSub();
        this.topicSubMap.put(subscriptionKey, memoryTopicSub);
        if (z) {
            for (Map.Entry<MessageId, Message> entry : this.messageTable.entrySet()) {
                memoryTopicSub.addMessage(entry.getKey(), entry.getValue());
            }
        }
        this.subscriberDatabase.put(subscriptionKey, subscriptionInfo);
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized void deleteSubscription(String str, String str2) {
        SubscriptionKey subscriptionKey = new SubscriptionKey(str, str2);
        this.subscriberDatabase.remove(subscriptionKey);
        this.topicSubMap.remove(subscriptionKey);
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized void recoverSubscription(String str, String str2, MessageRecoveryListener messageRecoveryListener) throws Exception {
        MemoryTopicSub memoryTopicSub = this.topicSubMap.get(new SubscriptionKey(str, str2));
        if (memoryTopicSub != null) {
            memoryTopicSub.recoverSubscription(messageRecoveryListener);
        }
    }

    @Override // org.apache.activemq.store.memory.MemoryMessageStore
    public synchronized void delete() {
        super.delete();
        this.subscriberDatabase.clear();
        this.topicSubMap.clear();
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        return (SubscriptionInfo[]) this.subscriberDatabase.values().toArray(new SubscriptionInfo[this.subscriberDatabase.size()]);
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized int getMessageCount(String str, String str2) throws IOException {
        int i = 0;
        MemoryTopicSub memoryTopicSub = this.topicSubMap.get(new SubscriptionKey(str, str2));
        if (memoryTopicSub != null) {
            i = memoryTopicSub.size();
        }
        return i;
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized void recoverNextMessages(String str, String str2, int i, MessageRecoveryListener messageRecoveryListener) throws Exception {
        MemoryTopicSub memoryTopicSub = this.topicSubMap.get(new SubscriptionKey(str, str2));
        if (memoryTopicSub != null) {
            memoryTopicSub.recoverNextMessages(i, messageRecoveryListener);
        }
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public void resetBatching(String str, String str2) {
        MemoryTopicSub memoryTopicSub = this.topicSubMap.get(new SubscriptionKey(str, str2));
        if (memoryTopicSub != null) {
            memoryTopicSub.resetBatching();
        }
    }
}
