| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.spark.status |
| |
| import java.util.concurrent.atomic.AtomicLong |
| |
| import AppStatusSource.getCounter |
| import com.codahale.metrics.{Counter, Gauge, MetricRegistry} |
| |
| import org.apache.spark.SparkConf |
| import org.apache.spark.internal.config.Status.METRICS_APP_STATUS_SOURCE_ENABLED |
| import org.apache.spark.metrics.source.Source |
| |
| private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] { |
| override def getValue: Long = value.get() |
| } |
| |
| private[spark] class AppStatusSource extends Source { |
| |
| override implicit val metricRegistry = new MetricRegistry() |
| |
| override val sourceName = "appStatus" |
| |
| val jobDuration = new JobDuration(new AtomicLong(0L)) |
| |
| // Duration of each job in milliseconds |
| val JOB_DURATION = metricRegistry |
| .register(MetricRegistry.name("jobDuration"), jobDuration) |
| |
| val FAILED_STAGES = getCounter("stages", "failedStages") |
| |
| val SKIPPED_STAGES = getCounter("stages", "skippedStages") |
| |
| val COMPLETED_STAGES = getCounter("stages", "completedStages") |
| |
| val SUCCEEDED_JOBS = getCounter("jobs", "succeededJobs") |
| |
| val FAILED_JOBS = getCounter("jobs", "failedJobs") |
| |
| val COMPLETED_TASKS = getCounter("tasks", "completedTasks") |
| |
| val FAILED_TASKS = getCounter("tasks", "failedTasks") |
| |
| val KILLED_TASKS = getCounter("tasks", "killedTasks") |
| |
| val SKIPPED_TASKS = getCounter("tasks", "skippedTasks") |
| |
| val BLACKLISTED_EXECUTORS = getCounter("tasks", "blackListedExecutors") |
| |
| val UNBLACKLISTED_EXECUTORS = getCounter("tasks", "unblackListedExecutors") |
| } |
| |
| private[spark] object AppStatusSource { |
| |
| def getCounter(prefix: String, name: String)(implicit metricRegistry: MetricRegistry): Counter = { |
| metricRegistry.counter(MetricRegistry.name(prefix, name)) |
| } |
| |
| def createSource(conf: SparkConf): Option[AppStatusSource] = { |
| Option(conf.get(METRICS_APP_STATUS_SOURCE_ENABLED)) |
| .filter(identity) |
| .map { _ => new AppStatusSource() } |
| } |
| } |