Uniffle is designed as a unified shuffle engine for multiple computing frameworks, including Apache Spark and Apache Hadoop. Uniffle provides pluggable client plugins to enable remote shuffle in Spark and MapReduce.
This document describes how to deploy the Uniffle client plugins with Spark and MapReduce.
Add the client jar to the Spark classpath, e.g., `$SPARK_HOME/jars/`.
The jar for Spark2 is located in `<RSS_HOME>/jars/client/spark2/rss-client-XXXXX-shaded.jar`.
The jar for Spark3 is located in `<RSS_HOME>/jars/client/spark3/rss-client-XXXXX-shaded.jar`.
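For example, installing the Spark3 client jar could look like the following (a sketch assuming `RSS_HOME` and `SPARK_HOME` point at your Uniffle and Spark installations; the exact jar file name depends on the version you built):

```bash
# Copy the shaded Spark3 client jar into Spark's classpath.
# Use the spark2 directory instead when running Spark2.
cp $RSS_HOME/jars/client/spark3/rss-client-*-shaded.jar $SPARK_HOME/jars/
```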
Update the Spark conf to enable Uniffle, e.g.,
```
spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager
spark.rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999

# Note: For Spark2, spark.sql.adaptive.enabled should be false because Spark2 doesn't support AQE.
```
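The same settings can also be supplied per job via `spark-submit` instead of editing `spark-defaults.conf` (a sketch; `<main_class>` and `<application_jar>` are placeholders for your own application):

```bash
spark-submit \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager \
  --conf spark.rss.coordinator.quorum=<coordinatorIp1>:19999,<coordinatorIp2>:19999 \
  --class <main_class> <application_jar>
```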
To support Spark dynamic allocation with Uniffle, the Spark code needs to be patched. Two reference patches, for spark-2.4.6 and spark-3.1.2, are provided in the spark-patches folder.
After applying the patch and rebuilding Spark, add the following configuration to the Spark conf to enable dynamic allocation:
```
spark.shuffle.service.enabled false
spark.dynamicAllocation.enabled true
```
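A fuller setup would typically also bound the executor count using the standard Spark dynamic-allocation properties (the values below are illustrative only):

```
spark.shuffle.service.enabled false
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 1
spark.dynamicAllocation.maxExecutors 100
```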
The jar for MapReduce is located in `<RSS_HOME>/jars/client/mr/rss-client-mr-XXXXX-shaded.jar`.
Update the MapReduce conf to enable Uniffle, e.g.,
```
-Dmapreduce.rss.coordinator.quorum=<coordinatorIp1>:19999,<coordinatorIp2>:19999
-Dyarn.app.mapreduce.am.command-opts=org.apache.hadoop.mapreduce.v2.app.RssMRAppMaster
-Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.RssMapOutputCollector
-Dmapreduce.job.reduce.shuffle.consumer.plugin.class=org.apache.hadoop.mapreduce.task.reduce.RssShuffle
```
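For example, these options can be passed on the command line when submitting a ToolRunner-based job (a sketch using the stock examples jar; adjust the jar, job name, and paths for your environment, and make sure the Uniffle client jar is on the job classpath):

```bash
hadoop jar hadoop-mapreduce-examples.jar wordcount \
  -Dmapreduce.rss.coordinator.quorum=<coordinatorIp1>:19999,<coordinatorIp2>:19999 \
  -Dyarn.app.mapreduce.am.command-opts=org.apache.hadoop.mapreduce.v2.app.RssMRAppMaster \
  -Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.RssMapOutputCollector \
  -Dmapreduce.job.reduce.shuffle.consumer.plugin.class=org.apache.hadoop.mapreduce.task.reduce.RssShuffle \
  <input_dir> <output_dir>
```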
Note that RssMRAppMaster will automatically disable slow start (i.e., `mapreduce.job.reduce.slowstart.completedmaps=1`) and job recovery (i.e., `yarn.app.mapreduce.am.job.recovery.enable=false`).
The important client configurations are listed below.
These configurations are shared by all types of clients.
Property Name | Default | Description |
---|---|---|
<client_type>.rss.coordinator.quorum | - | Coordinator quorum |
<client_type>.rss.writer.buffer.size | 3m | Buffer size for single partition data |
<client_type>.rss.storage.type | - | Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS |
<client_type>.rss.client.read.buffer.size | 14m | The max data size read from storage |
<client_type>.rss.client.send.threadPool.size | 5 | The thread pool size for sending shuffle data to shuffle servers |
<client_type>.rss.client.assignment.tags | - | The comma-separated list of tags for deciding the assignment of shuffle servers. Notice that SHUFFLE_SERVER_VERSION will always be used as an assignment tag whether this conf is set or not |
<client_type>.rss.client.data.commit.pool.size | The number of assigned shuffle servers | The thread size for sending commit to shuffle servers |
<client_type>.rss.client.assignment.shuffle.nodes.max | -1 | The number of required assignment shuffle servers. If it is less than or equal to 0, or greater than the coordinator's `rss.coordinator.shuffle.nodes.max`, the value of `rss.coordinator.shuffle.nodes.max` is used by default |
Notice:

1. `<client_type>` should be `spark` or `mapreduce`.
2. `<client_type>.rss.coordinator.quorum` is compulsory; the other configurations are optional when coordinator dynamic configuration is enabled.
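As an illustration, a Spark client using local files plus HDFS might set the shared options as follows (the values are examples only, not tuned recommendations):

```
spark.rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999
spark.rss.storage.type MEMORY_LOCALFILE_HDFS
spark.rss.writer.buffer.size 3m
spark.rss.client.read.buffer.size 14m
```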
To choose between the built-in shuffle and remote shuffle in a smart manner, Uniffle supports adaptive enabling. The client should use `DelegationRssShuffleManager` and provide its unique `<access_id>` so that the coordinator can decide whether to enable remote shuffle for it.
```
spark.shuffle.manager org.apache.spark.shuffle.DelegationRssShuffleManager
spark.rss.access.id=<access_id>
```
Notice: Currently, this feature only supports Spark.
Other configurations:
Property Name | Default | Description |
---|---|---|
spark.rss.access.timeout.ms | 10000 | The timeout for accessing the Uniffle coordinator |
spark.rss.client.access.retry.interval.ms | 20000 | The interval between access retries before falling back to SortShuffleManager |
spark.rss.client.access.retry.times | 0 | The number of access retries before falling back to SortShuffleManager |
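Putting the adaptive settings together, an application might configure the following (a sketch; `<access_id>` is whatever identifier you registered with the coordinator, and the retry values are illustrative):

```
spark.shuffle.manager org.apache.spark.shuffle.DelegationRssShuffleManager
spark.rss.access.id=<access_id>
spark.rss.access.timeout.ms 10000
spark.rss.client.access.retry.interval.ms 20000
spark.rss.client.access.retry.times 3
```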
Uniffle supports a client-side quorum protocol to tolerate shuffle server crashes. This feature is implemented entirely on the client side: the shuffle writer sends each block to multiple servers, and shuffle readers can fetch block data from any one of them. Since sending multiple replicas of blocks reduces shuffle performance and increases resource consumption, it is designed as an optional feature.
Property Name | Default | Description |
---|---|---|
<client_type>.rss.data.replica | 1 | The max number of servers each block can be sent to by the client in the quorum protocol |
<client_type>.rss.data.replica.write | 1 | The min number of servers each block must be sent to successfully |
<client_type>.rss.data.replica.read | 1 | The min number of servers metadata must be fetched from successfully |
Notice: `spark.rss.data.replica.write` + `spark.rss.data.replica.read` > `spark.rss.data.replica`. This constraint guarantees that the set of servers a block was successfully written to always intersects the set of servers it is read from.
Recommended examples:

```
spark.rss.data.replica 1
spark.rss.data.replica.write 1
spark.rss.data.replica.read 1
```

```
spark.rss.data.replica 3
spark.rss.data.replica.write 2
spark.rss.data.replica.read 2
```
The important Spark-specific configurations are listed below.
Property Name | Default | Description |
---|---|---|
spark.rss.writer.buffer.spill.size | 128m | Buffer size for total partition data |
spark.rss.client.send.size.limit | 16m | The max data size sent to shuffle server |
spark.rss.client.unregister.thread.pool.size | 10 | The max size of the thread pool used for unregistering |
spark.rss.client.unregister.request.timeout.sec | 10 | The max timeout (in seconds) when unregistering from remote shuffle servers |
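For instance, a job with large partitions might raise the spill threshold while keeping the send limit at its default (illustrative values only):

```
spark.rss.writer.buffer.spill.size 256m
spark.rss.client.send.size.limit 16m
```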
The important MapReduce-specific configurations are listed below.
Property Name | Default | Description |
---|---|---|
mapreduce.rss.client.max.buffer.size | 3k | The max buffer size on the map side |
mapreduce.rss.client.batch.trigger.num | 50 | The max number of buffers to batch when sending data on the map side |
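On the MapReduce side, these knobs can likewise be tuned per job through `-D` options (illustrative values only):

```
-Dmapreduce.rss.client.max.buffer.size=3k
-Dmapreduce.rss.client.batch.trigger.num=50
```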
In cloud environments, VMs may have very limited disk space and performance. This experimental feature allows reduce tasks to spill data to remote storage (e.g., HDFS).
Property Name | Default | Description |
---|---|---|
mapreduce.rss.reduce.remote.spill.enable | false | Whether to use remote spill |
mapreduce.rss.reduce.remote.spill.attempt.inc | 1 | The number of additional reduce attempts, since spilling to HDFS is more failure-prone than spilling to local disk |
mapreduce.rss.reduce.remote.spill.replication | 1 | The replication factor used when spilling data to HDFS |
mapreduce.rss.reduce.remote.spill.retries | 5 | The number of retries when spilling data to HDFS |
Notice: this feature requires the MEMORY_LOCALFILE_HDFS storage type.
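To enable the experimental remote spill, a job could add the following options (a sketch; the replication value is illustrative, and note the storage-type requirement above):

```
-Dmapreduce.rss.storage.type=MEMORY_LOCALFILE_HDFS
-Dmapreduce.rss.reduce.remote.spill.enable=true
-Dmapreduce.rss.reduce.remote.spill.replication=2
-Dmapreduce.rss.reduce.remote.spill.retries=5
```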