package org.apache.activemq.artemis.core.protocol.mqtt;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.EmptyByteBuf;
import java.io.UnsupportedEncodingException;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueCreator;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;

/* loaded from: input_file:org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.class */
public class MQTTPublishManager {
    private static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
    private SimpleString managementAddress;
    private ServerConsumer managementConsumer;
    private MQTTSession session;
    private MQTTLogger log = MQTTLogger.LOGGER;
    private final Object lock = new Object();
    private MQTTSessionState state;
    private MQTTSessionState.OutboundStore outboundStore;

    public MQTTPublishManager(MQTTSession mQTTSession) {
        this.session = mQTTSession;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void start() throws Exception {
        this.state = this.session.getSessionState();
        this.outboundStore = this.state.getOutboundStore();
        createManagementAddress();
        createManagementQueue();
        createManagementConsumer();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void stop() throws Exception {
        if (this.managementConsumer != null) {
            this.managementConsumer.removeItself();
            this.managementConsumer.setStarted(false);
            this.managementConsumer.close(false);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void clean() throws Exception {
        createManagementAddress();
        Queue locateQueue = this.session.getServer().locateQueue(this.managementAddress);
        if (locateQueue != null) {
            locateQueue.deleteQueue();
        }
    }

    private void createManagementConsumer() throws Exception {
        this.managementConsumer = this.session.getServerSession().createConsumer(this.session.getServer().getStorageManager().generateID(), this.managementAddress, (SimpleString) null, false, false, -1);
        this.managementConsumer.setStarted(true);
    }

    private void createManagementAddress() {
        this.managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + this.session.getSessionState().getClientId());
    }

    private void createManagementQueue() throws Exception {
        if (this.session.getServer().locateQueue(this.managementAddress) == null) {
            this.session.getServerSession().createQueue(this.managementAddress, this.managementAddress, (SimpleString) null, false, true);
        }
    }

    boolean isManagementConsumer(ServerConsumer serverConsumer) {
        return serverConsumer == this.managementConsumer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendMessage(ServerMessage serverMessage, ServerConsumer serverConsumer, int i) throws Exception {
        if (isManagementConsumer(serverConsumer)) {
            sendPubRelMessage(serverMessage);
            return;
        }
        int decideQoS = decideQoS(serverMessage, serverConsumer);
        if (decideQoS == 0) {
            sendServerMessage((int) serverMessage.getMessageID(), (ServerMessageImpl) serverMessage, i, decideQoS);
            this.session.getServerSession().acknowledge(serverConsumer.getID(), serverMessage.getMessageID());
        } else {
            int generateMqttId = this.outboundStore.generateMqttId(serverMessage.getMessageID(), serverConsumer.getID());
            this.outboundStore.publish(generateMqttId, serverMessage.getMessageID(), serverConsumer.getID());
            sendServerMessage(generateMqttId, (ServerMessageImpl) serverMessage, i, decideQoS);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleMessage(int i, String str, int i2, ByteBuf byteBuf, boolean z) throws Exception {
        sendInternal(i, str, i2, byteBuf, z, false);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void sendInternal(int i, String str, int i2, ByteBuf byteBuf, boolean z, boolean z2) throws Exception {
        synchronized (this.lock) {
            ServerMessage createServerMessageFromByteBuf = MQTTUtil.createServerMessageFromByteBuf(this.session, str, z, i2, byteBuf);
            if (i2 > 0) {
                createServerMessageFromByteBuf.setDurable(true);
            }
            if (i2 < 2 || !this.state.getPubRec().contains(Integer.valueOf(i))) {
                if (i2 == 2 && !z2) {
                    this.state.getPubRec().add(Integer.valueOf(i));
                }
                Transaction newTransaction = this.session.getServerSession().newTransaction();
                try {
                    if (z2) {
                        this.session.getServer().getPostOffice().route(createServerMessageFromByteBuf, (QueueCreator) null, newTransaction, true);
                    } else {
                        this.session.getServerSession().send(newTransaction, createServerMessageFromByteBuf, true, false);
                    }
                    if (z) {
                        this.session.getRetainMessageManager().handleRetainedMessage(createServerMessageFromByteBuf, str, (byteBuf instanceof EmptyByteBuf) || byteBuf.capacity() == 0, newTransaction);
                    }
                    newTransaction.commit();
                    createMessageAck(i, i2, z2);
                } catch (Throwable th) {
                    newTransaction.rollback();
                    throw th;
                }
            }
        }
    }

    void sendPubRelMessage(ServerMessage serverMessage) {
        this.session.getProtocolHandler().sendPubRel(serverMessage.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY).intValue());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handlePubRec(int i) throws Exception {
        try {
            Pair<Long, Long> publishReceived = this.outboundStore.publishReceived(i);
            if (publishReceived != null) {
                this.session.getServerSession().send(MQTTUtil.createPubRelMessage(this.session, this.managementAddress, i), true);
                this.session.getServerSession().acknowledge(((Long) publishReceived.getB()).longValue(), ((Long) publishReceived.getA()).longValue());
            } else {
                this.session.getProtocolHandler().sendPubRel(i);
            }
        } catch (ActiveMQIllegalStateException e) {
            this.log.warn("MQTT Client(" + this.session.getSessionState().getClientId() + ") attempted to Ack already Ack'd message");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handlePubComp(int i) throws Exception {
        Pair<Long, Long> publishComplete = this.session.getState().getOutboundStore().publishComplete(i);
        if (publishComplete != null) {
            this.session.getServerSession().acknowledge(((Long) publishComplete.getB()).longValue(), ((Long) publishComplete.getA()).longValue());
        }
    }

    private void createMessageAck(final int i, final int i2, final boolean z) {
        this.session.getServer().getStorageManager().afterCompleteOperations(new IOCallback() { // from class: org.apache.activemq.artemis.core.protocol.mqtt.MQTTPublishManager.1
            public void done() {
                if (z) {
                    return;
                }
                if (i2 == 1) {
                    MQTTPublishManager.this.session.getProtocolHandler().sendPubAck(i);
                } else if (i2 == 2) {
                    MQTTPublishManager.this.session.getProtocolHandler().sendPubRec(i);
                }
            }

            public void onError(int i3, String str) {
                MQTTPublishManager.this.log.error("Pub Sync Failed");
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handlePubRel(int i) {
        this.state.getPubRec().remove(Integer.valueOf(i));
        this.session.getProtocolHandler().sendPubComp(i);
        this.state.removeMessageRef(Integer.valueOf(i));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handlePubAck(int i) throws Exception {
        try {
            Pair<Long, Long> publishAckd = this.outboundStore.publishAckd(i);
            if (publishAckd != null) {
                this.session.getServerSession().acknowledge(((Long) publishAckd.getB()).longValue(), ((Long) publishAckd.getA()).longValue());
            }
        } catch (ActiveMQIllegalStateException e) {
            this.log.warn("MQTT Client(" + this.session.getSessionState().getClientId() + ") attempted to Ack already Ack'd message");
        }
    }

    private void sendServerMessage(int i, ServerMessageImpl serverMessageImpl, int i2, int i3) {
        ByteBuf buffer;
        String convertCoreAddressFilterToMQTT = MQTTUtil.convertCoreAddressFilterToMQTT(serverMessageImpl.getAddress().toString());
        switch (serverMessageImpl.getType()) {
            case 3:
                try {
                    byte[] bytes = serverMessageImpl.getBodyBuffer().readNullableSimpleString().toString().getBytes("UTF-8");
                    buffer = ByteBufAllocator.DEFAULT.buffer(bytes.length);
                    buffer.writeBytes(bytes);
                    break;
                } catch (UnsupportedEncodingException e) {
                    this.log.warn("Unable to send message: " + serverMessageImpl.getMessageID() + " Cause: " + e.getMessage());
                }
            default:
                ActiveMQBuffer bodyBufferDuplicate = serverMessageImpl.getBodyBufferDuplicate();
                buffer = bodyBufferDuplicate.readBytes(serverMessageImpl.getEndOfBodyPosition() - bodyBufferDuplicate.readerIndex()).byteBuf();
                break;
        }
        this.session.getProtocolHandler().send(i, convertCoreAddressFilterToMQTT, i3, buffer, i2);
    }

    private int decideQoS(ServerMessage serverMessage, ServerConsumer serverConsumer) {
        int intValue = this.session.getSubscriptionManager().getConsumerQoSLevels().get(Long.valueOf(serverConsumer.getID())).intValue();
        int i = 2;
        if (serverMessage.containsProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY)) {
            i = serverMessage.getIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY).intValue();
        }
        return intValue < i ? intValue : i;
    }
}
