- All Implemented Interfaces:
org.jgroups.Lifecycle, DynamicMembership, Settable
The implementation uses a queue to which the following types of requests are added: down-requests (invocations of Settable.setAsync(byte[], int, int)) and up-requests (requests or responses received in up(Message) or up(MessageBatch)).
Leaders handle down-requests (which result in AppendEntriesRequests being sent) and up-requests (responses). Followers handle only up-requests (AppendEntriesRequests) and send responses. Note that the periodic sending of AppendEntriesRequests (if needed) is also done by the queue-handling thread.
The use of the queue makes the RAFT protocol effectively single-threaded; i.e. only one thread ever changes state, so synchronization can be removed altogether. The only exception is the invocation of changeRole(Role) by ELECTION: this still needs to be changed (probably by adding it as an event to the queue, too).
- Since:
- 0.1
- Author:
- Bela Ban
- See Also:
-
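The queue-based design described above can be sketched in plain Java. This is a minimal, hypothetical model, not the jgroups-raft code: SingleThreadedQueueSketch, DownRequest, UpRequest and processQueue() are illustrative names, a plain ArrayList stands in for the log, and a counter stands in for real up-request handling. It shows why no locks are needed: callers on any thread only enqueue, and a single worker thread is the sole writer of the state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: all requests go through one BlockingQueue and a single
// worker thread is the only thread that mutates state.
class SingleThreadedQueueSketch {
    interface Request {}

    // Stands in for a down-request, e.g. an invocation of setAsync().
    static final class DownRequest implements Request {
        final byte[] command;
        final CompletableFuture<Long> future = new CompletableFuture<>();
        DownRequest(byte[] command) { this.command = command; }
    }

    // Stands in for an up-request, e.g. a message received in up(Message).
    static final class UpRequest implements Request {
        final String payload;
        final CompletableFuture<Integer> future = new CompletableFuture<>();
        UpRequest(String payload) { this.payload = payload; }
    }

    private final BlockingQueue<Request> queue = new LinkedBlockingQueue<>();
    // Touched only by the worker thread, so no synchronization is needed.
    private final List<byte[]> log = new ArrayList<>();
    private int upRequestsHandled;

    SingleThreadedQueueSketch() {
        Thread worker = new Thread(this::processQueue, "queue-handler");
        worker.setDaemon(true); // lets the JVM exit while the worker blocks in take()
        worker.start();
    }

    // Any thread may call this; it only enqueues and returns a future.
    CompletableFuture<Long> setAsync(byte[] command) {
        DownRequest r = new DownRequest(command);
        queue.add(r);
        return r.future;
    }

    CompletableFuture<Integer> up(String payload) {
        UpRequest r = new UpRequest(payload);
        queue.add(r);
        return r.future;
    }

    private void processQueue() {
        try {
            while (true) {
                Request r = queue.take(); // blocks until a request arrives
                if (r instanceof DownRequest) {
                    DownRequest d = (DownRequest) r;
                    log.add(d.command);
                    d.future.complete((long) log.size()); // index of the appended entry
                } else if (r instanceof UpRequest) {
                    upRequestsHandled++;
                    ((UpRequest) r).future.complete(upRequestsHandled);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop processing when interrupted
        }
    }
}
```

Because each request carries its own CompletableFuture, callers can still wait for their result without ever touching the shared state directly.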
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description)
- protected static class: Generated by Settable.setAsync(byte[], int, int)
- protected static class
- static interface
- protected static class
- protected static class: Received by up(Message) or up(MessageBatch)
-
Field Summary
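Fields (Modifier and Type, Field, Description)
- protected boolean _log_use_fsync
- protected int _max_log_cache_size
- protected CompletableFuture<byte[]> add_server_future
- protected static final short APPEND_ENTRIES_REQ
- protected static final short APPEND_ENTRIES_RSP
- protected static final short APPEND_RESULT
- protected org.jgroups.util.AverageMinMax avg_append_entries_batch_size
- protected long commit_index
- protected CommitTable commit_table
- protected long curr_log_size
- protected boolean dynamic_view_changes
- protected RaftImpl impl: The current role (follower, candidate or leader).
- protected static final short INSTALL_SNAPSHOT_REQ
- protected final PersistentState internal_state
- protected long last_appended
- protected String log_args
- protected String log_class
- protected String log_dir
- protected static final short LOG_ENTRIES
- protected Log log_impl
- protected String log_name
- protected String log_prefix
- protected int majority
- protected long max_log_size
- protected int num_failed_append_requests_not_found
- protected int num_failed_append_requests_wrong_term
- protected int num_resends
- protected int num_snapshot_received
- protected int num_snapshots
- protected int num_successful_append_requests
- protected BlockingQueue<RAFT.Request> processing_queue: All requests are added to this queue; a single thread processes it, so no synchronization is needed.
- protected int processing_queue_max_size
- protected String raft_id
- protected static final short RAFT_ID
- static final byte[] raft_id_key
- protected final org.jgroups.protocols.raft.state.RaftState raft_state
- protected ReadOnlyRequestRepository<RAFT.DownRequest> readOnlyRequests
- protected final List<RAFT.Request> remove_queue
- protected RequestTable<String> request_table
- protected long resend_interval
- protected final List<RAFT.RoleChange> role_change_listeners
- protected org.jgroups.util.Runner runner
- protected boolean send_commits_immediately
- protected StateMachine state_machine
- protected boolean state_machine_loaded
- protected boolean synchronous
- protected org.jgroups.View view
Fields inherited from class org.jgroups.stack.Protocol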
after_creation_hook, down_prot, ergonomics, id, local_addr, log, policies, stack, stats, up_prot -
Constructor Summary
Constructors
- RAFT()
-
Method Summary
Methods (Modifier and Type, Method, Description)
- protected void _addServer(String name)
- protected void _removeServer(String name)
- protected void add(RAFT.Request r)
- CompletableFuture<byte[]>
- protected boolean append(long index, LogEntries entries): Appends to the log and returns true if added or false if not (e.g. because the entry already existed).
- protected void applyCommit(long index, boolean serialize_response): Applies the commit at index.
- protected long applyCommits(long to_inclusive, boolean serialize_response): Applies log entries [commit_index+1 .. to_inclusive] to the state machine and notifies clients in RequestTable.
- protected CompletableFuture<byte[]> changeMembers(String name, InternalCommand.Type type)
- protected RAFT changeRole(Role new_role)
- long commitIndex()
- protected RAFT commitLogTo(long index_inclusive, boolean serialize_response): Tries to move commit_index up to index_inclusive, apply the entries in [commit_index+1 .. index_inclusive] to the state machine and notify the clients for each entry.
- protected void computeMajority()
- protected void createCommitTable()
- protected String createLogName(String name, String suffix)
- long createNewTerm()
- protected void createRequestTable(): Populates the request table with non-committed entries from the log (https://github.com/belaban/jgroups-raft/issues/31).
- long currentLogSize()
- long currentTerm()
- int currentTerm(long new_term): Sets current_term if new_term is bigger.
- protected void deleteAllLogEntriesStartingFrom(long index)
- void disableLogCache()
- down(org.jgroups.Event evt)
- dumpLog()
- dumpLog(long last_n)
- protected boolean duplicatesInView(org.jgroups.View view): Checks if a given view contains duplicate raft-ids.
- void enableLogCache()
- protected void executeInternalCommand(InternalCommand cmd, byte[] buf, int offset, int length): If cmd is not null, execute it.
- static <T> T findProtocol(Class<T> clazz, org.jgroups.stack.Protocol start, boolean down)
- void flushCommitTable()
- void flushCommitTable(org.jgroups.Address member): Triggers a flush of the entries to the given member.
- CompletableFuture<byte[]> getAsync(byte[] buf, int offset, int length, Options options): Asynchronous get operation that returns immediately without blocking.
- protected void handleDownRequest(CompletableFuture<byte[]> f, byte[] buf, int offset, int length, boolean internal, Options opts, boolean readOnly): This method is always called by a single thread only, and therefore does not need to be reentrant.
- void handleUpRequest(org.jgroups.Message msg, RaftHeader hdr)
- void handleView(org.jgroups.View view)
- impl()
- void init()
- void: Loads the log entries from [first .. commit_index] into the state machine.
- boolean isLeader()
- long lastAppended()
- org.jgroups.Address leader()
- leader(org.jgroups.Address new_leader)
- log()
- logArgs()
- double logCacheHitRatio()
- int LogCacheNumAccesses()
- int logCacheNumTrims()
- logClass()
- void logEntries(ObjLongConsumer<LogEntry> func)
- logName()
- long logSize()
- long logSizeInBytes(): This is a managed operation because it should be invoked sparingly (it is costly).
- boolean logUseFsync()
- logUseFsync(boolean b)
- int majority()
- int maxLogCacheSize()
- maxLogCacheSize(int size)
- long maxLogSize()
- maxLogSize(long val)
- members()
- members(Collection<String> list)
- protected static <T> void notify(RequestTable.Entry<T> e, byte[] rsp)
- protected static <T> void notify(RequestTable.Entry<T> e, Throwable t)
- protected void
- int numSnapshots()
- protected void
- protected void process(List<RAFT.Request> q)
- int processingQueueSize()
- protected void processQueue()
- raftId()
- int removeQueueSize()
- CompletableFuture<byte[]> removeServer(String name)
- int requestTableSize()
- protected void resend(org.jgroups.Address target, long index)
- protected void resend(org.jgroups.Address target, long from, long to): Resends all entries in range [from .. to] to target.
- long resendInterval()
- resendInterval(long val)
- void resetStats()
- role()
- protected void sendAppendEntriesMessage(org.jgroups.Address member, CommitTable.Entry e): Runs (on the leader) as part of the queue-handling loop: checks if all members (except the leader) in the commit table have received all messages and resends AppendEntries messages to members who haven't.
For each member, a next-index and match-index is maintained: next-index is the index of the next message to send to that member (initialized to last-applied) and match-index is the index of the highest message known to have been received by the member.
Messages are resent to a given member as long as that member's match-index is smaller than its next-index.
- boolean sendCommitsImmediately()
- sendCommitsImmediately(boolean v)
- protected void sendSnapshotTo(org.jgroups.Address dest)
- CompletableFuture<byte[]> setAsync(byte[] buf, int offset, int length, boolean internal, Options options)
- CompletableFuture<byte[]>: Called by a building block to apply a change to all state machines in a cluster.
- setLeaderAndTerm(org.jgroups.Address new_leader)
- setLeaderAndTerm(org.jgroups.Address new_leader, long new_term): Sets the new leader and term.
- void setMembers(String list)
- void snapshot(): Creates a snapshot and truncates the log.
- protected void snapshotIfNeeded(int bytes_added)
- void start()
- stateMachineLoaded(boolean b)
- void stop()
- boolean synchronous()
- synchronous(boolean b)
- protected void
- toString()
- int trySetLeaderAndTerm(org.jgroups.Address newLeader, long newTerm)
- up(org.jgroups.Event evt)
- up(org.jgroups.Message msg)
- void up(org.jgroups.util.MessageBatch batch)
- org.jgroups.Address votedFor()
- votedFor(org.jgroups.Address mbr)
Methods inherited from class org.jgroups.stack.Protocol
accept, addPolicy, addr, addr, afterCreationHook, destroy, down, down, enableStats, getAddress, getComponents, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getLog, getName, getPolicies, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, isErgonomics, level, parse, policies, providedDownServices, providedUpServices, removePolicy, requiredDownServices, requiredUpServices, resetStatistics, setAddress, setDownProtocol, setErgonomics, setId, setLevel, setPolicies, setProtocolStack, setSocketFactory, setUpProtocol, setValue, statsEnabled
-
Field Details
-
raft_id_key
public static final byte[] raft_id_key -
RAFT_ID
protected static final short RAFT_ID
- See Also:
-
APPEND_ENTRIES_REQ
protected static final short APPEND_ENTRIES_REQ
- See Also:
-
APPEND_ENTRIES_RSP
protected static final short APPEND_ENTRIES_RSP
- See Also:
-
APPEND_RESULT
protected static final short APPEND_RESULT
- See Also:
-
INSTALL_SNAPSHOT_REQ
protected static final short INSTALL_SNAPSHOT_REQ
- See Also:
-
LOG_ENTRIES
protected static final short LOG_ENTRIES
- See Also:
-
print_function
-
raft_id
-
internal_state
-
raft_state
protected final org.jgroups.protocols.raft.state.RaftState raft_state -
majority
protected int majority -
dynamic_view_changes
protected boolean dynamic_view_changes -
log_class
-
log_args
-
log_dir
-
log_prefix
-
log_name
-
resend_interval
protected long resend_interval -
send_commits_immediately
protected boolean send_commits_immediately -
max_log_size
protected long max_log_size -
_max_log_cache_size
protected int _max_log_cache_size -
_log_use_fsync
protected boolean _log_use_fsync -
curr_log_size
protected long curr_log_size -
num_successful_append_requests
protected int num_successful_append_requests -
num_snapshot_received
protected int num_snapshot_received -
avg_append_entries_batch_size
protected org.jgroups.util.AverageMinMax avg_append_entries_batch_size -
num_failed_append_requests_not_found
protected int num_failed_append_requests_not_found -
num_failed_append_requests_wrong_term
protected int num_failed_append_requests_wrong_term -
state_machine
-
state_machine_loaded
protected boolean state_machine_loaded -
log_impl
-
request_table
-
commit_table
-
readOnlyRequests
-
role_change_listeners
-
impl
The current role (follower, candidate or leader). Every node starts out as a follower. -
view
protected volatile org.jgroups.View view -
last_appended
protected long last_appended -
commit_index
protected long commit_index -
num_snapshots
protected int num_snapshots -
num_resends
protected int num_resends -
processing_queue_max_size
protected int processing_queue_max_size -
processing_queue
All requests are added to this queue; a single thread processes it, so no synchronization is needed. -
remove_queue
-
runner
protected org.jgroups.util.Runner runner -
synchronous
protected boolean synchronous -
add_server_future
-
-
Constructor Details
-
RAFT
public RAFT()
-
-
Method Details
-
removeQueueSize
public int removeQueueSize() -
processingQueueSize
public int processingQueueSize() -
drainRatio
-
raftId
-
raftId
-
impl
-
majority
public int majority() -
logClass
-
logClass
-
logArgs
-
logArgs
-
logPrefix
-
logPrefix
-
logName
-
resendInterval
public long resendInterval() -
resendInterval
-
sendCommitsImmediately
public boolean sendCommitsImmediately() -
sendCommitsImmediately
-
maxLogSize
public long maxLogSize() -
maxLogSize
-
currentLogSize
public long currentLogSize() -
requestTableSize
public int requestTableSize() -
numSnapshots
public int numSnapshots() -
leader
public org.jgroups.Address leader() -
leader
-
isLeader
public boolean isLeader() -
stateMachine
-
stateMachine
-
commitTable
-
currentTerm
public long currentTerm() -
votedFor
public org.jgroups.Address votedFor() -
lastAppended
public long lastAppended() -
commitIndex
public long commitIndex() -
log
-
log
-
addRoleListener
-
remRoleListener
-
stateMachineLoaded
-
synchronous
public boolean synchronous() -
synchronous
-
logDir
-
resetStats
public void resetStats()
- Overrides:
resetStats in class org.jgroups.stack.Protocol
-
maxLogCacheSize
public int maxLogCacheSize() -
maxLogCacheSize
-
logUseFsync
-
logUseFsync
public boolean logUseFsync() -
logCacheNumTrims
public int logCacheNumTrims() -
LogCacheNumAccesses
public int LogCacheNumAccesses() -
logCacheHitRatio
public double logCacheHitRatio() -
setMembers
-
members
-
members
-
currentTerm
public int currentTerm(long new_term)
Sets current_term if new_term is bigger.
- Parameters:
new_term - The new term
- Returns:
- -1 if new_term is smaller, 0 if equal and 1 if new_term is bigger
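A minimal sketch of the currentTerm(long) contract, assuming (as RAFT requires) that the term only ever moves forward, so it is adopted only when it is bigger. TermSketch is a hypothetical class, not part of jgroups-raft; its return value follows the documented -1/0/1 convention.

```java
// Hypothetical sketch of the documented currentTerm(long) contract.
class TermSketch {
    private long currentTerm;

    long currentTerm() { return currentTerm; }

    // Returns -1 if newTerm is smaller, 0 if equal, 1 if bigger; only a bigger term is adopted.
    int currentTerm(long newTerm) {
        if (newTerm < currentTerm)
            return -1;
        if (newTerm == currentTerm)
            return 0;
        currentTerm = newTerm;
        return 1;
    }
}
```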
-
votedFor
-
role
-
dumpCommitTable
-
logSize
public long logSize() -
logDescription
-
logSizeInBytes
public long logSizeInBytes()
This is a managed operation because it should be invoked sparingly (it is costly). -
dumpLog
-
dumpLog
-
enableLogCache
public void enableLogCache() -
disableLogCache
public void disableLogCache() -
clearLogCache
-
trimLogCache
-
logEntries
-
createNewTerm
public long createNewTerm() -
findProtocol
-
addServer
- Specified by:
addServer in interface DynamicMembership
- Throws:
Exception
-
removeServer
- Specified by:
removeServer in interface DynamicMembership
- Throws:
Exception
-
snapshot
Creates a snapshot and truncates the log. See https://github.com/belaban/jgroups-raft/issues/7 for details.
- Throws:
Exception
-
snapshotAsync
-
initStateMachineFromLog
Loads the log entries from [first .. commit_index] into the state machine.
- Throws:
Exception
-
init
- Specified by:
init in interface org.jgroups.Lifecycle
- Overrides:
init in class org.jgroups.stack.Protocol
- Throws:
Exception
-
start
- Specified by:
start in interface org.jgroups.Lifecycle
- Overrides:
start in class org.jgroups.stack.Protocol
- Throws:
Exception
-
stop
public void stop()
- Specified by:
stop in interface org.jgroups.Lifecycle
- Overrides:
stop in class org.jgroups.stack.Protocol
-
down
- Overrides:
down in class org.jgroups.stack.Protocol
-
up
- Overrides:
up in class org.jgroups.stack.Protocol
-
up
- Overrides:
up in class org.jgroups.stack.Protocol
-
up
public void up(org.jgroups.util.MessageBatch batch)
- Overrides:
up in class org.jgroups.stack.Protocol
-
flushCommitTable
public void flushCommitTable() -
flushCommitTable
public void flushCommitTable(org.jgroups.Address member)
Triggers a flush of the entries to the given member.
- Parameters:
member - The non-null address of the member to send the entries to.
- Throws:
IllegalStateException - Thrown if the current node is not the leader.
NullPointerException - Thrown if member is null.
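The documented preconditions of flushCommitTable(Address) can be illustrated with a small guard. FlushGuardSketch is hypothetical: a String stands in for org.jgroups.Address, a boolean flag stands in for the node's role, and the returned message replaces the actual resending of entries.

```java
import java.util.Objects;

// Hypothetical sketch of the argument checks documented for flushCommitTable(Address).
class FlushGuardSketch {
    private final boolean isLeader;

    FlushGuardSketch(boolean isLeader) { this.isLeader = isLeader; }

    String flushCommitTable(String member) {
        Objects.requireNonNull(member, "member cannot be null"); // NullPointerException
        if (!isLeader)
            throw new IllegalStateException("flushCommitTable() can only be called on the leader");
        return "flushing entries to " + member; // stands in for sending the entries
    }
}
```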
-
setAsync
Called by a building block to apply a change to all state machines in a cluster. This starts the consensus protocol to get a majority to commit this change.
This call is non-blocking and returns a future as soon as the AppendEntries message has been sent. Only applicable on the leader.
- Specified by:
setAsync in interface Settable
- Parameters:
buf - The command
offset - The offset into the buffer
length - The length of the buffer
options - Options to pass to the call, may be null
- Returns:
- A CompletableFuture that can be used to wait for the result (sync). A blocking caller could call set(), then call future.get() to block for the result.
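To turn the non-blocking call into a blocking one, a caller can wait on the returned future. The sketch assumes a hypothetical AsyncSettable interface that mirrors only the setAsync(byte[], int, int) shape of Settable; it uses the standard CompletableFuture.orTimeout() (Java 9+) so that a caller never waits forever, and shows a non-blocking alternative with thenApply().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the setAsync(byte[], int, int) shape of Settable.
interface AsyncSettable {
    CompletableFuture<byte[]> setAsync(byte[] buf, int offset, int length);
}

class BlockingCaller {
    // Blocking use: orTimeout() completes the future exceptionally after timeoutMs,
    // and join() then throws a CompletionException wrapping the TimeoutException.
    static byte[] setBlocking(AsyncSettable settable, byte[] cmd, long timeoutMs) {
        return settable.setAsync(cmd, 0, cmd.length)
                       .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                       .join();
    }

    // Non-blocking use: react to the result when it arrives.
    static CompletableFuture<Integer> resultLength(AsyncSettable settable, byte[] cmd) {
        return settable.setAsync(cmd, 0, cmd.length)
                       .thenApply(rsp -> rsp == null ? 0 : rsp.length);
    }
}
```

The timeout value is a choice of the caller; the same waiting pattern applies to any future returned by setAsync().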
-
getAsync
public CompletableFuture<byte[]> getAsync(byte[] buf, int offset, int length, Options options) throws Exception
Description copied from interface: Settable
Asynchronous get operation that returns immediately without blocking.
This method submits a read-only operation to the state machine. Read-only operations are treated differently by the replication algorithm: since they do not change the state-machine state, they are not appended to the replicated log.
Warning: Do not change the state-machine state in operations submitted through this method. Otherwise, the state machines will diverge and end up in an undefined state.
- Specified by:
getAsync in interface Settable
- Parameters:
buf - The buffer representing the read-only operation to submit to the state machine.
offset - The number of bytes to skip in the buffer.
length - The number of bytes to use from the buffer, starting at offset.
options - Options to pass along in the call chain, may be null.
- Returns:
- A future that completes with a buffer representing the result of the read-only operation.
- Throws:
Exception - Thrown if the operation could not be submitted.
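The difference between the write path and the read-only path can be illustrated with a toy key-value model. ReadWriteSketch is hypothetical, not a jgroups-raft class: writes append an entry to a list standing in for the replicated log and then update a map standing in for the state machine, while reads only query the map and leave both untouched. Unlike RAFT, it applies writes immediately instead of waiting for them to be committed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch contrasting replicated writes with read-only operations.
class ReadWriteSketch {
    final List<String> log = new ArrayList<>();                 // stands in for the replicated log
    private final Map<String, String> state = new HashMap<>();  // stands in for the state machine

    // Write path: append to the log, then apply to the state machine.
    void set(String key, String value) {
        log.add(key + "=" + value);
        state.put(key, value);
    }

    // Read-only path: no log entry is created and the state is not modified.
    String get(String key) {
        return state.get(key);
    }
}
```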
-
setAsync
public CompletableFuture<byte[]> setAsync(byte[] buf, int offset, int length, boolean internal, Options options) -
toString
- Overrides:
toString in class org.jgroups.stack.Protocol
-
add
-
offer
-
handleDownRequest
protected void handleDownRequest(CompletableFuture<byte[]> f, byte[] buf, int offset, int length, boolean internal, Options opts, boolean readOnly)
This method is always called by a single thread only, and therefore does not need to be reentrant. -
handleUpRequest
-
processQueue
protected void processQueue() -
process
-
createRequestTable
protected void createRequestTable()
Populates the request table with non-committed entries from the log (https://github.com/belaban/jgroups-raft/issues/31). -
createCommitTable
protected void createCommitTable() -
_addServer
-
_removeServer
-
sendAppendEntriesMessage
Runs (on the leader) as part of the queue handling loop: checks if all members (except the leader) in the commit table have received all messages and resends AppendEntries messages to members who haven't.
For each member, a next-index and match-index is maintained: next-index is the index of the next message to send to that member (initialized to last-applied) and match-index is the index of the highest message known to have been received by the member.
Messages are resent to a given member as long as that member's match-index is smaller than its next-index. When match_index == next_index, message resending for that member is stopped. When a new message is sent, next-index is incremented (on reception of the AppendResult) and resending starts again. -
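The resend check can be approximated with a simplified, hypothetical model. ResendSketch tracks only a match-index per member and, unlike the description above, keeps no separate next-index: it assumes a member is behind whenever its match-index is below the leader's last appended index, and reports the range [match-index+1 .. last appended] that would be handed to something like resend(target, from, to).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the leader's periodic resend check.
class ResendSketch {
    static final class MemberState {
        long matchIndex; // highest index known to have been received by the member
        MemberState(long matchIndex) { this.matchIndex = matchIndex; }
    }

    final Map<String, MemberState> commitTable = new LinkedHashMap<>();

    // Called when an AppendResult acknowledges everything up to ackedIndex.
    void onAck(String member, long ackedIndex) {
        MemberState s = commitTable.get(member);
        s.matchIndex = Math.max(s.matchIndex, ackedIndex);
    }

    // One pass of the loop: returns "member:[from..to]" for each member that needs a resend.
    List<String> resendPass(String leader, long lastAppended) {
        List<String> resends = new ArrayList<>();
        for (Map.Entry<String, MemberState> e : commitTable.entrySet()) {
            if (e.getKey().equals(leader))
                continue; // the leader never resends to itself
            long match = e.getValue().matchIndex;
            if (match < lastAppended)
                resends.add(e.getKey() + ":[" + (match + 1) + ".." + lastAppended + "]");
        }
        return resends;
    }
}
```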
changeMembers
protected CompletableFuture<byte[]> changeMembers(String name, InternalCommand.Type type) throws Exception
- Throws:
Exception
-
resend
protected void resend(org.jgroups.Address target, long index) -
resend
protected void resend(org.jgroups.Address target, long from, long to) Resends all entries in range [from .. to] to target -
sendSnapshotTo
- Throws:
Exception
-
commitLogTo
Tries to move commit_index up to index_inclusive, apply the entries in [commit_index+1 .. index_inclusive] to the state machine and notify the clients for each entry. There is no need to check if an entry is committed in RequestTable, as this was done before calling this method.
- Parameters:
index_inclusive - The index to which to move commit_index
serialize_response - When true, the response of applying a change to the state machine needs to be serialized into a byte[] array; otherwise null can be returned (reducing serialization cost)
-
append
Appends to the log and returns true if added or false if not (e.g. because the entry already existed). -
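A minimal in-memory model of the append() contract, assuming 1-based log indices and that an append must start exactly at last_appended + 1. InMemoryLog and its gap check are assumptions for illustration, not the jgroups-raft Log implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory log; entry i (1-based) is stored at entries.get(i - 1).
class InMemoryLog {
    private final List<String> entries = new ArrayList<>();

    long lastAppended() { return entries.size(); }

    // Returns true if the batch was appended, false otherwise.
    boolean append(long index, List<String> batch) {
        if (index <= lastAppended())
            return false; // an entry at this index already exists
        if (index != lastAppended() + 1)
            return false; // appending here would leave a gap (assumption of this sketch)
        entries.addAll(batch);
        return true;
    }
}
```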
deleteAllLogEntriesStartingFrom
protected void deleteAllLogEntriesStartingFrom(long index) -
snapshotIfNeeded
protected void snapshotIfNeeded(int bytes_added) -
takeSnapshot
- Throws:
Exception
-
applyCommits
protected long applyCommits(long to_inclusive, boolean serialize_response)
Applies log entries [commit_index+1 .. to_inclusive] to the state machine and notifies clients in RequestTable.
- Parameters:
to_inclusive - The end index (inclusive) of the log entries to apply
serialize_response - Whether or not StateMachine.apply(byte[], int, int, boolean) needs to return a serialized response
- Returns:
- The last index of the range of log entries that was successfully applied (normally this is to_inclusive)
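The range arithmetic of applyCommits() can be sketched as follows. CommitApplier is hypothetical: the log is a 1-based List<String>, a Function<String, String> stands in for StateMachine.apply(), and collecting responses in a list stands in for notifying clients in RequestTable. Capping the range at the end of the log is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: applies entries [commitIndex+1 .. toInclusive] in order.
class CommitApplier {
    private final List<String> log;                    // entry i (1-based) is log.get(i - 1)
    private final Function<String, String> stateMachine;
    final List<String> responses = new ArrayList<>();  // stands in for notifying clients
    long commitIndex;

    CommitApplier(List<String> log, Function<String, String> stateMachine) {
        this.log = log;
        this.stateMachine = stateMachine;
    }

    // Returns the last index that was applied (normally toInclusive).
    long applyCommits(long toInclusive) {
        long limit = Math.min(toInclusive, log.size()); // cannot apply entries that are not in the log
        for (long i = commitIndex + 1; i <= limit; i++) {
            responses.add(stateMachine.apply(log.get((int) (i - 1))));
            commitIndex = i;
        }
        return commitIndex;
    }
}
```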
-
applyCommit
Applies the commit at index.
- Throws:
Exception
-
handleView
public void handleView(org.jgroups.View view) -
setLeaderAndTerm
-
setLeaderAndTerm
Sets the new leader and term -
trySetLeaderAndTerm
public int trySetLeaderAndTerm(org.jgroups.Address newLeader, long newTerm) -
notify
-
notify
-
changeRole
-
executeInternalCommand
If cmd is not null, execute it. Otherwise, parse buf into an InternalCommand, then call cmd.execute(). -
createLogName
-
notifyRoleChangeListeners
-
duplicatesInView
protected boolean duplicatesInView(org.jgroups.View view)
Checks if a given view contains duplicate raft-ids. Uses the raft-id key in ExtendedUUID for the comparison. -
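The duplicate check reduces to a set-membership test. In this hypothetical sketch each member has already been mapped to its raft-id (in RAFT the id comes from the raft-id key of an ExtendedUUID; here a plain String is used), and members without a raft-id are skipped, which is an assumption.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: detects a duplicate raft-id the first time Set.add() returns false.
class DuplicateCheck {
    static boolean duplicatesInView(List<String> raftIds) {
        Set<String> seen = new HashSet<>();
        for (String id : raftIds) {
            if (id != null && !seen.add(id)) // null: member without a raft-id (skipped)
                return true;
        }
        return false;
    }
}
```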
parseCommaDelimitedProps
-
computeMajority
protected void computeMajority()
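computeMajority() is not documented here; as a reference point, the standard RAFT rule is that a majority of an n-member cluster is n/2 + 1 (integer division). The hypothetical Majority sketch applies that rule and also derives the highest log index stored on a majority of members from their match-indices, the usual commit-candidate calculation. It assumes the leader's own last appended index is included in the array, and it omits RAFT's extra condition that only entries from the leader's current term are committed this way.

```java
import java.util.Arrays;

// Hypothetical sketch of majority arithmetic in RAFT.
class Majority {
    static int majority(int members) {
        return members / 2 + 1; // integer division
    }

    // Highest index that at least majority(n) members have stored.
    static long highestIndexOnMajority(long[] matchIndexes) {
        long[] sorted = matchIndexes.clone();
        Arrays.sort(sorted); // ascending
        return sorted[sorted.length - majority(sorted.length)];
    }
}
```

For example, with match-indices {5, 3, 1} two of three members hold index 3, so 3 is the highest index stored on a majority.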
-