001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.text.NumberFormat;
020    import java.util.concurrent.atomic.AtomicInteger;
021    
022    import org.apache.camel.Exchange;
023    import org.apache.commons.logging.Log;
024    
025    /**
026     * @version $Revision: 37863 $
027     */
028    public class ThroughputLogger extends Logger {
029        private int groupSize = 100;
030        private long startTime;
031        private long groupStartTime;
032        private AtomicInteger receivedCounter = new AtomicInteger();
033        private NumberFormat numberFormat = NumberFormat.getNumberInstance();
034        private String action = "Received";
035        private String logMessage;
036    
037        public ThroughputLogger() {
038        }
039    
040        public ThroughputLogger(Log log) {
041            super(log);
042        }
043    
044        public ThroughputLogger(Log log, LoggingLevel level) {
045            super(log, level);
046        }
047    
048        public ThroughputLogger(String logName) {
049            super(logName);
050        }
051    
052        public ThroughputLogger(String logName, LoggingLevel level) {
053            super(logName, level);
054        }
055    
056        public ThroughputLogger(String logName, LoggingLevel level, int groupSize) {
057            super(logName, level);
058            setGroupSize(groupSize);
059        }
060    
061        public ThroughputLogger(String logName, int groupSize) {
062            super(logName);
063            setGroupSize(groupSize);
064        }
065    
066        public ThroughputLogger(int groupSize) {
067            setGroupSize(groupSize);
068        }
069    
070        @Override
071        public void process(Exchange exchange) {
072            if (startTime == 0) {
073                startTime = System.currentTimeMillis();
074            }
075            int receivedCount = receivedCounter.incrementAndGet();
076            if (receivedCount % groupSize == 0) {
077                logMessage = createLogMessage(exchange, receivedCount);
078                super.process(exchange);
079            }
080        }
081    
082        public int getGroupSize() {
083            return groupSize;
084        }
085    
086        public void setGroupSize(int groupSize) {
087            if (groupSize == 0) {
088                throw new IllegalArgumentException("groupSize cannot be zero!");
089            }
090            this.groupSize = groupSize;
091        }
092    
093        public NumberFormat getNumberFormat() {
094            return numberFormat;
095        }
096    
097        public void setNumberFormat(NumberFormat numberFormat) {
098            this.numberFormat = numberFormat;
099        }
100    
101        public String getAction() {
102            return action;
103        }
104    
105        public void setAction(String action) {
106            this.action = action;
107        }
108    
109        @Override
110        protected Object logMessage(Exchange exchange) {
111            return logMessage;
112        }
113    
114        protected String createLogMessage(Exchange exchange, int receivedCount) {
115            long time = System.currentTimeMillis();
116            if (groupStartTime == 0) {
117                groupStartTime = startTime;
118            }
119    
120            double rate = messagesPerSecond(groupSize, groupStartTime, time);
121            double average = messagesPerSecond(receivedCount, startTime, time);
122    
123            groupStartTime = time;
124    
125            return getAction() + ": " + receivedCount + " messages so far. Last group took: " + (time - groupStartTime)
126                    + " millis which is: " + numberFormat.format(rate)
127                    + " messages per second. average: " + numberFormat.format(average);
128        }
129    
130        // timeOneMessage = elapsed / messageCount
131        // messagePerSend = 1000 / timeOneMessage
132        protected double messagesPerSecond(long messageCount, long startTime, long endTime) {
133            double rate = messageCount * 1000.0;
134            rate /= endTime - startTime;
135            return rate;
136        }
137    }