package org.apache.griffin.measure.datasource.cache
import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.util.Random
import scala.util.Try
import scala.util.control.NonFatal

import org.apache.spark.sql._

import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.context.streaming.checkpoint.lock.CheckpointLock
import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
import org.apache.griffin.measure.utils.DataFrameUtil._
import org.apache.griffin.measure.utils.ParamUtil._
/**
 * Data source cache in streaming mode:
 * saves data frames into HDFS in the dump phase,
 * reads data frames from HDFS in the calculate phase,
 * and supports update and clean actions for the cached data.
 */
trait StreamingCacheClient
extends StreamingOffsetCacheable
with WithFanIn[Long]
with Loggable
with Serializable {
// Spark session of the running job; its default parallelism sizes the repartition in updateData.
val sparkSession: SparkSession
// Data source configuration; the keys read by this trait are declared further below.
val param: Map[String, Any]
// Data source name, used in the default cache file path and in log messages.
val dsName: String
// Data source index, used as the default cache info path.
val index: Int

// Registry of cached timestamps, queried and pruned by the helpers below.
val timestampStorage: TimestampStorage

/** Tracked timestamps in [from, until) (half-open, as implied by afterTilRangeTmsts below). */
protected def fromUntilRangeTmsts(from: Long, until: Long): Set[Long] =
  timestampStorage.fromUntil(from, until)

/** Stop tracking the single timestamp `t`. */
protected def clearTmst(t: Long): mutable.SortedSet[Long] = timestampStorage.remove(t)

/**
 * Stop tracking every timestamp returned by `timestampStorage.until(until)`,
 * i.e. the timestamps before `until`.
 */
protected def clearTmstsUntil(until: Long): mutable.SortedSet[Long] = {
  val outDateTmsts = timestampStorage.until(until)
  timestampStorage.remove(outDateTmsts)
}

/** Tracked timestamps in (after, til]: shifts both bounds by one to reuse the [from, until) query. */
protected def afterTilRangeTmsts(after: Long, til: Long): Set[Long] =
  fromUntilRangeTmsts(after + 1, til + 1)

/** Stop tracking every timestamp up to and including `til`. */
protected def clearTmstsTil(til: Long): mutable.SortedSet[Long] = clearTmstsUntil(til + 1)
// Configuration keys read from `param`.
val _FilePath: String = "file.path"
val _InfoPath: String = "info.path"
val _ReadyTimeInterval: String = "ready.time.interval"
val _ReadyTimeDelay: String = "ready.time.delay"
val _TimeRange: String = "time.range"

// Defaults. A random 10-character suffix is appended to the default cache directory
// (presumably so that clients sharing a data source name do not collide).
val rdmStr: String = Random.alphanumeric.take(10).mkString
val defFilePath: String = s"hdfs:///griffin/cache/${dsName}_$rdmStr"
val defInfoPath: String = s"$index"

// Effective locations: the configured value when present, otherwise the default above.
val filePath: String = param.getString(_FilePath, defFilePath)
val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)

// Durations in milliseconds. The configured default is "1m"; 60000 ms is used whenever
// TimeUtil.milliseconds yields no value for the configured string.
val readyTimeInterval: Long = TimeUtil
  .milliseconds(param.getString(_ReadyTimeInterval, "1m"))
  .getOrElse(60000L)
val readyTimeDelay: Long = TimeUtil
  .milliseconds(param.getString(_ReadyTimeDelay, "1m"))
  .getOrElse(60000L)
/**
 * Offsets in milliseconds applied to the start and the end of the checkpointed time range.
 *
 * They are configured under "time.range" as a sequence of up to two duration strings.
 * - Positive offsets are clamped to 0.
 * - Missing, non-string or unparsable entries count as 0.
 *
 * The configured value is matched as `Seq[_]` rather than as `T`. An un-annotated call such as
 * `deltaTimeRange._1` infers `T = Nothing`, and a manifest-checked match against `Nothing` can
 * never succeed. `T` is kept only for source compatibility with existing callers.
 *
 * @return (start offset, end offset), both <= 0
 */
