package io.smallrye.reactive.messaging.camel;

import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.camel.CamelFailureHandler;
import io.smallrye.reactive.messaging.camel.i18n.CamelExceptions;
import io.smallrye.reactive.messaging.camel.i18n.CamelLogging;
import java.util.Map;
import java.util.Optional;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsServiceFactory;
import org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultExchange;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Publisher;

@ApplicationScoped
@Connector(CamelConnector.CONNECTOR_NAME)
@ConnectorAttributes({@ConnectorAttribute(name = "endpoint-uri", description = "The URI of the Camel endpoint (read from or written to)", mandatory = true, type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING), @ConnectorAttribute(name = "failure-strategy", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "Specify the failure strategy to apply when a message produced from a Camel exchange is nacked. Values can be `fail` (default) or `ignore`", defaultValue = "fail"), @ConnectorAttribute(name = "merge", direction = ConnectorAttribute.Direction.OUTGOING, description = "Whether the connector should allow multiple upstreams", type = "boolean", defaultValue = "false")})
/* loaded from: input_file:io/smallrye/reactive/messaging/camel/CamelConnector.class */
public class CamelConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {
    private static final String REACTIVE_STREAMS_SCHEME = "reactive-streams:";
    public static final String CONNECTOR_NAME = "smallrye-camel";

    @Inject
    private CamelContext camel;
    private CamelReactiveStreamsService reactive;

    @Produces
    public CamelReactiveStreamsService getCamelReactive() {
        if (this.reactive != null) {
            return this.reactive;
        }
        CamelReactiveStreamsService camelReactiveStreamsService = (CamelReactiveStreamsService) this.camel.hasService(CamelReactiveStreamsService.class);
        if (camelReactiveStreamsService != null) {
            CamelLogging.log.camelReactiveStreamsServiceAlreadyDefined();
            this.reactive = camelReactiveStreamsService;
            return camelReactiveStreamsService;
        }
        DefaultCamelReactiveStreamsServiceFactory defaultCamelReactiveStreamsServiceFactory = new DefaultCamelReactiveStreamsServiceFactory();
        ReactiveStreamsEngineConfiguration reactiveStreamsEngineConfiguration = new ReactiveStreamsEngineConfiguration();
        Config config = ConfigProvider.getConfig();
        Optional optionalValue = config.getOptionalValue("camel.component.reactive-streams.internal-engine-configuration.thread-pool-max-size", Integer.class);
        reactiveStreamsEngineConfiguration.getClass();
        optionalValue.ifPresent((v1) -> {
            r1.setThreadPoolMaxSize(v1);
        });
        Optional optionalValue2 = config.getOptionalValue("camel.component.reactive-streams.internal-engine-configuration.thread-pool-min-size", Integer.class);
        reactiveStreamsEngineConfiguration.getClass();
        optionalValue2.ifPresent((v1) -> {
            r1.setThreadPoolMinSize(v1);
        });
        Optional optionalValue3 = config.getOptionalValue("camel.component.reactive-streams.internal-engine-configuration.thread-pool-name", String.class);
        reactiveStreamsEngineConfiguration.getClass();
        optionalValue3.ifPresent(reactiveStreamsEngineConfiguration::setThreadPoolName);
        this.reactive = defaultCamelReactiveStreamsServiceFactory.newInstance(this.camel, reactiveStreamsEngineConfiguration);
        try {
            this.camel.addService(this.reactive, true, true);
            return this.reactive;
        } catch (Exception e) {
            throw CamelExceptions.ex.unableToRegisterService(e);
        }
    }

    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
        Publisher from;
        CamelConnectorIncomingConfiguration camelConnectorIncomingConfiguration = new CamelConnectorIncomingConfiguration(config);
        String endpointUri = camelConnectorIncomingConfiguration.getEndpointUri();
        CamelFailureHandler createFailureHandler = createFailureHandler(CamelFailureHandler.Strategy.from(camelConnectorIncomingConfiguration.getFailureStrategy()), camelConnectorIncomingConfiguration.getChannel());
        if (endpointUri.startsWith(REACTIVE_STREAMS_SCHEME)) {
            String substring = endpointUri.substring(REACTIVE_STREAMS_SCHEME.length());
            CamelLogging.log.creatingPublisherFromStream(substring);
            from = getCamelReactive().fromStream(substring);
        } else {
            CamelLogging.log.creatingPublisherFromEndpoint(endpointUri);
            from = getCamelReactive().from(endpointUri);
        }
        return ReactiveStreams.fromPublisher(from).map(exchange -> {
            return new CamelMessage(exchange, createFailureHandler);
        });
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        SubscriberBuilder<? extends Message<?>, Void> subscriberBuilder;
        String endpointUri = new CamelConnectorOutgoingConfiguration(config).getEndpointUri();
        if (endpointUri.startsWith(REACTIVE_STREAMS_SCHEME)) {
            String substring = endpointUri.substring(REACTIVE_STREAMS_SCHEME.length());
            CamelLogging.log.creatingSubscriberFromStream(substring);
            subscriberBuilder = ReactiveStreams.builder().map(this::createExchangeFromMessage).to(getCamelReactive().streamSubscriber(substring));
        } else {
            CamelLogging.log.creatingSubscriberFromEndpoint(endpointUri);
            subscriberBuilder = ReactiveStreams.builder().map(this::createExchangeFromMessage).to(getCamelReactive().subscriber(endpointUri));
        }
        return subscriberBuilder;
    }

    private Exchange createExchangeFromMessage(final Message<?> message) {
        if (message.getPayload() instanceof Exchange) {
            return (Exchange) message.getPayload();
        }
        OutgoingExchangeMetadata outgoingExchangeMetadata = (OutgoingExchangeMetadata) message.getMetadata(OutgoingExchangeMetadata.class).orElse(null);
        DefaultExchange defaultExchange = new DefaultExchange(this.camel);
        if (outgoingExchangeMetadata != null) {
            Map<String, Object> properties = outgoingExchangeMetadata.getProperties();
            defaultExchange.getClass();
            properties.forEach((str, obj) -> {
                defaultExchange.setProperty(str, obj);
            });
            if (outgoingExchangeMetadata.getExchangePattern() != null) {
                defaultExchange.setPattern(outgoingExchangeMetadata.getExchangePattern());
            }
            Map<String, Object> headers = outgoingExchangeMetadata.getHeaders();
            org.apache.camel.Message in = defaultExchange.getIn();
            in.getClass();
            headers.forEach(in::setHeader);
        }
        defaultExchange.getIn().setBody(message.getPayload());
        defaultExchange.addOnCompletion(new Synchronization() { // from class: io.smallrye.reactive.messaging.camel.CamelConnector.1
            public void onComplete(Exchange exchange) {
                message.ack();
            }

            public void onFailure(Exchange exchange) {
                CamelLogging.log.exchangeFailed(exchange.getException());
                message.nack(exchange.getException());
            }
        });
        return defaultExchange;
    }

    private CamelFailureHandler createFailureHandler(CamelFailureHandler.Strategy strategy, String str) {
        switch (strategy) {
            case IGNORE:
                return new CamelIgnoreFailure(str);
            case FAIL:
                return new CamelFailStop(str);
            default:
                throw CamelExceptions.ex.illegalArgumentUnknownStrategy(strategy);
        }
    }
}
