package org.apache.camel.component.sjms.batch;

import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/sjms/batch/SjmsBatchConsumer.class */
public class SjmsBatchConsumer extends DefaultConsumer {
    // Name constant for the timeout checker; not referenced elsewhere in the visible code.
    public static final String SJMS_BATCH_TIMEOUT_CHECKER = "SJmsBatchTimeoutChecker";
    // NOTE(review): presumably the "transacted" flag for session creation; the decompiled
    // session creation call passes a literal true instead of this constant - confirm.
    private static final boolean TRANSACTED = true;
    private static final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumer.class);
    // Incremented once per processed batch; the pre-increment value is used as the batch id.
    private static final AtomicInteger BATCH_COUNT = new AtomicInteger();
    // Only read (for debug logging) in the visible code; never updated here.
    private static final AtomicLong MESSAGE_RECEIVED = new AtomicLong();
    // Incremented by the batch size after a batch has been handed to the processor.
    private static final AtomicLong MESSAGE_PROCESSED = new AtomicLong();
    // Shut down in doStop() only when shutdownTimeoutCheckerExecutorService is true.
    private ScheduledExecutorService timeoutCheckerExecutorService;
    // NOTE(review): presumably set when this consumer created the executor itself - the
    // assignment is not visible in the decompiled code.
    private boolean shutdownTimeoutCheckerExecutorService;
    private final SjmsBatchEndpoint sjmsBatchEndpoint;
    private final AggregationStrategy aggregationStrategy;
    // Batch completes once this many messages were aggregated (only checked when > 0).
    private final int completionSize;
    // When > 0, timer-triggered completions are reported as "interval" instead of "timeout".
    private final int completionInterval;
    // Milliseconds since the first message of a batch after which the batch is completed (when > 0).
    private final int completionTimeout;
    // Optional predicate; a match completes the current batch.
    private final Predicate completionPredicate;
    // true: the predicate is evaluated on the incoming exchange; false: on the aggregated exchange.
    private final boolean eagerCheckCompletion;
    private final int consumerCount;
    // Timeout passed to MessageConsumer.receive(...) while polling.
    private final int pollDuration;
    private final ConnectionFactory connectionFactory;
    // Name of the JMS queue the consumers read from.
    private final String destinationName;
    // Shut down gracefully and cleared in doStop().
    private ExecutorService jmsConsumerExecutors;
    // Polled by the consumption loops; set to false in doStop() to request shutdown.
    private final AtomicBoolean running;
    // Latch counted down by each consumption loop when it exits; awaited in doStop().
    private final AtomicReference<CountDownLatch> consumersShutdownLatchRef;
    // Used to create sessions in the consumption loops; closed in doStop().
    private volatile Connection connection;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/camel/component/sjms/batch/SjmsBatchConsumer$BatchConsumptionLoop.class */
    public class BatchConsumptionLoop implements Runnable {
        private final AtomicBoolean completionTimeoutTrigger;
        private final BatchConsumptionTask task;
        private int keepAliveDelay;

        /* loaded from: input_file:org/apache/camel/component/sjms/batch/SjmsBatchConsumer$BatchConsumptionLoop$BatchConsumptionTask.class */
        private final class BatchConsumptionTask {
            private final AtomicBoolean timeoutInterval;
            private final AtomicBoolean timeout = new AtomicBoolean();
            private int messageCount;
            private long timeElapsed;
            private long startTime;
            private Exchange aggregatedExchange;

            BatchConsumptionTask(AtomicBoolean atomicBoolean) {
                this.timeoutInterval = atomicBoolean;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public void consumeBatchesOnLoop(Session session, MessageConsumer messageConsumer) throws JMSException {
                boolean z = SjmsBatchConsumer.this.completionTimeout > 0;
                SjmsBatchConsumer.LOG.trace("BatchConsumptionTask +++ start +++");
                while (SjmsBatchConsumer.this.running.get()) {
                    SjmsBatchConsumer.LOG.trace("BatchConsumptionTask running");
                    if (this.timeout.compareAndSet(true, false) || this.timeoutInterval.compareAndSet(true, false)) {
                        SjmsBatchConsumer.LOG.trace("Completion batch due timeout");
                        completionBatch(session, SjmsBatchConsumer.this.completionInterval > 0 ? "interval" : "timeout");
                        reset();
                    } else if (SjmsBatchConsumer.this.completionSize <= 0 || this.messageCount < SjmsBatchConsumer.this.completionSize) {
                        Message receive = messageConsumer.receive((!z || this.timeElapsed <= 0) ? SjmsBatchConsumer.this.pollDuration : BatchConsumptionLoop.this.getReceiveWaitTime(this.timeElapsed));
                        if (SjmsBatchConsumer.this.running.get()) {
                            if (receive == null) {
                                SjmsBatchConsumer.LOG.trace("No message received");
                            } else {
                                this.messageCount += SjmsBatchConsumer.TRANSACTED;
                                SjmsBatchConsumer.LOG.debug("#{} messages received", Integer.valueOf(this.messageCount));
                                if (z && this.startTime == 0) {
                                    this.startTime = new Date().getTime();
                                }
                                Exchange createExchange = SjmsBatchConsumer.this.m13getEndpoint().createExchange(receive, session);
                                this.aggregatedExchange = SjmsBatchConsumer.this.aggregationStrategy.aggregate(this.aggregatedExchange, createExchange);
                                this.aggregatedExchange.setProperty("CamelBatchSize", Integer.valueOf(this.messageCount));
                                if (SjmsBatchConsumer.this.completionPredicate != null) {
                                    try {
                                        if (SjmsBatchConsumer.this.eagerCheckCompletion ? SjmsBatchConsumer.this.completionPredicate.matches(createExchange) : SjmsBatchConsumer.this.completionPredicate.matches(this.aggregatedExchange)) {
                                            SjmsBatchConsumer.LOG.trace("Completion batch due predicate");
                                            completionBatch(session, "predicate");
                                            reset();
                                        }
                                    } catch (Exception e) {
                                        SjmsBatchConsumer.LOG.warn("Error during evaluation of completion predicate " + e.getMessage() + ". This exception is ignored.", e);
                                    }
                                }
                            }
                            if (z && this.startTime > 0) {
                                this.timeElapsed = new Date().getTime() - this.startTime;
                                if (this.timeElapsed > SjmsBatchConsumer.this.completionTimeout) {
                                    this.timeout.set(true);
                                } else {
                                    SjmsBatchConsumer.LOG.trace("This batch has more time until the timeout, elapsed: {} timeout: {}", Long.valueOf(this.timeElapsed), Integer.valueOf(SjmsBatchConsumer.this.completionTimeout));
                                }
                            }
                        } else {
                            SjmsBatchConsumer.LOG.info("Shutdown signal received - rolling back {} pending in batch from destination {}", Integer.valueOf(this.messageCount), SjmsBatchConsumer.this.destinationName);
                            session.rollback();
                        }
                    } else {
                        SjmsBatchConsumer.LOG.trace("Completion batch due size");
                        completionBatch(session, "size");
                        reset();
                    }
                }
                SjmsBatchConsumer.LOG.trace("BatchConsumptionTask +++ end +++");
            }

            private void reset() {
                this.messageCount = 0;
                this.timeElapsed = 0L;
                this.startTime = 0L;
                this.aggregatedExchange = null;
            }

            private void completionBatch(Session session, String str) {
                if (this.aggregatedExchange == null && SjmsBatchConsumer.this.m13getEndpoint().isSendEmptyMessageWhenIdle()) {
                    BatchConsumptionLoop.this.processEmptyMessage();
                } else if (this.aggregatedExchange != null) {
                    BatchConsumptionLoop.this.processBatch(this.aggregatedExchange, session, str);
                }
            }
        }

        private BatchConsumptionLoop() {
            this.completionTimeoutTrigger = new AtomicBoolean();
            this.task = new BatchConsumptionTask(this.completionTimeoutTrigger);
        }

        /**
         * @return the flag that, once set to {@code true}, makes the task complete its current
         *         batch on the next loop iteration
         */
        public AtomicBoolean getCompletionTimeoutTrigger() {
            return completionTimeoutTrigger;
        }

        /**
         * @param i delay in milliseconds slept after a handled consumption failure; a negative
         *          value makes {@code run()} rethrow the failure instead of handling it
         */
        public void setKeepAliveDelay(int i) {
            keepAliveDelay = i;
        }

        /**
         * Consumption loop: repeatedly creates a session and a queue consumer on the shared
         * connection and delegates to {@code task.consumeBatchesOnLoop(...)}.
         *
         * <p>NOTE(review): the decompiler reported "Finally extract failed" for this method, so the
         * nested try/catch structure below is a lossy reconstruction of what were most likely
         * {@code try/finally} blocks. It is not valid Java as written (e.g. the {@code break}
         * after {@code throw}); treat it as a description of intent and verify against the
         * original sources before changing it.
         */
        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Runnable
        public void run() {
            MessageConsumer createConsumer;
            while (true) {
                Session session = null;
                try {
                    try {
                        try {
                            try {
                                // transacted session; 2 is the value of javax.jms.Session.CLIENT_ACKNOWLEDGE
                                session = SjmsBatchConsumer.this.connection.createSession(true, 2);
                                createConsumer = session.createConsumer(session.createQueue(SjmsBatchConsumer.this.destinationName));
                            } catch (Throwable th) {
                                // NOTE(review): passes null, so nothing is closed here - presumably a
                                // decompiler artifact of a finally block closing the session
                                closeJmsSession(null);
                                throw th;
                            }
                        } catch (Throwable th2) {
                            // signal that this loop is finished before propagating the failure
                            ((CountDownLatch) SjmsBatchConsumer.this.consumersShutdownLatchRef.get()).countDown();
                            throw th2;
                        }
                    } catch (Exception e) {
                        // a negative keepAliveDelay means: do not recover, rethrow to the outer handler
                        if (this.keepAliveDelay < 0) {
                            throw e;
                        }
                        SjmsBatchConsumer.this.getExceptionHandler().handleException("Exception caught consuming from " + SjmsBatchConsumer.this.destinationName, e);
                        // pause before the next attempt
                        if (this.keepAliveDelay > 0) {
                            Thread.sleep(this.keepAliveDelay);
                        }
                        closeJmsSession(session);
                    }
                    try {
                        // blocks until the consumer's running flag is cleared or an exception escapes
                        this.task.consumeBatchesOnLoop(session, createConsumer);
                        closeJmsConsumer(createConsumer);
                        closeJmsSession(session);
                        // leave the loop only when the consumer is neither running nor starting
                        if (!SjmsBatchConsumer.this.running.get() && !SjmsBatchConsumer.this.isStarting()) {
                            ((CountDownLatch) SjmsBatchConsumer.this.consumersShutdownLatchRef.get()).countDown();
                            return;
                        }
                    } catch (Throwable th3) {
                        closeJmsConsumer(createConsumer);
                        throw th3;
                        break;
                    }
                } catch (Throwable th4) {
                    // anything that was not handled above terminates this consumption loop
                    SjmsBatchConsumer.this.getExceptionHandler().handleException("Exiting consumption loop due to exception caught consuming from " + SjmsBatchConsumer.this.destinationName, th4);
                    ((CountDownLatch) SjmsBatchConsumer.this.consumersShutdownLatchRef.get()).countDown();
                    return;
                }
            }
        }

        /**
         * Closes the given JMS consumer on a best-effort basis. A {@link JMSException} raised
         * while closing is logged (stack trace at debug level only) and otherwise ignored.
         *
         * @param messageConsumer the consumer to close; {@code null} is tolerated and ignored,
         *                        matching {@code closeJmsSession}
         */
        private void closeJmsConsumer(MessageConsumer messageConsumer) {
            if (messageConsumer == null) {
                return;
            }
            try {
                messageConsumer.close();
            } catch (JMSException e) {
                if (SjmsBatchConsumer.LOG.isDebugEnabled()) {
                    SjmsBatchConsumer.LOG.debug("Exception caught closing consumer", e);
                }
                // Two placeholders for two arguments: the previous format had a single "{}", so the
                // destination name was printed in place of the error and the error text was dropped.
                SjmsBatchConsumer.LOG.warn("Exception caught closing consumer for {}: {}. This exception is ignored.", SjmsBatchConsumer.this.destinationName, e.getMessage());
            }
        }

        /**
         * Closes the given JMS session on a best-effort basis; failures are logged and ignored.
         *
         * @param session the session to close, may be {@code null} (then nothing happens)
         */
        private void closeJmsSession(Session session) {
            if (session == null) {
                return;
            }
            try {
                session.close();
            } catch (JMSException closeFailure) {
                // the stack trace is only of interest when debugging
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Exception caught closing session", closeFailure);
                }
                LOG.warn("Exception caught closing session for {}: {}. This exception is ignored.", destinationName, closeFailure.getMessage());
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Computes how long the next receive call may block for a batch that is already open.
         *
         * @param j milliseconds elapsed since the batch was started
         * @return the smaller of the poll duration and the time left until the completion
         *         timeout, where the time left is treated as at least 1 ms
         */
        public long getReceiveWaitTime(long j) {
            // an already expired batch still waits 1 ms rather than 0 or a negative value
            final long remaining = Math.max(getTimeRemaining(j), 1L);
            final long waitTime = Math.min(remaining, pollDuration);
            LOG.trace("Waiting for {}", waitTime);
            return waitTime;
        }

        /**
         * @param j milliseconds elapsed since the batch was started
         * @return {@code completionTimeout - j}; negative once the timeout has passed
         */
        private long getTimeRemaining(long j) {
            final long remaining = completionTimeout - j;
            if (j > 0 && LOG.isDebugEnabled()) {
                LOG.debug("Time remaining this batch: {}", remaining);
            }
            return remaining;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Sends an empty exchange created by the endpoint through the processor. Processing
         * failures are passed to the exception handler instead of being thrown.
         */
        public void processEmptyMessage() {
            final SjmsBatchEndpoint endpoint = m13getEndpoint();
            final Exchange emptyExchange = endpoint.createExchange();
            LOG.debug("Sending empty message as there were no messages from polling: {}", endpoint);
            try {
                getProcessor().process(emptyExchange);
            } catch (Exception failure) {
                getExceptionHandler().handleException("Error processing exchange", emptyExchange, failure);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Finalizes an aggregated batch and sends it through the processor.
         *
         * @param exchange the aggregated exchange; must carry the {@code CamelBatchSize} property
         * @param session  the JMS session the batch was received on; wrapped in a
         *                 {@code SessionCompletion} registered as on-completion callback
         * @param str      the completion reason, stored as {@code CamelAggregatedCompletedBy};
         *                 for "timeout" the aggregation strategy's timeout callback is invoked
         */
        public void processBatch(Exchange exchange, Session session, String str) {
            final int batchId = BATCH_COUNT.getAndIncrement();
            final int batchSize = exchange.getProperty("CamelBatchSize", Integer.class);
            if (LOG.isDebugEnabled()) {
                final long receivedTotal = MESSAGE_RECEIVED.get() + batchSize;
                LOG.debug("Processing batch[" + batchId + "]:size=" + batchSize + ":total=" + receivedTotal);
            }

            if ("timeout".equals(str)) {
                aggregationStrategy.timeout(exchange, batchId, batchSize, completionTimeout);
            }
            exchange.setProperty("CamelAggregatedCompletedBy", str);
            aggregationStrategy.onCompletion(exchange);

            final SessionCompletion sessionCompletion = new SessionCompletion(session);
            exchange.adapt(ExtendedExchange.class).addOnCompletion(sessionCompletion);
            try {
                getProcessor().process(exchange);
                final long processedTotal = MESSAGE_PROCESSED.addAndGet(batchSize);
                LOG.debug("Completed processing[{}]:total={}", batchId, processedTotal);
            } catch (Exception failure) {
                getExceptionHandler().handleException("Error processing exchange from " + destinationName, exchange, failure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/camel/component/sjms/batch/SjmsBatchConsumer$CompletionIntervalTask.class */
    /**
     * Runnable that raises every registered trigger flag, but only while the CamelContext of
     * the endpoint reports that it has been started.
     */
    public final class CompletionIntervalTask implements Runnable {
        private final List<AtomicBoolean> triggers;

        CompletionIntervalTask(List<AtomicBoolean> list) {
            triggers = list;
        }

        @Override
        public void run() {
            if (m13getEndpoint().getCamelContext().getStatus().isStarted()) {
                // signal every registered trigger
                for (AtomicBoolean trigger : triggers) {
                    trigger.set(true);
                }
                return;
            }
            LOG.trace("Completion interval task cannot start due CamelContext({}) has not been started yet", m13getEndpoint().getCamelContext().getName());
        }
    }

    /* loaded from: input_file:org/apache/camel/component/sjms/batch/SjmsBatchConsumer$StartConsumerTask.class */
    /**
     * Task created and executed by {@code doStart()}, either inline or on a dedicated thread.
     *
     * <p>NOTE(review): the body of {@link #run()} could not be decompiled, so what the task
     * actually does (presumably opening the JMS connection and launching the consumption
     * loops, with optional retry) cannot be confirmed from this file.
     */
    protected class StartConsumerTask implements Runnable {
        // doStart() passes the endpoint's asyncStartListener flag here
        private boolean recoveryEnabled;
        // doStart() passes the endpoint's recoveryInterval here; unit not visible in this file
        private int recoveryInterval;
        // doStart() passes the endpoint's keepAliveDelay here
        private int keepAliveDelay;
        // not written by any visible code; presumably a retry counter used by run()
        private long attempt;

        /**
         * @param z  whether recovery is enabled
         * @param i  the recovery interval
         * @param i2 the keep-alive delay
         */
        public StartConsumerTask(boolean z, int i, int i2) {
            this.recoveryEnabled = z;
            this.recoveryInterval = i;
            this.keepAliveDelay = i2;
        }

        /* JADX WARN: Removed duplicated region for block: B:35:0x0208  */
        /* JADX WARN: Removed duplicated region for block: B:43:0x023f A[SYNTHETIC] */
        @Override // java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 647
                To view this dump add '--comments-level debug' option
            */
            // Decompiler placeholder: the real implementation is missing from this file.
            throw new UnsupportedOperationException("Method not decompiled: org.apache.camel.component.sjms.batch.SjmsBatchConsumer.StartConsumerTask.run():void");
        }
    }

    public SjmsBatchConsumer(SjmsBatchEndpoint sjmsBatchEndpoint, Processor processor) {
        super(sjmsBatchEndpoint, processor);
        this.running = new AtomicBoolean();
        this.consumersShutdownLatchRef = new AtomicReference<>();
        this.sjmsBatchEndpoint = (SjmsBatchEndpoint) ObjectHelper.notNull(sjmsBatchEndpoint, "batchJmsEndpoint");
        this.destinationName = StringHelper.notEmpty(sjmsBatchEndpoint.getDestinationName(), "destinationName");
        this.completionSize = sjmsBatchEndpoint.getCompletionSize();
        this.completionInterval = sjmsBatchEndpoint.getCompletionInterval();
        this.completionTimeout = sjmsBatchEndpoint.getCompletionTimeout();
        if (this.completionInterval > 0 && this.completionTimeout != 500) {
            throw new IllegalArgumentException("Only one of completionInterval or completionTimeout can be used, not both.");
        }
        if (sjmsBatchEndpoint.isSendEmptyMessageWhenIdle() && this.completionTimeout <= 0 && this.completionInterval <= 0) {
            throw new IllegalArgumentException("SendEmptyMessageWhenIdle can only be enabled if either completionInterval or completionTimeout is also set");
        }
        this.completionPredicate = sjmsBatchEndpoint.getCompletionPredicate();
        this.eagerCheckCompletion = sjmsBatchEndpoint.isEagerCheckCompletion();
        this.pollDuration = sjmsBatchEndpoint.getPollDuration();
        if (this.pollDuration < 0) {
            throw new IllegalArgumentException("pollDuration must be 0 or greater");
        }
        this.aggregationStrategy = (AggregationStrategy) ObjectHelper.notNull(sjmsBatchEndpoint.getAggregationStrategy(), "aggregationStrategy");
        this.consumerCount = sjmsBatchEndpoint.getConsumerCount();
        if (this.consumerCount <= 0) {
            throw new IllegalArgumentException("consumerCount must be greater than 0");
        }
        this.connectionFactory = (ConnectionFactory) ObjectHelper.notNull(sjmsBatchEndpoint.m14getComponent().getConnectionFactory(), "jmsBatchComponent.connectionFactory");
    }

    /* renamed from: getEndpoint, reason: merged with bridge method [inline-methods] */
    /**
     * @return the endpoint this consumer was created for, typed as {@code SjmsBatchEndpoint}
     */
    public SjmsBatchEndpoint m13getEndpoint() {
        return sjmsBatchEndpoint;
    }

    /**
     * @return the scheduled executor currently configured for timeout checking, possibly {@code null}
     */
    public ScheduledExecutorService getTimeoutCheckerExecutorService() {
        return timeoutCheckerExecutorService;
    }

    /**
     * @param scheduledExecutorService the scheduled executor to use for timeout checking
     */
    public void setTimeoutCheckerExecutorService(ScheduledExecutorService scheduledExecutorService) {
        timeoutCheckerExecutorService = scheduledExecutorService;
    }

    /**
     * Starts the consumer by running a {@code StartConsumerTask}: on a new thread named
     * {@code AsyncStartStopListener[<destination>]} when the endpoint's asyncStartListener
     * option is enabled, otherwise inline on the calling thread.
     *
     * @throws Exception if the superclass start or the inline start task fails
     */
    protected void doStart() throws Exception {
        super.doStart();
        final boolean asyncStart = m13getEndpoint().isAsyncStartListener();
        final StartConsumerTask starter = new StartConsumerTask(asyncStart, m13getEndpoint().getRecoveryInterval(), m13getEndpoint().getKeepAliveDelay());
        if (asyncStart) {
            final String threadName = "AsyncStartStopListener[" + destinationName + "]";
            m13getEndpoint().getCamelContext().getExecutorServiceManager().newThread(threadName, starter).start();
            return;
        }
        starter.run();
    }

    /**
     * Stops the consumer: clears the running flag, waits up to 60 seconds for the consumption
     * loops to signal completion, closes the JMS connection and shuts down the executors.
     *
     * @throws Exception if the superclass stop fails or the wait for the consumers is interrupted
     */
    protected void doStop() throws Exception {
        super.doStop();
        this.running.set(false);

        CountDownLatch consumersShutdownLatch = this.consumersShutdownLatchRef.get();
        if (consumersShutdownLatch == null) {
            LOG.info("Stop signalled while there are no consumers for {} yet, so no need to wait for consumers", this.destinationName);
        } else {
            LOG.info("Stop signalled, waiting on consumers for {} to shut down", this.destinationName);
            // await() returns true when the latch reached zero and false when the wait timed out;
            // the previous code had the two branches swapped.
            if (consumersShutdownLatch.await(60L, TimeUnit.SECONDS)) {
                LOG.info("All consumers for {} have been shutdown", this.destinationName);
            } else {
                LOG.warn("Timeout waiting on consumer threads for {} to signal completion - shutting down", this.destinationName);
            }
        }

        // The connection may never have been created; check instead of relying on a swallowed
        // NullPointerException.
        Connection jmsConnection = this.connection;
        if (jmsConnection != null) {
            try {
                LOG.debug("Shutting down JMS connection for {}", this.destinationName);
                jmsConnection.close();
            } catch (Exception e) {
                // best effort: a failing close must not prevent the executors from being shut down
                LOG.debug("Exception caught closing JMS connection for {}. This exception is ignored.", this.destinationName, e);
            }
        }

        m13getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.jmsConsumerExecutors);
        this.jmsConsumerExecutors = null;
        if (this.shutdownTimeoutCheckerExecutorService) {
            m13getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.timeoutCheckerExecutorService);
            this.timeoutCheckerExecutorService = null;
        }
    }
}
