001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.List;
022    
023    import javax.xml.bind.annotation.XmlElement;
024    import javax.xml.bind.annotation.XmlElementRef;
025    import javax.xml.bind.annotation.XmlRootElement;
026    import javax.xml.bind.annotation.XmlTransient;
027    
028    import org.apache.camel.Exchange;
029    import org.apache.camel.Expression;
030    import org.apache.camel.Processor;
031    import org.apache.camel.Route;
032    import org.apache.camel.model.config.BatchResequencerConfig;
033    import org.apache.camel.model.config.StreamResequencerConfig;
034    import org.apache.camel.model.language.ExpressionType;
035    import org.apache.camel.processor.Resequencer;
036    import org.apache.camel.processor.StreamResequencer;
037    import org.apache.camel.spi.RouteContext;
038    
039    /**
040     * Represents an XML <resequencer/> element
041     *
042     * @version $Revision: 41908 $
043     */
044    @XmlRootElement(name = "resequencer")
045    public class ResequencerType extends ProcessorType<ProcessorType> {
046        @XmlElementRef
047        private List<ExpressionType> expressions = new ArrayList<ExpressionType>();
048        @XmlElementRef
049        private List<ProcessorType<?>> outputs = new ArrayList<ProcessorType<?>>();
050        // Binding annotation at setter
051        private BatchResequencerConfig batchConfig;
052        // Binding annotation at setter
053        private StreamResequencerConfig streamConfig;
054        @XmlTransient
055        private List<Expression> expressionList;
056    
057        public ResequencerType() {
058            this(null);
059        }
060    
061        public ResequencerType(List<Expression> expressions) {
062            this.expressionList = expressions;
063            this.batch();
064        }
065    
066        /**
067         * Configures the stream-based resequencing algorithm using the default
068         * configuration.
069         *
070         * @return <code>this</code> instance.
071         */
072        public ResequencerType stream() {
073            return stream(StreamResequencerConfig.getDefault());
074        }
075    
076        /**
077         * Configures the batch-based resequencing algorithm using the default
078         * configuration.
079         *
080         * @return <code>this</code> instance.
081         */
082        public ResequencerType batch() {
083            return batch(BatchResequencerConfig.getDefault());
084        }
085    
086        /**
087         * Configures the stream-based resequencing algorithm using the given
088         * {@link StreamResequencerConfig}.
089         *
090         * @return <code>this</code> instance.
091         */
092        public ResequencerType stream(StreamResequencerConfig config) {
093            this.streamConfig = config;
094            this.batchConfig = null;
095            return this;
096        }
097    
098        /**
099         * Configures the batch-based resequencing algorithm using the given
100         * {@link BatchResequencerConfig}.
101         *
102         * @return <code>this</code> instance.
103         */
104        public ResequencerType batch(BatchResequencerConfig config) {
105            this.batchConfig = config;
106            this.streamConfig = null;
107            return this;
108        }
109    
110        public ResequencerType expression(ExpressionType expression) {
111            expressions.add(expression);
112            return this;
113        }
114    
115        @Override
116        public String toString() {
117            return "Resequencer[ " + getExpressions() + " -> " + getOutputs() + "]";
118        }
119    
120        @Override
121        public String getLabel() {
122            return ExpressionType.getLabel(getExpressions());
123        }
124    
125        public List<ExpressionType> getExpressions() {
126            return expressions;
127        }
128    
129        public List<ProcessorType<?>> getOutputs() {
130            return outputs;
131        }
132    
133        public void setOutputs(List<ProcessorType<?>> outputs) {
134            this.outputs = outputs;
135        }
136    
137        public BatchResequencerConfig getBatchConfig() {
138            return batchConfig;
139        }
140    
141        public BatchResequencerConfig getBatchConfig(BatchResequencerConfig defaultConfig) {
142            return batchConfig;
143        }
144    
145        public StreamResequencerConfig getStreamConfig() {
146            return streamConfig;
147        }
148    
149        @XmlElement(name = "batch-config", required = false)
150        public void setBatchConfig(BatchResequencerConfig batchConfig) {
151            // TODO: find out how to have these two within an <xsd:choice>
152            batch(batchConfig);
153        }
154    
155        @XmlElement(name = "stream-config", required = false)
156        public void setStreamConfig(StreamResequencerConfig streamConfig) {
157            // TODO: find out how to have these two within an <xsd:choice>
158            stream(streamConfig);
159        }
160    
161        @Override
162        public Processor createProcessor(RouteContext routeContext) throws Exception {
163            return createStreamResequencer(routeContext, streamConfig);
164        }
165    
166        @Override
167        public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
168            if (batchConfig != null) {
169                routes.add(createBatchResequencerRoute(routeContext));
170            } else {
171                // StreamResequencer created via createProcessor method
172                super.addRoutes(routeContext, routes);
173            }
174        }
175    
176        private Route<? extends Exchange> createBatchResequencerRoute(RouteContext routeContext) throws Exception {
177            final Resequencer resequencer = createBatchResequencer(routeContext, batchConfig);
178            return new Route(routeContext.getEndpoint(), resequencer) {
179                @Override
180                public String toString() {
181                    return "BatchResequencerRoute[" + getEndpoint() + " -> " + resequencer.getProcessor() + "]";
182                }
183            };
184        }
185    
186        protected Resequencer createBatchResequencer(RouteContext routeContext,
187                BatchResequencerConfig config) throws Exception {
188            Processor processor = routeContext.createProcessor(this);
189            Resequencer resequencer = new Resequencer(routeContext.getEndpoint(),
190                    processor, resolveExpressionList(routeContext));
191            resequencer.setBatchSize(config.getBatchSize());
192            resequencer.setBatchTimeout(config.getBatchTimeout());
193            return resequencer;
194        }
195    
196        protected StreamResequencer createStreamResequencer(RouteContext routeContext,
197                StreamResequencerConfig config) throws Exception {
198            config.getComparator().setExpressions(resolveExpressionList(routeContext));
199            Processor processor = routeContext.createProcessor(this);
200            StreamResequencer resequencer = new StreamResequencer(processor,
201                    config.getComparator(), config.getCapacity());
202            resequencer.setTimeout(config.getTimeout());
203            return resequencer;
204    
205        }
206    
207        private List<Expression> resolveExpressionList(RouteContext routeContext) {
208            if (expressionList == null) {
209                expressionList = new ArrayList<Expression>();
210                for (ExpressionType expression : expressions) {
211                    expressionList.add(expression.createExpression(routeContext));
212                }
213            }
214            if (expressionList.isEmpty()) {
215                throw new IllegalArgumentException("No expressions configured for: " + this);
216            }
217            return expressionList;
218        }
219    }