001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    
020    import java.util.ArrayList;
021    import java.util.Collection;
022    import java.util.Collections;
023    import java.util.Iterator;
024    import java.util.List;
025    import java.util.concurrent.ThreadPoolExecutor;
026    
027    import org.apache.camel.Exchange;
028    import org.apache.camel.Expression;
029    import org.apache.camel.Message;
030    import org.apache.camel.Processor;
031    import org.apache.camel.processor.aggregate.AggregationStrategy;
032    import org.apache.camel.util.CollectionHelper;
033    import org.apache.camel.util.ObjectHelper;
034    
035    import static org.apache.camel.util.ObjectHelper.notNull;
036    
037    /**
038     * Implements a dynamic <a
039     * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> pattern
040     * where an expression is evaluated to iterate through each of the parts of a
041     * message and then each part is then send to some endpoint.
042     *
043     * @version $Revision: 1235 $
044     */
045    public class Splitter extends MulticastProcessor implements Processor {
046        public static final String SPLIT_SIZE = "org.apache.camel.splitSize";
047        public static final String SPLIT_COUNTER = "org.apache.camel.splitCounter";
048    
049        private final Expression expression;
050    
051        public Splitter(Expression expression, Processor destination, AggregationStrategy aggregationStrategy) {
052            this(expression, destination, aggregationStrategy, false, null, false);
053        }
054    
055        public Splitter(Expression expression, Processor destination,
056                AggregationStrategy aggregationStrategy,
057                boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor, boolean streaming) {
058            super(Collections.singleton(destination), aggregationStrategy, parallelProcessing, threadPoolExecutor, streaming);
059    
060            this.expression = expression;
061            notNull(expression, "expression");
062            notNull(destination, "destination");
063        }
064    
065        @Override
066        public String toString() {
067            return "Splitter[on: " + expression + " to: " + getProcessors().iterator().next() + " aggregate: " + getAggregationStrategy() + "]";
068        }
069    
070        @Override
071        protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) {
072            Object value = expression.evaluate(exchange);
073    
074            if (isStreaming()) {
075                return createProcessorExchangePairsIterable(exchange, value);
076            } else {
077                return createProcessorExchangePairsList(exchange, value);
078            }
079        }
080    
081        private Iterable<ProcessorExchangePair> createProcessorExchangePairsIterable(final Exchange exchange, Object value) {
082            final Iterator iterator = ObjectHelper.createIterator(value);
083            return new Iterable() {
084    
085                public Iterator iterator() {
086                    return new Iterator() {
087    
088                        public boolean hasNext() {
089                            return iterator.hasNext();
090                        }
091    
092                        public Object next() {
093                            Object part = iterator.next();
094                            Exchange newExchange = exchange.copy();
095                            Message in = newExchange.getIn();
096                            in.setBody(part);
097                            return new ProcessorExchangePair(getProcessors().iterator().next(), newExchange);
098                        }
099    
100                        public void remove() {
101                            throw new UnsupportedOperationException("remove is not supported by this iterator");
102                        }
103                    };
104                }
105    
106            };
107        }
108    
109        private Iterable<ProcessorExchangePair> createProcessorExchangePairsList(Exchange exchange, Object value) {
110            List<ProcessorExchangePair> result;
111            Integer collectionSize = CollectionHelper.size(value);
112            if (collectionSize != null) {
113                result = new ArrayList<ProcessorExchangePair>(collectionSize);
114            } else {
115                result = new ArrayList<ProcessorExchangePair>();
116            }
117            Iterator<Object> iter = ObjectHelper.createIterator(value);
118            while (iter.hasNext()) {
119                Object part = iter.next();
120                Exchange newExchange = exchange.copy();
121                Message in = newExchange.getIn();
122                in.setBody(part);
123                result.add(new ProcessorExchangePair(getProcessors().iterator().next(), newExchange));
124            }
125            return result;
126        }
127    
128        @Override
129        protected void updateNewExchange(Exchange exchange, int i, Iterable<ProcessorExchangePair> allPairs) {
130            exchange.getIn().setHeader(SPLIT_COUNTER, i);
131            if (allPairs instanceof Collection) {
132                exchange.getIn().setHeader(SPLIT_SIZE, ((Collection) allPairs).size());
133            }
134        }
135    }