package org.apache.camel.component.pulsar;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/pulsar/PulsarMessageListener.class */
public class PulsarMessageListener implements MessageListener<byte[]> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarMessageListener.class);
    private final PulsarEndpoint endpoint;
    private final ExceptionHandler exceptionHandler;
    private final Processor processor;

    public PulsarMessageListener(PulsarEndpoint pulsarEndpoint, ExceptionHandler exceptionHandler, Processor processor) {
        this.endpoint = pulsarEndpoint;
        this.exceptionHandler = exceptionHandler;
        this.processor = processor;
    }

    public void received(Consumer<byte[]> consumer, Message<byte[]> message) {
        Exchange updateExchange = PulsarMessageUtils.updateExchange(message, this.endpoint.createExchange());
        try {
            if (this.endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) {
                updateExchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT, this.endpoint.m0getComponent().getPulsarMessageReceiptFactory().newInstance(updateExchange, message, consumer));
                this.processor.process(updateExchange);
            } else {
                this.processor.process(updateExchange);
                consumer.acknowledge(message.getMessageId());
            }
        } catch (Exception e) {
            handleProcessorException(updateExchange, e);
        }
    }

    private void handleProcessorException(Exchange exchange, Exception exc) {
        this.exceptionHandler.handleException("An error occurred", PulsarMessageUtils.updateExchangeWithException(exc, exchange), exc);
        LOGGER.error("An error occurred while processing this exchange :: {}", exc);
    }
}
