blob: 79018c89a0f6f3388d3f0f195312cdafa825b2cc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark
import org.apache.spark.SparkContext
import org.slf4j.{Logger, LoggerFactory}
/**
 * Helper for turning Spark job status information into a single progress percentage.
 */
object JobProgressUtil {

  protected lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)

  /**
   * Computes the completion percentage of the job found for `jobGroup`.
   *
   * Only the first job id reported by the status tracker for the group is considered.
   * The percentage is the number of completed tasks divided by the total number of tasks,
   * summed over every stage of that job for which stage info is available, truncated
   * to an integer.
   *
   * @param sc       the SparkContext whose status tracker is queried
   * @param jobGroup the job group to look up
   * @return the truncated percentage of completed tasks; 0 when the group has no job id,
   *         when no job info is available for that id, or when the stages report no tasks
   */
  def progress(sc: SparkContext, jobGroup : String):Int = {
    val tracker = sc.statusTracker

    // Each paragraph has one unique jobGroup, and one paragraph may run multiple times,
    // so only the first job that matches the jobGroup is inspected.
    val stagesInfoOption = tracker
      .getJobIdsForGroup(jobGroup)
      .headOption
      .flatMap(jobId => tracker.getJobInfo(jobId))
      // Stages whose info is not available are dropped by the inner flatMap.
      .map(job => job.stageIds().flatMap(tracker.getStageInfo))

    stagesInfoOption match {
      case None => 0
      case Some(stagesInfo) =>
        val taskCount = stagesInfo.map(_.numTasks).sum
        val completedTaskCount = stagesInfo.map(_.numCompletedTasks).sum
        // Guard so the message strings are only built when debug output is actually wanted.
        if (LOGGER.isDebugEnabled) {
          LOGGER.debug("Total TaskCount: " + taskCount)
          LOGGER.debug("Completed TaskCount: " + completedTaskCount)
        }
        if (taskCount == 0) {
          // Avoid dividing by zero when no tasks are reported.
          0
        } else {
          (100 * completedTaskCount.toDouble / taskCount).toInt
        }
    }
  }
}