package org.apache.activemq.artemis.protocol.amqp.connect.mirror;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.jboss.logging.Logger;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.class */
public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController {
    final ActiveMQServer server;
    final RoutingContextImpl routingContext;
    Map<SimpleString, Map<SimpleString, QueueConfiguration>> scanAddresses;
    public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(AMQPMirrorControllerSource.INTERNAL_ID.toString());
    private static final Logger logger = Logger.getLogger(AMQPMirrorControllerTarget.class);
    private static ToLongFunction<MessageReference> referenceIDSupplier = messageReference -> {
        Long l = (Long) messageReference.getMessage().getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
        if (l == null) {
            return -1L;
        }
        return l.longValue();
    };

    public AMQPMirrorControllerTarget(AMQPSessionCallback aMQPSessionCallback, AMQPConnectionContext aMQPConnectionContext, AMQPSessionContext aMQPSessionContext, Receiver receiver, ActiveMQServer activeMQServer) {
        super(aMQPSessionCallback, aMQPConnectionContext, aMQPSessionContext, receiver);
        this.routingContext = new RoutingContextImpl((Transaction) null);
        this.server = activeMQServer;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver
    public void flow() {
        this.creditRunnable.run();
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver
    protected void actualDelivery(AMQPMessage aMQPMessage, Delivery delivery, Receiver receiver, Transaction transaction) {
        incrementSettle();
        if (logger.isDebugEnabled()) {
            logger.debug(this.server.getIdentity() + "::Received " + aMQPMessage);
        }
        try {
            try {
                Object messageAnnotationProperty = AMQPMessageBrokerAccessor.getMessageAnnotationProperty(aMQPMessage, AMQPMirrorControllerSource.EVENT_TYPE);
                if (messageAnnotationProperty == null) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Sending message " + aMQPMessage);
                    }
                    sendMessage(aMQPMessage);
                } else if (messageAnnotationProperty.equals(AMQPMirrorControllerSource.ADDRESS_SCAN_START)) {
                    logger.debug("Starting scan for removed queues");
                    startAddressScan();
                } else if (messageAnnotationProperty.equals(AMQPMirrorControllerSource.ADDRESS_SCAN_END)) {
                    logger.debug("Ending scan for removed queues");
                    endAddressScan();
                } else if (messageAnnotationProperty.equals(AMQPMirrorControllerSource.ADD_ADDRESS)) {
                    AddressInfo parseAddress = parseAddress(aMQPMessage);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Adding Address " + parseAddress);
                    }
                    addAddress(parseAddress);
                } else if (messageAnnotationProperty.equals(AMQPMirrorControllerSource.DELETE_ADDRESS)) {
                    AddressInfo parseAddress2 = parseAddress(aMQPMessage);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Removing Address " + parseAddress2);
                    }
                    deleteAddress(parseAddress2);
                } else if (messageAnnotationProperty.equals(AMQPMirrorControllerSource.CREATE_QUEUE)) {
                    QueueConfiguration parseQueue = parseQueue(aMQPMessage);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Creating queue " + parseQueue);
                    }
                    createQueue(parseQueue);
                } else if (messageAnnotationProperty.equals(AMQPMirrorControllerSource.DELETE_QUEUE)) {
                    String str = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(aMQPMessage, AMQPMirrorControllerSource.ADDRESS);
                    String str2 = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(aMQPMessage, AMQPMirrorControllerSource.QUEUE);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Deleting queue " + str2 + " on address " + str);
                    }
                    deleteQueue(SimpleString.toSimpleString(str), SimpleString.toSimpleString(str2));
                } else if (messageAnnotationProperty.equals(AMQPMirrorControllerSource.POST_ACK)) {
                    String str3 = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(aMQPMessage, AMQPMirrorControllerSource.ADDRESS);
                    String str4 = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(aMQPMessage, AMQPMirrorControllerSource.QUEUE);
                    Long l = (Long) aMQPMessage.getBody().getValue();
                    if (logger.isDebugEnabled()) {
                        logger.debug("Post ack address=" + str3 + " queueName = " + str4 + " messageID=" + l);
                    }
                    postAcknowledge(str3, str4, l.longValue());
                }
                delivery.disposition(Accepted.getInstance());
                settle(delivery);
                this.connection.flush();
            } catch (Throwable th) {
                logger.warn(th.getMessage(), th);
                delivery.disposition(Accepted.getInstance());
                settle(delivery);
                this.connection.flush();
            }
        } catch (Throwable th2) {
            delivery.disposition(Accepted.getInstance());
            settle(delivery);
            this.connection.flush();
            throw th2;
        }
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable
    public void initialize() throws Exception {
        super.initialize();
        this.receiver.getRemoteTarget();
        this.receiver.setSenderSettleMode(this.receiver.getRemoteSenderSettleMode());
        this.receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
        flow();
    }

    private QueueConfiguration parseQueue(AMQPMessage aMQPMessage) throws Exception {
        return QueueConfiguration.fromJSON((String) aMQPMessage.getBody().getValue());
    }

    private AddressInfo parseAddress(AMQPMessage aMQPMessage) throws Exception {
        return AddressInfo.fromJSON((String) aMQPMessage.getBody().getValue());
    }

    public void startAddressScan() throws Exception {
        this.scanAddresses = new HashMap();
    }

    public void endAddressScan() throws Exception {
        Map<SimpleString, Map<SimpleString, QueueConfiguration>> map = this.scanAddresses;
        this.scanAddresses = null;
        this.server.getPostOffice().getAllBindings().forEach(binding -> {
            if (binding instanceof LocalQueueBinding) {
                LocalQueueBinding localQueueBinding = (LocalQueueBinding) binding;
                Map map2 = (Map) map.get(localQueueBinding.getQueue().getAddress());
                if (map2 == null) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("There's no address " + localQueueBinding.getQueue().getAddress() + " so, removing queue");
                    }
                    try {
                        deleteQueue(localQueueBinding.getQueue().getAddress(), localQueueBinding.getQueue().getName());
                        return;
                    } catch (Exception e) {
                        logger.warn(e.getMessage(), e);
                        return;
                    }
                }
                if (((QueueConfiguration) map2.get(localQueueBinding.getQueue().getName())) == null) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("There no queue for " + localQueueBinding.getQueue().getName() + " so, removing queue");
                    }
                    try {
                        deleteQueue(localQueueBinding.getQueue().getAddress(), localQueueBinding.getQueue().getName());
                    } catch (Exception e2) {
                        logger.warn(e2.getMessage(), e2);
                    }
                }
            }
        });
    }

    private Map<SimpleString, QueueConfiguration> getQueueScanMap(SimpleString simpleString) {
        Map<SimpleString, QueueConfiguration> map = this.scanAddresses.get(simpleString);
        if (map == null) {
            map = new HashMap();
            this.scanAddresses.put(simpleString, map);
        }
        return map;
    }

    public void addAddress(AddressInfo addressInfo) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug("Adding address " + addressInfo);
        }
        this.server.addAddressInfo(addressInfo);
    }

    public void deleteAddress(AddressInfo addressInfo) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug("delete address " + addressInfo);
        }
        try {
            this.server.removeAddressInfo(addressInfo.getName(), (SecurityAuth) null, true);
        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
        } catch (ActiveMQAddressDoesNotExistException e2) {
            logger.debug(e2.getMessage(), e2);
        }
    }

    public void createQueue(QueueConfiguration queueConfiguration) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug("Adding queue " + queueConfiguration);
        }
        this.server.createQueue(queueConfiguration, true);
        if (this.scanAddresses != null) {
            getQueueScanMap(queueConfiguration.getAddress()).put(queueConfiguration.getName(), queueConfiguration);
        }
    }

    public void deleteQueue(SimpleString simpleString, SimpleString simpleString2) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug("destroy queue " + simpleString2 + " on address = " + simpleString);
        }
        try {
            this.server.destroyQueue(simpleString2);
        } catch (ActiveMQNonExistentQueueException e) {
            logger.debug("queue " + simpleString2 + " was previously removed", e);
        }
    }

    public void postAcknowledge(String str, String str2, long j) {
        if (logger.isDebugEnabled()) {
            logger.debug("post acking " + str + ", queue = " + str2 + ", messageID = " + j);
        }
        Queue locateQueue = this.server.locateQueue(str2);
        if (locateQueue != null) {
            MessageReference removeWithSuppliedID = locateQueue.removeWithSuppliedID(j, referenceIDSupplier);
            if (removeWithSuppliedID == null) {
                if (logger.isTraceEnabled()) {
                    logger.trace("There is no reference to ack on " + j);
                }
            } else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Acking reference " + removeWithSuppliedID);
                }
                try {
                    locateQueue.acknowledge(removeWithSuppliedID);
                } catch (Exception e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }

    private void sendMessage(AMQPMessage aMQPMessage) throws Exception {
        if (aMQPMessage.getMessageID() <= 0) {
            aMQPMessage.setMessageID(this.server.getStorageManager().generateID());
        }
        Long l = (Long) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(aMQPMessage, AMQPMirrorControllerSource.INTERNAL_ID);
        String str = (String) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(aMQPMessage, AMQPMirrorControllerSource.INTERNAL_DESTINATION);
        if (l != null) {
            aMQPMessage.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, l);
        }
        if (str != null) {
            aMQPMessage.m5setAddress(str);
        }
        this.routingContext.clear();
        this.server.getPostOffice().route(aMQPMessage, this.routingContext, false);
        flow();
    }

    public void postAcknowledge(MessageReference messageReference, AckReason ackReason) {
    }

    public void sendMessage(Message message, RoutingContext routingContext, List<MessageReference> list) {
    }
}
