package org.apache.activemq.broker.region;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
import java.io.IOException;
import java.util.Set;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:activemq-core-fuse-4.1.0.10.jar:org/apache/activemq/broker/region/Topic.class */
public class Topic implements Destination {
    /** Class logger; assigned once in the static initializer at the bottom of this class. */
    private static final Log log;
    /** Destination this topic represents; set in the constructor. */
    protected final ActiveMQDestination destination;
    /** Backing message store; may be {@code null}, and methods in this class check for that before using it. */
    protected final TopicMessageStore store;
    /** Usage manager created in the constructor from the one passed in. */
    protected final UsageManager usageManager;
    /** When {@code true}, onMessageWithNoConsumers forwards non-persistent messages to an advisory topic. */
    private boolean sendAdvisoryIfNoConsumers;
    /** Cache slot for this class's {@code Class} object; populated by the static initializer. */
    static Class class$org$apache$activemq$broker$region$Topic;
    /** Subscriptions currently receiving dispatched messages; mutations are done under {@code synchronized (consumers)}. */
    protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
    /** Turned off while a subscription is being added/recovered and incremented/decremented around each dispatch. */
    protected final Valve dispatchValve = new Valve(true);
    /** Per-topic statistics; parented to the statistics object supplied to the constructor. */
    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
    /** Policy that delivers a message to the current consumers (see dispatch). */
    private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
    /** Policy consulted on every dispatch and used to replay messages to retroactive subscriptions. */
    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new FixedCountSubscriptionRecoveryPolicy();
    /** Strategy returned by getDeadLetterStrategy(); not otherwise used in this class. */
    private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
    /** Durable subscriptions keyed by their SubscriptionKey (field name keeps the original spelling). */
    private final ConcurrentHashMap durableSubcribers = new ConcurrentHashMap();

    /**
     * Creates a topic bound to the given destination and optional store.
     *
     * @param activeMQDestination   destination this topic represents
     * @param topicMessageStore     backing store, or {@code null} for none
     * @param usageManager          usage manager handed to the {@code UsageManager} constructor to build this topic's own manager
     * @param destinationStatistics statistics object that becomes the parent of this topic's statistics
     * @param taskRunnerFactory     not used by this constructor
     */
    public Topic(ActiveMQDestination activeMQDestination, TopicMessageStore topicMessageStore, UsageManager usageManager, DestinationStatistics destinationStatistics, TaskRunnerFactory taskRunnerFactory) {
        // Build the topic-local usage manager first, with its limit set to Long.MAX_VALUE.
        UsageManager topicUsage = new UsageManager(usageManager);
        topicUsage.setLimit(Long.MAX_VALUE);

        this.destination = activeMQDestination;
        this.store = topicMessageStore;
        this.usageManager = topicUsage;

        if (this.store != null) {
            this.store.setUsageManager(topicUsage);
        }
        this.destinationStatistics.setParent(destinationStatistics);
    }

    /**
     * Always grants the lock; this implementation performs no per-message locking.
     *
     * @param messageReference message the caller wants to lock (ignored)
     * @param lockOwner        prospective owner (ignored)
     * @return {@code true} unconditionally
     */
    @Override // org.apache.activemq.broker.region.Destination
    public boolean lock(MessageReference messageReference, LockOwner lockOwner) {
        return true;
    }

