/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.mqtt.handler.connect;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.hivemq.bootstrap.ClientConnection;
import com.hivemq.bootstrap.ClientConnectionContext;
import com.hivemq.bootstrap.ClientState;
import com.hivemq.configuration.service.FullConfigurationService;
import com.hivemq.configuration.service.InternalConfigurations;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.extension.sdk.api.packets.auth.ModifiableDefaultPermissions;
import com.hivemq.extension.sdk.api.packets.disconnect.DisconnectReasonCode;
import com.hivemq.extension.sdk.api.packets.publish.AckReasonCode;
import com.hivemq.extensions.auth.parameter.ModifiableClientSettingsImpl;
import com.hivemq.extensions.events.OnAuthSuccessEvent;
import com.hivemq.extensions.handler.PluginAuthenticatorService;
import com.hivemq.extensions.handler.PluginAuthorizerService;
import com.hivemq.extensions.handler.PluginAuthorizerServiceImpl;
import com.hivemq.extensions.handler.tasks.PublishAuthorizerResult;
import com.hivemq.extensions.packets.general.ModifiableDefaultPermissionsImpl;
import com.hivemq.extensions.services.auth.Authorizers;
import com.hivemq.limitation.TopicAliasLimiter;
import com.hivemq.mqtt.handler.KeepAliveDisconnectHandler;
import com.hivemq.mqtt.handler.KeepAliveDisconnectService;
import com.hivemq.mqtt.handler.connack.MqttConnacker;
import com.hivemq.mqtt.handler.connect.PollInflightMessageListener;
import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnector;
import com.hivemq.mqtt.handler.publish.DefaultPermissionsEvaluator;
import com.hivemq.mqtt.handler.publish.FlowControlHandler;
import com.hivemq.mqtt.handler.publish.PublishFlowHandler;
import com.hivemq.mqtt.message.ProtocolVersion;
import com.hivemq.mqtt.message.connack.CONNACK;
import com.hivemq.mqtt.message.connack.CONNACKBuilder;
import com.hivemq.mqtt.message.connack.Mqtt3ConnAckReturnCode;
import com.hivemq.mqtt.message.connect.CONNECT;
import com.hivemq.mqtt.message.connect.MqttWillPublish;
import com.hivemq.mqtt.message.mqtt5.Mqtt5UserProperties;
import com.hivemq.mqtt.message.reason.Mqtt5ConnAckReasonCode;
import com.hivemq.mqtt.message.reason.Mqtt5DisconnectReasonCode;
import com.hivemq.mqtt.services.PublishPollService;
import com.hivemq.persistence.clientsession.ClientSessionPersistence;
import com.hivemq.persistence.clientsession.SharedSubscriptionService;
import com.hivemq.persistence.connection.ConnectionPersistence;
import com.hivemq.util.Bytes;
import com.hivemq.util.Exceptions;
import com.hivemq.util.Topics;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.GenericFutureListener;
import java.nio.ByteBuffer;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.inject.Provider;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@ChannelHandler.Sharable
public class ConnectHandler
extends SimpleChannelInboundHandler<CONNECT> {
    @NotNull
    private static final Logger log = LoggerFactory.getLogger(ConnectHandler.class);
    @NotNull
    private final ClientSessionPersistence clientSessionPersistence;
    @NotNull
    private final ConnectionPersistence connectionPersistence;
    @NotNull
    private final FullConfigurationService configurationService;
    @NotNull
    private final Provider<PublishFlowHandler> publishFlowHandlerProvider;
    @NotNull
    private final Provider<FlowControlHandler> flowControlHandlerProvider;
    @NotNull
    private final MqttConnacker mqttConnacker;
    @NotNull
    private final TopicAliasLimiter topicAliasLimiter;
    @NotNull
    private final PublishPollService publishPollService;
    @NotNull
    private final SharedSubscriptionService sharedSubscriptionService;
    @NotNull
    private final Authorizers authorizers;
    @NotNull
    private final PluginAuthenticatorService pluginAuthenticatorService;
    @NotNull
    private final PluginAuthorizerService pluginAuthorizerService;
    @NotNull
    private final MqttServerDisconnector mqttServerDisconnector;
    @NotNull
    private final KeepAliveDisconnectService keepAliveDisconnectService;
    private int maxClientIdLength;
    private long configuredSessionExpiryInterval;
    private int topicAliasMaximum;
    private int serverKeepAliveMaximum;
    private boolean allowZeroKeepAlive;
    private long maxMessageExpiryInterval;

    @Inject
    public ConnectHandler(@NotNull ClientSessionPersistence clientSessionPersistence, @NotNull ConnectionPersistence connectionPersistence, @NotNull FullConfigurationService configurationService, @NotNull Provider<PublishFlowHandler> publishFlowHandlerProvider, @NotNull Provider<FlowControlHandler> flowControlHandlerProvider, @NotNull MqttConnacker mqttConnacker, @NotNull TopicAliasLimiter topicAliasLimiter, @NotNull PublishPollService publishPollService, @NotNull SharedSubscriptionService sharedSubscriptionService, @NotNull PluginAuthenticatorService pluginAuthenticatorService, @NotNull Authorizers authorizers, @NotNull PluginAuthorizerService pluginAuthorizerService, @NotNull MqttServerDisconnector mqttServerDisconnector, @NotNull KeepAliveDisconnectService keepAliveDisconnectService) {
        this.clientSessionPersistence = clientSessionPersistence;
        this.connectionPersistence = connectionPersistence;
        this.configurationService = configurationService;
        this.publishFlowHandlerProvider = publishFlowHandlerProvider;
        this.flowControlHandlerProvider = flowControlHandlerProvider;
        this.mqttConnacker = mqttConnacker;
        this.topicAliasLimiter = topicAliasLimiter;
        this.publishPollService = publishPollService;
        this.sharedSubscriptionService = sharedSubscriptionService;
        this.pluginAuthenticatorService = pluginAuthenticatorService;
        this.authorizers = authorizers;
        this.pluginAuthorizerService = pluginAuthorizerService;
        this.mqttServerDisconnector = mqttServerDisconnector;
        this.keepAliveDisconnectService = keepAliveDisconnectService;
    }

    @PostConstruct
    public void postConstruct() {
        this.maxClientIdLength = this.configurationService.restrictionsConfiguration().maxClientIdLength();
        this.configuredSessionExpiryInterval = this.configurationService.mqttConfiguration().maxSessionExpiryInterval();
        this.topicAliasMaximum = this.configurationService.mqttConfiguration().topicAliasEnabled() ? this.configurationService.mqttConfiguration().topicAliasMaxPerClient() : 0;
        this.serverKeepAliveMaximum = this.configurationService.mqttConfiguration().keepAliveMax();
        this.allowZeroKeepAlive = this.configurationService.mqttConfiguration().keepAliveAllowZero();
        this.maxMessageExpiryInterval = this.configurationService.mqttConfiguration().maxMessageExpiryInterval();
    }

    protected void channelRead0(@NotNull ChannelHandlerContext ctx, @NotNull CONNECT connect) throws Exception {
        this.adjustValuesAccordingToSettings(connect);
        if (!this.checkClientId(ctx, connect)) {
            return;
        }
        if (!this.checkWillPublish(ctx, connect)) {
            return;
        }
        if (!this.checkWillRetained(ctx, connect)) {
            return;
        }
        ClientConnectionContext clientConnectionContext = ClientConnectionContext.of(ctx.channel());
        clientConnectionContext.setDisconnectFuture((SettableFuture<Void>)SettableFuture.create());
        clientConnectionContext.setClientReceiveMaximum(connect.getReceiveMaximum());
        if (connect.getMaximumPacketSize() <= 0x10000004L) {
            clientConnectionContext.setMaxPacketSizeSend(connect.getMaximumPacketSize());
        }
        clientConnectionContext.setRequestResponseInformation(connect.isResponseInformationRequested());
        clientConnectionContext.setRequestProblemInformation(connect.isProblemInformationRequested());
        this.addPublishFlowHandler(ctx, connect);
        clientConnectionContext.proposeClientState(ClientState.AUTHENTICATING);
        clientConnectionContext.setAuthConnect(connect);
        this.pluginAuthenticatorService.authenticateConnect(ctx, clientConnectionContext, connect, new ModifiableClientSettingsImpl(connect.getReceiveMaximum(), null));
    }

    public void connectSuccessfulUndecided(@NotNull ChannelHandlerContext ctx, @NotNull ClientConnectionContext clientConnectionContext, @NotNull CONNECT connect, @Nullable ModifiableClientSettingsImpl clientSettings) {
        this.clearPasswordIfWanted(clientConnectionContext);
        if (InternalConfigurations.AUTH_DENY_UNAUTHENTICATED_CONNECTIONS.get()) {
            this.mqttConnacker.connackError(clientConnectionContext.getChannel(), "Client with ip {} could not be authenticated", "Authentication failed, no authenticator registered", Mqtt5ConnAckReasonCode.NOT_AUTHORIZED, "Authentication failed, no authenticator registered", Mqtt5UserProperties.NO_USER_PROPERTIES, true);
            return;
        }
        clientConnectionContext.proposeClientState(ClientState.AUTHENTICATED);
        ConnectHandler.cleanChannelAttributesAfterAuth(clientConnectionContext);
        this.connectAuthenticated(ctx, clientConnectionContext, connect, clientSettings);
    }

    public void connectSuccessfulAuthenticated(@NotNull ChannelHandlerContext ctx, @NotNull ClientConnectionContext clientConnectionContext, @NotNull CONNECT connect, @Nullable ModifiableClientSettingsImpl clientSettings) {
        this.clearPasswordIfWanted(clientConnectionContext);
        clientConnectionContext.proposeClientState(ClientState.AUTHENTICATED);
        ConnectHandler.cleanChannelAttributesAfterAuth(clientConnectionContext);
        this.connectAuthenticated(ctx, clientConnectionContext, connect, clientSettings);
    }

    private void clearPasswordIfWanted(@NotNull ClientConnectionContext clientConnectionContext) {
        Optional<Boolean> clearPasswordAfterAuthOptional = clientConnectionContext.isClearPasswordAfterAuth();
        if (clearPasswordAfterAuthOptional.isPresent()) {
            if (clearPasswordAfterAuthOptional.get().booleanValue()) {
                clientConnectionContext.clearPassword();
            }
        } else {
            clientConnectionContext.clearPassword();
        }
    }

    private static void cleanChannelAttributesAfterAuth(@NotNull ClientConnectionContext clientConnectionContext) {
        ChannelPipeline pipeline = clientConnectionContext.getChannel().pipeline();
        if (pipeline.context("auth_in_progress_message_handler") != null) {
            try {
                pipeline.remove("auth_in_progress_message_handler");
            }
            catch (NoSuchElementException noSuchElementException) {
                // empty catch block
            }
        }
        clientConnectionContext.setAuthConnect(null);
    }

    private void adjustValuesAccordingToSettings(@NotNull CONNECT connect) {
        MqttWillPublish willPublish;
        if (connect.getWillPublish() != null && (willPublish = connect.getWillPublish()).getMessageExpiryInterval() > this.maxMessageExpiryInterval) {
            willPublish.setMessageExpiryInterval(this.maxMessageExpiryInterval);
        }
    }

    private void addPublishFlowHandler(@NotNull ChannelHandlerContext ctx, @NotNull CONNECT connect) {
        ctx.channel().pipeline().addBefore("message_expiry_handler", "mqtt_publish_flow_handler", (ChannelHandler)this.publishFlowHandlerProvider.get());
        if (connect.getProtocolVersion() == ProtocolVersion.MQTTv5) {
            ctx.channel().pipeline().addBefore("mqtt_message_barrier", "mqtt_5_flow_control_handler", (ChannelHandler)this.flowControlHandlerProvider.get());
        }
    }

    public void userEventTriggered(@NotNull ChannelHandlerContext ctx, @NotNull Object evt) throws Exception {
        if (evt instanceof PluginAuthorizerServiceImpl.AuthorizeWillResultEvent) {
            PluginAuthorizerServiceImpl.AuthorizeWillResultEvent resultEvent = (PluginAuthorizerServiceImpl.AuthorizeWillResultEvent)evt;
            this.afterPublishAuthorizer(ctx, resultEvent.getConnect(), resultEvent.getResult());
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @NotNull
    private ListenableFuture<Void> updatePersistenceData(boolean cleanStart, @NotNull String clientId, long sessionExpiryInterval, @Nullable MqttWillPublish willPublish, @Nullable Long queueSizeMaximum) {
        return this.clientSessionPersistence.clientConnected(clientId, cleanStart, sessionExpiryInterval, willPublish, queueSizeMaximum);
    }

    private boolean checkClientId(@NotNull ChannelHandlerContext ctx, @NotNull CONNECT msg) {
        Boolean assigned = ClientConnectionContext.of(ctx.channel()).isClientIdAssigned();
        if (assigned != null && assigned.booleanValue()) {
            return true;
        }
        if (msg.getClientIdentifier().length() > this.maxClientIdLength) {
            String logMessage = "A client (IP: {}) connected with a client identifier longer than " + this.maxClientIdLength + " characters. This is not allowed.";
            String eventlogMessage = "Sent CONNECT with Client identifier too long";
            this.mqttConnacker.connackError(ctx.channel(), logMessage, "Sent CONNECT with Client identifier too long", Mqtt5ConnAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID, "Sent CONNECT with a client identifier that is too long.");
            return false;
        }
        return true;
    }

    private boolean checkWillPublish(@NotNull ChannelHandlerContext ctx, @NotNull CONNECT msg) {
        if (msg.getWillPublish() != null) {
            int maxQos;
            if (Topics.containsWildcard(msg.getWillPublish().getTopic())) {
                this.mqttConnacker.connackError(ctx.channel(), "A client (IP: {}) sent a CONNECT with a wildcard character in the Will Topic (# or +). This is not allowed.", "Sent CONNECT with wildcard character in the Will Topic (#/+)", Mqtt5ConnAckReasonCode.TOPIC_NAME_INVALID, "Not authorized to connect. Will topic contained wildcard characters (#/+). The broker does not allow this.");
                return false;
            }
            int willQos = msg.getWillPublish().getQos().getQosNumber();
            if (willQos > (maxQos = this.configurationService.mqttConfiguration().maximumQos().getQosNumber())) {
                this.mqttConnacker.connackError(ctx.channel(), "A client (IP: {}) sent a CONNECT with a Will QoS higher than the maximum configured QoS. This is not allowed.", "Sent CONNECT with Will QoS (" + willQos + ") higher than the allowed maximum (" + maxQos + ")", Mqtt5ConnAckReasonCode.QOS_NOT_SUPPORTED, String.format("Quality of service level of Will message in CONNECT exceeds maximum allowed QoS. QoS used: %s. Maximum allowed QoS: %s.", willQos, maxQos));
                return false;
            }
            int maxTopicLength = this.configurationService.restrictionsConfiguration().maxTopicLength();
            if (msg.getWillPublish().getTopic().length() > maxTopicLength) {
                this.mqttConnacker.connackError(ctx.channel(), "A client (IP: {}) sent a CONNECT with a Will Topic exceeding the max length. This is not allowed.", "Sent CONNECT with Will topic that exceeds maximum topic length", Mqtt5ConnAckReasonCode.TOPIC_NAME_INVALID, "Not authorized to connect. The will topic length exceeded the maximum length configured on the broker.");
                return false;
            }
        }
        return true;
    }

    private boolean checkWillRetained(@NotNull ChannelHandlerContext ctx, @NotNull CONNECT msg) {
        if (msg.getWillPublish() != null && msg.getWillPublish().isRetain() && !this.configurationService.mqttConfiguration().retainedMessagesEnabled()) {
            this.mqttConnacker.connackError(ctx.channel(), "A client (IP: {}) sent a CONNECT with Will Retain set to 1 although retain is not available.", "Sent a CONNECT with Will Retain set to 1 although retain is not available", Mqtt5ConnAckReasonCode.RETAIN_NOT_SUPPORTED, "Retain flag of Will message was set to true. The broker does not allow this.");
            return false;
        }
        return true;
    }

    private void connectAuthenticated(@NotNull ChannelHandlerContext ctx, @NotNull ClientConnectionContext clientConnectionContext, @NotNull CONNECT msg, @Nullable ModifiableClientSettingsImpl clientSettings) {
        clientConnectionContext.setPreventLwt(true);
        if (clientSettings != null && clientSettings.isModified()) {
            this.applyClientSettings(clientSettings, msg, clientConnectionContext.getChannel());
        }
        if (msg.getWillPublish() != null) {
            if (this.authorizers.areAuthorizersAvailable()) {
                ctx.executor().execute(() -> this.pluginAuthorizerService.authorizeWillPublish(ctx, msg));
            } else {
                if (this.isWillNotAuthorized(ctx, msg)) {
                    return;
                }
                this.continueAfterWillAuthorization(ctx, clientConnectionContext, msg);
            }
        } else {
            this.continueAfterWillAuthorization(ctx, clientConnectionContext, msg);
        }
    }

    private void applyClientSettings(@NotNull ModifiableClientSettingsImpl clientSettings, @NotNull CONNECT msg, @NotNull Channel channel) {
        msg.setReceiveMaximum(clientSettings.getClientReceiveMaximum());
        ClientConnectionContext clientConnectionContext = ClientConnectionContext.of(channel);
        clientConnectionContext.setClientReceiveMaximum(clientSettings.getClientReceiveMaximum());
        clientConnectionContext.setQueueSizeMaximum(clientSettings.getQueueSizeMaximum());
    }

    private void continueAfterWillAuthorization(@NotNull ChannelHandlerContext ctx, @NotNull ClientConnectionContext clientConnectionContext, @NotNull CONNECT msg) {
        clientConnectionContext.getChannel().pipeline().fireUserEventTriggered((Object)new OnAuthSuccessEvent());
        ClientConnection clientConnection = ClientConnection.from(clientConnectionContext);
        this.disconnectClientWithSameClientId(clientConnection, ctx, msg);
    }

    private void afterPublishAuthorizer(@NotNull ChannelHandlerContext ctx, @NotNull CONNECT msg, @NotNull PublishAuthorizerResult authorizerResult) {
        ClientConnectionContext clientConnectionContext = ClientConnectionContext.of(ctx.channel());
        if (authorizerResult.isAuthorizerPresent() && authorizerResult.getAckReasonCode() != null) {
            if (authorizerResult.getAckReasonCode() == AckReasonCode.SUCCESS) {
                this.continueAfterWillAuthorization(ctx, clientConnectionContext, msg);
            } else {
                this.connackWillNotAuthorized(ctx, msg, authorizerResult.getDisconnectReasonCode(), authorizerResult.getAckReasonCode(), authorizerResult.getReasonString());
            }
            return;
        }
        ModifiableDefaultPermissions permissions = clientConnectionContext.getAuthPermissions();
        ModifiableDefaultPermissionsImpl defaultPermissions = (ModifiableDefaultPermissionsImpl)permissions;
        if (authorizerResult.isAuthorizerPresent() && (defaultPermissions == null || defaultPermissions.asList().size() < 1 && !defaultPermissions.isDefaultAuthorizationBehaviourOverridden())) {
            this.connackWillNotAuthorized(ctx, msg, authorizerResult.getDisconnectReasonCode(), null, null);
            return;
        }
        if (!DefaultPermissionsEvaluator.checkWillPublish(permissions, msg.getWillPublish())) {
            this.connackWillNotAuthorized(ctx, msg, authorizerResult.getDisconnectReasonCode(), authorizerResult.getAckReasonCode(), authorizerResult.getReasonString());
            return;
        }
        this.continueAfterWillAuthorization(ctx, clientConnectionContext, msg);
    }

    private boolean isWillNotAuthorized(@NotNull ChannelHandlerContext ctx, @NotNull CONNECT msg) {
        ModifiableDefaultPermissions permissions;
        if (msg.getWillPublish() != null && !DefaultPermissionsEvaluator.checkWillPublish(permissions = ClientConnectionContext.of(ctx.channel()).getAuthPermissions(), msg.getWillPublish())) {
            this.connackWillNotAuthorized(ctx, msg, null, null, null);
            return true;
        }
        return false;
    }

    private void connackWillNotAuthorized(@NotNull ChannelHandlerContext ctx, @NotNull CONNECT msg, @Nullable DisconnectReasonCode disconnectReasonCode, @Nullable AckReasonCode ackReasonCode, @Nullable String reasonString) {
        Mqtt5ConnAckReasonCode connAckReasonCode;
        Mqtt5ConnAckReasonCode mqtt5ConnAckReasonCode = connAckReasonCode = disconnectReasonCode != null ? Mqtt5ConnAckReasonCode.fromDisconnectReasonCode(disconnectReasonCode) : null;
        if (connAckReasonCode == null) {
            connAckReasonCode = ackReasonCode != null ? Mqtt5ConnAckReasonCode.fromAckReasonCode(ackReasonCode) : Mqtt5ConnAckReasonCode.NOT_AUTHORIZED;
        }
        String usedReasonString = reasonString != null ? reasonString : "Will Publish is not authorized for topic '" + msg.getWillPublish().getTopic() + "' with QoS '" + String.valueOf((Object)msg.getWillPublish().getQos()) + "' and retain '" + msg.getWillPublish().isRetain() + "'";
        this.mqttConnacker.connackError(ctx.channel(), "A client (IP: {}) sent a CONNECT message with an not authorized Will Publish to topic '" + msg.getWillPublish().getTopic() + "' with QoS '" + msg.getWillPublish().getQos().getQosNumber() + "' and retain '" + msg.getWillPublish().isRetain() + "'.", "Sent a CONNECT message with an not authorized Will Publish to topic '" + msg.getWillPublish().getTopic() + "' with QoS '" + msg.getWillPublish().getQos().getQosNumber() + "' and retain '" + msg.getWillPublish().isRetain() + "'", connAckReasonCode, usedReasonString, Mqtt5UserProperties.NO_USER_PROPERTIES, true);
    }

    @VisibleForTesting
    void afterTakeover(@NotNull ChannelHandlerContext ctx, @NotNull ClientConnection clientConnection, @NotNull CONNECT msg) {
        Long queueSizeMaximum = clientConnection.getQueueSizeMaximum();
        long sessionExpiryInterval = msg.getSessionExpiryInterval() > this.configuredSessionExpiryInterval ? this.configuredSessionExpiryInterval : msg.getSessionExpiryInterval();
        boolean existent = msg.isCleanStart() ? false : this.clientSessionPersistence.isExistent(msg.getClientIdentifier());
        ListenableFuture<Void> future = this.updatePersistenceData(msg.isCleanStart(), msg.getClientIdentifier(), sessionExpiryInterval, msg.getWillPublish(), queueSizeMaximum);
        Futures.addCallback(future, (FutureCallback)new UpdatePersistenceCallback(ctx, clientConnection, this, msg, existent), (Executor)ctx.executor());
    }

    private void afterPersistSession(@NotNull ChannelHandlerContext ctx, @NotNull ClientConnection clientConnection, @NotNull CONNECT msg, boolean sessionPresent) {
        this.sharedSubscriptionService.invalidateSharedSubscriptionCache(msg.getClientIdentifier());
        this.addKeepAliveHandler(ctx, msg);
        this.sendConnackSuccess(ctx, clientConnection, msg, sessionPresent);
        try {
            ctx.pipeline().remove((ChannelHandler)this);
        }
        catch (NoSuchElementException noSuchElementException) {
            // empty catch block
        }
    }

    private void sendConnackSuccess(@NotNull ChannelHandlerContext ctx, @NotNull ClientConnection clientConnection, @NotNull CONNECT msg, boolean sessionPresent) {
        ChannelFuture connackSent;
        clientConnection.setWillPublish(msg.getWillPublish());
        if (msg.getProtocolVersion() == ProtocolVersion.MQTTv5) {
            CONNACK connack = this.buildMqtt5Connack(clientConnection, msg, sessionPresent);
            connackSent = this.mqttConnacker.connackSuccess(ctx, connack, msg);
        } else {
            clientConnection.setClientSessionExpiryInterval(msg.getSessionExpiryInterval());
            CONNACK connack = CONNACK.builder().withMqtt3ReturnCode(Mqtt3ConnAckReturnCode.ACCEPTED).withSessionPresent(sessionPresent).build();
            connackSent = this.mqttConnacker.connackSuccess(ctx, connack, msg);
        }
        connackSent.addListener((GenericFutureListener)new PollInflightMessageListener(this.publishPollService, clientConnection.getClientId()));
    }

    @NotNull
    private CONNACK buildMqtt5Connack(@NotNull ClientConnection clientConnection, @NotNull CONNECT msg, boolean sessionPresent) {
        String authMethod;
        boolean clientIdAssigned;
        long sessionExpiryInterval;
        CONNACKBuilder builder = CONNACK.builder().withSessionPresent(sessionPresent).withReasonCode(Mqtt5ConnAckReasonCode.SUCCESS).withReceiveMaximum(this.configurationService.mqttConfiguration().serverReceiveMaximum()).withSubscriptionIdentifierAvailable(this.configurationService.mqttConfiguration().subscriptionIdentifierEnabled()).withMaximumPacketSize(this.configurationService.mqttConfiguration().maxPacketSize()).withWildcardSubscriptionAvailable(this.configurationService.mqttConfiguration().wildcardSubscriptionsEnabled()).withSharedSubscriptionAvailable(this.configurationService.mqttConfiguration().sharedSubscriptionsEnabled()).withMaximumQoS(this.configurationService.mqttConfiguration().maximumQos()).withRetainAvailable(this.configurationService.mqttConfiguration().retainedMessagesEnabled());
        boolean overridden = msg.getSessionExpiryInterval() > this.configuredSessionExpiryInterval;
        long l = sessionExpiryInterval = overridden ? this.configuredSessionExpiryInterval : msg.getSessionExpiryInterval();
        if (overridden) {
            builder.withSessionExpiryInterval(sessionExpiryInterval);
        }
        if (clientIdAssigned = clientConnection.isClientIdAssigned()) {
            builder.withAssignedClientIdentifier(msg.getClientIdentifier());
        }
        if (msg.getKeepAlive() == 0 && !this.allowZeroKeepAlive || msg.getKeepAlive() > this.serverKeepAliveMaximum) {
            builder.withServerKeepAlive(this.serverKeepAliveMaximum);
            clientConnection.setConnectKeepAlive(this.serverKeepAliveMaximum);
        } else {
            builder.withServerKeepAlive(-1);
            clientConnection.setConnectKeepAlive(msg.getKeepAlive());
        }
        if (this.topicAliasMaximum > 0 && this.topicAliasLimiter.aliasesAvailable()) {
            clientConnection.setTopicAliasMapping(new String[this.topicAliasMaximum]);
            builder.withTopicAliasMaximum(this.topicAliasMaximum);
            this.topicAliasLimiter.initUsage(this.topicAliasMaximum);
        }
        clientConnection.setClientSessionExpiryInterval(sessionExpiryInterval);
        Mqtt5UserProperties userPropertiesFromAuth = clientConnection.getAuthUserProperties();
        if (userPropertiesFromAuth != null) {
            clientConnection.setAuthUserProperties(null);
            builder.withUserProperties(userPropertiesFromAuth);
        }
        if ((authMethod = clientConnection.getAuthMethod()) != null) {
            builder.withAuthMethod(authMethod);
            ByteBuffer authData = clientConnection.getAuthData();
            if (authData != null) {
                clientConnection.setAuthData(null);
                builder.withAuthData(Bytes.fromReadOnlyBuffer(authData));
            }
        }
        return builder.build();
    }

    private void disconnectClientWithSameClientId(final @NotNull ClientConnection clientConnection, final @NotNull ChannelHandlerContext ctx, final @NotNull CONNECT msg) {
        if (clientConnection.getClientState().disconnected()) {
            log.debug("Disconnecting client with same client identifier '{}' failed. Cause: Disconnected before takeover.", (Object)clientConnection.getClientId());
            return;
        }
        ClientConnection persistedClientConnection = this.connectionPersistence.persistIfAbsent(clientConnection);
        if (persistedClientConnection == clientConnection) {
            this.afterTakeover(ctx, clientConnection, msg);
            return;
        }
        persistedClientConnection.getChannel().eventLoop().execute(() -> {
            if (!persistedClientConnection.getClientState().disconnectingOrDisconnected()) {
                this.mqttServerDisconnector.disconnect(persistedClientConnection.getChannel(), "Disconnecting already connected client with id {} and ip {} because another client connects with that id", "Another client connected with the same client id.", Mqtt5DisconnectReasonCode.SESSION_TAKEN_OVER, "Another client connected with the same client id.");
            }
        });
        Futures.addCallback(persistedClientConnection.getDisconnectFuture(), (FutureCallback)new FutureCallback<Void>(){

            public void onSuccess(Void result) {
                ConnectHandler.this.disconnectClientWithSameClientId(clientConnection, ctx, msg);
            }

            public void onFailure(@NotNull Throwable t) {
                log.warn("Exception on disconnecting client with same client identifier '{}'. Cause: {}", (Object)clientConnection.getClientId(), (Object)t.getMessage());
            }
        }, (Executor)clientConnection.getChannel().eventLoop());
    }

    private void addKeepAliveHandler(@NotNull ChannelHandlerContext ctx, @NotNull CONNECT msg) {
        int keepAlive;
        if (ProtocolVersion.MQTTv5.equals((Object)msg.getProtocolVersion()) && (msg.getKeepAlive() == 0 && !this.allowZeroKeepAlive || msg.getKeepAlive() > this.serverKeepAliveMaximum)) {
            if (log.isTraceEnabled()) {
                log.trace("Client {} used keepAlive {} which is invalid, using server maximum of {}", new Object[]{msg.getClientIdentifier(), msg.getKeepAlive(), this.serverKeepAliveMaximum});
            }
            keepAlive = this.serverKeepAliveMaximum;
        } else {
            keepAlive = msg.getKeepAlive();
        }
        if (keepAlive > 0) {
            Double keepAliveValue = (double)keepAlive * ConnectHandler.getGracePeriod();
            if (log.isTraceEnabled()) {
                log.trace("Client {} specified a keepAlive value of {}s. Using keepAlive of {}s. The maximum timeout before disconnecting is {}s", new Object[]{msg.getClientIdentifier(), msg.getKeepAlive(), keepAlive, keepAliveValue});
            }
            ctx.pipeline().addFirst("mqtt_keepalive_idle_handler", (ChannelHandler)new KeepAliveDisconnectHandler(keepAliveValue.intValue(), TimeUnit.SECONDS, this.keepAliveDisconnectService));
        } else if (log.isTraceEnabled()) {
            log.trace("Client {} specified keepAlive of 0. Disabling PING mechanism", (Object)msg.getClientIdentifier());
        }
    }

    private static double getGracePeriod() {
        return 1.5;
    }

    private static final class UpdatePersistenceCallback
    implements FutureCallback<Void> {
        @NotNull
        private final ChannelHandlerContext ctx;
        @NotNull
        private final ClientConnection clientConnection;
        @NotNull
        private final ConnectHandler connectHandler;
        @NotNull
        private final CONNECT connect;
        private final boolean sessionPresent;

        private UpdatePersistenceCallback(@NotNull ChannelHandlerContext ctx, @NotNull ClientConnection clientConnection, @NotNull ConnectHandler connectHandler, @NotNull CONNECT connect, boolean sessionPresent) {
            this.ctx = ctx;
            this.clientConnection = clientConnection;
            this.connectHandler = connectHandler;
            this.connect = connect;
            this.sessionPresent = sessionPresent;
        }

        public void onSuccess(@Nullable Void aVoid) {
            if (this.ctx.channel().isActive() && !this.ctx.executor().isShutdown()) {
                this.connectHandler.afterPersistSession(this.ctx, this.clientConnection, this.connect, this.sessionPresent);
            }
        }

        public void onFailure(@NotNull Throwable throwable) {
            Exceptions.rethrowError("Unable to handle client connection for id " + this.connect.getClientIdentifier() + ".", throwable);
            this.ctx.channel().disconnect();
        }
    }
}

