001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.lang.reflect.Method;
020    
021    import javax.xml.bind.annotation.XmlTransient;
022    
023    import org.apache.camel.CamelContext;
024    import org.apache.camel.CamelContextAware;
025    import org.apache.camel.Consume;
026    import org.apache.camel.Consumer;
027    import org.apache.camel.Endpoint;
028    import org.apache.camel.MessageDriven;
029    import org.apache.camel.PollingConsumer;
030    import org.apache.camel.Processor;
031    import org.apache.camel.Producer;
032    import org.apache.camel.ProducerTemplate;
033    import org.apache.camel.Service;
034    import org.apache.camel.component.bean.BeanProcessor;
035    import org.apache.camel.component.bean.ProxyHelper;
036    import org.apache.camel.util.CamelContextHelper;
037    import org.apache.commons.logging.Log;
038    import org.apache.commons.logging.LogFactory;
039    
040    /**
041     * A helper class for Camel based injector or post processing hooks which can be reused by
042     * both the <a href="http://activemq.apache.org/camel/spring.html">Spring</a>
043     * and <a href="http://activemq.apache.org/camel/guice.html">Guice</a> support.
044     *
045     * @version $Revision: 63996 $
046     */
047    public class CamelPostProcessorHelper implements CamelContextAware {
048        private static final transient Log LOG = LogFactory.getLog(CamelPostProcessorHelper.class);
049    
050        @XmlTransient
051        private CamelContext camelContext;
052    
053        public CamelPostProcessorHelper() {
054        }
055    
056        public CamelPostProcessorHelper(CamelContext camelContext) {
057            this.setCamelContext(camelContext);
058        }
059    
060        public CamelContext getCamelContext() {
061            return camelContext;
062        }
063    
064        public void setCamelContext(CamelContext camelContext) {
065            this.camelContext = camelContext;
066        }
067    
068        public void consumerInjection(Method method, Object bean) {
069            MessageDriven annotation = method.getAnnotation(MessageDriven.class);
070            if (annotation != null) {
071                LOG.info("Creating a consumer for: " + annotation);
072                subscribeMethod(method, bean, annotation.uri(), annotation.name());
073            }
074    
075            Consume consume = method.getAnnotation(Consume.class);
076            if (consume != null) {
077                LOG.info("Creating a consumer for: " + consume);
078                subscribeMethod(method, bean, consume.uri(), consume.ref());
079            }
080        }
081    
082        protected void subscribeMethod(Method method, Object bean, String endpointUri, String endpointName) {
083            // lets bind this method to a listener
084            String injectionPointName = method.getName();
085            Endpoint endpoint = getEndpointInjection(endpointUri, endpointName, injectionPointName, true);
086            if (endpoint != null) {
087                try {
088                    Processor processor = createConsumerProcessor(bean, method, endpoint);
089                    LOG.info("Created processor: " + processor);
090                    Consumer consumer = endpoint.createConsumer(processor);
091                    startService(consumer);
092                } catch (Exception e) {
093                    LOG.warn(e);
094                    throw org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(e);
095                }
096            }
097        }
098    
099        public void startService(Service service) throws Exception {
100            CamelContext camelContext = getCamelContext();
101            if (camelContext instanceof DefaultCamelContext) {
102                DefaultCamelContext defaultCamelContext = (DefaultCamelContext) camelContext;
103                defaultCamelContext.addService(service);
104            } else {
105                service.start();
106            }
107        }
108    
109        /**
110         * Create a processor which invokes the given method when an incoming
111         * message exchange is received
112         */
113        protected Processor createConsumerProcessor(final Object pojo, final Method method, final Endpoint endpoint) {
114            BeanProcessor answer = new BeanProcessor(pojo, getCamelContext());
115            answer.setMethodObject(method);
116            return answer;
117        }
118    
119        protected Endpoint getEndpointInjection(String uri, String name, String injectionPointName, boolean mandatory) {
120            return CamelContextHelper.getEndpointInjection(getCamelContext(), uri, name, injectionPointName, mandatory);
121        }
122    
123        /**
124         * Creates the object to be injected for an {@link org.apache.camel.EndpointInject} or {@link org.apache.camel.Produce} injection point
125         */
126        public Object getInjectionValue(Class<?> type, String endpointUri, String endpointRef, String injectionPointName) {
127            if (type.isAssignableFrom(ProducerTemplate.class)) {
128                // endpoint is optional for this injection point
129                Endpoint endpoint = getEndpointInjection(endpointUri, endpointRef, injectionPointName, false);
130                return new DefaultProducerTemplate(getCamelContext(), endpoint);
131            } else {
132                Endpoint endpoint = getEndpointInjection(endpointUri, endpointRef, injectionPointName, true);
133                if (endpoint != null) {
134                    if (type.isInstance(endpoint)) {
135                        return endpoint;
136                    } else if (type.isAssignableFrom(Producer.class)) {
137                        return createInjectionProducer(endpoint);
138                    } else if (type.isAssignableFrom(PollingConsumer.class)) {
139                        return createInjectionPollingConsumer(endpoint);
140                    } else if (type.isInterface()) {
141                        // lets create a proxy
142                        try {
143                            return ProxyHelper.createProxy(endpoint, type);
144                        } catch (Exception e) {
145                            throw createProxyInstantiationRuntimeException(type, endpoint, e);
146                        }
147                    } else {
148                        throw new IllegalArgumentException("Invalid type: " + type.getName() + " which cannot be injected via @EndpointInject for " + endpoint);
149                    }
150                }
151                return null;
152            }
153        }
154    
155        protected RuntimeException createProxyInstantiationRuntimeException(Class<?> type, Endpoint endpoint, Exception e) {
156            return new ProxyInstantiationException(type, endpoint, e);
157        }
158    
159        /**
160         * Factory method to create a started {@link org.apache.camel.PollingConsumer} to be injected
161         * into a POJO
162         */
163        protected PollingConsumer createInjectionPollingConsumer(Endpoint endpoint) {
164            try {
165                PollingConsumer pollingConsumer = endpoint.createPollingConsumer();
166                startService(pollingConsumer);
167                return pollingConsumer;
168            } catch (Exception e) {
169                throw org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(e);
170            }
171        }
172    
173        /**
174         * A Factory method to create a started {@link org.apache.camel.Producer} to be injected into
175         * a POJO
176         */
177        protected Producer createInjectionProducer(Endpoint endpoint) {
178            try {
179                Producer producer = endpoint.createProducer();
180                startService(producer);
181                return producer;
182            } catch (Exception e) {
183                throw org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(e);
184            }
185        }
186    }