/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.s2graph.counter.loader.core

import kafka.producer.KeyedMessage
import org.apache.s2graph.core.GraphUtil
import org.apache.s2graph.counter.TrxLog
import org.apache.s2graph.counter.core.ExactCounter.ExactValueMap
import org.apache.s2graph.counter.core.RankingCounter.RankingValueMap
import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit
import org.apache.s2graph.counter.core._
import org.apache.s2graph.counter.core.v2.{RankingStorageGraph, ExactStorageGraph}
import org.apache.s2graph.counter.loader.config.StreamingConfig
import org.apache.s2graph.counter.loader.models.DefaultCounterModel
import org.apache.s2graph.counter.models.{Counter, DBModel}
import org.apache.s2graph.spark.config.S2ConfigFactory
import org.apache.s2graph.spark.spark.WithKafka
import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulable, Logging}
import play.api.libs.json.{JsNumber, JsString, JsValue, Json}
import scala.collection.mutable.{HashMap => MutableHashMap}
import scala.concurrent.ExecutionContext
import scala.language.postfixOps
import scala.util.Try

object CounterFunctions extends Logging with WithKafka {

  // top-k size passed to rankingCounter.update
  private val K_MAX = 500

  implicit val ec = ExecutionContext.Implicits.global

  val exactCounter = new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config))
  val rankingCounter = new RankingCounter(S2ConfigFactory.config, new RankingStorageGraph(S2ConfigFactory.config))

  lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS)

  type HashMapAccumulable = Accumulable[MutableHashMap[String, Long], (String, Long)]

  // runs once per JVM on first access to this object; the mapPartitions blocks below
  // assert(initialize) to force DB initialization on each executor
  val initialize = {
    logInfo("initialize CounterFunctions")
    DBModel.initialize(S2ConfigFactory.config)
    true
  }

  /** Build the (qualifier -> count) map for one ETL item under the given policy. */
  def getCountValue(policy: Counter, item: CounterEtlItem): ExactValueMap = {
    for {
      dimKeys <- policy.dimensionList
      dimValues <- getDimensionValues(item.dimension, dimKeys).toSeq
      eq <- ExactQualifier.getQualifiers(policy.intervals.map(IntervalUnit.withName), item.ts, dimKeys.zip(dimValues).toMap)
    } yield {
      eq -> item.value
    }
  }.toMap

  /**
   * Extract the values of `keys` from a dimension JSON object.
   * Returns None when any key is missing or maps to a value that is neither a string nor a number.
   */
  def getDimensionValues(dimension: JsValue, keys: Array[String]): Option[Array[String]] = {
    Try {
      for {
        k <- keys
        jsValue = dimension \ k
      } yield {
        jsValue match {
          case JsNumber(n) => n.toString()
          case JsString(s) => s
          case _ => throw new Exception(s"unsupported dimension value for key $k: $jsValue")
        }
      }
    }.toOption
  }
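
  // Example (illustrative, not from the original source):
  //   getDimensionValues(Json.obj("gender" -> "M", "age" -> 10), Array("gender", "age"))
  //     == Some(Array("M", "10"))
  //   getDimensionValues(Json.obj("gender" -> "M"), Array("age")) == None  // missing key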

  /** Map one ETL item to its exact key and count values, if a policy exists for its (service, action). */
  def exactMapper(item: CounterEtlItem): Option[(ExactKeyTrait, ExactValueMap)] = {
    DefaultCounterModel.findByServiceAction(item.service, item.action).map { policy =>
      (ExactKey(policy, item.item, checkItemType = true), getCountValue(policy, item))
    }
  }

  /** Expand one item ranking row into (ranking key -> item score) pairs. */
  def rankingMapper(row: ItemRankingRow): Seq[(RankingKey, RankingValueMap)] = {
    // normal ranking
    for {
      (eq, rv) <- row.value
    } yield {
      (RankingKey(row.key.policyId, row.key.version, eq), Map(row.key.itemKey -> rv))
    }
  }.toSeq

  /** Convert a transaction log into ranking values keyed by the item's exact key. */
  def logToRankValue(log: TrxLog): Option[(ExactKeyTrait, Map[ExactQualifier, RankingValue])] = {
    DefaultCounterModel.findById(log.policyId).map { policy =>
      val key = ExactKey(policy, log.item, checkItemType = false)
      val value = {
        for {
          result <- log.results
        } yield {
          ExactQualifier(TimedQualifier(result.interval, result.ts), result.dimension) -> RankingValue(result.result, result.value)
        }
      }.toMap
      key -> value
    }
  }

  /** Merge two maps, combining values of shared keys with `op`; `default` stands in for keys absent from `m1`. */
  def reduceValue[T, U](op: (U, U) => U, default: U)(m1: Map[T, U], m2: Map[T, U]): Map[T, U] = {
    m1 ++ m2.map { case (k, v) =>
      k -> op(m1.getOrElse(k, default), v)
    }
  }
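
  // Example (illustrative, not from the original source):
  //   reduceValue[String, Long](_ + _, 0L)(Map("a" -> 1L), Map("a" -> 2L, "b" -> 3L))
  //     == Map("a" -> 3L, "b" -> 3L)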

  /** Parse raw Kafka (key, message) pairs into exact-counter updates, summing counts per exact key. */
  def makeExactRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[(ExactKeyTrait, ExactValueMap)] = {
    rdd.mapPartitions { part =>
      assert(initialize)
      for {
        (k, v) <- part
        line <- GraphUtil.parseString(v)
        item <- CounterEtlItem(line).toSeq
        ev <- exactMapper(item).toSeq
      } yield {
        ev
      }
    }.reduceByKey(reduceValue[ExactQualifier, Long](_ + _, 0L)(_, _), numPartitions)
  }
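
  // Record flow (illustrative): each message line that parses to a CounterEtlItem becomes
  //   (ExactKey(policy, item), Map(exactQualifier -> count))
  // and reduceByKey then sums the counts per qualifier across the batch.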

  def makeRankingRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = {
    val logRdd = makeTrxLogRdd(rdd, numPartitions)
    makeRankingRddFromTrxLog(logRdd, numPartitions)
  }

  /** Union the normal, rate and trend ranking counts derived from the same transaction logs. */
  def makeRankingRddFromTrxLog(rdd: RDD[TrxLog], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = {
    val itemRankingRdd = makeItemRankingRdd(rdd, numPartitions).cache()
    try {
      rankingCount(itemRankingRdd, numPartitions) union
        rateRankingCount(itemRankingRdd, numPartitions) union
        trendRankingCount(itemRankingRdd, numPartitions) coalesce numPartitions
    } finally {
      itemRankingRdd.unpersist(false)
    }
  }

  /** Parse raw Kafka messages into TrxLog entries, keeping only successful ones. */
  def makeTrxLogRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[TrxLog] = {
    rdd.mapPartitions { part =>
      assert(initialize)
      for {
        (k, v) <- part
        line <- GraphUtil.parseString(v)
        trxLog = Json.parse(line).as[TrxLog] if trxLog.success
      } yield {
        trxLog
      }
    }
  }

  def rankingCount(rdd: RDD[ItemRankingRow], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = {
    rdd.mapPartitions { part =>
      for {
        row <- part
        rv <- rankingMapper(row)
      } yield {
        rv
      }
    }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), numPartitions)
  }

  case class ItemRankingRow(key: ExactKeyTrait, value: Map[ExactQualifier, RankingValue])

  def makeItemRankingRdd(rdd: RDD[TrxLog], numPartitions: Int): RDD[ItemRankingRow] = {
    rdd.mapPartitions { part =>
      for {
        log <- part
        rv <- logToRankValue(log)
      } yield {
        rv
      }
    }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), numPartitions).mapPartitions { part =>
      for {
        (key, value) <- part
      } yield {
        ItemRankingRow(key, value)
      }
    }
  }

  /** Re-key rows under their trend policy; the base score is left as -1 to be filled from past counts. */
  def mapTrendRankingValue(rows: Seq[ItemRankingRow]): Seq[(ExactKeyTrait, Map[ExactQualifier, RateRankingValue])] = {
    for {
      row <- rows
      trendPolicy <- DefaultCounterModel.findByTrendActionId(row.key.policyId)
    } yield {
      val key = ExactKey(trendPolicy, row.key.itemKey, checkItemType = false)
      val value = row.value.filter { case (eq, rv) =>
        // keep only qualifiers whose dimension keys match one of the trend policy's dimension sets
        trendPolicy.dimensionSet.exists { dimSet => dimSet == eq.dimKeyValues.keys }
      }.map { case (eq, rv) =>
        eq -> RateRankingValue(rv.score, -1)
      }
      (key, value)
    }
  }

  /**
   * Re-key rows under their rate policy, once from the action side (base score unknown)
   * and once from the base side (action score unknown). The -1 scores are filled in later.
   */
  def mapRateRankingValue(rows: Seq[ItemRankingRow]): Seq[(ExactKeyTrait, Map[ExactQualifier, RateRankingValue])] = {
    val actionPart = {
      for {
        row <- rows
        ratePolicy <- DefaultCounterModel.findByRateActionId(row.key.policyId)
      } yield {
        val key = ExactKey(ratePolicy, row.key.itemKey, checkItemType = false)
        val value = row.value.filter { case (eq, rv) =>
          // keep only qualifiers whose dimension keys match one of the rate policy's dimension sets
          ratePolicy.dimensionSet.exists { dimSet => dimSet == eq.dimKeyValues.keys }
        }.map { case (eq, rv) =>
          eq -> RateRankingValue(rv.score, -1)
        }
        (key, value)
      }
    }
    val basePart = {
      for {
        row <- rows
        ratePolicy <- DefaultCounterModel.findByRateBaseId(row.key.policyId)
      } yield {
        val key = ExactKey(ratePolicy, row.key.itemKey, checkItemType = false)
        val value = row.value.filter { case (eq, rv) =>
          // keep only qualifiers whose dimension keys match one of the rate policy's dimension sets
          ratePolicy.dimensionSet.exists { dimSet => dimSet == eq.dimKeyValues.keys }
        }.map { case (eq, rv) =>
          eq -> RateRankingValue(-1, rv.score)
        }
        (key, value)
      }
    }
    actionPart ++ basePart
  }
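
  // Note: RateRankingValue(actionScore, baseScore) uses -1 as a "not yet known" sentinel.
  // rateRankingCount and trendRankingCount below look up the missing side from the exact counter.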

  /**
   * Trend ranking: scores a trend policy's items against their own past counts. Base (past)
   * scores are filled via exactCounter.getPastCounts, then entries whose base score is below
   * the policy's rate threshold are dropped.
   */
  def trendRankingCount(rdd: RDD[ItemRankingRow], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = {
    rdd.mapPartitions { part =>
      mapTrendRankingValue(part.toSeq) toIterator
    }.reduceByKey(reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(_, _), numPartitions).mapPartitions { part =>
      val missingByPolicy = {
        for {
          (key, value) <- part.toSeq
          trendPolicy <- DefaultCounterModel.findById(key.policyId).toSeq
          actionId <- trendPolicy.rateActionId.toSeq
          actionPolicy <- DefaultCounterModel.findById(actionId).toSeq
        } yield {
          // drop total-interval qualifiers (a trend compares bounded time windows)
          val missingQualifiersWithRRV = value.filterKeys { eq => eq.tq.q != IntervalUnit.TOTAL }
          (actionPolicy, key, missingQualifiersWithRRV)
        }
      }.groupBy(_._1).mapValues(seq => seq.map(x => (x._2, x._3)))
      val filled = {
        for {
          (policy, missing) <- missingByPolicy.toSeq
          keyWithPast = exactCounter.getPastCounts(policy, missing.map { case (k, v) => k.itemKey -> v.keys.toSeq })
          (key, current) <- missing
        } yield {
          val past = keyWithPast.getOrElse(key.itemKey, Map.empty[ExactQualifier, Long])
          val base = past.mapValues(l => RateRankingValue(-1, l))
          key -> reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(current, base)
        }
      }
      {
        // filter by rate threshold
        for {
          (key, value) <- filled
          ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq
          (eq, rrv) <- value if rrv.baseScore >= ratePolicy.rateThreshold.getOrElse(Int.MinValue)
        } yield {
          (RankingKey(key.policyId, key.version, eq), Map(key.itemKey -> rrv.rankingValue))
        }
      } toIterator
    }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), numPartitions)
  }

  /**
   * Rate ranking: combines an action count with a base count per item. Rows missing one side
   * (score == -1) are filled via exactCounter.getRelatedCounts against the action or base
   * policy, then entries whose base score is below the policy's rate threshold are dropped.
   */
  def rateRankingCount(rdd: RDD[ItemRankingRow], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = {
    rdd.mapPartitions { part =>
      mapRateRankingValue(part.toSeq) toIterator
    }.reduceByKey(reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(_, _), numPartitions).mapPartitions { part =>
      val seq = part.toSeq
      // find rows whose action side is still unknown and group them by action policy
      val actionMissingByPolicy = {
        for {
          (key, value) <- seq if value.exists { case (eq, rrv) => rrv.actionScore == -1 }
          ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq
          actionId <- ratePolicy.rateActionId.toSeq
          actionPolicy <- DefaultCounterModel.findById(actionId)
        } yield {
          (actionPolicy, key, value.filter { case (eq, rrv) => rrv.actionScore == -1 })
        }
      }.groupBy(_._1).mapValues(seq => seq.map(x => (x._2, x._3)))
      val actionFilled = {
        for {
          (actionPolicy, actionMissing) <- actionMissingByPolicy.toSeq
          keyWithRelated = exactCounter.getRelatedCounts(actionPolicy, actionMissing.map { case (k, v) => k.itemKey -> v.keys.toSeq })
          (key, current) <- actionMissing
        } yield {
          val related = keyWithRelated.getOrElse(key.itemKey, Map.empty[ExactQualifier, Long])
          val found = related.mapValues(l => RateRankingValue(l, -1))
          val filled = reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(current, found)
          key -> filled
        }
      }
      // find rows whose base side is still unknown and group them by base policy
      val baseMissingByPolicy = {
        for {
          (key, value) <- seq if value.exists { case (eq, rrv) => rrv.baseScore == -1 }
          ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq
          baseId <- ratePolicy.rateBaseId.toSeq
          basePolicy <- DefaultCounterModel.findById(baseId)
        } yield {
          (basePolicy, key, value.filter { case (eq, rrv) => rrv.baseScore == -1 })
        }
      }.groupBy(_._1).mapValues(seq => seq.map(x => (x._2, x._3)))
      val baseFilled = {
        for {
          (basePolicy, baseMissing) <- baseMissingByPolicy.toSeq
          keyWithRelated = exactCounter.getRelatedCounts(basePolicy, baseMissing.map { case (k, v) => k.itemKey -> v.keys.toSeq })
          (key, current) <- baseMissing
        } yield {
          val related = keyWithRelated.getOrElse(key.itemKey, Map.empty[ExactQualifier, Long])
          val found = related.mapValues(l => RateRankingValue(-1, l))
          val filled = reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(current, found)
          key -> filled
        }
      }
      // rows that already have both sides need no lookup
      val alreadyFilled = {
        for {
          (key, value) <- seq if value.exists { case (eq, rrv) => rrv.actionScore != -1 && rrv.baseScore != -1 }
        } yield {
          key -> value.filter { case (eq, rrv) => rrv.actionScore != -1 && rrv.baseScore != -1 }
        }
      }
      val rtn = {
        // filter by rate threshold
        for {
          (key, value) <- actionFilled ++ baseFilled ++ alreadyFilled
          ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq
          (eq, rrv) <- value if rrv.baseScore >= ratePolicy.rateThreshold.getOrElse(Int.MinValue)
        } yield {
          (RankingKey(key.policyId, key.version, eq), Map(key.itemKey -> rrv.rankingValue))
        }
      }
      rtn.toIterator
    }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), numPartitions)
  }
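
  // Worked example (illustrative, and assuming RateRankingValue.reduce keeps the known,
  // non -1 side of each score): an action-side entry RateRankingValue(5, -1) whose base
  // count is found to be 20 via getRelatedCounts becomes RateRankingValue(5, 20).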

  /** Insert blob exact keys in batches of 10 per policy, counting successes and failures into the accumulator. */
  def insertBlobValue(keys: Seq[BlobExactKey], acc: HashMapAccumulable): Unit = {
    val keyByPolicy = {
      for {
        key <- keys
        policy <- DefaultCounterModel.findById(key.policyId)
      } yield {
        (policy, key)
      }
    }.groupBy(_._1).mapValues(values => values.map(_._2))
    for {
      (policy, allKeys) <- keyByPolicy
      keys <- allKeys.grouped(10)
      success <- exactCounter.insertBlobValue(policy, keys)
    } yield {
      success match {
        case true => acc += (("BLOB", 1))
        case false => acc += (("BLOBFailed", 1))
      }
    }
  }

  /** Apply exact-counter updates in batches of 10 per policy and return the resulting transaction logs. */
  def updateExactCounter(counts: Seq[(ExactKeyTrait, ExactValueMap)], acc: HashMapAccumulable): Seq[TrxLog] = {
    val countsByPolicy = {
      for {
        (key, count) <- counts
        policy <- DefaultCounterModel.findById(key.policyId)
      } yield {
        (policy, (key, count))
      }
    }.groupBy { case (policy, _) => policy }.mapValues(values => values.map(_._2))
    for {
      (policy, allCounts) <- countsByPolicy
      counts <- allCounts.grouped(10)
      trxLog <- exactCounter.updateCount(policy, counts)
    } yield {
      trxLog.success match {
        case true => acc += ((s"ExactV${policy.version}", 1))
        case false => acc += ((s"ExactFailedV${policy.version}", 1))
      }
      trxLog
    }
  }.toSeq

  def exactCountFromEtl(rdd: RDD[CounterEtlItem], numPartitions: Int): RDD[(ExactKeyTrait, ExactValueMap)] = {
    rdd.mapPartitions { part =>
      for {
        item <- part
        ev <- exactMapper(item).toSeq
      } yield {
        ev
      }
    }.reduceByKey(reduceValue[ExactQualifier, Long](_ + _, 0L)(_, _), numPartitions)
  }

  /** Update ranking counters in batches of 10, keeping the top K_MAX items, for policies with ranking enabled and ready. */
  def updateRankingCounter(values: TraversableOnce[(RankingKey, RankingValueMap)], acc: HashMapAccumulable): Unit = {
    assert(initialize)
    val valuesByPolicy = {
      for {
        (key, value) <- values.toSeq
        policy <- DefaultCounterModel.findById(key.policyId)
        if policy.useRank && rankingCounter.ready(policy) // update only when the rank counter is enabled and ready
      } yield {
        (policy, (key, value))
      }
    }.groupBy { case (policy, _) => policy }.mapValues(values => values.map(_._2))
    for {
      (policy, allValues) <- valuesByPolicy
      groupedValues <- allValues.grouped(10)
    } {
      rankingCounter.update(groupedValues, K_MAX)
      acc += ((s"RankingV${policy.version}", groupedValues.length))
    }
  }

  /** Publish transaction logs to Kafka: successes to the TRX topic, failures to the FAIL topic. */
  def produceTrxLog(trxLogs: TraversableOnce[TrxLog]): Unit = {
    for {
      trxLog <- trxLogs
    } {
      val topic = trxLog.success match {
        case true => StreamingConfig.KAFKA_TOPIC_COUNTER_TRX
        case false => StreamingConfig.KAFKA_TOPIC_COUNTER_FAIL
      }
      // messages are keyed by policy id + item, so logs for the same item share a partition
      val msg = new KeyedMessage[String, String](topic, s"${trxLog.policyId}${trxLog.item}", Json.toJson(trxLog).toString())
      producer.send(msg)
    }
  }
}
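
// Usage sketch (hypothetical driver code, not part of this file; `kafkaRdd` and `acc` are
// assumed to come from the surrounding streaming job):
//   val exactRdd = CounterFunctions.makeExactRdd(kafkaRdd, numPartitions)
//   exactRdd.foreachPartition { part =>
//     val trxLogs = CounterFunctions.updateExactCounter(part.toSeq, acc)
//     CounterFunctions.produceTrxLog(trxLogs)
//   }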