Clean up WalLogAggregateProcess: move AggregateParam into the params package, rename S2EdgeDataAggregate to WalLogUDAF, and add FeatureIndexProcess with WAL log transformers, WalLogUDF, and UrlUtils.
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
index dc628ec..f4a670b 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
@@ -1,7 +1,7 @@
package org.apache.s2graph.s2jobs.wal
import org.apache.s2graph.core.JSONParser
-import org.apache.s2graph.s2jobs.wal.process.AggregateParam
+import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam
import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
@@ -36,6 +36,8 @@
}
}
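+// A single (dimension, value) pair extracted from a WalLog property,
+// e.g. DimVal("friends:name", "1") as produced by the default Transformer (see TransformerTest).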
+case class DimVal(dim: String, value: String)
+
case class WalLogAgg(from: String,
logs: Seq[WalLog],
maxTs: Long,
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
new file mode 100644
index 0000000..7d04334
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
@@ -0,0 +1,146 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import com.google.common.hash.Hashing
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.process.params.FeatureIndexParam
+import org.apache.s2graph.s2jobs.wal.transformer._
+import org.apache.s2graph.s2jobs.wal.udfs.WalLogUDF
+import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions._
+import play.api.libs.json.{JsObject, Json}
+
+import scala.collection.mutable
+
+object FeatureIndexProcess {
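+  // Builds a UDF over the "logs" column: each WalLog's property key/values are passed through the
+  // given transformers and the distinct DimVals are collected. As a sketch (assuming DefaultTransformer
+  // and the WalLog from TransformerTest, label "friends", props {"name": 1, "url": "www.google.com"}),
+  // one row would yield Seq(DimVal("friends:name", "1"), DimVal("friends:url", "www.google.com")).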
+ def extractDimValues(transformers: Seq[Transformer]) = {
+ udf((rows: Seq[Row]) => {
+ val logs = rows.map(WalLog.fromRow)
+      // TODO: this could become a Map to count how many times each DimVal occurs in the
+      // sequence of WalLogs, i.e. switch to mutable.Map.empty[DimVal, Int] and aggregate counts.
+ val distinctDimValues = mutable.Set.empty[DimVal]
+
+ logs.foreach { walLog =>
+ walLog.propsKeyValues.foreach { case (propsKey, propsValue) =>
+ transformers.foreach { transformer =>
+ transformer.toDimValLs(walLog, propsKey, propsValue).foreach(distinctDimValues += _)
+ }
+ }
+ }
+
+ distinctDimValues.toSeq
+ })
+ }
+
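+  // buildDictionary groups the exploded DimVals by (dim, value), counts distinct users per pair
+  // (via countColumnName), filters pairs below minUserCount, and appends a global per-dim rank,
+  // yielding columns (dim, value, count, rank) where rank 1 is the most frequent value of a dim.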
+ def buildDictionary(ss: SparkSession,
+ allDimVals: DataFrame,
+ param: FeatureIndexParam,
+ dimValColumnName: String = "dimVal"): DataFrame = {
+ import ss.implicits._
+
+ val rawFeatures = allDimVals
+ .select(col(param._countColumnName), col(s"$dimValColumnName.dim").as("dim"), col(s"$dimValColumnName.value").as("value"))
+ .groupBy("dim", "value")
+ .agg(countDistinct(param._countColumnName).as("count"))
+ .filter(s"count > ${param._minUserCount}")
+
+ val ds: Dataset[((String, Long), String)] =
+ rawFeatures.select("dim", "value", "count").as[(String, String, Long)]
+ .map { case (dim, value, uv) =>
+ (dim, uv) -> value
+ }
+
+
+ implicit val ord = Ordering.Tuple2(Ordering.String, Ordering.Long.reverse)
+
+ val rdd: RDD[(Long, (String, Long), String)] = WalLogUDF.appendRank(ds, param.numOfPartitions, param.samplePointsPerPartitionHint)
+
+ rdd.toDF("rank", "dim_count", "value")
+ .withColumn("dim", col("dim_count._1"))
+ .withColumn("count", col("dim_count._2"))
+ .select("dim", "value", "count", "rank")
+ }
+
+ def toFeatureHash(dim: String, value: String): Long = {
+ Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong()
+ }
+
+ def collectDistinctFeatureHashes(ss: SparkSession,
+ filteredDict: DataFrame): Array[Long] = {
+ import ss.implicits._
+
+ val featureHashUDF = udf((dim: String, value: String) => toFeatureHash(dim, value))
+
+ filteredDict.withColumn("featureHash", featureHashUDF(col("dim"), col("value")))
+ .select("featureHash")
+ .distinct().as[Long].collect()
+ }
+
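+  // Keeps only dictionary rows whose rank is below the per-dim threshold, e.g. with
+  // maxRankPerDim = Map("url" -> 100) (illustrative) only ranks 1..99 of dim "url" survive;
+  // dims without an entry fall back to defaultMaxRank.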
+ def filterTopKsPerDim(dict: DataFrame,
+ maxRankPerDim: Broadcast[Map[String, Int]],
+ defaultMaxRank: Int): DataFrame = {
+ val filterUDF = udf((dim: String, rank: Long) => {
+ rank < maxRankPerDim.value.getOrElse(dim, defaultMaxRank)
+ })
+
+ dict.filter(filterUDF(col("dim"), col("rank")))
+ }
+}
+
+case class FeatureIndexProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) {
+
+ import FeatureIndexProcess._
+
+ override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
+ val countColumnName = taskConf.options.getOrElse("countColumnName", "from")
+ val numOfPartitions = taskConf.options.get("numOfPartitions").map(_.toInt)
+ val samplePointsPerPartitionHint = taskConf.options.get("samplePointsPerPartitionHint").map(_.toInt)
+ val minUserCount = taskConf.options.get("minUserCount").map(_.toLong)
+ val maxRankPerDim = taskConf.options.get("maxRankPerDim").map { s =>
+ val json = Json.parse(s).as[JsObject]
+ json.fieldSet.map { case (key, jsValue) => key -> jsValue.as[Int] }.toMap
+ }
+ val defaultMaxRank = taskConf.options.get("defaultMaxRank").map(_.toInt)
+ val dictPath = taskConf.options.get("dictPath")
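+    // Illustrative option values (not defaults enforced by this task):
+    //   "countColumnName" -> "from", "minUserCount" -> "10", "numOfPartitions" -> "200",
+    //   "maxRankPerDim" -> """{"url": 100}""", "defaultMaxRank" -> "1000", "dictPath" -> "/tmp/feature_dict"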
+
+    numOfPartitions.foreach { d => ss.sqlContext.setConf("spark.sql.shuffle.partitions", d.toString) }
+
+ // val maxRankPerDimBCast = ss.sparkContext.broadcast(maxRankPerDim.getOrElse(Map.empty))
+
+ val param = FeatureIndexParam(minUserCount = minUserCount, countColumnName = Option(countColumnName),
+ numOfPartitions = numOfPartitions, samplePointsPerPartitionHint = samplePointsPerPartitionHint,
+ maxRankPerDim = maxRankPerDim, defaultMaxRank = defaultMaxRank, dictPath = dictPath
+ )
+
+ val edges = taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) { case (prev, cur) =>
+ prev.union(inputMap(cur))
+ }
+
+    //TODO: users are expected to inject transformers that transform (WalLog, propertyKey, propertyValue) into Seq[DimVal].
+ val transformers = Seq(DefaultTransformer())
+ val dimValExtractUDF = extractDimValues(transformers)
+ val dimValColumnName = "dimVal"
+
+ val rawFeatures = edges
+ .withColumn(dimValColumnName, explode(dimValExtractUDF(col("logs"))))
+
+ val dict = buildDictionary(ss, rawFeatures, param, dimValColumnName)
+
+ dict
+    //TODO: filter top-K values per dim, then build the set of valid DimVals.
+    //      then broadcast the valid DimVals to the original dataframe and filter out invalid DimVals.
+
+ // dictPath.foreach { path => dict.write.mode(SaveMode.Overwrite).parquet(path) }
+ //
+ // val filteredDict = filterTopKsPerDim(dict, maxRankPerDimBCast, defaultMaxRank.getOrElse(Int.MaxValue))
+ // val distinctFeatureHashes = collectDistinctFeatureHashes(ss, filteredDict)
+ // val distinctFeatureHashesBCast = ss.sparkContext.broadcast(distinctFeatureHashes)
+
+ // filteredDict
+ }
+
+
+ override def mandatoryOptions: Set[String] = Set.empty
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
index 0c9829c..2b42e20 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
@@ -1,29 +1,10 @@
package org.apache.s2graph.s2jobs.wal.process
import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam
import org.apache.s2graph.s2jobs.wal.{WalLog, WalLogAgg}
import org.apache.spark.sql._
-object AggregateParam {
- val defaultGroupByKeys = Seq("from")
- val defaultTopK = 1000
- val defaultIsArrayType = false
- val defaultShouldSortTopItems = true
-}
-
-case class AggregateParam(groupByKeys: Option[Seq[String]],
- topK: Option[Int],
- isArrayType: Option[Boolean],
- shouldSortTopItems: Option[Boolean]) {
-
- import AggregateParam._
-
- val groupByColumns = groupByKeys.getOrElse(defaultGroupByKeys)
- val heapSize = topK.getOrElse(defaultTopK)
- val arrayType = isArrayType.getOrElse(defaultIsArrayType)
- val sortTopItems = shouldSortTopItems.getOrElse(defaultShouldSortTopItems)
-}
-
object WalLogAggregateProcess {
def aggregate(ss: SparkSession,
dataset: Dataset[WalLogAgg],
@@ -64,8 +45,8 @@
val maxNumOfEdges = taskConf.options.get("maxNumOfEdges").map(_.toInt).getOrElse(1000)
val arrayType = taskConf.options.get("arrayType").map(_.toBoolean).getOrElse(false)
val sortTopItems = taskConf.options.get("sortTopItems").map(_.toBoolean).getOrElse(false)
-
- taskConf.options.get("parallelism").foreach(d => ss.sqlContext.setConf("spark.sql.shuffle.partitions", d))
+ val numOfPartitions = taskConf.options.get("numOfPartitions")
+ numOfPartitions.foreach(d => ss.sqlContext.setConf("spark.sql.shuffle.partitions", d))
implicit val ord = WalLog.orderByTsAsc
val walLogs = taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) { case (prev, cur) =>
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
new file mode 100644
index 0000000..89c36f4
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
@@ -0,0 +1,22 @@
+package org.apache.s2graph.s2jobs.wal.process.params
+
+
+object AggregateParam {
+ val defaultGroupByKeys = Seq("from")
+ val defaultTopK = 1000
+ val defaultIsArrayType = false
+ val defaultShouldSortTopItems = true
+}
+
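+// All fields are optional; unset fields fall back to the defaults above, e.g.
+// AggregateParam(None, None, None, None) groups by "from", keeps the top 1000 items, and sorts them.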
+case class AggregateParam(groupByKeys: Option[Seq[String]],
+ topK: Option[Int],
+ isArrayType: Option[Boolean],
+ shouldSortTopItems: Option[Boolean]) {
+
+ import AggregateParam._
+
+ val groupByColumns = groupByKeys.getOrElse(defaultGroupByKeys)
+ val heapSize = topK.getOrElse(defaultTopK)
+ val arrayType = isArrayType.getOrElse(defaultIsArrayType)
+ val sortTopItems = shouldSortTopItems.getOrElse(defaultShouldSortTopItems)
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
new file mode 100644
index 0000000..ddf7037
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
@@ -0,0 +1,20 @@
+package org.apache.s2graph.s2jobs.wal.process.params
+
+object FeatureIndexParam {
+ val defaultMinUserCount = 0L
+ val defaultCountColumnName = "from"
+}
+
+case class FeatureIndexParam(minUserCount: Option[Long],
+ countColumnName: Option[String],
+ samplePointsPerPartitionHint: Option[Int],
+ numOfPartitions: Option[Int],
+ maxRankPerDim: Option[Map[String, Int]],
+ defaultMaxRank: Option[Int],
+ dictPath: Option[String]) {
+ import FeatureIndexParam._
+ val _countColumnName = countColumnName.getOrElse(defaultCountColumnName)
+ val _minUserCount = minUserCount.getOrElse(defaultMinUserCount)
+ val _maxRankPerDim = maxRankPerDim.getOrElse(Map.empty)
+ val _defaultMaxRank = defaultMaxRank.getOrElse(Int.MaxValue)
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala
new file mode 100644
index 0000000..2839d6a
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala
@@ -0,0 +1,3 @@
+package org.apache.s2graph.s2jobs.wal.transformer
+
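+// Uses the Transformer trait's default behaviour: dimension = "<label>:<propertyKey>", value as-is,
+// e.g. toDimValLs(walLog, "name", "1") == Seq(DimVal("friends:name", "1")) when walLog.label == "friends".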
+case class DefaultTransformer() extends Transformer
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala
new file mode 100644
index 0000000..9b09d71
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala
@@ -0,0 +1,20 @@
+package org.apache.s2graph.s2jobs.wal.transformer
+
+import org.apache.s2graph.s2jobs.wal.utils.UrlUtils
+import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
+
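+// For URL-valued properties (propertyKey in urlDimensions), emits the host, domain path prefixes,
+// and any extracted Korean keyword, e.g. "http://www.google.com/abc" ->
+//   Seq(DimVal("host", "www.google.com"), DimVal("domain", "www.google.com"), DimVal("domain", "www.google.com/abc")).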
+case class ExtractDomain(urlDimensions: Set[String],
+ hostDimName: String = "host",
+ domainDimName: String = "domain",
+ keywordDimName: String = "uri_keywords") extends Transformer {
+ override def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): Seq[DimVal] = {
+ if (!urlDimensions(propertyKey)) Nil
+ else {
+ val (_, domains, kwdOpt) = UrlUtils.extract(propertyValue)
+
+ domains.headOption.toSeq.map(DimVal(hostDimName, _)) ++
+ domains.map(DimVal(domainDimName, _)) ++
+ kwdOpt.toSeq.map(DimVal(keywordDimName, _))
+ }
+ }
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala
new file mode 100644
index 0000000..7ad7dfd
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala
@@ -0,0 +1,17 @@
+package org.apache.s2graph.s2jobs.wal.transformer
+
+import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
+
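+// Maps a service-dimension property value to a service name, e.g. (illustrative) with
+// domainServiceMap = Map("m.example.com" -> "example") the value "m.example.com" becomes
+// DimVal("serviceName", "example"); unmapped values are passed through as-is.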
+class ExtractServiceName(serviceDims: Set[String],
+ domainServiceMap: Map[String, String] = Map.empty,
+ serviceDimName: String = "serviceName") extends Transformer {
+ override def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): Seq[DimVal] = {
+ if (!serviceDims(propertyKey)) Nil
+ else {
+ val serviceName = domainServiceMap.getOrElse(propertyValue, propertyValue)
+
+ Seq(DimVal(serviceDimName, serviceName))
+ }
+ }
+}
+
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala
new file mode 100644
index 0000000..523f58d
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala
@@ -0,0 +1,15 @@
+package org.apache.s2graph.s2jobs.wal.transformer
+
+import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
+
+/**
+ * Decides how to transform each property key/value pair of a WalLog into a Seq[DimVal].
+ */
+trait Transformer {
+ def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): Seq[DimVal] = {
+ val dim = s"${walLog.label}:${propertyKey}"
+ val value = propertyValue
+
+ Seq(DimVal(dim, value))
+ }
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAF.scala
similarity index 98%
rename from s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
rename to s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAF.scala
index dabfd99..81a1356 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAF.scala
@@ -8,7 +8,7 @@
import scala.annotation.tailrec
import scala.collection.mutable
-object S2EdgeDataAggregate {
+object WalLogUDAF {
type Element = (Long, String, String, String)
val emptyRow = new GenericRow(Array(-1L, "empty", "empty", "empty"))
@@ -142,7 +142,7 @@
class GroupByAggOptimized(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
- import S2EdgeDataAggregate._
+ import WalLogUDAF._
implicit val ord = rowOrdering
@@ -195,7 +195,7 @@
}
class GroupByAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
- import S2EdgeDataAggregate._
+ import WalLogUDAF._
implicit val ord = rowOrderingDesc
@@ -249,7 +249,7 @@
}
class GroupByArrayAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
- import S2EdgeDataAggregate._
+ import WalLogUDAF._
implicit val ord = rowOrdering
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala
new file mode 100644
index 0000000..34f4a2b
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala
@@ -0,0 +1,216 @@
+package org.apache.s2graph.s2jobs.wal.udfs
+
+import com.google.common.hash.Hashing
+import org.apache.s2graph.core.JSONParser
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{Dataset, Row}
+import play.api.libs.json._
+
+import scala.reflect.ClassTag
+
+object WalLogUDF {
+
+ import scala.collection.mutable
+
+ type MergedProps = Map[String, Seq[String]]
+ type MutableMergedProps = mutable.Map[String, mutable.Map[String, Int]]
+ type MutableMergedPropsInner = mutable.Map[String, Int]
+
+ def initMutableMergedPropsInner = mutable.Map.empty[String, Int]
+
+ def initMutableMergedProps = mutable.Map.empty[String, mutable.Map[String, Int]]
+
+// //TODO:
+// def toDimension(rawActivity: RawActivity, propertyKey: String): String = {
+// // val (ts, dst, label, _) = rawActivity
+// // label + "." + propertyKey
+// propertyKey
+// }
+//
+// def updateMutableMergedProps(mutableMergedProps: MutableMergedProps)(dimension: String,
+// dimensionValue: String,
+// count: Int = 1): Unit = {
+// val buffer = mutableMergedProps.getOrElseUpdate(dimension, initMutableMergedPropsInner)
+// val newCount = buffer.getOrElse(dimensionValue, 0) + count
+// buffer += (dimensionValue -> newCount)
+// }
+//
+// def groupByDimensionValues(rawActivity: RawActivity,
+// propsJson: JsObject,
+// mergedProps: MutableMergedProps,
+// toDimensionFunc: (RawActivity, String) => String,
+// excludePropKeys: Set[String] = Set.empty): Unit = {
+// propsJson.fields.filter(t => !excludePropKeys(t._1)).foreach { case (propertyKey, jsValue) =>
+// val values = jsValue match {
+// case JsString(s) => Seq(s)
+// case JsArray(arr) => arr.map(JSONParser.jsValueToString)
+// case _ => Seq(jsValue.toString())
+// }
+// val dimension = toDimensionFunc(rawActivity, propertyKey)
+//
+// values.foreach { value =>
+// updateMutableMergedProps(mergedProps)(dimension, value)
+// }
+// }
+// }
+//
+// def buildMergedProps(rawActivities: Seq[RawActivity],
+// toDimensionFunc: (RawActivity, String) => String,
+// defaultTopKs: Int = 100,
+// dimTopKs: Map[String, Int] = Map.empty,
+// excludePropKeys: Set[String] = Set.empty,
+// dimValExtractors: Seq[Extractor] = Nil): MergedProps = {
+// val mergedProps = initMutableMergedProps
+//
+// rawActivities.foreach { case rawActivity@(_, _, _, rawProps) =>
+// val propsJson = Json.parse(rawProps).as[JsObject]
+// groupByDimensionValues(rawActivity, propsJson, mergedProps, toDimensionFunc, excludePropKeys)
+// }
+// // work on extra dimVals.
+// dimValExtractors.foreach { extractor =>
+// extractor.extract(rawActivities, mergedProps)
+// }
+//
+// mergedProps.map { case (key, values) =>
+// val topK = dimTopKs.getOrElse(key, defaultTopKs)
+//
+// key -> values.toSeq.sortBy(-_._2).take(topK).map(_._1)
+// }.toMap
+// }
+//
+// def rowToRawActivity(row: Row): RawActivity = {
+// (row.getAs[Long](0), row.getAs[String](1), row.getAs[String](2), row.getAs[String](3))
+// }
+//
+// def appendMergeProps(toDimensionFunc: (RawActivity, String) => String = toDimension,
+// defaultTopKs: Int = 100,
+// dimTopKs: Map[String, Int] = Map.empty,
+// excludePropKeys: Set[String] = Set.empty,
+// dimValExtractors: Seq[Extractor] = Nil,
+// minTs: Long = 0,
+// maxTs: Long = Long.MaxValue) = udf((acts: Seq[Row]) => {
+// val rows = acts.map(rowToRawActivity).filter(act => act._1 >= minTs && act._1 < maxTs)
+//
+// buildMergedProps(rows, toDimensionFunc, defaultTopKs, dimTopKs, excludePropKeys, dimValExtractors)
+// })
+
+ val extractDimensionValues = {
+ udf((dimensionValues: Map[String, Seq[String]]) => {
+ dimensionValues.toSeq.flatMap { case (dimension, values) =>
+ values.map { value => dimension -> value }
+ }
+ })
+ }
+
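+  // Hashes "dimension.dimensionValue" with murmur3_128. Note the '.' separator here differs from
+  // FeatureIndexProcess.toFeatureHash, which joins with ':', so the two hashes are not interchangeable.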
+ def toHash(dimension: String, dimensionValue: String): Long = {
+ val key = s"$dimension.$dimensionValue"
+    Hashing.murmur3_128().hashBytes(key.getBytes("UTF-8")).asLong()
+ }
+
+ def filterDimensionValues(validDimValues: Broadcast[Set[Long]]) = {
+ udf((dimensionValues: Map[String, Seq[String]]) => {
+ dimensionValues.map { case (dimension, values) =>
+ val filtered = values.filter { value =>
+ val hash = toHash(dimension, value)
+
+ validDimValues.value(hash)
+ }
+
+ dimension -> filtered
+ }
+ })
+ }
+
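+  // Assigns a global 1-based rank to each (K1, K2) pair, restarting at 1 for every new K1.
+  // Rows are range-partitioned and sorted by the implicit ordering; per-partition ranks are then
+  // shifted by the cumulative sizes of preceding partitions holding the same K1. For example,
+  // with Ordering.Tuple2(Ordering.String, Ordering.Long.reverse) (as in FeatureIndexProcess),
+  // the values of each dim are ranked from the highest count down.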
+ def appendRank[K1: ClassTag, K2: ClassTag, V: ClassTag](ds: Dataset[((K1, K2), V)],
+ numOfPartitions: Option[Int] = None,
+ samplePointsPerPartitionHint: Option[Int] = None)(implicit ordering: Ordering[(K1, K2)]) = {
+ import org.apache.spark.RangePartitioner
+ val rdd = ds.rdd
+
+ val partitioner = new RangePartitioner(numOfPartitions.getOrElse(rdd.partitions.size),
+ rdd,
+ true,
+ samplePointsPerPartitionHint = samplePointsPerPartitionHint.getOrElse(20)
+ )
+
+ val sorted = rdd.repartitionAndSortWithinPartitions(partitioner)
+
+ def rank(idx: Int, iter: Iterator[((K1, K2), V)]) = {
+ val initialK1: K1 = null.asInstanceOf[K1]
+ val initialK2: K2 = null.asInstanceOf[K2]
+ val initialV: V = null.asInstanceOf[V]
+ val zero = List((1L, initialK1, initialK2, initialV))
+
+ def merge(acc: List[(Long, K1, K2, V)], x: ((K1, K2), V)) =
+ (acc.head, x) match {
+ case ((offset, prevKey1, _, _), ((curKey1: K1, curKey2: K2), curVal: V)) => {
+ val newOffset = if (prevKey1 == curKey1) offset + 1L else 1L
+ (newOffset, curKey1, curKey2, curVal) :: acc
+ }
+ }
+
+ iter.foldLeft(zero)(merge).reverse.drop(1).map { case (offset, key1, key2, value) =>
+ (idx, offset, key1, key2, value)
+ }.toIterator
+ }
+
+ def getOffset(idx: Int, iter: Iterator[((K1, K2), V)]) = {
+ val buffer = mutable.Map.empty[K1, (Int, Long)]
+ if (!iter.hasNext) buffer.toIterator
+ else {
+ val ((k1, k2), v) = iter.next()
+ var prevKey1: K1 = k1
+ var size = 1L
+ iter.foreach { case ((k1, k2), v) =>
+ if (prevKey1 != k1) {
+ buffer += prevKey1 -> (idx, size)
+ prevKey1 = k1
+ size = 0L
+ }
+ size += 1L
+ }
+ if (size > 0) buffer += prevKey1 -> (idx, size)
+ buffer.iterator
+ }
+ }
+
+ val partRanks = sorted.mapPartitionsWithIndex(rank)
+ val _offsets = sorted.mapPartitionsWithIndex(getOffset)
+ val offsets = _offsets.groupBy(_._1).flatMap { case (k1, partitionWithSize) =>
+ val ls = partitionWithSize.toSeq.map(_._2).sortBy(_._1)
+ var sum = ls.head._2
+ val lss = ls.tail.map { case (partition, size) =>
+ val x = (partition, sum)
+ sum += size
+ x
+ }
+ lss.map { case (partition, offset) =>
+ (k1, partition) -> offset
+ }
+ }.collect()
+
+    println(offsets.mkString("[", ", ", "]"))
+
+ val offsetsBCast = ds.sparkSession.sparkContext.broadcast(offsets)
+
+ def adjust(iter: Iterator[(Int, Long, K1, K2, V)], startOffsets: Map[(K1, Int), Long]) = {
+ iter.map { case (partition, rankInPartition, key1, key2, value) =>
+ val startOffset = startOffsets.getOrElse((key1, partition), 0L)
+ val rank = startOffset + rankInPartition
+
+ (partition, rankInPartition, rank, (key1, key2), value)
+ }
+ }
+
+ val withRanks = partRanks
+ .mapPartitions { iter =>
+ val startOffsets = offsetsBCast.value.toMap
+ adjust(iter, startOffsets)
+ }.map { case (_, _, rank, (key1, key2), value) =>
+ (rank, (key1, key2), value)
+ }
+
+ withRanks
+ }
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/UrlUtils.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/UrlUtils.scala
new file mode 100644
index 0000000..2941357
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/UrlUtils.scala
@@ -0,0 +1,57 @@
+package org.apache.s2graph.s2jobs.wal.utils
+
+import java.net.{URI, URLDecoder}
+
+import scala.util.matching.Regex
+
+object UrlUtils {
+ val pattern = new Regex("""(\\x[0-9A-Fa-f]{2}){3}""")
+ val koreanPattern = new scala.util.matching.Regex("([가-힣]+[\\-_a-zA-Z 0-9]*)+|([\\-_a-zA-Z 0-9]+[가-힣]+)")
+
+
+ // url extraction functions
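+  // e.g. urlDecode("a%20b") == (true, "a b"); input that cannot be decoded is returned unchanged with false.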
+ def urlDecode(url: String): (Boolean, String) = {
+ try {
+ val decoded = URLDecoder.decode(url, "UTF-8")
+ (url != decoded, decoded)
+ } catch {
+ case e: Exception => (false, url)
+ }
+ }
+
+ def hex2String(url: String): String = {
+ pattern replaceAllIn(url, m => {
+ new String(m.toString.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte), "utf-8")
+ })
+ }
+
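+  // e.g. toDomains("http://www.google.com/abc/def") ==
+  //   Seq("www.google.com", "www.google.com/abc", "www.google.com/abc/def")
+  // (the domain plus successive path prefixes, bounded by maxDepth).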
+ def toDomains(url: String, maxDepth: Int = 3): Seq[String] = {
+ val uri = new URI(url)
+ val domain = uri.getHost
+ if (domain == null) Nil
+ else {
+ val paths = uri.getPath.split("/")
+ if (paths.isEmpty) Seq(domain)
+ else {
+ val depth = Math.min(maxDepth, paths.size)
+ (1 to depth).map { ith =>
+ domain + paths.take(ith).mkString("/")
+ }
+ }
+ }
+ }
+
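+  // e.g. extract("http://www.google.com/abc") ==
+  //   ("http://www.google.com/abc", Seq("www.google.com", "www.google.com/abc"), None)
+  // i.e. (decoded URL, domain prefixes, optional Korean keyword); on failure returns (_url, Nil, None).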
+ def extract(_url: String): (String, Seq[String], Option[String]) = {
+ try {
+ val url = hex2String(_url)
+ val (encoded, decodedUrl) = urlDecode(url)
+
+ val kwdOpt = koreanPattern.findAllMatchIn(decodedUrl).toList.map(_.group(0)).headOption.map(_.replaceAll("\\s", ""))
+ val domains = toDomains(url.replaceAll(" ", ""))
+ (decodedUrl, domains, kwdOpt)
+ } catch {
+ case e: Exception => (_url, Nil, None)
+ }
+ }
+}
+
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala
new file mode 100644
index 0000000..5089390
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala
@@ -0,0 +1,26 @@
+package org.apache.s2graph.s2jobs.wal
+
+import org.apache.s2graph.s2jobs.wal.transformer._
+import org.scalatest.{FunSuite, Matchers}
+
+class TransformerTest extends FunSuite with Matchers {
+ val walLog = WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1, "url": "www.google.com"}""")
+
+ test("test default transformer") {
+    val transformer = DefaultTransformer()
+ val dimVals = transformer.toDimValLs(walLog, "name", "1")
+
+ dimVals shouldBe Seq(DimVal("friends:name", "1"))
+ }
+
+ test("test ExtractDomain from URL") {
+ val transformer = new ExtractDomain(urlDimensions = Set("url"))
+ val dimVals = transformer.toDimValLs(walLog, "url", "http://www.google.com/abc")
+
+ dimVals shouldBe Seq(
+ DimVal("host", "www.google.com"),
+ DimVal("domain", "www.google.com"),
+ DimVal("domain", "www.google.com/abc")
+ )
+ }
+}
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
index 5ae595b..5753411 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
@@ -31,9 +31,8 @@
val edges = spark.createDataset(walLogsLs).toDF()
val inputMap = Map("edges" -> edges)
val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"),
- options = Map("maxNumOfEdges" -> "10",
- "runOrderBy" -> "false",
- "groupByAggClassName" -> "GroupByAggOptimized"))
+ options = Map("maxNumOfEdges" -> "10")
+ )
val job = new WalLogAggregateProcess(taskConf = taskConf)
val processed = job.execute(spark, inputMap)
@@ -48,7 +47,7 @@
val prev: Array[Int] = Array(3, 2, 1)
val cur: Array[Int] = Array(4, 2, 2)
- val ls = S2EdgeDataAggregate.mergeTwoSeq(prev, cur, 10)
+ val ls = WalLogUDAF.mergeTwoSeq(prev, cur, 10)
println(ls.size)
ls.foreach { x =>
@@ -57,7 +56,7 @@
}
test("addToTopK test.") {
- import S2EdgeDataAggregate._
+ import WalLogUDAF._
val numOfTest = 100
val numOfNums = 100
val maxNum = 10