package org.apache.activemq.artemis.protocol.amqp.connect.federation;

import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotImplementedException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.class */
/**
 * Sender controller used on the side of a federation connection that serves a remote
 * address federation receiver. On link attach it validates the requested address,
 * creates (or recovers) a queue named after the link that is bound to that address,
 * and creates the server consumer that feeds the link. When the link closes, the
 * queue backing the link is removed if it still exists.
 */
public final class AMQPFederationAddressSenderController extends AMQPFederationBaseSenderController {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Sender context captured in {@link #init}; stays {@code null} until a link was accepted. */
    private ProtonServerSenderContext senderContext;

    /**
     * Creates a controller bound to the given AMQP session.
     *
     * @param aMQPSessionContext the session that owns the sender link
     * @throws ActiveMQAMQPException if the base controller rejects the session
     */
    public AMQPFederationAddressSenderController(AMQPSessionContext aMQPSessionContext) throws ActiveMQAMQPException {
        super(aMQPSessionContext);
    }

    /**
     * Configures the sender link and creates the server consumer that feeds it.
     *
     * @param protonServerSenderContext the sender context of the attaching link
     * @return the server consumer created for the link's queue
     * @throws ActiveMQAMQPIllegalStateException if this is not a federation connection, the
     *         address is not MULTICAST capable, or the link's queue is bound to another address
     * @throws ActiveMQAMQPNotImplementedException if the remote did not supply a source
     * @throws ActiveMQAMQPInternalErrorException if an unexpected error occurs while the
     *         address and queue are being resolved
     * @throws Exception any other protocol level error, rethrown unchanged
     */
    @Override
    public Consumer init(ProtonServerSenderContext protonServerSenderContext) throws Exception {
        final Sender sender = protonServerSenderContext.getSender();
        final Source source = (Source) sender.getRemoteSource();
        // The link name doubles as the name of the queue that backs this federation link.
        final SimpleString queueName = SimpleString.of(sender.getName());
        final Connection connection = this.session.getSession().getConnection();

        if (this.federation == null) {
            throw new ActiveMQAMQPIllegalStateException("Cannot create a federation link from non-federation connection");
        }
        if (source == null) {
            throw new ActiveMQAMQPNotImplementedException("Null source lookup not supported on federation links.");
        }

        this.senderContext = protonServerSenderContext;

        // Mirror the remote sender settle mode and advertise the link capabilities.
        sender.setSenderSettleMode(sender.getRemoteSenderSettleMode());
        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
        sender.setOfferedCapabilities(new Symbol[] {AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER});
        sender.setDesiredCapabilities(new Symbol[] {AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT});
        this.tunnelCoreMessages = AmqpSupport.verifyOfferedCapabilities(sender, AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);

        final Map<String, Object> sourceProperties = readAddressSourceProperties(sender);
        final boolean autoDelete = (Boolean) sourceProperties.getOrDefault(AMQPFederationConstants.ADDRESS_AUTO_DELETE, false);
        final long autoDeleteDelay = ((Number) sourceProperties.getOrDefault(AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY, 0)).longValue();
        final long autoDeleteMsgCount = ((Number) sourceProperties.getOrDefault(AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT, 0)).longValue();

        final String jmsSelector = getJMSSelectorFromFilters(source);
        final String queueFilter;
        if (AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS) != null) {
            // A no-local filter is translated into a selector clause on the remote container id.
            // NOTE(review): the container id is inserted verbatim into the quoted literal; a
            // single quote inside it would produce an invalid selector - confirm whether
            // escaping is required here.
            final String noLocalClause = "__AMQ_CID<>'" + connection.getRemoteContainer() + "'";
            queueFilter = jmsSelector == null ? noLocalClause : jmsSelector + " AND " + noLocalClause;
        } else {
            queueFilter = jmsSelector;
        }

        final SimpleString address = SimpleString.of(source.getAddress());

