package org.apache.s2graph.s2jobs.wal.udfs

import com.google.common.hash.Hashing
import org.apache.s2graph.core.JSONParser
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Dataset, Row}
import play.api.libs.json._
import scala.reflect.ClassTag
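/**
 * Spark UDFs and helpers used by the WAL log aggregation jobs:
 * flattening merged property maps into (dimension, value) pairs,
 * hashing and filtering dimension values against a broadcast whitelist,
 * and computing per-key ranks across partitions.
 */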
object WalLogUDF {

  import scala.collection.mutable

  /** dimension -> top-K dimension values, ordered by descending count. */
  type MergedProps = Map[String, Seq[String]]
  /** dimension -> (dimension value -> count) accumulator built while merging activities. */
  type MutableMergedProps = mutable.Map[String, mutable.Map[String, Int]]
  type MutableMergedPropsInner = mutable.Map[String, Int]

  def initMutableMergedPropsInner = mutable.Map.empty[String, Int]

  def initMutableMergedProps = mutable.Map.empty[String, mutable.Map[String, Int]]
// //TODO:
// def toDimension(rawActivity: RawActivity, propertyKey: String): String = {
// // val (ts, dst, label, _) = rawActivity
// // label + "." + propertyKey
// propertyKey
// }
//
// def updateMutableMergedProps(mutableMergedProps: MutableMergedProps)(dimension: String,
// dimensionValue: String,
// count: Int = 1): Unit = {
// val buffer = mutableMergedProps.getOrElseUpdate(dimension, initMutableMergedPropsInner)
// val newCount = buffer.getOrElse(dimensionValue, 0) + count
// buffer += (dimensionValue -> newCount)
// }
//
// def groupByDimensionValues(rawActivity: RawActivity,
// propsJson: JsObject,
// mergedProps: MutableMergedProps,
// toDimensionFunc: (RawActivity, String) => String,
// excludePropKeys: Set[String] = Set.empty): Unit = {
// propsJson.fields.filter(t => !excludePropKeys(t._1)).foreach { case (propertyKey, jsValue) =>
// val values = jsValue match {
// case JsString(s) => Seq(s)
// case JsArray(arr) => arr.map(JSONParser.jsValueToString)
// case _ => Seq(jsValue.toString())
// }
// val dimension = toDimensionFunc(rawActivity, propertyKey)
//
// values.foreach { value =>
// updateMutableMergedProps(mergedProps)(dimension, value)
// }
// }
// }
//
// def buildMergedProps(rawActivities: Seq[RawActivity],
// toDimensionFunc: (RawActivity, String) => String,
// defaultTopKs: Int = 100,
// dimTopKs: Map[String, Int] = Map.empty,
// excludePropKeys: Set[String] = Set.empty,
// dimValExtractors: Seq[Extractor] = Nil): MergedProps = {
// val mergedProps = initMutableMergedProps
//
// rawActivities.foreach { case rawActivity@(_, _, _, rawProps) =>
// val propsJson = Json.parse(rawProps).as[JsObject]
// groupByDimensionValues(rawActivity, propsJson, mergedProps, toDimensionFunc, excludePropKeys)
// }
// // work on extra dimVals.
// dimValExtractors.foreach { extractor =>
// extractor.extract(rawActivities, mergedProps)
// }
//
// mergedProps.map { case (key, values) =>
// val topK = dimTopKs.getOrElse(key, defaultTopKs)
//
// key -> values.toSeq.sortBy(-_._2).take(topK).map(_._1)
// }.toMap
// }
//
// def rowToRawActivity(row: Row): RawActivity = {
// (row.getAs[Long](0), row.getAs[String](1), row.getAs[String](2), row.getAs[String](3))
// }
//
// def appendMergeProps(toDimensionFunc: (RawActivity, String) => String = toDimension,
// defaultTopKs: Int = 100,
// dimTopKs: Map[String, Int] = Map.empty,
// excludePropKeys: Set[String] = Set.empty,
// dimValExtractors: Seq[Extractor] = Nil,
// minTs: Long = 0,
// maxTs: Long = Long.MaxValue) = udf((acts: Seq[Row]) => {
// val rows = acts.map(rowToRawActivity).filter(act => act._1 >= minTs && act._1 < maxTs)
//
// buildMergedProps(rows, toDimensionFunc, defaultTopKs, dimTopKs, excludePropKeys, dimValExtractors)
// })
  /** Flattens a dimension -> values map into a sequence of (dimension, value) pairs. */
  val extractDimensionValues = {
    udf((dimensionValues: Map[String, Seq[String]]) => {
      dimensionValues.toSeq.flatMap { case (dimension, values) =>
        values.map { value => dimension -> value }
      }
    })
  }
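  // Usage sketch (illustrative only): assumes a DataFrame with a Map[String, Seq[String]]
  // column named "mergedProps"; the DataFrame and column names are not defined in this file.
  //
  //   val pairs = walLogDf
  //     .withColumn("dimVal", explode(extractDimensionValues(col("mergedProps"))))
  //     .select(col("dimVal._1").as("dimension"), col("dimVal._2").as("value"))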
  /** Hashes a "dimension.dimensionValue" pair into a stable 64-bit id using murmur3_128. */
  def toHash(dimension: String, dimensionValue: String): Long = {
    val key = s"$dimension.$dimensionValue"
    Hashing.murmur3_128().hashBytes(key.getBytes("UTF-8")).asLong()
  }
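  // Example sketch: building the broadcast whitelist consumed by filterDimensionValues below.
  // The dimension/value literals and the `spark` session are illustrative, not part of this file.
  //
  //   val validHashes: Set[Long] = Set(toHash("country", "KR"), toHash("gender", "M"))
  //   val validDimValuesBCast = spark.sparkContext.broadcast(validHashes)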
  /** Keeps only the dimension values whose hash is present in the broadcast whitelist. */
  def filterDimensionValues(validDimValues: Broadcast[Set[Long]]) = {
    udf((dimensionValues: Map[String, Seq[String]]) => {
      dimensionValues.map { case (dimension, values) =>
        val filtered = values.filter { value =>
          val hash = toHash(dimension, value)
          validDimValues.value(hash)
        }
        dimension -> filtered
      }
    })
  }
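  // Usage sketch: applying the filtering UDF to the same illustrative "mergedProps" column.
  //
  //   df.withColumn("filteredProps", filterDimensionValues(validDimValuesBCast)(col("mergedProps")))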
  /**
   * Assigns a 1-based rank to every record within its K1 group, ordered by (K1, K2).
   * Records are range-partitioned and sorted by (K1, K2), ranked within each partition,
   * and each per-partition rank is then shifted by the number of same-K1 records in
   * earlier partitions. Returns an RDD of (rank, (key1, key2), value).
   */
  def appendRank[K1: ClassTag, K2: ClassTag, V: ClassTag](ds: Dataset[((K1, K2), V)],
                                                          numOfPartitions: Option[Int] = None,
                                                          samplePointsPerPartitionHint: Option[Int] = None)(implicit ordering: Ordering[(K1, K2)]) = {
    import org.apache.spark.RangePartitioner

    val rdd = ds.rdd
    val partitioner = new RangePartitioner(numOfPartitions.getOrElse(rdd.partitions.size),
      rdd,
      true,
      samplePointsPerPartitionHint = samplePointsPerPartitionHint.getOrElse(20)
    )
    val sorted = rdd.repartitionAndSortWithinPartitions(partitioner)
    // Ranks records within a single sorted partition: the rank restarts at 1 whenever K1 changes.
    def rank(idx: Int, iter: Iterator[((K1, K2), V)]) = {
      // Sentinel head so that the fold below always has a previous element to compare against.
      val initialK1: K1 = null.asInstanceOf[K1]
      val initialK2: K2 = null.asInstanceOf[K2]
      val initialV: V = null.asInstanceOf[V]
      val zero = List((1L, initialK1, initialK2, initialV))

      def merge(acc: List[(Long, K1, K2, V)], x: ((K1, K2), V)) =
        (acc.head, x) match {
          case ((offset, prevKey1, _, _), ((curKey1, curKey2), curVal)) =>
            val newOffset = if (prevKey1 == curKey1) offset + 1L else 1L
            (newOffset, curKey1, curKey2, curVal) :: acc
        }

      // Drop the sentinel and emit (partitionIdx, rankInPartition, key1, key2, value).
      iter.foldLeft(zero)(merge).reverse.drop(1).map { case (offset, key1, key2, value) =>
        (idx, offset, key1, key2, value)
      }.toIterator
    }
    // Counts, per K1, how many records live in this partition: emits (k1, (partitionIdx, count)).
    def getOffset(idx: Int, iter: Iterator[((K1, K2), V)]) = {
      val buffer = mutable.Map.empty[K1, (Int, Long)]
      if (!iter.hasNext) buffer.iterator
      else {
        val ((k1, _), _) = iter.next()
        var prevKey1: K1 = k1
        var size = 1L
        iter.foreach { case ((curKey1, _), _) =>
          if (prevKey1 != curKey1) {
            // K1 changed: flush the finished run and start counting the new one.
            buffer(prevKey1) = (idx, size)
            prevKey1 = curKey1
            size = 0L
          }
          size += 1L
        }
        if (size > 0) buffer(prevKey1) = (idx, size)
        buffer.iterator
      }
    }
    val partRanks = sorted.mapPartitionsWithIndex(rank)
    val _offsets = sorted.mapPartitionsWithIndex(getOffset)

    // For every (k1, partition) other than k1's first partition, compute how many records
    // with the same k1 precede that partition. Missing entries default to 0 when adjusting.
    val offsets = _offsets.groupBy(_._1).flatMap { case (k1, partitionWithSize) =>
      val ls = partitionWithSize.toSeq.map(_._2).sortBy(_._1)
      var sum = ls.head._2
      val lss = ls.tail.map { case (partition, size) =>
        val x = (partition, sum)
        sum += size
        x
      }
      lss.map { case (partition, offset) =>
        (k1, partition) -> offset
      }
    }.collect()

    println(offsets.mkString(", "))
    val offsetsBCast = ds.sparkSession.sparkContext.broadcast(offsets)

    // Shifts each per-partition rank by the number of same-K1 records in earlier partitions.
    def adjust(iter: Iterator[(Int, Long, K1, K2, V)], startOffsets: Map[(K1, Int), Long]) = {
      iter.map { case (partition, rankInPartition, key1, key2, value) =>
        val startOffset = startOffsets.getOrElse((key1, partition), 0L)
        val rank = startOffset + rankInPartition
        (partition, rankInPartition, rank, (key1, key2), value)
      }
    }

    val withRanks = partRanks
      .mapPartitions { iter =>
        val startOffsets = offsetsBCast.value.toMap
        adjust(iter, startOffsets)
      }.map { case (_, _, rank, (key1, key2), value) =>
        (rank, (key1, key2), value)
      }

    withRanks
  }
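  // Usage sketch for appendRank: the Dataset below is illustrative, not produced in this file.
  // Keys are (userId, score) pairs; the result carries (rank, (userId, score), payload).
  //
  //   import spark.implicits._
  //   val ds: Dataset[((String, Double), String)] =
  //     Seq((("u1", 1.0), "a"), (("u1", 2.0), "b"), (("u2", 1.0), "c")).toDS()
  //   val ranked = WalLogUDF.appendRank(ds)   // RDD[(Long, (String, Double), String)]
  //   ranked.collect().foreach(println)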
}