package org.apache.camel.processor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.management.DefaultManagementNamingStrategy;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.TracedRouteNodes;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.concurrent.AtomicExchange;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:WEB-INF/lib/camel-core-2.2.0-fuse-03-00.jar:org/apache/camel/processor/MulticastProcessor.class */
/**
 * Sends a copy of the incoming exchange to each of a collection of processors,
 * either one after another or in parallel, optionally combining the individual
 * results with an {@link AggregationStrategy}.
 * <p/>
 * Each target processor receives its own copy of the exchange, tagged with the
 * {@link Exchange#MULTICAST_INDEX} property. When no aggregation strategy is
 * configured the original exchange is left untouched by the sub-results.
 */
public class MulticastProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable {
    private static final int DEFAULT_THREADPOOL_SIZE = 10;
    private static final Log LOG = LogFactory.getLog(MulticastProcessor.class);
    private final Collection<Processor> processors;
    private final AggregationStrategy aggregationStrategy;
    private final boolean isParallelProcessing;
    private final boolean streaming;
    private final boolean stopOnException;
    private ExecutorService executorService;

    /**
     * Immutable holder tying together a target processor, the processor that is
     * actually invoked for it (possibly wrapped in an error handler) and the
     * exchange copy that will be sent to it.
     */
    public static final class ProcessorExchangePair {
        private final Processor processor;
        private final Processor prepared;
        private final Exchange exchange;

        private ProcessorExchangePair(Processor processor, Processor prepared, Exchange exchange) {
            this.processor = processor;
            this.prepared = prepared;
            this.exchange = exchange;
        }

        /** @return the original target processor */
        public Processor getProcessor() {
            return this.processor;
        }

        /** @return the processor that is invoked for this pair (see {@link MulticastProcessor#createProcessorExchangePair}) */
        public Processor getPrepared() {
            return this.prepared;
        }

        /** @return the exchange that is handed to the prepared processor */
        public Exchange getExchange() {
            return this.exchange;
        }
    }

    /**
     * Creates a sequential multicast without aggregation.
     *
     * @param processors the target processors, must not be <tt>null</tt>
     */
    public MulticastProcessor(Collection<Processor> processors) {
        this(processors, null);
    }

