[NEMO-102] Stage Partitioning by PhysicalPlanGenerator (#51)

JIRA: [NEMO-102: Stage Partitioning by PhysicalPlanGenerator](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-102)

**Major changes:**
- Removed StageId property
- Replaced DefaultStagePartitioningPass with StagePartitioner in nemo-runtime-common
- Modified PhysicalPlanGenerator to use StagePartitioner
- Added Stage-level property. Common properties which vertices in a stage share become stage-level properties. (Except for the properties ignored by StagePartitioner)
- Ad-hoc properties in Task and Stage, such as containerType, now can be handled by stage-level properties.
- Replaced ScheduleGroupPass with DefaultScheduleGroupPass, which does not require StageId property for assigning ScheduleGroupIndex

**Minor changes to note:**
- Removed StageBuilder and StageEdgeBuilder
- Add a feature to visualizer to display stage-level ExecutionProperties
- Modified visualizer to properly display other ExecutionProperties
- Modified visualizer to properly draw StageEdges so that those edges are not cut by stage boundaries
- Removed parallelism equality checking by DAGBuilder which requires IR-level StageId property (the feature is replaced by the constructor of Stage and sanity checking by PhysicalPlanGenerator)
- Renamed DataStoreProperty to InterTaskDataStoreProperty because it only controls data flow which spans through differnet stages
- Modified ExecutionPropertyMap#toString to emit canonical name of property key
- Added equality test cases to ExecutionPropertyMapTest
- Removed `idToIRVertex` parameter from the constructor of PhysicalPlan. (The parameter value can be inferred from the IR dag supplied to the constructor.)
- Increased the capacity of resources in `beam_sample_executor_resources.json`, because some integration test cases require scheduling a large ScheduleGroup at once. (For example, ScheduleGroup 0 in AlternatingLeastSquareITCase_pado requires 15 slots in Transient resoruce.)
- Modified StageEdge to use consistent naming for executionProperties when emitting json (edgeProperties → executionProperties)

**Tests for the changes:**
- Added StagePartitionerTest to test StagePartitioner under various test scenarios.
- Renamed ScheduleGroupPassTest to DefaultScheduleGroupPassTest and made it tests DefaultScheduleGroupPass under various test scenarios.
- Existing tests should cover changes on PhysicalPlanGenerator.

**Other comments:**
- Legacy ScheduleGroupPass forces stages with SourceVertex within to have ScheduleGroupIndex of zero. Since DefaultScheduleGroupPass does not employ this kind of trick, in FaultToleranceTest ScheduleGroup 0 for `TestPlanGenerator.PlanType.TwoVerticesJoined` is splitted into two ScheduleGroups. That's why I made modification like `if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {` in FaultToleranceTest.

resolves [NEMO-102](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-102)
60 files changed
tree: 4410ca9084ec2e6a70aa8178981d1c107bb2d9cb
  1. .github/
  2. bin/
  3. client/
  4. common/
  5. compiler/
  6. conf/
  7. deploy/
  8. examples/
  9. runtime/
  10. tests/
  11. webui/
  12. .gitignore
  13. .travis.yml
  14. checkstyle.xml
  15. LICENSE
  16. pom.xml
  17. README.md
README.md

Nemo

Build Status

A Data Processing System for Flexible Employment With Different Deployment Characteristics.

Online Documentation

Details about Nemo and its development can be found in:

Please refer to the Contribution guideline to contribute to our project.

Nemo prerequisites and setup

Prerequisites

  • Java 8
  • Maven
  • YARN settings
  • Protobuf 2.5.0
    • On Ubuntu 14.04 LTS and its point releases:

      sudo apt-get install protobuf-compiler
      
    • On Ubuntu 16.04 LTS and its point releases:

      sudo add-apt-repository ppa:snuspl/protobuf-250
      sudo apt update
      sudo apt install protobuf-compiler=2.5.0-9xenial1
      
    • On macOS:

      brew tap homebrew/versions
      brew install protobuf@2.5
      
    • Or build from source:

    • To check for a successful installation of version 2.5.0, run protoc --version

Installing Nemo

  • Run all tests and install: mvn clean install -T 2C
  • Run only unit tests and install: mvn clean install -DskipITs -T 2C

Running Beam applications

Running an external Beam application

  • Use run_external_app.sh instead of run.sh
  • Set the first argument the path to the external Beam application jar
./bin/run_external_app.sh \
    `pwd`/nemo_app/target/bd17f-1.0-SNAPSHOT.jar \
    -job_id mapreduce \
    -executor_json `pwd`/examples/resources/sample_executor_resources.json \
    -user_main MapReduce \
    -user_args "`pwd`/mr_input_data `pwd`/nemo_output/output_data"

Configurable options

  • -job_id: ID of the Beam job
  • -user_main: Canonical name of the Beam application
  • -user_args: Arguments that the Beam application accepts
  • -optimization_policy: Canonical name of the optimization policy to apply to a job DAG in Nemo Compiler
  • -deploy_mode: yarn is supported(default value is local)

Examples

## MapReduce example
./bin/run_beam.sh \
	-job_id mr_default \
	-executor_json `pwd`/examples/resources/sample_executor_resources.json \
	-optimization_policy edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy \
	-user_main edu.snu.nemo.examples.beam.WordCount \
	-user_args "`pwd`/examples/resources/sample_input_wordcount `pwd`/examples/resources/sample_output_wordcount"

## YARN cluster example
./bin/run_beam.sh \
	-deploy_mode yarn \
  	-job_id mr_pado \
	-executor_json `pwd`/examples/resources/sample_executor_resources.json \
  	-user_main edu.snu.nemo.examples.beam.WordCount \
  	-optimization_policy edu.snu.nemo.compiler.optimizer.policy.PadoPolicy \
  	-user_args "hdfs://v-m:9000/sample_input_wordcount hdfs://v-m:9000/sample_output_wordcount"

Resource Configuration

-executor_json command line option can be used to provide a path to the JSON file that describes resource configuration for executors. Its default value is config/default.json, which initializes one of each Transient, Reserved, and Compute executor, each of which has one core and 1024MB memory.

Configurable options

  • num (optional): Number of containers. Default value is 1
  • type: Three container types are supported:
    • Transient : Containers that store eviction-prone resources. When batch jobs use idle resources in Transient containers, they can be arbitrarily evicted when latency-critical jobs attempt to use the resources.
    • Reserved : Containers that store eviction-free resources. Reserved containers are used to reliably store intermediate data which have high eviction cost.
    • Compute : Containers that are mainly used for computation.
  • memory_mb: Memory size in MB
  • capacity: Number of Tasks that can be run in an executor. Set this value to be the same as the number of CPU cores of the container.

Examples

[
  {
    "num": 12,
    "type": "Transient",
    "memory_mb": 1024,
    "capacity": 4
  },
  {
    "type": "Reserved",
    "memory_mb": 1024,
    "capacity": 2
  }
]

This example configuration specifies

  • 12 transient containers with 4 cores and 1024MB memory each
  • 1 reserved container with 2 cores and 1024MB memory

Monitoring your job using web UI

Nemo Compiler and Engine can store JSON representation of intermediate DAGs.

  • -dag_dir command line option is used to specify the directory where the JSON files are stored. The default directory is ./dag. Using our online visualizer, you can easily visualize a DAG. Just drop the JSON file of the DAG as an input to it.

Examples

./bin/run_beam.sh \
	-job_id als \
	-executor_json `pwd`/examples/resources/beam_sample_executor_resources.json \
  	-user_main edu.snu.nemo.examples.beam.AlternatingLeastSquare \
  	-optimization_policy edu.snu.nemo.compiler.optimizer.policy.PadoPolicy \
  	-dag_dir "./dag/als" \
  	-user_args "`pwd`/examples/resources/sample_input_als 10 3"