"text": "%md\n\nThis tutorial shows how to customize the PySpark runtime environment via conda in yarn-cluster mode.\nIn this approach, the Spark interpreter (driver) and the Spark executors all run in YARN containers.\nRemember that this approach only works when IPython is enabled, so make sure your conda env includes the following Python packages, which IPython requires:\n\n* jupyter\n* grpcio\n* protobuf\n\nThis tutorial has only been verified with Spark 3.1.2. Other versions of Spark may not work, especially when using pyarrow.\n\n",
"title": "Create Python conda env",
"text": "%sh\n\n# make sure you have conda and mamba installed.\n# install miniconda:\n# install mamba:\n\necho \"name: pyspark_env\nchannels:\n - conda-forge\n - defaults\ndependencies:\n - python\u003d3.8 \n - jupyter\n - grpcio\n - protobuf\n - pandasql\n - pycodestyle\n # use numpy \u003c 1.20, otherwise the following pandas udf example will fail, see\n - numpy\u003d\u003d1.19.5 \n # other versions of pandas may not work with pyarrow\n - pandas\u003d\u003d0.25.3\n - scipy\n - panel\n - pyyaml\n - seaborn\n - plotnine\n - hvplot\n - intake\n - intake-parquet\n - intake-xarray\n - altair\n - vega_datasets\n - pyarrow\u003d\u003d1.0.1\" \u003e pyspark_env.yml\n \nmamba env remove -n pyspark_env\nmamba env create -f pyspark_env.yml\n",
"title": "Create Python conda tar",
"text": "%sh\n\nrm -rf pyspark_env.tar.gz\nconda pack -n pyspark_env\n",
"title": "Upload Python conda tar to hdfs",
"text": "%sh\n\nhadoop fs -rm -f /tmp/pyspark_env.tar.gz\nhadoop fs -put pyspark_env.tar.gz /tmp\n# The conda tar must be publicly readable, so change its permission here.\nhadoop fs -chmod 644 /tmp/pyspark_env.tar.gz\n",
"title": "Configure Spark Interpreter",
"text": "%spark.conf\n\n# set the following 2 properties to run spark in yarn-cluster mode\nspark.master yarn\nspark.submit.deployMode cluster\n\nspark.driver.memory 4g\nspark.executor.memory 4g\n\n# spark.yarn.dist.archives can be either local file or hdfs file\nspark.yarn.dist.archives hdfs:///tmp/pyspark_env.tar.gz#environment\n# spark.yarn.dist.archives pyspark_env.tar.gz#environment\n\ environment\n\nspark.sql.execution.arrow.pyspark.enabled true\nspark.sql.execution.arrow.pyspark.fallback.enabled false\n\n# Set the following setting for ARROW if you are using spark 2.x, otherwise using pyarrow udf would fail\n# spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT 1\n# spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT 1\n",
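Once the interpreter has started, it can be worth confirming that Python really runs from the unpacked archive rather than from a system install. The sketch below is a minimal check, assuming the interpreter path contains the alias given after `#` in `spark.yarn.dist.archives` (`environment` here). The helper name `runs_from_archive` is hypothetical and not part of Spark or Zeppelin.

```python
import sys
from pathlib import PurePosixPath


def runs_from_archive(executable: str, alias: str = "environment") -> bool:
    """Return True if `alias` is one of the directories in the interpreter path."""
    return alias in PurePosixPath(executable).parts


# In a %spark.pyspark paragraph this prints the driver's interpreter path.
print(sys.executable)
print(runs_from_archive(sys.executable))
```

If the second line prints `False`, the paragraph is probably running on a Python outside the packed conda env.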
"title": "Use Matplotlib",
"text": "%md\n\nThe following example uses matplotlib in PySpark. Here matplotlib is only used in the Spark driver.\n",
"title": "Use Matplotlib",
"text": "%spark.pyspark\n\n%matplotlib inline\n\nimport matplotlib.pyplot as plt\n\nplt.plot([1,2,3,4])\nplt.ylabel(\u0027some numbers\u0027)\nplt.show()\n",
"title": "PySpark UDF using Pandas and PyArrow",
"text": "%md\n\nThe following are examples of using pandas and pyarrow in UDFs. Here the Python packages are used in both the Spark driver and the executors. All the examples are from the official Apache Spark documentation.",
"title": "Enabling for Conversion to/from Pandas",
"text": "%md\n\nArrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame using the call `DataFrame.toPandas()` and when creating a Spark DataFrame from a Pandas DataFrame with `SparkSession.createDataFrame()`. To use Arrow when executing these calls, users need to first set the Spark configuration `spark.sql.execution.arrow.pyspark.enabled` to true. This is disabled by default.\n\nIn addition, optimizations enabled by `spark.sql.execution.arrow.pyspark.enabled` can fall back automatically to a non-Arrow implementation if an error occurs before the actual computation within Spark. This can be controlled by `spark.sql.execution.arrow.pyspark.fallback.enabled`.\n",
"text": "%spark.pyspark\n\nimport pandas as pd\nimport numpy as np\n\n# Generate a Pandas DataFrame\npdf \u003d pd.DataFrame(np.random.rand(100, 3))\n\n# Create a Spark DataFrame from a Pandas DataFrame using Arrow\ndf \u003d spark.createDataFrame(pdf)\n\n# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow\nresult_pdf \u003d df.select(\"*\").toPandas()\n",
"title": "Pandas UDFs (a.k.a. Vectorized UDFs)",
"text": "%md\n\nPandas UDFs are user-defined functions that are executed by Spark using Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas UDF is defined using `pandas_udf()` as a decorator or to wrap the function, and no additional configuration is required. A Pandas UDF generally behaves as a regular PySpark function API.\n\nBefore Spark 3.0, Pandas UDFs used to be defined with `pyspark.sql.functions.PandasUDFType`. From Spark 3.0 with Python 3.6+, you can also use Python type hints. Using Python type hints is preferred, and `pyspark.sql.functions.PandasUDFType` will be deprecated in a future release.\n\nNote that the type hint should use `pandas.Series` in all cases, with one exception: `pandas.DataFrame` should be used for the input or output type hint when the input or output column is of StructType. The following example shows a Pandas UDF which takes a long column, a string column and a struct column, and outputs a struct column. It requires the function to specify the type hints of `pandas.Series` and `pandas.DataFrame` as below.\n",
"text": "%spark.pyspark\n\nimport pandas as pd\n\nfrom pyspark.sql.functions import pandas_udf\n\n@pandas_udf(\"col1 string, col2 long\")\ndef func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -\u003e pd.DataFrame:\n    s3[\u0027col2\u0027] \u003d s1 + s2.str.len()\n    return s3\n\n# Create a Spark DataFrame that has three columns including a struct column.\ndf \u003d spark.createDataFrame(\n    [[1, \"a string\", (\"a nested string\",)]],\n    \"long_col long, string_col string, struct_col struct\u003ccol1:string\u003e\")\n\ndf.printSchema()\n\ndf.select(func(\"long_col\", \"string_col\", \"struct_col\")).printSchema()",
"title": "Series to Series",
"text": "%md\n\nThe type hint can be expressed as `pandas.Series`, … -\u003e `pandas.Series`.\n\nBy using `pandas_udf()` with the function having such type hints above, it creates a Pandas UDF where the given function takes one or more `pandas.Series` and outputs one `pandas.Series`. The output of the function should always be of the same length as the input. Internally, PySpark will execute a Pandas UDF by splitting columns into batches and calling the function for each batch as a subset of the data, then concatenating the results together.\n\nThe following example shows how to create this Pandas UDF that computes the product of 2 columns.",
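The batching behaviour described above can be pictured with a plain-Python sketch in which lists stand in for `pandas.Series`. This is only an illustration of the split, apply, and concatenate idea, not Spark's actual implementation; the function name `apply_in_batches` and the `batch_size` parameter are hypothetical.

```python
from typing import Callable, List


def apply_in_batches(column: List[int],
                     func: Callable[[List[int]], List[int]],
                     batch_size: int) -> List[int]:
    """Split `column` into batches, apply `func` to each, and concatenate."""
    result: List[int] = []
    for start in range(0, len(column), batch_size):
        batch = column[start:start + batch_size]
        out = func(batch)
        # A Series-to-Series function must return one value per input row.
        if len(out) != len(batch):
            raise ValueError("output length must match input length")
        result.extend(out)
    return result


squares = apply_in_batches([1, 2, 3, 4, 5], lambda b: [v * v for v in b], batch_size=2)
print(squares)
```

Because the function sees only one batch at a time, it should not rely on values from other batches, which is why the output of each call must line up row by row with its input.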
"title": "Series to Series",
"text": "%spark.pyspark\n\nimport pandas as pd\n\nfrom pyspark.sql.functions import col, pandas_udf\nfrom pyspark.sql.types import LongType\n\n# Declare the function and create the UDF\ndef multiply_func(a: pd.Series, b: pd.Series) -\u003e pd.Series:\n    return a * b\n\nmultiply \u003d pandas_udf(multiply_func, returnType\u003dLongType())\n\n# The function for a pandas_udf should be able to execute with local Pandas data\nx \u003d pd.Series([1, 2, 3])\nprint(multiply_func(x, x))\n# 0 1\n# 1 4\n# 2 9\n# dtype: int64\n\n# Create a Spark DataFrame, \u0027spark\u0027 is an existing SparkSession\ndf \u003d spark.createDataFrame(pd.DataFrame(x, columns\u003d[\"x\"]))\n\n# Execute function as a Spark vectorized UDF\ndf.select(multiply(col(\"x\"), col(\"x\"))).show()\n",
"title": "Iterator of Series to Iterator of Series",
"text": "%md\n\nThe type hint can be expressed as `Iterator[pandas.Series]` -\u003e `Iterator[pandas.Series]`.\n\nBy using `pandas_udf()` with the function having such type hints above, it creates a Pandas UDF where the given function takes an iterator of `pandas.Series` and outputs an iterator of `pandas.Series`. The length of the entire output from the function should be the same as the length of the entire input; therefore, it can prefetch the data from the input iterator as long as the lengths are the same. In this case, the created Pandas UDF requires one input column when the Pandas UDF is called. To use multiple input columns, a different type hint is required. See Iterator of Multiple Series to Iterator of Series.\n\nIt is also useful when the UDF execution requires initializing some state, although internally it works identically to the `Series` to `Series` case. The example below illustrates this type of UDF.\n",
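The state-initialization use case mentioned above can be sketched in plain Python, with lists standing in for the `pandas.Series` batches. The names `load_lookup` and `label_batches` are hypothetical; `load_lookup` is a stand-in for whatever expensive setup a real UDF would do once per iterator.

```python
from typing import Dict, Iterator, List


def load_lookup() -> Dict[int, str]:
    """Hypothetical stand-in for an expensive one-time initialization."""
    return {1: "one", 2: "two", 3: "three"}


def label_batches(batches: Iterator[List[int]]) -> Iterator[List[str]]:
    # Initialize the state once, before the first batch is consumed.
    lookup = load_lookup()
    for batch in batches:
        # Reuse the same state for every batch of the iterator.
        yield [lookup.get(v, "unknown") for v in batch]


labels = list(label_batches(iter([[1, 2], [3, 4]])))
print(labels)
```

With a Series to Series function, the equivalent setup would run again for every batch, which is the cost the iterator form avoids.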
"title": "Iterator of Series to Iterator of Series",
"text": "%spark.pyspark\n\nfrom typing import Iterator\n\nimport pandas as pd\n\nfrom pyspark.sql.functions import pandas_udf\n\npdf \u003d pd.DataFrame([1, 2, 3], columns\u003d[\"x\"])\ndf \u003d spark.createDataFrame(pdf)\n\n# Declare the function and create the UDF\n@pandas_udf(\"long\")\ndef plus_one(iterator: Iterator[pd.Series]) -\u003e Iterator[pd.Series]:\n    for x in iterator:\n        yield x + 1\n\ndf.select(plus_one(\"x\")).show()",
"title": "Iterator of Multiple Series to Iterator of Series",
"text": "%md\n\nThe type hint can be expressed as `Iterator[Tuple[pandas.Series, ...]]` -\u003e `Iterator[pandas.Series]`.\n\nBy using `pandas_udf()` with the function having such type hints above, it creates a Pandas UDF where the given function takes an iterator of a tuple of multiple `pandas.Series` and outputs an iterator of `pandas.Series`. In this case, the created Pandas UDF requires as many input columns as there are series in the tuple when the Pandas UDF is called. Otherwise, it has the same characteristics and restrictions as the Iterator of Series to Iterator of Series case.\n\nThe following example shows how to create this Pandas UDF:",
"title": "Iterator of Multiple Series to Iterator of Series",
"text": "%spark.pyspark\n\nfrom typing import Iterator, Tuple\n\nimport pandas as pd\n\nfrom pyspark.sql.functions import pandas_udf\n\npdf \u003d pd.DataFrame([1, 2, 3], columns\u003d[\"x\"])\ndf \u003d spark.createDataFrame(pdf)\n\n# Declare the function and create the UDF\n@pandas_udf(\"long\")\ndef multiply_two_cols(\n        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -\u003e Iterator[pd.Series]:\n    for a, b in iterator:\n        yield a * b\n\ndf.select(multiply_two_cols(\"x\", \"x\")).show()",
"title": "Series to Scalar",
"text": "%md\n\nThe type hint can be expressed as `pandas.Series`, … -\u003e `Any`.\n\nBy using `pandas_udf()` with the function having such type hints above, it creates a Pandas UDF similar to PySpark’s aggregate functions. The given function takes `pandas.Series` and returns a scalar value. The return type should be a primitive data type, and the returned scalar can be either a Python primitive type, e.g., `int` or `float`, or a numpy data type, e.g., `numpy.int64` or `numpy.float64`. `Any` should ideally be a specific scalar type accordingly.\n\nThis UDF can also be used with `GroupedData.agg()` and `Window`. It defines an aggregation from one or more `pandas.Series` to a scalar value, where each `pandas.Series` represents a column within the group or window.\n\nNote that this type of UDF does not support partial aggregation and all data for a group or window will be loaded into memory. Also, only unbounded windows are currently supported with grouped aggregate Pandas UDFs. The following example shows how to use this type of UDF to compute the mean with group-by and window operations:\n",
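The note about partial aggregation can be made concrete with a plain-Python sketch: every value of a group is collected first, and only then reduced to a scalar. Lists stand in for `pandas.Series` here, and the function name `grouped_mean` is hypothetical.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List, Tuple


def grouped_mean(rows: List[Tuple[int, float]]) -> Dict[int, float]:
    """Collect every value of a group in memory, then reduce it to one scalar."""
    groups: Dict[int, List[float]] = defaultdict(list)
    for key, value in rows:
        groups[key].append(value)
    # No partial aggregation: the whole group is materialized before `mean` runs.
    return {key: mean(values) for key, values in groups.items()}


rows = [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)]
print(grouped_mean(rows))
```

The memory needed grows with the largest group, which is the practical consequence of the restriction described above.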
"title": "Series to Scalar",
"text": "%spark.pyspark\n\nimport pandas as pd\n\nfrom pyspark.sql.functions import pandas_udf\nfrom pyspark.sql import Window\n\ndf \u003d spark.createDataFrame(\n    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],\n    (\"id\", \"v\"))\n\n# Declare the function and create the UDF\n@pandas_udf(\"double\")\ndef mean_udf(v: pd.Series) -\u003e float:\n    return v.mean()\n\ndf.select(mean_udf(df[\u0027v\u0027])).show()\n\ndf.groupby(\"id\").agg(mean_udf(df[\u0027v\u0027])).show()\n\nw \u003d Window \\\n    .partitionBy(\u0027id\u0027) \\\n    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)\ndf.withColumn(\u0027mean_v\u0027, mean_udf(df[\u0027v\u0027]).over(w)).show()",
"title": "Pandas Function APIs",
"text": "%md\n\nPandas Function APIs can directly apply a Python native function against the whole DataFrame by using Pandas instances. Internally they work similarly to Pandas UDFs by using Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. However, a Pandas Function API behaves as a regular API under PySpark DataFrame instead of Column, and Python type hints in Pandas Function APIs are optional and currently do not affect how they work internally, although they might be required in the future.\n\nFrom Spark 3.0, grouped map pandas UDF is categorized as a separate Pandas Function API, `DataFrame.groupby().applyInPandas()`. It is still possible to use it with `pyspark.sql.functions.PandasUDFType` and `DataFrame.groupby().apply()` as before; however, it is preferred to use `DataFrame.groupby().applyInPandas()` directly. `pyspark.sql.functions.PandasUDFType` will be deprecated in the future.\n",
"title": "Grouped Map",
"text": "%md\n\nGrouped map operations with Pandas instances are supported by `DataFrame.groupby().applyInPandas()`, which requires a Python function that takes a `pandas.DataFrame` and returns another `pandas.DataFrame`. It maps each group to a `pandas.DataFrame` in the Python function.\n\nThis API implements the “split-apply-combine” pattern which consists of three steps:\n\n* Split the data into groups by using `DataFrame.groupBy()`.\n* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The input data contains all the rows and columns for each group.\n* Combine the results into a new PySpark DataFrame.\n\nTo use `DataFrame.groupBy().applyInPandas()`, the user needs to define the following:\n\n* A Python function that defines the computation for each group.\n* A StructType object or a string that defines the schema of the output PySpark DataFrame.\n\nThe column labels of the returned `pandas.DataFrame` must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices. See `pandas.DataFrame` on how to label columns when constructing a `pandas.DataFrame`.\n\nNote that all data for a group will be loaded into memory before the function is applied. This can lead to out-of-memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied to groups, and it is up to the user to ensure that the grouped data will fit into the available memory.\n\nThe following example shows how to use `DataFrame.groupby().applyInPandas()` to subtract the mean from each value in the group.\n\n\n",
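The three steps listed above can be sketched without Spark, using tuples in place of DataFrame rows. This is a plain-Python illustration of the split-apply-combine pattern only; the names `subtract_group_mean` and `apply_per_group` are hypothetical.

```python
from itertools import groupby
from operator import itemgetter
from statistics import mean
from typing import List, Tuple

Row = Tuple[int, float]  # (id, v)


def subtract_group_mean(rows: List[Row]) -> List[Row]:
    """Apply step: center the values of a single group on their mean."""
    center = mean(v for _, v in rows)
    return [(key, v - center) for key, v in rows]


def apply_per_group(rows: List[Row]) -> List[Row]:
    combined: List[Row] = []
    # Split: sort by key so that groupby sees each group as one run.
    for _, group in groupby(sorted(rows, key=itemgetter(0)), key=itemgetter(0)):
        # Apply the function to the whole group, then combine the results.
        combined.extend(subtract_group_mean(list(group)))
    return combined


centered = apply_per_group([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)])
print(centered)
```

Note that `list(group)` materializes the full group before the function runs, which mirrors the memory caveat stated above.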
"title": "Grouped Map",
"text": "%spark.pyspark\n\ndf \u003d spark.createDataFrame(\n    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],\n    (\"id\", \"v\"))\n\ndef subtract_mean(pdf):\n    # pdf is a pandas.DataFrame\n    v \u003d pdf.v\n    return pdf.assign(v\u003dv - v.mean())\n\ndf.groupby(\"id\").applyInPandas(subtract_mean, schema\u003d\"id long, v double\").show()",
"title": "Map",
"text": "%md\n\nMap operations with Pandas instances are supported by `DataFrame.mapInPandas()`, which maps an iterator of `pandas.DataFrame`s that represents the current PySpark DataFrame to another iterator of `pandas.DataFrame`s and returns the result as a PySpark DataFrame. The function takes and outputs an iterator of `pandas.DataFrame`. In contrast to some Pandas UDFs, it can return output of arbitrary length, although internally it works similarly to a Series to Series Pandas UDF.\n\nThe following example shows how to use `DataFrame.mapInPandas()`:\n",
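The point about arbitrary output length can be seen in a small plain-Python sketch where each batch is a list of row dictionaries standing in for a `pandas.DataFrame`. The function name `keep_id_one` and the `Batch` alias are hypothetical.

```python
from typing import Dict, Iterator, List

Batch = List[Dict[str, int]]


def keep_id_one(batches: Iterator[Batch]) -> Iterator[Batch]:
    """Yield each batch with only the rows whose id is 1."""
    for batch in batches:
        # The output batch may be shorter than the input batch, or even empty.
        yield [row for row in batch if row["id"] == 1]


batches = [[{"id": 1, "age": 21}], [{"id": 2, "age": 30}]]
filtered = [row for batch in keep_id_one(iter(batches)) for row in batch]
print(filtered)
```

Unlike the Series to Series case, nothing here requires the number of output rows to match the number of input rows.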
"title": "Map",
"text": "%spark.pyspark\n\ndf \u003d spark.createDataFrame([(1, 21), (2, 30)], (\"id\", \"age\"))\n\ndef filter_func(iterator):\n    for pdf in iterator:\n        yield pdf[pdf.id \u003d\u003d 1]\n\ndf.mapInPandas(filter_func, schema\u003ddf.schema).show()",
"title": "Co-grouped Map",
"text": "%md\n\n\u003cbr/\u003e\n\nCo-grouped map operations with Pandas instances are supported by `DataFrame.groupby().cogroup().applyInPandas()`, which allows two PySpark DataFrames to be cogrouped by a common key and then a Python function to be applied to each cogroup. It consists of the following steps:\n\n* Shuffle the data such that the groups of each dataframe which share a key are cogrouped together.\n* Apply a function to each cogroup. The input of the function is two `pandas.DataFrame` (with an optional tuple representing the key). The output of the function is a `pandas.DataFrame`.\n* Combine the `pandas.DataFrame`s from all groups into a new PySpark DataFrame.\n\nTo use `groupBy().cogroup().applyInPandas()`, the user needs to define the following:\n\n* A Python function that defines the computation for each cogroup.\n* A StructType object or a string that defines the schema of the output PySpark DataFrame.\n\nThe column labels of the returned `pandas.DataFrame` must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices. See `pandas.DataFrame` on how to label columns when constructing a `pandas.DataFrame`.\n\nNote that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out-of-memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied, and it is up to the user to ensure that the cogrouped data will fit into the available memory.\n\nThe following example shows how to use `DataFrame.groupby().cogroup().applyInPandas()` to perform an as-of join between two datasets.",
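For readers unfamiliar with as-of joins, the sketch below shows what happens inside a single cogroup, using tuples in place of `pandas.DataFrame` rows. It assumes the backward-matching behaviour (each left row is paired with the latest right row whose time is not later than its own); the function name `asof_join_group` and the type aliases are hypothetical.

```python
from bisect import bisect_right
from typing import List, Optional, Tuple

Left = Tuple[int, int, float]                    # (time, id, v1)
Right = Tuple[int, int, str]                     # (time, id, v2)
Joined = Tuple[int, int, float, Optional[str]]   # (time, id, v1, v2)


def asof_join_group(left: List[Left], right: List[Right]) -> List[Joined]:
    """Join each left row to the latest right row whose time is <= its time.

    Both lists are assumed to hold the rows of one cogroup (same id).
    """
    right_sorted = sorted(right, key=lambda r: r[0])
    times = [r[0] for r in right_sorted]
    joined: List[Joined] = []
    for time, key, v1 in sorted(left, key=lambda row: row[0]):
        # Number of right rows with time <= the left row's time.
        pos = bisect_right(times, time)
        v2 = right_sorted[pos - 1][2] if pos else None
        joined.append((time, key, v1, v2))
    return joined


left = [(20000101, 1, 1.0), (20000102, 1, 3.0)]
right = [(20000101, 1, "x")]
print(asof_join_group(left, right))
```

A left row with no earlier or equal right row gets `None`, which corresponds to a missing value in the joined output.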
"title": "Co-grouped Map",
"text": "%spark.pyspark\n\nimport pandas as pd\n\ndf1 \u003d spark.createDataFrame(\n    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],\n    (\"time\", \"id\", \"v1\"))\n\ndf2 \u003d spark.createDataFrame(\n    [(20000101, 1, \"x\"), (20000101, 2, \"y\")],\n    (\"time\", \"id\", \"v2\"))\n\ndef asof_join(l, r):\n    return pd.merge_asof(l, r, on\u003d\"time\", by\u003d\"id\")\n\ndf1.groupby(\"id\").cogroup(df2.groupby(\"id\")).applyInPandas(\n    asof_join, schema\u003d\"time int, id int, v1 double, v2 string\").show()",
