001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.management; 018 019 import java.util.Collection; 020 import java.util.HashMap; 021 import java.util.Map; 022 023 import javax.management.JMException; 024 025 import org.apache.camel.CamelContext; 026 import org.apache.camel.Endpoint; 027 import org.apache.camel.Exchange; 028 import org.apache.camel.Route; 029 import org.apache.camel.Service; 030 import org.apache.camel.impl.DefaultCamelContext; 031 import org.apache.camel.impl.ServiceSupport; 032 import org.apache.camel.model.ProcessorType; 033 import org.apache.camel.model.RouteType; 034 import org.apache.camel.processor.interceptor.Debugger; 035 import org.apache.camel.spi.InstrumentationAgent; 036 import org.apache.camel.spi.LifecycleStrategy; 037 import org.apache.camel.spi.RouteContext; 038 import org.apache.commons.logging.Log; 039 import org.apache.commons.logging.LogFactory; 040 041 /** 042 * JMX agent that registeres Camel lifecycle events in JMX. 043 */ 044 public class InstrumentationLifecycleStrategy implements LifecycleStrategy { 045 private static final transient Log LOG = LogFactory.getLog(InstrumentationProcessor.class); 046 047 private InstrumentationAgent agent; 048 private CamelNamingStrategy namingStrategy; 049 050 // A map (Endpoint -> InstrumentationProcessor) to facilitate 051 // adding per-route interceptor and registering ManagedRoute MBean 052 private Map<Endpoint, InstrumentationProcessor> interceptorMap = 053 new HashMap<Endpoint, InstrumentationProcessor>(); 054 055 public InstrumentationLifecycleStrategy(InstrumentationAgent agent, 056 CamelNamingStrategy namingStrategy) { 057 this.agent = agent; 058 this.namingStrategy = namingStrategy; 059 } 060 061 public void onContextCreate(CamelContext context) { 062 if (context instanceof DefaultCamelContext) { 063 try { 064 DefaultCamelContext dc = (DefaultCamelContext)context; 065 ManagedService ms = new ManagedService(dc); 066 agent.register(ms, getNamingStrategy().getObjectName(dc)); 067 } catch (JMException e) { 068 LOG.warn("Could not register CamelContext MBean", e); 069 } 070 } 071 } 072 073 public void onEndpointAdd(Endpoint<? extends Exchange> endpoint) { 074 try { 075 ManagedEndpoint me = new ManagedEndpoint(endpoint); 076 agent.register(me, getNamingStrategy().getObjectName(me)); 077 } catch (JMException e) { 078 LOG.warn("Could not register Endpoint MBean", e); 079 } 080 } 081 082 public void onRoutesAdd(Collection<Route> routes) { 083 for (Route route : routes) { 084 try { 085 ManagedRoute mr = new ManagedRoute(route); 086 // retrieve the per-route intercept for this route 087 InstrumentationProcessor interceptor = interceptorMap.get(route.getEndpoint()); 088 if (interceptor == null) { 089 LOG.warn("Instrumentation processor not found for route endpoint " 090 + route.getEndpoint()); 091 } else { 092 interceptor.setCounter(mr); 093 } 094 agent.register(mr, getNamingStrategy().getObjectName(mr)); 095 } catch (JMException e) { 096 LOG.warn("Could not register Route MBean", e); 097 } 098 } 099 } 100 101 public void onServiceAdd(CamelContext context, Service service) { 102 if (service instanceof ServiceSupport) { 103 try { 104 ManagedService ms = new ManagedService((ServiceSupport)service); 105 agent.register(ms, getNamingStrategy().getObjectName(context, ms)); 106 } catch (JMException e) { 107 LOG.warn("Could not register Service MBean", e); 108 } 109 } 110 } 111 112 public void onRouteContextCreate(RouteContext routeContext) { 113 114 // Create a map (ProcessorType -> PerformanceCounter) 115 // to be passed to InstrumentationInterceptStrategy. 116 Map<ProcessorType, PerformanceCounter> counterMap = 117 new HashMap<ProcessorType, PerformanceCounter>(); 118 119 // Each processor in a route will have its own performance counter 120 // The performance counter are MBeans that we register with MBeanServer. 121 // These performance counter will be embedded 122 // to InstrumentationProcessor and wrap the appropriate processor 123 // by InstrumentationInterceptStrategy. 124 RouteType route = routeContext.getRoute(); 125 for (ProcessorType processor : route.getOutputs()) { 126 PerformanceCounter pc = new PerformanceCounter(); 127 try { 128 agent.register(pc, getNamingStrategy().getObjectName( 129 routeContext, processor)); 130 } catch (JMException e) { 131 LOG.warn("Could not register Counter MBean", e); 132 } 133 counterMap.put(processor, pc); 134 } 135 136 routeContext.addInterceptStrategy(new InstrumentationInterceptStrategy(counterMap)); 137 138 139 // Add an InstrumentationProcessor at the beginning of each route and 140 // set up the interceptorMap for onRoutesAdd() method to register the 141 // ManagedRoute MBeans. 142 143 RouteType routeType = routeContext.getRoute(); 144 if (routeType.getInputs() != null && !routeType.getInputs().isEmpty()) { 145 if (routeType.getInputs().size() > 1) { 146 LOG.warn("Add InstrumentationProcessor to first input only."); 147 } 148 149 Endpoint endpoint = routeType.getInputs().get(0).getEndpoint(); 150 ProcessorType<?>[] outputs = 151 routeType.getOutputs().toArray(new ProcessorType<?>[0]); 152 153 routeType.clearOutput(); 154 InstrumentationProcessor processor = new InstrumentationProcessor(); 155 routeType.intercept(processor); 156 for (ProcessorType<?> output : outputs) { 157 routeType.addOutput(output); 158 } 159 160 interceptorMap.put(endpoint, processor); 161 } 162 163 } 164 165 public CamelNamingStrategy getNamingStrategy() { 166 return namingStrategy; 167 } 168 169 public void setNamingStrategy(CamelNamingStrategy strategy) { 170 this.namingStrategy = strategy; 171 } 172 }