package org.apache.activemq.artemis.core.protocol.mqtt;

import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.artemis.api.core.FilterConstants;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;

/* loaded from: input_file:org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.class */
public class MQTTSubscriptionManager {
    private MQTTSession session;
    private SimpleString managementFilter;
    private ConcurrentMap<String, ServerConsumer> consumers = new ConcurrentHashMap();
    private ConcurrentMap<Long, Integer> consumerQoSLevels = new ConcurrentHashMap();

    public MQTTSubscriptionManager(MQTTSession mQTTSession) {
        this.session = mQTTSession;
        this.managementFilter = new SimpleString("NOT ((" + ((CharSequence) FilterConstants.ACTIVEMQ_ADDRESS) + " = '" + ((CharSequence) mQTTSession.getServer().getConfiguration().getManagementAddress()) + "') OR (" + ((CharSequence) FilterConstants.ACTIVEMQ_ADDRESS) + " = '" + ((CharSequence) mQTTSession.getServer().getConfiguration().getManagementNotificationAddress()) + "'))");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void start() throws Exception {
        for (MqttTopicSubscription mqttTopicSubscription : this.session.getSessionState().getSubscriptions()) {
            createConsumerForSubscriptionQueue(createQueueForSubscription(mqttTopicSubscription.topicName(), mqttTopicSubscription.qualityOfService().value()), mqttTopicSubscription.topicName(), mqttTopicSubscription.qualityOfService().value());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void stop(boolean z) throws Exception {
        for (ServerConsumer serverConsumer : this.consumers.values()) {
            serverConsumer.setStarted(false);
            serverConsumer.disconnect();
            serverConsumer.getQueue().removeConsumer(serverConsumer);
            serverConsumer.close(false);
        }
        if (z) {
            Iterator<ServerConsumer> it = this.consumers.values().iterator();
            while (it.hasNext()) {
                this.session.getServer().destroyQueue(it.next().getQueue().getName());
            }
        }
    }

    private Queue createQueueForSubscription(String str, int i) throws Exception {
        String convertMQTTAddressFilterToCore = MQTTUtil.convertMQTTAddressFilterToCore(str);
        SimpleString queueNameForTopic = getQueueNameForTopic(convertMQTTAddressFilterToCore);
        Queue locateQueue = this.session.getServer().locateQueue(queueNameForTopic);
        if (locateQueue == null) {
            locateQueue = this.session.getServerSession().createQueue(new SimpleString(convertMQTTAddressFilterToCore), queueNameForTopic, this.managementFilter, false, i >= 0);
        }
        return locateQueue;
    }

    private void createConsumerForSubscriptionQueue(Queue queue, String str, int i) throws Exception {
        long generateID = this.session.getServer().getStorageManager().generateID();
        ServerConsumer createConsumer = this.session.getServerSession().createConsumer(generateID, queue.getName(), (SimpleString) null, false, true, -1);
        createConsumer.setStarted(true);
        this.consumers.put(str, createConsumer);
        this.consumerQoSLevels.put(Long.valueOf(generateID), Integer.valueOf(i));
    }

    private void addSubscription(MqttTopicSubscription mqttTopicSubscription) throws Exception {
        MqttTopicSubscription subscription = this.session.getSessionState().getSubscription(mqttTopicSubscription.topicName());
        int value = mqttTopicSubscription.qualityOfService().value();
        String str = mqttTopicSubscription.topicName();
        this.session.getSessionState().addSubscription(mqttTopicSubscription);
        Queue createQueueForSubscription = createQueueForSubscription(str, value);
        if (subscription == null) {
            createConsumerForSubscriptionQueue(createQueueForSubscription, str, value);
        } else {
            this.consumerQoSLevels.put(Long.valueOf(this.consumers.get(str).getID()), Integer.valueOf(value));
        }
        this.session.getRetainMessageManager().addRetainedMessagesToQueue(createQueueForSubscription, str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeSubscriptions(List<String> list) throws Exception {
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            removeSubscription(it.next());
        }
    }

    private synchronized void removeSubscription(String str) throws Exception {
        ServerConsumer serverConsumer = this.consumers.get(str);
        this.session.getServer().locateQueue(getQueueNameForTopic(MQTTUtil.convertMQTTAddressFilterToCore(str))).deleteQueue(true);
        this.session.getSessionState().removeSubscription(str);
        this.consumers.remove(str);
        this.consumerQoSLevels.remove(Long.valueOf(serverConsumer.getID()));
    }

    private SimpleString getQueueNameForTopic(String str) {
        return new SimpleString(this.session.getSessionState().getClientId() + "." + str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int[] addSubscriptions(List<MqttTopicSubscription> list) throws Exception {
        int[] iArr = new int[list.size()];
        for (int i = 0; i < list.size(); i++) {
            addSubscription(list.get(i));
            iArr[i] = list.get(i).qualityOfService().value();
        }
        return iArr;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<Long, Integer> getConsumerQoSLevels() {
        return this.consumerQoSLevels;
    }
}
