# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
A simple example demonstrating Arrow in Spark.
Run with:
  ./bin/spark-submit examples/src/main/python/sql/arrow.py
"""
# NOTE that this file is imported in user guide in PySpark documentation.
# The codes are referred via line numbers. See also `literalinclude` directive in Sphinx.
import pandas as pd
from typing import Iterable
from pyspark.sql import SparkSession
from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version
def dataframe_to_arrow_table_example(spark: SparkSession) -> None:
    """Convert a Spark DataFrame to a PyArrow Table and print the table's schema.

    :param spark: the SparkSession used to build the source DataFrame.
    """
    # pyarrow is imported only to fail early when it is not installed.
    import pyarrow as pa  # noqa: F401
    from pyspark.sql.functions import rand

    # Create a Spark DataFrame
    df = spark.range(100).drop("id").withColumns({"0": rand(), "1": rand(), "2": rand()})

    # Convert the Spark DataFrame to a PyArrow Table
    table = df.select("*").toArrow()

    print(table.schema)
    # 0: double not null
    # 1: double not null
    # 2: double not null
def dataframe_with_arrow_example(spark: SparkSession) -> None:
    """Round-trip a pandas DataFrame through Spark with Arrow transfers enabled.

    Prints summary statistics of the pandas DataFrame obtained back from Spark.

    :param spark: the SparkSession whose Arrow setting is switched on and which
        performs the conversions.
    """
    import numpy as np
    import pandas as pd

    # Enable Arrow-based columnar data transfers
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

    # Generate a Pandas DataFrame
    pdf = pd.DataFrame(np.random.rand(100, 3))

    # Create a Spark DataFrame from a Pandas DataFrame using Arrow
    df = spark.createDataFrame(pdf)

    # Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
    result_pdf = df.select("*").toPandas()

    print("Pandas DataFrame result statistics:\n%s\n" % str(result_pdf.describe()))
def ser_to_frame_pandas_udf_example(spark: SparkSession) -> None:
    """Show a pandas UDF that takes Series/DataFrame inputs and returns a DataFrame.

    Prints the schema of the input DataFrame and of the UDF's struct result.

    :param spark: the SparkSession used to build the input DataFrame.
    """
    import pandas as pd

    from pyspark.sql.functions import pandas_udf

    @pandas_udf("col1 string, col2 long")  # type: ignore[call-overload]
    def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
        # The struct column arrives as a pandas DataFrame; add the second field to it.
        s3['col2'] = s1 + s2.str.len()
        return s3

    # Create a Spark DataFrame that has three columns including a struct column.
    df = spark.createDataFrame(
        [[1, "a string", ("a nested string",)]],
        "long_col long, string_col string, struct_col struct<col1:string>")

    df.printSchema()
    # root
    # |-- long_col: long (nullable = true)
    # |-- string_col: string (nullable = true)
    # |-- struct_col: struct (nullable = true)
    # |    |-- col1: string (nullable = true)

    df.select(func("long_col", "string_col", "struct_col")).printSchema()
    # |-- func(long_col, string_col, struct_col): struct (nullable = true)
    # |    |-- col1: string (nullable = true)
    # |    |-- col2: long (nullable = true)
def ser_to_ser_pandas_udf_example(spark: SparkSession) -> None:
    """Show a Series-to-Series pandas UDF, run both locally and through Spark.

    :param spark: the SparkSession used to build the input DataFrame.
    """
    import pandas as pd

    from pyspark.sql.functions import col, pandas_udf
    from pyspark.sql.types import LongType

    # Declare the function and create the UDF
    def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
        return a * b

    multiply = pandas_udf(multiply_func, returnType=LongType())  # type: ignore[call-overload]

    # The function for a pandas_udf should be able to execute with local Pandas data
    x = pd.Series([1, 2, 3])
    print(multiply_func(x, x))
    # 0    1
    # 1    4
    # 2    9
    # dtype: int64

    # Create a Spark DataFrame, 'spark' is an existing SparkSession
    df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

    # Execute function as a Spark vectorized UDF
    df.select(multiply(col("x"), col("x"))).show()
    # +-------------------+
    # |multiply_func(x, x)|
    # +-------------------+
    # |                  1|
    # |                  4|
    # |                  9|
    # +-------------------+
def iter_ser_to_iter_ser_pandas_udf_example(spark: SparkSession) -> None:
    """Show a pandas UDF that maps an iterator of Series to an iterator of Series.

    :param spark: the SparkSession used to build the input DataFrame.
    """
    from typing import Iterator

    import pandas as pd

    from pyspark.sql.functions import pandas_udf

    pdf = pd.DataFrame([1, 2, 3], columns=["x"])
    df = spark.createDataFrame(pdf)

    # Declare the function and create the UDF
    @pandas_udf("long")  # type: ignore[call-overload]
    def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
        for x in iterator:
            yield x + 1

    df.select(plus_one("x")).show()
    # +-----------+
    # |plus_one(x)|
    # +-----------+
    # |          2|
    # |          3|
    # |          4|
    # +-----------+
def iter_sers_to_iter_ser_pandas_udf_example(spark: SparkSession) -> None:
    """Show a pandas UDF that maps an iterator of Series tuples to an iterator of Series.

    :param spark: the SparkSession used to build the input DataFrame.
    """
    from typing import Iterator, Tuple

    import pandas as pd

    from pyspark.sql.functions import pandas_udf

    pdf = pd.DataFrame([1, 2, 3], columns=["x"])
    df = spark.createDataFrame(pdf)

    # Declare the function and create the UDF
    @pandas_udf("long")  # type: ignore[call-overload]
    def multiply_two_cols(
            iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
        for a, b in iterator:
            yield a * b

    df.select(multiply_two_cols("x", "x")).show()
    # +-----------------------+
    # |multiply_two_cols(x, x)|
    # +-----------------------+
    # |                      1|
    # |                      4|
    # |                      9|
    # +-----------------------+
def ser_to_scalar_pandas_udf_example(spark: SparkSession) -> None:
    """Show a Series-to-scalar pandas UDF used in a select, a group-by and a window.

    :param spark: the SparkSession used to build the input DataFrame.
    """
    import pandas as pd

    from pyspark.sql.functions import pandas_udf
    from pyspark.sql import Window

    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))

    # Declare the function and create the UDF
    @pandas_udf("double")  # type: ignore[call-overload]
    def mean_udf(v: pd.Series) -> float:
        return v.mean()

    df.select(mean_udf(df['v'])).show()
    # +-----------+
    # |mean_udf(v)|
    # +-----------+
    # |        4.2|
    # +-----------+

    df.groupby("id").agg(mean_udf(df['v'])).show()
    # +---+-----------+
    # | id|mean_udf(v)|
    # +---+-----------+
    # |  1|        1.5|
    # |  2|        6.0|
    # +---+-----------+

    # The frame spans the whole partition, so every row gets its group's mean.
    w = Window \
        .partitionBy('id') \
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
    # +---+----+------+
    # | id|   v|mean_v|
    # +---+----+------+
    # |  1| 1.0|   1.5|
    # |  1| 2.0|   1.5|
    # |  2| 3.0|   6.0|
    # |  2| 5.0|   6.0|
    # |  2|10.0|   6.0|
    # +---+----+------+
def grouped_apply_in_pandas_example(spark: SparkSession) -> None:
    """Subtract each group's mean of ``v`` from its rows via ``applyInPandas`` and show the result.

    :param spark: the SparkSession used to build the input DataFrame.
    """
    rows = [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)]
    df = spark.createDataFrame(rows, ("id", "v"))

    def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
        # Each call receives one group as a pandas.DataFrame.
        centered = pdf.v - pdf.v.mean()
        return pdf.assign(v=centered)

    grouped = df.groupby("id")
    grouped.applyInPandas(subtract_mean, schema="id long, v double").show()
    # +---+----+
    # | id|   v|
    # +---+----+
    # |  1|-0.5|
    # |  1| 0.5|
    # |  2|-3.0|
    # |  2|-1.0|
    # |  2| 4.0|
    # +---+----+
def map_in_pandas_example(spark: SparkSession) -> None:
    """Filter a Spark DataFrame batch by batch with ``mapInPandas`` and show the result.

    :param spark: the SparkSession used to build the input DataFrame.
    """
    df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

    def filter_func(iterator: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
        # Keep only the rows whose id is 1 in every incoming pandas batch.
        for pdf in iterator:
            yield pdf[pdf.id == 1]

    df.mapInPandas(filter_func, schema=df.schema).show()
    # +---+---+
    # | id|age|
    # +---+---+
    # |  1| 21|
    # +---+---+
def cogrouped_apply_in_pandas_example(spark: SparkSession) -> None:
    """Co-group two DataFrames by ``id`` and merge each pair of groups with pandas.

    :param spark: the SparkSession used to build both input DataFrames.
    """
    import pandas as pd

    df1 = spark.createDataFrame(
        [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
        ("time", "id", "v1"))

    df2 = spark.createDataFrame(
        [(20000101, 1, "x"), (20000101, 2, "y")],
        ("time", "id", "v2"))

    def merge_ordered(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
        return pd.merge_ordered(left, right)

    df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
        merge_ordered, schema="time int, id int, v1 double, v2 string").show()
    # +--------+---+---+----+
    # |    time| id| v1|  v2|
    # +--------+---+---+----+
    # |20000101|  1|1.0|   x|
    # |20000102|  1|3.0|null|
    # |20000101|  2|2.0|   y|
    # |20000102|  2|4.0|null|
    # +--------+---+---+----+
def arrow_python_udf_example(spark: SparkSession) -> None:
    """Compare a default Python UDF with an Arrow Python UDF on the same column.

    :param spark: the SparkSession used to build the input DataFrame.
    """
    from pyspark.sql.functions import udf

    @udf(returnType='int')  # A default, pickled Python UDF
    def slen(s):  # type: ignore[no-untyped-def]
        return len(s)

    @udf(returnType='int', useArrow=True)  # An Arrow Python UDF
    def arrow_slen(s):  # type: ignore[no-untyped-def]
        return len(s)

    df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))

    df.select(slen("name"), arrow_slen("name")).show()
    # +----------+----------------+
    # |slen(name)|arrow_slen(name)|
    # +----------+----------------+
    # |         8|               8|
    # +----------+----------------+
if __name__ == "__main__":
    # One session is shared by every example below.
    spark = SparkSession \
        .builder \
        .appName("Python Arrow-in-Spark example") \
        .getOrCreate()

    try:
        print("Running Arrow conversion example: DataFrame to Table")
        dataframe_to_arrow_table_example(spark)
        print("Running Pandas to/from conversion example")
        dataframe_with_arrow_example(spark)
        print("Running pandas_udf example: Series to Frame")
        ser_to_frame_pandas_udf_example(spark)
        print("Running pandas_udf example: Series to Series")
        ser_to_ser_pandas_udf_example(spark)
        print("Running pandas_udf example: Iterator of Series to Iterator of Series")
        iter_ser_to_iter_ser_pandas_udf_example(spark)
        print("Running pandas_udf example: Iterator of Multiple Series to Iterator of Series")
        iter_sers_to_iter_ser_pandas_udf_example(spark)
        print("Running pandas_udf example: Series to Scalar")
        ser_to_scalar_pandas_udf_example(spark)
        print("Running pandas function example: Grouped Map")
        grouped_apply_in_pandas_example(spark)
        print("Running pandas function example: Map")
        map_in_pandas_example(spark)
        print("Running pandas function example: Co-grouped Map")
        cogrouped_apply_in_pandas_example(spark)
    finally:
        # Stop the session even when one of the examples raises.
        spark.stop()