|  | # | 
|  | # Licensed to the Apache Software Foundation (ASF) under one or more | 
|  | # contributor license agreements.  See the NOTICE file distributed with | 
|  | # this work for additional information regarding copyright ownership. | 
|  | # The ASF licenses this file to You under the Apache License, Version 2.0 | 
|  | # (the "License"); you may not use this file except in compliance with | 
|  | # the License.  You may obtain a copy of the License at | 
|  | # | 
|  | #    http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, software | 
|  | # distributed under the License is distributed on an "AS IS" BASIS, | 
|  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | # See the License for the specific language governing permissions and | 
|  | # limitations under the License. | 
|  | # | 
|  |  | 
|  | __all__ = ["SparkConf"] | 
|  |  | 
|  | import sys | 
|  | from typing import Dict, List, Optional, Tuple, cast, overload, TYPE_CHECKING | 
|  |  | 
|  | from pyspark.util import is_remote_only | 
|  | from pyspark.errors import PySparkRuntimeError | 
|  |  | 
|  | if TYPE_CHECKING: | 
|  | from py4j.java_gateway import JVMView, JavaObject | 
|  |  | 
|  |  | 
|  | class SparkConf: | 
|  | """ | 
|  | Configuration for a Spark application. Used to set various Spark | 
|  | parameters as key-value pairs. | 
|  |  | 
|  | Most of the time, you would create a SparkConf object with | 
|  | ``SparkConf()``, which will load values from `spark.*` Java system | 
|  | properties as well. In this case, any parameters you set directly on | 
|  | the :class:`SparkConf` object take priority over system properties. | 
|  |  | 
|  | For unit tests, you can also call ``SparkConf(false)`` to skip | 
|  | loading external settings and get the same configuration no matter | 
|  | what the system properties are. | 
|  |  | 
|  | All setter methods in this class support chaining. For example, | 
|  | you can write ``conf.setMaster("local").setAppName("My app")``. | 
|  |  | 
|  | Parameters | 
|  | ---------- | 
|  | loadDefaults : bool | 
|  | whether to load values from Java system properties (True by default) | 
|  | _jvm : class:`py4j.java_gateway.JVMView` | 
|  | internal parameter used to pass a handle to the | 
|  | Java VM; does not need to be set by users | 
|  | _jconf : class:`py4j.java_gateway.JavaObject` | 
|  | Optionally pass in an existing SparkConf handle | 
|  | to use its parameters | 
|  |  | 
|  | Notes | 
|  | ----- | 
|  | Once a SparkConf object is passed to Spark, it is cloned | 
|  | and can no longer be modified by the user. | 
|  |  | 
|  | Examples | 
|  | -------- | 
|  | >>> from pyspark import SparkConf, SparkContext | 
|  | >>> conf = SparkConf() | 
|  | >>> conf.setMaster("local").setAppName("My app") | 
|  | <pyspark.conf.SparkConf object at ...> | 
|  | >>> conf.get("spark.master") | 
|  | 'local' | 
|  | >>> conf.get("spark.app.name") | 
|  | 'My app' | 
|  | >>> sc = SparkContext(conf=conf) | 
|  | >>> sc.master | 
|  | 'local' | 
|  | >>> sc.appName | 
|  | 'My app' | 
|  | >>> sc.sparkHome is None | 
|  | True | 
|  |  | 
|  | >>> conf = SparkConf(loadDefaults=False) | 
|  | >>> conf.setSparkHome("/path") | 
|  | <pyspark.conf.SparkConf object at ...> | 
|  | >>> conf.get("spark.home") | 
|  | '/path' | 
|  | >>> conf.setExecutorEnv("VAR1", "value1") | 
|  | <pyspark.conf.SparkConf object at ...> | 
|  | >>> conf.setExecutorEnv(pairs = [("VAR3", "value3"), ("VAR4", "value4")]) | 
|  | <pyspark.conf.SparkConf object at ...> | 
|  | >>> conf.get("spark.executorEnv.VAR1") | 
|  | 'value1' | 
|  | >>> print(conf.toDebugString()) | 
|  | spark.executorEnv.VAR1=value1 | 
|  | spark.executorEnv.VAR3=value3 | 
|  | spark.executorEnv.VAR4=value4 | 
|  | spark.home=/path | 
|  | >>> for p in sorted(conf.getAll(), key=lambda p: p[0]): | 
|  | ...     print(p) | 
|  | ('spark.executorEnv.VAR1', 'value1') | 
|  | ('spark.executorEnv.VAR3', 'value3') | 
|  | ('spark.executorEnv.VAR4', 'value4') | 
|  | ('spark.home', '/path') | 
|  | >>> conf._jconf.setExecutorEnv("VAR5", "value5") | 
|  | JavaObject id... | 
|  | >>> print(conf.toDebugString()) | 
|  | spark.executorEnv.VAR1=value1 | 
|  | spark.executorEnv.VAR3=value3 | 
|  | spark.executorEnv.VAR4=value4 | 
|  | spark.executorEnv.VAR5=value5 | 
|  | spark.home=/path | 
|  | """ | 
|  |  | 
|  | _jconf: Optional["JavaObject"] | 
|  | _conf: Optional[Dict[str, str]] | 
|  |  | 
|  | def __init__( | 
|  | self, | 
|  | loadDefaults: bool = True, | 
|  | _jvm: Optional["JVMView"] = None, | 
|  | _jconf: Optional["JavaObject"] = None, | 
|  | ): | 
|  | """ | 
|  | Create a new Spark configuration. | 
|  | """ | 
|  | if _jconf: | 
|  | self._jconf = _jconf | 
|  | else: | 
|  | jvm = None | 
|  | if not is_remote_only(): | 
|  | from pyspark.core.context import SparkContext | 
|  |  | 
|  | jvm = _jvm or SparkContext._jvm | 
|  |  | 
|  | if jvm is not None: | 
|  | # JVM is created, so create self._jconf directly through JVM | 
|  | self._jconf = jvm.SparkConf(loadDefaults) | 
|  | self._conf = None | 
|  | else: | 
|  | # JVM is not created, so store data in self._conf first | 
|  | self._jconf = None | 
|  | self._conf = {} | 
|  |  | 
|  | def set(self, key: str, value: str) -> "SparkConf": | 
|  | """Set a configuration property.""" | 
|  | # Try to set self._jconf first if JVM is created, set self._conf if JVM is not created yet. | 
|  | if self._jconf is not None: | 
|  | self._jconf.set(key, str(value)) | 
|  | else: | 
|  | assert self._conf is not None | 
|  | self._conf[key] = str(value) | 
|  | return self | 
|  |  | 
|  | def setIfMissing(self, key: str, value: str) -> "SparkConf": | 
|  | """Set a configuration property, if not already set.""" | 
|  | if self.get(key) is None: | 
|  | self.set(key, value) | 
|  | return self | 
|  |  | 
|  | def setMaster(self, value: str) -> "SparkConf": | 
|  | """Set master URL to connect to.""" | 
|  | self.set("spark.master", value) | 
|  | return self | 
|  |  | 
|  | def setAppName(self, value: str) -> "SparkConf": | 
|  | """Set application name.""" | 
|  | self.set("spark.app.name", value) | 
|  | return self | 
|  |  | 
|  | def setSparkHome(self, value: str) -> "SparkConf": | 
|  | """Set path where Spark is installed on worker nodes.""" | 
|  | self.set("spark.home", value) | 
|  | return self | 
|  |  | 
|  | @overload | 
|  | def setExecutorEnv(self, key: str, value: str) -> "SparkConf": | 
|  | ... | 
|  |  | 
|  | @overload | 
|  | def setExecutorEnv(self, *, pairs: List[Tuple[str, str]]) -> "SparkConf": | 
|  | ... | 
|  |  | 
|  | def setExecutorEnv( | 
|  | self, | 
|  | key: Optional[str] = None, | 
|  | value: Optional[str] = None, | 
|  | pairs: Optional[List[Tuple[str, str]]] = None, | 
|  | ) -> "SparkConf": | 
|  | """Set an environment variable to be passed to executors.""" | 
|  | if (key is not None and pairs is not None) or (key is None and pairs is None): | 
|  | raise PySparkRuntimeError( | 
|  | errorClass="KEY_VALUE_PAIR_REQUIRED", | 
|  | messageParameters={}, | 
|  | ) | 
|  | elif key is not None: | 
|  | self.set("spark.executorEnv.{}".format(key), cast(str, value)) | 
|  | elif pairs is not None: | 
|  | for k, v in pairs: | 
|  | self.set("spark.executorEnv.{}".format(k), v) | 
|  | return self | 
|  |  | 
|  | def setAll(self, pairs: List[Tuple[str, str]]) -> "SparkConf": | 
|  | """ | 
|  | Set multiple parameters, passed as a list of key-value pairs. | 
|  |  | 
|  | Parameters | 
|  | ---------- | 
|  | pairs : iterable of tuples | 
|  | list of key-value pairs to set | 
|  | """ | 
|  | for k, v in pairs: | 
|  | self.set(k, v) | 
|  | return self | 
|  |  | 
|  | @overload | 
|  | def get(self, key: str) -> Optional[str]: | 
|  | ... | 
|  |  | 
|  | @overload | 
|  | def get(self, key: str, defaultValue: None) -> Optional[str]: | 
|  | ... | 
|  |  | 
|  | @overload | 
|  | def get(self, key: str, defaultValue: str) -> str: | 
|  | ... | 
|  |  | 
|  | def get(self, key: str, defaultValue: Optional[str] = None) -> Optional[str]: | 
|  | """Get the configured value for some key, or return a default otherwise.""" | 
|  | if defaultValue is None:  # Py4J doesn't call the right get() if we pass None | 
|  | if self._jconf is not None: | 
|  | if not self._jconf.contains(key): | 
|  | return None | 
|  | return self._jconf.get(key) | 
|  | else: | 
|  | assert self._conf is not None | 
|  | return self._conf.get(key, None) | 
|  | else: | 
|  | if self._jconf is not None: | 
|  | return self._jconf.get(key, defaultValue) | 
|  | else: | 
|  | assert self._conf is not None | 
|  | return self._conf.get(key, defaultValue) | 
|  |  | 
|  | def getAll(self) -> List[Tuple[str, str]]: | 
|  | """Get all values as a list of key-value pairs.""" | 
|  | if self._jconf is not None: | 
|  | from py4j.java_gateway import JavaObject | 
|  |  | 
|  | return [(elem._1(), elem._2()) for elem in cast(JavaObject, self._jconf).getAll()] | 
|  | else: | 
|  | assert self._conf is not None | 
|  | return list(self._conf.items()) | 
|  |  | 
|  | def contains(self, key: str) -> bool: | 
|  | """Does this configuration contain a given key?""" | 
|  | if self._jconf is not None: | 
|  | return self._jconf.contains(key) | 
|  | else: | 
|  | assert self._conf is not None | 
|  | return key in self._conf | 
|  |  | 
|  | def toDebugString(self) -> str: | 
|  | """ | 
|  | Returns a printable version of the configuration, as a list of | 
|  | key=value pairs, one per line. | 
|  | """ | 
|  | if self._jconf is not None: | 
|  | return self._jconf.toDebugString() | 
|  | else: | 
|  | assert self._conf is not None | 
|  | return "\n".join("%s=%s" % (k, v) for k, v in self._conf.items()) | 
|  |  | 
|  |  | 
|  | def _test() -> None: | 
|  | import doctest | 
|  |  | 
|  | (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS) | 
|  | if failure_count: | 
|  | sys.exit(-1) | 
|  |  | 
|  |  | 
|  | if __name__ == "__main__": | 
|  | _test() |