package org.apache.camel.processor;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.StreamCache;
import org.apache.camel.processor.CamelInternalProcessor;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy;
import org.apache.camel.processor.aggregate.DelegateAggregationStrategy;
import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.TracedRouteNodes;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.EventHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.KeyValueHolder;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.concurrent.AtomicException;
import org.apache.camel.util.concurrent.AtomicExchange;
import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
import org.apache.cxf.jaxrs.client.AbstractClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/camel-core-2.23.2.fuse-7_10_1-00008-redhat-00001.jar:org/apache/camel/processor/MulticastProcessor.class */
public class MulticastProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, org.apache.camel.Traceable, IdAware {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) MulticastProcessor.class);
    protected final Processor onPrepare;
    private final CamelContext camelContext;
    private String id;
    private Collection<Processor> processors;
    private final AggregationStrategy aggregationStrategy;
    private final boolean parallelProcessing;
    private final boolean streaming;
    private final boolean parallelAggregate;
    private final boolean stopOnAggregateException;
    private final boolean stopOnException;
    private final ExecutorService executorService;
    private final boolean shutdownExecutorService;
    private ExecutorService aggregateExecutorService;
    private final long timeout;
    private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers;
    private final boolean shareUnitOfWork;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/camel-core-2.23.2.fuse-7_10_1-00008-redhat-00001.jar:org/apache/camel/processor/MulticastProcessor$AggregateOnTheFlyTask.class */
    public final class AggregateOnTheFlyTask implements Runnable {
        private final AtomicExchange result;
        private final Exchange original;
        private final AtomicInteger total;
        private final CompletionService<Exchange> completion;
        private final AtomicBoolean running;
        private final CountDownLatch aggregationOnTheFlyDone;
        private final AtomicBoolean allTasksSubmitted;
        private final AtomicException executionException;

        private AggregateOnTheFlyTask(AtomicExchange atomicExchange, Exchange exchange, AtomicInteger atomicInteger, CompletionService<Exchange> completionService, AtomicBoolean atomicBoolean, CountDownLatch countDownLatch, AtomicBoolean atomicBoolean2, AtomicException atomicException) {
            this.result = atomicExchange;
            this.original = exchange;
            this.total = atomicInteger;
            this.completion = completionService;
            this.running = atomicBoolean;
            this.aggregationOnTheFlyDone = countDownLatch;
            this.allTasksSubmitted = atomicBoolean2;
            this.executionException = atomicException;
        }

        @Override // java.lang.Runnable
        public void run() {
            MulticastProcessor.LOG.trace("Aggregate on the fly task started for exchangeId: {}", this.original.getExchangeId());
            try {
                try {
                    aggregateOnTheFly();
                    MulticastProcessor.LOG.debug("Signaling we are done aggregating on the fly for exchangeId: {}", this.original.getExchangeId());
                    MulticastProcessor.LOG.trace("Aggregate on the fly task done for exchangeId: {}", this.original.getExchangeId());
                    this.aggregationOnTheFlyDone.countDown();
                } catch (Throwable th) {
                    if (th instanceof Exception) {
                        this.executionException.set((Exception) th);
                    } else {
                        this.executionException.set(ObjectHelper.wrapRuntimeCamelException(th));
                    }
                    MulticastProcessor.LOG.debug("Signaling we are done aggregating on the fly for exchangeId: {}", this.original.getExchangeId());
                    MulticastProcessor.LOG.trace("Aggregate on the fly task done for exchangeId: {}", this.original.getExchangeId());
                    this.aggregationOnTheFlyDone.countDown();
                }
            } catch (Throwable th2) {
                MulticastProcessor.LOG.debug("Signaling we are done aggregating on the fly for exchangeId: {}", this.original.getExchangeId());
                MulticastProcessor.LOG.trace("Aggregate on the fly task done for exchangeId: {}", this.original.getExchangeId());
                this.aggregationOnTheFlyDone.countDown();
                throw th2;
            }
        }

        private void aggregateOnTheFly() throws InterruptedException, ExecutionException {
            Future<Exchange> poll;
            AtomicBoolean atomicBoolean = new AtomicBoolean();
            boolean z = false;
            StopWatch stopWatch = new StopWatch();
            AtomicInteger atomicInteger = new AtomicInteger();
            while (true) {
                if (0 == 0) {
                    if (this.allTasksSubmitted.get() && atomicInteger.intValue() >= this.total.get()) {
                        MulticastProcessor.LOG.debug("Done aggregating {} exchanges on the fly.", atomicInteger);
                        break;
                    }
                    if (atomicBoolean.get()) {
                        poll = this.completion.poll();
                        MulticastProcessor.LOG.trace("Polled completion task #{} after timeout to grab already completed tasks: {}", atomicInteger, poll);
                    } else if (MulticastProcessor.this.timeout > 0) {
                        long taken = MulticastProcessor.this.timeout - stopWatch.taken();
                        if (taken < 0) {
                            taken = 0;
                        }
                        MulticastProcessor.LOG.trace("Polling completion task #{} using timeout {} millis.", atomicInteger, Long.valueOf(taken));
                        poll = this.completion.poll(taken, TimeUnit.MILLISECONDS);
                    } else {
                        MulticastProcessor.LOG.trace("Polling completion task #{}", atomicInteger);
                        poll = this.completion.poll(1L, TimeUnit.SECONDS);
                        if (poll == null) {
                            continue;
                        }
                    }
                    if (poll != null) {
                        Exchange exchange = poll.get();
                        boolean continueProcessing = PipelineHelper.continueProcessing(exchange, "Parallel processing failed for number " + MulticastProcessor.this.getExchangeIndex(exchange), MulticastProcessor.LOG);
                        if (MulticastProcessor.this.stopOnException && !continueProcessing) {
                            this.result.set(exchange);
                            z = true;
                            break;
                        } else {
                            ParallelAggregateTask parallelAggregateTask = new ParallelAggregateTask(this.result, exchange, atomicInteger);
                            if (MulticastProcessor.this.parallelAggregate) {
                                MulticastProcessor.this.aggregateExecutorService.submit(parallelAggregateTask);
                            } else {
                                parallelAggregateTask.run();
                            }
                        }
                    } else {
                        ParallelAggregateTimeoutTask parallelAggregateTimeoutTask = new ParallelAggregateTimeoutTask(this.original, this.result, this.completion, atomicInteger, this.total, atomicBoolean);
                        if (MulticastProcessor.this.parallelAggregate) {
                            MulticastProcessor.this.aggregateExecutorService.submit(parallelAggregateTimeoutTask);
                        } else {
                            parallelAggregateTimeoutTask.run();
                        }
                    }
                } else {
                    break;
                }
            }
            if (atomicBoolean.get() || z) {
                if (atomicBoolean.get()) {
                    MulticastProcessor.LOG.debug("Cancelling tasks due timeout after {} millis.", Long.valueOf(MulticastProcessor.this.timeout));
                }
                if (z) {
                    MulticastProcessor.LOG.debug("Cancelling tasks due stopOnException.");
                }
                this.running.set(false);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/camel-core-2.23.2.fuse-7_10_1-00008-redhat-00001.jar:org/apache/camel/processor/MulticastProcessor$DefaultProcessorExchangePair.class */
    public static final class DefaultProcessorExchangePair implements ProcessorExchangePair {
        private final int index;
        private final Processor processor;
        private final Processor prepared;
        private final Exchange exchange;

        private DefaultProcessorExchangePair(int i, Processor processor, Processor processor2, Exchange exchange) {
            this.index = i;
            this.processor = processor;
            this.prepared = processor2;
            this.exchange = exchange;
        }

        @Override // org.apache.camel.processor.ProcessorExchangePair
        public int getIndex() {
            return this.index;
        }

        @Override // org.apache.camel.processor.ProcessorExchangePair
        public Exchange getExchange() {
            return this.exchange;
        }

        @Override // org.apache.camel.processor.ProcessorExchangePair
        public Producer getProducer() {
            if (this.processor instanceof Producer) {
                return (Producer) this.processor;
            }
            return null;
        }

        @Override // org.apache.camel.processor.ProcessorExchangePair
        public Processor getProcessor() {
            return this.prepared;
        }

        @Override // org.apache.camel.processor.ProcessorExchangePair
        public void begin() {
        }

        @Override // org.apache.camel.processor.ProcessorExchangePair
        public void done() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/camel-core-2.23.2.fuse-7_10_1-00008-redhat-00001.jar:org/apache/camel/processor/MulticastProcessor$ParallelAggregateTask.class */
    public final class ParallelAggregateTask implements Runnable {
        private final AtomicExchange result;
        private final Exchange subExchange;
        private final AtomicInteger aggregated;

        private ParallelAggregateTask(AtomicExchange atomicExchange, Exchange exchange, AtomicInteger atomicInteger) {
            this.result = atomicExchange;
            this.subExchange = exchange;
            this.aggregated = atomicInteger;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (MulticastProcessor.this.parallelAggregate) {
                    MulticastProcessor.this.doAggregateInternal(MulticastProcessor.this.getAggregationStrategy(this.subExchange), this.result, this.subExchange);
                } else {
                    MulticastProcessor.this.doAggregate(MulticastProcessor.this.getAggregationStrategy(this.subExchange), this.result, this.subExchange);
                }
            } catch (Throwable th) {
                if (MulticastProcessor.this.isStopOnAggregateException()) {
                    throw th;
                }
                CamelExchangeException camelExchangeException = new CamelExchangeException("Parallel processing failed for number " + this.aggregated.get(), this.subExchange, th);
                this.subExchange.setException(camelExchangeException);
                MulticastProcessor.LOG.debug(camelExchangeException.getMessage(), (Throwable) camelExchangeException);
            } finally {
                this.aggregated.incrementAndGet();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/camel-core-2.23.2.fuse-7_10_1-00008-redhat-00001.jar:org/apache/camel/processor/MulticastProcessor$ParallelAggregateTimeoutTask.class */
    public final class ParallelAggregateTimeoutTask implements Runnable {
        private final Exchange original;
        private final AtomicExchange result;
        private final CompletionService<Exchange> completion;
        private final AtomicInteger aggregated;
        private final AtomicInteger total;
        private final AtomicBoolean timedOut;

        private ParallelAggregateTimeoutTask(Exchange exchange, AtomicExchange atomicExchange, CompletionService<Exchange> completionService, AtomicInteger atomicInteger, AtomicInteger atomicInteger2, AtomicBoolean atomicBoolean) {
            this.original = exchange;
            this.result = atomicExchange;
            this.completion = completionService;
            this.aggregated = atomicInteger;
            this.total = atomicInteger2;
            this.timedOut = atomicBoolean;
        }

        @Override // java.lang.Runnable
        public void run() {
            AggregationStrategy aggregationStrategy = MulticastProcessor.this.getAggregationStrategy(null);
            if (aggregationStrategy instanceof DelegateAggregationStrategy) {
                aggregationStrategy = ((DelegateAggregationStrategy) aggregationStrategy).getDelegate();
            }
            if (aggregationStrategy instanceof TimeoutAwareAggregationStrategy) {
                Exchange exchange = this.result.get();
                if (exchange == null) {
                    exchange = this.original;
                }
                ((TimeoutAwareAggregationStrategy) aggregationStrategy).timeout(exchange, this.aggregated.intValue(), this.total.intValue(), MulticastProcessor.this.timeout);
            } else {
                MulticastProcessor.LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", Long.valueOf(MulticastProcessor.this.timeout), Integer.valueOf(this.aggregated.intValue()));
            }
            MulticastProcessor.LOG.debug("Timeout occurred after {} millis for number {} task.", Long.valueOf(MulticastProcessor.this.timeout), Integer.valueOf(this.aggregated.intValue()));
            this.timedOut.set(true);
            if (this.completion instanceof SubmitOrderedCompletionService) {
                ((SubmitOrderedCompletionService) this.completion).timeoutTask();
            }
            this.aggregated.incrementAndGet();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/camel-core-2.23.2.fuse-7_10_1-00008-redhat-00001.jar:org/apache/camel/processor/MulticastProcessor$PreparedErrorHandler.class */
    public static final class PreparedErrorHandler extends KeyValueHolder<RouteContext, Processor> {
        PreparedErrorHandler(RouteContext routeContext, Processor processor) {
            super(routeContext, processor);
        }
    }

    public MulticastProcessor(CamelContext camelContext, Collection<Processor> collection) {
        this(camelContext, collection, null);
    }

    public MulticastProcessor(CamelContext camelContext, Collection<Processor> collection, AggregationStrategy aggregationStrategy) {
        this(camelContext, collection, aggregationStrategy, false, null, false, false, false, 0L, null, false, false);
    }

    @Deprecated
    public MulticastProcessor(CamelContext camelContext, Collection<Processor> collection, AggregationStrategy aggregationStrategy, boolean z, ExecutorService executorService, boolean z2, boolean z3, boolean z4, long j, Processor processor, boolean z5) {
        this(camelContext, collection, aggregationStrategy, z, executorService, z2, z3, z4, j, processor, z5, false);
    }

    public MulticastProcessor(CamelContext camelContext, Collection<Processor> collection, AggregationStrategy aggregationStrategy, boolean z, ExecutorService executorService, boolean z2, boolean z3, boolean z4, long j, Processor processor, boolean z5, boolean z6) {
        this(camelContext, collection, aggregationStrategy, z, executorService, z2, z3, z4, j, processor, z5, false, false);
    }

    public MulticastProcessor(CamelContext camelContext, Collection<Processor> collection, AggregationStrategy aggregationStrategy, boolean z, ExecutorService executorService, boolean z2, boolean z3, boolean z4, long j, Processor processor, boolean z5, boolean z6, boolean z7) {
        this.errorHandlers = new ConcurrentHashMap();
        ObjectHelper.notNull(camelContext, "camelContext");
        this.camelContext = camelContext;
        this.processors = collection;
        this.aggregationStrategy = aggregationStrategy;
        this.executorService = executorService;
        this.shutdownExecutorService = z2;
        this.streaming = z3;
        this.stopOnException = z4;
        this.parallelProcessing = z || executorService != null;
        this.timeout = j;
        this.onPrepare = processor;
        this.shareUnitOfWork = z5;
        this.parallelAggregate = z6;
        this.stopOnAggregateException = z7;
    }

    public String toString() {
        return "Multicast[" + getProcessors() + "]";
    }

    @Override // org.apache.camel.spi.HasId
    public String getId() {
        return this.id;
    }

    @Override // org.apache.camel.spi.IdAware
    public void setId(String str) {
        this.id = str;
    }

    @Override // org.apache.camel.Traceable
    public String getTraceLabel() {
        return "multicast";
    }

    public CamelContext getCamelContext() {
        return this.camelContext;
    }

    @Override // org.apache.camel.Processor
    public void process(Exchange exchange) throws Exception {
        AsyncProcessorHelper.process(this, exchange);
    }

    @Override // org.apache.camel.AsyncProcessor
    public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
        AtomicExchange atomicExchange = new AtomicExchange();
        try {
            boolean z = true;
            Iterable<ProcessorExchangePair> createProcessorExchangePairs = createProcessorExchangePairs(exchange);
            if (isParallelProcessing()) {
                ObjectHelper.notNull(this.executorService, AbstractClient.EXECUTOR_SERVICE_PROPERTY, this);
                doProcessParallel(exchange, atomicExchange, createProcessorExchangePairs, isStreaming(), asyncCallback);
            } else {
                z = doProcessSequential(exchange, atomicExchange, createProcessorExchangePairs, asyncCallback);
            }
            if (!z) {
                return false;
            }
            doDone(exchange, atomicExchange.get() != null ? atomicExchange.get() : null, createProcessorExchangePairs, asyncCallback, true, true);
            return true;
        } catch (Throwable th) {
            exchange.setException(th);
            doDone(exchange, null, null, asyncCallback, true, false);
            return true;
        }
    }

    protected void doProcessParallel(Exchange exchange, AtomicExchange atomicExchange, Iterable<ProcessorExchangePair> iterable, boolean z, AsyncCallback asyncCallback) throws Exception {
        ObjectHelper.notNull(this.executorService, "ExecutorService", this);
        ObjectHelper.notNull(this.aggregateExecutorService, "AggregateExecutorService", this);
        CompletionService executorCompletionService = z ? new ExecutorCompletionService(this.executorService) : new SubmitOrderedCompletionService(this.executorService);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        Iterator<ProcessorExchangePair> it = iterable.iterator();
        if (it.hasNext()) {
            final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            AtomicBoolean atomicBoolean2 = new AtomicBoolean();
            CountDownLatch countDownLatch = new CountDownLatch(1);
            AtomicException atomicException = new AtomicException();
            final AggregateOnTheFlyTask aggregateOnTheFlyTask = new AggregateOnTheFlyTask(atomicExchange, exchange, atomicInteger, executorCompletionService, atomicBoolean, countDownLatch, atomicBoolean2, atomicException);
            final AtomicBoolean atomicBoolean3 = new AtomicBoolean();
            LOG.trace("Starting to submit parallel tasks");
            while (it.hasNext()) {
                try {
                    final ProcessorExchangePair next = it.next();
                    if (next != null) {
                        final Exchange exchange2 = next.getExchange();
                        updateNewExchange(exchange2, atomicInteger.intValue(), iterable, it);
                        executorCompletionService.submit(new Callable<Exchange>() { // from class: org.apache.camel.processor.MulticastProcessor.1
                            /* JADX WARN: Can't rename method to resolve collision */
                            @Override // java.util.concurrent.Callable
                            public Exchange call() throws Exception {
                                if (atomicBoolean3.compareAndSet(false, true)) {
                                    MulticastProcessor.this.aggregateExecutorService.submit(aggregateOnTheFlyTask);
                                }
                                if (!atomicBoolean.get()) {
                                    return exchange2;
                                }
                                try {
                                    MulticastProcessor.this.doProcessParallel(next);
                                } catch (Throwable th) {
                                    exchange2.setException(th);
                                }
                                Integer exchangeIndex = MulticastProcessor.this.getExchangeIndex(exchange2);
                                boolean continueProcessing = PipelineHelper.continueProcessing(exchange2, "Parallel processing failed for number " + exchangeIndex, MulticastProcessor.LOG);
                                if (MulticastProcessor.this.stopOnException && !continueProcessing) {
                                    atomicBoolean.set(false);
                                    if (exchange2.getException() != null) {
                                        exchange2.setException(new CamelExchangeException("Parallel processing failed for number " + exchangeIndex, exchange2, exchange2.getException()));
                                    }
                                }
                                MulticastProcessor.LOG.trace("Parallel processing complete for exchange: {}", exchange2);
                                return exchange2;
                            }
                        });
                        atomicInteger.incrementAndGet();
                    }
                } catch (Throwable th) {
                    if (th instanceof Exception) {
                        atomicException.set((Exception) th);
                    } else {
                        atomicException.set(ObjectHelper.wrapRuntimeCamelException(th));
                    }
                    LOG.debug("Signaling we are done aggregating on the fly for exchangeId: {}", exchange.getExchangeId());
                    LOG.trace("Aggregate on the fly task done for exchangeId: {}", exchange.getExchangeId());
                    countDownLatch.countDown();
                }
            }
            LOG.trace("Signaling that all {} tasks has been submitted.", Integer.valueOf(atomicInteger.get()));
            atomicBoolean2.set(true);
            LOG.debug("Waiting for on-the-fly aggregation to complete aggregating {} responses for exchangeId: {}", Integer.valueOf(atomicInteger.get()), exchange.getExchangeId());
            countDownLatch.await();
            if (atomicException.get() != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Parallel processing failed due {}", atomicException.get().getMessage());
                }
                throw atomicException.get();
            }
        }
        LOG.debug("Done parallel processing {} exchanges", atomicInteger);
    }

    protected boolean doProcessSequential(Exchange exchange, AtomicExchange atomicExchange, Iterable<ProcessorExchangePair> iterable, AsyncCallback asyncCallback) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        Iterator<ProcessorExchangePair> it = iterable.iterator();
        while (it.hasNext()) {
            ProcessorExchangePair next = it.next();
            if (next != null) {
                Exchange exchange2 = next.getExchange();
                updateNewExchange(exchange2, atomicInteger.get(), iterable, it);
                if (!doProcessSequential(exchange, atomicExchange, iterable, it, next, asyncCallback, atomicInteger)) {
                    if (!LOG.isTraceEnabled()) {
                        return false;
                    }
                    LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", next.getExchange().getExchangeId());
                    return false;
                }
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Processing exchangeId: {} is continued being processed synchronously", next.getExchange().getExchangeId());
                }
                boolean continueProcessing = PipelineHelper.continueProcessing(exchange2, "Sequential processing failed for number " + atomicInteger.get(), LOG);
                if (this.stopOnException && !continueProcessing) {
                    if (exchange2.getException() != null) {
                        exchange2.setException(new CamelExchangeException("Sequential processing failed for number " + atomicInteger.get(), exchange2, exchange2.getException()));
                    }
                    atomicExchange.set(exchange2);
                    return true;
                }
                LOG.trace("Sequential processing complete for number {} exchange: {}", atomicInteger, exchange2);
                if (this.parallelAggregate) {
                    doAggregateInternal(getAggregationStrategy(exchange2), atomicExchange, exchange2);
                } else {
                    doAggregate(getAggregationStrategy(exchange2), atomicExchange, exchange2);
                }
                atomicInteger.incrementAndGet();
            }
        }
        LOG.debug("Done sequential processing {} exchanges", atomicInteger);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean doProcessSequential(final Exchange exchange, final AtomicExchange atomicExchange, final Iterable<ProcessorExchangePair> iterable, final Iterator<ProcessorExchangePair> it, final ProcessorExchangePair processorExchangePair, final AsyncCallback asyncCallback, final AtomicInteger atomicInteger) {
        final Exchange exchange2 = processorExchangePair.getExchange();
        Processor processor = processorExchangePair.getProcessor();
        final Producer producer = processorExchangePair.getProducer();
        TracedRouteNodes tracedRouteNodes = exchange2.getUnitOfWork() != null ? exchange2.getUnitOfWork().getTracedRouteNodes() : null;
        if (tracedRouteNodes != null) {
            try {
                tracedRouteNodes.pushBlock();
            } catch (Throwable th) {
                if (tracedRouteNodes != null) {
                    tracedRouteNodes.popBlock();
                }
                throw th;
            }
        }
        StopWatch stopWatch = null;
        if (producer != null && EventHelper.notifyExchangeSending(exchange2.getContext(), exchange2, producer.getEndpoint())) {
            stopWatch = new StopWatch();
        }
        final StopWatch stopWatch2 = stopWatch;
        AsyncProcessor convert = AsyncProcessorConverterHelper.convert(processor);
        processorExchangePair.begin();
        boolean process = convert.process(exchange2, new AsyncCallback() { // from class: org.apache.camel.processor.MulticastProcessor.2
            @Override // org.apache.camel.AsyncCallback
            public void done(boolean z) {
                processorExchangePair.done();
                if (producer != null && stopWatch2 != null) {
                    EventHelper.notifyExchangeSent(exchange2.getContext(), exchange2, producer.getEndpoint(), stopWatch2.taken());
                }
                if (z) {
                    return;
                }
                Exchange exchange3 = exchange2;
                boolean continueProcessing = PipelineHelper.continueProcessing(exchange3, "Sequential processing failed for number " + atomicInteger.get(), MulticastProcessor.LOG);
                if (MulticastProcessor.this.stopOnException && !continueProcessing) {
                    if (exchange3.getException() != null) {
                        exchange3.setException(new CamelExchangeException("Sequential processing failed for number " + atomicInteger, exchange3, exchange3.getException()));
                    } else {
                        atomicExchange.set(exchange3);
                    }
                    MulticastProcessor.this.doDone(exchange, exchange3, iterable, asyncCallback, false, true);
                    return;
                }
                try {
                    if (MulticastProcessor.this.parallelAggregate) {
                        MulticastProcessor.this.doAggregateInternal(MulticastProcessor.this.getAggregationStrategy(exchange3), atomicExchange, exchange3);
                    } else {
                        MulticastProcessor.this.doAggregate(MulticastProcessor.this.getAggregationStrategy(exchange3), atomicExchange, exchange3);
                    }
                    atomicInteger.incrementAndGet();
                    while (it.hasNext()) {
                        ProcessorExchangePair processorExchangePair2 = (ProcessorExchangePair) it.next();
                        Exchange exchange4 = processorExchangePair2.getExchange();
                        MulticastProcessor.this.updateNewExchange(exchange4, atomicInteger.get(), iterable, it);
                        if (!MulticastProcessor.this.doProcessSequential(exchange, atomicExchange, iterable, it, processorExchangePair2, asyncCallback, atomicInteger)) {
                            MulticastProcessor.LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
                            return;
                        }
                        boolean continueProcessing2 = PipelineHelper.continueProcessing(exchange4, "Sequential processing failed for number " + atomicInteger.get(), MulticastProcessor.LOG);
                        if (MulticastProcessor.this.stopOnException && !continueProcessing2) {
                            if (exchange4.getException() != null) {
                                exchange4.setException(new CamelExchangeException("Sequential processing failed for number " + atomicInteger, exchange4, exchange4.getException()));
                            } else {
                                atomicExchange.set(exchange4);
                            }
                            MulticastProcessor.this.doDone(exchange, exchange4, iterable, asyncCallback, false, true);
                            return;
                        }
                        try {
                            if (MulticastProcessor.this.parallelAggregate) {
                                MulticastProcessor.this.doAggregateInternal(MulticastProcessor.this.getAggregationStrategy(exchange4), atomicExchange, exchange4);
                            } else {
                                MulticastProcessor.this.doAggregate(MulticastProcessor.this.getAggregationStrategy(exchange4), atomicExchange, exchange4);
                            }
                            atomicInteger.incrementAndGet();
                        } catch (Throwable th2) {
                            exchange4.setException(new CamelExchangeException("Sequential processing failed for number " + atomicInteger, exchange4, th2));
                            MulticastProcessor.this.doDone(exchange, exchange4, iterable, asyncCallback, false, true);
                            return;
                        }
                    }
                    MulticastProcessor.this.doDone(exchange, atomicExchange.get() != null ? atomicExchange.get() : null, iterable, asyncCallback, false, true);
                } catch (Throwable th3) {
                    exchange.setException(th3);
                    MulticastProcessor.this.doDone(exchange, null, iterable, asyncCallback, false, true);
                }
            }
        });
        if (tracedRouteNodes != null) {
            tracedRouteNodes.popBlock();
        }
        return process;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doProcessParallel(ProcessorExchangePair processorExchangePair) throws Exception {
        Exchange exchange = processorExchangePair.getExchange();
        Processor processor = processorExchangePair.getProcessor();
        Producer producer = processorExchangePair.getProducer();
        TracedRouteNodes tracedRouteNodes = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null;
        StopWatch stopWatch = null;
        if (tracedRouteNodes != null) {
            try {
                tracedRouteNodes.pushBlock();
            } catch (Throwable th) {
                processorExchangePair.done();
                if (tracedRouteNodes != null) {
                    tracedRouteNodes.popBlock();
                }
                if (producer != null && stopWatch != null) {
                    EventHelper.notifyExchangeSent(exchange.getContext(), exchange, producer.getEndpoint(), stopWatch.taken());
                }
                throw th;
            }
        }
        if (producer != null && EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint())) {
            stopWatch = new StopWatch();
        }
        AsyncProcessor convert = AsyncProcessorConverterHelper.convert(processor);
        processorExchangePair.begin();
        AsyncProcessorHelper.process(convert, exchange);
        processorExchangePair.done();
        if (tracedRouteNodes != null) {
            tracedRouteNodes.popBlock();
        }
        if (producer == null || stopWatch == null) {
            return;
        }
        EventHelper.notifyExchangeSent(exchange.getContext(), exchange, producer.getEndpoint(), stopWatch.taken());
    }

    protected void doDone(Exchange exchange, Exchange exchange2, Iterable<ProcessorExchangePair> iterable, AsyncCallback asyncCallback, boolean z, boolean z2) {
        if (iterable instanceof Closeable) {
            IOHelper.close((Closeable) iterable, "pairs", LOG);
        }
        AggregationStrategy aggregationStrategy = getAggregationStrategy(exchange2);
        if (aggregationStrategy instanceof DelegateAggregationStrategy) {
            aggregationStrategy = ((DelegateAggregationStrategy) aggregationStrategy).getDelegate();
        }
        if (aggregationStrategy instanceof CompletionAwareAggregationStrategy) {
            ((CompletionAwareAggregationStrategy) aggregationStrategy).onCompletion(exchange2);
        }
        removeAggregationStrategyFromExchange(exchange);
        boolean z3 = false;
        boolean z4 = false;
        boolean z5 = z2 || (exchange2 != null && (exchange2.getException() != null || ExchangeHelper.isRedeliveryExhausted(exchange2)));
        if (exchange.getException() != null || (exchange2 != null && exchange2.getException() != null)) {
            z3 = isStopOnException();
            z4 = true;
        }
        if (exchange2 != null) {
            if (z3) {
                exchange.setException(exchange2.getException());
            } else {
                ExchangeHelper.copyResults(exchange, exchange2);
            }
        }
        if (z4) {
            exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.valueOf(z5));
        }
        asyncCallback.done(z);
    }

    protected synchronized void doAggregate(AggregationStrategy aggregationStrategy, AtomicExchange atomicExchange, Exchange exchange) {
        doAggregateInternal(aggregationStrategy, atomicExchange, exchange);
    }

    protected void doAggregateInternal(AggregationStrategy aggregationStrategy, AtomicExchange atomicExchange, Exchange exchange) {
        if (aggregationStrategy != null) {
            Exchange exchange2 = atomicExchange.get();
            ExchangeHelper.prepareAggregation(exchange2, exchange);
            atomicExchange.set(aggregationStrategy.aggregate(exchange2, exchange));
        }
    }

    protected void updateNewExchange(Exchange exchange, int i, Iterable<ProcessorExchangePair> iterable, Iterator<ProcessorExchangePair> it) {
        exchange.setProperty(Exchange.MULTICAST_INDEX, Integer.valueOf(i));
        if (it.hasNext()) {
            exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.FALSE);
        } else {
            exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.TRUE);
        }
    }

    protected Integer getExchangeIndex(Exchange exchange) {
        return (Integer) exchange.getProperty(Exchange.MULTICAST_INDEX, Integer.class);
    }

    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
        StreamCache copy;
        ArrayList arrayList = new ArrayList(this.processors.size());
        StreamCache streamCache = null;
        if (isParallelProcessing() && (exchange.getIn().getBody() instanceof StreamCache)) {
            streamCache = (StreamCache) exchange.getIn().getBody();
        }
        int i = 0;
        for (Processor processor : this.processors) {
            Exchange createCorrelatedCopy = ExchangeHelper.createCorrelatedCopy(exchange, false);
            if (streamCache != null && i > 0 && (copy = streamCache.copy(createCorrelatedCopy)) != null) {
                createCorrelatedCopy.getIn().setBody(copy);
            }
            if (createCorrelatedCopy.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
                createCorrelatedCopy.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, exchange.getUnitOfWork());
            }
            if (isShareUnitOfWork()) {
                prepareSharedUnitOfWork(createCorrelatedCopy, exchange);
            }
            int i2 = i;
            i++;
            arrayList.add(createProcessorExchangePair(i2, processor, createCorrelatedCopy, exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null));
        }
        if (exchange.getException() != null) {
            throw exchange.getException();
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ProcessorExchangePair createProcessorExchangePair(int i, Processor processor, Exchange exchange, RouteContext routeContext) {
        setToEndpoint(exchange, processor);
        Processor createErrorHandler = createErrorHandler(routeContext, exchange, processor);
        if (this.onPrepare != null) {
            try {
                this.onPrepare.process(exchange);
            } catch (Exception e) {
                exchange.setException(e);
            }
        }
        return new DefaultProcessorExchangePair(i, processor, createErrorHandler, exchange);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Processor createErrorHandler(RouteContext routeContext, Exchange exchange, Processor processor) {
        Processor createUnitOfWorkProcessor;
        if (((Boolean) exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, Boolean.TYPE)).booleanValue() || routeContext == null) {
            createUnitOfWorkProcessor = createUnitOfWorkProcessor(routeContext, processor, exchange);
        } else {
            PreparedErrorHandler preparedErrorHandler = new PreparedErrorHandler(routeContext, processor);
            Processor processor2 = this.errorHandlers.get(preparedErrorHandler);
            if (processor2 != null) {
                LOG.trace("Using existing error handler for: {}", processor);
                return processor2;
            }
            LOG.trace("Creating error handler for: {}", processor);
            try {
                createUnitOfWorkProcessor = createUnitOfWorkProcessor(routeContext, routeContext.getRoute().getErrorHandlerBuilder().createErrorHandler(routeContext, processor), exchange);
                boolean z = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class) != null;
                ServiceHelper.startServices(createUnitOfWorkProcessor);
                if (!z) {
                    this.errorHandlers.putIfAbsent(preparedErrorHandler, createUnitOfWorkProcessor);
                }
            } catch (Exception e) {
                throw ObjectHelper.wrapRuntimeCamelException(e);
            }
        }
        return createUnitOfWorkProcessor;
    }

    protected Processor createUnitOfWorkProcessor(RouteContext routeContext, Processor processor, Exchange exchange) {
        CamelInternalProcessor camelInternalProcessor = new CamelInternalProcessor(processor);
        UnitOfWork unitOfWork = (UnitOfWork) exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class);
        if (unitOfWork != null) {
            camelInternalProcessor.addAdvice(new CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(routeContext, unitOfWork));
        } else {
            camelInternalProcessor.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
        }
        return camelInternalProcessor;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void prepareSharedUnitOfWork(Exchange exchange, Exchange exchange2) {
        exchange.setProperty(Exchange.PARENT_UNIT_OF_WORK, exchange2.getUnitOfWork());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.support.ServiceSupport
    public void doStart() throws Exception {
        if (isParallelProcessing() && this.executorService == null) {
            throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService has not been set");
        }
        if (this.timeout > 0 && !isParallelProcessing()) {
            throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled");
        }
        if (isParallelProcessing() && this.aggregateExecutorService == null) {
            this.aggregateExecutorService = createAggregateExecutorService(getClass().getSimpleName() + "-AggregateTask");
        }
        if (this.aggregationStrategy instanceof CamelContextAware) {
            ((CamelContextAware) this.aggregationStrategy).setCamelContext(this.camelContext);
        }
        ServiceHelper.startServices(this.aggregationStrategy, this.processors);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized ExecutorService createAggregateExecutorService(String str) {
        return this.camelContext.getExecutorServiceManager().newCachedThreadPool(this, str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.support.ServiceSupport
    public void doStop() throws Exception {
        ServiceHelper.stopServices(this.processors, this.errorHandlers, this.aggregationStrategy);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.support.ServiceSupport
    public void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownServices(this.processors, this.errorHandlers, this.aggregationStrategy);
        this.errorHandlers.clear();
        if (this.shutdownExecutorService && this.executorService != null) {
            getCamelContext().getExecutorServiceManager().shutdownNow(this.executorService);
        }
        if (this.aggregateExecutorService != null) {
            getCamelContext().getExecutorServiceManager().shutdownNow(this.aggregateExecutorService);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void setToEndpoint(Exchange exchange, Processor processor) {
        if (processor instanceof Producer) {
            exchange.setProperty(Exchange.TO_ENDPOINT, ((Producer) processor).getEndpoint().getEndpointUri());
        }
    }

    protected AggregationStrategy getAggregationStrategy(Exchange exchange) {
        Map cast;
        AggregationStrategy aggregationStrategy = null;
        if (exchange != null && (cast = CastUtils.cast((Map<?, ?>) exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class))) != null) {
            aggregationStrategy = (AggregationStrategy) cast.get(this);
        }
        if (aggregationStrategy == null) {
            aggregationStrategy = getAggregationStrategy();
        }
        return aggregationStrategy;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) {
        Map cast = CastUtils.cast((Map<?, ?>) exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class));
        ConcurrentHashMap concurrentHashMap = cast == null ? new ConcurrentHashMap() : new ConcurrentHashMap(cast);
        concurrentHashMap.put(this, aggregationStrategy);
        exchange.setProperty(Exchange.AGGREGATION_STRATEGY, concurrentHashMap);
    }

    protected void removeAggregationStrategyFromExchange(Exchange exchange) {
        Map cast = CastUtils.cast((Map<?, ?>) exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class));
        if (cast == null) {
            return;
        }
        cast.remove(this);
    }

    public boolean isStreaming() {
        return this.streaming;
    }

    public boolean isStopOnException() {
        return this.stopOnException;
    }

    public Collection<Processor> getProcessors() {
        return this.processors;
    }

    public long getTimeout() {
        return this.timeout;
    }

    public AggregationStrategy getAggregationStrategy() {
        return this.aggregationStrategy;
    }

    public boolean isParallelProcessing() {
        return this.parallelProcessing;
    }

    public boolean isParallelAggregate() {
        return this.parallelAggregate;
    }

    public boolean isStopOnAggregateException() {
        return this.stopOnAggregateException;
    }

    public boolean isShareUnitOfWork() {
        return this.shareUnitOfWork;
    }

    @Override // org.apache.camel.Navigate
    public List<Processor> next() {
        if (hasNext()) {
            return new ArrayList(this.processors);
        }
        return null;
    }

    @Override // org.apache.camel.Navigate
    public boolean hasNext() {
        return (this.processors == null || this.processors.isEmpty()) ? false : true;
    }
}
