package org.apache.activemq.store.kahadaptor;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;

/* loaded from: input_file:fuse-esb-7.0.1.fuse-SNAPSHOT/system/org/apache/activemq/activemq-core/5.5.1.fuse-70-079/activemq-core-5.5.1.fuse-70-079.jar:org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.class */
/**
 * Topic message store backed by Kaha containers.
 *
 * <p>For every subscription key the store keeps a {@link TopicSubContainer} holding
 * {@link ConsumerMessageRef} entries for the messages still pending for that subscription.
 * Each stored message also has one shared {@link TopicSubAck} record in {@link #ackContainer}
 * whose count starts at the number of subscriptions known when the message was added; the
 * message and its ack record are removed once that count drops to zero.
 *
 * <p>All state-changing methods are {@code synchronized} on this instance.
 * {@link #getAllSubscriptions()} and {@link #addSubscriberMessageContainer(Object)} are not.
 */
public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore {
    /** Container name under which the per-subscription reference containers are kept in the store. */
    private static final String SUBSCRIPTION_CONTAINER_NAME = "topic-subs";

    /** Placeholder used in the subscription key when no subscription name is supplied. */
    private static final String UNNAMED_SUBSCRIPTION = "NOT_SET";

    /** One ack record per stored message, shared by all subscriptions that still reference it. */
    protected ListContainer<TopicSubAck> ackContainer;

    /** Pending-message references, keyed by subscription key. */
    protected Map<Object, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<Object, TopicSubContainer>();

    /** Subscription metadata, keyed by subscription key. */
    private Map<String, SubscriptionInfo> subscriberContainer;

    /** Store used to obtain and delete the per-subscription containers. */
    private Store store;

    /**
     * Creates the store and registers a reference container for every subscription key already
     * present in {@code mapContainer2}.
     *
     * @param store               store used to obtain and delete per-subscription containers
     * @param mapContainer        message container handed to the superclass
     * @param listContainer       container holding the per-message ack records
     * @param mapContainer2       subscription metadata keyed by subscription key
     * @param activeMQDestination destination handed to the superclass
     * @throws IOException if a per-subscription container cannot be obtained
     */
    public KahaTopicMessageStore(Store store, MapContainer<MessageId, Message> mapContainer, ListContainer<TopicSubAck> listContainer, MapContainer<String, SubscriptionInfo> mapContainer2, ActiveMQDestination activeMQDestination) throws IOException {
        super(mapContainer, activeMQDestination);
        this.store = store;
        this.ackContainer = listContainer;
        this.subscriberContainer = mapContainer2;
        for (String subscriptionKey : this.subscriberContainer.keySet()) {
            addSubscriberMessageContainer(subscriptionKey);
        }
    }

    /**
     * Stores the message and adds a reference to it to every registered subscription.
     * When no subscription is registered the message is not stored at all.
     *
     * @param connectionContext not used by this implementation
     * @param message           message to store
     * @throws IOException declared by the overridden method
     */
    @Override // org.apache.activemq.store.kahadaptor.KahaMessageStore, org.apache.activemq.store.MessageStore
    public synchronized void addMessage(ConnectionContext connectionContext, Message message) throws IOException {
        int subscriberCount = this.subscriberMessages.size();
        if (subscriberCount <= 0) {
            return;
        }
        MessageId messageId = message.getMessageId();
        StoreEntry messageEntry = this.messageContainer.place(messageId, message);

        // One shared ack record per message; its count is the number of outstanding subscriptions.
        TopicSubAck ack = new TopicSubAck();
        ack.setCount(subscriberCount);
        ack.setMessageEntry(messageEntry);
        StoreEntry ackEntry = this.ackContainer.placeLast(ack);

        for (TopicSubContainer container : this.subscriberMessages.values()) {
            ConsumerMessageRef ref = new ConsumerMessageRef();
            ref.setAckEntry(ackEntry);
            ref.setMessageEntry(messageEntry);
            ref.setMessageId(messageId);
            container.add(ref);
        }
    }

    /**
     * Removes the message reference from the given subscription and decrements the shared ack
     * count; when the count reaches zero the ack record and the message itself are removed.
     * Unknown subscriptions, unknown message ids and missing ack records are ignored.
     *
     * @param connectionContext not used by this implementation
     * @param str               client id part of the subscription key
     * @param str2              subscription name part of the subscription key (may be {@code null})
     * @param messageId         id of the acknowledged message
     * @param messageAck        not used by this implementation
     * @throws IOException declared by the overridden method
     */
    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized void acknowledge(ConnectionContext connectionContext, String str, String str2, MessageId messageId, MessageAck messageAck) throws IOException {
        TopicSubContainer container = this.subscriberMessages.get(getSubscriptionKey(str, str2));
        if (container == null) {
            return;
        }
        ConsumerMessageRef ref = container.remove(messageId);
        if (container.isEmpty()) {
            container.reset();
        }
        if (ref == null) {
            return;
        }
        TopicSubAck ack = this.ackContainer.get(ref.getAckEntry());
        if (ack == null) {
            return;
        }
        if (ack.decrementCount() > 0) {
            // Other subscriptions still reference the message: persist the lowered count.
            this.ackContainer.update(ref.getAckEntry(), ack);
        } else {
            // Last outstanding reference: drop both the ack record and the message.
            this.ackContainer.remove(this.ackContainer.refresh(ref.getAckEntry()));
            this.messageContainer.remove(this.messageContainer.refresh(ack.getMessageEntry()));
        }
    }

    /**
     * Looks up the stored subscription metadata.
     *
     * @param str  client id part of the subscription key
     * @param str2 subscription name part of the subscription key (may be {@code null})
     * @return the stored {@link SubscriptionInfo}, or {@code null} when the key is not present
     * @throws IOException declared by the overridden method
     */
    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized SubscriptionInfo lookupSubscription(String str, String str2) throws IOException {
        return this.subscriberContainer.get(getSubscriptionKey(str, str2));
    }

    /**
     * Registers a subscription. The metadata is stored only when the key is not already present;
     * the in-memory reference container for the key is (re)created in either case.
     *
     * @param subscriptionInfo subscription to register
     * @param z                not used by this implementation
     * @throws IOException if the per-subscription container cannot be obtained
     */
    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized void addSubsciption(SubscriptionInfo subscriptionInfo, boolean z) throws IOException {
        String subscriptionKey = getSubscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
        if (!this.subscriberContainer.containsKey(subscriptionKey)) {
            this.subscriberContainer.put(subscriptionKey, subscriptionInfo);
        }
        addSubscriberMessageContainer(subscriptionKey);
    }

    /**
     * Removes a subscription together with its pending message references.
     *
     * @param str  client id part of the subscription key
     * @param str2 subscription name part of the subscription key (may be {@code null})
     * @throws IOException if the underlying store fails
     */
    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized void deleteSubscription(String str, String str2) throws IOException {
        removeSubscriberMessageContainer(getSubscriptionKey(str, str2));
    }

    /**
     * Replays every pending message of the subscription to the listener, stopping early when
     * {@code recoverMessage} returns {@code false}. References whose message can no longer be
     * loaded are skipped.
     *
     * @param str                     client id part of the subscription key
     * @param str2                    subscription name part of the subscription key (may be {@code null})
     * @param messageRecoveryListener listener receiving the recovered messages
     * @throws Exception if recovery fails
     */
    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized void recoverSubscription(String str, String str2, MessageRecoveryListener messageRecoveryListener) throws Exception {
        TopicSubContainer container = this.subscriberMessages.get(getSubscriptionKey(str, str2));
        if (container == null) {
            return;
        }
        Iterator it = container.iterator();
        while (it.hasNext()) {
            ConsumerMessageRef ref = (ConsumerMessageRef) it.next();
            // The reference carries a StoreEntry (a location), not a MessageId key, so the
            // message has to be loaded with getValue(StoreEntry) as recoverNextMessages does;
            // get(...) would treat the entry as a map key.
            Message message = this.messageContainer.getValue(ref.getMessageEntry());
            if (message != null && !recoverMessage(messageRecoveryListener, message)) {
                return;
            }
        }
    }

    /**
     * Replays the next batch of pending messages of the subscription, continuing after the
     * batch position remembered in the subscription's container. Replay stops when there are no
     * more entries, when {@code i} messages have been delivered, or when the listener reports no
     * more space.
     *
     * @param str                     client id part of the subscription key
     * @param str2                    subscription name part of the subscription key (may be {@code null})
     * @param i                       maximum number of messages to deliver in this call
     * @param messageRecoveryListener listener receiving the recovered messages
     * @throws Exception if recovery fails
     */
    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized void recoverNextMessages(String str, String str2, int i, MessageRecoveryListener messageRecoveryListener) throws Exception {
        TopicSubContainer container = this.subscriberMessages.get(getSubscriptionKey(str, str2));
        if (container == null) {
            return;
        }

        // Resume after the last delivered entry, or start from the first entry when no batch
        // position has been recorded yet.
        StoreEntry entry = container.getBatchEntry();
        if (entry == null) {
            entry = container.getEntry();
        } else {
            entry = container.refreshEntry(entry);
            if (entry != null) {
                entry = container.getNextEntry(entry);
            }
        }
        if (entry == null) {
            return;
        }

        int delivered = 0;
        do {
            ConsumerMessageRef ref = container.get(entry);
            Message message = this.messageContainer.getValue(ref.getMessageEntry());
            if (message != null) {
                recoverMessage(messageRecoveryListener, message);
                delivered++;
                container.setBatchEntry(message.getMessageId().toString(), entry);
            } else {
                // The referenced message could not be loaded: forget the batch position.
                container.reset();
            }
            entry = container.getNextEntry(entry);
        } while (entry != null && delivered < i && messageRecoveryListener.hasSpace());
    }

    /**
     * Deletes the superclass state, then clears the ack records and the subscription metadata.
     * The in-memory {@link #subscriberMessages} map is not modified here.
     */
    @Override // org.apache.activemq.store.kahadaptor.KahaMessageStore
    public synchronized void delete() {
        super.delete();
        this.ackContainer.clear();
        this.subscriberContainer.clear();
    }

    /**
     * Returns the metadata of all stored subscriptions. Unlike most methods of this class, this
     * one is not {@code synchronized}.
     *
     * @return array with the stored {@link SubscriptionInfo} values
     * @throws IOException declared by the overridden method
     */
    @Override // org.apache.activemq.store.TopicMessageStore
    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        return this.subscriberContainer.values().toArray(new SubscriptionInfo[this.subscriberContainer.size()]);
    }

    /**
     * Builds the subscription key {@code "<str>:<str2>"}; a {@code null} subscription name is
     * replaced by {@code "NOT_SET"}.
     *
     * @param str  client id
     * @param str2 subscription name, may be {@code null}
     * @return the key used in {@link #subscriberMessages} and the subscription metadata map
     */
    protected String getSubscriptionKey(String str, String str2) {
        return str + ":" + (str2 != null ? str2 : UNNAMED_SUBSCRIPTION);
    }

    /**
     * Obtains the map container for the given subscription key from the store, configures its
     * marshallers and registers a new {@link TopicSubContainer} wrapping it under that key,
     * replacing any previously registered wrapper.
     *
     * @param obj subscription key
     * @return the configured map container
     * @throws IOException if the store cannot provide the container
     */
    protected MapContainer addSubscriberMessageContainer(Object obj) throws IOException {
        MapContainer refContainer = this.store.getMapContainer(obj, SUBSCRIPTION_CONTAINER_NAME);
        refContainer.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
        refContainer.setValueMarshaller(new ConsumerMessageRefMarshaller());
        this.subscriberMessages.put(obj, new TopicSubContainer(refContainer));
        return refContainer;
    }

    /**
     * Removes a subscription: drops its metadata, releases every pending message reference
     * (removing messages whose ack count reaches zero) and deletes its container from the store.
     *
     * @param obj subscription key
     * @throws IOException if the underlying store fails
     */
    protected synchronized void removeSubscriberMessageContainer(Object obj) throws IOException {
        this.subscriberContainer.remove(obj);
        TopicSubContainer container = this.subscriberMessages.remove(obj);
        if (container != null) {
            Iterator it = container.iterator();
            while (it.hasNext()) {
                ConsumerMessageRef ref = (ConsumerMessageRef) it.next();
                if (ref == null) {
                    continue;
                }
                TopicSubAck ack = this.ackContainer.get(ref.getAckEntry());
                if (ack == null) {
                    continue;
                }
                if (ack.decrementCount() <= 0) {
                    // This subscription held the last reference to the message.
                    this.ackContainer.remove(ref.getAckEntry());
                    this.messageContainer.remove(ack.getMessageEntry());
                } else {
                    this.ackContainer.update(ref.getAckEntry(), ack);
                }
            }
            container.clear();
        }
        // The container was created through getMapContainer(...), so it has to be deleted as a
        // map container; deleting a list container with the same id would leave it behind.
        this.store.deleteMapContainer(obj, SUBSCRIPTION_CONTAINER_NAME);
    }

    /**
     * Returns the number of message references pending for the subscription.
     *
     * @param str  client id part of the subscription key
     * @param str2 subscription name part of the subscription key (may be {@code null})
     * @return size of the subscription's container, or {@code 0} when the subscription is unknown
     * @throws IOException declared by the overridden method
     */
    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized int getMessageCount(String str, String str2) throws IOException {
        TopicSubContainer container = this.subscriberMessages.get(getSubscriptionKey(str, str2));
        return container != null ? container.size() : 0;
    }

    /**
     * Clears the message container, the ack records and every subscription's pending references.
     * The subscriptions themselves stay registered.
     *
     * @param connectionContext not used by this implementation
     * @throws IOException declared by the overridden method
     */
    @Override // org.apache.activemq.store.kahadaptor.KahaMessageStore, org.apache.activemq.store.MessageStore
    public synchronized void removeAllMessages(ConnectionContext connectionContext) throws IOException {
        this.messageContainer.clear();
        this.ackContainer.clear();
        for (TopicSubContainer container : this.subscriberMessages.values()) {
            container.clear();
        }
    }

    /**
     * Resets the batch position of the subscription, if the subscription is known.
     *
     * @param str  client id part of the subscription key
     * @param str2 subscription name part of the subscription key (may be {@code null})
     */
    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized void resetBatching(String str, String str2) {
        TopicSubContainer container = this.subscriberMessages.get(getSubscriptionKey(str, str2));
        if (container != null) {
            container.reset();
        }
    }
}
