package org.apache.camel.impl;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.IsSingleton;
import org.apache.camel.PollingConsumerPollingStrategy;
import org.apache.camel.Processor;
import org.apache.camel.Service;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/camel-core-2.15.1.redhat-621177.jar:org/apache/camel/impl/EventDrivenPollingConsumer.class */
/**
 * A polling consumer that registers itself as the {@link Processor} of a {@link Consumer}
 * created from the endpoint, buffers every exchange handed to {@link #process(Exchange)} in a
 * {@link BlockingQueue}, and hands the buffered exchanges out through the {@code receive}
 * methods.
 */
public class EventDrivenPollingConsumer extends PollingConsumerSupport implements Processor, IsSingleton {
    private static final Logger LOG = LoggerFactory.getLogger(EventDrivenPollingConsumer.class);

    /** Queue size used by {@link #EventDrivenPollingConsumer(Endpoint)}. */
    private static final int DEFAULT_QUEUE_SIZE = 1000;

    private final BlockingQueue<Exchange> queue;
    private final int queueCapacity;
    private ExceptionHandler interruptedExceptionHandler;
    private Consumer consumer;
    private boolean blockWhenFull = true;

    /**
     * Creates a consumer backed by a bounded queue of 1000 entries.
     *
     * @param endpoint the endpoint to consume from
     */
    public EventDrivenPollingConsumer(Endpoint endpoint) {
        this(endpoint, DEFAULT_QUEUE_SIZE);
    }

