| { |
| "cells": [ |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "0b37fb0a", |
| "metadata": {}, |
| "source": [ |
| "# Chapter 1: DataFrames - A view into your structured data" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 1, |
| "id": "edfe9cd6", |
| "metadata": { |
| "tags": [ |
| "remove-cell" |
| ] |
| }, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "Requirement already satisfied: pyspark in /Users/amanda.liu/anaconda3/lib/python3.10/site-packages (3.5.0)\n", |
| "Requirement already satisfied: py4j==0.10.9.7 in /Users/amanda.liu/anaconda3/lib/python3.10/site-packages (from pyspark) (0.10.9.7)\n", |
| "Note: you may need to restart the kernel to use updated packages.\n" |
| ] |
| } |
| ], |
| "source": [ |
| "pip install pyspark" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 2, |
| "id": "43b0e61f", |
| "metadata": { |
| "tags": [ |
| "remove-cell" |
| ] |
| }, |
| "outputs": [ |
| ], |
| "source": [ |
| "from pyspark.sql import SparkSession\n", |
| "\n", |
| "spark = SparkSession \\\n", |
| " .builder \\\n", |
| " .appName(\"Python Spark SQL basic example\") \\\n", |
| " .config(\"spark.some.config.option\", \"some-value\") \\\n", |
| " .getOrCreate()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "ae367125", |
| "metadata": {}, |
| "source": [ |
| "This section introduces the most fundamental data structure in PySpark: the DataFrame.\n", |
| "\n", |
| "A DataFrame is a two-dimensional labeled data structure with columns \n", |
| "of potentially different types. You can think of a DataFrame like a spreadsheet, a SQL table, or a dictionary of series objects. \n", |
| "Apache Spark DataFrames support a rich set of APIs (select columns, filter, join, aggregate, etc.) \n", |
| "that allow you to solve common data analysis problems efficiently.\n", |
| "\n", |
| "Compared to traditional relational databases, Spark DataFrames offer several key advantages for big data processing and analytics:\n", |
| "\n", |
| "- **Distributed computing**: Spark distributes data across multiple nodes in a cluster, allowing for parallel processing of big data\n", |
| "- **In-memory processing**: Spark performs computations in memory, which can be significantly faster than disk-based processing\n", |
| "- **Schema flexibility**: Unlike traditional databases, PySpark DataFrames support schema evolution and dynamic typing.\n", |
| "- **Fault tolerance**: PySpark DataFrames are built on top of Resilient Distributed Dataset (RDDs), which are inherently fault-tolerant. \n", |
| "Spark automatically handles node failures and data replication, ensuring data reliability and integrity.\n", |
| "\n", |
| "A note on RDDs: \n", |
| "Direct use of RDDs are no longer supported on Spark Connect as of Spark 4.0.\n", |
| "Interacting directly with Spark DataFrames uses a unified planning and optimization engine, \n", |
| "allowing us to get nearly identical performance across all supported languages on Databricks (Python, SQL, Scala, and R)." |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "443ebbb1", |
| "metadata": {}, |
| "source": [ |
| "## Create a DataFrame\n", |
| "\n", |
| "There are several ways to create a DataFrame in PySpark." |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "33ebd507", |
| "metadata": {}, |
| "source": [ |
| "### From a list of dictionaries\n", |
| "\n", |
| "The simplest way is to use the createDataFrame() method like so:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 3, |
| "id": "b26403e5", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stderr", |
| "output_type": "stream", |
| "text": [ |
| " \r" |
| ] |
| }, |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+---+--------+\n", |
| "|age| name|\n", |
| "+---+--------+\n", |
| "| 30| John D.|\n", |
| "| 25|Alice G.|\n", |
| "| 35| Bob T.|\n", |
| "| 28| Eve A.|\n", |
| "+---+--------+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "employees = [{\"name\": \"John D.\", \"age\": 30},\n", |
| " {\"name\": \"Alice G.\", \"age\": 25},\n", |
| " {\"name\": \"Bob T.\", \"age\": 35},\n", |
| " {\"name\": \"Eve A.\", \"age\": 28}]\n", |
| "\n", |
| "# Create a DataFrame containing the employees data\n", |
| "df = spark.createDataFrame(employees)\n", |
| "df.show()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "fe7cd086", |
| "metadata": {}, |
| "source": [ |
| "### From a local file\n", |
| "\n", |
| "We can also create a DataFrame from a local CSV file:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 4, |
| "id": "b421b87d", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+-----------+-----------------+-----------------+\n", |
| "|Employee ID| Role| Location|\n", |
| "+-----------+-----------------+-----------------+\n", |
| "| 19238| Data Analyst| Seattle, WA|\n", |
| "| 19239|Software Engineer| Seattle, WA|\n", |
| "| 19240| IT Specialist| Seattle, WA|\n", |
| "| 19241| Data Analyst| New York, NY|\n", |
| "| 19242| Recruiter|San Francisco, CA|\n", |
| "| 19243| Product Manager| New York, NY|\n", |
| "+-----------+-----------------+-----------------+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df = spark.read.csv(\"../data/employees.csv\", header=True, inferSchema=True)\n", |
| "df.show()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "9e8a5246", |
| "metadata": {}, |
| "source": [ |
| "Or from a local JSON file:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 5, |
| "id": "4a2d7fe9", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+-----------+-----------------+-----------------+\n", |
| "|Employee ID| Location| Role|\n", |
| "+-----------+-----------------+-----------------+\n", |
| "| 19238| Seattle, WA| Data Analyst|\n", |
| "| 19239| Seattle, WA|Software Engineer|\n", |
| "| 19240| Seattle, WA| IT Specialist|\n", |
| "| 19241| New York, NY| Data Analyst|\n", |
| "| 19242|San Francisco, CA| Recruiter|\n", |
| "| 19243| New York, NY| Product Manager|\n", |
| "+-----------+-----------------+-----------------+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df = spark.read.option(\"multiline\",\"true\").json(\"../data/employees.json\")\n", |
| "df.show()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "da022789", |
| "metadata": {}, |
| "source": [ |
| "### From an existing DataFrame\n", |
| "\n", |
| "We can even create a DataFrame from another existing DataFrame, by selecting certain columns:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 6, |
| "id": "d494632d", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "employees = [\n", |
| " {\"name\": \"John D.\", \"age\": 30, \"department\": \"HR\"},\n", |
| " {\"name\": \"Alice G.\", \"age\": 25, \"department\": \"Finance\"},\n", |
| " {\"name\": \"Bob T.\", \"age\": 35, \"department\": \"IT\"},\n", |
| " {\"name\": \"Eve A.\", \"age\": 28, \"department\": \"Marketing\"}\n", |
| "]\n", |
| "df = spark.createDataFrame(employees)\n", |
| "\n", |
| "# Select only the name and age columns\n", |
| "new_df = df.select(\"name\", \"age\")" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "a330bfa9", |
| "metadata": {}, |
| "source": [ |
| "### From a table\n", |
| "\n", |
| "If you have an existing table `table_name` in your Spark environment, you can create a DataFrame like this:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 7, |
| "id": "14ad6034", |
| "metadata": { |
| "tags": [ |
| "remove-output" |
| ] |
| }, |
| "outputs": [ |
| ], |
| "source": [ |
| "df = spark.read.table(\"table_name\")" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "892871a6", |
| "metadata": {}, |
| "source": [ |
| "### From a database\n", |
| "\n", |
| "If your table is in a database, you can use JDBC to read the table into a DataFrame.\n" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 9, |
| "id": "a40a5d54", |
| "metadata": { |
| "tags": [ |
| "remove-output" |
| ] |
| }, |
| "outputs": [ |
| ], |
| "source": [ |
| "url = \"jdbc:mysql://localhost:3306/mydatabase\"\n", |
| "table = \"employees\"\n", |
| "properties = {\n", |
| " \"user\": \"username\",\n", |
| " \"password\": \"password\"\n", |
| "}\n", |
| "\n", |
| "# Read table into DataFrame\n", |
| "df = spark.read.jdbc(url=url, table=table, properties=properties)" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "91c58617", |
| "metadata": {}, |
| "source": [ |
| "## View the DataFrame\n", |
| "\n", |
| "We can use PySpark to view and interact with our DataFrame." |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "25faacd7", |
| "metadata": {}, |
| "source": [ |
| "### Display the DataFrame\n", |
| "\n", |
| "`df.show()` displays a basic visualization of the DataFrame's contents. From our above `createDataFrame()` example:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 10, |
| "id": "6a91ef12", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "employees = [{\"name\": \"John D.\", \"age\": 30},\n", |
| " {\"name\": \"Alice G.\", \"age\": 25},\n", |
| " {\"name\": \"Bob T.\", \"age\": 35},\n", |
| " {\"name\": \"Eve A.\", \"age\": 28}]\n", |
| "\n", |
| "# Create a DataFrame containing the employees data\n", |
| "df = spark.createDataFrame(employees)" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 11, |
| "id": "c2ce1c82", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+---+--------+\n", |
| "|age| name|\n", |
| "+---+--------+\n", |
| "| 30| John D.|\n", |
| "| 25|Alice G.|\n", |
| "| 35| Bob T.|\n", |
| "| 28| Eve A.|\n", |
| "+---+--------+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df.show()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "c6ee7c70", |
| "metadata": {}, |
| "source": [ |
| "`df.show()` has 3 optional arguments: `n`, `truncate`, and `vertical`.\n", |
| "\n", |
| "By default, `df.show()` displays up to the first 20 rows of the DataFrame. \n", |
| "We can control the number of rows displayed by passing an argument to the show() method:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 12, |
| "id": "01417e41", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+---+--------+\n", |
| "|age| name|\n", |
| "+---+--------+\n", |
| "| 30| John D.|\n", |
| "| 25|Alice G.|\n", |
| "+---+--------+\n", |
| "only showing top 2 rows\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df.show(n=2)" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "605cd26c", |
| "metadata": {}, |
| "source": [ |
| "The truncate argument controls the length of displayed column values (default value is 20):" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 13, |
| "id": "b01d5223", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+---+----+\n", |
| "|age|name|\n", |
| "+---+----+\n", |
| "| 30| Joh|\n", |
| "| 25| Ali|\n", |
| "| 35| Bob|\n", |
| "| 28| Eve|\n", |
| "+---+----+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df.show(truncate=3)" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "e9bedaa6", |
| "metadata": {}, |
| "source": [ |
| "If we set `vertical` to True, the DataFrame will be displayed vertically with one line per value:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 14, |
| "id": "267facfc", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "-RECORD 0--------\n", |
| " age | 30 \n", |
| " name | John D. \n", |
| "-RECORD 1--------\n", |
| " age | 25 \n", |
| " name | Alice G. \n", |
| "-RECORD 2--------\n", |
| " age | 35 \n", |
| " name | Bob T. \n", |
| "-RECORD 3--------\n", |
| " age | 28 \n", |
| " name | Eve A. \n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df.show(vertical=True)" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "4de10f68", |
| "metadata": {}, |
| "source": [ |
| "### Print the DataFrame schema\n", |
| "\n", |
| "We can view information about the DataFrame schema using the `printSchema()` method:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 15, |
| "id": "27481fa9", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "root\n", |
| " |-- age: long (nullable = true)\n", |
| " |-- name: string (nullable = true)\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df.printSchema()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "8e90d081", |
| "metadata": {}, |
| "source": [ |
| "## DataFrame Manipulation\n", |
| "\n", |
| "Let's look at some ways we can transform our DataFrames.\n", |
| "\n", |
| "For more detailed information, please see the section about data manipulation, ", |
| "[Chapter 3: Function Junction - Data manipulation with PySpark](https://spark.apache.org/docs/latest/api/python/user_guide/dataprep.html).\n", |
| "\n", |
| "\n", |
| "### Rename columns\n", |
| "\n", |
| "We can rename DataFrame columns using the `withColumnRenamed()` method:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 16, |
| "id": "65d6dfcb", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+---+--------+\n", |
| "|age| name|\n", |
| "+---+--------+\n", |
| "| 30| John D.|\n", |
| "| 25|Alice G.|\n", |
| "| 35| Bob T.|\n", |
| "| 28| Eve A.|\n", |
| "+---+--------+\n", |
| "\n", |
| "+---+---------+\n", |
| "|age|full_name|\n", |
| "+---+---------+\n", |
| "| 30| John D.|\n", |
| "| 25| Alice G.|\n", |
| "| 35| Bob T.|\n", |
| "| 28| Eve A.|\n", |
| "+---+---------+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df.show()\n", |
| "df2 = df.withColumnRenamed(\"name\", \"full_name\")\n", |
| "df2.show()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "10f3c03f", |
| "metadata": {}, |
| "source": [ |
| "### Filter rows\n", |
| "\n", |
| "We can filter for employees within a certain age range.\n", |
| "The following `df.filter` will create a new DataFrame with rows that match our age condition:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 17, |
| "id": "af133309", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+---+-------+\n", |
| "|age| name|\n", |
| "+---+-------+\n", |
| "| 30|John D.|\n", |
| "| 28| Eve A.|\n", |
| "+---+-------+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "filtered_df = df.filter((df[\"age\"] > 26) & (df[\"age\"] < 32))\n", |
| "filtered_df.show()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "c49ea696", |
| "metadata": {}, |
| "source": [ |
| "We can also use `df.where` to get the same result:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 18, |
| "id": "a29a0719", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+---+-------+\n", |
| "|age| name|\n", |
| "+---+-------+\n", |
| "| 30|John D.|\n", |
| "| 28| Eve A.|\n", |
| "+---+-------+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "where_df = df.where((df[\"age\"] > 26) & (df[\"age\"] < 32))\n", |
| "where_df.show()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "b6d1026a", |
| "metadata": {}, |
| "source": [ |
| "## DataFrames vs. Tables\n", |
| "A DataFrame is an immutable distributed collection of data, only available in the current Spark session.\n", |
| "\n", |
| "A table is a persistent data structure that can be accessed across multiple Spark sessions.\n", |
| "\n", |
| "If you wish to promote a DataFrame to a table, you can use the `createOrReplaceTempView()` method:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 19, |
| "id": "778ad9c5", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "df.createOrReplaceTempView(\"employees\")" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "0a448326", |
| "metadata": {}, |
| "source": [ |
| "Note that the lifetime of this temporary table is tied to the SparkSession that was used to create this DataFrame.\n", |
| "To persist the table beyond this Spark session, you will need to save it to persistent storage." |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "bd253f06", |
| "metadata": {}, |
| "source": [ |
| "## Save DataFrame to Persistent Storage\n", |
| "\n", |
| "There are several ways to save a DataFrame to persistent storage in PySpark.\n", |
| "For more detailed information about saving data locally, see ", |
| "[Chapter 7: Load and Behold - Data loading, storage, file formats](https://spark.apache.org/docs/latest/api/python/user_guide/loadandbehold.html).\n" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "b72663f5", |
| "metadata": {}, |
| "source": [ |
| "### Save to file-based data source\n", |
| "\n", |
| "For file-based data source (text, parquet, json, etc.), you can specify a custom table path like so: " |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 20, |
| "id": "714d52bd", |
| "metadata": { |
| "tags": [ |
| "remove-output" |
| ] |
| }, |
| "outputs": [], |
| "source": [ |
| "df.write.option(\"path\", \"../dataout\").saveAsTable(\"dataframes_savetable_example\")" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "fcc3fc81", |
| "metadata": {}, |
| "source": [ |
| "Even if the table is dropped, the custom table path and table data will still be there. \n", |
| "\n", |
| "If no custom table path is specified, Spark will write data to a default table path under the warehouse directory. \n", |
| "When the table is dropped, the default table path will be removed too." |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "c98e0afb", |
| "metadata": {}, |
| "source": [ |
| "### Save to Hive metastore\n", |
| "To save to Hive metastore, you can use the following:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 21, |
| "id": "00c35126", |
| "metadata": { |
| "tags": [ |
| "remove-output" |
| ] |
| }, |
| "outputs": [ |
| ], |
| "source": [ |
| "df.write().mode(\"overwrite\").saveAsTable(\"schemaName.tableName\")" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Native DataFrame Plotting\n", |
| "\n", |
| "PySpark supports native plotting, allowing users to visualize data directly from PySpark DataFrames.\n", |
| "\n", |
| "The user interacts with PySpark Plotting by calling the `plot` property on a PySpark DataFrame and specifying the desired type of plot, either as a submethod or by setting the `kind` parameter. For instance:\n", |
| "\n", |
| "`df.plot.line(x=\"category\", y=\"int_val\")`\n", |
| "\n", |
| "or equivalently:\n", |
| "\n", |
| "`df.plot(kind=\"line\", x=\"category\", y=\"int_val\")`\n", |
| "\n", |
| "The feature is powered by [Plotly](https://plotly.com/python/) as the default visualization backend, offering rich, interactive plotting capabilities, while native [pandas](https://pandas.pydata.org/) is used internally to process data for most plots.\n" |
| ] |
| } |
| ], |
| "metadata": { |
| "celltoolbar": "Tags", |
| "kernelspec": { |
| "display_name": "", |
| "language": "python", |
| "name": "" |
| }, |
| "language_info": { |
| "codemirror_mode": { |
| "name": "ipython", |
| "version": 3 |
| }, |
| "file_extension": ".py", |
| "mimetype": "text/x-python", |
| "name": "python", |
| "nbconvert_exporter": "python", |
| "pygments_lexer": "ipython3", |
| "version": "3.11.4" |
| } |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 5 |
| } |
| |