package org.apache.camel.processor.aggregate;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.aries.blueprint.ext.impl.ExtNamespaceHandler;
import org.apache.aries.blueprint.parser.Parser;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Navigate;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.TimeoutMap;
import org.apache.camel.Traceable;
import org.apache.camel.spi.AggregationRepository;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.OptimisticLockingAggregationRepository;
import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.spi.ShutdownPrepared;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultTimeoutMap;
import org.apache.camel.support.LoggingExceptionHandler;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.LRUCache;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/camel-core-2.15.1.redhat-621222-09.jar:org/apache/camel/processor/aggregate/AggregateProcessor.class */
public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared, IdAware {
    public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker";
    private static final Logger LOG = LoggerFactory.getLogger(AggregateProcessor.class);

    // Collaborators assigned by the constructor.
    private final CamelContext camelContext;
    // Downstream processor that receives each completed aggregated exchange.
    private final Processor processor;
    private String id;
    private AggregationStrategy aggregationStrategy;
    // When true, doAggregation runs the pre-completion check before aggregating.
    private boolean preCompletion;
    private Expression correlationExpression;
    // Executor that runs the processing of completed aggregated exchanges.
    private final ExecutorService executorService;
    private final boolean shutdownExecutorService;
    private ScheduledExecutorService timeoutCheckerExecutorService;
    private boolean shutdownTimeoutCheckerExecutorService;
    private ScheduledExecutorService recoverService;
    // Used by onCompletion to stop tracking a correlation key once its group completes.
    private TimeoutMap<String, String> timeoutMap;
    private ExceptionHandler exceptionHandler;
    private AggregationRepository aggregationRepository;
    // When non-null, completed correlation keys are recorded here and later exchanges
    // carrying such a key are rejected with ClosedCorrelationKeyException.
    private Map<String, String> closedCorrelationKeys;
    private boolean ignoreInvalidCorrelationKeys;
    private Integer closeCorrelationKeyOnCompletion;
    private boolean parallelProcessing;
    // When true the repository's optimistic locking is relied upon and 'lock' is not taken.
    private boolean optimisticLocking;
    private boolean eagerCheckCompletion;

    // Completion conditions.
    private Predicate completionPredicate;
    private long completionTimeout;
    private Expression completionTimeoutExpression;
    private long completionInterval;
    private int completionSize;
    private Expression completionSizeExpression;
    private boolean completionFromBatchConsumer;
    private boolean discardOnCompletionTimeout;
    private boolean forceCompletionOnStop;
    // Used by the recover task to send exhausted exchanges to the dead letter URI.
    private ProducerTemplate deadLetterProducerTemplate;
    // Held around aggregation, interval completion and timeout purge when optimisticLocking is false.
    private final Lock lock = new ReentrantLock();
    private OptimisticLockRetryPolicy optimisticLockRetryPolicy = new OptimisticLockRetryPolicy();
    // Correlation keys seen while completing from a batch consumer; cleared when the batch completes.
    private final Set<String> batchConsumerCorrelationKeys = new ConcurrentSkipListSet();
    // Ids of aggregated exchanges that were submitted for processing and have not finished yet.
    private final Set<String> inProgressCompleteExchanges = Collections.newSetFromMap(new ConcurrentHashMap());
    // Redelivery bookkeeping of recovered exchanges, keyed by exchange id.
    private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap();
    // Number of exchanges counted for the current consumer batch; reset when the batch completes.
    private AtomicInteger batchConsumerCounter = new AtomicInteger();

    /* loaded from: input_file:WEB-INF/lib/camel-core-2.15.1.redhat-621222-09.jar:org/apache/camel/processor/aggregate/AggregateProcessor$AggregateOnCompletion.class */
    /**
     * Completion callback attached to every submitted aggregated exchange.
     * <p>
     * On success the exchange is confirmed in the aggregation repository and its
     * redelivery state is dropped; in every case the exchange id is removed from
     * the set of in-progress exchanges.
     */
    private final class AggregateOnCompletion implements Synchronization {
        private final String exchangeId;

        private AggregateOnCompletion(String str) {
            this.exchangeId = str;
        }

        @Override
        public void onFailure(Exchange exchange) {
            LOG.trace("Aggregated exchange onFailure: {}", exchange);
            // only clear the in-progress marker; the exchange is not confirmed here
            inProgressCompleteExchanges.remove(exchangeId);
        }

        @Override
        public void onComplete(Exchange exchange) {
            LOG.trace("Aggregated exchange onComplete: {}", exchange);
            try {
                aggregationRepository.confirm(exchange.getContext(), exchangeId);
                redeliveryState.remove(exchangeId);
            } finally {
                // must always be cleared, also when confirm fails
                inProgressCompleteExchanges.remove(exchangeId);
            }
        }

        @Override
        public String toString() {
            return "AggregateOnCompletion";
        }
    }

    /* loaded from: input_file:WEB-INF/lib/camel-core-2.15.1.redhat-621222-09.jar:org/apache/camel/processor/aggregate/AggregateProcessor$AggregationIntervalTask.class */
    /**
     * Background task which completes every group currently held in the
     * aggregation repository, marking each as completed by "interval".
     */
    private final class AggregationIntervalTask implements Runnable {
        private AggregationIntervalTask() {
        }

        @Override
        public void run() {
            if (!camelContext.getStatus().isStarted()) {
                LOG.trace("Completion interval task cannot start due CamelContext({}) has not been started yet", camelContext.getName());
                return;
            }
            LOG.trace("Starting completion interval task");
            Set<String> keys = aggregationRepository.getKeys();
            if (keys != null && !keys.isEmpty()) {
                completeAll(keys);
            }
            LOG.trace("Completion interval task complete");
        }

        /**
         * Completes all given correlation keys, holding the processor lock
         * for the whole run unless optimistic locking is in use.
         */
        private void completeAll(Set<String> keys) {
            if (!optimisticLocking) {
                lock.lock();
            }
            try {
                for (String key : keys) {
                    boolean handledElsewhere = !completeGroup(key);
                    if (optimisticLocking && handledElsewhere) {
                        LOG.debug("Another Camel instance has already processed this interval aggregation for exchange with correlation id: {}", key);
                    }
                }
            } finally {
                if (!optimisticLocking) {
                    lock.unlock();
                }
            }
        }

        /**
         * Completes a single group.
         *
         * @return {@code false} when the group was no longer in the repository or
         *         completing it hit an optimistic locking failure, {@code true} otherwise
         */
        private boolean completeGroup(String key) {
            Exchange aggregated = aggregationRepository.get(camelContext, key);
            if (aggregated == null) {
                return false;
            }
            LOG.trace("Completion interval triggered for correlation key: {}", key);
            aggregated.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval");
            try {
                Exchange answer = onCompletion(key, aggregated, aggregated, false);
                if (answer != null) {
                    onSubmitCompletion(key, answer);
                }
            } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
                return false;
            }
            return true;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/camel-core-2.15.1.redhat-621222-09.jar:org/apache/camel/processor/aggregate/AggregateProcessor$AggregationTimeoutMap.class */
    /**
     * Timeout map whose evictions complete the aggregation group of the evicted
     * correlation key, marking it as completed by timeout.
     */
    private final class AggregationTimeoutMap extends DefaultTimeoutMap<String, String> {
        private AggregationTimeoutMap(ScheduledExecutorService scheduledExecutorService, long j) {
            super(scheduledExecutorService, j, AggregateProcessor.this.optimisticLocking);
        }

        /**
         * Purges expired entries while holding the processor lock, unless
         * optimistic locking is in use.
         */
        @Override
        public void purge() {
            if (!AggregateProcessor.this.optimisticLocking) {
                AggregateProcessor.this.lock.lock();
            }
            try {
                super.purge();
            } finally {
                if (!AggregateProcessor.this.optimisticLocking) {
                    AggregateProcessor.this.lock.unlock();
                }
            }
        }

        /**
         * Completes the group of the evicted correlation key.
         *
         * @param str  the correlation key
         * @param str2 the id of the exchange that was tracked for the key
         * @return always {@code true}
         */
        @Override
        public boolean onEviction(String str, String str2) {
            this.log.debug("Completion timeout triggered for correlation key: {}", str);
            if (AggregateProcessor.this.inProgressCompleteExchanges.contains(str2)) {
                AggregateProcessor.LOG.trace("Aggregated exchange with id: {} is already in progress.", str2);
                return true;
            }

            boolean handledElsewhere;
            Exchange aggregated = AggregateProcessor.this.aggregationRepository.get(AggregateProcessor.this.camelContext, str);
            if (aggregated != null) {
                handledElsewhere = false;
                aggregated.setProperty(Exchange.AGGREGATED_COMPLETED_BY, Parser.TIMEOUT_ATTRIBUTE);
                try {
                    Exchange answer = AggregateProcessor.this.onCompletion(str, aggregated, aggregated, true);
                    if (answer != null) {
                        AggregateProcessor.this.onSubmitCompletion(str, answer);
                    }
                } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
                    handledElsewhere = true;
                }
            } else {
                // the group is no longer in the repository
                handledElsewhere = true;
            }

            if (AggregateProcessor.this.optimisticLocking && handledElsewhere) {
                AggregateProcessor.LOG.debug("Another Camel instance has already successfully correlated or processed this timeout eviction for exchange with id: {} and correlation id: {}", str2, str);
            }
            return true;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/camel-core-2.15.1.redhat-621222-09.jar:org/apache/camel/processor/aggregate/AggregateProcessor$RecoverTask.class */
    /**
     * Background task which scans a recoverable repository and re-submits
     * completed exchanges that are not currently in progress. Exchanges that have
     * used up their redelivery attempts are sent to the dead letter URI instead.
     */
    private final class RecoverTask implements Runnable {
        private final RecoverableAggregationRepository recoverable;

        private RecoverTask(RecoverableAggregationRepository recoverableAggregationRepository) {
            this.recoverable = recoverableAggregationRepository;
        }

        @Override
        public void run() {
            if (!camelContext.getStatus().isStarted()) {
                LOG.trace("Recover check cannot start due CamelContext({}) has not been started yet", camelContext.getName());
                return;
            }
            LOG.trace("Starting recover check");

            // snapshot taken before scanning; the live set is consulted as well below
            Set<String> inProgressSnapshot = new LinkedHashSet<String>(inProgressCompleteExchanges);
            for (String exchangeId : recoverable.scan(camelContext)) {
                if (!isRunAllowed()) {
                    LOG.info("We are shutting down so stop recovering");
                    return;
                }

                if (inProgressSnapshot.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId)) {
                    LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId);
                    continue;
                }

                LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId);
                Exchange recovered = recoverable.recover(camelContext, exchangeId);
                if (recovered == null) {
                    continue;
                }

                String correlationKey = recovered.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class);
                recovered.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE);
                RedeliveryData data = redeliveryState.get(recovered.getExchangeId());

                boolean exhausted = data != null
                        && recoverable.getMaximumRedeliveries() > 0
                        && data.redeliveryCounter >= recoverable.getMaximumRedeliveries();
                if (exhausted) {
                    LOG.warn("The recovered exchange is exhausted after " + recoverable.getMaximumRedeliveries() + " attempts, will now be moved to dead letter channel: " + recoverable.getDeadLetterUri());
                    try {
                        recovered.getIn().setHeader(Exchange.REDELIVERY_COUNTER, Integer.valueOf(data.redeliveryCounter));
                        recovered.getIn().setHeader(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
                        deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), recovered);
                    } catch (Throwable th) {
                        recovered.setException(th);
                    }
                    if (recovered.getException() == null) {
                        // delivered to the dead letter URI, so it can be confirmed
                        recoverable.confirm(camelContext, exchangeId);
                    } else {
                        getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), recovered.getException());
                    }
                    continue;
                }

                if (data == null) {
                    data = new RedeliveryData();
                    redeliveryState.put(recovered.getExchangeId(), data);
                }
                data.redeliveryCounter++;
                recovered.getIn().setHeader(Exchange.REDELIVERY_COUNTER, Integer.valueOf(data.redeliveryCounter));
                if (recoverable.getMaximumRedeliveries() > 0) {
                    recovered.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.valueOf(recoverable.getMaximumRedeliveries()));
                }
                LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}", Integer.valueOf(data.redeliveryCounter), exchangeId);
                onSubmitCompletion(correlationKey, recovered);
            }
            LOG.trace("Recover check complete");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/camel-core-2.15.1.redhat-621222-09.jar:org/apache/camel/processor/aggregate/AggregateProcessor$RedeliveryData.class */
    /**
     * Mutable holder for the redelivery bookkeeping of one recovered exchange.
     */
    public class RedeliveryData {
        /** Number of redelivery attempts made so far; incremented by the recover task. */
        int redeliveryCounter;

        private RedeliveryData() {
            // instances are only created inside AggregateProcessor
        }
    }

    /**
     * Creates the aggregator. All reference arguments are mandatory and are
     * validated before any field is assigned.
     *
     * @param camelContext        the owning context
     * @param processor           the processor which receives completed aggregated exchanges
     * @param expression          the correlation expression
     * @param aggregationStrategy the strategy used to merge exchanges
     * @param executorService     the executor used to process completed exchanges
     * @param z                   stored as the {@code shutdownExecutorService} flag
     */
    public AggregateProcessor(CamelContext camelContext, Processor processor, Expression expression, AggregationStrategy aggregationStrategy, ExecutorService executorService, boolean z) {
        // validate first, in declaration order
        ObjectHelper.notNull(camelContext, "camelContext");
        ObjectHelper.notNull(processor, ExtNamespaceHandler.ROLE_PROCESSOR);
        ObjectHelper.notNull(expression, "correlationExpression");
        ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy");
        ObjectHelper.notNull(executorService, "executorService");

        this.camelContext = camelContext;
        this.processor = processor;
        this.executorService = executorService;
        this.shutdownExecutorService = z;
        this.correlationExpression = expression;
        this.aggregationStrategy = aggregationStrategy;
        this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass());
    }

    /**
     * Describes this processor by its downstream processor.
     */
    @Override
    public String toString() {
        return "AggregateProcessor[to: " + processor + "]";
    }

    /**
     * Returns the trace label, built from the correlation expression.
     */
    @Override
    public String getTraceLabel() {
        return "aggregate[" + correlationExpression + "]";
    }

    /**
     * Returns the downstream processor as a single-element list.
     *
     * @return a list holding the downstream processor, or {@code null} when
     *         {@link #hasNext()} is {@code false}
     */
    @Override
    public List<Processor> next() {
        if (!hasNext()) {
            return null;
        }
        // parameterized instead of a raw ArrayList, so no unchecked conversion on return
        List<Processor> answer = new ArrayList<Processor>(1);
        answer.add(processor);
        return answer;
    }

    /**
     * Tells whether a downstream processor is set.
     */
    @Override
    public boolean hasNext() {
        return processor != null;
    }

    /**
     * Returns the id assigned to this processor, if any.
     */
    @Override
    public String getId() {
        return id;
    }

    /**
     * Assigns the id of this processor.
     *
     * @param str the id
     */
    @Override
    public void setId(String str) {
        id = str;
    }

    /**
     * Synchronous entry point; delegates to {@code AsyncProcessorHelper.process}
     * with this processor and the given exchange.
     *
     * @param exchange the incoming exchange
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        AsyncProcessorHelper.process(this, exchange);
    }

    /**
     * Asynchronous entry point. The work is done on the calling thread, so the
     * callback is always invoked with {@code true} and {@code true} is returned.
     * Any failure is stored on the exchange rather than thrown.
     *
     * @param exchange      the incoming exchange
     * @param asyncCallback the callback to notify when done
     * @return always {@code true}
     */
    @Override
    public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
        try {
            doProcess(exchange);
        } catch (Throwable failure) {
            // report the failure through the exchange
            exchange.setException(failure);
        }
        asyncCallback.done(true);
        return true;
    }

    /**
     * Correlates the incoming exchange, aggregates it into its group and submits
     * every group that became complete as a result.
     * <p>
     * If the exchange carries the "complete all groups" header, all groups are
     * force-completed and the exchange itself is not aggregated. With optimistic
     * locking the aggregation is retried according to the retry policy whenever
     * it fails with an {@code OptimisticLockingException}; otherwise the
     * aggregation runs under the processor lock. Completed exchanges are always
     * submitted outside of that lock.
     *
     * @param exchange the incoming exchange
     * @throws CamelExchangeException        if the correlation key is empty and invalid keys
     *                                       are not ignored, or if the optimistic locking
     *                                       retries were exhausted
     * @throws ClosedCorrelationKeyException if the correlation key has been closed
     * @throws Exception                     any other failure during aggregation
     */
    protected void doProcess(Exchange exchange) throws Exception {
        if (((Boolean) exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, Boolean.TYPE)).booleanValue()) {
            forceCompletionOfAllGroups();
            return;
        }

        String str = (String) this.correlationExpression.evaluate(exchange, String.class);
        if (ObjectHelper.isEmpty(str)) {
            if (!isIgnoreInvalidCorrelationKeys()) {
                throw new CamelExchangeException("Invalid correlation key", exchange);
            }
            LOG.debug("Invalid correlation key. This Exchange will be ignored: {}", exchange);
            return;
        }
        if (this.closedCorrelationKeys != null && this.closedCorrelationKeys.containsKey(str)) {
            throw new ClosedCorrelationKeyException(str, exchange);
        }

        List<Exchange> completed = null;
        if (this.optimisticLocking) {
            boolean exhaustedRetries = true;
            int attempt = 0;
            do {
                attempt++;
                // a fresh copy per attempt, as a failed attempt may have modified the previous one
                Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
                try {
                    completed = doAggregation(str, copy);
                    exhaustedRetries = false;
                    break;
                } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
                    LOG.trace("On attempt {} OptimisticLockingAggregationRepository: {} threw OptimisticLockingException while trying to add() key: {} and exchange: {}", new Object[]{Integer.valueOf(attempt), this.aggregationRepository, str, copy, e});
                    this.optimisticLockRetryPolicy.doDelay(attempt);
                }
            } while (this.optimisticLockRetryPolicy.shouldRetry(attempt));
            if (exhaustedRetries) {
                throw new CamelExchangeException("Exhausted optimistic locking retry attempts, tried " + attempt + " times", exchange, new OptimisticLockingAggregationRepository.OptimisticLockingException());
            }
        } else {
            Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
            this.lock.lock();
            try {
                completed = doAggregation(str, copy);
            } finally {
                // released exactly once; the submission below must not run inside this
                // try, otherwise a failure there would unlock a lock no longer held
                this.lock.unlock();
            }
        }

        // submit completed exchanges outside the lock
        if (completed != null) {
            for (Exchange aggregated : completed) {
                onSubmitCompletion(str, aggregated);
            }
        }

        if (((Boolean) exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, false, Boolean.TYPE)).booleanValue()) {
            forceCompletionOfAllGroups();
        }
    }

    /**
     * Aggregates the new exchange into the group of the given correlation key and
     * evaluates the completion conditions.
     * <p>
     * The current aggregate is loaded from the repository, merged with the new
     * exchange by the aggregation strategy and then either stored back (group
     * still open) or handed to {@link #doAggregationComplete} (group complete).
     * With pre-completion enabled, a positive pre-completion check completes the
     * existing group first and the new exchange then starts a new group.
     * <p>
     * Only a failure of the aggregation strategy itself is wrapped as "Error
     * occurred during aggregation". Failures of the repository, in particular an
     * {@code OptimisticLockingException}, propagate unwrapped so that the caller
     * can recognise and retry them.
     *
     * @param str      the correlation key
     * @param exchange the new exchange to aggregate
     * @return the exchanges which completed as a result; never {@code null}, possibly empty
     * @throws CamelExchangeException if the pre-completion check or the aggregation
     *                                strategy fails, or the strategy returns {@code null}
     */
    private List<Exchange> doAggregation(String str, Exchange exchange) throws CamelExchangeException {
        LOG.trace("onAggregation +++ start +++ with correlation key: {}", str);
        List<Exchange> completed = new ArrayList<Exchange>();
        String completedBy = null;

        // originalExchange is what the repository holds; oldExchange is what gets aggregated into
        Exchange originalExchange = this.aggregationRepository.get(exchange.getContext(), str);
        Exchange oldExchange = originalExchange;
        Integer size = 1;
        if (oldExchange != null) {
            if (this.optimisticLocking && (this.aggregationRepository instanceof MemoryAggregationRepository)) {
                // work on a copy so the instance held by the in-memory repository is not modified
                oldExchange = originalExchange.copy();
            }
            size = Integer.valueOf(((Integer) oldExchange.getProperty(Exchange.AGGREGATED_SIZE, 0, Integer.class)).intValue() + 1);
        }
        ExchangeHelper.prepareAggregation(oldExchange, exchange);

        if (this.preCompletion) {
            try {
                // expose the size to the completion check, and remove it again afterwards
                exchange.setProperty(Exchange.AGGREGATED_SIZE, size);
                completedBy = isPreCompleted(str, oldExchange, exchange);
                if (completedBy == null) {
                    trackTimeout(str, exchange);
                }
                exchange.removeProperty(Exchange.AGGREGATED_SIZE);
            } catch (Throwable th) {
                throw new CamelExchangeException("Error occurred during preComplete", exchange, th);
            }
        } else if (isEagerCheckCompletion()) {
            exchange.setProperty(Exchange.AGGREGATED_SIZE, size);
            completedBy = isCompleted(str, exchange);
            if (completedBy == null) {
                trackTimeout(str, exchange);
            }
            exchange.removeProperty(Exchange.AGGREGATED_SIZE);
        }

        if (this.preCompletion && completedBy != null) {
            // complete the existing group, then let the new exchange start a new one
            doAggregationComplete(completedBy, completed, str, originalExchange, oldExchange);
            completedBy = null;
            oldExchange = null;
            originalExchange = null;
            size = 1;
            trackTimeout(str, exchange);
        }

        Exchange answer;
        try {
            answer = onAggregation(oldExchange, exchange);
        } catch (Throwable th) {
            // only the strategy call is guarded, so repository failures are not wrapped
            throw new CamelExchangeException("Error occurred during aggregation", exchange, th);
        }
        if (answer == null) {
            throw new CamelExchangeException("AggregationStrategy " + this.aggregationStrategy + " returned null which is not allowed", exchange);
        }
        answer.setProperty(Exchange.AGGREGATED_SIZE, size);

        if (!this.preCompletion && !isEagerCheckCompletion()) {
            completedBy = isCompleted(str, answer);
            if (completedBy == null) {
                trackTimeout(str, exchange);
            }
        }

        if (completedBy == null) {
            doAggregationRepositoryAdd(exchange.getContext(), str, originalExchange, answer);
        } else {
            doAggregationComplete(completedBy, completed, str, originalExchange, answer);
        }
        LOG.trace("onAggregation +++  end  +++ with correlation key: {}", str);
        return completed;
    }

    /**
     * Completes one group, or every batch consumer group, and appends the
     * completed exchanges to the given list.
     *
     * @param str       what triggered the completion; {@code "consumer"} completes all
     *                  groups recorded for the batch consumer
     * @param list      the list to which completed exchanges are added
     * @param str2      the correlation key of the current group
     * @param exchange  the exchange as loaded from the repository, may be {@code null}
     * @param exchange2 the aggregated exchange of the current group, may be {@code null}
     */
    protected void doAggregationComplete(String str, List<Exchange> list, String str2, Exchange exchange, Exchange exchange2) {
        if ("consumer".equals(str)) {
            for (String batchKey : batchConsumerCorrelationKeys) {
                Exchange batchAnswer;
                if (batchKey.equals(str2)) {
                    // the current group has already been aggregated
                    batchAnswer = exchange2;
                } else {
                    batchAnswer = aggregationRepository.get(camelContext, batchKey);
                }
                if (batchAnswer != null) {
                    batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, str);
                    onCompletion(batchKey, exchange, batchAnswer, false);
                    list.add(batchAnswer);
                }
            }
            batchConsumerCorrelationKeys.clear();
            return;
        }

        if (exchange2 == null) {
            return;
        }
        exchange2.setProperty(Exchange.AGGREGATED_COMPLETED_BY, str);
        Exchange answer = onCompletion(str2, exchange, exchange2, false);
        if (answer != null) {
            list.add(answer);
        }
    }

    /**
     * Stores the in-progress aggregate in the repository.
     * <p>
     * With optimistic locking the repository is given both the old and the new
     * exchange; a locking failure is first reported through
     * {@link #onOptimisticLockingFailure} and then rethrown.
     *
     * @param camelContext the context
     * @param str          the correlation key
     * @param exchange     the exchange previously held by the repository, may be {@code null}
     * @param exchange2    the new aggregate to store
     */
    protected void doAggregationRepositoryAdd(CamelContext camelContext, String str, Exchange exchange, Exchange exchange2) {
        LOG.trace("In progress aggregated oldExchange: {}, newExchange: {} with correlation key: {}", new Object[]{exchange, exchange2, str});
        if (optimisticLocking) {
            OptimisticLockingAggregationRepository repository = (OptimisticLockingAggregationRepository) aggregationRepository;
            try {
                repository.add(camelContext, str, exchange, exchange2);
            } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
                onOptimisticLockingFailure(exchange, exchange2);
                throw e;
            }
        } else {
            aggregationRepository.add(camelContext, str, exchange2);
        }
    }

    /**
     * Notifies the aggregation strategy about an optimistic locking failure, if
     * the strategy is an {@code OptimisticLockingAwareAggregationStrategy};
     * otherwise does nothing.
     *
     * @param exchange  the old exchange
     * @param exchange2 the new exchange
     */
    protected void onOptimisticLockingFailure(Exchange exchange, Exchange exchange2) {
        if (!(aggregationStrategy instanceof OptimisticLockingAwareAggregationStrategy)) {
            return;
        }
        LOG.trace("onOptimisticLockFailure with AggregationStrategy: {}, oldExchange: {}, newExchange: {}", new Object[]{aggregationStrategy, exchange, exchange2});
        OptimisticLockingAwareAggregationStrategy aware = (OptimisticLockingAwareAggregationStrategy) aggregationStrategy;
        aware.onOptimisticLockFailure(exchange, exchange2);
    }

    /**
     * Asks a {@code PreCompletionAwareAggregationStrategy} whether the group
     * should complete before the new exchange is aggregated.
     *
     * @param str       the correlation key (not used by this implementation)
     * @param exchange  the old exchange, may be {@code null}
     * @param exchange2 the new exchange
     * @return {@code "strategy"} if the strategy requests pre-completion, otherwise {@code null}
     */
    protected String isPreCompleted(String str, Exchange exchange, Exchange exchange2) {
        if (aggregationStrategy instanceof PreCompletionAwareAggregationStrategy) {
            PreCompletionAwareAggregationStrategy aware = (PreCompletionAwareAggregationStrategy) aggregationStrategy;
            if (aware.preComplete(exchange, exchange2)) {
                return "strategy";
            }
        }
        return null;
    }

    /**
     * Evaluates the completion conditions in order: batch consumer, the
     * "complete current group" exchange property, the completion predicate, the
     * completion size expression and finally the fixed completion size.
     * <p>
     * When the size expression yields a positive value, the fixed size is not
     * consulted.
     *
     * @param str      the correlation key
     * @param exchange the exchange to test
     * @return {@code "consumer"}, {@code "strategy"}, {@code "predicate"} or
     *         {@code "size"} naming the condition that triggered, or {@code null}
     *         if the group is not complete
     */
    protected String isCompleted(String str, Exchange exchange) {
        if (isCompletionFromBatchConsumer()) {
            batchConsumerCorrelationKeys.add(str);
            batchConsumerCounter.incrementAndGet();
            int batchSize = ((Integer) exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class)).intValue();
            if (batchSize > 0 && batchConsumerCounter.intValue() >= batchSize) {
                // whole batch seen, start counting afresh
                batchConsumerCounter.set(0);
                return "consumer";
            }
        }

        Boolean completeCurrentGroup = (Boolean) exchange.getProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, false, Boolean.TYPE);
        if (completeCurrentGroup.booleanValue()) {
            return "strategy";
        }

        if (getCompletionPredicate() != null && getCompletionPredicate().matches(exchange)) {
            return "predicate";
        }

        boolean sizeFromExpression = false;
        if (getCompletionSizeExpression() != null) {
            Integer expected = (Integer) getCompletionSizeExpression().evaluate(exchange, Integer.class);
            if (expected != null && expected.intValue() > 0) {
                sizeFromExpression = true;
                int current = ((Integer) exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class)).intValue();
                if (current >= expected.intValue()) {
                    return "size";
                }
            }
        }

        if (!sizeFromExpression && getCompletionSize() > 0) {
            int current = ((Integer) exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class)).intValue();
            if (current >= getCompletionSize()) {
                return "size";
            }
        }
        return null;
    }

    /**
     * Registers or refreshes the completion timeout of a correlation key.
     * <p>
     * A positive value from the timeout expression takes precedence; otherwise
     * the fixed completion timeout is used when it is positive. If neither
     * applies, nothing is tracked.
     *
     * @param str      the correlation key
     * @param exchange the exchange that was just received
     */
    protected void trackTimeout(String str, Exchange exchange) {
        if (getCompletionTimeoutExpression() != null) {
            Long dynamicTimeout = (Long) getCompletionTimeoutExpression().evaluate(exchange, Long.class);
            if (dynamicTimeout != null && dynamicTimeout.longValue() > 0) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}", new Object[]{str, dynamicTimeout, exchange});
                }
                addExchangeToTimeoutMap(str, exchange, dynamicTimeout.longValue());
                return;
            }
        }

        if (getCompletionTimeout() > 0) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}", new Object[]{str, Long.valueOf(getCompletionTimeout()), exchange});
            }
            addExchangeToTimeoutMap(str, exchange, getCompletionTimeout());
        }
    }

    /**
     * Merges the two exchanges using the configured aggregation strategy.
     *
     * @param exchange  the old exchange, may be {@code null}
     * @param exchange2 the new exchange
     * @return whatever the strategy returns
     */
    protected Exchange onAggregation(Exchange exchange, Exchange exchange2) {
        Exchange merged = aggregationStrategy.aggregate(exchange, exchange2);
        return merged;
    }

    /**
     * Runs the pre-completion check of a
     * {@code PreCompletionAwareAggregationStrategy}.
     *
     * @param exchange  the old exchange, may be {@code null}
     * @param exchange2 the new exchange
     * @return the strategy's answer, or {@code false} when the strategy is not
     *         pre-completion aware
     */
    protected boolean onPreCompletionAggregation(Exchange exchange, Exchange exchange2) {
        if (!(aggregationStrategy instanceof PreCompletionAwareAggregationStrategy)) {
            return false;
        }
        PreCompletionAwareAggregationStrategy aware = (PreCompletionAwareAggregationStrategy) aggregationStrategy;
        return aware.preComplete(exchange, exchange2);
    }

    /**
     * Performs the bookkeeping for a completed group: stamps the correlation key
     * on the exchanges, removes the group from the repository, stops timeout
     * tracking, records the key as closed when closed keys are tracked, and
     * notifies a timeout-aware strategy.
     *
     * @param str       the correlation key
     * @param exchange  the exchange held by the repository, may be {@code null}, in
     *                  which case nothing is removed from the repository
     * @param exchange2 the aggregated exchange
     * @param z         {@code true} if the completion was triggered by timeout
     * @return the aggregated exchange to submit, or {@code null} if it was
     *         discarded because of a timeout with discard-on-timeout enabled
     */
    protected Exchange onCompletion(String str, Exchange exchange, Exchange exchange2, boolean z) {
        if (exchange != null) {
            exchange.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, str);
        }
        exchange2.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, str);
        if (exchange != null) {
            aggregationRepository.remove(exchange2.getContext(), str, exchange);
        }

        // on timeout the timeout map evicts the entry itself
        if (!z && timeoutMap != null) {
            LOG.trace("Removing correlation key {} from timeout", str);
            timeoutMap.remove(str);
        }

        if (closedCorrelationKeys != null) {
            closedCorrelationKeys.put(str, str);
        }

        if (z && (aggregationStrategy instanceof TimeoutAwareAggregationStrategy)) {
            long timeout = -1L;
            if (getCompletionTimeout() > 0) {
                timeout = getCompletionTimeout();
            }
            ((TimeoutAwareAggregationStrategy) aggregationStrategy).timeout(exchange2, -1, -1, timeout);
        }

        if (z && isDiscardOnCompletionTimeout()) {
            LOG.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", str, exchange2);
            aggregationRepository.confirm(exchange2.getContext(), exchange2.getExchangeId());
            redeliveryState.remove(exchange2.getExchangeId());
            return null;
        }
        return exchange2;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Hands a completed aggregated exchange to the executor service for downstream processing.
     *
     * <p>The exchange id is recorded as in-progress before submission, and a
     * {@link CompletionAwareAggregationStrategy} is notified on the calling thread. The submitted
     * task registers an {@code AggregateOnCompletion} callback, runs the downstream processor and
     * reports any failure to the configured exception handler.
     *
     * @param str      the correlation key of the completed group (used for logging only)
     * @param exchange the aggregated exchange to process
     */
    public void onSubmitCompletion(String str, final Exchange exchange) {
        LOG.debug("Aggregation complete for correlation key {} sending aggregated exchange: {}", str, exchange);
        inProgressCompleteExchanges.add(exchange.getExchangeId());

        if (aggregationStrategy instanceof CompletionAwareAggregationStrategy) {
            final CompletionAwareAggregationStrategy completionAware =
                    (CompletionAwareAggregationStrategy) aggregationStrategy;
            completionAware.onCompletion(exchange);
        }

        final Runnable task = new Runnable() {
            @Override
            public void run() {
                LOG.debug("Processing aggregated exchange: {}", exchange);
                exchange.addOnCompletion(new AggregateOnCompletion(exchange.getExchangeId()));
                try {
                    processor.process(exchange);
                } catch (Throwable th) {
                    // Anything thrown by the downstream processor is carried on the exchange.
                    exchange.setException(th);
                }

                final Throwable failure = exchange.getException();
                if (failure == null) {
                    LOG.trace("Processing aggregated exchange: {} complete.", exchange);
                    return;
                }
                getExceptionHandler().handleException("Error processing aggregated exchange", exchange, failure);
            }
        };
        executorService.submit(task);
    }

    /**
     * Re-registers completion timeouts for exchanges that are already stored in the aggregation
     * repository.
     *
     * <p>For every key reported by the repository the stored exchange is loaded and, when it carries
     * a positive {@link Exchange#AGGREGATED_TIMEOUT} property, it is added to the timeout map with
     * that timeout. Keys whose exchange can no longer be loaded (the repository returns
     * {@code null}) are skipped instead of failing the whole restore.
     *
     * @throws Exception if the repository or the timeout map fails
     */
    protected void restoreTimeoutMapFromAggregationRepository() throws Exception {
        Set<String> keys = this.aggregationRepository.getKeys();
        if (keys == null || keys.isEmpty()) {
            return;
        }
        StopWatch stopWatch = new StopWatch();
        LOG.trace("Starting restoring CompletionTimeout for {} existing exchanges from the aggregation repository...", Integer.valueOf(keys.size()));
        for (String str : keys) {
            Exchange exchange = this.aggregationRepository.get(this.camelContext, str);
            if (exchange == null) {
                // The key may have disappeared between getKeys() and get(); there is nothing to restore.
                LOG.trace("No exchange found in the aggregation repository for correlation key {}, skipping CompletionTimeout restore.", str);
                continue;
            }
            // An exchange without properties, or without the timeout property, yields 0 and is ignored.
            long longValue = exchange.hasProperties() ? ((Long) exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, 0, Long.TYPE)).longValue() : 0L;
            if (longValue > 0) {
                LOG.trace("Restoring CompletionTimeout for exchangeId: {} with timeout: {} millis.", exchange.getExchangeId(), Long.valueOf(longValue));
                addExchangeToTimeoutMap(str, exchange, longValue);
            }
        }
        LOG.info("Restored {} CompletionTimeout conditions in the AggregationTimeoutChecker in {}", Integer.valueOf(this.timeoutMap.size()), TimeUtils.printDuration(stopWatch.stop()));
    }

    /**
     * Registers an exchange in the timeout map and records the timeout on the exchange itself.
     *
     * @param str      the correlation key used as the timeout-map key
     * @param exchange the exchange whose id is stored as the timeout-map value
     * @param j        the timeout, stored as the {@link Exchange#AGGREGATED_TIMEOUT} property and
     *                 passed to the timeout map
     */
    private void addExchangeToTimeoutMap(String str, Exchange exchange, long j) {
        // The property is what restoreTimeoutMapFromAggregationRepository() reads back later.
        exchange.setProperty(Exchange.AGGREGATED_TIMEOUT, j);
        final String exchangeId = exchange.getExchangeId();
        timeoutMap.put(str, exchangeId, j);
    }

    /**
     * Returns the completion predicate set on this processor.
     *
     * @return the predicate, or {@code null} when none is set ({@code doStart} treats {@code null}
     *         as "not configured" when validating the completion options)
     */
    public Predicate getCompletionPredicate() {
        return this.completionPredicate;
    }

    /**
     * Sets the completion predicate.
     *
     * @param predicate the predicate to store; {@code null} counts as "not configured" in
     *                  {@code doStart}
     */
    public void setCompletionPredicate(Predicate predicate) {
        this.completionPredicate = predicate;
    }

    /**
     * Returns the value of the {@code eagerCheckCompletion} flag.
     *
     * @return the current flag value
     */
    public boolean isEagerCheckCompletion() {
        return this.eagerCheckCompletion;
    }

    /**
     * Sets the {@code eagerCheckCompletion} flag.
     *
     * @param z the new flag value
     */
    public void setEagerCheckCompletion(boolean z) {
        this.eagerCheckCompletion = z;
    }

    /**
     * Returns the completion timeout.
     *
     * @return the timeout (logged as milliseconds by this class); values of zero or less count as
     *         "not configured" in {@code doStart}
     */
    public long getCompletionTimeout() {
        return this.completionTimeout;
    }

    /**
     * Sets the completion timeout.
     *
     * <p>{@code doStart} rejects a configuration in which both this value and the completion
     * interval are greater than zero.
     *
     * @param j the timeout (logged as milliseconds by this class)
     */
    public void setCompletionTimeout(long j) {
        this.completionTimeout = j;
    }

    /**
     * Returns the completion timeout expression.
     *
     * @return the expression, or {@code null} when none is set; a non-null value makes
     *         {@code doStart} set up the timeout map even when the fixed timeout is not positive
     */
    public Expression getCompletionTimeoutExpression() {
        return this.completionTimeoutExpression;
    }

    /**
     * Sets the completion timeout expression.
     *
     * @param expression the expression to store; may be {@code null}
     */
    public void setCompletionTimeoutExpression(Expression expression) {
        this.completionTimeoutExpression = expression;
    }

    /**
     * Returns the completion interval.
     *
     * @return the interval in milliseconds ({@code doStart} schedules the interval task with
     *         {@code TimeUnit.MILLISECONDS}); values of zero or less disable interval completion
     */
    public long getCompletionInterval() {
        return this.completionInterval;
    }

    /**
     * Sets the completion interval.
     *
     * <p>{@code doStart} rejects a configuration in which both this value and the completion timeout
     * are greater than zero.
     *
     * @param j the interval in milliseconds
     */
    public void setCompletionInterval(long j) {
        this.completionInterval = j;
    }

    /**
     * Returns the completion size.
     *
     * @return the size; values of zero or less count as "not configured" in {@code doStart}
     */
    public int getCompletionSize() {
        return this.completionSize;
    }

    /**
     * Sets the completion size.
     *
     * @param i the size to store
     */
    public void setCompletionSize(int i) {
        this.completionSize = i;
    }

    /**
     * Returns the completion size expression.
     *
     * @return the expression, or {@code null} when none is set ({@code null} counts as
     *         "not configured" in {@code doStart})
     */
    public Expression getCompletionSizeExpression() {
        return this.completionSizeExpression;
    }

    /**
     * Sets the completion size expression.
     *
     * @param expression the expression to store; may be {@code null}
     */
    public void setCompletionSizeExpression(Expression expression) {
        this.completionSizeExpression = expression;
    }

    /**
     * Returns the value of the {@code ignoreInvalidCorrelationKeys} flag.
     *
     * @return the current flag value
     */
    public boolean isIgnoreInvalidCorrelationKeys() {
        return this.ignoreInvalidCorrelationKeys;
    }

    /**
     * Sets the {@code ignoreInvalidCorrelationKeys} flag.
     *
     * @param z the new flag value
     */
    public void setIgnoreInvalidCorrelationKeys(boolean z) {
        this.ignoreInvalidCorrelationKeys = z;
    }

    /**
     * Returns the closed-correlation-key setting.
     *
     * @return {@code null} when closed-key tracking is not set up by {@code doStart}; a positive
     *         value is used there as the capacity of an {@code LRUCache}; any other value selects an
     *         unbounded map
     */
    public Integer getCloseCorrelationKeyOnCompletion() {
        return this.closeCorrelationKeyOnCompletion;
    }

    /**
     * Sets the closed-correlation-key setting that {@code doStart} reads.
     *
     * @param num {@code null} to leave closed-key tracking unset; a positive value for an
     *            {@code LRUCache} of that capacity; zero or negative for an unbounded map
     */
    public void setCloseCorrelationKeyOnCompletion(Integer num) {
        this.closeCorrelationKeyOnCompletion = num;
    }

    /**
     * Returns the value of the {@code completionFromBatchConsumer} flag.
     *
     * @return the current flag value; {@code true} satisfies the "at least one completion option"
     *         check in {@code doStart}
     */
    public boolean isCompletionFromBatchConsumer() {
        return this.completionFromBatchConsumer;
    }

    /**
     * Sets the {@code completionFromBatchConsumer} flag.
     *
     * @param z the new flag value
     */
    public void setCompletionFromBatchConsumer(boolean z) {
        this.completionFromBatchConsumer = z;
    }

    /**
     * Returns the exception handler.
     *
     * @return the handler that {@code onSubmitCompletion} calls when processing of an aggregated
     *         exchange ends with an exception
     */
    public ExceptionHandler getExceptionHandler() {
        return this.exceptionHandler;
    }

    /**
     * Sets the exception handler.
     *
     * @param exceptionHandler the handler to store
     */
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }

    /**
     * Returns the value of the {@code parallelProcessing} flag.
     *
     * @return the current flag value
     */
    public boolean isParallelProcessing() {
        return this.parallelProcessing;
    }

    /**
     * Sets the {@code parallelProcessing} flag.
     *
     * @param z the new flag value
     */
    public void setParallelProcessing(boolean z) {
        this.parallelProcessing = z;
    }

    /**
     * Returns the value of the {@code optimisticLocking} flag.
     *
     * @return {@code true} when optimistic locking is enabled; in that mode
     *         {@code forceCompletionOfAllGroups} does not take the processor lock
     */
    public boolean isOptimisticLocking() {
        return this.optimisticLocking;
    }

    /**
     * Sets the {@code optimisticLocking} flag.
     *
     * <p>When enabled, {@code doStart} requires the aggregation repository to implement
     * {@link OptimisticLockingAggregationRepository}.
     *
     * @param z the new flag value
     */
    public void setOptimisticLocking(boolean z) {
        this.optimisticLocking = z;
    }

    /**
     * Returns the aggregation repository.
     *
     * @return the repository; may be {@code null} until {@code doStart} installs its default
     *         {@code MemoryAggregationRepository}
     */
    public AggregationRepository getAggregationRepository() {
        return this.aggregationRepository;
    }

    /**
     * Sets the aggregation repository.
     *
     * @param aggregationRepository the repository to use; when left {@code null}, {@code doStart}
     *                              falls back to a {@code MemoryAggregationRepository}
     */
    public void setAggregationRepository(AggregationRepository aggregationRepository) {
        this.aggregationRepository = aggregationRepository;
    }

    /**
     * Returns the value of the {@code discardOnCompletionTimeout} flag.
     *
     * @return {@code true} when {@code onCompletion} should discard (return {@code null} for)
     *         exchanges completed by a timeout
     */
    public boolean isDiscardOnCompletionTimeout() {
        return this.discardOnCompletionTimeout;
    }

    /**
     * Sets the {@code discardOnCompletionTimeout} flag.
     *
     * @param z the new flag value
     */
    public void setDiscardOnCompletionTimeout(boolean z) {
        this.discardOnCompletionTimeout = z;
    }

    /**
     * Sets the {@code forceCompletionOnStop} flag.
     *
     * @param z {@code true} to make {@code prepareShutdown(false)} force completion of all groups
     */
    public void setForceCompletionOnStop(boolean z) {
        this.forceCompletionOnStop = z;
    }

    /**
     * Sets the scheduled executor used for the completion interval task and the timeout map.
     *
     * @param scheduledExecutorService the executor to use; when left {@code null}, {@code doStart}
     *                                 creates one itself if interval or timeout completion is
     *                                 configured
     */
    public void setTimeoutCheckerExecutorService(ScheduledExecutorService scheduledExecutorService) {
        this.timeoutCheckerExecutorService = scheduledExecutorService;
    }

    /**
     * Returns the scheduled executor used for the completion interval task and the timeout map.
     *
     * @return the executor, or {@code null} when none has been set or created ({@code doShutdown}
     *         resets it to {@code null} after shutting it down)
     */
    public ScheduledExecutorService getTimeoutCheckerExecutorService() {
        return this.timeoutCheckerExecutorService;
    }

    /**
     * Returns whether {@code doShutdown} shuts down the timeout checker executor.
     *
     * @return the current flag value; {@code doStart} sets it to {@code true} when it creates the
     *         executor itself
     */
    public boolean isShutdownTimeoutCheckerExecutorService() {
        return this.shutdownTimeoutCheckerExecutorService;
    }

    /**
     * Sets whether {@code doShutdown} shuts down the timeout checker executor.
     *
     * @param z the new flag value
     */
    public void setShutdownTimeoutCheckerExecutorService(boolean z) {
        this.shutdownTimeoutCheckerExecutorService = z;
    }

    /**
     * Sets the optimistic lock retry policy.
     *
     * @param optimisticLockRetryPolicy the policy to store
     */
    public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy optimisticLockRetryPolicy) {
        this.optimisticLockRetryPolicy = optimisticLockRetryPolicy;
    }

    /**
     * Returns the optimistic lock retry policy.
     *
     * @return the stored policy
     */
    public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() {
        return this.optimisticLockRetryPolicy;
    }

    /**
     * Returns the aggregation strategy.
     *
     * @return the strategy that {@code onAggregation} delegates to
     */
    public AggregationStrategy getAggregationStrategy() {
        return this.aggregationStrategy;
    }

    /**
     * Sets the aggregation strategy.
     *
     * <p>This class additionally checks the strategy for the pre-completion, completion and timeout
     * aware sub-interfaces at the points where those callbacks apply.
     *
     * @param aggregationStrategy the strategy to store
     */
    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
        this.aggregationStrategy = aggregationStrategy;
    }

    /**
     * Returns the correlation expression.
     *
     * @return the stored expression
     */
    public Expression getCorrelationExpression() {
        return this.correlationExpression;
    }

    /**
     * Sets the correlation expression.
     *
     * @param expression the expression to store
     */
    public void setCorrelationExpression(Expression expression) {
        this.correlationExpression = expression;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Validates the configuration and starts everything this aggregator depends on.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>detect a {@link PreCompletionAwareAggregationStrategy} and switch to pre-completion mode;</li>
     *   <li>validate the completion options (at least one must be set, and completionInterval and
     *       completionTimeout are mutually exclusive);</li>
     *   <li>set up closed-correlation-key tracking when requested;</li>
     *   <li>default the repository to a {@code MemoryAggregationRepository} and verify that it
     *       supports optimistic locking when that is enabled;</li>
     *   <li>start the strategy, the downstream processor and the repository;</li>
     *   <li>for a {@link RecoverableAggregationRepository} with recovery enabled, validate the
     *       recovery options and then schedule the recover task;</li>
     *   <li>schedule the completion interval task and/or create, restore and start the timeout map.</li>
     * </ol>
     *
     * <p>Configuration checks run before the thread pools they guard are created, so an invalid
     * configuration does not leave a scheduled pool behind.
     *
     * @throws IllegalStateException    if no completion option is configured
     * @throws IllegalArgumentException if both completionInterval and completionTimeout are set,
     *                                  if optimistic locking is enabled with an unsupported
     *                                  repository, or if a recovery option is not positive
     * @throws NoSuchEndpointException  if the configured dead letter endpoint cannot be resolved
     * @throws Exception                if any dependent service fails to start
     */
    @Override // org.apache.camel.support.ServiceSupport
    public void doStart() throws Exception {
        if (this.aggregationStrategy instanceof PreCompletionAwareAggregationStrategy) {
            this.preCompletion = true;
            LOG.info("PreCompletionAwareAggregationStrategy detected. Aggregator {} is in pre-completion mode.", getId());
        }
        if (!this.preCompletion && getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null && getCompletionSizeExpression() == null) {
            throw new IllegalStateException("At least one of the completions options [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set");
        }
        // Checked up front so that a bad configuration fails before any service or pool is started.
        if (getCompletionInterval() > 0 && getCompletionTimeout() > 0) {
            throw new IllegalArgumentException("Only one of completionInterval or completionTimeout can be used, not both.");
        }
        if (getCloseCorrelationKeyOnCompletion() != null) {
            if (getCloseCorrelationKeyOnCompletion().intValue() > 0) {
                LOG.info("Using ClosedCorrelationKeys with a LRUCache with a capacity of {}", getCloseCorrelationKeyOnCompletion());
                this.closedCorrelationKeys = new LRUCache(getCloseCorrelationKeyOnCompletion().intValue());
            } else {
                LOG.info("Using ClosedCorrelationKeys with unbounded capacity");
                this.closedCorrelationKeys = new ConcurrentHashMap();
            }
        }
        if (this.aggregationRepository == null) {
            this.aggregationRepository = new MemoryAggregationRepository(this.optimisticLocking);
            LOG.info("Defaulting to MemoryAggregationRepository");
        }
        if (this.optimisticLocking) {
            if (!(this.aggregationRepository instanceof OptimisticLockingAggregationRepository)) {
                throw new IllegalArgumentException("Optimistic locking cannot be enabled without using an AggregationRepository that implements OptimisticLockingAggregationRepository");
            }
            LOG.info("Optimistic locking is enabled");
        }
        ServiceHelper.startServices(this.aggregationStrategy, this.processor, this.aggregationRepository);
        if (this.aggregationRepository instanceof RecoverableAggregationRepository) {
            RecoverableAggregationRepository recoverableAggregationRepository = (RecoverableAggregationRepository) this.aggregationRepository;
            if (recoverableAggregationRepository.isUseRecovery()) {
                long recoveryIntervalInMillis = recoverableAggregationRepository.getRecoveryIntervalInMillis();
                if (recoveryIntervalInMillis <= 0) {
                    throw new IllegalArgumentException("AggregationRepository has recovery enabled and the RecoveryInterval option must be a positive number, was: " + recoveryIntervalInMillis);
                }
                // Validate the dead letter settings before the recover pool exists, so that a
                // configuration error cannot leave a scheduled recover task running.
                String deadLetterUri = recoverableAggregationRepository.getDeadLetterUri();
                int maximumRedeliveries = 0;
                if (deadLetterUri != null) {
                    maximumRedeliveries = recoverableAggregationRepository.getMaximumRedeliveries();
                    if (maximumRedeliveries <= 0) {
                        throw new IllegalArgumentException("Option maximumRedeliveries must be a positive number, was: " + maximumRedeliveries);
                    }
                    if (this.camelContext.getEndpoint(deadLetterUri) == null) {
                        throw new NoSuchEndpointException(deadLetterUri);
                    }
                }
                this.recoverService = this.camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "AggregateRecoverChecker", 1);
                RecoverTask recoverTask = new RecoverTask(recoverableAggregationRepository);
                LOG.info("Using RecoverableAggregationRepository by scheduling recover checker to run every {} millis.", recoveryIntervalInMillis);
                // The first recover run starts after one second, then repeats with the configured delay.
                this.recoverService.scheduleWithFixedDelay(recoverTask, 1000L, recoveryIntervalInMillis, TimeUnit.MILLISECONDS);
                if (deadLetterUri != null) {
                    LOG.info("After {} failed redelivery attempts Exchanges will be moved to deadLetterUri: {}", maximumRedeliveries, deadLetterUri);
                    this.deadLetterProducerTemplate = this.camelContext.createProducerTemplate();
                }
            }
        }
        if (getCompletionInterval() > 0) {
            LOG.info("Using CompletionInterval to run every {} millis.", getCompletionInterval());
            if (getTimeoutCheckerExecutorService() == null) {
                // The pool is created here, so this processor is responsible for shutting it down.
                setTimeoutCheckerExecutorService(this.camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1));
                this.shutdownTimeoutCheckerExecutorService = true;
            }
            getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(), getCompletionInterval(), getCompletionInterval(), TimeUnit.MILLISECONDS);
        }
        if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) {
            LOG.info("Using CompletionTimeout to trigger after {} millis of inactivity.", getCompletionTimeout());
            if (getTimeoutCheckerExecutorService() == null) {
                // The pool is created here, so this processor is responsible for shutting it down.
                setTimeoutCheckerExecutorService(this.camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1));
                this.shutdownTimeoutCheckerExecutorService = true;
            }
            this.timeoutMap = new AggregationTimeoutMap(getTimeoutCheckerExecutorService(), 1000L);
            // Re-register timeouts for exchanges already in the repository before the map starts.
            restoreTimeoutMapFromAggregationRepository();
            ServiceHelper.startService(this.timeoutMap);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops the helper services of this aggregator and clears its transient state.
     *
     * <p>The recover checker pool is shut down first, then the timeout map, the downstream
     * processor and the dead letter producer template are stopped. Closed-key tracking, the batch
     * consumer keys and the redelivery state are cleared afterwards.
     *
     * @throws Exception if stopping one of the services fails
     */
    @Override // org.apache.camel.support.ServiceSupport
    public void doStop() throws Exception {
        if (recoverService != null) {
            camelContext.getExecutorServiceManager().shutdown(recoverService);
        }

        ServiceHelper.stopServices(timeoutMap, processor, deadLetterProducerTemplate);

        if (closedCorrelationKeys != null) {
            // The map is handed to ServiceHelper before it is emptied.
            ServiceHelper.stopService(closedCorrelationKeys);
            closedCorrelationKeys.clear();
        }

        batchConsumerCorrelationKeys.clear();
        redeliveryState.clear();
    }

    /**
     * Forces completion of all groups ahead of shutdown when {@code forceCompletionOnStop} is set.
     *
     * @param z when {@code true} nothing is done (presumably the "forced shutdown" flag of
     *          {@code ShutdownPrepared} — confirm against that interface)
     */
    @Override // org.apache.camel.spi.ShutdownPrepared
    public void prepareShutdown(boolean z) {
        final boolean shouldForceCompletion = !z && forceCompletionOnStop;
        if (shouldForceCompletion) {
            doForceCompletionOnStop();
        }
    }

    /**
     * Forces completion of all groups and waits until the resulting exchanges have finished.
     *
     * <p>The wait polls the in-progress set every 100 ms. If the waiting thread is interrupted, the
     * interrupt status is restored and the wait is abandoned, so a shutdown cannot spin here
     * forever on an interrupted thread.
     */
    private void doForceCompletionOnStop() {
        int forceCompletionOfAllGroups = forceCompletionOfAllGroups();
        StopWatch stopWatch = new StopWatch();
        while (!this.inProgressCompleteExchanges.isEmpty()) {
            LOG.trace("Waiting for {} inflight exchanges to complete", Integer.valueOf(this.inProgressCompleteExchanges.size()));
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for {} inflight exchanges to complete.", Integer.valueOf(this.inProgressCompleteExchanges.size()));
                // Keep the interrupt visible to callers and stop waiting; sleeping again would
                // throw immediately and turn this loop into a busy spin.
                Thread.currentThread().interrupt();
                break;
            }
        }
        if (forceCompletionOfAllGroups > 0) {
            LOG.info("Forcing completion of all groups with {} exchanges completed in {}", Integer.valueOf(forceCompletionOfAllGroups), TimeUtils.printDuration(stopWatch.stop()));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Shuts down the repository, the strategy and the executors owned by this aggregator.
     *
     * <p>An executor is only shut down when the matching {@code shutdown...} flag marks it as owned
     * by this processor. The timeout checker reference is cleared after its shutdown.
     *
     * @throws Exception if shutting down one of the services fails
     */
    @Override // org.apache.camel.support.ServiceSupport
    public void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownServices(aggregationRepository, aggregationStrategy);
        inProgressCompleteExchanges.clear();

        if (shutdownExecutorService) {
            camelContext.getExecutorServiceManager().shutdownNow(executorService);
        }

        if (shutdownTimeoutCheckerExecutorService) {
            camelContext.getExecutorServiceManager().shutdownNow(timeoutCheckerExecutorService);
            timeoutCheckerExecutorService = null;
        }

        super.doShutdown();
    }

    /**
     * Completes every group currently held in the aggregation repository.
     *
     * <p>Nothing is done unless the Camel context is started or stopping. Each stored exchange is
     * marked as completed by {@code "forceCompletion"}, run through {@code onCompletion} and, unless
     * that returns {@code null}, submitted for processing. When optimistic locking is disabled the
     * whole pass runs under the processor lock.
     *
     * @return the number of correlation keys found in the repository at the start of the pass, or
     *         {@code 0} when the context is not running or the repository is empty
     */
    public int forceCompletionOfAllGroups() {
        if (!(this.camelContext.getStatus().isStarted() || this.camelContext.getStatus().isStopping())) {
            LOG.warn("Cannot start force completion of all groups because CamelContext({}) has not been started", this.camelContext.getName());
            return 0;
        }
        LOG.trace("Starting force completion of all groups task");
        Set<String> keys = this.aggregationRepository.getKeys();
        int i = 0;
        if (keys != null && !keys.isEmpty()) {
            i = keys.size();
            // Decide once whether to lock: the flag is mutable, and lock()/unlock() must agree.
            final boolean useLock = !this.optimisticLocking;
            if (useLock) {
                this.lock.lock();
            }
            try {
                for (String str : keys) {
                    Exchange exchange = this.aggregationRepository.get(this.camelContext, str);
                    if (exchange != null) {
                        LOG.trace("Force completion triggered for correlation key: {}", str);
                        exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
                        // The stored exchange is passed as both arguments of onCompletion.
                        Exchange onCompletion = onCompletion(str, exchange, exchange, false);
                        if (onCompletion != null) {
                            onSubmitCompletion(str, onCompletion);
                        }
                    }
                }
            } finally {
                if (useLock) {
                    this.lock.unlock();
                }
            }
        }
        LOG.trace("Completed force completion of all groups task");
        if (i > 0) {
            LOG.debug("Forcing completion of all groups with {} exchanges", Integer.valueOf(i));
        }
        return i;
    }
}
