/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.model;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementRef;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;

import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.model.config.BatchResequencerConfig;
import org.apache.camel.model.config.StreamResequencerConfig;
import org.apache.camel.model.language.ExpressionType;
import org.apache.camel.processor.Resequencer;
import org.apache.camel.processor.StreamResequencer;
import org.apache.camel.processor.resequencer.ExpressionResultComparator;
import org.apache.camel.spi.RouteContext;

/**
 * Represents an XML &lt;resequencer/&gt; element.
 * <p>
 * An instance holds either a batch configuration or a stream configuration;
 * selecting one through {@link #batch(BatchResequencerConfig)} or
 * {@link #stream(StreamResequencerConfig)} clears the other. A newly
 * constructed instance is configured for batch resequencing.
 *
 * @version $Revision: 64127 $
 */
@XmlRootElement(name = "resequencer")
public class ResequencerType extends ProcessorType<ProcessorType> {
    @XmlElementRef
    private List<ExpressionType> expressions = new ArrayList<ExpressionType>();
    @XmlElementRef
    private List<ProcessorType<?>> outputs = new ArrayList<ProcessorType<?>>();
    // Binding annotation at setter
    private BatchResequencerConfig batchConfig;
    // Binding annotation at setter
    private StreamResequencerConfig streamConfig;
    // Already-built expressions: either supplied through the constructor or
    // lazily created from 'expressions' the first time a processor is created.
    @XmlTransient
    private List<Expression> expressionList;

    /**
     * Creates a resequencer without pre-built expressions, using the default
     * batch configuration.
     */
    public ResequencerType() {
        this(null);
    }

    /**
     * Creates a resequencer that uses the given pre-built expressions and the
     * default batch configuration.
     *
     * @param expressions the expressions to resequence by; may be
     *            <code>null</code>, in which case they are created from the
     *            {@link ExpressionType}s added via
     *            {@link #expression(ExpressionType)} when a processor is created.
     */
    public ResequencerType(List<Expression> expressions) {
        this.expressionList = expressions;
        this.batch();
    }

    @Override
    public String getShortName() {
        return "resequencer";
    }

    /**
     * Configures the stream-based resequencing algorithm using the default
     * configuration.
     *
     * @return <code>this</code> instance.
     */
    public ResequencerType stream() {
        return stream(StreamResequencerConfig.getDefault());
    }

    /**
     * Configures the batch-based resequencing algorithm using the default
     * configuration.
     *
     * @return <code>this</code> instance.
     */
    public ResequencerType batch() {
        return batch(BatchResequencerConfig.getDefault());
    }

    /**
     * Configures the stream-based resequencing algorithm using the given
     * {@link StreamResequencerConfig}. Any batch configuration is discarded.
     *
     * @param config the stream configuration to use.
     * @return <code>this</code> instance.
     */
    public ResequencerType stream(StreamResequencerConfig config) {
        this.streamConfig = config;
        this.batchConfig = null;
        return this;
    }

    /**
     * Configures the batch-based resequencing algorithm using the given
     * {@link BatchResequencerConfig}. Any stream configuration is discarded.
     *
     * @param config the batch configuration to use.
     * @return <code>this</code> instance.
     */
    public ResequencerType batch(BatchResequencerConfig config) {
        this.batchConfig = config;
        this.streamConfig = null;
        return this;
    }

    /**
     * Adds an expression definition to resequence by.
     *
     * @param expression the expression definition to add.
     * @return <code>this</code> instance.
     */
    public ResequencerType expression(ExpressionType expression) {
        expressions.add(expression);
        return this;
    }

    @Override
    public String toString() {
        return "Resequencer[" + getExpressions() + " -> " + getOutputs() + "]";
    }

    @Override
    public String getLabel() {
        return ExpressionType.getLabel(getExpressions());
    }

    public List<ExpressionType> getExpressions() {
        return expressions;
    }

    public List<ProcessorType<?>> getOutputs() {
        return outputs;
    }

    public void setOutputs(List<ProcessorType<?>> outputs) {
        this.outputs = outputs;
    }

    /**
     * @return the batch configuration, or <code>null</code> if this
     *         resequencer is not configured for batch resequencing.
     */
    public BatchResequencerConfig getBatchConfig() {
        return batchConfig;
    }

    /**
     * Returns the batch configuration, falling back to the given default when
     * no batch configuration is set.
     *
     * @param defaultConfig value to return when no batch configuration is set.
     * @return the batch configuration if set, otherwise
     *         <code>defaultConfig</code>.
     */
    public BatchResequencerConfig getBatchConfig(BatchResequencerConfig defaultConfig) {
        // Previously the argument was ignored and null was returned when no
        // batch configuration was set.
        return batchConfig != null ? batchConfig : defaultConfig;
    }

    /**
     * @return the stream configuration, or <code>null</code> if this
     *         resequencer is not configured for stream resequencing.
     */
    public StreamResequencerConfig getStreamConfig() {
        return streamConfig;
    }

    /**
     * Selects batch resequencing with the given configuration; equivalent to
     * {@link #batch(BatchResequencerConfig)}, so the stream configuration is
     * cleared as a side effect.
     *
     * @param batchConfig the batch configuration to use.
     */
    @XmlElement(name = "batch-config", required = false)
    public void setBatchConfig(BatchResequencerConfig batchConfig) {
        // TODO: find out how to have these two within an <xsd:choice>
        batch(batchConfig);
    }

