001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.management;
018    
019    import java.util.Collection;
020    import java.util.HashMap;
021    import java.util.Map;
022    
023    import javax.management.JMException;
024    
025    import org.apache.camel.CamelContext;
026    import org.apache.camel.Endpoint;
027    import org.apache.camel.Exchange;
028    import org.apache.camel.Route;
029    import org.apache.camel.Service;
030    import org.apache.camel.impl.DefaultCamelContext;
031    import org.apache.camel.impl.ServiceSupport;
032    import org.apache.camel.model.ProcessorType;
033    import org.apache.camel.model.RouteType;
034    import org.apache.camel.processor.interceptor.Debugger;
035    import org.apache.camel.spi.InstrumentationAgent;
036    import org.apache.camel.spi.LifecycleStrategy;
037    import org.apache.camel.spi.RouteContext;
038    import org.apache.commons.logging.Log;
039    import org.apache.commons.logging.LogFactory;
040    
041    /**
042     * JMX agent that registeres Camel lifecycle events in JMX.
043     */
044    public class InstrumentationLifecycleStrategy implements LifecycleStrategy {
045        private static final transient Log LOG = LogFactory.getLog(InstrumentationProcessor.class);
046    
047        private InstrumentationAgent agent;
048        private CamelNamingStrategy namingStrategy;
049    
050        // A map (Endpoint -> InstrumentationProcessor) to facilitate
051        // adding per-route interceptor and registering ManagedRoute MBean
052        private Map<Endpoint, InstrumentationProcessor> interceptorMap =
053            new HashMap<Endpoint, InstrumentationProcessor>();
054    
055        public InstrumentationLifecycleStrategy(InstrumentationAgent agent,
056                CamelNamingStrategy namingStrategy) {
057            this.agent = agent;
058            this.namingStrategy = namingStrategy;
059        }
060    
061        public void onContextCreate(CamelContext context) {
062            if (context instanceof DefaultCamelContext) {
063                try {
064                    DefaultCamelContext dc = (DefaultCamelContext)context;
065                    ManagedService ms = new ManagedService(dc);
066                    agent.register(ms, getNamingStrategy().getObjectName(dc));
067                } catch (JMException e) {
068                    LOG.warn("Could not register CamelContext MBean", e);
069                }
070            }
071        }
072    
073        public void onEndpointAdd(Endpoint<? extends Exchange> endpoint) {
074            try {
075                ManagedEndpoint me = new ManagedEndpoint(endpoint);
076                agent.register(me, getNamingStrategy().getObjectName(me));
077            } catch (JMException e) {
078                LOG.warn("Could not register Endpoint MBean", e);
079            }
080        }
081    
082        public void onRoutesAdd(Collection<Route> routes) {
083            for (Route route : routes) {
084                try {
085                    ManagedRoute mr = new ManagedRoute(route);
086                    // retrieve the per-route intercept for this route
087                    InstrumentationProcessor interceptor = interceptorMap.get(route.getEndpoint());
088                    if (interceptor == null) {
089                        LOG.warn("Instrumentation processor not found for route endpoint "
090                                 + route.getEndpoint());
091                    } else {
092                        interceptor.setCounter(mr);
093                    }
094                    agent.register(mr, getNamingStrategy().getObjectName(mr));
095                } catch (JMException e) {
096                    LOG.warn("Could not register Route MBean", e);
097                }
098            }
099        }
100    
101        public void onServiceAdd(CamelContext context, Service service) {
102            if (service instanceof ServiceSupport) {
103                try {
104                    ManagedService ms = new ManagedService((ServiceSupport)service);
105                    agent.register(ms, getNamingStrategy().getObjectName(context, ms));
106                } catch (JMException e) {
107                    LOG.warn("Could not register Service MBean", e);
108                }
109            }
110        }
111    
112        public void onRouteContextCreate(RouteContext routeContext) {
113    
114            // Create a map (ProcessorType -> PerformanceCounter)
115            // to be passed to InstrumentationInterceptStrategy.
116            Map<ProcessorType, PerformanceCounter> counterMap =
117                new HashMap<ProcessorType, PerformanceCounter>();
118    
119            // Each processor in a route will have its own performance counter
120            // The performance counter are MBeans that we register with MBeanServer.
121            // These performance counter will be embedded
122            // to InstrumentationProcessor and wrap the appropriate processor
123            // by InstrumentationInterceptStrategy.
124            RouteType route = routeContext.getRoute();
125            for (ProcessorType processor : route.getOutputs()) {
126                PerformanceCounter pc = new PerformanceCounter();
127                try {
128                    agent.register(pc, getNamingStrategy().getObjectName(
129                            routeContext, processor));
130                } catch (JMException e) {
131                    LOG.warn("Could not register Counter MBean", e);
132                }
133                counterMap.put(processor, pc);
134            }
135    
136            routeContext.addInterceptStrategy(new InstrumentationInterceptStrategy(counterMap));
137    
138    
139            // Add an InstrumentationProcessor at the beginning of each route and
140            // set up the interceptorMap for onRoutesAdd() method to register the
141            // ManagedRoute MBeans.
142    
143            RouteType routeType = routeContext.getRoute();
144            if (routeType.getInputs() != null && !routeType.getInputs().isEmpty()) {
145                if (routeType.getInputs().size() > 1) {
146                    LOG.warn("Add InstrumentationProcessor to first input only.");
147                }
148    
149                Endpoint endpoint  = routeType.getInputs().get(0).getEndpoint();
150                ProcessorType<?>[] outputs =
151                    routeType.getOutputs().toArray(new ProcessorType<?>[0]);
152    
153                routeType.clearOutput();
154                InstrumentationProcessor processor = new InstrumentationProcessor();
155                routeType.intercept(processor);
156                for (ProcessorType<?> output : outputs) {
157                    routeType.addOutput(output);
158                }
159    
160                interceptorMap.put(endpoint, processor);
161            }
162    
163        }
164    
165        public CamelNamingStrategy getNamingStrategy() {
166            return namingStrategy;
167        }
168    
169        public void setNamingStrategy(CamelNamingStrategy strategy) {
170            this.namingStrategy = strategy;
171        }
172    }