| { |
| "cells": [ |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "1619d229-6f5c-4a31-9992-81bce15f7ef1", |
| "metadata": {}, |
| "source": [ |
| "# Chapter 4: Bug Busting - Debugging PySpark\n", |
| "\n", |
| "PySpark executes applications in a distributed environment, making it challenging to\n", |
| "monitor and debug these applications. It can be difficult to track which nodes are\n", |
| "executing specific code. However, there are multiple methods available within PySpark\n", |
| "to help with debugging. This section will outline how to effectively debug PySpark\n", |
| "applications.\n", |
| "\n", |
| "PySpark operates using Spark as its underlying engine, utilizing Spark Connect server\n", |
| "or Py4J (Spark Classic) to submit and compute jobs in Spark.\n", |
| "\n", |
| "On the driver side, PySpark interacts with the Spark Driver on JVM through Spark\n", |
| "Connect server or Py4J (Spark Classic). When `pyspark.sql.SparkSession` is created and\n", |
| "initialized, PySpark starts to communicate with the Spark Driver.\n", |
| "\n", |
| "On the executor side, Python workers are responsible for executing and managing Python\n", |
| "native functions or data. These workers are only launched if the PySpark application\n", |
| "requires interaction between Python and JVMs such as Python UDF execution. They are\n", |
| "initiated on-demand, for instance, when running pandas UDFs or PySpark RDD APIs." |
| ] |
| }, |
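| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "3f2a9c1e-7b4d-4e08-9a21-5c6d8e0f1a2b", |
| "metadata": {}, |
| "source": [ |
| "The snippet below is a minimal sketch of that entry point: creating the session is the\n", |
| "moment PySpark starts communicating with the Spark Driver. The application name and the\n", |
| "`sc://` URL are placeholders for illustration.\n", |
| "\n", |
| "```python\n", |
| "from pyspark.sql import SparkSession\n", |
| "\n", |
| "# Spark Classic: the builder launches or attaches to a JVM driver via Py4J.\n", |
| "spark = SparkSession.builder.appName(\"debugging-demo\").getOrCreate()\n", |
| "\n", |
| "# Spark Connect: point the session at a Spark Connect server instead, e.g.\n", |
| "# SparkSession.builder.remote(\"sc://localhost:15002\").getOrCreate()\n", |
| "```" |
| ] |
| }, |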
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "56890562-0151-45ac-903e-45b4f1d40d33", |
| "metadata": {}, |
| "source": [ |
| "## Spark UI\n", |
| "\n", |
| "### Python UDF Execution\n", |
| "\n", |
| "Debugging a Python UDF in PySpark can be done by simply adding print statements, though\n", |
| "the output won't be visible in the client/driver side since the functions are executed\n", |
| "on the executors - they can be seen in Spark UI. For example, if you have a working\n", |
| "Python UDF:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 1, |
| "id": "a9219c08-df6c-40d7-a73d-a5950ee7df0b", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "from pyspark.sql.functions import udf\n", |
| "\n", |
| "@udf(\"integer\")\n", |
| "def my_udf(x):\n", |
| " # Do something with x\n", |
| " return x" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "4875da48-03ee-4155-9257-b5514270d591", |
| "metadata": {}, |
| "source": [ |
| "You can add print statements for debugging as shown below:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 2, |
| "id": "2b9102be-e7df-4b80-a70e-87ef7e76a913", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/plain": [ |
| "[Row(my_udf(id)=0)]" |
| ] |
| }, |
| "execution_count": 2, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "@udf(\"integer\")\n", |
| "def my_udf(x):\n", |
| " # Do something with x\n", |
| " print(\"What's going on?\")\n", |
| " return x\n", |
| "\n", |
| "spark.range(1).select(my_udf(\"id\")).collect()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "899b5d67-078a-4a12-8da8-7a841432ace2", |
| "metadata": {}, |
| "source": [ |
| "The output can be viewed in the Spark UI under `stdout`/`stderr` at `Executors` tab.\n", |
| "\n", |
| "" |
| ] |
| }, |
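| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "a7c41b9d-2e53-4f6a-8b0c-9d1e2f3a4b5c", |
| "metadata": {}, |
| "source": [ |
| "If you are unsure where the Spark UI is served, the sketch below shows one way to find\n", |
| "it on Spark Classic; under Spark Connect the client has no local `SparkContext`, so use\n", |
| "the UI of the remote cluster instead.\n", |
| "\n", |
| "```python\n", |
| "# Spark Classic: the SparkContext exposes the web UI address of the driver.\n", |
| "print(spark.sparkContext.uiWebUrl)  # e.g. http://localhost:4040\n", |
| "```" |
| ] |
| }, |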
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "154262f2-6f22-483b-9676-75901b325c66", |
| "metadata": {}, |
| "source": [ |
| "### Non-Python UDF\n", |
| "\n", |
| "When running non-Python UDF code, debugging is typically done via the Spark UI or\n", |
| "by using `DataFrame.explain(True)`.\n", |
| "\n", |
| "For instance, the code below performs a join between a large DataFrame (`df1`) and a\n", |
| "smaller one (`df2`):" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 3, |
| "id": "55070bab-5659-4bb1-98ff-1a3eb6231218", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "== Physical Plan ==\n", |
| "AdaptiveSparkPlan isFinalPlan=false\n", |
| "+- Project [_1#6L]\n", |
| " +- SortMergeJoin [_1#6L], [_1#8L], Inner\n", |
| " :- Sort [_1#6L ASC NULLS FIRST], false, 0\n", |
| " : +- Exchange hashpartitioning(_1#6L, 200), ENSURE_REQUIREMENTS, [plan_id=41]\n", |
| " : +- Filter isnotnull(_1#6L)\n", |
| " : +- Scan ExistingRDD[_1#6L]\n", |
| " +- Sort [_1#8L ASC NULLS FIRST], false, 0\n", |
| " +- Exchange hashpartitioning(_1#8L, 200), ENSURE_REQUIREMENTS, [plan_id=42]\n", |
| " +- Filter isnotnull(_1#8L)\n", |
| " +- Scan ExistingRDD[_1#8L]\n", |
| "\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "df1 = spark.createDataFrame([(x,) for x in range(100)])\n", |
| "df2 = spark.createDataFrame([(x,) for x in range(2)])\n", |
| "df1.join(df2, \"_1\").explain()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "b8b7a595-006c-4417-9683-bc002d85b789", |
| "metadata": {}, |
| "source": [ |
| "Using `DataFrame.explain` displays the physical plans, showing how the join will\n", |
| "be executed. Those physical plans represent individual steps for the whole execution.\n", |
| "Here, it exchanges, a.k.a. shuffles, the data and performs a sort-merge-join." |
| ] |
| }, |
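| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "c2d3e4f5-0a1b-4c6d-8e9f-1a2b3c4d5e6f", |
| "metadata": {}, |
| "source": [ |
| "The default output above shows only the physical plan. A short sketch of richer\n", |
| "`explain` outputs (the `formatted` mode requires Spark 3.0+):\n", |
| "\n", |
| "```python\n", |
| "joined = df1.join(df2, \"_1\")\n", |
| "\n", |
| "# Parsed, analyzed, and optimized logical plans plus the physical plan\n", |
| "joined.explain(True)\n", |
| "\n", |
| "# A node-by-node breakdown of the physical plan\n", |
| "joined.explain(mode=\"formatted\")\n", |
| "```" |
| ] |
| }, |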
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "97fc5dcb-8531-4618-a66f-d84ca5095fdd", |
| "metadata": {}, |
| "source": [ |
| "\n", |
| "After checking how the plans are generated via this method, users can optimize their queries.\n", |
| "For example, because `df2` is very small, it can be broadcasted to executors\n", |
| "and remove the shuffle\n" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 4, |
| "id": "7a1a985e-d260-49ce-a054-e70e3ed7e9e9", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "== Physical Plan ==\n", |
| "AdaptiveSparkPlan isFinalPlan=false\n", |
| "+- Project [_1#6L]\n", |
| " +- BroadcastHashJoin [_1#6L], [_1#8L], Inner, BuildRight, false\n", |
| " :- Filter isnotnull(_1#6L)\n", |
| " : +- Scan ExistingRDD[_1#6L]\n", |
| " +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=71]\n", |
| " +- Filter isnotnull(_1#8L)\n", |
| " +- Scan ExistingRDD[_1#8L]\n", |
| "\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "from pyspark.sql.functions import broadcast\n", |
| "\n", |
| "df1.join(broadcast(df2), \"_1\").explain()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "510949c3-91d1-475f-948f-34eed8617a41", |
| "metadata": {}, |
| "source": [ |
| "As can be seen the shuffle is removed, and it performs broadcast-hash-join:" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "028aab52-adea-4b5d-806a-de79e9c54e71", |
| "metadata": {}, |
| "source": [ |
| "\n", |
| "These optimizations can also be visualized in the Spark UI under the SQL / DataFrame\n", |
| "tab after execution.\n" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 5, |
| "id": "5cc67309-cd0b-49dc-b8d9-8d2c3a5aa944", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/plain": [ |
| "[Row(_1=0), Row(_1=1)]" |
| ] |
| }, |
| "execution_count": 5, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "df1.join(df2, \"_1\").collect()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "97a09efc-8fcd-400c-a052-8aef3bf7ce15", |
| "metadata": {}, |
| "source": [ |
| "\n", |
| "\n" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 6, |
| "id": "25b0f45f-26b6-485d-88df-a1fb323fd3f4", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/plain": [ |
| "[Row(_1=0), Row(_1=1)]" |
| ] |
| }, |
| "execution_count": 6, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "df1.join(broadcast(df2), \"_1\").collect()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "657d75db-1a64-4a97-8019-4ad1dd974997", |
| "metadata": {}, |
| "source": [ |
| "" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "cf1ab4f6-fe61-409a-9da5-afabdd7c987e", |
| "metadata": {}, |
| "source": [ |
| "## Monitor with `top` and `ps`\n", |
| "\n", |
| "On the driver side, you can obtain the process ID from your PySpark shell to\n", |
| "monitor resources:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 7, |
| "id": "f37891f2-1bfc-4995-ba8e-9ac351935bc8", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/plain": [ |
| "23976" |
| ] |
| }, |
| "execution_count": 7, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "import os; os.getpid()" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 8, |
| "id": "cef297f1-3772-40d9-85d9-bdd0c9761c38", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| " UID PID PPID C STIME TTY TIME CMD\n", |
| " 502 23976 21512 0 12:06PM ?? 0:02.30 /opt/miniconda3/envs/python3.11/bin/python -m ipykernel_launcher -f /Users/hyukjin.kwon/Library/Jupyter/runtime/kernel-c8eb73ef-2b21-418e-b770-92b946454606.json\n" |
| ] |
| } |
| ], |
| "source": [ |
| "%%bash\n", |
| "ps -fe 23976" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "296a6fbb-7ca8-448b-82f4-e7ee6d4359e2", |
| "metadata": {}, |
| "source": [ |
| "On the executor side, you can use `grep` to find the process IDs and resources for\n", |
| "Python workers, as these are forked from `pyspark.daemon`." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 9, |
| "id": "15b6127c-67ef-4b6f-b4be-7c05af5d12bb", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| " 502 23989 23981 0 12:06PM ?? 0:00.59 python3 -m pyspark.daemon pyspark.worker\n", |
| " 502 23990 23989 0 12:06PM ?? 0:00.19 python3 -m pyspark.daemon pyspark.worker\n", |
| " 502 23991 23989 0 12:06PM ?? 0:00.19 python3 -m pyspark.daemon pyspark.worker\n", |
| " 502 23992 23989 0 12:06PM ?? 0:00.19 python3 -m pyspark.daemon pyspark.worker\n", |
| " 502 23993 23989 0 12:06PM ?? 0:00.19 python3 -m pyspark.daemon pyspark.worker\n" |
| ] |
| } |
| ], |
| "source": [ |
| "%%bash\n", |
| "ps -fe | grep pyspark.daemon | head -n 5" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "d4088643-1903-4d24-b4b6-cce097a92124", |
| "metadata": {}, |
| "source": [ |
| "Typically, users leverage top and the identified PIDs to monitor the memory usage\n", |
| "of Python processes in PySpark." |
| ] |
| }, |
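| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "e9f8a7b6-c5d4-4e3f-a2b1-0c9d8e7f6a5b", |
| "metadata": {}, |
| "source": [ |
| "Instead of watching `top` interactively, the same numbers can be read programmatically.\n", |
| "Below is a minimal sketch using the third-party `psutil` package (assumed to be\n", |
| "installed; the PID is the example driver PID from above and will differ on your machine).\n", |
| "\n", |
| "```python\n", |
| "import psutil\n", |
| "\n", |
| "# Resident memory of the driver process (PID obtained from os.getpid() above)\n", |
| "driver = psutil.Process(23976)\n", |
| "print(f\"driver: {driver.memory_info().rss / 1024 / 1024:.1f} MiB\")\n", |
| "\n", |
| "# Resident memory of each Python worker forked from pyspark.daemon\n", |
| "for proc in psutil.process_iter([\"pid\", \"cmdline\", \"memory_info\"]):\n", |
| "    cmdline = \" \".join(proc.info[\"cmdline\"] or [])\n", |
| "    mem = proc.info[\"memory_info\"]\n", |
| "    if \"pyspark.daemon\" in cmdline and mem is not None:\n", |
| "        print(f\"worker {proc.info['pid']}: {mem.rss / 1024 / 1024:.1f} MiB\")\n", |
| "```" |
| ] |
| }, |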
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "2949bb68-0570-44b9-af19-d0b3c260dc49", |
| "metadata": {}, |
| "source": [ |
| "## Use PySpark Profilers\n", |
| "\n", |
| "### Memory Profiler\n", |
| "\n", |
| "In order to debug the driver side, users typically can use most of the existing\n", |
| "Python tools such as [memory_profiler](https://github.com/pythonprofilers/memory_profiler)\n", |
| "that allow you to check the memory usage line by line. If your driver program\n", |
| "is not running on another machine (e.g., YARN cluster mode), you can use a memory\n", |
| "profiler to debug memory usage on the driver side. For example:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 10, |
| "id": "cee1ae3c-0abe-4e38-b904-7f0c803441a5", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "Filename: profile_memory.py\n", |
| "\n", |
| "Line # Mem usage Increment Occurrences Line Contents\n", |
| "=============================================================\n", |
| " 4 80.6 MiB 80.6 MiB 1 @profile\n", |
| " 5 #=====================================================\n", |
| " 6 def my_func():\n", |
| " 7 79.0 MiB -1.7 MiB 1 session = SparkSession.builder.getOrCreate()\n", |
| " 8 80.1 MiB 1.1 MiB 1 df = session.range(10000)\n", |
| " 9 84.1 MiB 4.0 MiB 1 return df.collect()\n", |
| "\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "%%bash\n", |
| "\n", |
| "echo \"from pyspark.sql import SparkSession\n", |
| "#===Your function should be decorated with @profile===\n", |
| "from memory_profiler import profile\n", |
| "@profile\n", |
| "#=====================================================\n", |
| "def my_func():\n", |
| " session = SparkSession.builder.getOrCreate()\n", |
| " df = session.range(10000)\n", |
| " return df.collect()\n", |
| "if __name__ == '__main__':\n", |
| " my_func()\" > profile_memory.py\n", |
| "\n", |
| "python -m memory_profiler profile_memory.py 2> /dev/null" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "98340ba5-abe0-4f92-ae48-26b63c6f5811", |
| "metadata": {}, |
| "source": [ |
| "It shows which line consumes how much memory properly." |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "7f01c836", |
| "metadata": {}, |
| "source": [ |
| "#### Python/Pandas UDF" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "cc3e1ccc", |
| "metadata": {}, |
| "source": [ |
| "<div class=\"alert alert-block alert-info\">\n", |
| "<b>Note:</b> This section applies to Spark 4.0\n", |
| "</div>" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "0b2926cf-df02-42c8-98fa-5822849e901f", |
| "metadata": {}, |
| "source": [ |
| "PySpark provides remote [memory_profiler](https://github.com/pythonprofilers/memory_profiler)\n", |
| "for Python/Pandas UDFs. That can be used on editors with line numbers such as\n", |
| "Jupyter notebooks. SparkSession-based memory profiler can be enabled by setting\n", |
| "the runtime SQL configuration `spark.sql.pyspark.udf.profiler` to `memory`:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 11, |
| "id": "553d9780-b30b-4e96-a134-ca1c06341c89", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "============================================================\n", |
| "Profile of UDF<id=16>\n", |
| "============================================================\n", |
| "Filename: /var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/885006762.py\n", |
| "\n", |
| "Line # Mem usage Increment Occurrences Line Contents\n", |
| "=============================================================\n", |
| " 5 1472.6 MiB 1472.6 MiB 10 @pandas_udf(\"long\")\n", |
| " 6 def add1(x):\n", |
| " 7 1473.9 MiB 1.3 MiB 10 return x + 1\n", |
| "\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "from pyspark.sql.functions import pandas_udf\n", |
| "\n", |
| "df = spark.range(10)\n", |
| "\n", |
| "@pandas_udf(\"long\")\n", |
| "def add1(x):\n", |
| " return x + 1\n", |
| "\n", |
| "spark.conf.set(\"spark.sql.pyspark.udf.profiler\", \"memory\")\n", |
| "\n", |
| "added = df.select(add1(\"id\"))\n", |
| "spark.profile.clear()\n", |
| "added.collect()\n", |
| "spark.profile.show(type=\"memory\")" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "abaf0439-43de-4482-8a80-258be3d98366", |
| "metadata": {}, |
| "source": [ |
| "The UDF IDs (e.g., 16) can be seen in the query plan, for example, `add1(...)#16L` in\n", |
| "`ArrowEvalPython` as shown below." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 12, |
| "id": "607f9f0c-6288-4bd2-99b8-2cc7e62098ff", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "== Physical Plan ==\n", |
| "*(2) Project [pythonUDF0#19L AS add1(id)#17L]\n", |
| "+- ArrowEvalPython [add1(id#14L)#16L], [pythonUDF0#19L], 200\n", |
| " +- *(1) Range (0, 10, step=1, splits=16)\n", |
| "\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "added.explain()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "06e40cdc", |
| "metadata": {}, |
| "source": [ |
| "### Performance Profiler" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "7909aba0", |
| "metadata": {}, |
| "source": [ |
| "<div class=\"alert alert-block alert-info\">\n", |
| "<b>Note:</b> This section applies to Spark 4.0\n", |
| "</div>" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "e66c991c-1b07-45d0-aae2-c79c362e2210", |
| "metadata": {}, |
| "source": [ |
| "[Python Profilers](https://docs.python.org/3/library/profile.html) are useful built-in\n", |
| "features in Python itself for profiling performance. To use this on driver side, you can use it as you would\n", |
| "do for regular Python programs because PySpark on driver side is a regular Python\n", |
| "process unless you are running your driver program in another machine\n", |
| "(e.g., YARN cluster mode)." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 13, |
| "id": "8d9ada24-81da-4c31-aaa2-b3578953b07b", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| " 549275 function calls (536745 primitive calls) in 3.447 seconds\n", |
| "\n", |
| " Ordered by: cumulative time\n", |
| "\n", |
| " ncalls tottime percall cumtime percall filename:lineno(function)\n", |
| " 2 0.000 0.000 3.448 1.724 app.py:1(<module>)\n", |
| " 792/1 0.005 0.000 3.447 3.447 {built-in method builtins.exec}\n", |
| " 128 0.000 0.000 2.104 0.016 socket.py:692(readinto)\n", |
| " 128 2.104 0.016 2.104 0.016 {method 'recv_into' of '_socket.socket' objects}\n", |
| " 124 0.000 0.000 2.100 0.017 java_gateway.py:1015(send_command)\n", |
| " 125 0.001 0.000 2.099 0.017 clientserver.py:499(send_command)\n", |
| " 138 0.000 0.000 2.097 0.015 {method 'readline' of '_io.BufferedReader' objects}\n", |
| " 55 0.000 0.000 1.622 0.029 java_gateway.py:1313(__call__)\n", |
| " 95 0.001 0.000 1.360 0.014 __init__.py:1(<module>)\n", |
| " 1 0.000 0.000 1.359 1.359 session.py:438(getOrCreate)\n", |
| " 1 0.000 0.000 1.311 1.311 context.py:491(getOrCreate)\n", |
| " 1 0.000 0.000 1.311 1.311 context.py:169(__init__)\n", |
| " 1 0.000 0.000 0.861 0.861 context.py:424(_ensure_initialized)\n", |
| " 1 0.001 0.001 0.861 0.861 java_gateway.py:39(launch_gateway)\n", |
| " 8 0.840 0.105 0.840 0.105 {built-in method time.sleep}\n" |
| ] |
| } |
| ], |
| "source": [ |
| "%%bash\n", |
| "\n", |
| "echo \"from pyspark.sql import SparkSession\n", |
| "spark = SparkSession.builder.getOrCreate()\n", |
| "spark.range(10).collect()\" > app.py\n", |
| "\n", |
| "python -m cProfile -s cumulative app.py 2> /dev/null | head -n 20" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "3e5ae42d", |
| "metadata": {}, |
| "source": [ |
| "#### Python/Pandas UDF" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "eb137611", |
| "metadata": {}, |
| "source": [ |
| "<div class=\"alert alert-block alert-info\">\n", |
| "<b>Note:</b> This section applies to Spark 4.0\n", |
| "</div>" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "c9b9a26c-56fa-4a14-adfd-2697a87c479e", |
| "metadata": {}, |
| "source": [ |
| "PySpark provides remote Python Profilers for Python/Pandas UDFs. UDFs with\n", |
| "iterators as inputs/outputs are not supported. SparkSession-based performance\n", |
| "profiler can be enabled by setting the runtime SQL configuration\n", |
| "`spark.sql.pyspark.udf.profiler` to `perf`. An example is as shown below." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 14, |
| "id": "fcba873d-7a4f-42f9-b796-3c492c7e8077", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "============================================================\n", |
| "Profile of UDF<id=22>\n", |
| "============================================================\n", |
| " 2130 function calls (2080 primitive calls) in 0.003 seconds\n", |
| "\n", |
| " Ordered by: internal time, cumulative time\n", |
| "\n", |
| " ncalls tottime percall cumtime percall filename:lineno(function)\n", |
| " 10 0.001 0.000 0.003 0.000 common.py:62(new_method)\n", |
| " 10 0.000 0.000 0.000 0.000 {built-in method _operator.add}\n", |
| " 10 0.000 0.000 0.002 0.000 base.py:1371(_arith_method)\n", |
| " 10 0.000 0.000 0.001 0.000 series.py:389(__init__)\n", |
| " 20 0.000 0.000 0.000 0.000 _ufunc_config.py:33(seterr)\n", |
| " 10 0.000 0.000 0.001 0.000 series.py:6201(_construct_result)\n", |
| " 10 0.000 0.000 0.000 0.000 cast.py:1605(maybe_cast_to_integer_array)\n", |
| " 10 0.000 0.000 0.000 0.000 construction.py:517(sanitize_array)\n", |
| " 10 0.000 0.000 0.002 0.000 series.py:6133(_arith_method)\n", |
| " 10 0.000 0.000 0.000 0.000 managers.py:1863(from_array)\n", |
| " 10 0.000 0.000 0.000 0.000 array_ops.py:240(arithmetic_op)\n", |
| " 510 0.000 0.000 0.000 0.000 {built-in method builtins.isinstance}\n" |
| ] |
| } |
| ], |
| "source": [ |
| "import io\n", |
| "from contextlib import redirect_stdout\n", |
| "\n", |
| "from pyspark.sql.functions import pandas_udf\n", |
| "\n", |
| "df = spark.range(10)\n", |
| "@pandas_udf(\"long\")\n", |
| "def add1(x):\n", |
| " return x + 1\n", |
| "\n", |
| "added = df.select(add1(\"id\"))\n", |
| "\n", |
| "spark.conf.set(\"spark.sql.pyspark.udf.profiler\", \"perf\")\n", |
| "spark.profile.clear()\n", |
| "added.collect()\n", |
| "\n", |
| "# Only show top 10 lines\n", |
| "output = io.StringIO()\n", |
| "with redirect_stdout(output):\n", |
| " spark.profile.show(type=\"perf\")\n", |
| "\n", |
| "print(\"\\n\".join(output.getvalue().split(\"\\n\")[0:20]))" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "b35fb6c4-c074-4866-abd6-b1b435721b67", |
| "metadata": {}, |
| "source": [ |
| "The UDF IDs (e.g., 22) can be seen in the query plan, for example, `add1(...)#22L` in\n", |
| "`ArrowEvalPython` below." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 15, |
| "id": "de015526-577b-45ea-a6e5-598cf215ef8b", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "== Physical Plan ==\n", |
| "*(2) Project [pythonUDF0#25L AS add1(id)#23L]\n", |
| "+- ArrowEvalPython [add1(id#20L)#22L], [pythonUDF0#25L], 200\n", |
| " +- *(1) Range (0, 10, step=1, splits=16)\n", |
| "\n", |
| "\n" |
| ] |
| } |
| ], |
| "source": [ |
| "added.explain()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "fe046e5e-73bb-466f-a373-9d5a445b0fa1", |
| "metadata": {}, |
| "source": [ |
| "We can render the result with a preregistered renderer as shown below." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 16, |
| "id": "e2507dc0-ab64-4afe-ae8a-19c52533e57c", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "spark.profile.render(id=2, type=\"perf\") # renderer=\"flameprof\" by default" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "ba892df0-4058-46ad-a952-791559da5259", |
| "metadata": {}, |
| "source": [ |
| "" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "09b420ba", |
| "metadata": {}, |
| "source": [ |
| "## Disply Stacktraces" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "9756e41b", |
| "metadata": {}, |
| "source": [ |
| "<div class=\"alert alert-block alert-info\">\n", |
| "<b>Note:</b> This section applies to Spark 4.0\n", |
| "</div>" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "c7bf0b21-f9a8-4cc0-8288-46f7ef4f4f52", |
| "metadata": {}, |
| "source": [ |
| "By default, JVM stacktraces and Python internal tracebacks are hidden especially\n", |
| "in Python UDF executions. For example," |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 17, |
| "id": "bafce04e-5d7c-40e0-9342-0ffb94c858c7", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "ename": "PythonException", |
| "evalue": "\n An exception was thrown from the Python worker. Please see the stack trace below.\nTraceback (most recent call last):\n File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3806637820.py\", line 3, in <lambda>\nZeroDivisionError: division by zero\n", |
| "output_type": "error", |
| "traceback": [ |
| "\u001b[0;31mPythonException\u001b[0m: \n An exception was thrown from the Python worker. Please see the stack trace below.\nTraceback (most recent call last):\n File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3806637820.py\", line 3, in <lambda>\nZeroDivisionError: division by zero\n" |
| ] |
| } |
| ], |
| "source": [ |
| "from pyspark.sql.functions import udf\n", |
| "\n", |
| "spark.range(1).select(udf(lambda x: x / 0)(\"id\")).collect()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "78100070-b9db-4efd-ad94-a8e6a00ee68a", |
| "metadata": {}, |
| "source": [ |
| "\n", |
| "To show the whole internal stacktraces, users can enable\n", |
| "`spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled` and `spark.sql.pyspark.jvmStacktrace.enabled`\n", |
| "respectively.\n" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 18, |
| "id": "425b24b8-acd0-4897-bc8c-75af6316f430", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "ename": "PythonException", |
| "evalue": "\n An exception was thrown from the Python worker. Please see the stack trace below.\nTraceback (most recent call last):\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1898, in main\n process()\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1890, in process\n serializer.dump_stream(out_iter, outfile)\n File \"/.../python/lib/pyspark.zip/pyspark/serializers.py\", line 224, in dump_stream\n self.serializer.dump_stream(self._batched(iterator), stream)\n File \"/.../python/lib/pyspark.zip/pyspark/serializers.py\", line 145, in dump_stream\n for obj in iterator:\n File \"/.../python/lib/pyspark.zip/pyspark/serializers.py\", line 213, in _batched\n for item in iterator:\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1798, in mapper\n result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1798, in <genexpr>\n result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 114, in <lambda>\n return args_kwargs_offsets, lambda *a: func(*a)\n ^^^^^^^^\n File \"/.../python/lib/pyspark.zip/pyspark/util.py\", line 145, in wrapper\n return f(*args, **kwargs)\n ^^^^^^^^^^^^^^^^^^\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 739, in profiling_func\n ret = f(*args, **kwargs)\n ^^^^^^^^^^^^^^^^^^\n File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3570641234.py\", line 3, in <lambda>\nZeroDivisionError: division by zero\n", |
| "output_type": "error", |
| "traceback": [ |
| "\u001b[0;31mPythonException\u001b[0m: \n An exception was thrown from the Python worker. Please see the stack trace below.\nTraceback (most recent call last):\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1898, in main\n process()\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1890, in process\n serializer.dump_stream(out_iter, outfile)\n File \"/.../python/lib/pyspark.zip/pyspark/serializers.py\", line 224, in dump_stream\n self.serializer.dump_stream(self._batched(iterator), stream)\n File \"/.../python/lib/pyspark.zip/pyspark/serializers.py\", line 145, in dump_stream\n for obj in iterator:\n File \"/.../python/lib/pyspark.zip/pyspark/serializers.py\", line 213, in _batched\n for item in iterator:\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1798, in mapper\n result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 1798, in <genexpr>\n result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 114, in <lambda>\n return args_kwargs_offsets, lambda *a: func(*a)\n ^^^^^^^^\n File \"/.../python/lib/pyspark.zip/pyspark/util.py\", line 145, in wrapper\n return f(*args, **kwargs)\n ^^^^^^^^^^^^^^^^^^\n File \"/.../python/lib/pyspark.zip/pyspark/worker.py\", line 739, in profiling_func\n ret = f(*args, **kwargs)\n ^^^^^^^^^^^^^^^^^^\n File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3570641234.py\", line 3, in <lambda>\nZeroDivisionError: division by zero\n" |
| ] |
| } |
| ], |
| "source": [ |
| "spark.conf.set(\"spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled\", False)\n", |
| "spark.conf.set(\"spark.sql.pyspark.jvmStacktrace.enabled\", False)\n", |
| "spark.range(1).select(udf(lambda x: x / 0)(\"id\")).collect()" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 19, |
| "id": "5ec62d86-3631-4f48-a10c-4bcd727b1eb6", |
| "metadata": {}, |
| "outputs": [ |
| { |
| "ename": "PythonException", |
| "evalue": "\n An exception was thrown from the Python worker. Please see the stack trace below.\nTraceback (most recent call last):\n File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3514597595.py\", line 3, in <lambda>\nZeroDivisionError: division by zero\n\n\nJVM stacktrace:\norg.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 13.0 failed 1 times, most recent failure: Lost task 15.0 in stage 13.0 (TID 161) (ip-192-168-45-94.ap-northeast-2.compute.internal executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3514597595.py\", line 3, in <lambda>\nZeroDivisionError: division by zero\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:531)\n\tat org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:103)\n\tat org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:485)\n\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)\n\tat scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)\n\tat scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:338)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)\n\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:146)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2887)\n\tat scala.Option.getOrElse(Option.scala:201)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2887)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2879)\n\tat 
scala.collection.immutable.List.foreach(List.scala:334)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2879)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1283)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1283)\n\tat scala.Option.foreach(Option.scala:437)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1283)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3158)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3092)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3081)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2479)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2498)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2523)\n\tat org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1057)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:417)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:1056)\n\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)\n\tat org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4265)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4439)\n\tat org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:608)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4437)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:155)\n\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:118)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:923)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:74)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:222)\n\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:4437)\n\tat org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4262)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\n\tat 
java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3514597595.py\", line 3, in <lambda>\nZeroDivisionError: division by zero\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:531)\n\tat org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:103)\n\tat org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:485)\n\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)\n\tat scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)\n\tat scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:338)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)\n\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:146)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\t... 1 more\n", |
| "output_type": "error", |
| "traceback": [ |
| "\u001b[0;31mPythonException\u001b[0m: \n An exception was thrown from the Python worker. Please see the stack trace below.\nTraceback (most recent call last):\n File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3514597595.py\", line 3, in <lambda>\nZeroDivisionError: division by zero\n\n\nJVM stacktrace:\norg.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 13.0 failed 1 times, most recent failure: Lost task 15.0 in stage 13.0 (TID 161) (ip-192-168-45-94.ap-northeast-2.compute.internal executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3514597595.py\", line 3, in <lambda>\nZeroDivisionError: division by zero\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:531)\n\tat org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:103)\n\tat org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:485)\n\t...\n" |
| ] |
| } |
| ], |
| "source": [ |
| "spark.conf.set(\"spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled\", True)\n", |
| "spark.conf.set(\"spark.sql.pyspark.jvmStacktrace.enabled\", True)\n", |
| "spark.range(1).select(udf(lambda x: x / 0)(\"id\")).collect()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "59bab886-7a57-4736-89b1-e3776b3b991e", |
| "metadata": {}, |
| "source": [ |
| "See also [Stack Traces](https://spark.apache.org/docs/latest/api/python/development/debugging.html#stack-traces) for more details." |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "0611287d-cb34-457e-9bc3-f5629ddea484", |
| "metadata": {}, |
| "source": [ |
| "\n", |
| "## IDE Debugging" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "id": "80f080c4", |
| "metadata": {}, |
| "source": [ |
| "On the driver side, no additional steps are needed to use IDE for debugging your PySpark application. Refer to the guide below:\n", |
| "\n", |
| "- [Setting up IDEs](https://spark.apache.org/docs/latest/api/python/development/setting_ide.html)\n", |
| "\n", |
| "On the executor side, it requires several steps to set up the remote debugger. Refer to the guide below:\n", |
| "\n", |
| "- [Remote Debugging (PyCharm Professional)](https://spark.apache.org/docs/latest/api/python/development/debugging.html#remote-debugging-pycharm-professional)." |
| ] |
| } |
| ], |
| "metadata": { |
| "kernelspec": { |
| "display_name": "Python 3 (ipykernel)", |
| "language": "python", |
| "name": "python3" |
| }, |
| "language_info": { |
| "codemirror_mode": { |
| "name": "ipython", |
| "version": 3 |
| }, |
| "file_extension": ".py", |
| "mimetype": "text/x-python", |
| "name": "python", |
| "nbconvert_exporter": "python", |
| "pygments_lexer": "ipython3", |
| "version": "3.11.9" |
| } |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 5 |
| } |