| { |
| "cells": [ |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "# HTML Render" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "%%html\n", |
| "<!-- Silencing error messages in the notebook -->\n", |
| "<style>\n", |
| "div.output_stderr {\n", |
| "background: #ffdd;\n", |
| "display: none;\n", |
| "}\n", |
| "</style>" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "%%html\n", |
| "<!-- Making the cells take up the full width of the window -->\n", |
| "<style>\n", |
| ".container { width:100% !important; }\n", |
| "</style>" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "%%html\n", |
| "<!-- Changing the font of code cells -->\n", |
| "<style>\n", |
| ".CodeMirror{font-family: \"Courier New\";font-size: 12pt;}\n", |
| "</style>" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "%%html\n", |
| "<!-- Changing the size of tables to 20px -->\n", |
| "<style>\n", |
| ".rendered_html table, .rendered_html td, .rendered_html th {font-size: 20px;}\n", |
| "</style>" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "# System Settings" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "import os\n", |
| "from pathlib import Path\n", |
| "# Canonical (symlink-free) home directory; later cells build the `profile` paths from it.\n", |
| "home = os.path.realpath(str(Path.home()))\n", |
| "# Working directory of the kernel at the time this cell runs.\n", |
| "cwd = os.getcwd()\n", |
| "print(f'home: {home}')\n", |
| "print(f'cwd: {cwd}')" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "import os\n", |
| "import pandas as pd\n", |
| "\n", |
| "pd.set_option('display.max_rows', None)\n", |
| "\n", |
| "# Convert the os.environ object to a dictionary and then to a DataFrame\n", |
| "env_df = pd.DataFrame(list(dict(os.environ).items()), columns=['Environment Variable', 'Value'])\n", |
| "\n", |
| "# Display the DataFrame\n", |
| "from IPython.display import display\n", |
| "\n", |
| "# Cell outputs are saved together with the notebook, so the value of every variable whose\n", |
| "# name looks like a credential is masked in the displayed copy. `env_df` itself is unchanged.\n", |
| "SENSITIVE_ENV_NAME = r'(?:PASS|SECRET|TOKEN|CREDENTIAL|PRIVATE|API_?KEY|ACCESS_?KEY)'\n", |
| "is_sensitive = env_df['Environment Variable'].str.contains(SENSITIVE_ENV_NAME, case=False, regex=True)\n", |
| "display(env_df.assign(Value=env_df['Value'].mask(is_sensitive, '***redacted***')))" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "import socket\n", |
| "# Host name of the machine this kernel runs on; stopmonitor compares client names against it.\n", |
| "localhost=socket.gethostname()\n", |
| "# Address the host name resolves to. NOTE(review): depending on the resolver setup this\n", |
| "# may be a loopback address - confirm it is reachable before using it in URLs.\n", |
| "local_ip=socket.gethostbyname(localhost)\n", |
| "\n", |
| "print(f'localhost: {localhost}')\n", |
| "print(f'ip: {local_ip}')" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "import os\n", |
| "\n", |
| "# The second whitespace-separated field on the first line of $SPARK_HOME/RELEASE is taken\n", |
| "# as the Spark version. Fail with a clear message instead of an IndexError (or a version\n", |
| "# string made of shell error text) when the file is missing or malformed.\n", |
| "if 'SPARK_HOME' not in os.environ:\n", |
| "    raise SystemExit('SPARK_HOME is not defined in environment variables.')\n", |
| "release_file = os.path.join(os.environ['SPARK_HOME'], 'RELEASE')\n", |
| "try:\n", |
| "    with open(release_file, 'r') as f:\n", |
| "        release_fields = f.readline().split()\n", |
| "except OSError as e:\n", |
| "    raise SystemExit(f'Cannot read Spark release file {release_file}: {e}')\n", |
| "if len(release_fields) < 2 or not release_fields[1][:1].isdigit():\n", |
| "    raise SystemExit(f'Cannot find the Spark version in {release_file}')\n", |
| "spark_version = release_fields[1]\n", |
| "\n", |
| "print(f\"Spark version from SPARK_HOME: {spark_version}\")\n", |
| "# Version with the dots removed (e.g. '3.3.1' -> '331'); used in the records file names.\n", |
| "spark_version_short=''.join(spark_version.split('.'))" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "import logging\n", |
| "import sys\n", |
| "\n", |
| "# Configure the root logger: ERROR and above only, written to stdout rather than the\n", |
| "# default stderr (the notebook's first %%html cell hides stderr output).\n", |
| "logging.basicConfig(format='%(levelname)s : %(message)s', level=logging.ERROR, stream=sys.stdout)\n", |
| "logger = logging.getLogger()" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "import os\n", |
| "\n", |
| "hdfs_event_dir=''\n", |
| "local_event_dir=''\n", |
| "\n", |
| "def get_spark_eventlog_dir(path):\n", |
| "    \"\"\"Return the `spark.eventLog.dir` value configured in a spark-defaults.conf file.\n", |
| "\n", |
| "    Each entry is parsed as `key<separator>value`, where the separator is any run of\n", |
| "    whitespace (spaces or tabs), optionally with a single '=' or ':'. Blank lines and\n", |
| "    lines starting with '#' or '!' are ignored, and keys must match exactly.\n", |
| "\n", |
| "    Args:\n", |
| "        path: Path of the spark-defaults.conf file to read.\n", |
| "\n", |
| "    Returns:\n", |
| "        The configured event log directory, or None when the key is absent.\n", |
| "\n", |
| "    Raises:\n", |
| "        SystemExit: If the file does not exist, or if `spark.eventLog.enabled`\n", |
| "            is missing or not set to 'true'.\n", |
| "    \"\"\"\n", |
| "    import re\n", |
| "\n", |
| "    entry = re.compile(r'^([^\\s=:]+)\\s*[=:]?\\s*(.*)$')\n", |
| "    eventlog_dir = None\n", |
| "    eventlog_enabled = False\n", |
| "    try:\n", |
| "        with open(path, 'r') as f:\n", |
| "            for line in f:\n", |
| "                line = line.strip()\n", |
| "                if not line or line[0] in '#!':\n", |
| "                    continue\n", |
| "                match = entry.match(line)\n", |
| "                if match is None:\n", |
| "                    continue\n", |
| "                key, value = match.group(1), match.group(2).strip()\n", |
| "                if key == 'spark.eventLog.dir':\n", |
| "                    eventlog_dir = value\n", |
| "                elif key == 'spark.eventLog.enabled':\n", |
| "                    eventlog_enabled = value.lower() == 'true'\n", |
| "    except FileNotFoundError:\n", |
| "        raise SystemExit(f\"'spark-defaults.conf' not found: {path}\")\n", |
| "    if not eventlog_enabled:\n", |
| "        raise SystemExit(\"'spark.eventLog.enabled' must be enabled.\")\n", |
| "    return eventlog_dir\n", |
| "\n", |
| "# Locate spark-defaults.conf: SPARK_CONF_DIR takes precedence over $SPARK_HOME/conf.\n", |
| "spark_defaults_conf = None\n", |
| "\n", |
| "if 'SPARK_CONF_DIR' in os.environ:\n", |
| "    spark_defaults_conf = os.path.join(os.environ['SPARK_CONF_DIR'], 'spark-defaults.conf')\n", |
| "elif 'SPARK_HOME' in os.environ:\n", |
| "    spark_defaults_conf = os.path.join(os.environ['SPARK_HOME'], 'conf', 'spark-defaults.conf')\n", |
| "\n", |
| "if spark_defaults_conf:\n", |
| "    event_log_dir = get_spark_eventlog_dir(spark_defaults_conf)\n", |
| "    if event_log_dir:\n", |
| "        print(f\"spark.eventLog.dir: {event_log_dir}\")\n", |
| "        if event_log_dir.startswith('hdfs://'):\n", |
| "            hdfs_event_dir = event_log_dir\n", |
| "        elif event_log_dir.startswith('file:/'):\n", |
| "            # Strip only the scheme and collapse the leading slashes to one, so that both\n", |
| "            # 'file:/tmp/x' and 'file:///tmp/x' give the absolute path '/tmp/x'.\n", |
| "            local_event_dir = '/' + event_log_dir[len('file:'):].lstrip('/')\n", |
| "    else:\n", |
| "        raise SystemExit(f\"'spark.eventLog.dir' is not configured in {spark_defaults_conf}\")\n", |
| "else:\n", |
| "    raise SystemExit(\"Cannot get `spark.eventLog.dir`. Neither SPARK_CONF_DIR nor SPARK_HOME defined in environment variables.\")\n", |
| " " |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "# Monitor" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "import findspark\n", |
| "import os\n", |
| "\n", |
| "# Initialise findspark with the Spark installation given by SPARK_HOME\n", |
| "# (raises KeyError when the variable is not set).\n", |
| "findspark.init(os.environ['SPARK_HOME'])\n", |
| "# setdefault: SPARK_SUBMIT_OPTS is only set when it is not already present in the environment.\n", |
| "os.environ.setdefault('SPARK_SUBMIT_OPTS', '-Dscala.usejavacp=true')" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "import warnings\n", |
| "warnings.filterwarnings('ignore')\n", |
| "\n", |
| "import atexit\n", |
| "import collections\n", |
| "import gzip\n", |
| "import importlib\n", |
| "import json\n", |
| "import logging\n", |
| "import math\n", |
| "import os\n", |
| "import pathlib\n", |
| "import shutil\n", |
| "import signal\n", |
| "import subprocess\n", |
| "import tempfile\n", |
| "import threading\n", |
| "import time\n", |
| "import timeit\n", |
| "import traceback\n", |
| "\n", |
| "import matplotlib\n", |
| "import matplotlib.colors as colors\n", |
| "import matplotlib.pyplot as plt\n", |
| "import matplotlib.ticker as mtick\n", |
| "import numpy as np\n", |
| "import pandas as pd\n", |
| "import platform\n", |
| "import pyspark\n", |
| "import pyspark.sql.functions as F\n", |
| "import pyspark.sql.types as T\n", |
| "import spylon_kernel\n", |
| "from collections import namedtuple\n", |
| "from concurrent.futures import ThreadPoolExecutor\n", |
| "from datetime import date, datetime\n", |
| "from functools import reduce\n", |
| "from IPython.display import display, HTML\n", |
| "from matplotlib import rcParams\n", |
| "from pyspark import SparkConf, SparkContext\n", |
| "from pyspark.ml import Pipeline\n", |
| "from pyspark.ml.feature import StringIndexer, VectorAssembler\n", |
| "from pyspark.sql import SparkSession, SQLContext, Window\n", |
| "from pyspark.sql.functions import col, floor, lit, rank, to_date\n", |
| "from pyspark.sql.types import (DoubleType, FloatType, IntegerType,\n", |
| " StringType, StructField, StructType,\n", |
| " TimestampType)\n", |
| "\n", |
| "from spylon_kernel import register_ipython_magics\n", |
| "from spylon.spark.utils import SparkJVMHelpers\n", |
| "\n", |
| "register_ipython_magics()\n", |
| "\n", |
| "rcParams['font.sans-serif'] = 'Courier New'\n", |
| "rcParams['font.family'] = 'Courier New'\n", |
| "rcParams['font.size'] = '12'\n", |
| "\n", |
| "%matplotlib inline\n" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "code_folding": [] |
| }, |
| "outputs": [], |
| "source": [ |
| "import socket\n", |
| "import os\n", |
| "import sys\n", |
| "import json\n", |
| "\n", |
| "def upload_profile(server, base_dir, appid):\n", |
| "    \"\"\"Archive the local profile directory of `appid` and publish it on `server`.\n", |
| "\n", |
| "    The directory `<home>/profile/<appid>` is packed into `<appid>.tar.gz`, copied to\n", |
| "    `server` with scp, unpacked there and put into HDFS under `/<base_dir>/`. The\n", |
| "    staging archive and directory on the server are removed at the end.\n", |
| "\n", |
| "    Args:\n", |
| "        server: ssh/scp target that receives the profile.\n", |
| "        base_dir: Sub-directory used both below the server-side staging directory and\n", |
| "            as the top-level HDFS directory.\n", |
| "        appid: Name of the profile directory (and of the archive) to upload.\n", |
| "\n", |
| "    `home` is read from the notebook namespace.\n", |
| "    \"\"\"\n", |
| "    local_profile_dir = os.path.join(home, 'profile')\n", |
| "    !mkdir -p {local_profile_dir}\n", |
| "    # Re-create the archive from scratch; all tar output is discarded.\n", |
| "    !(cd {local_profile_dir}; rm -f {appid}.tar.gz; tar zcvf {appid}.tar.gz {appid}) >/dev/null 2>&1\n", |
| "    \n", |
| "    # NOTE(review): 'PAUS' is a relative path, so it resolves against the directory the\n", |
| "    # ssh session starts in on the server - confirm this is intended.\n", |
| "    server_local_dir=os.path.join('PAUS', base_dir)\n", |
| "    server_local_profile_dir=os.path.join(server_local_dir, 'profile')\n", |
| "    server_hdfs_dir=f'/{base_dir}/'\n", |
| "\n", |
| "    !ssh {server} \"mkdir -p {server_local_profile_dir}\"\n", |
| "    # Remove leftovers of an earlier upload of the same appid before copying.\n", |
| "    !ssh {server} \"cd {server_local_profile_dir} && rm {appid}.tar.gz >/dev/null 2>&1\"\n", |
| "    !ssh {server} \"cd {server_local_profile_dir} && rm -r {appid} >/dev/null 2>&1\"\n", |
| "    !scp {local_profile_dir}/{appid}.tar.gz {server}:{server_local_profile_dir}/\n", |
| "    !ssh {server} \"cd {server_local_profile_dir} && tar zxf {appid}.tar.gz\"\n", |
| "    # Replace any existing copy of the profile in HDFS.\n", |
| "    !ssh {server} \"hdfs dfs -mkdir -p {server_hdfs_dir}\"\n", |
| "    !ssh {server} \"hdfs dfs -rm -r {server_hdfs_dir}{appid} >/dev/null 2>&1\"\n", |
| "    !ssh {server} \"hdfs dfs -put {server_local_profile_dir}/{appid} {server_hdfs_dir}\"\n", |
| "    # Clean up the staging files on the server.\n", |
| "    !ssh {server} \"cd {server_local_profile_dir}; rm {appid}.tar.gz; rm -r {appid}\"\n", |
| "\n", |
| "def killsar(clients):\n", |
| "    \"\"\"Kill the sar, pidstat and perf processes on every client, then stop emon.\n", |
| "\n", |
| "    Each tool is handled for all clients before the next tool is looked at. The perf\n", |
| "    processes are killed through a root ssh session, the others as the default user.\n", |
| "    \"\"\"\n", |
| "    # (process name matched with `grep -w`, prefix of the ssh target used for the kill)\n", |
| "    profilers = (('sar', ''), ('pidstat', ''), ('perf', 'root@'))\n", |
| "    for proc_name, kill_as in profilers:\n", |
| "        for host in clients:\n", |
| "            pids = !ssh $host \"ps aux | grep -w $proc_name | grep -v grep | tr -s ' ' | cut -d' ' -f2\"\n", |
| "            for pid in pids:\n", |
| "                !ssh {kill_as}$host \"kill $pid > /dev/null 2>&1\"\n", |
| "    for host in clients:\n", |
| "        !ssh $host \"emon -stop > /dev/null 2>&1\"\n", |
| "\n", |
| "def killnumactl(clients):\n", |
| "    \"\"\"Kill, on every client, each process whose `ps aux` line matches both 'numactl' and 'bash'.\"\"\"\n", |
| "    for host in clients:\n", |
| "        pids = !ssh $host \"ps aux | grep numactl | grep bash | tr -s ' ' | cut -d' ' -f2\"\n", |
| "        for pid in pids:\n", |
| "            !ssh $host \"kill $pid > /dev/null 2>&1\"\n", |
| "\n", |
| "def startmonitor(clients, appid, collect_emon, **kwargs):\n", |
| "    \"\"\"Start the system profilers on every client for the run `appid`.\n", |
| "\n", |
| "    Profiler output is written to `<home>/profile/<appid>/<client>`, which is created both\n", |
| "    locally and on the client. Profilers left over from an earlier call are stopped first\n", |
| "    through killsar().\n", |
| "\n", |
| "    Args:\n", |
| "        clients: Host names to ssh to.\n", |
| "        appid: Name of the profile directory for this run.\n", |
| "        collect_emon: When true, emon is started on each client; otherwise `perf stat`.\n", |
| "        **kwargs: Optional switches read below: `collect_perf_syscall` (any value other\n", |
| "            than None enables it), `collect_pid`, `collect_sched`, and `collect_hbm`\n", |
| "            together with `hbm_nodes` (an iterable of node numbers).\n", |
| "\n", |
| "    `home` and, when `collect_emon` is set, `emon_list` are not parameters; they are\n", |
| "    expanded from the notebook namespace.\n", |
| "    \"\"\"\n", |
| "    local_profile_dir=os.path.join(home, 'profile')\n", |
| "    prof=os.path.join(local_profile_dir, appid)\n", |
| "    !mkdir -p {prof}\n", |
| "    \n", |
| "    # Print each client's current date/time.\n", |
| "    for l in clients:\n", |
| "        !ssh root@{l} date\n", |
| "    \n", |
| "    killsar(clients)\n", |
| "    \n", |
| "    if collect_emon:\n", |
| "        # Distribute the emon event list to the same path on every client.\n", |
| "        # NOTE(review): `emon_list` must exist in the notebook namespace - it is not passed in.\n", |
| "        !cp -f {emon_list} {home}/emon.list\n", |
| "        for l in clients:\n", |
| "            !scp {home}/emon.list {l}:{home}/emon.list > /dev/null 2>&1\n", |
| "    \n", |
| "    perfsyscalls=kwargs.get(\"collect_perf_syscall\",None)\n", |
| "    \n", |
| "    for l in clients:\n", |
| "        prof_client=os.path.join(prof, l)\n", |
| "        !mkdir -p {prof_client}\n", |
| "        !ssh {l} mkdir -p {prof_client}\n", |
| "        # sar samples every second into a binary file; stopmonitor() converts it to text.\n", |
| "        !ssh {l} \"sar -o {prof_client}/sar.bin -r -u -d -B -n DEV 1 >/dev/null 2>&1 &\"\n", |
| "        # Append a /proc status+io snapshot of every executor JVM to <pid>.stat.\n", |
| "        !ssh root@{l} \"jps | grep CoarseGrainedExecutorBackend | cut -d' ' -f 1 | xargs -I % bash -c '(cat /proc/%/status >> {prof_client}/%.stat; cat /proc/%/io >> {prof_client}/%.stat)'\"\n", |
| "        if collect_emon:\n", |
| "            !ssh {l} \"emon -i {home}/emon.list -f {prof_client}/emon.rst >/dev/null 2>&1 & \"\n", |
| "        else:\n", |
| "            !ssh root@{l} \"perf stat -e 'instructions,cycles,cpu_clk_unhalted.thread,cpu_clk_unhalted.ref_tsc' -a -I 500 -o {prof_client}/perfstat.txt >/dev/null 2>&1 & \"\n", |
| "        # tsc_freq_khz with '000' appended, i.e. the value expressed in Hz.\n", |
| "        !ssh {l} \"cat /sys/devices/system/cpu/cpu0/tsc_freq_khz | xargs -I% echo %000 > {prof_client}/tsc_freq 2>/dev/null &\"\n", |
| "        !ssh {l} \"lscpu | grep '^CPU(s):' | cut -d ':' -f 2 | tr -d ' ' > {prof_client}/totalcores 2>/dev/null &\"\n", |
| "        if kwargs.get(\"collect_pid\",False):\n", |
| "            # pidstat follows only the first executor JVM listed by jps.\n", |
| "            !ssh {l} \"jps | grep CoarseGrainedExecutorBackend | head -n 1 | cut -d' ' -f 1 | xargs -I % pidstat -h -t -p % 1 > {prof_client}/pidstat.out 2>/dev/null &\"\n", |
| "        # Current time minus the uptime, as epoch seconds; the redirect writes the local file.\n", |
| "        !ssh root@{l} 'cat /proc/uptime | cut -d\" \" -f 1 | xargs -I ^ date -d \"- ^ seconds\" +%s.%N' > $prof/$l/uptime.txt\n", |
| "        if kwargs.get(\"collect_sched\",False):\n", |
| "            # NOTE(review): the CPU range 8-15 is hard-coded.\n", |
| "            !ssh root@{l} 'perf trace -e \"sched:sched_switch\" -C 8-15 -o {prof_client}/sched.txt -T -- sleep 10000 >/dev/null 2>/dev/null &'\n", |
| "        if perfsyscalls is not None:\n", |
| "            !ssh root@{l} \"perf stat -e 'syscalls:sys_exit_poll,syscalls:sys_exit_epoll_wait' -a -I 1000 -o {prof_client}/perfsyscalls.txt >/dev/null 2>&1 & \"\n", |
| "        if kwargs.get(\"collect_hbm\",False):\n", |
| "            hbm_nodes = kwargs.get(\"hbm_nodes\")\n", |
| "            if hbm_nodes is not None:\n", |
| "                print(\"collect_hbm\")\n", |
| "                # grep alternation of the selected nodes, e.g. 'node 8\\|node 9'.\n", |
| "                hbm_nodes = '\\|'.join([\"node \" + str(i) for i in hbm_nodes])\n", |
| "                # The values are passed through environment variables and referenced as\n", |
| "                # $hbm_* in the commands below, which also contain literal awk braces.\n", |
| "                %env hbm_numa_nodes={hbm_nodes}\n", |
| "                %env hbm_l = {l}\n", |
| "                %env hbm_prof = {prof}\n", |
| "                !ssh $hbm_l \"echo timestamp, size, free > $hbm_prof/$hbm_l/numactl.csv\"\n", |
| "                # Every second, append the summed size and free values of the selected nodes.\n", |
| "                !ssh $hbm_l \"while :; do echo \\$(numactl -H | grep '$hbm_numa_nodes' | grep 'size' | awk '{ print \\$4 }' | awk '{ s += \\$1 } END { print s }'), \\$(numactl -H | grep '$hbm_numa_nodes' | grep 'free' | awk '{ print \\$4 }' | awk '{ s += \\$1 } END { print s }') | ts '%Y-%m-%d %H:%M:%S,' >> $hbm_prof/$hbm_l/numactl.csv; sleep 1; done >/dev/null 2>&1 &\"\n", |
| "            else:\n", |
| "                print(\"Missing argument: hbm_nodes. e.g. hbm_nodes = list(range(8,16))\")\n", |
| "\n", |
| "def stopmonitor(clients, sc, appid, result, collect_emon, **kwargs):\n", |
| "    \"\"\"Stop the profilers, collect their output and finalise the profile of `appid`.\n", |
| "\n", |
| "    Args:\n", |
| "        clients: Host names that were monitored.\n", |
| "        sc: Object whose `stop()` is called once the data is collected, or None.\n", |
| "        appid: Name of the profile directory below `<home>/profile`.\n", |
| "        result: JSON-serialisable object written to `query_time.json`.\n", |
| "        collect_emon: When true, the emon version is recorded for each client.\n", |
| "        **kwargs: Accepted but not used in this function.\n", |
| "\n", |
| "    `home`, `gluten_home`, `hdfs_event_dir` and `local_event_dir` are read from the\n", |
| "    notebook namespace.\n", |
| "    \"\"\"\n", |
| "    # Side effect: changes the kernel's working directory to the home directory.\n", |
| "    %cd ~\n", |
| "    \n", |
| "    local_profile_dir=os.path.join(home, 'profile')\n", |
| "    prof=os.path.join(local_profile_dir, appid)\n", |
| "    !mkdir -p {prof}\n", |
| "\n", |
| "    killsar(clients)\n", |
| "    killnumactl(clients)\n", |
| "    \n", |
| "    for l in clients:\n", |
| "        prof_client=os.path.join(prof, l)\n", |
| "        # Convert the binary sar recording into one text report per subsystem.\n", |
| "        !ssh {l} \"sar -f {prof_client}/sar.bin -r > {prof_client}/sar_mem.sar;sar -f {prof_client}/sar.bin -u > {prof_client}/sar_cpu.sar;sar -f {prof_client}/sar.bin -d -p > {prof_client}/sar_disk.sar;sar -f {prof_client}/sar.bin -n DEV > {prof_client}/sar_nic.sar;sar -f {prof_client}/sar.bin -B > {prof_client}/sar_page.sar;\" \n", |
| "        # Append a second /proc status+io snapshot of every executor JVM to <pid>.stat.\n", |
| "        !ssh root@{l} \"jps | grep CoarseGrainedExecutorBackend | cut -d' ' -f 1 | xargs -I % bash -c '(cat /proc/%/status >> {prof_client}/%.stat; cat /proc/%/io >> {prof_client}/%.stat)'\"\n", |
| "        if collect_emon:\n", |
| "            !ssh {l} \"source ~/sep_install/sep_vars.sh>/dev/null 2>&1; emon -v \" > {prof}/{l}/emonv.txt\n", |
| "        !ssh {l} \"sar -V \" > {prof_client}/sarv.txt\n", |
| "        # The first line of perfstat.txt is kept separately when the file exists.\n", |
| "        !ssh {l} \"test -f {prof_client}/perfstat.txt && head -n 1 {prof_client}/perfstat.txt > {prof_client}/perfstarttime\"\n", |
| "        # Files of the local host are already in place; only remote clients are copied.\n", |
| "        if l!= socket.gethostname():\n", |
| "            !scp -r {l}:{prof_client} {prof}/ > /dev/null 2>&1\n", |
| "    \n", |
| "    if sc is not None:\n", |
| "        sc.stop()\n", |
| "    \n", |
| "    # Record the commits of the last two days of both source trees.\n", |
| "    !git --git-dir=\"{gluten_home}/.git\" log --format=\"commit %H%nAuthor: %an <%ae>%nDate: %cs%n %n %s %n\" --since=`date --date='2 days ago' +'%m/%d/%Y'` > {prof}/changelog_gluten\n", |
| "    !git --git-dir=\"{gluten_home}/ep/build-velox/build/velox_ep/.git\" log --format=\"commit %H%nAuthor: %an <%ae>%nDate: %cs%n %n %s %n\" --since=`date --date='2 days ago' +'%m/%d/%Y'` > {prof}/changelog_velox\n", |
| "    \n", |
| "    # NOTE(review): the file is named `starttime` but holds the current time in\n", |
| "    # milliseconds at the moment this function runs - confirm that is what readers expect.\n", |
| "    with open(f\"{prof}/starttime\",\"w\") as f:\n", |
| "        f.write(\"{:d}\".format(int(time.time()*1000)))\n", |
| "    \n", |
| "    with open(f'{prof}/query_time.json', 'w') as f:\n", |
| "        json.dump(result, f)\n", |
| "\n", |
| "    # Copy the Spark event log of the application next to the profile as app.log.\n", |
| "    if hdfs_event_dir != '':\n", |
| "        !hadoop fs -copyToLocal {hdfs_event_dir}/{appid} {prof}/app.log\n", |
| "    elif local_event_dir != '':\n", |
| "        !cp {local_event_dir}/{appid} {prof}/app.log" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "def pinexecutor_numa(clients):\n", |
| "    \"\"\"Pin the Spark executor processes of every client to NUMA nodes.\n", |
| "\n", |
| "    The CPU count and the per-node CPU lists of the first client are the reference. For\n", |
| "    each client the executor PIDs are split into `len(pids) // nodes` executors per node\n", |
| "    and every group is bound to the CPU list of its node with `taskset`. Executors left\n", |
| "    over by the integer division are not pinned.\n", |
| "\n", |
| "    The function prints a message and returns early as soon as a client's CPU count or\n", |
| "    NUMA layout differs from the reference.\n", |
| "\n", |
| "    Args:\n", |
| "        clients: Host names to ssh to; must not be empty.\n", |
| "    \"\"\"\n", |
| "    cpunum = !ssh {clients[0]} \"grep 'processor' /proc/cpuinfo | wc -l\"\n", |
| "    cpunum = int(cpunum[0])\n", |
| "    \n", |
| "    numanodes=!ssh {clients[0]} \"cat /sys/devices/system/node/node*/cpulist\"\n", |
| "    numanodes = list(filter(lambda x: x != '', numanodes))\n", |
| "    print(numanodes)\n", |
| "    nodes=len(numanodes)\n", |
| "    if nodes == 0:\n", |
| "        # Nothing to pin to; also avoids a division by zero below.\n", |
| "        print(f\"client {clients[0]} reports no numa nodes!\")\n", |
| "        return\n", |
| "    for client in clients:\n", |
| "        pids=!ssh {client} \"jps | grep CoarseGrainedExecutorBackend | cut -d' ' -f1\"\n", |
| "        print(client,\":\",len(pids),\" \",\"\\t\".join(map(str,pids)))\n", |
| "        \n", |
| "        cpunum_c = !ssh {client} \"grep 'processor' /proc/cpuinfo | wc -l\"\n", |
| "        cpunum_c = int(cpunum_c[0])\n", |
| "        if cpunum_c != cpunum:\n", |
| "            print(f\"client {client} cpunum not match!\")\n", |
| "            return\n", |
| "        numanodes_c=!ssh {client} \"cat /sys/devices/system/node/node*/cpulist\"\n", |
| "        # Filter this client's own list; filtering the reference list instead would make\n", |
| "        # the comparison below always succeed.\n", |
| "        numanodes_c = list(filter(lambda x: x != '', numanodes_c))\n", |
| "        time.sleep(1)\n", |
| "        print(numanodes_c)\n", |
| "        if numanodes_c != numanodes:\n", |
| "            print(f\"client {client} numanodes not match!\")\n", |
| "            return\n", |
| "        \n", |
| "        idx = 0\n", |
| "        per_node = len(pids) // nodes  # executors on 1 numanode\n", |
| "        for i in range(nodes):\n", |
| "            cpus = numanodes[i]\n", |
| "            for l in pids[idx:idx+per_node]:\n", |
| "                print(f\" {cpus} {l}\")\n", |
| "                !ssh {client} \"taskset -a -p -c $cpus $l > /dev/null 2>&1 \"\n", |
| "            idx += per_node" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "def config_pagecache(clients, run_gluten=True):\n", |
| "    \"\"\"Set the kernel dirty-page writeback parameters on every client.\n", |
| "\n", |
| "    Args:\n", |
| "        clients: Host names; the values are written through a root ssh session.\n", |
| "        run_gluten: When true, apply the relaxed writeback settings used for Gluten runs;\n", |
| "            otherwise restore the Linux default values.\n", |
| "    \"\"\"\n", |
| "    if run_gluten:\n", |
| "        settings = {\n", |
| "            'dirty_ratio': 80,\n", |
| "            'dirty_background_ratio': 50,\n", |
| "            'dirty_expire_centisecs': 360000,\n", |
| "            'dirty_writeback_centisecs': 3000,\n", |
| "        }\n", |
| "    else:\n", |
| "        # Linux defaults. dirty_background_ratio has to stay below dirty_ratio: the two\n", |
| "        # values were previously swapped (10 / 20), which left background writeback\n", |
| "        # configured above the hard limit.\n", |
| "        settings = {\n", |
| "            'dirty_ratio': 20,\n", |
| "            'dirty_background_ratio': 10,\n", |
| "            'dirty_expire_centisecs': 3000,\n", |
| "            'dirty_writeback_centisecs': 500,\n", |
| "        }\n", |
| "    for l in clients:\n", |
| "        for name, value in settings.items():\n", |
| "            !ssh root@$l \"echo {value} > /proc/sys/vm/{name}\"" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "def print_kernel_params(clients):\n", |
| "    \"\"\"Print performance-related kernel settings of every client.\n", |
| "\n", |
| "    For each entry of `params` the file content is printed per client, followed by the\n", |
| "    `numactl -H` output and the memory speed lines reported by `dmidecode`.\n", |
| "\n", |
| "    Args:\n", |
| "        clients: Host names to query. The parameter was previously misspelled `clietns`,\n", |
| "            so the argument was ignored and the global `clients` was used instead.\n", |
| "    \"\"\"\n", |
| "    params = {\n", |
| "        'transparent hugepage': '/sys/kernel/mm/transparent_hugepage/enabled',\n", |
| "        'auto numa balancing': '/proc/sys/kernel/numa_balancing',\n", |
| "        'scaling governor': '/sys/devices/system/cpu/cpu*/cpufreq/scaling_governor',\n", |
| "        'scaling max freq': '/sys/devices/system/cpu/cpu*/cpufreq/scaling_max_freq',\n", |
| "        'scaling cur freq': '/sys/devices/system/cpu/cpu*/cpufreq/scaling_cur_freq',\n", |
| "        'power & perf policy': '/sys/devices/system/cpu/cpu*/power/energy_perf_bias',\n", |
| "        'dirty_ratio': '/proc/sys/vm/dirty_ratio',\n", |
| "        'dirty_background_ratio': '/proc/sys/vm/dirty_background_ratio',\n", |
| "        'dirty_expire_centisecs': '/proc/sys/vm/dirty_expire_centisecs',\n", |
| "        'dirty_writeback_centisecs': '/proc/sys/vm/dirty_writeback_centisecs'\n", |
| "    }\n", |
| "    for k, param in params.items():\n", |
| "        print()\n", |
| "        print(f'{k} ({param})')\n", |
| "        for l in clients:\n", |
| "            print(l + \": \", end='')\n", |
| "            res = !ssh root@$l \"cat {param}\"\n", |
| "            # One line per client: per-CPU values are printed space-separated.\n", |
| "            print(*res)\n", |
| "    # print numactl\n", |
| "    print()\n", |
| "    print(\"numactl -H\")\n", |
| "    for l in clients:\n", |
| "        print(l + \":\")\n", |
| "        res = !ssh $l \"numactl -H\"\n", |
| "        print('\\n'.join(res))\n", |
| "    # print memory freq\n", |
| "    print()\n", |
| "    print(\"Memory Frequency\")\n", |
| "    for l in clients:\n", |
| "        print(l + \":\")\n", |
| "        res= !ssh root@$l \"dmidecode -t memory | grep Speed\"\n", |
| "        print('\\n'.join(res))" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "code_folding": [] |
| }, |
| "outputs": [], |
| "source": [ |
| "def dropcache(clients):\n", |
| "    \"\"\"On every client: sync, drop the page cache, compact memory and print `free -h`.\"\"\"\n", |
| "    for host in clients:\n", |
| "        !ssh root@$host \"sync && echo 3 > /proc/sys/vm/drop_caches; echo 1 >/proc/sys/vm/compact_memory; free -h\"" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "def config_mem_cgroup(clients):\n", |
| "    \"\"\"Move each executor JVM of every client into its own memory cgroup.\n", |
| "\n", |
| "    A helper script is written to `<home>/mem-cgroup.sh`, copied to the same location on\n", |
| "    each client and started there in the background. `home` is read from the notebook\n", |
| "    namespace.\n", |
| "    \"\"\"\n", |
| "    script_path = f'{home}/mem-cgroup.sh'\n", |
| "    script_body = \"\"\"\n", |
| "CGROUP_ROOT=/sys/fs/cgroup/gluten\n", |
| "\n", |
| "if [ ! -d $CGROUP_ROOT ] ; then\n", |
| "    sudo mkdir $CGROUP_ROOT\n", |
| "    # enable memory for subtree\n", |
| "    sudo bash -c \"echo '+memory' >> $CGROUP_ROOT/cgroup.subtree_control\"\n", |
| "fi\n", |
| "\n", |
| "# move each process to sub memory group\n", |
| "index=0\n", |
| "for pid in `jps | grep Coarse | awk '{print $1}'` ; do\n", |
| "    target_cgroup=$CGROUP_ROOT/mem-${index}\n", |
| "    if [ ! -d $target_cgroup ] ; then\n", |
| "        sudo mkdir $target_cgroup\n", |
| "    fi\n", |
| "    proc_file=$target_cgroup/cgroup.procs\n", |
| "    sudo bash -c \"echo $pid >> $proc_file\"\n", |
| "    index=`expr $index + 1`\n", |
| "done\n", |
| "    \"\"\"\n", |
| "    with open(script_path, 'w+') as script_file:\n", |
| "        script_file.write(script_body)\n", |
| "    for host in clients:\n", |
| "        !scp {script_path} {host}:{home}/ >/dev/null 2>&1\n", |
| "        !ssh {host} \"bash {script_path} >/dev/null 2>&1 &\"" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "import pandas as pd\n", |
| "import matplotlib.pyplot as plt\n", |
| "import os\n", |
| "\n", |
| "\n", |
| "from IPython.display import display, HTML\n", |
| "\n", |
| "def get_io_stats(appid, client):\n", |
| "    \"\"\"Sum the I/O counter growth of all executors recorded for one client.\n", |
| "\n", |
| "    Every `<pid>.stat` file in `<home>/profile/<appid>/<client>` holds appended\n", |
| "    snapshots of `/proc/<pid>/status` and `/proc/<pid>/io`. For each I/O counter the\n", |
| "    difference between the last and the first snapshot is taken, divided by 1024**3\n", |
| "    and summed over all files. A counter that appears only once contributes 0.\n", |
| "\n", |
| "    Args:\n", |
| "        appid: Name of the profile directory of the run.\n", |
| "        client: Host name whose sub-directory is read.\n", |
| "\n", |
| "    Returns:\n", |
| "        A DataFrame indexed by counter name with a single column 'sum'. All counters\n", |
| "        are present (0 when nothing was recorded), also when there is no .stat file.\n", |
| "\n", |
| "    `home` is read from the notebook namespace.\n", |
| "    \"\"\"\n", |
| "    fields = ['rchar','wchar','syscr','syscw','read_bytes','write_bytes','cancelled_write_bytes']\n", |
| "    file_path = os.path.join(home,'profile',appid,client)\n", |
| "    records = []\n", |
| "    for name in sorted(os.listdir(file_path)):\n", |
| "        if not name.endswith('.stat'):\n", |
| "            continue\n", |
| "        first, last = {}, {}\n", |
| "        with open(os.path.join(file_path, name), 'r') as fi:\n", |
| "            for line in fi:\n", |
| "                key, sep, value = line.partition(':')\n", |
| "                key = key.strip()\n", |
| "                if not sep or key not in fields:\n", |
| "                    continue\n", |
| "                try:\n", |
| "                    count = int(value.strip())\n", |
| "                except ValueError:\n", |
| "                    continue\n", |
| "                # Remember the first value once and keep overwriting the last one, so\n", |
| "                # any number of snapshots yields a well-defined delta.\n", |
| "                first.setdefault(key, count)\n", |
| "                last[key] = count\n", |
| "        records.append({k: (last[k] - first[k]) / 1024 / 1024 / 1024 for k in first})\n", |
| "\n", |
| "    df = pd.DataFrame(records, columns=fields, dtype=float).sum().to_frame()\n", |
| "    df.columns = ['sum']\n", |
| "    return df\n", |
| "\n", |
| "# Preprocess 'time' column\n", |
| "def process_time(dataframes):\n", |
| "    \"\"\"Yield a copy of each sar frame with its first column converted to seconds.\n", |
| "\n", |
| "    The first column is renamed to 'time', the 'Average:' summary rows are dropped and\n", |
| "    the 'HH:MM:SS' values become integer seconds since midnight. Whenever a timestamp is\n", |
| "    smaller than the one before it (clock wrap of a 12-hour time format), 12 hours are\n", |
| "    added to that row and to every later row, so the result never decreases at a wrap.\n", |
| "\n", |
| "    Args:\n", |
| "        dataframes: Iterable of DataFrames whose first column holds 'HH:MM:SS' strings.\n", |
| "            The input frames are not modified.\n", |
| "\n", |
| "    Yields:\n", |
| "        One new DataFrame per input frame, in the same order.\n", |
| "    \"\"\"\n", |
| "    offset = 12 * 3600  # half-day seconds\n", |
| "    for df in dataframes:\n", |
| "        # Work on copies: assigning into a filtered slice through chained indexing is\n", |
| "        # unreliable and does nothing when pandas Copy-on-Write is enabled.\n", |
| "        frame = df.copy()\n", |
| "        frame.columns = ['time'] + list(frame.columns[1:])\n", |
| "        frame = frame[frame['time'] != 'Average:'].copy()\n", |
| "\n", |
| "        parsed = pd.to_datetime(frame['time'], format='%H:%M:%S')\n", |
| "        seconds = parsed.dt.hour * 3600 + parsed.dt.minute * 60 + parsed.dt.second\n", |
| "\n", |
| "        # Number of wraps seen up to and including each row (AM->PM or PM->AM).\n", |
| "        wraps = seconds.diff().lt(0).cumsum()\n", |
| "        frame['time'] = (seconds + wraps * offset).astype(int)\n", |
| "        yield frame\n", |
| "\n", |
| "def draw_sar(appid, qtime=None, disk_dev=None, nic_dev=None, client=None):\n", |
| "    \"\"\"Plot the sar reports collected for one client of the run `appid`.\n", |
| "\n", |
| "    Five stacked charts sharing one time axis are drawn, top to bottom: memory, disk,\n", |
| "    network, page faults and CPU. Time 0 is one second before the first sample whose\n", |
| "    user+system+iowait CPU exceeds 50%; when no sample does, the first sample is used.\n", |
| "\n", |
| "    Args:\n", |
| "        appid: Name of the profile directory below `<home>/profile`.\n", |
| "        qtime: Optional mapping of query name to duration on the chart's time axis.\n", |
| "            A vertical line is drawn at every cumulative boundary and the mean CPU\n", |
| "            split of each query is displayed as a table.\n", |
| "        disk_dev: Optional collection of `DEV` values to keep in the disk chart.\n", |
| "        nic_dev: Optional collection of `IFACE` values to keep in the network chart.\n", |
| "        client: Host name to plot; defaults to `clients[0]` from the notebook namespace.\n", |
| "    \"\"\"\n", |
| "    if client is None:\n", |
| "        client = clients[0]\n", |
| "\n", |
| "    display(HTML('<font size=6pt color=red>{:s}</font>'.format(client)))\n", |
| "\n", |
| "    display(get_io_stats(appid, client))\n", |
| "\n", |
| "    # Read data\n", |
| "    profile_dir = os.path.join(home,'profile',appid,client)\n", |
| "    datafiles = [os.path.join(profile_dir, datafile) for datafile in ['sar_cpu.sar', 'sar_mem.sar', 'sar_disk.sar', 'sar_nic.sar', 'sar_page.sar']]\n", |
| "    # sep=r'\\s+' is the documented equivalent of the deprecated delim_whitespace=True.\n", |
| "    dataframes = [pd.read_csv(datafile, header=1, sep=r'\\s+', parse_dates=True) for datafile in datafiles]\n", |
| "    \n", |
| "    num_figs=5\n", |
| "    fig, axs=plt.subplots(num_figs,1,sharex=True,figsize=(30,5*4))\n", |
| "\n", |
| "    [cpu_df, mem_df, disk_df, nic_df, page_df] = process_time(dataframes)\n", |
| "\n", |
| "    # CPU usage\n", |
| "    cpu_df['total'] = cpu_df['%user'] + cpu_df['%system'] + cpu_df['%iowait']\n", |
| "\n", |
| "    # Fall back to the first sample when the CPU never exceeds 50%; otherwise min() of\n", |
| "    # an empty selection is NaN and every chart would be blank.\n", |
| "    busy_times = cpu_df[cpu_df['total'] > 50]['time']\n", |
| "    starttime = (busy_times.min() if not busy_times.empty else cpu_df['time'].min()) - 1\n", |
| "    cpu_df['time'] -= starttime\n", |
| "\n", |
| "    axs[4].stackplot(cpu_df['time'], cpu_df['%user'], cpu_df['%system'], cpu_df['%iowait'], labels=['user','system','iowait'])\n", |
| "    axs[4].legend(loc='upper left')\n", |
| "\n", |
| "    # Memory usage\n", |
| "    mem_df['dirty_cached'] = mem_df['kbdirty'] * mem_df['%memused'] / mem_df['kbmemused']\n", |
| "    mem_df['clean_cached'] = (mem_df['kbcached'] - mem_df['kbdirty']) * mem_df['%memused'] / mem_df['kbmemused']\n", |
| "    mem_df['used'] = mem_df['kbmemused'] * mem_df['%memused'] / mem_df['kbmemused']\n", |
| "#    mem_df['used'] = (mem_df['kbmemused'] - mem_df['kbbuffers'] - mem_df['kbcached'])* mem_df['%memused'] / mem_df['kbmemused']\n", |
| "\n", |
| "    mem_df['time'] -= starttime\n", |
| "\n", |
| "    axs[0].stackplot(mem_df['time'], mem_df['used'], mem_df['clean_cached'], mem_df['dirty_cached'], labels=['used','clean cached','dirty cached'])\n", |
| "    axs[0].legend(loc='upper left')\n", |
| "    axs[0].grid(axis = 'y')\n", |
| "\n", |
| "    # Disk usage\n", |
| "    if disk_dev is not None:\n", |
| "        disk_df = disk_df[disk_df['DEV'].isin(disk_dev)]\n", |
| "    # Copy before assigning so the writes never target a filtered view.\n", |
| "    disk_df = disk_df.copy()\n", |
| "    disk_df['rkB/s'] = disk_df['rkB/s'].astype(float)\n", |
| "    disk_df['wkB/s'] = disk_df['wkB/s'].astype(float)\n", |
| "    disk_df['%util'] = disk_df['%util'].astype(float)\n", |
| "\n", |
| "\n", |
| "    disk_df = disk_df.groupby('time').agg({'rkB/s': 'sum', 'wkB/s': 'sum', '%util':'mean'}).reset_index()\n", |
| "    disk_df['read'] = disk_df['rkB/s'] / 1024\n", |
| "    disk_df['write'] = disk_df['wkB/s'] / 1024\n", |
| "\n", |
| "    disk_df['time'] -= starttime\n", |
| "\n", |
| "    axs[1].stackplot(disk_df['time'], disk_df['read'], disk_df['write'], labels=['read MB/s','write MB/s'])\n", |
| "    axs[1].grid(axis = 'y')\n", |
| "\n", |
| "    # Mean device utilisation on a secondary y axis.\n", |
| "    ax2 = axs[1].twinx()\n", |
| "\n", |
| "    ax2.plot(disk_df['time'], disk_df['%util'],'g-')\n", |
| "    axs[1].legend(loc='upper left')\n", |
| "\n", |
| "    \n", |
| "    # Nic usage\n", |
| "    if nic_dev is not None:\n", |
| "        nic_df = nic_df[nic_df['IFACE'].isin(nic_dev)]\n", |
| "    nic_df = nic_df.copy()\n", |
| "    nic_df['rxkB/s'] = nic_df['rxkB/s'].astype(float)\n", |
| "    nic_df['txkB/s'] = nic_df['txkB/s'].astype(float)\n", |
| "    \n", |
| "    nic_df = nic_df.groupby('time').agg({'rxkB/s': 'sum', 'txkB/s': \"sum\"}).reset_index()\n", |
| "    nic_df['rx'] = nic_df['rxkB/s'] / 1024\n", |
| "    nic_df['tx'] = nic_df['txkB/s'] / 1024\n", |
| "    \n", |
| "    nic_df['time'] -= starttime\n", |
| "    \n", |
| "    axs[2].stackplot(nic_df['time'], nic_df['rx'], nic_df['tx'], labels=['rx MB/s','tx MB/s'])\n", |
| "    axs[2].legend(loc='upper left')\n", |
| "    axs[2].grid(axis = 'y')\n", |
| "\n", |
| "    # Pagefaults\n", |
| "    page_df['minflt/s'] = page_df['fault/s'] - page_df['majflt/s']\n", |
| "    \n", |
| "    page_df['time'] -= starttime\n", |
| "\n", |
| "    axs[3].stackplot(page_df['time'], page_df['minflt/s'], page_df['majflt/s'], labels=['minor_fault/s','major_fault/s'])\n", |
| "    axs[3].legend(loc='upper left')\n", |
| "    axs[3].grid(axis = 'y')\n", |
| "\n", |
| "    # Add vertical lines and text for qtime, and calculate per query cpu%\n", |
| "    if qtime is not None:\n", |
| "        for ax in axs:\n", |
| "            x = 0\n", |
| "            ax.axvline(x = x, color = 'b')\n", |
| "            for k, v in qtime.items():\n", |
| "                x += v\n", |
| "                ax.axvline(x = x, color = 'b')\n", |
| "\n", |
| "            # Label only the queries wide enough for the text; x is the total duration here.\n", |
| "            tx = 0\n", |
| "            for k, v in qtime.items():\n", |
| "                if v / x > 15 / 772:\n", |
| "                    ax.text(tx + v / 2 - 6 * x / 772, ax.get_ylim()[1] * 1.05, k)\n", |
| "                tx += v\n", |
| "\n", |
| "        x = 0\n", |
| "        qtime_se = {}\n", |
| "        cols = ['%user','%system','%iowait']\n", |
| "        for k, v in qtime.items():\n", |
| "            filtered_df = cpu_df[(cpu_df['time'] >= x) & (cpu_df['time'] <= x+v)]\n", |
| "            averages = filtered_df[cols].mean()\n", |
| "            qtime_se[k] = averages.tolist()\n", |
| "            x += v\n", |
| "        if qtime_se:\n", |
| "            perqcpu = pd.DataFrame(qtime_se).T\n", |
| "            perqcpu.columns = cols\n", |
| "            display(perqcpu)\n", |
| "\n", |
| "    plt.show()\n" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "def convert_to_etc_gmt(tz_offset=None):\n", |
| "    \"\"\"Convert a '+HHMM' / '-HHMM' UTC offset into an 'Etc/GMT' zone name.\n", |
| "\n", |
| "    The sign is inverted, as the Etc/GMT names require: '+0800' gives 'Etc/GMT-8'.\n", |
| "    Only whole hours can be expressed, so the minutes are truncated.\n", |
| "\n", |
| "    Args:\n", |
| "        tz_offset: Offset string such as '+0800'. When empty or None, the offset of the\n", |
| "            local time zone is used.\n", |
| "\n", |
| "    Returns:\n", |
| "        The zone name, e.g. 'Etc/GMT-8', 'Etc/GMT+5' or 'Etc/GMT+0'.\n", |
| "\n", |
| "    Raises:\n", |
| "        ValueError: If the hour or minute part is not numeric.\n", |
| "    \"\"\"\n", |
| "    if not tz_offset:\n", |
| "        # Same value as `date +%z`, obtained without spawning a shell.\n", |
| "        import time\n", |
| "        tz_offset = time.strftime('%z')\n", |
| "\n", |
| "    # Extract the sign and the hour/minute offset\n", |
| "    sign = tz_offset[0]\n", |
| "    hours = int(tz_offset[1:3])\n", |
| "    minutes = int(tz_offset[3:])\n", |
| "\n", |
| "    # Convert the offset to a GMT value; a '+' offset maps to a negative Etc/GMT suffix\n", |
| "    gmt_offset = hours + (minutes / 60)\n", |
| "    if sign == '+':\n", |
| "        gmt_offset = -gmt_offset\n", |
| "\n", |
| "    # Construct the Etc/GMT string\n", |
| "    return f\"Etc/GMT{int(gmt_offset):+d}\"" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "def get_last_run(records_file, appid=''):\n", |
| "    \"\"\"Look up a previous run in a tab-separated records file.\n", |
| "\n", |
| "    Without `appid`, the last row of the file is returned. With `appid`, the last row\n", |
| "    must belong to that application; the rows before it (within the last 100 rows) are\n", |
| "    then searched backwards for the most recent one whose fourth field contains 'main'.\n", |
| "\n", |
| "    Args:\n", |
| "        records_file: Path of the records file.\n", |
| "        appid: Application id expected in the last row, or '' to take the last row.\n", |
| "\n", |
| "    Returns:\n", |
| "        A tuple with the second, third and fourth field of the selected row, or\n", |
| "        (None, None, None) when the file is missing or no suitable row exists.\n", |
| "    \"\"\"\n", |
| "    from collections import deque\n", |
| "\n", |
| "    not_found = (None, None, None)\n", |
| "    if not os.path.exists(records_file):\n", |
| "        return not_found\n", |
| "\n", |
| "    # Keep only the tail of the file in memory: 100 rows with appid, 1 row without.\n", |
| "    with open(records_file, 'r') as f:\n", |
| "        rows = [line.rstrip('\\n').split('\\t') for line in deque(f, maxlen=100 if appid else 1)]\n", |
| "\n", |
| "    if not appid:\n", |
| "        if len(rows) == 1 and len(rows[0]) >= 4:\n", |
| "            return rows[0][1], rows[0][2], rows[0][3]\n", |
| "        return not_found\n", |
| "\n", |
| "    if len(rows) > 1:\n", |
| "        # Check appid match\n", |
| "        last_appid = rows[-1][1] if len(rows[-1]) > 1 else ''\n", |
| "        if last_appid != appid:\n", |
| "            print(f'appid not match. Required {appid}. Got {last_appid}')\n", |
| "        else:\n", |
| "            for fields in reversed(rows[:-1]):\n", |
| "                # Rows with fewer than four fields are malformed and skipped.\n", |
| "                if len(fields) >= 4 and 'main' in fields[3]:\n", |
| "                    return fields[1], fields[2], fields[3]\n", |
| "    return not_found" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "# TestTPC" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "code_folding": [] |
| }, |
| "outputs": [], |
| "source": [ |
| "import os\n", |
| "import socket\n", |
| "from dataclasses import dataclass\n", |
| "from functools import wraps\n", |
| "from pathlib import Path\n", |
| "from typing import List \n", |
| "\n", |
| "class TestTPC:\n", |
| "    \"\"\"Run a set of TPC queries through Spark SQL and keep per-query timings.\n", |
| "\n", |
| "    Subclasses set ``tpctables`` (table names registered as temp views) and\n", |
| "    ``tpc_query_path`` (directory, with trailing slash, holding ``<query id>.sql`` files).\n", |
| "    \"\"\"\n", |
| "\n", |
| "    @dataclass\n", |
| "    class query_info:\n", |
| "        # Table names associated with the query, and the SQL statements to run for it.\n", |
| "        tables: List[str]\n", |
| "        sql: List[str]\n", |
| "\n", |
| "    # Class-level defaults only; __init__ gives every instance its own containers.\n", |
| "    query_infos = {}\n", |
| "    query_ids = []\n", |
| "\n", |
| "    tpctables = []\n", |
| "    tpc_query_path = ''\n", |
| "\n", |
| "    # One append-only record file per (engine, workload) pair, created under cwd.\n", |
| "    RECORDS_SPARK_TPCH = f\"records_spark_tpch_spark{spark_version_short}.csv\"\n", |
| "    RECORDS_SPARK_TPCDS = f\"records_spark_tpcds_spark{spark_version_short}.csv\"\n", |
| "    RECORDS_GLUTEN_TPCH = f\"records_gluten_tpch_spark{spark_version_short}.csv\"\n", |
| "    RECORDS_GLUTEN_TPCDS = f\"records_gluten_tpcds_spark{spark_version_short}.csv\"\n", |
| "\n", |
| "    def __init__(self, spark, table_dir, run_gluten, workload, server, base_dir, nb_name, data_source = 'parquet'):\n", |
| "        \"\"\"Load every query file under ``tpc_query_path`` and initialise run state.\n", |
| "\n", |
| "        Args:\n", |
| "            spark: SQL context; ``.sparkSession.sparkContext``, ``.read`` and ``.sql`` are used.\n", |
| "            table_dir: directory (or list of directories) containing one sub-path per table.\n", |
| "            run_gluten: truthy selects the Gluten record files instead of the Spark ones.\n", |
| "            workload: 'tpch' selects the TPC-H record files; any other value the TPC-DS ones.\n", |
| "            server: host used for result links, profile upload and perf analysis; falsy disables them.\n", |
| "            base_dir: written to the record file and passed to the analysis script.\n", |
| "            nb_name: notebook file name; its last six characters are dropped to build the output name.\n", |
| "            data_source: format name handed to ``spark.read.format``.\n", |
| "        \"\"\"\n", |
| "        self.spark = spark\n", |
| "        self.sc = spark.sparkSession.sparkContext\n", |
| "        self.appid = self.sc.applicationId\n", |
| "        self.app_name = '_'.join(self.sc.appName.split(' '))\n", |
| "        self.run_gluten = run_gluten\n", |
| "        self.workload = workload\n", |
| "        self.table_dir = table_dir\n", |
| "        self.server = server\n", |
| "        self.base_dir = base_dir\n", |
| "        self.nb_name = nb_name\n", |
| "        self.data_source = data_source\n", |
| "        self.table_loaded = False\n", |
| "        self.result = {}\n", |
| "        self.duration = 0\n", |
| "        self.stopped = False\n", |
| "        self.collect_emon = False\n", |
| "        self.perf_html = ''\n", |
| "        self.finished_nb = ''\n", |
| "\n", |
| "        # Use a per-instance registry: writing through self into the class-level dict\n", |
| "        # would mix the queries of every TestTPC instance created in this kernel.\n", |
| "        self.query_infos = {}\n", |
| "        for l in os.listdir(self.tpc_query_path):\n", |
| "            if l.endswith('sql'):\n", |
| "                with open(self.tpc_query_path+l,\"r\") as f:\n", |
| "                    self.query_infos[l.split(\".\")[0]]=self.query_info(self.tpctables,[\"\\n\".join(f.readlines())])\n", |
| "\n", |
| "        def sort_key(query_id):\n", |
| "            # Prefix the id with its length so shorter ids sort first; an id ending in\n", |
| "            # 'a' or 'b' is counted one character shorter.\n", |
| "            if query_id[-1] != 'a' and query_id[-1] != 'b':\n", |
| "                return str(len(query_id)) + query_id\n", |
| "            return str(len(query_id) - 1) + query_id\n", |
| "\n", |
| "        self.query_ids = sorted(self.query_infos.keys(), key=sort_key)\n", |
| "        print(\"http://{}:18080/history/{}/jobs/\".format(local_ip, self.sc.applicationId))\n", |
| "\n", |
| "    def start_monitor(self, clients, emon_list='', **kw):\n", |
| "        \"\"\"Start monitoring on ``clients``; a non-empty ``emon_list`` turns emon collection on.\"\"\"\n", |
| "        if emon_list:\n", |
| "            self.collect_emon = True\n", |
| "        startmonitor(clients, self.appid, self.collect_emon, **kw)\n", |
| "\n", |
| "    def stop_monitor(self, clients, **kw):\n", |
| "        \"\"\"Stop monitoring, append a line to the record file and upload the profile.\n", |
| "\n", |
| "        Does nothing when called again after a successful stop.\n", |
| "        \"\"\"\n", |
| "        if self.stopped:\n", |
| "            return\n", |
| "        stopmonitor(clients, self.sc, self.appid, self.result, self.collect_emon, **kw)\n", |
| "\n", |
| "        # nb_name[:-6] drops the trailing six characters (the '.ipynb' suffix of the default name).\n", |
| "        output_nb = f'{self.nb_name[:-6]}-{self.appid}.ipynb'\n", |
| "\n", |
| "        if self.workload == 'tpch':\n", |
| "            record_file = self.RECORDS_GLUTEN_TPCH if self.run_gluten else self.RECORDS_SPARK_TPCH\n", |
| "        else:\n", |
| "            record_file = self.RECORDS_GLUTEN_TPCDS if self.run_gluten else self.RECORDS_SPARK_TPCDS\n", |
| "        record_file = os.path.join(cwd, record_file)\n", |
| "        with open(record_file, 'a+') as f:\n", |
| "            f.write(f'{datetime.now()}\\t{self.appid}\\t{self.base_dir}\\t{self.app_name}\\t{output_nb}\\t{self.duration}\\n')\n", |
| "\n", |
| "        if self.server:\n", |
| "            if output_nb.startswith(cwd):\n", |
| "                output_nb = os.path.relpath(output_nb, cwd)\n", |
| "            self.finished_nb = f\"http://{localhost}:8888/tree/{output_nb}\"\n", |
| "            upload_profile(self.server, self.base_dir, self.appid)\n", |
| "\n", |
| "        self.stopped = True\n", |
| "\n", |
| "    def run_perf_analysis(self, server_gluten_home, disk_dev, nic_dev, proxy='', emails=None, pr=''):\n", |
| "        \"\"\"Launch run_perf_analysis.sh on ``self.server`` over ssh and show the report link.\n", |
| "\n", |
| "        Args:\n", |
| "            server_gluten_home: Gluten checkout on the server that holds the analysis script.\n", |
| "            disk_dev: iterable of disk device names, joined with ','.\n", |
| "            nic_dev: iterable of network device names, joined with ','.\n", |
| "            proxy: value for --proxy; '' is sent as an empty quoted string.\n", |
| "            emails: optional list of addresses for --emails; None or empty sends ''.\n", |
| "            pr: value for --pr; only forwarded when it consists of digits.\n", |
| "\n", |
| "        Returns immediately when no server is configured.\n", |
| "        \"\"\"\n", |
| "        if not self.server:\n", |
| "            return\n", |
| "\n", |
| "        run_script=f'{server_gluten_home}/tools/workload/benchmark_velox/analysis/run_perf_analysis.sh'\n", |
| "\n", |
| "        disk=','.join(disk_dev)\n", |
| "        nic=','.join(nic_dev)\n", |
| "\n", |
| "        command = ' '.join([\n", |
| "            'bash', run_script,\n", |
| "            '--base-dir', self.base_dir,\n", |
| "            '--name', self.app_name,\n", |
| "            '--appid', self.appid,\n", |
| "            '--disk', disk,\n", |
| "            '--nic', nic,\n", |
| "            '--tz', convert_to_etc_gmt(),\n", |
| "            '--proxy', proxy if proxy != '' else \"''\",\n", |
| "            '--emails', ','.join(emails) if emails else \"''\",\n", |
| "            '--pr', pr if pr != '' and pr.isdigit() else \"''\",\n", |
| "        ])\n", |
| "\n", |
| "        if self.run_gluten:\n", |
| "            # Gluten runs are compared against the previous Gluten run and the latest Spark run.\n", |
| "            if self.workload == 'tpch':\n", |
| "                comp_file = os.path.join(cwd, self.RECORDS_GLUTEN_TPCH)\n", |
| "                baseline_file = os.path.join(cwd, self.RECORDS_SPARK_TPCH)\n", |
| "            else:\n", |
| "                comp_file = os.path.join(cwd, self.RECORDS_GLUTEN_TPCDS)\n", |
| "                baseline_file = os.path.join(cwd, self.RECORDS_SPARK_TPCDS)\n", |
| "            comp_appid, comp_base_dir, comp_name = get_last_run(comp_file, self.appid)\n", |
| "            if comp_appid:\n", |
| "                command += ' '.join(['', '--comp-appid', comp_appid, '--comp-base-dir', comp_base_dir, '--comp-name', comp_name])\n", |
| "            baseline_appid, baseline_base_dir, _ = get_last_run(baseline_file, '')\n", |
| "            if baseline_appid:\n", |
| "                command += ' '.join(['', '--baseline-appid', baseline_appid, '--baseline-base-dir', baseline_base_dir])\n", |
| "        print(command)\n", |
| "\n", |
| "        # Block if running on local cluster.\n", |
| "        if self.server == localhost:\n", |
| "            !ssh {self.server} \"{command} > /dev/null 2>&1\"\n", |
| "        else:\n", |
| "            !ssh {self.server} \"{command} > /dev/null 2>&1 &\"\n", |
| "\n", |
| "        self.perf_html=f'http://{self.server}:8889/view/{self.base_dir}/html/{self.app_name}_{self.appid}.html'\n", |
| "        display(HTML(f'<a href=\"{self.perf_html}\">{self.perf_html}</a>'))\n", |
| "\n", |
| "    def load_table(self, table):\n", |
| "        \"\"\"Return a DataFrame for ``table`` read from ``table_dir`` (a path or a list of paths).\"\"\"\n", |
| "        reader = self.spark.read.format(self.data_source)\n", |
| "        if type(self.table_dir)==list:\n", |
| "            return reader.load([os.path.join(t, table) for t in self.table_dir])\n", |
| "        return reader.load(os.path.join(self.table_dir, table))\n", |
| "\n", |
| "    def load_tables_as_tempview(self, tables):\n", |
| "        \"\"\"Register each table in ``tables`` as a temp view named after the table.\"\"\"\n", |
| "        for table in tables:\n", |
| "            self.load_table(table).createOrReplaceTempView(table)\n", |
| "\n", |
| "    def load_all_tables_as_tempview(self):\n", |
| "        \"\"\"Register every table listed in ``tpctables`` as a temp view.\"\"\"\n", |
| "        print(f\"Loading all tables: {self.tpctables}\")\n", |
| "        self.load_tables_as_tempview(self.tpctables)\n", |
| "\n", |
| "    def load_query(self, query):\n", |
| "        \"\"\"Return one ``spark.sql`` DataFrame per SQL statement stored for ``query``.\"\"\"\n", |
| "        info = self.query_infos[query]\n", |
| "        return [self.spark.sql(q) for q in info.sql]\n", |
| "\n", |
| "    def run_query(self, query, explain = False, print_result=False, load_table=True):\n", |
| "        \"\"\"Run one query, display its wall-clock time and add it to ``result``/``duration``.\n", |
| "\n", |
| "        The timed span covers building the DataFrames and collecting every statement.\n", |
| "        \"\"\"\n", |
| "        if load_table:\n", |
| "            self.load_all_tables_as_tempview()\n", |
| "        start_time = timeit.default_timer()\n", |
| "        print(\"start query \" + query + \", application id \" + self.sc.applicationId)\n", |
| "        print(\"{} : {}\".format(\"Start time\", start_time))\n", |
| "        self.sc.setJobDescription(query)\n", |
| "\n", |
| "        # Stays None when the query has no statements, so print_result cannot hit an unbound name.\n", |
| "        collect = None\n", |
| "        queries = self.load_query(query)\n", |
| "        for q in queries:\n", |
| "            if explain: q.explain()\n", |
| "            collect=q.collect()\n", |
| "        end_time = timeit.default_timer()\n", |
| "        duration = end_time - start_time\n", |
| "        display(HTML(('Completed Query. Time(sec): <font size=6pt color=red>{:f}</font>'.format(duration))))\n", |
| "\n", |
| "        self.result[query] = duration\n", |
| "        self.duration += float(duration)\n", |
| "        if print_result:\n", |
| "            print(collect)\n", |
| "\n", |
| "    def power_run(self, explain=False, print_result=False, load_table=True):\n", |
| "        \"\"\"Run every query in ``query_ids`` order, loading the tables at most once up front.\"\"\"\n", |
| "        if load_table:\n", |
| "            self.load_all_tables_as_tempview()\n", |
| "        for l in self.query_ids:\n", |
| "            self.run_query(l, explain=explain, print_result=print_result, load_table=False)\n", |
| "\n", |
| "    def print_result(self):\n", |
| "        \"\"\"Print the timings, the total duration, the result links (if a server is set) and the app id.\"\"\"\n", |
| "        print(self.result)\n", |
| "        print()\n", |
| "        print(f\"total duration:\\n{self.duration}\\n\")\n", |
| "        if self.server:\n", |
| "            print(self.finished_nb)\n", |
| "            print(f\"http://{self.server}:1088/tracing_examples/trace_viewer.html#/tracing/test_data/{self.appid}.json\")\n", |
| "            print(f\"http://{self.server}:18080/history/{self.appid}\")\n", |
| "            print(self.perf_html)\n", |
| "        print(self.appid)\n", |
| "        for t in self.result.values():\n", |
| "            print(t)\n", |
| " \n", |
| "class TestTPCH(TestTPC):\n", |
| "    \"\"\"TestTPC specialised for TPC-H: fixes the table list, query directory and workload name.\"\"\"\n", |
| "\n", |
| "    tpctables = [\n", |
| "        'customer',\n", |
| "        'lineitem',\n", |
| "        'nation',\n", |
| "        'orders',\n", |
| "        'part',\n", |
| "        'partsupp',\n", |
| "        'region',\n", |
| "        'supplier',\n", |
| "    ]\n", |
| "    tpc_query_path = f'{gluten_home}/tools/gluten-it/common/src/main/resources/tpch-queries/'\n", |
| "\n", |
| "    def __init__(self, spark, table_dir, run_gluten, server, base_dir, nb_name, data_source = 'parquet'):\n", |
| "        # Same arguments as TestTPC.__init__ with the workload pinned to 'tpch'.\n", |
| "        super().__init__(spark, table_dir, run_gluten, 'tpch', server, base_dir, nb_name, data_source)\n", |
| " \n", |
| "class TestTPCDS(TestTPC):\n", |
| "    \"\"\"TestTPC specialised for TPC-DS: fixes the table list, query directory and workload name.\"\"\"\n", |
| "\n", |
| "    tpctables = [\n", |
| "        'call_center', 'catalog_page', 'catalog_returns', 'catalog_sales',\n", |
| "        'customer', 'customer_address', 'customer_demographics', 'date_dim',\n", |
| "        'household_demographics', 'income_band', 'inventory', 'item',\n", |
| "        'promotion', 'reason', 'ship_mode', 'store',\n", |
| "        'store_returns', 'store_sales', 'time_dim', 'warehouse',\n", |
| "        'web_page', 'web_returns', 'web_sales', 'web_site',\n", |
| "    ]\n", |
| "    tpc_query_path = f'{gluten_home}/tools/gluten-it/common/src/main/resources/tpcds-queries/'\n", |
| "\n", |
| "    def __init__(self, spark, table_dir, run_gluten, server, base_dir, nb_name, data_source = 'parquet'):\n", |
| "        # Same arguments as TestTPC.__init__ with the workload pinned to 'tpcds'.\n", |
| "        super().__init__(spark, table_dir, run_gluten, 'tpcds', server, base_dir, nb_name, data_source)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "# Create SparkContext" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## default config" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "code_folding": [] |
| }, |
| "outputs": [], |
| "source": [ |
| "import os\n", |
| "\n", |
| "def findjemalloc():\n", |
| "    \"\"\"Return the libjemalloc.so.2 path reported by ``whereis`` on the first client.\n", |
| "\n", |
| "    The second space-separated field of the first output line is taken as the path.\n", |
| "\n", |
| "    Raises:\n", |
| "        RuntimeError: if the command printed nothing or listed no path.\n", |
| "    \"\"\"\n", |
| "    host = clients[0]\n", |
| "    whereis_output = !ssh $host \"whereis libjemalloc.so.2\"\n", |
| "    fields = whereis_output[0].split(' ') if whereis_output else []\n", |
| "    if len(fields) < 2 or not fields[1]:\n", |
| "        # Fail with a readable message instead of an IndexError on the lookup below.\n", |
| "        raise RuntimeError(f'libjemalloc.so.2 not found on {host}')\n", |
| "    return fields[1]\n", |
| "\n", |
| "def get_py4jzip():\n", |
| "    \"\"\"Return the first py4j zip listed under ``$SPARK_HOME/python/lib``.\"\"\"\n", |
| "    spark_root = os.environ['SPARK_HOME']\n", |
| "    listing = !ls {spark_root}/python/lib/py4j*.zip\n", |
| "    first_match = listing[0]\n", |
| "    return first_match\n", |
| "\n", |
| "def default_conf(executors_per_node, cores_per_executor, task_per_core, memory_per_node, extra_jars='', app_name='', master='yarn', run_gluten=False):\n", |
| "    \"\"\"Build the baseline SparkConf shared by the Spark and Gluten contexts.\n", |
| "\n", |
| "    Args:\n", |
| "        executors_per_node: executors to start on each host in ``clients``.\n", |
| "        cores_per_executor: value for spark.executor.cores.\n", |
| "        task_per_core: multiplier used to derive spark.sql.shuffle.partitions.\n", |
| "        memory_per_node: node memory as a size string with optional k/m/g suffix (e.g. '256g').\n", |
| "        extra_jars: class path entry set for both driver and executors.\n", |
| "        app_name: value for spark.app.name.\n", |
| "        master: value for spark.master.\n", |
| "        run_gluten: selects ``gluten_offheap_ratio`` instead of ``spark_offheap_ratio``.\n", |
| "\n", |
| "    Returns:\n", |
| "        The populated SparkConf.\n", |
| "    \"\"\"\n", |
| "    # Create a temp directory that gets cleaned up on exit\n", |
| "    output_dir = os.path.abspath(tempfile.mkdtemp())\n", |
| "    def cleanup(*_signal_args):\n", |
| "        # atexit calls this with no arguments, the SIGTERM handler with (signum, frame);\n", |
| "        # accept both so the signal path does not raise TypeError.\n", |
| "        shutil.rmtree(output_dir, True)\n", |
| "    atexit.register(cleanup)\n", |
| "    signal.signal(signal.SIGTERM, cleanup)\n", |
| "\n", |
| "##################################################\n", |
| "    def convert_to_bytes(size):\n", |
| "        # '<n>k' / '<n>m' / '<n>g' (case-insensitive) are powers of 1024; a bare number is bytes.\n", |
| "        units = {'k': 1, 'm': 2, 'g': 3}\n", |
| "        size = size.lower()\n", |
| "        if size[-1] in units:\n", |
| "            return int(size[:-1]) * 1024 ** units[size[-1]]\n", |
| "        else:\n", |
| "            return int(size)\n", |
| "\n", |
| "    def yarn_padding(size):\n", |
| "        # NOTE(review): min_size/step are in bytes while the only caller passes a value in MB,\n", |
| "        # and the result is the gap to the next step rather than the padded size.\n", |
| "        # Kept as-is; confirm the intended units before changing.\n", |
| "        min_size = convert_to_bytes('1g')\n", |
| "        step = min_size\n", |
| "        while size > min_size:\n", |
| "            min_size += step\n", |
| "        return min_size - size\n", |
| "\n", |
| "    num_nodes = len(clients)\n", |
| "    num_executors = num_nodes*executors_per_node\n", |
| "    parallelism = num_executors*cores_per_executor*task_per_core\n", |
| "\n", |
| "    if run_gluten:\n", |
| "        offheap_ratio = gluten_offheap_ratio\n", |
| "    else:\n", |
| "        offheap_ratio = spark_offheap_ratio\n", |
| "    driver_memory = convert_to_bytes('20g')\n", |
| "    executor_memory_overhead = convert_to_bytes('1g')\n", |
| "\n", |
| "    # Minimum executor memory\n", |
| "    min_memory = convert_to_bytes('1g')\n", |
| "\n", |
| "    # Calculate executor onheap memory\n", |
| "    # The driver's share is only subtracted when the driver host is also a worker.\n", |
| "    num_driver = 1 if localhost in clients else 0\n", |
| "    executor_memory = math.floor((convert_to_bytes(memory_per_node) - (executor_memory_overhead + min_memory)*executors_per_node - (driver_memory + min_memory)*num_driver)/(offheap_ratio*num_driver + (1+offheap_ratio)*executors_per_node))\n", |
| "    executor_memory = max(executor_memory, min_memory)\n", |
| "    # Calculate driver/executor offheap memory in MB\n", |
| "    #offheap_memory_per_node = convert_to_bytes(memory_per_node) - (executor_memory + executor_memory_overhead) * executors_per_node\n", |
| "    if offheap_ratio > 0:\n", |
| "        enable_offheap = True\n", |
| "        offheap_memory = math.floor(executor_memory*offheap_ratio)\n", |
| "    else:\n", |
| "        enable_offheap = False\n", |
| "        offheap_memory = 0\n", |
| "\n", |
| "    byte_to_mb = lambda x: int(x/(1024 ** 2))\n", |
| "    driver_memory_mb = byte_to_mb(driver_memory)\n", |
| "    executor_memory_overhead_mb = byte_to_mb(executor_memory_overhead)\n", |
| "    executor_memory_mb = byte_to_mb(executor_memory)\n", |
| "    offheap_memory_mb = byte_to_mb(offheap_memory)\n", |
| "\n", |
| "    executor_totalmem_mb = executor_memory_overhead_mb + executor_memory_mb + offheap_memory_mb\n", |
| "    executor_totalmem_mb = yarn_padding(executor_totalmem_mb)\n", |
| "    if byte_to_mb(convert_to_bytes(memory_per_node)) - executor_totalmem_mb*executors_per_node > executor_totalmem_mb:\n", |
| "        executor_memory_overhead_mb += 1024\n", |
| "\n", |
| "    print('''\n", |
| " executors per node: {:d}\n", |
| " parallelism: {:d}\n", |
| " executor memory: {:d}m\n", |
| " offheap memory: {:d}m\n", |
| " '''.format(executors_per_node, parallelism, executor_memory_mb, offheap_memory_mb))\n", |
| "\n", |
| "    # Join with os.path.join so the value is correct whether or not SPARK_HOME ends in '/'.\n", |
| "    spark_python_dir = os.path.join(os.environ['SPARK_HOME'], 'python')\n", |
| "\n", |
| "    conf = (\n", |
| "        SparkConf()\n", |
| "        .set('spark.app.name', app_name)\n", |
| "        .set('spark.master',master)\n", |
| "        .set('spark.executor.memory', '{:d}m'.format(executor_memory_mb))\n", |
| "        .set('spark.memory.offHeap.enabled', enable_offheap)\n", |
| "        .set('spark.memory.offHeap.size','{:d}m'.format(offheap_memory_mb))\n", |
| "        .set('spark.sql.shuffle.partitions', parallelism)\n", |
| "        .set('spark.executor.instances', '{:d}'.format(num_executors))\n", |
| "        .set('spark.executor.cores','{:d}'.format(cores_per_executor))\n", |
| "        .set('spark.task.cpus','{:d}'.format(1))\n", |
| "        .set('spark.driver.memory', '{:d}m'.format(driver_memory_mb))\n", |
| "        .set('spark.executor.memoryOverhead', '{:d}m'.format(executor_memory_overhead_mb))\n", |
| "        .set('spark.driver.maxResultSize', '4g')\n", |
| "        .set('spark.executor.extraJavaOptions',\n", |
| "             f'-XX:+UseParallelOldGC -XX:ParallelGCThreads=2 -XX:NewRatio=1 -XX:SurvivorRatio=1 -XX:+UseCompressedOops -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:ErrorFile={home}/logs/java/hs_err_pid%p.log')\n", |
| "        .set('spark.driver.extraClassPath', extra_jars)\n", |
| "        .set('spark.executor.extraClassPath', extra_jars)\n", |
| "        .set('spark.executorEnv.PYTHONPATH',f\"{spark_python_dir}:{get_py4jzip()}\")\n", |
| "        .set(\"spark.repl.class.outputDir\", output_dir)\n", |
| "        .set(\"spark.sql.broadcastTimeout\", \"4800\")\n", |
| "        .set('spark.serializer','org.apache.spark.serializer.KryoSerializer')\n", |
| "        .set('spark.kryoserializer.buffer.max','512m')\n", |
| "        .set('spark.kryo.unsafe',False)\n", |
| "        .set('spark.sql.adaptive.enabled',True)\n", |
| "        .set('spark.sql.autoBroadcastJoinThreshold',\"10m\")\n", |
| "        .set('spark.sql.catalogImplementation','hive')\n", |
| "        .set('spark.sql.optimizer.dynamicPartitionPruning.enabled',True)\n", |
| "        .set('spark.cleaner.periodicGC.interval', '10s')\n", |
| "    )\n", |
| "\n", |
| "    return conf\n", |
| "\n", |
| "\n", |
| "def create_cntx_with_config(conf,conf_overwrite=None):\n", |
| "    \"\"\"Start a SparkContext from ``conf`` and hook it into the spylon kernel state.\n", |
| "\n", |
| "    ``conf_overwrite``, when given, is called with ``conf`` and its return value is used.\n", |
| "    Returns the tuple ``(SparkContext, SQLContext)``.\n", |
| "    \"\"\"\n", |
| "\n", |
| "    # Reload first so the Popen patch below lands on a fresh module object.\n", |
| "    importlib.reload(pyspark.java_gateway)\n", |
| "\n", |
| "    def Popen(*args, **kwargs):\n", |
| "        \"\"\"Wraps subprocess.Popen to force stdout and stderr from the child process\n", |
| "        to pipe to this process without buffering.\n", |
| "        \"\"\"\n", |
| "        global spark_jvm_proc\n", |
| "        # Override these in kwargs to avoid duplicate value errors\n", |
| "        # Set streams to unbuffered so that we read whatever bytes are available\n", |
| "        # when ready, https://docs.python.org/3.6/library/subprocess.html#popen-constructor\n", |
| "        kwargs['bufsize'] = 0\n", |
| "        # Capture everything from stdout for display in the notebook\n", |
| "        kwargs['stdout'] = subprocess.PIPE\n", |
| "        print(\"java proc gateway popen\")\n", |
| "        spark_jvm_proc = subprocess.Popen(*args, **kwargs)\n", |
| "        return spark_jvm_proc\n", |
| "    pyspark.java_gateway.Popen = Popen\n", |
| "\n", |
| "    spylon_kernel.scala_interpreter.scala_intp=None\n", |
| "\n", |
| "    if conf_overwrite is not None:\n", |
| "        conf=conf_overwrite(conf)\n", |
| "    print(\"spark.serializer: \",conf.get(\"spark.serializer\"))\n", |
| "    master_url = conf.get(\"spark.master\")\n", |
| "    print(\"master: \",master_url)\n", |
| "\n", |
| "    spark_context = SparkContext(conf = conf,master=conf.get(\"spark.master\"))\n", |
| "    spark_context.setLogLevel('ERROR')\n", |
| "\n", |
| "    # Ship the PySpark and py4j archives with the application.\n", |
| "    spark_context.addPyFile(f\"{os.environ['SPARK_HOME']}/python/lib/pyspark.zip\")\n", |
| "    spark_context.addPyFile(get_py4jzip())\n", |
| "\n", |
| "    sql_context = SQLContext(spark_context)\n", |
| "\n", |
| "    # Fixed pause before listing the executor processes on each client.\n", |
| "    time.sleep(30)\n", |
| "\n", |
| "    for client in clients:\n", |
| "        pids=!ssh $client \"jps | grep CoarseGrainedExecutorBackend | cut -d' ' -f1\"\n", |
| "        print(client,\":\",len(pids),\" \",\"\\t\".join(map(str,pids)))\n", |
| "\n", |
| "    spark_session = SparkSession(spark_context)\n", |
| "    spark_jvm_helpers = SparkJVMHelpers(spark_session._sc)\n", |
| "    spylon_kernel.scala_interpreter.spark_state = spylon_kernel.scala_interpreter.SparkState(spark_session, spark_jvm_helpers, spark_jvm_proc)\n", |
| "\n", |
| "    print(\"appid: \",spark_context.applicationId)\n", |
| "    print(\"SparkConf:\")\n", |
| "\n", |
| "    effective_conf = pd.DataFrame(spark_context.getConf().getAll(), columns=['key', 'value'])\n", |
| "    display(effective_conf)\n", |
| "\n", |
| "    return spark_context, sql_context" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Spark" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "def spark_tpch_conf_overwrite(conf):\n", |
| "    \"\"\"Hook for TPC-H specific Spark settings; currently returns ``conf`` unchanged.\"\"\"\n", |
| "    return conf\n", |
| "\n", |
| "def spark_tpcds_conf_overwrite(conf):\n", |
| "    \"\"\"Apply the TPC-DS runtime bloom-filter settings for vanilla Spark and return ``conf``.\"\"\"\n", |
| "    bloom_filter_settings = (\n", |
| "        ('spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold', '0'),\n", |
| "        ('spark.sql.optimizer.runtime.bloomFilter.enabled', 'true'),\n", |
| "    )\n", |
| "    for key, value in bloom_filter_settings:\n", |
| "        conf.set(key, value)\n", |
| "    return conf" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "def create_cntx_spark(executors_per_node, cores_per_executor, task_per_core, memory_per_node, extra_jars, app_name='', master='yarn', conf_overwrite=None):\n", |
| "    \"\"\"Create the vanilla Spark context: default_conf plus three batch-size settings.\n", |
| "\n", |
| "    Returns whatever ``create_cntx_with_config`` returns for the resulting conf.\n", |
| "    \"\"\"\n", |
| "    batch_size = 20480\n", |
| "    batch_size_keys = (\n", |
| "        \"spark.sql.execution.arrow.maxRecordsPerBatch\",\n", |
| "        \"spark.sql.parquet.columnarReaderBatchSize\",\n", |
| "        \"spark.sql.inMemoryColumnarStorage.batchSize\",\n", |
| "    )\n", |
| "    conf = default_conf(executors_per_node, cores_per_executor, task_per_core, memory_per_node, extra_jars, app_name, master, run_gluten=False)\n", |
| "    for key in batch_size_keys:\n", |
| "        conf.set(key, batch_size)\n", |
| "    return create_cntx_with_config(conf,conf_overwrite)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Gluten" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "def gluten_tpch_conf_overwrite(conf):\n", |
| "    \"\"\"Hook for TPC-H specific Gluten settings; currently returns ``conf`` unchanged.\"\"\"\n", |
| "    return conf\n", |
| "\n", |
| "def gluten_tpcds_conf_overwrite(conf):\n", |
| "    \"\"\"Apply the TPC-DS bloom-filter and Gluten join-optimization settings and return ``conf``.\"\"\"\n", |
| "    # Parenthesised chain: a trailing backslash after the last .set() would splice\n", |
| "    # the return statement onto the expression and make the cell a SyntaxError.\n", |
| "    (\n", |
| "        conf.set('spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold', '0')\n", |
| "        .set('spark.sql.optimizer.runtime.bloomFilter.enabled', 'true')\n", |
| "        .set('spark.gluten.sql.columnar.joinOptimizationLevel', '18')\n", |
| "        .set('spark.gluten.sql.columnar.physicalJoinOptimizeEnable', 'true')\n", |
| "        .set('spark.gluten.sql.columnar.physicalJoinOptimizationLevel', '18')\n", |
| "    )\n", |
| "    return conf" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "def create_cntx_gluten(executors_per_node, cores_per_executor, task_per_core, memory_per_node, extra_jars, app_name='', master='yarn', conf_overwrite=None):\n", |
| "    \"\"\"Create the Gluten context: default_conf plus the Gluten plugin settings.\n", |
| "\n", |
| "    Returns whatever ``create_cntx_with_config`` returns for the resulting conf.\n", |
| "    \"\"\"\n", |
| "    conf = default_conf(executors_per_node, cores_per_executor, task_per_core, memory_per_node, extra_jars, app_name, master, run_gluten=True)\n", |
| "    gluten_settings = [\n", |
| "        ('spark.sql.files.maxPartitionBytes', '4g'),\n", |
| "        ('spark.plugins', 'org.apache.gluten.GlutenPlugin'),\n", |
| "        ('spark.shuffle.manager', 'org.apache.spark.shuffle.sort.ColumnarShuffleManager'),\n", |
| "        ('spark.gluten.sql.columnar.backend.lib', 'velox'),\n", |
| "        ('spark.gluten.sql.columnar.maxBatchSize', 4096),\n", |
| "        ('spark.gluten.sql.columnar.forceShuffledHashJoin', True),\n", |
| "        # jemalloc is preloaded into every executor process.\n", |
| "        ('spark.executorEnv.LD_PRELOAD', findjemalloc()),\n", |
| "        ('spark.gluten.sql.columnar.coalesce.batches', 'true'),\n", |
| "    ]\n", |
| "    for key, value in gluten_settings:\n", |
| "        conf.set(key, value)\n", |
| "\n", |
| "    return create_cntx_with_config(conf,conf_overwrite)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Create Context" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "def create_cntx(run_gluten=False, workload='tpch', app_conf_overwrite=None, server='', base_dir='', nb_name='tpc_workload.ipynb', app_name=''):\n", |
| "    \"\"\"Create the Spark/Gluten context for a workload and the matching TestTPC driver.\n", |
| "\n", |
| "    Args:\n", |
| "        run_gluten: stage the Gluten jars and use the Gluten configuration when true.\n", |
| "        workload: 'tpch' or 'tpcds' (case-insensitive).\n", |
| "        app_conf_overwrite: optional callable ``conf -> conf`` applied after the workload overrides.\n", |
| "        server, base_dir, nb_name: forwarded to the TestTPC driver.\n", |
| "        app_name: optional prefix for the generated application name.\n", |
| "\n", |
| "    Returns:\n", |
| "        ``(sc, spark, appid, test_tpc)``.\n", |
| "\n", |
| "    Raises:\n", |
| "        ValueError: if ``workload`` is neither 'tpch' nor 'tpcds'.\n", |
| "    \"\"\"\n", |
| "    extra_jars = ''\n", |
| "    is_tpch_workload=False\n", |
| "    is_tpcds_workload=False\n", |
| "    app_name_suffix=''\n", |
| "    workload_conf_overwrite=None\n", |
| "    create_cntx_func=None\n", |
| "    test_tpc=None\n", |
| "\n", |
| "    if workload.lower() == 'tpch':\n", |
| "        app_name_suffix = f\"tpch_spark{spark_version_short}\"\n", |
| "        tabledir = tpch_tabledir\n", |
| "        is_tpch_workload=True\n", |
| "    elif workload.lower() == 'tpcds':\n", |
| "        app_name_suffix = f\"tpcds_spark{spark_version_short}\"\n", |
| "        tabledir = tpcds_tabledir\n", |
| "        is_tpcds_workload=True\n", |
| "    else:\n", |
| "        raise ValueError(f\"Unknown workload: {workload}\")\n", |
| "\n", |
| "    lastgit=!git --git-dir {gluten_home}/.git log --format=\"%H\" -n 1\n", |
| "    lastgit = lastgit[0]\n", |
| "    print(f'lastgit: {lastgit}')\n", |
| "\n", |
| "    if run_gluten:\n", |
| "        jars_base=f\"{home}/jars/\"+lastgit\n", |
| "\n", |
| "        # Reset the staging directory once, before the loop; clearing it per jar\n", |
| "        # would delete every jar copied by an earlier iteration.\n", |
| "        !mkdir -p {jars_base}\n", |
| "        !rm -rf {jars_base}/*\n", |
| "        for target_jar in gluten_target_jar.split(\",\"):\n", |
| "            !ls -l {target_jar}\n", |
| "            !cp {target_jar} {jars_base}/\n", |
| "            if target_jar[-4:] != '.jar':\n", |
| "                !cp -f {target_jar} {jars_base}/gluten-{lastgit}.jar\n", |
| "\n", |
| "        jars=!ls -d {jars_base}/*.jar\n", |
| "        extra_jars=\":\".join([\"file://\"+j for j in jars])\n", |
| "        print(f'extra_jars: {extra_jars}')\n", |
| "\n", |
| "        # Mirror the staged jars to the same path on every remote client.\n", |
| "        for c in clients:\n", |
| "            if c!=localhost:\n", |
| "                !ssh {c} \"rm -rf {jars_base}\"\n", |
| "                !ssh {c} \"mkdir -p {jars_base}\"\n", |
| "                !scp {jars_base}/*.jar {c}:{jars_base} >/dev/null 2>&1\n", |
| "\n", |
| "        app_name_suffix = '_'.join(['gluten', app_name_suffix, lastgit[:6]])\n", |
| "        create_cntx_func=create_cntx_gluten\n", |
| "        if is_tpch_workload:\n", |
| "            task_per_core = gluten_tpch_task_per_core\n", |
| "            workload_conf_overwrite = gluten_tpch_conf_overwrite\n", |
| "        elif is_tpcds_workload:\n", |
| "            task_per_core = gluten_tpcds_task_per_core\n", |
| "            workload_conf_overwrite = gluten_tpcds_conf_overwrite\n", |
| "    else:\n", |
| "        app_name_suffix = '_'.join(['spark', app_name_suffix, lastgit[:6]])\n", |
| "        create_cntx_func=create_cntx_spark\n", |
| "        if is_tpch_workload:\n", |
| "            task_per_core = spark_tpch_task_per_core\n", |
| "            workload_conf_overwrite = spark_tpch_conf_overwrite\n", |
| "        elif is_tpcds_workload:\n", |
| "            task_per_core = spark_tpcds_task_per_core\n", |
| "            workload_conf_overwrite = spark_tpcds_conf_overwrite\n", |
| "\n", |
| "    if app_name:\n", |
| "        app_name = app_name + ' ' + app_name_suffix\n", |
| "    else:\n", |
| "        app_name = app_name_suffix\n", |
| "\n", |
| "    def conf_overwrite(conf):\n", |
| "        # Workload overrides first, then the caller's; the caller's hook is optional.\n", |
| "        conf = workload_conf_overwrite(conf)\n", |
| "        if app_conf_overwrite is not None:\n", |
| "            conf = app_conf_overwrite(conf)\n", |
| "        return conf\n", |
| "\n", |
| "    sc, spark = create_cntx_func(executors_per_node, cores_per_executor, task_per_core, memory_per_node, extra_jars, app_name, master, conf_overwrite)\n", |
| "\n", |
| "    # Pin executors to numa nodes for Gluten\n", |
| "    if run_gluten:\n", |
| "        pinexecutor_numa(clients)\n", |
| "\n", |
| "    appid = sc.applicationId\n", |
| "    print(\"start run: \", appid)\n", |
| "\n", |
| "    if is_tpch_workload:\n", |
| "        test_tpc = TestTPCH(spark, tabledir, run_gluten, server, base_dir, nb_name)\n", |
| "    elif is_tpcds_workload:\n", |
| "        test_tpc = TestTPCDS(spark, tabledir, run_gluten, server, base_dir, nb_name)\n", |
| "\n", |
| "    return sc, spark, appid, test_tpc" |
| ] |
| } |
| ], |
| "metadata": { |
| "hide_input": false, |
| "kernelspec": { |
| "display_name": "Python 3 (ipykernel)", |
| "language": "python", |
| "name": "python3" |
| }, |
| "language_info": { |
| "codemirror_mode": { |
| "name": "ipython", |
| "version": 3 |
| }, |
| "file_extension": ".py", |
| "mimetype": "text/x-python", |
| "name": "python", |
| "nbconvert_exporter": "python", |
| "pygments_lexer": "ipython3", |
| "version": "3.10.12" |
| }, |
| "nbTranslate": { |
| "displayLangs": [ |
| "*" |
| ], |
| "hotkey": "alt-t", |
| "langInMainMenu": true, |
| "sourceLang": "en", |
| "targetLang": "fr", |
| "useGoogleTranslate": true |
| }, |
| "toc": { |
| "base_numbering": 1, |
| "nav_menu": {}, |
| "number_sections": true, |
| "sideBar": false, |
| "skip_h1_title": false, |
| "title_cell": "Table of Contents", |
| "title_sidebar": "Contents", |
| "toc_cell": false, |
| "toc_position": { |
| "height": "364.469px", |
| "left": "2086.8px", |
| "top": "150.516px", |
| "width": "375px" |
| }, |
| "toc_section_display": true, |
| "toc_window_display": true |
| }, |
| "toc-autonumbering": true, |
| "varInspector": { |
| "cols": { |
| "lenName": 16, |
| "lenType": 16, |
| "lenVar": 40 |
| }, |
| "kernels_config": { |
| "python": { |
| "delete_cmd_postfix": "", |
| "delete_cmd_prefix": "del ", |
| "library": "var_list.py", |
| "varRefreshCmd": "print(var_dic_list())" |
| }, |
| "r": { |
| "delete_cmd_postfix": ") ", |
| "delete_cmd_prefix": "rm(", |
| "library": "var_list.r", |
| "varRefreshCmd": "cat(var_dic_list()) " |
| } |
| }, |
| "types_to_exclude": [ |
| "module", |
| "function", |
| "builtin_function_or_method", |
| "instance", |
| "_Feature" |
| ], |
| "window_display": false |
| } |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 4 |
| } |