001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.Collection;
020    
021    import javax.xml.bind.annotation.XmlAccessType;
022    import javax.xml.bind.annotation.XmlAccessorType;
023    import javax.xml.bind.annotation.XmlAttribute;
024    import javax.xml.bind.annotation.XmlElement;
025    import javax.xml.bind.annotation.XmlRootElement;
026    import javax.xml.bind.annotation.XmlTransient;
027    
028    import org.apache.camel.Endpoint;
029    import org.apache.camel.Exchange;
030    import org.apache.camel.Expression;
031    import org.apache.camel.Predicate;
032    import org.apache.camel.Processor;
033    import org.apache.camel.Route;
034    import org.apache.camel.builder.ExpressionClause;
035    import org.apache.camel.impl.RouteContext;
036    import org.apache.camel.model.language.ExpressionType;
037    import org.apache.camel.processor.Aggregator;
038    import org.apache.camel.processor.aggregate.AggregationCollection;
039    import org.apache.camel.processor.aggregate.AggregationStrategy;
040    import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
041    
042    /**
043     * @version $Revision: 36321 $
044     */
045    @XmlRootElement(name = "aggregator")
046    @XmlAccessorType(XmlAccessType.FIELD)
047    public class AggregatorType extends ExpressionNode {
048        @XmlTransient
049        private AggregationStrategy aggregationStrategy;
050        @XmlTransient
051        private AggregationCollection aggregationCollection;
052        @XmlAttribute(required = false)
053        private Integer batchSize;
054        @XmlAttribute(required = false)
055        private Long batchTimeout;
056        @XmlAttribute(required = false)
057        private String strategyRef;
058        @XmlElement(name = "completedPredicate", required = false)
059        private CompletedPredicate completedPredicate;
060    
061        public AggregatorType() {
062        }
063    
064        public AggregatorType(Expression correlationExpression) {
065            super(correlationExpression);
066        }
067    
068        public AggregatorType(ExpressionType correlationExpression) {
069            super(correlationExpression);
070        }
071    
072        public AggregatorType(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
073            super(correlationExpression);
074            this.aggregationStrategy = aggregationStrategy;
075        }
076    
077        @Override
078        public String toString() {
079            return "Aggregator[ " + getExpression() + " -> " + getOutputs() + "]";
080        }
081    
082        @SuppressWarnings("unchecked")
083        @Override
084        public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
085            Endpoint from = routeContext.getEndpoint();
086            final Processor processor = routeContext.createProcessor(this);
087    
088            final Aggregator service;
089            if (aggregationCollection != null) {
090                service = new Aggregator(from, processor, aggregationCollection);
091            } else {
092                AggregationStrategy strategy = getAggregationStrategy();
093                if (strategy == null && strategyRef != null) {
094                    strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
095                }
096                if (strategy == null) {
097                    strategy = new UseLatestAggregationStrategy();
098                }
099                Expression aggregateExpression = getExpression().createExpression(routeContext);
100    
101                Predicate predicate = null;
102                if (completedPredicate != null) {
103                    predicate = completedPredicate.createPredicate(routeContext);
104                }
105                if (predicate != null) {
106                    service = new Aggregator(from, processor, aggregateExpression, strategy, predicate);
107                } else {
108                    service = new Aggregator(from, processor, aggregateExpression, strategy);
109                }
110            }
111    
112            if (batchSize != null) {
113                service.setBatchSize(batchSize);
114            }
115            if (batchTimeout != null) {
116                service.setBatchTimeout(batchTimeout);
117            }
118    
119            Route route = new Route<Exchange>(from, service) {
120                @Override
121                public String toString() {
122                    return "AggregatorRoute[" + getEndpoint() + " -> " + processor + "]";
123                }
124            };
125    
126            routes.add(route);
127        }
128    
129        public AggregationCollection getAggregationCollection() {
130            return aggregationCollection;
131        }
132    
133        public void setAggregationCollection(AggregationCollection aggregationCollection) {
134            this.aggregationCollection = aggregationCollection;
135        }
136    
137        public AggregationStrategy getAggregationStrategy() {
138            return aggregationStrategy;
139        }
140    
141        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
142            this.aggregationStrategy = aggregationStrategy;
143        }
144    
145        public Integer getBatchSize() {
146            return batchSize;
147        }
148    
149        public void setBatchSize(Integer batchSize) {
150            this.batchSize = batchSize;
151        }
152    
153        public Long getBatchTimeout() {
154            return batchTimeout;
155        }
156    
157        public void setBatchTimeout(Long batchTimeout) {
158            this.batchTimeout = batchTimeout;
159        }
160    
161        public String getStrategyRef() {
162            return strategyRef;
163        }
164    
165        public void setStrategyRef(String strategyRef) {
166            this.strategyRef = strategyRef;
167        }
168    
169        public CompletedPredicate getCompletePredicate() {
170            return completedPredicate;
171        }
172    
173        public void setCompletePredicate(CompletedPredicate completedPredicate) {
174            this.completedPredicate = completedPredicate;
175        }
176    
177        // Fluent API
178        //-------------------------------------------------------------------------
179        public AggregatorType batchSize(int batchSize) {
180            setBatchSize(batchSize);
181            return this;
182        }
183    
184        public AggregatorType batchTimeout(long batchTimeout) {
185            setBatchTimeout(batchTimeout);
186            return this;
187        }
188    
189        /**
190         * Sets the predicate used to determine if the aggregation is completed
191         *
192         * @return the clause used to create the predicate
193         */
194        public ExpressionClause<AggregatorType> completedPredicate() {
195            checkNoCompletedPredicate();
196            ExpressionClause<AggregatorType> clause = new ExpressionClause<AggregatorType>(this);
197            completedPredicate = new CompletedPredicate(clause);
198            return clause;
199        }
200    
201        /**
202         * Sets the predicate used to determine if the aggregation is completed
203         */
204        public AggregatorType completedPredicate(Predicate predicate) {
205            checkNoCompletedPredicate();
206            completedPredicate = new CompletedPredicate(predicate);
207            return this;
208        }
209    
210        protected void checkNoCompletedPredicate() {
211            if (completedPredicate != null) {
212                throw new IllegalArgumentException("There already is a completedPredicate defined for this aggregator: " + this);
213            }
214        }
215    }