| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """ |
| A simple example demonstrating basic Spark SQL features. |
| Run with: |
| ./bin/spark-submit examples/src/main/python/sql/basic.py |
| """ |
| # $example on:init_session$ |
| from pyspark.sql import SparkSession |
| # $example off:init_session$ |
| |
| # $example on:schema_inferring$ |
| from pyspark.sql import Row |
| # $example off:schema_inferring$ |
| |
| # $example on:programmatic_schema$ |
| # Import data types |
| from pyspark.sql.types import StringType, StructType, StructField |
| # $example off:programmatic_schema$ |
| |
| |
| def basic_df_example(spark: SparkSession) -> None: |
| # $example on:create_df$ |
| # spark is an existing SparkSession |
| df = spark.read.json("examples/src/main/resources/people.json") |
| # Displays the content of the DataFrame to stdout |
| df.show() |
| # +----+-------+ |
| # | age| name| |
| # +----+-------+ |
| # |null|Michael| |
| # | 30| Andy| |
| # | 19| Justin| |
| # +----+-------+ |
| # $example off:create_df$ |
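| # Note: by default spark.read.json expects JSON Lines input, i.e. one |
| # JSON object per line (people.json is in that format). A file holding |
| # a single multi-line JSON document can be read by enabling the |
| # reader's multiLine option, e.g. (hypothetical path): |
| # spark.read.json("path/to/multiline.json", multiLine=True) |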
| |
| # $example on:untyped_ops$ |
| # spark, df are from the previous example |
| # Print the schema in a tree format |
| df.printSchema() |
| # root |
| # |-- age: long (nullable = true) |
| # |-- name: string (nullable = true) |
| |
| # Select only the "name" column |
| df.select("name").show() |
| # +-------+ |
| # | name| |
| # +-------+ |
| # |Michael| |
| # | Andy| |
| # | Justin| |
| # +-------+ |
| |
| # Select everybody, but increment the age by 1 |
| df.select(df['name'], df['age'] + 1).show() |
| # +-------+---------+ |
| # | name|(age + 1)| |
| # +-------+---------+ |
| # |Michael| null| |
| # | Andy| 31| |
| # | Justin| 20| |
| # +-------+---------+ |
| |
| # Select people older than 21 |
| df.filter(df['age'] > 21).show() |
| # +---+----+ |
| # |age|name| |
| # +---+----+ |
| # | 30|Andy| |
| # +---+----+ |
| |
| # Count people by age |
| df.groupBy("age").count().show() |
| # +----+-----+ |
| # | age|count| |
| # +----+-----+ |
| # | 19| 1| |
| # |null| 1| |
| # | 30| 1| |
| # +----+-----+ |
| # $example off:untyped_ops$ |
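| # The same column expressions can be built with pyspark.sql.functions.col, |
| # which references columns by name rather than through the DataFrame |
| # variable. A minimal sketch equivalent to the "age + 1" select above: |
| from pyspark.sql.functions import col |
| df.select(col("name"), (col("age") + 1).alias("age_plus_one")).show() |
| # +-------+------------+ |
| # |   name|age_plus_one| |
| # +-------+------------+ |
| # |Michael|        null| |
| # |   Andy|          31| |
| # | Justin|          20| |
| # +-------+------------+ |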
| |
| # $example on:run_sql$ |
| # Register the DataFrame as a SQL temporary view |
| df.createOrReplaceTempView("people") |
| |
| sqlDF = spark.sql("SELECT * FROM people") |
| sqlDF.show() |
| # +----+-------+ |
| # | age| name| |
| # +----+-------+ |
| # |null|Michael| |
| # | 30| Andy| |
| # | 19| Justin| |
| # +----+-------+ |
| # $example off:run_sql$ |
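| # A temporary view is session-scoped: it is only visible to the |
| # SparkSession that created it and disappears when that session stops. |
| # It can also be dropped explicitly: |
| spark.catalog.dropTempView("people") |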
| |
| # $example on:global_temp_view$ |
| # Register the DataFrame as a global temporary view |
| df.createGlobalTempView("people") |
| |
| # A global temporary view is tied to the system-preserved database `global_temp` |
| spark.sql("SELECT * FROM global_temp.people").show() |
| # +----+-------+ |
| # | age| name| |
| # +----+-------+ |
| # |null|Michael| |
| # | 30| Andy| |
| # | 19| Justin| |
| # +----+-------+ |
| |
| # A global temporary view is shared across sessions |
| spark.newSession().sql("SELECT * FROM global_temp.people").show() |
| # +----+-------+ |
| # | age| name| |
| # +----+-------+ |
| # |null|Michael| |
| # | 30| Andy| |
| # | 19| Justin| |
| # +----+-------+ |
| # $example off:global_temp_view$ |
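| # A global temporary view persists until the Spark application ends. |
| # Calling createGlobalTempView again with an existing view name raises |
| # an error; createOrReplaceGlobalTempView avoids that. The view can |
| # also be dropped explicitly: |
| spark.catalog.dropGlobalTempView("people") |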
| |
| |
| def schema_inference_example(spark: SparkSession) -> None: |
| # $example on:schema_inferring$ |
| sc = spark.sparkContext |
| |
| # Load a text file and convert each line to a Row. |
| lines = sc.textFile("examples/src/main/resources/people.txt") |
| parts = lines.map(lambda l: l.split(",")) |
| people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) |
| |
| # Infer the schema, and register the DataFrame as a table. |
| schemaPeople = spark.createDataFrame(people) |
| schemaPeople.createOrReplaceTempView("people") |
| |
| # SQL can be run over DataFrames that have been registered as tables. |
| teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") |
| |
| # The results of SQL queries are DataFrame objects. |
| # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`. |
| teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect() |
| for name in teenNames: |
| print(name) |
| # Name: Justin |
| # $example off:schema_inferring$ |
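| # The same query expressed through the DataFrame API; Column.between |
| # is inclusive on both ends, so this is a minimal equivalent of the |
| # SQL predicate above: |
| schemaPeople.where(schemaPeople.age.between(13, 19)).select("name").show() |
| # +------+ |
| # |  name| |
| # +------+ |
| # |Justin| |
| # +------+ |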
| |
| |
| def programmatic_schema_example(spark: SparkSession) -> None: |
| # $example on:programmatic_schema$ |
| sc = spark.sparkContext |
| |
| # Load a text file and convert each line to a Row. |
| lines = sc.textFile("examples/src/main/resources/people.txt") |
| parts = lines.map(lambda l: l.split(",")) |
| # Each line is converted to a tuple. |
| people = parts.map(lambda p: (p[0], p[1].strip())) |
| |
| # The schema is encoded in a string. |
| schemaString = "name age" |
| |
| fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] |
| schema = StructType(fields) |
| |
| # Apply the schema to the RDD. |
| schemaPeople = spark.createDataFrame(people, schema) |
| |
| # Create a temporary view using the DataFrame |
| schemaPeople.createOrReplaceTempView("people") |
| |
| # SQL can be run over DataFrames that have been registered as tables. |
| results = spark.sql("SELECT name FROM people") |
| |
| results.show() |
| # +-------+ |
| # | name| |
| # +-------+ |
| # |Michael| |
| # | Andy| |
| # | Justin| |
| # +-------+ |
| # $example off:programmatic_schema$ |
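| # Since Spark 2.3 the schema can also be passed as a DDL-formatted |
| # string, avoiding the explicit StructType construction above. A |
| # minimal sketch reusing the `people` RDD from this example: |
| ddlPeople = spark.createDataFrame(people, "name string, age string") |
| ddlPeople.printSchema() |
| # root |
| # |-- name: string (nullable = true) |
| # |-- age: string (nullable = true) |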
| |
| |
| if __name__ == "__main__": |
| # $example on:init_session$ |
| spark = SparkSession \ |
| .builder \ |
| .appName("Python Spark SQL basic example") \ |
| .config("spark.some.config.option", "some-value") \ |
| .getOrCreate() |
| # $example off:init_session$ |
| |
| basic_df_example(spark) |
| schema_inference_example(spark) |
| programmatic_schema_example(spark) |
| |
| spark.stop() |