Class RAFT

java.lang.Object
org.jgroups.stack.Protocol
org.jgroups.protocols.raft.RAFT
All Implemented Interfaces:
org.jgroups.Lifecycle, DynamicMembership, Settable

public class RAFT extends org.jgroups.stack.Protocol implements Settable, DynamicMembership
Implementation of the RAFT consensus protocol in JGroups

The implementation uses a queue to which the following types of requests are added: down-requests (invocations of Settable.setAsync(byte[], int, int)) and up-requests (requests or responses received in up(Message) or up(MessageBatch)).

Leaders handle down-requests (resulting in sending AppendEntriesRequests) and up-requests (responses). Followers handle only up-requests (AppendEntriesRequests) and send responses. Note that the periodic sending of AppendEntriesRequests (if needed) is also done by the queue handling thread.

The use of the queue makes the RAFT protocol effectively single-threaded; i.e. only 1 thread ever changes state, so synchronization can be removed altogether. The only exception to this is invocation of changeRole(Role), called by ELECTION: this still needs to be changed (probably by adding it as an event to the queue, too).
Since:
0.1
Author:
Bela Ban
See Also:
  • Field Details

    • raft_id_key

      public static final byte[] raft_id_key
    • RAFT_ID

      protected static final short RAFT_ID
      See Also:
    • APPEND_ENTRIES_REQ

      protected static final short APPEND_ENTRIES_REQ
      See Also:
    • APPEND_ENTRIES_RSP

      protected static final short APPEND_ENTRIES_RSP
      See Also:
    • APPEND_RESULT

      protected static final short APPEND_RESULT
      See Also:
    • INSTALL_SNAPSHOT_REQ

      protected static final short INSTALL_SNAPSHOT_REQ
      See Also:
    • LOG_ENTRIES

      protected static final short LOG_ENTRIES
      See Also:
    • raft_id

      protected String raft_id
    • internal_state

      protected final PersistentState internal_state
    • raft_state

      protected final org.jgroups.protocols.raft.state.RaftState raft_state
    • majority

      protected int majority
    • dynamic_view_changes

      protected boolean dynamic_view_changes
    • log_class

      protected String log_class
    • log_args

      protected String log_args
    • log_dir

      protected String log_dir
    • log_prefix

      protected String log_prefix
    • log_name

      protected String log_name
    • resend_interval

      protected long resend_interval
    • send_commits_immediately

      protected boolean send_commits_immediately
    • max_log_size

      protected long max_log_size
    • _max_log_cache_size

      protected int _max_log_cache_size
    • _log_use_fsync

      protected boolean _log_use_fsync
    • curr_log_size

      protected long curr_log_size
    • num_successful_append_requests

      protected int num_successful_append_requests
    • num_snapshot_received

      protected int num_snapshot_received
    • avg_append_entries_batch_size

      protected org.jgroups.util.AverageMinMax avg_append_entries_batch_size
    • num_failed_append_requests_not_found

      protected int num_failed_append_requests_not_found
    • num_failed_append_requests_wrong_term

      protected int num_failed_append_requests_wrong_term
    • state_machine

      protected StateMachine state_machine
    • state_machine_loaded

      protected boolean state_machine_loaded
    • log_impl

      protected Log log_impl
    • request_table

      protected RequestTable<String> request_table
    • commit_table

      protected CommitTable commit_table
    • readOnlyRequests

      protected ReadOnlyRequestRepository<RAFT.DownRequest> readOnlyRequests
    • role_change_listeners

      protected final List<RAFT.RoleChange> role_change_listeners
    • impl

      protected volatile RaftImpl impl
      The current role (follower, candidate or leader). Every node starts out as a follower
    • view

      protected volatile org.jgroups.View view
    • last_appended

      protected long last_appended
    • commit_index

      protected long commit_index
    • num_snapshots

      protected int num_snapshots
    • num_resends

      protected int num_resends
    • processing_queue_max_size

      protected int processing_queue_max_size
    • processing_queue

      protected BlockingQueue<RAFT.Request> processing_queue
      All requests are added to this queue; a single thread processes this queue - hence no synchronization issues
    • remove_queue

      protected final List<RAFT.Request> remove_queue
    • runner

      protected org.jgroups.util.Runner runner
    • synchronous

      protected boolean synchronous
    • add_server_future

      protected CompletableFuture<byte[]> add_server_future
  • Constructor Details

    • RAFT

      public RAFT()
  • Method Details

    • removeQueueSize

      public int removeQueueSize()
    • processingQueueSize

      public int processingQueueSize()
    • drainRatio

      public String drainRatio()
    • raftId

      public String raftId()
    • raftId

      public RAFT raftId(String id)
    • impl

      public RaftImpl impl()
    • majority

      public int majority()
    • logClass

      public String logClass()
    • logClass

      public RAFT logClass(String clazz)
    • logArgs

      public String logArgs()
    • logArgs

      public RAFT logArgs(String args)
    • logPrefix

      public String logPrefix()
    • logPrefix

      public RAFT logPrefix(String name)
    • logName

      public String logName()
    • resendInterval

      public long resendInterval()
    • resendInterval

      public RAFT resendInterval(long val)
    • sendCommitsImmediately

      public boolean sendCommitsImmediately()
    • sendCommitsImmediately

      public RAFT sendCommitsImmediately(boolean v)
    • maxLogSize

      public long maxLogSize()
    • maxLogSize

      public RAFT maxLogSize(long val)
    • currentLogSize

      public long currentLogSize()
    • requestTableSize

      public int requestTableSize()
    • numSnapshots

      public int numSnapshots()
    • leader

      public org.jgroups.Address leader()
    • leader

      public RAFT leader(org.jgroups.Address new_leader)
    • isLeader

      public boolean isLeader()
    • stateMachine

      public RAFT stateMachine(StateMachine sm)
    • stateMachine

      public StateMachine stateMachine()
    • commitTable

      public CommitTable commitTable()
    • currentTerm

      public long currentTerm()
    • votedFor

      public org.jgroups.Address votedFor()
    • lastAppended

      public long lastAppended()
    • commitIndex

      public long commitIndex()
    • log

      public Log log()
    • log

      public RAFT log(Log new_log)
    • addRoleListener

      public RAFT addRoleListener(RAFT.RoleChange c)
    • remRoleListener

      public RAFT remRoleListener(RAFT.RoleChange c)
    • stateMachineLoaded

      public RAFT stateMachineLoaded(boolean b)
    • synchronous

      public boolean synchronous()
    • synchronous

      public RAFT synchronous(boolean b)
    • logDir

      public RAFT logDir(String logDir)
    • resetStats

      public void resetStats()
      Overrides:
      resetStats in class org.jgroups.stack.Protocol
    • maxLogCacheSize

      public int maxLogCacheSize()
    • maxLogCacheSize

      public RAFT maxLogCacheSize(int size)
    • logUseFsync

      public RAFT logUseFsync(boolean b)
    • logUseFsync

      public boolean logUseFsync()
    • logCacheNumTrims

      public int logCacheNumTrims()
    • LogCacheNumAccesses

      public int LogCacheNumAccesses()
    • logCacheHitRatio

      public double logCacheHitRatio()
    • setMembers

      public void setMembers(String list)
    • members

      public RAFT members(Collection<String> list)
    • members

      public List<String> members()
    • currentTerm

      public int currentTerm(long new_term)
      Sets current_term if new_term is bigger
      Parameters:
      new_term - The new term
      Returns:
      -1 if new_term is smaller, 0 if equal and 1 if new_term is bigger
    • votedFor

      public RAFT votedFor(org.jgroups.Address mbr)
    • role

      public String role()
    • dumpCommitTable

      public String dumpCommitTable()
    • logSize

      public long logSize()
    • logDescription

      public String logDescription()
    • logSizeInBytes

      public long logSizeInBytes()
      This is a managed operation because it should be invoked sparingly (it is costly)
    • dumpLog

      public String dumpLog(long last_n)
    • dumpLog

      public String dumpLog()
    • enableLogCache

      public void enableLogCache()
    • disableLogCache

      public void disableLogCache()
    • clearLogCache

      public RAFT clearLogCache()
    • trimLogCache

      public RAFT trimLogCache()
    • logEntries

      public void logEntries(ObjLongConsumer<LogEntry> func)
    • createNewTerm

      public long createNewTerm()
    • findProtocol

      public static <T> T findProtocol(Class<T> clazz, org.jgroups.stack.Protocol start, boolean down)
    • addServer

      public CompletableFuture<byte[]> addServer(String name) throws Exception
      Specified by:
      addServer in interface DynamicMembership
      Throws:
      Exception
    • removeServer

      public CompletableFuture<byte[]> removeServer(String name) throws Exception
      Specified by:
      removeServer in interface DynamicMembership
      Throws:
      Exception
    • snapshot

      public void snapshot() throws Exception
      Creates a snapshot and truncates the log. See https://github.com/belaban/jgroups-raft/issues/7 for details
      Throws:
      Exception
    • snapshotAsync

      public CompletableFuture<Void> snapshotAsync()
    • initStateMachineFromLog

      public void initStateMachineFromLog() throws Exception
      Loads the log entries from [first .. commit_index] into the state machine
      Throws:
      Exception
    • init

      public void init() throws Exception
      Specified by:
      init in interface org.jgroups.Lifecycle
      Overrides:
      init in class org.jgroups.stack.Protocol
      Throws:
      Exception
    • start

      public void start() throws Exception
      Specified by:
      start in interface org.jgroups.Lifecycle
      Overrides:
      start in class org.jgroups.stack.Protocol
      Throws:
      Exception
    • stop

      public void stop()
      Specified by:
      stop in interface org.jgroups.Lifecycle
      Overrides:
      stop in class org.jgroups.stack.Protocol
    • down

      public Object down(org.jgroups.Event evt)
      Overrides:
      down in class org.jgroups.stack.Protocol
    • up

      public Object up(org.jgroups.Event evt)
      Overrides:
      up in class org.jgroups.stack.Protocol
    • up

      public Object up(org.jgroups.Message msg)
      Overrides:
      up in class org.jgroups.stack.Protocol
    • up

      public void up(org.jgroups.util.MessageBatch batch)
      Overrides:
      up in class org.jgroups.stack.Protocol
    • flushCommitTable

      public void flushCommitTable()
    • flushCommitTable

      public void flushCommitTable(org.jgroups.Address member)
      Triggers a flush of the entries to the given member.
      Parameters:
      member - The non-null address of the member to send the entries to.
      Throws:
      IllegalStateException - Thrown in case the current node is not the leader.
      NullPointerException - Thrown in case the member is null.
    • setAsync

      public CompletableFuture<byte[]> setAsync(byte[] buf, int offset, int length, Options options)
      Called by a building block to apply a change to all state machines in a cluster. This starts the consensus protocol to get a majority to commit this change.

      This call is non-blocking and returns a future as soon as the AppendEntries message has been sent. Only applicable on the leader

      Specified by:
      setAsync in interface Settable
      Parameters:
      buf - The command
      offset - The offset into the buffer
      length - The length of the buffer
      options - Options to pass to the call, may be null
      Returns:
      A CompletableFuture. Can be used to wait for the result (sync). A blocking caller could call set(), then call future.get() to block for the result.
    • getAsync

      public CompletableFuture<byte[]> getAsync(byte[] buf, int offset, int length, Options options) throws Exception
      Description copied from interface: Settable
      Asynchronous get operation that returns immediately without blocking.

      This method submits a read-only operation to the state machine. Read-only operations are treated differently by the replication algorithm. Since read-only operations do not change the state-machine state, these operations are not appended to the replicated log.

      Warning: Do not change the state-machine state by operations submitted through this method. Otherwise, the state-machine will diverge and lead to an undefined state.

      Specified by:
      getAsync in interface Settable
      Parameters:
      buf - The buffer representing the read-only operation to submit to the state machine.
      offset - The offset to skip the bytes in the buffer.
      length - The number of bytes to use from the buffer starting at offset.
      options - Options to pass along in the call chain, may be null.
      Returns:
      A buffer representing the result after submitting the read-only operation.
      Throws:
      Exception - Thrown if the operation could not be submitted.
    • setAsync

      public CompletableFuture<byte[]> setAsync(byte[] buf, int offset, int length, boolean internal, Options options)
    • toString

      public String toString()
      Overrides:
      toString in class org.jgroups.stack.Protocol
    • add

      protected void add(RAFT.Request r)
    • offer

      protected void offer(RAFT.Request r)
    • handleDownRequest

      protected void handleDownRequest(CompletableFuture<byte[]> f, byte[] buf, int offset, int length, boolean internal, Options opts, boolean readOnly)
      This method is always called by a single thread only, and does therefore not need to be reentrant
    • handleUpRequest

      public void handleUpRequest(org.jgroups.Message msg, RaftHeader hdr)
    • processQueue

      protected void processQueue()
    • process

      protected void process(List<RAFT.Request> q)
    • createRequestTable

      protected void createRequestTable()
      Populate with non-committed entries (from log) (https://github.com/belaban/jgroups-raft/issues/31)
    • createCommitTable

      protected void createCommitTable()
    • _addServer

      protected void _addServer(String name)
    • _removeServer

      protected void _removeServer(String name)
    • sendAppendEntriesMessage

      protected void sendAppendEntriesMessage(org.jgroups.Address member, CommitTable.Entry e)
      Runs (on the leader) as part of the queue handling loop: checks if all members (except the leader) in the commit table have received all messages and resends AppendEntries messages to members who haven't.
      For each member, a next-index and match-index is maintained: next-index is the index of the next message to send to that member (initialized to last-applied) and match-index is the index of the highest message known to have been received by the member.
      Messages are resent to a given member as long as that member's match-index is smaller than its next-index. When match_index == next_index, message resending for that member is stopped. When a new message is sent, next-index is incremented (on reception of the AppendResult) and resending starts again.
    • changeMembers

      protected CompletableFuture<byte[]> changeMembers(String name, InternalCommand.Type type) throws Exception
      Throws:
      Exception
    • resend

      protected void resend(org.jgroups.Address target, long index)
    • resend

      protected void resend(org.jgroups.Address target, long from, long to)
      Resends all entries in range [from .. to] to target
    • sendSnapshotTo

      protected void sendSnapshotTo(org.jgroups.Address dest) throws Exception
      Throws:
      Exception
    • commitLogTo

      protected RAFT commitLogTo(long index_inclusive, boolean serialize_response)
      Tries to move commit_index up to index_inclusive, apply the entries in [commit_index+1 .. index_inclusive] to the state machine and notify the clients for each entry. There is no need to check if an entry is committed in RequestTable, as this was done before calling this method.
      Parameters:
      index_inclusive - The index to which to move commit_index
      serialize_response - When true, the response of applying a change to the state machine needs to be serialized into a byte[] array, otherwise null can be returned (reducing serialization cost)
    • append

      protected boolean append(long index, LogEntries entries)
      Appends to the log and returns true if added or false if not (e.g. because the entry already existed).
    • deleteAllLogEntriesStartingFrom

      protected void deleteAllLogEntriesStartingFrom(long index)
    • snapshotIfNeeded

      protected void snapshotIfNeeded(int bytes_added)
    • takeSnapshot

      protected void takeSnapshot() throws Exception
      Throws:
      Exception
    • applyCommits

      protected long applyCommits(long to_inclusive, boolean serialize_response)
      Applies log entries [commit_index+1 .. to_inclusive] to the state machine and notifies clients in RequestTable.
      Parameters:
      to_inclusive - The end index (inclusive) of the log entries to apply
      serialize_response - Whether or not StateMachine.apply(byte[], int, int, boolean) needs to return a serialized response
      Returns:
      The last index of the range of log entries that was successfully applied (normally this is to_inclusive)
    • applyCommit

      protected void applyCommit(long index, boolean serialize_response) throws Exception
      Applies the commit at index
      Throws:
      Exception
    • handleView

      public void handleView(org.jgroups.View view)
    • setLeaderAndTerm

      public RAFT setLeaderAndTerm(org.jgroups.Address new_leader)
    • setLeaderAndTerm

      public RAFT setLeaderAndTerm(org.jgroups.Address new_leader, long new_term)
      Sets the new leader and term
    • trySetLeaderAndTerm

      public int trySetLeaderAndTerm(org.jgroups.Address newLeader, long newTerm)
    • notify

      protected static <T> void notify(RequestTable.Entry<T> e, byte[] rsp)
    • notify

      protected static <T> void notify(RequestTable.Entry<T> e, Throwable t)
    • changeRole

      protected RAFT changeRole(Role new_role)
    • executeInternalCommand

      protected void executeInternalCommand(InternalCommand cmd, byte[] buf, int offset, int length)
      If cmd is not null, execute it. Else parse buf into InternalCommand then call cmd.execute()
    • createLogName

      protected String createLogName(String name, String suffix)
    • notifyRoleChangeListeners

      protected void notifyRoleChangeListeners(Role role)
    • duplicatesInView

      protected boolean duplicatesInView(org.jgroups.View view)
      Checks if a given view contains duplicate raft-ids. Uses key raft-id in ExtendedUUID to compare
    • parseCommaDelimitedProps

      protected static Map<String,String> parseCommaDelimitedProps(String s)
    • computeMajority

      protected void computeMajority()