package org.apache.activemq.broker.region.policy;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transport.InactivityIOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/activemq-broker-5.11.0.redhat-630303.jar:org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.class */
/**
 * Slow-consumer strategy that remembers every subscription reported through
 * {@link #slowConsumer(ConnectionContext, Subscription)} and, on a periodic check
 * ({@link #run()}), aborts those that exceeded the configured limits.
 *
 * <p>An abort either closes the individual consumer or, when {@code abortConnection}
 * is set, fails the whole connection that owns it.
 *
 * <p>The tracking map is a {@link ConcurrentHashMap}; the periodic check removes entries
 * from it while iterating over it.
 */
public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerStrategy.class);

    /** Scheduler obtained from the broker; runs the periodic check and the delayed abort tasks. */
    protected Scheduler scheduler;
    /** Broker handed in through {@link #setBrokerService(Broker)}. */
    protected Broker broker;
    /** Name of this strategy instance; defaults to the class name plus the identity hash code. */
    protected String name = "AbortSlowConsumerStrategy@" + hashCode();
    /** Flipped to {@code true} once, when the periodic check has been scheduled. */
    protected final AtomicBoolean taskStarted = new AtomicBoolean(false);
    /** Subscriptions currently tracked as slow, with their per-subscription bookkeeping. */
    protected final Map<Subscription, SlowConsumerEntry> slowConsumers =
            new ConcurrentHashMap<Subscription, SlowConsumerEntry>();

    /** Limit compared against an entry's {@code slowCount}; values {@code <= 0} disable the check. */
    private long maxSlowCount = -1;
    /** Limit (same unit as {@code checkPeriod}) compared against {@code markCount * checkPeriod}; {@code <= 0} disables it. */
    private long maxSlowDuration = 30000;
    /** Period passed to {@link Scheduler#executePeriodically} for the periodic check. */
    private long checkPeriod = 30000;
    /** When {@code true} the periodic check aborts the owning connection instead of the single consumer. */
    private boolean abortConnection = false;
    /** When {@code true} network subscriptions are dropped from tracking instead of being aborted. */
    private boolean ignoreNetworkConsumers = true;

    /**
     * Stores the broker and picks up its scheduler.
     *
     * @param broker the broker this strategy works for
     */
    @Override
    public void setBrokerService(Broker broker) {
        this.scheduler = broker.getScheduler();
        this.broker = broker;
    }

    /**
     * Records that {@code subscription} was reported as slow.
     *
     * <p>Does nothing (apart from logging) when both limits are negative. The first call
     * that passes that gate schedules the periodic check. A subscription that is not yet
     * tracked gets a fresh entry; an already tracked one has {@code slow()} invoked on its
     * entry when a positive {@code maxSlowCount} is configured.
     *
     * @param connectionContext context stored in the new entry; later used to find the connection to abort
     * @param subscription the subscription reported as slow
     */
    @Override
    public void slowConsumer(ConnectionContext connectionContext, Subscription subscription) {
        if (this.maxSlowCount < 0 && this.maxSlowDuration < 0) {
            LOG.info("no limits set, slowConsumer strategy has nothing to do");
            return;
        }
        if (this.taskStarted.compareAndSet(false, true)) {
            this.scheduler.executePeriodically(this, this.checkPeriod);
        }
        // Single lookup: run() and abortConsumer() remove entries from this map, so a
        // containsKey() followed by a separate get() could observe null in between.
        SlowConsumerEntry tracked = this.slowConsumers.get(subscription);
        if (tracked == null) {
            this.slowConsumers.put(subscription, new SlowConsumerEntry(connectionContext));
        } else if (this.maxSlowCount > 0) {
            tracked.slow();
        }
    }

    /**
     * Periodic check over all tracked subscriptions.
     *
     * <p>When a positive {@code maxSlowDuration} is set, every entry is first marked via
     * {@code mark()}. Then each tracked subscription is, in this order of precedence:
     * dropped if it is a network subscription and those are ignored; dropped if it no
     * longer reports itself as slow; or removed and handed to
     * {@link #abortSubscription(Map, boolean)} if it exceeds a limit.
     */
    @Override
    public void run() {
        if (this.maxSlowDuration > 0) {
            for (SlowConsumerEntry tracked : this.slowConsumers.values()) {
                tracked.mark();
            }
        }
        Map<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
        for (Map.Entry<Subscription, SlowConsumerEntry> entry : this.slowConsumers.entrySet()) {
            Subscription subscription = entry.getKey();
            SlowConsumerEntry tracked = entry.getValue();
            if (isIgnoreNetworkSubscriptions() && subscription.getConsumerInfo().isNetworkSubscription()) {
                if (this.slowConsumers.remove(subscription) != null) {
                    LOG.info("network sub: {} is no longer slow", subscription.getConsumerInfo().getConsumerId());
                }
            } else if (!subscription.isSlowConsumer()) {
                LOG.info("sub: " + subscription.getConsumerInfo().getConsumerId() + " is no longer slow");
                this.slowConsumers.remove(subscription);
            } else if (exceedsLimits(tracked)) {
                toAbort.put(subscription, tracked);
                this.slowConsumers.remove(subscription);
            }
        }
        abortSubscription(toAbort, this.abortConnection);
    }

    /** Returns whether the entry has reached the configured duration limit or the configured count limit. */
    private boolean exceedsLimits(SlowConsumerEntry tracked) {
        boolean slowTooLong = this.maxSlowDuration > 0
                && tracked.markCount * this.checkPeriod >= this.maxSlowDuration;
        boolean slowTooOften = this.maxSlowCount > 0 && tracked.slowCount >= this.maxSlowCount;
        return slowTooLong || slowTooOften;
    }

    /**
     * Aborts the given subscriptions, grouped by the connection that owns them.
     *
     * <p>Entries without a connection context are skipped silently; entries whose context
     * has no connection are skipped with a debug message.
     *
     * @param toAbort subscriptions to abort, with the entry holding their connection context
     * @param abortSubscriberConnection {@code true} to fail the whole connection,
     *        {@code false} to close only the listed consumers
     */
    public void abortSubscription(Map<Subscription, SlowConsumerEntry> toAbort, boolean abortSubscriberConnection) {
        Map<Connection, List<Subscription>> byConnection = groupByConnection(toAbort);
        for (Map.Entry<Connection, List<Subscription>> group : byConnection.entrySet()) {
            if (abortSubscriberConnection) {
                abortWholeConnection(group.getKey(), group.getValue());
            } else {
                closeSlowConsumers(group.getKey(), group.getValue());
            }
        }
    }

    /** Groups the subscriptions by owning connection, leaving out those for which no connection is known. */
    private Map<Connection, List<Subscription>> groupByConnection(Map<Subscription, SlowConsumerEntry> toAbort) {
        Map<Connection, List<Subscription>> byConnection = new HashMap<Connection, List<Subscription>>();
        for (Map.Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet()) {
            ConnectionContext context = entry.getValue().context;
            if (context == null) {
                continue;
            }
            Connection connection = context.getConnection();
            if (connection == null) {
                LOG.debug("slowConsumer abort ignored, no connection in context:" + context);
                // Without a connection there is nothing to abort; grouping under a null
                // key would only lead to a NullPointerException further down.
                continue;
            }
            List<Subscription> subscriptions = byConnection.get(connection);
            if (subscriptions == null) {
                subscriptions = new ArrayList<Subscription>();
                byConnection.put(connection, subscriptions);
            }
            subscriptions.add(entry.getKey());
        }
        return byConnection;
    }

    /** Schedules, without delay, a task that raises an {@link InactivityIOException} on the connection. */
    private void abortWholeConnection(final Connection connection, final List<Subscription> subscriptions) {
        LOG.info("aborting connection:{} with {} slow consumers", connection.getConnectionId(), subscriptions.size());
        if (LOG.isTraceEnabled()) {
            for (Subscription subscription : subscriptions) {
                LOG.trace("Connection {} being aborted because of slow consumer: {} on destination: {}", connection.getConnectionId(), subscription.getConsumerInfo().getConsumerId(), subscription.getActiveMQDestination());
            }
        }
        try {
            this.scheduler.executeAfterDelay(new Runnable() {
                @Override
                public void run() {
                    connection.serviceException(new InactivityIOException(subscriptions.size() + " Consumers was slow too often (>" + AbortSlowConsumerStrategy.this.maxSlowCount + ") or too long (>" + AbortSlowConsumerStrategy.this.maxSlowDuration + "): "));
                }
            }, 0L);
        } catch (Exception e) {
            // Trailing Throwable is logged with its stack trace, like the sibling catch blocks do.
            LOG.info("exception on aborting connection {} with {} slow consumers", connection.getConnectionId(), subscriptions.size(), e);
        }
    }

    /**
     * For each subscription: dispatches a {@link ConsumerControl} with {@code close} set to the
     * connection, then schedules (after 1000) the consumer's remove command to be applied on the
     * connection itself. A failure in one step is logged and does not stop the other step or the
     * remaining subscriptions.
     */
    private void closeSlowConsumers(final Connection connection, List<Subscription> subscriptions) {
        for (final Subscription subscription : subscriptions) {
            LOG.info("aborting slow consumer: {} for destination:{}", subscription.getConsumerInfo().getConsumerId(), subscription.getActiveMQDestination());
            try {
                ConsumerControl closeRequest = new ConsumerControl();
                closeRequest.setConsumerId(subscription.getConsumerInfo().getConsumerId());
                closeRequest.setClose(true);
                connection.dispatchAsync(closeRequest);
            } catch (Exception e) {
                LOG.info("exception on aborting slow consumer: {}", subscription.getConsumerInfo().getConsumerId(), e);
            }
            try {
                this.scheduler.executeAfterDelay(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            RemoveInfo removeCommand = subscription.getConsumerInfo().createRemoveCommand();
                            if (connection instanceof CommandVisitor) {
                                removeCommand.visit((CommandVisitor) connection);
                            } else {
                                connection.service(removeCommand);
                            }
                        } catch (IllegalStateException ignored) {
                            // Deliberately ignored. NOTE(review): presumably raised when the consumer
                            // is already gone by the time this runs - confirm against Connection.
                        } catch (Exception e) {
                            AbortSlowConsumerStrategy.LOG.info("exception on local remove of slow consumer: {}", subscription.getConsumerInfo().getConsumerId(), e);
                        }
                    }
                }, 1000L);
            } catch (Exception e) {
                LOG.info("exception on local remove of slow consumer: {}", subscription.getConsumerInfo().getConsumerId(), e);
            }
        }
    }

    /**
     * Aborts a single tracked subscription on demand.
     *
     * <p>A {@code null} subscription is ignored. A subscription that is not (or no longer)
     * tracked only produces a warning.
     *
     * @param subscription the subscription to abort; may be {@code null}
     * @param abortSubscriberConnection {@code true} to fail the whole connection,
     *        {@code false} to close only this consumer
     */
    public void abortConsumer(Subscription subscription, boolean abortSubscriberConnection) {
        if (subscription == null) {
            return;
        }
        SlowConsumerEntry tracked = this.slowConsumers.remove(subscription);
        if (tracked == null) {
            LOG.warn("cannot abort subscription as it no longer exists in the map of slow consumers: " + subscription);
            return;
        }
        Map<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
        toAbort.put(subscription, tracked);
        abortSubscription(toAbort, abortSubscriberConnection);
    }

    /** @return the slow-count limit */
    public long getMaxSlowCount() {
        return this.maxSlowCount;
    }

    /** @param maxSlowCount the slow-count limit; only positive values are enforced */
    public void setMaxSlowCount(long maxSlowCount) {
        this.maxSlowCount = maxSlowCount;
    }

    /** @return the slow-duration limit */
    public long getMaxSlowDuration() {
        return this.maxSlowDuration;
    }

    /** @param maxSlowDuration the slow-duration limit; only positive values are enforced */
    public void setMaxSlowDuration(long maxSlowDuration) {
        this.maxSlowDuration = maxSlowDuration;
    }

    /** @return the period of the periodic check */
    public long getCheckPeriod() {
        return this.checkPeriod;
    }

    /** @param checkPeriod the period of the periodic check; read when the check is first scheduled */
    public void setCheckPeriod(long checkPeriod) {
        this.checkPeriod = checkPeriod;
    }

    /** @return whether the periodic check aborts the whole connection rather than the single consumer */
    public boolean isAbortConnection() {
        return this.abortConnection;
    }

    /** @param abortConnection {@code true} to abort the whole connection from the periodic check */
    public void setAbortConnection(boolean abortConnection) {
        this.abortConnection = abortConnection;
    }

    /** @return whether network subscriptions are exempt from being aborted by the periodic check */
    public boolean isIgnoreNetworkSubscriptions() {
        return this.ignoreNetworkConsumers;
    }

    /** @param ignoreNetworkConsumers {@code true} to exempt network subscriptions from the periodic check */
    public void setIgnoreNetworkConsumers(boolean ignoreNetworkConsumers) {
        this.ignoreNetworkConsumers = ignoreNetworkConsumers;
    }

    /** @param name the new name of this strategy instance */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the name of this strategy instance */
    public String getName() {
        return this.name;
    }

    /** @return the live map of tracked slow subscriptions (not a copy) */
    public Map<Subscription, SlowConsumerEntry> getSlowConsumers() {
        return this.slowConsumers;
    }

    /**
     * No-op: this strategy keeps no per-destination state.
     *
     * @param destination ignored
     */
    @Override
    public void addDestination(Destination destination) {
    }
}
