package org.apache.activemq.broker.region.virtual;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.ResourceAllocationException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message;
import org.apache.activemq.util.LRUCache;

/* loaded from: input_file:activemq-broker-5.11.0.redhat-630514.jar:org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.class */
/**
 * Destination filter placed in front of a topic that, in addition to passing each message on to
 * the wrapped destination, forwards a copy of the message to every destination the broker returns
 * for the queue name {@code prefix + <topic physical name> + postfix}.
 *
 * <p>All behaviour switches ({@code local}, {@code concurrentSend}, {@code transactedSend},
 * {@code dropMessageOnResourceLimit}, {@code setOriginalDestination}) are read once from the
 * {@link VirtualTopic} passed to the constructor.
 */
public class VirtualTopicInterceptor extends DestinationFilter {
    private final String prefix;
    private final String postfix;
    private final boolean local;
    private final boolean concurrentSend;
    private final boolean transactedSend;
    private final boolean dropMessageOnResourceLimit;
    private final boolean setOriginalDestination;
    /** Memoizes the queue destination derived for each incoming destination; guarded by itself. */
    private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache;

    /**
     * Creates an interceptor around {@code destination} configured from {@code virtualTopic}.
     *
     * @param destination  the destination handed to the {@link DestinationFilter} superclass
     * @param virtualTopic source of the prefix, postfix and behaviour flags
     */
    public VirtualTopicInterceptor(Destination destination, VirtualTopic virtualTopic) {
        super(destination);
        this.cache = new LRUCache<>();
        this.prefix = virtualTopic.getPrefix();
        this.postfix = virtualTopic.getPostfix();
        this.local = virtualTopic.isLocal();
        this.concurrentSend = virtualTopic.isConcurrentSend();
        this.transactedSend = virtualTopic.isTransactedSend();
        this.dropMessageOnResourceLimit = virtualTopic.isDropOnResourceLimit();
        this.setOriginalDestination = virtualTopic.isSetOriginalDestination();
    }

    /**
     * Returns the {@code next} destination inherited from {@link DestinationFilter}, cast to
     * {@link Topic}.
     *
     * @return the wrapped topic
     * @throws ClassCastException if {@code next} is not a {@link Topic}
     */
    public Topic getTopic() {
        return (Topic) this.next;
    }

