layout: global
title: Monitoring and Instrumentation
description: Monitoring, metrics, and instrumentation guide for Spark SPARK_VERSION_SHORT
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

  • This will become a table of contents (this text will be scraped).
{:toc}

There are several ways to monitor Spark applications: web UIs, metrics, and external instrumentation.

Web Interfaces

Every SparkContext launches a Web UI, by default on port 4040, that displays useful information about the application. This includes:

  • A list of scheduler stages and tasks
  • A summary of RDD sizes and memory usage
  • Environmental information
  • Information about the running executors

You can access this interface by simply opening http://<driver-node>:4040 in a web browser. If multiple SparkContexts are running on the same host, they will bind to successive ports beginning with 4040 (4041, 4042, etc.).

Note that this information is only available for the duration of the application by default. To view the web UI after the fact, set spark.eventLog.enabled to true before starting the application. This configures Spark to log Spark events that encode the information displayed in the UI to persisted storage.

Viewing After the Fact

It is still possible to construct the UI of an application through Spark's history server, provided that the application's event logs exist. You can start the history server by executing:

./sbin/start-history-server.sh

This creates a web interface at http://<server-url>:18080 by default, listing incomplete and completed applications and attempts.

When using the file-system provider class (see spark.history.provider below), the base logging directory must be supplied in the spark.history.fs.logDirectory configuration option, and should contain sub-directories that each represent an application's event logs.

The Spark jobs themselves must be configured to log events, and to log them to the same shared, writable directory. For example, if the server was configured with a log directory of hdfs://namenode/shared/spark-logs, then the client-side options would be:

spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode/shared/spark-logs
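
These options would typically go in conf/spark-defaults.conf. As a minimal sketch, they can also be passed directly to spark-submit (the class com.example.MyApp and the jar my-app.jar are hypothetical placeholders):

./bin/spark-submit \
  --conf spark.eventLog.enabled=true \
  --conf spark.eventLog.dir=hdfs://namenode/shared/spark-logs \
  --class com.example.MyApp \
  my-app.jar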

The history server can be configured as follows:

Environment Variables
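
History server daemon settings are typically passed as environment variables, usually set in conf/spark-env.sh before launching the daemon. As a sketch, assuming the shared log directory from the example above, spark.history.* properties can be passed through SPARK_HISTORY_OPTS:

# in conf/spark-env.sh (or exported in the shell that starts the daemon)
export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://namenode/shared/spark-logs"
./sbin/start-history-server.sh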

Applying compaction on rolling event log files

A long-running application (e.g. streaming) can produce a single huge event log file, which can be costly to maintain and also requires significant resources to replay on each update in the Spark History Server.

Enabling spark.eventLog.rolling.enabled and spark.eventLog.rolling.maxFileSize lets you have rolling event log files instead of a single huge event log file, which may help in some scenarios on its own, but it still doesn't reduce the overall size of the logs.

The Spark History Server can apply compaction to the rolling event log files to reduce the overall size of the logs, by setting the configuration spark.history.fs.eventLog.rolling.maxFilesToRetain on the Spark History Server.
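
For example, a minimal sketch of the two sides of this setup (the 128m size is an arbitrary illustration, not a recommended value) might look like:

# Application side: write rolling event log files instead of a single file.
spark.eventLog.enabled true
spark.eventLog.rolling.enabled true
spark.eventLog.rolling.maxFileSize 128m

# History Server side: retain at most the 2 most recent files and compact older ones.
spark.history.fs.eventLog.rolling.maxFilesToRetain 2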

Details are described below, but please note up front that compaction is a LOSSY operation. Compaction discards some events, which will no longer be visible in the UI - you may want to check which events will be discarded before enabling the option.

When compaction happens, the History Server lists all the available event log files for the application, and selects as compaction targets the files whose index is smaller than the smallest index among the files that will be retained. For example, if application A has 5 event log files and spark.history.fs.eventLog.rolling.maxFilesToRetain is set to 2, then the first 3 log files will be selected for compaction.

Once it selects the targets, it analyzes them to figure out which events can be excluded, and rewrites them into one compact file, discarding the events it has decided to exclude.

The compaction tries to exclude events which point to outdated data. As of now, the candidates for exclusion are:

  • Events for jobs which have finished, and their related stage/task events
  • Events for executors which have been terminated
  • Events for SQL executions which have finished, and their related job/stage/task events

Once rewriting is done, the original log files will be deleted on a best-effort basis. The History Server may not be able to delete the original log files, but this will not affect the operation of the History Server.

Please note that the Spark History Server may not compact old event log files if it determines that not much space would be saved by compaction. For streaming queries we normally expect compaction to run, as each micro-batch triggers one or more jobs which finish shortly afterwards, but compaction won't run in many cases for batch queries.

Please also note that this is a new feature introduced in Spark 3.0, and may not be completely stable. Under some circumstances, the compaction may exclude more events than you expect, leading to some UI issues on the History Server for the application. Use it with caution.

Spark History Server Configuration Options

Security options for the Spark History Server are covered in more detail in the Security page.

Note that in all of these UIs, the tables are sortable by clicking their headers, making it easy to identify slow tasks, data skew, etc.

Note

  1. The history server displays both completed and incomplete Spark jobs. If an application makes multiple attempts after failures, the failed attempts will be displayed, as well as any ongoing incomplete attempt or the final successful attempt.

  2. Incomplete applications are only updated intermittently. The time between updates is defined by the interval between checks for changed files (spark.history.fs.update.interval). On larger clusters, the update interval may be set to large values. The way to view a running application is actually to view its own web UI.

  3. Applications which exited without registering themselves as completed will be listed as incomplete, even though they are no longer running. This can happen if an application crashes.

  4. One way to signal the completion of a Spark job is to stop the Spark Context explicitly (sc.stop()), or in Python using the with SparkContext() as sc: construct to handle the Spark Context setup and tear down.

REST API

In addition to viewing the metrics in the UI, they are also available as JSON. This gives developers an easy way to create new visualizations and monitoring tools for Spark. The JSON is available for both running applications, and in the history server. The endpoints are mounted at /api/v1. For example, for the history server, they would typically be accessible at http://<server-url>:18080/api/v1, and for a running application, at http://localhost:4040/api/v1.
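
For example, assuming curl is available, the list of applications can be fetched from either endpoint:

# Running application (driver UI):
curl http://localhost:4040/api/v1/applications
# History server:
curl http://<server-url>:18080/api/v1/applications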

In the API, an application is referenced by its application ID, [app-id]. When running on YARN, each application may have multiple attempts, but there are attempt IDs only for applications in cluster mode, not applications in client mode. Applications in YARN cluster mode can be identified by their [attempt-id]. In the API listed below, when running in YARN cluster mode, [app-id] will actually be [base-app-id]/[attempt-id], where [base-app-id] is the YARN application ID.

The number of jobs and stages which can be retrieved is constrained by the same retention mechanism of the standalone Spark UI; spark.ui.retainedJobs defines the threshold value triggering garbage collection on jobs, and spark.ui.retainedStages that for stages. Note that the garbage collection takes place on playback: it is possible to retrieve more entries by increasing these values and restarting the history server.
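
As a sketch, raising both limits (both properties default to 1000) before restarting the history server could look like:

spark.ui.retainedJobs 5000
spark.ui.retainedStages 5000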

Executor Task Metrics

The REST API exposes the values of the Task Metrics collected by Spark executors with the granularity of task execution. The metrics can be used for performance troubleshooting and workload characterization. A list of the available metrics, with a short description:

Executor Metrics

Executor-level metrics are sent from each executor to the driver as part of the Heartbeat to describe the performance of the executor itself, such as JVM heap memory and GC information. Executor metric values and their measured memory peak values per executor are exposed via the REST API in JSON format and in Prometheus format. The JSON endpoint is exposed at /applications/[app-id]/executors, and the Prometheus endpoint at /metrics/executors/prometheus. The Prometheus endpoint is conditional on a configuration parameter: spark.ui.prometheus.enabled=true (the default is false). In addition, aggregated per-stage peak values of the executor memory metrics are written to the event log if spark.eventLog.logStageExecutorMetrics is true.
Executor memory metrics are also exposed via the Spark metrics system based on the Dropwizard metrics library. A list of the available metrics, with a short description:
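
As a sketch, assuming a running application whose UI is on port 4040 and which was submitted with spark.ui.prometheus.enabled=true, the two endpoints can be queried as follows:

# JSON, via the REST API:
curl http://localhost:4040/api/v1/applications/[app-id]/executors
# Prometheus format:
curl http://localhost:4040/metrics/executors/prometheus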

API Versioning Policy

These endpoints have been strongly versioned to make it easier to develop applications on top. In particular, Spark guarantees:

  • Endpoints will never be removed from one version
  • Individual fields will never be removed for any given endpoint
  • New endpoints may be added
  • New fields may be added to existing endpoints
  • New versions of the api may be added in the future as a separate endpoint (e.g., api/v2). New versions are not required to be backwards compatible.
  • Api versions may be dropped, but only after at least one minor release of co-existing with a new api version.

Note that even when examining the UI of running applications, the applications/[app-id] portion is still required, though there is only one application available. E.g. to see the list of jobs for the running app, you would go to http://localhost:4040/api/v1/applications/[app-id]/jobs. This is to keep the paths consistent in both modes.

Metrics

Spark has a configurable metrics system based on the Dropwizard Metrics Library. This allows users to report Spark metrics to a variety of sinks including HTTP, JMX, and CSV files. The metrics are generated by sources embedded in the Spark code base. They provide instrumentation for specific activities and Spark components. The metrics system is configured via a configuration file that Spark expects to be present at $SPARK_HOME/conf/metrics.properties. A custom file location can be specified via the spark.metrics.conf configuration property. Instead of using the configuration file, a set of configuration parameters with prefix spark.metrics.conf. can be used.

By default, the root namespace used for driver or executor metrics is the value of spark.app.id. However, users often want to be able to track the metrics across apps for driver and executors, which is hard to do with the application ID (i.e. spark.app.id) since it changes with every invocation of the app. For such use cases, a custom namespace can be specified for metrics reporting using the spark.metrics.namespace configuration property. If, say, users wanted to set the metrics namespace to the name of the application, they can set the spark.metrics.namespace property to a value like ${spark.app.name}. This value is then expanded appropriately by Spark and is used as the root namespace of the metrics system. Non-driver and executor metrics are never prefixed with spark.app.id, nor does the spark.metrics.namespace property have any such effect on such metrics.
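
For example, a minimal sketch that keeps metrics comparable across runs of the same application by using the application name as the namespace (set in conf/spark-defaults.conf or via --conf; the custom file path is a placeholder):

spark.metrics.namespace ${spark.app.name}
# Optional: point to a custom metrics configuration file instead of the default location.
spark.metrics.conf /path/to/custom/metrics.properties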

Spark's metrics are decoupled into different instances corresponding to Spark components. Within each instance, you can configure a set of sinks to which metrics are reported. The following instances are currently supported:

  • master: The Spark standalone master process.
  • applications: A component within the master which reports on various applications.
  • worker: A Spark standalone worker process.
  • executor: A Spark executor.
  • driver: The Spark driver process (the process in which your SparkContext is created).
  • shuffleService: The Spark shuffle service.
  • applicationMaster: The Spark ApplicationMaster when running on YARN.
  • mesos_cluster: The Spark cluster scheduler when running on Mesos.

Each instance can report to zero or more sinks. Sinks are contained in the org.apache.spark.metrics.sink package:

  • ConsoleSink: Logs metrics information to the console.
  • CSVSink: Exports metrics data to CSV files at regular intervals.
  • JmxSink: Registers metrics for viewing in a JMX console.
  • MetricsServlet: Adds a servlet within the existing Spark UI to serve metrics data as JSON data.
  • PrometheusServlet: (Experimental) Adds a servlet within the existing Spark UI to serve metrics data in Prometheus format.
  • GraphiteSink: Sends metrics to a Graphite node.
  • Slf4jSink: Sends metrics to slf4j as log entries.
  • StatsdSink: Sends metrics to a StatsD node.

Spark also supports a Ganglia sink which is not included in the default build due to licensing restrictions:

  • GangliaSink: Sends metrics to a Ganglia node or multicast group.

To install the GangliaSink you'll need to perform a custom build of Spark. Note that by embedding this library you will include LGPL-licensed code in your Spark package. For sbt users, set the SPARK_GANGLIA_LGPL environment variable before building. For Maven users, enable the -Pspark-ganglia-lgpl profile. In addition to modifying the cluster's Spark build, user applications will need to link to the spark-ganglia-lgpl artifact.
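
As a sketch of what such a custom build might look like from a Spark source checkout (flags other than the Ganglia profile/variable are illustrative):

# sbt build:
SPARK_GANGLIA_LGPL=true ./build/sbt package
# Maven build:
./build/mvn -Pspark-ganglia-lgpl -DskipTests clean package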

The syntax of the metrics configuration file and the parameters available for each sink are defined in an example configuration file, $SPARK_HOME/conf/metrics.properties.template.

When using Spark configuration parameters instead of the metrics configuration file, the relevant parameter names are composed by the prefix spark.metrics.conf. followed by the configuration details, i.e. the parameters take the following form: spark.metrics.conf.[instance|*].sink.[sink_name].[parameter_name]. This example shows a list of Spark configuration parameters for a Graphite sink:

"spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"
"spark.metrics.conf.*.sink.graphite.host"="graphiteEndPoint_hostName>"
"spark.metrics.conf.*.sink.graphite.port"=<graphite_listening_port>
"spark.metrics.conf.*.sink.graphite.period"=10
"spark.metrics.conf.*.sink.graphite.unit"=seconds
"spark.metrics.conf.*.sink.graphite.prefix"="optional_prefix"
"spark.metrics.conf.*.sink.graphite.regex"="optional_regex_to_send_matching_metrics"

Default values of the Spark metrics configuration are as follows:

"*.sink.servlet.class" = "org.apache.spark.metrics.sink.MetricsServlet"
"*.sink.servlet.path" = "/metrics/json"
"master.sink.servlet.path" = "/metrics/master/json"
"applications.sink.servlet.path" = "/metrics/applications/json"

Additional sources can be configured using the metrics configuration file or the configuration parameter spark.metrics.conf.[component_name].source.jvm.class=[source_name]. At present the JVM source is the only available optional source. For example, the following configuration parameter activates the JVM source:

"spark.metrics.conf.*.source.jvm.class"="org.apache.spark.metrics.source.JvmSource"

List of available metrics providers

Metrics used by Spark are of multiple types: gauge, counter, histogram, meter and timer; see the Dropwizard library documentation for details. The following list of components and metrics reports the name and some details about the available metrics, grouped per component instance and source namespace. The most common types of metrics used in Spark instrumentation are gauges and counters. Counters can be recognized as they have the .count suffix. Timers, meters and histograms are annotated in the list; the remaining list elements are metrics of type gauge. The large majority of metrics are active as soon as their parent component instance is configured; some metrics also require being enabled via an additional configuration parameter, and the details are reported in the list.

Component instance = Driver

This is the component with the largest amount of instrumented metrics.

  • namespace=BlockManager

    • disk.diskSpaceUsed_MB
    • memory.maxMem_MB
    • memory.maxOffHeapMem_MB
    • memory.maxOnHeapMem_MB
    • memory.memUsed_MB
    • memory.offHeapMemUsed_MB
    • memory.onHeapMemUsed_MB
    • memory.remainingMem_MB
    • memory.remainingOffHeapMem_MB
    • memory.remainingOnHeapMem_MB
  • namespace=HiveExternalCatalog

    • note: these metrics are conditional to a configuration parameter: spark.metrics.staticSources.enabled (default is true)
    • fileCacheHits.count
    • filesDiscovered.count
    • hiveClientCalls.count
    • parallelListingJobCount.count
    • partitionsFetched.count
  • namespace=CodeGenerator

    • note: these metrics are conditional to a configuration parameter: spark.metrics.staticSources.enabled (default is true)
    • compilationTime (histogram)
    • generatedClassSize (histogram)
    • generatedMethodSize (histogram)
    • sourceCodeSize (histogram)
  • namespace=DAGScheduler

    • job.activeJobs
    • job.allJobs
    • messageProcessingTime (timer)
    • stage.failedStages
    • stage.runningStages
    • stage.waitingStages
  • namespace=LiveListenerBus

    • listenerProcessingTime.org.apache.spark.HeartbeatReceiver (timer)
    • listenerProcessingTime.org.apache.spark.scheduler.EventLoggingListener (timer)
    • listenerProcessingTime.org.apache.spark.status.AppStatusListener (timer)
    • numEventsPosted.count
    • queue.appStatus.listenerProcessingTime (timer)
    • queue.appStatus.numDroppedEvents.count
    • queue.appStatus.size
    • queue.eventLog.listenerProcessingTime (timer)
    • queue.eventLog.numDroppedEvents.count
    • queue.eventLog.size
    • queue.executorManagement.listenerProcessingTime (timer)
  • namespace=appStatus (all metrics of type=counter)

    • note: Introduced in Spark 3.0. Conditional to a configuration parameter:
      spark.metrics.appStatusSource.enabled (default is false)
    • stages.failedStages.count
    • stages.skippedStages.count
    • stages.completedStages.count
    • tasks.blackListedExecutors.count // deprecated use excludedExecutors instead
    • tasks.excludedExecutors.count
    • tasks.completedTasks.count
    • tasks.failedTasks.count
    • tasks.killedTasks.count
    • tasks.skippedTasks.count
    • tasks.unblackListedExecutors.count // deprecated use unexcludedExecutors instead
    • tasks.unexcludedExecutors.count
    • jobs.succeededJobs
    • jobs.failedJobs
    • jobDuration
  • namespace=AccumulatorSource

    • note: User-configurable sources to attach accumulators to metric system
    • DoubleAccumulatorSource
    • LongAccumulatorSource
  • namespace=spark.streaming

    • note: This applies to Spark Structured Streaming only. Conditional to a configuration parameter: spark.sql.streaming.metricsEnabled=true (default is false)
    • eventTime-watermark
    • inputRate-total
    • latency
    • processingRate-total
    • states-rowsTotal
    • states-usedBytes
  • namespace=JVMCPU

    • jvmCpuTime
  • namespace=executor

    • note: These metrics are available in the driver in local mode only.
    • A full list of available metrics in this namespace can be found in the corresponding entry for the Executor component instance.
  • namespace=ExecutorMetrics

    • note: these metrics are conditional to a configuration parameter: spark.metrics.executorMetricsSource.enabled (default is true)
    • This source contains memory-related metrics. A full list of available metrics in this namespace can be found in the corresponding entry for the Executor component instance.
  • namespace=ExecutorAllocationManager

    • note: these metrics are only emitted when using dynamic allocation. Conditional to a configuration parameter spark.dynamicAllocation.enabled (default is false)
    • executors.numberExecutorsToAdd
    • executors.numberExecutorsPendingToRemove
    • executors.numberAllExecutors
    • executors.numberTargetExecutors
    • executors.numberMaxNeededExecutors
    • executors.numberExecutorsGracefullyDecommissioned.count
    • executors.numberExecutorsDecommissionUnfinished.count
    • executors.numberExecutorsExitedUnexpectedly.count
    • executors.numberExecutorsKilledByDriver.count
  • namespace=plugin.<Plugin Class Name>

    • Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and configured using the Spark plugin API. See “Advanced Instrumentation” below for how to load custom plugins into Spark.

Component instance = Executor

These metrics are exposed by Spark executors.

  • namespace=executor (metrics are of type counter or gauge)

    • notes:
      • spark.executor.metrics.fileSystemSchemes (default: file,hdfs) determines the exposed file system metrics.
    • bytesRead.count
    • bytesWritten.count
    • cpuTime.count
    • deserializeCpuTime.count
    • deserializeTime.count
    • diskBytesSpilled.count
    • filesystem.file.largeRead_ops
    • filesystem.file.read_bytes
    • filesystem.file.read_ops
    • filesystem.file.write_bytes
    • filesystem.file.write_ops
    • filesystem.hdfs.largeRead_ops
    • filesystem.hdfs.read_bytes
    • filesystem.hdfs.read_ops
    • filesystem.hdfs.write_bytes
    • filesystem.hdfs.write_ops
    • jvmGCTime.count
    • memoryBytesSpilled.count
    • recordsRead.count
    • recordsWritten.count
    • resultSerializationTime.count
    • resultSize.count
    • runTime.count
    • shuffleBytesWritten.count
    • shuffleFetchWaitTime.count
    • shuffleLocalBlocksFetched.count
    • shuffleLocalBytesRead.count
    • shuffleRecordsRead.count
    • shuffleRecordsWritten.count
    • shuffleRemoteBlocksFetched.count
    • shuffleRemoteBytesRead.count
    • shuffleRemoteBytesReadToDisk.count
    • shuffleTotalBytesRead.count
    • shuffleWriteTime.count
    • succeededTasks.count
    • threadpool.activeTasks
    • threadpool.completeTasks
    • threadpool.currentPool_size
    • threadpool.maxPool_size
    • threadpool.startedTasks
  • namespace=ExecutorMetrics

    • notes:
      • These metrics are conditional to a configuration parameter: spark.metrics.executorMetricsSource.enabled (default value is true)
      • ExecutorMetrics are updated as part of heartbeat processes scheduled for the executors and for the driver at regular intervals: spark.executor.heartbeatInterval (default value is 10 seconds)
      • An optional faster polling mechanism is available for executor memory metrics, it can be activated by setting a polling interval (in milliseconds) using the configuration parameter spark.executor.metrics.pollingInterval
    • JVMHeapMemory
    • JVMOffHeapMemory
    • OnHeapExecutionMemory
    • OnHeapStorageMemory
    • OnHeapUnifiedMemory
    • OffHeapExecutionMemory
    • OffHeapStorageMemory
    • OffHeapUnifiedMemory
    • DirectPoolMemory
    • MappedPoolMemory
    • MinorGCCount
    • MinorGCTime
    • MajorGCCount
    • MajorGCTime
    • “ProcessTree*” metric counters:
      • ProcessTreeJVMVMemory
      • ProcessTreeJVMRSSMemory
      • ProcessTreePythonVMemory
      • ProcessTreePythonRSSMemory
      • ProcessTreeOtherVMemory
      • ProcessTreeOtherRSSMemory
      • note: “ProcessTree*” metrics are collected only under certain conditions. The conditions are the logical AND of the following: /proc filesystem exists, spark.executor.processTreeMetrics.enabled=true. “ProcessTree*” metrics report 0 when those conditions are not met.
  • namespace=JVMCPU

    • jvmCpuTime
  • namespace=NettyBlockTransfer

    • shuffle-client.usedDirectMemory
    • shuffle-client.usedHeapMemory
    • shuffle-server.usedDirectMemory
    • shuffle-server.usedHeapMemory
  • namespace=HiveExternalCatalog

    • note: these metrics are conditional to a configuration parameter: spark.metrics.staticSources.enabled (default is true)
    • fileCacheHits.count
    • filesDiscovered.count
    • hiveClientCalls.count
    • parallelListingJobCount.count
    • partitionsFetched.count
  • namespace=CodeGenerator

    • note: these metrics are conditional to a configuration parameter: spark.metrics.staticSources.enabled (default is true)
    • compilationTime (histogram)
    • generatedClassSize (histogram)
    • generatedMethodSize (histogram)
    • sourceCodeSize (histogram)
  • namespace=plugin.<Plugin Class Name>

    • Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and configured using the Spark plugin API. See “Advanced Instrumentation” below for how to load custom plugins into Spark.

Source = JVM Source

Notes:

  • Activate this source by setting the relevant metrics.properties file entry or the configuration parameter: spark.metrics.conf.*.source.jvm.class=org.apache.spark.metrics.source.JvmSource
  • These metrics are conditional to a configuration parameter: spark.metrics.staticSources.enabled (default is true)
  • This source is available for driver and executor instances and is also available for other instances.
  • This source provides information on JVM metrics using the Dropwizard/Codahale Metric Sets for JVM instrumentation and in particular the metric sets BufferPoolMetricSet, GarbageCollectorMetricSet and MemoryUsageGaugeSet.

Component instance = applicationMaster

Note: applies when running on YARN

  • numContainersPendingAllocate
  • numExecutorsFailed
  • numExecutorsRunning
  • numLocalityAwareTasks
  • numReleasedContainers

Component instance = mesos_cluster

Note: applies when running on Mesos

  • waitingDrivers
  • launchedDrivers
  • retryDrivers

Component instance = master

Note: applies when running in Spark standalone as master

  • workers
  • aliveWorkers
  • apps
  • waitingApps

Component instance = ApplicationSource

Note: applies when running in Spark standalone as master

  • status
  • runtime_ms
  • cores

Component instance = worker

Note: applies when running in Spark standalone as worker

  • executors
  • coresUsed
  • memUsed_MB
  • coresFree
  • memFree_MB

Component instance = shuffleService

Note: applies to the shuffle service

  • blockTransferRate (meter) - rate of blocks being transferred
  • blockTransferMessageRate (meter) - rate of block transfer messages, i.e. if batch fetches are enabled, this represents number of batches rather than number of blocks
  • blockTransferRateBytes (meter)
  • blockTransferAvgTime_1min (gauge - 1-minute moving average)
  • numActiveConnections.count
  • numRegisteredConnections.count
  • numCaughtExceptions.count
  • openBlockRequestLatencyMillis (histogram)
  • registerExecutorRequestLatencyMillis (histogram)
  • registeredExecutorsSize
  • shuffle-server.usedDirectMemory
  • shuffle-server.usedHeapMemory

Advanced Instrumentation

Several external tools can be used to help profile the performance of Spark jobs:

  • Cluster-wide monitoring tools, such as Ganglia, can provide insight into overall cluster utilization and resource bottlenecks. For instance, a Ganglia dashboard can quickly reveal whether a particular workload is disk bound, network bound, or CPU bound.
  • OS profiling tools such as dstat, iostat, and iotop can provide fine-grained profiling on individual nodes.
  • JVM utilities such as jstack for providing stack traces, jmap for creating heap-dumps, jstat for reporting time-series statistics and jconsole for visually exploring various JVM properties are useful for those comfortable with JVM internals.
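
For example, a quick sketch of how the JVM utilities above are typically invoked against a running executor or driver process (the pid 12345 is a placeholder):

jstack 12345                # thread stack traces
jmap -histo 12345           # heap histogram
jstat -gcutil 12345 1000    # GC statistics, sampled every 1000 ms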

Spark also provides a plugin API so that custom instrumentation code can be added to Spark applications. There are two configuration keys available for loading plugins into Spark:

  • spark.plugins
  • spark.plugins.defaultList

Both take a comma-separated list of class names that implement the org.apache.spark.api.plugin.SparkPlugin interface. The two names exist so that it's possible for one list to be placed in the Spark default config file, allowing users to easily add other plugins from the command line without overwriting the config file's list. Duplicate plugins are ignored.
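
As a minimal sketch of what such a plugin might look like (the class name com.example.MyCustomPlugin and the counter name are hypothetical; metrics registered this way appear under the plugin.<Plugin Class Name> namespace described above):

package com.example

import java.util.{Map => JMap}

import com.codahale.metrics.Counter
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

class MyCustomPlugin extends SparkPlugin {

  // Runs inside the driver; registers a Dropwizard counter with Spark's metrics system.
  override def driverPlugin(): DriverPlugin = new DriverPlugin {
    override def registerMetrics(appId: String, ctx: PluginContext): Unit = {
      ctx.metricRegistry().register("myCustomCounter", new Counter())
    }
  }

  // Runs inside each executor; registers the same counter per executor.
  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
    override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
      ctx.metricRegistry().register("myCustomCounter", new Counter())
    }
  }
}

The plugin could then be enabled with a setting such as spark.plugins=com.example.MyCustomPlugin, and its metrics would be reported through whichever sinks are configured for the driver and executor instances.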