/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.model;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementRef;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;

import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.model.config.BatchResequencerConfig;
import org.apache.camel.model.config.StreamResequencerConfig;
import org.apache.camel.model.language.ExpressionType;
import org.apache.camel.processor.Resequencer;
import org.apache.camel.processor.StreamResequencer;
import org.apache.camel.spi.RouteContext;

/**
 * Represents an XML &lt;resequencer/&gt; element.
 * <p/>
 * Exactly one of the two configurations (batch or stream) is expected to be
 * set at a time: selecting one through {@link #batch(BatchResequencerConfig)}
 * or {@link #stream(StreamResequencerConfig)} clears the other. A freshly
 * constructed instance uses the default batch configuration.
 *
 * @version $Revision: 41908 $
 */
@XmlRootElement(name = "resequencer")
public class ResequencerType extends ProcessorType<ProcessorType> {
    @XmlElementRef
    private List<ExpressionType> expressions = new ArrayList<ExpressionType>();
    @XmlElementRef
    private List<ProcessorType<?>> outputs = new ArrayList<ProcessorType<?>>();
    // Binding annotation at setter
    private BatchResequencerConfig batchConfig;
    // Binding annotation at setter
    private StreamResequencerConfig streamConfig;
    // Resolved expressions; either supplied through the constructor or
    // lazily created from 'expressions' by resolveExpressionList.
    @XmlTransient
    private List<Expression> expressionList;

    /**
     * Creates a resequencer without pre-resolved expressions, using the
     * default batch configuration.
     */
    public ResequencerType() {
        this(null);
    }

    /**
     * Creates a resequencer using the default batch configuration.
     *
     * @param expressions the already resolved expressions to resequence by,
     *                    or <code>null</code> to resolve them later from the
     *                    expressions added via {@link #expression(ExpressionType)}.
     */
    public ResequencerType(List<Expression> expressions) {
        this.expressionList = expressions;
        this.batch();
    }

    /**
     * Configures the stream-based resequencing algorithm using the default
     * configuration.
     *
     * @return <code>this</code> instance.
     */
    public ResequencerType stream() {
        return stream(StreamResequencerConfig.getDefault());
    }

    /**
     * Configures the batch-based resequencing algorithm using the default
     * configuration.
     *
     * @return <code>this</code> instance.
     */
    public ResequencerType batch() {
        return batch(BatchResequencerConfig.getDefault());
    }

    /**
     * Configures the stream-based resequencing algorithm using the given
     * {@link StreamResequencerConfig}. Any batch configuration is discarded.
     *
     * @param config the stream configuration to use.
     * @return <code>this</code> instance.
     */
    public ResequencerType stream(StreamResequencerConfig config) {
        this.streamConfig = config;
        this.batchConfig = null;
        return this;
    }

    /**
     * Configures the batch-based resequencing algorithm using the given
     * {@link BatchResequencerConfig}. Any stream configuration is discarded.
     *
     * @param config the batch configuration to use.
     * @return <code>this</code> instance.
     */
    public ResequencerType batch(BatchResequencerConfig config) {
        this.batchConfig = config;
        this.streamConfig = null;
        return this;
    }

    /**
     * Adds an expression to the list of expressions to resequence by.
     *
     * @param expression the expression to add.
     * @return <code>this</code> instance.
     */
    public ResequencerType expression(ExpressionType expression) {
        expressions.add(expression);
        return this;
    }

    @Override
    public String toString() {
        return "Resequencer[ " + getExpressions() + " -> " + getOutputs() + "]";
    }

    @Override
    public String getLabel() {
        return ExpressionType.getLabel(getExpressions());
    }

    public List<ExpressionType> getExpressions() {
        return expressions;
    }

    public List<ProcessorType<?>> getOutputs() {
        return outputs;
    }

    public void setOutputs(List<ProcessorType<?>> outputs) {
        this.outputs = outputs;
    }

    /**
     * @return the batch configuration, or <code>null</code> if the
     *         stream-based algorithm is selected.
     */
    public BatchResequencerConfig getBatchConfig() {
        return batchConfig;
    }

    /**
     * Returns the batch configuration, falling back to the given default.
     *
     * @param defaultConfig the configuration to return when no batch
     *                      configuration is set.
     * @return the batch configuration if one is set, otherwise
     *         <code>defaultConfig</code>.
     */
    public BatchResequencerConfig getBatchConfig(BatchResequencerConfig defaultConfig) {
        return batchConfig != null ? batchConfig : defaultConfig;
    }

    /**
     * @return the stream configuration, or <code>null</code> if the
     *         batch-based algorithm is selected.
     */
    public StreamResequencerConfig getStreamConfig() {
        return streamConfig;
    }

