001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.Collection; 020 021 import javax.xml.bind.annotation.XmlAccessType; 022 import javax.xml.bind.annotation.XmlAccessorType; 023 import javax.xml.bind.annotation.XmlAttribute; 024 import javax.xml.bind.annotation.XmlElement; 025 import javax.xml.bind.annotation.XmlRootElement; 026 import javax.xml.bind.annotation.XmlTransient; 027 028 import org.apache.camel.Endpoint; 029 import org.apache.camel.Exchange; 030 import org.apache.camel.Expression; 031 import org.apache.camel.Predicate; 032 import org.apache.camel.Processor; 033 import org.apache.camel.Route; 034 import org.apache.camel.builder.ExpressionClause; 035 import org.apache.camel.model.language.ExpressionType; 036 import org.apache.camel.processor.Aggregator; 037 import org.apache.camel.processor.aggregate.AggregationCollection; 038 import org.apache.camel.processor.aggregate.AggregationStrategy; 039 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; 040 import org.apache.camel.spi.RouteContext; 041 042 /** 043 * Represents an XML <aggregator/> element 044 * 045 * @version $Revision: 41908 $ 046 */ 047 @XmlRootElement(name = "aggregator") 048 @XmlAccessorType(XmlAccessType.FIELD) 049 public class AggregatorType extends ExpressionNode { 050 @XmlTransient 051 private AggregationStrategy aggregationStrategy; 052 @XmlTransient 053 private AggregationCollection aggregationCollection; 054 @XmlAttribute(required = false) 055 private Integer batchSize; 056 @XmlAttribute(required = false) 057 private Long batchTimeout; 058 @XmlAttribute(required = false) 059 private String strategyRef; 060 @XmlElement(name = "completedPredicate", required = false) 061 private CompletedPredicate completedPredicate; 062 063 public AggregatorType() { 064 } 065 066 public AggregatorType(Expression correlationExpression) { 067 super(correlationExpression); 068 } 069 070 public AggregatorType(ExpressionType correlationExpression) { 071 super(correlationExpression); 072 } 073 074 public AggregatorType(Expression correlationExpression, AggregationStrategy aggregationStrategy) { 075 super(correlationExpression); 076 this.aggregationStrategy = aggregationStrategy; 077 } 078 079 @Override 080 public String toString() { 081 return "Aggregator[ " + getExpression() + " -> " + getOutputs() + "]"; 082 } 083 084 @SuppressWarnings("unchecked") 085 @Override 086 public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception { 087 Endpoint from = routeContext.getEndpoint(); 088 final Processor processor = routeContext.createProcessor(this); 089 090 final Aggregator service; 091 if (aggregationCollection != null) { 092 service = new Aggregator(from, processor, aggregationCollection); 093 } else { 094 AggregationStrategy strategy = getAggregationStrategy(); 095 if (strategy == null && strategyRef != null) { 096 strategy = routeContext.lookup(strategyRef, AggregationStrategy.class); 097 } 098 if (strategy == null) { 099 strategy = new UseLatestAggregationStrategy(); 100 } 101 Expression aggregateExpression = getExpression().createExpression(routeContext); 102 103 Predicate predicate = null; 104 if (completedPredicate != null) { 105 predicate = completedPredicate.createPredicate(routeContext); 106 } 107 if (predicate != null) { 108 service = new Aggregator(from, processor, aggregateExpression, strategy, predicate); 109 } else { 110 service = new Aggregator(from, processor, aggregateExpression, strategy); 111 } 112 } 113 114 if (batchSize != null) { 115 service.setBatchSize(batchSize); 116 } 117 if (batchTimeout != null) { 118 service.setBatchTimeout(batchTimeout); 119 } 120 121 Route route = new Route<Exchange>(from, service) { 122 @Override 123 public String toString() { 124 return "AggregatorRoute[" + getEndpoint() + " -> " + processor + "]"; 125 } 126 }; 127 128 routes.add(route); 129 } 130 131 public AggregationCollection getAggregationCollection() { 132 return aggregationCollection; 133 } 134 135 public void setAggregationCollection(AggregationCollection aggregationCollection) { 136 this.aggregationCollection = aggregationCollection; 137 } 138 139 public AggregationStrategy getAggregationStrategy() { 140 return aggregationStrategy; 141 } 142 143 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 144 this.aggregationStrategy = aggregationStrategy; 145 } 146 147 public Integer getBatchSize() { 148 return batchSize; 149 } 150 151 public void setBatchSize(Integer batchSize) { 152 this.batchSize = batchSize; 153 } 154 155 public Long getBatchTimeout() { 156 return batchTimeout; 157 } 158 159 public void setBatchTimeout(Long batchTimeout) { 160 this.batchTimeout = batchTimeout; 161 } 162 163 public String getStrategyRef() { 164 return strategyRef; 165 } 166 167 public void setStrategyRef(String strategyRef) { 168 this.strategyRef = strategyRef; 169 } 170 171 public CompletedPredicate getCompletePredicate() { 172 return completedPredicate; 173 } 174 175 public void setCompletePredicate(CompletedPredicate completedPredicate) { 176 this.completedPredicate = completedPredicate; 177 } 178 179 // Fluent API 180 //------------------------------------------------------------------------- 181 public AggregatorType batchSize(int batchSize) { 182 setBatchSize(batchSize); 183 return this; 184 } 185 186 public AggregatorType batchTimeout(long batchTimeout) { 187 setBatchTimeout(batchTimeout); 188 return this; 189 } 190 191 /** 192 * Sets the predicate used to determine if the aggregation is completed 193 * 194 * @return the clause used to create the predicate 195 */ 196 public ExpressionClause<AggregatorType> completedPredicate() { 197 checkNoCompletedPredicate(); 198 ExpressionClause<AggregatorType> clause = new ExpressionClause<AggregatorType>(this); 199 completedPredicate = new CompletedPredicate(clause); 200 return clause; 201 } 202 203 /** 204 * Sets the predicate used to determine if the aggregation is completed 205 */ 206 public AggregatorType completedPredicate(Predicate predicate) { 207 checkNoCompletedPredicate(); 208 completedPredicate = new CompletedPredicate(predicate); 209 return this; 210 } 211 212 protected void checkNoCompletedPredicate() { 213 if (completedPredicate != null) { 214 throw new IllegalArgumentException("There already is a completedPredicate defined for this aggregator: " + this); 215 } 216 } 217 }