package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import javax.jms.ResourceAllocationException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.slf4j.Logger;

/* loaded from: input_file:activemq-broker-5.11.0.redhat-630224.jar:org/apache/activemq/broker/region/BaseDestination.class */
/**
 * Common state and behaviour shared by broker destinations: tunable
 * per-destination settings, advisory switches, producer/consumer statistics,
 * producer flow-control blocking and inactivity based garbage collection.
 *
 * <p>Subclasses supply the logger ({@link #getLog()}) and the list of current
 * consumers ({@link #getConsumers()}).
 */
public abstract class BaseDestination implements Destination {
    /** Initial value of {@link #getMaxPageSize()}. */
    public static final int MAX_PAGE_SIZE = 200;
    /** Initial value of {@link #getMaxBrowsePageSize()}. */
    public static final int MAX_BROWSE_PAGE_SIZE = 400;
    /** Initial value of {@link #getExpireMessagesPeriod()}. */
    public static final long EXPIRE_MESSAGE_PERIOD = 30000;
    /** Initial value of {@link #getInactiveTimeoutBeforeGC()}. */
    public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60000;
    /** Not referenced within this class. */
    public static final int MAX_PRODUCERS_TO_AUDIT = 64;
    /** Not referenced within this class. */
    public static final int MAX_AUDIT_DEPTH = 10000;

    protected final ActiveMQDestination destination;
    protected final Broker broker;
    /** Backing message store; may be {@code null} (every use in this class is null-guarded). */
    protected final MessageStore store;
    protected SystemUsage systemUsage;
    protected MemoryUsage memoryUsage;
    private boolean advisoryForSlowConsumers;
    private boolean advisoryForFastProducers;
    private boolean advisoryForDiscardingMessages;
    private boolean advisoryWhenFull;
    private boolean advisoryForDelivery;
    private boolean advisoryForConsumed;
    private boolean sendAdvisoryIfNoConsumers;
    protected final BrokerService brokerService;
    protected final Broker regionBroker;
    private SlowConsumerStrategy slowConsumerStrategy;
    private boolean prioritizedMessages;
    private boolean gcIfInactive;
    private boolean gcWithNetworkConsumers;
    protected final Scheduler scheduler;
    private boolean persistJMSRedelivered;
    private boolean producerFlowControl = true;
    private boolean alwaysRetroactive = false;
    protected boolean warnOnProducerFlowControl = true;
    protected long blockedProducerWarningInterval = 30000;
    private int maxProducersToAudit = 1024;
    private int maxAuditDepth = 2048;
    private boolean enableAudit = true;
    private int maxPageSize = MAX_PAGE_SIZE;
    private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
    private boolean useCache = true;
    private int minimumMessageSize = 1024;
    private boolean lazyDispatch = false;
    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
    protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
    protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
    private int maxExpirePageSize = 400;
    protected int cursorMemoryHighWaterMark = 70;
    protected int storeUsageHighWaterMark = 100;
    private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
    /** Timestamp recorded by {@link #markForGC(long)}; {@code 0} means "not marked". */
    private long lastActiveTime = 0;
    private boolean reduceMemoryFootprint = false;
    private boolean disposed = false;
    private boolean doOptimzeMessageStorage = true;
    private int optimizeMessageStoreInFlightLimit = 10;

