|  | { | 
|  | "cells": [ | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "# Quickstart: DataFrame\n", | 
|  | "\n", | 
|  | "This is a short introduction and quickstart for the PySpark DataFrame API. PySpark DataFrames are lazily evaluated. They are implemented on top of [RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#overview)s. When Spark [transforms](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations) data, it does not immediately compute the transformation but plans how to compute later. When [actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions) such as `collect()` are explicitly called, the computation starts.\n", | 
|  | "This notebook shows the basic usages of the DataFrame, geared mainly for new users. You can run the latest version of these examples by yourself in 'Live Notebook: DataFrame' at [the quickstart page](https://spark.apache.org/docs/latest/api/python/getting_started/index.html).\n", | 
|  | "\n", | 
|  | "There is also other useful information in Apache Spark documentation site, see the latest version of [Spark SQL and DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html), [RDD Programming Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html), [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html), [Spark Streaming Programming Guide](https://spark.apache.org/docs/latest/streaming-programming-guide.html) and [Machine Learning Library (MLlib) Guide](https://spark.apache.org/docs/latest/ml-guide.html).\n", | 
|  | "\n", | 
|  | "PySpark applications start with initializing `SparkSession` which is the entry point of PySpark as below. In case of running it in PySpark shell via <code>pyspark</code> executable, the shell automatically creates the session in the variable <code>spark</code> for users." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 1, | 
|  | "metadata": {}, | 
|  | "outputs": [], | 
|  | "source": [ | 
|  | "from pyspark.sql import SparkSession\n", | 
|  | "\n", | 
|  | "spark = SparkSession.builder.getOrCreate()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "## DataFrame Creation\n", | 
|  | "\n", | 
|  | "A PySpark DataFrame can be created via `pyspark.sql.SparkSession.createDataFrame` typically by passing a list of lists, tuples, dictionaries and `pyspark.sql.Row`s, a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) and an RDD consisting of such a list.\n", | 
|  | "`pyspark.sql.SparkSession.createDataFrame` takes the `schema` argument to specify the schema of the DataFrame. When it is omitted, PySpark infers the corresponding schema by taking a sample from the data.\n", | 
|  | "\n", | 
|  | "Firstly, you can create a PySpark DataFrame from a list of rows" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 2, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "data": { | 
|  | "text/plain": [ | 
|  | "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]" | 
|  | ] | 
|  | }, | 
|  | "execution_count": 2, | 
|  | "metadata": {}, | 
|  | "output_type": "execute_result" | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "from datetime import datetime, date\n", | 
|  | "import pandas as pd\n", | 
|  | "from pyspark.sql import Row\n", | 
|  | "\n", | 
|  | "df = spark.createDataFrame([\n", | 
|  | "    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),\n", | 
|  | "    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),\n", | 
|  | "    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))\n", | 
|  | "])\n", | 
|  | "df" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "Create a PySpark DataFrame with an explicit schema." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 3, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "data": { | 
|  | "text/plain": [ | 
|  | "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]" | 
|  | ] | 
|  | }, | 
|  | "execution_count": 3, | 
|  | "metadata": {}, | 
|  | "output_type": "execute_result" | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df = spark.createDataFrame([\n", | 
|  | "    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),\n", | 
|  | "    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),\n", | 
|  | "    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))\n", | 
|  | "], schema='a long, b double, c string, d date, e timestamp')\n", | 
|  | "df" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "Create a PySpark DataFrame from a pandas DataFrame" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 4, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "data": { | 
|  | "text/plain": [ | 
|  | "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]" | 
|  | ] | 
|  | }, | 
|  | "execution_count": 4, | 
|  | "metadata": {}, | 
|  | "output_type": "execute_result" | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "pandas_df = pd.DataFrame({\n", | 
|  | "    'a': [1, 2, 3],\n", | 
|  | "    'b': [2., 3., 4.],\n", | 
|  | "    'c': ['string1', 'string2', 'string3'],\n", | 
|  | "    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],\n", | 
|  | "    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]\n", | 
|  | "})\n", | 
|  | "df = spark.createDataFrame(pandas_df)\n", | 
|  | "df" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "The DataFrames created above all have the same results and schema." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 6, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "+---+---+-------+----------+-------------------+\n", | 
|  | "|  a|  b|      c|         d|                  e|\n", | 
|  | "+---+---+-------+----------+-------------------+\n", | 
|  | "|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n", | 
|  | "|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|\n", | 
|  | "|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|\n", | 
|  | "+---+---+-------+----------+-------------------+\n", | 
|  | "\n", | 
|  | "root\n", | 
|  | " |-- a: long (nullable = true)\n", | 
|  | " |-- b: double (nullable = true)\n", | 
|  | " |-- c: string (nullable = true)\n", | 
|  | " |-- d: date (nullable = true)\n", | 
|  | " |-- e: timestamp (nullable = true)\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "# All DataFrames above result same.\n", | 
|  | "df.show()\n", | 
|  | "df.printSchema()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "## Viewing Data\n", | 
|  | "\n", | 
|  | "The top rows of a DataFrame can be displayed using `DataFrame.show()`." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 7, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "+---+---+-------+----------+-------------------+\n", | 
|  | "|  a|  b|      c|         d|                  e|\n", | 
|  | "+---+---+-------+----------+-------------------+\n", | 
|  | "|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n", | 
|  | "+---+---+-------+----------+-------------------+\n", | 
|  | "only showing top 1 row\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df.show(1)" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "Alternatively, you can enable `spark.sql.repl.eagerEval.enabled` configuration for the eager evaluation of PySpark DataFrame in notebooks such as Jupyter. The number of rows to show can be controlled via `spark.sql.repl.eagerEval.maxNumRows` configuration." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 8, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "data": { | 
|  | "text/html": [ | 
|  | "<table border='1' style=\"table-layout: auto;margin-right: auto;margin-left: 0;\">\n", | 
|  | "<tr><th>a</th><th>b</th><th>c</th><th>d</th><th>e</th></tr>\n", | 
|  | "<tr><td>1</td><td>2.0</td><td>string1</td><td>2000-01-01</td><td>2000-01-01 12:00:00</td></tr>\n", | 
|  | "<tr><td>2</td><td>3.0</td><td>string2</td><td>2000-02-01</td><td>2000-01-02 12:00:00</td></tr>\n", | 
|  | "<tr><td>3</td><td>4.0</td><td>string3</td><td>2000-03-01</td><td>2000-01-03 12:00:00</td></tr>\n", | 
|  | "</table>\n" | 
|  | ], | 
|  | "text/plain": [ | 
|  | "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]" | 
|  | ] | 
|  | }, | 
|  | "execution_count": 8, | 
|  | "metadata": {}, | 
|  | "output_type": "execute_result" | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "spark.conf.set('spark.sql.repl.eagerEval.enabled', True)\n", | 
|  | "df" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "The rows can also be shown vertically. This is useful when rows are too long to show horizontally." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 9, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "-RECORD 0------------------\n", | 
|  | " a   | 1                   \n", | 
|  | " b   | 2.0                 \n", | 
|  | " c   | string1             \n", | 
|  | " d   | 2000-01-01          \n", | 
|  | " e   | 2000-01-01 12:00:00 \n", | 
|  | "only showing top 1 row\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df.show(1, vertical=True)" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "You can see the DataFrame's schema and column names as follows:" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 10, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "data": { | 
|  | "text/plain": [ | 
|  | "['a', 'b', 'c', 'd', 'e']" | 
|  | ] | 
|  | }, | 
|  | "execution_count": 10, | 
|  | "metadata": {}, | 
|  | "output_type": "execute_result" | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df.columns" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 11, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "root\n", | 
|  | " |-- a: long (nullable = true)\n", | 
|  | " |-- b: double (nullable = true)\n", | 
|  | " |-- c: string (nullable = true)\n", | 
|  | " |-- d: date (nullable = true)\n", | 
|  | " |-- e: timestamp (nullable = true)\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df.printSchema()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "Show the summary of the DataFrame" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 12, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "+-------+---+---+-------+\n", | 
|  | "|summary|  a|  b|      c|\n", | 
|  | "+-------+---+---+-------+\n", | 
|  | "|  count|  3|  3|      3|\n", | 
|  | "|   mean|2.0|3.0|   null|\n", | 
|  | "| stddev|1.0|1.0|   null|\n", | 
|  | "|    min|  1|2.0|string1|\n", | 
|  | "|    max|  3|4.0|string3|\n", | 
|  | "+-------+---+---+-------+\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df.select(\"a\", \"b\", \"c\").describe().show()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "`DataFrame.collect()` collects the distributed data to the driver side as the local data in Python. Note that this can throw an out-of-memory error when the dataset is too large to fit in the driver side because it collects all the data from executors to the driver side." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 13, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "data": { | 
|  | "text/plain": [ | 
|  | "[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),\n", | 
|  | " Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),\n", | 
|  | " Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]" | 
|  | ] | 
|  | }, | 
|  | "execution_count": 13, | 
|  | "metadata": {}, | 
|  | "output_type": "execute_result" | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df.collect()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "In order to avoid throwing an out-of-memory exception, use `DataFrame.take()` or `DataFrame.tail()`." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 14, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "data": { | 
|  | "text/plain": [ | 
|  | "[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]" | 
|  | ] | 
|  | }, | 
|  | "execution_count": 14, | 
|  | "metadata": {}, | 
|  | "output_type": "execute_result" | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df.take(1)" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "PySpark DataFrame also provides the conversion back to a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) to leverage pandas API. Note that `toPandas` also collects all data into the driver side that can easily cause an out-of-memory-error when the data is too large to fit into the driver side." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 15, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "data": { | 
|  | "text/html": [ | 
|  | "<div>\n", | 
|  | "<style scoped>\n", | 
|  | "    .dataframe tbody tr th:only-of-type {\n", | 
|  | "        vertical-align: middle;\n", | 
|  | "    }\n", | 
|  | "\n", | 
|  | "    .dataframe tbody tr th {\n", | 
|  | "        vertical-align: top;\n", | 
|  | "    }\n", | 
|  | "\n", | 
|  | "    .dataframe thead th {\n", | 
|  | "        text-align: right;\n", | 
|  | "    }\n", | 
|  | "</style>\n", | 
|  | "<table border=\"1\" class=\"dataframe\" style=\"table-layout: auto;margin-right: auto;margin-left: 0;\">\n", | 
|  | "  <thead>\n", | 
|  | "    <tr style=\"text-align: right;\">\n", | 
|  | "      <th></th>\n", | 
|  | "      <th>a</th>\n", | 
|  | "      <th>b</th>\n", | 
|  | "      <th>c</th>\n", | 
|  | "      <th>d</th>\n", | 
|  | "      <th>e</th>\n", | 
|  | "    </tr>\n", | 
|  | "  </thead>\n", | 
|  | "  <tbody>\n", | 
|  | "    <tr>\n", | 
|  | "      <th>0</th>\n", | 
|  | "      <td>1</td>\n", | 
|  | "      <td>2.0</td>\n", | 
|  | "      <td>string1</td>\n", | 
|  | "      <td>2000-01-01</td>\n", | 
|  | "      <td>2000-01-01 12:00:00</td>\n", | 
|  | "    </tr>\n", | 
|  | "    <tr>\n", | 
|  | "      <th>1</th>\n", | 
|  | "      <td>2</td>\n", | 
|  | "      <td>3.0</td>\n", | 
|  | "      <td>string2</td>\n", | 
|  | "      <td>2000-02-01</td>\n", | 
|  | "      <td>2000-01-02 12:00:00</td>\n", | 
|  | "    </tr>\n", | 
|  | "    <tr>\n", | 
|  | "      <th>2</th>\n", | 
|  | "      <td>3</td>\n", | 
|  | "      <td>4.0</td>\n", | 
|  | "      <td>string3</td>\n", | 
|  | "      <td>2000-03-01</td>\n", | 
|  | "      <td>2000-01-03 12:00:00</td>\n", | 
|  | "    </tr>\n", | 
|  | "  </tbody>\n", | 
|  | "</table>\n", | 
|  | "</div>" | 
|  | ], | 
|  | "text/plain": [ | 
|  | "   a    b        c           d                   e\n", | 
|  | "0  1  2.0  string1  2000-01-01 2000-01-01 12:00:00\n", | 
|  | "1  2  3.0  string2  2000-02-01 2000-01-02 12:00:00\n", | 
|  | "2  3  4.0  string3  2000-03-01 2000-01-03 12:00:00" | 
|  | ] | 
|  | }, | 
|  | "execution_count": 15, | 
|  | "metadata": {}, | 
|  | "output_type": "execute_result" | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df.toPandas()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "## Selecting and Accessing Data\n", | 
|  | "\n", | 
|  | "PySpark DataFrame is lazily evaluated and simply selecting a column does not trigger the computation but it returns a `Column` instance." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 16, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "data": { | 
|  | "text/plain": [ | 
|  | "Column<b'a'>" | 
|  | ] | 
|  | }, | 
|  | "execution_count": 16, | 
|  | "metadata": {}, | 
|  | "output_type": "execute_result" | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df.a" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "In fact, most of column-wise operations return `Column`s." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 17, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "data": { | 
|  | "text/plain": [ | 
|  | "True" | 
|  | ] | 
|  | }, | 
|  | "execution_count": 17, | 
|  | "metadata": {}, | 
|  | "output_type": "execute_result" | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "from pyspark.sql import Column\n", | 
|  | "from pyspark.sql.functions import upper\n", | 
|  | "\n", | 
|  | "type(df.c) == type(upper(df.c)) == type(df.c.isNull())" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "These `Column`s can be used to select the columns from a DataFrame. For example, `DataFrame.select()` takes the `Column` instances that returns another DataFrame." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 18, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "+-------+\n", | 
|  | "|      c|\n", | 
|  | "+-------+\n", | 
|  | "|string1|\n", | 
|  | "|string2|\n", | 
|  | "|string3|\n", | 
|  | "+-------+\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df.select(df.c).show()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "Assign new `Column` instance." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 19, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "+---+---+-------+----------+-------------------+-------+\n", | 
|  | "|  a|  b|      c|         d|                  e|upper_c|\n", | 
|  | "+---+---+-------+----------+-------------------+-------+\n", | 
|  | "|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|\n", | 
|  | "|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|\n", | 
|  | "|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|\n", | 
|  | "+---+---+-------+----------+-------------------+-------+\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df.withColumn('upper_c', upper(df.c)).show()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "To select a subset of rows, use `DataFrame.filter()`." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 20, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "+---+---+-------+----------+-------------------+\n", | 
|  | "|  a|  b|      c|         d|                  e|\n", | 
|  | "+---+---+-------+----------+-------------------+\n", | 
|  | "|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n", | 
|  | "+---+---+-------+----------+-------------------+\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df.filter(df.a == 1).show()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "## Applying a Function\n", | 
|  | "\n", | 
|  | "PySpark supports various UDFs and APIs to allow users to execute Python native functions. See also the latest [Pandas UDFs](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs) and [Pandas Function APIs](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#pandas-function-apis). For instance, the example below allows users to directly use the APIs in [a pandas Series](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.html) within Python native function." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 21, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "+------------------+\n", | 
|  | "|pandas_plus_one(a)|\n", | 
|  | "+------------------+\n", | 
|  | "|                 2|\n", | 
|  | "|                 3|\n", | 
|  | "|                 4|\n", | 
|  | "+------------------+\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "import pandas as pd\n", | 
|  | "from pyspark.sql.functions import pandas_udf\n", | 
|  | "\n", | 
|  | "@pandas_udf('long')\n", | 
|  | "def pandas_plus_one(series: pd.Series) -> pd.Series:\n", | 
|  | "    # Simply plus one by using pandas Series.\n", | 
|  | "    return series + 1\n", | 
|  | "\n", | 
|  | "df.select(pandas_plus_one(df.a)).show()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "Another example is `DataFrame.mapInPandas` which allows users directly use the APIs in a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) without any restrictions such as the result length." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 22, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "+---+---+-------+----------+-------------------+\n", | 
|  | "|  a|  b|      c|         d|                  e|\n", | 
|  | "+---+---+-------+----------+-------------------+\n", | 
|  | "|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n", | 
|  | "+---+---+-------+----------+-------------------+\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "def pandas_filter_func(iterator):\n", | 
|  | "    for pandas_df in iterator:\n", | 
|  | "        yield pandas_df[pandas_df.a == 1]\n", | 
|  | "\n", | 
|  | "df.mapInPandas(pandas_filter_func, schema=df.schema).show()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "## Grouping Data\n", | 
|  | "\n", | 
|  | "PySpark DataFrame also provides a way of handling grouped data by using the common approach, split-apply-combine strategy.\n", | 
|  | "It groups the data by a certain condition applies a function to each group and then combines them back to the DataFrame." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 23, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "+-----+------+---+---+\n", | 
|  | "|color| fruit| v1| v2|\n", | 
|  | "+-----+------+---+---+\n", | 
|  | "|  red|banana|  1| 10|\n", | 
|  | "| blue|banana|  2| 20|\n", | 
|  | "|  red|carrot|  3| 30|\n", | 
|  | "| blue| grape|  4| 40|\n", | 
|  | "|  red|carrot|  5| 50|\n", | 
|  | "|black|carrot|  6| 60|\n", | 
|  | "|  red|banana|  7| 70|\n", | 
|  | "|  red| grape|  8| 80|\n", | 
|  | "+-----+------+---+---+\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df = spark.createDataFrame([\n", | 
|  | "    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],\n", | 
|  | "    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],\n", | 
|  | "    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])\n", | 
|  | "df.show()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "Grouping and then applying the `avg()` function to the resulting groups." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 24, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "+-----+-------+-------+\n", | 
|  | "|color|avg(v1)|avg(v2)|\n", | 
|  | "+-----+-------+-------+\n", | 
|  | "|  red|    4.8|   48.0|\n", | 
|  | "|black|    6.0|   60.0|\n", | 
|  | "| blue|    3.0|   30.0|\n", | 
|  | "+-----+-------+-------+\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df.groupby('color').avg().show()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "You can also apply a Python native function against each group by using pandas API." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 25, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "+-----+------+---+---+\n", | 
|  | "|color| fruit| v1| v2|\n", | 
|  | "+-----+------+---+---+\n", | 
|  | "|  red|banana| -3| 10|\n", | 
|  | "|  red|carrot| -1| 30|\n", | 
|  | "|  red|carrot|  0| 50|\n", | 
|  | "|  red|banana|  2| 70|\n", | 
|  | "|  red| grape|  3| 80|\n", | 
|  | "|black|carrot|  0| 60|\n", | 
|  | "| blue|banana| -1| 20|\n", | 
|  | "| blue| grape|  1| 40|\n", | 
|  | "+-----+------+---+---+\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "def plus_mean(pandas_df):\n", | 
|  | "    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())\n", | 
|  | "\n", | 
|  | "df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "Co-grouping and applying a function." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 26, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "+--------+---+---+---+\n", | 
|  | "|    time| id| v1| v2|\n", | 
|  | "+--------+---+---+---+\n", | 
|  | "|20000101|  1|1.0|  x|\n", | 
|  | "|20000102|  1|3.0|  x|\n", | 
|  | "|20000101|  2|2.0|  y|\n", | 
|  | "|20000102|  2|4.0|  y|\n", | 
|  | "+--------+---+---+---+\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df1 = spark.createDataFrame(\n", | 
|  | "    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],\n", | 
|  | "    ('time', 'id', 'v1'))\n", | 
|  | "\n", | 
|  | "df2 = spark.createDataFrame(\n", | 
|  | "    [(20000101, 1, 'x'), (20000101, 2, 'y')],\n", | 
|  | "    ('time', 'id', 'v2'))\n", | 
|  | "\n", | 
|  | "def merge_ordered(l, r):\n", | 
|  | "    return pd.merge_ordered(l, r)\n", | 
|  | "\n", | 
|  | "df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(\n", | 
|  | "    merge_ordered, schema='time int, id int, v1 double, v2 string').show()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "## Getting Data In/Out\n", | 
|  | "\n", | 
|  | "CSV is straightforward and easy to use. Parquet and ORC are efficient and compact file formats to read and write faster.\n", | 
|  | "\n", | 
|  | "There are many other data sources available in PySpark such as JDBC, text, binaryFile, Avro, etc. See also the latest [Spark SQL, DataFrames and Datasets Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html) in Apache Spark documentation." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "### CSV" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 27, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "+-----+------+---+---+\n", | 
|  | "|color| fruit| v1| v2|\n", | 
|  | "+-----+------+---+---+\n", | 
|  | "|  red|banana|  1| 10|\n", | 
|  | "| blue|banana|  2| 20|\n", | 
|  | "|  red|carrot|  3| 30|\n", | 
|  | "| blue| grape|  4| 40|\n", | 
|  | "|  red|carrot|  5| 50|\n", | 
|  | "|black|carrot|  6| 60|\n", | 
|  | "|  red|banana|  7| 70|\n", | 
|  | "|  red| grape|  8| 80|\n", | 
|  | "+-----+------+---+---+\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df.write.csv('foo.csv', header=True)\n", | 
|  | "spark.read.csv('foo.csv', header=True).show()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "### Parquet" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 28, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "+-----+------+---+---+\n", | 
|  | "|color| fruit| v1| v2|\n", | 
|  | "+-----+------+---+---+\n", | 
|  | "|  red|banana|  1| 10|\n", | 
|  | "| blue|banana|  2| 20|\n", | 
|  | "|  red|carrot|  3| 30|\n", | 
|  | "| blue| grape|  4| 40|\n", | 
|  | "|  red|carrot|  5| 50|\n", | 
|  | "|black|carrot|  6| 60|\n", | 
|  | "|  red|banana|  7| 70|\n", | 
|  | "|  red| grape|  8| 80|\n", | 
|  | "+-----+------+---+---+\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df.write.parquet('bar.parquet')\n", | 
|  | "spark.read.parquet('bar.parquet').show()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "### ORC" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 29, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "+-----+------+---+---+\n", | 
|  | "|color| fruit| v1| v2|\n", | 
|  | "+-----+------+---+---+\n", | 
|  | "|  red|banana|  1| 10|\n", | 
|  | "| blue|banana|  2| 20|\n", | 
|  | "|  red|carrot|  3| 30|\n", | 
|  | "| blue| grape|  4| 40|\n", | 
|  | "|  red|carrot|  5| 50|\n", | 
|  | "|black|carrot|  6| 60|\n", | 
|  | "|  red|banana|  7| 70|\n", | 
|  | "|  red| grape|  8| 80|\n", | 
|  | "+-----+------+---+---+\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df.write.orc('zoo.orc')\n", | 
|  | "spark.read.orc('zoo.orc').show()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "## Working with SQL\n", | 
|  | "\n", | 
|  | "DataFrame and Spark SQL share the same execution engine so they can be interchangeably used seamlessly. For example, you can register the DataFrame as a table and run a SQL easily as below:" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 30, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "+--------+\n", | 
|  | "|count(1)|\n", | 
|  | "+--------+\n", | 
|  | "|       8|\n", | 
|  | "+--------+\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "df.createOrReplaceTempView(\"tableA\")\n", | 
|  | "spark.sql(\"SELECT count(*) from tableA\").show()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "In addition, UDFs can be registered and invoked in SQL out of the box:" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 31, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "+-----------+\n", | 
|  | "|add_one(v1)|\n", | 
|  | "+-----------+\n", | 
|  | "|          2|\n", | 
|  | "|          3|\n", | 
|  | "|          4|\n", | 
|  | "|          5|\n", | 
|  | "|          6|\n", | 
|  | "|          7|\n", | 
|  | "|          8|\n", | 
|  | "|          9|\n", | 
|  | "+-----------+\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "@pandas_udf(\"integer\")\n", | 
|  | "def add_one(s: pd.Series) -> pd.Series:\n", | 
|  | "    return s + 1\n", | 
|  | "\n", | 
|  | "spark.udf.register(\"add_one\", add_one)\n", | 
|  | "spark.sql(\"SELECT add_one(v1) FROM tableA\").show()" | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "markdown", | 
|  | "metadata": {}, | 
|  | "source": [ | 
|  | "These SQL expressions can directly be mixed and used as PySpark columns." | 
|  | ] | 
|  | }, | 
|  | { | 
|  | "cell_type": "code", | 
|  | "execution_count": 32, | 
|  | "metadata": {}, | 
|  | "outputs": [ | 
|  | { | 
|  | "name": "stdout", | 
|  | "output_type": "stream", | 
|  | "text": [ | 
|  | "+-----------+\n", | 
|  | "|add_one(v1)|\n", | 
|  | "+-----------+\n", | 
|  | "|          2|\n", | 
|  | "|          3|\n", | 
|  | "|          4|\n", | 
|  | "|          5|\n", | 
|  | "|          6|\n", | 
|  | "|          7|\n", | 
|  | "|          8|\n", | 
|  | "|          9|\n", | 
|  | "+-----------+\n", | 
|  | "\n", | 
|  | "+--------------+\n", | 
|  | "|(count(1) > 0)|\n", | 
|  | "+--------------+\n", | 
|  | "|          true|\n", | 
|  | "+--------------+\n", | 
|  | "\n" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "source": [ | 
|  | "from pyspark.sql.functions import expr\n", | 
|  | "\n", | 
|  | "df.selectExpr('add_one(v1)').show()\n", | 
|  | "df.select(expr('count(*)') > 0).show()" | 
|  | ] | 
|  | } | 
|  | ], | 
|  | "metadata": { | 
|  | "kernelspec": { | 
|  | "display_name": "Python 3", | 
|  | "language": "python", | 
|  | "name": "python3" | 
|  | }, | 
|  | "language_info": { | 
|  | "codemirror_mode": { | 
|  | "name": "ipython", | 
|  | "version": 3 | 
|  | }, | 
|  | "file_extension": ".py", | 
|  | "mimetype": "text/x-python", | 
|  | "name": "python", | 
|  | "nbconvert_exporter": "python", | 
|  | "pygments_lexer": "ipython3", | 
|  | "version": "3.7.10" | 
|  | }, | 
|  | "name": "quickstart", | 
|  | "notebookId": 1927513300154480 | 
|  | }, | 
|  | "nbformat": 4, | 
|  | "nbformat_minor": 1 | 
|  | } |