package org.apache.camel.component.jms.reply;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.Service;
import org.apache.camel.component.jms.JmsConstants;
import org.apache.camel.component.jms.JmsEndpoint;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.component.jms.JmsMessageHelper;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.qpid.jms.message.JmsMessageSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.AbstractMessageListenerContainer;

/* loaded from: input_file:BOOT-INF/lib/camel-jms-2.23.2.fuse-750029-redhat-00001.jar:org/apache/camel/component/jms/reply/ReplyManagerSupport.class */
/**
 * Base class for {@link ReplyManager} implementations.
 *
 * <p>It keeps the map of pending replies keyed by correlation ID, owns the reply
 * listener container, and completes the waiting {@link Exchange} when a reply (or a
 * timeout) is reported through {@link #processReply(ReplyHolder)}.
 *
 * <p>Subclasses supply the listener container, the reply handler factory, and the
 * logic that matches an incoming reply message to a registered handler.
 */
public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager {
    protected final CamelContext camelContext;
    protected ScheduledExecutorService scheduledExecutorService;
    protected ExecutorService executorService;
    protected JmsEndpoint endpoint;
    protected Destination replyTo;
    protected AbstractMessageListenerContainer listenerContainer;
    protected CorrelationTimeoutMap correlation;
    protected String correlationProperty;
    protected final Logger log = LoggerFactory.getLogger(getClass());
    /** Released by {@link #setReplyTo(Destination)}; {@link #getReplyTo()} waits on it. */
    protected final CountDownLatch replyToLatch = new CountDownLatch(1);
    /** Maximum time in milliseconds {@link #getReplyTo()} waits for the destination to be set. */
    protected final long replyToTimeout = 10000;

    /**
     * Creates the reply manager.
     *
     * @param camelContext the context whose executor service manager is used to shut
     *                     down the executors when this service stops
     */
    public ReplyManagerSupport(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    /** Sets the scheduler handed to the correlation map created in {@link #doStart()}. */
    @Override // org.apache.camel.component.jms.reply.ReplyManager
    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
        this.scheduledExecutorService = scheduledExecutorService;
    }

    /** Sets the executor handed to the correlation map created in {@link #doStart()}. */
    @Override // org.apache.camel.component.jms.reply.ReplyManager
    public void setOnTimeoutExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    /** Sets the endpoint this reply manager serves. */
    @Override // org.apache.camel.component.jms.reply.ReplyManager
    public void setEndpoint(JmsEndpoint jmsEndpoint) {
        this.endpoint = jmsEndpoint;
    }

    /**
     * Stores the reply destination and releases any thread blocked in
     * {@link #getReplyTo()}.
     */
    @Override // org.apache.camel.component.jms.reply.ReplyManager
    public void setReplyTo(Destination destination) {
        this.log.trace("ReplyTo destination: {}", destination);
        this.replyTo = destination;
        this.replyToLatch.countDown();
    }

    /**
     * Sets the name of the JMS string property that carries the correlation ID.
     * When {@code null}, {@link #onMessage(Message, Session)} reads the
     * JMSCorrelationID of the message instead.
     */
    @Override // org.apache.camel.component.jms.reply.ReplyManager
    public void setCorrelationProperty(String str) {
        this.correlationProperty = str;
    }

    /**
     * Returns the reply destination, waiting up to {@link #replyToTimeout}
     * milliseconds for {@link #setReplyTo(Destination)} to be called if it has not
     * been set yet.
     *
     * @return the reply destination, or {@code null} if it was still unset after the
     *         wait timed out or the waiting thread was interrupted
     */
    @Override // org.apache.camel.component.jms.reply.ReplyManager
    public Destination getReplyTo() {
        if (this.replyTo != null) {
            return this.replyTo;
        }
        try {
            this.log.trace("Waiting for replyTo to be set");
            // Use the declared timeout field rather than a duplicated literal.
            if (this.replyToLatch.await(this.replyToTimeout, TimeUnit.MILLISECONDS)) {
                this.log.trace("Waiting for replyTo to be set done");
            } else {
                this.log.warn("ReplyTo destination was not set and timeout occurred");
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            this.log.debug("Interrupted while waiting for replyTo to be set");
        }
        return this.replyTo;
    }

    /**
     * Registers a handler for the reply expected under the given correlation ID.
     *
     * @param replyManager  the reply manager passed on to the handler
     * @param exchange      the exchange waiting for the reply
     * @param asyncCallback the callback passed on to the handler
     * @param str           the original correlation ID
     * @param str2          the correlation ID used as the key in the correlation map
     * @param j             the request timeout, also used as the map entry timeout
     * @return the correlation ID the handler was registered under ({@code str2})
     * @throws IllegalArgumentException if a handler is already registered for {@code str2}
     */
    @Override // org.apache.camel.component.jms.reply.ReplyManager
    public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback asyncCallback, String str, String str2, long j) {
        ReplyHandler handler = (ReplyHandler) new QueueReplyHandler(replyManager, exchange, asyncCallback, str, str2, j);
        // putIfAbsent returns the existing entry, so a non-null result means a duplicate ID.
        if (this.correlation.putIfAbsent(str2, handler, j) != null) {
            throw new IllegalArgumentException(String.format("The correlationId [%s] is not unique.", str2));
        }
        return str2;
    }

    /** Creates the reply handler for a request; implemented by subclasses. */
    protected abstract ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback asyncCallback, String str, String str2, long j);

    /**
     * Receives a reply message, extracts its correlation ID and delegates to
     * {@link #handleReplyMessage(String, Message, Session)}. Messages without a
     * readable correlation ID are logged and ignored.
     */
    @Override // org.springframework.jms.listener.SessionAwareMessageListener
    public void onMessage(Message message, Session session) throws JMSException {
        String correlationId = null;
        try {
            if (this.correlationProperty == null) {
                correlationId = message.getJMSCorrelationID();
            } else {
                correlationId = message.getStringProperty(this.correlationProperty);
            }
        } catch (JMSException e) {
            // Best effort: the message is ignored below, but record why the ID was unreadable.
            this.log.debug("Cannot read correlationID from reply message: " + message, e);
        }
        if (correlationId == null) {
            this.log.warn("Ignoring message with no correlationID: {}", message);
            return;
        }
        this.log.debug("Received reply message with correlationID [{}] -> {}", correlationId, message);
        handleReplyMessage(correlationId, message, session);
    }

    /**
     * Completes the exchange held by {@code replyHolder}: sets an
     * {@link ExchangeTimedOutException} on timeout, otherwise sets the reply as the
     * OUT message. The holder's callback is always invoked with {@code done(false)},
     * even if processing throws.
     *
     * <p>Does nothing when {@code replyHolder} is {@code null} or this service is not
     * allowed to run.
     */
    @Override // org.apache.camel.component.jms.reply.ReplyManager
    public void processReply(ReplyHolder replyHolder) {
        if (replyHolder == null || !isRunAllowed()) {
            return;
        }
        try {
            Exchange exchange = replyHolder.getExchange();
            if (replyHolder.isTimeout()) {
                if (this.log.isWarnEnabled()) {
                    this.log.warn("Timeout occurred after {} millis waiting for reply message with correlationID [{}] on destination {}. Setting ExchangeTimedOutException on {} and continue routing.", Long.valueOf(replyHolder.getRequestTimeout()), replyHolder.getCorrelationId(), this.replyTo, ExchangeHelper.logIds(exchange));
                }
                exchange.setException(new ExchangeTimedOutException(exchange, replyHolder.getRequestTimeout(), "reply message with correlationID: " + replyHolder.getCorrelationId() + " not received on destination: " + this.replyTo));
            } else {
                Message message = replyHolder.getMessage();
                JmsMessage jmsMessage = new JmsMessage(message, replyHolder.getSession(), this.endpoint.getBinding());
                exchange.setOut(jmsMessage);
                Object body = jmsMessage.getBody();
                if (this.endpoint.isTransferException() && (body instanceof Exception)) {
                    this.log.debug("Reply was an Exception. Setting the Exception on the Exchange: {}", body);
                    exchange.setException((Exception) body);
                } else {
                    this.log.debug("Reply received. OUT message body set to reply payload: {}", body);
                }
                if (this.endpoint.isTransferFault()) {
                    Object faultHeader = jmsMessage.removeHeader(JmsConstants.JMS_TRANSFER_FAULT);
                    if (faultHeader != null) {
                        // tryConvertTo may return null for an unconvertible value; treat that
                        // as "not a fault" instead of failing with a NullPointerException.
                        Boolean converted = exchange.getContext().getTypeConverter().tryConvertTo(Boolean.TYPE, faultHeader);
                        boolean fault = Boolean.TRUE.equals(converted);
                        this.log.debug("Transfer fault on OUT message: {}", Boolean.valueOf(fault));
                        if (fault) {
                            exchange.getOut().setFault(true);
                        }
                    }
                }
                if (replyHolder.getOriginalCorrelationId() != null) {
                    // Restore the caller's own correlation ID on both the JMS message and the OUT header.
                    JmsMessageHelper.setCorrelationId(message, replyHolder.getOriginalCorrelationId());
                    exchange.getOut().setHeader(JmsMessageSupport.JMS_CORRELATIONID, replyHolder.getOriginalCorrelationId());
                }
            }
        } finally {
            replyHolder.getCallback().done(false);
        }
    }

    /** Matches a reply message to its handler; implemented by subclasses. */
    protected abstract void handleReplyMessage(String str, Message message, Session session);

    /** Creates the listener container started in {@link #doStart()}; implemented by subclasses. */
    protected abstract AbstractMessageListenerContainer createListenerContainer() throws Exception;

    /**
     * Polls the correlation map for a handler of a reply that arrived before its
     * handler could be found. The number of attempts and the sleep between them come
     * from the endpoint configuration.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt status is
     * restored, one final lookup is made, and polling stops.
     *
     * @param str     the correlation ID of the early reply
     * @param message the reply message (used for logging only)
     * @return the matching handler, or {@code null} if none appeared in time
     */
    public ReplyHandler waitForProvisionCorrelationToBeUpdated(String str, Message message) {
        if (this.log.isWarnEnabled()) {
            this.log.warn("Early reply received with correlationID [{}] -> {}", str, message);
        }
        ReplyHandler replyHandler = null;
        int attempt = 0;
        while (replyHandler == null
                && attempt < this.endpoint.getConfiguration().getWaitForProvisionCorrelationToBeUpdatedCounter()) {
            attempt++;
            this.log.trace("Early reply not found handler at attempt {}. Waiting a bit longer.", Integer.valueOf(attempt));
            boolean interrupted = false;
            try {
                Thread.sleep(this.endpoint.getConfiguration().getWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime());
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop polling after one last lookup.
                Thread.currentThread().interrupt();
                interrupted = true;
            }
            replyHandler = this.correlation.get(str);
            if (replyHandler != null) {
                if (this.log.isTraceEnabled()) {
                    this.log.trace("Early reply with correlationID [{}] has been matched after {} attempts and can be processed using handler: {}", str, Integer.valueOf(attempt), replyHandler);
                }
            } else if (interrupted) {
                break;
            }
        }
        return replyHandler;
    }

    /**
     * Validates the required collaborators, creates and starts the correlation map,
     * then creates and starts the reply listener container.
     *
     * @throws Exception if a collaborator is missing or the container cannot be created
     */
    @Override // org.apache.camel.support.ServiceSupport
    public void doStart() throws Exception {
        ObjectHelper.notNull(this.executorService, "executorService", this);
        ObjectHelper.notNull(this.scheduledExecutorService, "scheduledExecutorService", this);
        ObjectHelper.notNull(this.endpoint, "endpoint", this);
        this.log.trace("Using timeout checker interval with {} millis", Long.valueOf(this.endpoint.getRequestTimeoutCheckerInterval()));
        this.correlation = new CorrelationTimeoutMap(this.scheduledExecutorService, this.endpoint.getRequestTimeoutCheckerInterval(), this.executorService);
        ServiceHelper.startService((Service) this.correlation);
        this.listenerContainer = createListenerContainer();
        this.listenerContainer.afterPropertiesSet();
        this.log.debug("Starting reply listener container on endpoint: {}", this.endpoint);
        this.endpoint.onListenerContainerStarting(this.listenerContainer);
        this.listenerContainer.start();
    }

    /**
     * Stops the correlation map, stops and destroys the listener container, and
     * shuts down both executors through the Camel executor service manager.
     */
    @Override // org.apache.camel.support.ServiceSupport
    public void doStop() throws Exception {
        ServiceHelper.stopService(this.correlation);
        if (this.listenerContainer != null) {
            this.log.debug("Stopping reply listener container on endpoint: {}", this.endpoint);
            try {
                this.listenerContainer.stop();
                this.listenerContainer.destroy();
            } finally {
                // Always notify the endpoint and drop the reference, even if stop/destroy failed.
                this.endpoint.onListenerContainerStopped(this.listenerContainer);
                this.listenerContainer = null;
            }
        }
        if (this.scheduledExecutorService != null) {
            this.camelContext.getExecutorServiceManager().shutdownGraceful(this.scheduledExecutorService);
            this.scheduledExecutorService = null;
        }
        if (this.executorService != null) {
            this.camelContext.getExecutorServiceManager().shutdownGraceful(this.executorService);
            this.executorService = null;
        }
    }
}
