/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.rx;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.rx.RuntimeCamelRxException;
import org.apache.camel.rx.support.EndpointObservable;
import org.apache.camel.rx.support.EndpointSubscribeFunc;
import org.apache.camel.rx.support.ExchangeToBodyFunc1;
import org.apache.camel.rx.support.ExchangeToMessageFunc1;
import org.apache.camel.rx.support.ObserverSender;
import org.apache.camel.util.CamelContextHelper;
import rx.Observable;
import rx.functions.Func1;

public class ReactiveCamel {
    private final CamelContext camelContext;

    public ReactiveCamel(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    public Observable<Message> toObservable(String uri) {
        return this.toObservable(this.endpoint(uri));
    }

    public <T> Observable<T> toObservable(String uri, Class<T> bodyType) {
        return this.toObservable(this.endpoint(uri), bodyType);
    }

    public Observable<Message> toObservable(Endpoint endpoint) {
        return this.createEndpointObservable(endpoint, ExchangeToMessageFunc1.getInstance());
    }

    public <T> Observable<T> toObservable(Endpoint endpoint, Class<T> bodyType) {
        return this.createEndpointObservable(endpoint, new ExchangeToBodyFunc1<T>(bodyType));
    }

    public <T> void sendTo(Observable<T> observable, String endpointUri) {
        this.sendTo(observable, this.endpoint(endpointUri));
    }

    public <T> void sendTo(Observable<T> observable, Endpoint endpoint) {
        try {
            ObserverSender observer = new ObserverSender(endpoint);
            observable.subscribe(observer);
        }
        catch (Exception e) {
            throw new RuntimeCamelRxException(e);
        }
    }

    public CamelContext getCamelContext() {
        return this.camelContext;
    }

    public Endpoint endpoint(String endpointUri) {
        return CamelContextHelper.getMandatoryEndpoint((CamelContext)this.camelContext, (String)endpointUri);
    }

    protected <T> Observable<T> createEndpointObservable(Endpoint endpoint, Func1<Exchange, T> converter) {
        EndpointSubscribeFunc<T> func = new EndpointSubscribeFunc<T>(endpoint, converter);
        return new EndpointObservable<T>(endpoint, func);
    }
}

