001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    
020    import javax.xml.bind.annotation.XmlAccessType;
021    import javax.xml.bind.annotation.XmlAccessorType;
022    import javax.xml.bind.annotation.XmlAttribute;
023    import javax.xml.bind.annotation.XmlElement;
024    import javax.xml.bind.annotation.XmlRootElement;
025    import javax.xml.bind.annotation.XmlTransient;
026    
027    import org.apache.camel.Expression;
028    import org.apache.camel.Predicate;
029    import org.apache.camel.Processor;
030    import org.apache.camel.builder.ExpressionClause;
031    import org.apache.camel.model.language.ExpressionType;
032    import org.apache.camel.processor.Aggregator;
033    import org.apache.camel.processor.aggregate.AggregationCollection;
034    import org.apache.camel.processor.aggregate.AggregationStrategy;
035    import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
036    import org.apache.camel.spi.RouteContext;
037    
038    /**
039     * Represents an XML <aggregator/> element
040     *
041     * @version $Revision: 64127 $
042     */
043    @XmlRootElement(name = "aggregator")
044    @XmlAccessorType(XmlAccessType.FIELD)
045    public class AggregatorType extends ExpressionNode {
046        @XmlTransient
047        private AggregationStrategy aggregationStrategy;
048        @XmlTransient
049        private AggregationCollection aggregationCollection;
050        @XmlAttribute(required = false)
051        private Integer batchSize;
052        @XmlAttribute(required = false)
053        private Integer outBatchSize;
054        @XmlAttribute(required = false)
055        private Long batchTimeout;
056        @XmlAttribute(required = false)
057        private String strategyRef;
058        @XmlAttribute(required = false)
059        private String collectionRef;    
060        @XmlElement(name = "completedPredicate", required = false)
061        private ExpressionSubElementType completedPredicate;
062    
063        public AggregatorType() {
064        }
065    
066        public AggregatorType(Expression correlationExpression) {
067            super(correlationExpression);
068        }
069    
070        public AggregatorType(ExpressionType correlationExpression) {
071            super(correlationExpression);
072        }
073    
074        public AggregatorType(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
075            super(correlationExpression);
076            this.aggregationStrategy = aggregationStrategy;
077        }
078    
079        @Override
080        public String toString() {
081            return "Aggregator[" + getExpression() + " -> " + getOutputs() + "]";
082        }
083    
084        @Override
085        public String getShortName() {
086            return "aggregator";
087        }
088        @Override
089        public Processor createProcessor(RouteContext routeContext) throws Exception {
090            return createAggregator(routeContext);
091        }
092    
093        protected Aggregator createAggregator(RouteContext routeContext) throws Exception {
094            final Processor processor = routeContext.createProcessor(this);
095    
096            final Aggregator aggregator;
097            if (getAggregationCollection() == null) {
098                setAggregationCollection(createAggregationCollection(routeContext));
099            }
100            
101            if (aggregationCollection != null) {
102                // create the aggregator using the collection
103                // pre configure the collection if its expression and strategy is not set, then
104                // use the ones that is pre configured with this type
105                if (aggregationCollection.getCorrelationExpression() == null) {
106                    aggregationCollection.setCorrelationExpression(getExpression());
107                }
108                if (aggregationCollection.getAggregationStrategy() == null) {
109                    AggregationStrategy strategy = createAggregationStrategy(routeContext);
110                    aggregationCollection.setAggregationStrategy(strategy);
111                }
112                aggregator = new Aggregator(processor, aggregationCollection);
113            } else {
114                // create the aggregator using a default collection
115                AggregationStrategy strategy = createAggregationStrategy(routeContext);
116    
117                Expression aggregateExpression = getExpression().createExpression(routeContext);
118    
119                Predicate predicate = null;
120                if (getCompletedPredicate() != null) {
121                    predicate = getCompletedPredicate().createPredicate(routeContext);
122                }
123                if (predicate != null) {
124                    aggregator = new Aggregator(processor, aggregateExpression, strategy, predicate);
125                } else {
126                    aggregator = new Aggregator(processor, aggregateExpression, strategy);
127                }
128            }
129            
130            if (batchSize != null) {
131                aggregator.setBatchSize(batchSize);
132            }
133            
134            if (batchTimeout != null) {
135                aggregator.setBatchTimeout(batchTimeout);
136            }
137    
138            if (outBatchSize != null) {
139                aggregator.setOutBatchSize(outBatchSize);
140            }
141            
142            return aggregator;
143        }
144    
145        private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
146            AggregationStrategy strategy = getAggregationStrategy();
147            if (strategy == null && strategyRef != null) {
148                strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
149            }
150            if (strategy == null) {
151                // fallback to use latest
152                strategy = new UseLatestAggregationStrategy();
153            }
154            return strategy;
155        }
156    
157        private AggregationCollection createAggregationCollection(RouteContext routeContext) {
158            AggregationCollection collection = getAggregationCollection();
159            if (collection == null && collectionRef != null) {
160                collection = routeContext.lookup(collectionRef, AggregationCollection.class);
161            }
162            return collection;
163        }    
164        
165        public AggregationCollection getAggregationCollection() {
166            return aggregationCollection;
167        }
168    
169        public void setAggregationCollection(AggregationCollection aggregationCollection) {
170            this.aggregationCollection = aggregationCollection;
171        }
172    
173        public AggregationStrategy getAggregationStrategy() {
174            return aggregationStrategy;
175        }
176    
177        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
178            this.aggregationStrategy = aggregationStrategy;
179        }
180    
181        public Integer getBatchSize() {
182            return batchSize;
183        }
184    
185        public void setBatchSize(Integer batchSize) {
186            this.batchSize = batchSize;
187        }
188    
189        public Integer getOutBatchSize() {
190            return outBatchSize;
191        }
192    
193        public void setOutBatchSize(Integer outBatchSize) {
194            this.outBatchSize = outBatchSize;
195        }
196    
197        public Long getBatchTimeout() {
198            return batchTimeout;
199        }
200    
201        public void setBatchTimeout(Long batchTimeout) {
202            this.batchTimeout = batchTimeout;
203        }
204    
205        public String getStrategyRef() {
206            return strategyRef;
207        }
208    
209        public void setStrategyRef(String strategyRef) {
210            this.strategyRef = strategyRef;
211        }
212    
213        public void setCompletedPredicate(ExpressionSubElementType completedPredicate) {
214            this.completedPredicate = completedPredicate;
215        }
216    
217        public ExpressionSubElementType getCompletedPredicate() {
218            return completedPredicate;
219        }
220    
221        // Fluent API
222        //-------------------------------------------------------------------------
223        public AggregatorType batchSize(int batchSize) {
224            setBatchSize(batchSize);
225            return this;
226        }
227    
228        public AggregatorType outBatchSize(int batchSize) {
229            setOutBatchSize(batchSize);
230            return this;
231        }
232    
233        public AggregatorType batchTimeout(long batchTimeout) {
234            setBatchTimeout(batchTimeout);
235            return this;
236        }
237    
238        public AggregatorType aggregationCollection(AggregationCollection aggregationCollection) {
239            setAggregationCollection(aggregationCollection);
240            return this;
241        }
242    
243        public AggregatorType aggregationStrategy(AggregationStrategy aggregationStrategy) {
244            setAggregationStrategy(aggregationStrategy);
245            return this;
246        }
247    
248        public AggregatorType strategyRef(String strategyRef) {
249            setStrategyRef(strategyRef);
250            return this;
251        }
252    
253        /**
254         * Sets the predicate used to determine if the aggregation is completed
255         *
256         * @return the clause used to create the predicate
257         */
258        public ExpressionClause<AggregatorType> completedPredicate() {
259            checkNoCompletedPredicate();
260            ExpressionClause<AggregatorType> clause = new ExpressionClause<AggregatorType>(this);
261            setCompletedPredicate(new ExpressionSubElementType((Expression)clause));
262            return clause;
263        }
264    
265        /**
266         * Sets the predicate used to determine if the aggregation is completed
267         */
268        public AggregatorType completedPredicate(Predicate predicate) {
269            checkNoCompletedPredicate();
270            setCompletedPredicate(new ExpressionSubElementType(predicate));
271            return this;
272        }
273    
274        protected void checkNoCompletedPredicate() {
275            if (getCompletedPredicate() != null) {
276                throw new IllegalArgumentException("There is already a completedPredicate defined for this aggregator: " + this);
277            }
278        }
279    }