package org.apache.camel.component.kestrel;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import net.spy.memcached.MemcachedClient;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.spi.ShutdownAware;

/* loaded from: input_file:org/apache/camel/component/kestrel/KestrelConsumer.class */
public class KestrelConsumer extends DefaultConsumer implements ShutdownAware {
    private final KestrelEndpoint endpoint;
    private final MemcachedClient memcachedClient;
    private final BlockingQueue<Exchanger> exchangerQueue;
    private ExecutorService pollerExecutor;
    private ExecutorService handlerExecutor;
    private volatile boolean shutdownPending;
    private CountDownLatch shutdownLatch;
    private AtomicInteger pendingExchangeCount;

    /* loaded from: input_file:org/apache/camel/component/kestrel/KestrelConsumer$Handler.class */
    private final class Handler implements Runnable {
        private Exchanger exchanger;

        private Handler() {
            this.exchanger = new Exchanger();
        }

        @Override // java.lang.Runnable
        public void run() {
            if (KestrelConsumer.this.log.isTraceEnabled()) {
                KestrelConsumer.this.log.trace("{} is starting", Thread.currentThread().getName());
            }
            while (KestrelConsumer.this.isRunAllowed() && !KestrelConsumer.this.shutdownPending) {
                try {
                    KestrelConsumer.this.exchangerQueue.put(this.exchanger);
                    KestrelConsumer.this.pendingExchangeCount.incrementAndGet();
                    try {
                        try {
                            Object exchange = this.exchanger.exchange(this);
                            KestrelConsumer.this.log.trace("Got a value from the exchanger");
                            Exchange exchange2 = null;
                            try {
                                exchange2 = KestrelConsumer.this.endpoint.createExchange();
                                exchange2.getIn().setBody(exchange);
                                KestrelConsumer.this.getProcessor().process(exchange2);
                            } catch (Exception e) {
                                if (exchange2 != null) {
                                    KestrelConsumer.this.getExceptionHandler().handleException("Error processing exchange", exchange2, e);
                                } else {
                                    KestrelConsumer.this.getExceptionHandler().handleException(e);
                                }
                            }
                            KestrelConsumer.this.pendingExchangeCount.decrementAndGet();
                        } catch (InterruptedException e2) {
                            if (KestrelConsumer.this.log.isDebugEnabled()) {
                                KestrelConsumer.this.log.debug("Interrupted, are we stopping? {}", Boolean.valueOf(KestrelConsumer.this.isStopping() || KestrelConsumer.this.isStopped()));
                            }
                            KestrelConsumer.this.pendingExchangeCount.decrementAndGet();
                        }
                    } catch (Throwable th) {
                        KestrelConsumer.this.pendingExchangeCount.decrementAndGet();
                        throw th;
                    }
                } catch (InterruptedException e3) {
                    if (KestrelConsumer.this.log.isDebugEnabled()) {
                        KestrelConsumer.this.log.debug("Interrupted, are we stopping? {}", Boolean.valueOf(KestrelConsumer.this.isStopping() || KestrelConsumer.this.isStopped()));
                    }
                }
            }
            KestrelConsumer.this.shutdownLatch.countDown();
            if (KestrelConsumer.this.log.isTraceEnabled()) {
                KestrelConsumer.this.log.trace("{} is finished", Thread.currentThread().getName());
            }
        }
    }

    /* loaded from: input_file:org/apache/camel/component/kestrel/KestrelConsumer$Poller.class */
    private final class Poller implements Runnable {
        private boolean concurrent;

        private Poller(boolean z) {
            this.concurrent = z;
        }

