public interface DistributedGroup extends io.atomix.resource.Resource<DistributedGroup>
The distributed group resource facilitates managing group membership within an Atomix cluster. Membership is
managed by nodes joining and leaving the group, and instances of the group throughout the cluster are notified
of changes to the structure of the group. Groups can elect a leader, and members can communicate directly with
one another or through persistent queues.
Group membership is managed in a replicated state machine. When a member joins the group, the join request
is replicated, the member is added to the group, and the state machine notifies instances of the
DistributedGroup of the membership change. In the event that a group instance becomes disconnected from
the cluster and its session times out, the replicated state machine will automatically remove the member
from the group and notify the remaining instances of the group of the membership change.
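To make this lifecycle concrete, here is a minimal single-process sketch. It is hypothetical and is not Atomix's state machine: the class `MembershipStateMachine`, its `applyJoin`/`applySessionExpired` methods, and the use of `long` session IDs are assumptions for illustration only. In a real cluster each call would correspond to a committed log entry applied in the same order on every replica, which is what lets every group instance observe the same membership changes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical single-process model of a group membership state machine
 * (not the Atomix implementation). Each apply* call stands in for a committed
 * log entry that every replica would apply in the same order.
 */
class MembershipStateMachine {
  // member ID -> ID of the session that owns the member, kept in join order
  private final Map<String, Long> members = new LinkedHashMap<>();
  private final List<Consumer<String>> joinListeners = new ArrayList<>();
  private final List<Consumer<String>> leaveListeners = new ArrayList<>();

  void onJoin(Consumer<String> listener) { joinListeners.add(listener); }

  void onLeave(Consumer<String> listener) { leaveListeners.add(listener); }

  /** Applies a committed join: adds the member and notifies join listeners. */
  void applyJoin(long sessionId, String memberId) {
    if (members.putIfAbsent(memberId, sessionId) == null) {
      joinListeners.forEach(l -> l.accept(memberId));
    }
  }

  /** Applies a session expiration: removes every member owned by that session and notifies leave listeners. */
  void applySessionExpired(long sessionId) {
    List<String> removed = new ArrayList<>();
    members.entrySet().removeIf(e -> {
      boolean owned = e.getValue() == sessionId;
      if (owned) {
        removed.add(e.getKey());
      }
      return owned;
    });
    removed.forEach(id -> leaveListeners.forEach(l -> l.accept(id)));
  }

  /** Read-only view of the current members, in join order. */
  Collection<String> members() {
    return Collections.unmodifiableCollection(members.keySet());
  }
}
```

Because removal is driven by session expiration rather than by an explicit leave, a crashed client's members disappear from the group without any action on the client's part.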
To create a membership group resource, obtain it from an Atomix instance via getGroup(String):
```java
atomix.getGroup("my-group").thenAccept(group -> {
  // Use the group here.
});
```
A newly created group instance does not itself appear in the members() list, as it is not yet a member of
the group. Once the instance has been created, the user must join the group via join():
```java
group.join().thenAccept(member -> {
  System.out.println("Joined with member ID: " + member.id());
});
```
Once the group has been joined, the members() list provides an up-to-date view of the group which will
be automatically updated as members join and leave the group. To be explicitly notified when a member joins or
leaves the group, use the onJoin(Consumer) or onLeave(Consumer) event consumers respectively:
```java
group.onJoin(member -> {
  System.out.println(member.id() + " joined the group!");
});
```
To iterate over the current members of the group, use the members() getter:
```java
DistributedGroup group = atomix.getGroup("foo").get();
for (GroupMember member : group.members()) {
  // Inspect each member, e.g. member.id().
}
```
Once the group instance has been created, the group membership will be automatically updated each time the structure
of the group changes. However, in the event that the client becomes disconnected from the cluster, it may not receive
notifications of changes in the group structure.
DistributedGroup supports a concept of persistent members, which must explicitly leave the group to be
removed from it. Messages pending for a persistent member remain queued in the cluster while the member is
failed and are delivered once it recovers.
In order to support recovery, persistent members must be configured with a user-provided member ID.
The member ID is provided when the member joins the group, and providing a member ID is
all that's required to create a persistent member.
```java
DistributedGroup group = atomix.getGroup("persistent-members").get();
LocalMember memberA = group.join("a").get();
LocalMember memberB = group.join("b").get();
```
Persistent members are not limited to a single node. If a node crashes, any persistent members that existed
on that node may rejoin the group on any other node. Persistent members rejoin simply by calling join(String)
with the unique member ID. Once a persistent member has rejoined the group, its session will be updated and any
messages pending in the member's MessageService will be delivered to the member.
Persistent member state is retained only inside the group's replicated state machine and not on clients.
From the perspective of DistributedGroup instances in a cluster, in the event that the node on which
a persistent member is running fails, the member will leave the group. Once the persistent
member rejoins the group, onJoin(Consumer) will be called again on each group instance in the cluster.
The DistributedGroup resource facilitates leader election, which can be used to coordinate a group by
ensuring that only a single member of the group performs some set of operations at any given time. Leader election
is a core concept of membership groups, and because leader election is a low-overhead process, leaders are
elected for each group automatically.
Leaders are elected using a fair policy. The first member to join a group will always become the
initial group leader. Each unique leader in a group is associated with a term. The term
represents a globally unique, monotonically increasing token that can be used for fencing. Users can listen for
changes in group terms and leaders with event listeners:
```java
DistributedGroup group = atomix.getGroup("election-group").get();
group.election().onElection(term -> {
  // React to the newly elected leader and its term.
});
```
The term is guaranteed to be unique for each leader and is
guaranteed to be monotonically increasing. Each instance of a group is guaranteed to see the same leader for the
same term, and no two leaders can ever exist in the same term. In that sense, the terminology and constraints of
leader election in Atomix borrow heavily from the Raft consensus algorithm that underlies it.
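Because terms increase monotonically, a leader can attach its term to every operation it issues, and a downstream resource can reject any operation carrying a term older than the highest it has seen. The following is a minimal, hypothetical sketch of that fencing check: `FencedStore` is not part of Atomix, and in practice the term value would come from the group's election events.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical store that accepts writes only from the highest term it has seen (fencing). */
class FencedStore {
  private long highestTerm = Long.MIN_VALUE;
  private final List<String> log = new ArrayList<>();

  /**
   * Applies a write tagged with the writer's leadership term.
   * Returns false if a leader with a later term has already written, i.e. the writer is stale.
   */
  synchronized boolean write(long term, String value) {
    if (term < highestTerm) {
      return false; // fenced off: a newer leader exists
    }
    highestTerm = term;
    log.add(value);
    return true;
  }

  synchronized List<String> contents() {
    return new ArrayList<>(log);
  }
}
```

A deposed leader that has not yet observed the new term (for example, because it was briefly partitioned) is thereby prevented from overwriting the new leader's writes.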
Group members can communicate with one another through the messaging API, MessageService. Direct messaging
between group members is reliable and is done as writes to the Atomix cluster.
Messages are held in memory within the Atomix cluster and are published to consumers using Copycat's session event
framework. Messages are guaranteed to be delivered to consumers in the order in which they were sent by a producer.
Because each message depends on one or more writes to the Atomix cluster, messaging is not intended
to support high-throughput use cases. Group messaging is designed for coordinating group behaviors. For example,
a leader can instruct a random member to perform a task through the messaging API.
To send a direct message to a specific member, use that GroupMember's MessageClient:
```java
GroupMember member = group.member("foo");
MessageProducer<String> producer = member.messaging().producer("bar");
producer.send("baz").thenRun(() -> {
  // Message acknowledged
});
```
Users can specify the criteria by which a producer determines when a message is completed by configuring the
producer's Execution policy. To configure the execution policy, pass MessageProducer.Options when creating a
MessageProducer:
```java
MessageProducer.Options options = new MessageProducer.Options()
  .withExecution(MessageProducer.Execution.SYNC);
MessageProducer<String> producer = member.messaging().producer("bar", options);
```
Producers can be configured to send messages using three execution policies:

- SYNC sends messages to consumers and awaits acknowledgement from the consumer side of the queue. If a producer
  is producing to an entire group, synchronous producers will await acknowledgement from all members of the group.
- ASYNC awaits acknowledgement of persistence in the cluster but not acknowledgement that messages have been
  received and processed by consumers.
- REQUEST_REPLY awaits arbitrary responses from all consumers to which a message is sent. If a message is sent to
  a group of consumers, message reply futures will be completed with a list of reply values.

When a MessageProducer is configured with the ASYNC execution policy, the CompletableFuture returned by the
MessageProducer.send(Object) method will be completed as soon as the message is persisted in the cluster.
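The difference between the three policies comes down to which event completes the future returned by send(). The sketch below models that with plain `CompletableFuture`s. `ExecutionPolicies.completion` and its parameters are hypothetical names, and the nested enum only mirrors the policy names above; it is not MessageProducer.Execution itself.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/**
 * Hypothetical model (not the Atomix implementation) of when a producer's send()
 * future completes under each execution policy.
 */
class ExecutionPolicies {
  enum Execution { SYNC, ASYNC, REQUEST_REPLY }

  /**
   * @param persisted completes once the cluster has committed the message
   * @param responses one future per target consumer; completes when that consumer
   *                  acks (value ignored) or replies (value is the reply)
   */
  static CompletableFuture<Object> completion(Execution execution,
                                              CompletableFuture<Void> persisted,
                                              List<CompletableFuture<Object>> responses) {
    // Completes once the message is persisted and every target consumer has responded.
    CompletableFuture<Void> allResponded = persisted.thenCompose(
        v -> CompletableFuture.allOf(responses.toArray(new CompletableFuture[0])));
    switch (execution) {
      case ASYNC:
        // Done as soon as the message is persisted; consumer processing is not awaited.
        return persisted.thenApply(v -> null);
      case SYNC:
        // Done once every target consumer has acknowledged the message.
        return allResponded.thenApply(v -> null);
      case REQUEST_REPLY:
        // Done once every target consumer has replied; the result is the list of replies.
        return allResponded.thenApply(v -> responses.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()));
      default:
        throw new IllegalArgumentException("unknown policy: " + execution);
    }
  }
}
```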
DistributedGroup also provides a group-wide MessageClient that allows users to broadcast messages to all members
of a group or send a direct message to a random member of a group. To use the group-wide message client, use the
messaging() getter:
```java
MessageProducer<String> producer = group.messaging().producer("foo");
producer.send("Hello world!").thenRun(() -> {
  // Message delivered to all group members
});
```
By default, messages sent through the group-wide message producer will be sent to all members of the group.
But just as Execution policies can be used to define the
criteria by which message operations are completed, the Delivery
policy can be used to define how messages are delivered when using a group-wide producer.
```java
MessageProducer.Options options = new MessageProducer.Options()
  .withDelivery(MessageProducer.Delivery.RANDOM);
MessageProducer<String> producer = group.messaging().producer("bar", options);
```
Group-wide producers can be configured with the following Delivery policies:

- MessageProducer.Delivery.RANDOM producers send each message to a random member of the group. In the event that
  a message is not successfully acknowledged by a member and that member fails or leaves the group, random messages
  will be redelivered to remaining members of the group.
- MessageProducer.Delivery.BROADCAST producers send messages to all available members of a group. This option
  applies only to producers constructed from DistributedGroup messaging clients.

Delivery policies work in tandem with the Execution policies described above. For example, a group-wide producer
configured with the REQUEST_REPLY execution policy and the BROADCAST delivery policy will send each message to all
members of the group and aggregate replies into a Collection once all consumers have replied to the message.
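Target selection for each delivery policy, including RANDOM redelivery after a member leaves, can be sketched as follows. `DeliveryModel` is a hypothetical, single-process illustration; in Atomix the selection and redelivery happen inside the group state machine, and this sketch ignores acknowledgements and execution policies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch of group-wide delivery targeting; not the Atomix implementation. */
class DeliveryModel {
  enum Delivery { RANDOM, BROADCAST }

  private final List<String> members;
  private final Random random;

  DeliveryModel(List<String> members, Random random) {
    this.members = new ArrayList<>(members);
    this.random = random;
  }

  /** Returns the member IDs a new message should be delivered to. */
  List<String> targets(Delivery delivery) {
    if (members.isEmpty()) {
      return List.of();
    }
    if (delivery == Delivery.BROADCAST) {
      return new ArrayList<>(members); // every available member
    }
    return List.of(members.get(random.nextInt(members.size()))); // one random member
  }

  /**
   * Called when a member leaves before acknowledging a RANDOM message: the member is
   * removed and the message is reassigned to one of the remaining members, if any.
   */
  List<String> redeliverAfterLeave(String departed) {
    members.remove(departed);
    return targets(Delivery.RANDOM);
  }
}
```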
Messages sent to a member are received through the LocalMember's MessageService. Only the node to which a
member belongs can listen for messages sent to that member. Thus, to listen for messages, join a group and
create a MessageConsumer:
```java
LocalMember localMember = group.join().join();
MessageConsumer<String> consumer = localMember.messaging().consumer("foo");
consumer.onMessage(message -> {
  message.ack();
});
```
When a message is received, consumers must always acknowledge it with Message.ack() or reply to it with
Message.reply(Object). Failure to ack or reply to a message will result in a memory leak in the cluster and
failure to deliver any additional messages to the consumer. When a consumer acknowledges a message, the message
will be removed from memory in the cluster and the producer that sent the message will be notified according to
its configuration.
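Together with the in-order delivery guarantee, these rules mean that an unacknowledged message blocks every message queued behind it. The hypothetical `AckGatedQueue` below models one consumer's queue in a single process; it illustrates the behavior described above and is not Atomix code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/**
 * Hypothetical per-consumer queue: messages stay in memory until acknowledged, and the
 * next message is not delivered while the current one is outstanding, preserving send order.
 */
class AckGatedQueue<T> {
  private final Deque<T> pending = new ArrayDeque<>();
  private final Consumer<T> consumer;
  private boolean inFlight;

  AckGatedQueue(Consumer<T> consumer) {
    this.consumer = consumer;
  }

  /** Enqueues a message and delivers it immediately if nothing is in flight. */
  void publish(T message) {
    pending.addLast(message);
    deliverNext();
  }

  /** Acknowledges the in-flight message, freeing its memory and releasing the next one. */
  void ack() {
    if (!inFlight) {
      throw new IllegalStateException("no message in flight");
    }
    pending.removeFirst();
    inFlight = false;
    deliverNext();
  }

  /** Number of messages still held in memory (in flight plus waiting). */
  int retained() {
    return pending.size();
  }

  private void deliverNext() {
    if (!inFlight && !pending.isEmpty()) {
      inFlight = true;
      consumer.accept(pending.peekFirst());
    }
  }
}
```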
Care must be taken when sending direct messages with the ASYNC execution policy: a message can be persisted but
may never actually be delivered and acknowledged. To ensure that direct messages are eventually delivered,
persistent members must be used:
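```java
LocalMember member = group.join("member-1").join();
MessageConsumer<String> consumer = member.messaging().consumer("foo");
consumer.onMessage(message -> {
  // Process the message, then always ack (or reply) so the next message can be delivered.
  message.ack();
});
```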
When a message is sent to a persistent member, the message will be persisted in the cluster until it can be delivered
to that member, regardless of whether the member is actively connected to the cluster. If the persistent member crashes,
pending messages will be delivered once the member rejoins the group. Persistent members are also free to switch nodes
and rejoin the group on live nodes, and pending messages will still be redelivered.
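The key idea is that pending messages are keyed by member ID rather than by session or node. A minimal, hypothetical sketch follows: the class `PersistentMailboxes` and its methods are illustrative assumptions, and acknowledgements are omitted for brevity.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical model of persistent-member mailboxes: pending messages are keyed by
 * member ID, so they survive a crash and are flushed when the member rejoins from any node.
 */
class PersistentMailboxes {
  private final Map<String, Deque<String>> pending = new HashMap<>();
  private final Map<String, Consumer<String>> connected = new HashMap<>();

  /** Delivers immediately if the member is connected; otherwise queues the message. */
  void send(String memberId, String message) {
    Consumer<String> consumer = connected.get(memberId);
    if (consumer != null) {
      consumer.accept(message);
    } else {
      pending.computeIfAbsent(memberId, id -> new ArrayDeque<>()).addLast(message);
    }
  }

  /** The member's node crashed or its session expired; its mailbox is kept. */
  void disconnect(String memberId) {
    connected.remove(memberId);
  }

  /** The member (re)joins, possibly on a different node, and receives pending messages in order. */
  void join(String memberId, Consumer<String> consumer) {
    connected.put(memberId, consumer);
    Deque<String> queue = pending.remove(memberId);
    if (queue != null) {
      queue.forEach(consumer);
    }
  }
}
```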
Users must take care, however, when using persistent members. BROADCAST
messages sent to groups with persistent members that are not connected to the cluster will be persisted in memory in the
cluster until they can be delivered. If the producer that broadcasts the message is configured to await acknowledgement
or replies from members, producer send operations cannot
be completed until dead members rejoin the group.
Objects sent through the group, such as messages and member metadata, are serialized using the group's Catalyst
Serializer, which can be accessed via Resource.serializer() or on the parent Atomix instance. Because objects are
typically replicated throughout the cluster, it's critical that any object sent from any node be serializable by
all other nodes.
Users should register serializable types before performing any operations on the group.
```java
DistributedGroup group = atomix.getGroup("group").get();
group.serializer().register(User.class, UserSerializer.class);
```
For best serialization performance, it is recommended that serializable types be registered with
unique type IDs. This allows the Catalyst Serializer to identify the type by its serialization ID rather
than its class name. It's essential that the ID for a given type is the same on all nodes in the cluster:
```java
group.serializer().register(User.class, 1, UserSerializer.class);
```
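IDs must agree across nodes because only the integer travels with the object; each receiver resolves it using its own registrations. The hypothetical `TypeRegistry` below is not Catalyst's Serializer and performs no actual serialization; it only models the ID-to-class mapping to show what goes wrong when nodes disagree.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical ID-based type registry; illustrates type tagging only, not Catalyst itself. */
class TypeRegistry {
  private final Map<Class<?>, Integer> ids = new HashMap<>();
  private final Map<Integer, Class<?>> types = new HashMap<>();

  void register(Class<?> type, int id) {
    Class<?> existing = types.get(id);
    if (existing != null && existing != type) {
      throw new IllegalArgumentException("ID " + id + " already registered to " + existing.getName());
    }
    ids.put(type, id);
    types.put(id, type);
  }

  /** The sender writes this small integer instead of the fully qualified class name. */
  int idOf(Class<?> type) {
    Integer id = ids.get(type);
    if (id == null) {
      throw new IllegalArgumentException("unregistered type: " + type.getName());
    }
    return id;
  }

  /** The receiver resolves the integer back to a class using its own registrations. */
  Class<?> typeOf(int id) {
    Class<?> type = types.get(id);
    if (type == null) {
      throw new IllegalArgumentException("unknown type ID: " + id);
    }
    return type;
  }
}
```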
Users can also serialize Serializable types by simply registering the class without any other serializer;
Catalyst will attempt to use the optimal serializer based on the interfaces implemented by the class.
Alternatively, type registration can be disabled altogether via Serializer.disableWhitelist(); however, this is
not recommended, as deserializing arbitrary class names is slow and poses a security risk.
The group is implemented as a replicated StateMachine. When a DistributedGroup is created, an instance of the
group state machine is created on each replica in the cluster. The state machine instance manages state for the
specific membership group. When a member joins the group, a join request is sent to the cluster and logged and
replicated before being applied to the group state machine. Once the join request has been committed and applied
to the state machine, the group state is updated and existing group members are notified by publishing state
change notifications to open instances of the group. Membership change event notifications are received by all
open instances of the resource.
Leader election is performed by the group state machine. When the first member joins the group, that
member will automatically be assigned as the group leader. Each time an additional member joins the group,
the new member will be placed in a leader queue. In the event that the current group leader's
Session expires or is closed, the group state machine will assign a new
leader by pulling from the leader queue and will publish an elect event to all remaining group
members. Additionally, for each new leader of the group, the state machine will publish a term change
event, providing a globally unique, monotonically increasing token uniquely associated with the new leader.
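The leader queue can be modeled in a few lines. This sketch is hypothetical: `LeaderQueue` is not part of Atomix, FIFO ordering is assumed as one reading of the fair policy described earlier, and the term is a simple counter, which suffices here because every replica applies the same commands in the same order.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical leader-queue model: the first member to join leads, later members wait
 * in FIFO order, and each newly elected leader receives a strictly higher term.
 */
class LeaderQueue {
  private final Deque<String> candidates = new ArrayDeque<>();
  private String leader;
  private long term;

  void join(String memberId) {
    if (leader == null) {
      elect(memberId);
    } else {
      candidates.addLast(memberId);
    }
  }

  /** Handles a member whose session expired or was closed. */
  void leave(String memberId) {
    if (memberId.equals(leader)) {
      leader = null;
      String next = candidates.pollFirst();
      if (next != null) {
        elect(next);
      }
    } else {
      candidates.remove(memberId);
    }
  }

  private void elect(String memberId) {
    leader = memberId;
    term++; // strictly increasing, so no two leaders share a term
  }

  String leader() { return leader; }

  long term() { return term; }
}
```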
To track group membership, the group state machine tracks the state of the Session
associated with each open instance of the group. In the event that the session expires or is closed, the group
member associated with that session will automatically be removed from the group and remaining instances
of the group will be notified.
The group state machine facilitates direct and broadcast messaging through writes to the Atomix cluster. Each message sent to a group or a member of a group is committed as a single write to the cluster. Once persisted in the cluster, messages are delivered to clients through the state machine's session events API. The group state machine delivers messages to sessions based on the configured per-message delivery policy, and client-side group instances are responsible for dispatching received messages to the appropriate consumers. When a consumer acknowledges or replies to a message, another write is committed to the Atomix cluster, and the group state machine completes the associated message.
The group state machine manages compaction of the replicated log by tracking which state changes contribute to the state of the group at any given time. For instance, when a member joins the group, the commit that added the member to the group contributes to the group's state as long as the member remains a part of the group. Once the member leaves the group or its session expires, the commits that added and removed the member no longer contribute to the group's state and are therefore released from the state machine, to be removed from the log during compaction.
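The retention rule above can be modeled as bookkeeping over log indexes. `CommitTracker` is a hypothetical illustration; it does not use or reproduce Copycat's compaction API, and treating a repeated join as superseding the earlier join commit is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

/**
 * Hypothetical model (not Atomix or Copycat code) of which commits a group state machine
 * must retain: a join commit is kept while its member remains in the group; once the
 * member leaves, both the join and leave commits are released.
 */
class CommitTracker {
  // member ID -> log index of the join commit that currently defines that member
  private final Map<String, Long> joinCommitByMember = new HashMap<>();
  // log indexes that no longer contribute to state and may be compacted away
  private final SortedSet<Long> released = new TreeSet<>();

  void applyJoin(long index, String memberId) {
    Long previous = joinCommitByMember.put(memberId, index);
    if (previous != null) {
      released.add(previous); // superseded by the newer join (sketch assumption)
    }
  }

  void applyLeave(long index, String memberId) {
    Long joinIndex = joinCommitByMember.remove(memberId);
    if (joinIndex != null) {
      released.add(joinIndex);
    }
    // Once applied, a leave commit no longer contributes to the group's state.
    released.add(index);
  }

  /** Snapshot of the indexes the log compactor may remove. */
  SortedSet<Long> released() {
    return new TreeSet<>(released);
  }
}
```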
| Modifier and Type | Class and Description |
|---|---|
| `static class` | `DistributedGroup.Config`: Configuration for cluster-wide DistributedGroups. |
| `static class` | `DistributedGroup.Options`: Distributed group options. |
| Modifier and Type | Method and Description |
|---|---|
| `Election` | `election()`: Returns the group election. |
| `CompletableFuture<LocalMember>` | `join()`: Joins the instance to the membership group. |
| `CompletableFuture<LocalMember>` | `join(Object metadata)`: Joins the instance to the membership group with the given metadata. |
| `CompletableFuture<LocalMember>` | `join(String memberId)`: Joins the instance to the membership group with a user-provided member ID. |
| `CompletableFuture<LocalMember>` | `join(String memberId, Object metadata)`: Joins the instance to the membership group with a user-provided member ID and metadata. |
| `GroupMember` | `member(String memberId)`: Gets a group member by ID. |
| `Collection<GroupMember>` | `members()`: Gets the collection of all members in the group. |
| `MessageClient` | `messaging()`: Returns the group message client. |
| `Listener<GroupMember>` | `onJoin(Consumer<GroupMember> listener)`: Adds a listener for members joining the group. |
| `Listener<GroupMember>` | `onLeave(Consumer<GroupMember> listener)`: Adds a listener for members leaving the group. |
| `CompletableFuture<Void>` | `remove(String memberId)`: Removes the member with the given member ID from the group. |
Election election()
The returned election is specific to this group's set of members. The Term defined by the returned
election will not necessarily be reflected in any subgroups of this group.
MessageClient messaging()
The returned message client is group-wide and can be used to broadcast messages to all members of the group or to random members of the group.
GroupMember member(String memberId)
If the member with the given ID has not joined the membership group, the resulting
GroupMember will be null.
Parameters: memberId - The member ID for which to return a GroupMember.
Returns: The GroupMember with the given memberId, or null if it is not a known member of the group.
Collection<GroupMember> members()
Returns this instance's view of the group's members, which is kept up to date as members join and leave the
group. If any GroupMember instances have been referenced by this membership group instance, the same object
will be returned for that member.
Because this method returns the collection directly rather than a CompletableFuture, no blocking is required.
For example, to send a message to each member:
```java
Collection<GroupMember> members = group.members();
members.forEach(member -> {
  MessageProducer<String> producer = member.messaging().producer("test");
  producer.send("Hello world!");
});
```
CompletableFuture<LocalMember> join()
Joining the group results in a new member being created and joining the group. Each DistributedGroup
instance may represent multiple members of a group. The returned CompletableFuture will be completed
with the joined LocalMember object once the member has joined the group, but completion does not guarantee
that all other instances of the group have seen the newly joined member.
This method returns a CompletableFuture which can be used to block until the operation completes
or to be notified in a separate thread once the operation completes. To block until the operation completes,
use the CompletableFuture.join() method to block the calling thread:
```java
group.join().join();
```
Alternatively, to execute the operation asynchronously and be notified in a different thread once the member
has joined, use one of the many CompletableFuture callbacks:
```java
group.join().thenAccept(thisMember -> System.out.println("This member is: " + thisMember.id()));
```
CompletableFuture<LocalMember> join(String memberId)
Joining the group results in a new member being created and joining the group. Each DistributedGroup
instance may represent multiple members of a group. The returned CompletableFuture will be completed
with the joined LocalMember object once the member has joined the group, but completion does not guarantee
that all other instances of the group have seen the newly joined member.
When joining a group with a user-provided memberId, a persistent member is created. In the event that this
node crashes, the member may rejoin the group on any node with the same memberId and receive pending messages.
While the persistent member is disconnected from the cluster, it will not appear in the group members()
list, but its state will not be removed from the cluster.
This method returns a CompletableFuture which can be used to block until the operation completes
or to be notified in a separate thread once the operation completes. To block until the operation completes,
use the CompletableFuture.join() method to block the calling thread:
```java
group.join("foo").join();
```
Alternatively, to execute the operation asynchronously and be notified in a different thread once the member
has joined, use one of the many CompletableFuture callbacks:
```java
group.join("foo").thenAccept(thisMember -> System.out.println("This member is: " + thisMember.id()));
```
Parameters: memberId - The unique member ID to assign to the member.
CompletableFuture<LocalMember> join(Object metadata)
Joining the group results in a new member being created and joining the group. Each DistributedGroup
instance may represent multiple members of a group. The returned CompletableFuture will be completed
with the joined LocalMember object once the member has joined the group, but completion does not guarantee
that all other instances of the group have seen the newly joined member.
Metadata provided when a member joins a group can be viewed by all other instances of the
same group. Metadata objects must be serializable either via Java's Serializable or by registering a
Catalyst TypeSerializer on the group's Resource.serializer().
This method returns a CompletableFuture which can be used to block until the operation completes
or to be notified in a separate thread once the operation completes. To block until the operation completes,
use the CompletableFuture.join() method to block the calling thread:
```java
group.join(new MyMetadata()).join();
```
Alternatively, to execute the operation asynchronously and be notified in a different thread once the member
has joined, use one of the many CompletableFuture callbacks:
```java
group.join(new MyMetadata()).thenAccept(thisMember -> System.out.println("This member is: " + thisMember.id()));
```
Parameters: metadata - Metadata to assign to the joined group member.
CompletableFuture<LocalMember> join(String memberId, Object metadata)
Joining the group results in a new member being created and joining the group. Each DistributedGroup
instance may represent multiple members of a group. The returned CompletableFuture will be completed
with the joined LocalMember object once the member has joined the group, but completion does not guarantee
that all other instances of the group have seen the newly joined member.
When joining a group with a user-provided memberId, a persistent member is created. In the event that this
node crashes, the member may rejoin the group on any node with the same memberId and receive pending messages.
While the persistent member is disconnected from the cluster, it will not appear in the group members()
list, but its state will not be removed from the cluster.
Metadata provided when a persistent member joins a group can be viewed by all other instances of the
same group. Metadata objects must be serializable either via Java's Serializable or by registering a
Catalyst TypeSerializer on the group's Resource.serializer().
This method returns a CompletableFuture which can be used to block until the operation completes
or to be notified in a separate thread once the operation completes. To block until the operation completes,
use the CompletableFuture.join() method to block the calling thread:
```java
group.join("foo", new MyMetadata()).join();
```
Alternatively, to execute the operation asynchronously and be notified in a different thread once the member
has joined, use one of the many CompletableFuture callbacks:
```java
group.join("foo", new MyMetadata()).thenAccept(thisMember -> System.out.println("This member is: " + thisMember.id()));
```
Parameters: memberId - The unique member ID to assign to the member.
metadata - Metadata to assign to the joined group member.
Listener<GroupMember> onJoin(Consumer<GroupMember> listener)
The provided Consumer will be called each time a member joins the group. Note that
the join consumer will be called before the joining member's join() completes.
The returned Listener can be used to unregister the listener once it is no longer needed.
Parameters: listener - The join listener.
CompletableFuture<Void> remove(String memberId)
Removes the member with the given member ID from the group.
Parameters: memberId - The member ID of the member to remove from the group.
Listener<GroupMember> onLeave(Consumer<GroupMember> listener)
The provided Consumer will be called each time a member leaves the group. Members can
leave the group either voluntarily or by crashing or otherwise becoming disconnected from the
cluster for longer than their session timeout. Note that the leave consumer will be called before
the leaving member's LocalMember.leave() completes.
The returned Listener can be used to unregister the listener once it is no longer needed.
Parameters: listener - The leave listener.

Copyright © 2013–2017. All rights reserved.