[SPARK-45780][CONNECT] Propagate all Spark Connect client threadlocals in InheritableThread
### What changes were proposed in this pull request?
Currently, PySpark's `InheritableThread` propagates only the Spark Connect `session.client.thread_local.tags` attribute to child threads. This PR generalizes that to propagate all attributes of `session.client.thread_local`, and makes a deep copy of each value, just as the Scala equivalent clones them.
### Why are the changes needed?
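To generalize the propagation mechanism for `SparkConnectClient.thread_local`, so that every thread-local attribute, not just `tags`, is inherited by child threads.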
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
The existing test for propagating SparkSession tags should cover this.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43649 from juliuszsompolski/SPARK-45780.
Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
diff --git a/python/pyspark/util.py b/python/pyspark/util.py
index 9c70bac..4a828d6 100644
--- a/python/pyspark/util.py
+++ b/python/pyspark/util.py
@@ -16,6 +16,7 @@
# limitations under the License.
#
+import copy
import functools
import itertools
import os
@@ -343,14 +344,19 @@
assert session is not None, "Spark Connect session must be provided."
def outer(ff: Callable) -> Callable:
- if not hasattr(session.client.thread_local, "tags"): # type: ignore[union-attr]
- session.client.thread_local.tags = set() # type: ignore[union-attr]
- tags = set(session.client.thread_local.tags) # type: ignore[union-attr]
+ session_client_thread_local_attrs = [
+ (attr, copy.deepcopy(value))
+ for (
+ attr,
+ value,
+ ) in session.client.thread_local.__dict__.items() # type: ignore[union-attr]
+ ]
@functools.wraps(ff)
def inner(*args: Any, **kwargs: Any) -> Any:
- # Set tags in child thread.
- session.client.thread_local.tags = tags # type: ignore[union-attr]
+ # Set thread locals in child thread.
+ for attr, value in session_client_thread_local_attrs:
+ setattr(session.client.thread_local, attr, value) # type: ignore[union-attr]
return ff(*args, **kwargs)
return inner