license: | Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

  https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Monitoring

There are two ways to monitor a Celeborn cluster: Prometheus metrics and the REST API.

Metrics

Celeborn has a configurable metrics system based on the Dropwizard Metrics Library. This allows users to report Celeborn metrics to a variety of sinks, including CSV files, Graphite, log files, and HTTP servlets that serve metrics in Prometheus or JSON format. The metrics are generated by sources embedded in the Celeborn code base, which provide instrumentation for specific activities and Celeborn components. The metrics system is configured via a configuration file that Celeborn expects to be present at $CELEBORN_HOME/conf/metrics.properties. A custom file location can be specified via the celeborn.metrics.conf configuration property. Instead of using the configuration file, a set of configuration parameters with the prefix celeborn.metrics.conf. can be used.

Celeborn's metrics are divided into two instances corresponding to Celeborn components. The following instances are currently supported:

  • master: The Celeborn cluster master process.
  • worker: The Celeborn cluster worker process.

Each instance can report to zero or more sinks. Sinks are contained in the org.apache.celeborn.common.metrics.sink package:

  • CsvSink: Exports metrics data to CSV files at regular intervals.
  • PrometheusServlet: Adds a servlet within the existing Celeborn REST API to serve metrics data in Prometheus format.
  • JsonServlet: Adds a servlet within the existing Celeborn REST API to serve metrics data in JSON format.
  • GraphiteSink: Sends metrics to a Graphite node.
  • LoggerSink: Scrapes metrics periodically and writes them to the log files if celeborn.metrics.loggerSink.output.enabled is enabled. It acts as a safety valve to make sure metrics data does not stay in memory for a long time. If you don't have a metrics collector that collects metrics from Celeborn periodically, it's important to enable this sink.

The syntax of the metrics configuration file and the parameters available for each sink are defined in an example configuration file, $CELEBORN_HOME/conf/metrics.properties.template.

When using Celeborn configuration parameters instead of the metrics configuration file, the relevant parameter names are composed of the prefix celeborn.metrics.conf. followed by the configuration details, i.e. the parameters take the following form: celeborn.metrics.conf.[instance|*].sink.[sink_name].[parameter_name]. This example shows a list of Celeborn configuration parameters for a CSV sink:

"celeborn.metrics.conf.*.sink.csv.class"="org.apache.celeborn.common.metrics.sink.CsvSink"
"celeborn.metrics.conf.*.sink.csv.period"="1"
"celeborn.metrics.conf.*.sink.csv.unit"="minutes"
"celeborn.metrics.conf.*.sink.csv.directory"="/tmp/"
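The mapping between the two forms is mechanical: strip the celeborn.metrics.conf. prefix and what remains is a metrics.properties key. Below is a minimal Python sketch of that mapping. The helpers (to_properties_key, parse_param, to_properties) and the MetricsParam type are hypothetical and not part of Celeborn; the sketch assumes every key has exactly the [instance].[sink|source].[name].[parameter] shape described above.

```python
from typing import Dict, NamedTuple

PREFIX = "celeborn.metrics.conf."


class MetricsParam(NamedTuple):
    instance: str   # "master", "worker" or "*"
    kind: str       # "sink" or "source"
    name: str       # sink or source name, e.g. "csv"
    parameter: str  # e.g. "class", "period", "unit", "directory"


def to_properties_key(key: str) -> str:
    """Strip the celeborn.metrics.conf. prefix, leaving a metrics.properties key."""
    if not key.startswith(PREFIX):
        raise ValueError(f"not a metrics parameter: {key}")
    return key[len(PREFIX):]


def parse_param(key: str) -> MetricsParam:
    """Split '[instance].[sink|source].[name].[parameter]' into its four parts."""
    parts = to_properties_key(key).split(".", 3)  # at most 4 parts
    if len(parts) != 4 or parts[1] not in ("sink", "source"):
        raise ValueError(f"unexpected metrics parameter shape: {key}")
    return MetricsParam(*parts)


def to_properties(conf: Dict[str, str]) -> str:
    """Render prefixed parameters as metrics.properties lines, sorted by key."""
    return "\n".join(f"{to_properties_key(k)}={v}" for k, v in sorted(conf.items()))


if __name__ == "__main__":
    conf = {
        "celeborn.metrics.conf.*.sink.csv.class":
            "org.apache.celeborn.common.metrics.sink.CsvSink",
        "celeborn.metrics.conf.*.sink.csv.period": "1",
        "celeborn.metrics.conf.*.sink.csv.unit": "minutes",
        "celeborn.metrics.conf.*.sink.csv.directory": "/tmp/",
    }
    print(to_properties(conf))
    print(parse_param("celeborn.metrics.conf.*.sink.csv.period"))
```

Running it prints the four CSV sink settings in metrics.properties form, e.g. *.sink.csv.unit=minutes, which could be placed in $CELEBORN_HOME/conf/metrics.properties instead of the prefixed parameters.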

Default values of the Celeborn metrics configuration are as follows:

*.sink.prometheusServlet.class=org.apache.celeborn.common.metrics.sink.PrometheusServlet
*.sink.jsonServlet.class=org.apache.celeborn.common.metrics.sink.JsonServlet
*.sink.loggerSink.class=org.apache.celeborn.common.metrics.sink.LoggerSink

Additional sources can be configured using the metrics configuration file or the configuration parameter celeborn.metrics.conf.[component_name].source.jvm.class=[source_name]. At present, no optional source is available. For example, the following configuration parameter would activate an Example source:

"celeborn.metrics.conf.*.source.jvm.class"="org.apache.celeborn.common.metrics.source.ExampleSource"

Available metrics providers

Metrics used by Celeborn are of multiple types: gauge, counter, histogram, meter and timer; see the Dropwizard library documentation for details. The following list of components and metrics reports the name and some details of the available metrics, grouped per component instance and source namespace. The most common types of metrics used in Celeborn instrumentation are gauges and counters. Counters can be recognized by their .count suffix. Timers, meters and histograms are annotated in the list; the remaining list elements are gauges. The large majority of metrics are active as soon as their parent component instance is configured; some metrics must also be enabled via an additional configuration parameter, as noted in the list.
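A gauge can be read as-is, but a counter only grows while its process is running, so its rate of change is usually more informative than its raw value. Below is a minimal Python sketch that turns two scrapes of a counter such as FetchChunkSuccessCount into a per-second rate. The Sample type and counter_rate helper are hypothetical, and treating a decrease as a process restart is an assumption; Prometheus' rate() function applies similar counter-reset handling on the server side.

```python
from dataclasses import dataclass


@dataclass
class Sample:
    timestamp: float  # seconds at which the counter was scraped
    value: float      # counter value reported at that time


def counter_rate(prev: Sample, curr: Sample) -> float:
    """Per-second increase of a counter between two scrapes."""
    elapsed = curr.timestamp - prev.timestamp
    if elapsed <= 0:
        raise ValueError("samples must be in increasing time order")
    delta = curr.value - prev.value
    if delta < 0:
        # The counter went backwards, most likely because the process
        # restarted and the counter began again from zero: count only
        # what accumulated since the restart.
        delta = curr.value
    return delta / elapsed
```

For instance, a counter that reads 1000 at t=0s and 1600 at t=60s yields 10.0 per second.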

Master

These metrics are exposed by the Celeborn master.

  • namespace=master

    Metric Name | Description
    RegisteredShuffleCount | The count of registered shuffles.
    DeviceCelebornFreeBytes | The actual usable space of Celeborn's available workers for each device.
    DeviceCelebornTotalBytes | The total space of Celeborn for each device.
    RunningApplicationCount | The count of running applications.
    ActiveShuffleSize | The active shuffle size of workers.
    ActiveShuffleFileCount | The active shuffle file count of workers.
    ShuffleTotalCount | The total count of shuffles, including Celeborn shuffles and engine built-in shuffles.
    ShuffleFallbackCount | The count of shuffle fallbacks.
    ApplicationTotalCount | The total count of applications running with Celeborn shuffle or engine built-in shuffle.
    ApplicationFallbackCount | The count of application fallbacks.
    WorkerCount | The count of active workers.
    LostWorkerCount | The count of workers in the lost list.
    ExcludedWorkerCount | The count of workers in the excluded list.
    AvailableWorkerCount | The count of workers in the available list.
    ShutdownWorkerCount | The count of workers in the shutdown list.
    DecommissionWorkerCount | The count of workers in the decommission list.
    IsActiveMaster | Whether the current master is active.
    RatisApplyCompletedIndex | The ApplyCompletedIndex of the current master node in HA mode.
    RatisApplyCompletedIndexDiff | The difference in ApplyCompletedIndex between the master nodes in HA mode.
    PartitionSize | The estimated size of a shuffle partition.
    OfferSlotsTime | The time for masters to handle RequestSlots requests when registering a shuffle.
  • namespace=CPU

    Metric Name | Description
    JVMCPUTime | The CPU time consumed by the JVM.
  • namespace=system

    Metric Name | Description
    LastMinuteSystemLoad | The system load average for the last minute.
    AvailableProcessors | The number of available processors.
  • namespace=JVM

  • namespace=ResourceConsumption

    • notes:
      • These metrics are generated for each user, and users are identified by a metric tag.
      • These metrics also include subResourceConsumptions generated for each of the user's applications, identified by the applicationId tag.
    Metric Name | Description
    diskFileCount | The count of disk files consumed by each user.
    diskBytesWritten | The amount of data written to disk by each user.
    hdfsFileCount | The count of HDFS files consumed by each user.
    hdfsBytesWritten | The amount of data written to HDFS by each user.
  • namespace=ThreadPool

    • notes:
      • These metrics are generated for each thread pool, identified by a metric tag carrying the thread pool name.
    Metric Name | Description
    active_thread_count | The approximate number of threads that are actively executing tasks.
    pending_task_count | The number of pending tasks waiting in the blocking queue.
    pool_size | The current number of threads in the pool.
    core_pool_size | The core number of threads.
    maximum_pool_size | The maximum allowed number of threads.
    largest_pool_size | The largest number of threads that have ever simultaneously been in the pool.
    is_terminating | Whether this executor is in the process of terminating after shutdown() or shutdownNow() but has not completely terminated.
    is_terminated | Whether this executor has completely terminated after shutdown() or shutdownNow().
    is_shutdown | Whether this executor has been shut down.
    thread_count | The thread count of the current thread group.
    thread_is_terminated_count | The terminated thread count of the current thread group.
    thread_is_shutdown_count | The shutdown thread count of the current thread group.
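As an illustration of how the master gauges can be combined, here is a minimal Python sketch of a health check. It assumes the gauge values have already been collected into a dict keyed by the metric names listed above (the names exposed by a particular sink may carry extra prefixes or suffixes), that a missing value means zero, and that in HA mode exactly one master reports IsActiveMaster as 1. The helper names and the MAX_APPLY_INDEX_DIFF threshold are hypothetical.

```python
from typing import Dict, List

# Hypothetical threshold: how far apart the Ratis apply indexes of the HA
# masters may drift before we warn. Tune per cluster.
MAX_APPLY_INDEX_DIFF = 1000


def master_warnings(metrics: Dict[str, float]) -> List[str]:
    """Return human-readable warnings from one master's gauge values.

    A missing metric is treated as 0.
    """
    warnings: List[str] = []
    if metrics.get("LostWorkerCount", 0) > 0:
        warnings.append(f"lost workers: {metrics['LostWorkerCount']:g}")
    if metrics.get("ExcludedWorkerCount", 0) > 0:
        warnings.append(f"excluded workers: {metrics['ExcludedWorkerCount']:g}")
    if metrics.get("AvailableWorkerCount", 0) == 0:
        warnings.append("no available workers")
    if metrics.get("RatisApplyCompletedIndexDiff", 0) > MAX_APPLY_INDEX_DIFF:
        warnings.append("HA masters are drifting apart")
    return warnings


def active_master_count(per_master: Dict[str, Dict[str, float]]) -> int:
    """Count masters (keyed by host) that report IsActiveMaster == 1."""
    return sum(1 for m in per_master.values() if m.get("IsActiveMaster", 0) == 1)
```

In an HA deployment, an active_master_count other than 1 across all masters is worth investigating, while master_warnings is meant to be applied to the active master's values.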

Worker

These metrics are exposed by the Celeborn worker.

  • namespace=worker

    Metric Name | Description
    RegisteredShuffleCount | The count of registered shuffles.
    RunningApplicationCount | The count of running applications.
    ActiveShuffleSize | The active shuffle size of a worker, including both primary and replica data.
    ActiveShuffleFileCount | The active shuffle file count of a worker, including both primary and replica data.
    OpenStreamTime | The time for a worker to process an openStream RPC and return a StreamHandle.
    FetchChunkTime | The time for a worker to fetch a chunk (8MB by default) from a reduce partition.
    FetchChunkTransferTime | The time for a worker to transfer a fetched chunk from a reduce partition.
    ActiveChunkStreamCount | The active stream count for reduce partition reading streams.
    OpenStreamSuccessCount | The count of streams opened successfully in the current worker.
    OpenStreamFailCount | The count of failed stream openings in the current worker.
    FetchChunkSuccessCount | The count of successful chunk fetches in the current worker.
    FetchChunkFailCount | The count of failed chunk fetches in the current worker.
    FetchChunkTransferSize | The size of data transferred for chunk fetches in the current worker.
    PrimaryPushDataTime | The time for a worker to handle a pushData RPC sent from a Celeborn client.
    ReplicaPushDataTime | The time for a worker to handle a pushData RPC sent from a Celeborn worker for replication.
    PrimarySegmentStartTime | The time for a worker to handle a segmentStart RPC sent from a Celeborn client.
    ReplicaSegmentStartTime | The time for a worker to handle a segmentStart RPC sent from a Celeborn worker for replication.
    WriteDataHardSplitCount | The count of PushData or PushMergedData writes to a HARD_SPLIT partition in the current worker.
    WriteDataSuccessCount | The count of successful PushData or PushMergedData writes in the current worker.
    WriteDataFailCount | The count of failed PushData or PushMergedData writes in the current worker.
    ReplicateDataFailCount | The count of failed PushData or PushMergedData replications in the current worker.
    ReplicateDataWriteFailCount | The count of PushData or PushMergedData replications that failed because of a write failure in the peer worker.
    ReplicateDataCreateConnectionFailCount | The count of PushData or PushMergedData replications that failed because creating a connection to the peer worker failed.
    ReplicateDataConnectionExceptionCount | The count of PushData or PushMergedData replications that failed because of a connection exception in the peer worker.
    ReplicateDataFailNonCriticalCauseCount | The count of PushData or PushMergedData replications that failed because of a non-critical exception in the peer worker.
    ReplicateDataTimeoutCount | The count of PushData or PushMergedData replications that failed because of a push timeout in the peer worker.
    PushDataHandshakeFailCount | The count of failed PushDataHandshake requests in the current worker.
    RegionStartFailCount | The count of failed RegionStart requests in the current worker.
    RegionFinishFailCount | The count of failed RegionFinish requests in the current worker.
    SegmentStartFailCount | The count of failed SegmentStart requests in the current worker.
    PrimaryPushDataHandshakeTime | The time for a worker to handle PushDataHandshake for a primary partition location.
    ReplicaPushDataHandshakeTime | The time for a worker to handle PushDataHandshake for a replica partition location.
    PrimaryRegionStartTime | The time for a worker to handle RegionStart for a primary partition location.
    ReplicaRegionStartTime | The time for a worker to handle RegionStart for a replica partition location.
    PrimaryRegionFinishTime | The time for a worker to handle RegionFinish for a primary partition location.
    ReplicaRegionFinishTime | The time for a worker to handle RegionFinish for a replica partition location.
    PausePushDataStatus | The status of a worker that stops receiving pushData from clients because of back pressure.
    PausePushDataTime | The time during which a worker stops receiving pushData from clients because of back pressure.
    PausePushDataAndReplicateTime | The time during which a worker stops receiving pushData from clients and other workers because of back pressure.
    PausePushDataAndReplicateStatus | The status of a worker that stops receiving pushData from clients and other workers because of back pressure.
    PausePushData | The number of times a worker stops receiving pushData from clients because of back pressure.
    PausePushDataAndReplicate | The number of times a worker stops receiving pushData from clients and other workers because of back pressure.
    PartitionFileSizeBytes | The size of partition files committed in the current worker.
    TakeBufferTime | The time for a worker to take a buffer from a disk flusher.
    FlushDataTime | The time for a worker to write a buffer (256KB by default) to storage.
    CommitFilesTime | The time for a worker to flush buffers and close files related to a specified shuffle.
    CommitFilesFailCount | The count of failed commit files requests in the current worker.
    SlotsAllocated | The slots allocated in the last hour.
    ActiveSlotsCount | The number of slots currently in use in a worker.
    ReserveSlotsTime | The time for a worker to handle ReserveSlots, which acquires a disk buffer and records the partition location.
    ActiveConnectionCount | The count of active network connections.
    NettyMemory | The total amount of off-heap memory used by the Celeborn worker.
    SortTime | The time for a worker to sort a shuffle file.
    SortMemory | The memory used for sorting shuffle files.
    SortingFiles | The count of shuffle files being sorted.
    PendingSortTasks | The count of sort tasks waiting to be submitted to FileSorterExecutors.
    SortedFiles | The count of sorted shuffle files.
    SortedFileSize | The total size of sorted shuffle files.
    DiskBuffer | The memory occupied by pushData and pushMergedData that is waiting to be written to disk.
    BufferStreamReadBuffer | The memory used by credit stream read buffers.
    ReadBufferDispatcherRequestsLength | The queue size of read buffer allocation requests.
    ReadBufferAllocatedCount | The count of allocated read buffers.
    ActiveCreditStreamCount | The active stream count for map partition reading streams.
    ActiveMapPartitionCount | The count of active map partition reading streams.
    SorterCacheHitRate | The cache hit rate of the worker partition sorter index.
    CleanTaskQueueSize | The count of tasks for cleaning up expired shuffle keys.
    CleanExpiredShuffleKeysTime | The time for a worker to clean up shuffle data of expired shuffle keys.
    DeviceOSFreeBytes | The actual usable space reported by the OS for a monitored device.
    DeviceOSTotalBytes | The total usable space reported by the OS for a monitored device.
    DeviceCelebornFreeBytes | The actual usable space of Celeborn for each device.
    DeviceCelebornTotalBytes | The total space of Celeborn for each device.
    PotentialConsumeSpeed | The potential consumption speed used for congestion control.
    UserProduceSpeed | The user production speed used for congestion control.
    WorkerConsumeSpeed | The worker consumption speed used for congestion control.
    IsDecommissioningWorker | 1 means the worker is decommissioning, 0 means it is not.
    IsHighWorkload | 1 means the worker is under high workload, 0 means it is not.
    UnreleasedShuffleCount | The count of unreleased shuffles while the worker is decommissioning.
    UnreleasedPartitionLocationCount | The count of unreleased partition locations while the worker is shutting down.
    MemoryStorageFileCount | The count of files in the Memory Storage of a worker.
    MemoryFileStorageSize | The total amount of memory used by Memory Storage.
    EvictedFileCount | The count of files evicted from Memory Storage to disk.
    EvictedLocalFileCount | The count of files evicted from Memory Storage to local disk.
    EvictedDfsFileCount | The count of files evicted from Memory Storage to DFS.
    DirectMemoryUsageRatio | The ratio of used direct memory to max direct memory.
    RegisterWithMasterFailCount | The count of failed requests to register with the master.
    FlushWorkingQueueSize | The size of the flush working queue for each mount point.
    LocalFlushCount | The count of flushes to local disk.
    LocalFlushSize | The size of data flushed to local disk.
    HdfsFlushCount | The count of flushes to HDFS.
    HdfsFlushSize | The size of data flushed to HDFS.
    OssFlushCount | The count of flushes to OSS.
    OssFlushSize | The size of data flushed to OSS.
    S3FlushCount | The count of flushes to S3.
    S3FlushSize | The size of data flushed to S3.
    push_usedHeapMemory
    push_usedDirectMemory
    push_numHeapArenas
    push_numDirectArenas
    push_tinyCacheSize
    push_smallCacheSize
    push_normalCacheSize
    push_numThreadLocalCaches
    push_chunkSize
    push_numAllocations
    push_numTinyAllocations
    push_numSmallAllocations
    push_numNormalAllocations
    push_numHugeAllocations
    push_numDeallocations
    push_numTinyDeallocations
    push_numSmallDeallocations
    push_numNormalDeallocations
    push_numHugeDeallocations
    push_numActiveAllocations
    push_numActiveTinyAllocations
    push_numActiveSmallAllocations
    push_numActiveNormalAllocations
    push_numActiveHugeAllocations
    push_numActiveBytes
    replicate_usedHeapMemory
    replicate_usedDirectMemory
    replicate_numHeapArenas
    replicate_numDirectArenas
    replicate_tinyCacheSize
    replicate_smallCacheSize
    replicate_normalCacheSize
    replicate_numThreadLocalCaches
    replicate_chunkSize
    replicate_numAllocations
    replicate_numTinyAllocations
    replicate_numSmallAllocations
    replicate_numNormalAllocations
    replicate_numHugeAllocations
    replicate_numDeallocations
    replicate_numTinyDeallocations
    replicate_numSmallDeallocations
    replicate_numNormalDeallocations
    replicate_numHugeDeallocations
    replicate_numActiveAllocations
    replicate_numActiveTinyAllocations
    replicate_numActiveSmallAllocations
    replicate_numActiveNormalAllocations
    replicate_numActiveHugeAllocations
    replicate_numActiveBytes
    fetch_usedHeapMemory
    fetch_usedDirectMemory
    fetch_numHeapArenas
    fetch_numDirectArenas
    fetch_tinyCacheSize
    fetch_smallCacheSize
    fetch_normalCacheSize
    fetch_numThreadLocalCaches
    fetch_chunkSize
    fetch_numAllocations
    fetch_numTinyAllocations
    fetch_numSmallAllocations
    fetch_numNormalAllocations
    fetch_numHugeAllocations
    fetch_numDeallocations
    fetch_numTinyDeallocations
    fetch_numSmallDeallocations
    fetch_numNormalDeallocations
    fetch_numHugeDeallocations
    fetch_numActiveAllocations
    fetch_numActiveTinyAllocations
    fetch_numActiveSmallAllocations
    fetch_numActiveNormalAllocations
    fetch_numActiveHugeAllocations
    fetch_numActiveBytes
  • namespace=CPU

    Metric Name | Description
    JVMCPUTime | The CPU time consumed by the JVM.
  • namespace=system

    Metric Name | Description
    LastMinuteSystemLoad | The system load average for the last minute.
    AvailableProcessors | The number of available processors.
  • namespace=JVM

  • namespace=ResourceConsumption

    • notes:
      • These metrics are generated for each user, and users are identified by a metric tag.
      • These metrics also include subResourceConsumptions generated for each of the user's applications, identified by the applicationId tag.
    Metric Name | Description
    diskFileCount | The count of disk files consumed by each user.
    diskBytesWritten | The amount of data written to disk by each user.
    hdfsFileCount | The count of HDFS files consumed by each user.
    hdfsBytesWritten | The amount of data written to HDFS by each user.
  • namespace=ThreadPool

    • notes:
      • These metrics are generated for each thread pool, identified by a metric tag carrying the thread pool name.
    Metric Name | Description
    active_thread_count | The approximate number of threads that are actively executing tasks.
    pending_task_count | The number of pending tasks waiting in the blocking queue.
    pool_size | The current number of threads in the pool.
    core_pool_size | The core number of threads.
    maximum_pool_size | The maximum allowed number of threads.
    largest_pool_size | The largest number of threads that have ever simultaneously been in the pool.
    is_terminating | Whether this executor is in the process of terminating after shutdown() or shutdownNow() but has not completely terminated.
    is_terminated | Whether this executor has completely terminated after shutdown() or shutdownNow().
    is_shutdown | Whether this executor has been shut down.
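The worker gauges above can be folded into a coarse per-worker state, and the ThreadPool gauges into a utilization figure. Below is a minimal Python sketch, assuming the values are already collected into a dict keyed by the metric names listed above and that a missing value means 0. The helper names and the DIRECT_MEMORY_ALERT_RATIO threshold are hypothetical; the 1/0 semantics of IsDecommissioningWorker and IsHighWorkload follow the table.

```python
from typing import Dict

# Hypothetical alert threshold for DirectMemoryUsageRatio; tune per deployment.
DIRECT_MEMORY_ALERT_RATIO = 0.9


def worker_state(metrics: Dict[str, float]) -> str:
    """Summarize one worker's gauges into a coarse state label.

    Checks run in priority order; a missing metric is treated as 0.
    """
    if metrics.get("IsDecommissioningWorker", 0) == 1:
        return "decommissioning"
    if metrics.get("IsHighWorkload", 0) == 1:
        return "high-workload"
    if metrics.get("DirectMemoryUsageRatio", 0.0) >= DIRECT_MEMORY_ALERT_RATIO:
        return "memory-pressure"
    return "ok"


def pool_utilization(active_thread_count: float, maximum_pool_size: float) -> float:
    """Fraction of a thread pool's maximum size that is busy executing tasks."""
    if maximum_pool_size <= 0:
        return 0.0
    return active_thread_count / maximum_pool_size
```

Since active_thread_count is documented as approximate, a utilization that stays near 1.0 together with a growing pending_task_count is a more reliable sign of a saturated pool than a single reading.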

Note:

The Netty DirectArenaMetrics named like push/fetch/replicate_numXX are not exposed by default, nor shown in the Grafana dashboard. If you need them, enable celeborn.network.memory.allocator.verbose.metric to expose these metrics.
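Once the verbose metrics are enabled, they share the naming pattern <allocator>_<metric>, with push, replicate and fetch as the allocator prefixes. Below is a minimal Python sketch that groups them per allocator and sums numActiveBytes across the three. It assumes the names as listed in the worker table (a given sink may add its own prefixes or suffixes), and the helper names are hypothetical.

```python
from collections import defaultdict
from typing import Dict

# Allocator prefixes used by the verbose Netty metrics (push_*, replicate_*, fetch_*).
ALLOCATORS = ("push", "replicate", "fetch")


def group_by_allocator(metrics: Dict[str, float]) -> Dict[str, Dict[str, float]]:
    """Split e.g. 'push_numActiveBytes' into allocator 'push' and metric 'numActiveBytes'.

    Names without one of the allocator prefixes (e.g. NettyMemory) are ignored.
    """
    grouped: Dict[str, Dict[str, float]] = defaultdict(dict)
    for name, value in metrics.items():
        prefix, sep, rest = name.partition("_")
        if sep and prefix in ALLOCATORS:
            grouped[prefix][rest] = value
    return dict(grouped)


def total_active_bytes(metrics: Dict[str, float]) -> float:
    """Sum numActiveBytes over the push, replicate and fetch allocators."""
    return sum(m.get("numActiveBytes", 0.0) for m in group_by_allocator(metrics).values())
```

Grouping per allocator makes it easier to see whether pushing, replication or fetching dominates the worker's Netty memory.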

Setup Prometheus dashboard

  1. Install Prometheus (https://prometheus.io/). Here is an example Prometheus config file:
# Prometheus example config
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: "Celeborn"
    metrics_path: /metrics/prometheus
    scrape_interval: 15s
    static_configs:
      - targets: [ "master-ip:9098","worker1-ip:9096","worker2-ip:9096","worker3-ip:9096","worker4-ip:9096" ]
  2. Install Grafana server (https://grafana.com/grafana/download).

  3. Import Celeborn dashboard into Grafana.

You can find the Celeborn dashboard templates under the assets/grafana directory. celeborn-dashboard.json displays Celeborn internal metrics and celeborn-jvm-dashboard.json displays Celeborn JVM related metrics.

Here is an example of importing a Grafana dashboard.

[Screenshots g1, g2, g3: importing the Celeborn dashboard into Grafana]

Here are some snapshots:

[Screenshots d1, d2: Celeborn dashboard snapshots]

Optional

We recommend installing node exporter (https://github.com/prometheus/node_exporter) on every host and configuring Prometheus to scrape host-level information. Grafana needs a dashboard (dashboard ID 8919) to display host details.

global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: "Celeborn"
    metrics_path: /metrics/prometheus
    scrape_interval: 15s
    static_configs:
      - targets: [ "master-ip:9098","worker1-ip:9096","worker2-ip:9096","worker3-ip:9096","worker4-ip:9096" ]
  - job_name: "node"
    static_configs:
      - targets: [ "master-ip:9100","worker1-ip:9100","worker2-ip:9100","worker3-ip:9100","worker4-ip:9100" ]
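Before relying on the dashboards, it can help to confirm that each Celeborn target in the scrape config actually serves metrics on /metrics/prometheus. Below is a minimal Python sketch using only the standard library. The helpers parse_exposition and fetch_metrics are hypothetical, and the parser handles only the simple "name{labels} value" line shape of the Prometheus text exposition format: it skips comment lines, ignores trailing timestamps, and does not handle label values that contain a closing brace.

```python
import re
import urllib.request
from typing import List, Tuple

# Matches "name{labels} value" or "name value"; anything after the value
# (such as an optional timestamp) is ignored.
SAMPLE_RE = re.compile(r"^([a-zA-Z_:][a-zA-Z0-9_:]*)(\{[^}]*\})?\s+(\S+)")


def parse_exposition(text: str) -> List[Tuple[str, str, float]]:
    """Return (metric name, raw label block, value) for each sample line."""
    samples: List[Tuple[str, str, float]] = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and # HELP / # TYPE comments
        match = SAMPLE_RE.match(line)
        if match:
            name, labels, value = match.group(1), match.group(2) or "", match.group(3)
            samples.append((name, labels, float(value)))
    return samples


def fetch_metrics(target: str, path: str = "/metrics/prometheus",
                  timeout: float = 5.0) -> List[Tuple[str, str, float]]:
    """Fetch and parse the metrics served by one host:port target."""
    with urllib.request.urlopen(f"http://{target}{path}", timeout=timeout) as resp:
        return parse_exposition(resp.read().decode("utf-8"))
```

For example, looping over the targets from the config above (master-ip:9098, worker1-ip:9096, and so on) and calling fetch_metrics on each shows which hosts answer; a host that raises an error or returns no samples needs a closer look before Prometheus scrapes it.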

REST API

In addition to metrics, Celeborn also provides a REST API. It gives developers an easy way to build new visualizations and monitoring tools for Celeborn, and lets users easily check the running status of the service.