    /**
     * Selects stream resequencing with the given configuration; equivalent to
     * {@link #stream(StreamResequencerConfig)}, so the batch configuration is
     * cleared as a side effect.
     *
     * @param streamConfig the stream configuration to use.
     */
    @XmlElement(name = "stream-config", required = false)
    public void setStreamConfig(StreamResequencerConfig streamConfig) {
        // TODO: find out how to have these two within an <xsd:choice>
        stream(streamConfig);
    }

    /**
     * Sets the timeout on whichever configuration (batch or stream) is
     * currently active.
     *
     * @param timeout the timeout value passed on to the active configuration.
     * @return <code>this</code> instance.
     * @throws IllegalStateException if neither a batch nor a stream
     *             configuration is set.
     */
    public ResequencerType timeout(long timeout) {
        if (batchConfig != null) {
            batchConfig.setBatchTimeout(timeout);
        } else if (streamConfig != null) {
            streamConfig.setTimeout(timeout);
        } else {
            // Both are null after e.g. setBatchConfig(null); fail with a clear
            // message instead of a NullPointerException.
            throw new IllegalStateException("timeout() requires a batch or stream resequencer configuration");
        }
        return this;
    }

    /**
     * Sets the batch size of the batch configuration.
     *
     * @param batchSize the batch size.
     * @return <code>this</code> instance.
     * @throws IllegalStateException if no batch configuration is set.
     */
    public ResequencerType size(int batchSize) {
        if (batchConfig == null) {
            throw new IllegalStateException("size() only supported for batch resequencer");
        }
        batchConfig.setBatchSize(batchSize);
        return this;
    }

    /**
     * Sets the capacity of the stream configuration.
     *
     * @param capacity the capacity.
     * @return <code>this</code> instance.
     * @throws IllegalStateException if no stream configuration is set.
     */
    public ResequencerType capacity(int capacity) {
        if (streamConfig == null) {
            throw new IllegalStateException("capacity() only supported for stream resequencer");
        }
        streamConfig.setCapacity(capacity);
        return this;
    }

    /**
     * Sets the comparator of the stream configuration.
     *
     * @param comparator the comparator.
     * @return <code>this</code> instance.
     * @throws IllegalStateException if no stream configuration is set.
     */
    public ResequencerType comparator(ExpressionResultComparator<Exchange> comparator) {
        if (streamConfig == null) {
            throw new IllegalStateException("comparator() only supported for stream resequencer");
        }
        streamConfig.setComparator(comparator);
        return this;
    }

    /**
     * Creates the batch resequencer if a batch configuration is set,
     * otherwise the stream resequencer.
     *
     * @throws IllegalStateException if neither a batch nor a stream
     *             configuration is set.
     */
    @Override
    public Processor createProcessor(RouteContext routeContext) throws Exception {
        if (batchConfig != null) {
            return createBatchResequencer(routeContext, batchConfig);
        }
        if (streamConfig != null) {
            return createStreamResequencer(routeContext, streamConfig);
        }
        // Reachable when a null configuration was passed to batch(..)/stream(..)
        // or to one of the setters.
        throw new IllegalStateException("Neither batch nor stream configuration set for: " + this);
    }

    /**
     * Creates a batch {@link Resequencer} instance applying the given
     * <code>config</code>.
     *
     * @param routeContext
     *            route context.
     * @param config
     *            batch resequencer configuration.
     * @return the configured batch resequencer.
     * @throws Exception
     */
    protected Resequencer createBatchResequencer(RouteContext routeContext,
            BatchResequencerConfig config) throws Exception {
        Processor processor = routeContext.createProcessor(this);
        Resequencer resequencer = new Resequencer(processor, resolveExpressionList(routeContext));
        resequencer.setBatchSize(config.getBatchSize());
        resequencer.setBatchTimeout(config.getBatchTimeout());
        return resequencer;
    }

    /**
     * Creates a {@link StreamResequencer} instance applying the given
     * <code>config</code>. The resolved expressions are set on the
     * comparator held by <code>config</code>.
     *
     * @param routeContext
     *            route context.
     * @param config
     *            stream resequencer configuration.
     * @return the configured stream resequencer.
     * @throws Exception
     */
    protected StreamResequencer createStreamResequencer(RouteContext routeContext,
            StreamResequencerConfig config) throws Exception {
        // NOTE(review): assumes config.getComparator() is non-null - confirm
        // against StreamResequencerConfig.
        config.getComparator().setExpressions(resolveExpressionList(routeContext));
        Processor processor = routeContext.createProcessor(this);
        StreamResequencer resequencer = new StreamResequencer(processor, config.getComparator());
        resequencer.setTimeout(config.getTimeout());
        resequencer.setCapacity(config.getCapacity());
        return resequencer;
    }

    /**
     * Returns the expressions to resequence by, creating them from the
     * configured {@link ExpressionType}s on first use.
     *
     * @param routeContext route context used to create the expressions.
     * @return the non-empty list of expressions.
     * @throws IllegalArgumentException if no expressions are configured.
     */
    private List<Expression> resolveExpressionList(RouteContext routeContext) {
        List<Expression> resolved = expressionList;
        if (resolved == null) {
            resolved = new ArrayList<Expression>(expressions.size());
            for (ExpressionType expression : expressions) {
                resolved.add(expression.createExpression(routeContext));
            }
        }
        if (resolved.isEmpty()) {
            // Do not cache an empty result: expressions added later through
            // expression(..) must still be picked up on the next attempt.
            throw new IllegalArgumentException("No expressions configured for: " + this);
        }
        expressionList = resolved;
        return resolved;
    }
}