package org.apache.camel.component.mqtt;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.hawtdispatch.Task;
import org.fusesource.mqtt.client.Callback;
import org.fusesource.mqtt.client.CallbackConnection;
import org.fusesource.mqtt.client.Listener;
import org.fusesource.mqtt.client.Promise;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.CONNECT;
import org.fusesource.mqtt.codec.DISCONNECT;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.PINGREQ;
import org.fusesource.mqtt.codec.PINGRESP;
import org.fusesource.mqtt.codec.PUBACK;
import org.fusesource.mqtt.codec.PUBCOMP;
import org.fusesource.mqtt.codec.PUBLISH;
import org.fusesource.mqtt.codec.PUBREC;
import org.fusesource.mqtt.codec.PUBREL;
import org.fusesource.mqtt.codec.SUBACK;
import org.fusesource.mqtt.codec.SUBSCRIBE;
import org.fusesource.mqtt.codec.UNSUBSCRIBE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.switchyard.component.camel.mqtt.model.CamelMqttBindingModel;

@UriEndpoint(scheme = CamelMqttBindingModel.MQTT, title = "MQTT", syntax = "mqtt:name", consumerClass = MQTTConsumer.class, label = "messaging")
/* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.2.1.redhat-186.zip:modules/system/layers/fuse/org/apache/camel/component/mqtt/main/camel-mqtt-2.15.1.redhat-621186.jar:org/apache/camel/component/mqtt/MQTTEndpoint.class */
public class MQTTEndpoint extends DefaultEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(MQTTEndpoint.class);
    private static final int PUBLISH_MAX_RECONNECT_ATTEMPTS = 3;
    private CallbackConnection connection;

    @UriPath
    @Metadata(required = "true")
    private String name;

    @UriParam
    private final MQTTConfiguration configuration;
    private volatile boolean connected;
    private final List<MQTTConsumer> consumers;

    public MQTTEndpoint(final String str, MQTTComponent mQTTComponent, MQTTConfiguration mQTTConfiguration) {
        super(str, mQTTComponent);
        this.consumers = new CopyOnWriteArrayList();
        this.configuration = mQTTConfiguration;
        if (LOG.isTraceEnabled()) {
            this.configuration.setTracer(new Tracer() { // from class: org.apache.camel.component.mqtt.MQTTEndpoint.1
                @Override // org.fusesource.mqtt.client.Tracer
                public void debug(String str2, Object... objArr) {
                    MQTTEndpoint.LOG.trace("tracer.debug() " + this + ": uri=" + str + ", message=" + String.format(str2, objArr));
                }

                @Override // org.fusesource.mqtt.client.Tracer
                public void onSend(MQTTFrame mQTTFrame) {
                    String mQTTFrame2;
                    try {
                        switch (mQTTFrame.messageType()) {
                            case 1:
                                mQTTFrame2 = new CONNECT().decode(mQTTFrame).toString();
                                break;
                            case 2:
                                mQTTFrame2 = new CONNACK().decode(mQTTFrame).toString();
                                break;
                            case 3:
                                mQTTFrame2 = new PUBLISH().decode(mQTTFrame).toString();
                                break;
                            case 4:
                                mQTTFrame2 = new PUBACK().decode(mQTTFrame).toString();
                                break;
                            case 5:
                                mQTTFrame2 = new PUBREC().decode(mQTTFrame).toString();
                                break;
                            case 6:
                                mQTTFrame2 = new PUBREL().decode(mQTTFrame).toString();
                                break;
                            case 7:
                                mQTTFrame2 = new PUBCOMP().decode(mQTTFrame).toString();
                                break;
                            case 8:
                                mQTTFrame2 = new SUBSCRIBE().decode(mQTTFrame).toString();
                                break;
                            case 9:
                                mQTTFrame2 = new SUBACK().decode(mQTTFrame).toString();
                                break;
                            case 10:
                                mQTTFrame2 = new UNSUBSCRIBE().decode(mQTTFrame).toString();
                                break;
                            case 11:
                            default:
                                mQTTFrame2 = mQTTFrame.toString();
                                break;
                            case 12:
                                mQTTFrame2 = new PINGREQ().decode(mQTTFrame).toString();
                                break;
                            case 13:
                                mQTTFrame2 = new PINGRESP().decode(mQTTFrame).toString();
                                break;
                            case 14:
                                mQTTFrame2 = new DISCONNECT().decode(mQTTFrame).toString();
                                break;
                        }
                    } catch (Throwable th) {
                        mQTTFrame2 = mQTTFrame.toString();
                    }
                    MQTTEndpoint.LOG.trace("tracer.onSend() " + this + ":  uri=" + str + ", frame=" + mQTTFrame2);
                }

                @Override // org.fusesource.mqtt.client.Tracer
                public void onReceive(MQTTFrame mQTTFrame) {
                    String mQTTFrame2;
                    try {
                        switch (mQTTFrame.messageType()) {
                            case 1:
                                mQTTFrame2 = new CONNECT().decode(mQTTFrame).toString();
                                break;
                            case 2:
                                mQTTFrame2 = new CONNACK().decode(mQTTFrame).toString();
                                break;
                            case 3:
                                mQTTFrame2 = new PUBLISH().decode(mQTTFrame).toString();
                                break;
                            case 4:
                                mQTTFrame2 = new PUBACK().decode(mQTTFrame).toString();
                                break;
                            case 5:
                                mQTTFrame2 = new PUBREC().decode(mQTTFrame).toString();
                                break;
                            case 6:
                                mQTTFrame2 = new PUBREL().decode(mQTTFrame).toString();
                                break;
                            case 7:
                                mQTTFrame2 = new PUBCOMP().decode(mQTTFrame).toString();
                                break;
                            case 8:
                                mQTTFrame2 = new SUBSCRIBE().decode(mQTTFrame).toString();
                                break;
                            case 9:
                                mQTTFrame2 = new SUBACK().decode(mQTTFrame).toString();
                                break;
                            case 10:
                                mQTTFrame2 = new UNSUBSCRIBE().decode(mQTTFrame).toString();
                                break;
                            case 11:
                            default:
                                mQTTFrame2 = mQTTFrame.toString();
                                break;
                            case 12:
                                mQTTFrame2 = new PINGREQ().decode(mQTTFrame).toString();
                                break;
                            case 13:
                                mQTTFrame2 = new PINGRESP().decode(mQTTFrame).toString();
                                break;
                            case 14:
                                mQTTFrame2 = new DISCONNECT().decode(mQTTFrame).toString();
                                break;
                        }
                    } catch (Throwable th) {
                        mQTTFrame2 = mQTTFrame.toString();
                    }
                    MQTTEndpoint.LOG.trace("tracer.onReceive() " + this + ":  uri=" + str + ", frame=" + mQTTFrame2);
                }
            });
        }
    }

    @Override // org.apache.camel.Endpoint
    public Consumer createConsumer(Processor processor) throws Exception {
        MQTTConsumer mQTTConsumer = new MQTTConsumer(this, processor);
        configureConsumer(mQTTConsumer);
        return mQTTConsumer;
    }

    @Override // org.apache.camel.Endpoint
    public Producer createProducer() throws Exception {
        return new MQTTProducer(this);
    }

    public MQTTConfiguration getConfiguration() {
        return this.configuration;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String str) {
        this.name = str;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.impl.DefaultEndpoint, org.apache.camel.support.ServiceSupport
    public void doStart() throws Exception {
        super.doStart();
        createConnection();
    }

    protected void createConnection() {
        this.connection = this.configuration.callbackConnection();
        this.connection.listener(new Listener() { // from class: org.apache.camel.component.mqtt.MQTTEndpoint.2
            @Override // org.fusesource.mqtt.client.Listener
            public void onConnected() {
                MQTTEndpoint.this.connected = true;
                MQTTEndpoint.LOG.info("MQTT Connection connected to {}", MQTTEndpoint.this.configuration.getHost());
            }

            @Override // org.fusesource.mqtt.client.Listener
            public void onDisconnected() {
                MQTTEndpoint.LOG.debug("MQTT Connection disconnected from {}", MQTTEndpoint.this.configuration.getHost());
            }

            @Override // org.fusesource.mqtt.client.Listener
            public void onPublish(UTF8Buffer uTF8Buffer, Buffer buffer, Runnable runnable) {
                if (!MQTTEndpoint.this.consumers.isEmpty()) {
                    Exchange createExchange = MQTTEndpoint.this.createExchange();
                    createExchange.getIn().setBody(buffer.toByteArray());
                    createExchange.getIn().setHeader(MQTTConfiguration.MQTT_SUBSCRIBE_TOPIC, uTF8Buffer.toString());
                    Iterator it = MQTTEndpoint.this.consumers.iterator();
                    while (it.hasNext()) {
                        ((MQTTConsumer) it.next()).processExchange(createExchange);
                    }
                }
                if (runnable != null) {
                    runnable.run();
                }
            }

            @Override // org.fusesource.mqtt.client.Listener
            public void onFailure(Throwable th) {
                MQTTEndpoint.this.connected = false;
                MQTTEndpoint.LOG.warn("Connection to " + MQTTEndpoint.this.configuration.getHost() + " failure due " + th.getMessage() + ". Forcing a disconnect to re-connect on next attempt.");
                MQTTEndpoint.this.connection.disconnect(new Callback<Void>() { // from class: org.apache.camel.component.mqtt.MQTTEndpoint.2.1
                    @Override // org.fusesource.mqtt.client.Callback
                    public void onSuccess(Void r2) {
                    }

                    @Override // org.fusesource.mqtt.client.Callback
                    public void onFailure(Throwable th2) {
                        MQTTEndpoint.LOG.debug("Failed to disconnect from " + MQTTEndpoint.this.configuration.getHost() + ". This exception is ignored.", th2);
                    }
                });
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.impl.DefaultEndpoint, org.apache.camel.support.ServiceSupport
    public void doStop() throws Exception {
        super.doStop();
        if (this.connection != null) {
            final Promise promise = new Promise();
            this.connection.getDispatchQueue().execute(new Task() { // from class: org.apache.camel.component.mqtt.MQTTEndpoint.3
                @Override // org.fusesource.hawtdispatch.Task, java.lang.Runnable
                public void run() {
                    MQTTEndpoint.this.connection.disconnect(new Callback<Void>() { // from class: org.apache.camel.component.mqtt.MQTTEndpoint.3.1
                        @Override // org.fusesource.mqtt.client.Callback
                        public void onSuccess(Void r4) {
                            promise.onSuccess(r4);
                        }

                        @Override // org.fusesource.mqtt.client.Callback
                        public void onFailure(Throwable th) {
                            promise.onFailure(th);
                        }
                    });
                }
            });
            promise.await(this.configuration.getDisconnectWaitInSeconds(), TimeUnit.SECONDS);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void connect() throws Exception {
        final Promise promise = new Promise();
        this.connection.connect(new Callback<Void>() { // from class: org.apache.camel.component.mqtt.MQTTEndpoint.4
            @Override // org.fusesource.mqtt.client.Callback
            public void onSuccess(Void r7) {
                MQTTEndpoint.LOG.debug("Connected to {}", MQTTEndpoint.this.configuration.getHost());
                Topic[] createSubscribeTopics = MQTTEndpoint.this.createSubscribeTopics();
                if (createSubscribeTopics != null && createSubscribeTopics.length > 0) {
                    MQTTEndpoint.this.connection.subscribe(createSubscribeTopics, new Callback<byte[]>() { // from class: org.apache.camel.component.mqtt.MQTTEndpoint.4.1
                        @Override // org.fusesource.mqtt.client.Callback
                        public void onSuccess(byte[] bArr) {
                            promise.onSuccess(bArr);
                            MQTTEndpoint.this.connected = true;
                        }

                        @Override // org.fusesource.mqtt.client.Callback
                        public void onFailure(Throwable th) {
                            MQTTEndpoint.LOG.debug("Failed to subscribe", th);
                            promise.onFailure(th);
                            MQTTEndpoint.this.connection.disconnect(null);
                            MQTTEndpoint.this.connected = false;
                        }
                    });
                } else {
                    promise.onSuccess(r7);
                    MQTTEndpoint.this.connected = true;
                }
            }

            @Override // org.fusesource.mqtt.client.Callback
            public void onFailure(Throwable th) {
                MQTTEndpoint.LOG.warn("Failed to connect to " + MQTTEndpoint.this.configuration.getHost() + " due " + th.getMessage());
                promise.onFailure(th);
                MQTTEndpoint.this.connection.disconnect(null);
                MQTTEndpoint.this.connected = false;
            }
        });
        LOG.info("Connecting to {} using {} seconds timeout", this.configuration.getHost(), Integer.valueOf(this.configuration.getConnectWaitInSeconds()));
        promise.await(this.configuration.getConnectWaitInSeconds(), TimeUnit.SECONDS);
    }

    Topic[] createSubscribeTopics() {
        String subscribeTopicNames = this.configuration.getSubscribeTopicNames();
        if (subscribeTopicNames == null || subscribeTopicNames.isEmpty()) {
            String subscribeTopicName = this.configuration.getSubscribeTopicName();
            String trim = subscribeTopicName != null ? subscribeTopicName.trim() : null;
            if (trim != null && !trim.isEmpty()) {
                return new Topic[]{new Topic(trim, this.configuration.getQoS())};
            }
            LOG.warn("No topic subscriptions were specified in configuration");
            return null;
        }
        String[] split = subscribeTopicNames.split(",");
        Topic[] topicArr = new Topic[split.length];
        for (int i = 0; i < split.length; i++) {
            topicArr[i] = new Topic(split[i].trim(), this.configuration.getQoS());
        }
        return topicArr;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isConnected() {
        return this.connected;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void publish(final String str, final byte[] bArr, final QoS qoS, final boolean z, final Callback<Void> callback) throws Exception {
        boolean isConnected = isConnected();
        int i = 0;
        TimeoutException timeoutException = null;
        while (!isConnected && i <= 3) {
            i++;
            try {
                LOG.warn("#{} attempt to re-create connection to {} before publishing", Integer.valueOf(i), this.configuration.getHost());
                createConnection();
                connect();
            } catch (TimeoutException e) {
                timeoutException = e;
                LOG.debug("Timed out after {} seconds after {} attempt to re-create connection to {}", new Object[]{Integer.valueOf(this.configuration.getConnectWaitInSeconds()), Integer.valueOf(i), this.configuration.getHost()});
            } catch (Throwable th) {
                callback.onFailure(th);
                return;
            }
            isConnected = isConnected();
        }
        if (i <= 3 || isConnected()) {
            this.connection.getDispatchQueue().execute(new Task() { // from class: org.apache.camel.component.mqtt.MQTTEndpoint.5
                @Override // org.fusesource.hawtdispatch.Task, java.lang.Runnable
                public void run() {
                    MQTTEndpoint.LOG.debug("Publishing to {}", MQTTEndpoint.this.configuration.getHost());
                    MQTTEndpoint.this.connection.publish(str, bArr, qoS, z, callback);
                }
            });
        } else {
            LOG.warn("Cannot re-connect to {} after {} attempts", this.configuration.getHost(), Integer.valueOf(i));
            callback.onFailure(timeoutException);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addConsumer(MQTTConsumer mQTTConsumer) {
        this.consumers.add(mQTTConsumer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeConsumer(MQTTConsumer mQTTConsumer) {
        this.consumers.remove(mQTTConsumer);
    }

    @Override // org.apache.camel.IsSingleton
    public boolean isSingleton() {
        return true;
    }
}
