package org.apache.activemq.broker.region.policy;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transport.InactivityIOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/activemq-core-5.5.1.fuse-70-084.jar:org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.class */
public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerStrategy.class);
    private Scheduler scheduler;
    private Broker broker;
    private String name = "AbortSlowConsumerStrategy@" + hashCode();
    private final AtomicBoolean taskStarted = new AtomicBoolean(false);
    private final Map<Subscription, SlowConsumerEntry> slowConsumers = new ConcurrentHashMap();
    private long maxSlowCount = -1;
    private long maxSlowDuration = 30000;
    private long checkPeriod = 30000;
    private boolean abortConnection = false;

    @Override // org.apache.activemq.broker.region.policy.SlowConsumerStrategy
    public void setBrokerService(Broker broker) {
        this.scheduler = broker.getScheduler();
        this.broker = broker;
    }

    @Override // org.apache.activemq.broker.region.policy.SlowConsumerStrategy
    public void slowConsumer(ConnectionContext connectionContext, Subscription subscription) {
        if (this.maxSlowCount < 0 && this.maxSlowDuration < 0) {
            LOG.info("no limits set, slowConsumer strategy has nothing to do");
            return;
        }
        if (this.taskStarted.compareAndSet(false, true)) {
            this.scheduler.executePeriodically(this, this.checkPeriod);
        }
        if (!this.slowConsumers.containsKey(subscription)) {
            this.slowConsumers.put(subscription, new SlowConsumerEntry(connectionContext));
        } else if (this.maxSlowCount > 0) {
            this.slowConsumers.get(subscription).slow();
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        if (this.maxSlowDuration > 0) {
            Iterator<SlowConsumerEntry> it = this.slowConsumers.values().iterator();
            while (it.hasNext()) {
                it.next().mark();
            }
        }
        HashMap hashMap = new HashMap();
        for (Map.Entry<Subscription, SlowConsumerEntry> entry : this.slowConsumers.entrySet()) {
            if (!entry.getKey().isSlowConsumer()) {
                LOG.info("sub: " + entry.getKey().getConsumerInfo().getConsumerId() + " is no longer slow");
                this.slowConsumers.remove(entry.getKey());
            } else if ((this.maxSlowDuration > 0 && entry.getValue().markCount * this.checkPeriod > this.maxSlowDuration) || (this.maxSlowCount > 0 && entry.getValue().slowCount > this.maxSlowCount)) {
                hashMap.put(entry.getKey(), entry.getValue());
                this.slowConsumers.remove(entry.getKey());
            }
        }
        abortSubscription(hashMap, this.abortConnection);
    }

    private void abortSubscription(Map<Subscription, SlowConsumerEntry> map, boolean z) {
        for (final Map.Entry<Subscription, SlowConsumerEntry> entry : map.entrySet()) {
            ConnectionContext connectionContext = entry.getValue().context;
            if (connectionContext != null) {
                try {
                    LOG.info("aborting " + (z ? "connection" : "consumer") + ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId());
                    final Connection connection = connectionContext.getConnection();
                    if (connection == null) {
                        LOG.debug("slowConsumer abort ignored, no connection in context:" + connectionContext);
                    } else if (z) {
                        this.scheduler.executeAfterDelay(new Runnable() { // from class: org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy.1
                            @Override // java.lang.Runnable
                            public void run() {
                                connection.serviceException(new InactivityIOException("Consumer was slow too often (>" + AbortSlowConsumerStrategy.this.maxSlowCount + ") or too long (>" + AbortSlowConsumerStrategy.this.maxSlowDuration + "): " + ((Subscription) entry.getKey()).getConsumerInfo().getConsumerId()));
                            }
                        }, 0L);
                    } else {
                        ConsumerControl consumerControl = new ConsumerControl();
                        consumerControl.setConsumerId(entry.getKey().getConsumerInfo().getConsumerId());
                        consumerControl.setClose(true);
                        connection.dispatchAsync(consumerControl);
                    }
                } catch (Exception e) {
                    LOG.info("exception on stopping " + (z ? "connection" : "consumer") + " to abort slow consumer: " + entry.getKey(), e);
                }
            }
        }
    }

    public void abortConsumer(Subscription subscription, boolean z) {
        if (subscription != null) {
            SlowConsumerEntry remove = this.slowConsumers.remove(subscription);
            if (remove == null) {
                LOG.warn("cannot abort subscription as it no longer exists in the map of slow consumers: " + subscription);
                return;
            }
            HashMap hashMap = new HashMap();
            hashMap.put(subscription, remove);
            abortSubscription(hashMap, z);
        }
    }

    public long getMaxSlowCount() {
        return this.maxSlowCount;
    }

    public void setMaxSlowCount(long j) {
        this.maxSlowCount = j;
    }

    public long getMaxSlowDuration() {
        return this.maxSlowDuration;
    }

    public void setMaxSlowDuration(long j) {
        this.maxSlowDuration = j;
    }

    public long getCheckPeriod() {
        return this.checkPeriod;
    }

    public void setCheckPeriod(long j) {
        this.checkPeriod = j;
    }

    public boolean isAbortConnection() {
        return this.abortConnection;
    }

    public void setAbortConnection(boolean z) {
        this.abortConnection = z;
    }

    public void setName(String str) {
        this.name = str;
    }

    public String getName() {
        return this.name;
    }

    public Map<Subscription, SlowConsumerEntry> getSlowConsumers() {
        return this.slowConsumers;
    }
}
