{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "1619d229-6f5c-4a31-9992-81bce15f7ef1",
   "metadata": {},
   "source": [
    "# Chapter 4: Bug Busting - Debugging PySpark\n",
    "\n",
    "PySpark executes applications in a distributed environment, making it challenging to\n",
    "monitor and debug these applications. It can be difficult to track which nodes are\n",
    "executing specific code. However, there are multiple methods available within PySpark\n",
    "to help with debugging. This section will outline how to effectively debug PySpark\n",
    "applications.\n",
    "\n",
    "PySpark operates using Spark as its underlying engine, utilizing Spark Connect server\n",
    "or Py4J (Spark Classic) to submit and compute jobs in Spark.\n",
    "\n",
    "On the driver side, PySpark interacts with the Spark Driver on JVM through Spark\n",
    "Connect server or Py4J (Spark Classic). When `pyspark.sql.SparkSession` is created and\n",
    "initialized, PySpark starts to communicate with the Spark Driver.\n",
    "\n",
    "On the executor side, Python workers are responsible for executing and managing Python\n",
    "native functions or data. These workers are only launched if the PySpark application\n",
    "requires interaction between Python and JVMs such as Python UDF execution. They are\n",
    "initiated on-demand, for instance, when running pandas UDFs or PySpark RDD APIs."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "56890562-0151-45ac-903e-45b4f1d40d33",
   "metadata": {},
   "source": [
    "## Spark UI\n",
    "\n",
    "### Python UDF Execution\n",
    "\n",
    "Debugging a Python UDF in PySpark can be done by simply adding print statements, though\n",
    "the output won't be visible in the client/driver side since the functions are executed\n",
    "on the executors - they can be seen in Spark UI. For example, if you have a working\n",
    "Python UDF:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "a9219c08-df6c-40d7-a73d-a5950ee7df0b",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import udf\n",
    "\n",
    "@udf(\"integer\")\n",
    "def my_udf(x):\n",
    "    # Do something with x\n",
    "    return x"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "4875da48-03ee-4155-9257-b5514270d591",
   "metadata": {},
   "source": [
    "You can add print statements for debugging as shown below:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "2b9102be-e7df-4b80-a70e-87ef7e76a913",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(my_udf(id)=0)]"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "@udf(\"integer\")\n",
    "def my_udf(x):\n",
    "    # Do something with x\n",
    "    print(\"What's going on?\")\n",
    "    return x\n",
    "\n",
    "spark.range(1).select(my_udf(\"id\")).collect()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "899b5d67-078a-4a12-8da8-7a841432ace2",
   "metadata": {},
   "source": [
    "The output can be viewed in the Spark UI under `stdout`/`stderr` at  `Executors` tab.\n",
    "\n",
    "![Spark UI print](./assets/pyspark-ui-print.png)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "154262f2-6f22-483b-9676-75901b325c66",
   "metadata": {},
   "source": [
    "### Non-Python UDF\n",
    "\n",
    "When running non-Python UDF code, debugging is typically done via the Spark UI or\n",
    "by using `DataFrame.explain(True)`.\n",
    "\n",
    "For instance, the code below performs a join between a large DataFrame (`df1`) and a\n",
    "smaller one (`df2`):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "55070bab-5659-4bb1-98ff-1a3eb6231218",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Physical Plan ==\n",
      "AdaptiveSparkPlan isFinalPlan=false\n",
      "+- Project [_1#6L]\n",
      "   +- SortMergeJoin [_1#6L], [_1#8L], Inner\n",
      "      :- Sort [_1#6L ASC NULLS FIRST], false, 0\n",
      "      :  +- Exchange hashpartitioning(_1#6L, 200), ENSURE_REQUIREMENTS, [plan_id=41]\n",
      "      :     +- Filter isnotnull(_1#6L)\n",
      "      :        +- Scan ExistingRDD[_1#6L]\n",
      "      +- Sort [_1#8L ASC NULLS FIRST], false, 0\n",
      "         +- Exchange hashpartitioning(_1#8L, 200), ENSURE_REQUIREMENTS, [plan_id=42]\n",
      "            +- Filter isnotnull(_1#8L)\n",
      "               +- Scan ExistingRDD[_1#8L]\n",
      "\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df1 = spark.createDataFrame([(x,) for x in range(100)])\n",
    "df2 = spark.createDataFrame([(x,) for x in range(2)])\n",
    "df1.join(df2, \"_1\").explain()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "b8b7a595-006c-4417-9683-bc002d85b789",
   "metadata": {},
   "source": [
    "Using `DataFrame.explain` displays the physical plans, showing how the join will\n",
    "be executed. Those physical plans represent individual steps for the whole execution.\n",
    "Here, it exchanges, a.k.a. shuffles, the data and performs a sort-merge-join."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "97fc5dcb-8531-4618-a66f-d84ca5095fdd",
   "metadata": {},
   "source": [
    "\n",
    "After checking how the plans are generated via this method, users can optimize their queries.\n",
    "For example, because `df2` is very small, it can be broadcasted to executors\n",
    "and remove the shuffle\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "7a1a985e-d260-49ce-a054-e70e3ed7e9e9",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Physical Plan ==\n",
      "AdaptiveSparkPlan isFinalPlan=false\n",
      "+- Project [_1#6L]\n",
      "   +- BroadcastHashJoin [_1#6L], [_1#8L], Inner, BuildRight, false\n",
      "      :- Filter isnotnull(_1#6L)\n",
      "      :  +- Scan ExistingRDD[_1#6L]\n",
      "      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=71]\n",
      "         +- Filter isnotnull(_1#8L)\n",
      "            +- Scan ExistingRDD[_1#8L]\n",
      "\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql.functions import broadcast\n",
    "\n",
    "df1.join(broadcast(df2), \"_1\").explain()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "510949c3-91d1-475f-948f-34eed8617a41",
   "metadata": {},
   "source": [
    "As can be seen the shuffle is removed, and it performs broadcast-hash-join:"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "028aab52-adea-4b5d-806a-de79e9c54e71",
   "metadata": {},
   "source": [
    "\n",
    "These optimizations can also be visualized in the Spark UI under the SQL / DataFrame\n",
    "tab after execution.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "5cc67309-cd0b-49dc-b8d9-8d2c3a5aa944",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(_1=0), Row(_1=1)]"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df1.join(df2, \"_1\").collect()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "97a09efc-8fcd-400c-a052-8aef3bf7ce15",
   "metadata": {},
   "source": [
    "\n",
    "![PySpark UI SQL](./assets/pyspark-ui-sql.png)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "25b0f45f-26b6-485d-88df-a1fb323fd3f4",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(_1=0), Row(_1=1)]"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df1.join(broadcast(df2), \"_1\").collect()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "657d75db-1a64-4a97-8019-4ad1dd974997",
   "metadata": {},
   "source": [
    "![PySpark UI SQL broadcast](./assets/pyspark-ui-sql-broadcast.png)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "cf1ab4f6-fe61-409a-9da5-afabdd7c987e",
   "metadata": {},
   "source": [
    "## Monitor with `top` and `ps`\n",
    "\n",
    "On the driver side, you can obtain the process ID from your PySpark shell to\n",
    "monitor resources:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "f37891f2-1bfc-4995-ba8e-9ac351935bc8",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "23976"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import os; os.getpid()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "cef297f1-3772-40d9-85d9-bdd0c9761c38",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "  UID   PID  PPID   C STIME   TTY           TIME CMD\n",
      "  502 23976 21512   0 12:06PM ??         0:02.30 /opt/miniconda3/envs/python3.11/bin/python -m ipykernel_launcher -f /Users/hyukjin.kwon/Library/Jupyter/runtime/kernel-c8eb73ef-2b21-418e-b770-92b946454606.json\n"
     ]
    }
   ],
   "source": [
    "%%bash\n",
    "ps -fe 23976"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "296a6fbb-7ca8-448b-82f4-e7ee6d4359e2",
   "metadata": {},
   "source": [
    "On the executor side, you can use `grep` to find the process IDs and resources for\n",
    "Python workers, as these are forked from `pyspark.daemon`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "15b6127c-67ef-4b6f-b4be-7c05af5d12bb",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "  502 23989 23981   0 12:06PM ??         0:00.59 python3 -m pyspark.daemon pyspark.worker\n",
      "  502 23990 23989   0 12:06PM ??         0:00.19 python3 -m pyspark.daemon pyspark.worker\n",
      "  502 23991 23989   0 12:06PM ??         0:00.19 python3 -m pyspark.daemon pyspark.worker\n",
      "  502 23992 23989   0 12:06PM ??         0:00.19 python3 -m pyspark.daemon pyspark.worker\n",
      "  502 23993 23989   0 12:06PM ??         0:00.19 python3 -m pyspark.daemon pyspark.worker\n"
     ]
    }
   ],
   "source": [
    "%%bash\n",
    "ps -fe | grep pyspark.daemon | head -n 5"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "d4088643-1903-4d24-b4b6-cce097a92124",
   "metadata": {},
   "source": [
    "Typically, users leverage top and the identified PIDs to monitor the memory usage\n",
    "of Python processes in PySpark."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "2949bb68-0570-44b9-af19-d0b3c260dc49",
   "metadata": {},
   "source": [
    "## Use PySpark Profilers\n",
    "\n",
    "### Memory Profiler\n",
    "\n",
    "In order to debug the driver side, users typically can use most of the existing\n",
    "Python tools such as [memory_profiler](https://github.com/pythonprofilers/memory_profiler)\n",
    "that allow you to check the memory usage line by line. If your driver program\n",
    "is not running on another machine (e.g., YARN cluster mode), you can use a memory\n",
    "profiler to debug memory usage on the driver side. For example:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "cee1ae3c-0abe-4e38-b904-7f0c803441a5",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Filename: profile_memory.py\n",
      "\n",
      "Line #    Mem usage    Increment  Occurrences   Line Contents\n",
      "=============================================================\n",
      "     4     80.6 MiB     80.6 MiB           1   @profile\n",
      "     5                                         #=====================================================\n",
      "     6                                         def my_func():\n",
      "     7     79.0 MiB     -1.7 MiB           1       session = SparkSession.builder.getOrCreate()\n",
      "     8     80.1 MiB      1.1 MiB           1       df = session.range(10000)\n",
      "     9     84.1 MiB      4.0 MiB           1       return df.collect()\n",
      "\n",
      "\n"
     ]
    }
   ],
   "source": [
    "%%bash\n",
    "\n",
    "echo \"from pyspark.sql import SparkSession\n",
    "#===Your function should be decorated with @profile===\n",
    "from memory_profiler import profile\n",
    "@profile\n",
    "#=====================================================\n",
    "def my_func():\n",
    "    session = SparkSession.builder.getOrCreate()\n",
    "    df = session.range(10000)\n",
    "    return df.collect()\n",
    "if __name__ == '__main__':\n",
    "    my_func()\" > profile_memory.py\n",
    "\n",
    "python -m memory_profiler profile_memory.py 2> /dev/null"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "98340ba5-abe0-4f92-ae48-26b63c6f5811",
   "metadata": {},
   "source": [
    "It shows which line consumes how much memory properly."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "7f01c836",
   "metadata": {},
   "source": [
    "#### Python/Pandas UDF"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "cc3e1ccc",
   "metadata": {},
   "source": [
    "<div class=\"alert alert-block alert-info\">\n",
    "<b>Note:</b> This section applies to Spark 4.0\n",
    "</div>"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "0b2926cf-df02-42c8-98fa-5822849e901f",
   "metadata": {},
   "source": [
    "PySpark provides remote [memory_profiler](https://github.com/pythonprofilers/memory_profiler)\n",
    "for Python/Pandas UDFs. That can be used on editors with line numbers such as\n",
    "Jupyter notebooks. SparkSession-based memory profiler can be enabled by setting\n",
    "the runtime SQL configuration `spark.sql.pyspark.udf.profiler` to `memory`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "553d9780-b30b-4e96-a134-ca1c06341c89",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "============================================================\n",
      "Profile of UDF<id=16>\n",
      "============================================================\n",
      "Filename: /var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/885006762.py\n",
      "\n",
      "Line #    Mem usage    Increment  Occurrences   Line Contents\n",
      "=============================================================\n",
      "     5   1472.6 MiB   1472.6 MiB          10   @pandas_udf(\"long\")\n",
      "     6                                         def add1(x):\n",
      "     7   1473.9 MiB      1.3 MiB          10     return x + 1\n",
      "\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql.functions import pandas_udf\n",
    "\n",
    "df = spark.range(10)\n",
    "\n",
    "@pandas_udf(\"long\")\n",
    "def add1(x):\n",
    "  return x + 1\n",
    "\n",
    "spark.conf.set(\"spark.sql.pyspark.udf.profiler\", \"memory\")\n",
    "\n",
    "added = df.select(add1(\"id\"))\n",
    "spark.profile.clear()\n",
    "added.collect()\n",
    "spark.profile.show(type=\"memory\")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "abaf0439-43de-4482-8a80-258be3d98366",
   "metadata": {},
   "source": [
    "The UDF IDs (e.g., 16) can be seen in the query plan, for example, `add1(...)#16L` in\n",
    "`ArrowEvalPython` as shown below."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "607f9f0c-6288-4bd2-99b8-2cc7e62098ff",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Physical Plan ==\n",
      "*(2) Project [pythonUDF0#19L AS add1(id)#17L]\n",
      "+- ArrowEvalPython [add1(id#14L)#16L], [pythonUDF0#19L], 200\n",
      "   +- *(1) Range (0, 10, step=1, splits=16)\n",
      "\n",
      "\n"
     ]
    }
   ],
   "source": [
    "added.explain()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "06e40cdc",
   "metadata": {},
   "source": [
    "### Performance Profiler"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "7909aba0",
   "metadata": {},
   "source": [
    "<div class=\"alert alert-block alert-info\">\n",
    "<b>Note:</b> This section applies to Spark 4.0\n",
    "</div>"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "e66c991c-1b07-45d0-aae2-c79c362e2210",
   "metadata": {},
   "source": [
    "[Python Profilers](https://docs.python.org/3/library/profile.html) are useful built-in\n",
    "features in Python itself for profiling performance. To use this on driver side, you can use it as you would\n",
    "do for regular Python programs because PySpark on driver side is a regular Python\n",
    "process unless you are running your driver program in another machine\n",
    "(e.g., YARN cluster mode)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "8d9ada24-81da-4c31-aaa2-b3578953b07b",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "         549275 function calls (536745 primitive calls) in 3.447 seconds\n",
      "\n",
      "   Ordered by: cumulative time\n",
      "\n",
      "   ncalls  tottime  percall  cumtime  percall filename:lineno(function)\n",
      "        2    0.000    0.000    3.448    1.724 app.py:1(<module>)\n",
      "    792/1    0.005    0.000    3.447    3.447 {built-in method builtins.exec}\n",
      "      128    0.000    0.000    2.104    0.016 socket.py:692(readinto)\n",
      "      128    2.104    0.016    2.104    0.016 {method 'recv_into' of '_socket.socket' objects}\n",
      "      124    0.000    0.000    2.100    0.017 java_gateway.py:1015(send_command)\n",
      "      125    0.001    0.000    2.099    0.017 clientserver.py:499(send_command)\n",
      "      138    0.000    0.000    2.097    0.015 {method 'readline' of '_io.BufferedReader' objects}\n",
      "       55    0.000    0.000    1.622    0.029 java_gateway.py:1313(__call__)\n",
      "       95    0.001    0.000    1.360    0.014 __init__.py:1(<module>)\n",
      "        1    0.000    0.000    1.359    1.359 session.py:438(getOrCreate)\n",
      "        1    0.000    0.000    1.311    1.311 context.py:491(getOrCreate)\n",
      "        1    0.000    0.000    1.311    1.311 context.py:169(__init__)\n",
      "        1    0.000    0.000    0.861    0.861 context.py:424(_ensure_initialized)\n",
      "        1    0.001    0.001    0.861    0.861 java_gateway.py:39(launch_gateway)\n",
      "        8    0.840    0.105    0.840    0.105 {built-in method time.sleep}\n"
     ]
    }
   ],
   "source": [
    "%%bash\n",
    "\n",
    "echo \"from pyspark.sql import SparkSession\n",
    "spark = SparkSession.builder.getOrCreate()\n",
    "spark.range(10).collect()\" > app.py\n",
    "\n",
    "python -m cProfile -s cumulative app.py  2> /dev/null | head -n 20"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "3e5ae42d",
   "metadata": {},
   "source": [
    "#### Python/Pandas UDF"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "eb137611",
   "metadata": {},
   "source": [
    "<div class=\"alert alert-block alert-info\">\n",
    "<b>Note:</b> This section applies to Spark 4.0\n",
    "</div>"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "c9b9a26c-56fa-4a14-adfd-2697a87c479e",
   "metadata": {},
   "source": [
    "PySpark provides remote Python Profilers for Python/Pandas UDFs. UDFs with\n",
    "iterators as inputs/outputs are not supported. SparkSession-based performance\n",
    "profiler can be enabled by setting the runtime SQL configuration\n",
    "`spark.sql.pyspark.udf.profiler` to `perf`. An example is as shown below."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "fcba873d-7a4f-42f9-b796-3c492c7e8077",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "============================================================\n",
      "Profile of UDF<id=22>\n",
      "============================================================\n",
      "         2130 function calls (2080 primitive calls) in 0.003 seconds\n",
      "\n",
      "   Ordered by: internal time, cumulative time\n",
      "\n",
      "   ncalls  tottime  percall  cumtime  percall filename:lineno(function)\n",
      "       10    0.001    0.000    0.003    0.000 common.py:62(new_method)\n",
      "       10    0.000    0.000    0.000    0.000 {built-in method _operator.add}\n",
      "       10    0.000    0.000    0.002    0.000 base.py:1371(_arith_method)\n",
      "       10    0.000    0.000    0.001    0.000 series.py:389(__init__)\n",
      "       20    0.000    0.000    0.000    0.000 _ufunc_config.py:33(seterr)\n",
      "       10    0.000    0.000    0.001    0.000 series.py:6201(_construct_result)\n",
      "       10    0.000    0.000    0.000    0.000 cast.py:1605(maybe_cast_to_integer_array)\n",
      "       10    0.000    0.000    0.000    0.000 construction.py:517(sanitize_array)\n",
      "       10    0.000    0.000    0.002    0.000 series.py:6133(_arith_method)\n",
      "       10    0.000    0.000    0.000    0.000 managers.py:1863(from_array)\n",
      "       10    0.000    0.000    0.000    0.000 array_ops.py:240(arithmetic_op)\n",
      "      510    0.000    0.000    0.000    0.000 {built-in method builtins.isinstance}\n"
     ]
    }
   ],
   "source": [
    "import io\n",
    "from contextlib import redirect_stdout\n",
    "\n",
    "from pyspark.sql.functions import pandas_udf\n",
    "\n",
    "df = spark.range(10)\n",
    "@pandas_udf(\"long\")\n",
    "def add1(x):\n",
    "    return x + 1\n",
    "\n",
    "added = df.select(add1(\"id\"))\n",
    "\n",
    "spark.conf.set(\"spark.sql.pyspark.udf.profiler\", \"perf\")\n",
    "spark.profile.clear()\n",
    "added.collect()\n",
    "\n",
    "# Only show top 10 lines\n",
    "output = io.StringIO()\n",
    "with redirect_stdout(output):\n",
    "    spark.profile.show(type=\"perf\")\n",
    "\n",
    "print(\"\\n\".join(output.getvalue().split(\"\\n\")[0:20]))"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "b35fb6c4-c074-4866-abd6-b1b435721b67",
   "metadata": {},
   "source": [
    "The UDF IDs (e.g., 22) can be seen in the query plan, for example, `add1(...)#22L` in\n",
    "`ArrowEvalPython` below."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "de015526-577b-45ea-a6e5-598cf215ef8b",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Physical Plan ==\n",
      "*(2) Project [pythonUDF0#25L AS add1(id)#23L]\n",
      "+- ArrowEvalPython [add1(id#20L)#22L], [pythonUDF0#25L], 200\n",
      "   +- *(1) Range (0, 10, step=1, splits=16)\n",
      "\n",
      "\n"
     ]
    }
   ],
   "source": [
    "added.explain()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "fe046e5e-73bb-466f-a373-9d5a445b0fa1",
   "metadata": {},
   "source": [
    "We can render the result with a preregistered renderer as shown below."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "e2507dc0-ab64-4afe-ae8a-19c52533e57c",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.profile.render(id=2, type=\"perf\")  # renderer=\"flameprof\" by default"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "ba892df0-4058-46ad-a952-791559da5259",
   "metadata": {},
   "source": [
    "![PySpark UDF profiling](./assets/pyspark-udf-profile.png)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "09b420ba",
   "metadata": {},
   "source": [
    "## Display Stacktraces"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "9756e41b",
   "metadata": {},
   "source": [
    "<div class=\"alert alert-block alert-info\">\n",
    "<b>Note:</b> This section applies to Spark 4.0\n",
    "</div>"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "c7bf0b21-f9a8-4cc0-8288-46f7ef4f4f52",
   "metadata": {},
   "source": [
    "By default, JVM stacktraces and Python internal tracebacks are hidden especially\n",
    "in Python UDF executions. For example,"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "id": "bafce04e-5d7c-40e0-9342-0ffb94c858c7",
   "metadata": {},
   "outputs": [
    {
     "ename": "PythonException",
     "evalue": "\n  An exception was thrown from the Python worker. Please see the stack trace below.\nTraceback (most recent call last):\n  File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3806637820.py\", line 3, in <lambda>\nZeroDivisionError: division by zero\n",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31mPythonException\u001b[0m: \n  An exception was thrown from the Python worker. Please see the stack trace below.\nTraceback (most recent call last):\n  File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3806637820.py\", line 3, in <lambda>\nZeroDivisionError: division by zero\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql.functions import udf\n",
    "\n",
    "spark.range(1).select(udf(lambda x: x / 0)(\"id\")).collect()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "78100070-b9db-4efd-ad94-a8e6a00ee68a",
   "metadata": {},
   "source": [
    "\n",
    "To show the whole internal stacktraces, users can enable\n",
    "`spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled` and `spark.sql.pyspark.jvmStacktrace.enabled`\n",
    "respectively.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "425b24b8-acd0-4897-bc8c-75af6316f430",
   "metadata": {},
   "outputs": [
    {
     "ename": "PythonException",
     "evalue": "\n  An exception was thrown from the Python worker. Please see the stack trace below.\nTraceback (most recent call last):\n  File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1898, in main\n    process()\n  File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1890, in process\n    serializer.dump_stream(out_iter, outfile)\n  File \"/.../python/lib/pyspark.zip/pyspark/serializers.py\", line 224, in dump_stream\n    self.serializer.dump_stream(self._batched(iterator), stream)\n  File \"/.../python/lib/pyspark.zip/pyspark/serializers.py\", line 145, in dump_stream\n    for obj in iterator:\n  File \"/.../python/lib/pyspark.zip/pyspark/serializers.py\", line 213, in _batched\n    for item in iterator:\n  File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1798, in mapper\n    result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)\n             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1798, in <genexpr>\n    result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)\n                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 114, in <lambda>\n    return args_kwargs_offsets, lambda *a: func(*a)\n                                           ^^^^^^^^\n  File \"/.../python/lib/pyspark.zip/pyspark/util.py\", line 145, in wrapper\n    return f(*args, **kwargs)\n           ^^^^^^^^^^^^^^^^^^\n  File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 739, in profiling_func\n    ret = f(*args, **kwargs)\n          ^^^^^^^^^^^^^^^^^^\n  File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3570641234.py\", line 3, in <lambda>\nZeroDivisionError: division by zero\n",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31mPythonException\u001b[0m: \n  An exception was thrown from the Python worker. Please see the stack trace below.\nTraceback (most recent call last):\n  File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1898, in main\n    process()\n  File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1890, in process\n    serializer.dump_stream(out_iter, outfile)\n  File \"/.../python/lib/pyspark.zip/pyspark/serializers.py\", line 224, in dump_stream\n    self.serializer.dump_stream(self._batched(iterator), stream)\n  File \"/.../python/lib/pyspark.zip/pyspark/serializers.py\", line 145, in dump_stream\n    for obj in iterator:\n  File \"/.../python/lib/pyspark.zip/pyspark/serializers.py\", line 213, in _batched\n    for item in iterator:\n  File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1798, in mapper\n    result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)\n             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1798, in <genexpr>\n    result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)\n                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 114, in <lambda>\n    return args_kwargs_offsets, lambda *a: func(*a)\n                                           ^^^^^^^^\n  File \"/.../python/lib/pyspark.zip/pyspark/util.py\", line 145, in wrapper\n    return f(*args, **kwargs)\n           ^^^^^^^^^^^^^^^^^^\n  File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 739, in profiling_func\n    ret = f(*args, **kwargs)\n          ^^^^^^^^^^^^^^^^^^\n  File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3570641234.py\", line 3, in <lambda>\nZeroDivisionError: division by zero\n"
     ]
    }
   ],
   "source": [
    "spark.conf.set(\"spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled\", False)\n",
    "spark.conf.set(\"spark.sql.pyspark.jvmStacktrace.enabled\", False)\n",
    "spark.range(1).select(udf(lambda x: x / 0)(\"id\")).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "5ec62d86-3631-4f48-a10c-4bcd727b1eb6",
   "metadata": {},
   "outputs": [
    {
     "ename": "PythonException",
     "evalue": "\n  An exception was thrown from the Python worker. Please see the stack trace below.\nTraceback (most recent call last):\n  File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3514597595.py\", line 3, in <lambda>\nZeroDivisionError: division by zero\n\n\nJVM stacktrace:\norg.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 13.0 failed 1 times, most recent failure: Lost task 15.0 in stage 13.0 (TID 161) (ip-192-168-45-94.ap-northeast-2.compute.internal executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n  File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3514597595.py\", line 3, in <lambda>\nZeroDivisionError: division by zero\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:531)\n\tat org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:103)\n\tat org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:485)\n\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)\n\tat scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)\n\tat scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)\n\tat 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:338)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)\n\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:146)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2887)\n\tat scala.Option.getOrElse(Option.scala:201)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2887)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2879)\n\tat scala.collection.immutable.List.foreach(List.scala:334)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2879)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1283)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1283)\n\tat scala.Option.foreach(Option.scala:437)\n\tat 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1283)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3158)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3092)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3081)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2479)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2498)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2523)\n\tat org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1057)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:417)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:1056)\n\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)\n\tat org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4265)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4439)\n\tat org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:608)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4437)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:155)\n\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:118)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:923)\n\tat 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:74)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:222)\n\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:4437)\n\tat org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4262)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n  File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3514597595.py\", line 3, in <lambda>\nZeroDivisionError: division by zero\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:531)\n\tat org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:103)\n\tat org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:485)\n\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n\tat 
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)\n\tat scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)\n\tat scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:338)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)\n\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:146)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\t... 1 more\n",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31mPythonException\u001b[0m: \n  An exception was thrown from the Python worker. Please see the stack trace below.\nTraceback (most recent call last):\n  File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3514597595.py\", line 3, in <lambda>\nZeroDivisionError: division by zero\n\n\nJVM stacktrace:\norg.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 13.0 failed 1 times, most recent failure: Lost task 15.0 in stage 13.0 (TID 161) (ip-192-168-45-94.ap-northeast-2.compute.internal executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n  File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3514597595.py\", line 3, in <lambda>\nZeroDivisionError: division by zero\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:531)\n\tat org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:103)\n\tat org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:485)\n\t...\n"
     ]
    }
   ],
   "source": [
    "spark.conf.set(\"spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled\", True)\n",
    "spark.conf.set(\"spark.sql.pyspark.jvmStacktrace.enabled\", True)\n",
    "spark.range(1).select(udf(lambda x: x / 0)(\"id\")).collect()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "59bab886-7a57-4736-89b1-e3776b3b991e",
   "metadata": {},
   "source": [
    "See also [Stack Traces](https://spark.apache.org/docs/latest/api/python/development/debugging.html#stack-traces) for more details."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cff22ba8",
   "metadata": {},
   "source": [
    "## Python Worker Logging\n",
    "\n",
    "<div class=\"alert alert-block alert-info\">\n",
    "<b>Note:</b> This section applies to Spark 4.1\n",
    "</div>\n",
    "\n",
    "PySpark provides a logging mechanism for Python workers that execute UDFs, UDTFs, Pandas UDFs, and Python data sources. When enabled, all logging output (including `print` statements, standard logging, and exceptions) is captured and made available for querying and analysis.\n",
    "\n",
    "### Enabling Worker Logging\n",
    "\n",
    "Worker logging is **disabled by default**. Enable it by setting the Spark SQL configuration:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "id": "74786d45",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0f23fee2",
   "metadata": {},
   "source": [
    "### Accessing Logs\n",
    "\n",
    "All captured logs can be queried as a DataFrame:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "id": "9db0c509",
   "metadata": {},
   "outputs": [],
   "source": [
    "logs = spark.tvf.python_worker_logs()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "34bca836",
   "metadata": {},
   "source": [
    "The logs DataFrame contains the following columns:\n",
    "\n",
    "- **ts**: Timestamp of the log entry\n",
    "- **level**: Log level (e.g., `\"INFO\"`, `\"WARNING\"`, `\"ERROR\"`)\n",
    "- **logger**: Logger name (e.g., custom logger name, `\"stdout\"`, `\"stderr\"`)\n",
    "- **msg**: The log message\n",
    "- **context**: A map containing contextual information (e.g., `func_name`, `class_name`, custom fields)\n",
    "- **exception**: Exception details (if an exception was logged)\n",
    "\n",
    "### Examples\n",
    "\n",
    "#### Basic UDF Logging"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "id": "4cb5bbca",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+\n",
      "|my_udf(text)|\n",
      "+------------+\n",
      "|       HELLO|\n",
      "|       WORLD|\n",
      "+------------+\n",
      "\n",
      "+-------+------------------------+----------------+---------------------+\n",
      "|level  |msg                     |logger          |context              |\n",
      "+-------+------------------------+----------------+---------------------+\n",
      "|INFO   |Processing value: hello |my_custom_logger|{func_name -> my_udf}|\n",
      "|WARNING|This is a warning       |my_custom_logger|{func_name -> my_udf}|\n",
      "|INFO   |This is a stdout message|stdout          |{func_name -> my_udf}|\n",
      "|ERROR  |This is a stderr message|stderr          |{func_name -> my_udf}|\n",
      "|INFO   |Processing value: world |my_custom_logger|{func_name -> my_udf}|\n",
      "|WARNING|This is a warning       |my_custom_logger|{func_name -> my_udf}|\n",
      "|INFO   |This is a stdout message|stdout          |{func_name -> my_udf}|\n",
      "|ERROR  |This is a stderr message|stderr          |{func_name -> my_udf}|\n",
      "+-------+------------------------+----------------+---------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql.functions import udf\n",
    "import logging\n",
    "import sys\n",
    "\n",
    "@udf(\"string\")\n",
    "def my_udf(value):\n",
    "    logger = logging.getLogger(\"my_custom_logger\")\n",
    "    logger.setLevel(logging.INFO)  # Set level to INFO to capture info messages\n",
    "    logger.info(f\"Processing value: {value}\")\n",
    "    logger.warning(\"This is a warning\")\n",
    "    print(\"This is a stdout message\")  # INFO level, logger=stdout\n",
    "    print(\"This is a stderr message\", file=sys.stderr)  # ERROR level, logger=stderr\n",
    "    return value.upper()\n",
    "\n",
    "# Enable logging and execute\n",
    "spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")\n",
    "df = spark.createDataFrame([(\"hello\",), (\"world\",)], [\"text\"])\n",
    "df.select(my_udf(\"text\")).show()\n",
    "\n",
    "# Query the logs\n",
    "logs = spark.tvf.python_worker_logs()\n",
    "logs.select(\"level\", \"msg\", \"logger\", \"context\").show(truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "15a80ffb",
   "metadata": {},
   "source": [
    "#### Logging with Custom Context\n",
    "\n",
    "You can add custom context information to your logs:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "id": "427a06c5",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+\n",
      "|contextual_udf(test)|\n",
      "+--------------------+\n",
      "|                test|\n",
      "+--------------------+\n",
      "\n",
      "+-----------------------------+---------------------------------------------------------------------+\n",
      "|msg                          |context                                                              |\n",
      "+-----------------------------+---------------------------------------------------------------------+\n",
      "|Processing with extra context|{func_name -> contextual_udf, user_id -> 123, operation -> transform}|\n",
      "+-----------------------------+---------------------------------------------------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql.functions import lit, udf\n",
    "import logging\n",
    "\n",
    "@udf(\"string\")\n",
    "def contextual_udf(value):\n",
    "    logger = logging.getLogger(\"contextual\")\n",
    "    logger.warning(\n",
    "        \"Processing with extra context\",\n",
    "        extra={\"context\": {\"user_id\": 123, \"operation\": \"transform\"}}\n",
    "    )\n",
    "    return value\n",
    "\n",
    "spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")\n",
    "spark.range(1).select(contextual_udf(lit(\"test\"))).show()\n",
    "\n",
    "logs = spark.tvf.python_worker_logs()\n",
    "logs.filter(\"logger = 'contextual'\").select(\"msg\", \"context\").show(truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a19db296",
   "metadata": {},
   "source": [
    "The context includes both automatic fields (like `func_name`) and custom fields (like `user_id`, `operation`).\n",
    "\n",
    "#### Exception Logging\n",
    "\n",
    "Exceptions are automatically captured with full stack traces:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "id": "3ab34a4c",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+\n",
      "|failing_udf(value)|\n",
      "+------------------+\n",
      "|                -1|\n",
      "|                20|\n",
      "+------------------+\n",
      "\n",
      "+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+\n",
      "|msg                      |exception                                                                                                                                     |\n",
      "+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+\n",
      "|Division by zero occurred|{ZeroDivisionError, division by zero, [{NULL, failing_udf, /var/folders/r8/0v7zwfbd59q4ym2gn6kxjq8h0000gp/T/ipykernel_79089/916837455.py, 8}]}|\n",
      "+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql.functions import udf\n",
    "import logging\n",
    "\n",
    "@udf(\"int\")\n",
    "def failing_udf(x):\n",
    "    logger = logging.getLogger(\"error_handler\")\n",
    "    try:\n",
    "        result = 100 / x\n",
    "    except ZeroDivisionError:\n",
    "        logger.exception(\"Division by zero occurred\")\n",
    "        return -1\n",
    "    return int(result)\n",
    "\n",
    "spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")\n",
    "spark.createDataFrame([(0,), (5,)], [\"value\"]).select(failing_udf(\"value\")).show()\n",
    "\n",
    "logs = spark.tvf.python_worker_logs()\n",
    "logs.filter(\"logger = 'error_handler'\").select(\"msg\", \"exception\").show(truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e54f6ac3",
   "metadata": {},
   "source": [
    "#### UDTF and Python Data Source Logging\n",
    "\n",
    "Worker logging also works with UDTFs and Python Data Sources, capturing both the class and function names:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "id": "02d454b0",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------+-----+------+\n",
      "|       text| word|length|\n",
      "+-----------+-----+------+\n",
      "|hello world|hello|     5|\n",
      "|hello world|world|     5|\n",
      "+-----------+-----+------+\n",
      "\n",
      "+-----------------------------+---------------------------------------------------------------------+\n",
      "|msg                          |context                                                              |\n",
      "+-----------------------------+---------------------------------------------------------------------+\n",
      "|Processing 2 words           |{func_name -> eval, class_name -> WordSplitter}                      |\n",
      "+-----------------------------+---------------------------------------------------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql.functions import col, udtf\n",
    "import logging\n",
    "\n",
    "@udtf(returnType=\"word: string, length: int\")\n",
    "class WordSplitter:\n",
    "    def eval(self, text: str):\n",
    "        logger = logging.getLogger(\"udtf_logger\")\n",
    "        logger.setLevel(logging.INFO)  # Set level to INFO to capture info messages\n",
    "        words = text.split()\n",
    "        logger.info(f\"Processing {len(words)} words\")\n",
    "        for word in words:\n",
    "            yield (word, len(word))\n",
    "\n",
    "spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")\n",
    "df = spark.createDataFrame([(\"hello world\",)], [\"text\"])\n",
    "df.lateralJoin(WordSplitter(col(\"text\").outer())).show()\n",
    "\n",
    "logs = spark.tvf.python_worker_logs()\n",
    "logs.filter(\"logger = 'udtf_logger'\").select(\"msg\", \"context\").show(truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9d4c119b",
   "metadata": {},
   "source": [
    "### Querying and Analyzing Logs\n",
    "\n",
    "You can use standard DataFrame operations to analyze logs:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "id": "5b061011",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-----+\n",
      "|  level|count|\n",
      "+-------+-----+\n",
      "|   INFO|    5|\n",
      "|WARNING|    3|\n",
      "|  ERROR|    3|\n",
      "+-------+-----+\n",
      "\n",
      "...\n",
      "\n"
     ]
    }
   ],
   "source": [
    "logs = spark.tvf.python_worker_logs()\n",
    "\n",
    "# Count logs by level\n",
    "logs.groupBy(\"level\").count().show()\n",
    "\n",
    "# Find all errors\n",
    "logs.filter(\"level = 'ERROR'\").show()\n",
    "\n",
    "# Logs from a specific function\n",
    "logs.filter(\"context.func_name = 'my_udf'\").show()\n",
    "\n",
    "# Logs with exceptions\n",
    "logs.filter(\"exception is not null\").show()\n",
    "\n",
    "# Time-based analysis\n",
    "logs.orderBy(\"ts\").show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "0611287d-cb34-457e-9bc3-f5629ddea484",
   "metadata": {},
   "source": [
    "\n",
    "## IDE Debugging"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "80f080c4",
   "metadata": {},
   "source": [
    "On the driver side, no additional steps are needed to use IDE for debugging your PySpark application. Refer to the guide below:\n",
    "\n",
    "- [Setting up IDEs](https://spark.apache.org/docs/latest/api/python/development/setting_ide.html)\n",
    "\n",
    "On the executor side, it requires several steps to set up the remote debugger. Refer to the guide below:\n",
    "\n",
    "- [Remote Debugging (PyCharm Professional)](https://spark.apache.org/docs/latest/api/python/development/debugging.html#remote-debugging-pycharm-professional)."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
