/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.griffin.measure.step.write

import scala.util.Try

import org.apache.spark.rdd.RDD
import org.apache.spark.sql._

import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.utils.JsonUtil
/**
 * Write the records of the input table to the configured sinks.
 *
 * @param name               record name used when sinking
 * @param inputName          name of the table whose records are written
 * @param filterTableNameOpt optional table of timestamps and empty flags used to filter streaming records
 * @param writeTimestampOpt  optional timestamp to write at; when set, simple (batch) mode is forced
 */
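// Usage sketch (hypothetical step and table names; context is an existing DQContext):
//   val writeStep = RecordWriteStep("record-write", "dq_records_table")
//   writeStep.execute(context)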
case class RecordWriteStep(
    name: String,
    inputName: String,
    filterTableNameOpt: Option[String] = None,
    writeTimestampOpt: Option[Long] = None)
  extends WriteStep {
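  /**
   * Execute the write: in simple (batch) mode, sink the whole input table at the write
   * timestamp; in timestamp (streaming) mode, group records by timestamp and sink each
   * group, and sink an empty record list for timestamps flagged as empty.
   */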
  def execute(context: DQContext): Try[Boolean] = Try {
    val timestamp = writeTimestampOpt.getOrElse(context.contextId.timestamp)
    val writeMode = writeTimestampOpt.map(_ => SimpleMode).getOrElse(context.writeMode)
    writeMode match {
      case SimpleMode =>
        // batch records
        val recordsOpt = getBatchRecords(context)
        // write records
        recordsOpt match {
          case Some(records) =>
            context.getSinks(timestamp).foreach { sink =>
              try {
                sink.sinkBatchRecords(records, Option(name))
              } catch {
                case e: Throwable => error(s"sink records error: ${e.getMessage}", e)
              }
            }
          case _ =>
        }
      case TimestampMode =>
        // streaming records
        val (recordsOpt, emptyTimestamps) = getStreamingRecords(context)
        // write records
        recordsOpt.foreach { records =>
          records.foreach { pair =>
            val (t, strRecords) = pair
            context.getSinks(t).foreach { sink =>
              try {
                sink.sinkRecords(strRecords, name)
              } catch {
                case e: Throwable => error(s"sink records error: ${e.getMessage}", e)
              }
            }
          }
        }
        emptyTimestamps.foreach { t =>
          context.getSinks(t).foreach { sink =>
            try {
              sink.sinkRecords(Nil, name)
            } catch {
              case e: Throwable => error(s"sink records error: ${e.getMessage}", e)
            }
          }
        }
    }
    true
  }
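  // read the tmst column of a row, falling back to the given default when it is missing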
  private def getTmst(row: Row, defTmst: Long): Long = {
    try {
      row.getAs[Long](ConstantColumns.tmst)
    } catch {
      case _: Throwable => defTmst
    }
  }
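  // look up a registered table by name; logs an error and returns None when it is unavailable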
  private def getDataFrame(context: DQContext, name: String): Option[DataFrame] = {
    try {
      val df = context.sparkSession.table(s"`$name`")
      Some(df)
    } catch {
      case e: Throwable =>
        error(s"get data frame $name fails", e)
        None
    }
  }
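  // data frame of the filter table, if a filter table name is configured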
  private def getFilterTableDataFrame(context: DQContext): Option[DataFrame] =
    filterTableNameOpt.flatMap(getDataFrame(context, _))
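  // batch mode: the whole input table is the record set to sink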
  private def getBatchRecords(context: DQContext): Option[DataFrame] = {
    getDataFrame(context, inputName)
  }
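  /**
   * Streaming mode: pair each input record with its timestamp and group by timestamp.
   * Returns the grouped records as JSON strings, together with the set of timestamps
   * flagged as empty in the filter table.
   */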
  private def getStreamingRecords(
      context: DQContext): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = {
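    // encoder for the (timestamp, json) pairs produced by df.flatMap below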
    implicit val encoder: Encoder[(Long, String)] =
      Encoders.tuple(Encoders.scalaLong, Encoders.STRING)
    val defTimestamp = context.contextId.timestamp
    getDataFrame(context, inputName) match {
      case Some(df) =>
        val (filterFuncOpt, emptyTimestamps) = getFilterTableDataFrame(context) match {
          case Some(filterDf) =>
            // timestamps with empty flag
            val tmsts: Array[(Long, Boolean)] = filterDf.collect.flatMap { row =>
              try {
                val tmst = getTmst(row, defTimestamp)
                val empty = row.getAs[Boolean](ConstantColumns.empty)
                Some((tmst, empty))
              } catch {
                case _: Throwable => None
              }
            }
            val emptyTmsts = tmsts.filter(_._2).map(_._1).toSet
            val recordTmsts = tmsts.filter(!_._2).map(_._1).toSet
            val filterFuncOpt: Option[Long => Boolean] = if (recordTmsts.nonEmpty) {
              Some((t: Long) => recordTmsts.contains(t))
            } else None
            (filterFuncOpt, emptyTmsts)
          case _ => (Some((_: Long) => true), Set[Long]())
        }
        // filter timestamps that need to be recorded
        filterFuncOpt match {
          case Some(filterFunc) =>
            val records = df.flatMap { row =>
              val tmst = getTmst(row, defTimestamp)
              if (filterFunc(tmst)) {
                try {
                  val map = SparkRowFormatter.formatRow(row)
                  val str = JsonUtil.toJson(map)
                  Some((tmst, str))
                } catch {
                  case _: Throwable => None
                }
              } else None
            }
            (Some(records.rdd.groupByKey), emptyTimestamps)
          case _ => (None, emptyTimestamps)
        }
      case _ => (None, Set[Long]())
    }
  }
}