package org.apache.activemq.transport.mqtt;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import javax.jms.Destination;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import javax.security.auth.login.CredentialException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.LongSequenceGenerator;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.CONNECT;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.PINGRESP;
import org.fusesource.mqtt.codec.PUBACK;
import org.fusesource.mqtt.codec.PUBCOMP;
import org.fusesource.mqtt.codec.PUBLISH;
import org.fusesource.mqtt.codec.PUBREC;
import org.fusesource.mqtt.codec.PUBREL;
import org.fusesource.mqtt.codec.SUBACK;
import org.fusesource.mqtt.codec.SUBSCRIBE;
import org.fusesource.mqtt.codec.UNSUBACK;
import org.fusesource.mqtt.codec.UNSUBSCRIBE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:activemq-mqtt-5.11.0.redhat-630283-10.jar:org/apache/activemq/transport/mqtt/MQTTProtocolConverter.class */
public class MQTTProtocolConverter {
    public static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS";
    public static final int V3_1 = 3;
    public static final int V3_1_1 = 4;
    public static final String SINGLE_LEVEL_WILDCARD = "+";
    public static final String MULTI_LEVEL_WILDCARD = "#";
    private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD = 0.5d;
    static final int DEFAULT_CACHE_SIZE = 5000;
    private final MQTTTransport mqttTransport;
    private final BrokerService brokerService;
    private int lastCommandId;
    private CONNECT connect;
    private String clientId;
    private final MQTTPacketIdGenerator packetIdGenerator;
    private boolean publishDollarTopics;
    public int version;
    private MQTTSubscriptionStrategy subsciptionStrategy;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) MQTTProtocolConverter.class);
    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
    private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
    private final SessionId sessionId = new SessionId(this.connectionId, -1);
    private final ProducerId producerId = new ProducerId(this.sessionId, 1);
    private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator();
    private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap();
    private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache(5000);
    private final Map<Destination, String> mqttTopicMap = new LRUCache(5000);
    private final Map<Short, MessageAck> consumerAcks = new LRUCache(5000);
    private final Map<Short, PUBREC> publisherRecs = new LRUCache(5000);
    private final Object commnadIdMutex = new Object();
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final ConnectionInfo connectionInfo = new ConnectionInfo();
    private int activeMQSubscriptionPrefetch = -1;
    private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/");
    private String subscriptionStrategyName = "mqtt-default-subscriptions";
    boolean willSent = false;
    private long defaultKeepAlive = 0;

    public MQTTProtocolConverter(MQTTTransport mQTTTransport, BrokerService brokerService) {
        this.mqttTransport = mQTTTransport;
        this.brokerService = brokerService;
        this.packetIdGenerator = MQTTPacketIdGenerator.getMQTTPacketIdGenerator(brokerService);
    }

    int generateCommandId() {
        int i;
        synchronized (this.commnadIdMutex) {
            i = this.lastCommandId;
            this.lastCommandId = i + 1;
        }
        return i;
    }

    public void sendToActiveMQ(Command command, ResponseHandler responseHandler) {
        if (command instanceof ActiveMQMessage) {
            ActiveMQMessage activeMQMessage = (ActiveMQMessage) command;
            try {
                if (!getPublishDollarTopics() && findSubscriptionStrategy().isControlTopic(activeMQMessage.getDestination())) {
                    if (responseHandler != null) {
                        try {
                            responseHandler.onResponse(this, new Response());
                            return;
                        } catch (IOException e) {
                            e.printStackTrace();
                            return;
                        }
                    }
                    return;
                }
            } catch (IOException e2) {
                e2.printStackTrace();
            }
        }
        command.setCommandId(generateCommandId());
        if (responseHandler != null) {
            command.setResponseRequired(true);
            this.resposeHandlers.put(Integer.valueOf(command.getCommandId()), responseHandler);
        }
        getMQTTTransport().sendToActiveMQ(command);
    }

    void sendToMQTT(MQTTFrame mQTTFrame) {
        try {
            this.mqttTransport.sendToMQTT(mQTTFrame);
        } catch (IOException e) {
            LOG.warn("Failed to send frame " + mQTTFrame, (Throwable) e);
        }
    }

    public void onMQTTCommand(MQTTFrame mQTTFrame) throws IOException, JMSException {
        switch (mQTTFrame.messageType()) {
            case 1:
                CONNECT decode = new CONNECT().decode(mQTTFrame);
                onMQTTConnect(decode);
                LOG.debug("MQTT Client {} connected. (version: {})", getClientId(), Integer.valueOf(decode.version()));
                return;
            case 2:
            case 9:
            case 11:
            case 13:
            default:
                handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + ((int) mQTTFrame.messageType()), true), mQTTFrame);
                return;
            case 3:
                onMQTTPublish(new PUBLISH().decode(mQTTFrame));
                return;
            case 4:
                onMQTTPubAck(new PUBACK().decode(mQTTFrame));
                return;
            case 5:
                onMQTTPubRec(new PUBREC().decode(mQTTFrame));
                return;
            case 6:
                onMQTTPubRel(new PUBREL().decode(mQTTFrame));
                return;
            case 7:
                onMQTTPubComp(new PUBCOMP().decode(mQTTFrame));
                return;
            case 8:
                onSubscribe(new SUBSCRIBE().decode(mQTTFrame));
                return;
            case 10:
                onUnSubscribe(new UNSUBSCRIBE().decode(mQTTFrame));
                return;
            case 12:
                LOG.debug("Received a ping from client: " + getClientId());
                sendToMQTT(PING_RESP_FRAME);
                LOG.debug("Sent Ping Response to " + getClientId());
                return;
            case 14:
                LOG.debug("MQTT Client {} disconnecting", getClientId());
                onMQTTDisconnect();
                return;
        }
    }

    void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
        if (this.connected.get()) {
            throw new MQTTProtocolException("Already connected.");
        }
        this.connect = connect;
        String uTF8Buffer = connect.clientId() != null ? connect.clientId().toString() : "";
        String str = null;
        if (connect.userName() != null) {
            str = connect.userName().toString();
        }
        String str2 = null;
        if (connect.password() != null) {
            str2 = connect.password().toString();
        }
        this.version = connect.version();
        configureInactivityMonitor(connect.keepAlive());
        this.connectionInfo.setConnectionId(this.connectionId);
        if (uTF8Buffer != null && !uTF8Buffer.isEmpty()) {
            this.connectionInfo.setClientId(uTF8Buffer);
        } else {
            if (!connect.cleanSession()) {
                CONNACK connack = new CONNACK();
                connack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
                try {
                    getMQTTTransport().sendToMQTT(connack.encode());
                    getMQTTTransport().onException(IOExceptionSupport.create("Invalid Client ID", (Exception) null));
                    return;
                } catch (IOException e) {
                    getMQTTTransport().onException(IOExceptionSupport.create((Exception) e));
                    return;
                }
            }
            this.connectionInfo.setClientId("" + this.connectionInfo.getConnectionId().toString());
        }
        this.connectionInfo.setResponseRequired(true);
        this.connectionInfo.setUserName(str);
        this.connectionInfo.setPassword(str2);
        this.connectionInfo.setTransportContext(this.mqttTransport.getPeerCertificates());
        sendToActiveMQ(this.connectionInfo, new ResponseHandler() { // from class: org.apache.activemq.transport.mqtt.MQTTProtocolConverter.1
            @Override // org.apache.activemq.transport.mqtt.ResponseHandler
            public void onResponse(MQTTProtocolConverter mQTTProtocolConverter, Response response) throws IOException {
                if (!response.isException()) {
                    MQTTProtocolConverter.this.sendToActiveMQ(new SessionInfo(MQTTProtocolConverter.this.sessionId), null);
                    MQTTProtocolConverter.this.sendToActiveMQ(new ProducerInfo(MQTTProtocolConverter.this.producerId), new ResponseHandler() { // from class: org.apache.activemq.transport.mqtt.MQTTProtocolConverter.1.1
                        @Override // org.apache.activemq.transport.mqtt.ResponseHandler
                        public void onResponse(MQTTProtocolConverter mQTTProtocolConverter2, Response response2) throws IOException {
                            if (response2.isException()) {
                                Throwable exception = ((ExceptionResponse) response2).getException();
                                CONNACK connack2 = new CONNACK();
                                connack2.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
                                MQTTProtocolConverter.this.getMQTTTransport().sendToMQTT(connack2.encode());
                                MQTTProtocolConverter.this.getMQTTTransport().onException(IOExceptionSupport.create(exception));
                                return;
                            }
                            CONNACK connack3 = new CONNACK();
                            connack3.code(CONNACK.Code.CONNECTION_ACCEPTED);
                            MQTTProtocolConverter.this.connected.set(true);
                            MQTTProtocolConverter.this.getMQTTTransport().sendToMQTT(connack3.encode());
                            if (connect.cleanSession()) {
                                MQTTProtocolConverter.this.packetIdGenerator.stopClientSession(MQTTProtocolConverter.this.getClientId());
                            } else {
                                MQTTProtocolConverter.this.packetIdGenerator.startClientSession(MQTTProtocolConverter.this.getClientId());
                            }
                            MQTTProtocolConverter.this.findSubscriptionStrategy().onConnect(connect);
                        }
                    });
                    return;
                }
                Throwable exception = ((ExceptionResponse) response).getException();
                CONNACK connack2 = new CONNACK();
                if (exception instanceof InvalidClientIDException) {
                    connack2.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
                } else if (exception instanceof SecurityException) {
                    connack2.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED);
                } else if (exception instanceof CredentialException) {
                    connack2.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
                } else {
                    connack2.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                }
                MQTTProtocolConverter.this.getMQTTTransport().sendToMQTT(connack2.encode());
                MQTTProtocolConverter.this.getMQTTTransport().onException(IOExceptionSupport.create(exception));
            }
        });
    }

    void onMQTTDisconnect() throws MQTTProtocolException {
        if (this.connected.get()) {
            this.connected.set(false);
            sendToActiveMQ(this.connectionInfo.createRemoveCommand(), null);
            sendToActiveMQ(new ShutdownInfo(), null);
        }
        stopTransport();
    }

    void onSubscribe(SUBSCRIBE subscribe) throws MQTTProtocolException {
        checkConnected();
        LOG.trace("MQTT SUBSCRIBE message:{} client:{} connection:{}", Short.valueOf(subscribe.messageId()), this.clientId, this.connectionInfo.getConnectionId());
        Topic[] topicArr = subscribe.topics();
        if (topicArr == null) {
            LOG.warn("No topics defined for Subscription " + subscribe);
            return;
        }
        byte[] bArr = new byte[topicArr.length];
        for (int i = 0; i < topicArr.length; i++) {
            try {
                bArr[i] = findSubscriptionStrategy().onSubscribe(topicArr[i]);
            } catch (IOException e) {
                throw new MQTTProtocolException("Failed to process subscription request", true, e);
            }
        }
        SUBACK suback = new SUBACK();
        suback.messageId(subscribe.messageId());
        suback.grantedQos(bArr);
        try {
            getMQTTTransport().sendToMQTT(suback.encode());
        } catch (IOException e2) {
            LOG.warn("Couldn't send SUBACK for " + subscribe, (Throwable) e2);
        }
    }

    public void onUnSubscribe(UNSUBSCRIBE unsubscribe) throws MQTTProtocolException {
        checkConnected();
        if (unsubscribe.qos() != QoS.AT_LEAST_ONCE && (this.version != 3 || !this.publishDollarTopics)) {
            throw new MQTTProtocolException("Failed to process unsubscribe request", true, new Exception("UNSUBSCRIBE frame not properly formatted, QoS"));
        }
        UTF8Buffer[] uTF8BufferArr = unsubscribe.topics();
        if (uTF8BufferArr != null) {
            for (UTF8Buffer uTF8Buffer : uTF8BufferArr) {
                try {
                    findSubscriptionStrategy().onUnSubscribe(uTF8Buffer.toString());
                } catch (IOException e) {
                    throw new MQTTProtocolException("Failed to process unsubscribe request", true, e);
                }
            }
        }
        UNSUBACK unsuback = new UNSUBACK();
        unsuback.messageId(unsubscribe.messageId());
        sendToMQTT(unsuback.encode());
    }

    public void onActiveMQCommand(Command command) throws Exception {
        if (command.isResponse()) {
            Response response = (Response) command;
            ResponseHandler remove = this.resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
            if (remove != null) {
                remove.onResponse(this, response);
                return;
            } else {
                if (response.isException()) {
                    handleException(((ExceptionResponse) response).getException(), null);
                    return;
                }
                return;
            }
        }
        if (!command.isMessageDispatch()) {
            if (command.getDataStructureType() == 16) {
                handleException(((ConnectionError) command).getException(), null);
                return;
            } else {
                if (command.isBrokerInfo()) {
                    return;
                }
                LOG.debug("Do not know how to process ActiveMQ Command {}", command);
                return;
            }
        }
        MessageDispatch messageDispatch = (MessageDispatch) command;
        MQTTSubscription subscription = findSubscriptionStrategy().getSubscription(messageDispatch.getConsumerId());
        if (subscription != null) {
            MessageAck createMessageAck = subscription.createMessageAck(messageDispatch);
            PUBLISH createPublish = subscription.createPublish((ActiveMQMessage) messageDispatch.getMessage());
            switch (createPublish.qos()) {
                case AT_LEAST_ONCE:
                case EXACTLY_ONCE:
                    createPublish.mo1077dup(createPublish.dup() ? true : messageDispatch.getMessage().isRedelivered());
                    break;
            }
            if (createMessageAck != null && subscription.expectAck(createPublish)) {
                synchronized (this.consumerAcks) {
                    this.consumerAcks.put(Short.valueOf(createPublish.messageId()), createMessageAck);
                }
            }
            LOG.trace("MQTT Snd PUBLISH message:{} client:{} connection:{}", Short.valueOf(createPublish.messageId()), this.clientId, this.connectionInfo.getConnectionId());
            getMQTTTransport().sendToMQTT(createPublish.encode());
            if (createMessageAck == null || subscription.expectAck(createPublish)) {
                return;
            }
            getMQTTTransport().sendToActiveMQ(createMessageAck);
        }
    }

    void onMQTTPublish(PUBLISH publish) throws IOException, JMSException {
        checkConnected();
        LOG.trace("MQTT Rcv PUBLISH message:{} client:{} connection:{}", Short.valueOf(publish.messageId()), this.clientId, this.connectionInfo.getConnectionId());
        if (containsMqttWildcard(publish.topicName().toString())) {
            getMQTTTransport().onException(IOExceptionSupport.create("The topic name must not contain wildcard characters.", (Exception) null));
            return;
        }
        ActiveMQMessage convertMessage = convertMessage(publish);
        convertMessage.setProducerId(this.producerId);
        convertMessage.onSend();
        sendToActiveMQ(convertMessage, createResponseHandler(publish));
    }

    void onMQTTPubAck(PUBACK puback) {
        MessageAck remove;
        short messageId = puback.messageId();
        LOG.trace("MQTT Rcv PUBACK message:{} client:{} connection:{}", Short.valueOf(messageId), this.clientId, this.connectionInfo.getConnectionId());
        this.packetIdGenerator.ackPacketId(getClientId(), messageId);
        synchronized (this.consumerAcks) {
            remove = this.consumerAcks.remove(Short.valueOf(messageId));
        }
        if (remove != null) {
            getMQTTTransport().sendToActiveMQ(remove);
        }
    }

    void onMQTTPubRec(PUBREC pubrec) {
        PUBREL pubrel = new PUBREL();
        pubrel.messageId(pubrec.messageId());
        sendToMQTT(pubrel.encode());
    }

    void onMQTTPubRel(PUBREL pubrel) {
        PUBREC remove;
        synchronized (this.publisherRecs) {
            remove = this.publisherRecs.remove(Short.valueOf(pubrel.messageId()));
        }
        if (remove == null) {
            LOG.warn("Unknown PUBREL: {} received", Short.valueOf(pubrel.messageId()));
        }
        PUBCOMP pubcomp = new PUBCOMP();
        pubcomp.messageId(pubrel.messageId());
        sendToMQTT(pubcomp.encode());
    }

    void onMQTTPubComp(PUBCOMP pubcomp) {
        MessageAck remove;
        short messageId = pubcomp.messageId();
        this.packetIdGenerator.ackPacketId(getClientId(), messageId);
        synchronized (this.consumerAcks) {
            remove = this.consumerAcks.remove(Short.valueOf(messageId));
        }
        if (remove != null) {
            getMQTTTransport().sendToActiveMQ(remove);
        }
    }

    ActiveMQMessage convertMessage(PUBLISH publish) throws JMSException {
        ActiveMQDestination activeMQDestination;
        ActiveMQBytesMessage activeMQBytesMessage = new ActiveMQBytesMessage();
        activeMQBytesMessage.setProducerId(this.producerId);
        activeMQBytesMessage.setMessageId(new MessageId(this.producerId, this.publisherIdGenerator.getNextSequenceId()));
        LOG.trace("MQTT-->ActiveMQ: MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}", Short.valueOf(publish.messageId()), this.clientId, this.connectionInfo.getConnectionId(), activeMQBytesMessage.getMessageId());
        activeMQBytesMessage.setTimestamp(System.currentTimeMillis());
        activeMQBytesMessage.setPriority((byte) 4);
        activeMQBytesMessage.setPersistent((publish.qos() == QoS.AT_MOST_ONCE || publish.retain()) ? false : true);
        activeMQBytesMessage.setIntProperty(QOS_PROPERTY_NAME, publish.qos().ordinal());
        if (publish.retain()) {
            activeMQBytesMessage.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);
        }
        synchronized (this.activeMQDestinationMap) {
            activeMQDestination = this.activeMQDestinationMap.get(publish.topicName());
            if (activeMQDestination == null) {
                try {
                    activeMQDestination = findSubscriptionStrategy().onSend(MQTTProtocolSupport.convertMQTTToActiveMQ(publish.topicName().toString()));
                    this.activeMQDestinationMap.put(publish.topicName().toString(), activeMQDestination);
                } catch (IOException e) {
                    throw JMSExceptionSupport.create((Exception) e);
                }
            }
        }
        activeMQBytesMessage.setJMSDestination(activeMQDestination);
        activeMQBytesMessage.writeBytes(publish.payload().data, publish.payload().offset, publish.payload().length);
        return activeMQBytesMessage;
    }

    public PUBLISH convertMessage(ActiveMQMessage activeMQMessage) throws IOException, JMSException, DataFormatException {
        QoS qoS;
        String str;
        PUBLISH publish = new PUBLISH();
        if (activeMQMessage.propertyExists(QOS_PROPERTY_NAME)) {
            qoS = QoS.values()[activeMQMessage.getIntProperty(QOS_PROPERTY_NAME)];
        } else {
            qoS = activeMQMessage.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
        }
        publish.qos(qoS);
        if (activeMQMessage.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)) {
            publish.retain(true);
        }
        synchronized (this.mqttTopicMap) {
            str = this.mqttTopicMap.get(activeMQMessage.getJMSDestination());
            if (str == null) {
                str = MQTTProtocolSupport.convertActiveMQToMQTT(findSubscriptionStrategy().onSend(activeMQMessage.getDestination()));
                this.mqttTopicMap.put(activeMQMessage.getJMSDestination(), str);
            }
        }
        publish.topicName(new UTF8Buffer(str));
        if (activeMQMessage.getDataStructureType() == 28) {
            ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage) activeMQMessage.copy();
            activeMQTextMessage.setReadOnlyBody(true);
            String text = activeMQTextMessage.getText();
            if (text != null) {
                publish.payload(new Buffer(text.getBytes("UTF-8")));
            }
        } else if (activeMQMessage.getDataStructureType() == 24) {
            ActiveMQBytesMessage activeMQBytesMessage = (ActiveMQBytesMessage) activeMQMessage.copy();
            activeMQBytesMessage.setReadOnlyBody(true);
            byte[] bArr = new byte[(int) activeMQBytesMessage.getBodyLength()];
            activeMQBytesMessage.readBytes(bArr);
            publish.payload(new Buffer(bArr));
        } else if (activeMQMessage.getDataStructureType() == 25) {
            ActiveMQMapMessage activeMQMapMessage = (ActiveMQMapMessage) activeMQMessage.copy();
            activeMQMapMessage.setReadOnlyBody(true);
            Map<String, Object> contentMap = activeMQMapMessage.getContentMap();
            if (contentMap != null) {
                publish.payload(new Buffer(contentMap.toString().getBytes("UTF-8")));
            }
        } else {
            ByteSequence content = activeMQMessage.getContent();
            if (content != null && content.getLength() > 0) {
                if (activeMQMessage.isCompressed()) {
                    Inflater inflater = new Inflater();
                    inflater.setInput(content.data, content.offset, content.length);
                    byte[] bArr2 = new byte[4096];
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                    while (true) {
                        int inflate = inflater.inflate(bArr2);
                        if (inflate == 0) {
                            break;
                        }
                        byteArrayOutputStream.write(bArr2, 0, inflate);
                    }
                    content = byteArrayOutputStream.toByteSequence();
                    byteArrayOutputStream.close();
                }
                publish.payload(new Buffer(content.data, content.offset, content.length));
            }
        }
        LOG.trace("ActiveMQ-->MQTT:MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}", Short.valueOf(publish.messageId()), this.clientId, this.connectionInfo.getConnectionId(), activeMQMessage.getMessageId());
        return publish;
    }

    public MQTTTransport getMQTTTransport() {
        return this.mqttTransport;
    }

    public void onTransportError() {
        if (this.connect == null || !this.connected.get()) {
            return;
        }
        if (this.connect.willTopic() != null && this.connect.willMessage() != null && !this.willSent) {
            this.willSent = true;
            try {
                PUBLISH publish = new PUBLISH();
                publish.topicName(this.connect.willTopic());
                publish.qos(this.connect.willQos());
                publish.messageId(this.packetIdGenerator.getNextSequenceId(getClientId()));
                publish.payload(this.connect.willMessage());
                publish.retain(this.connect.willRetain());
                ActiveMQMessage convertMessage = convertMessage(publish);
                convertMessage.setProducerId(this.producerId);
                convertMessage.onSend();
                sendToActiveMQ(convertMessage, null);
            } catch (Exception e) {
                LOG.warn("Failed to publish Will Message " + this.connect.willMessage());
            }
        }
        sendToActiveMQ(this.connectionInfo.createRemoveCommand(), null);
    }

    void configureInactivityMonitor(short s) {
        MQTTInactivityMonitor inactivityMonitor = getMQTTTransport().getInactivityMonitor();
        if (inactivityMonitor == null) {
            return;
        }
        inactivityMonitor.stopConnectChecker();
        long j = s * 1000;
        LOG.debug("MQTT Client {} requests heart beat of {} ms", getClientId(), Long.valueOf(j));
        if (j == 0) {
            try {
                if (this.defaultKeepAlive > 0) {
                    j = this.defaultKeepAlive;
                }
            } catch (Exception e) {
                LOG.warn("Failed to start MQTT InactivityMonitor ", (Throwable) e);
                return;
            }
        }
        long j2 = (long) (j * MQTT_KEEP_ALIVE_GRACE_PERIOD);
        inactivityMonitor.setProtocolConverter(this);
        inactivityMonitor.setReadKeepAliveTime(j);
        inactivityMonitor.setReadGraceTime(j2);
        inactivityMonitor.startReadChecker();
        LOG.debug("MQTT Client {} established heart beat of  {} ms ({} ms + {} ms grace period)", getClientId(), Long.valueOf(j), Long.valueOf(j), Long.valueOf(j2));
    }

    void handleException(Throwable th, MQTTFrame mQTTFrame) {
        LOG.warn("Exception occurred processing: \n" + mQTTFrame + ": " + th.toString());
        LOG.debug("Exception detail", th);
        if (this.connected.get() && this.connectionInfo != null) {
            this.connected.set(false);
            sendToActiveMQ(this.connectionInfo.createRemoveCommand(), null);
        }
        stopTransport();
    }

    void checkConnected() throws MQTTProtocolException {
        if (!this.connected.get()) {
            throw new MQTTProtocolException("Not connected.");
        }
    }

    private void stopTransport() {
        try {
            getMQTTTransport().stop();
        } catch (Throwable th) {
            LOG.debug("Failed to stop MQTT transport ", th);
        }
    }

    ResponseHandler createResponseHandler(final PUBLISH publish) {
        if (publish != null) {
            return new ResponseHandler() { // from class: org.apache.activemq.transport.mqtt.MQTTProtocolConverter.2
                @Override // org.apache.activemq.transport.mqtt.ResponseHandler
                public void onResponse(MQTTProtocolConverter mQTTProtocolConverter, Response response) throws IOException {
                    if (response.isException()) {
                        Throwable exception = ((ExceptionResponse) response).getException();
                        MQTTProtocolConverter.LOG.warn("Failed to send MQTT Publish: ", publish, exception.getMessage());
                        MQTTProtocolConverter.LOG.trace("Error trace: {}", exception);
                    }
                    switch (AnonymousClass3.$SwitchMap$org$fusesource$mqtt$client$QoS[publish.qos().ordinal()]) {
                        case 1:
                            PUBACK puback = new PUBACK();
                            puback.messageId(publish.messageId());
                            MQTTProtocolConverter.LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}", Short.valueOf(publish.messageId()), MQTTProtocolConverter.this.clientId, MQTTProtocolConverter.this.connectionInfo.getConnectionId());
                            mQTTProtocolConverter.getMQTTTransport().sendToMQTT(puback.encode());
                            return;
                        case 2:
                            PUBREC pubrec = new PUBREC();
                            pubrec.messageId(publish.messageId());
                            synchronized (MQTTProtocolConverter.this.publisherRecs) {
                                MQTTProtocolConverter.this.publisherRecs.put(Short.valueOf(publish.messageId()), pubrec);
                            }
                            MQTTProtocolConverter.LOG.trace("MQTT Snd PUBREC message:{} client:{} connection:{}", Short.valueOf(publish.messageId()), MQTTProtocolConverter.this.clientId, MQTTProtocolConverter.this.connectionInfo.getConnectionId());
                            mQTTProtocolConverter.getMQTTTransport().sendToMQTT(pubrec.encode());
                            return;
                        default:
                            return;
                    }
                }
            };
        }
        return null;
    }

    public long getDefaultKeepAlive() {
        return this.defaultKeepAlive;
    }

    public void setDefaultKeepAlive(long j) {
        this.defaultKeepAlive = j;
    }

    public int getActiveMQSubscriptionPrefetch() {
        return this.activeMQSubscriptionPrefetch;
    }

    public void setActiveMQSubscriptionPrefetch(int i) {
        this.activeMQSubscriptionPrefetch = i;
    }

    public MQTTPacketIdGenerator getPacketIdGenerator() {
        return this.packetIdGenerator;
    }

    public void setPublishDollarTopics(boolean z) {
        this.publishDollarTopics = z;
    }

    public boolean getPublishDollarTopics() {
        return this.publishDollarTopics;
    }

    public ConnectionId getConnectionId() {
        return this.connectionId;
    }

    public SessionId getSessionId() {
        return this.sessionId;
    }

    public boolean isCleanSession() {
        return this.connect.cleanSession();
    }

    public String getSubscriptionStrategy() {
        return this.subscriptionStrategyName;
    }

    public void setSubscriptionStrategy(String str) {
        this.subscriptionStrategyName = str;
    }

    public String getClientId() {
        if (this.clientId == null) {
            if (this.connect == null || this.connect.clientId() == null) {
                this.clientId = "";
            } else {
                this.clientId = this.connect.clientId().toString();
            }
        }
        return this.clientId;
    }

    protected boolean containsMqttWildcard(String str) {
        return str != null && (str.contains("+") || str.contains("#"));
    }

    protected MQTTSubscriptionStrategy findSubscriptionStrategy() throws IOException {
        if (this.subsciptionStrategy == null) {
            synchronized (this.STRATAGY_FINDER) {
                if (this.subsciptionStrategy != null) {
                    return this.subsciptionStrategy;
                }
                if (this.subscriptionStrategyName == null || this.subscriptionStrategyName.isEmpty()) {
                    throw new IOException("Invalid subscription strategy name given: " + this.subscriptionStrategyName);
                }
                try {
                    MQTTSubscriptionStrategy mQTTSubscriptionStrategy = (MQTTSubscriptionStrategy) this.STRATAGY_FINDER.newInstance(this.subscriptionStrategyName);
                    LOG.debug("MQTT Using subscription strategy: {}", this.subscriptionStrategyName);
                    if (mQTTSubscriptionStrategy instanceof BrokerServiceAware) {
                        ((BrokerServiceAware) mQTTSubscriptionStrategy).setBrokerService(this.brokerService);
                    }
                    mQTTSubscriptionStrategy.initialize(this);
                    this.subsciptionStrategy = mQTTSubscriptionStrategy;
                } catch (Exception e) {
                    throw IOExceptionSupport.create(e);
                }
            }
        }
        return this.subsciptionStrategy;
    }
}
