commit d2e8c1cb60e34a1c7e92374c07d682aa5ca79145
author: Julek Sompolski <Juliusz Sompolski>  Mon Sep 23 12:39:02 2024 +0900
committer: Hyukjin Kwon <gurwls223@apache.org>  Mon Sep 23 12:39:02 2024 +0900
tree: ed2fa927efba53507f880c825bd3ab4d787f26d9
parent: 3c81f076ab9c72514cfc8372edd16e6da7c151d6
[SPARK-48195][CORE] Save and reuse RDD/Broadcast created by SparkPlan

### What changes were proposed in this pull request?

Save the RDD created by `doExecute`, instead of creating a new one in `execute` each time. Currently, many types of SparkPlans already save the RDD they create. For example, shuffle just saves `lazy val inputRDD: RDD[InternalRow] = child.execute()`.

This creates inconsistencies when an action (e.g. a repeated `df.collect()`) is executed on a DataFrame twice:
* The SparkPlan will be reused, since the same `df.queryExecution.executedPlan` will be used.
* Any non-result stage will be reused, as the shuffle operators will just have their `inputRDD` reused.
* However, for the result stage, `execute()` will call `doExecute()` again, and the logic of generating the actual execution RDD will be re-executed.

This means that, for example, for the result stage whole-stage code generation (WSCG) will generate and compile new code and create a new RDD out of it. Generation of execution RDDs is also often influenced by configuration: staying with WSCG, configs such as `spark.sql.codegen.hugeMethodLimit` or `spark.sql.codegen.methodSplitThreshold` are consulted. The fact that upon re-execution these are evaluated anew for the result stage, but not for earlier stages, creates inconsistencies in which config changes become visible.

By saving the result of `doExecute` and reusing the RDD in `execute`, we make sure that the work of creating that RDD is not duplicated, and it is more consistent that all RDDs of the plan are reused, just like the `executedPlan` itself. Note that while the results of earlier shuffle stages are also reused, the result stage still gets executed again, as its results are not saved in the BlockManager and available for reuse.

We also add a `Lazy` utility to use instead of `lazy val`, to work around shortcomings of Scala's `lazy val`.

### Why are the changes needed?

Resolves subtle inconsistencies coming from object reuse vs. recreating objects from scratch.

### Does this PR introduce _any_ user-facing change?

Subtle changes caused by the RDD being reused, e.g. when a config change might be picked up. However, it makes things more consistent. Spark 4.0.0 might be a good candidate for making such a change.

### How was this patch tested?

Existing SQL execution tests validate that the change in SparkPlan works. Tests were added for the new `Lazy` utility.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: GitHub Copilot (trivial code completion suggestions)

Closes #48037 from juliuszsompolski/SPARK-48195-rdd.

Lead-authored-by: Julek Sompolski <Juliusz Sompolski>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
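For illustration only, here is a minimal sketch of what such a compute-once holder could look like. This is a hedged approximation, not the code added by this commit; the class shape and the `executedRDD`/`doExecuteInternal` names in the trailing comment are assumptions made for the example.

```scala
// Hedged sketch only; not the implementation added by this commit.
// A compute-once holder that synchronizes on itself rather than on the
// enclosing object, sidestepping the locking pitfalls of a plain `lazy val`
// declared on a large object such as a SparkPlan.
class Lazy[T](initializer: => T) extends Serializable {
  @volatile private var cached: Option[T] = None

  // Evaluate `initializer` at most once and return the cached result thereafter.
  def get: T = cached.getOrElse(synchronized {
    if (cached.isEmpty) {
      cached = Some(initializer)
    }
    cached.get
  })
}

// Hypothetical usage inside a plan node (names are illustrative):
//   private val executedRDD = new Lazy(doExecuteInternal())
//   final def execute(): RDD[InternalRow] = executedRDD.get
```

With a holder like this, repeated actions on the same DataFrame would get back the same result-stage RDD instance instead of regenerating and recompiling it.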
Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames, pandas API on Spark for pandas workloads, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for stream processing.
You can find the latest Spark documentation, including a programming guide, on the project web page. This README file only contains basic setup instructions.
Spark is built using Apache Maven. To build Spark and its example programs, run:
./build/mvn -DskipTests clean package
(You do not need to do this if you downloaded a pre-built package.)
More detailed documentation is available from the project site, at “Building Spark”.
For general development tips, including info on developing Spark using an IDE, see “Useful Developer Tools”.
The easiest way to start using Spark is through the Scala shell:
./bin/spark-shell
Try the following command, which should return 1,000,000,000:
scala> spark.range(1000 * 1000 * 1000).count()
Alternatively, if you prefer Python, you can use the Python shell:
./bin/pyspark
And run the following command, which should also return 1,000,000,000:
>>> spark.range(1000 * 1000 * 1000).count()
Spark also comes with several sample programs in the `examples` directory. To run one of them, use `./bin/run-example <class> [params]`. For example:
./bin/run-example SparkPi
will run the Pi example locally.
You can set the MASTER environment variable when running examples to submit examples to a cluster. This can be a spark:// URL, "yarn" to run on YARN, "local" to run locally with one thread, or "local[N]" to run locally with N threads. You can also use an abbreviated class name if the class is in the `examples` package. For instance:
MASTER=spark://host:7077 ./bin/run-example SparkPi
Many of the example programs print usage help if no params are given.
Testing first requires building Spark. Once Spark is built, tests can be run using:
./dev/run-tests
Please see the guidance on how to run tests for a module, or individual tests.
There is also a Kubernetes integration test; see resource-managers/kubernetes/integration-tests/README.md.
Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported storage systems. Because the protocols have changed in different versions of Hadoop, you must build Spark against the same version that your cluster runs.
Please refer to the build documentation at “Specifying the Hadoop Version and Enabling YARN” for detailed guidance on building for a particular distribution of Hadoop, including building for particular Hive and Hive Thriftserver distributions.
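As a rough illustration only (the profiles and the `hadoop.version` value shown are assumptions; consult the build documentation for the values supported by your Spark release), a build against a specific Hadoop version with YARN enabled typically looks like:

./build/mvn -Pyarn -Dhadoop.version=3.3.6 -DskipTests clean package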
Please refer to the Configuration Guide in the online documentation for an overview on how to configure Spark.
Please review the Contribution to Spark guide for information on how to get started contributing to the project.