blob: 6bc89647d67ee5940dad3d107277e29db49cb840 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.metrics
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}
import com.typesafe.config.Config
import org.apache.gearpump.cluster.ClientToMaster.ReadOption
import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem
import org.apache.gearpump.metrics.MetricsAggregator
import org.apache.gearpump.util.{Constants, LogUtil}
/**
* Filters the latest metrics data by specifying a
* processor Id range, and taskId range.
*/
class TaskFilterAggregator(maxLimit: Int) extends MetricsAggregator {
import org.apache.gearpump.streaming.metrics.TaskFilterAggregator._
def this(config: Config) = {
this(config.getInt(Constants.GEARPUMP_METRICS_MAX_LIMIT))
}
override def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem])
: List[HistoryMetricsItem] = {
if (options.get(ReadOption.Key) != Some(ReadOption.ReadLatest)) {
// Returns empty set
List.empty[HistoryMetricsItem]
} else {
val parsed = Options.parse(options)
if (parsed != null) {
aggregate(parsed, inputs)
} else {
List.empty[HistoryMetricsItem]
}
}
}
def aggregate(options: Options, inputs: Iterator[HistoryMetricsItem])
: List[HistoryMetricsItem] = {
val result = new ListBuffer[HistoryMetricsItem]
val effectiveLimit = Math.min(options.limit, maxLimit)
var count = 0
val taskIdentity = new TaskIdentity(0, 0)
while (inputs.hasNext && count < effectiveLimit) {
val item = inputs.next()
if (parseName(item.value.name, taskIdentity)) {
if (taskIdentity.processor >= options.startProcessor &&
taskIdentity.processor < options.endProcessor &&
taskIdentity.task >= options.startTask &&
taskIdentity.task < options.endTask) {
result.prepend(item)
count += 1
}
}
}
result.toList
}
// Assume the name format is: "app0.processor0.task0:sendThroughput", returns
// (processorId, taskId)
//
// returns true if success
private def parseName(name: String, result: TaskIdentity): Boolean = {
val processorStart = name.indexOf(PROCESSOR_TAG)
if (processorStart != -1) {
val taskStart = name.indexOf(TASK_TAG, processorStart + 1)
if (taskStart != -1) {
val processorId = name.substring(processorStart, taskStart).substring(PROCESSOR_TAG.length)
.toInt
result.processor = processorId
val taskEnd = name.indexOf(":", taskStart + 1)
if (taskEnd != -1) {
val taskId = name.substring(taskStart, taskEnd).substring(TASK_TAG.length).toInt
result.task = taskId
true
} else {
false
}
} else {
false
}
} else {
false
}
}
}
object TaskFilterAggregator {
val StartTask = "startTask"
val EndTask = "endTask"
val StartProcessor = "startProcessor"
val EndProcessor = "endProcessor"
val Limit = "limit"
val TASK_TAG = ".task"
val PROCESSOR_TAG = ".processor"
private class TaskIdentity(var processor: Int, var task: Int)
case class Options(
limit: Int, startTask: Int, endTask: Int, startProcessor: Int, endProcessor: Int)
private val LOG = LogUtil.getLogger(getClass)
object Options {
def acceptAll: Options = {
new Options(Int.MaxValue, 0, Int.MaxValue, 0, Int.MaxValue)
}
def parse(options: Map[String, String]): Options = {
// Do sanity check
val optionTry = Try {
val startTask = options.get(StartTask).map(_.toInt).getOrElse(0)
val endTask = options.get(EndTask).map(_.toInt).getOrElse(Integer.MAX_VALUE)
val startProcessor = options.get(StartProcessor).map(_.toInt).getOrElse(0)
val endProcessor = options.get(EndProcessor).map(_.toInt).getOrElse(Integer.MAX_VALUE)
val limit = options.get(Limit).map(_.toInt).getOrElse(DEFAULT_LIMIT)
new Options(limit, startTask, endTask, startProcessor, endProcessor)
}
optionTry match {
case Success(options) => options
case Failure(ex) =>
LOG.error("Failed to parse the options in TaskFilterAggregator. Error msg: " +
ex.getMessage)
null
}
}
}
val DEFAULT_LIMIT = 1000
val MAX_LIMIT = 1000
}