commit | 51b85811bc4d7a9b3595bc04283c8cf6c7172d8a | |
---|---|---|
author | RickyMa <rickyma@tencent.com> | Mon Mar 11 09:53:25 2024 +0800 |
committer | GitHub <noreply@github.com> | Mon Mar 11 09:53:25 2024 +0800 |
tree | 87a26c3312c8ae21c6adc96ebe0f75ec7ef1bb96 | |
parent | cf6d2bff5d97f0977aaa4828d53c497d4abfeda7 | |
## [#1567] fix(spark): Let Spark use its own NettyUtils (#1565)

### What changes were proposed in this pull request?

When we release the shaded client jar for Spark 2.x, the class `org.apache.spark.network.util.NettyUtils.class` should not be included in the package.

### Why are the changes needed?

Fix https://github.com/apache/incubator-uniffle/issues/1567. It is also a follow-up to https://github.com/apache/incubator-uniffle/pull/727.

When running on Spark 2.4, we encounter exceptions such as:

```
24/03/07 16:34:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(tdwadmin); groups with view permissions: Set(); users with modify permissions: Set(tdwadmin); groups with modify permissions: Set()
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.network.util.NettyUtils.createEventLoop(Lorg/apache/spark/network/util/IOMode;ILjava/lang/String;)Lio/netty/channel/EventLoopGroup;
	at org.apache.spark.network.client.TransportClientFactory.<init>(TransportClientFactory.java:104)
	at org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:89)
	at org.apache.spark.rpc.netty.NettyRpcEnv.<init>(NettyRpcEnv.scala:70)
	at org.apache.spark.rpc.netty.NettyRpcEnvFactory.create(NettyRpcEnv.scala:449)
	at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:56)
	at org.apache.spark.SparkEnv$.create(SparkEnv.scala:264)
	at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
	at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:271)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:474)
	at org.apache.spark.deploy.yarn.SQLApplicationMaster.<init>(SQLApplicationMaster.scala:96)
	at org.apache.spark.deploy.yarn.SQLApplicationMaster.<init>(SQLApplicationMaster.scala:53)
	at org.apache.spark.deploy.yarn.SQLApplicationMaster$$anonfun$main$1.apply$mcV$sp(SQLApplicationMaster.scala:544)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:2286)
	at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:60)
	at org.apache.spark.deploy.yarn.SQLApplicationMaster$.main(SQLApplicationMaster.scala:543)
	at org.apache.spark.deploy.yarn.SQLApplicationMaster.main(SQLApplicationMaster.scala)
```

This is because the return type of `createEventLoop` in Uniffle's `NettyUtils` is `org.apache.uniffle.io.netty.channel.EventLoopGroup` (which is shaded), while the return type of `createEventLoop` in Spark's `NettyUtils` is `io.netty.channel.EventLoopGroup`. When running a Spark application, the driver loads `NettyUtils` from the rss-client JAR, so the method's return type no longer matches the one Spark was compiled against, ultimately leading to a `NoSuchMethodError`. We should let Spark use its own `NettyUtils` instead of ours.

However, if we simply remove the `org.apache.spark.network.util.NettyUtils` file from the code repository, we encounter errors when running integration tests:

```
java.lang.RuntimeException: java.lang.NoSuchFieldException: DEFAULT_TINY_CACHE_SIZE
	at org.apache.spark.network.util.NettyUtils.getPrivateStaticField(NettyUtils.java:131)
	at org.apache.spark.network.util.NettyUtils.createPooledByteBufAllocator(NettyUtils.java:118)
	at org.apache.spark.network.server.TransportServer.init(TransportServer.java:94)
	at org.apache.spark.network.server.TransportServer.<init>(TransportServer.java:73)
	at org.apache.spark.network.TransportContext.createServer(TransportContext.java:114)
	at org.apache.spark.rpc.netty.NettyRpcEnv.startServer(NettyRpcEnv.scala:119)
	at org.apache.spark.rpc.netty.NettyRpcEnvFactory$$anonfun$4.apply(NettyRpcEnv.scala:465)
	at org.apache.spark.rpc.netty.NettyRpcEnvFactory$$anonfun$4.apply(NettyRpcEnv.scala:464)
	at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:2275)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
	at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:2267)
	at org.apache.spark.rpc.netty.NettyRpcEnvFactory.create(NettyRpcEnv.scala:469)
	at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:57)
	at org.apache.spark.SparkEnv$.create(SparkEnv.scala:249)
	at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:175)
	at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:256)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:423)
	at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2493)
	at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:934)
	at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:925)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:925)
	at org.apache.uniffle.test.SparkIntegrationTestBase.runSparkApp(SparkIntegrationTestBase.java:92)
	at org.apache.uniffle.test.SparkIntegrationTestBase.run(SparkIntegrationTestBase.java:53)
	at org.apache.uniffle.test.RSSStageResubmitTest.testRSSStageResubmit(RSSStageResubmitTest.java:86)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
```

This is because our project _**globally controls**_ the Netty version in the root `pom.xml`'s `dependencyManagement`, which upgrades Spark 2's own lower Netty version to a higher one. This causes exceptions due to Netty version incompatibility, with certain fields no longer found. The issue does not occur in production, as Spark has its own `NettyUtils` and does not need the one we provide.

Retaining `org.apache.spark.network.util.NettyUtils` is somewhat of a workaround for passing integration tests. But given that Spark 2 is no longer frequently updated, maintaining a static version of `NettyUtils` should not pose a significant problem.

Of course, the optimal approach would be to shade our own Netty during integration testing, allowing Spark to continue using its own Netty dependency and effectively separating the two. This would provide the most accurate testing, as any change in Spark 2's Netty version could be verified through unit tests. However, a large amount of integration test code would then need the `org.apache.uniffle` prefix added to its Netty `import` statements. Ultimately, this could lead to significant redundancy in the code and confuse those who write code in the future.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.
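The root cause, that the JVM resolves a method by its name plus its full descriptor including the return type, can be sketched with a small analogy (a toy Python model, not JVM internals; the names follow the stack trace above):

```python
# Toy model of JVM method linkage: a method is located by its name AND its full
# descriptor, which includes the return type. Shading rewrites the return type,
# so the descriptor Spark was compiled against no longer exists on the classpath.

# What Uniffle's shaded NettyUtils actually provides:
loaded = {
    ("createEventLoop",
     "(IOMode, int, String) -> org.apache.uniffle.io.netty.channel.EventLoopGroup"),
}

# What Spark's TransportClientFactory was compiled to call:
wanted = ("createEventLoop",
          "(IOMode, int, String) -> io.netty.channel.EventLoopGroup")

# The exact descriptor is missing, which is what surfaces as NoSuchMethodError.
print(wanted in loaded)  # False
```

The method name matches, but the descriptor lookup fails, which is why the error appears only at link time rather than at compile time.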
Uniffle is a high performance, general purpose remote shuffle service for distributed computing engines. It provides the ability to push shuffle data into centralized storage service, changing the shuffle style from “local file pull-like style” to “remote block push-like style”. It brings in several advantages like supporting disaggregated storage deployment, super large shuffle jobs, and high elasticity. Currently it supports Apache Spark, Apache Hadoop MapReduce and Apache Tez.
A Uniffle cluster consists of three components: a coordinator cluster, a shuffle server cluster, and an optional remote storage (e.g., HDFS).
The coordinator collects the status of shuffle servers and assigns jobs based on a configurable strategy.
The shuffle server receives shuffle data, merges it, and writes it to storage.
Depending on the situation, Uniffle supports Memory & Local, Memory & Remote Storage (e.g., HDFS), and Memory & Local & Remote Storage (recommended for production environments).
Shuffle data is stored as an index file and a data file. The data file holds all blocks for a specific partition, and the index file holds metadata for every block.
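The index/data pairing can be sketched as follows; the entry fields (offset, length, uncompressedLength, crc, blockId, taskAttemptId) are illustrative assumptions, not necessarily Uniffle's exact on-disk schema:

```python
# Schematic sketch of the index/data file pairing: the data file is a concatenation
# of blocks, and the index file holds one fixed-size metadata record per block.
# Field set and layout are illustrative, not Uniffle's actual format.
import struct

INDEX_ENTRY = struct.Struct(">qiiqqq")  # offset, length, uncompressedLength, crc, blockId, taskAttemptId

def read_block(data_file: bytes, index_entry: bytes) -> bytes:
    offset, length, _uncompressed, _crc, _block_id, _task_id = INDEX_ENTRY.unpack(index_entry)
    return data_file[offset:offset + length]

# A data file holding two blocks of one partition, and its index file:
data = b"blockA" + b"blockBB"
idx = INDEX_ENTRY.pack(0, 6, 6, 0, 1, 1) + INDEX_ENTRY.pack(6, 7, 7, 0, 2, 1)

print(read_block(data, idx[:INDEX_ENTRY.size]))  # b'blockA'
print(read_block(data, idx[INDEX_ENTRY.size:]))  # b'blockBB'
```

Because every index record is fixed-size, a reader can seek directly to the metadata of any block without scanning the data file.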
Currently supports Spark 2.3.x, Spark 2.4.x, Spark 3.0.x, Spark 3.1.x, Spark 3.2.x, Spark 3.3.x, Spark 3.4.x, and Spark 3.5.x.
Note: To support dynamic allocation, the patch (included in the patch/spark folder) should be applied to Spark.
Currently supports the MapReduce framework of Hadoop 2.8.5 and Hadoop 3.2.1.
Note: Uniffle currently requires JDK 1.8 to build; support for later JDKs is on our roadmap.
Uniffle is built using Apache Maven. To build it, run:
./mvnw -DskipTests clean package
To fix code style issues, run:
./mvnw spotless:apply -Pspark3 -Pspark2 -Ptez -Pmr -Phadoop2.8 -Pdashboard
Build against profile Spark 2 (2.4.6)
./mvnw -DskipTests clean package -Pspark2
Build against profile Spark 3 (3.1.2)
./mvnw -DskipTests clean package -Pspark3
Build against Spark 3.2.x, except 3.2.0
./mvnw -DskipTests clean package -Pspark3.2
Build against Spark 3.2.0
./mvnw -DskipTests clean package -Pspark3.2.0
Build against Hadoop MapReduce 2.8.5
./mvnw -DskipTests clean package -Pmr,hadoop2.8
Build against Hadoop MapReduce 3.2.1
./mvnw -DskipTests clean package -Pmr,hadoop3.2
Build against Tez 0.9.1
./mvnw -DskipTests clean package -Ptez
Build against Tez 0.9.1 and Hadoop 3.2.1
./mvnw -DskipTests clean package -Ptez,hadoop3.2
Build with dashboard
./mvnw -DskipTests clean package -Pdashboard
To package Uniffle, run:
./build_distribution.sh
To package against Spark 3.2.x (except 3.2.0), run:
./build_distribution.sh --spark3-profile 'spark3.2'
Package against Spark 3.2.0, run:
./build_distribution.sh --spark3-profile 'spark3.2.0'
The package is built against Hadoop 2.8.5 by default. If you want to build the package against Hadoop 3.2.1, run:
./build_distribution.sh --hadoop-profile 'hadoop3.2'
To package with the Hadoop jars included (here against Hadoop 3.2.1), run:
./build_distribution.sh --hadoop-profile 'hadoop3.2' -Phadoop-dependencies-included
rss-xxx.tgz will be generated for deployment.
If you have packaged the tgz with Hadoop jars, HADOOP_HOME need not be specified in rss-env.sh.
JAVA_HOME=<java_home> HADOOP_HOME=<hadoop home> XMX_SIZE="16g"
rss.rpc.server.port 19999
rss.jetty.http.port 19998
rss.coordinator.server.heartbeat.timeout 30000
rss.coordinator.app.expired 60000
rss.coordinator.shuffle.nodes.max 5
# enable dynamicClientConf, and coordinator will be responsible for most of client conf
rss.coordinator.dynamicClientConf.enabled true
# config the path of client conf
rss.coordinator.dynamicClientConf.path <RSS_HOME>/conf/dynamic_client.conf
# config the path of excluded shuffle server
rss.coordinator.exclude.nodes.file.path <RSS_HOME>/conf/exclude_nodes
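The exclude-nodes file referenced by rss.coordinator.exclude.nodes.file.path is a plain list of shuffle servers the coordinator should skip during assignment. A sketch is shown below; the per-line id format is an assumption, so check the coordinator guide for the exact form:

```
# <RSS_HOME>/conf/exclude_nodes — one shuffle server per line (id format illustrative)
host1-19999
host2-19999
```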
# MEMORY_LOCALFILE_HDFS is recommended for production environment
rss.storage.type MEMORY_LOCALFILE_HDFS
# multiple remote storages are supported, and client will get assignment from coordinator
rss.coordinator.remote.storage.path hdfs://cluster1/path,hdfs://cluster2/path
rss.writer.require.memory.retryMax 1200
rss.client.retry.max 50
rss.writer.send.check.timeout 600000
rss.client.read.buffer.size 14m
bash RSS_HOME/bin/start-coordinator.sh
We recommend using JDK 11+ for better performance when deploying the shuffle server. Benchmark results across JDKs are shown below (using Spark to write shuffle data with 20 executors; each executor writes 1 GB in total, 14 MB per write; the shuffle server uses gRPC to transfer data):
Java version | ShuffleServer GC | Max pause time | Throughput |
---|---|---|---|
8 | G1 | 30s | 0.3 |
11 | G1 | 2.5s | 0.8 |
18 | G1 | 2.5s | 0.8 |
18 | ZGC | 0.2ms | 0.99997 |
Deploy Steps:
JAVA_HOME=<java_home> HADOOP_HOME=<hadoop home> XMX_SIZE="80g"
rss.rpc.server.port 19999
rss.jetty.http.port 19998
rss.rpc.executor.size 2000
# it should be configured the same as in coordinator
rss.storage.type MEMORY_LOCALFILE_HDFS
rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999
# local storage path for shuffle server
rss.storage.basePath /data1/rssdata,/data2/rssdata....
# it's better to config thread num according to local disk num
rss.server.flush.thread.alive 5
rss.server.flush.localfile.threadPool.size 10
rss.server.flush.hadoop.threadPool.size 60
rss.server.buffer.capacity 40g
rss.server.read.buffer.capacity 20g
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
rss.server.commit.timeout 600000
rss.server.app.expired.withoutHeartbeat 120000
# note: the default value of rss.server.flush.cold.storage.threshold.size is 64m
# there will be no data written to DFS if set it as 100g even rss.storage.type=MEMORY_LOCALFILE_HDFS
# please set a proper value if DFS is used, e.g., 64m, 128m.
rss.server.flush.cold.storage.threshold.size 100g
bash RSS_HOME/bin/start-shuffle-server.sh
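The note about rss.server.flush.cold.storage.threshold.size can be made concrete with a small sketch (illustrative Python, not Uniffle's code): flushes larger than the threshold go to remote storage, smaller ones stay on local disk, so a 100g threshold effectively disables DFS writes even with rss.storage.type=MEMORY_LOCALFILE_HDFS.

```python
# Illustrative sketch of cold-storage threshold routing: a flush larger than
# rss.server.flush.cold.storage.threshold.size goes to remote storage (e.g., HDFS),
# a smaller one stays on local disk.

def parse_size(s: str) -> int:
    """Parse conf sizes like '64m' or '100g' into bytes."""
    units = {"k": 1 << 10, "m": 1 << 20, "g": 1 << 30}
    suffix = s[-1].lower()
    return int(s[:-1]) * units[suffix] if suffix in units else int(s)

def choose_storage(flush_size_bytes: int, threshold: str = "64m") -> str:
    return "HDFS" if flush_size_bytes > parse_size(threshold) else "LOCALFILE"

print(choose_storage(128 << 20))           # 128m > 64m  -> HDFS
print(choose_storage(128 << 20, "100g"))   # 128m < 100g -> LOCALFILE
```

With the default 64m threshold, large flushes are offloaded to DFS; raising it to 100g keeps virtually all data local.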
Add client jar to Spark classpath, e.g., SPARK_HOME/jars/
The jar for Spark2 is located in <RSS_HOME>/jars/client/spark2/rss-client-spark2-shaded-${version}.jar
The jar for Spark3 is located in <RSS_HOME>/jars/client/spark3/rss-client-spark3-shaded-${version}.jar
Update Spark conf to enable Uniffle, e.g.,
# Uniffle transmits serialized shuffle data over network, therefore a serializer that supports relocation of
# serialized object should be used.
spark.serializer org.apache.spark.serializer.KryoSerializer # this could also be in the spark-defaults.conf
spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager
spark.rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999
# Note: For Spark2, spark.sql.adaptive.enabled should be false because Spark2 doesn't support AQE.
To support Spark dynamic allocation with Uniffle, the Spark code needs to be patched. There are 7 patches for Spark (2.3.4/2.4.6/3.0.1/3.1.2/3.2.1/3.3.1/3.4.1) in the patch/spark folder for reference.
After applying the patch and rebuilding Spark, add the following configuration to the Spark conf to enable dynamic allocation:
spark.shuffle.service.enabled false
spark.dynamicAllocation.enabled true
For Spark 3.5 or above, just add one more configuration:
spark.shuffle.sort.io.plugin.class org.apache.spark.shuffle.RssShuffleDataIo
The jar for MapReduce is located in <RSS_HOME>/jars/client/mr/rss-client-mr-XXXXX-shaded.jar
Update MapReduce conf to enable Uniffle, e.g.,
-Dmapreduce.rss.coordinator.quorum=<coordinatorIp1>:19999,<coordinatorIp2>:19999
-Dyarn.app.mapreduce.am.command-opts=org.apache.hadoop.mapreduce.v2.app.RssMRAppMaster
-Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.RssMapOutputCollector
-Dmapreduce.job.reduce.shuffle.consumer.plugin.class=org.apache.hadoop.mapreduce.task.reduce.RssShuffle
Note that the RssMRAppMaster will automatically disable slow start (i.e., mapreduce.job.reduce.slowstart.completedmaps=1) and job recovery (i.e., yarn.app.mapreduce.am.job.recovery.enable=false).
In production mode, you can append the client jar (rss-client-tez-XXXXX-shaded.jar) to the package specified by tez.lib.uris.
In development mode, you can append the client jar (rss-client-tez-XXXXX-shaded.jar) to HADOOP_CLASSPATH.
Property Name | Default | Description |
---|---|---|
tez.am.launch.cmd-opts | -XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps -XX:+UseNUMA -XX:+UseParallelGC org.apache.tez.dag.app.RssDAGAppMaster | enable remote shuffle service |
tez.rss.coordinator.quorum | coordinatorIp1:19999,coordinatorIp2:19999 | coordinator address |
Note that the RssDAGAppMaster will automatically disable slow start (i.e., tez.shuffle-vertex-manager.min-src-fraction=1, tez.shuffle-vertex-manager.max-src-fraction=1).
We have provided an operator for deploying Uniffle in Kubernetes environments.
For details, see the following document:
The important configuration is listed as follows.
Role | Link |
---|---|
coordinator | Uniffle Coordinator Guide |
shuffle server | Uniffle Shuffle Server Guide |
client | Uniffle Shuffle Client Guide |
The primary goals of the Uniffle Kerberos security are:
The following security configurations are introduced.
Property Name | Default | Description |
---|---|---|
rss.security.hadoop.kerberos.enable | false | Whether to enable access to a secured Hadoop cluster |
rss.security.hadoop.kerberos.krb5-conf.file | - | The file path of krb5.conf. Only valid when rss.security.hadoop.kerberos.enable is enabled |
rss.security.hadoop.kerberos.keytab.file | - | The Kerberos keytab file path. Only valid when rss.security.hadoop.kerberos.enable is enabled |
rss.security.hadoop.kerberos.principal | - | The Kerberos keytab principal. Only valid when rss.security.hadoop.kerberos.enable is enabled |
rss.security.hadoop.kerberos.relogin.interval.sec | 60 | The Kerberos authentication re-login interval, in seconds |
rss.security.hadoop.kerberos.proxy.user.enable | true | Whether to use a proxy user for the job user to access the secured Hadoop cluster |
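Putting these options together, a minimal sketch of the Kerberos entries in a server conf might look as follows (the keytab path and principal are placeholders for your environment):

```
rss.security.hadoop.kerberos.enable true
rss.security.hadoop.kerberos.krb5-conf.file /etc/krb5.conf
rss.security.hadoop.kerberos.keytab.file /etc/security/keytabs/rss.keytab
rss.security.hadoop.kerberos.principal rss/_HOST@EXAMPLE.COM
rss.security.hadoop.kerberos.relogin.interval.sec 60
```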
We provide benchmark tests for Uniffle. For details, see the Uniffle Benchmark.
Uniffle is under the Apache License Version 2.0. See the LICENSE file for details.
For more information about contributing issues or pull requests, see Uniffle Contributing Guide.