package org.apache.activemq.artemis.protocol.amqp.connect.mirror;

import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.MessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.pools.MpscPool;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.class */
/**
 * Receiving end of an AMQP mirror link.
 *
 * <p>Every delivery arriving on the link is either a control event (add/delete address,
 * create/delete queue, post-ack), identified by the {@code EVENT_TYPE} message annotation, or a
 * mirrored message that is routed into the local server. Once the resulting work has completed,
 * the delivery is settled as {@code Accepted} through a pooled {@link ACKMessageOperation}.
 */
public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Controller currently applying a mirror event on the calling thread, or {@code null}. */
    private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();

    /**
     * Message format for which {@link AMQPTunneledCoreMessageReader} is selected.
     * NOTE(review): presumably mirrors the tunneled-core format constant declared elsewhere in the
     * AMQP module - confirm and reference that constant if it is accessible from here.
     */
    private static final int TUNNELED_CORE_MESSAGE_FORMAT = 0x468C0100;

    /** Message format for which {@link AMQPTunneledCoreLargeMessageReader} is selected. */
    private static final int TUNNELED_CORE_LARGE_MESSAGE_FORMAT = 0x468C0200;

    /** Fallback used by {@link #flush()} when the configured timeout cannot be read. */
    private static final long DEFAULT_ACK_MANAGER_FLUSH_TIMEOUT_MILLIS = 10000L;

    /** Pool of reusable ack operations, sized by the connection's AMQP credits. */
    private final MpscPool<ACKMessageOperation> ackMessageMpscPool;
    /** Routing context reused for every mirrored message. */
    final RoutingContextImpl routingContext;
    final BasicMirrorController<Receiver> basicController;
    final ActiveMQServer server;
    final Configuration configuration;
    /** Most recently used duplicate-ID cache; valid for the broker id in {@link #lruDuplicateIDKey}. */
    DuplicateIDCache lruduplicateIDCache;
    /** Broker id that {@link #lruduplicateIDCache} was looked up for. */
    String lruDuplicateIDKey;
    private final ReferenceIDSupplier referenceNodeStore;
    /** Operation context of the session; {@link #flush()} waits for its completion. */
    OperationContext mirrorContext;
    // Readers are created lazily on first use and then reused.
    private MessageReader coreMessageReader;
    private MessageReader coreLargeMessageReader;
    /** Created and registered lazily on the first ack, unregistered in {@link #close(boolean)}. */
    private AckManager ackManager;

    /**
     * Settles one delivery as {@code Accepted}, always on the proton handler.
     *
     * <p>Instances are borrowed from and released back to the enclosing controller's pool. The
     * operation can be triggered as an {@link IOCallback}, as a {@link Runnable}, or via the
     * {@link #tx} transaction operation after commit; all paths end up in {@link #run()}.
     */
    public class ACKMessageOperation implements IOCallback, Runnable {
        Delivery delivery;

        /** Transaction hook that settles the delivery once the transaction has committed. */
        public TransactionOperationAbstract tx = new TransactionOperationAbstract() {
            public void afterCommit(Transaction transaction) {
                ACKMessageOperation.this.connectionRun();
            }
        };

        ACKMessageOperation() {
        }

        /** Drops the delivery reference; used as the pool's cleaner. */
        void reset() {
            this.delivery = null;
        }

        ACKMessageOperation setDelivery(Delivery delivery) {
            this.delivery = delivery;
            return this;
        }

        /**
         * Settles the delivery, flushes the connection and returns this operation to the pool.
         * When invoked outside the proton handler it only re-schedules itself onto the connection.
         */
        @Override // java.lang.Runnable
        public void run() {
            if (!AMQPMirrorControllerTarget.this.connection.isHandler()) {
                // Runs once per delivery whenever the completion fires off the handler thread,
                // so it is logged at debug like the other per-message diagnostics in this class.
                logger.debug("Moving execution to proton handler");
                connectionRun();
                return;
            }
            logger.trace("Delivery settling for {}, context={}", this.delivery, this.delivery.getContext());
            this.delivery.disposition(Accepted.getInstance());
            AMQPMirrorControllerTarget.this.settle(this.delivery);
            AMQPMirrorControllerTarget.this.connection.flush();
            AMQPMirrorControllerTarget.this.ackMessageMpscPool.release(this);
        }

        /** {@link IOCallback} completion: settle the delivery. */
        public void done() {
            connectionRun();
        }

        /** Hands this operation to the connection for execution. */
        public void connectionRun() {
            AMQPMirrorControllerTarget.this.connection.runNow(this);
        }

        /** {@link IOCallback} failure: only logged, the delivery is not settled here. */
        public void onError(int errorCode, String errorMessage) {
            logger.warn("{}-{}", errorCode, errorMessage);
        }
    }

    /** Records the controller that is applying a mirror event on the current thread. */
    public static void setControllerInUse(MirrorController mirrorController) {
        CONTROLLER_THREAD_LOCAL.set(mirrorController);
    }

    /** @return the controller set for the current thread, or {@code null} when none is active */
    public static MirrorController getControllerInUse() {
        return CONTROLLER_THREAD_LOCAL.get();
    }

    /**
     * Closes the receiver and unregisters this mirror from the ack manager, if one was created.
     *
     * @param z forwarded unchanged to {@code super.close}
     */
    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver, org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler
    public void close(boolean z) throws ActiveMQAMQPException {
        super.close(z);
        final AckManager manager = this.ackManager;
        if (manager != null) {
            manager.unregisterMirror(this);
        }
    }

    /**
     * Blocks until the operations pending on {@link #mirrorContext} have completed, or until the
     * ack-manager flush timeout elapses. A timeout or an interruption is logged, not thrown; the
     * interrupt flag is restored.
     */
    public void flush() {
        final CountDownLatch completion = new CountDownLatch(1);
        this.connection.runNow(() -> {
            final OperationContext previousContext = OperationContextImpl.getContext();
            try {
                OperationContextImpl.setContext(this.mirrorContext);
                this.mirrorContext.executeOnCompletion(new IOCallback() {
                    public void done() {
                        completion.countDown();
                    }

                    public void onError(int errorCode, String errorMessage) {
                        logger.warn("IO Error code on flushing OperationContext for AMQPMirrorControllerTarget . error code = {} / message = {}", errorCode, errorMessage);
                        // Release the waiter on failure too, otherwise flush() would sit out the full timeout.
                        completion.countDown();
                    }
                });
            } finally {
                OperationContextImpl.setContext(previousContext);
            }
        });

        long timeoutMillis;
        try {
            timeoutMillis = this.connection.getProtocolManager().getAckManagerFlushTimeout();
        } catch (Throwable t) {
            logger.warn("Could not access the connection and protocol manager, using a default timeout of 10 seconds for AckManagerFlushTimeout", t);
            timeoutMillis = DEFAULT_ACK_MANAGER_FLUSH_TIMEOUT_MILLIS;
        }

        try {
            if (!completion.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                ActiveMQAMQPProtocolLogger.LOGGER.timedOutAckManager(timeoutMillis);
            }
        } catch (InterruptedException e) {
            ActiveMQAMQPProtocolLogger.LOGGER.interruptedAckManager(e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Creates the mirror target for the given receiver link.
     *
     * @param sessionCallback session callback; supplies the reference-id supplier
     * @param connectionContext connection the link belongs to
     * @param sessionContext session the link belongs to; supplies the operation context
     * @param receiver the proton receiver link
     * @param activeMQServer server that mirrored events are applied to
     */
    public AMQPMirrorControllerTarget(AMQPSessionCallback sessionCallback, AMQPConnectionContext connectionContext, AMQPSessionContext sessionContext, Receiver receiver, ActiveMQServer activeMQServer) {
        super(sessionCallback, connectionContext, sessionContext, receiver);
        this.ackMessageMpscPool = new MpscPool<>(this.connection.getAmqpCredits(), ACKMessageOperation::reset, ACKMessageOperation::new);
        this.routingContext = new RoutingContextImpl((Transaction) null);
        this.basicController = new BasicMirrorController<>(activeMQServer);
        this.basicController.setLink(receiver);
        this.server = activeMQServer;
        this.configuration = activeMQServer.getConfiguration();
        this.referenceNodeStore = sessionCallback.getProtocolManager().getReferenceIDSupplier();
        this.mirrorContext = sessionContext.getSessionSPI().getSessionContext();
    }

    /** @return the remote mirror id as reported by the underlying basic controller */
    public String getRemoteMirrorId() {
        return this.basicController.getRemoteMirrorId();
    }

    /** Runs the credit runnable directly on the calling thread. */
    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver
    public void flow() {
        this.creditRunnable.run();
    }

    /**
     * Applies one incoming delivery: a control event when the message carries an
     * {@code EVENT_TYPE} annotation, a mirrored message otherwise.
     *
     * <p>Failures while applying the event are logged and not propagated. Unless the handler took
     * ownership of the ack operation, it is registered to run once the pending storage operations
     * complete, so the delivery gets settled either way.
     */
    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver
    protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnotations deliveryAnnotations, Receiver receiver, Transaction transaction) {
        final OperationContext previousContext = recoverContext();
        incrementSettle();
        logger.trace("{}::actualDelivery call for {}", this.server, message);
        setControllerInUse(this);
        delivery.setContext(message);
        ACKMessageOperation ackOperation = this.ackMessageMpscPool.borrow().setDelivery(delivery);
        try {
            if (applyDelivery(message, deliveryAnnotations, ackOperation)) {
                // The handler scheduled the ack itself; give up the reference so it is not
                // registered a second time in the finally block.
                ackOperation = null;
            }
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        } finally {
            // Single cleanup path: the pooled operation must be registered at most once,
            // whatever happened above.
            setControllerInUse(null);
            if (ackOperation != null) {
                this.server.getStorageManager().afterCompleteOperations(ackOperation, OperationConsistencyLevel.FULL);
            }
            OperationContextImpl.setContext(previousContext);
        }
    }

    /**
     * Dispatches a delivery to the control-event handler or to message routing.
     *
     * @return {@code true} when the callee took ownership of {@code ackOperation}
     */
    private boolean applyDelivery(Message message, DeliveryAnnotations deliveryAnnotations, ACKMessageOperation ackOperation) throws Exception {
        if (message instanceof AMQPMessage) {
            final AMQPMessage amqpMessage = (AMQPMessage) message;
            final Object eventType = AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, AMQPMirrorControllerSource.EVENT_TYPE);
            if (eventType != null) {
                return applyControlEvent(amqpMessage, eventType, ackOperation);
            }
        }
        return sendMessage(message, deliveryAnnotations, ackOperation);
    }

    /**
     * Applies a control event. Unknown event types are ignored.
     *
     * @return {@code true} only for a post-ack that took ownership of {@code ackOperation}
     */
    private boolean applyControlEvent(AMQPMessage amqpMessage, Object eventType, ACKMessageOperation ackOperation) throws Exception {
        if (eventType.equals(AMQPMirrorControllerSource.ADD_ADDRESS)) {
            addAddress(parseAddress(amqpMessage));
        } else if (eventType.equals(AMQPMirrorControllerSource.DELETE_ADDRESS)) {
            deleteAddress(parseAddress(amqpMessage));
        } else if (eventType.equals(AMQPMirrorControllerSource.CREATE_QUEUE)) {
            createQueue(parseQueue(amqpMessage));
        } else if (eventType.equals(AMQPMirrorControllerSource.DELETE_QUEUE)) {
            final String address = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, AMQPMirrorControllerSource.ADDRESS);
            final String queue = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, AMQPMirrorControllerSource.QUEUE);
            deleteQueue(SimpleString.of(address), SimpleString.of(queue));
        } else if (eventType.equals(AMQPMirrorControllerSource.POST_ACK)) {
            return applyPostAck(amqpMessage, ackOperation);
        }
        return false;
    }

    /** Extracts queue, node id, message id and reason from a post-ack event and applies the ack. */
    private boolean applyPostAck(AMQPMessage amqpMessage, ACKMessageOperation ackOperation) throws Exception {
        String nodeId = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, AMQPMirrorControllerSource.BROKER_ID);
        logger.trace("ACK towards NodeID = {}, while the localNodeID={}", nodeId, this.server.getNodeID());
        final AckReason ackReason = AMQPMessageBrokerAccessor.getMessageAnnotationAckReason(amqpMessage);
        if (nodeId == null) {
            // No explicit broker id on the event: fall back to the id of the remote mirror.
            nodeId = getRemoteMirrorId();
            logger.trace("Replacing nodeID by {}", nodeId);
        }
        final String queueName = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, AMQPMirrorControllerSource.QUEUE);
        final long messageId = ((Long) amqpMessage.getBody().getValue()).longValue();
        return postAcknowledge(queueName, nodeId, messageId, ackOperation, ackReason);
    }

    /** Marks the receiver initialized, configures the settle modes and grants credit. */
    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable
    public void initialize() throws Exception {
        this.initialized = true;
        this.receiver.setSenderSettleMode(this.receiver.getRemoteSenderSettleMode());
        this.receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
        flow();
    }

    /**
     * Picks the tunneled-core readers for the two tunneled message formats and defers to the
     * superclass for every other format. Readers are created on first use and cached.
     */
    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver
    protected MessageReader trySelectMessageReader(Receiver receiver, Delivery delivery) {
        if (delivery.getMessageFormat() == TUNNELED_CORE_MESSAGE_FORMAT) {
            if (this.coreMessageReader == null) {
                this.coreMessageReader = new AMQPTunneledCoreMessageReader(this);
            }
            return this.coreMessageReader;
        }
        if (delivery.getMessageFormat() == TUNNELED_CORE_LARGE_MESSAGE_FORMAT) {
            if (this.coreLargeMessageReader == null) {
                this.coreLargeMessageReader = new AMQPTunneledCoreLargeMessageReader(this);
            }
            return this.coreLargeMessageReader;
        }
        return super.trySelectMessageReader(receiver, delivery);
    }

    /** Builds a queue configuration from the JSON string carried in the message body. */
    private QueueConfiguration parseQueue(AMQPMessage amqpMessage) {
        return QueueConfiguration.fromJSON((String) amqpMessage.getBody().getValue());
    }

    /** Builds an address info from the JSON string carried in the message body. */
    private AddressInfo parseAddress(AMQPMessage amqpMessage) {
        return AddressInfo.fromJSON((String) amqpMessage.getBody().getValue());
    }

    /** Intentionally empty on the target side. */
    public void preAcknowledge(Transaction transaction, MessageReference messageReference, AckReason ackReason) throws Exception {
    }

    /** Adds the mirrored address to the local server. */
    public void addAddress(AddressInfo addressInfo) throws Exception {
        logger.debug("{} adding address {}", this.server, addressInfo);
        this.server.addAddressInfo(addressInfo);
    }

    /**
     * Removes the mirrored address from the local server. Best effort: an address that no longer
     * exists is logged at debug, any other failure at warn; neither is propagated.
     */
    public void deleteAddress(AddressInfo addressInfo) throws Exception {
        logger.debug("{} delete address {}", this.server, addressInfo);
        try {
            this.server.removeAddressInfo(addressInfo.getName(), (SecurityAuth) null, true);
        } catch (ActiveMQAddressDoesNotExistException alreadyGone) {
            logger.debug(alreadyGone.getMessage(), alreadyGone);
        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
        }
    }

    /**
     * Creates the mirrored queue on the local server. Best effort: any failure is treated as
     * "queue already exists" and logged at debug only.
     */
    public void createQueue(QueueConfiguration queueConfiguration) throws Exception {
        logger.debug("{} adding queue {}", this.server, queueConfiguration);
        try {
            this.server.createQueue(queueConfiguration, true);
        } catch (Exception e) {
            logger.debug("Queue could not be created, already existed {}", queueConfiguration, e);
        }
    }

    /**
     * Destroys the mirrored queue on the local server; a queue that is already gone is logged at
     * debug and otherwise ignored.
     *
     * @param addressName address of the queue, used for logging only
     * @param queueName name of the queue to destroy
     */
    public void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug("{} destroy queue {} on address = {} server {}", this.server, queueName, addressName, this.server.getIdentity());
        }
        try {
            this.server.destroyQueue(queueName, (SecurityAuth) null, false, true, false, false);
        } catch (ActiveMQNonExistentQueueException alreadyGone) {
            if (logger.isDebugEnabled()) {
                logger.debug("{} queue {} was previously removed", this.server, queueName, alreadyGone);
            }
        }
    }

    /**
     * Applies a mirrored acknowledgement to the named queue.
     *
     * @param queueName name of the queue holding the message
     * @param nodeId id of the broker the message id belongs to
     * @param messageId id of the acknowledged message on that broker
     * @param ackOperation operation that settles the delivery once the ack has completed
     * @param ackReason reason carried by the ack event
     * @return {@code true} when the ack was handed to the ack manager, which then owns
     *         {@code ackOperation}; {@code false} when the queue does not exist locally
     */
    public boolean postAcknowledge(String queueName, String nodeId, long messageId, ACKMessageOperation ackOperation, AckReason ackReason) throws Exception {
        final Queue targetQueue = this.server.locateQueue(queueName);
        if (targetQueue == null) {
            logger.warn("Queue {} not found on mirror target, ignoring ack for queue={}, messageID={}, nodeID={}", queueName, queueName, messageId, nodeId);
            return false;
        }
        if (logger.isDebugEnabled() && targetQueue.getConsumerCount() > 0) {
            logger.debug("server {}, queue {} has consumers while delivering ack for {}", this.server.getIdentity(), targetQueue.getName(), messageId);
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Server {} with queue = {} being acked for {} from {} targetQueue = {} reason = {}", this.server.getIdentity(), queueName, messageId, ackOperation, targetQueue, ackReason);
        }
        performAck(nodeId, targetQueue, messageId, ackOperation, ackReason);
        return true;
    }

    /** Hands the ack to the lazily created ack manager and schedules the delivery settlement. */
    private void performAck(String nodeId, Queue targetQueue, long messageId, ACKMessageOperation ackOperation, AckReason ackReason) {
        if (logger.isTraceEnabled()) {
            logger.trace("performAck (nodeID={}, messageID={}), targetQueue={})", nodeId, messageId, targetQueue.getName());
        }
        if (this.ackManager == null) {
            this.ackManager = AckManagerProvider.getManager(this.server);
            this.ackManager.registerMirror(this);
        }
        this.ackManager.ack(nodeId, targetQueue, messageId, ackReason, true);
        OperationContextImpl.getContext().executeOnCompletion(ackOperation, OperationConsistencyLevel.FULL);
    }

    /**
     * Routes a mirrored message into the local server inside a mirror transaction.
     *
     * <p>Duplicate detection is keyed on the {@code INTERNAL_ID} delivery annotation within a
     * cache per source broker. A message without that annotation cannot be de-duplicated: it is
     * routed with internal id {@code 0} and bypasses the cache, instead of failing on the missing
     * annotation.
     *
     * @return {@code true} when the message was routed and the ack operation has been scheduled;
     *         {@code false} when the message is a duplicate and the caller still owns the ack
     */
    private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotations, ACKMessageOperation ackOperation) throws Exception {
        if (message.getMessageID() <= 0) {
            message.setMessageID(this.server.getStorageManager().generateID());
        }

        String brokerId = (String) deliveryAnnotations.getValue().get(AMQPMirrorControllerSource.BROKER_ID);
        if (brokerId == null) {
            brokerId = getRemoteMirrorId();
        }
        final Long internalIdAnnotation = (Long) deliveryAnnotations.getValue().get(AMQPMirrorControllerSource.INTERNAL_ID);
        final String internalAddress = (String) deliveryAnnotations.getValue().get(AMQPMirrorControllerSource.INTERNAL_DESTINATION);
        @SuppressWarnings("unchecked") // annotation values are untyped
        final Collection<String> targetQueues = (Collection<String>) deliveryAnnotations.getValue().get(AMQPMirrorControllerSource.TARGET_QUEUES);
        final long internalId = internalIdAnnotation != null ? internalIdAnnotation.longValue() : 0L;

        if (logger.isTraceEnabled()) {
            logger.trace("sendMessage on server {} for message {} with internalID = {} mirror id {}", this.server, message, internalIdAnnotation, brokerId);
        }

        this.routingContext.setDuplicateDetection(false);
        final DuplicateIDCache duplicateIDCache = duplicateIDCacheFor(brokerId);

        // Without an internal id there is nothing to key the cache on; never dereference the
        // missing annotation.
        final byte[] duplicateIdBytes = internalIdAnnotation != null ? ByteUtil.longToBytes(internalId) : null;
        if (duplicateIdBytes == null) {
            logger.debug("Mirrored message {} carries no internal ID annotation, duplicate detection is skipped", message);
        } else if (duplicateIDCache.contains(duplicateIdBytes)) {
            message.usageDown();
            flow();
            return false;
        }

        message.setBrokerProperty(AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY, Long.valueOf(internalId));
        message.setBrokerProperty(AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY, brokerId);
        if (internalAddress != null) {
            message.setAddress(internalAddress);
        }

        final TransactionImpl mirrorTransaction = new MirrorTransaction(this.server.getStorageManager()).setAllowPageTransaction(this.configuration.isMirrorPageTransaction()).setAsync(true);
        this.routingContext.setTransaction(mirrorTransaction);
        if (duplicateIdBytes != null) {
            // Added under the same transaction as the routing.
            duplicateIDCache.addToCache(duplicateIdBytes, mirrorTransaction);
        }
        this.routingContext.clear().setMirrorSource(this).setLoadBalancingType(MessageLoadBalancingType.LOCAL_ONLY).disableDivert();

        if (targetQueues != null) {
            targetQueuesRouting(message, this.routingContext, targetQueues);
            this.server.getPostOffice().processRoute(message, this.routingContext, false);
        } else {
            this.server.getPostOffice().route(message, this.routingContext, false);
        }

        mirrorTransaction.commit();
        this.server.getStorageManager().afterCompleteOperations(ackOperation, OperationConsistencyLevel.FULL);
        flow();
        return true;
    }

    /**
     * Returns the duplicate-ID cache for the given source broker, reusing the cached instance
     * while consecutive messages come from the same broker.
     */
    private DuplicateIDCache duplicateIDCacheFor(String brokerId) {
        // A null key always forces a fresh lookup, so the cached instance is never returned
        // before it has been initialized.
        if (this.lruDuplicateIDKey == null || !this.lruDuplicateIDKey.equals(brokerId)) {
            logger.trace("Setting up duplicate detection cache on {}, ServerID={} with {} elements, being the number of credits", ProtonProtocolManager.MIRROR_ADDRESS, brokerId, this.connection.getAmqpCredits());
            this.lruDuplicateIDKey = brokerId;
            this.lruduplicateIDCache = this.server.getPostOffice().getDuplicateIDCache(SimpleString.of("$ACTIVEMQ_ARTEMIS_MIRROR_" + brokerId), this.connection.getAmqpCredits());
        }
        return this.lruduplicateIDCache;
    }

    /**
     * Routes the message to each named queue that has a binding on the message's address.
     * Unknown queue names are skipped; a failure on one binding is logged and does not stop the
     * remaining ones.
     */
    private void targetQueuesRouting(Message message, RoutingContext routingContext, Collection<String> targetQueues) throws Exception {
        final Bindings bindings = this.server.getPostOffice().getBindingsForAddress(message.getAddressSimpleString());
        for (String queueName : targetQueues) {
            final Binding binding = bindings.getBinding(queueName);
            if (binding == null) {
                continue;
            }
            try {
                binding.route(message, routingContext);
            } catch (Exception e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }

    /** Intentionally empty on the target side. */
    public void postAcknowledge(MessageReference messageReference, AckReason ackReason) {
    }

    /** Intentionally empty on the target side. */
    public void sendMessage(Transaction transaction, Message message, RoutingContext routingContext) {
    }
}