    /**
     * Creates the destination, wiring it to the broker service and creating a
     * per-destination {@link SystemUsage} as a child of the broker's producer
     * system usage.
     *
     * @param brokerService owning broker service; source of the broker,
     *                      region broker, scheduler and producer system usage
     * @param messageStore  backing store, or {@code null}
     * @param activeMQDestination the destination this object represents
     * @param parentStats   statistics object that becomes the parent of this
     *                      destination's own statistics and whose enabled flag
     *                      is copied
     * @throws Exception declared for subclasses and collaborators
     */
    public BaseDestination(BrokerService brokerService, MessageStore messageStore, ActiveMQDestination activeMQDestination, DestinationStatistics parentStats) throws Exception {
        this.brokerService = brokerService;
        this.broker = brokerService.getBroker();
        this.store = messageStore;
        this.destination = activeMQDestination;
        this.destinationStatistics.setEnabled(parentStats.isEnabled());
        this.destinationStatistics.setParent(parentStats);
        this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), activeMQDestination.toString());
        this.memoryUsage = this.systemUsage.getMemoryUsage();
        this.memoryUsage.setUsagePortion(1.0f);
        this.regionBroker = brokerService.getRegionBroker();
        this.scheduler = brokerService.getBroker().getScheduler();
    }

    /**
     * Hands this destination's memory usage tracker to the store, if there is one.
     *
     * @throws Exception declared for subclasses
     */
    public void initialize() throws Exception {
        if (this.store != null) {
            this.store.setMemoryUsage(this.memoryUsage);
        }
    }

    /** @return whether producer flow control is enabled (default {@code true}) */
    @Override
    public boolean isProducerFlowControl() {
        return this.producerFlowControl;
    }

    /** @param producerFlowControl whether producer flow control is enabled */
    @Override
    public void setProducerFlowControl(boolean producerFlowControl) {
        this.producerFlowControl = producerFlowControl;
    }

    /** @return the always-retroactive flag (default {@code false}) */
    @Override
    public boolean isAlwaysRetroactive() {
        return this.alwaysRetroactive;
    }

    /** @param alwaysRetroactive the always-retroactive flag */
    @Override
    public void setAlwaysRetroactive(boolean alwaysRetroactive) {
        this.alwaysRetroactive = alwaysRetroactive;
    }

    /**
     * @param blockedProducerWarningInterval milliseconds between repeated
     *        "blocking for" log messages emitted by {@code waitForSpace}
     */
    @Override
    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
        this.blockedProducerWarningInterval = blockedProducerWarningInterval;
    }

    /** @return milliseconds between repeated blocked-producer log messages */
    @Override
    public long getBlockedProducerWarningInterval() {
        return this.blockedProducerWarningInterval;
    }

    /** @return the configured maximum number of producers to audit (default 1024) */
    @Override
    public int getMaxProducersToAudit() {
        return this.maxProducersToAudit;
    }

    /** @param maxProducersToAudit maximum number of producers to audit */
    @Override
    public void setMaxProducersToAudit(int maxProducersToAudit) {
        this.maxProducersToAudit = maxProducersToAudit;
    }

    /** @return the configured maximum audit depth (default 2048) */
    @Override
    public int getMaxAuditDepth() {
        return this.maxAuditDepth;
    }

    /** @param maxAuditDepth maximum audit depth */
    @Override
    public void setMaxAuditDepth(int maxAuditDepth) {
        this.maxAuditDepth = maxAuditDepth;
    }

    /** @return whether auditing is enabled (default {@code true}) */
    @Override
    public boolean isEnableAudit() {
        return this.enableAudit;
    }

    /** @param enableAudit whether auditing is enabled */
    @Override
    public void setEnableAudit(boolean enableAudit) {
        this.enableAudit = enableAudit;
    }

    /**
     * Counts a newly attached producer and clears any pending GC mark.
     */
    @Override
    public void addProducer(ConnectionContext connectionContext, ProducerInfo producerInfo) throws Exception {
        this.destinationStatistics.getProducers().increment();
        this.lastActiveTime = 0L;
    }

    /** Decrements the producer count. */
    @Override
    public void removeProducer(ConnectionContext connectionContext, ProducerInfo producerInfo) throws Exception {
        this.destinationStatistics.getProducers().decrement();
    }

    /**
     * Counts a newly attached subscription and clears any pending GC mark.
     */
    @Override
    public void addSubscription(ConnectionContext connectionContext, Subscription subscription) throws Exception {
        this.destinationStatistics.getConsumers().increment();
        this.lastActiveTime = 0L;
    }

    /** Decrements the consumer count; the sequence id argument is not used here. */
    @Override
    public void removeSubscription(ConnectionContext connectionContext, Subscription subscription, long lastDeliveredSequenceId) throws Exception {
        this.destinationStatistics.getConsumers().decrement();
    }

    /** @return the memory usage tracker of this destination */
    @Override
    public final MemoryUsage getMemoryUsage() {
        return this.memoryUsage;
    }

    /** @param memoryUsage replacement memory usage tracker */
    @Override
    public void setMemoryUsage(MemoryUsage memoryUsage) {
        this.memoryUsage = memoryUsage;
    }

    /** @return this destination's own statistics object */
    @Override
    public DestinationStatistics getDestinationStatistics() {
        return this.destinationStatistics;
    }

    /** @return the destination this object represents */
    @Override
    public ActiveMQDestination getActiveMQDestination() {
        return this.destination;
    }

    /** @return the physical name of the underlying destination */
    @Override
    public final String getName() {
        return getActiveMQDestination().getPhysicalName();
    }

    /** @return the backing store, possibly {@code null} */
    @Override
    public final MessageStore getMessageStore() {
        return this.store;
    }

    /**
     * Reports whether the destination currently has any producers or consumers.
     *
     * <p>When {@link #isGcWithNetworkConsumers()} is set and there are
     * consumers, the destination only counts as active if at least one of
     * them is not a network subscription.
     *
     * @return {@code true} if the destination is considered in use
     */
    @Override
    public boolean isActive() {
        boolean active = this.destinationStatistics.getConsumers().getCount() != 0
                || this.destinationStatistics.getProducers().getCount() != 0;
        if (active && isGcWithNetworkConsumers() && this.destinationStatistics.getConsumers().getCount() != 0) {
            active = hasRegularConsumers(getConsumers());
        }
        return active;
    }

    /** @return the configured maximum page size */
    @Override
    public int getMaxPageSize() {
        return this.maxPageSize;
    }

    /** @param maxPageSize maximum page size */
    @Override
    public void setMaxPageSize(int maxPageSize) {
        this.maxPageSize = maxPageSize;
    }

    /** @return the configured maximum browse page size */
    @Override
    public int getMaxBrowsePageSize() {
        return this.maxBrowsePageSize;
    }

    /** @param maxBrowsePageSize maximum browse page size */
    @Override
    public void setMaxBrowsePageSize(int maxBrowsePageSize) {
        this.maxBrowsePageSize = maxBrowsePageSize;
    }

    /** @return the configured maximum expire page size (default 400) */
    public int getMaxExpirePageSize() {
        return this.maxExpirePageSize;
    }

    /** @param maxExpirePageSize maximum expire page size */
    public void setMaxExpirePageSize(int maxExpirePageSize) {
        this.maxExpirePageSize = maxExpirePageSize;
    }

    /** @param expireMessagesPeriod the expire-messages period */
    public void setExpireMessagesPeriod(long expireMessagesPeriod) {
        this.expireMessagesPeriod = expireMessagesPeriod;
    }

    /** @return the configured expire-messages period */
    public long getExpireMessagesPeriod() {
        return this.expireMessagesPeriod;
    }

    /** @return the use-cache flag (default {@code true}) */
    @Override
    public boolean isUseCache() {
        return this.useCache;
    }

    /** @param useCache the use-cache flag */
    @Override
    public void setUseCache(boolean useCache) {
        this.useCache = useCache;
    }

    /** @return the configured minimum message size (default 1024) */
    @Override
    public int getMinimumMessageSize() {
        return this.minimumMessageSize;
    }

    /** @param minimumMessageSize minimum message size */
    @Override
    public void setMinimumMessageSize(int minimumMessageSize) {
        this.minimumMessageSize = minimumMessageSize;
    }

    /** @return the lazy-dispatch flag (default {@code false}) */
    @Override
    public boolean isLazyDispatch() {
        return this.lazyDispatch;
    }

    /** @param lazyDispatch the lazy-dispatch flag */
    @Override
    public void setLazyDispatch(boolean lazyDispatch) {
        this.lazyDispatch = lazyDispatch;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** @return the next sequence id obtained from the region broker */
    public long getDestinationSequenceId() {
        return this.regionBroker.getBrokerSequenceId();
    }

    /** @return whether {@link #slowConsumer} forwards to the broker */
    public boolean isAdvisoryForSlowConsumers() {
        return this.advisoryForSlowConsumers;
    }

    /** @param advisoryForSlowConsumers whether {@link #slowConsumer} forwards to the broker */
    public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
        this.advisoryForSlowConsumers = advisoryForSlowConsumers;
    }

    /** @return whether {@link #messageDiscarded} forwards to the broker */
    public boolean isAdvisoryForDiscardingMessages() {
        return this.advisoryForDiscardingMessages;
    }

    /** @param advisoryForDiscardingMessages whether {@link #messageDiscarded} forwards to the broker */
    public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
        this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
    }

    /** @return whether {@link #isFull} forwards to the broker */
    public boolean isAdvisoryWhenFull() {
        return this.advisoryWhenFull;
    }

    /** @param advisoryWhenFull whether {@link #isFull} forwards to the broker */
    public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
        this.advisoryWhenFull = advisoryWhenFull;
    }

    /** @return whether {@link #messageDelivered} forwards to the broker */
    public boolean isAdvisoryForDelivery() {
        return this.advisoryForDelivery;
    }

    /** @param advisoryForDelivery whether {@link #messageDelivered} forwards to the broker */
    public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
        this.advisoryForDelivery = advisoryForDelivery;
    }

    /** @return whether {@link #messageConsumed} forwards to the broker */
    public boolean isAdvisoryForConsumed() {
        return this.advisoryForConsumed;
    }

    /** @param advisoryForConsumed whether {@link #messageConsumed} forwards to the broker */
    public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
        this.advisoryForConsumed = advisoryForConsumed;
    }

    /** @return whether {@link #fastProducer} forwards to the broker */
    public boolean isAdvisoryForFastProducers() {
        return this.advisoryForFastProducers;
    }

    /** @param advisoryForFastProducers whether {@link #fastProducer} forwards to the broker */
    public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
        this.advisoryForFastProducers = advisoryForFastProducers;
    }

    /** @return whether {@link #onMessageWithNoConsumers} may publish an advisory */
    public boolean isSendAdvisoryIfNoConsumers() {
        return this.sendAdvisoryIfNoConsumers;
    }

    /** @param sendAdvisoryIfNoConsumers whether {@link #onMessageWithNoConsumers} may publish an advisory */
    public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
        this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
    }

    /** @return the dead letter strategy in use */
    @Override
    public DeadLetterStrategy getDeadLetterStrategy() {
        return this.deadLetterStrategy;
    }

    /** @param deadLetterStrategy the dead letter strategy to use */
    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
        this.deadLetterStrategy = deadLetterStrategy;
    }

    /** @return the cursor memory high-water mark (default 70) */
    @Override
    public int getCursorMemoryHighWaterMark() {
        return this.cursorMemoryHighWaterMark;
    }

    /** @param cursorMemoryHighWaterMark the cursor memory high-water mark */
    @Override
    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
        this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
    }

    /** Forwards the event to the broker when consumed-advisories are enabled. */
    @Override
    public void messageConsumed(ConnectionContext connectionContext, MessageReference messageReference) {
        if (this.advisoryForConsumed) {
            this.broker.messageConsumed(connectionContext, messageReference);
        }
    }

    /** Forwards the event to the broker when delivery-advisories are enabled. */
    @Override
    public void messageDelivered(ConnectionContext connectionContext, MessageReference messageReference) {
        if (this.advisoryForDelivery) {
            this.broker.messageDelivered(connectionContext, messageReference);
        }
    }

    /** Forwards the event to the broker when discard-advisories are enabled. */
    @Override
    public void messageDiscarded(ConnectionContext connectionContext, Subscription subscription, MessageReference messageReference) {
        if (this.advisoryForDiscardingMessages) {
            this.broker.messageDiscarded(connectionContext, subscription, messageReference);
        }
    }

    /**
     * Forwards the event to the broker when slow-consumer advisories are
     * enabled, then notifies the slow consumer strategy if one is configured.
     */
    @Override
    public void slowConsumer(ConnectionContext connectionContext, Subscription subscription) {
        if (this.advisoryForSlowConsumers) {
            this.broker.slowConsumer(connectionContext, this, subscription);
        }
        if (this.slowConsumerStrategy != null) {
            this.slowConsumerStrategy.slowConsumer(connectionContext, subscription);
        }
    }

    /** Forwards the event to the broker when fast-producer advisories are enabled. */
    @Override
    public void fastProducer(ConnectionContext connectionContext, ProducerInfo producerInfo) {
        if (this.advisoryForFastProducers) {
            this.broker.fastProducer(connectionContext, producerInfo, getActiveMQDestination());
        }
    }

    /** Forwards the event to the broker when full-advisories are enabled. */
    @Override
    public void isFull(ConnectionContext connectionContext, Usage<?> usage) {
        if (this.advisoryWhenFull) {
            this.broker.isFull(connectionContext, this, usage);
        }
    }

    /**
     * Releases this destination: removes all messages from the store and
     * disposes it (when a store exists), detaches the statistics from their
     * parent, stops the memory usage tracker and marks the destination as
     * disposed.
     *
     * @throws IOException if the store fails while removing messages or disposing
     */
    @Override
    public void dispose(ConnectionContext connectionContext) throws IOException {
        if (this.store != null) {
            this.store.removeAllMessages(connectionContext);
            this.store.dispose(connectionContext);
        }
        this.destinationStatistics.setParent(null);
        this.memoryUsage.stop();
        this.disposed = true;
    }

    /** @return {@code true} once {@link #dispose(ConnectionContext)} has completed */
    @Override
    public boolean isDisposed() {
        return this.disposed;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Publishes a copy of a non-persistent message to the "no consumers"
     * advisory topic of this destination.
     *
     * <p>Nothing happens when the message is persistent, when
     * {@link #isSendAdvisoryIfNoConsumers()} is off, or when this destination
     * is itself an advisory topic. Producer flow control is switched off on
     * the connection context for the duration of the send and restored
     * afterwards, whether or not the send succeeds.
     *
     * @param connectionContext context used to reach the broker
     * @param message the message that found no consumers; it is copied, not modified
     * @throws Exception if the broker send fails
     */
    public void onMessageWithNoConsumers(ConnectionContext connectionContext, Message message) throws Exception {
        if (message.isPersistent() || !isSendAdvisoryIfNoConsumers()) {
            return;
        }
        // Never raise an advisory about an advisory topic.
        if (!this.destination.isQueue() && AdvisorySupport.isAdvisoryTopic(this.destination)) {
            return;
        }

        Message advisory = message.copy();
        // NOTE(review): the "original" destination / transaction id are only
        // overwritten when they are already set - confirm this condition is
        // intended (it is kept exactly as found).
        if (advisory.getOriginalDestination() != null) {
            advisory.setOriginalDestination(advisory.getDestination());
        }
        if (advisory.getOriginalTransactionId() != null) {
            advisory.setOriginalTransactionId(advisory.getTransactionId());
        }
        ActiveMQDestination advisoryTopic = this.destination.isQueue()
                ? AdvisorySupport.getNoQueueConsumersAdvisoryTopic(this.destination)
                : AdvisorySupport.getNoTopicConsumersAdvisoryTopic(this.destination);
        advisory.setDestination(advisoryTopic);
        advisory.setTransactionId(null);

        final boolean originalFlowControl = connectionContext.isProducerFlowControl();
        try {
            connectionContext.setProducerFlowControl(false);
            ProducerBrokerExchange exchange = new ProducerBrokerExchange();
            exchange.setMutable(false);
            exchange.setConnectionContext(connectionContext);
            exchange.setProducerState(new ProducerState(new ProducerInfo()));
            connectionContext.getBroker().send(exchange, advisory);
        } finally {
            connectionContext.setProducerFlowControl(originalFlowControl);
        }
    }

    /** No-op in the base class. */
    @Override
    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
    }

    /** @return the store usage high-water mark (default 100) */
    public final int getStoreUsageHighWaterMark() {
        return this.storeUsageHighWaterMark;
    }

    /** @param storeUsageHighWaterMark the store usage high-water mark */
    public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
        this.storeUsageHighWaterMark = storeUsageHighWaterMark;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Same as the five-argument {@code waitForSpace}, using a high-water mark of 100.
     */
    public final void waitForSpace(ConnectionContext connectionContext, ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
        waitForSpace(connectionContext, producerBrokerExchange, usage, 100, warning);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Blocks the calling producer until {@code usage} reports space, or fails
     * fast depending on the system usage configuration.
     *
     * <ul>
     *   <li>Non-network connection with send-fail-if-no-space: throws immediately.</li>
     *   <li>Non-network connection with a non-zero send-fail timeout: waits at
     *       most that long, then throws if there is still no space.</li>
     *   <li>Otherwise: marks the exchange as blocked, counts a blocked send and
     *       polls once per second, logging at most every
     *       {@link #getBlockedProducerWarningInterval()} ms. On success the
     *       blocked time is recorded in the statistics and on the exchange.</li>
     * </ul>
     *
     * <p>The exchange's blocked flag is always cleared when this method exits
     * the polling phase, including when it exits with an exception.
     *
     * @param connectionContext context of the sending connection
     * @param producerBrokerExchange exchange of the blocked producer
     * @param usage the resource being waited on
     * @param highWaterMark high-water mark passed to {@code usage.waitForSpace}
     * @param warning message used for logging and for the thrown exception
     * @throws IOException if the connection is stopping while blocked
     * @throws InterruptedException if the wait is interrupted
     * @throws ResourceAllocationException if configured to fail instead of block
     */
    public final void waitForSpace(ConnectionContext connectionContext, ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
        if (!connectionContext.isNetworkConnection() && this.systemUsage.isSendFailIfNoSpace()) {
            getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: {}: {}", usage, warning);
            throw new ResourceAllocationException(warning);
        }
        if (!connectionContext.isNetworkConnection() && this.systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
            if (usage.waitForSpace(this.systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
                return;
            }
            getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: {}: {}", usage, warning);
            throw new ResourceAllocationException(warning);
        }

        final long start = System.currentTimeMillis();
        long nextWarn = start;
        producerBrokerExchange.blockingOnFlowControl(true);
        this.destinationStatistics.getBlockedSends().increment();
        try {
            while (!usage.waitForSpace(1000L, highWaterMark)) {
                if (connectionContext.getStopping().get()) {
                    throw new IOException("Connection closed, send aborted.");
                }
                long now = System.currentTimeMillis();
                if (now >= nextWarn) {
                    getLog().info("{}: {} (blocking for: {}s)", usage, warning, Long.valueOf((now - start) / 1000));
                    nextWarn = now + this.blockedProducerWarningInterval;
                }
            }
            long totalTimeBlocked = System.currentTimeMillis() - start;
            this.destinationStatistics.getBlockedTime().addTime(totalTimeBlocked);
            producerBrokerExchange.incrementTimeBlocked(this, totalTimeBlocked);
        } finally {
            // The producer is no longer waiting here, however the wait ended.
            producerBrokerExchange.blockingOnFlowControl(false);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** @return the logger used for this destination's messages */
    public abstract Logger getLog();

    /** @param slowConsumerStrategy strategy notified by {@link #slowConsumer}; may be {@code null} */
    public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
        this.slowConsumerStrategy = slowConsumerStrategy;
    }

    /** @return the slow consumer strategy, or {@code null} if none is set */
    @Override
    public SlowConsumerStrategy getSlowConsumerStrategy() {
        return this.slowConsumerStrategy;
    }

    /** @return the prioritized-messages flag */
    @Override
    public boolean isPrioritizedMessages() {
        return this.prioritizedMessages;
    }

    /**
     * Sets the prioritized-messages flag and propagates it to the store, if any.
     *
     * @param prioritizedMessages the new flag value
     */
    public void setPrioritizedMessages(boolean prioritizedMessages) {
        this.prioritizedMessages = prioritizedMessages;
        if (this.store != null) {
            this.store.setPrioritizedMessages(prioritizedMessages);
        }
    }

    /** @return milliseconds a marked destination must stay inactive before {@link #canGC()} is true */
    @Override
    public long getInactiveTimeoutBeforeGC() {
        return this.inactiveTimeoutBeforeGC;
    }

    /** @param inactiveTimeoutBeforeGC inactivity timeout in milliseconds; values &lt;= 0 prevent marking */
    public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) {
        this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC;
    }

    /** @return whether inactivity based GC is enabled */
    public boolean isGcIfInactive() {
        return this.gcIfInactive;
    }

    /** @param gcIfInactive whether inactivity based GC is enabled */
    public void setGcIfInactive(boolean gcIfInactive) {
        this.gcIfInactive = gcIfInactive;
    }

    /** @param gcWithNetworkConsumers whether network-only consumers are ignored by {@link #isActive()} */
    public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
        this.gcWithNetworkConsumers = gcWithNetworkConsumers;
    }

    /** @return whether network-only consumers are ignored by {@link #isActive()} */
    public boolean isGcWithNetworkConsumers() {
        return this.gcWithNetworkConsumers;
    }

    /**
     * Records {@code timeStamp} as the start of inactivity if GC is enabled,
     * the destination is not already marked, it is not active, it holds no
     * messages and the inactivity timeout is positive.
     *
     * @param timeStamp the time to record as the start of inactivity
     */
    @Override
    public void markForGC(long timeStamp) {
        if (isGcIfInactive() && this.lastActiveTime == 0 && !isActive() && this.destinationStatistics.messages.getCount() == 0 && getInactiveTimeoutBeforeGC() > 0) {
            this.lastActiveTime = timeStamp;
        }
    }

    /**
     * @return {@code true} if GC is enabled, the destination has been marked
     *         and at least the inactivity timeout has elapsed since the mark
     */
    @Override
    public boolean canGC() {
        return isGcIfInactive()
                && this.lastActiveTime != 0
                && System.currentTimeMillis() - this.lastActiveTime >= getInactiveTimeoutBeforeGC();
    }

    /** @param reduceMemoryFootprint the reduce-memory-footprint flag */
    public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
        this.reduceMemoryFootprint = reduceMemoryFootprint;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** @return the reduce-memory-footprint flag (default {@code false}) */
    public boolean isReduceMemoryFootprint() {
        return this.reduceMemoryFootprint;
    }

    /** @return the optimize-message-storage flag (default {@code true}) */
    @Override
    public boolean isDoOptimzeMessageStorage() {
        return this.doOptimzeMessageStorage;
    }

    /** @param doOptimzeMessageStorage the optimize-message-storage flag */
    @Override
    public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
        this.doOptimzeMessageStorage = doOptimzeMessageStorage;
    }

    /** @return the optimize-message-store in-flight limit (default 10) */
    public int getOptimizeMessageStoreInFlightLimit() {
        return this.optimizeMessageStoreInFlightLimit;
    }

    /** @param optimizeMessageStoreInFlightLimit the optimize-message-store in-flight limit */
    public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
        this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
    }

    /** @return the subscriptions currently consuming from this destination */
    @Override
    public abstract List<Subscription> getConsumers();

    /**
     * @param consumers subscriptions to inspect
     * @return {@code true} if at least one subscription is not a network subscription
     */
    protected boolean hasRegularConsumers(List<Subscription> consumers) {
        for (Subscription subscription : consumers) {
            if (!subscription.getConsumerInfo().isNetworkSubscription()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Builds a fresh connection context bound to this destination's broker,
     * with a non-cached evaluation context targeting this destination and the
     * broker security context.
     *
     * @return a new connection context
     */
    public ConnectionContext createConnectionContext() {
        ConnectionContext connectionContext = new ConnectionContext(new NonCachedMessageEvaluationContext());
        connectionContext.setBroker(this.broker);
        connectionContext.getMessageEvaluationContext().setDestination(getActiveMQDestination());
        connectionContext.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
        return connectionContext;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Produces an ack whose first and last message id both point at
     * {@code messageReference}.
     *
     * <p>If the incoming ack has a positive message count it is copied and the
     * copy gets a count of 1; otherwise the incoming ack itself is updated and
     * returned.
     *
     * @param messageAck the ack to convert
     * @param messageReference the single message the result should refer to
     * @return the single-message ack
     */
    public MessageAck convertToNonRangedAck(MessageAck messageAck, MessageReference messageReference) {
        MessageAck single = messageAck;
        if (messageAck.getMessageCount() > 0) {
            single = new MessageAck();
            messageAck.copy(single);
            single.setMessageCount(1);
        }
        single.setFirstMessageId(messageReference.getMessageId());
        single.setLastMessageId(messageReference.getMessageId());
        return single;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** @return whether the underlying destination is flagged as a DLQ */
    public boolean isDLQ() {
        return this.destination.isDLQ();
    }

    /**
     * Handles a message that the store returned as a duplicate: logs a
     * warning, sends the message to the dead letter queue and acknowledges it
     * with a poison cause so it is removed from this destination.
     *
     * <p>An {@link IOException} from the acknowledge is logged (with its stack
     * trace) and not propagated.
     *
     * @param message the duplicate message
     * @param subscription the subscription on whose behalf the ack is issued
     */
    @Override
    public void duplicateFromStore(Message message, Subscription subscription) {
        ConnectionContext connectionContext = createConnectionContext();
        getLog().warn("duplicate message from store {}, redirecting for dlq processing", message.getMessageId());
        Throwable cause = new Throwable("duplicate from store for " + this.destination);
        message.setRegionDestination(this);
        this.broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
        MessageAck poisonAck = new MessageAck(message, (byte) 1, 1);
        poisonAck.setPoisonCause(cause);
        try {
            acknowledge(connectionContext, subscription, poisonAck, message);
        } catch (IOException e) {
            // Trailing Throwable is treated by SLF4J as the exception to log.
            getLog().error("Failed to acknowledge duplicate message {} from {} with {}", message.getMessageId(), this.destination, poisonAck, e);
        }
    }

    /** @param persistJMSRedelivered the persist-JMSRedelivered flag */
    public void setPersistJMSRedelivered(boolean persistJMSRedelivered) {
        this.persistJMSRedelivered = persistJMSRedelivered;
    }

    /** @return the persist-JMSRedelivered flag */
    public boolean isPersistJMSRedelivered() {
        return this.persistJMSRedelivered;
    }
}
