package org.apache.camel.processor;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ProducerCallback;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;

/* loaded from: input_file:WEB-INF/lib/camel-core-2.2.0-fuse-02-00.jar:org/apache/camel/processor/SendAsyncProcessor.class */
public class SendAsyncProcessor extends SendProcessor implements Runnable, Navigate<Processor> {
    private static final int DEFAULT_THREADPOOL_SIZE = 10;
    protected final Processor target;
    protected final BlockingQueue<Exchange> completedTasks;
    protected ExecutorService executorService;
    protected ExecutorService producerExecutorService;
    protected int poolSize;
    protected ExceptionHandler exceptionHandler;

    public SendAsyncProcessor(Endpoint endpoint, Processor processor) {
        super(endpoint);
        this.completedTasks = new LinkedBlockingQueue();
        this.poolSize = 10;
        this.target = processor;
    }

    public SendAsyncProcessor(Endpoint endpoint, ExchangePattern exchangePattern, Processor processor) {
        super(endpoint, exchangePattern);
        this.completedTasks = new LinkedBlockingQueue();
        this.poolSize = 10;
        this.target = processor;
    }

    @Override // org.apache.camel.processor.SendProcessor
    protected Exchange configureExchange(Exchange exchange, ExchangePattern exchangePattern) {
        Exchange createCorrelatedCopy = ExchangeHelper.createCorrelatedCopy(exchange, true);
        if (exchangePattern != null) {
            createCorrelatedCopy.setPattern(exchangePattern);
        } else {
            createCorrelatedCopy.setPattern(ExchangePattern.InOut);
        }
        createCorrelatedCopy.setProperty(Exchange.TO_ENDPOINT, this.destination.getEndpointUri());
        return createCorrelatedCopy;
    }

    @Override // org.apache.camel.processor.SendProcessor
    public Exchange doProcess(Exchange exchange) throws Exception {
        return (Exchange) getProducerCache(exchange).doInProducer(this.destination, exchange, this.pattern, new ProducerCallback<Exchange>() { // from class: org.apache.camel.processor.SendAsyncProcessor.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.apache.camel.ProducerCallback
            public Exchange doInProducer(Producer producer, Exchange exchange2, ExchangePattern exchangePattern) throws Exception {
                Exchange configureExchange = SendAsyncProcessor.this.configureExchange(exchange2, exchangePattern);
                AsyncCallback asyncCallback = new AsyncCallback() { // from class: org.apache.camel.processor.SendAsyncProcessor.1.1
                    @Override // org.apache.camel.AsyncCallback
                    public void onTaskCompleted(Exchange exchange3) {
                        SendAsyncProcessor.this.completedTasks.add(exchange3);
                    }
                };
                if (producer instanceof AsyncProcessor) {
                    SendAsyncProcessor.this.doAsyncProcess((AsyncProcessor) producer, configureExchange, asyncCallback);
                } else {
                    SendAsyncProcessor.this.doSimulateAsyncProcess(producer, configureExchange, asyncCallback);
                }
                return configureExchange;
            }
        });
    }

    protected void doAsyncProcess(AsyncProcessor asyncProcessor, Exchange exchange, AsyncCallback asyncCallback) throws Exception {
        asyncProcessor.process(exchange, asyncCallback);
    }

    protected void doSimulateAsyncProcess(final Processor processor, final Exchange exchange, final AsyncCallback asyncCallback) throws Exception {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Producer " + processor + " is not an instanceof AsyncProcessor. Will fallback to simulate async behavior by transferring task to a producer thread pool for further processing.");
        }
        getProducerExecutorService().submit(new Callable<Exchange>() { // from class: org.apache.camel.processor.SendAsyncProcessor.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Exchange call() throws Exception {
                try {
                    ((AsyncProcessor) exchange.getContext().getTypeConverter().convertTo(AsyncProcessor.class, processor)).process(exchange, asyncCallback);
                } catch (Exception e) {
                    if (SendProcessor.LOG.isDebugEnabled()) {
                        SendProcessor.LOG.debug("Caught exception while processing: " + exchange, e);
                    }
                    exchange.setException(e);
                }
                return exchange;
            }
        });
    }

    @Override // org.apache.camel.processor.SendProcessor
    public String toString() {
        return "sendAsyncTo(" + this.destination + (this.pattern != null ? " " + this.pattern : "") + " -> " + this.target + ")";
    }

    public ExecutorService getExecutorService() {
        if (this.executorService == null) {
            this.executorService = createExecutorService("SendAsyncProcessor-Consumer");
        }
        return this.executorService;
    }

    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    public ExecutorService getProducerExecutorService() {
        if (this.producerExecutorService == null) {
            this.producerExecutorService = ExecutorServiceHelper.newCachedThreadPool("SendAsyncProcessor-Producer", true);
        }
        return this.producerExecutorService;
    }

    public void setProducerExecutorService(ExecutorService executorService) {
        this.producerExecutorService = executorService;
    }

    public int getPoolSize() {
        return this.poolSize;
    }

    public void setPoolSize(int i) {
        this.poolSize = i;
    }

    public ExceptionHandler getExceptionHandler() {
        if (this.exceptionHandler == null) {
            this.exceptionHandler = new LoggingExceptionHandler(getClass());
        }
        return this.exceptionHandler;
    }

    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }

    @Override // org.apache.camel.Navigate
    public boolean hasNext() {
        return this.target != null;
    }

    @Override // org.apache.camel.Navigate
    public List<Processor> next() {
        if (!hasNext()) {
            return null;
        }
        ArrayList arrayList = new ArrayList(1);
        arrayList.add(this.target);
        return arrayList;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (isRunAllowed()) {
            try {
                Exchange poll = this.completedTasks.poll(1000L, TimeUnit.MILLISECONDS);
                if (poll != null) {
                    try {
                        if (poll.hasOut()) {
                            poll.setIn(poll.getOut());
                            poll.setOut(null);
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Async reply received now routing the Exchange: " + poll);
                        }
                        this.target.process(poll);
                    } catch (Exception e) {
                        getExceptionHandler().handleException(e);
                    }
                }
            } catch (InterruptedException e2) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
                }
            }
        }
    }

    protected ExecutorService createExecutorService(String str) {
        return ExecutorServiceHelper.newScheduledThreadPool(this.poolSize, str, true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.processor.SendProcessor, org.apache.camel.impl.ServiceSupport
    public void doStart() throws Exception {
        super.doStart();
        for (int i = 0; i < this.poolSize; i++) {
            getExecutorService().execute(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.processor.SendProcessor, org.apache.camel.impl.ServiceSupport
    public void doStop() throws Exception {
        super.doStop();
        if (this.producerExecutorService != null) {
            this.producerExecutorService.shutdownNow();
            this.producerExecutorService = null;
        }
        if (this.executorService != null) {
            this.executorService.shutdownNow();
            this.executorService = null;
        }
        this.completedTasks.clear();
    }
}
