001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.management;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.HashMap;
022    import java.util.List;
023    import java.util.Map;
024    
025    import javax.management.JMException;
026    import javax.management.MalformedObjectNameException;
027    import javax.management.ObjectName;
028    
029    import org.apache.camel.CamelContext;
030    import org.apache.camel.Consumer;
031    import org.apache.camel.Endpoint;
032    import org.apache.camel.Exchange;
033    import org.apache.camel.Route;
034    import org.apache.camel.Service;
035    import org.apache.camel.impl.DefaultCamelContext;
036    import org.apache.camel.impl.ServiceSupport;
037    import org.apache.camel.model.ExceptionType;
038    import org.apache.camel.model.ProcessorType;
039    import org.apache.camel.model.RouteType;
040    import org.apache.camel.spi.InstrumentationAgent;
041    import org.apache.camel.spi.LifecycleStrategy;
042    import org.apache.camel.spi.RouteContext;
043    import org.apache.commons.logging.Log;
044    import org.apache.commons.logging.LogFactory;
045    
046    /**
047     * JMX agent that registeres Camel lifecycle events in JMX.
048     *
049     * @version $Revision: 1388 $
050     */
051    public class InstrumentationLifecycleStrategy implements LifecycleStrategy {
052        private static final transient Log LOG = LogFactory.getLog(InstrumentationProcessor.class);
053    
054        private InstrumentationAgent agent;
055        private CamelNamingStrategy namingStrategy;
056        private boolean initialized;
057    
058        // A map (Endpoint -> InstrumentationProcessor) to facilitate
059        // adding per-route interceptor and registering ManagedRoute MBean
060        private Map<Endpoint, InstrumentationProcessor> interceptorMap =
061            new HashMap<Endpoint, InstrumentationProcessor>();
062    
063        public InstrumentationLifecycleStrategy() {
064            this(new DefaultInstrumentationAgent());
065        }
066    
067        public InstrumentationLifecycleStrategy(InstrumentationAgent agent) {
068            this.agent = agent;
069        }
070        /**
071         * Constructor for camel context that has been started.
072         *
073         * @param agent    the agent
074         * @param context  the camel context
075         */
076        public InstrumentationLifecycleStrategy(InstrumentationAgent agent, CamelContext context) {
077            this.agent = agent;
078            onContextStart(context);
079        }
080    
081        public void onContextStart(CamelContext context) {
082            if (context instanceof DefaultCamelContext) {
083                try {
084                    initialized = true;
085                    DefaultCamelContext dc = (DefaultCamelContext)context;
086                    // call addService so that context will start and stop the agent
087                    dc.addService(agent);
088                    namingStrategy = new CamelNamingStrategy(agent.getMBeanObjectDomainName());
089                    ManagedService ms = new ManagedService(dc);
090                    agent.register(ms, getNamingStrategy().getObjectName(dc));
091                } catch (Exception e) {
092                    LOG.warn("Could not register CamelContext MBean", e);
093                }
094            }
095        }
096    
097        public void onEndpointAdd(Endpoint<? extends Exchange> endpoint) {
098            // the agent hasn't been started
099            if (!initialized) {
100                return;
101            }
102    
103            try {
104                ManagedEndpoint me = new ManagedEndpoint(endpoint);
105                agent.register(me, getNamingStrategy().getObjectName(me));
106            } catch (JMException e) {
107                LOG.warn("Could not register Endpoint MBean", e);
108            }
109        }
110    
111        public void onRoutesAdd(Collection<Route> routes) {
112            // the agent hasn't been started
113            if (!initialized) {
114                return;
115            }
116    
117            for (Route route : routes) {
118                try {
119                    ManagedRoute mr = new ManagedRoute(route);
120                    // retrieve the per-route intercept for this route
121                    InstrumentationProcessor interceptor = interceptorMap.get(route.getEndpoint());
122                    if (interceptor == null) {
123                        LOG.warn("Instrumentation processor not found for route endpoint "
124                                 + route.getEndpoint());
125                    } else {
126                        interceptor.setCounter(mr);
127                    }
128                    agent.register(mr, getNamingStrategy().getObjectName(mr));
129                } catch (JMException e) {
130                    LOG.warn("Could not register Route MBean", e);
131                }
132            }
133        }
134    
135        public void onServiceAdd(CamelContext context, Service service) {
136            // the agent hasn't been started
137            if (!initialized) {
138                return;
139            }
140            if (service instanceof ServiceSupport && service instanceof Consumer) {
141                // TODO: add support for non-consumer services?
142                try {
143                    ManagedService ms = new ManagedService((ServiceSupport)service);
144                    agent.register(ms, getNamingStrategy().getObjectName(context, ms));
145                } catch (JMException e) {
146                    LOG.warn("Could not register Service MBean", e);
147                }
148            }
149        }
150    
151        public void onRouteContextCreate(RouteContext routeContext) {
152            // the agent hasn't been started
153            if (!initialized) {
154                return;
155            }
156    
157            // Create a map (ProcessorType -> PerformanceCounter)
158            // to be passed to InstrumentationInterceptStrategy.
159            Map<ProcessorType, PerformanceCounter> counterMap =
160                new HashMap<ProcessorType, PerformanceCounter>();
161    
162            // Each processor in a route will have its own performance counter
163            // The performance counter are MBeans that we register with MBeanServer.
164            // These performance counter will be embedded
165            // to InstrumentationProcessor and wrap the appropriate processor
166            // by InstrumentationInterceptStrategy.
167            RouteType route = routeContext.getRoute();
168            
169            for (ProcessorType processor : route.getOutputs()) {
170                ObjectName name = null;
171                try {
172                    // get the mbean name
173                    name = getNamingStrategy().getObjectName(routeContext, processor);
174    
175                    // register mbean wrapped in the performance counter mbean
176                    PerformanceCounter pc = new PerformanceCounter();
177                    agent.register(pc, name);
178    
179                    // add to map now that it has been registered
180                    counterMap.put(processor, pc);
181                } catch (MalformedObjectNameException e) {
182                    LOG.warn("Could not create MBean name: " + name, e);
183                } catch (JMException e) {
184                    LOG.warn("Could not register PerformanceCounter MBean: " + name, e);
185                }
186            }
187            
188            routeContext.addInterceptStrategy(new InstrumentationInterceptStrategy(counterMap));
189    
190            routeContext.setErrorHandlerWrappingStrategy(
191                    new InstrumentationErrorHandlerWrappingStrategy(counterMap));
192    
193            // Add an InstrumentationProcessor at the beginning of each route and
194            // set up the interceptorMap for onRoutesAdd() method to register the
195            // ManagedRoute MBeans.
196    
197            RouteType routeType = routeContext.getRoute();
198            if (routeType.getInputs() != null && !routeType.getInputs().isEmpty()) {
199                if (routeType.getInputs().size() > 1) {
200                    LOG.warn("Add InstrumentationProcessor to first input only.");
201                }
202    
203                Endpoint endpoint  = routeType.getInputs().get(0).getEndpoint();
204    
205                List<ProcessorType<?>> exceptionHandlers = new ArrayList<ProcessorType<?>>();
206                List<ProcessorType<?>> outputs = new ArrayList<ProcessorType<?>>();
207    
208                // separate out the exception handers in the outputs
209                for (ProcessorType output : routeType.getOutputs()) {
210                    if (output instanceof ExceptionType) {
211                        exceptionHandlers.add(output);
212                    } else {
213                        outputs.add(output);
214                    }
215                }
216    
217                // clearing the outputs
218                routeType.clearOutput();
219    
220                // add exception handlers as top children
221                routeType.getOutputs().addAll(exceptionHandlers);
222    
223                // add an interceptor
224                InstrumentationProcessor processor = new InstrumentationProcessor();
225                routeType.intercept(processor);
226    
227                // add the output
228                for (ProcessorType<?> processorType : outputs) {
229                    routeType.addOutput(processorType);
230                }
231    
232                interceptorMap.put(endpoint, processor);
233            }
234    
235        }
236    
237        public CamelNamingStrategy getNamingStrategy() {
238            return namingStrategy;
239        }
240    
241        public void setNamingStrategy(CamelNamingStrategy strategy) {
242            this.namingStrategy = strategy;
243        }
244    
245        public void setAgent(InstrumentationAgent agent) {
246            this.agent = agent;
247        }
248    
249    }