def deltaTimeRange[T <: Seq[String]: Manifest]: (Long, Long) = {
  def negative(n: Long): Long = if (n <= 0) n else 0L
  param.get(_TimeRange) match {
    case Some(seq: Seq[_]) =>
      val nseq = seq.collect { case s: String => s }.flatMap(TimeUtil.milliseconds(_))
      val ns = negative(nseq.headOption.getOrElse(0L))
      // drop(1) instead of tail: an empty sequence must yield 0, not throw
      val ne = negative(nseq.drop(1).headOption.getOrElse(0L))
      (ns, ne)
    case _ => (0L, 0L)
  }
}
// When true, this client skips writing, cleaning and updating cached files.
val _ReadOnly: String = "read.only"
val readOnly: Boolean = param.getBoolean(_ReadOnly, defValue = false)

// When true, an "old" cache is maintained next to the new one and rewritten by updateData.
val _Updatable: String = "updatable"
val updatable: Boolean = param.getBoolean(_Updatable, defValue = false)

// Checkpoint locks guarding the new and old cache directories; the two names differ
// only by their ".new" / ".old" suffix on the cache info path.
val newCacheLock: CheckpointLock = OffsetCheckpointClient.genLock(s"$cacheInfoPath.new")
val oldCacheLock: CheckpointLock = OffsetCheckpointClient.genLock(s"$cacheInfoPath.old")

// New cache: timestamp-partitioned appends from saveData.
// Old cache: one sub-directory per old cache index, written by updateData.
val newFilePath: String = s"$filePath/new"
val oldFilePath: String = s"$filePath/old"

// Old cache index assumed when none has been checkpointed; updateData writes index + 1.
val defOldCacheIndex: Long = 0L
/**
 * Write a data frame into a cache directory; the storage format is up to the implementation.
 *
 * @param dfw  writer already configured by the caller (save mode and, for the new cache, partitioning)
 * @param path destination directory
 */
protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit

/**
 * Read the data frame stored in a cache directory; the storage format is up to the implementation.
 *
 * @param dfr  reader to use
 * @param path cache directory to read
 */
protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame

/**
 * Read a cache directory, treating an empty result as absent.
 *
 * Emptiness is tested with `head(1)`, so at most one row is fetched instead of counting
 * the whole cache.
 *
 * @return Some(data frame) when it has at least one row, otherwise None
 */
private def readDataFrameOpt(dfr: DataFrameReader, path: String): Option[DataFrame] = {
  val df = readDataFrame(dfr, path)
  if (df.head(1).nonEmpty) Some(df) else None
}
/**
 * Save a data frame into the new cache during the dump phase.
 *
 * This does nothing when the client is read-only. Otherwise:
 * - a non-empty data frame is appended to `newFilePath`, partitioned by the timestamp column,
 *   while `newCacheLock` is held;
 * - write errors are logged;
 * - the lock is always released and the data frame is always uncached.
 *
 * Once `fanIncrement(ms)` reports true, the cache time and ready time for `ms` are submitted.
 * (Presumably that happens after every contributor for `ms` has saved; confirm in WithFanIn.)
 *
 * @param dfOpt data frame to be saved
 * @param ms    timestamp of this data frame
 */
def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
  if (!readOnly) {
    dfOpt match {
      case Some(df) =>
        // cache df: it is scanned twice (count, then write)
        df.cache()
        try {
          val cnt = df.count
          info(s"save $dsName data count: $cnt")

          if (cnt > 0) {
            // lock makes it safer when writing new cache data
            val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
            if (newCacheLocked) {
              try {
                val dfw = df.write.mode(SaveMode.Append).partitionBy(ConstantColumns.tmst)
                writeDataFrame(dfw, newFilePath)
              } catch {
                case NonFatal(e) => error(s"save data error: ${e.getMessage}")
              } finally {
                newCacheLock.unlock()
              }
            }
          }
        } finally {
          // uncache df, even if counting or writing failed
          df.unpersist()
        }
      case _ =>
        info("no data frame to save")
    }

    // submit cache time and ready time
    if (fanIncrement(ms)) {
      info(s"save data [$ms] finish")
      submitCacheTime(ms)
      submitReadyTime(ms)
    }
  }
}
/**
 * Read cached data in the calculation phase.
 *
 * The checkpointed time range (a, b] is shifted by `deltaTimeRange`. The shifted range filters
 * two sources, which are then unioned:
 * - the new cache;
 * - when updatable, the old cache at the checkpointed old cache index.
 *
 * A source that fails to read is logged as a warning and treated as absent.
 *
 * @return data frame to calculate, with the time range of data
 */