    /**
     * Creates a sequential multicast.
     *
     * @param processors          the target processors, must not be <tt>null</tt>
     * @param aggregationStrategy strategy used to combine the sub-results, may be <tt>null</tt>
     */
    public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
        this(processors, aggregationStrategy, false, null, false, false);
    }

    /**
     * Creates a fully configured multicast.
     *
     * @param processors          the target processors, must not be <tt>null</tt>
     * @param aggregationStrategy strategy used to combine the sub-results, may be <tt>null</tt>
     * @param parallelProcessing  whether the targets are invoked concurrently
     * @param executorService     executor used for parallel processing; when <tt>null</tt> and
     *                            parallel processing is enabled a default pool is created
     * @param streaming           whether parallel results are aggregated as they complete
     *                            instead of through the submit-ordered completion service
     * @param stopOnException     whether processing is aborted when a target fails
     */
    public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy,
                              boolean parallelProcessing, ExecutorService executorService,
                              boolean streaming, boolean stopOnException) {
        ObjectHelper.notNull(processors, DefaultManagementNamingStrategy.TYPE_PROCESSOR);
        this.processors = processors;
        this.aggregationStrategy = aggregationStrategy;
        this.isParallelProcessing = parallelProcessing;
        this.executorService = executorService;
        this.streaming = streaming;
        this.stopOnException = stopOnException;
        // Use the parameter rather than the overridable getter: a subclass override
        // would run before the subclass has been initialised.
        if (parallelProcessing && this.executorService == null) {
            this.executorService = createDefaultExecutorService();
        }
    }

    /** Creates the default pool used when parallel processing is enabled without an explicit executor. */
    private static ExecutorService createDefaultExecutorService() {
        return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "Multicast", true);
    }

    public String toString() {
        return "Multicast[" + getProcessors() + "]";
    }

    @Override // org.apache.camel.processor.Traceable
    public String getTraceLabel() {
        return "multicast";
    }

    /**
     * Multicasts the exchange to all targets and copies the aggregated result,
     * if there is one, back onto the given exchange.
     * <p/>
     * Failures raised while multicasting are not rethrown: they are stored on the
     * exchange and {@link Exchange#REDELIVERY_EXHAUSTED} is set to <tt>true</tt>.
     *
     * @param exchange the exchange to multicast
     * @throws Exception if the per-target exchange copies cannot be created
     */
    @Override // org.apache.camel.Processor
    public void process(Exchange exchange) throws Exception {
        AtomicExchange result = new AtomicExchange();
        Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange);
        try {
            if (isParallelProcessing()) {
                doProcessParallel(result, pairs, isStreaming());
            } else {
                doProcessSequential(result, pairs);
            }
            if (result.get() != null) {
                ExchangeHelper.copyResults(exchange, result.get());
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The exception is swallowed below, so restore the flag to keep
                // the interruption visible to the calling thread.
                Thread.currentThread().interrupt();
            }
            exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
            exchange.setException(e);
        }
    }

    /**
     * Submits one task per pair to the executor and then collects (and optionally
     * aggregates) every result.
     *
     * @param atomicExchange holder for the aggregated result
     * @param iterable       the processor/exchange pairs to run
     * @param streaming      <tt>true</tt> to collect results in completion order using an
     *                       {@link ExecutorCompletionService}; <tt>false</tt> to use a
     *                       {@link SubmitOrderedCompletionService}
     * @throws InterruptedException  if interrupted while waiting for a result
     * @throws ExecutionException    if a task failed (including the stop-on-exception case)
     * @throws IllegalStateException if no executor service is available
     */
    protected void doProcessParallel(AtomicExchange atomicExchange, Iterable<ProcessorExchangePair> iterable, boolean streaming) throws InterruptedException, ExecutionException {
        final ExecutorService executor = this.executorService;
        if (executor == null) {
            throw new IllegalStateException("No ExecutorService available for parallel processing in " + this);
        }

        final AtomicBoolean running = new AtomicBoolean(true);
        final CompletionService<Exchange> completion;
        if (streaming) {
            completion = new ExecutorCompletionService<Exchange>(executor);
        } else {
            completion = new SubmitOrderedCompletionService<Exchange>(executor);
        }

        int total = 0;
        for (ProcessorExchangePair pair : iterable) {
            final Processor processor = pair.getProcessor();
            final Processor prepared = pair.getPrepared();
            final Exchange subExchange = pair.getExchange();
            // Capture this task's own index; a shared counter would have moved on
            // by the time the task runs and reports a failure.
            final int index = total;
            updateNewExchange(subExchange, index, iterable);
            completion.submit(new Callable<Exchange>() {
                @Override // java.util.concurrent.Callable
                public Exchange call() throws Exception {
                    if (!running.get()) {
                        // An earlier task failed with stopOnException: skip the work.
                        return subExchange;
                    }
                    MulticastProcessor.this.doProcess(processor, prepared, subExchange);
                    if (MulticastProcessor.this.stopOnException && subExchange.getException() != null) {
                        running.set(false);
                        throw new CamelExchangeException("Parallel processing failed for number " + index, subExchange, subExchange.getException());
                    }
                    if (MulticastProcessor.LOG.isTraceEnabled()) {
                        MulticastProcessor.LOG.trace("Parallel processing complete for exchange: " + subExchange);
                    }
                    return subExchange;
                }
            });
            total++;
        }

        for (int i = 0; i < total; i++) {
            Exchange subExchange = completion.take().get();
            if (this.aggregationStrategy != null) {
                doAggregate(atomicExchange, subExchange);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Done parallel processing " + total + " exchanges");
        }
    }

    /**
     * Runs every pair on the calling thread, in iteration order, aggregating each
     * result as soon as it is available.
     *
     * @param atomicExchange holder for the aggregated result
     * @param iterable       the processor/exchange pairs to run
     * @throws Exception if stop-on-exception is enabled and a target left an exception
     *                   on its exchange
     */
    protected void doProcessSequential(AtomicExchange atomicExchange, Iterable<ProcessorExchangePair> iterable) throws Exception {
        int total = 0;
        for (ProcessorExchangePair pair : iterable) {
            Processor processor = pair.getProcessor();
            Processor prepared = pair.getPrepared();
            Exchange subExchange = pair.getExchange();
            updateNewExchange(subExchange, total, iterable);
            doProcess(processor, prepared, subExchange);
            if (this.stopOnException && subExchange.getException() != null) {
                throw new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException());
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("Sequential processing complete for number " + total + " exchange: " + subExchange);
            }
            if (this.aggregationStrategy != null) {
                doAggregate(atomicExchange, subExchange);
            }
            total++;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Done sequential processing " + total + " exchanges");
        }
    }

    /**
     * Invokes the prepared processor for a single exchange.
     * <p/>
     * When the exchange's unit of work exposes traced route nodes, the call is
     * bracketed by <tt>pushBlock()</tt>/<tt>popBlock()</tt>; the pop always runs,
     * even if processing fails. Any {@link Exception} raised is stored on the
     * exchange instead of being propagated.
     *
     * @param processor the original target processor (not invoked here)
     * @param prepared  the processor that is actually invoked
     * @param exchange  the exchange to process
     */
    public void doProcess(Processor processor, Processor prepared, Exchange exchange) {
        TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null;
        try {
            if (traced != null) {
                traced.pushBlock();
            }
            prepared.process(exchange);
        } catch (Exception e) {
            exchange.setException(e);
        } finally {
            if (traced != null) {
                traced.popBlock();
            }
        }
    }

    /**
     * Folds a sub-result into the aggregated result using the configured strategy.
     * Does nothing when no aggregation strategy is set.
     *
     * @param atomicExchange holder for the aggregated result; updated in place
     * @param exchange       the sub-result to merge in
     */
    protected synchronized void doAggregate(AtomicExchange atomicExchange, Exchange exchange) {
        if (this.aggregationStrategy != null) {
            Exchange previous = atomicExchange.get();
            ExchangeHelper.prepareAggregation(previous, exchange);
            atomicExchange.set(this.aggregationStrategy.aggregate(previous, exchange));
        }
    }

    /**
     * Hook for tagging a per-target exchange before it is processed. This
     * implementation stores the index as {@link Exchange#MULTICAST_INDEX}.
     *
     * @param exchange the per-target exchange
     * @param i        the zero-based position of the target
     * @param iterable all pairs being processed (unused by this implementation)
     */
    public void updateNewExchange(Exchange exchange, int i, Iterable<ProcessorExchangePair> iterable) {
        exchange.setProperty(Exchange.MULTICAST_INDEX, Integer.valueOf(i));
    }

    /**
     * Builds one pair per configured processor, each with its own copy of the
     * given exchange.
     *
     * @param exchange the exchange to copy for every target
     * @return the pairs, in the iteration order of the configured processors
     * @throws Exception declared for subclasses; this implementation only propagates
     *                   failures from {@link #createProcessorExchangePair}
     */
    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
        List<ProcessorExchangePair> pairs = new ArrayList<ProcessorExchangePair>(this.processors.size());
        for (Processor processor : this.processors) {
            pairs.add(createProcessorExchangePair(processor, exchange.copy()));
        }
        return pairs;
    }

    /**
     * Creates the pair for one target. If the exchange's unit of work carries a
     * route context, the processor is wrapped by an error handler created from
     * that route's error handler builder; otherwise it is used as is.
     *
     * @param processor the target processor
     * @param exchange  the exchange that will be sent to the target
     * @return the new pair
     */
    public static ProcessorExchangePair createProcessorExchangePair(Processor processor, Exchange exchange) {
        Processor prepared = processor;
        setToEndpoint(exchange, prepared);
        if (exchange.getUnitOfWork() != null && exchange.getUnitOfWork().getRouteContext() != null) {
            RouteContext routeContext = exchange.getUnitOfWork().getRouteContext();
            try {
                prepared = routeContext.getRoute().getErrorHandlerBuilder().createErrorHandler(routeContext, prepared);
            } catch (Exception e) {
                throw ObjectHelper.wrapRuntimeCamelException(e);
            }
        }
        return new ProcessorExchangePair(processor, prepared, exchange);
    }

    /**
     * Shuts down the executor service (if any), drops the reference to it and
     * stops the target processors.
     */
    @Override // org.apache.camel.impl.ServiceSupport
    protected void doStop() throws Exception {
        if (this.executorService != null) {
            this.executorService.shutdown();
            // Zero timeout: this only polls the termination state, it does not wait.
            this.executorService.awaitTermination(0L, TimeUnit.SECONDS);
            this.executorService = null;
        }
        ServiceHelper.stopServices(this.processors);
    }

    /**
     * Starts the target processors. Because {@link #doStop()} discards the
     * executor, a default pool is recreated here when parallel processing is
     * enabled and no executor is set, so the processor keeps working after a
     * restart.
     */
    @Override // org.apache.camel.impl.ServiceSupport
    protected void doStart() throws Exception {
        if (isParallelProcessing() && this.executorService == null) {
            this.executorService = createDefaultExecutorService();
        }
        ServiceHelper.startServices(this.processors);
    }

    /** Records the target endpoint URI on the exchange when the processor is a {@link Producer}. */
    private static void setToEndpoint(Exchange exchange, Processor processor) {
        if (processor instanceof Producer) {
            exchange.setProperty(Exchange.TO_ENDPOINT, ((Producer) processor).getEndpoint().getEndpointUri());
        }
    }

    /** @return whether parallel results are collected in completion order */
    public boolean isStreaming() {
        return this.streaming;
    }

    /** @return whether processing is aborted when a target leaves an exception on its exchange */
    public boolean isStopOnException() {
        return this.stopOnException;
    }

    /** @return the target processors, as passed to the constructor (not a copy) */
    public Collection<Processor> getProcessors() {
        return this.processors;
    }

    /** @return the aggregation strategy, or <tt>null</tt> if results are not aggregated */
    public AggregationStrategy getAggregationStrategy() {
        return this.aggregationStrategy;
    }

    /** @return whether targets are invoked concurrently */
    public boolean isParallelProcessing() {
        return this.isParallelProcessing;
    }

    /** @return the executor used for parallel processing, may be <tt>null</tt> */
    public ExecutorService getExecutorService() {
        return this.executorService;
    }

    /** @param executorService the executor to use for parallel processing */
    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    /**
     * @return a new list containing the target processors, or <tt>null</tt> when
     *         there are none
     */
    @Override // org.apache.camel.Navigate
    public List<Processor> next() {
        if (!hasNext()) {
            return null;
        }
        return new ArrayList<Processor>(this.processors);
    }

    /** @return <tt>true</tt> when at least one target processor is configured */
    @Override // org.apache.camel.Navigate
    public boolean hasNext() {
        return this.processors != null && !this.processors.isEmpty();
    }
}
