/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.rx.support;

import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Service;
import org.apache.camel.rx.support.ProcessorToObserver;
import org.apache.camel.util.ServiceHelper;
import rx.Observer;
import rx.Subscription;
import rx.util.functions.Func1;

public class EndpointSubscription<T>
implements Subscription {
    private final Endpoint endpoint;
    private final Observer<T> observer;
    private Consumer consumer;

    public EndpointSubscription(Endpoint endpoint, Observer<T> observer, Func1<Exchange, T> func) {
        this.endpoint = endpoint;
        this.observer = observer;
        ProcessorToObserver<T> processor = new ProcessorToObserver<T>(func, observer);
        try {
            this.consumer = endpoint.createConsumer(processor);
            ServiceHelper.startService((Service)this.consumer);
        }
        catch (Exception e) {
            observer.onError((Throwable)e);
        }
    }

    public String toString() {
        return "EndpointSubscription[" + this.endpoint + " observer: " + this.observer + "]";
    }

    public void unsubscribe() {
        if (this.consumer != null) {
            try {
                ServiceHelper.stopServices((Object[])new Object[]{this.consumer});
                this.observer.onCompleted();
            }
            catch (Exception e) {
                this.observer.onError((Throwable)e);
            }
        }
    }

    public Endpoint getEndpoint() {
        return this.endpoint;
    }

    public Observer<T> getObserver() {
        return this.observer;
    }
}