    /**
     * Sets the batch configuration; delegates to
     * {@link #batch(BatchResequencerConfig)} and therefore also clears the
     * stream configuration.
     */
    @XmlElement(name = "batch-config", required = false)
    public void setBatchConfig(BatchResequencerConfig batchConfig) {
        // TODO: find out how to have these two within an <xsd:choice>
        batch(batchConfig);
    }

    /**
     * Sets the stream configuration; delegates to
     * {@link #stream(StreamResequencerConfig)} and therefore also clears the
     * batch configuration.
     */
    @XmlElement(name = "stream-config", required = false)
    public void setStreamConfig(StreamResequencerConfig streamConfig) {
        // TODO: find out how to have these two within an <xsd:choice>
        stream(streamConfig);
    }

    /**
     * Creates the stream-based resequencer for this node. The batch-based
     * variant is not created here but registered as a route by
     * {@link #addRoutes(RouteContext, Collection)}.
     *
     * @throws IllegalStateException if no stream configuration is set.
     */
    @Override
    public Processor createProcessor(RouteContext routeContext) throws Exception {
        if (streamConfig == null) {
            // Fail with a descriptive message instead of a NullPointerException
            // raised from inside createStreamResequencer.
            throw new IllegalStateException("No stream configuration set for: " + this);
        }
        return createStreamResequencer(routeContext, streamConfig);
    }

    /**
     * Adds a dedicated batch resequencer route when a batch configuration is
     * set; otherwise defers to the superclass.
     */
    @Override
    public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
        if (batchConfig != null) {
            routes.add(createBatchResequencerRoute(routeContext));
        } else {
            // StreamResequencer created via createProcessor method
            super.addRoutes(routeContext, routes);
        }
    }

    /**
     * Wraps a batch resequencer in a route starting at the route context's
     * endpoint.
     */
    private Route<? extends Exchange> createBatchResequencerRoute(RouteContext routeContext) throws Exception {
        final Resequencer resequencer = createBatchResequencer(routeContext, batchConfig);
        // Raw Route type kept as in the original code; the anonymous subclass
        // only customizes toString().
        return new Route(routeContext.getEndpoint(), resequencer) {
            @Override
            public String toString() {
                return "BatchResequencerRoute[" + getEndpoint() + " -> " + resequencer.getProcessor() + "]";
            }
        };
    }

    /**
     * Creates a batch resequencer with the batch size and batch timeout taken
     * from the given configuration.
     *
     * @param routeContext the route context supplying the endpoint and the
     *                     processor for this node.
     * @param config       the batch configuration; must not be <code>null</code>.
     * @throws IllegalArgumentException if no expressions are configured.
     */
    protected Resequencer createBatchResequencer(RouteContext routeContext,
            BatchResequencerConfig config) throws Exception {
        Processor processor = routeContext.createProcessor(this);
        Resequencer resequencer = new Resequencer(routeContext.getEndpoint(),
                processor, resolveExpressionList(routeContext));
        resequencer.setBatchSize(config.getBatchSize());
        resequencer.setBatchTimeout(config.getBatchTimeout());
        return resequencer;
    }

    /**
     * Creates a stream resequencer with the comparator, capacity and timeout
     * taken from the given configuration. The resolved expressions are set on
     * the configuration's comparator.
     *
     * @param routeContext the route context supplying the processor for this
     *                     node.
     * @param config       the stream configuration; must not be <code>null</code>.
     * @throws IllegalArgumentException if no expressions are configured.
     */
    protected StreamResequencer createStreamResequencer(RouteContext routeContext,
            StreamResequencerConfig config) throws Exception {
        config.getComparator().setExpressions(resolveExpressionList(routeContext));
        Processor processor = routeContext.createProcessor(this);
        StreamResequencer resequencer = new StreamResequencer(processor,
                config.getComparator(), config.getCapacity());
        resequencer.setTimeout(config.getTimeout());
        return resequencer;
    }

    /**
     * Returns the resolved expressions, creating them from the configured
     * {@link ExpressionType}s when none are available yet.
     * <p/>
     * Only a non-empty result is cached, so that a failed resolution does not
     * prevent a later attempt after expressions have been added.
     *
     * @throws IllegalArgumentException if no expressions are configured.
     */
    private List<Expression> resolveExpressionList(RouteContext routeContext) {
        List<Expression> resolved = expressionList;
        if (resolved == null || resolved.isEmpty()) {
            resolved = new ArrayList<Expression>(expressions.size());
            for (ExpressionType expression : expressions) {
                resolved.add(expression.createExpression(routeContext));
            }
        }
        if (resolved.isEmpty()) {
            throw new IllegalArgumentException("No expressions configured for: " + this);
        }
        expressionList = resolved;
        return resolved;
    }
}