    /**
     * Fans the message out to the derived queue destinations and then delegates to the superclass.
     *
     * <p>The fan-out is skipped for advisory messages, and, when {@code local} is set, for
     * messages whose broker path is non-null. The superclass {@code send} runs in either case
     * unless the fan-out throws.
     *
     * @param producerBrokerExchange exchange of the producing connection
     * @param message                the message being sent
     * @throws Exception whatever the fan-out or the superclass send throws
     */
    @Override
    public void send(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception {
        boolean eligibleForFanOut = !this.local || message.getBrokerPath() == null;
        if (!message.isAdvisory() && eligibleForFanOut) {
            send(producerBrokerExchange, message, getQueueConsumersWildcard(message.getDestination()));
        }
        super.send(producerBrokerExchange, message);
    }

    /**
     * Sends a copy of {@code message} to every destination the broker resolves for
     * {@code activeMQDestination} that passes {@link #shouldDispatch}.
     *
     * <p>Destinations are served one after another unless {@code concurrentSend} is enabled and
     * more than one destination matched, in which case each send is handed to the broker
     * service's task runner factory and this method blocks until all of them have finished.
     * A local transaction may be opened around the fan-out (see {@code beginLocalTransaction});
     * it is committed in a {@code finally} block, i.e. also when a send failed.
     *
     * @param producerBrokerExchange exchange of the producing connection
     * @param message                the message to copy and forward
     * @param activeMQDestination    destination used to look up the fan-out targets
     * @throws Exception a send failure, a failure while committing the local transaction, or an
     *                   {@link InterruptedException} while waiting for concurrent sends
     */
    @Override
    protected void send(final ProducerBrokerExchange producerBrokerExchange, final Message message, ActiveMQDestination activeMQDestination) throws Exception {
        Broker broker = producerBrokerExchange.getConnectionContext().getBroker();
        Set<Destination> destinations = broker.getDestinations(activeMQDestination);
        int destinationCount = destinations.size();
        LocalTransactionId localTransaction =
                beginLocalTransaction(destinationCount, producerBrokerExchange.getConnectionContext(), message);
        try {
            if (this.concurrentSend && destinationCount > 1) {
                sendConcurrently(broker, producerBrokerExchange, message, destinations);
            } else {
                sendSequentially(broker, producerBrokerExchange, message, destinations);
            }
        } finally {
            commit(localTransaction, producerBrokerExchange.getConnectionContext(), message);
        }
    }

    /** Sends a copy to each eligible destination in iteration order on the calling thread. */
    private void sendSequentially(Broker broker, ProducerBrokerExchange producerBrokerExchange,
            Message message, Set<Destination> destinations) throws Exception {
        for (Destination destination : destinations) {
            if (!shouldDispatch(broker, message, destination)) {
                continue;
            }
            try {
                destination.send(producerBrokerExchange, copy(message, destination.getActiveMQDestination()));
            } catch (ResourceAllocationException e) {
                // With dropMessageOnResourceLimit the copy for this destination is silently
                // dropped and the remaining destinations are still served.
                if (!this.dropMessageOnResourceLimit) {
                    throw e;
                }
            }
        }
    }

    /** Submits one send task per eligible destination and waits for all of them to complete. */
    private void sendConcurrently(Broker broker, final ProducerBrokerExchange producerBrokerExchange,
            final Message message, Set<Destination> destinations) throws Exception {
        // One count per destination: released by the task, or immediately when not dispatched.
        final CountDownLatch pending = new CountDownLatch(destinations.size());
        final AtomicReference<Exception> firstFailure = new AtomicReference<>();
        BrokerService brokerService = broker.getBrokerService();
        for (final Destination destination : destinations) {
            if (!shouldDispatch(broker, message, destination)) {
                pending.countDown();
                continue;
            }
            brokerService.getTaskRunnerFactory().execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Skip the send once any other task has reported a failure.
                        if (firstFailure.get() == null) {
                            destination.send(producerBrokerExchange, copy(message, destination.getActiveMQDestination()));
                        }
                    } catch (ResourceAllocationException e) {
                        if (!dropMessageOnResourceLimit) {
                            // compareAndSet keeps the first recorded failure instead of letting
                            // a later task overwrite it.
                            firstFailure.compareAndSet(null, e);
                        }
                    } catch (Exception e) {
                        firstFailure.compareAndSet(null, e);
                    } finally {
                        // Always release the latch, including when an Error propagates,
                        // so the waiting sender cannot hang.
                        pending.countDown();
                    }
                }
            });
        }
        pending.await();
        Exception failure = firstFailure.get();
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Creates the per-destination copy of a message.
     *
     * <p>When {@code setOriginalDestination} is enabled the copy is re-addressed to
     * {@code activeMQDestination} and its original destination is set to the destination of
     * {@code message}; otherwise the copy is returned unmodified.
     *
     * @param message             the message to copy
     * @param activeMQDestination the destination the copy is about to be sent to
     * @return a copy of {@code message}
     */
    public Message copy(Message message, ActiveMQDestination activeMQDestination) {
        Message copy = message.copy();
        if (this.setOriginalDestination) {
            copy.setDestination(activeMQDestination);
            copy.setOriginalDestination(message.getDestination());
        }
        return copy;
    }

    /**
     * Opens a broker-local transaction around the fan-out when all of the following hold:
     * {@code transactedSend} is enabled, more than one destination matched, the message is
     * persistent, and the message does not already carry a transaction id.
     *
     * <p>The transaction id is built from the message's producer id and producer sequence id.
     * On success the transaction is installed on {@code connectionContext} and its id is set on
     * {@code message}.
     * NOTE(review): presumably this lets the whole fan-out be committed as one unit - confirm.
     *
     * @param i                 number of destinations the message will be forwarded to
     * @param connectionContext context of the producing connection
     * @param message           the message being forwarded
     * @return the id of the transaction that was started, or {@code null} if none was started
     * @throws Exception if the broker fails to begin the transaction
     */
    private LocalTransactionId beginLocalTransaction(int i, ConnectionContext connectionContext, Message message) throws Exception {
        boolean wrapInTransaction =
                this.transactedSend && i > 1 && message.isPersistent() && message.getTransactionId() == null;
        if (!wrapInTransaction) {
            return null;
        }
        ConnectionId connectionId = new ConnectionId(message.getMessageId().getProducerId().toString());
        LocalTransactionId localTransactionId =
                new LocalTransactionId(connectionId, message.getMessageId().getProducerSequenceId());
        connectionContext.getBroker().beginTransaction(connectionContext, localTransactionId);
        connectionContext.setTransaction(connectionContext.getTransactions().get(localTransactionId));
        message.setTransactionId(localTransactionId);
        return localTransactionId;
    }

    /**
     * Commits the transaction opened by {@code beginLocalTransaction}, if any, and detaches it
     * from the connection context and the message. Does nothing when
     * {@code localTransactionId} is {@code null}.
     *
     * @param localTransactionId id returned by {@code beginLocalTransaction}; may be {@code null}
     * @param connectionContext  context of the producing connection
     * @param message            the message whose transaction id is cleared
     * @throws Exception if the broker fails to commit; in that case the context and message
     *                   are left untouched
     */
    private void commit(LocalTransactionId localTransactionId, ConnectionContext connectionContext, Message message) throws Exception {
        if (localTransactionId == null) {
            return;
        }
        connectionContext.getBroker().commitTransaction(connectionContext, localTransactionId, true);
        connectionContext.getTransactions().remove(localTransactionId);
        connectionContext.setTransaction(null);
        message.setTransactionId(null);
    }

    /**
     * Hook deciding whether {@code message} is forwarded to {@code destination}. This
     * implementation accepts every destination; subclasses may override it to filter.
     *
     * @param broker      the broker handling the send
     * @param message     the message being forwarded
     * @param destination a candidate fan-out destination
     * @return {@code true} to forward a copy to {@code destination}
     * @throws IOException declared for overriding implementations; never thrown here
     */
    protected boolean shouldDispatch(Broker broker, Message message, Destination destination) throws IOException {
        return true;
    }

    /**
     * Returns the queue destination named {@code prefix + physicalName + postfix} for the given
     * destination, creating and caching it on first use.
     *
     * @param activeMQDestination the destination the message was sent to
     * @return the cached or newly created queue destination
     */
    protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination activeMQDestination) {
        // Lookup and insert happen under one lock so concurrent senders share a single instance.
        synchronized (this.cache) {
            ActiveMQQueue queue = this.cache.get(activeMQDestination);
            if (queue == null) {
                queue = new ActiveMQQueue(this.prefix + activeMQDestination.getPhysicalName() + this.postfix);
                this.cache.put(activeMQDestination, queue);
            }
            return queue;
        }
    }
}
