package org.apache.activemq.artemis.protocol.amqp.proton;

import io.netty.buffer.ByteBuf;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationAddressSenderController;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationQueueSenderController;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPSecurityException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.Target;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.class */
public class AMQPConnectionContext extends ProtonInitializable implements EventHandler {
    public static final String AMQP_CONTAINER_ID = "amqp-container-id";
    protected final ProtonHandler handler;
    private AMQPConnectionCallback connectionCallback;
    private final String containerId;
    private final boolean isIncomingConnection;
    private final ClientSASLFactory saslClientFactory;
    private final Map<Symbol, Object> connectionProperties;
    private final ScheduledExecutorService scheduledPool;
    private final Map<String, LinkCloseListener> linkCloseListeners;
    private final Map<Session, AMQPSessionContext> sessions;
    private final ProtonProtocolManager protocolManager;
    private final boolean useCoreSubscriptionNaming;
    private final boolean bridgeConnection;
    private final ScheduleOperator scheduleOp;
    private final AtomicReference<Future<?>> scheduledFutureRef;
    private String user;
    private String password;
    private String validatedUser;
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
    private static final FutureTask<Void> VOID_FUTURE = new FutureTask<>(() -> {
    }, null);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext$LocalSecurity.class */
    /**
     * {@link SecurityAuth} view of the enclosing connection. Credentials are read from the
     * connection's current SASL result; the remoting connection and security domain are
     * delegated to the connection callback and the protocol manager.
     */
    public class LocalSecurity implements SecurityAuth {
        private LocalSecurity() {
        }

        /** Returns the user of the current SASL result, or {@code null} when there is no result. */
        public String getUsername() {
            final SASLResult sasl = AMQPConnectionContext.this.getSASLResult();
            return sasl == null ? null : sasl.getUser();
        }

        /**
         * Returns the password carried by the current SASL result, or {@code null} when the
         * result is absent or is not a {@link PlainSASLResult}.
         */
        public String getPassword() {
            final SASLResult sasl = AMQPConnectionContext.this.getSASLResult();
            // instanceof is false for null, so no separate null check is needed
            if (sasl instanceof PlainSASLResult) {
                return ((PlainSASLResult) sasl).getPassword();
            }
            return null;
        }

        /** Returns the connection delegate exposed by the connection callback. */
        public RemotingConnection getRemotingConnection() {
            return connectionCallback.getProtonConnectionDelegate();
        }