    /**
     * Registers a subscription with this topic.
     *
     * <p>Durable subscriptions are only recorded in the durable-subscriber map and return early.
     * Non-durable subscriptions are counted in the consumer statistics and added to the consumer list;
     * retroactive ones are additionally handed to the subscription recovery policy while the dispatch
     * valve is turned off.
     *
     * @param connectionContext context of the connection adding the subscription
     * @param subscription      subscription to register
     * @throws Exception if the subscription, the valve or the recovery policy fails
     */
    @Override // org.apache.activemq.broker.region.Destination
    public void addSubscription(ConnectionContext connectionContext, Subscription subscription) throws Exception {
        subscription.add(connectionContext, this);
        if (subscription.getConsumerInfo().isDurable()) {
            DurableTopicSubscription durableTopicSubscription = (DurableTopicSubscription) subscription;
            this.durableSubcribers.put(durableTopicSubscription.getSubscriptionKey(), durableTopicSubscription);
            return;
        }
        this.destinationStatistics.getConsumers().increment();
        if (!subscription.getConsumerInfo().isRetroactive()) {
            synchronized (this.consumers) {
                this.consumers.add(subscription);
            }
            return;
        }
        // Retroactive: keep the valve off while the subscription is added and recovered.
        this.dispatchValve.turnOff();
        try {
            synchronized (this.consumers) {
                this.consumers.add(subscription);
            }
            this.subscriptionRecoveryPolicy.recover(connectionContext, this, subscription);
        } finally {
            // Single turnOn on every exit path; the decompiled catch/rethrow form could invoke it twice.
            this.dispatchValve.turnOn();
        }
    }

    /**
     * Unregisters a subscription from this topic.
     *
     * <p>Non-durable subscriptions are removed from the consumer list and the consumer count is
     * decremented; in every case the subscription is then told it has been removed from this topic.
     *
     * @param connectionContext context of the connection removing the subscription
     * @param subscription      subscription to remove
     * @throws Exception if the subscription's own removal fails
     */
    @Override // org.apache.activemq.broker.region.Destination
    public void removeSubscription(ConnectionContext connectionContext, Subscription subscription) throws Exception {
        final boolean durable = subscription.getConsumerInfo().isDurable();
        if (!durable) {
            this.destinationStatistics.getConsumers().decrement();
            synchronized (this.consumers) {
                this.consumers.remove(subscription);
            }
        }
        subscription.remove(connectionContext, this);
    }

    /**
     * Deletes a durable subscription from the store and from the durable-subscriber map.
     * Does nothing when this topic has no store.
     *
     * @param connectionContext context of the requesting connection (not used here)
     * @param subscriptionKey   key identifying the subscription by client id and subscription name
     * @throws IOException if the store fails to delete the subscription
     */
    public void deleteSubscription(ConnectionContext connectionContext, SubscriptionKey subscriptionKey) throws IOException {
        if (this.store == null) {
            return;
        }
        String clientId = subscriptionKey.clientId;
        String subscriptionName = subscriptionKey.subscriptionName;
        this.store.deleteSubscription(clientId, subscriptionName);
        this.durableSubcribers.remove(subscriptionKey);
        this.destinationStatistics.getConsumers().decrement();
    }

