Let's get started with using Raft in your application. To demonstrate how to use Ratis, we implement a simple Counter service, which maintains a counter value across a raft group. Clients can send the following types of requests to the raft group:
INCREMENT: increase the counter value by 1. This command will trigger a transaction to change the state.
GET: query the current value of the counter. This is a read-only command since it does not change the state.

We have the following enum for representing the supported commands.
/**
 * The supported commands of the Counter example.
 */
public enum CounterCommand {
  /** Increment the counter by 1. */
  INCREMENT,
  /** Get the counter value. */
  GET;

  private final Message message = Message.valueOf(name());

  public Message getMessage() {
    return message;
  }

  /** Does the given command string match this command? */
  public boolean matches(String command) {
    return name().equalsIgnoreCase(command);
  }
}
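To see how the case-insensitive matches check behaves, here is a minimal, self-contained sketch. It is a simplified stand-in for the enum above, not the example's actual code: the Message field is omitted so that no Ratis dependency is needed, and the SimpleCounterCommand name and the parse helper are hypothetical.

```java
import java.util.Optional;

/** A simplified, hypothetical stand-in for CounterCommand (no Ratis Message field). */
enum SimpleCounterCommand {
  INCREMENT, GET;

  /** Does the given command string match this command (ignoring case)? */
  boolean matches(String command) {
    return name().equalsIgnoreCase(command);
  }

  /** Hypothetical helper: find the command matching the given string, if any. */
  static Optional<SimpleCounterCommand> parse(String command) {
    for (SimpleCounterCommand c : values()) {
      if (c.matches(command)) {
        return Optional.of(c);
      }
    }
    return Optional.empty();
  }
}
```

With this sketch, parse("Get") yields GET, while an unknown string such as "reset" yields an empty Optional, which a caller can turn into an "Invalid Command" error.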
Note: The source code of the Counter example and the other examples is at Ratis examples. This article intends to show the steps of integration of Ratis. If you wish to run the Counter example please refer to the README.
The first step is to add the Ratis dependencies to the project. The dependencies are available in Maven Central:
<dependency>
  <artifactId>ratis-server</artifactId>
  <groupId>org.apache.ratis</groupId>
</dependency>
Then, add one of the supported transports, such as ratis-grpc, ratis-netty or ratis-hadoop.
In this example, we choose to use ratis-grpc:
<dependency>
  <artifactId>ratis-grpc</artifactId>
  <groupId>org.apache.ratis</groupId>
</dependency>
Please note that Apache Hadoop dependencies are shaded, so it’s safe to use hadoop transport with different versions of Hadoop.
CounterStateMachine
A state machine manages the application logic. In this example, the state machine is responsible for handling both types of requests:
When there is an INCREMENT request, it is first written to the raft log as a log entry. Once the log entry is committed, the state machine is invoked to apply the log entry as a transaction, so that the counter value is increased by 1.
When there is a GET request, it is not written to the raft log since it is a read-only request that does not change the state. The state machine returns the current value of the counter.

We discuss how to implement CounterStateMachine in the following subsections. Its complete source code is in CounterStateMachine.java.
In this example, the CounterStateMachine extends the BaseStateMachine, which provides a base implementation of a StateMachine.
Inside the CounterStateMachine, there is a counter object which stores the current value. The counter is an AtomicInteger in order to support concurrent access. We use the built-in SimpleStateMachineStorage, which is a file-based storage implementation, for storing snapshots. The fields are shown below:
public class CounterStateMachine extends BaseStateMachine {
  // ...
  private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
  private final AtomicInteger counter = new AtomicInteger(0);
  // ...
}
Once a raft log entry has been committed (i.e. a majority of the servers have acknowledged), Ratis notifies the state machine by invoking the applyTransaction method. The applyTransaction method first validates the log entry. Then, it applies the log entry by increasing the counter value and updates the term-index. The code fragments are shown below. Note that the incrementCounter method is synchronized in order to update both counter and last applied term-index atomically.
public class CounterStateMachine extends BaseStateMachine {
  // ...

  private synchronized int incrementCounter(TermIndex termIndex) {
    updateLastAppliedTermIndex(termIndex);
    return counter.incrementAndGet();
  }

  // ...

  /**
   * Apply the {@link CounterCommand#INCREMENT} by incrementing the counter object.
   *
   * @param trx the transaction context
   * @return the message containing the updated counter value
   */
  @Override
  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
    final LogEntryProto entry = trx.getLogEntry();

    //check if the command is valid
    final String command = entry.getStateMachineLogEntry().getLogData().toString(Charset.defaultCharset());
    if (!CounterCommand.INCREMENT.matches(command)) {
      return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command));
    }

    //increment the counter and update term-index
    final TermIndex termIndex = TermIndex.valueOf(entry);
    final long incremented = incrementCounter(termIndex);

    //if leader, log the incremented value and the term-index
    if (trx.getServerRole() == RaftPeerRole.LEADER) {
      LOG.info("{}: Increment to {}", termIndex, incremented);
    }

    //return the new value of the counter to the client
    return CompletableFuture.completedFuture(Message.valueOf(String.valueOf(incremented)));
  }

  // ...
}
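The reason for the synchronized method can be illustrated without Ratis. The following is a minimal sketch, assuming a plain long as a stand-in for the term-index; the AppliedCounter class and its method names are hypothetical and are not part of the example's source.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in showing why the counter and the applied index share one lock. */
final class AppliedCounter {
  private final AtomicInteger counter = new AtomicInteger(0);
  private long lastAppliedIndex = -1; // stand-in for the last applied term-index

  /** Update the applied index and the counter as one atomic step. */
  synchronized int increment(long logIndex) {
    lastAppliedIndex = logIndex;
    return counter.incrementAndGet();
  }

  /** Read both values as one consistent pair: {lastAppliedIndex, counter}. */
  synchronized long[] snapshotState() {
    return new long[] {lastAppliedIndex, counter.get()};
  }
}
```

Because both methods hold the same monitor, a reader never observes an applied index that belongs to a different counter value, even though the counter alone is already thread-safe.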
The INCREMENT command is implemented in the previous section. What about the GET command? Since the GET command is a read-only command, it is implemented by the query method instead of the applyTransaction method. The code fragment is shown below.
public class CounterStateMachine extends BaseStateMachine {
  // ...

  /**
   * Process {@link CounterCommand#GET}, which gets the counter value.
   *
   * @param request the GET request
   * @return a {@link Message} containing the current counter value as a {@link String}.
   */
  @Override
  public CompletableFuture<Message> query(Message request) {
    final String command = request.getContent().toString(Charset.defaultCharset());
    if (!CounterCommand.GET.matches(command)) {
      return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command));
    }
    return CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
  }

  // ...
}
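The split between the write path (applyTransaction) and the read path (query) can be sketched with plain Java. This is only an in-memory illustration under stated assumptions: ToyCounter is a hypothetical class, it uses strings instead of Ratis Message objects, and there is no raft log or replication behind it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical in-memory stand-in; it is not a Ratis StateMachine. */
final class ToyCounter {
  private final AtomicInteger counter = new AtomicInteger(0);

  /** Build a future that has already failed with the given error. */
  private static CompletableFuture<String> failed(Throwable t) {
    final CompletableFuture<String> f = new CompletableFuture<>();
    f.completeExceptionally(t);
    return f;
  }

  /** Write path: only INCREMENT is accepted, and it changes the state. */
  CompletableFuture<String> applyTransaction(String command) {
    if (!"INCREMENT".equalsIgnoreCase(command)) {
      return failed(new IllegalArgumentException("Invalid Command: " + command));
    }
    return CompletableFuture.completedFuture(String.valueOf(counter.incrementAndGet()));
  }

  /** Read path: only GET is accepted, and the state is left unchanged. */
  CompletableFuture<String> query(String command) {
    if (!"GET".equalsIgnoreCase(command)) {
      return failed(new IllegalArgumentException("Invalid Command: " + command));
    }
    return CompletableFuture.completedFuture(counter.toString());
  }
}
```

Sending a command down the wrong path completes the returned future exceptionally, which mirrors the validation done at the start of both methods in the example.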
When taking a snapshot, the state is persisted in the storage of the state machine. The snapshot can be loaded for restoring the state in the future. In this example, we use ObjectOutputStream to write the counter value to a snapshot file. The term-index is stored in the file name of the snapshot file. The code fragments are shown below. Note that the getState method is synchronized in order to get the applied term-index and the counter value atomically. Note also that getting the counter value alone does not have to be synchronized since the counter field is already an AtomicInteger.
public class CounterStateMachine extends BaseStateMachine {
  // ...

  /** The state of the {@link CounterStateMachine}. */
  static class CounterState {
    private final TermIndex applied;
    private final int counter;

    CounterState(TermIndex applied, int counter) {
      this.applied = applied;
      this.counter = counter;
    }

    TermIndex getApplied() {
      return applied;
    }

    int getCounter() {
      return counter;
    }
  }

  // ...

  /** @return the current state. */
  private synchronized CounterState getState() {
    return new CounterState(getLastAppliedTermIndex(), counter.get());
  }

  // ...

  /**
   * Store the current state as a snapshot file in the {@link #storage}.
   *
   * @return the index of the snapshot
   */
  @Override
  public long takeSnapshot() {
    //get the current state
    final CounterState state = getState();
    final long index = state.getApplied().getIndex();

    //create a file with a proper name to store the snapshot
    final File snapshotFile = storage.getSnapshotFile(state.getApplied().getTerm(), index);

    //write the counter value into the snapshot file
    try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(
        Files.newOutputStream(snapshotFile.toPath())))) {
      out.writeInt(state.getCounter());
    } catch (IOException ioe) {
      LOG.warn("Failed to write snapshot file \"" + snapshotFile
          + "\", last applied index=" + state.getApplied());
    }

    //return the index of the stored snapshot (which is the last applied one)
    return index;
  }

  // ...
}
When loading a snapshot, we use an ObjectInputStream to read the snapshot file. The term-index is read from the file name of the snapshot file. The code fragments are shown below. Note that the updateState method is synchronized in order to update the applied term-index and the counter value atomically.
public class CounterStateMachine extends BaseStateMachine {
  // ...

  private synchronized void updateState(TermIndex applied, int counterValue) {
    updateLastAppliedTermIndex(applied);
    counter.set(counterValue);
  }

  // ...

  /**
   * Load the state of the state machine from the {@link #storage}.
   *
   * @param snapshot the information of the snapshot being loaded
   * @return the index of the snapshot or -1 if snapshot is invalid
   * @throws IOException if it failed to read from storage
   */
  private long load(SingleFileSnapshotInfo snapshot) throws IOException {
    //check null
    if (snapshot == null) {
      LOG.warn("The snapshot info is null.");
      return RaftLog.INVALID_LOG_INDEX;
    }

    //check if the snapshot file exists.
    final Path snapshotPath = snapshot.getFile().getPath();
    if (!Files.exists(snapshotPath)) {
      LOG.warn("The snapshot file {} does not exist for snapshot {}", snapshotPath, snapshot);
      return RaftLog.INVALID_LOG_INDEX;
    }

    //read the TermIndex from the snapshot file name
    final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotPath.toFile());

    //read the counter value from the snapshot file
    final int counterValue;
    try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(Files.newInputStream(snapshotPath)))) {
      counterValue = in.readInt();
    }

    //update state
    updateState(last, counterValue);
    return last.getIndex();
  }

  // ...
}
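The snapshot file format used in this example is a single int written through an object stream. The write and read sides can be exercised together with a small round trip. This sketch uses only the JDK; the SnapshotRoundTrip class and the temporary file name are hypothetical, and the term-index handling via the file name is left out.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper showing the snapshot content of this example: one int. */
final class SnapshotRoundTrip {
  /** Write the counter value in the same way as takeSnapshot does. */
  static void write(Path file, int counterValue) throws IOException {
    try (ObjectOutputStream out = new ObjectOutputStream(
        new BufferedOutputStream(Files.newOutputStream(file)))) {
      out.writeInt(counterValue);
    }
  }

  /** Read the counter value in the same way as load does. */
  static int read(Path file) throws IOException {
    try (ObjectInputStream in = new ObjectInputStream(
        new BufferedInputStream(Files.newInputStream(file)))) {
      return in.readInt();
    }
  }

  /** Write a value to a temporary file, read it back, and clean up. */
  static int roundTrip(int counterValue) {
    try {
      final Path file = Files.createTempFile("counter-snapshot", ".tmp");
      try {
        write(file, counterValue);
        return read(file);
      } finally {
        Files.deleteIfExists(file);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Reading must use the same stream type and the same primitive call as writing: a value written with writeInt is read back with readInt.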
The initialize and reinitialize methods
The initialize method is called at most once when the server is starting up. In contrast, the reinitialize method is called at other times, for example when the server is resumed from the PAUSE state.
In CounterStateMachine, the reinitialize method simply loads the latest snapshot, and the initialize method additionally initializes the BaseStateMachine super class and the storage.
public class CounterStateMachine extends BaseStateMachine {
  // ...

  /**
   * Initialize the state machine storage and then load the state.
   *
   * @param server the server running this state machine
   * @param groupId the id of the {@link org.apache.ratis.protocol.RaftGroup}
   * @param raftStorage the storage of the server
   * @throws IOException if it fails to load the state.
   */
  @Override
  public void initialize(RaftServer server, RaftGroupId groupId, RaftStorage raftStorage) throws IOException {
    super.initialize(server, groupId, raftStorage);
    storage.init(raftStorage);
    reinitialize();
  }

  /**
   * Simply load the state.
   *
   * @throws IOException if it fails to load the state.
   */
  @Override
  public void reinitialize() throws IOException {
    load(storage.getLatestSnapshot());
  }

  // ...
}
RaftGroup
In order to run a raft group, each server must start a RaftServer instance, which is responsible for communicating with the other servers through the Raft protocol.
It's important to keep in mind that each raft server knows the initial raft group when starting up: the number of raft peers in the group and the addresses of the peers.
In this example, we have a raft group with 3 peers. For simplicity, each peer listens on a specific port on the same machine. Their addresses are defined in a property file as below.
raft.server.address.list=127.0.0.1:10024,127.0.0.1:10124,127.0.0.1:11124
The peers are named ‘n0’, ‘n1’ and ‘n2’, and they form a RaftGroup. For more details, see Constants.java.
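As an illustration of how such an address list can be turned into named peers, here is a minimal sketch using only the JDK. It is not the code in Constants.java: the PeerAddresses class is hypothetical, it maps peer names to address strings instead of building RaftPeer objects, and it assumes the peers are named in list order.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper; the real example builds RaftPeer objects in Constants.java. */
final class PeerAddresses {
  /** Parse the address list and name the peers n0, n1, ... in list order. */
  static Map<String, String> parse(String propertiesText) {
    final Properties properties = new Properties();
    try {
      properties.load(new StringReader(propertiesText));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    final String list = properties.getProperty("raft.server.address.list", "");
    final Map<String, String> peers = new LinkedHashMap<>();
    int i = 0;
    for (String address : list.split(",")) {
      final String trimmed = address.trim();
      if (!trimmed.isEmpty()) {
        peers.put("n" + i++, trimmed); // peer name -> host:port
      }
    }
    return peers;
  }
}
```

Every server must parse the same list in the same order, so that all of them agree on which name belongs to which address.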
CounterServer
We use a RaftServer.Builder to build a RaftServer. We first set up a RaftProperties object with a local directory as the storage of the server and a port number as the gRPC server port. Then, we create our CounterStateMachine and pass everything to the builder as below.
public final class CounterServer implements Closeable {
  private final RaftServer server;

  public CounterServer(RaftPeer peer, File storageDir) throws IOException {
    //create a property object
    final RaftProperties properties = new RaftProperties();

    //set the storage directory (different for each peer) in the RaftProperty object
    RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));

    //set the port (different for each peer) in RaftProperty object
    final int port = NetUtils.createSocketAddr(peer.getAddress()).getPort();
    GrpcConfigKeys.Server.setPort(properties, port);

    //create the counter state machine which holds the counter value
    final CounterStateMachine counterStateMachine = new CounterStateMachine();

    //build the Raft server
    this.server = RaftServer.newBuilder()
        .setGroup(Constants.RAFT_GROUP)
        .setProperties(properties)
        .setServerId(peer.getId())
        .setStateMachine(counterStateMachine)
        .build();
  }

  // ...
}
Now we are ready to start our CounterServer peers and form a raft group. The command is:
java org.apache.ratis.examples.counter.server.CounterServer peer_index
The argument peer_index must be 0, 1 or 2.
After a server is started, it communicates with other peers in the group, and performs raft actions such as leader election and append-log-entries. After all three servers are started, the counter service is up and running with the Raft protocol.
For more details, see CounterServer.java.
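As noted earlier, a log entry is committed once a majority of the servers have acknowledged it. For the three-peer group used here, the arithmetic can be sketched as follows. The Quorum class is a hypothetical helper for illustration only and is not part of Ratis.

```java
/** Hypothetical helper for majority arithmetic in a raft group. */
final class Quorum {
  /** The smallest number of servers that forms a majority of the group. */
  static int majority(int groupSize) {
    return groupSize / 2 + 1;
  }

  /** How many servers may be unavailable while a majority is still reachable. */
  static int tolerableFailures(int groupSize) {
    return groupSize - majority(groupSize);
  }
}
```

For a group of 3 peers, the majority is 2, so an INCREMENT can still be committed while one peer is unavailable.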
CounterClient
We use a RaftGroup to build a RaftClient and then use the RaftClient to send commands to the raft service. Note that gRPC is the default RPC type, so we may skip setting it in the RaftProperties.
public final class CounterClient implements Closeable {
  private final RaftClient client = RaftClient.newBuilder()
      .setProperties(new RaftProperties())
      .setRaftGroup(Constants.RAFT_GROUP)
      .build();

  // ...
}
With this raft client, we can then send commands using the BlockingApi returned by RaftClient.io(), or the AsyncApi returned by RaftClient.async(). The send method in the BlockingApi/AsyncApi is used to send the INCREMENT command as below.
client.io().send(CounterCommand.INCREMENT.getMessage());
or
client.async().send(CounterCommand.INCREMENT.getMessage());
The sendReadOnly method in the BlockingApi/AsyncApi is used to send the GET command as below.
client.io().sendReadOnly(CounterCommand.GET.getMessage());
or
client.async().sendReadOnly(CounterCommand.GET.getMessage());
For more details, see CounterClient.java.
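The practical difference between the two APIs is that an asynchronous send returns a future immediately, so several requests can be in flight before any reply is awaited. The following sketch illustrates that pattern with JDK futures only. It does not use Ratis: AsyncSendSketch and its methods are hypothetical stand-ins, and the local AtomicInteger takes the place of the replicated counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a counter service; it does not use Ratis. */
final class AsyncSendSketch {
  private final AtomicInteger counter = new AtomicInteger(0);

  /** Stand-in for an asynchronous INCREMENT: returns a future of the new value. */
  CompletableFuture<Integer> sendIncrementAsync() {
    return CompletableFuture.supplyAsync(counter::incrementAndGet);
  }

  /** Issue n increments without blocking between them, then wait for all replies. */
  int incrementManyAndGet(int n) {
    final List<CompletableFuture<Integer>> replies = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      replies.add(sendIncrementAsync());
    }
    // wait until every outstanding request has completed
    CompletableFuture.allOf(replies.toArray(new CompletableFuture[0])).join();
    return counter.get();
  }
}
```

A blocking send, by contrast, would wait for each reply before issuing the next request, which is simpler to reason about but serializes the round trips.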