    /**
     * Creates a consumer with an internally created queue.
     *
     * @param endpoint  the endpoint to consume from
     * @param queueSize capacity of the internal queue; a value of zero or less selects an
     *                  unbounded {@link LinkedBlockingQueue}, otherwise an
     *                  {@link ArrayBlockingQueue} of exactly that capacity is used
     */
    public EventDrivenPollingConsumer(Endpoint endpoint, int queueSize) {
        super(endpoint);
        this.queueCapacity = queueSize;
        if (queueSize <= 0) {
            this.queue = new LinkedBlockingQueue<Exchange>();
        } else {
            this.queue = new ArrayBlockingQueue<Exchange>(queueSize);
        }
        this.interruptedExceptionHandler = new org.apache.camel.support.LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class);
    }

    /**
     * Creates a consumer that buffers exchanges in the supplied queue.
     *
     * @param endpoint the endpoint to consume from
     * @param queue    the queue to buffer exchanges in; its remaining capacity at construction
     *                 time is what {@link #getQueueCapacity()} reports
     */
    public EventDrivenPollingConsumer(Endpoint endpoint, BlockingQueue<Exchange> queue) {
        super(endpoint);
        this.queue = queue;
        this.queueCapacity = queue.remainingCapacity();
        this.interruptedExceptionHandler = new org.apache.camel.support.LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class);
    }

    /**
     * @return {@code true} (the default) if {@link #process(Exchange)} waits for space when the
     *         queue is full, {@code false} if it uses {@link BlockingQueue#add(Object)} instead
     */
    public boolean isBlockWhenFull() {
        return this.blockWhenFull;
    }

    /**
     * @param blockWhenFull whether {@link #process(Exchange)} should wait for space when the
     *                      queue is full
     */
    public void setBlockWhenFull(boolean blockWhenFull) {
        this.blockWhenFull = blockWhenFull;
    }

    /**
     * @return the queue size given to the constructor, or, when a queue was supplied, its
     *         remaining capacity at construction time
     */
    public int getQueueCapacity() {
        return this.queueCapacity;
    }

    /**
     * @return the number of exchanges currently buffered in the queue
     */
    public int getQueueSize() {
        return this.queue.size();
    }

    /**
     * Polls the queue with a timeout of zero.
     *
     * @return the next buffered exchange, or {@code null} if none is available
     * @throws RejectedExecutionException if this consumer is not started
     */
    @Override
    public Exchange receiveNoWait() {
        return receive(0L);
    }

    /**
     * Waits until an exchange is available and returns it.
     *
     * <p>An interruption while waiting is passed to {@link #handleInterruptedException} and the
     * wait is then retried for as long as running is allowed.
     *
     * @return the next buffered exchange, or {@code null} if running is no longer allowed
     * @throws RejectedExecutionException if this consumer is not started
     */
    @Override
    public Exchange receive() {
        if (!isRunAllowed() || !isStarted()) {
            throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name());
        }
        while (isRunAllowed()) {
            try {
                beforePoll(0L);
                return this.queue.take();
            } catch (InterruptedException e) {
                // The interrupt flag is deliberately not restored here: the loop retries
                // take(), which would throw again immediately and spin if the flag were set.
                handleInterruptedException(e);
            } finally {
                afterPoll();
            }
        }
        LOG.trace("Consumer is not running, so returning null");
        return null;
    }

    /**
     * Waits up to the given time for an exchange to become available.
     *
     * @param timeout maximum time to wait in milliseconds; the value actually used is the one
     *                returned by {@link #beforePoll(long)}
     * @return the next buffered exchange, or {@code null} if the wait timed out or was
     *         interrupted
     * @throws RejectedExecutionException if this consumer is not started
     */
    @Override
    public Exchange receive(long timeout) {
        if (!isRunAllowed() || !isStarted()) {
            throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name());
        }
        try {
            long effectiveTimeout = beforePoll(timeout);
            return this.queue.poll(effectiveTimeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            handleInterruptedException(e);
            return null;
        } finally {
            afterPoll();
        }
    }

    /**
     * Buffers the given exchange so that a later {@code receive} call can return it.
     *
     * <p>When {@link #isBlockWhenFull()} is {@code false} the exchange is added with
     * {@link BlockingQueue#add(Object)}, which throws {@link IllegalStateException} if the queue
     * has no free space. Otherwise the call waits for space; if that wait is interrupted the
     * exchange is not enqueued and the thread's interrupt status is restored.
     *
     * @param exchange the exchange to buffer
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        if (!isBlockWhenFull()) {
            this.queue.add(exchange);
            return;
        }
        try {
            this.queue.put(exchange);
        } catch (InterruptedException e) {
            this.log.debug("Put interrupted, are we stopping? {}", isStopping() || isStopped());
            // Restore the interrupt status only after logging, so the logging call itself runs
            // with the flag cleared while code further up the stack can still see the interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the handler that receives interruptions raised while waiting in a {@code receive}
     *         call
     */
    public ExceptionHandler getInterruptedExceptionHandler() {
        return this.interruptedExceptionHandler;
    }

    /**
     * @param exceptionHandler the handler that should receive interruptions raised while waiting
     *                         in a {@code receive} call
     */
    public void setInterruptedExceptionHandler(ExceptionHandler exceptionHandler) {
        this.interruptedExceptionHandler = exceptionHandler;
    }

    /**
     * Forwards an interruption to the configured interrupted-exception handler.
     *
     * @param interruptedException the interruption raised while waiting on the queue
     */
    protected void handleInterruptedException(InterruptedException interruptedException) {
        getInterruptedExceptionHandler().handleException(interruptedException);
    }

    /**
     * Notifies the underlying consumer, if it is a {@link PollingConsumerPollingStrategy}, that a
     * poll is about to happen.
     *
     * @param timeout the timeout requested by the caller, in milliseconds
     * @return the timeout returned by the strategy, or the given timeout if the consumer is not a
     *         polling strategy or the strategy threw an exception (which is logged and ignored)
     */
    protected long beforePoll(long timeout) {
        if (this.consumer instanceof PollingConsumerPollingStrategy) {
            try {
                return ((PollingConsumerPollingStrategy) this.consumer).beforePoll(timeout);
            } catch (Exception e) {
                LOG.debug("Error occurred before polling " + this.consumer + ". This exception will be ignored.", e);
            }
        }
        return timeout;
    }

    /**
     * Notifies the underlying consumer, if it is a {@link PollingConsumerPollingStrategy}, that a
     * poll has finished. An exception thrown by the strategy is logged and ignored.
     */
    protected void afterPoll() {
        if (this.consumer instanceof PollingConsumerPollingStrategy) {
            try {
                ((PollingConsumerPollingStrategy) this.consumer).afterPoll();
            } catch (Exception e) {
                LOG.debug("Error occurred after polling " + this.consumer + ". This exception will be ignored.", e);
            }
        }
    }

    /**
     * Creates the underlying consumer with this instance as its processor. A consumer that is a
     * {@link PollingConsumerPollingStrategy} is only initialised via {@code onInit()}; any other
     * consumer is started.
     */
    @Override
    public void doStart() throws Exception {
        this.consumer = getEndpoint().createConsumer(this);
        if (this.consumer instanceof PollingConsumerPollingStrategy) {
            ((PollingConsumerPollingStrategy) this.consumer).onInit();
        } else {
            ServiceHelper.startService(this.consumer);
        }
    }

    /**
     * Stops the underlying consumer.
     */
    @Override
    public void doStop() throws Exception {
        ServiceHelper.stopService(this.consumer);
    }

    /**
     * Stops and shuts down the underlying consumer, then discards any exchanges still buffered.
     */
    @Override
    public void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownService(this.consumer);
        this.queue.clear();
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean isSingleton() {
        return true;
    }
}