        @Override // java.lang.Runnable
        public void run() {
            KestrelConsumer.this.log.trace("Kestrel poller is running");
            String queue = KestrelConsumer.this.endpoint.getConfiguration().getWaitTimeMs() > 0 ? KestrelConsumer.this.endpoint.getQueue() + "/t=" + KestrelConsumer.this.endpoint.getConfiguration().getWaitTimeMs() : KestrelConsumer.this.endpoint.getQueue();
            Exchanger exchanger = null;
            while (KestrelConsumer.this.isRunAllowed() && !KestrelConsumer.this.shutdownPending) {
                if (this.concurrent) {
                    try {
                        exchanger = (Exchanger) KestrelConsumer.this.exchangerQueue.take();
                    } catch (InterruptedException e) {
                        if (KestrelConsumer.this.log.isDebugEnabled()) {
                            KestrelConsumer.this.log.debug("Interrupted, are we stopping? {}", Boolean.valueOf(KestrelConsumer.this.isStopping() || KestrelConsumer.this.isStopped()));
                        }
                    }
                }
                Object obj = null;
                while (KestrelConsumer.this.isRunAllowed() && !KestrelConsumer.this.shutdownPending) {
                    KestrelConsumer.this.log.trace("Polling {}", queue);
                    try {
                        obj = KestrelConsumer.this.memcachedClient.get(queue);
                        if (obj != null) {
                            break;
                        }
                    } catch (Exception e2) {
                        if (KestrelConsumer.this.isRunAllowed() && !KestrelConsumer.this.shutdownPending) {
                            KestrelConsumer.this.getExceptionHandler().handleException("Failed to get object from kestrel", e2);
                        }
                    }
                    if (KestrelConsumer.this.isRunAllowed() && !KestrelConsumer.this.shutdownPending && KestrelConsumer.this.endpoint.getConfiguration().getWaitTimeMs() <= 0) {
                        try {
                            Thread.sleep(100L);
                        } catch (InterruptedException e3) {
                        }
                    }
                }
                KestrelConsumer.this.log.trace("Got object from {}", queue);
                if (this.concurrent) {
                    try {
                        exchanger.exchange(obj);
                    } catch (InterruptedException e4) {
                        if (KestrelConsumer.this.log.isDebugEnabled()) {
                            KestrelConsumer.this.log.debug("Interrupted, are we stopping? {}", Boolean.valueOf(KestrelConsumer.this.isStopping() || KestrelConsumer.this.isStopped()));
                        }
                    }
                } else {
                    KestrelConsumer.this.pendingExchangeCount.incrementAndGet();
                    Exchange exchange = null;
                    try {
                        try {
                            exchange = KestrelConsumer.this.endpoint.createExchange();
                            exchange.getIn().setBody(obj);
                            KestrelConsumer.this.getProcessor().process(exchange);
                        } finally {
                            KestrelConsumer.this.pendingExchangeCount.decrementAndGet();
                        }
                    } catch (Exception e5) {
                        if (exchange != null) {
                            KestrelConsumer.this.getExceptionHandler().handleException("Error processing exchange", exchange, e5);
                        } else {
                            KestrelConsumer.this.getExceptionHandler().handleException(e5);
                        }
                    }
                }
            }
            KestrelConsumer.this.log.trace("Finished polling {}", queue);
            KestrelConsumer.this.shutdownLatch.countDown();
        }
    }

    public KestrelConsumer(KestrelEndpoint kestrelEndpoint, Processor processor, MemcachedClient memcachedClient) {
        super(kestrelEndpoint, processor);
        this.exchangerQueue = new LinkedBlockingQueue();
        this.pendingExchangeCount = new AtomicInteger(0);
        this.endpoint = kestrelEndpoint;
        this.memcachedClient = memcachedClient;
    }

    protected void doStart() throws Exception {
        this.log.info("Starting consumer for " + this.endpoint.getEndpointUri());
        int concurrentConsumers = this.endpoint.getConfiguration().getConcurrentConsumers();
        this.shutdownPending = false;
        if (concurrentConsumers > 1) {
            this.shutdownLatch = new CountDownLatch(concurrentConsumers + 1);
            this.handlerExecutor = this.endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "Handlers-" + this.endpoint.getEndpointUri(), concurrentConsumers);
            for (int i = 0; i < concurrentConsumers; i++) {
                this.handlerExecutor.execute(new Handler());
            }
        } else {
            this.shutdownLatch = new CountDownLatch(1);
        }
        this.pollerExecutor = this.endpoint.getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "Poller-" + this.endpoint.getEndpointUri());
        this.pollerExecutor.submit(new Poller(concurrentConsumers > 1));
        super.doStart();
    }

    protected void doStop() throws Exception {
        this.log.info("Stopping consumer for " + this.endpoint.getEndpointUri());
        if (this.pollerExecutor != null) {
            this.endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(this.pollerExecutor);
        }
        if (this.handlerExecutor != null) {
            this.endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(this.handlerExecutor);
        }
        super.doStop();
    }

    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
        return false;
    }

    public int getPendingExchangesSize() {
        return this.pendingExchangeCount.get();
    }

    public void prepareShutdown(boolean z) {
        this.shutdownPending = true;
        if (this.log.isDebugEnabled()) {
            this.log.debug("Preparing to shutdown, waiting for {} threads to complete.", Long.valueOf(this.shutdownLatch.getCount()));
        }
        try {
            this.shutdownLatch.await();
        } catch (InterruptedException e) {
        }
    }
}
