package org.fusesource.fabric.camel;

import java.io.UnsupportedEncodingException;
import java.util.Iterator;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.processor.loadbalancer.LoadBalancer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.fusesource.fabric.groups.ChangeListener;
import org.fusesource.fabric.groups.Group;

/* loaded from: input_file:fuse-esb-7.0.1.fuse-084/system/org/fusesource/fabric/fabric-camel/7.0.1.fuse-084/fabric-camel-7.0.1.fuse-084.jar:org/fusesource/fabric/camel/FabricLocatorEndpoint.class */
public class FabricLocatorEndpoint extends DefaultEndpoint {
    private static final transient Log LOG = LogFactory.getLog(FabricLocatorEndpoint.class);
    private final FabricComponent component;
    private final Group group;
    private LoadBalancerFactory loadBalancerFactory;
    private LoadBalancer loadBalancer;

    public FabricLocatorEndpoint(String str, FabricComponent fabricComponent, Group group) {
        super(str, fabricComponent);
        this.component = fabricComponent;
        this.group = group;
    }

    @Override // org.apache.camel.Endpoint
    public Producer createProducer() throws Exception {
        return new DefaultProducer(this) { // from class: org.fusesource.fabric.camel.FabricLocatorEndpoint.1
            @Override // org.apache.camel.Processor
            public void process(Exchange exchange) throws Exception {
                FabricLocatorEndpoint.this.loadBalancer.process(exchange);
            }
        };
    }

    @Override // org.apache.camel.Endpoint
    public Consumer createConsumer(Processor processor) throws Exception {
        throw new UnsupportedOperationException("You cannot consume from a FABRIC endpoint using just its fabric name directly, you must use fabric:name:someActualUri instead");
    }

    @Override // org.apache.camel.IsSingleton
    public boolean isSingleton() {
        return true;
    }

    @Override // org.apache.camel.support.ServiceSupport, org.apache.camel.Service
    public void start() throws Exception {
        super.start();
        if (this.loadBalancer == null) {
            this.loadBalancer = createLoadBalancer();
        }
        this.group.add(new ChangeListener() { // from class: org.fusesource.fabric.camel.FabricLocatorEndpoint.2
            @Override // org.fusesource.fabric.groups.ChangeListener
            public void changed() {
                Iterator<byte[]> it = FabricLocatorEndpoint.this.group.members().values().iterator();
                while (it.hasNext()) {
                    try {
                        FabricLocatorEndpoint.this.loadBalancer.addProcessor(FabricLocatorEndpoint.this.getProcessor(new String(it.next(), "UTF-8")));
                    } catch (UnsupportedEncodingException e) {
                    }
                }
            }

            @Override // org.fusesource.fabric.groups.ChangeListener
            public void connected() {
                changed();
            }

            @Override // org.fusesource.fabric.groups.ChangeListener
            public void disconnected() {
                changed();
            }
        });
    }

    @Override // org.apache.camel.support.ServiceSupport, org.apache.camel.Service
    public void stop() throws Exception {
        super.stop();
        this.group.close();
    }

    public Processor getProcessor(String str) {
        final Endpoint endpoint = getCamelContext().getEndpoint(str);
        return new Processor() { // from class: org.fusesource.fabric.camel.FabricLocatorEndpoint.3
            @Override // org.apache.camel.Processor
            public void process(Exchange exchange) throws Exception {
                ProducerCache producerCache = FabricLocatorEndpoint.this.component.getProducerCache();
                Producer acquireProducer = producerCache.acquireProducer(endpoint);
                try {
                    acquireProducer.process(exchange);
                    producerCache.releaseProducer(endpoint, acquireProducer);
                } catch (Throwable th) {
                    producerCache.releaseProducer(endpoint, acquireProducer);
                    throw th;
                }
            }

            public String toString() {
                return "Producer for " + endpoint;
            }
        };
    }

    @Override // org.apache.camel.impl.DefaultEndpoint
    public FabricComponent getComponent() {
        return this.component;
    }

    public LoadBalancerFactory getLoadBalancerFactory() {
        if (this.loadBalancerFactory == null) {
            this.loadBalancerFactory = this.component.getLoadBalancerFactory();
        }
        return this.loadBalancerFactory;
    }

    public void setLoadBalancerFactory(LoadBalancerFactory loadBalancerFactory) {
        this.loadBalancerFactory = loadBalancerFactory;
    }

    public LoadBalancer createLoadBalancer() {
        return getLoadBalancerFactory().createLoadBalancer();
    }
}
