[ISSUE-244] Fix flaky test of CoordinatorGrpcTest.rpcMetricsTest (#256)

### What changes were proposed in this pull request?
[ISSUE-244] Fix flaky test of CoordinatorGrpcTest.rpcMetricsTest

### Why are the changes needed?
1. The gauge metric of `HEARTBEAT_METHOD` is hard to meansure due to async sending at background. So remove it
2. The gauge metric of `GET_SHUFFLE_ASSIGNMENTS_METHOD` may be not called `descCounter` when rpc finished, so remove it

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Dont need
1 file changed
tree: bb39b7f5940db8372b77deacdf4aa01022942f35
  1. .github/
  2. bin/
  3. client/
  4. client-mr/
  5. client-spark/
  6. common/
  7. conf/
  8. coordinator/
  9. deploy/
  10. docs/
  11. integration-test/
  12. internal-client/
  13. proto/
  14. server/
  15. spark-patches/
  16. storage/
  17. .asf.yaml
  18. .gitignore
  19. build_distribution.sh
  20. checkstyle-suppressions.xml
  21. checkstyle.xml
  22. CONTRIBUTING.md
  23. DISCLAIMER-WIP
  24. LICENSE
  25. NOTICE
  26. pom.xml
  27. README.md
  28. spotbugs-exclude.xml
README.md

Apache Uniffle (Incubating)

Uniffle is a unified remote shuffle service for compute engines. It provides the ability to aggregate and store shuffle data on remote servers, thus improving the performance and reliability of large jobs. Currently it supports Apache Spark and MapReduce.

Build Codecov

Architecture

Rss Architecture Uniffle contains coordinator cluster, shuffle server cluster and remote storage(eg, HDFS) if necessary.

Coordinator will collect status of shuffle server and do the assignment for the job.

Shuffle server will receive the shuffle data, merge them and write to storage.

Depend on different situation, Uniffle supports Memory & Local, Memory & Remote Storage(eg, HDFS), Memory & Local & Remote Storage(recommendation for production environment).

Shuffle Process with Uniffle

  • Spark driver ask coordinator to get shuffle server for shuffle process

  • Spark task write shuffle data to shuffle server with following step: Rss Shuffle_Write

    1. Send KV data to buffer
    2. Flush buffer to queue when buffer is full or buffer manager is full
    3. Thread pool get data from queue
    4. Request memory from shuffle server first and send the shuffle data
    5. Shuffle server cache data in memory first and flush to queue when buffer manager is full
    6. Thread pool get data from queue
    7. Write data to storage with index file and data file
    8. After write data, task report all blockId to shuffle server, this step is used for data validation later
    9. Store taskAttemptId in MapStatus to support Spark speculation
  • Depend on different storage type, spark task read shuffle data from shuffle server or remote storage or both of them.

Shuffle file format

The shuffle data is stored with index file and data file. Data file has all blocks for specific partition and index file has metadata for every block.

Rss Shuffle_Write

Supported Spark Version

Current support Spark 2.3.x, Spark 2.4.x, Spark3.0.x, Spark 3.1.x, Spark 3.2.x

Note: To support dynamic allocation, the patch(which is included in client-spark/patch folder) should be applied to Spark

Supported MapReduce Version

Current support Hadoop 2.8.5's MapReduce framework.

Building Uniffle

note: currently Uniffle requires JDK 1.8 to build, adding later JDK support is on our roadmap.

Uniffle is built using Apache Maven. To build it, run:

mvn -DskipTests clean package

Build against profile Spark2(2.4.6)

mvn -DskipTests clean package -Pspark2

Build against profile Spark3(3.1.2)

mvn -DskipTests clean package -Pspark3

Build against Spark 3.2.x, Except 3.2.0

mvn -DskipTests clean package -Pspark3.2

Build against Spark 3.2.0

mvn -DskipTests clean package -Pspark3.2.0

To package the Uniffle, run:

./build_distribution.sh

Package against Spark 3.2.x, Except 3.2.0, run:

./build_distribution.sh --spark3-profile 'spark3.2'

Package against Spark 3.2.0, run:

./build_distribution.sh --spark3-profile 'spark3.2.0'

rss-xxx.tgz will be generated for deployment

Deploy

Deploy Coordinator

  1. unzip package to RSS_HOME
  2. update RSS_HOME/bin/rss-env.sh, eg,
      JAVA_HOME=<java_home>
      HADOOP_HOME=<hadoop home>
      XMX_SIZE="16g"
    
  3. update RSS_HOME/conf/coordinator.conf, eg,
      rss.rpc.server.port 19999
      rss.jetty.http.port 19998
      rss.coordinator.server.heartbeat.timeout 30000
      rss.coordinator.app.expired 60000
      rss.coordinator.shuffle.nodes.max 5
      # enable dynamicClientConf, and coordinator will be responsible for most of client conf
      rss.coordinator.dynamicClientConf.enabled true
      # config the path of client conf
      rss.coordinator.dynamicClientConf.path <RSS_HOME>/conf/dynamic_client.conf
      # config the path of excluded shuffle server
      rss.coordinator.exclude.nodes.file.path <RSS_HOME>/conf/exclude_nodes
    
  4. update <RSS_HOME>/conf/dynamic_client.conf, rss client will get default conf from coordinator eg,
     # MEMORY_LOCALFILE_HDFS is recommandation for production environment
     rss.storage.type MEMORY_LOCALFILE_HDFS
     # multiple remote storages are supported, and client will get assignment from coordinator
     rss.coordinator.remote.storage.path hdfs://cluster1/path,hdfs://cluster2/path
     rss.writer.require.memory.retryMax 1200
     rss.client.retry.max 100
     rss.writer.send.check.timeout 600000
     rss.client.read.buffer.size 14m
    
  5. start Coordinator
     bash RSS_HOME/bin/start-coordnator.sh
    

Deploy Shuffle Server

  1. unzip package to RSS_HOME
  2. update RSS_HOME/bin/rss-env.sh, eg,
      JAVA_HOME=<java_home>
      HADOOP_HOME=<hadoop home>
      XMX_SIZE="80g"
    
  3. update RSS_HOME/conf/server.conf, eg,
      rss.rpc.server.port 19999
      rss.jetty.http.port 19998
      rss.rpc.executor.size 2000
      # it should be configed the same as in coordinator
      rss.storage.type MEMORY_LOCALFILE_HDFS
      rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999
      # local storage path for shuffle server
      rss.storage.basePath /data1/rssdata,/data2/rssdata....
      # it's better to config thread num according to local disk num
      rss.server.flush.thread.alive 5
      rss.server.flush.threadPool.size 10
      rss.server.buffer.capacity 40g
      rss.server.read.buffer.capacity 20g
      rss.server.heartbeat.timeout 60000
      rss.server.heartbeat.interval 10000
      rss.rpc.message.max.size 1073741824
      rss.server.preAllocation.expired 120000
      rss.server.commit.timeout 600000
      rss.server.app.expired.withoutHeartbeat 120000
      # note: the default value of rss.server.flush.cold.storage.threshold.size is 64m
      # there will be no data written to DFS if set it as 100g even rss.storage.type=MEMORY_LOCALFILE_HDFS
      # please set proper value if DFS is used, eg, 64m, 128m.
      rss.server.flush.cold.storage.threshold.size 100g
    
  4. start Shuffle Server
     bash RSS_HOME/bin/start-shuffle-server.sh
    

Deploy Spark Client

  1. Add client jar to Spark classpath, eg, SPARK_HOME/jars/

    The jar for Spark2 is located in <RSS_HOME>/jars/client/spark2/rss-client-XXXXX-shaded.jar

    The jar for Spark3 is located in <RSS_HOME>/jars/client/spark3/rss-client-XXXXX-shaded.jar

  2. Update Spark conf to enable Uniffle, eg,

    spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager
    spark.rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999
    # Note: For Spark2, spark.sql.adaptive.enabled should be false because Spark2 doesn't support AQE.
    

Support Spark dynamic allocation

To support spark dynamic allocation with Uniffle, spark code should be updated. There are 3 patches for spark (2.4.6/3.1.2/3.2.1) in spark-patches folder for reference.

After apply the patch and rebuild spark, add following configuration in spark conf to enable dynamic allocation:

spark.shuffle.service.enabled false
spark.dynamicAllocation.enabled true

Deploy MapReduce Client

  1. Add client jar to the classpath of each NodeManager, e.g., /share/hadoop/mapreduce/

The jar for MapReduce is located in <RSS_HOME>/jars/client/mr/rss-client-mr-XXXXX-shaded.jar

  1. Update MapReduce conf to enable Uniffle, eg,

    -Dmapreduce.rss.coordinator.quorum=<coordinatorIp1>:19999,<coordinatorIp2>:19999
    -Dyarn.app.mapreduce.am.command-opts=org.apache.hadoop.mapreduce.v2.app.RssMRAppMaster
    -Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.RssMapOutputCollector
    -Dmapreduce.job.reduce.shuffle.consumer.plugin.class=org.apache.hadoop.mapreduce.task.reduce.RssShuffle
    

Note that the RssMRAppMaster will automatically disable slow start (i.e., mapreduce.job.reduce.slowstart.completedmaps=1) and job recovery (i.e., yarn.app.mapreduce.am.job.recovery.enable=false)

Configuration

The important configuration is listed as following.

Coordinator

For more details of advanced configuration, please see Uniffle Coordinator Guide.

Shuffle Server

Property NameDefaultDescription
rss.coordinator.quorum-Coordinator quorum
rss.rpc.server.port-RPC port for Shuffle server
rss.jetty.http.port-Http port for Shuffle server
rss.server.buffer.capacity-Max memory of buffer manager for shuffle server
rss.server.memory.shuffle.highWaterMark.percentage75.0Threshold of spill data to storage, percentage of rss.server.buffer.capacity
rss.server.memory.shuffle.lowWaterMark.percentage25.0Threshold of keep data in memory, percentage of rss.server.buffer.capacity
rss.server.read.buffer.capacity-Max size of buffer for reading data
rss.server.heartbeat.interval10000Heartbeat interval to Coordinator (ms)
rss.server.flush.threadPool.size10Thread pool for flush data to file
rss.server.commit.timeout600000Timeout when commit shuffle data (ms)
rss.storage.type-Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS
rss.server.flush.cold.storage.threshold.size64MThe threshold of data size for LOACALFILE and HDFS if MEMORY_LOCALFILE_HDFS is used
rss.server.tags-The comma-separated list of tags to indicate the shuffle server's attributes. It will be used as the assignment basis for the coordinator
rss.server.single.buffer.flush.enabledfalseWhether single buffer flush when size exceeded rss.server.single.buffer.flush.threshold
rss.server.single.buffer.flush.threshold64MThe threshold of single shuffle buffer flush

Shuffle Client

For more details of advanced configuration, please see Uniffle Shuffle Client Guide.

Security: Hadoop kerberos authentication

The primary goals of the Uniffle Kerberos security are:

  1. to enable secure data access for coordinator/shuffle-servers, like dynamic conf/exclude-node files stored in secured dfs cluster
  2. to write shuffle data to kerberos secured dfs cluster for shuffle-servers.

The following security configurations are introduced.

Property NameDefaultDescription
rss.security.hadoop.kerberos.enablefalseWhether enable access secured hadoop cluster
rss.security.hadoop.kerberos.krb5-conf.file-The file path of krb5.conf. And only when rss.security.hadoop.kerberos.enable is enabled, the option will be valid
rss.security.hadoop.kerberos.keytab.file-The kerberos keytab file path. And only when rss.security.hadoop.kerberos.enable is enabled, the option will be valid
rss.security.hadoop.kerberos.principal-The kerberos keytab principal. And only when rss.security.hadoop.kerberos.enable is enabled, the option will be valid
rss.security.hadoop.kerberos.relogin.interval.sec60The kerberos authentication relogin interval. unit: sec
  • The proxy user mechanism is used to keep the data isolation in uniffle, which means the shuffle-data written by shuffle-servers is owned by spark app's user. To achieve the this, the login user specified by above config should be as the superuser for HDFS. For more details of related sections, please see Proxy user - Superusers Acting On Behalf Of Other Users

LICENSE

Uniffle is under the Apache License Version 2.0. See the LICENSE file for details.

Contributing

For more information about contributing issues or pull requests, see Uniffle Contributing Guide.