001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.Collection;
020    
021    import javax.xml.bind.annotation.XmlAccessType;
022    import javax.xml.bind.annotation.XmlAccessorType;
023    import javax.xml.bind.annotation.XmlAttribute;
024    import javax.xml.bind.annotation.XmlElement;
025    import javax.xml.bind.annotation.XmlRootElement;
026    import javax.xml.bind.annotation.XmlTransient;
027    
028    import org.apache.camel.Endpoint;
029    import org.apache.camel.Exchange;
030    import org.apache.camel.Expression;
031    import org.apache.camel.Predicate;
032    import org.apache.camel.Processor;
033    import org.apache.camel.Route;
034    import org.apache.camel.builder.ExpressionClause;
035    import org.apache.camel.model.language.ExpressionType;
036    import org.apache.camel.processor.Aggregator;
037    import org.apache.camel.processor.aggregate.AggregationCollection;
038    import org.apache.camel.processor.aggregate.AggregationStrategy;
039    import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
040    import org.apache.camel.spi.RouteContext;
041    
042    /**
043     * Represents an XML <aggregator/> element
044     *
045     * @version $Revision: 41908 $
046     */
047    @XmlRootElement(name = "aggregator")
048    @XmlAccessorType(XmlAccessType.FIELD)
049    public class AggregatorType extends ExpressionNode {
050        @XmlTransient
051        private AggregationStrategy aggregationStrategy;
052        @XmlTransient
053        private AggregationCollection aggregationCollection;
054        @XmlAttribute(required = false)
055        private Integer batchSize;
056        @XmlAttribute(required = false)
057        private Long batchTimeout;
058        @XmlAttribute(required = false)
059        private String strategyRef;
060        @XmlElement(name = "completedPredicate", required = false)
061        private CompletedPredicate completedPredicate;
062    
063        public AggregatorType() {
064        }
065    
066        public AggregatorType(Expression correlationExpression) {
067            super(correlationExpression);
068        }
069    
070        public AggregatorType(ExpressionType correlationExpression) {
071            super(correlationExpression);
072        }
073    
074        public AggregatorType(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
075            super(correlationExpression);
076            this.aggregationStrategy = aggregationStrategy;
077        }
078    
079        @Override
080        public String toString() {
081            return "Aggregator[ " + getExpression() + " -> " + getOutputs() + "]";
082        }
083    
084        @SuppressWarnings("unchecked")
085        @Override
086        public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
087            Endpoint from = routeContext.getEndpoint();
088            final Processor processor = routeContext.createProcessor(this);
089    
090            final Aggregator service;
091            if (aggregationCollection != null) {
092                service = new Aggregator(from, processor, aggregationCollection);
093            } else {
094                AggregationStrategy strategy = getAggregationStrategy();
095                if (strategy == null && strategyRef != null) {
096                    strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
097                }
098                if (strategy == null) {
099                    strategy = new UseLatestAggregationStrategy();
100                }
101                Expression aggregateExpression = getExpression().createExpression(routeContext);
102    
103                Predicate predicate = null;
104                if (completedPredicate != null) {
105                    predicate = completedPredicate.createPredicate(routeContext);
106                }
107                if (predicate != null) {
108                    service = new Aggregator(from, processor, aggregateExpression, strategy, predicate);
109                } else {
110                    service = new Aggregator(from, processor, aggregateExpression, strategy);
111                }
112            }
113    
114            if (batchSize != null) {
115                service.setBatchSize(batchSize);
116            }
117            if (batchTimeout != null) {
118                service.setBatchTimeout(batchTimeout);
119            }
120    
121            Route route = new Route<Exchange>(from, service) {
122                @Override
123                public String toString() {
124                    return "AggregatorRoute[" + getEndpoint() + " -> " + processor + "]";
125                }
126            };
127    
128            routes.add(route);
129        }
130    
131        public AggregationCollection getAggregationCollection() {
132            return aggregationCollection;
133        }
134    
135        public void setAggregationCollection(AggregationCollection aggregationCollection) {
136            this.aggregationCollection = aggregationCollection;
137        }
138    
139        public AggregationStrategy getAggregationStrategy() {
140            return aggregationStrategy;
141        }
142    
143        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
144            this.aggregationStrategy = aggregationStrategy;
145        }
146    
147        public Integer getBatchSize() {
148            return batchSize;
149        }
150    
151        public void setBatchSize(Integer batchSize) {
152            this.batchSize = batchSize;
153        }
154    
155        public Long getBatchTimeout() {
156            return batchTimeout;
157        }
158    
159        public void setBatchTimeout(Long batchTimeout) {
160            this.batchTimeout = batchTimeout;
161        }
162    
163        public String getStrategyRef() {
164            return strategyRef;
165        }
166    
167        public void setStrategyRef(String strategyRef) {
168            this.strategyRef = strategyRef;
169        }
170    
171        public CompletedPredicate getCompletePredicate() {
172            return completedPredicate;
173        }
174    
175        public void setCompletePredicate(CompletedPredicate completedPredicate) {
176            this.completedPredicate = completedPredicate;
177        }
178    
179        // Fluent API
180        //-------------------------------------------------------------------------
181        public AggregatorType batchSize(int batchSize) {
182            setBatchSize(batchSize);
183            return this;
184        }
185    
186        public AggregatorType batchTimeout(long batchTimeout) {
187            setBatchTimeout(batchTimeout);
188            return this;
189        }
190    
191        /**
192         * Sets the predicate used to determine if the aggregation is completed
193         *
194         * @return the clause used to create the predicate
195         */
196        public ExpressionClause<AggregatorType> completedPredicate() {
197            checkNoCompletedPredicate();
198            ExpressionClause<AggregatorType> clause = new ExpressionClause<AggregatorType>(this);
199            completedPredicate = new CompletedPredicate(clause);
200            return clause;
201        }
202    
203        /**
204         * Sets the predicate used to determine if the aggregation is completed
205         */
206        public AggregatorType completedPredicate(Predicate predicate) {
207            checkNoCompletedPredicate();
208            completedPredicate = new CompletedPredicate(predicate);
209            return this;
210        }
211    
212        protected void checkNoCompletedPredicate() {
213            if (completedPredicate != null) {
214                throw new IllegalArgumentException("There already is a completedPredicate defined for this aggregator: " + this);
215            }
216        }
217    }