001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl.converter;
018    
019    import org.apache.camel.AsyncCallback;
020    import org.apache.camel.AsyncProcessor;
021    import org.apache.camel.Exchange;
022    import org.apache.camel.Processor;
023    import org.apache.camel.TypeConverter;
024    import org.apache.camel.processor.DelegateProcessor;
025    
026    /**
027     * A simple converter that can convert any {@link Processor} to an {@link AsyncProcessor}.
028     * Processing will still occur synchronously but it will provide the required
029     * notifications that the caller expects.
030     *
031     * @version $Revision: 1092 $
032     */
033    public class AsyncProcessorTypeConverter implements TypeConverter {
034    
035        private static final class ProcessorToAsyncProcessorBridge extends DelegateProcessor implements AsyncProcessor {
036    
037            private ProcessorToAsyncProcessorBridge(Processor processor) {
038                super(processor);
039            }
040    
041            public boolean process(Exchange exchange, AsyncCallback callback) {
042                try {
043                    processor.process(exchange);
044                } catch (Throwable e) {
045                    exchange.setException(e);
046                }
047                // false means processing of the exchange asynchronously,
048                callback.done(true);
049                return true;
050            }
051        }
052    
053        public <T> T convertTo(Class<T> toType, Object value) {
054            if (value != null) {
055                if (toType.equals(AsyncProcessor.class)) {
056                    if (value instanceof AsyncProcessor) {
057                        return toType.cast(value);
058                    } else if (value instanceof Processor) {
059                        // Provide an async bridge to the regular processor.
060                        final Processor processor = (Processor)value;
061                        return toType.cast(new ProcessorToAsyncProcessorBridge(processor));
062                    }
063                }
064            }
065            return null;
066        }
067    
068        public <T> T convertTo(Class<T> toType, Exchange exchange, Object value) {
069            return convertTo(toType, value);
070        }
071    
072        public static AsyncProcessor convert(Processor value) {
073            if (value instanceof AsyncProcessor) {
074                return (AsyncProcessor)value;
075            }
076            return new ProcessorToAsyncProcessorBridge(value);
077        }
078    }