Class CounterService

java.lang.Object
org.jgroups.raft.blocks.CounterService
All Implemented Interfaces:
RAFT.RoleChange, StateMachine

public class CounterService extends Object implements StateMachine, RAFT.RoleChange
Provides a consensus based distributed counter (similar to AtomicLong) which can be atomically updated across a cluster.
Since:
0.2
Author:
Bela Ban
  • Field Details

    • raft

      protected RaftHandle raft
    • repl_timeout

      protected long repl_timeout
    • allow_dirty_reads

      protected boolean allow_dirty_reads
      If true, reads can return the local counter value directly. Else, reads have to go through the leader
    • counters

      protected final Map<String,Long> counters
  • Constructor Details

    • CounterService

      public CounterService(org.jgroups.JChannel ch)
  • Method Details

    • setChannel

      public void setChannel(org.jgroups.JChannel ch)
    • channel

      public org.jgroups.JChannel channel()
    • addRoleChangeListener

      public void addRoleChangeListener(RAFT.RoleChange listener)
    • replTimeout

      public long replTimeout()
    • replTimeout

      public CounterService replTimeout(long timeout)
    • allowDirtyReads

      public boolean allowDirtyReads()
    • allowDirtyReads

      public CounterService allowDirtyReads(boolean flag)
    • lastApplied

      public long lastApplied()
    • commitIndex

      public long commitIndex()
    • snapshot

      public void snapshot() throws Exception
      Throws:
      Exception
    • logSize

      public long logSize()
    • raftId

      public String raftId()
    • raftId

      public CounterService raftId(String id)
    • getOrCreateCounter

      public RaftSyncCounter getOrCreateCounter(String name, long initial_value) throws Exception
      Returns an existing counter, or creates a new one if none exists. This is a cluster-wide operation which would fail if no leader is elected.
      Parameters:
      name - Name of the counter, different counters have to have different names
      initial_value - The initial value of a new counter if there is no existing counter. Ignored if the counter already exists
      Returns:
      The counter implementation
      Throws:
      Exception
    • deleteCounter

      public void deleteCounter(String name) throws Exception
      Deletes a counter instance (on the coordinator)
      Parameters:
      name - The name of the counter. No-op if the counter doesn't exist
      Throws:
      Exception
    • deleteCounterAsync

      public CompletionStage<Void> deleteCounterAsync(String name)
      Deletes a counter instance.
      Parameters:
      name - The name of the counter. No-op if the counter doesn't exist
      Returns:
      Returns a CompletionStage which is completed when the majority reach consensus.
    • printCounters

      public String printCounters()
    • apply

      public byte[] apply(byte[] data, int offset, int length, boolean serialize_response) throws Exception
      Description copied from interface: StateMachine
      Applies a command to the state machine. The contents of the byte[] buffer are interpreted by the state machine. The command could for example be a set(), remove() or clear() command.
      Specified by:
      apply in interface StateMachine
      Parameters:
      data - The byte[] buffer
      offset - The offset at which the data starts
      length - The length of the data
      serialize_response - If true, serialize and return the response, else return null
      Returns:
      A serialized response value, or null (e.g. if the method returned void)
      Throws:
      Exception - Thrown on deserialization or other failure
    • writeContentTo

      public void writeContentTo(DataOutput out) throws Exception
      Description copied from interface: StateMachine
      Writes the contents of the state machine to an output stream. This is typically called on the leader to provide state to a new node, or a node that's lagging far behind.

      Updates to the state machine may need to be put on hold while the state is written to the output stream.

      Specified by:
      writeContentTo in interface StateMachine
      Parameters:
      out - The output stream
      Throws:
      Exception
    • readContentFrom

      public void readContentFrom(DataInput in) throws Exception
      Description copied from interface: StateMachine
      Reads the contents of the state machine from an input stream.

      This can be the case when an InstallSnapshot RPC is used to bootstrap a new node, or a node that's lagging far behind. The parsing depends on the concrete state machine implementation, but the idea is that the stream is a sequence of commands, each of which can be passed to StateMachine.apply(byte[], int, int, boolean).

      The state machine may need to block modifications until the contents have been set (unless e.g. copy-on-write is used). The state machine implementation may need to remove all contents before populating itself from the stream.

      Specified by:
      readContentFrom in interface StateMachine
      Parameters:
      in - The input stream
      Throws:
      Exception
    • readAndDumpSnapshot

      public static String readAndDumpSnapshot(DataInput in)
    • dumpLog

      public void dumpLog()
    • dumpLogEntry

      public static String dumpLogEntry(LogEntry e)
    • roleChanged

      public void roleChanged(Role role)
      Specified by:
      roleChanged in interface RAFT.RoleChange
    • toString

      public String toString()
      Overrides:
      toString in class Object
    • asyncCounter

      public RaftAsyncCounter asyncCounter(String name)
      Returns an RaftAsyncCounter instance of the counter.

      This is local operation, and it does not create the counter in the raft log.

      Parameters:
      name - Name of the counter, different counters have to have different names.
      Returns:
      The RaftAsyncCounter instance
    • getOrCreateAsyncCounter

      public CompletionStage<RaftAsyncCounter> getOrCreateAsyncCounter(String name, long initialValue)
      Returns an existing counter, or creates a new one if none exists.

      This is a cluster-wide operation which would fail if no leader is elected.

      Parameters:
      name - Name of the counter, different counters have to have different names
      initialValue - The initial value of a new counter if there is no existing counter. Ignored if the counter already exists
      Returns:
      The RaftAsyncCounter implementation.
    • asyncGet

      protected CompletionStage<Long> asyncGet(org.jgroups.util.AsciiString name)
    • asyncSet

      protected CompletionStage<Void> asyncSet(org.jgroups.util.AsciiString name, long value)
    • asyncAddAndGet

      protected CompletionStage<Long> asyncAddAndGet(org.jgroups.util.AsciiString name, long delta, Options opts)
    • asyncCompareAndSwap

      protected CompletionStage<Long> asyncCompareAndSwap(org.jgroups.util.AsciiString name, long expected, long value, Options opts)
    • asyncUpdate

      protected <T extends org.jgroups.util.Streamable> CompletionStage<T> asyncUpdate(org.jgroups.util.AsciiString name, org.jgroups.blocks.atomic.CounterFunction<T> function, Options options)
    • invokeAsyncAndGet

      protected CompletionStage<Long> invokeAsyncAndGet(CounterService.Command command, org.jgroups.util.AsciiString name, Options opts)
    • invokeAsyncAddAndGet

      protected CompletionStage<Long> invokeAsyncAddAndGet(org.jgroups.util.AsciiString name, long arg, Options opts)
    • invokeAsync

      protected CompletionStage<Void> invokeAsync(CounterService.Command command, org.jgroups.util.AsciiString name, long arg)
    • print

      protected static String print(CounterService.Command command, String name, int num_args, DataInput in)
    • _create

      protected long _create(String name, long initial_value)
    • _delete

      protected void _delete(String name)
    • _get

      protected long _get(String name)
    • _set

      protected void _set(String name, long new_val)
    • _add

      protected long _add(String name, long delta)
    • _compareAndSwap

      protected long _compareAndSwap(String name, long expected, long value)
    • _update

      protected <T extends org.jgroups.util.Streamable> T _update(String name, org.jgroups.blocks.atomic.CounterFunction<T> function)