001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    
022    import org.apache.camel.impl.ServiceSupport;
023    import org.apache.camel.util.ObjectHelper;
024    import org.apache.camel.util.ProducerCache;
025    
026    /**
027     * A client helper object (named like Spring's TransactionTemplate & JmsTemplate
028     * et al) for working with Camel and sending {@link Message} instances in an
029     * {@link Exchange} to an {@link Endpoint}.
030     *
031     * @version $Revision: 40987 $
032     */
033    public class CamelTemplate<E extends Exchange> extends ServiceSupport implements ProducerTemplate<E> {
034        private CamelContext context;
035        private ProducerCache<E> producerCache = new ProducerCache<E>();
036        private boolean useEndpointCache = true;
037        private Map<String, Endpoint<E>> endpointCache = new HashMap<String, Endpoint<E>>();
038        private Endpoint<E> defaultEndpoint;
039    
040        public CamelTemplate(CamelContext context) {
041            this.context = context;
042        }
043    
044        public CamelTemplate(CamelContext context, Endpoint defaultEndpoint) {
045            this(context);
046            this.defaultEndpoint = defaultEndpoint;
047        }
048    
049        /**
050         * Sends the exchange to the given endpoint
051         *
052         * @param endpointUri the endpoint URI to send the exchange to
053         * @param exchange    the exchange to send
054         */
055        public E send(String endpointUri, E exchange) {
056            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
057            return send(endpoint, exchange);
058        }
059    
060        /**
061         * Sends an exchange to an endpoint using a supplied
062         *
063         * @param endpointUri the endpoint URI to send the exchange to
064         * @param processor   the transformer used to populate the new exchange
065         * {@link Processor} to populate the exchange
066         */
067        public E send(String endpointUri, Processor processor) {
068            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
069            return send(endpoint, processor);
070        }
071    
072        /**
073         * Sends an exchange to an endpoint using a supplied
074         *
075         * @param endpointUri the endpoint URI to send the exchange to
076         * @param processor   the transformer used to populate the new exchange
077         * {@link Processor} to populate the exchange.  The callback
078         * will be called when the exchange is completed.
079         */
080        public E send(String endpointUri, Processor processor, AsyncCallback callback) {
081            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
082            return send(endpoint, processor, callback);
083        }
084    
085        /**
086         * Sends an exchange to an endpoint using a supplied
087         *
088         * @param endpointUri the endpoint URI to send the exchange to
089         * @param pattern     the message {@link ExchangePattern} such as
090         *                    {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
091         * @param processor   the transformer used to populate the new exchange
092         * {@link Processor} to populate the exchange
093         */
094        public E send(String endpointUri, ExchangePattern pattern, Processor processor) {
095            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
096            return send(endpoint, pattern, processor);
097        }
098    
099        /**
100         * Sends the exchange to the given endpoint
101         *
102         * @param endpoint the endpoint to send the exchange to
103         * @param exchange the exchange to send
104         */
105        public E send(Endpoint<E> endpoint, E exchange) {
106            //E convertedExchange = endpoint.createExchange(exchange);
107            E convertedExchange = exchange;
108            producerCache.send(endpoint, convertedExchange);
109            return convertedExchange;
110        }
111    
112        /**
113         * Sends an exchange to an endpoint using a supplied
114         *
115         * @param endpoint  the endpoint to send the exchange to
116         * @param processor the transformer used to populate the new exchange
117         * {@link Processor} to populate the exchange
118         */
119        public E send(Endpoint<E> endpoint, Processor processor) {
120            return producerCache.send(endpoint, processor);
121        }
122    
123        /**
124         * Sends an exchange to an endpoint using a supplied
125         *
126         * @param endpoint  the endpoint to send the exchange to
127         * @param processor the transformer used to populate the new exchange
128         * {@link Processor} to populate the exchange.  The callback
129         * will be called when the exchange is completed.
130         */
131        public E send(Endpoint<E> endpoint, Processor processor, AsyncCallback callback) {
132            return producerCache.send(endpoint, processor, callback);
133        }
134    
135        /**
136         * Sends an exchange to an endpoint using a supplied
137         *
138         * @param endpoint  the endpoint to send the exchange to
139         * @param pattern   the message {@link ExchangePattern} such as
140         *                  {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
141         * @param processor the transformer used to populate the new exchange
142         * {@link Processor} to populate the exchange
143         */
144        public E send(Endpoint<E> endpoint, ExchangePattern pattern, Processor processor) {
145            return producerCache.send(endpoint, pattern, processor);
146        }
147    
148        /**
149         * Send the body to an endpoint with the given {@link ExchangePattern}
150         * returning any result output body
151         *
152         * @param endpoint
153         * @param body     = the payload
154         * @param pattern  the message {@link ExchangePattern} such as
155         *                 {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
156         * @return the result
157         */
158        public Object sendBody(Endpoint<E> endpoint, ExchangePattern pattern, Object body) {
159            E result = send(endpoint, pattern, createSetBodyProcessor(body));
160            return extractResultBody(result);
161        }
162    
163        /**
164         * Send the body to an endpoint returning any result output body
165         *
166         * @param endpoint
167         * @param body     = the payload
168         * @return the result
169         */
170        public Object sendBody(Endpoint<E> endpoint, Object body) {
171            E result = send(endpoint, createSetBodyProcessor(body));
172            return extractResultBody(result);
173        }
174    
175        /**
176         * Send the body to an endpoint
177         *
178         * @param endpointUri
179         * @param body        = the payload
180         * @return the result
181         */
182        public Object sendBody(String endpointUri, Object body) {
183            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
184            return sendBody(endpoint, body);
185        }
186    
187        /**
188         * Send the body to an endpoint
189         *
190         * @param endpointUri
191         * @param pattern     the message {@link ExchangePattern} such as
192         *                    {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
193         * @param body        = the payload
194         * @return the result
195         */
196        public Object sendBody(String endpointUri, ExchangePattern pattern, Object body) {
197            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
198            return sendBody(endpoint, pattern, body);
199        }
200    
201        /**
202         * Sends the body to an endpoint with a specified header and header value
203         *
204         * @param endpointUri the endpoint URI to send to
205         * @param body        the payload send
206         * @param header      the header name
207         * @param headerValue the header value
208         * @return the result
209         */
210        public Object sendBodyAndHeader(String endpointUri, final Object body, final String header,
211                final Object headerValue) {
212            return sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
213        }
214    
215        /**
216         * Sends the body to an endpoint with a specified header and header value
217         *
218         * @param endpoint    the Endpoint to send to
219         * @param body        the payload send
220         * @param header      the header name
221         * @param headerValue the header value
222         * @return the result
223         */
224        public Object sendBodyAndHeader(Endpoint endpoint, final Object body, final String header,
225                final Object headerValue) {
226            E result = send(endpoint, createBodyAndHeaderProcessor(body, header, headerValue));
227            return extractResultBody(result);
228        }
229    
230        /**
231         * Sends the body to an endpoint with a specified header and header value
232         *
233         * @param endpoint    the Endpoint to send to
234         * @param pattern     the message {@link ExchangePattern} such as
235         *                    {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
236         * @param body        the payload send
237         * @param header      the header name
238         * @param headerValue the header value
239         * @return the result
240         */
241        public Object sendBodyAndHeader(Endpoint endpoint, ExchangePattern pattern, final Object body, final String header,
242                final Object headerValue) {
243            E result = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
244            return extractResultBody(result);
245        }
246    
247        /**
248         * Sends the body to an endpoint with a specified header and header value
249         *
250         * @param endpoint    the Endpoint URI to send to
251         * @param pattern     the message {@link ExchangePattern} such as
252         *                    {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
253         * @param body        the payload send
254         * @param header      the header name
255         * @param headerValue the header value
256         * @return the result
257         */
258        public Object sendBodyAndHeader(String endpoint, ExchangePattern pattern, final Object body, final String header,
259                final Object headerValue) {
260            E result = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
261            return extractResultBody(result);
262        }
263    
264        /**
265         * Sends the body to an endpoint with the specified headers and header
266         * values
267         *
268         * @param endpointUri the endpoint URI to send to
269         * @param body        the payload send
270         * @return the result
271         */
272        public Object sendBodyAndHeaders(String endpointUri, final Object body, final Map<String, Object> headers) {
273            return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
274        }
275    
276        /**
277         * Sends the body to an endpoint with the specified headers and header
278         * values
279         *
280         * @param endpoint the endpoint URI to send to
281         * @param body     the payload send
282         * @return the result
283         */
284        public Object sendBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) {
285            E result = send(endpoint, new Processor() {
286                public void process(Exchange exchange) {
287                    Message in = exchange.getIn();
288                    for (Map.Entry<String, Object> header : headers.entrySet()) {
289                        in.setHeader(header.getKey(), header.getValue());
290                    }
291                    in.setBody(body);
292                }
293            });
294            return extractResultBody(result);
295        }
296    
297        // Methods using an InOut ExchangePattern
298        // -----------------------------------------------------------------------
299    
300        /**
301         * Send the body to an endpoint returning any result output body
302         *
303         * @param endpoint
304         * @param processor the processor which will populate the exchange before sending
305         * @return the result
306         */
307        public E request(Endpoint<E> endpoint, Processor processor) {
308            return send(endpoint, ExchangePattern.InOut, processor);
309        }
310    
311        /**
312         * Send the body to an endpoint returning any result output body
313         *
314         * @param endpoint
315         * @param body     = the payload
316         * @return the result
317         */
318        public Object requestBody(Endpoint<E> endpoint, Object body) {
319            return sendBody(endpoint, ExchangePattern.InOut, body);
320        }
321    
322        /**
323         * Send the body to an endpoint returning any result output body
324         *
325         * @param endpoint
326         * @param body        = the payload
327         * @param header
328         * @param headerValue
329         * @return the result
330         */
331        public Object requestBodyAndHeader(Endpoint<E> endpoint, Object body, String header, Object headerValue) {
332            return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
333        }
334    
335        /**
336         * Send the body to an endpoint returning any result output body
337         *
338         * @param endpoint
339         * @param processor the processor which will populate the exchange before sending
340         * @return the result
341         */
342        public E request(String endpoint, Processor processor) {
343            return send(endpoint, ExchangePattern.InOut, processor);
344        }
345    
346        /**
347         * Send the body to an endpoint returning any result output body
348         *
349         * @param endpoint
350         * @param body     = the payload
351         * @return the result
352         */
353        public Object requestBody(String endpoint, Object body) {
354            return sendBody(endpoint, ExchangePattern.InOut, body);
355        }
356    
357        /**
358         * Send the body to an endpoint returning any result output body
359         *
360         * @param endpoint
361         * @param body        = the payload
362         * @param header
363         * @param headerValue
364         * @return the result
365         */
366        public Object requestBodyAndHeader(String endpoint, Object body, String header, Object headerValue) {
367            return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
368        }
369    
370        // Methods using the default endpoint
371        // -----------------------------------------------------------------------
372    
373        /**
374         * Sends the body to the default endpoint and returns the result content
375         *
376         * @param body the body to send
377         * @return the returned message body
378         */
379        public Object sendBody(Object body) {
380            return sendBody(getMandatoryDefaultEndpoint(), body);
381        }
382    
383        /**
384         * Sends the exchange to the default endpoint
385         *
386         * @param exchange the exchange to send
387         */
388        public E send(E exchange) {
389            return send(getMandatoryDefaultEndpoint(), exchange);
390        }
391    
392        /**
393         * Sends an exchange to the default endpoint using a supplied
394         *
395         * @param processor the transformer used to populate the new exchange
396         * {@link Processor} to populate the exchange
397         */
398        public E send(Processor processor) {
399            return send(getMandatoryDefaultEndpoint(), processor);
400        }
401    
402        public Object sendBodyAndHeader(Object body, String header, Object headerValue) {
403            return sendBodyAndHeader(getMandatoryDefaultEndpoint(), body, header, headerValue);
404        }
405    
406        public Object sendBodyAndHeaders(Object body, Map<String, Object> headers) {
407            return sendBodyAndHeaders(getMandatoryDefaultEndpoint(), body, headers);
408        }
409    
410        // Properties
411        // -----------------------------------------------------------------------
412        public Producer<E> getProducer(Endpoint<E> endpoint) {
413            return producerCache.getProducer(endpoint);
414        }
415    
416        public CamelContext getContext() {
417            return context;
418        }
419    
420        public Endpoint<E> getDefaultEndpoint() {
421            return defaultEndpoint;
422        }
423    
424        public void setDefaultEndpoint(Endpoint<E> defaultEndpoint) {
425            this.defaultEndpoint = defaultEndpoint;
426        }
427    
428        /**
429         * Sets the default endpoint to use if none is specified
430         */
431        public void setDefaultEndpointUri(String endpointUri) {
432            setDefaultEndpoint(getContext().getEndpoint(endpointUri));
433        }
434    
435        public boolean isUseEndpointCache() {
436            return useEndpointCache;
437        }
438    
439        public void setUseEndpointCache(boolean useEndpointCache) {
440            this.useEndpointCache = useEndpointCache;
441        }
442    
443        public <T extends Endpoint<?>> T getResolvedEndpoint(String endpointUri, Class<T> expectedClass) {
444            Endpoint<?> e = null;
445            synchronized (endpointCache) {
446                e = endpointCache.get(endpointUri);
447            }
448            if (e != null && expectedClass.isAssignableFrom(e.getClass())) {
449                return expectedClass.asSubclass(expectedClass).cast(e);
450            }
451            return null;
452        }
453        
454        // Implementation methods
455        // -----------------------------------------------------------------------
456    
457        protected Processor createBodyAndHeaderProcessor(final Object body, final String header, final Object headerValue) {
458            return new Processor() {
459                public void process(Exchange exchange) {
460                    Message in = exchange.getIn();
461                    in.setHeader(header, headerValue);
462                    in.setBody(body);
463                }
464            };
465        }
466    
467        protected Processor createSetBodyProcessor(final Object body) {
468            return new Processor() {
469                public void process(Exchange exchange) {
470                    Message in = exchange.getIn();
471                    in.setBody(body);
472                }
473            };
474        }
475    
476        protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
477            Endpoint endpoint = null;
478    
479            if (isUseEndpointCache()) {
480                synchronized (endpointCache) {
481                    endpoint = endpointCache.get(endpointUri);
482                    if (endpoint == null) {
483                        endpoint = context.getEndpoint(endpointUri);
484                        if (endpoint != null) {
485                            endpointCache.put(endpointUri, endpoint);
486                        }
487                    }
488                }
489            } else {
490                endpoint = context.getEndpoint(endpointUri);
491            }
492            if (endpoint == null) {
493                throw new NoSuchEndpointException(endpointUri);
494            }
495            return endpoint;
496        }
497    
498        protected Endpoint<E> getMandatoryDefaultEndpoint() {
499            Endpoint<E> answer = getDefaultEndpoint();
500            ObjectHelper.notNull(answer, "defaultEndpoint");
501            return answer;
502        }
503    
504        protected void doStart() throws Exception {
505            producerCache.start();
506        }
507    
508        protected void doStop() throws Exception {
509            producerCache.stop();
510        }
511    
512        protected Object extractResultBody(E result) {
513            Object answer = null;
514            if (result != null) {
515                Message out = result.getOut(false);
516                if (out != null) {
517                    answer = out.getBody();
518                } else {
519                    answer = result.getIn().getBody();
520                }
521            }
522            return answer;
523        }
524    }