    /**
     * Activates a durable subscription: adds it to the consumer list and, when a store is configured,
     * reconciles the stored subscription record and replays the stored messages into the subscription.
     *
     * <p>The dispatch valve is turned off for the whole operation and turned back on exactly once,
     * in the {@code finally} block.
     *
     * @param connectionContext        context of the activating connection
     * @param durableTopicSubscription subscription being activated
     * @throws Exception if the store, the valve or the recovery policy fails
     */
    public void activate(ConnectionContext connectionContext, final DurableTopicSubscription durableTopicSubscription) throws Exception {
        this.dispatchValve.turnOff();
        try {
            synchronized (this.consumers) {
                this.consumers.add(durableTopicSubscription);
            }
            if (this.store == null) {
                return;
            }
            String clientId = durableTopicSubscription.getClientId();
            String subscriptionName = durableTopicSubscription.getSubscriptionName();
            String selector = durableTopicSubscription.getConsumerInfo().getSelector();

            SubscriptionInfo storedInfo = this.store.lookupSubscription(clientId, subscriptionName);
            if (storedInfo != null) {
                String storedSelector = storedInfo.getSelector();
                // Null-safe inequality: the stored record is stale when the selectors differ.
                boolean selectorChanged = storedSelector == null ? selector != null : !storedSelector.equals(selector);
                if (selectorChanged) {
                    this.store.deleteSubscription(clientId, subscriptionName);
                    storedInfo = null;
                }
            }
            if (storedInfo == null) {
                this.store.addSubsciption(clientId, subscriptionName, selector, durableTopicSubscription.getConsumerInfo().isRetroactive());
            }

            final MessageEvaluationContext msgContext = new MessageEvaluationContext();
            msgContext.setDestination(this.destination);
            this.store.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
                @Override // org.apache.activemq.store.MessageRecoveryListener
                public void recoverMessage(Message message) throws Exception {
                    message.setRegionDestination(Topic.this);
                    try {
                        msgContext.setMessageReference(message);
                        if (durableTopicSubscription.matches(message, msgContext)) {
                            durableTopicSubscription.add(message);
                        }
                    } catch (IOException e) {
                        // Best effort: keep recovering the remaining messages, but record the failure.
                        log.error("Failed to recover message " + message.getMessageId() + " for durable subscription", e);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }

                @Override // org.apache.activemq.store.MessageRecoveryListener
                public void recoverMessageReference(String str) throws Exception {
                    throw new RuntimeException("Should not be called.");
                }

                @Override // org.apache.activemq.store.MessageRecoveryListener
                public void finished() {
                }
            });

            // Fall back to the recovery policy only when the store replay enqueued nothing.
            if (durableTopicSubscription.getConsumerInfo().isRetroactive() && durableTopicSubscription.getEnqueueCounter() == 0) {
                this.subscriptionRecoveryPolicy.recover(connectionContext, this, durableTopicSubscription);
            }
        } finally {
            this.dispatchValve.turnOn();
        }
    }

    /**
     * Deactivates a durable subscription: removes it from the consumer list and then notifies the
     * subscription that it has been removed from this topic. The durable-subscriber map is not touched.
     *
     * @param connectionContext        context of the deactivating connection
     * @param durableTopicSubscription subscription being deactivated
     * @throws Exception if the subscription's own removal fails
     */
    public void deactivate(ConnectionContext connectionContext, DurableTopicSubscription durableTopicSubscription) throws Exception {
        synchronized (this.consumers) {
            this.consumers.remove(durableTopicSubscription);
        }
        durableTopicSubscription.remove(connectionContext, this);
    }

    /**
     * Accepts a message from a producer.
     *
     * <p>Steps, in order: wait for memory space when producer flow control is enabled, bind the message
     * to this topic, persist it when a store exists, the message is persistent and there is at least one
     * durable subscriber, then either dispatch immediately or register a transaction synchronization.
     * The message's reference count is held for the duration of the dispatch/registration.
     *
     * <p>This body was reconstructed from the decompiler's instruction dump, which it could not
     * restructure into Java on its own.
     *
     * @param connectionContext context of the producing connection
     * @param message           message being sent
     * @throws IOException if the connection is stopping while waiting for memory space
     * @throws Exception   if the store, the transaction or dispatch fails
     */
    @Override // org.apache.activemq.broker.region.Destination
    public void send(final ConnectionContext connectionContext, final Message message) throws Exception {
        if (connectionContext.isProducerFlowControl()) {
            // Wait in slices of 1000 (presumably milliseconds) so a stopping connection can abort the wait.
            while (!this.usageManager.waitForSpace(1000L)) {
                if (connectionContext.getStopping().get()) {
                    throw new IOException("Connection closed, send aborted.");
                }
            }
        }

        message.setRegionDestination(this);

        if (this.store != null && message.isPersistent() && !canOptimizeOutPersistence()) {
            this.store.addMessage(connectionContext, message);
        }

        message.incrementReferenceCount();
        try {
            if (connectionContext.isInTransaction()) {
                // NOTE(review): the dump only shows `new Topic$2(this, context, message)`; the body of that
                // anonymous class is not visible here. Dispatching after commit is the presumed behavior -
                // confirm against Topic$2.class before relying on it.
                connectionContext.getTransaction().addSynchronization(new Synchronization() {
                    @Override // org.apache.activemq.transaction.Synchronization
                    public void afterCommit() throws Exception {
                        dispatch(connectionContext, message);
                    }
                });
            } else {
                dispatch(connectionContext, message);
            }
        } finally {
            message.decrementReferenceCount();
        }
    }

    /**
     * Reports whether persisting a message can be skipped.
     *
     * @return {@code true} when no durable subscribers are registered
     */
    private boolean canOptimizeOutPersistence() {
        return this.durableSubcribers.isEmpty();
    }

    /**
     * Describes this topic by its physical name and current consumer-list size.
     *
     * @return text of the form {@code Topic: destination=<name>, subscriptions=<count>}
     */
    public String toString() {
        String physicalName = this.destination.getPhysicalName();
        int subscriptionCount = this.consumers.size();
        return "Topic: destination=" + physicalName + ", subscriptions=" + subscriptionCount;
    }

    /**
     * Records an acknowledgement in the store for a persistent message.
     * Nothing happens when there is no store or the message is not persistent.
     *
     * @param connectionContext context of the acknowledging connection
     * @param subscription      acknowledging subscription; cast to {@code DurableTopicSubscription} when the store is updated
     * @param messageAck        acknowledgement command (not used here)
     * @param messageReference  message being acknowledged
     * @throws IOException if the store fails
     */
    @Override // org.apache.activemq.broker.region.Destination
    public void acknowledge(ConnectionContext connectionContext, Subscription subscription, MessageAck messageAck, MessageReference messageReference) throws IOException {
        boolean storeNeedsAck = this.store != null && messageReference.isPersistent();
        if (storeNeedsAck) {
            DurableTopicSubscription durable = (DurableTopicSubscription) subscription;
            String clientId = durable.getClientId();
            String subscriptionName = durable.getSubscriptionName();
            this.store.acknowledge(connectionContext, clientId, subscriptionName, messageReference.getMessageId());
        }
    }

    /**
     * Disposes of this topic: removes all messages from the store (when one is configured) and
     * detaches this topic's statistics from their parent.
     *
     * @param connectionContext context passed through to the store
     * @throws IOException if the store fails to remove its messages
     */
    @Override // org.apache.activemq.broker.region.Destination
    public void dispose(ConnectionContext connectionContext) throws IOException {
        if (this.store != null) {
            this.store.removeAllMessages(connectionContext);
        }
        this.destinationStatistics.setParent(null);
    }

    /** No-op in this implementation. */
    @Override // org.apache.activemq.broker.region.Destination
    public void gc() {
    }

    /**
     * Loads a message from the store by id.
     *
     * @param messageId id of the message to load
     * @return whatever the store returns for that id, or {@code null} when this topic has no store
     * @throws IOException if the store fails
     */
    @Override // org.apache.activemq.broker.region.Destination
    public Message loadMessage(MessageId messageId) throws IOException {
        return this.store == null ? null : this.store.getMessage(messageId);
    }

    /**
     * Starts this topic by starting its subscription recovery policy.
     *
     * @throws Exception if the policy fails to start
     */
    @Override // org.apache.activemq.Service
    public void start() throws Exception {
        this.subscriptionRecoveryPolicy.start();
    }

    /**
     * Stops this topic by stopping its subscription recovery policy.
     *
     * @throws Exception if the policy fails to stop
     */
    @Override // org.apache.activemq.Service
    public void stop() throws Exception {
        this.subscriptionRecoveryPolicy.stop();
    }

    /**
     * Collects the messages currently visible on this topic.
     *
     * <p>When a store is configured, its messages are gathered first and the messages offered by the
     * subscription recovery policy are added afterwards; duplicates collapse because a set is used.
     * Without a store the result is empty. Any failure is logged as a warning and whatever was
     * collected so far is returned.
     *
     * @return the collected messages, never {@code null}
     */
    @Override // org.apache.activemq.broker.region.Destination
    public Message[] browse() {
        final CopyOnWriteArraySet result = new CopyOnWriteArraySet();
        try {
            if (this.store != null) {
                this.store.recover(new MessageRecoveryListener() {
                    @Override // org.apache.activemq.store.MessageRecoveryListener
                    public void recoverMessage(Message message) throws Exception {
                        result.add(message);
                    }

                    @Override // org.apache.activemq.store.MessageRecoveryListener
                    public void recoverMessageReference(String str) throws Exception {
                    }

                    @Override // org.apache.activemq.store.MessageRecoveryListener
                    public void finished() {
                    }
                });
                Message[] policyMessages = this.subscriptionRecoveryPolicy.browse(getActiveMQDestination());
                if (policyMessages != null) {
                    for (Message message : policyMessages) {
                        result.add(message);
                    }
                }
            }
        } catch (Throwable th) {
            // Browsing is best effort: report the failure and return what was gathered.
            log.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), th);
        }
        return (Message[]) result.toArray(new Message[result.size()]);
    }

    /** @return the usage manager created for this topic in the constructor */
    @Override // org.apache.activemq.broker.region.Destination
    public UsageManager getUsageManager() {
        return this.usageManager;
    }

    /** @return this topic's statistics object */
    @Override // org.apache.activemq.broker.region.Destination
    public DestinationStatistics getDestinationStatistics() {
        return this.destinationStatistics;
    }

    /** @return the destination this topic was constructed with */
    @Override // org.apache.activemq.broker.region.Destination
    public ActiveMQDestination getActiveMQDestination() {
        return this.destination;
    }

    /** @return the physical name of this topic's destination */
    public String getDestination() {
        return this.destination.getPhysicalName();
    }

    /** @return the policy used by dispatch to deliver messages to consumers */
    public DispatchPolicy getDispatchPolicy() {
        return this.dispatchPolicy;
    }

    /** @param dispatchPolicy policy to use for subsequent dispatches; stored as-is without a null check */
    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
        this.dispatchPolicy = dispatchPolicy;
    }

    /** @return the current subscription recovery policy */
    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
        return this.subscriptionRecoveryPolicy;
    }

    /** @param subscriptionRecoveryPolicy replacement policy; stored as-is (this setter does not start or stop it) */
    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
        this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
    }

    /** @return whether non-persistent messages with no consumers are forwarded to an advisory topic */
    public boolean isSendAdvisoryIfNoConsumers() {
        return this.sendAdvisoryIfNoConsumers;
    }

    /** @param z {@code true} to enable the no-consumers advisory checked in onMessageWithNoConsumers */
    public void setSendAdvisoryIfNoConsumers(boolean z) {
        this.sendAdvisoryIfNoConsumers = z;
    }

    /** @return the backing store, or {@code null} when this topic has none */
    @Override // org.apache.activemq.broker.region.Destination
    public MessageStore getMessageStore() {
        return this.store;
    }

    /** @return the configured dead-letter strategy */
    @Override // org.apache.activemq.broker.region.Destination
    public DeadLetterStrategy getDeadLetterStrategy() {
        return this.deadLetterStrategy;
    }

    /** @param deadLetterStrategy replacement dead-letter strategy; stored as-is without a null check */
    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
        this.deadLetterStrategy = deadLetterStrategy;
    }

    /** @return the physical name of this topic's destination (same value as getDestination()) */
    @Override // org.apache.activemq.broker.region.Destination
    public String getName() {
        return getActiveMQDestination().getPhysicalName();
    }

    /**
     * Dispatches a message to the current consumers.
     *
     * <p>The enqueue counter and the dispatch valve are incremented up front. The message is first
     * offered to the subscription recovery policy; if the policy returns {@code false} nothing else
     * happens. Otherwise, under the consumer-list lock, the message goes to the dispatch policy, and
     * {@link #onMessageWithNoConsumers} is invoked when the list is empty or the policy reports that
     * nothing was dispatched.
     *
     * <p>The evaluation context is cleared and the valve decremented exactly once, in {@code finally},
     * so each {@code increment()} is paired with a single {@code decrement()} on every path.
     *
     * @param connectionContext context of the producing connection
     * @param message           message to dispatch
     * @throws Exception if a policy or the no-consumer handling fails
     */
    protected void dispatch(ConnectionContext connectionContext, Message message) throws Exception {
        this.destinationStatistics.getEnqueues().increment();
        this.dispatchValve.increment();
        MessageEvaluationContext messageEvaluationContext = connectionContext.getMessageEvaluationContext();
        try {
            if (!this.subscriptionRecoveryPolicy.add(connectionContext, message)) {
                return;
            }
            synchronized (this.consumers) {
                if (this.consumers.isEmpty()) {
                    onMessageWithNoConsumers(connectionContext, message);
                    return;
                }
                messageEvaluationContext.setDestination(this.destination);
                messageEvaluationContext.setMessageReference(message);
                if (!this.dispatchPolicy.dispatch(connectionContext, message, messageEvaluationContext, this.consumers)) {
                    onMessageWithNoConsumers(connectionContext, message);
                }
            }
        } finally {
            messageEvaluationContext.clear();
            this.dispatchValve.decrement();
        }
    }

    /**
     * Handles a message that reached no consumer.
     *
     * <p>Only acts on non-persistent messages, when the no-consumers advisory is enabled and this topic
     * is not itself an advisory topic. The message is then redirected to the no-topic-consumers advisory
     * topic for this destination and re-sent through the broker with producer flow control temporarily
     * disabled on the connection context.
     *
     * @param connectionContext context of the producing connection
     * @param message           message that had no consumers; it is mutated in place
     * @throws Exception if the broker send fails
     */
    protected void onMessageWithNoConsumers(ConnectionContext connectionContext, Message message) throws Exception {
        if (message.isPersistent() || !this.sendAdvisoryIfNoConsumers || AdvisorySupport.isAdvisoryTopic(this.destination)) {
            return;
        }
        // NOTE(review): both guards only overwrite an original value that is already set; "== null"
        // may have been intended. Left unchanged - confirm before altering.
        if (message.getOriginalDestination() != null) {
            message.setOriginalDestination(message.getDestination());
        }
        if (message.getOriginalTransactionId() != null) {
            message.setOriginalTransactionId(message.getTransactionId());
        }
        message.setDestination(AdvisorySupport.getNoTopicConsumersAdvisoryTopic(this.destination));
        message.setTransactionId(null);
        message.evictMarshlledForm();

        boolean isProducerFlowControl = connectionContext.isProducerFlowControl();
        try {
            connectionContext.setProducerFlowControl(false);
            connectionContext.getBroker().send(connectionContext, message);
        } finally {
            // Restore the caller's flow-control setting exactly once, whether or not the send failed.
            connectionContext.setProducerFlowControl(isProducerFlowControl);
        }
    }

    /**
     * Resolves a class by name, converting the checked lookup failure into an unchecked error.
     *
     * @param str fully qualified class name
     * @return the resolved class
     * @throws NoClassDefFoundError if the class cannot be found; the {@code ClassNotFoundException} is attached as its cause
     */
    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            // initCause returns Throwable, so it cannot be thrown directly from a method with no throws clause.
            NoClassDefFoundError error = new NoClassDefFoundError();
            error.initCause(e);
            throw error;
        }
    }

    static {
        // Populate the cached Class slot on first use, then create the logger from it.
        // (This looks like a compiler-expanded class literal for Topic.)
        if (class$org$apache$activemq$broker$region$Topic == null) {
            class$org$apache$activemq$broker$region$Topic = class$("org.apache.activemq.broker.region.Topic");
        }
        log = LogFactory.getLog(class$org$apache$activemq$broker$region$Topic);
    }
}
