There's no getting around it: frameworks on Mesos are distributed systems.
Distributed systems must deal with failures and partitions (the two are indistinguishable from a system's perspective).
Concretely, what does this mean for frameworks? Mesos uses an actor-like message-passing programming model, in which messages are delivered at most once. (Exceptions to this include task status updates, most of which are delivered at least once through the use of acknowledgements.) The messages passed between the master and the framework are therefore susceptible to being dropped in the presence of failures.
When these non-reliable messages are dropped, inconsistent state can arise between the framework and Mesos.
As a simple example, consider a launch task request sent by a framework. There are many ways that failures can lead to the loss of the task, for example:

* The launch message may be dropped in transit due to a network partition between the framework and the master.
* The master may fail after receiving the launch message but before forwarding it to the slave.
* The slave may fail after receiving the launch message but before launching the task.

In these cases, the framework believes the task to be staging, but the task is unknown to Mesos. To cope with such situations, task state must be reconciled between the framework and Mesos whenever a failure is detected.
It is the responsibility of Mesos (the scheduler driver / master) to ensure that the framework is notified when a disconnection and subsequent (re-)registration occur. At this point, the scheduler should perform task state reconciliation.
Tasks must be reconciled explicitly by the framework after a failure.
This is because the scheduler driver does not persist any task information. In the future, the scheduler driver (or a pure-language mesos library) could perform task reconciliation seamlessly under the covers on behalf of the framework.
So, for now, let's look at how one needs to implement task state reconciliation in a framework scheduler.
Frameworks send a list of `TaskStatus` messages to the master:

```protobuf
// Allows the framework to query the status for non-terminal tasks.
// This causes the master to send back the latest task status for
// each task in 'statuses', if possible. Tasks that are no longer
// known will result in a TASK_LOST update. If statuses is empty,
// then the master will send the latest status for each task
// currently known.
message Reconcile {
  repeated TaskStatus statuses = 1; // Should be non-terminal only.
}
```
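As an illustration, a scheduler might build such a request as follows. This is a minimal self-contained sketch: `TaskStatus` here is a hypothetical stand-in mirroring the two fields the master examines, and `StubDriver.reconcileTasks` stands in for the scheduler driver's reconciliation call rather than a real Mesos binding.

```python
from dataclasses import dataclass, field
from typing import List, Optional

# Hypothetical stand-in for the TaskStatus protobuf; only the two
# fields examined by the master are modeled.
@dataclass
class TaskStatus:
    task_id: str                    # required by the master
    slave_id: Optional[str] = None  # optional; speeds up reconciliation

@dataclass
class StubDriver:
    """Stand-in for the scheduler driver; records what was sent."""
    sent: List[List[TaskStatus]] = field(default_factory=list)

    def reconcileTasks(self, statuses: List[TaskStatus]) -> None:
        self.sent.append(list(statuses))

driver = StubDriver()

# Ask the master for the latest state of two non-terminal tasks.
driver.reconcileTasks([
    TaskStatus(task_id="task-1", slave_id="slave-A"),
    TaskStatus(task_id="task-2"),  # SlaveID omitted: still valid
])

# An empty list asks the master for the latest status of *every*
# task it currently knows about.
driver.reconcileTasks([])
```

Note the empty-list form: it delegates the choice of tasks to the master, which is useful when the scheduler has lost its own record of what it launched.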
Currently, the master will only examine two fields in `TaskStatus`:

* `TaskID`: This is required.
* `SlaveID`: Optional; leads to faster reconciliation in the presence of slaves that are transitioning between states.

The technique for performing reconciliation is to reconcile all non-terminal tasks, until an update is received for each task, using exponential backoff:
1. let start = now()
2. let remaining = { T ∈ tasks | T is non-terminal }
3. Perform reconciliation: reconcile(remaining)
4. Wait for status updates to arrive, using truncated exponential backoff to bound the wait. For each update, note the time of arrival.
5. let remaining = { T ∈ remaining | T.last_update_arrival() < start }
6. If remaining is non-empty, go to step 3.

This reconciliation algorithm must be run after each (re-)registration.
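The steps above can be sketched in Python. This is a minimal self-contained sketch, not a real Mesos API: the `reconcile` and `is_terminal` callbacks and the arrival-time map are assumed to be supplied by the scheduler, which updates `last_update_arrival` as status updates come in.

```python
import time
from typing import Callable, Dict, Set

def reconcile_until_complete(
    last_update_arrival: Dict[str, float],   # task id -> arrival time of its latest status update
    reconcile: Callable[[Set[str]], None],   # sends a reconcile request for these task ids
    is_terminal: Callable[[str], bool],      # whether a task is already terminal
    wait: Callable[[float], None] = time.sleep,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
) -> None:
    """Reconcile all non-terminal tasks until a status update has
    arrived for each one, backing off between attempts."""
    start = time.time()                                   # step 1
    remaining = {t for t in last_update_arrival
                 if not is_terminal(t)}                   # step 2
    delay = base_delay
    while remaining:                                      # step 6
        reconcile(remaining)                              # step 3: query the master
        wait(delay)                                       # step 4: wait for updates
        delay = min(delay * 2, max_delay)                 # truncated exponential backoff
        # step 5: drop tasks whose latest update arrived after `start`
        remaining = {t for t in remaining
                     if last_update_arrival.get(t, 0.0) < start}
```

Capping the delay at `max_delay` keeps the retry interval bounded, while doubling it avoids hammering a master that is slow to respond after a failover.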
Notes:
* Offers are reconciled automatically after a failure: offers do not persist beyond the lifetime of a master, so after a disconnection any outstanding offers are no longer valid, and the scheduler will receive fresh offers once it re-registers.