        try {
            final AddressQueryResult addressQuery = this.sessionSPI.addressQuery(address, RoutingType.MULTICAST, true);
            if (!addressQuery.isExists()) {
                // Record the miss with the federation before rejecting the link.
                this.federation.registerMissingAddress(address.toString());
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
            }
            if (!addressQuery.getRoutingTypes().contains(RoutingType.MULTICAST)) {
                throw new ActiveMQAMQPIllegalStateException("Address " + address + " is not configured for MULTICAST support");
            }

            final RoutingType routingType = getRoutingType(source);
            QueueQueryResult queueQuery = this.sessionSPI.queueQuery(queueName, routingType, false);
            if (!queueQuery.isExists()) {
                final QueueConfiguration configuration = QueueConfiguration.of(queueName);
                configuration.setAddress(address);
                configuration.setRoutingType(routingType);
                configuration.setAutoCreateAddress(false);
                configuration.setMaxConsumers(-1);
                configuration.setPurgeOnNoConsumers(false);
                configuration.setFilterString(queueFilter);
                configuration.setDurable(true);
                configuration.setAutoCreated(false);
                configuration.setAutoDelete(autoDelete);
                configuration.setAutoDeleteDelay(autoDeleteDelay);
                configuration.setAutoDeleteMessageCount(autoDeleteMsgCount);
                queueQuery = this.sessionSPI.queueQuery(configuration, true);
            }

            // The queue may have pre-existed; make sure it is bound to the address we were asked for.
            if (!queueQuery.getAddress().equals(address)) {
                throw new ActiveMQAMQPIllegalStateException("Requested queue: " + queueName + " for federation of address: " + address + ", but it is already mapped to a different address: " + queueQuery.getAddress());
            }

            this.resourceDeletedAction = errorCondition -> this.federation.registerMissingAddress(address.toString());
            registerRemoteLinkClosedInterceptor(sender);

            return this.sessionSPI.createSender(protonServerSenderContext, queueName, null, false);
        } catch (ActiveMQSecurityException e) {
            // Most specific handlers first: a broad catch placed ahead of these would make
            // them unreachable and would report security failures as internal errors.
            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
        } catch (ActiveMQAMQPException e) {
            throw e;
        } catch (Exception e) {
            throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
        }
    }

    /** Removes the queue backing this link when the remote peer closes the link. */
    @Override
    protected void handleLinkRemotelyClosed() {
        deleteAddressFederationBindingIfPresent();
    }

    /**
     * Removes the queue backing this link when the link is closed locally.
     *
     * @param errorCondition the error condition the link is closed with (not used here)
     */
    @Override
    protected void handleLinkLocallyClosed(ErrorCondition errorCondition) {
        deleteAddressFederationBindingIfPresent();
    }

    /**
     * Best-effort cleanup: deletes the queue named after the sender link if it still
     * exists. Failures are logged at debug level and otherwise ignored because this runs
     * while the link is already closing.
     */
    private void deleteAddressFederationBindingIfPresent() {
        if (this.senderContext == null) {
            // init() never got far enough to capture a sender, nothing to clean up.
            return;
        }
        try {
            final Sender sender = this.senderContext.getSender();
            // getRemoteSource() yields the transport level source type, so it must be
            // narrowed to the messaging Source exactly as init() does.
            final Source remoteSource = (Source) sender.getRemoteSource();
            final SimpleString queueName = SimpleString.of(sender.getName());
            if (this.sessionSPI.queueQuery(queueName, getRoutingType(remoteSource), false).isExists()) {
                this.sessionSPI.deleteQueue(queueName);
            }
        } catch (Exception e) {
            logger.debug("Federation address sender link closed cleanup caught error: ", e);
        }
    }

    /**
     * Reads the federated address source properties map from the remote link properties.
     *
     * @param sender the sender whose remote properties are inspected
     * @return the nested properties map, or an empty map when the remote sent no link
     *         properties or no (non-null) entry under the federation properties key
     */
    @SuppressWarnings("unchecked") // the entry is an untyped AMQP map decoded from the wire
    private static Map<String, Object> readAddressSourceProperties(Sender sender) {
        final Map<?, ?> remoteProperties = sender.getRemoteProperties();
        if (remoteProperties == null) {
            return Collections.emptyMap();
        }
        final Object nested = remoteProperties.get(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES);
        return nested == null ? Collections.emptyMap() : (Map<String, Object>) nested;
    }

    /**
     * Extracts the JMS selector from the source filters and validates it by parsing it.
     *
     * @param source the remote source whose filter map is inspected
     * @return the selector string, or {@code null} when no JMS selector filter is present
     * @throws ActiveMQAMQPException with {@link AmqpError#INVALID_FIELD} if the selector
     *         cannot be parsed
     */
    private String getJMSSelectorFromFilters(Source source) throws ActiveMQAMQPException {
        final Map.Entry<Symbol, DescribedType> selectorFilter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);
        if (selectorFilter == null) {
            return null;
        }
        final String selector = selectorFilter.getValue().getDescribed().toString();
        try {
            SelectorParser.parse(selector);
        } catch (FilterException e) {
            throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
        }
        return selector;
    }

    /**
     * Derives the routing type from the source capabilities. The first topic or queue
     * capability found decides the result.
     *
     * @param source the remote source, may be {@code null}
     * @return MULTICAST for a topic capability, ANYCAST for a queue capability, otherwise
     *         the broker default routing type
     */
    private static RoutingType getRoutingType(Source source) {
        if (source != null && source.getCapabilities() != null) {
            for (Symbol capability : source.getCapabilities()) {
                if (AmqpSupport.TOPIC_CAPABILITY.equals(capability)) {
                    return RoutingType.MULTICAST;
                }
                if (AmqpSupport.QUEUE_CAPABILITY.equals(capability)) {
                    return RoutingType.ANYCAST;
                }
            }
        }
        return ActiveMQDefaultConfiguration.getDefaultRoutingType();
    }
}
