# Introduction to REEF

It is instructive to first recall how Map-Reduce works. Hadoop Map-Reduce schedules compute tasks on containers that execute map and reduce functions over record data. The basic structure of a Map-Reduce job is as follows. For each input block, schedule a map task that passes each record in the block to a user-defined map function and materializes the output in key-sorted order. Next, assign each of a user-defined number of reduce tasks a non-overlapping portion of the key space of the map output, and shuffle that portion across the network to wherever the corresponding reduce task is scheduled. Each reduce task then sorts all of the data shuffled to it by key, groups the records by key, and calls the reduce function on each group, storing the output durably (e.g., in HDFS).

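To make these steps concrete, here is a minimal, single-process Python sketch of the map, shuffle, sort, and reduce phases. It assumes that in-memory lists stand in for input blocks and for HDFS output, and it uses hash partitioning to divide the key space among reducers; the function names are illustrative and are not Hadoop APIs.

```python
from collections import defaultdict
from itertools import groupby
from operator import itemgetter


def run_map_task(block, map_fn):
    """Apply map_fn to every record in one input block; return key-sorted pairs."""
    pairs = [kv for record in block for kv in map_fn(record)]
    pairs.sort(key=itemgetter(0))
    return pairs


def partition(key, num_reducers):
    """Hash partitioning: every key belongs to exactly one reducer."""
    return hash(key) % num_reducers


def run_job(blocks, map_fn, reduce_fn, num_reducers):
    # Map phase: one map task per input block.
    map_outputs = [run_map_task(block, map_fn) for block in blocks]

    # Shuffle: route every pair to the reducer that owns its key.
    reducer_inputs = defaultdict(list)
    for pairs in map_outputs:
        for key, value in pairs:
            reducer_inputs[partition(key, num_reducers)].append((key, value))

    # Reduce phase: sort each reducer's input by key, group, then reduce.
    output = {}
    for r in range(num_reducers):
        shuffled = sorted(reducer_inputs[r], key=itemgetter(0))
        for key, group in groupby(shuffled, key=itemgetter(0)):
            output[key] = reduce_fn(key, [value for _, value in group])
    return output  # a real job would write this to durable storage such as HDFS


# Usage: word count over two "blocks" of text lines.
def wc_map(line):
    for word in line.split():
        yield (word, 1)


def wc_reduce(word, counts):
    return sum(counts)


blocks = [["a b a", "c"], ["b a"]]
print(run_job(blocks, wc_map, wc_reduce, num_reducers=2))
```

Because each key hashes to exactly one reducer, no key is split across reduce tasks, which is what lets each reducer group and reduce its share independently.
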
From the perspective of the scheduler, several issues must be handled appropriately in order to scale out to massive datasets. First, each map task should be scheduled close to where its input block resides, ideally on the same machine or at least on the same rack. Second, failures can occur in any task at any step, requiring either that backup tasks be scheduled or that the job be aborted. Third, performance bottlenecks can cause an imbalance in task-level progress. The scheduler must react to these stragglers by scheduling clones and keeping the output of whichever copy of the logical task crosses the finish line first.

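The first of these concerns, locality, is commonly handled with a simple preference order: node-local, then rack-local, then anywhere. The Python sketch below assumes a hypothetical host-to-rack topology map and a list of nodes with free capacity; it covers placement only, not failure handling or straggler cloning, and it is not the scheduler described in this document.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Node:
    host: str
    rack: str


def pick_node(replica_hosts, free_nodes, rack_of):
    """Choose a free node for a map task, preferring data locality.

    replica_hosts: hosts that store a replica of the task's input block.
    free_nodes:    nodes with a free container slot.
    rack_of:       hypothetical topology map from host name to rack name.
    """
    # 1. Node-local: the block can be read from local disk.
    for node in free_nodes:
        if node.host in replica_hosts:
            return node, "node-local"
    # 2. Rack-local: the block only crosses the rack switch.
    replica_racks = {rack_of[h] for h in replica_hosts if h in rack_of}
    for node in free_nodes:
        if node.rack in replica_racks:
            return node, "rack-local"
    # 3. Off-rack: any free node, at the cost of a cross-rack transfer.
    if free_nodes:
        return free_nodes[0], "off-rack"
    # No capacity right now; the scheduler must wait.
    return None, "wait"


rack_of = {"h1": "r1", "h2": "r1", "h3": "r2"}
free = [Node("h3", "r2"), Node("h2", "r1")]
print(pick_node({"h2"}, free, rack_of))  # node-local on h2
print(pick_node({"h1"}, free, rack_of))  # rack-local: h2 shares rack r1 with h1
print(pick_node({"h9"}, free, rack_of))  # off-rack: unknown host, falls back to h3
```
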
Any one of these issues can limit how far a Map-Reduce job scales out. In what follows, we describe a scheduler framework that provides task life-cycle management mechanisms. Using this framework, we developed a complete version of the Map-Reduce runtime that addresses the above issues. Our framework is designed around three components.

Driver
: captures the client code embodying the scheduler.

Evaluator
: provides a runtime environment on a container.

Task
: encapsulates the task-level client code to be executed in an Evaluator.

Below, we describe the client-facing interfaces to these components. The core REEF control-flow design is based on the Reactive Extensions (Rx), which enforce asynchronous message-passing method signatures. In Rx terms, the interfaces follow an observer pattern: they expose methods that accept messages from a (possibly remote) asynchronous caller and return no result to that caller.

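As a rough illustration of the Rx-style observer pattern described above, the following Python sketch delivers messages to a handler on a separate thread, so the sender never blocks on, or receives a return value from, the handler. The `on_next`/`on_error`/`on_completed` names mirror the Rx observer methods (OnNext, OnError, OnCompleted in Rx.NET); `AsyncChannel` and `RecordingDriver` are hypothetical names, and this is not REEF's actual API.

```python
import queue
import threading
from abc import ABC, abstractmethod


class Observer(ABC):
    """Rx-style observer: each method accepts a message and returns nothing."""

    @abstractmethod
    def on_next(self, value): ...

    @abstractmethod
    def on_error(self, error): ...

    @abstractmethod
    def on_completed(self): ...


class AsyncChannel:
    """Delivers messages to an observer on its own thread.

    send() only enqueues, so the caller never waits for the handler to run.
    """

    _DONE = object()  # sentinel marking the end of the stream

    def __init__(self, observer):
        self._observer = observer
        self._inbox = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def send(self, value):
        self._inbox.put(value)

    def close(self):
        self._inbox.put(self._DONE)
        self._worker.join()  # wait until every queued message is handled

    def _run(self):
        while True:
            message = self._inbox.get()
            if message is self._DONE:
                self._observer.on_completed()
                return
            try:
                self._observer.on_next(message)
            except Exception as exc:
                self._observer.on_error(exc)


class RecordingDriver(Observer):
    """Hypothetical Driver-side handler that records the events it receives."""

    def __init__(self):
        self.events = []

    def on_next(self, value):
        self.events.append(value)

    def on_error(self, error):
        self.events.append(("error", repr(error)))

    def on_completed(self):
        self.events.append("completed")


driver = RecordingDriver()
channel = AsyncChannel(driver)
channel.send("AllocatedEvaluator")
channel.send("ActiveContext")
channel.close()
print(driver.events)  # ['AllocatedEvaluator', 'ActiveContext', 'completed']
```

A single worker thread draining a FIFO queue preserves message order, which keeps the handler logic simple while still decoupling sender and receiver.
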
Figure: A running REEF job with two RunningEvaluators and one RunningTask.

The figure above presents the REEF components in terms of a running application, which is written as a Driver and task-specific Task modules. The application code is packaged and submitted to a REEF client API, which in turn submits a REEF-AM (the REEF ApplicationMaster) configuration to YARN. The REEF-AM contains a runtime for launching the Driver, along with client libraries for requesting Evaluators and launching Tasks. When a request for Evaluators is made, the REEF-AM negotiates containers with the YARN ResourceManager (YARN-RM) and launches an Evaluator runtime on the YARN NodeManager (YARN-NM) that hosts each allocated container. In turn, the Driver is given an Evaluator object reference, which it uses to submit a Task. The Driver is also given a Task object reference, which it may use to send messages to the Task running in the Evaluator. The REEF layer implements these communication channels and encodes the computational life-cycle as state transitions, which are surfaced to the Driver as Rx messages.

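The request-and-launch sequence above can be traced with a small, single-process Python model. Every class here is a hypothetical stand-in (`FakeResourceManager` for the YARN-RM, `ApplicationMaster` for the REEF-AM, and so on); the sketch only shows who hands which reference to whom, not how REEF or YARN implement it.

```python
import itertools
from dataclasses import dataclass


@dataclass
class Container:
    container_id: str
    node: str


class FakeResourceManager:
    """Stand-in for the YARN-RM: grants containers round-robin over nodes."""

    def __init__(self, nodes):
        self._nodes = itertools.cycle(nodes)
        self._ids = itertools.count(1)

    def allocate(self):
        return Container(f"container_{next(self._ids)}", next(self._nodes))


class Evaluator:
    """Stand-in for the Evaluator runtime launched in a container."""

    def __init__(self, container):
        self.container = container
        self.inbox = []  # messages delivered to Tasks running here

    def submit_task(self, task_id):
        return TaskRef(task_id, self)


@dataclass
class TaskRef:
    """Driver-side handle to a Task; send() models the message channel."""
    task_id: str
    evaluator: Evaluator

    def send(self, message):
        # In REEF this message would cross the network; here it is a list append.
        self.evaluator.inbox.append((self.task_id, message))


class ApplicationMaster:
    """Stand-in for the REEF-AM: negotiates containers, hands Evaluators to the Driver."""

    def __init__(self, rm, on_evaluator):
        self._rm = rm
        self._on_evaluator = on_evaluator  # Driver callback

    def request_evaluators(self, n):
        for _ in range(n):
            container = self._rm.allocate()
            self._on_evaluator(Evaluator(container))


# Hypothetical Driver logic: submit one Task per Evaluator, then message it.
tasks = []


def driver_on_evaluator(evaluator):
    task = evaluator.submit_task(f"task-{evaluator.container.container_id}")
    task.send("start")
    tasks.append(task)


am = ApplicationMaster(FakeResourceManager(["nm-1", "nm-2"]), driver_on_evaluator)
am.request_evaluators(2)
print([(t.task_id, t.evaluator.container.node) for t in tasks])
# [('task-container_1', 'nm-1'), ('task-container_2', 'nm-2')]
```
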
### Computational Life-Cycle

Figure: States of Evaluators, Contexts, and Tasks.

The figure above describes the state transitions for (a) Evaluator and Context and (b) Task components. Each state transition is associated with an object reference that is surfaced to the Driver through an Rx-style interface. For instance, when the YARN-RM notifies the REEF-AM of an allocated container, the Driver is given an AllocatedEvaluator object, which contains methods for adding configurations (e.g., for data services, see below) and file resources, as well as submit methods that bootstrap the Evaluator runtime on the YARN-NM. When the Evaluator bootstrap completes successfully, the Driver is given an ActiveContext object, which can be used to launch Tasks or to initiate a close. A close triggers a shutdown event at the Evaluator runtime and a subsequent container release at the YARN-RM. If a failure occurs at any point, the Driver is passed a FailedEvaluator object, which contains an exception trace when one is available.

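One way to read the Evaluator/Context half of the figure is as a transition table. The Python sketch below encodes the transitions named in the text (allocation, submit, successful bootstrap, close, and failure from any point) and reports the object the Driver would receive. The state and event names, including the intermediate BOOTSTRAPPING state, are assumptions for illustration and are not REEF identifiers.

```python
from enum import Enum, auto


class EvalState(Enum):
    ALLOCATED = auto()
    BOOTSTRAPPING = auto()
    ACTIVE = auto()
    CLOSED = auto()
    FAILED = auto()


# (state, event) -> (next state, object surfaced to the Driver, or None)
TRANSITIONS = {
    (EvalState.ALLOCATED, "submit"): (EvalState.BOOTSTRAPPING, None),
    (EvalState.BOOTSTRAPPING, "bootstrap_ok"): (EvalState.ACTIVE, "ActiveContext"),
    # close: shutdown at the Evaluator, then container release at the RM
    (EvalState.ACTIVE, "close"): (EvalState.CLOSED, None),
}
TERMINAL = {EvalState.CLOSED, EvalState.FAILED}


class EvaluatorLifecycle:
    """Hypothetical model of the Evaluator/Context state machine."""

    def __init__(self, notify):
        self._notify = notify
        self.state = EvalState.ALLOCATED
        notify("AllocatedEvaluator")  # the RM has granted a container

    def fire(self, event, error=None):
        if self.state in TERMINAL:
            raise RuntimeError(f"{self.state.name} is a terminal state")
        if event == "failure":  # allowed from any non-terminal state
            self.state = EvalState.FAILED
            self._notify(("FailedEvaluator", error))
            return
        key = (self.state, event)
        if key not in TRANSITIONS:
            raise ValueError(f"illegal event {event!r} in state {self.state.name}")
        self.state, surfaced = TRANSITIONS[key]
        if surfaced is not None:
            self._notify(surfaced)


seen = []
ev = EvaluatorLifecycle(seen.append)
for event in ["submit", "bootstrap_ok", "close"]:
    ev.fire(event)
print(ev.state.name, seen)  # CLOSED ['AllocatedEvaluator', 'ActiveContext']
```

Keeping the legal transitions in one table makes illegal sequences (for example, closing an Evaluator that never finished bootstrapping) fail loudly instead of silently corrupting state.
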
Recall that the Driver launches a Task by calling a submit method on the ActiveContext reference. This state transition is denoted in the figure above by the edge labeled submit, which spans the two state machines. The REEF-AM passes a RunningTask object to the Driver after receiving confirmation that a Task has successfully started or resumed. The Driver may use the RunningTask reference to close or suspend the execution, which results in a CompletedTask or SuspendedTask object reference, respectively, being passed to the Driver. The SuspendedTask object contains a memento used to resume the execution on some (possibly different) ActiveContext. Exceptions thrown during Task execution are surfaced to the Driver as a FailedTask, which contains the actual exception object.

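The Task half of the figure can be modeled the same way. In this hypothetical Python sketch, each method stands for a Driver action or runtime event described above, and `notify` receives the name of the object the Driver would see; the memento is treated as an opaque byte string whose contents are up to the Task. The class and method names are illustrative only.

```python
from enum import Enum, auto


class TaskState(Enum):
    RUNNING = auto()
    SUSPENDED = auto()
    COMPLETED = auto()
    FAILED = auto()


class TaskLifecycle:
    """Hypothetical model of the Task state machine as seen from the Driver."""

    def __init__(self, notify, context, memento=None):
        self._notify = notify
        self.context = context
        self.memento = memento
        self.state = TaskState.RUNNING
        notify(("RunningTask", context))  # start confirmed

    def close(self, result=b""):
        self._require(TaskState.RUNNING)
        self.state = TaskState.COMPLETED
        self._notify(("CompletedTask", result))

    def suspend(self, memento):
        self._require(TaskState.RUNNING)
        self.state = TaskState.SUSPENDED
        self.memento = memento  # opaque state needed to resume later
        self._notify(("SuspendedTask", memento))

    def resume(self, context):
        # A suspended Task may resume on a different ActiveContext.
        self._require(TaskState.SUSPENDED)
        self.context = context
        self.state = TaskState.RUNNING
        self._notify(("RunningTask", context))

    def fail(self, exc):
        self._require(TaskState.RUNNING)
        self.state = TaskState.FAILED
        self._notify(("FailedTask", exc))

    def _require(self, expected):
        if self.state is not expected:
            raise RuntimeError(f"expected {expected.name}, was {self.state.name}")


log = []
t = TaskLifecycle(log.append, context="ctx-A")
t.suspend(memento=b"offset=42")
t.resume(context="ctx-B")
t.close(result=b"done")
print([name for name, _ in log])
# ['RunningTask', 'SuspendedTask', 'RunningTask', 'CompletedTask']
```
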
### Task Component

A Task encapsulates the task-level work of a job. The client interface consists of a single synchronous call method that takes an optional memento argument and returns a byte array, which is packaged with the CompletedTask object surfaced to the Driver. An exception may be thrown at any point during the call method; control then returns to the Evaluator, which packages the exception and sends it to the Driver, where it is surfaced as a FailedTask. The Evaluator periodically sends a heartbeat to the REEF-AM to convey its status. A Task can optionally implement an additional interface whose method returns a (bounded) byte array; the Evaluator includes this array in its heartbeat to the REEF-AM, and it is then surfaced to the Driver.

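A Python rendering of the Task contract described above might look like the following. It assumes a `call(memento)` method returning bytes, an optional status interface, and a hypothetical 64-byte bound on status payloads; `run_task` and `heartbeat` are illustrative stand-ins for Evaluator behavior, not REEF's Java interfaces.

```python
from abc import ABC, abstractmethod
from typing import Optional

MAX_STATUS_BYTES = 64  # hypothetical bound on the heartbeat status payload


class Task(ABC):
    @abstractmethod
    def call(self, memento: Optional[bytes]) -> bytes:
        """Synchronous task body; the result is packaged with the CompletedTask."""


class StatusProvider(ABC):
    """Optional interface whose result is piggybacked on Evaluator heartbeats."""

    @abstractmethod
    def get_status(self) -> bytes: ...


def run_task(task: Task, memento: Optional[bytes] = None):
    """Evaluator-side harness: run call() and package the outcome for the Driver."""
    try:
        return ("CompletedTask", task.call(memento))
    except Exception as exc:  # any exception returns control to the Evaluator
        return ("FailedTask", exc)


def heartbeat(task: Task) -> dict:
    """Build one heartbeat message; a real Evaluator would send this periodically."""
    message = {"alive": True}
    if isinstance(task, StatusProvider):
        # Enforce the bound by truncating (an assumption; rejecting is another option).
        message["status"] = task.get_status()[:MAX_STATUS_BYTES]
    return message


class CountTask(Task, StatusProvider):
    """Toy task that counts to n, resuming from a memento if one is given."""

    def __init__(self, n: int):
        self.n = n
        self.done = 0

    def call(self, memento: Optional[bytes]) -> bytes:
        self.done = int(memento) if memento else 0
        while self.done < self.n:
            self.done += 1
        return str(self.done).encode()

    def get_status(self) -> bytes:
        return f"{self.done}/{self.n}".encode()


class BrokenTask(Task):
    def call(self, memento: Optional[bytes]) -> bytes:
        raise ValueError("bad input")


task = CountTask(5)
print(run_task(task, memento=b"2"))  # ('CompletedTask', b'5')
print(heartbeat(task))               # {'alive': True, 'status': b'5/5'}
print(run_task(BrokenTask())[0])     # FailedTask
```

Keeping `call` synchronous puts all concurrency concerns in the Evaluator harness: the Task author writes straight-line code, while the harness decides how exceptions, results, and status are reported.
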