1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.journal.impl;
19
20 import java.nio.ByteBuffer;
21
22 import EDU.oswego.cs.dl.util.concurrent.Latch;
23
24 /***
25 * This contains all the data needed to write and force a list of records to a LogFile.
26 * The more records that can be cramed into a single BatchedWrite, the higher throughput
27 * that can be achived by a write and force operation.
28 *
29 * @version $Revision: 1.2 $
30 */
31 public class BatchedWrite {
32
33 private final Latch writeDoneLatch = new Latch();
34 private Record data=null;
35 public Throwable error;
36 private final ByteBuffer byteBuffer;
37 private long firstSequenceId=-1;
38 private long lastSequenceId;
39 private Mark mark;
40
41 private boolean appendDisabled=false;
42 private boolean appendInProgress=false;
43
44 /***
45 * @param byteBuffer
46 */
47 public BatchedWrite(ByteBuffer byteBuffer) {
48 this.byteBuffer = byteBuffer;
49 }
50
51 /***
52 * @throws InterruptedException
53 *
54 */
55 synchronized public void disableAppend() throws InterruptedException {
56 appendDisabled=true;
57 while( appendInProgress ) {
58 wait();
59 }
60 }
61
62 /***
63 * @param record
64 */
65 public boolean append(Record record) {
66
67 synchronized(this) {
68 if( appendDisabled )
69 return false;
70 appendInProgress=true;
71 }
72
73 record.fill( byteBuffer );
74
75 if( record.remaining()==0 ) {
76 if( firstSequenceId== -1 )
77 firstSequenceId = record.getHeader().sequenceId;
78 lastSequenceId = record.getHeader().sequenceId;;
79 if( record.getMark()!=null )
80 mark = record.getMark();
81 }
82
83 synchronized(this) {
84 appendInProgress=false;
85 this.notify();
86
87 if( appendDisabled )
88 return false;
89 else
90 return byteBuffer.remaining()>0;
91 }
92 }
93
94 public void waitForForce() throws Throwable {
95 writeDoneLatch.acquire();
96 synchronized(this) {
97 if( error!=null )
98 throw error;
99 }
100 }
101
102 public void forced() {
103 writeDoneLatch.release();
104 }
105
106 public void writeFailed(Throwable error) {
107 synchronized(this) {
108 this.error=error;
109 }
110 writeDoneLatch.release();
111 }
112
113 /***
114 * @return Returns the data.
115 */
116 public Record getData() {
117 return data;
118 }
119
120 public ByteBuffer getByteBuffer() {
121 return byteBuffer;
122 }
123
124 /***
125 * @return
126 */
127 public Mark getMark() {
128 return mark;
129 }
130
131 /***
132 * @return
133 */
134 public long getLastSequenceId() {
135 return lastSequenceId;
136 }
137
138 /***
139 * @return
140 */
141 public long getFirstSequenceId() {
142 return firstSequenceId;
143 }
144
145
146 }