def readData(): (Option[DataFrame], TimeRange) = {
  // time range: (a, b]
  val timeRange = OffsetCheckpointClient.getTimeRange
  val (deltaStart, deltaEnd) = deltaTimeRange
  val reviseTimeRange = (timeRange._1 + deltaStart, timeRange._2 + deltaEnd)

  // read partition info
  val filterStr = if (reviseTimeRange._1 == reviseTimeRange._2) {
    info(s"read time range: [${reviseTimeRange._1}]")
    s"`${ConstantColumns.tmst}` = ${reviseTimeRange._1}"
  } else {
    info(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]")
    s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} " +
      s"AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
  }

  // new cache data
  val newDfOpt = try {
    val dfr = sparkSession.read
    readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr))
  } catch {
    case NonFatal(e) =>
      warn(s"read data source cache warn: ${e.getMessage}")
      None
  }

  // old cache data (only maintained when updatable)
  val oldCacheIndexOpt = if (updatable) readOldCacheIndex() else None
  val oldDfOpt = oldCacheIndexOpt.flatMap { idx =>
    val oldDfPath = s"$oldFilePath/$idx"
    try {
      val dfr = sparkSession.read
      readDataFrameOpt(dfr, oldDfPath).map(_.filter(filterStr))
    } catch {
      case NonFatal(e) =>
        warn(s"read old data source cache warn: ${e.getMessage}")
        None
    }
  }

  // whole cache data
  val cacheDfOpt = unionDfOpts(newDfOpt, oldDfOpt)

  // from until tmst range
  val (from, until) = reviseTimeRange
  val tmstSet = afterTilRangeTmsts(from, until)

  val retTimeRange = TimeRange(reviseTimeRange, tmstSet)
  (cacheDfOpt, retTimeRange)
}
/**
 * Delete the sub-directories of `path` that `listPartitionsByFunc` selects.
 *
 * @param path         parent directory to scan
 * @param outTime      bound passed to `func` as its second argument
 * @param partitionOpt partition column when sub-directories are named "<column>=<number>",
 *                     None when they are named by a bare number
 * @param func         (directory number, outTime) => whether to delete that directory
 */
private def cleanOutTimePartitions(
    path: String,
    outTime: Long,
    partitionOpt: Option[String],
    func: (Long, Long) => Boolean): Unit = {
  val earlierOrEqPaths = listPartitionsByFunc(path, outTime, partitionOpt, func)
  // delete out time data path
  earlierOrEqPaths.foreach { outTimePath =>
    info(s"delete hdfs path: $outTimePath")
    HdfsUtil.deleteHdfsPath(outTimePath)
  }
}
/**
 * List the sub-directories of `path` whose numeric name satisfies `func(number, bound)`.
 *
 * With `partitionOpt = Some(col)` only directories named exactly "<col>=<digits>" are
 * considered; otherwise only directories named by digits alone. Numbers that do not fit
 * in a Long are skipped.
 *
 * @return full paths ("<path>/<name>") of the matching sub-directories
 */
private def listPartitionsByFunc(
    path: String,
    bound: Long,
    partitionOpt: Option[String],
    func: (Long, Long) => Boolean): Iterable[String] = {
  val names = HdfsUtil.listSubPathsByType(path, "dir")
  // quote the column name so regex metacharacters in it are matched literally
  val regex = partitionOpt match {
    case Some(partition) => s"^${java.util.regex.Pattern.quote(partition)}=(\\d+)$$".r
    case _ => "^(\\d+)$".r
  }
  names
    .filter {
      case regex(value) => str2Long(value).exists(func(_, bound))
      case _ => false
    }
    .map(name => s"$path/$name")
}
/**
 * Parse a decimal string as a Long.
 *
 * @return Some(value), or None when the string is not a valid Long (including overflow)
 */
private def str2Long(str: String): Option[Long] = Try(str.toLong).toOption
/**
 * Clean out-time cached data on HDFS.
 *
 * Timestamps up to and including the checkpointed clean time are always dropped from
 * `timestampStorage`. Unless the client is read-only, the following are also deleted,
 * each under its own checkpoint lock (errors are logged, locks are always released):
 * - new cache: timestamp partitions whose value is <= the clean bound. The bound is the last
 *   processed time when updatable, otherwise the clean time.
 * - old cache (updatable only): index directories lower than the current old cache index.
 */
