To be reactive, distributed applications must deal gracefully with temporary and prolonged outages as well as have the ability to scale up and down to make the best use of resources. Apache Pekko Cluster provides these capabilities so that you don't have to implement them yourself. The distributed workers example demonstrates the following Apache Pekko clustering capabilities:
The design is based on Derek Wyatt's blog post Balancing Workload Across Nodes with Akka 2 from 2012, which is a bit old, but still a good description of the advantages of letting the workers pull work from the work manager instead of pushing work to the workers.
To run the example:
sbt run
After waiting a few seconds for the cluster to form, the output should start to look something like this (scroll all the way to the right to see the Actor output):
[INFO] [07/21/2017 17:41:53.320] [ClusterSystem-pekko.actor.default-dispatcher-16] [pekko://ClusterSystem@127.0.0.1:51983/user/producer] Produced work: 3
[INFO] [07/21/2017 17:41:53.322] [ClusterSystem-pekko.actor.default-dispatcher-3] [pekko://ClusterSystem@127.0.0.1:7345/user/master/singleton] Accepted work: 3bce4d6d-eaae-4da6-b316-0c6f566f2399
[INFO] [07/21/2017 17:41:53.328] [ClusterSystem-pekko.actor.default-dispatcher-3] [pekko://ClusterSystem@127.0.0.1:7345/user/master/singleton] Giving worker 2b646020-6273-437c-aa0d-4aad6f12fb47 some work 3bce4d6d-eaae-4da6-b316-0c6f566f2399
[INFO] [07/21/2017 17:41:53.328] [ClusterSystem-pekko.actor.default-dispatcher-2] [pekko://ClusterSystem@127.0.0.1:51980/user/worker] Got work: 3
[INFO] [07/21/2017 17:41:53.328] [ClusterSystem-pekko.actor.default-dispatcher-16] [pekko://ClusterSystem@127.0.0.1:51980/user/worker] Work is complete. Result 3 * 3 = 9.
[INFO] [07/21/2017 17:41:53.329] [ClusterSystem-pekko.actor.default-dispatcher-19] [pekko://ClusterSystem@127.0.0.1:7345/user/master/singleton] Work 3bce4d6d-eaae-4da6-b316-0c6f566f2399 is done by worker 2b646020-6273-437c-aa0d-4aad6f12fb47
Now take a look at what happened under the covers.
When Main is run without any parameters, it starts six ActorSystems in the same JVM. These six ActorSystems form a single cluster. The six nodes include two each that perform front-end, back-end, and worker tasks:
Let's look at the details of each part of the application, starting with the front-end.
Typically in systems built with Apache Pekko, clients submit requests using a RESTful API or a gRPC API. Either Pekko HTTP or Play Framework is a great choice for implementing an HTTP API for the front-end. Pekko gRPC can be used if a gRPC front end is preferred.
To limit the scope of this example, we have chosen to emulate client activity with two ordinary actors:
- The FrontEnd actor generates payloads at random intervals and sends them to the WorkManager actor.
- The WorkResultConsumerActor consumes results and logs them.
The FrontEnd actor only concerns itself with posting workloads and does not care when the work has been completed. When a workload has been processed successfully and passed to the WorkManager actor, the WorkManager publishes the result to all interested cluster nodes through Distributed Pub-Sub.
The WorkResultConsumerActor subscribes to the completion events and logs when a workload has completed.
Now, let's take a look at the code that accomplishes this front-end behavior.
Note in the source code that as the ‘FrontEnd’ actor starts up, it:
The FrontEnd actor schedules Tick messages to itself when starting up. The Tick message then triggers creation of a new Work, sending the work to the WorkManager actor on a back-end node and switching to a new busy behavior.
The cluster contains one WorkManager actor. The FrontEnd actor does not need to know the exact location because it sends work to the masterProxy, which is a cluster singleton proxy.
The ‘WorkManager’ actor can accept or deny a work request and we need to deal with unexpected errors:
- If the request is denied or no reply arrives, a failure is sent to self and a Retry is scheduled.
When a workload has been acknowledged by the master, the actor goes back to the idle behavior, which schedules a Tick to start the process again.
If the work is not accepted or there is no response, for example if the message or response got lost, the FrontEnd actor backs off a bit and then sends the work again.
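The resend-until-acknowledged idea can be sketched in plain Java without any Pekko dependency. This is only an illustration under stated assumptions: the class RetrySketch, the method sendUntilAcked and the flaky channel in main are hypothetical names, and the blocking sleep stands in for the scheduled Retry message that an actor would use instead.

```java
import java.util.function.Predicate;

/** Plain-Java sketch (no Pekko): resend the same work until it is acknowledged. */
class RetrySketch {

    /**
     * Sends workId through `send` until it returns true (an Ack) or maxAttempts is reached.
     * Returns the number of attempts used, or -1 if the work was never acknowledged.
     */
    static int sendUntilAcked(String workId, Predicate<String> send,
                              int maxAttempts, long backoffMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (send.test(workId)) {
                return attempt; // acknowledged: the sender can go back to idle
            }
            try {
                Thread.sleep(backoffMillis); // back off a bit, then resend the same work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical flaky channel: the first two sends are lost, the third is acknowledged.
        int attempts = sendUntilAcked("work-1", id -> ++calls[0] >= 3, 5, 10);
        System.out.println("Acknowledged after " + attempts + " attempts");
    }
}
```

Because the same work id is sent on every attempt, the receiving side has to tolerate duplicates, which is why the WorkManager's handling of incoming work is idempotent.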
You can see how actors on a front-end node are started in the method Main.start when the node has the front-end role.
As mentioned in the introduction, results are published using Distributed Pub-Sub. The ‘WorkResultConsumerActor’ subscribes to completion events and logs when a workload has completed.
In an actual application you would probably want a way for clients to poll or stream the status changes of the submitted work.
The back-end nodes host the WorkManager actor, which manages work, keeps track of available workers, and notifies registered workers when new work is available. The single WorkManager actor is the heart of the solution, with built-in resilience provided by the Apache Pekko Cluster Singleton.
The Cluster Singleton tool in Apache Pekko makes sure an actor runs on only one node at any given time within the subset of nodes marked with the role back-end. It will run on the oldest back-end node. If the node on which the WorkManager is running is removed from the cluster, Pekko starts a new WorkManager on the next oldest node. Other nodes in the cluster interact with the WorkManager through the ClusterSingletonProxy without knowing the explicit location. You can see this interaction in the FrontEnd and Worker actors.
If the master node crashes and is removed from the cluster, another master actor is automatically started on the new oldest node.
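The placement rule (the oldest node with the back-end role hosts the singleton, and the next oldest takes over when it leaves) can be illustrated with a small plain-Java sketch. It does not use the Pekko Cluster Singleton API. The Node class, its joinOrder field and the addresses in main are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Plain-Java sketch (no Pekko): which node would host the singleton? */
class SingletonPlacementSketch {

    /** Hypothetical node model; a lower joinOrder means the node joined earlier. */
    static final class Node {
        final String address;
        final String role;
        final long joinOrder;

        Node(String address, String role, long joinOrder) {
            this.address = address;
            this.role = role;
            this.joinOrder = joinOrder;
        }
    }

    /** Returns the oldest member that carries the given role, if there is one. */
    static Optional<Node> singletonHost(List<Node> members, String role) {
        return members.stream()
                .filter(n -> n.role.equals(role))
                .min(Comparator.comparingLong(n -> n.joinOrder));
    }

    public static void main(String[] args) {
        List<Node> members = Arrays.asList(
                new Node("127.0.0.1:3001", "front-end", 1),
                new Node("127.0.0.1:7345", "back-end", 2),
                new Node("127.0.0.1:7355", "back-end", 3));
        // Prints 127.0.0.1:7345, the oldest node with the back-end role.
        System.out.println(singletonHost(members, "back-end").get().address);
    }
}
```

Removing the current host from the member list and calling singletonHost again yields the next oldest back-end node, which mirrors the hand-over described above.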
You can see how the master singleton is started in the method init in WorkManagerSingleton:
The singleton accepts the Behavior of the actual singleton actor, as well as configuration which allows us to decide that the singleton actors should only run on the nodes with the role back-end.
Calls to init on nodes without the back-end role will result in a proxy to communicate with the singleton being created.
If the node is lost, the state of the master is recovered on the standby node through event sourcing.
Let's now explore the implementation of the WorkManager actor in depth.
The WorkManager actor is without question the most involved component in this example. This is because it is designed to deal with failures. While the Apache Pekko Cluster takes care of restarting the WorkManager in case of a failure, we also want to make sure that the new WorkManager can arrive at the same state as the failed WorkManager. We use event sourcing and Pekko Persistence to achieve this.
If the back-end node hosting the WorkManager actor crashes, the Apache Pekko Cluster Singleton makes sure it starts up on a different node, but we also want it to reach the exact same state as the WorkManager on the crashed node. This is achieved through the use of event sourcing and Pekko Persistence.
The current set of work items is modelled in the WorkState class. It keeps track of the current set of work that is pending, has been accepted by a worker, has completed, etc. Every change to the WorkState is modelled as a domain event.
This allows us to capture and store each such event that happens. We can later replay all of them on an empty model and arrive at the exact same state. This is how event sourcing and Pekko Persistence allow the actor to start on any node and reach the same state as a previous instance.
If the WorkManager fails and is restarted, the replacement WorkManager replays events from the log to retrieve the current state. This means that when the WorkState is modified, the WorkManager must persist the event before acting on it. When the event is successfully stored, we can modify the state. Otherwise, if a failure occurs before the event is persisted, the replacement WorkManager will not be able to attain the same state as the failed WorkManager.
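To make the replay idea concrete, here is a minimal plain-Java sketch that rebuilds a state from a list of events. It does not use Pekko Persistence, and the Event, Kind and State types are simplified stand-ins invented for this illustration, not the sample's actual WorkState or event classes.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java sketch (no Pekko Persistence): state is rebuilt by replaying events. */
class EventReplaySketch {

    enum Kind { ACCEPTED, STARTED, COMPLETED }

    /** A domain event; the names are illustrative only. */
    static final class Event {
        final Kind kind;
        final String workId;

        Event(Kind kind, String workId) {
            this.kind = kind;
            this.workId = workId;
        }
    }

    /** Simplified stand-in for the work state: pending, in-progress and done work ids. */
    static final class State {
        final Set<String> pending = new LinkedHashSet<>();
        final Set<String> inProgress = new LinkedHashSet<>();
        final Set<String> done = new LinkedHashSet<>();

        /** Every state change goes through an event, so it can be replayed later. */
        void apply(Event e) {
            switch (e.kind) {
                case ACCEPTED:
                    pending.add(e.workId);
                    break;
                case STARTED:
                    pending.remove(e.workId);
                    inProgress.add(e.workId);
                    break;
                case COMPLETED:
                    inProgress.remove(e.workId);
                    done.add(e.workId);
                    break;
            }
        }
    }

    /** Replays a journal on an empty state, as a replacement instance would on start-up. */
    static State replay(List<Event> journal) {
        State state = new State();
        for (Event e : journal) {
            state.apply(e);
        }
        return state;
    }

    public static void main(String[] args) {
        List<Event> journal = Arrays.asList(
                new Event(Kind.ACCEPTED, "a"),
                new Event(Kind.ACCEPTED, "b"),
                new Event(Kind.STARTED, "a"),
                new Event(Kind.COMPLETED, "a"));
        State recovered = replay(journal);
        System.out.println("pending=" + recovered.pending + " done=" + recovered.done);
    }
}
```

Replaying the same journal twice gives two equal states, which is the property that lets a replacement instance on another node continue where the failed one stopped.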
Let's look at how a command to process a work item from the front-end comes in. The first thing you might notice is the comment saying idempotent. This means that the same work message may arrive multiple times, but regardless of how many times the same work arrives, it should only be executed once. This is needed since the FrontEnd actor re-sends work in case of the Work or Ack messages getting lost (Pekko does not provide any guarantee of delivery, see details in the docs).
To make the logic idempotent we simply check if the work id is already known, and if it is we simply Ack it without further logic. If the work is previously unknown, we start by transforming it into a WorkAccepted event, which we persist. Only in the EventHandler that is called after the event has been persisted do we actually update the workState, send an Ack back to the FrontEnd, and trigger a search for available workers. In this case the event handler delegates the logic to the WorkState domain class.
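The order of operations described here (check for a known id, persist the event, then update the state and Ack) can be sketched in plain Java. This is not the sample's command handler and does not use the Pekko Persistence API. The in-memory journal list, the knownWorkIds set and the acks list are hypothetical stand-ins for the event log, the work state and the replies sent to the front-end.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java sketch (no Pekko): each work id is accepted at most once. */
class IdempotentAcceptSketch {
    final List<String> journal = new ArrayList<>();    // stand-in for the persisted event log
    final Set<String> knownWorkIds = new HashSet<>();  // stand-in for the work state
    final List<String> acks = new ArrayList<>();       // Acks "sent" back to the front-end

    /** Handles a work submission; a duplicate is acknowledged without writing a new event. */
    void onWork(String workId) {
        if (knownWorkIds.contains(workId)) {
            acks.add(workId); // already known: just Ack again, nothing else happens
            return;
        }
        journal.add("WorkAccepted:" + workId); // 1. persist the event first
        knownWorkIds.add(workId);              // 2. only then update the state
        acks.add(workId);                      // 3. and acknowledge the work
    }

    public static void main(String[] args) {
        IdempotentAcceptSketch manager = new IdempotentAcceptSketch();
        manager.onWork("a");
        manager.onWork("a"); // resent by the front-end, for example after a lost Ack
        manager.onWork("b");
        // Prints [WorkAccepted:a, WorkAccepted:b]: the duplicate produced no second event.
        System.out.println(manager.journal);
    }
}
```

Every submission is acknowledged, so a front-end that lost the first Ack stops retrying, while the journal records each work id only once.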
In a “normal” Actor, the only thing we have to provide is a Behavior. For a PersistentBehavior there are three things that need to be implemented:
- persistenceId is a global identifier for the actor. We must make sure that there is never more than one Actor instance with the same persistenceId running globally, or else we would possibly mess up its journal.
- commandHandler receives incoming messages, called Commands, and returns any Effects, e.g. persisting an event.
- eventHandler is invoked with the events once they have been persisted to the database.
Unlike the WorkManager actor, the example system contains multiple workers that can be stopped and restarted frequently. We do not need to save their state since the WorkManager is tracking work and will simply send work to another worker if the original fails to respond. So, rather than persisting a list of available workers, the example uses the following strategy:
- Workers announce themselves with a RegisterWorker message. If a back-end node fails and the WorkManager is started on a new node, the registrations go automatically to the new node.
- Any failure that prevents a RegisterWorker message from arriving within the work-timeout period causes the WorkManager actor to remove the worker from its list.
When stopping, a Worker actor still tries to gracefully remove itself using the DeRegisterWorker message, but in case of a crash it will have no chance to communicate that to the master node.
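The strategy of keeping workers listed only while their registrations keep arriving can be sketched as a small in-memory registry. This plain-Java example is an assumption-laden illustration, not the sample's code: the class WorkerRegistrySketch and its methods are hypothetical, and timestamps are passed in explicitly so the behavior is easy to follow.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Plain-Java sketch (no Pekko): workers stay listed only while they keep registering. */
class WorkerRegistrySketch {
    private final Map<String, Long> lastSeenMillis = new HashMap<>();
    private final long workTimeoutMillis;

    WorkerRegistrySketch(long workTimeoutMillis) {
        this.workTimeoutMillis = workTimeoutMillis;
    }

    /** Called for every registration message; refreshes the worker's deadline. */
    void register(String workerId, long nowMillis) {
        lastSeenMillis.put(workerId, nowMillis);
    }

    /** Graceful removal, as requested by a worker that is stopping. */
    void deregister(String workerId) {
        lastSeenMillis.remove(workerId);
    }

    /** Drops every worker whose last registration is older than the timeout. */
    void prune(long nowMillis) {
        lastSeenMillis.entrySet().removeIf(e -> nowMillis - e.getValue() > workTimeoutMillis);
    }

    /** Returns a snapshot of the currently listed worker ids. */
    Set<String> workers() {
        return new HashSet<>(lastSeenMillis.keySet());
    }

    public static void main(String[] args) {
        WorkerRegistrySketch registry = new WorkerRegistrySketch(1000);
        registry.register("worker-1", 0);
        registry.register("worker-2", 500);
        registry.prune(1200); // worker-1 was last seen 1200 ms ago and is removed
        System.out.println(registry.workers());
    }
}
```

Nothing in this registry needs to be persisted: after a restart it starts empty and fills up again as workers register.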
Now let's move on to the last piece of the puzzle, the worker nodes.
Worker actors and the WorkManager actor interact as follows:
1. Worker actors register with the receptionist.
2. The WorkManager subscribes to workers via the receptionist.
3. When the WorkManager actor has work, it sends a WorkIsReady message to all workers it thinks are not busy.
4. The WorkManager picks the first reply and assigns the work to that worker. This achieves back pressure because the WorkManager does not push work on workers that are already busy and overwhelm their mailboxes.
5. The worker delegates the actual processing to a child actor, WorkExecutor. This allows the worker to be responsive while its child executes the work.
You can see how a worker node and a number of worker actors are started in the method Main.start if the node contains the role worker.
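The pull-based hand-over between the WorkManager and its workers can be illustrated with a plain-Java sketch. It contains no actors or messaging. The class PullModelSketch and its methods accept, requestWork and workDone are hypothetical names chosen for this example, roughly corresponding to "work is ready", a worker's reply and a completion report.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Plain-Java sketch (no Pekko): work is handed out only when a worker asks for it. */
class PullModelSketch {
    private final Deque<String> pendingWork = new ArrayDeque<>();
    private final Map<String, String> busyWorkers = new HashMap<>(); // workerId -> workId

    /** New work arrives; returns the idle workers that would be told that work is ready. */
    List<String> accept(String workId, List<String> registeredWorkers) {
        pendingWork.addLast(workId);
        List<String> notified = new ArrayList<>();
        for (String worker : registeredWorkers) {
            if (!busyWorkers.containsKey(worker)) {
                notified.add(worker);
            }
        }
        return notified;
    }

    /** A worker asks for work; the first one to ask gets it, later ones get nothing. */
    Optional<String> requestWork(String workerId) {
        if (busyWorkers.containsKey(workerId) || pendingWork.isEmpty()) {
            return Optional.empty();
        }
        String workId = pendingWork.removeFirst();
        busyWorkers.put(workerId, workId);
        return Optional.of(workId);
    }

    /** The worker reports completion and becomes idle again. */
    void workDone(String workerId) {
        busyWorkers.remove(workerId);
    }

    public static void main(String[] args) {
        PullModelSketch manager = new PullModelSketch();
        List<String> workers = Arrays.asList("w1", "w2");
        System.out.println(manager.accept("job-1", workers)); // both workers are idle
        System.out.println(manager.requestWork("w2"));        // w2 replies first and gets job-1
        System.out.println(manager.requestWork("w1"));        // nothing left for w1
    }
}
```

Since a busy worker is never notified and never receives work it did not ask for, pending work waits in the manager's queue instead of piling up in worker mailboxes.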
Now that we have covered all the details, we can experiment with different sets of nodes for the cluster.
When running the application without parameters, it runs a six-node cluster within the same JVM and starts an Apache Cassandra database. It can be more interesting to run the nodes in separate processes. Open four terminal windows.
In the first terminal window, start the Apache Cassandra database with the following command:
sbt "runMain worker.Main cassandra"
The Apache Cassandra database will stay alive as long as you do not kill this process. When you want to stop it, you can do that with Ctrl + C. Without the database, the back-end nodes will not be able to start up.
You could also run your own local installation of Apache Cassandra given that it runs on the default port on localhost and does not require a password.
With the database running, go to the second terminal window and start the first seed node with the following command:
sbt "runMain worker.Main 7345"
7345 corresponds to the port of the first seed-nodes element in the configuration. In the log output you see that the cluster node has been started and changed status to ‘Up’.
In the third terminal window, start the front-end node with the following command:
sbt "runMain worker.Main 3001"
3001 is the port of the node. In the log output you see that the cluster node has been started, joins the 7345 node, and becomes a member of the cluster. Its status changes to ‘Up’.
Switch over to the second terminal window and see in the log output that the member joined. So far, no Worker has been started, i.e. jobs are produced and accepted but not processed.
In the fourth terminal window, start a worker node with the following command:
sbt "runMain worker.Main 5001 3"
5001 means the node will be a worker node, and the second parameter 3 means that it will host three worker actors.
Look at the log output in the different terminal windows. In the third window (front-end) you should see that the produced jobs are processed and logged as "Consumed result".
Take a look at the logging that is done in WorkProducer, WorkManager and Worker. Identify the corresponding log entries in the three terminal windows with Pekko nodes.
Shut down the worker node (fourth terminal window) with Ctrl + C. Observe how the "Consumed result" logs in the front-end node (third terminal window) stop. Start the worker node again.
sbt "runMain worker.Main 5001 3"
You can also start more such worker nodes in new terminal windows.
You can start more cluster back-end nodes using port numbers between 2000-2999.
sbt "runMain worker.Main 7355"
The nodes with port 7345 to 2554 are configured to be used as “seed nodes” in this sample. If you shut down all of them, or start none of them, the other nodes will not know how to join the cluster. If all four are shut down and 7345 is started, it will join itself and form a new cluster.
As long as one of the four nodes is alive the cluster will keep working. You can read more about this in the Pekko documentation section on seed nodes.
You can start more cluster front-end nodes using port numbers between 3000-3999:
sbt "runMain worker.Main 3002"
Any port outside these ranges creates a worker node, for which you can also play around with the number of worker actors using the second parameter.
sbt "runMain worker.Main 5009 4"
The files of the Apache Cassandra database are saved in the target directory and when you restart the application the state is recovered. You can clean the state with:
sbt clean
The following are some ideas where to take this sample next. Implementation of each idea is left up to you.
The FrontEnd in this sample is a dummy that automatically generates work. A real application could, for example, use Pekko HTTP to provide an HTTP REST (or other) API for external clients.
If the singleton master becomes a bottleneck, we could start several master actors and shard the jobs among them. This could be achieved by using Apache Pekko Cluster Sharding with many WorkManager actors as entities and a hash of some sort on the payload deciding which master it should go to.
In this example we have used Cluster Singleton and Distributed Publish Subscribe but those are not the only tools in Apache Pekko Cluster.
You can also find a good overview of the various modules that make up Pekko in this section of the official documentation.