| | |
|---|---|
| commit | `e6c36168147d9c70c07c834acfaad575e08a5bb6` |
| author | Sanha Lee &lt;sanhaleehana@gmail.com&gt;, Tue Aug 21 12:23:49 2018 +0900 |
| committer | Won Wook SONG &lt;wonook@apache.org&gt;, Tue Aug 21 12:23:49 2018 +0900 |
| tree | `1196014de6b59acb16e32cb2cdb74e6c0f105fe5` |
| parent | `faecf759ee21677c010250046d17e305238486a2` |
[NEMO-139, 6] Logic in the scheduler for appending jobs, Support RDD caching (#111)

JIRA: [NEMO-139: Logic in the scheduler for appending jobs](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-139)
JIRA: [NEMO-6: Support RDD caching](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-6)

**Major changes:**
- Add logic in the scheduler for appending plans (NEMO-139)
  - Implement `PlanAppender`, which appends a submitted `PhysicalPlan` to an original `PhysicalPlan`.
  - Refactor `PlanStateManager`, `BatchScheduler`, `BlockManagerMaster`, and `TaskDispatcher` to reflect that all plans from a single job are appended to a single `PhysicalPlan` through `PlanAppender`.
- Support RDD caching (NEMO-6)
  - Add the `CacheIdProperty` and `GhostProperty` properties.
  - When a Spark user program calls `cache()` or `persist()` on an `RDD`, the RDD creates a ghost vertex and connects the vertex holding the `RDD` to the ghost vertex. The edge to the ghost vertex is annotated with a cache ID (`CacheIdProperty`). When a plan with this edge is executed in our runtime, the data to cache is stored in the edge in the required `StorageLevel` format. (No extra feature is required in our runtime to produce or sustain this data.)
  - When the `BatchScheduler` encounters a task annotated with the `GhostProperty`, the vertex is not scheduled but simply regarded as a completed task.
  - Implement an `Optimizer` that conducts optimization using `OptimizationPass`es from our `UserApplicationRunner`, to separate the roles.
  - When an IR DAG containing an edge with a `CacheIdProperty` is submitted and an already-executed IR DAG contains an edge with the identical `CacheIdProperty`, the `Optimizer` crops the IR DAG before the cache edge and adds a `CachedSourceVertex` before the edge.
  - Make `PlanAppender` handle caching properly: it appends the `PhysicalPlan` constructed from the cropped IR DAG (with the caching edge) to the original `PhysicalPlan`, and adds a new edge from the vertex that has the actual edge to a ghost vertex to the new `CachedSourceVertex`. At runtime, when the `CachedSourceVertex` requires the data, the cached data produced and stored in the edge to the ghost vertex is read through our `DuplicateEdgeGroupProperty` logic.

**Minor changes to note:**
- N/A.

**Tests for the changes:**
- Add an integration test for the `SparkCachingWordCount` application.
  - `SparkCachingWordCount` caches shuffle data and finds which keys have identical counts by using the cached data.

**Other comments:**
- I'm sorry for the delay. This issue is part of our first release and the target due date was August 16th, but it was delayed to resolve conflicts.

Closes #111
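The cache-reuse flow described above can be sketched as a toy model. This is NOT Nemo's actual API; `ToyOptimizer` and the edge-tuple plan representation are illustrative assumptions, showing only the idea that a plan sharing a cache ID with an already-executed plan is cropped at the cache edge and fed from a cached source instead of recomputing its upstream vertices:

```python
# A toy model of the cache-reuse flow (NOT Nemo's actual classes).
# A plan is a list of (src, dst, cache_id) edges in topological order;
# an edge to a ghost vertex carries a cache ID, other edges carry None.
class ToyOptimizer:
    def __init__(self):
        self.executed_cache_ids = set()

    def optimize(self, dag):
        for i, (src, dst, cache_id) in enumerate(dag):
            if cache_id is not None and cache_id in self.executed_cache_ids:
                # The cached data already exists: crop everything up to and
                # including the cache edge, and make downstream edges read
                # from a CachedSourceVertex instead of the original producer.
                return [("CachedSourceVertex" if s == src else s, d, c)
                        for (s, d, c) in dag[i + 1:]]
        # First execution: remember which cache IDs this plan materializes.
        self.executed_cache_ids.update(
            c for (_, _, c) in dag if c is not None)
        return dag

opt = ToyOptimizer()
plan = [("read", "shuffle", None),        # compute the shuffle data
        ("shuffle", "ghost", "cache-1"),  # cache edge to the ghost vertex
        ("shuffle", "count", None)]       # downstream consumer
first = opt.optimize(plan)   # runs unchanged; "cache-1" is now materialized
second = opt.optimize(plan)  # cropped: reads from the cached source
```

On the second submission, only `("CachedSourceVertex", "count", None)` remains, mirroring how the real `Optimizer` avoids re-executing the vertices before the cache edge.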
A Data Processing System for Flexible Employment With Different Deployment Characteristics.
Details about Nemo and its development can be found in:
Please refer to the Contribution guideline to contribute to our project.
```bash
export HADOOP_HOME=/path/to/hadoop-2.7.2
export YARN_HOME=$HADOOP_HOME
export PATH=$PATH:$HADOOP_HOME/bin
```
On Ubuntu 14.04 LTS and its point releases:
```bash
sudo apt-get install protobuf-compiler
```
On Ubuntu 16.04 LTS and its point releases:
```bash
sudo add-apt-repository ppa:snuspl/protobuf-250
sudo apt update
sudo apt install protobuf-compiler=2.5.0-9xenial1
```
On macOS:
```bash
brew tap homebrew/versions
brew install protobuf@2.5
```
Or build from source:
```bash
./configure
make
make check
sudo make install
```
To check for a successful installation of version 2.5.0, run `protoc --version`.
To build and install Nemo:

```bash
mvn clean install -T 2C
```

To skip integration tests while building:

```bash
mvn clean install -DskipITs -T 2C
```
The `run_beam.sh` script accepts the following options:

- `-job_id`: ID of the Beam job
- `-user_main`: Canonical name of the Beam application
- `-user_args`: Arguments that the Beam application accepts
- `-optimization_policy`: Canonical name of the optimization policy to apply to a job DAG in Nemo Compiler
- `-deploy_mode`: `yarn` is supported (default value is `local`)

## MapReduce example

```bash
./bin/run_beam.sh \
    -job_id mr_default \
    -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
    -optimization_policy edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy \
    -user_main edu.snu.nemo.examples.beam.WordCount \
    -user_args "`pwd`/examples/resources/test_input_wordcount `pwd`/examples/resources/test_output_wordcount"
```

## YARN cluster example

```bash
./bin/run_beam.sh \
    -deploy_mode yarn \
    -job_id mr_transient \
    -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
    -user_main edu.snu.nemo.examples.beam.WordCount \
    -optimization_policy edu.snu.nemo.compiler.optimizer.policy.TransientResourcePolicy \
    -user_args "hdfs://v-m:9000/test_input_wordcount hdfs://v-m:9000/test_output_wordcount"
```
The `-executor_json` command line option can be used to provide a path to the JSON file that describes the resource configuration for executors. Its default value is `config/default.json`, which initializes one executor of each type (`Transient`, `Reserved`, and `Compute`), each with one core and 1024MB of memory.
- `num` (optional): Number of containers. Default value is 1.
- `type`: Three container types are supported:
  - `Transient`: Containers that store eviction-prone resources. When batch jobs use idle resources in `Transient` containers, they can be arbitrarily evicted when latency-critical jobs attempt to use the resources.
  - `Reserved`: Containers that store eviction-free resources. `Reserved` containers are used to reliably store intermediate data which have high eviction cost.
  - `Compute`: Containers that are mainly used for computation.
- `memory_mb`: Memory size in MB.
- `capacity`: Number of `Task`s that can be run in an executor. Set this value to be the same as the number of CPU cores of the container.

```json
[
  { "num": 12, "type": "Transient", "memory_mb": 1024, "capacity": 4 },
  { "type": "Reserved", "memory_mb": 1024, "capacity": 2 }
]
```
This example configuration specifies twelve `Transient` containers, each with 1024MB of memory and a capacity of 4, and one `Reserved` container with 1024MB of memory and a capacity of 2.
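As a quick sanity check of what this configuration allocates in total, a short illustrative snippet (not part of Nemo) can sum the containers, memory, and task slots, applying the default `num` of 1 where the field is omitted:

```python
import json

# The example executor configuration from above.
config = json.loads("""
[
  { "num": 12, "type": "Transient", "memory_mb": 1024, "capacity": 4 },
  { "type": "Reserved", "memory_mb": 1024, "capacity": 2 }
]
""")

# "num" is optional and defaults to 1, so use .get("num", 1) everywhere.
total_containers = sum(c.get("num", 1) for c in config)
total_memory_mb = sum(c.get("num", 1) * c["memory_mb"] for c in config)
total_slots = sum(c.get("num", 1) * c["capacity"] for c in config)

print(total_containers, total_memory_mb, total_slots)  # 13 13312 50
```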
Nemo Compiler and Engine can store JSON representations of intermediate DAGs.
The `-dag_dir` command line option is used to specify the directory where the JSON files are stored. The default directory is `./dag`. Using our online visualizer, you can easily visualize a DAG: just drop the JSON file of the DAG as an input to it.

```bash
./bin/run_beam.sh \
    -job_id als \
    -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
    -user_main edu.snu.nemo.examples.beam.AlternatingLeastSquare \
    -optimization_policy edu.snu.nemo.compiler.optimizer.policy.TransientResourcePolicy \
    -dag_dir "./dag/als" \
    -user_args "`pwd`/examples/resources/test_input_als 10 3"
```