| { |
| "cells": [ |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "# Quickstart: DataFrame\n", |
| "\n", |
| "This is a short introduction and quickstart for the PySpark DataFrame API. PySpark DataFrames are lazily evaluated. They are implemented on top of [RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#overview)s. When Spark [transforms](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations) data, it does not immediately compute the transformation but plans how to compute later. When [actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions) such as `collect()` are explicitly called, the computation starts.\n", |
| "This notebook shows the basic usages of the DataFrame, geared mainly for new users. You can run the latest version of these examples by yourself in 'Live Notebook: DataFrame' at [the quickstart page](https://spark.apache.org/docs/latest/api/python/getting_started/index.html).\n", |
| "\n", |
| "There is also other useful information in Apache Spark documentation site, see the latest version of [Spark SQL and DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html), [RDD Programming Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html), [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html), [Spark Streaming Programming Guide](https://spark.apache.org/docs/latest/streaming-programming-guide.html) and [Machine Learning Library (MLlib) Guide](https://spark.apache.org/docs/latest/ml-guide.html).\n", |
| "\n", |
| "PySpark applications start with initializing `SparkSession` which is the entry point of PySpark as below. In case of running it in PySpark shell via <code>pyspark</code> executable, the shell automatically creates the session in the variable <code>spark</code> for users." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 1, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "from pyspark.sql import SparkSession\n", |
| "\n", |
| "spark = SparkSession.builder.getOrCreate()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## DataFrame Creation\n", |
| "\n", |
| "A PySpark DataFrame can be created via `pyspark.sql.SparkSession.createDataFrame` typically by passing a list of lists, tuples, dictionaries and `pyspark.sql.Row`s, a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) and an RDD consisting of such a list.\n", |
| "`pyspark.sql.SparkSession.createDataFrame` takes the `schema` argument to specify the schema of the DataFrame. When it is omitted, PySpark infers the corresponding schema by taking a sample from the data.\n", |
| "\n", |
| "Firstly, you can create a PySpark DataFrame from a list of rows" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 2, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/plain": [ |
| "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]" |
| ] |
| }, |
| "execution_count": 2, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "from datetime import datetime, date\n", |
| "import pandas as pd\n", |
| "from pyspark.sql import Row\n", |
| "\n", |
| "df = spark.createDataFrame([\n", |
| " Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),\n", |
| " Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),\n", |
| " Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))\n", |
| "])\n", |
| "df" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "Create a PySpark DataFrame with an explicit schema." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 3, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/plain": [ |
| "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]" |
| ] |
| }, |
| "execution_count": 3, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "df = spark.createDataFrame([\n", |
| " (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),\n", |
| " (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),\n", |
| " (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))\n", |
| "], schema='a long, b double, c string, d date, e timestamp')\n", |
| "df" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "Create a PySpark DataFrame from a pandas DataFrame" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 4, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/plain": [ |
| "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]" |
| ] |
| }, |
| "execution_count": 4, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "pandas_df = pd.DataFrame({\n", |
| " 'a': [1, 2, 3],\n", |
| " 'b': [2., 3., 4.],\n", |
| " 'c': ['string1', 'string2', 'string3'],\n", |
| " 'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],\n", |
| " 'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]\n", |
| "})\n", |
| "df = spark.createDataFrame(pandas_df)\n", |
| "df" |
| ] |
| }, |
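| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "Finally, as a minimal sketch of the RDD case mentioned above, you can create a PySpark DataFrame from an RDD of tuples. This sketch assumes the `spark` session and the imports from the previous cells." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Sketch: parallelize a list of tuples into an RDD and pass it to createDataFrame.\n", |
| "rdd = spark.sparkContext.parallelize([\n", |
| "    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),\n", |
| "    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),\n", |
| "    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))\n", |
| "])\n", |
| "df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])\n", |
| "df" |
| ] |
| }, |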
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "The DataFrames created above all have the same results and schema." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 6, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+---+---+-------+----------+-------------------+\n", |
| "| a| b| c| d| e|\n", |
| "+---+---+-------+----------+-------------------+\n", |
| "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n", |
| "| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|\n", |
| "| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|\n", |
| "+---+---+-------+----------+-------------------+\n", |
| "\n", |
| "root\n", |
| " |-- a: long (nullable = true)\n", |
| " |-- b: double (nullable = true)\n", |
| " |-- c: string (nullable = true)\n", |
| " |-- d: date (nullable = true)\n", |
| " |-- e: timestamp (nullable = true)\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "# All DataFrames above result same.\n", |
| "df.show()\n", |
| "df.printSchema()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Viewing Data\n", |
| "\n", |
| "The top rows of a DataFrame can be displayed using `DataFrame.show()`." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 7, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+---+---+-------+----------+-------------------+\n", |
| "| a| b| c| d| e|\n", |
| "+---+---+-------+----------+-------------------+\n", |
| "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n", |
| "+---+---+-------+----------+-------------------+\n", |
| "only showing top 1 row\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df.show(1)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "Alternatively, you can enable `spark.sql.repl.eagerEval.enabled` configuration for the eager evaluation of PySpark DataFrame in notebooks such as Jupyter. The number of rows to show can be controlled via `spark.sql.repl.eagerEval.maxNumRows` configuration." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 8, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/html": [ |
| "<table border='1'>\n", |
| "<tr><th>a</th><th>b</th><th>c</th><th>d</th><th>e</th></tr>\n", |
| "<tr><td>1</td><td>2.0</td><td>string1</td><td>2000-01-01</td><td>2000-01-01 12:00:00</td></tr>\n", |
| "<tr><td>2</td><td>3.0</td><td>string2</td><td>2000-02-01</td><td>2000-01-02 12:00:00</td></tr>\n", |
| "<tr><td>3</td><td>4.0</td><td>string3</td><td>2000-03-01</td><td>2000-01-03 12:00:00</td></tr>\n", |
| "</table>\n" |
| ], |
| "text/plain": [ |
| "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]" |
| ] |
| }, |
| "execution_count": 8, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "spark.conf.set('spark.sql.repl.eagerEval.enabled', True)\n", |
| "df" |
| ] |
| }, |
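| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "As a minimal sketch of the row limit mentioned above, `spark.sql.repl.eagerEval.maxNumRows` can be set the same way; the value 2 here is just an illustrative choice." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Sketch: limit the number of rows rendered by eager evaluation (2 is an arbitrary example value).\n", |
| "spark.conf.set('spark.sql.repl.eagerEval.maxNumRows', 2)\n", |
| "df" |
| ] |
| }, |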
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "The rows can also be shown vertically. This is useful when rows are too long to show horizontally." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 9, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "-RECORD 0------------------\n", |
| " a | 1 \n", |
| " b | 2.0 \n", |
| " c | string1 \n", |
| " d | 2000-01-01 \n", |
| " e | 2000-01-01 12:00:00 \n", |
| "only showing top 1 row\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df.show(1, vertical=True)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "You can see the DataFrame's schema and column names as follows:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 10, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/plain": [ |
| "['a', 'b', 'c', 'd', 'e']" |
| ] |
| }, |
| "execution_count": 10, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "df.columns" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 11, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "root\n", |
| " |-- a: long (nullable = true)\n", |
| " |-- b: double (nullable = true)\n", |
| " |-- c: string (nullable = true)\n", |
| " |-- d: date (nullable = true)\n", |
| " |-- e: timestamp (nullable = true)\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df.printSchema()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "Show the summary of the DataFrame" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 12, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+-------+---+---+-------+\n", |
| "|summary| a| b| c|\n", |
| "+-------+---+---+-------+\n", |
| "| count| 3| 3| 3|\n", |
| "| mean|2.0|3.0| null|\n", |
| "| stddev|1.0|1.0| null|\n", |
| "| min| 1|2.0|string1|\n", |
| "| max| 3|4.0|string3|\n", |
| "+-------+---+---+-------+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df.select(\"a\", \"b\", \"c\").describe().show()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "`DataFrame.collect()` collects the distributed data to the driver side as the local data in Python. Note that this can throw an out-of-memory error when the dataset is too large to fit in the driver side because it collects all the data from executors to the driver side." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 13, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/plain": [ |
| "[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),\n", |
| " Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),\n", |
| " Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]" |
| ] |
| }, |
| "execution_count": 13, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "df.collect()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "In order to avoid throwing an out-of-memory exception, use `DataFrame.take()` or `DataFrame.tail()`." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 14, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/plain": [ |
| "[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]" |
| ] |
| }, |
| "execution_count": 14, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "df.take(1)" |
| ] |
| }, |
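| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "As a minimal sketch of the other option mentioned above, `DataFrame.tail()` returns the last rows instead of the first ones." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Sketch: collect only the last row to the driver.\n", |
| "df.tail(1)" |
| ] |
| }, |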
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "PySpark DataFrame also provides the conversion back to a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) to leverage pandas API. Note that `toPandas` also collects all data into the driver side that can easily cause an out-of-memory-error when the data is too large to fit into the driver side." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 15, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/html": [ |
| "<div>\n", |
| "<style scoped>\n", |
| " .dataframe tbody tr th:only-of-type {\n", |
| " vertical-align: middle;\n", |
| " }\n", |
| "\n", |
| " .dataframe tbody tr th {\n", |
| " vertical-align: top;\n", |
| " }\n", |
| "\n", |
| " .dataframe thead th {\n", |
| " text-align: right;\n", |
| " }\n", |
| "</style>\n", |
| "<table border=\"1\" class=\"dataframe\">\n", |
| " <thead>\n", |
| " <tr style=\"text-align: right;\">\n", |
| " <th></th>\n", |
| " <th>a</th>\n", |
| " <th>b</th>\n", |
| " <th>c</th>\n", |
| " <th>d</th>\n", |
| " <th>e</th>\n", |
| " </tr>\n", |
| " </thead>\n", |
| " <tbody>\n", |
| " <tr>\n", |
| " <th>0</th>\n", |
| " <td>1</td>\n", |
| " <td>2.0</td>\n", |
| " <td>string1</td>\n", |
| " <td>2000-01-01</td>\n", |
| " <td>2000-01-01 12:00:00</td>\n", |
| " </tr>\n", |
| " <tr>\n", |
| " <th>1</th>\n", |
| " <td>2</td>\n", |
| " <td>3.0</td>\n", |
| " <td>string2</td>\n", |
| " <td>2000-02-01</td>\n", |
| " <td>2000-01-02 12:00:00</td>\n", |
| " </tr>\n", |
| " <tr>\n", |
| " <th>2</th>\n", |
| " <td>3</td>\n", |
| " <td>4.0</td>\n", |
| " <td>string3</td>\n", |
| " <td>2000-03-01</td>\n", |
| " <td>2000-01-03 12:00:00</td>\n", |
| " </tr>\n", |
| " </tbody>\n", |
| "</table>\n", |
| "</div>" |
| ], |
| "text/plain": [ |
| " a b c d e\n", |
| "0 1 2.0 string1 2000-01-01 2000-01-01 12:00:00\n", |
| "1 2 3.0 string2 2000-02-01 2000-01-02 12:00:00\n", |
| "2 3 4.0 string3 2000-03-01 2000-01-03 12:00:00" |
| ] |
| }, |
| "execution_count": 15, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "df.toPandas()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Selecting and Accessing Data\n", |
| "\n", |
| "PySpark DataFrame is lazily evaluated and simply selecting a column does not trigger the computation but it returns a `Column` instance." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 16, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/plain": [ |
| "Column<b'a'>" |
| ] |
| }, |
| "execution_count": 16, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "df.a" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "In fact, most of column-wise operations return `Column`s." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 17, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/plain": [ |
| "True" |
| ] |
| }, |
| "execution_count": 17, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "from pyspark.sql import Column\n", |
| "from pyspark.sql.functions import upper\n", |
| "\n", |
| "type(df.c) == type(upper(df.c)) == type(df.c.isNull())" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "These `Column`s can be used to select the columns from a DataFrame. For example, `DataFrame.select()` takes the `Column` instances that returns another DataFrame." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 18, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+-------+\n", |
| "| c|\n", |
| "+-------+\n", |
| "|string1|\n", |
| "|string2|\n", |
| "|string3|\n", |
| "+-------+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df.select(df.c).show()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "Assign new `Column` instance." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 19, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+---+---+-------+----------+-------------------+-------+\n", |
| "| a| b| c| d| e|upper_c|\n", |
| "+---+---+-------+----------+-------------------+-------+\n", |
| "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|\n", |
| "| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|\n", |
| "| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|\n", |
| "+---+---+-------+----------+-------------------+-------+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df.withColumn('upper_c', upper(df.c)).show()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "To select a subset of rows, use `DataFrame.filter()`." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 20, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+---+---+-------+----------+-------------------+\n", |
| "| a| b| c| d| e|\n", |
| "+---+---+-------+----------+-------------------+\n", |
| "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n", |
| "+---+---+-------+----------+-------------------+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df.filter(df.a == 1).show()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Applying a Function\n", |
| "\n", |
| "PySpark supports various UDFs and APIs to allow users to execute Python native functions. See also the latest [Pandas UDFs](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs) and [Pandas Function APIs](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#pandas-function-apis). For instance, the example below allows users to directly use the APIs in [a pandas Series](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.html) within Python native function." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 21, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+------------------+\n", |
| "|pandas_plus_one(a)|\n", |
| "+------------------+\n", |
| "| 2|\n", |
| "| 3|\n", |
| "| 4|\n", |
| "+------------------+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "import pandas as pd\n", |
| "from pyspark.sql.functions import pandas_udf\n", |
| "\n", |
| "@pandas_udf('long')\n", |
| "def pandas_plus_one(series: pd.Series) -> pd.Series:\n", |
| " # Simply plus one by using pandas Series.\n", |
| " return series + 1\n", |
| "\n", |
| "df.select(pandas_plus_one(df.a)).show()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "Another example is `DataFrame.mapInPandas` which allows users directly use the APIs in a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) without any restrictions such as the result length." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 22, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+---+---+-------+----------+-------------------+\n", |
| "| a| b| c| d| e|\n", |
| "+---+---+-------+----------+-------------------+\n", |
| "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n", |
| "+---+---+-------+----------+-------------------+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "def pandas_filter_func(iterator):\n", |
| " for pandas_df in iterator:\n", |
| " yield pandas_df[pandas_df.a == 1]\n", |
| "\n", |
| "df.mapInPandas(pandas_filter_func, schema=df.schema).show()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Grouping Data\n", |
| "\n", |
| "PySpark DataFrame also provides a way of handling grouped data by using the common approach, split-apply-combine strategy.\n", |
| "It groups the data by a certain condition applies a function to each group and then combines them back to the DataFrame." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 23, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+-----+------+---+---+\n", |
| "|color| fruit| v1| v2|\n", |
| "+-----+------+---+---+\n", |
| "| red|banana| 1| 10|\n", |
| "| blue|banana| 2| 20|\n", |
| "| red|carrot| 3| 30|\n", |
| "| blue| grape| 4| 40|\n", |
| "| red|carrot| 5| 50|\n", |
| "|black|carrot| 6| 60|\n", |
| "| red|banana| 7| 70|\n", |
| "| red| grape| 8| 80|\n", |
| "+-----+------+---+---+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df = spark.createDataFrame([\n", |
| " ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],\n", |
| " ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],\n", |
| " ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])\n", |
| "df.show()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "Grouping and then applying the `avg()` function to the resulting groups." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 24, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+-----+-------+-------+\n", |
| "|color|avg(v1)|avg(v2)|\n", |
| "+-----+-------+-------+\n", |
| "| red| 4.8| 48.0|\n", |
| "|black| 6.0| 60.0|\n", |
| "| blue| 3.0| 30.0|\n", |
| "+-----+-------+-------+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df.groupby('color').avg().show()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "You can also apply a Python native function against each group by using pandas API." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 25, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+-----+------+---+---+\n", |
| "|color| fruit| v1| v2|\n", |
| "+-----+------+---+---+\n", |
| "| red|banana| -3| 10|\n", |
| "| red|carrot| -1| 30|\n", |
| "| red|carrot| 0| 50|\n", |
| "| red|banana| 2| 70|\n", |
| "| red| grape| 3| 80|\n", |
| "|black|carrot| 0| 60|\n", |
| "| blue|banana| -1| 20|\n", |
| "| blue| grape| 1| 40|\n", |
| "+-----+------+---+---+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "def plus_mean(pandas_df):\n", |
| " return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())\n", |
| "\n", |
| "df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "Co-grouping and applying a function." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 26, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+--------+---+---+---+\n", |
| "| time| id| v1| v2|\n", |
| "+--------+---+---+---+\n", |
| "|20000101| 1|1.0| x|\n", |
| "|20000102| 1|3.0| x|\n", |
| "|20000101| 2|2.0| y|\n", |
| "|20000102| 2|4.0| y|\n", |
| "+--------+---+---+---+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df1 = spark.createDataFrame(\n", |
| " [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],\n", |
| " ('time', 'id', 'v1'))\n", |
| "\n", |
| "df2 = spark.createDataFrame(\n", |
| " [(20000101, 1, 'x'), (20000101, 2, 'y')],\n", |
| " ('time', 'id', 'v2'))\n", |
| "\n", |
| "def merge_ordered(l, r):\n", |
| " return pd.merge_ordered(l, r)\n", |
| "\n", |
| "df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(\n", |
| " merge_ordered, schema='time int, id int, v1 double, v2 string').show()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Getting Data In/Out\n", |
| "\n", |
| "CSV is straightforward and easy to use. Parquet and ORC are efficient and compact file formats to read and write faster.\n", |
| "\n", |
| "There are many other data sources available in PySpark such as JDBC, text, binaryFile, Avro, etc. See also the latest [Spark SQL, DataFrames and Datasets Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html) in Apache Spark documentation." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "### CSV" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 27, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+-----+------+---+---+\n", |
| "|color| fruit| v1| v2|\n", |
| "+-----+------+---+---+\n", |
| "| red|banana| 1| 10|\n", |
| "| blue|banana| 2| 20|\n", |
| "| red|carrot| 3| 30|\n", |
| "| blue| grape| 4| 40|\n", |
| "| red|carrot| 5| 50|\n", |
| "|black|carrot| 6| 60|\n", |
| "| red|banana| 7| 70|\n", |
| "| red| grape| 8| 80|\n", |
| "+-----+------+---+---+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df.write.csv('foo.csv', header=True)\n", |
| "spark.read.csv('foo.csv', header=True).show()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "### Parquet" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 28, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+-----+------+---+---+\n", |
| "|color| fruit| v1| v2|\n", |
| "+-----+------+---+---+\n", |
| "| red|banana| 1| 10|\n", |
| "| blue|banana| 2| 20|\n", |
| "| red|carrot| 3| 30|\n", |
| "| blue| grape| 4| 40|\n", |
| "| red|carrot| 5| 50|\n", |
| "|black|carrot| 6| 60|\n", |
| "| red|banana| 7| 70|\n", |
| "| red| grape| 8| 80|\n", |
| "+-----+------+---+---+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df.write.parquet('bar.parquet')\n", |
| "spark.read.parquet('bar.parquet').show()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "### ORC" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 29, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+-----+------+---+---+\n", |
| "|color| fruit| v1| v2|\n", |
| "+-----+------+---+---+\n", |
| "| red|banana| 1| 10|\n", |
| "| blue|banana| 2| 20|\n", |
| "| red|carrot| 3| 30|\n", |
| "| blue| grape| 4| 40|\n", |
| "| red|carrot| 5| 50|\n", |
| "|black|carrot| 6| 60|\n", |
| "| red|banana| 7| 70|\n", |
| "| red| grape| 8| 80|\n", |
| "+-----+------+---+---+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df.write.orc('zoo.orc')\n", |
| "spark.read.orc('zoo.orc').show()" |
| ] |
| }, |
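| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "### Text\n", |
| "\n", |
| "A minimal sketch of the text source mentioned above: it reads and writes plain-text files with a single string column. The `fruit` column and the 'fruit.txt' path here are just illustrative choices." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Sketch: write a single string column as plain text, then read it back (one line per row).\n", |
| "df.select('fruit').write.text('fruit.txt')\n", |
| "spark.read.text('fruit.txt').show()" |
| ] |
| }, |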
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Working with SQL\n", |
| "\n", |
| "DataFrame and Spark SQL share the same execution engine so they can be interchangeably used seamlessly. For example, you can register the DataFrame as a table and run a SQL easily as below:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 30, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+--------+\n", |
| "|count(1)|\n", |
| "+--------+\n", |
| "| 8|\n", |
| "+--------+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df.createOrReplaceTempView(\"tableA\")\n", |
| "spark.sql(\"SELECT count(*) from tableA\").show()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "In addition, UDFs can be registered and invoked in SQL out of the box:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 31, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+-----------+\n", |
| "|add_one(v1)|\n", |
| "+-----------+\n", |
| "| 2|\n", |
| "| 3|\n", |
| "| 4|\n", |
| "| 5|\n", |
| "| 6|\n", |
| "| 7|\n", |
| "| 8|\n", |
| "| 9|\n", |
| "+-----------+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "@pandas_udf(\"integer\")\n", |
| "def add_one(s: pd.Series) -> pd.Series:\n", |
| " return s + 1\n", |
| "\n", |
| "spark.udf.register(\"add_one\", add_one)\n", |
| "spark.sql(\"SELECT add_one(v1) FROM tableA\").show()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "These SQL expressions can directly be mixed and used as PySpark columns." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 32, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "+-----------+\n", |
| "|add_one(v1)|\n", |
| "+-----------+\n", |
| "| 2|\n", |
| "| 3|\n", |
| "| 4|\n", |
| "| 5|\n", |
| "| 6|\n", |
| "| 7|\n", |
| "| 8|\n", |
| "| 9|\n", |
| "+-----------+\n", |
| "\n", |
| "+--------------+\n", |
| "|(count(1) > 0)|\n", |
| "+--------------+\n", |
| "| true|\n", |
| "+--------------+\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "from pyspark.sql.functions import expr\n", |
| "\n", |
| "df.selectExpr('add_one(v1)').show()\n", |
| "df.select(expr('count(*)') > 0).show()" |
| ] |
| } |
| ], |
| "metadata": { |
| "kernelspec": { |
| "display_name": "Python 3", |
| "language": "python", |
| "name": "python3" |
| }, |
| "language_info": { |
| "codemirror_mode": { |
| "name": "ipython", |
| "version": 3 |
| }, |
| "file_extension": ".py", |
| "mimetype": "text/x-python", |
| "name": "python", |
| "nbconvert_exporter": "python", |
| "pygments_lexer": "ipython3", |
| "version": "3.7.10" |
| }, |
| "name": "quickstart", |
| "notebookId": 1927513300154480 |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 1 |
| } |