View Javadoc

1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.codehaus.activemq.journal;
19  
20  import java.io.IOException;
21  import java.io.PrintWriter;
22  import java.io.StringWriter;
23  
24  import org.codehaus.activemq.management.CountStatisticImpl;
25  import org.codehaus.activemq.management.TimeStatisticImpl;
26  import org.codehaus.activemq.util.IndentPrinter;
27  
28  /***
29   * A Journal filter that captures performance statistics of the filtered Journal.
30   * 
31   * @version $Revision: 1.2 $
32   */
33  public class JournalStatsFilter implements Journal {
34  	
35  	private final TimeStatisticImpl writeLatency = new TimeStatisticImpl("writeLatency", "The amount of time that is spent waiting for a record to be written to the Journal"); 
36  	private final CountStatisticImpl writeRecordsCounter = new CountStatisticImpl("writeRecordsCounter","The number of records that have been written by the Journal");
37  	private final CountStatisticImpl writeBytesCounter = new CountStatisticImpl("writeBytesCounter","The number of bytes that have been written by the Journal");
38  	private final TimeStatisticImpl synchedWriteLatency = new TimeStatisticImpl(writeLatency, "synchedWriteLatency", "The amount of time that is spent waiting for a synch record to be written to the Journal"); 
39  	private final TimeStatisticImpl unsynchedWriteLatency = new TimeStatisticImpl(writeLatency, "unsynchedWriteLatency", "The amount of time that is spent waiting for a non synch record to be written to the Journal"); 	
40  	private final TimeStatisticImpl readLatency = new TimeStatisticImpl("readLatency", "The amount of time that is spent waiting for a record to be read from the Journal"); 
41  	private final CountStatisticImpl readBytesCounter = new CountStatisticImpl("readBytesCounter","The number of bytes that have been read by the Journal");
42  	
43  	private final Journal next;
44  	private boolean detailedStats;
45  
46  	
47  	/***
48  	 * Creates a JournalStatsFilter that captures performance information of <code>next</next>. 
49  	 * @param next
50  	 */
51  	public JournalStatsFilter(Journal next) {
52  		this.next = next;
53  	}
54  	
55  	/***
56  	 * @see org.codehaus.activemq.journal.Journal#write(byte[], boolean)
57  	 */
58  	public RecordLocation write(byte[] data, boolean sync) throws IOException {
59  		//writeWaitTimeStat
60  		long start = System.currentTimeMillis();
61  		RecordLocation answer = next.write(data, sync);
62  		long end = System.currentTimeMillis();
63  		
64  		writeRecordsCounter.increment();
65  		writeBytesCounter.add(data.length);
66  		if( sync )
67  			synchedWriteLatency.addTime(end-start);
68  		else 
69  			unsynchedWriteLatency.addTime(end-start);
70  		return answer;
71  	}
72  
73  	/***
74  	 * @see org.codehaus.activemq.journal.Journal#read(org.codehaus.activemq.journal.RecordLocation)
75  	 */
76  	public byte[] read(RecordLocation location)
77  			throws InvalidRecordLocationException, IOException {
78  		
79  		long start = System.currentTimeMillis();
80  		byte answer[] = next.read(location);		
81  		long end = System.currentTimeMillis();
82  		
83  		readBytesCounter.add(answer.length);
84  		readLatency.addTime(end-start);
85  		return answer;
86  	}
87  
88  	/***
89  	 * @see org.codehaus.activemq.journal.Journal#setMark(org.codehaus.activemq.journal.RecordLocation, boolean)
90  	 */
91  	public void setMark(RecordLocation recordLocator, boolean force)
92  			throws InvalidRecordLocationException, IOException {
93  		next.setMark(recordLocator, force);
94  	}
95  
96  	/***
97  	 * @see org.codehaus.activemq.journal.Journal#getMark()
98  	 */
99  	public RecordLocation getMark() {
100 		return next.getMark();
101 	}
102 
103 	/***
104 	 * @see org.codehaus.activemq.journal.Journal#close()
105 	 */
106 	public void close() throws IOException {
107 		next.close();
108 	}
109 	
110 	/***
111 	 * @see org.codehaus.activemq.journal.Journal#setJournalEventListener(org.codehaus.activemq.journal.JournalEventListener)
112 	 */
113 	public void setJournalEventListener(JournalEventListener eventListener) {
114 	    next.setJournalEventListener(eventListener);
115 	}
116 
117 	/***
118 	 * @see org.codehaus.activemq.journal.Journal#getNextRecordLocation(org.codehaus.activemq.journal.RecordLocation)
119 	 */
120 	public RecordLocation getNextRecordLocation(RecordLocation lastLocation)
121 			throws IOException, InvalidRecordLocationException {		
122 		return next.getNextRecordLocation(lastLocation);
123 	}
124 	
125 	/***
126 	 * Writes the gathered statistics to the <code>out</code> object.
127 	 * 
128 	 * @param out
129 	 */
130     public void dump(IndentPrinter out) {
131         out.printIndent();
132         out.println("Journal Stats {");        
133         out.incrementIndent();
134         out.printIndent();
135         out.println("Throughput           : "+ getThroughputKps() +" k/s and " + getThroughputRps() +" records/s" );
136         out.printIndent();
137         out.println("Latency with force   : "+ getAvgSyncedLatencyMs() +" ms"  );
138         out.printIndent();
139         out.println("Latency without force: "+ getAvgUnSyncedLatencyMs() +" ms"  );
140 
141         out.printIndent();
142         out.println("Raw Stats {");
143         out.incrementIndent();
144                 
145         out.printIndent();
146         out.println(writeRecordsCounter);
147         out.printIndent();
148         out.println(writeBytesCounter);
149         out.printIndent();
150         out.println(writeLatency);
151         out.incrementIndent();
152         out.printIndent();
153         out.println(synchedWriteLatency);
154         out.printIndent();
155         out.println(unsynchedWriteLatency);
156         out.decrementIndent();
157 
158         out.printIndent();
159         out.println(readBytesCounter);
160         
161         out.printIndent();
162         out.println(readLatency);        
163         out.decrementIndent();
164         out.printIndent();
165         out.println("}");
166         
167         out.decrementIndent();
168         out.printIndent();
169         out.println("}");
170 
171     }
172 
173     /***
174      * Dumps the stats to a String.
175      * 
176      * @see java.lang.Object#toString()
177      */
178 	public String toString() {
179 		if( detailedStats ) {
180 			StringWriter w = new StringWriter();
181 			PrintWriter pw = new PrintWriter(w);		
182 			dump(new IndentPrinter(pw, "  "));
183 			return w.getBuffer().toString();
184 		} else {
185 			StringWriter w = new StringWriter();
186 			PrintWriter pw = new PrintWriter(w);
187 			IndentPrinter out = new IndentPrinter(pw, "  ");
188 	        out.println("Throughput           : "+ getThroughputKps() +" k/s and " + getThroughputRps() +" records/s");
189 	        out.printIndent();
190 	        out.println("Latency with force   : "+getAvgSyncedLatencyMs()+" ms"  );
191 	        out.printIndent();
192 	        out.println("Latency without force: "+getAvgUnSyncedLatencyMs()+" ms"  );
193 			return w.getBuffer().toString();			
194 		}
195     }
196 
197 	/***
198 	 * @param detailedStats true if details stats should be displayed by <code>toString()</code> and <code>dump</code>
199 	 * @return
200 	 */
201 	public JournalStatsFilter enableDetailedStats(boolean detailedStats) {		
202 		this.detailedStats = detailedStats;
203 		return this;
204 	}
205 
206 	/***
207 	 * Gets the average throughput in k/s.
208 	 * 
209 	 * @return the average throughput in k/s.
210 	 */
211 	public double getThroughputKps() {
212 		 long totalTime = writeBytesCounter.getLastSampleTime()-writeBytesCounter.getStartTime(); 
213 		 return (((double)writeBytesCounter.getCount()/(double)totalTime)/(double)1024)*1000;
214 	}
215 
216 	/***
217 	 * Gets the average throughput in records/s.
218 	 * 
219 	 * @return the average throughput in records/s.
220 	 */
221 	public double getThroughputRps() {
222 		 long totalTime = writeRecordsCounter.getLastSampleTime()-writeRecordsCounter.getStartTime(); 
223 		 return (((double)writeRecordsCounter.getCount()/(double)totalTime))*1000;
224 	}
225 
226 	/***
227 	 * Gets the average number of writes done per second
228 	 * 
229 	 * @return the average number of writes in w/s.
230 	 */
231 	public double getWritesPerSecond() {
232 		 return writeLatency.getAveragePerSecond();
233 	}
234 
235 	/***
236 	 * Gets the average sync write latency in ms.
237 	 * 
238 	 * @return the average sync write latency in ms.
239 	 */
240 	public double getAvgSyncedLatencyMs() {
241 		return synchedWriteLatency.getAverageTime();
242 	}
243 
244 	/***
245 	 * Gets the average non sync write latency in ms.
246 	 * 
247 	 * @return the average non sync write latency in ms.
248 	 */
249 	public double getAvgUnSyncedLatencyMs() {
250 		return unsynchedWriteLatency.getAverageTime();
251 	}
252 	
253 	/***
254 	 * Resets the stats sample.
255 	 */
256 	public void reset() {
257 		writeLatency.reset(); 
258 		writeBytesCounter.reset();
259 		writeRecordsCounter.reset();
260 		synchedWriteLatency.reset(); 
261 		unsynchedWriteLatency.reset(); 	
262 		readLatency.reset(); 
263 		readBytesCounter.reset();
264 	}
265 }