# blob: ab4843f8473b1aa16b5187bdb9533a8df85001d0
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Scalar pandas UDFs declared through the PandasUDFType.SCALAR API, both as
# decorated functions and as direct calls wrapping lambdas.
- case: scalarUDF
  main: |
    from pyspark.sql.pandas.functions import pandas_udf, PandasUDFType
    import pandas.core.series
    import pandas.core.frame

    # One Series in, one Series out.
    @pandas_udf("string", PandasUDFType.SCALAR)
    def f(x: pandas.core.series.Series) -> pandas.core.series.Series:
        return x

    # Two Series arguments.
    @pandas_udf("string", PandasUDFType.SCALAR)
    def g(x: pandas.core.series.Series, y: pandas.core.series.Series) -> pandas.core.series.Series:
        return x

    # Variadic Series arguments.
    @pandas_udf("string", PandasUDFType.SCALAR)
    def h(*xs: pandas.core.series.Series) -> pandas.core.series.Series:
        return xs[0]

    # DataFrame argument mixed with a Series argument, struct-like return schema.
    @pandas_udf("x string, y string", PandasUDFType.SCALAR)
    def k(x: pandas.core.frame.DataFrame, y: pandas.core.series.Series) -> pandas.core.frame.DataFrame:
        return x

    # Same arities again, passing un-annotated lambdas directly.
    pandas_udf(lambda x: x, "string", PandasUDFType.SCALAR)
    pandas_udf(lambda x, y: x, "string", PandasUDFType.SCALAR)
    pandas_udf(lambda *xs: xs[0], "string", PandasUDFType.SCALAR)
# Iterator-of-Series pandas UDF declared with PandasUDFType.SCALAR_ITER and a
# DataType instance (rather than a string) as the return type.
- case: scalarIterUDF
  main: |
    from pyspark.sql.pandas.functions import pandas_udf, PandasUDFType
    from pyspark.sql.types import IntegerType
    import pandas.core.series
    from typing import Iterator

    @pandas_udf(IntegerType(), PandasUDFType.SCALAR_ITER)
    def f(xs: Iterator[pandas.core.series.Series]) -> Iterator[pandas.core.series.Series]:
        # Yield one output batch per input batch.
        for x in xs:
            yield x + 1
# Grouped-map functions: PandasUDFType.GROUPED_MAP UDFs passed to
# GroupedData.apply, and a plain function passed to applyInPandas with the
# schema given either as a DDL string or as a StructType.
- case: groupedMapUdf
  main: |
    from typing import Any
    from pyspark.sql.session import SparkSession
    from pyspark.sql.types import StructField, StructType, LongType
    from pyspark.sql.pandas.functions import pandas_udf, PandasUDFType
    import pandas.core.frame

    # Single-argument form: receives only the group's DataFrame.
    @pandas_udf("id long", PandasUDFType.GROUPED_MAP)
    def f(pdf: pandas.core.frame.DataFrame) -> pandas.core.frame.DataFrame:
        return pdf

    spark = SparkSession.builder.getOrCreate()
    dfg = spark.range(1).groupBy("id")
    dfg.apply(f)

    # Two-argument form: receives the grouping key and the group's DataFrame.
    @pandas_udf("id long", PandasUDFType.GROUPED_MAP)
    def g(key: Any, pdf: pandas.core.frame.DataFrame) -> pandas.core.frame.DataFrame:
        return pdf

    dfg.apply(g)

    # Undecorated function used with applyInPandas.
    def h(pdf: pandas.core.frame.DataFrame) -> pandas.core.frame.DataFrame:
        return pdf

    dfg.applyInPandas(h, "id long")
    dfg.applyInPandas(h, StructType([StructField("id", LongType())]))
# Grouped-aggregate pandas UDFs (PandasUDFType.GROUPED_AGG): Series inputs
# reduced to a single int, as decorated functions and as lambdas.
- case: groupedAggUDF
  main: |
    # Let's keep this one to make sure compatibility imports work
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pyspark.sql.types import IntegerType
    import pandas.core.series

    # Return type given as a DataType instance.
    @pandas_udf(IntegerType(), PandasUDFType.GROUPED_AGG)
    def f(x: pandas.core.series.Series) -> int:
        return 42

    # Return type given as a type string; two Series arguments.
    @pandas_udf("int", PandasUDFType.GROUPED_AGG)
    def g(x: pandas.core.series.Series, y: pandas.core.series.Series) -> int:
        return 42

    # Variadic Series arguments.
    @pandas_udf("int", PandasUDFType.GROUPED_AGG)
    def h(*xs: pandas.core.series.Series) -> int:
        return 42

    # Lambda forms. The return type string is "int" so that it names a real
    # Spark SQL type and agrees with the value the lambdas return.
    pandas_udf(lambda x: 42, "int", PandasUDFType.GROUPED_AGG)
    pandas_udf(lambda x, y: 42, "int", PandasUDFType.GROUPED_AGG)
    pandas_udf(lambda *xs: 42, "int", PandasUDFType.GROUPED_AGG)
# DataFrame.mapInPandas with an iterator-of-DataFrames function and a DDL
# schema string.
- case: mapIterUdf
  main: |
    from pyspark.sql.session import SparkSession
    from typing import Iterator
    import pandas.core.frame

    spark = SparkSession.builder.getOrCreate()

    def f(batch_iter: Iterator[pandas.core.frame.DataFrame]) -> Iterator[pandas.core.frame.DataFrame]:
        # Emit, for every incoming batch, only the rows whose id equals 1.
        for pdf in batch_iter:
            yield pdf[pdf.id == 1]

    spark.range(1).mapInPandas(f, "id long").show()
# Plain (non-pandas) `udf` in each of its call shapes: direct call with and
# without a return type, decorator with positional or keyword return type,
# and bare decorator.
- case: legacyUDF
  main: |
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    # Direct calls wrapping a lambda.
    udf(lambda x: x, "string")
    udf(lambda x: x)

    # Decorator with a positional return type string.
    @udf("string")
    def f(x: str) -> str:
        return x

    # Decorator with the return type passed by keyword as a string.
    @udf(returnType="string")
    def g(x: str) -> str:
        return x

    # Decorator with the return type passed by keyword as a DataType instance.
    @udf(returnType=IntegerType())
    def h(x: int) -> int:
        return x

    # Bare decorator, no arguments.
    @udf
    def i(x: str) -> str:
        return x
# Cogrouped applyInPandas: a two-DataFrame function applied to a pair of
# grouped datasets, with the schema given as a DDL string and as a StructType.
- case: cogroupedAggUdf
  main: |
    from pyspark.sql.session import SparkSession
    import pandas.core.frame
    from pyspark.sql.types import StructType, StructField, LongType

    spark = SparkSession.builder.getOrCreate()
    dfg1 = spark.range(1).groupBy("id")
    dfg2 = spark.range(1).groupBy("id")

    # Receives one DataFrame per side of the cogroup and returns the first.
    def f(x: pandas.core.frame.DataFrame, y: pandas.core.frame.DataFrame) -> pandas.core.frame.DataFrame:
        return x

    # NOTE(review): the string schema says "id int" while the StructType
    # variant below uses LongType -- confirm whether the difference is intended.
    dfg1.cogroup(dfg2).applyInPandas(f, "id int")
    dfg1.cogroup(dfg2).applyInPandas(f, StructType([StructField("id", LongType())]))