| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """ |
| A simple example demonstrating Spark SQL data sources. |
| Run with: |
| ./bin/spark-submit examples/src/main/python/sql/datasource.py |
| """ |
| from pyspark.sql import SparkSession |
| # $example on:schema_merging$ |
| from pyspark.sql import Row |
| # $example off:schema_merging$ |
| |
| |
| def generic_file_source_options_example(spark: SparkSession) -> None: |
| # $example on:ignore_corrupt_files$ |
| # Enable ignoring corrupt files via the data source option |
| # dir1/file3.json is corrupt from Parquet's point of view |
| test_corrupt_df0 = spark.read.option("ignoreCorruptFiles", "true")\ |
| .parquet("examples/src/main/resources/dir1/", |
| "examples/src/main/resources/dir1/dir2/") |
| test_corrupt_df0.show() |
| # +-------------+ |
| # | file| |
| # +-------------+ |
| # |file1.parquet| |
| # |file2.parquet| |
| # +-------------+ |
| |
| # Enable ignoring corrupt files via the configuration |
| spark.sql("set spark.sql.files.ignoreCorruptFiles=true") |
| # dir1/file3.json is corrupt from Parquet's point of view |
| test_corrupt_df1 = spark.read.parquet("examples/src/main/resources/dir1/", |
| "examples/src/main/resources/dir1/dir2/") |
| test_corrupt_df1.show() |
| # +-------------+ |
| # | file| |
| # +-------------+ |
| # |file1.parquet| |
| # |file2.parquet| |
| # +-------------+ |
| # $example off:ignore_corrupt_files$ |
| |
| # $example on:recursive_file_lookup$ |
| recursive_loaded_df = spark.read.format("parquet")\ |
| .option("recursiveFileLookup", "true")\ |
| .load("examples/src/main/resources/dir1") |
| recursive_loaded_df.show() |
| # +-------------+ |
| # | file| |
| # +-------------+ |
| # |file1.parquet| |
| # |file2.parquet| |
| # +-------------+ |
| # $example off:recursive_file_lookup$ |
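| # Restore the default value of the configuration changed for the corrupt-files example above |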
| spark.sql("set spark.sql.files.ignoreCorruptFiles=false") |
| |
| # $example on:load_with_path_glob_filter$ |
| df = spark.read.load("examples/src/main/resources/dir1", |
| format="parquet", pathGlobFilter="*.parquet") |
| df.show() |
| # +-------------+ |
| # | file| |
| # +-------------+ |
| # |file1.parquet| |
| # +-------------+ |
| # $example off:load_with_path_glob_filter$ |
| |
| # $example on:load_with_modified_time_filter$ |
| # Only load files modified before 07/01/2050 @ 08:30:00 |
| df = spark.read.load("examples/src/main/resources/dir1", |
| format="parquet", modifiedBefore="2050-07-01T08:30:00") |
| df.show() |
| # +-------------+ |
| # | file| |
| # +-------------+ |
| # |file1.parquet| |
| # +-------------+ |
| # Only load files modified after 06/01/2050 @ 08:30:00 |
| df = spark.read.load("examples/src/main/resources/dir1", |
| format="parquet", modifiedAfter="2050-06-01T08:30:00") |
| df.show() |
| # +-------------+ |
| # | file| |
| # +-------------+ |
| # +-------------+ |
| # $example off:load_with_modified_time_filter$ |
| |
| |
| def basic_datasource_example(spark: SparkSession) -> None: |
| # $example on:generic_load_save_functions$ |
| users_df = spark.read.load("examples/src/main/resources/users.parquet") |
| users_df.select("name", "favorite_color").write.save("namesAndFavColors.parquet") |
| # $example off:generic_load_save_functions$ |
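| |
| # For illustration (not part of the documented example above): save() fails by default if |
| # the target path already exists; a save mode such as "overwrite", set via |
| # DataFrameWriter.mode(), changes that behaviour. |
| users_df.select("name", "favorite_color").write.mode("overwrite").save("namesAndFavColors.parquet") |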
| |
| # $example on:write_partitioning$ |
| users_df = spark.read.load("examples/src/main/resources/users.parquet") |
| users_df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet") |
| # $example off:write_partitioning$ |
| |
| # $example on:write_partition_and_bucket$ |
| users_df = spark.read.parquet("examples/src/main/resources/users.parquet") |
| (users_df.write |
| .partitionBy("favorite_color") |
| .bucketBy(42, "name") |
| .saveAsTable("users_partitioned_bucketed")) |
| # $example off:write_partition_and_bucket$ |
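| |
| # For illustration: the table saved above is registered in the session catalog and can be |
| # read back by name with spark.table(). |
| spark.table("users_partitioned_bucketed").show() |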
| |
| # $example on:manual_load_options$ |
| people_df = spark.read.load("examples/src/main/resources/people.json", format="json") |
| people_df.select("name", "age").write.save("namesAndAges.parquet", format="parquet") |
| # $example off:manual_load_options$ |
| |
| # $example on:manual_load_options_csv$ |
| people_df = spark.read.load( |
| "examples/src/main/resources/people.csv", |
| format="csv", |
| sep=";", |
| inferSchema="true", |
| header="true" |
| ) |
| # $example off:manual_load_options_csv$ |
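| |
| # For illustration: with "inferSchema" enabled, printSchema() shows the column types |
| # inferred from the data rather than treating every column as a string. |
| people_df.printSchema() |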
| |
| # $example on:manual_save_options_orc$ |
| users_df = spark.read.orc("examples/src/main/resources/users.orc") |
| (users_df.write.format("orc") |
| .option("orc.bloom.filter.columns", "favorite_color") |
| .option("orc.dictionary.key.threshold", "1.0") |
| .option("orc.column.encoding.direct", "name") |
| .save("users_with_options.orc")) |
| # $example off:manual_save_options_orc$ |
| |
| # $example on:manual_save_options_parquet$ |
| users_df = spark.read.parquet("examples/src/main/resources/users.parquet") |
| (users_df.write.format("parquet") |
| .option("parquet.bloom.filter.enabled#favorite_color", "true") |
| .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000") |
| .option("parquet.enable.dictionary", "true") |
| .option("parquet.page.write-checksum.enabled", "false") |
| .save("users_with_options.parquet")) |
| # $example off:manual_save_options_parquet$ |
| |
| # $example on:write_sorting_and_bucketing$ |
| people_df = spark.read.json("examples/src/main/resources/people.json") |
| people_df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed") |
| # $example off:write_sorting_and_bucketing$ |
| |
| # $example on:direct_sql$ |
| df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") |
| # $example off:direct_sql$ |
| |
| spark.sql("DROP TABLE IF EXISTS people_bucketed") |
| spark.sql("DROP TABLE IF EXISTS users_partitioned_bucketed") |
| |
| |
| def parquet_example(spark: SparkSession) -> None: |
| # $example on:basic_parquet_example$ |
| peopleDF = spark.read.json("examples/src/main/resources/people.json") |
| |
| # DataFrames can be saved as Parquet files, maintaining the schema information. |
| peopleDF.write.parquet("people.parquet") |
| |
| # Read in the Parquet file created above. |
| # Parquet files are self-describing so the schema is preserved. |
| # The result of loading a parquet file is also a DataFrame. |
| parquetFile = spark.read.parquet("people.parquet") |
| |
| # Parquet files can also be used to create a temporary view that can then be queried with SQL |
| parquetFile.createOrReplaceTempView("parquetFile") |
| teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") |
| teenagers.show() |
| # +------+ |
| # | name| |
| # +------+ |
| # |Justin| |
| # +------+ |
| # $example off:basic_parquet_example$ |
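| |
| # For illustration: the same query expressed with the DataFrame API instead of SQL; |
| # it returns the same single "Justin" row as the query above. |
| parquetFile.where("age BETWEEN 13 AND 19").select("name").show() |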
| |
| |
| def parquet_schema_merging_example(spark: SparkSession) -> None: |
| # $example on:schema_merging$ |
| # spark is from the previous example. |
| # Create a simple DataFrame, stored into a partition directory |
| sc = spark.sparkContext |
| |
| squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6)) |
| .map(lambda i: Row(single=i, double=i ** 2))) |
| squaresDF.write.parquet("data/test_table/key=1") |
| |
| # Create another DataFrame in a new partition directory, |
| # adding a new column and dropping an existing column |
| cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11)) |
| .map(lambda i: Row(single=i, triple=i ** 3))) |
| cubesDF.write.parquet("data/test_table/key=2") |
| |
| # Read the partitioned table |
| mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table") |
| mergedDF.printSchema() |
| |
| # The final schema consists of all 3 columns in the Parquet files together |
| # with the partitioning column that appears in the partition directory paths. |
| # root |
| # |-- double: long (nullable = true) |
| # |-- single: long (nullable = true) |
| # |-- triple: long (nullable = true) |
| # |-- key: integer (nullable = true) |
| # $example off:schema_merging$ |
| |
| |
| def json_dataset_example(spark: SparkSession) -> None: |
| # $example on:json_dataset$ |
| # spark is from the previous example. |
| sc = spark.sparkContext |
| |
| # A JSON dataset is pointed to by path. |
| # The path can be either a single text file or a directory storing text files |
| path = "examples/src/main/resources/people.json" |
| peopleDF = spark.read.json(path) |
| |
| # The inferred schema can be visualized using the printSchema() method |
| peopleDF.printSchema() |
| # root |
| # |-- age: long (nullable = true) |
| # |-- name: string (nullable = true) |
| |
| # Creates a temporary view using the DataFrame |
| peopleDF.createOrReplaceTempView("people") |
| |
| # SQL statements can be run by using the sql method provided by spark |
| teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") |
| teenagerNamesDF.show() |
| # +------+ |
| # | name| |
| # +------+ |
| # |Justin| |
| # +------+ |
| |
| # Alternatively, a DataFrame can be created for a JSON dataset represented by |
| # an RDD[String] storing one JSON object per string |
| jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}'] |
| otherPeopleRDD = sc.parallelize(jsonStrings) |
| otherPeople = spark.read.json(otherPeopleRDD) |
| otherPeople.show() |
| # +---------------+----+ |
| # | address|name| |
| # +---------------+----+ |
| # |[Columbus,Ohio]| Yin| |
| # +---------------+----+ |
| # $example off:json_dataset$ |
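| |
| # For illustration: instead of relying on schema inference, an explicit schema can be |
| # supplied to the reader; the fields below mirror the schema inferred above. |
| from pyspark.sql.types import StructType, StructField, StringType, LongType |
| |
| explicit_schema = StructType([ |
| StructField("age", LongType(), True), |
| StructField("name", StringType(), True) |
| ]) |
| people_with_schema_df = spark.read.schema(explicit_schema).json(path) |
| people_with_schema_df.printSchema() |
| # root |
| # |-- age: long (nullable = true) |
| # |-- name: string (nullable = true) |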
| |
| |
| def csv_dataset_example(spark: SparkSession) -> None: |
| # $example on:csv_dataset$ |
| # spark is from the previous example |
| |
| # A CSV dataset is pointed to by path. |
| # The path can be either a single CSV file or a directory of CSV files |
| path = "examples/src/main/resources/people.csv" |
| |
| df = spark.read.csv(path) |
| df.show() |
| # +------------------+ |
| # | _c0| |
| # +------------------+ |
| # | name;age;job| |
| # |Jorge;30;Developer| |
| # | Bob;32;Developer| |
| # +------------------+ |
| |
| # Read a CSV file with a non-default delimiter; the default delimiter is "," |
| df2 = spark.read.option("delimiter", ";").csv(path) |
| df2.show() |
| # +-----+---+---------+ |
| # | _c0|_c1| _c2| |
| # +-----+---+---------+ |
| # | name|age| job| |
| # |Jorge| 30|Developer| |
| # | Bob| 32|Developer| |
| # +-----+---+---------+ |
| |
| # Read a CSV file with a delimiter and a header |
| df3 = spark.read.option("delimiter", ";").option("header", True).csv(path) |
| df3.show() |
| # +-----+---+---------+ |
| # | name|age| job| |
| # +-----+---+---------+ |
| # |Jorge| 30|Developer| |
| # | Bob| 32|Developer| |
| # +-----+---+---------+ |
| |
| # You can also use options() to set multiple options at once |
| df4 = spark.read.options(delimiter=";", header=True).csv(path) |
| |
| # "output" is a folder which contains multiple csv files and a _SUCCESS file. |
| df3.write.csv("output") |
| |
| # Read all files in a folder; make sure that only CSV files are present in the folder. |
| folderPath = "examples/src/main/resources" |
| df5 = spark.read.csv(folderPath) |
| df5.show() |
| # Wrong schema because non-CSV files are read |
| # +-----------+ |
| # | _c0| |
| # +-----------+ |
| # |238val_238| |
| # | 86val_86| |
| # |311val_311| |
| # | 27val_27| |
| # |165val_165| |
| # +-----------+ |
| |
| # $example off:csv_dataset$ |
| |
| |
| def text_dataset_example(spark: SparkSession) -> None: |
| # $example on:text_dataset$ |
| # spark is from the previous example |
| |
| # A text dataset is pointed to by path. |
| # The path can be either a single text file or a directory of text files |
| path = "examples/src/main/resources/people.txt" |
| |
| df1 = spark.read.text(path) |
| df1.show() |
| # +-----------+ |
| # | value| |
| # +-----------+ |
| # |Michael, 29| |
| # | Andy, 30| |
| # | Justin, 19| |
| # +-----------+ |
| |
| # You can use the 'lineSep' option to define the line separator. |
| # By default, the line separator handles all of `\r`, `\r\n` and `\n`. |
| df2 = spark.read.text(path, lineSep=",") |
| df2.show() |
| # +-----------+ |
| # | value| |
| # +-----------+ |
| # | Michael| |
| # | 29\nAndy| |
| # | 30\nJustin| |
| # | 19\n| |
| # +-----------+ |
| |
| # You can also use the 'wholetext' option to read each input file as a single row. |
| df3 = spark.read.text(path, wholetext=True) |
| df3.show() |
| # +--------------------+ |
| # | value| |
| # +--------------------+ |
| # |Michael, 29\nAndy...| |
| # +--------------------+ |
| |
| # "output" is a folder which contains multiple text files and a _SUCCESS file. |
| df1.write.csv("output") |
| |
| # You can specify the compression format using the 'compression' option. |
| df1.write.text("output_compressed", compression="gzip") |
| |
| # $example off:text_dataset$ |
| |
| |
| def jdbc_dataset_example(spark: SparkSession) -> None: |
| # $example on:jdbc_dataset$ |
| # Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods |
| # Loading data from a JDBC source |
| jdbcDF = spark.read \ |
| .format("jdbc") \ |
| .option("url", "jdbc:postgresql:dbserver") \ |
| .option("dbtable", "schema.tablename") \ |
| .option("user", "username") \ |
| .option("password", "password") \ |
| .load() |
| |
| jdbcDF2 = spark.read \ |
| .jdbc("jdbc:postgresql:dbserver", "schema.tablename", |
| properties={"user": "username", "password": "password"}) |
| |
| # Specifying dataframe column data types on read |
| jdbcDF3 = spark.read \ |
| .format("jdbc") \ |
| .option("url", "jdbc:postgresql:dbserver") \ |
| .option("dbtable", "schema.tablename") \ |
| .option("user", "username") \ |
| .option("password", "password") \ |
| .option("customSchema", "id DECIMAL(38, 0), name STRING") \ |
| .load() |
| |
| # Saving data to a JDBC source |
| jdbcDF.write \ |
| .format("jdbc") \ |
| .option("url", "jdbc:postgresql:dbserver") \ |
| .option("dbtable", "schema.tablename") \ |
| .option("user", "username") \ |
| .option("password", "password") \ |
| .save() |
| |
| jdbcDF2.write \ |
| .jdbc("jdbc:postgresql:dbserver", "schema.tablename", |
| properties={"user": "username", "password": "password"}) |
| |
| # Specifying create table column data types on write |
| jdbcDF.write \ |
| .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \ |
| .jdbc("jdbc:postgresql:dbserver", "schema.tablename", |
| properties={"user": "username", "password": "password"}) |
| # $example off:jdbc_dataset$ |
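| |
| # For illustration: JDBC reads can be parallelized by telling Spark how to partition the |
| # table across executors. The partition column, bounds, and partition count below are |
| # illustrative values for a hypothetical numeric "id" column, not taken from a real schema. |
| jdbcDF4 = spark.read \ |
| .format("jdbc") \ |
| .option("url", "jdbc:postgresql:dbserver") \ |
| .option("dbtable", "schema.tablename") \ |
| .option("user", "username") \ |
| .option("password", "password") \ |
| .option("partitionColumn", "id") \ |
| .option("lowerBound", "1") \ |
| .option("upperBound", "100000") \ |
| .option("numPartitions", "10") \ |
| .load() |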
| |
| |
| def xml_dataset_example(spark: SparkSession) -> None: |
| # $example on:xml_dataset$ |
| # An XML dataset is pointed to by path. |
| # The path can be either a single XML file or a directory of XML files |
| path = "examples/src/main/resources/people.xml" |
| peopleDF = spark.read.option("rowTag", "person").format("xml").load(path) |
| |
| # The inferred schema can be visualized using the printSchema() method |
| peopleDF.printSchema() |
| # root |
| # |-- age: long (nullable = true) |
| # |-- name: string (nullable = true) |
| |
| # Creates a temporary view using the DataFrame |
| peopleDF.createOrReplaceTempView("people") |
| |
| # SQL statements can be run by using the sql method provided by spark |
| teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") |
| teenagerNamesDF.show() |
| # +------+ |
| # | name| |
| # +------+ |
| # |Justin| |
| # +------+ |
| |
| # Alternatively, a DataFrame can be created for an XML dataset represented by |
| # an RDD[String] storing one XML record per string |
| xmlStrings = [""" |
| <person> |
| <name>laglangyue</name> |
| <job>Developer</job> |
| <age>28</age> |
| </person> |
| """] |
| xmlRDD = spark.sparkContext.parallelize(xmlStrings) |
| otherPeople = spark.read \ |
| .option("rowTag", "person") \ |
| .xml(xmlRDD) |
| otherPeople.show() |
| # +---+---------+----------+ |
| # |age| job| name| |
| # +---+---------+----------+ |
| # | 28|Developer|laglangyue| |
| # +---+---------+----------+ |
| # $example off:xml_dataset$ |
| |
| |
| if __name__ == "__main__": |
| spark = SparkSession \ |
| .builder \ |
| .appName("Python Spark SQL data source example") \ |
| .getOrCreate() |
| |
| basic_datasource_example(spark) |
| generic_file_source_options_example(spark) |
| parquet_example(spark) |
| parquet_schema_merging_example(spark) |
| json_dataset_example(spark) |
| csv_dataset_example(spark) |
| text_dataset_example(spark) |
| jdbc_dataset_example(spark) |
| xml_dataset_example(spark) |
| |
| spark.stop() |