/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.datasource.cache

import java.util.concurrent.TimeUnit

import scala.util.Random
import scala.util.control.NonFatal

import org.apache.spark.sql._

import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
import org.apache.griffin.measure.utils.DataFrameUtil._
import org.apache.griffin.measure.utils.ParamUtil._

/**
 * Data source cache client in streaming mode.
 *
 *  - dump phase: [[saveData]] appends the data frame under `newFilePath`
 *  - calculate phase: [[readData]] reads the cached data back for the current time range
 *  - afterwards: [[updateData]] rewrites the "old" cache, [[cleanOutTimeData]] removes
 *    out-of-date partitions and timestamps, [[processFinish]] records the progress
 *
 * NOTE(review): the concrete vals of this trait are computed from the abstract vals
 * (`param`, `dsName`, `index`) while the trait is initialized, so implementations presumably
 * have to provide those as constructor parameters - confirm against the implementing classes.
 */
trait StreamingCacheClient
  extends StreamingOffsetCacheable with WithFanIn[Long] with Loggable with Serializable {

  val sparkSession: SparkSession
  val param: Map[String, Any]
  val dsName: String
  val index: Int

  val timestampStorage: TimestampStorage

  /** timestamps kept in `timestampStorage` for the range described by `from` and `until` */
  protected def fromUntilRangeTmsts(from: Long, until: Long) =
    timestampStorage.fromUntil(from, until)

  /** remove a single timestamp from `timestampStorage` */
  protected def clearTmst(t: Long) = timestampStorage.remove(t)

  /** remove every timestamp that `timestampStorage.until(until)` reports */
  protected def clearTmstsUntil(until: Long) = {
    val outDateTmsts = timestampStorage.until(until)
    timestampStorage.remove(outDateTmsts)
  }

  /** same as [[fromUntilRangeTmsts]] with both bounds shifted by one, i.e. range (after, til] */
  protected def afterTilRangeTmsts(after: Long, til: Long) =
    fromUntilRangeTmsts(after + 1, til + 1)

  /** same as [[clearTmstsUntil]] with the bound shifted by one, i.e. `til` itself is included */
  protected def clearTmstsTil(til: Long) = clearTmstsUntil(til + 1)

  // keys of the cache configuration in `param`
  val _FilePath = "file.path"
  val _InfoPath = "info.path"
  val _ReadyTimeInterval = "ready.time.interval"
  val _ReadyTimeDelay = "ready.time.delay"
  val _TimeRange = "time.range"

  // defaults used when the paths are not configured
  val rdmStr = Random.alphanumeric.take(10).mkString
  val defFilePath = s"hdfs:///griffin/cache/${dsName}_${rdmStr}"
  val defInfoPath = s"${index}"

  val filePath: String = param.getString(_FilePath, defFilePath)
  val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)

  // both fall back to one minute when the configured value can not be parsed
  val readyTimeInterval: Long =
    TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)

  val readyTimeDelay: Long =
    TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)

  /**
   * Offsets (in ms) added to the begin and the end of the time range in [[readData]].
   * Only non-positive offsets are kept, positive ones are replaced by 0.
   * A missing, empty or unparsable "time.range" yields (0, 0).
   */
  val deltaTimeRange: (Long, Long) = {
    def negative(n: Long): Long = if (n <= 0) n else 0L
    param.get(_TimeRange) match {
      // element type is erased at runtime, so match on Seq[_] and convert each element
      case Some(seq: Seq[_]) =>
        val nseq = seq.filter(_ != null).flatMap(v => TimeUtil.milliseconds(v.toString))
        val ns = negative(nseq.headOption.getOrElse(0L))
        // drop(1) is safe on an empty sequence, tail is not
        val ne = negative(nseq.drop(1).headOption.getOrElse(0L))
        (ns, ne)
      case _ => (0L, 0L)
    }
  }

  val _ReadOnly = "read.only"
  val readOnly = param.getBoolean(_ReadOnly, false)

  val _Updatable = "updatable"
  val updatable = param.getBoolean(_Updatable, false)

  // one lock per cache directory
  val newCacheLock = OffsetCheckpointClient.genLock(s"${cacheInfoPath}.new")
  val oldCacheLock = OffsetCheckpointClient.genLock(s"${cacheInfoPath}.old")

  val newFilePath = s"${filePath}/new"
  val oldFilePath = s"${filePath}/old"

  val defOldCacheIndex = 0L

  /** write the data frame to `path`, the storage format is chosen by the implementation */
  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit

  /** read a data frame from `path`, the storage format is chosen by the implementation */
  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame

  /**
   * read the data frame at `path`
   * @return None if the data frame has no row
   */
  private def readDataFrameOpt(dfr: DataFrameReader, path: String): Option[DataFrame] = {
    val df = readDataFrame(dfr, path)
    // fetching one row is enough to know whether there is data, no need to count all rows
    if (df.head(1).nonEmpty) Some(df) else None
  }

  /**
   * read the cached data at `path` and apply `filterStr` on it
   * @param desc name of the cache used in the warn message
   * @return None if there is no data or the data can not be read
   */
  private def readCacheData(path: String, filterStr: String, desc: String): Option[DataFrame] = {
    try {
      readDataFrameOpt(sparkSession.read, path).map(_.filter(filterStr))
    } catch {
      case NonFatal(e) =>
        warn(s"read ${desc} warn: ${e.getMessage}")
        None
    }
  }

  /**
   * run `body` only if the cache lock has been acquired, and always release the lock after it
   * non fatal errors of `body` are logged and not rethrown
   * @param locked whether the lock has been acquired
   * @param unlock releases the lock
   * @param action name of the action used in the log messages
   */
  private def withCacheLock(locked: Boolean, unlock: () => Unit, action: String)
                           (body: => Unit): Unit = {
    if (locked) {
      try {
        body
      } catch {
        case NonFatal(e) => error(s"${action} error: ${e.getMessage}")
      } finally {
        unlock()
      }
    } else {
      // make the skipped action visible instead of dropping it silently
      warn(s"${action} skipped: fail to get cache lock")
    }
  }

  /**
   * save data frame in dump phase
   * @param dfOpt    data frame to be saved
   * @param ms       timestamp of this data frame
   */
  def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
    if (!readOnly) {
      dfOpt match {
        case Some(df) =>
          // cache df, it is used by both the count and the write
          df.cache()

          try {
            val cnt = df.count()
            info(s"save ${dsName} data count: ${cnt}")

            if (cnt > 0) {
              // lock makes it safer when writing new cache data
              withCacheLock(newCacheLock.lock(-1, TimeUnit.SECONDS),
                () => newCacheLock.unlock(), "save data") {
                val dfw = df.write.mode(SaveMode.Append).partitionBy(ConstantColumns.tmst)
                writeDataFrame(dfw, newFilePath)
              }
            }
          } finally {
            // uncache df even if counting or writing fails
            df.unpersist()
          }
        case _ =>
          info("no data frame to save")
      }

      // submit cache time and ready time
      if (fanIncrement(ms)) {
        info(s"save data [${ms}] finish")
        submitCacheTime(ms)
        submitReadyTime(ms)
      }
    }
  }

  /**
   * read data frame in calculation phase
   * @return   data frame to calculate, with the time range of data
   */
  def readData(): (Option[DataFrame], TimeRange) = {
    // time range: (a, b]
    val timeRange = OffsetCheckpointClient.getTimeRange
    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
    val (from, until) = reviseTimeRange

    // filter on the partition column
    val filterStr = if (from == until) {
      info(s"read time range: [${from}]")
      s"`${ConstantColumns.tmst}` = ${from}"
    } else {
      info(s"read time range: (${from}, ${until}]")
      s"`${ConstantColumns.tmst}` > ${from} " +
        s"AND `${ConstantColumns.tmst}` <= ${until}"
    }

    // new cache data
    val newDfOpt = readCacheData(newFilePath, filterStr, "data source cache")

    // old cache data, only exists when the cache is updatable
    val oldCacheIndexOpt = if (updatable) readOldCacheIndex else None
    val oldDfOpt = oldCacheIndexOpt.flatMap { idx =>
      readCacheData(s"${oldFilePath}/${idx}", filterStr, "old data source cache")
    }

    // whole cache data
    val cacheDfOpt = unionDfOpts(newDfOpt, oldDfOpt)

    // timestamps in range (from, until]
    val tmstSet = afterTilRangeTmsts(from, until)

    (cacheDfOpt, TimeRange(reviseTimeRange, tmstSet))
  }

  /**
   * delete the sub directories of `path` selected by [[listPartitionsByFunc]]
   */
  private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String],
                                     func: (Long, Long) => Boolean
                                    ): Unit = {
    val outTimePaths = listPartitionsByFunc(path, outTime, partitionOpt, func)
    // delete out time data path
    outTimePaths.foreach { outTimePath =>
      info(s"delete hdfs path: ${outTimePath}")
      HdfsUtil.deleteHdfsPath(outTimePath)
    }
  }

  /**
   * list the sub directories of `path` whose numeric value satisfies `func(value, bound)`
   * @param partitionOpt  Some(p): directories are named "p=<number>", None: "<number>"
   * @return full paths of the selected sub directories
   */
  private def listPartitionsByFunc(path: String, bound: Long, partitionOpt: Option[String],
                                   func: (Long, Long) => Boolean
                                  ): Iterable[String] = {
    val names = HdfsUtil.listSubPathsByType(path, "dir")
    val regex = partitionOpt match {
      // quote the partition name so that it is always matched literally
      case Some(partition) => s"^${java.util.regex.Pattern.quote(partition)}=(\\d+)$$".r
      case _ => "^(\\d+)$".r
    }
    names.filter {
      case regex(value) => str2Long(value).exists(func(_, bound))
      case _ => false
    }.map(name => s"${path}/${name}")
  }

  /** parse `str` as Long, None if it is not a valid Long (e.g. too many digits) */
  private def str2Long(str: String): Option[Long] = {
    try {
      Some(str.toLong)
    } catch {
      case _: NumberFormatException => None
    }
  }

  /**
   * clean out-time cached data on hdfs
   */
  def cleanOutTimeData(): Unit = {
    // clean tmst
    val cleanTime = readCleanTime
    cleanTime.foreach(clearTmstsTil(_))

    if (!readOnly) {
      // new cache data
      val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime
      newCacheCleanTime match {
        case Some(nct) =>
          // clean calculated new cache data: partitions with tmst <= nct
          withCacheLock(newCacheLock.lock(-1, TimeUnit.SECONDS),
            () => newCacheLock.unlock(), "clean new cache data") {
            cleanOutTimePartitions(newFilePath, nct, Some(ConstantColumns.tmst),
              (a: Long, b: Long) => a <= b)
          }
        case _ =>
          // do nothing
          info("should not happen")
      }

      // old cache data, only exists when the cache is updatable
      if (updatable) {
        readCleanTime match {
          case Some(_) =>
            readOldCacheIndex.foreach { idx =>
              // clean calculated old cache data: directories with index < current index
              // out time rows of the current old cache are not removed here,
              // updateData filters them out when it writes the next old cache
              withCacheLock(oldCacheLock.lock(-1, TimeUnit.SECONDS),
                () => oldCacheLock.unlock(), "clean old cache data") {
                cleanOutTimePartitions(oldFilePath, idx, None, (a: Long, b: Long) => a < b)
              }
            }
          case _ =>
            // do nothing
            info("should not happen")
        }
      }
    }
  }

  /**
   * update old cached data by new data frame
   * @param dfOpt    data frame to update old cached data
   */
  def updateData(dfOpt: Option[DataFrame]): Unit = {
    if (!readOnly && updatable) {
      dfOpt match {
        case Some(df) =>
          // old cache lock
          withCacheLock(oldCacheLock.lock(-1, TimeUnit.SECONDS),
            () => oldCacheLock.unlock(), "update data") {
            // the updated data goes to a new directory, the index is submitted after the write
            val nextOldCacheIndex = readOldCacheIndex.getOrElse(defOldCacheIndex) + 1
            val oldDfPath = s"${oldFilePath}/${nextOldCacheIndex}"

            // keep only the data later than the next clean time
            val cleanTime = getNextCleanTime()
            val updateDf = df.filter(s"`${ConstantColumns.tmst}` > ${cleanTime}")

            // repartition
            val prlCount = sparkSession.sparkContext.defaultParallelism
            val dfw = updateDf.repartition(prlCount).write.mode(SaveMode.Overwrite)
            writeDataFrame(dfw, oldDfPath)

            submitOldCacheIndex(nextOldCacheIndex)
          }
        case _ =>
          info("no data frame to update")
      }
    }
  }

  /**
   * each time calculation phase finishes,
   * data source cache needs to submit some cache information
   */
  def processFinish(): Unit = {
    val timeRange = OffsetCheckpointClient.getTimeRange

    // next last proc time
    submitLastProcTime(timeRange._2)

    // next clean time
    submitCleanTime(timeRange._2 + deltaTimeRange._1)
  }

  /** next clean time: end of the current time range plus the begin offset of deltaTimeRange */
  private def getNextCleanTime(): Long = {
    OffsetCheckpointClient.getTimeRange._2 + deltaTimeRange._1
  }

}