001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.management;
018
019 import java.util.ArrayList;
020 import java.util.Collection;
021 import java.util.HashMap;
022 import java.util.List;
023 import java.util.Map;
024
025 import javax.management.JMException;
026 import javax.management.MalformedObjectNameException;
027 import javax.management.ObjectName;
028
029 import org.apache.camel.CamelContext;
030 import org.apache.camel.Consumer;
031 import org.apache.camel.Endpoint;
032 import org.apache.camel.Exchange;
033 import org.apache.camel.Route;
034 import org.apache.camel.Service;
035 import org.apache.camel.impl.DefaultCamelContext;
036 import org.apache.camel.impl.ServiceSupport;
037 import org.apache.camel.model.ExceptionType;
038 import org.apache.camel.model.ProcessorType;
039 import org.apache.camel.model.RouteType;
040 import org.apache.camel.spi.InstrumentationAgent;
041 import org.apache.camel.spi.LifecycleStrategy;
042 import org.apache.camel.spi.RouteContext;
043 import org.apache.commons.logging.Log;
044 import org.apache.commons.logging.LogFactory;
045
046 /**
047 * JMX agent that registeres Camel lifecycle events in JMX.
048 *
049 * @version $Revision: 702954 $
050 */
051 public class InstrumentationLifecycleStrategy implements LifecycleStrategy {
052 private static final transient Log LOG = LogFactory.getLog(InstrumentationProcessor.class);
053
054 private InstrumentationAgent agent;
055 private CamelNamingStrategy namingStrategy;
056 private boolean initialized;
057
058 // A map (Endpoint -> InstrumentationProcessor) to facilitate
059 // adding per-route interceptor and registering ManagedRoute MBean
060 private Map<Endpoint, InstrumentationProcessor> interceptorMap =
061 new HashMap<Endpoint, InstrumentationProcessor>();
062
063 public InstrumentationLifecycleStrategy() {
064 this(new DefaultInstrumentationAgent());
065 }
066
067 public InstrumentationLifecycleStrategy(InstrumentationAgent agent) {
068 this.agent = agent;
069 }
070 /**
071 * Constructor for camel context that has been started.
072 *
073 * @param agent the agent
074 * @param context the camel context
075 */
076 public InstrumentationLifecycleStrategy(InstrumentationAgent agent, CamelContext context) {
077 this.agent = agent;
078 onContextStart(context);
079 }
080
081 public void onContextStart(CamelContext context) {
082 if (context instanceof DefaultCamelContext) {
083 try {
084 initialized = true;
085 DefaultCamelContext dc = (DefaultCamelContext)context;
086 // call addService so that context will start and stop the agent
087 dc.addService(agent);
088 namingStrategy = new CamelNamingStrategy(agent.getMBeanObjectDomainName());
089 ManagedService ms = new ManagedService(dc);
090 agent.register(ms, getNamingStrategy().getObjectName(dc));
091 } catch (Exception e) {
092 LOG.warn("Could not register CamelContext MBean", e);
093 }
094 }
095 }
096
097 public void onEndpointAdd(Endpoint<? extends Exchange> endpoint) {
098 // the agent hasn't been started
099 if (!initialized) {
100 return;
101 }
102
103 try {
104 ManagedEndpoint me = new ManagedEndpoint(endpoint);
105 agent.register(me, getNamingStrategy().getObjectName(me));
106 } catch (JMException e) {
107 LOG.warn("Could not register Endpoint MBean", e);
108 }
109 }
110
111 public void onRoutesAdd(Collection<Route> routes) {
112 // the agent hasn't been started
113 if (!initialized) {
114 return;
115 }
116
117 for (Route route : routes) {
118 try {
119 ManagedRoute mr = new ManagedRoute(route);
120 // retrieve the per-route intercept for this route
121 InstrumentationProcessor interceptor = interceptorMap.get(route.getEndpoint());
122 if (interceptor == null) {
123 LOG.warn("Instrumentation processor not found for route endpoint "
124 + route.getEndpoint());
125 } else {
126 interceptor.setCounter(mr);
127 }
128 agent.register(mr, getNamingStrategy().getObjectName(mr));
129 } catch (JMException e) {
130 LOG.warn("Could not register Route MBean", e);
131 }
132 }
133 }
134
135 public void onServiceAdd(CamelContext context, Service service) {
136 // the agent hasn't been started
137 if (!initialized) {
138 return;
139 }
140 if (service instanceof ServiceSupport && service instanceof Consumer) {
141 // TODO: add support for non-consumer services?
142 try {
143 ManagedService ms = new ManagedService((ServiceSupport)service);
144 agent.register(ms, getNamingStrategy().getObjectName(context, ms));
145 } catch (JMException e) {
146 LOG.warn("Could not register Service MBean", e);
147 }
148 }
149 }
150
151 public void onRouteContextCreate(RouteContext routeContext) {
152 // the agent hasn't been started
153 if (!initialized) {
154 return;
155 }
156
157 // Create a map (ProcessorType -> PerformanceCounter)
158 // to be passed to InstrumentationInterceptStrategy.
159 Map<ProcessorType, PerformanceCounter> counterMap =
160 new HashMap<ProcessorType, PerformanceCounter>();
161
162 // Each processor in a route will have its own performance counter
163 // The performance counter are MBeans that we register with MBeanServer.
164 // These performance counter will be embedded
165 // to InstrumentationProcessor and wrap the appropriate processor
166 // by InstrumentationInterceptStrategy.
167 RouteType route = routeContext.getRoute();
168
169 for (ProcessorType processor : route.getOutputs()) {
170 ObjectName name = null;
171 try {
172 // get the mbean name
173 name = getNamingStrategy().getObjectName(routeContext, processor);
174
175 // register mbean wrapped in the performance counter mbean
176 PerformanceCounter pc = new PerformanceCounter();
177 agent.register(pc, name);
178
179 // add to map now that it has been registered
180 counterMap.put(processor, pc);
181 } catch (MalformedObjectNameException e) {
182 LOG.warn("Could not create MBean name: " + name, e);
183 } catch (JMException e) {
184 LOG.warn("Could not register PerformanceCounter MBean: " + name, e);
185 }
186 }
187
188 routeContext.addInterceptStrategy(new InstrumentationInterceptStrategy(counterMap));
189
190 routeContext.setErrorHandlerWrappingStrategy(
191 new InstrumentationErrorHandlerWrappingStrategy(counterMap));
192
193 // Add an InstrumentationProcessor at the beginning of each route and
194 // set up the interceptorMap for onRoutesAdd() method to register the
195 // ManagedRoute MBeans.
196
197 RouteType routeType = routeContext.getRoute();
198 if (routeType.getInputs() != null && !routeType.getInputs().isEmpty()) {
199 if (routeType.getInputs().size() > 1) {
200 LOG.warn("Add InstrumentationProcessor to first input only.");
201 }
202
203 Endpoint endpoint = routeType.getInputs().get(0).getEndpoint();
204
205 List<ProcessorType<?>> exceptionHandlers = new ArrayList<ProcessorType<?>>();
206 List<ProcessorType<?>> outputs = new ArrayList<ProcessorType<?>>();
207
208 // separate out the exception handers in the outputs
209 for (ProcessorType output : routeType.getOutputs()) {
210 if (output instanceof ExceptionType) {
211 exceptionHandlers.add(output);
212 } else {
213 outputs.add(output);
214 }
215 }
216
217 // clearing the outputs
218 routeType.clearOutput();
219
220 // add exception handlers as top children
221 routeType.getOutputs().addAll(exceptionHandlers);
222
223 // add an interceptor
224 InstrumentationProcessor processor = new InstrumentationProcessor();
225 routeType.intercept(processor);
226
227 // add the output
228 for (ProcessorType<?> processorType : outputs) {
229 routeType.addOutput(processorType);
230 }
231
232 interceptorMap.put(endpoint, processor);
233 }
234
235 }
236
237 public CamelNamingStrategy getNamingStrategy() {
238 return namingStrategy;
239 }
240
241 public void setNamingStrategy(CamelNamingStrategy strategy) {
242 this.namingStrategy = strategy;
243 }
244
245 public void setAgent(InstrumentationAgent agent) {
246 this.agent = agent;
247 }
248
249 }