| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| # Exercises RDD.toDF on RDDs whose elements are Row, tuple, list and |
| # namedtuple values. Each RDD is converted four ways: with no arguments, |
| # with sampleRatio only, with column names plus sampleRatio, and with an |
| # explicit StructType schema. |
| # NOTE(review): this case has no `out:` section; presumably the type |
| # checker is expected to report no diagnostics - confirm with the runner. |
| - case: toDF |
| main: | |
| from pyspark.sql.types import ( |
| IntegerType, |
| Row, |
| StructType, |
| StringType, |
| StructField, |
| ) |
| from collections import namedtuple |
| from pyspark.sql import SparkSession |
| |
| spark = SparkSession.builder.getOrCreate() |
| sc = spark.sparkContext |
| |
| struct = StructType([ |
| StructField("a", IntegerType()), |
| StructField("b", StringType()) |
| ]) |
| |
| AB = namedtuple("AB", ["a", "b"]) |
| |
| rdd_row = sc.parallelize([Row(a=1, b="foo")]) |
| rdd_row.toDF() |
| rdd_row.toDF(sampleRatio=0.4) |
| rdd_row.toDF(["a", "b"], sampleRatio=0.4) |
| rdd_row.toDF(struct) |
| |
| rdd_tuple = sc.parallelize([(1, "foo")]) |
| rdd_tuple.toDF() |
| rdd_tuple.toDF(sampleRatio=0.4) |
| rdd_tuple.toDF(["a", "b"], sampleRatio=0.4) |
| rdd_tuple.toDF(struct) |
| |
| rdd_list = sc.parallelize([[1, "foo"]]) |
| rdd_list.toDF() |
| rdd_list.toDF(sampleRatio=0.4) |
| rdd_list.toDF(["a", "b"], sampleRatio=0.4) |
| rdd_list.toDF(struct) |
| |
| rdd_named_tuple = sc.parallelize([AB(1, "foo")]) |
| rdd_named_tuple.toDF() |
| rdd_named_tuple.toDF(sampleRatio=0.4) |
| rdd_named_tuple.toDF(["a", "b"], sampleRatio=0.4) |
| rdd_named_tuple.toDF(struct) |
| |
| |
| # Reveals the types inferred for the results of map, flatMap, filter, max, |
| # reduce and aggregateByKey. Each `out:` entry is keyed by the line number |
| # of the matching reveal_type call inside `main`, so adding or removing |
| # lines in `main` requires renumbering `out:`. |
| - case: rddMethods |
| main: | |
| from operator import add |
| from typing import Iterable, Set |
| from pyspark.sql import SparkSession |
| |
| spark = SparkSession.builder.getOrCreate() |
| sc = spark.sparkContext |
| |
| def f1(x: int) -> str: |
| return str(x) |
| |
| reveal_type(sc.range(10).map(f1)) |
| |
| def f2(x: int) -> Iterable[int]: |
| return range(x) |
| |
| reveal_type(sc.range(10).flatMap(f2)) |
| |
| reveal_type(sc.parallelize([("a", 1), ("b", 0)]).filter(lambda x: x[1] != 0)) |
| |
| reveal_type(sc.parallelize([("a", 1), ("b", 0)]).max()) |
| |
| reveal_type(sc.range(10).reduce(add)) |
| |
| def seq_func(xs: Set[str], x: int) -> Set[str]: |
| xs.add(str(x % 11)) |
| return xs |
| |
| def comb_func(xs: Set[str], ys: Set[str]) -> Set[str]: |
| xs.update(ys) |
| return xs |
| |
| zero: Set[str] = set() |
| |
| reveal_type(sc.parallelize([("a", 1)]).aggregateByKey(zero, seq_func, comb_func)) |
| |
| out: | |
| main:11: note: Revealed type is "pyspark.core.rdd.RDD[builtins.str]" |
| main:16: note: Revealed type is "pyspark.core.rdd.RDD[builtins.int]" |
| main:18: note: Revealed type is "pyspark.core.rdd.RDD[tuple[builtins.str, builtins.int]]" |
| main:20: note: Revealed type is "tuple[builtins.str, builtins.int]" |
| main:22: note: Revealed type is "builtins.int" |
| main:34: note: Revealed type is "pyspark.core.rdd.RDD[tuple[builtins.str, builtins.set[builtins.str]]]" |
| |
| # Negative checks: passes callables whose signatures do not match what |
| # map and reduce expect for sc.range(10). The `out:` entries are the |
| # arg-type errors expected at lines 9 and 14 of `main`; they are keyed by |
| # line number, so adding or removing lines in `main` requires updating them. |
| - case: rddMethodsErrors |
| main: | |
| from pyspark.sql import SparkSession |
| |
| spark = SparkSession.builder.getOrCreate() |
| sc = spark.sparkContext |
| |
| def f1(x: str) -> str: |
| return x |
| |
| sc.range(10).map(f1) |
| |
| def f2(x: int) -> str: |
| return str(x) |
| |
| sc.range(10).reduce(f2) |
| |
| out: | |
| main:9: error: Argument 1 to "map" of "RDD" has incompatible type "Callable[[str], str]"; expected "Callable[[int], str]" [arg-type] |
| main:14: error: Argument 1 to "reduce" of "RDD" has incompatible type "Callable[[int], str]"; expected "Callable[[int, int], int]" [arg-type] |