package org.apache.activemq.transport.mqtt.strategy;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.activemq.broker.region.QueueRegion;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
import org.apache.activemq.transport.mqtt.MQTTProtocolException;
import org.apache.activemq.transport.mqtt.MQTTProtocolSupport;
import org.apache.activemq.transport.mqtt.MQTTSubscription;
import org.apache.activemq.transport.mqtt.ResponseHandler;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.codec.CONNECT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.support.PropertiesBeanDefinitionReader;

/* loaded from: input_file:activemq-mqtt-5.11.0.redhat-630416-03.jar:org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.class */
public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy {
    private static final String VIRTUALTOPIC_PREFIX = "VirtualTopic.";
    private static final String VIRTUALTOPIC_CONSUMER_PREFIX = "Consumer.";
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) MQTTVirtualTopicSubscriptionStrategy.class);
    private final Set<ActiveMQQueue> restoredQueues = Collections.synchronizedSet(new HashSet());

    @Override // org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy
    public void onConnect(CONNECT connect) throws MQTTProtocolException {
        List<ActiveMQQueue> lookupQueues = lookupQueues(this.protocol.getClientId());
        List<SubscriptionInfo> lookupSubscription = lookupSubscription(this.protocol.getClientId());
        if (connect.cleanSession()) {
            deleteDurableQueues(lookupQueues);
            deleteDurableSubs(lookupSubscription);
        } else {
            restoreDurableQueue(lookupQueues);
            restoreDurableSubs(lookupSubscription);
        }
    }

    @Override // org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy
    public byte onSubscribe(String str, QoS qoS) throws MQTTProtocolException {
        ActiveMQDestination activeMQTopic;
        int i;
        ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
        String convertMQTTToActiveMQ = MQTTProtocolSupport.convertMQTTToActiveMQ(str);
        if (this.protocol.isCleanSession() || this.protocol.getClientId() == null || qoS.ordinal() < QoS.AT_LEAST_ONCE.ordinal()) {
            if (!convertMQTTToActiveMQ.startsWith(VIRTUALTOPIC_PREFIX)) {
                convertMQTTToActiveMQ = VIRTUALTOPIC_PREFIX + convertMQTTToActiveMQ;
            }
            activeMQTopic = new ActiveMQTopic(convertMQTTToActiveMQ);
            i = 32767;
        } else if (convertMQTTToActiveMQ.startsWith(VIRTUALTOPIC_PREFIX)) {
            activeMQTopic = new ActiveMQTopic(convertMQTTToActiveMQ);
            i = 100;
            consumerInfo.setSubscriptionName(qoS + ":" + str);
        } else {
            activeMQTopic = new ActiveMQQueue(VIRTUALTOPIC_CONSUMER_PREFIX + MQTTProtocolSupport.convertMQTTToActiveMQ(this.protocol.getClientId()) + ":" + qoS + "." + VIRTUALTOPIC_PREFIX + convertMQTTToActiveMQ);
            i = 1000;
        }
        consumerInfo.setDestination(activeMQTopic);
        if (this.protocol.getActiveMQSubscriptionPrefetch() > 0) {
            consumerInfo.setPrefetchSize(this.protocol.getActiveMQSubscriptionPrefetch());
        } else {
            consumerInfo.setPrefetchSize(i);
        }
        consumerInfo.setRetroactive(true);
        consumerInfo.setDispatchAsync(true);
        return doSubscribe(consumerInfo, str, qoS);
    }

    @Override // org.apache.activemq.transport.mqtt.strategy.AbstractMQTTSubscriptionStrategy, org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy
    public void onReSubscribe(MQTTSubscription mQTTSubscription) throws MQTTProtocolException {
        ActiveMQDestination destination = mQTTSubscription.getDestination();
        if (destination.isQueue() && this.restoredQueues.remove(destination)) {
            return;
        }
        if (destination.isTopic() && this.restoredDurableSubs.remove(destination.getPhysicalName())) {
            return;
        }
        if (mQTTSubscription.getDestination().isTopic()) {
            super.onReSubscribe(mQTTSubscription);
            return;
        }
        doUnSubscribe(mQTTSubscription);
        ConsumerInfo consumerInfo = mQTTSubscription.getConsumerInfo();
        consumerInfo.setConsumerId(getNextConsumerId());
        doSubscribe(consumerInfo, mQTTSubscription.getTopicName(), mQTTSubscription.getQoS());
    }

    @Override // org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy
    public void onUnSubscribe(String str) throws MQTTProtocolException {
        MQTTSubscription remove = this.mqttSubscriptionByTopic.remove(str);
        if (remove != null) {
            doUnSubscribe(remove);
            if (remove.getDestination().isQueue()) {
                DestinationInfo destinationInfo = new DestinationInfo();
                destinationInfo.setConnectionId(this.protocol.getConnectionId());
                destinationInfo.setDestination(remove.getDestination());
                destinationInfo.setOperationType((byte) 1);
                this.protocol.sendToActiveMQ(destinationInfo, new ResponseHandler() { // from class: org.apache.activemq.transport.mqtt.strategy.MQTTVirtualTopicSubscriptionStrategy.1
                    @Override // org.apache.activemq.transport.mqtt.ResponseHandler
                    public void onResponse(MQTTProtocolConverter mQTTProtocolConverter, Response response) throws IOException {
                    }
                });
                return;
            }
            if (remove.getConsumerInfo().getSubscriptionName() != null) {
                this.restoredDurableSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(remove.getTopicName()));
                RemoveSubscriptionInfo removeSubscriptionInfo = new RemoveSubscriptionInfo();
                removeSubscriptionInfo.setConnectionId(this.protocol.getConnectionId());
                removeSubscriptionInfo.setSubscriptionName(remove.getConsumerInfo().getSubscriptionName());
                removeSubscriptionInfo.setClientId(this.protocol.getClientId());
                this.protocol.sendToActiveMQ(removeSubscriptionInfo, new ResponseHandler() { // from class: org.apache.activemq.transport.mqtt.strategy.MQTTVirtualTopicSubscriptionStrategy.2
                    @Override // org.apache.activemq.transport.mqtt.ResponseHandler
                    public void onResponse(MQTTProtocolConverter mQTTProtocolConverter, Response response) throws IOException {
                    }
                });
            }
        }
    }

    @Override // org.apache.activemq.transport.mqtt.strategy.AbstractMQTTSubscriptionStrategy, org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy
    public ActiveMQDestination onSend(String str) {
        return !str.startsWith(VIRTUALTOPIC_PREFIX) ? new ActiveMQTopic(VIRTUALTOPIC_PREFIX + str) : new ActiveMQTopic(str);
    }

    @Override // org.apache.activemq.transport.mqtt.strategy.AbstractMQTTSubscriptionStrategy, org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy
    public String onSend(ActiveMQDestination activeMQDestination) {
        String physicalName = activeMQDestination.getPhysicalName();
        int indexOf = physicalName.indexOf(VIRTUALTOPIC_PREFIX);
        if (indexOf >= 0) {
            physicalName = physicalName.substring(indexOf + VIRTUALTOPIC_PREFIX.length()).substring(0);
        }
        return physicalName;
    }

    @Override // org.apache.activemq.transport.mqtt.strategy.AbstractMQTTSubscriptionStrategy, org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy
    public boolean isControlTopic(ActiveMQDestination activeMQDestination) {
        String physicalName = activeMQDestination.getPhysicalName();
        return physicalName.startsWith(PropertiesBeanDefinitionReader.CONSTRUCTOR_ARG_PREFIX) || physicalName.startsWith("VirtualTopic.$");
    }

    private void deleteDurableQueues(List<ActiveMQQueue> list) {
        try {
            for (ActiveMQQueue activeMQQueue : list) {
                LOG.debug("Removing queue subscription for {} ", activeMQQueue.getPhysicalName());
                DestinationInfo destinationInfo = new DestinationInfo();
                destinationInfo.setConnectionId(this.protocol.getConnectionId());
                destinationInfo.setDestination(activeMQQueue);
                destinationInfo.setOperationType((byte) 1);
                this.protocol.sendToActiveMQ(destinationInfo, new ResponseHandler() { // from class: org.apache.activemq.transport.mqtt.strategy.MQTTVirtualTopicSubscriptionStrategy.3
                    @Override // org.apache.activemq.transport.mqtt.ResponseHandler
                    public void onResponse(MQTTProtocolConverter mQTTProtocolConverter, Response response) throws IOException {
                    }
                });
            }
        } catch (Throwable th) {
            LOG.warn("Could not delete the MQTT queue subscriptions.", th);
        }
    }

    private void restoreDurableQueue(List<ActiveMQQueue> list) {
        try {
            for (ActiveMQQueue activeMQQueue : list) {
                StringTokenizer stringTokenizer = new StringTokenizer(activeMQQueue.getPhysicalName().substring(VIRTUALTOPIC_CONSUMER_PREFIX.length()));
                stringTokenizer.nextToken(":.");
                String nextToken = stringTokenizer.nextToken();
                stringTokenizer.nextToken();
                String convertActiveMQToMQTT = MQTTProtocolSupport.convertActiveMQToMQTT(stringTokenizer.nextToken("").substring(1));
                QoS valueOf = QoS.valueOf(nextToken);
                LOG.trace("Restoring queue subscription: {}:{}", convertActiveMQToMQTT, valueOf);
                ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
                consumerInfo.setDestination(activeMQQueue);
                consumerInfo.setPrefetchSize(1000);
                if (this.protocol.getActiveMQSubscriptionPrefetch() > 0) {
                    consumerInfo.setPrefetchSize(this.protocol.getActiveMQSubscriptionPrefetch());
                }
                consumerInfo.setRetroactive(true);
                consumerInfo.setDispatchAsync(true);
                doSubscribe(consumerInfo, convertActiveMQToMQTT, valueOf);
                this.restoredQueues.add(activeMQQueue);
            }
        } catch (IOException e) {
            LOG.warn("Could not restore the MQTT queue subscriptions.", (Throwable) e);
        }
    }

    List<ActiveMQQueue> lookupQueues(String str) throws MQTTProtocolException {
        ArrayList arrayList = new ArrayList();
        try {
            for (ActiveMQDestination activeMQDestination : ((QueueRegion) ((RegionBroker) this.brokerService.getBroker().getAdaptor(RegionBroker.class)).getQueueRegion()).getDestinationMap().keySet()) {
                if (activeMQDestination.isQueue() && !activeMQDestination.isTemporary() && activeMQDestination.getPhysicalName().startsWith(VIRTUALTOPIC_CONSUMER_PREFIX + str + ":")) {
                    LOG.debug("Recovered client sub: {} on connect", activeMQDestination.getPhysicalName());
                    arrayList.add((ActiveMQQueue) activeMQDestination);
                }
            }
            return arrayList;
        } catch (Exception e) {
            throw new MQTTProtocolException("Error recovering queues: " + e.getMessage(), false, e);
        }
    }
}
