001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.Collection; 021 import java.util.List; 022 023 import javax.xml.bind.annotation.XmlElement; 024 import javax.xml.bind.annotation.XmlElementRef; 025 import javax.xml.bind.annotation.XmlRootElement; 026 import javax.xml.bind.annotation.XmlTransient; 027 028 import org.apache.camel.Exchange; 029 import org.apache.camel.Expression; 030 import org.apache.camel.Processor; 031 import org.apache.camel.Route; 032 import org.apache.camel.impl.RouteContext; 033 import org.apache.camel.model.config.BatchResequencerConfig; 034 import org.apache.camel.model.config.StreamResequencerConfig; 035 import org.apache.camel.model.language.ExpressionType; 036 import org.apache.camel.processor.Resequencer; 037 import org.apache.camel.processor.StreamResequencer; 038 039 /** 040 * @version $Revision: 36321 $ 041 */ 042 @XmlRootElement(name = "resequencer") 043 public class ResequencerType extends ProcessorType<ProcessorType> { 044 @XmlElementRef 045 private List<ExpressionType> expressions = new ArrayList<ExpressionType>(); 046 @XmlElementRef 047 private List<ProcessorType<?>> outputs = new ArrayList<ProcessorType<?>>(); 048 // Binding annotation at setter 049 private BatchResequencerConfig batchConfig; 050 // Binding annotation at setter 051 private StreamResequencerConfig streamConfig; 052 @XmlTransient 053 private List<Expression> expressionList; 054 055 public ResequencerType() { 056 this(null); 057 } 058 059 public ResequencerType(List<Expression> expressions) { 060 this.expressionList = expressions; 061 this.batch(); 062 } 063 064 /** 065 * Configures the stream-based resequencing algorithm using the default 066 * configuration. 067 * 068 * @return <code>this</code> instance. 069 */ 070 public ResequencerType stream() { 071 return stream(StreamResequencerConfig.getDefault()); 072 } 073 074 /** 075 * Configures the batch-based resequencing algorithm using the default 076 * configuration. 077 * 078 * @return <code>this</code> instance. 079 */ 080 public ResequencerType batch() { 081 return batch(BatchResequencerConfig.getDefault()); 082 } 083 084 /** 085 * Configures the stream-based resequencing algorithm using the given 086 * {@link StreamResequencerConfig}. 087 * 088 * @return <code>this</code> instance. 089 */ 090 public ResequencerType stream(StreamResequencerConfig config) { 091 this.streamConfig = config; 092 this.batchConfig = null; 093 return this; 094 } 095 096 /** 097 * Configures the batch-based resequencing algorithm using the given 098 * {@link BatchResequencerConfig}. 099 * 100 * @return <code>this</code> instance. 101 */ 102 public ResequencerType batch(BatchResequencerConfig config) { 103 this.batchConfig = config; 104 this.streamConfig = null; 105 return this; 106 } 107 108 public ResequencerType expression(ExpressionType expression) { 109 expressions.add(expression); 110 return this; 111 } 112 113 114 @Override 115 public String toString() { 116 return "Resequencer[ " + getExpressions() + " -> " + getOutputs() + "]"; 117 } 118 119 @Override 120 public String getLabel() { 121 return ExpressionType.getLabel(getExpressions()); 122 } 123 124 public List<ExpressionType> getExpressions() { 125 return expressions; 126 } 127 128 public List<ProcessorType<?>> getOutputs() { 129 return outputs; 130 } 131 132 public void setOutputs(List<ProcessorType<?>> outputs) { 133 this.outputs = outputs; 134 } 135 136 public BatchResequencerConfig getBatchConfig() { 137 return batchConfig; 138 } 139 140 public BatchResequencerConfig getBatchConfig(BatchResequencerConfig defaultConfig) { 141 return batchConfig; 142 } 143 144 public StreamResequencerConfig getStreamConfig() { 145 return streamConfig; 146 } 147 148 // 149 // TODO: find out how to have these two within an <xsd:choice> 150 // 151 152 @XmlElement(name = "batch-config", required = false) 153 public void setBatchConfig(BatchResequencerConfig batchConfig) { 154 batch(batchConfig); 155 } 156 157 @XmlElement(name = "stream-config", required = false) 158 public void setStreamConfig(StreamResequencerConfig streamConfig) { 159 stream(streamConfig); 160 } 161 162 // 163 // END_TODO 164 // 165 166 @Override 167 public Processor createProcessor(RouteContext routeContext) throws Exception { 168 return createStreamResequencer(routeContext, streamConfig); 169 } 170 171 @Override 172 public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception { 173 if (batchConfig != null) { 174 routes.add(createBatchResequencerRoute(routeContext)); 175 } else { 176 // StreamResequencer created via createProcessor method 177 super.addRoutes(routeContext, routes); 178 } 179 } 180 181 private Route<? extends Exchange> createBatchResequencerRoute(RouteContext routeContext) throws Exception { 182 final Resequencer resequencer = createBatchResequencer(routeContext, batchConfig); 183 return new Route(routeContext.getEndpoint(), resequencer) { 184 @Override 185 public String toString() { 186 return "BatchResequencerRoute[" + getEndpoint() + " -> " + resequencer.getProcessor() + "]"; 187 } 188 }; 189 } 190 191 protected Resequencer createBatchResequencer(RouteContext routeContext, 192 BatchResequencerConfig config) throws Exception { 193 Processor processor = routeContext.createProcessor(this); 194 Resequencer resequencer = new Resequencer(routeContext.getEndpoint(), 195 processor, resolveExpressionList(routeContext)); 196 resequencer.setBatchSize(config.getBatchSize()); 197 resequencer.setBatchTimeout(config.getBatchTimeout()); 198 return resequencer; 199 } 200 201 protected StreamResequencer createStreamResequencer(RouteContext routeContext, 202 StreamResequencerConfig config) throws Exception { 203 config.getComparator().setExpressions(resolveExpressionList(routeContext)); 204 Processor processor = routeContext.createProcessor(this); 205 StreamResequencer resequencer = new StreamResequencer(processor, 206 config.getComparator(), config.getCapacity()); 207 resequencer.setTimeout(config.getTimeout()); 208 return resequencer; 209 210 } 211 212 private List<Expression> resolveExpressionList(RouteContext routeContext) { 213 if (expressionList == null) { 214 expressionList = new ArrayList<Expression>(); 215 for (ExpressionType expression : expressions) { 216 expressionList.add(expression.createExpression(routeContext)); 217 } 218 } 219 if (expressionList.isEmpty()) { 220 throw new IllegalArgumentException("No expressions configured for: " + this); 221 } 222 return expressionList; 223 } 224 }