001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.HashMap; 020 import java.util.Map; 021 022 import org.apache.camel.AsyncCallback; 023 import org.apache.camel.CamelContext; 024 import org.apache.camel.Endpoint; 025 import org.apache.camel.Exchange; 026 import org.apache.camel.ExchangePattern; 027 import org.apache.camel.Message; 028 import org.apache.camel.NoSuchEndpointException; 029 import org.apache.camel.Processor; 030 import org.apache.camel.Producer; 031 import org.apache.camel.ProducerTemplate; 032 import org.apache.camel.util.ObjectHelper; 033 import org.apache.camel.util.CamelContextHelper; 034 import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException; 035 036 /** 037 * A client helper object (named like Spring's TransactionTemplate & JmsTemplate 038 * et al) for working with Camel and sending {@link org.apache.camel.Message} instances in an 039 * {@link org.apache.camel.Exchange} to an {@link org.apache.camel.Endpoint}. 040 * 041 * @version $Revision: 62373 $ 042 */ 043 public class DefaultProducerTemplate<E extends Exchange> extends ServiceSupport implements ProducerTemplate<E> { 044 private CamelContext context; 045 private final ProducerCache<E> producerCache = new ProducerCache<E>(); 046 private boolean useEndpointCache = true; 047 private final Map<String, Endpoint<E>> endpointCache = new HashMap<String, Endpoint<E>>(); 048 private Endpoint<E> defaultEndpoint; 049 050 public static DefaultProducerTemplate newInstance(CamelContext camelContext, String defaultEndpointUri) { 051 Endpoint endpoint = CamelContextHelper.getMandatoryEndpoint(camelContext, defaultEndpointUri); 052 return new DefaultProducerTemplate(camelContext, endpoint); 053 } 054 055 public DefaultProducerTemplate(CamelContext context) { 056 this.context = context; 057 } 058 059 public DefaultProducerTemplate(CamelContext context, Endpoint defaultEndpoint) { 060 this(context); 061 this.defaultEndpoint = defaultEndpoint; 062 } 063 064 public E send(String endpointUri, E exchange) { 065 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 066 return send(endpoint, exchange); 067 } 068 069 public E send(String endpointUri, Processor processor) { 070 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 071 return send(endpoint, processor); 072 } 073 074 public E send(String endpointUri, Processor processor, AsyncCallback callback) { 075 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 076 return send(endpoint, processor, callback); 077 } 078 079 public E send(String endpointUri, ExchangePattern pattern, Processor processor) { 080 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 081 return send(endpoint, pattern, processor); 082 } 083 084 public E send(Endpoint<E> endpoint, E exchange) { 085 E convertedExchange = exchange; 086 producerCache.send(endpoint, convertedExchange); 087 return convertedExchange; 088 } 089 090 public E send(Endpoint<E> endpoint, Processor processor) { 091 return producerCache.send(endpoint, processor); 092 } 093 094 public E send(Endpoint<E> endpoint, Processor processor, AsyncCallback callback) { 095 return producerCache.send(endpoint, processor, callback); 096 } 097 098 public E send(Endpoint<E> endpoint, ExchangePattern pattern, Processor processor) { 099 return producerCache.send(endpoint, pattern, processor); 100 } 101 102 public Object sendBody(Endpoint<E> endpoint, ExchangePattern pattern, Object body) { 103 E result = send(endpoint, pattern, createSetBodyProcessor(body)); 104 return extractResultBody(result, pattern); 105 } 106 107 public Object sendBody(Endpoint<E> endpoint, Object body) { 108 E result = send(endpoint, createSetBodyProcessor(body)); 109 return extractResultBody(result); 110 } 111 112 public Object sendBody(String endpointUri, Object body) { 113 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 114 return sendBody(endpoint, body); 115 } 116 117 public Object sendBody(String endpointUri, ExchangePattern pattern, Object body) { 118 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 119 return sendBody(endpoint, pattern, body); 120 } 121 122 public Object sendBodyAndHeader(String endpointUri, final Object body, final String header, 123 final Object headerValue) { 124 return sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue); 125 } 126 127 public Object sendBodyAndHeader(Endpoint<E> endpoint, final Object body, final String header, 128 final Object headerValue) { 129 E result = send(endpoint, createBodyAndHeaderProcessor(body, header, headerValue)); 130 return extractResultBody(result); 131 } 132 133 public Object sendBodyAndHeader(Endpoint<E> endpoint, ExchangePattern pattern, final Object body, final String header, 134 final Object headerValue) { 135 E result = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue)); 136 return extractResultBody(result, pattern); 137 } 138 139 public Object sendBodyAndHeader(String endpoint, ExchangePattern pattern, final Object body, final String header, 140 final Object headerValue) { 141 E result = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue)); 142 return extractResultBody(result, pattern); 143 } 144 145 public Object sendBodyAndHeaders(String endpointUri, final Object body, final Map<String, Object> headers) { 146 return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers); 147 } 148 149 public Object sendBodyAndHeaders(Endpoint<E> endpoint, final Object body, final Map<String, Object> headers) { 150 E result = send(endpoint, new Processor() { 151 public void process(Exchange exchange) { 152 Message in = exchange.getIn(); 153 for (Map.Entry<String, Object> header : headers.entrySet()) { 154 in.setHeader(header.getKey(), header.getValue()); 155 } 156 in.setBody(body); 157 } 158 }); 159 return extractResultBody(result); 160 } 161 162 public Object sendBodyAndHeaders(String endpointUri, ExchangePattern pattern, Object body, Map<String, Object> headers) { 163 return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), pattern, body, headers); 164 } 165 166 public Object sendBodyAndHeaders(Endpoint<E> endpoint, ExchangePattern pattern, final Object body, final Map<String, Object> headers) { 167 E result = send(endpoint, pattern, new Processor() { 168 public void process(Exchange exchange) throws Exception { 169 Message in = exchange.getIn(); 170 for (Map.Entry<String, Object> header : headers.entrySet()) { 171 in.setHeader(header.getKey(), header.getValue()); 172 } 173 in.setBody(body); 174 } 175 }); 176 return extractResultBody(result); 177 } 178 179 // Methods using an InOut ExchangePattern 180 // ----------------------------------------------------------------------- 181 182 public E request(Endpoint<E> endpoint, Processor processor) { 183 return send(endpoint, ExchangePattern.InOut, processor); 184 } 185 186 public Object requestBody(Endpoint<E> endpoint, Object body) { 187 return sendBody(endpoint, ExchangePattern.InOut, body); 188 } 189 190 public Object requestBodyAndHeader(Endpoint<E> endpoint, Object body, String header, Object headerValue) { 191 return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue); 192 } 193 194 public E request(String endpoint, Processor processor) { 195 return send(endpoint, ExchangePattern.InOut, processor); 196 } 197 198 public Object requestBody(String endpoint, Object body) { 199 return sendBody(endpoint, ExchangePattern.InOut, body); 200 } 201 202 public Object requestBodyAndHeader(String endpoint, Object body, String header, Object headerValue) { 203 return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue); 204 } 205 206 public Object requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers) { 207 return requestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers); 208 } 209 210 public Object requestBodyAndHeaders(Endpoint<E> endpoint, final Object body, final Map<String, Object> headers) { 211 return sendBodyAndHeaders(endpoint, ExchangePattern.InOut, body, headers); 212 } 213 214 // Methods using the default endpoint 215 // ----------------------------------------------------------------------- 216 217 public Object sendBody(Object body) { 218 return sendBody(getMandatoryDefaultEndpoint(), body); 219 } 220 221 public E send(E exchange) { 222 return send(getMandatoryDefaultEndpoint(), exchange); 223 } 224 225 public E send(Processor processor) { 226 return send(getMandatoryDefaultEndpoint(), processor); 227 } 228 229 public Object sendBodyAndHeader(Object body, String header, Object headerValue) { 230 return sendBodyAndHeader(getMandatoryDefaultEndpoint(), body, header, headerValue); 231 } 232 233 public Object sendBodyAndHeaders(Object body, Map<String, Object> headers) { 234 return sendBodyAndHeaders(getMandatoryDefaultEndpoint(), body, headers); 235 } 236 237 // Properties 238 // ----------------------------------------------------------------------- 239 public Producer<E> getProducer(Endpoint<E> endpoint) { 240 return producerCache.getProducer(endpoint); 241 } 242 243 public CamelContext getContext() { 244 return context; 245 } 246 247 public Endpoint<E> getDefaultEndpoint() { 248 return defaultEndpoint; 249 } 250 251 public void setDefaultEndpoint(Endpoint<E> defaultEndpoint) { 252 this.defaultEndpoint = defaultEndpoint; 253 } 254 255 /** 256 * Sets the default endpoint to use if none is specified 257 */ 258 public void setDefaultEndpointUri(String endpointUri) { 259 setDefaultEndpoint(getContext().getEndpoint(endpointUri)); 260 } 261 262 public boolean isUseEndpointCache() { 263 return useEndpointCache; 264 } 265 266 public void setUseEndpointCache(boolean useEndpointCache) { 267 this.useEndpointCache = useEndpointCache; 268 } 269 270 public <T extends Endpoint<?>> T getResolvedEndpoint(String endpointUri, Class<T> expectedClass) { 271 Endpoint<?> e = null; 272 synchronized (endpointCache) { 273 e = endpointCache.get(endpointUri); 274 } 275 if (e != null && expectedClass.isAssignableFrom(e.getClass())) { 276 return expectedClass.asSubclass(expectedClass).cast(e); 277 } 278 return null; 279 } 280 281 // Implementation methods 282 // ----------------------------------------------------------------------- 283 284 protected Processor createBodyAndHeaderProcessor(final Object body, final String header, final Object headerValue) { 285 return new Processor() { 286 public void process(Exchange exchange) { 287 Message in = exchange.getIn(); 288 in.setHeader(header, headerValue); 289 in.setBody(body); 290 } 291 }; 292 } 293 294 protected Processor createSetBodyProcessor(final Object body) { 295 return new Processor() { 296 public void process(Exchange exchange) { 297 Message in = exchange.getIn(); 298 in.setBody(body); 299 } 300 }; 301 } 302 303 protected Endpoint resolveMandatoryEndpoint(String endpointUri) { 304 Endpoint endpoint = null; 305 306 if (isUseEndpointCache()) { 307 synchronized (endpointCache) { 308 endpoint = endpointCache.get(endpointUri); 309 if (endpoint == null) { 310 endpoint = context.getEndpoint(endpointUri); 311 if (endpoint != null) { 312 endpointCache.put(endpointUri, endpoint); 313 } 314 } 315 } 316 } else { 317 endpoint = context.getEndpoint(endpointUri); 318 } 319 if (endpoint == null) { 320 throw new NoSuchEndpointException(endpointUri); 321 } 322 return endpoint; 323 } 324 325 protected Endpoint<E> getMandatoryDefaultEndpoint() { 326 Endpoint<E> answer = getDefaultEndpoint(); 327 ObjectHelper.notNull(answer, "defaultEndpoint"); 328 return answer; 329 } 330 331 protected void doStart() throws Exception { 332 producerCache.start(); 333 } 334 335 protected void doStop() throws Exception { 336 producerCache.stop(); 337 endpointCache.clear(); 338 } 339 340 /** 341 * Extracts the body from the given result. 342 * 343 * @param result the result 344 * @return the result, can be <tt>null</tt>. 345 */ 346 protected Object extractResultBody(E result) { 347 return extractResultBody(result, null); 348 } 349 350 /** 351 * Extracts the body from the given result. 352 * <p/> 353 * If the exchange pattern is provided it will try to honor it and retrive the body 354 * from either IN or OUT according to the pattern. 355 * 356 * @param result the result 357 * @param pattern exchange pattern if given, can be <tt>null</tt> 358 * @return the result, can be <tt>null</tt>. 359 */ 360 protected Object extractResultBody(E result, ExchangePattern pattern) { 361 Object answer = null; 362 if (result != null) { 363 // rethrow if there was an exception 364 if (result.getException() != null) { 365 throw wrapRuntimeCamelException(result.getException()); 366 } 367 368 // result could have a fault message 369 if (hasFaultMessage(result)) { 370 return result.getFault().getBody(); 371 } 372 373 // okay no fault then return the response according to the pattern 374 // try to honor pattern if provided 375 boolean notOut = pattern != null && !pattern.isOutCapable(); 376 boolean hasOut = result.getOut(false) != null; 377 if (hasOut && !notOut) { 378 answer = result.getOut().getBody(); 379 } else { 380 answer = result.getIn().getBody(); 381 } 382 } 383 return answer; 384 } 385 386 protected boolean hasFaultMessage(E result) { 387 Message faultMessage = result.getFault(false); 388 if (faultMessage != null) { 389 Object faultBody = faultMessage.getBody(); 390 if (faultBody != null) { 391 return true; 392 } 393 } 394 return false; 395 } 396 397 }