        /** Returns the security domain configured on the protocol manager. */
        public String getSecurityDomain() {
            return AMQPConnectionContext.this.getProtocolManager().getSecurityDomain();
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext$ScheduleOperator.class */
    /**
     * Update function applied to {@code scheduledFutureRef}: replaces a non-null current future
     * with a freshly scheduled run of {@link #scheduleRunnable} after {@code delay} milliseconds.
     * A {@code null} current value (the value {@code close(ErrorCondition)} stores) stays
     * {@code null}, so nothing is scheduled in that case.
     */
    class ScheduleOperator implements UnaryOperator<Future<?>> {
        final ScheduleRunnable scheduleRunnable;
        private long delay;

        ScheduleOperator(ScheduleRunnable scheduleRunnable) {
            this.scheduleRunnable = scheduleRunnable;
        }

        @Override
        public Future<?> apply(Future<?> future) {
            if (future == null) {
                return null;
            }
            return scheduledPool.schedule(scheduleRunnable, delay, TimeUnit.MILLISECONDS);
        }

        /** Sets the delay, in milliseconds, used by the next {@link #apply(Future)} call. */
        public void setDelay(long j) {
            delay = j;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext$ScheduleRunnable.class */
    /**
     * Task handed to the scheduled pool; when it fires it hands the actual tick work to the
     * handler via {@code runLater}.
     */
    public class ScheduleRunnable implements Runnable {
        final TickerRunnable tickerRunnable = new TickerRunnable();

        ScheduleRunnable() {
        }

        @Override
        public void run() {
            handler.runLater(tickerRunnable);
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext$TickerRunnable.class */
    /**
     * Performs one handler tick and, depending on the result, re-arms the scheduled future:
     * a {@code null} result retries after 10 ms, a zero result schedules nothing, and any other
     * value is treated as a deadline from which the current time in milliseconds is subtracted.
     */
    class TickerRunnable implements Runnable {
        TickerRunnable() {
        }

        @Override
        public void run() {
            final Long rescheduleAt = handler.tick(false);

            final long delayMillis;
            if (rescheduleAt == null) {
                delayMillis = 10L;
            } else if (rescheduleAt.longValue() == 0) {
                // nothing further to schedule
                return;
            } else {
                delayMillis = rescheduleAt.longValue() - TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
            }

            scheduleOp.setDelay(delayMillis);
            scheduledFutureRef.getAndUpdate(scheduleOp);
        }
    }

    /**
     * Turns off auto-read on the transport connection and marks the handler as not readable.
     * Calls {@code handler.requireHandler()} first.
     */
    public void disableAutoRead() {
        handler.requireHandler();

        connectionCallback.getTransportConnection().setAutoRead(false);
        handler.setReadable(false);
    }

    /**
     * Turns auto-read back on for the transport connection, marks the handler readable again
     * and flushes. Calls {@code handler.requireHandler()} first.
     */
    public void enableAutoRead() {
        handler.requireHandler();

        connectionCallback.getTransportConnection().setAutoRead(true);
        getHandler().setReadable(true);

        flush();
    }

    /**
     * Creates a connection context that is not a bridge connection; delegates to the full
     * constructor with the bridge flag set to {@code false}.
     */
    public AMQPConnectionContext(ProtonProtocolManager protonProtocolManager, AMQPConnectionCallback aMQPConnectionCallback, String str, int i, int i2, int i3, boolean z, ScheduledExecutorService scheduledExecutorService, boolean z2, ClientSASLFactory clientSASLFactory, Map<Symbol, Object> map) {
        this(protonProtocolManager,
             aMQPConnectionCallback,
             str,
             i,
             i2,
             i3,
             z,
             scheduledExecutorService,
             z2,
             clientSASLFactory,
             map,
             false);
    }

    /**
     * Creates the connection context, registers it with the callback, builds the
     * {@link ProtonHandler} and configures its transport.
     *
     * @param protonProtocolManager    protocol manager owning this connection
     * @param aMQPConnectionCallback   callback used to reach the transport connection
     * @param str                      container id; a random UUID is used when {@code null}
     * @param i                        idle timeout applied to the transport only when positive
     * @param i2                       value used for both max frame size and outbound frame size limit
     * @param i3                       channel max applied to the transport
     * @param z                        value returned by {@link #isUseCoreSubscriptionNaming()}
     * @param scheduledExecutorService pool used to schedule ticks
     * @param z2                       value returned by {@link #isIncomingConnection()}
     * @param clientSASLFactory        client SASL factory; when non-null a client SASL is created
     * @param map                      extra connection properties, may be {@code null}
     * @param z3                       value returned by {@link #isBridgeConnection()}
     */
    public AMQPConnectionContext(ProtonProtocolManager protonProtocolManager, AMQPConnectionCallback aMQPConnectionCallback, String str, int i, int i2, int i3, boolean z, ScheduledExecutorService scheduledExecutorService, boolean z2, ClientSASLFactory clientSASLFactory, Map<Symbol, Object> map, boolean z3) {
        connectionProperties = new HashMap<>();
        linkCloseListeners = new ConcurrentHashMap<>();
        sessions = new ConcurrentHashMap<>();
        scheduleOp = new ScheduleOperator(new ScheduleRunnable());
        scheduledFutureRef = new AtomicReference<>(VOID_FUTURE);

        protocolManager = protonProtocolManager;
        bridgeConnection = z3;
        connectionCallback = aMQPConnectionCallback;
        useCoreSubscriptionNaming = z;
        containerId = (str == null) ? UUID.randomUUID().toString() : str;
        isIncomingConnection = z2;
        saslClientFactory = clientSASLFactory;

        // built-in properties first so that caller-supplied entries can override them
        connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis");
        connectionProperties.put(AmqpSupport.VERSION, VersionLoader.getVersion().getFullVersion());
        if (map != null) {
            connectionProperties.putAll(map);
        }

        scheduledPool = scheduledExecutorService;
        connectionCallback.setConnection(this);

        final boolean incomingWithoutClientSasl = z2 && clientSASLFactory == null;
        handler = new ProtonHandler(connectionCallback.getTransportConnection().getEventLoop(), protonProtocolManager.getServer().getExecutorFactory().getExecutor(), incomingWithoutClientSasl);
        handler.addEventHandler(this);

        final Transport protonTransport = handler.getTransport();
        protonTransport.setEmitFlowEventOnSend(false);
        if (i > 0) {
            protonTransport.setIdleTimeout(i);
        }
        protonTransport.setChannelMax(i3);
        protonTransport.setInitialRemoteMaxFrameSize(protonProtocolManager.getInitialRemoteMaxFrameSize());
        protonTransport.setMaxFrameSize(i2);
        protonTransport.setOutboundFrameSizeLimit(i2);

        if (clientSASLFactory != null) {
            handler.createClientSASL();
        }
    }

    /** Returns the large-message-sync setting reported by the connection callback. */
    public boolean isLargeMessageSync() {
        final boolean largeMessageSync = connectionCallback.isLargeMessageSync();
        return largeMessageSync;
    }

    /** Marks this context as initialized. */
    @Override
    public void initialize() throws Exception {
        initialized = true;
    }

    /**
     * Registers (or replaces) the link close listener stored under the given id.
     *
     * @return this context, to allow chaining
     */
    public AMQPConnectionContext addLinkRemoteCloseListener(String str, LinkCloseListener linkCloseListener) {
        linkCloseListeners.put(str, linkCloseListener);

        return this;
    }

    /** Removes the link close listener stored under the given id, if any. */
    public void removeLinkRemoteCloseListener(String str) {
        linkCloseListeners.remove(str);
    }

    /** Removes every registered link close listener. */
    public void clearLinkRemoteCloseListeners() {
        linkCloseListeners.clear();
    }

    /** Returns the bridge flag supplied at construction time. */
    public boolean isBridgeConnection() {
        return bridgeConnection;
    }

    /** Delegates to {@code handler.requireHandler()}. */
    public void requireInHandler() {
        handler.requireHandler();
    }

    /** Returns the result of {@code handler.isHandler()}. */
    public boolean isHandler() {
        final boolean inHandler = handler.isHandler();
        return inHandler;
    }

    /** Delegates to {@code handler.scheduledFlush()}. */
    public void scheduledFlush() {
        handler.scheduledFlush();
    }

    /** Returns the incoming-connection flag supplied at construction time. */
    public boolean isIncomingConnection() {
        return isIncomingConnection;
    }

    /** Returns the client SASL factory supplied at construction time; may be {@code null}. */
    public ClientSASLFactory getSaslClientFactory() {
        return saslClientFactory;
    }

    /**
     * Builds a new session context for the given Proton session, using a session callback
     * created by the connection callback and the server of the protocol manager.
     */
    protected AMQPSessionContext newSessionExtension(Session session) throws ActiveMQAMQPException {
        return new AMQPSessionContext(
            connectionCallback.createSessionCallback(this),
            this,
            session,
            protocolManager.getServer());
    }

    /** Returns the live (not copied) map of Proton sessions to their session contexts. */
    public Map<Session, AMQPSessionContext> getSessions() {
        return sessions;
    }

    /** Returns a new {@link LocalSecurity} bound to this connection. */
    public SecurityAuth getSecurityAuth() {
        final SecurityAuth auth = new LocalSecurity();
        return auth;
    }

    /** Returns the SASL result currently held by the handler. */
    public SASLResult getSASLResult() {
        final SASLResult result = handler.getSASLResult();
        return result;
    }

    /**
     * Passes an incoming buffer to the handler, dumping it through {@code ByteUtil.debugFrame}
     * first when trace logging is enabled.
     */
    public void inputBuffer(ByteBuf byteBuf) {
        final boolean traceFrames = logger.isTraceEnabled();
        if (traceFrames) {
            ByteUtil.debugFrame(logger, "Buffer Received ", byteBuf);
        }

        handler.inputBuffer(byteBuf);
    }

    /** Returns the Proton handler created for this connection. */
    public ProtonHandler getHandler() {
        return handler;
    }

    /** Returns the user captured from the SASL result during validation; may be {@code null}. */
    public String getUser() {
        return user;
    }

    /** Returns the password captured from a plain SASL result during validation; may be {@code null}. */
    public String getPassword() {
        return password;
    }

    /** Returns the user name returned by the server's user validation; may be {@code null}. */
    public String getValidatedUser() {
        return validatedUser;
    }

    /** Asks the handler to close the connection callback later via {@code runLater}. */
    public void destroy() {
        handler.runLater(() -> connectionCallback.close());
    }

    /** Always returns {@code false}. */
    public boolean isSyncOnFlush() {
        final boolean syncOnFlush = false;
        return syncOnFlush;
    }

    /** Delegates to {@code handler.instantFlush()}. */
    public void instantFlush() {
        handler.instantFlush();
    }

    /** Delegates to {@code handler.flush()}. */
    public void flush() {
        handler.flush();
    }

    /** Hands the given task to {@code handler.afterFlush(Runnable)}. */
    public void afterFlush(Runnable runnable) {
        handler.afterFlush(runnable);
    }

    /**
     * Closes the connection through the handler with the given error condition.
     * <p>
     * The scheduled-future reference is cleared first. If the pool is a
     * {@link ThreadPoolExecutor} and a real scheduled task was pending, the task is removed
     * from the pool; a warning is logged when removal fails and the task is neither cancelled
     * nor done.
     */
    public void close(ErrorCondition errorCondition) {
        final Future<?> pending = scheduledFutureRef.getAndSet(null);

        final boolean removable = scheduledPool instanceof ThreadPoolExecutor
            && pending != null
            && pending != VOID_FUTURE
            && pending instanceof Runnable;

        if (removable) {
            final boolean removed = ((ThreadPoolExecutor) scheduledPool).remove((Runnable) pending);
            if (!removed && !pending.isCancelled() && !pending.isDone()) {
                ActiveMQAMQPProtocolLogger.LOGGER.cantRemovingScheduledTask();
            }
        }

        handler.close(errorCondition, this);
    }

    /**
     * Returns the session context registered for the given Proton session, creating it,
     * attaching it as the session's context and registering it on first use.
     */
    public AMQPSessionContext getSessionExtension(Session session) throws ActiveMQAMQPException {
        final AMQPSessionContext existing = sessions.get(session);
        if (existing != null) {
            return existing;
        }

        final AMQPSessionContext created = newSessionExtension(session);
        session.setContext(created);
        sessions.put(session, created);
        return created;
    }

    /** Hands the given task to {@code handler.runOnPool(Runnable)}. */
    public void runOnPool(Runnable runnable) {
        handler.runOnPool(runnable);
    }

    /** Hands the given task to {@code handler.runNow(Runnable)}. */
    public void runNow(Runnable runnable) {
        handler.runNow(runnable);
    }

    /** Hands the given task to {@code handler.runLater(Runnable)}. */
    public void runLater(Runnable runnable) {
        handler.runLater(runnable);
    }

    /**
     * Asks the connection callback to validate the connection, passing along the handler's
     * current SASL result.
     */
    protected boolean validateConnection(Connection connection) {
        final SASLResult sasl = handler.getSASLResult();
        return connectionCallback.validateConnection(connection, sasl);
    }

    /** Returns the result of {@code handler.checkDataReceived()}. */
    public boolean checkDataReceived() {
        final boolean dataReceived = handler.checkDataReceived();
        return dataReceived;
    }

    /** Returns the creation time reported by the handler. */
    public long getCreationTime() {
        final long createdAt = handler.getCreationTime();
        return createdAt;
    }

    /** Returns the remote container of the handler's Proton connection. */
    public String getRemoteContainer() {
        final Connection protonConnection = handler.getConnection();
        return protonConnection.getRemoteContainer();
    }

    /** Always returns {@code null}. */
    public String getPubSubPrefix() {
        final String prefix = null;
        return prefix;
    }

    /**
     * Hook invoked at the start of {@code onRemoteOpen(Connection)}; this implementation does
     * nothing.
     */
    protected void initInternal() throws Exception {
        // intentionally empty
    }

    /** Returns the connection callback supplied at construction time. */
    public AMQPConnectionCallback getConnectionCallback() {
        return connectionCallback;
    }

    /**
     * Handles a link opened by the remote peer.
     * <p>
     * If a link initializer was attached to the link it is detached and run, and nothing else
     * happens. Links whose local state is already {@code ACTIVE} are ignored. Otherwise the
     * remote source and target are copied to the local end and the link is registered with the
     * session context according to its kind (transaction coordinator, replica target,
     * federation control/event link, federation address/queue receiver, or a plain
     * sender/receiver).
     */
    protected void remoteLinkOpened(Link link) throws Exception {
        final AMQPSessionContext sessionContext = getSessionExtension(link.getSession());

        final Runnable initializer = (Runnable) link.attachments().get(AmqpSupport.AMQP_LINK_INITIALIZER_KEY, Runnable.class);
        if (initializer != null) {
            // one-shot: clear the attachment before running it
            link.attachments().set(AmqpSupport.AMQP_LINK_INITIALIZER_KEY, Runnable.class, null);
            initializer.run();
            return;
        }

        if (link.getLocalState() == EndpointState.ACTIVE) {
            return;
        }

        link.setSource(link.getRemoteSource());
        link.setTarget(link.getRemoteTarget());

        if (link instanceof Receiver) {
            final Receiver receiver = (Receiver) link;

            if (link.getRemoteTarget() instanceof Coordinator) {
                sessionContext.addTransactionHandler((Coordinator) link.getRemoteTarget(), receiver);
            } else if (isReplicaTarget(receiver)) {
                handleReplicaTargetLinkOpened(sessionContext, receiver);
            } else if (isFederationControlLink(receiver)) {
                handleFederationControlLinkOpened(sessionContext, receiver);
            } else if (isFederationEventLink(receiver)) {
                sessionContext.addFederationEventProcessor(receiver);
            } else {
                sessionContext.addReceiver(receiver);
            }
        } else {
            final Sender sender = (Sender) link;

            if (isFederationAddressReceiver(sender)) {
                sessionContext.addSender(sender, new AMQPFederationAddressSenderController(sessionContext));
            } else if (isFederationQueueReceiver(sender)) {
                sessionContext.addSender(sender, new AMQPFederationQueueSenderController(sessionContext));
            } else if (isFederationEventLink(sender)) {
                sessionContext.addFederationEventDispatcher(sender);
            } else {
                sessionContext.addSender(sender);
            }
        }
    }

    /* JADX WARN: Type inference failed for: r9v0, types: [java.lang.Throwable, org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException] */
    /**
     * Attaches a mirror (replica) target receiver to the session.
     * <p>
     * The user must pass a {@code SEND} security check on the target address and the link must
     * desire the mirror capability. On any {@link ActiveMQAMQPException} the failure is logged
     * and the receiver is closed with the matching error condition and a cleared target.
     */
    private void handleReplicaTargetLinkOpened(AMQPSessionContext aMQPSessionContext, Receiver receiver) throws Exception {
        try {
            try {
                aMQPSessionContext.getSessionSPI().check(SimpleString.of(receiver.getTarget().getAddress()), CheckType.SEND, getSecurityAuth());

                final boolean mirrorDesired = AmqpSupport.verifyDesiredCapability(receiver, AMQPMirrorControllerSource.MIRROR_CAPABILITY);
                if (!mirrorDesired) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.missingDesiredCapability(AMQPMirrorControllerSource.MIRROR_CAPABILITY.toString());
                }

                // offer core tunneling only when the peer asked for it
                final Symbol[] offered;
                if (AmqpSupport.verifyDesiredCapability(receiver, AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT)) {
                    offered = new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY, AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT};
                } else {
                    offered = new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY};
                }
                receiver.setOfferedCapabilities(offered);

                aMQPSessionContext.addReplicaTarget(receiver);
            } catch (ActiveMQSecurityException securityFailure) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingProducer(securityFailure.getMessage());
            }
        } catch (ActiveMQAMQPException failure) {
            logger.warn(failure.getMessage(), (Throwable) failure);

            receiver.setTarget((Target) null);
            receiver.setCondition(new ErrorCondition(failure.getAmqpError(), failure.getMessage()));
            receiver.close();
        }
    }

    /* JADX WARN: Type inference failed for: r9v1, types: [java.lang.Throwable, org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException] */
    /**
     * Attaches a federation control link receiver to the session.
     * <p>
     * The user must pass a {@code SEND} security check on the federation validation address.
     * On any {@link ActiveMQAMQPException} the failure is logged and the receiver is closed
     * with the matching error condition and a cleared target.
     */
    private void handleFederationControlLinkOpened(AMQPSessionContext aMQPSessionContext, Receiver receiver) throws Exception {
        try {
            try {
                aMQPSessionContext.getSessionSPI().check(SimpleString.of(AMQPFederationConstants.FEDERATION_BASE_VALIDATION_ADDRESS), CheckType.SEND, getSecurityAuth());

                aMQPSessionContext.addFederationCommandProcessor(receiver);
            } catch (ActiveMQSecurityException securityFailure) {
                throw new ActiveMQAMQPSecurityException("User does not have permission to attach to the federation control address");
            }
        } catch (ActiveMQAMQPException failure) {
            logger.warn(failure.getMessage(), (Throwable) failure);

            final ErrorCondition condition = new ErrorCondition(failure.getAmqpError(), failure.getMessage());
            receiver.setTarget((Target) null);
            receiver.setCondition(condition);
            receiver.close();
        }
    }

    /**
     * Returns {@code true} when the link has a target whose address starts with
     * {@code ProtonProtocolManager.MIRROR_ADDRESS}; {@code false} for a null link, target or
     * address.
     */
    private static boolean isReplicaTarget(Link link) {
        if (link == null) {
            return false;
        }
        if (link.getTarget() == null) {
            return false;
        }
        if (link.getTarget().getAddress() == null) {
            return false;
        }
        return link.getTarget().getAddress().startsWith(ProtonProtocolManager.MIRROR_ADDRESS);
    }

    /** Returns whether the receiver desires the federation control link capability. */
    private static boolean isFederationControlLink(Receiver receiver) {
        final boolean controlLink = AmqpSupport.verifyDesiredCapability(receiver, AMQPFederationConstants.FEDERATION_CONTROL_LINK);
        return controlLink;
    }

    /** Returns whether the sender desires the federation event link capability. */
    private static boolean isFederationEventLink(Sender sender) {
        final boolean eventLink = AmqpSupport.verifyDesiredCapability(sender, AMQPFederationConstants.FEDERATION_EVENT_LINK);
        return eventLink;
    }

    /** Returns whether the receiver desires the federation event link capability. */
    private static boolean isFederationEventLink(Receiver receiver) {
        final boolean eventLink = AmqpSupport.verifyDesiredCapability(receiver, AMQPFederationConstants.FEDERATION_EVENT_LINK);
        return eventLink;
    }

    /** Returns whether the sender desires the federation queue receiver capability. */
    private static boolean isFederationQueueReceiver(Sender sender) {
        final boolean queueReceiver = AmqpSupport.verifyDesiredCapability(sender, AMQPFederationConstants.FEDERATION_QUEUE_RECEIVER);
        return queueReceiver;
    }

    /** Returns whether the sender desires the federation address receiver capability. */
    private static boolean isFederationAddressReceiver(Sender sender) {
        final boolean addressReceiver = AmqpSupport.verifyDesiredCapability(sender, AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER);
        return addressReceiver;
    }

    /**
     * Returns the capabilities offered when the connection is opened.
     * <p>
     * As a side effect, when the connection callback reports a failover URI, a single-entry
     * failover server list (network host, scheme, hostname, port) is stored in the connection
     * properties. The scheme is {@code amqps} when the URI query contains
     * {@code sslEnabled=true} and {@code amqp} otherwise, including when the URI has no query
     * component at all.
     *
     * @return the capabilities from {@code ExtCapability.getCapabilities()}
     */
    public Symbol[] getConnectionCapabilitiesOffered() {
        final URI failoverList = connectionCallback.getFailoverList();
        if (failoverList != null) {
            // URI.getQuery() returns null when the URI has no query component; treat that as "no SSL"
            final String query = failoverList.getQuery();
            final boolean sslEnabled = query != null && query.contains("sslEnabled=true");

            final Map<Symbol, Object> hostDetails = new HashMap<>();
            hostDetails.put(AmqpSupport.NETWORK_HOST, failoverList.getHost());
            hostDetails.put(AmqpSupport.SCHEME, sslEnabled ? "amqps" : "amqp");
            hostDetails.put(AmqpSupport.HOSTNAME, failoverList.getHost());
            hostDetails.put(AmqpSupport.PORT, Integer.valueOf(failoverList.getPort()));

            connectionProperties.put(AmqpSupport.FAILOVER_SERVER_LIST, Arrays.asList(hostDetails));
        }
        return ExtCapability.getCapabilities();
    }

    /** Opens the connection through the handler using this context's container id and properties. */
    public void open() {
        handler.open(containerId, connectionProperties);
    }

    /** Returns the container id chosen at construction time. */
    public String getContainer() {
        return containerId;
    }

    /** Registers an additional event handler on the Proton handler. */
    public void addEventHandler(EventHandler eventHandler) {
        handler.addEventHandler(eventHandler);
    }

    /** Returns the protocol manager supplied at construction time. */
    public ProtonProtocolManager getProtocolManager() {
        return protocolManager;
    }

    /**
     * Returns the low-credit threshold from the protocol manager, or
     * {@code AmqpSupport.AMQP_LOW_CREDITS_DEFAULT} when there is no protocol manager.
     */
    public int getAmqpLowCredits() {
        if (protocolManager == null) {
            return AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
        }
        return protocolManager.getAmqpLowCredits();
    }

    /**
     * Returns the credit count from the protocol manager, or
     * {@code AmqpSupport.AMQP_CREDITS_DEFAULT} when there is no protocol manager.
     */
    public int getAmqpCredits() {
        if (protocolManager == null) {
            return AmqpSupport.AMQP_CREDITS_DEFAULT;
        }
        return protocolManager.getAmqpCredits();
    }

    /** Returns the core-subscription-naming flag supplied at construction time. */
    public boolean isUseCoreSubscriptionNaming() {
        return useCoreSubscriptionNaming;
    }

    /**
     * Starts authentication. When {@code z} is set, a server SASL is created with the
     * callback's mechanisms, falling back to the anonymous mechanism when none are configured.
     * Otherwise, unless the callback supports anonymous access, the supported SASL header is
     * sent and both the callback and the handler are closed.
     */
    @Override
    public void onAuthInit(ProtonHandler protonHandler, Connection connection, boolean z) {
        if (z) {
            String[] mechanisms = connectionCallback.getSaslMechanisms();
            final boolean noneConfigured = mechanisms == null || mechanisms.length == 0;
            if (noneConfigured) {
                mechanisms = AnonymousServerSASL.ANONYMOUS_MECH;
            }
            protonHandler.createServerSASL(mechanisms);
        } else if (!connectionCallback.isSupportsAnonymous()) {
            connectionCallback.sendSASLSupported();
            connectionCallback.close();
            protonHandler.close(null, this);
        }
    }

    /** Looks up the server SASL for the chosen mechanism name and installs it on the handler. */
    @Override
    public void onSaslRemoteMechanismChosen(ProtonHandler protonHandler, String str) {
        protonHandler.setChosenMechanism(
            connectionCallback.getServerSASL(str));
    }

    /**
     * Lets the client SASL factory choose among the offered mechanisms and installs the choice
     * on the handler; does nothing when no client SASL factory is configured.
     */
    @Override
    public void onSaslMechanismsOffered(ProtonHandler protonHandler, String[] strArr) {
        if (saslClientFactory == null) {
            return;
        }
        protonHandler.setClientMechanism(saslClientFactory.chooseMechanism(strArr));
    }

    /** Closes the connection callback and then this context's handler, without an error condition. */
    @Override
    public void onAuthFailed(ProtonHandler protonHandler, Connection connection) {
        connectionCallback.close();
        handler.close(null, this);
    }

    /** Opens the given connection once authentication has succeeded. */
    @Override
    public void onAuthSuccess(ProtonHandler protonHandler, Connection connection) {
        connection.open();
    }

    /** Flushes pending bytes through the handler. */
    @Override
    public void onTransport(Transport transport) {
        handler.flushBytes();
    }

    /** Forwards outgoing bytes to the connection callback. */
    @Override
    public void pushBytes(ByteBuf byteBuf) {
        connectionCallback.onTransport(byteBuf, this);
    }

    /** Returns the connection callback's writability answer for the given listener. */
    @Override
    public boolean flowControl(ReadyListener readyListener) {
        final boolean writable = connectionCallback.isWritable(readyListener);
        return writable;
    }

    /** Returns the remote address of the underlying transport connection. */
    @Override
    public String getRemoteAddress() {
        final String remoteAddress = connectionCallback.getTransportConnection().getRemoteAddress();
        return remoteAddress;
    }

    /**
     * Handles the remote peer opening the connection.
     * <p>
     * The connection is accepted (configured and opened locally) only when user validation
     * succeeds, the routing handler (consulted only if the transport connection has a router)
     * does not route it, and the callback validates it; the checks run in that order and stop
     * at the first failure, in which case the connection is closed. Afterwards the context is
     * initialized and, unless the remote properties flag a failed open, the first tick is
     * performed and the next one scheduled when a deadline and a pool are available.
     */
    @Override
    public void onRemoteOpen(Connection connection) throws Exception {
        handler.requireHandler();

        try {
            initInternal();
        } catch (Exception e) {
            // an init failure is logged but does not abort the open handling
            logger.error("Error init connection", e);
        }

        boolean accepted = validateUser(connection);
        if (accepted && connectionCallback.getTransportConnection().getRouter() != null) {
            accepted = !protocolManager.m16getRoutingHandler().route(this, connection);
        }
        if (accepted) {
            accepted = validateConnection(connection);
        }

        if (accepted) {
            connection.setContext(this);
            connection.setContainer(containerId);
            connection.setProperties(connectionProperties);
            connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
            connection.open();
        } else {
            connection.close();
        }

        initialize();

        final boolean remoteOpenFailed = connection.getRemoteProperties() != null
            && connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED);
        if (remoteOpenFailed) {
            return;
        }

        final long nextTick = handler.tick(true).longValue();
        if (nextTick != 0 && scheduledPool != null) {
            scheduleOp.setDelay(nextTick - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
            scheduledFutureRef.getAndUpdate(scheduleOp);
        }
    }

    /**
     * Captures the SASL credentials and, for incoming non-bridge connections without a client
     * SASL factory, validates them against the server.
     *
     * @return {@code true} when no validation is required or it succeeded; {@code false} when
     *         the server rejected the user, in which case the connection is given an
     *         {@code UNAUTHORIZED_ACCESS} condition and an open-failed property
     */
    private boolean validateUser(Connection connection) throws Exception {
        user = null;
        password = null;
        validatedUser = null;

        final SASLResult sasl = getSASLResult();
        if (sasl != null) {
            user = sasl.getUser();
            if (sasl instanceof PlainSASLResult) {
                password = ((PlainSASLResult) sasl).getPassword();
            }
        }

        final boolean needsServerValidation = isIncomingConnection() && saslClientFactory == null && !isBridgeConnection();
        if (!needsServerValidation) {
            return true;
        }

        try {
            validatedUser = protocolManager.getServer().validateUser(user, password, connectionCallback.getProtonConnectionDelegate(), protocolManager.getSecurityDomain());
            return true;
        } catch (ActiveMQSecurityException e) {
            final ErrorCondition unauthorized = new ErrorCondition();
            unauthorized.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
            // fall back to the exception's class name when it carries no message
            if (e.getMessage() == null) {
                unauthorized.setDescription(e.getClass().getSimpleName());
            } else {
                unauthorized.setDescription(e.getMessage());
            }

            connection.setCondition(unauthorized);
            connection.setProperties(Collections.singletonMap(AmqpSupport.CONNECTION_OPEN_FAILED, true));
            return false;
        }
    }

    /**
     * Fails the connection delegate, later on the handler, with an internal-error exception
     * carrying the transport condition's description, or {@code "Unknown Internal Error"} when
     * the condition or its description is missing.
     */
    @Override
    public void onTransportError(Transport transport) throws Exception {
        final String description;
        if (transport.getCondition() != null && transport.getCondition().getDescription() != null) {
            description = transport.getCondition().getDescription();
        } else {
            description = "Unknown Internal Error";
        }

        runLater(() -> connectionCallback.getProtonConnectionDelegate().fail(new ActiveMQAMQPInternalErrorException(description)));
    }

    /**
     * When the connection delegate is already destroyed, closes every session context
     * (logging failures at trace level and continuing), clears the session map and flushes
     * pending bytes. Does nothing otherwise.
     */
    @Override
    public void onLocalClose(Connection connection) {
        handler.requireHandler();

        if (!connectionCallback.getProtonConnectionDelegate().isDestroyed()) {
            return;
        }

        for (AMQPSessionContext sessionContext : sessions.values()) {
            try {
                sessionContext.close();
            } catch (Exception e) {
                logger.trace("Caught error while handling local connection close: ", e);
            }
        }

        sessions.clear();
        handler.flushBytes();
    }

    /**
     * Handles the remote peer closing the connection: closes and frees the Proton connection,
     * closes every session context, clears the session map, flushes pending bytes and finally
     * destroys this context.
     * <p>
     * A runtime failure while closing one session no longer prevents the remaining sessions
     * from being closed or the flush/destroy steps from running. The first failure is
     * remembered (later ones are attached as suppressed) and rethrown once cleanup is complete,
     * so callers still observe it.
     *
     * @throws RuntimeException the first failure raised while closing a session context
     */
    @Override
    public void onRemoteClose(Connection connection) {
        handler.requireHandler();
        connection.close();
        connection.free();

        RuntimeException firstFailure = null;
        for (AMQPSessionContext sessionContext : sessions.values()) {
            try {
                sessionContext.close();
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else if (firstFailure != e) {
                    // addSuppressed rejects self-suppression, hence the identity check
                    firstFailure.addSuppressed(e);
                }
            }
        }
        sessions.clear();

        // pending bytes are written out before the connection is destroyed
        handler.flushBytes();
        destroy();

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Ensures a session context exists for a locally opened session and, on bridge
     * connections only, initializes it.
     */
    @Override
    public void onLocalOpen(Session session) throws Exception {
        final AMQPSessionContext sessionContext = getSessionExtension(session);
        if (!bridgeConnection) {
            return;
        }
        sessionContext.initialize();
    }

    /**
     * Initializes the session context and opens the session locally, unless the owning
     * connection is already locally closed.
     */
    @Override
    public void onRemoteOpen(Session session) throws Exception {
        if (session.getConnection().getLocalState() == EndpointState.CLOSED) {
            return;
        }

        handler.requireHandler();
        final AMQPSessionContext sessionContext = getSessionExtension(session);
        sessionContext.initialize();
        session.open();
    }

    /**
     * Schedules, via {@code handler.runLater}, the local close and free of the session and the
     * teardown of its session context (close, unregister, detach from the session).
     */
    @Override
    public void onRemoteClose(Session session) throws Exception {
        handler.runLater(() -> {
            session.close();
            session.free();

            final AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
            if (sessionContext == null) {
                return;
            }
            sessionContext.close();
            sessions.remove(session);
            session.setContext(null);
        });
    }

    /** Processes a remotely opened link unless the owning connection is already locally closed. */
    @Override
    public void onRemoteOpen(Link link) throws Exception {
        final boolean connectionClosed = link.getSession().getConnection().getLocalState() == EndpointState.CLOSED;
        if (connectionClosed) {
            return;
        }
        remoteLinkOpened(link);
    }

    /**
     * Forwards the link's current credit and drain flag to the delivery handler stored as the
     * link context; ignored when the link has no context.
     */
    @Override
    public void onFlow(Link link) throws Exception {
        final Object context = link.getContext();
        if (context == null) {
            return;
        }
        ((ProtonDeliveryHandler) context).onFlow(link.getCredit(), link.getDrain());
    }

    /**
     * Handles the remote peer closing a link: notifies every registered link close listener,
     * closes the link's delivery handler (logging any failure), then closes and frees the link
     * and flushes.
     *
     * @throws Exception the first exception thrown by a link close listener, rethrown after
     *                   the link has been closed, freed and flushed
     */
    @Override
    public void onRemoteClose(Link link) throws Exception {
        handler.requireHandler();

        // only the first listener failure is kept; every listener is still invoked
        final AtomicReference<Exception> firstListenerFailure = new AtomicReference<>();
        linkCloseListeners.forEach((listenerId, listener) -> {
            try {
                listener.onClose(link);
            } catch (Exception e) {
                firstListenerFailure.compareAndSet(null, e);
            }
        });

        final ProtonDeliveryHandler deliveryHandler = (ProtonDeliveryHandler) link.getContext();
        if (deliveryHandler != null) {
            try {
                deliveryHandler.close(true);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }

        link.close();
        link.free();
        flush();

        final Exception listenerFailure = firstListenerFailure.get();
        if (listenerFailure != null) {
            throw listenerFailure;
        }
    }

    /**
     * Handles a remote detach. A link whose source has the {@code LINK_DETACH} expiry policy
     * is treated as a remote close; any other link is detached and freed locally.
     */
    @Override
    public void onRemoteDetach(Link link) throws Exception {
        handler.requireHandler();

        final boolean expiresOnDetach = link.getSource() != null && link.getSource().getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH;
        if (expiresOnDetach) {
            onRemoteClose(link);
            return;
        }

        link.detach();
        link.free();
    }

    /**
     * On a local detach, closes the link's context with {@code close(false)} when that
     * context is a {@link ProtonServerSenderContext}; other contexts are left alone.
     */
    @Override
    public void onLocalDetach(Link link) throws Exception {
        handler.requireHandler();

        final Object linkContext = link.getContext();
        if (!(linkContext instanceof ProtonServerSenderContext)) {
            return;
        }
        ((ProtonServerSenderContext) linkContext).close(false);
    }

    /**
     * Dispatches a delivery to the delivery handler stored as the context of its link; logs a
     * warning with a location-tracing exception when the link has no handler.
     */
    @Override
    public void onDelivery(Delivery delivery) throws Exception {
        handler.requireHandler();

        final ProtonDeliveryHandler deliveryHandler = (ProtonDeliveryHandler) delivery.getLink().getContext();
        if (deliveryHandler == null) {
            logger.warn("Handler is null, can't delivery {}", delivery, new Exception("tracing location"));
            return;
        }
        deliveryHandler.onMessage(delivery);
    }
}
