| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| # Scalar pandas UDF declarations in decorator form (one, two and variadic Series |
| # arguments, plus a DataFrame argument/return) and in direct-call form with lambdas. |
| # NOTE(review): no expected-output section follows `main`, so presumably the snippet |
| # is meant to pass the checker without diagnostics -- confirm against the test harness. |
| - case: scalarUDF |
| main: | |
| from pyspark.sql.pandas.functions import pandas_udf, PandasUDFType |
| import pandas.core.series |
| import pandas.core.frame |
| |
| # One Series in, one Series out. |
| @pandas_udf("string", PandasUDFType.SCALAR) |
| def f(x: pandas.core.series.Series) -> pandas.core.series.Series: |
| return x |
| |
| # Two Series arguments; the first one is returned. |
| @pandas_udf("string", PandasUDFType.SCALAR) |
| def g(x: pandas.core.series.Series, y: pandas.core.series.Series) -> pandas.core.series.Series: |
| return x |
| |
| # Variadic Series arguments; the first one is returned. |
| @pandas_udf("string", PandasUDFType.SCALAR) |
| def h(*xs: pandas.core.series.Series) -> pandas.core.series.Series: |
| return xs[0] |
| |
| # Mixed DataFrame/Series arguments with a DataFrame result, declared with a two-field type string. |
| @pandas_udf("x string, y string", PandasUDFType.SCALAR) |
| def k(x: pandas.core.frame.DataFrame, y: pandas.core.series.Series) -> pandas.core.frame.DataFrame: |
| return x |
| |
| # Direct (non-decorator) calls with unannotated lambdas of the same three arities. |
| pandas_udf(lambda x: x, "string", PandasUDFType.SCALAR) |
| pandas_udf(lambda x, y: x, "string", PandasUDFType.SCALAR) |
| pandas_udf(lambda *xs: xs[0], "string", PandasUDFType.SCALAR) |
| |
| |
| # Scalar-iterator pandas UDF: the decorated function takes an iterator of Series |
| # and is itself a generator of Series. |
| - case: scalarIterUDF |
| main: | |
| from pyspark.sql.pandas.functions import pandas_udf, PandasUDFType |
| from pyspark.sql.types import IntegerType |
| import pandas.core.series |
| from typing import Iterator |
| |
| # Return type is passed as a DataType instance rather than a type string. |
| @pandas_udf(IntegerType(), PandasUDFType.SCALAR_ITER) |
| def f(xs: Iterator[pandas.core.series.Series]) -> Iterator[pandas.core.series.Series]: |
| for x in xs: |
| # Yield each incoming batch incremented by one. |
| yield x + 1 |
| |
| |
| # Grouped-map usage: pandas UDFs passed to `apply` on a grouped frame (DataFrame-only |
| # and key + DataFrame signatures), and a plain function passed to `applyInPandas` |
| # with the schema given first as a string and then as a StructType. |
| - case: groupedMapUdf |
| main: | |
| from typing import Any |
| |
| from pyspark.sql.session import SparkSession |
| from pyspark.sql.types import StructField, StructType, LongType |
| from pyspark.sql.pandas.functions import pandas_udf, PandasUDFType |
| import pandas.core.frame |
| |
| # Single-argument form: receives only the group's DataFrame. |
| @pandas_udf("id long", PandasUDFType.GROUPED_MAP) |
| def f(pdf: pandas.core.frame.DataFrame) -> pandas.core.frame.DataFrame: |
| return pdf |
| |
| spark = SparkSession.builder.getOrCreate() |
| |
| # Grouped frame reused by every call below. |
| dfg = spark.range(1).groupBy("id") |
| dfg.apply(f) |
| |
| # Two-argument form: an extra leading `key` parameter typed as Any. |
| @pandas_udf("id long", PandasUDFType.GROUPED_MAP) |
| def g(key: Any, pdf: pandas.core.frame.DataFrame) -> pandas.core.frame.DataFrame: |
| return pdf |
| |
| dfg.apply(g) |
| |
| |
| # Undecorated function handed to applyInPandas together with an explicit schema. |
| def h(pdf: pandas.core.frame.DataFrame) -> pandas.core.frame.DataFrame: |
| return pdf |
| |
| # Same single `id` long column, once as a schema string and once as a StructType. |
| dfg.applyInPandas(h, "id long") |
| dfg.applyInPandas(h, StructType([StructField("id", LongType())])) |
| |
| |
| # Grouped-aggregate pandas UDFs: decorator form with one, two and variadic Series |
| # arguments returning an int, followed by the direct-call form with lambdas. |
| - case: groupedAggUDF |
| main: | |
| # Let's keep this one to make sure compatibility imports work |
| from pyspark.sql.functions import pandas_udf, PandasUDFType |
| from pyspark.sql.types import IntegerType |
| import pandas.core.series |
| |
| # Return type passed as a DataType instance. |
| @pandas_udf(IntegerType(), PandasUDFType.GROUPED_AGG) |
| def f(x: pandas.core.series.Series) -> int: |
| return 42 |
| |
| # Return type passed as a type string; two Series arguments. |
| @pandas_udf("int", PandasUDFType.GROUPED_AGG) |
| def g(x: pandas.core.series.Series, y: pandas.core.series.Series) -> int: |
| return 42 |
| |
| # Variadic Series arguments. |
| @pandas_udf("int", PandasUDFType.GROUPED_AGG) |
| def h(*xs: pandas.core.series.Series) -> int: |
| return 42 |
| |
| # Direct calls with lambdas of the same three arities. The lambdas return an |
| # integer, so the declared type is "int", matching the decorated functions above |
| # ("str" is not a Spark SQL type name). |
| pandas_udf(lambda x: 42, "int", PandasUDFType.GROUPED_AGG) |
| pandas_udf(lambda x, y: 42, "int", PandasUDFType.GROUPED_AGG) |
| pandas_udf(lambda *xs: 42, "int", PandasUDFType.GROUPED_AGG) |
| |
| |
| # Map-in-pandas usage: a generator over an iterator of DataFrames passed to |
| # `mapInPandas` together with a string schema. |
| - case: mapIterUdf |
| main: | |
| from pyspark.sql.session import SparkSession |
| from typing import Iterator |
| import pandas.core.frame |
| |
| spark = SparkSession.builder.getOrCreate() |
| |
| # Consumes batches of DataFrames and yields, for each batch, the rows whose `id` equals 1. |
| def f(batch_iter: Iterator[pandas.core.frame.DataFrame]) -> Iterator[pandas.core.frame.DataFrame]: |
| for pdf in batch_iter: |
| yield pdf[pdf.id == 1] |
| |
| spark.range(1).mapInPandas(f, "id long").show() |
| |
| |
| # Plain `udf` usage: direct calls with and without a return type, then the decorator |
| # with a positional type string, a keyword type string, a keyword DataType, and bare. |
| - case: legacyUDF |
| main: | |
| from pyspark.sql.functions import udf |
| from pyspark.sql.types import IntegerType |
| |
| # Direct call with an explicit return type string. |
| udf(lambda x: x, "string") |
| |
| # Direct call with no return type argument. |
| udf(lambda x: x) |
| |
| # Decorator with the return type as a positional string. |
| @udf("string") |
| def f(x: str) -> str: |
| return x |
| |
| # Decorator with the return type as a keyword string. |
| @udf(returnType="string") |
| def g(x: str) -> str: |
| return x |
| |
| # Decorator with the return type as a keyword DataType instance. |
| @udf(returnType=IntegerType()) |
| def h(x: int) -> int: |
| return x |
| |
| # Bare decorator, no arguments or parentheses. |
| @udf |
| def i(x: str) -> str: |
| return x |
| |
| |
| # Cogrouped usage: a two-DataFrame function passed to `applyInPandas` on the result |
| # of `cogroup`, with the output schema given as a string and as a StructType. |
| - case: cogroupedAggUdf |
| main: | |
| from pyspark.sql.session import SparkSession |
| import pandas.core.frame |
| from pyspark.sql.types import StructType, StructField, LongType |
| |
| spark = SparkSession.builder.getOrCreate() |
| |
| # Two grouped frames built the same way, to be cogrouped below. |
| dfg1 = spark.range(1).groupBy("id") |
| dfg2 = spark.range(1).groupBy("id") |
| |
| # Receives one DataFrame per side of the cogroup and returns the first. |
| def f(x: pandas.core.frame.DataFrame, y: pandas.core.frame.DataFrame) -> pandas.core.frame.DataFrame: |
| return x |
| |
| # Both calls declare the same single long `id` column, so the string form reads |
| # "id long" to agree with the LongType() used in the StructType form. |
| dfg1.cogroup(dfg2).applyInPandas(f, "id long") |
| dfg1.cogroup(dfg2).applyInPandas(f, StructType([StructField("id", LongType())])) |