package org.apache.activemq.broker.util;

import java.util.Iterator;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.filter.AnyDestination;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.state.ProducerState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:activemq-broker-5.11.0.redhat-630371-04.jar:org/apache/activemq/broker/util/RedeliveryPlugin.class */
public class RedeliveryPlugin extends BrokerPluginSupport {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) RedeliveryPlugin.class);
    public static final String REDELIVERY_DELAY = "redeliveryDelay";
    RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
    boolean sendToDlqIfMaxRetriesExceeded = true;
    private boolean fallbackToDeadLetter = true;

    @Override // org.apache.activemq.broker.BrokerPluginSupport, org.apache.activemq.broker.BrokerPlugin
    public Broker installPlugin(Broker broker) throws Exception {
        if (!broker.getBrokerService().isSchedulerSupport()) {
            throw new IllegalStateException("RedeliveryPlugin requires schedulerSupport=true on the broker");
        }
        validatePolicyDelay(1000L);
        return super.installPlugin(broker);
    }

    private void validatePolicyDelay(long j) {
        Iterator it = this.redeliveryPolicyMap.get(new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(DestinationFilter.ANY_DESCENDENT), new ActiveMQTopic(DestinationFilter.ANY_DESCENDENT)})).iterator();
        while (it.hasNext()) {
            validateLimit(j, (RedeliveryPolicy) it.next());
        }
        RedeliveryPolicy defaultEntry = this.redeliveryPolicyMap.getDefaultEntry();
        if (defaultEntry != null) {
            validateLimit(j, defaultEntry);
        }
    }

    private void validateLimit(long j, RedeliveryPolicy redeliveryPolicy) {
        if (redeliveryPolicy.getInitialRedeliveryDelay() < j) {
            throw new IllegalStateException("RedeliveryPolicy initialRedeliveryDelay must exceed: " + j + ". " + redeliveryPolicy);
        }
        if (redeliveryPolicy.getRedeliveryDelay() < j) {
            throw new IllegalStateException("RedeliveryPolicy redeliveryDelay must exceed: " + j + ". " + redeliveryPolicy);
        }
    }

    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
        return this.redeliveryPolicyMap;
    }

    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
        this.redeliveryPolicyMap = redeliveryPolicyMap;
    }

    public boolean isSendToDlqIfMaxRetriesExceeded() {
        return this.sendToDlqIfMaxRetriesExceeded;
    }

    public void setSendToDlqIfMaxRetriesExceeded(boolean z) {
        this.sendToDlqIfMaxRetriesExceeded = z;
    }

    public boolean isFallbackToDeadLetter() {
        return this.fallbackToDeadLetter;
    }

    public void setFallbackToDeadLetter(boolean z) {
        this.fallbackToDeadLetter = z;
    }

    @Override // org.apache.activemq.broker.MutableBrokerFilter, org.apache.activemq.broker.Broker
    public boolean sendToDeadLetterQueue(ConnectionContext connectionContext, MessageReference messageReference, Subscription subscription, Throwable th) {
        if (messageReference.isExpired() || !(th == null || th.getMessage() == null || !th.getMessage().contains(BaseDestination.DUPLICATE_FROM_STORE_MSG_PREFIX))) {
            return super.sendToDeadLetterQueue(connectionContext, messageReference, subscription, th);
        }
        try {
            Destination destination = (Destination) messageReference.getRegionDestination();
            RedeliveryPolicy entryFor = this.redeliveryPolicyMap.getEntryFor(destination.getActiveMQDestination());
            if (entryFor == null) {
                if (isFallbackToDeadLetter()) {
                    return super.sendToDeadLetterQueue(connectionContext, messageReference, subscription, th);
                }
                LOG.debug("Ignoring dlq request for: {}, RedeliveryPolicy not found (and no fallback) for: {}", messageReference.getMessageId(), destination.getActiveMQDestination());
                return false;
            }
            int maximumRedeliveries = entryFor.getMaximumRedeliveries();
            int redeliveryCounter = messageReference.getRedeliveryCounter();
            if (-1 == maximumRedeliveries || redeliveryCounter < maximumRedeliveries) {
                long initialRedeliveryDelay = entryFor.getInitialRedeliveryDelay();
                for (int i = 0; i < redeliveryCounter; i++) {
                    initialRedeliveryDelay = entryFor.getNextRedeliveryDelay(initialRedeliveryDelay);
                }
                scheduleRedelivery(connectionContext, messageReference, initialRedeliveryDelay, redeliveryCounter + 1);
            } else {
                if (isSendToDlqIfMaxRetriesExceeded()) {
                    return super.sendToDeadLetterQueue(connectionContext, messageReference, subscription, th);
                }
                LOG.debug("Discarding message that exceeds max redelivery count({}), {}", Integer.valueOf(maximumRedeliveries), messageReference.getMessageId());
            }
            return false;
        } catch (Exception e) {
            RuntimeException runtimeException = new RuntimeException("Failed to schedule redelivery for: " + messageReference.getMessageId(), e);
            LOG.error(runtimeException.toString(), (Throwable) e);
            throw runtimeException;
        }
    }

    private void scheduleRedelivery(ConnectionContext connectionContext, MessageReference messageReference, long j, int i) throws Exception {
        if (LOG.isTraceEnabled()) {
            LOG.trace("redelivery #{} of: {} with delay: {}, dest: {}", Integer.valueOf(i), messageReference.getMessageId(), Long.valueOf(j), ((Destination) messageReference.getRegionDestination()).getActiveMQDestination());
        }
        Message copy = messageReference.getMessage().copy();
        copy.setTransactionId(null);
        copy.setMemoryUsage(null);
        copy.removeProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
        copy.setProperty(REDELIVERY_DELAY, Long.valueOf(j));
        copy.setProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, Long.valueOf(j));
        copy.setRedeliveryCounter(i);
        boolean isProducerFlowControl = connectionContext.isProducerFlowControl();
        try {
            connectionContext.setProducerFlowControl(false);
            ProducerState producerState = new ProducerState(new ProducerInfo());
            ProducerBrokerExchange producerBrokerExchange = new ProducerBrokerExchange();
            producerBrokerExchange.setProducerState(producerState);
            producerBrokerExchange.setMutable(true);
            producerBrokerExchange.setConnectionContext(connectionContext);
            connectionContext.getBroker().send(producerBrokerExchange, copy);
            connectionContext.setProducerFlowControl(isProducerFlowControl);
        } catch (Throwable th) {
            connectionContext.setProducerFlowControl(isProducerFlowControl);
            throw th;
        }
    }
}
