/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.sink

import java.util.Date

import org.apache.spark.rdd.RDD

import org.apache.griffin.measure.utils.HdfsUtil
import org.apache.griffin.measure.utils.JsonUtil
import org.apache.griffin.measure.utils.ParamUtil._
/**
 * Sink metrics and records to HDFS.
 *
 * Supported config keys: "path" (the HDFS parent directory), "max.persist.lines"
 * (overall record cap, negative for unlimited) and "max.lines.per.file"
 * (lines per output file). A usage sketch follows the class definition at the
 * end of this file.
 */
case class HdfsSink(
    config: Map[String, Any],
    metricName: String,
    timeStamp: Long) extends Sink {
  val block: Boolean = true

  val PathKey = "path"
  val MaxPersistLines = "max.persist.lines"
  val MaxLinesPerFile = "max.lines.per.file"

  val parentPath = config.getOrElse(PathKey, "").toString
  val maxPersistLines = config.getInt(MaxPersistLines, -1)
  val maxLinesPerFile = math.min(config.getInt(MaxLinesPerFile, 10000), 1000000)

  val StartFile = filePath("_START")
  val FinishFile = filePath("_FINISH")
  val MetricsFile = filePath("_METRICS")
  val LogFile = filePath("_LOG")

  var _init = true

  def available(): Boolean = {
    parentPath.nonEmpty
  }
  private def logHead: String = {
    if (_init) {
      _init = false
      val dt = new Date(timeStamp)
      s"================ log of ${dt} ================\n"
    } else ""
  }

  private def timeHead(rt: Long): String = {
    val dt = new Date(rt)
    s"--- ${dt} ---\n"
  }

  private def logWrap(rt: Long, msg: String): String = {
    logHead + timeHead(rt) + s"${msg}\n\n"
  }
  protected def filePath(file: String): String = {
    HdfsUtil.getHdfsFilePath(parentPath, s"${metricName}/${timeStamp}/${file}")
  }

  protected def withSuffix(path: String, suffix: String): String = {
    s"${path}.${suffix}"
  }

  def start(msg: String): Unit = {
    try {
      HdfsUtil.writeContent(StartFile, msg)
    } catch {
      case e: Throwable => error(e.getMessage, e)
    }
  }

  def finish(): Unit = {
    try {
      HdfsUtil.createEmptyFile(FinishFile)
    } catch {
      case e: Throwable => error(e.getMessage, e)
    }
  }

  def log(rt: Long, msg: String): Unit = {
    try {
      val logStr = logWrap(rt, msg)
      HdfsUtil.withHdfsFile(LogFile) { out =>
        out.write(logStr.getBytes("utf-8"))
      }
    } catch {
      case e: Throwable => error(e.getMessage, e)
    }
  }

  private def getHdfsPath(path: String, groupId: Int): String = {
    HdfsUtil.getHdfsFilePath(path, s"${groupId}")
  }

  private def getHdfsPath(path: String, ptnId: Int, groupId: Int): String = {
    HdfsUtil.getHdfsFilePath(path, s"${ptnId}.${groupId}")
  }

  private def clearOldRecords(path: String): Unit = {
    HdfsUtil.deleteHdfsPath(path)
  }
  def sinkRecords(records: RDD[String], name: String): Unit = {
    val path = filePath(name)
    clearOldRecords(path)
    try {
      val recordCount = records.count
      val count =
        if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
      if (count > 0) {
        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
        if (groupCount <= 1) {
          val recs = records.take(count.toInt)
          sinkRecords2Hdfs(path, recs)
        } else {
          val groupedRecords: RDD[(Long, Iterable[String])] =
            records.zipWithIndex.flatMap { r =>
              val gid = r._2 / maxLinesPerFile
              if (gid < groupCount) Some((gid, r._1)) else None
            }.groupByKey()
          groupedRecords.foreach { group =>
            val (gid, recs) = group
            val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString)
            sinkRecords2Hdfs(hdfsPath, recs)
          }
        }
      }
    } catch {
      case e: Throwable => error(e.getMessage, e)
    }
  }
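
  // Same as above, but for records already materialized on the driver. Note that the grouped
  // files are named differently here: getHdfsPath(path, gid) places them at <path>/0,
  // <path>/1, ... (assuming HdfsUtil.getHdfsFilePath joins its arguments with "/"), whereas
  // the RDD variant uses the <path>.<gid> suffix form.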
  def sinkRecords(records: Iterable[String], name: String): Unit = {
    val path = filePath(name)
    clearOldRecords(path)
    try {
      val recordCount = records.size
      val count =
        if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
      if (count > 0) {
        val groupCount = (count - 1) / maxLinesPerFile + 1
        if (groupCount <= 1) {
          val recs = records.take(count.toInt)
          sinkRecords2Hdfs(path, recs)
        } else {
          val groupedRecords = records.grouped(maxLinesPerFile).zipWithIndex
          groupedRecords.take(groupCount).foreach { group =>
            val (recs, gid) = group
            val hdfsPath = getHdfsPath(path, gid)
            sinkRecords2Hdfs(hdfsPath, recs)
          }
        }
      }
    } catch {
      case e: Throwable => error(e.getMessage, e)
    }
  }
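
  // Serialize the metrics map to a single JSON line and write it to the _METRICS file.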
  def sinkMetrics(metrics: Map[String, Any]): Unit = {
    try {
      val json = JsonUtil.toJson(metrics)
      sinkRecords2Hdfs(MetricsFile, json :: Nil)
    } catch {
      case e: Throwable => error(e.getMessage, e)
    }
  }
  private def sinkRecords2Hdfs(hdfsPath: String, records: Iterable[String]): Unit = {
    try {
      HdfsUtil.withHdfsFile(hdfsPath, false) { out =>
        // write each record on its own line; foreach rather than map, since only the
        // side effect (the write) matters
        records.foreach { record =>
          out.write((record + "\n").getBytes("utf-8"))
        }
      }
    } catch {
      case e: Throwable => error(e.getMessage, e)
    }
  }
}
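
// A minimal usage sketch (not part of the original source). It assumes a SparkContext named
// `sc` is in scope and that the configured path points at a writable HDFS location; the path
// value and the measure name are illustrative only. The config keys are the ones this sink
// reads above ("path", "max.persist.lines", "max.lines.per.file").
//
//   val sinkConfig: Map[String, Any] = Map(
//     "path" -> "hdfs:///griffin/persist",
//     "max.persist.lines" -> 10000,
//     "max.lines.per.file" -> 10000)
//   val sink = HdfsSink(sinkConfig, "accuracy_measure", System.currentTimeMillis)
//   if (sink.available()) {
//     sink.start("measure started")
//     sink.sinkMetrics(Map("total" -> 100, "miss" -> 2, "matched" -> 98))
//     sink.sinkRecords(sc.parallelize(Seq("""{"id": 1}""")), "missRecords")
//     sink.finish()
//   }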