commit    add2b555a57632761fbc5657c1215fd684b37760
author    Jangho Seo <jangho@jangho.io>    Mon Jun 25 12:17:10 2018 +0900
committer John Yang <johnyangk@gmail.com>  Mon Jun 25 12:17:10 2018 +0900
tree      4410ca9084ec2e6a70aa8178981d1c107bb2d9cb
parent    7ef822be5b95f0793fba9519335adf1a1837cee7
[NEMO-102] Stage Partitioning by PhysicalPlanGenerator (#51)

JIRA: [NEMO-102: Stage Partitioning by PhysicalPlanGenerator](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-102)

**Major changes:**
- Removed StageId property
- Replaced DefaultStagePartitioningPass with StagePartitioner in nemo-runtime-common
- Modified PhysicalPlanGenerator to use StagePartitioner
- Added stage-level properties. Common properties that vertices in a stage share become stage-level properties (except for the properties ignored by StagePartitioner)
- Ad-hoc properties in Task and Stage, such as containerType, can now be handled by stage-level properties
- Replaced ScheduleGroupPass with DefaultScheduleGroupPass, which does not require the StageId property for assigning ScheduleGroupIndex

**Minor changes to note:**
- Removed StageBuilder and StageEdgeBuilder
- Added a feature to the visualizer to display stage-level ExecutionProperties
- Modified the visualizer to properly display other ExecutionProperties
- Modified the visualizer to properly draw StageEdges so that those edges are not cut by stage boundaries
- Removed the parallelism equality check in DAGBuilder, which required the IR-level StageId property (the feature is replaced by the constructor of Stage and sanity checking by PhysicalPlanGenerator)
- Renamed DataStoreProperty to InterTaskDataStoreProperty because it only controls data flow that spans different stages
- Modified ExecutionPropertyMap#toString to emit the canonical name of the property key
- Added equality test cases to ExecutionPropertyMapTest
- Removed the `idToIRVertex` parameter from the constructor of PhysicalPlan (the parameter value can be inferred from the IR DAG supplied to the constructor)
- Increased the capacity of resources in `beam_sample_executor_resources.json`, because some integration test cases require scheduling a large ScheduleGroup at once (for example, ScheduleGroup 0 in AlternatingLeastSquareITCase_pado requires 15 slots in the Transient resource)
- Modified StageEdge to use consistent naming for executionProperties when emitting JSON (edgeProperties → executionProperties)

**Tests for the changes:**
- Added StagePartitionerTest, which tests StagePartitioner under various scenarios
- Renamed ScheduleGroupPassTest to DefaultScheduleGroupPassTest and made it test DefaultScheduleGroupPass under various scenarios
- Existing tests should cover the changes to PhysicalPlanGenerator

**Other comments:**
- The legacy ScheduleGroupPass forces stages that contain a SourceVertex to have a ScheduleGroupIndex of zero. Since DefaultScheduleGroupPass does not employ this kind of trick, in FaultToleranceTest ScheduleGroup 0 for `TestPlanGenerator.PlanType.TwoVerticesJoined` is split into two ScheduleGroups. That is why I made a modification like `if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {` in FaultToleranceTest.

resolves [NEMO-102](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-102)
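The core idea of the change above is that vertices are grouped into a stage when they are connected and share the same execution properties (modulo the keys the partitioner ignores). This is a rough conceptual sketch of that grouping in Python, not Nemo's actual Java implementation; the function name and data shapes are illustrative only:

```python
def partition_stages(vertices, edges, ignored_keys=frozenset()):
    """Conceptual sketch of stage partitioning (not Nemo's real code).

    vertices: dict mapping vertex name -> dict of execution properties
    edges: iterable of (src, dst) pairs considered mergeable
    Returns a dict mapping vertex name -> stage id, where connected
    vertices with equal (non-ignored) properties share a stage.
    """
    def key(v):
        # Properties that matter for partitioning, ignoring some keys.
        return frozenset((k, val) for k, val in vertices[v].items()
                         if k not in ignored_keys)

    # Union-find: merge endpoints of edges whose properties match.
    parent = {v: v for v in vertices}

    def find(v):
        while parent[v] != v:
            parent[v] = parent[parent[v]]  # path halving
            v = parent[v]
        return v

    for src, dst in edges:
        if key(src) == key(dst):
            parent[find(src)] = find(dst)

    # Assign deterministic stage ids per union-find root.
    stage_of, ids = {}, {}
    for v in sorted(vertices):
        root = find(v)
        if root not in ids:
            ids[root] = len(ids)
        stage_of[v] = ids[root]
    return stage_of
```

With vertices `a` and `b` sharing a container type and `c` differing, `a` and `b` land in one stage while `c` gets its own, mirroring how common properties become stage-level properties.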
A Data Processing System for Flexible Employment With Different Deployment Characteristics.
Details about Nemo and its development can be found in:
Please refer to the Contribution guideline to contribute to our project.
```shell
export HADOOP_HOME=/path/to/hadoop-2.7.4
export YARN_HOME=$HADOOP_HOME
export PATH=$PATH:$HADOOP_HOME/bin
```
On Ubuntu 14.04 LTS and its point releases:
```shell
sudo apt-get install protobuf-compiler
```
On Ubuntu 16.04 LTS and its point releases:
```shell
sudo add-apt-repository ppa:snuspl/protobuf-250
sudo apt update
sudo apt install protobuf-compiler=2.5.0-9xenial1
```
On macOS:
```shell
brew tap homebrew/versions
brew install protobuf@2.5
```
Or build from source:
```shell
./configure
make
make check
sudo make install
```
To check for a successful installation of version 2.5.0, run `protoc --version`.
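If you want to script this check, `protoc --version` prints a line of the form `libprotoc 2.5.0`. A small hypothetical helper (not part of Nemo) to extract the version number:

```python
import re

def parse_protoc_version(output):
    """Extract the version from `protoc --version` output,
    e.g. "libprotoc 2.5.0" -> "2.5.0". Returns None if absent."""
    m = re.search(r"(\d+\.\d+\.\d+)", output)
    return m.group(1) if m else None
```

You could feed it the captured output of `protoc --version` and fail the build when the result is not `"2.5.0"`.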
To build and install Nemo, running all tests:

```shell
mvn clean install -T 2C
```

To build while skipping the (slower) integration tests:

```shell
mvn clean install -DskipITs -T 2C
```
```shell
./bin/run_external_app.sh \
  `pwd`/nemo_app/target/bd17f-1.0-SNAPSHOT.jar \
  -job_id mapreduce \
  -executor_json `pwd`/examples/resources/sample_executor_resources.json \
  -user_main MapReduce \
  -user_args "`pwd`/mr_input_data `pwd`/nemo_output/output_data"
```
- `-job_id`: ID of the Beam job
- `-user_main`: Canonical name of the Beam application
- `-user_args`: Arguments that the Beam application accepts
- `-optimization_policy`: Canonical name of the optimization policy to apply to a job DAG in Nemo Compiler
- `-deploy_mode`: `yarn` is supported (default value is `local`)

## MapReduce example

```shell
./bin/run_beam.sh \
  -job_id mr_default \
  -executor_json `pwd`/examples/resources/sample_executor_resources.json \
  -optimization_policy edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy \
  -user_main edu.snu.nemo.examples.beam.WordCount \
  -user_args "`pwd`/examples/resources/sample_input_wordcount `pwd`/examples/resources/sample_output_wordcount"
```

## YARN cluster example

```shell
./bin/run_beam.sh \
  -deploy_mode yarn \
  -job_id mr_pado \
  -executor_json `pwd`/examples/resources/sample_executor_resources.json \
  -user_main edu.snu.nemo.examples.beam.WordCount \
  -optimization_policy edu.snu.nemo.compiler.optimizer.policy.PadoPolicy \
  -user_args "hdfs://v-m:9000/sample_input_wordcount hdfs://v-m:9000/sample_output_wordcount"
```
The `-executor_json` command line option can be used to provide a path to the JSON file that describes the resource configuration for executors. Its default value is `config/default.json`, which initializes one of each `Transient`, `Reserved`, and `Compute` executor, each of which has one core and 1024MB of memory.

- `num` (optional): Number of containers. Default value is 1
- `type`: Three container types are supported:
  - `Transient`: Containers that store eviction-prone resources. When batch jobs use idle resources in `Transient` containers, they can be arbitrarily evicted when latency-critical jobs attempt to use the resources.
  - `Reserved`: Containers that store eviction-free resources. `Reserved` containers are used to reliably store intermediate data which have a high eviction cost.
  - `Compute`: Containers that are mainly used for computation.
- `memory_mb`: Memory size in MB
- `capacity`: Number of `Task`s that can be run in an executor. Set this value to be the same as the number of CPU cores of the container.

```json
[
  {
    "num": 12,
    "type": "Transient",
    "memory_mb": 1024,
    "capacity": 4
  },
  {
    "type": "Reserved",
    "memory_mb": 1024,
    "capacity": 2
  }
]
```
This example configuration specifies 12 `Transient` containers and one `Reserved` container, each with 1024MB of memory and capacities of 4 and 2 `Task`s respectively.
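Since `num` defaults to 1 and `capacity` is the number of `Task`s one executor runs at once, the total number of Task slots a configuration provides can be computed mechanically. A hypothetical helper (not part of Nemo) illustrating that arithmetic:

```python
import json

def total_slots(config_json):
    """Total Task slots in an executor configuration:
    sum of num (default 1) * capacity over all entries."""
    executors = json.loads(config_json)
    return sum(e.get("num", 1) * e["capacity"] for e in executors)

config = """
[
  { "num": 12, "type": "Transient", "memory_mb": 1024, "capacity": 4 },
  { "type": "Reserved", "memory_mb": 1024, "capacity": 2 }
]
"""
# 12 * 4 + 1 * 2 = 50 slots in total
```

This kind of tally is what the commit message above refers to when noting that a large ScheduleGroup may need many slots scheduled at once.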
Nemo Compiler and Engine can store JSON representation of intermediate DAGs.
The `-dag_dir` command line option is used to specify the directory where the JSON files are stored. The default directory is `./dag`. Using our online visualizer, you can easily visualize a DAG: just drop the JSON file of the DAG as an input to it.

```shell
./bin/run_beam.sh \
  -job_id als \
  -executor_json `pwd`/examples/resources/beam_sample_executor_resources.json \
  -user_main edu.snu.nemo.examples.beam.AlternatingLeastSquare \
  -optimization_policy edu.snu.nemo.compiler.optimizer.policy.PadoPolicy \
  -dag_dir "./dag/als" \
  -user_args "`pwd`/examples/resources/sample_input_als 10 3"
```