def cleanOutTimeData(): Unit = {
  // clean tmst
  val cleanTime = readCleanTime()
  cleanTime.foreach(clearTmstsTil)

  if (!readOnly) {
    // new cache data
    val newCacheCleanTime = if (updatable) readLastProcTime() else cleanTime
    newCacheCleanTime match {
      case Some(nct) =>
        // clean calculated new cache data
        val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
        if (newCacheLocked) {
          try {
            cleanOutTimePartitions(
              newFilePath,
              nct,
              Some(ConstantColumns.tmst),
              (a: Long, b: Long) => a <= b)
          } catch {
            case NonFatal(e) => error(s"clean new cache data error: ${e.getMessage}")
          } finally {
            newCacheLock.unlock()
          }
        }
      case _ =>
        // no clean bound available: nothing is deleted
        info("should not happen")
    }

    // old cache data
    val oldCacheCleanTime = if (updatable) cleanTime else None
    oldCacheCleanTime match {
      case Some(_) =>
        readOldCacheIndex().foreach { idx =>
          val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
          if (oldCacheLocked) {
            try {
              // clean calculated old cache data: every index directory below the current one.
              // Out-time rows inside the current old cache directory are not cleaned here.
              cleanOutTimePartitions(oldFilePath, idx, None, (a: Long, b: Long) => a < b)
            } catch {
              case NonFatal(e) => error(s"clean old cache data error: ${e.getMessage}")
            } finally {
              oldCacheLock.unlock()
            }
          }
        }
      case _ =>
        // not updatable, or no clean time available: nothing is deleted
        info("should not happen")
    }
  }
}
/**
 * Update the old cached data with a new data frame.
 *
 * This only runs when the client is updatable and not read-only. While `oldCacheLock` is held:
 * - rows newer than the next clean time are repartitioned to the default parallelism;
 * - they are written (overwrite) to `<oldFilePath>/<current index + 1>`;
 * - that index is then submitted as the current old cache index.
 *
 * Errors are logged and the lock is always released.
 *
 * @param dfOpt data frame to update old cached data
 */
def updateData(dfOpt: Option[DataFrame]): Unit = {
  if (!readOnly && updatable) {
    dfOpt match {
      case Some(df) =>
        // old cache lock
        val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
        if (oldCacheLocked) {
          try {
            val oldCacheIndexOpt = readOldCacheIndex()
            val nextOldCacheIndex = oldCacheIndexOpt.getOrElse(defOldCacheIndex) + 1

            val oldDfPath = s"$oldFilePath/$nextOldCacheIndex"
            val cleanTime = getNextCleanTime
            val filterStr = s"`${ConstantColumns.tmst}` > $cleanTime"
            val updateDf = df.filter(filterStr)

            val prlCount = sparkSession.sparkContext.defaultParallelism
            // repartition
            val repartitionedDf = updateDf.repartition(prlCount)
            val dfw = repartitionedDf.write.mode(SaveMode.Overwrite)
            writeDataFrame(dfw, oldDfPath)

            // publish the new index only after the write succeeded, so readers switch to it
            submitOldCacheIndex(nextOldCacheIndex)
          } catch {
            case NonFatal(e) => error(s"update data error: ${e.getMessage}")
          } finally {
            oldCacheLock.unlock()
          }
        }
      case _ =>
        info("no data frame to update")
    }
  }
}
/**
 * Submit cache bookkeeping each time a calculation phase finishes.
 *
 * - The end of the processed time range becomes the last processed time.
 * - That end, shifted by the start offset of `deltaTimeRange`, becomes the next clean time.
 */
def processFinish(): Unit = {
  val timeRange = OffsetCheckpointClient.getTimeRange

  // next last proc time
  submitLastProcTime(timeRange._2)

  // next clean time
  val nextCleanTime = timeRange._2 + deltaTimeRange._1
  submitCleanTime(nextCleanTime)
}
// read next clean time
private def getNextCleanTime: Long = {
val timeRange = OffsetCheckpointClient.getTimeRange
val nextCleanTime = timeRange._2 + deltaTimeRange._1