package org.apache.camel.processor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.management.CamelNamingStrategy;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.concurrent.AtomicExchange;
import org.apache.camel.util.concurrent.CountingLatch;

/* loaded from: input_file:WEB-INF/lib/camel-core-1.6.2.2-fuse.jar:org/apache/camel/processor/MulticastProcessor.class */
public class MulticastProcessor extends ServiceSupport implements Processor {
    private static final int DEFAULT_THREADPOOL_SIZE = 10;
    private Collection<Processor> processors;
    private AggregationStrategy aggregationStrategy;
    private boolean isParallelProcessing;
    private ThreadPoolExecutor executor;
    private final boolean streaming;
    private final AtomicBoolean shutdown;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/camel-core-1.6.2.2-fuse.jar:org/apache/camel/processor/MulticastProcessor$ProcessCall.class */
    public class ProcessCall implements Runnable {
        private final Exchange exchange;
        private final AsyncCallback callback;
        private final Processor processor;

        public ProcessCall(Exchange exchange, Processor processor, AsyncCallback asyncCallback) {
            this.exchange = exchange;
            this.callback = asyncCallback;
            this.processor = processor;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (MulticastProcessor.this.shutdown.get()) {
                this.exchange.setException(new RejectedExecutionException());
                this.callback.done(false);
            } else {
                try {
                    this.processor.process(this.exchange);
                } catch (Exception e) {
                    this.exchange.setException(e);
                }
                this.callback.done(false);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/camel-core-1.6.2.2-fuse.jar:org/apache/camel/processor/MulticastProcessor$ProcessorExchangePair.class */
    public static class ProcessorExchangePair {
        private final Processor processor;
        private final Exchange exchange;

        public ProcessorExchangePair(Processor processor, Exchange exchange) {
            this.processor = processor;
            this.exchange = exchange;
        }

        public Processor getProcessor() {
            return this.processor;
        }

        public Exchange getExchange() {
            return this.exchange;
        }
    }

    public MulticastProcessor(Collection<Processor> collection) {
        this(collection, null);
    }

    public MulticastProcessor(Collection<Processor> collection, AggregationStrategy aggregationStrategy) {
        this(collection, aggregationStrategy, false, null);
    }

    public MulticastProcessor(Collection<Processor> collection, AggregationStrategy aggregationStrategy, boolean z, ThreadPoolExecutor threadPoolExecutor) {
        this(collection, aggregationStrategy, z, threadPoolExecutor, false);
    }

    public MulticastProcessor(Collection<Processor> collection, AggregationStrategy aggregationStrategy, boolean z, ThreadPoolExecutor threadPoolExecutor, boolean z2) {
        this.shutdown = new AtomicBoolean(true);
        ObjectHelper.notNull(collection, CamelNamingStrategy.TYPE_PROCESSOR);
        this.processors = collection;
        this.aggregationStrategy = aggregationStrategy;
        this.isParallelProcessing = z;
        if (this.isParallelProcessing) {
            if (threadPoolExecutor != null) {
                this.executor = threadPoolExecutor;
            } else {
                this.executor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
            }
        }
        this.streaming = z2;
    }

    public static <E extends Exchange> Collection<Processor> toProducers(Collection<Endpoint> collection) throws Exception {
        ArrayList arrayList = new ArrayList();
        Iterator<Endpoint> it = collection.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().createProducer());
        }
        return arrayList;
    }

    public String toString() {
        return "Multicast" + getProcessors();
    }

    @Override // org.apache.camel.Processor
    public void process(Exchange exchange) throws Exception {
        final AtomicExchange atomicExchange = new AtomicExchange();
        Iterable<ProcessorExchangePair> createProcessorExchangePairs = createProcessorExchangePairs(exchange);
        if (this.isParallelProcessing) {
            LinkedList linkedList = new LinkedList();
            final CountingLatch countingLatch = new CountingLatch();
            int i = 0;
            for (ProcessorExchangePair processorExchangePair : createProcessorExchangePairs) {
                Processor processor = processorExchangePair.getProcessor();
                final Exchange exchange2 = processorExchangePair.getExchange();
                updateNewExchange(exchange2, i, createProcessorExchangePairs);
                linkedList.add(exchange2);
                countingLatch.increment();
                this.executor.execute(new ProcessCall(exchange2, processor, new AsyncCallback() { // from class: org.apache.camel.processor.MulticastProcessor.1
                    @Override // org.apache.camel.AsyncCallback
                    public void done(boolean z) {
                        if (MulticastProcessor.this.streaming && MulticastProcessor.this.aggregationStrategy != null) {
                            MulticastProcessor.this.doAggregate(atomicExchange, exchange2);
                        }
                        countingLatch.decrement();
                    }
                }));
                i++;
            }
            countingLatch.await();
            if (!this.streaming && this.aggregationStrategy != null) {
                Iterator it = linkedList.iterator();
                while (it.hasNext()) {
                    doAggregate(atomicExchange, (Exchange) it.next());
                }
            }
        } else {
            int i2 = 0;
            for (ProcessorExchangePair processorExchangePair2 : createProcessorExchangePairs) {
                Processor processor2 = processorExchangePair2.getProcessor();
                Exchange exchange3 = processorExchangePair2.getExchange();
                updateNewExchange(exchange3, i2, createProcessorExchangePairs);
                try {
                    processor2.process(exchange3);
                } catch (Exception e) {
                    exchange3.setException(e);
                }
                doAggregate(atomicExchange, exchange3);
                i2++;
            }
        }
        if (atomicExchange.get() != null) {
            ExchangeHelper.copyResults(exchange, atomicExchange.get());
        }
    }

    protected synchronized void doAggregate(AtomicExchange atomicExchange, Exchange exchange) {
        if (this.aggregationStrategy != null) {
            if (atomicExchange.get() == null) {
                atomicExchange.set(exchange);
            } else {
                atomicExchange.set(this.aggregationStrategy.aggregate(atomicExchange.get(), exchange));
            }
        }
    }

    protected void updateNewExchange(Exchange exchange, int i, Iterable<ProcessorExchangePair> iterable) {
    }

    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) {
        ArrayList arrayList = new ArrayList(this.processors.size());
        for (Processor processor : (Processor[]) this.processors.toArray(new Processor[this.processors.size()])) {
            arrayList.add(new ProcessorExchangePair(processor, exchange.copy()));
        }
        return arrayList;
    }

    @Override // org.apache.camel.impl.ServiceSupport
    protected void doStop() throws Exception {
        this.shutdown.set(true);
        if (this.executor != null) {
            this.executor.shutdown();
            this.executor.awaitTermination(0L, TimeUnit.SECONDS);
        }
        ServiceHelper.stopServices(this.processors);
    }

    @Override // org.apache.camel.impl.ServiceSupport
    protected void doStart() throws Exception {
        this.shutdown.set(false);
        if (this.executor != null) {
            this.executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { // from class: org.apache.camel.processor.MulticastProcessor.2
                @Override // java.util.concurrent.RejectedExecutionHandler
                public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                    ProcessCall processCall = (ProcessCall) runnable;
                    processCall.exchange.setException(new RejectedExecutionException());
                    processCall.callback.done(false);
                }
            });
        }
        ServiceHelper.startServices(this.processors);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isStreaming() {
        return this.streaming;
    }

    public Collection<Processor> getProcessors() {
        return this.processors;
    }

    public AggregationStrategy getAggregationStrategy() {
        return this.aggregationStrategy;
    }
}
