package org.apache.camel.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.Exchange;
import org.apache.camel.MessageHistory;
import org.apache.camel.processor.DefaultExchangeFormatter;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.ExchangeFormatter;
import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/camel-core-2.17.0.redhat-630396.jar:org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.class */
public class DefaultAsyncProcessorAwaitManager extends org.apache.camel.support.ServiceSupport implements AsyncProcessorAwaitManager {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultAsyncProcessorAwaitManager.class);

    /** Formatter used when dumping the message history of an exchange whose waiting thread is interrupted. */
    private final ExchangeFormatter exchangeFormatter;

    /** Read-only view over the counters below; also carries the "statistics enabled" switch. */
    private final AsyncProcessorAwaitManager.Statistics statistics = new UtilizationStatistics();

    /** Number of {@code await} calls seen while statistics were enabled. */
    private final AtomicLong blockedCounter = new AtomicLong();

    /** Number of interrupts performed while statistics were enabled. */
    private final AtomicLong interruptedCounter = new AtomicLong();

    /** Sum of all recorded wait durations, in milliseconds. */
    private final AtomicLong totalDuration = new AtomicLong();

    /** Smallest recorded wait duration, in milliseconds (starts at 0). */
    private final AtomicLong minDuration = new AtomicLong();

    /** Largest recorded wait duration, in milliseconds. */
    private final AtomicLong maxDuration = new AtomicLong();

    /** Total duration divided by the blocked counter, in milliseconds. */
    private final AtomicLong meanDuration = new AtomicLong();

    /**
     * Threads currently blocked in {@code await}, keyed by the exchange they are waiting for.
     * Explicit type arguments are given so the assignment is not an unchecked raw-type conversion.
     */
    private final Map<Exchange, AsyncProcessorAwaitManager.AwaitThread> inflight =
            new ConcurrentHashMap<Exchange, AsyncProcessorAwaitManager.AwaitThread>();

    /** Whether {@code doStop} interrupts threads that are still blocked; enabled by default. */
    private boolean interruptThreadsWhileStopping = true;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/camel-core-2.17.0.redhat-630396.jar:org/apache/camel/impl/DefaultAsyncProcessorAwaitManager$AwaitThreadEntry.class */
    /**
     * Book-keeping record for one thread that is blocked waiting for an asynchronous callback.
     * It remembers the thread, the exchange it waits for, the latch that releases it, and the
     * time the wait began. The route id and node id are taken from the last message-history
     * entry of the exchange, when such history is present.
     */
    public static final class AwaitThreadEntry implements AsyncProcessorAwaitManager.AwaitThread {
        private final Thread thread;
        private final Exchange exchange;
        private final CountDownLatch latch;
        private final long start;
        private String routeId;
        private String nodeId;

        /**
         * @param thread         the thread that is about to block
         * @param exchange       the exchange the thread is waiting for
         * @param countDownLatch the latch the thread blocks on
         */
        private AwaitThreadEntry(Thread thread, Exchange exchange, CountDownLatch countDownLatch) {
            this.thread = thread;
            this.exchange = exchange;
            this.latch = countDownLatch;
            this.start = System.currentTimeMillis();

            // Route and node are only known when the exchange carries message history.
            List<?> history = (List<?>) exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
            if (history != null && !history.isEmpty()) {
                MessageHistory latest = (MessageHistory) history.get(history.size() - 1);
                this.routeId = latest.getRouteId();
                if (latest.getNode() != null) {
                    this.nodeId = latest.getNode().getId();
                } else {
                    this.nodeId = null;
                }
            }
        }

        /** @return the thread that is blocked */
        @Override // org.apache.camel.spi.AsyncProcessorAwaitManager.AwaitThread
        public Thread getBlockedThread() {
            return thread;
        }

        /** @return the exchange the thread is waiting for */
        @Override // org.apache.camel.spi.AsyncProcessorAwaitManager.AwaitThread
        public Exchange getExchange() {
            return exchange;
        }

        /** @return milliseconds elapsed since this entry was created */
        @Override // org.apache.camel.spi.AsyncProcessorAwaitManager.AwaitThread
        public long getWaitDuration() {
            long now = System.currentTimeMillis();
            return now - start;
        }

        /** @return the route id from the last message-history entry, or {@code null} if there was no history */
        @Override // org.apache.camel.spi.AsyncProcessorAwaitManager.AwaitThread
        public String getRouteId() {
            return routeId;
        }

        /** @return the node id from the last message-history entry, or {@code null} if unavailable */
        @Override // org.apache.camel.spi.AsyncProcessorAwaitManager.AwaitThread
        public String getNodeId() {
            return nodeId;
        }

        /** @return the latch the blocked thread is waiting on */
        public CountDownLatch getLatch() {
            return latch;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append("AwaitThreadEntry[name=").append(thread.getName());
            text.append(", exchangeId=").append(exchange.getExchangeId());
            text.append("]");
            return text.toString();
        }
    }

    /* loaded from: input_file:lib/camel-core-2.17.0.redhat-630396.jar:org/apache/camel/impl/DefaultAsyncProcessorAwaitManager$UtilizationStatistics.class */
    /**
     * Statistics view backed directly by the counters of the enclosing manager.
     * Every getter reads the current counter value; {@link #reset()} zeroes all of them.
     */
    private final class UtilizationStatistics implements AsyncProcessorAwaitManager.Statistics {
        private boolean statisticsEnabled;

        private UtilizationStatistics() {
        }

        /** @return current value of the blocked counter */
        @Override // org.apache.camel.spi.AsyncProcessorAwaitManager.Statistics
        public long getThreadsBlocked() {
            return blockedCounter.get();
        }

        /** @return current value of the interrupted counter */
        @Override // org.apache.camel.spi.AsyncProcessorAwaitManager.Statistics
        public long getThreadsInterrupted() {
            return interruptedCounter.get();
        }

        /** @return accumulated wait duration */
        @Override // org.apache.camel.spi.AsyncProcessorAwaitManager.Statistics
        public long getTotalDuration() {
            return totalDuration.get();
        }

        /** @return recorded minimum wait duration */
        @Override // org.apache.camel.spi.AsyncProcessorAwaitManager.Statistics
        public long getMinDuration() {
            return minDuration.get();
        }

        /** @return recorded maximum wait duration */
        @Override // org.apache.camel.spi.AsyncProcessorAwaitManager.Statistics
        public long getMaxDuration() {
            return maxDuration.get();
        }

        /** @return recorded mean wait duration */
        @Override // org.apache.camel.spi.AsyncProcessorAwaitManager.Statistics
        public long getMeanDuration() {
            return meanDuration.get();
        }

        /** Sets every counter of the enclosing manager back to zero. */
        @Override // org.apache.camel.spi.AsyncProcessorAwaitManager.Statistics
        public void reset() {
            blockedCounter.set(0L);
            interruptedCounter.set(0L);
            totalDuration.set(0L);
            minDuration.set(0L);
            maxDuration.set(0L);
            meanDuration.set(0L);
        }

        /** @return whether statistics gathering is switched on */
        @Override // org.apache.camel.spi.AsyncProcessorAwaitManager.Statistics
        public boolean isStatisticsEnabled() {
            return statisticsEnabled;
        }

        /** @param z {@code true} to switch statistics gathering on */
        @Override // org.apache.camel.spi.AsyncProcessorAwaitManager.Statistics
        public void setStatisticsEnabled(boolean z) {
            statisticsEnabled = z;
        }

        @Override
        public String toString() {
            return String.format(
                    "AsyncProcessAwaitManager utilization[blocked=%s, interrupted=%s, total=%s min=%s, max=%s, mean=%s]",
                    getThreadsBlocked(),
                    getThreadsInterrupted(),
                    getTotalDuration(),
                    getMinDuration(),
                    getMaxDuration(),
                    getMeanDuration());
        }
    }

    public DefaultAsyncProcessorAwaitManager() {
        DefaultExchangeFormatter defaultExchangeFormatter = new DefaultExchangeFormatter();
        defaultExchangeFormatter.setShowExchangeId(true);
        defaultExchangeFormatter.setMultiline(true);
        defaultExchangeFormatter.setShowHeaders(true);
        defaultExchangeFormatter.setStyle(DefaultExchangeFormatter.OutputStyle.Fixed);
        this.exchangeFormatter = defaultExchangeFormatter;
    }

    /**
     * Blocks the calling thread on {@code countDownLatch} until it reaches zero or the thread
     * is interrupted, tracking the thread in the in-flight map for the duration of the wait.
     *
     * <p>If the wait is interrupted, the {@link InterruptedException} is stored on the exchange
     * and the method returns normally. Whatever the outcome, the in-flight entry is removed and,
     * when statistics are enabled, the wait duration is recorded exactly once.
     *
     * @param exchange       the exchange being routed
     * @param countDownLatch the latch released by the asynchronous callback
     */
    @Override // org.apache.camel.spi.AsyncProcessorAwaitManager
    public void await(Exchange exchange, CountDownLatch countDownLatch) {
        LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
        try {
            if (this.statistics.isStatisticsEnabled()) {
                this.blockedCounter.incrementAndGet();
            }
            this.inflight.put(exchange, new AwaitThreadEntry(Thread.currentThread(), exchange, countDownLatch));
            countDownLatch.await();
            LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
        } catch (InterruptedException e) {
            LOG.trace("Interrupted while waiting for callback, will continue routing exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
            // NOTE(review): the interrupt flag is not restored here; the exception is only
            // reported through the exchange so routing can continue. Confirm this is intended.
            exchange.setException(e);
        } finally {
            // Single cleanup path: runs once for normal completion, interruption, and any
            // other throwable, so the statistics can never be recorded twice for one wait.
            AsyncProcessorAwaitManager.AwaitThread released = this.inflight.remove(exchange);
            if (this.statistics.isStatisticsEnabled() && released != null) {
                recordWaitDuration(released.getWaitDuration());
            }
        }
    }

    /**
     * Folds one finished wait into the total/min/max/mean counters.
     *
     * @param waitMillis how long the thread was blocked, in milliseconds
     */
    private void recordWaitDuration(long waitMillis) {
        // addAndGet keeps the running total correct when several threads finish concurrently;
        // a separate get() followed by set() could lose updates.
        long total = this.totalDuration.addAndGet(waitMillis);

        // NOTE(review): minDuration starts at 0 and is reset to 0, so the first branch only
        // fires for a negative duration and the reported minimum effectively stays 0. Fixing
        // that needs a coordinated change to the initial/reset value, so it is left as-is here.
        if (waitMillis < this.minDuration.get()) {
            this.minDuration.set(waitMillis);
        } else if (waitMillis > this.maxDuration.get()) {
            this.maxDuration.set(waitMillis);
        }

        long blocked = this.blockedCounter.get();
        this.meanDuration.set(blocked > 0 ? total / blocked : 0L);
    }

    /**
     * Signals that the asynchronous callback for the exchange has arrived by counting the
     * latch down once.
     *
     * @param exchange       the exchange whose callback was received (used for trace logging)
     * @param countDownLatch the latch to count down
     */
    @Override // org.apache.camel.spi.AsyncProcessorAwaitManager
    public void countDown(Exchange exchange, CountDownLatch countDownLatch) {
        final String exchangeId = exchange.getExchangeId();
        LOG.trace("Asynchronous callback received for exchangeId: {}", exchangeId);
        countDownLatch.countDown();
    }

    /**
     * @return the number of entries currently held in the in-flight map
     */
    @Override // org.apache.camel.spi.AsyncProcessorAwaitManager
    public int size() {
        final int blockedNow = inflight.size();
        return blockedNow;
    }

    /**
     * @return an unmodifiable view of the entries in the in-flight map
     */
    @Override // org.apache.camel.spi.AsyncProcessorAwaitManager
    public Collection<AsyncProcessorAwaitManager.AwaitThread> browse() {
        final Collection<AsyncProcessorAwaitManager.AwaitThread> waiting = inflight.values();
        return Collections.unmodifiableCollection(waiting);
    }

    /**
     * Interrupts the wait of the first in-flight exchange whose id equals {@code str}.
     * Does nothing when no in-flight exchange has that id.
     *
     * @param str the exchange id to look for
     */
    @Override // org.apache.camel.spi.AsyncProcessorAwaitManager
    public void interrupt(String str) {
        for (AsyncProcessorAwaitManager.AwaitThread waiting : browse()) {
            final Exchange candidate = waiting.getExchange();
            if (str.equals(candidate.getExchangeId())) {
                // Only the first match is interrupted.
                interrupt(candidate);
                return;
            }
        }
    }

    /**
     * Releases the thread that is blocked waiting for {@code exchange}, if there is one.
     *
     * <p>A warning describing the blocked thread and the exchange's message history is logged.
     * Then, regardless of whether building or logging that warning failed, the interrupted
     * counter is incremented (when statistics are enabled), a
     * {@link RejectedExecutionException} is set on the exchange, and the latch is counted down
     * so the waiting thread can continue. Does nothing when the exchange is not in flight.
     *
     * @param exchange the exchange whose waiting thread should be released
     */
    @Override // org.apache.camel.spi.AsyncProcessorAwaitManager
    public void interrupt(Exchange exchange) {
        AwaitThreadEntry awaitThreadEntry = (AwaitThreadEntry) this.inflight.get(exchange);
        if (awaitThreadEntry == null) {
            return;
        }
        try {
            StringBuilder sb = new StringBuilder();
            sb.append("Interrupted while waiting for asynchronous callback, will release the following blocked thread which was waiting for exchange to finish processing with exchangeId: ");
            sb.append(exchange.getExchangeId());
            sb.append("\n");
            sb.append(dumpBlockedThread(awaitThreadEntry));
            String dumpMessageHistoryStacktrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, this.exchangeFormatter, false);
            if (dumpMessageHistoryStacktrace != null) {
                sb.append(dumpMessageHistoryStacktrace);
            }
            LOG.warn(sb.toString());
        } catch (Exception e) {
            throw ObjectHelper.wrapRuntimeCamelException(e);
        } finally {
            // One release path only. Keeping a second copy of these statements in a catch-all
            // handler would count the interrupt twice whenever the first copy itself failed.
            if (this.statistics.isStatisticsEnabled()) {
                this.interruptedCounter.incrementAndGet();
            }
            exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId()));
            awaitThreadEntry.getLatch().countDown();
        }
    }

    /**
     * @return whether blocked threads are interrupted when this manager stops
     */
    @Override // org.apache.camel.spi.AsyncProcessorAwaitManager
    public boolean isInterruptThreadsWhileStopping() {
        return interruptThreadsWhileStopping;
    }

    /**
     * @param z {@code true} to interrupt blocked threads when this manager stops
     */
    @Override // org.apache.camel.spi.AsyncProcessorAwaitManager
    public void setInterruptThreadsWhileStopping(boolean z) {
        interruptThreadsWhileStopping = z;
    }

    /**
     * @return the statistics object of this manager (the same instance on every call)
     */
    @Override // org.apache.camel.spi.AsyncProcessorAwaitManager
    public AsyncProcessorAwaitManager.Statistics getStatistics() {
        return statistics;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Start hook of the service; this implementation performs no work.
     */
    @Override // org.apache.camel.support.ServiceSupport
    public void doStart() throws Exception {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stop hook of the service. Logs every thread that is still blocked and, when
     * {@link #isInterruptThreadsWhileStopping()} is {@code true}, interrupts each of them;
     * failures while interrupting are logged and ignored. The in-flight map is cleared in
     * every case.
     */
    @Override // org.apache.camel.support.ServiceSupport
    public void doStop() throws Exception {
        final Collection<AsyncProcessorAwaitManager.AwaitThread> blocked = browse();
        final int blockedCount = blocked.size();

        if (blockedCount <= 0) {
            LOG.debug("Shutting down with no inflight threads.");
        } else {
            LOG.warn("Shutting down while there are still " + blockedCount + " inflight threads currently blocked.");

            final StringBuilder report = new StringBuilder();
            for (AsyncProcessorAwaitManager.AwaitThread waiting : blocked) {
                report.append(dumpBlockedThread(waiting));
            }

            if (!isInterruptThreadsWhileStopping()) {
                LOG.warn("The following threads are blocked, and may reside in the JVM:\n" + report.toString());
            } else {
                LOG.warn("The following threads are blocked and will be interrupted so the threads are released:\n" + report.toString());
                for (AsyncProcessorAwaitManager.AwaitThread waiting : blocked) {
                    try {
                        interrupt(waiting.getExchange());
                    } catch (Throwable th) {
                        // Best effort: one failure must not stop the remaining threads from being released.
                        LOG.warn("Error while interrupting thread: " + waiting.getBlockedThread().getName() + ". This exception is ignored.", th);
                    }
                }
            }
        }

        inflight.clear();
    }

    /**
     * Renders a multi-line, human-readable description of a blocked thread: its id, name,
     * route id, node id and how long it has been waiting.
     *
     * @param awaitThread the blocked-thread entry to describe
     * @return the formatted text, starting with a blank line
     */
    private static String dumpBlockedThread(AsyncProcessorAwaitManager.AwaitThread awaitThread) {
        final StringBuilder out = new StringBuilder("\n");
        out.append("Blocked Thread\n");
        out.append("---------------------------------------------------------------------------------------------------------------------------------------\n");

        out.append(style("Id:"));
        out.append(awaitThread.getBlockedThread().getId());
        out.append("\n");

        out.append(style("Name:"));
        out.append(awaitThread.getBlockedThread().getName());
        out.append("\n");

        out.append(style("RouteId:"));
        out.append(safeNull(awaitThread.getRouteId()));
        out.append("\n");

        out.append(style("NodeId:"));
        out.append(safeNull(awaitThread.getNodeId()));
        out.append("\n");

        out.append(style("Duration:"));
        out.append(awaitThread.getWaitDuration());
        out.append(" msec.\n");

        return out.toString();
    }

    /**
     * Formats a label for the blocked-thread dump: a leading tab followed by the label
     * left-aligned and padded with spaces to at least 20 characters.
     *
     * @param str the label text
     * @return the tab-prefixed, padded label
     */
    private static String style(String str) {
        final String labelFormat = "\t%-20s";
        return String.format(labelFormat, str);
    }

    /**
     * Converts a value to text, mapping {@code null} to the empty string.
     *
     * @param obj the value to render, possibly {@code null}
     * @return {@code obj.toString()}, or {@code ""} when {@code obj} is {@code null}
     */
    private static String safeNull(Object obj) {
        if (obj == null) {
            return "";
        }
        return obj.toString();
    }
}
