| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.s2graph.counter.core.v2 |
| |
| import akka.actor.ActorSystem |
| import akka.stream.ActorMaterializer |
| import com.typesafe.config.Config |
| import org.apache.commons.httpclient.HttpStatus |
| import org.apache.s2graph.core.GraphUtil |
| import org.apache.s2graph.core.schema.Label |
| import org.apache.s2graph.counter.config.S2CounterConfig |
| import org.apache.s2graph.counter.core.RankingCounter.RankingValueMap |
| import org.apache.s2graph.counter.core.v2.ExactStorageGraph._ |
| import org.apache.s2graph.counter.core.{RankingResult, RankingKey, RankingStorage} |
| import org.apache.s2graph.counter.models.{Counter, CounterModel} |
| import org.apache.s2graph.counter.util.{CollectionCacheConfig, CollectionCache} |
| import org.asynchttpclient.DefaultAsyncHttpClientConfig |
| import org.slf4j.LoggerFactory |
| import play.api.libs.json.{JsObject, JsString, JsValue, Json} |
| import scala.concurrent.duration._ |
| import scala.concurrent.{Await, Future} |
| import scala.util.hashing.MurmurHash3 |
| |
object RankingStorageGraph {
  // using play-ws without play app
  // NOTE(review): the ActorSystem/materializer and the Ning-based WS client are
  // created eagerly at object initialization and shared by every
  // RankingStorageGraph instance. Nothing in this file ever shuts them down —
  // confirm their lifecycle is managed (or intentionally process-long) by the
  // embedding application.
  implicit val materializer = ActorMaterializer.create(ActorSystem(getClass.getSimpleName))
  private val builder = new DefaultAsyncHttpClientConfig.Builder()
  private val wsClient = new play.api.libs.ws.ning.NingWSClient(builder.build)
}
| |
/**
 * Ranking (top-K) counter storage backed by an s2graph HTTP endpoint (v2).
 *
 * Each ranking key is fanned out over `BUCKET_SHARD_COUNT` shard vertices named
 * "&lt;shardIdx&gt;.&lt;q&gt;.&lt;ts&gt;.&lt;dimension&gt;" in the "s2counter" service, and each ranked
 * item is stored as an edge from its shard vertex carrying a "score" property on
 * a per-counter label named "&lt;action&gt;_topK".
 *
 * NOTE(review): every public operation blocks on the shared WS client via
 * `Await.result(..., 10 seconds)` — confirm callers tolerate blocking and that a
 * slow s2graph endpoint cannot starve the caller's thread pool.
 */
class RankingStorageGraph(config: Config) extends RankingStorage {
  import RankingStorageGraph._

  private[counter] val log = LoggerFactory.getLogger(this.getClass)
  private val s2config = new S2CounterConfig(config)

  // Number of shard vertices each ranking key is spread over. 53 is presumably
  // a prime chosen to spread MurmurHash3 buckets evenly — TODO confirm; changing
  // it would orphan existing shard edges.
  private val BUCKET_SHARD_COUNT = 53
  // Fixed s2graph service/column hosting the shard (bucket) vertices.
  private val SERVICE_NAME = "s2counter"
  private val BUCKET_COLUMN_NAME = "bucket"
  private val counterModel = new CounterModel(config)
  // Suffix appended to a counter's action name to form its top-K label name.
  private val labelPostfix = "_topK"

  // Mutation endpoint vs. read-only (query) endpoint.
  val s2graphUrl = s2config.GRAPH_URL
  val s2graphReadOnlyUrl = s2config.GRAPH_READONLY_URL

  // Caches the per-bucket "dimension bucket edges already exist" check
  // (max 10000 entries, 600s TTL) so update() doesn't hit checkEdges every time.
  val prepareCache = new CollectionCache[Option[Boolean]](CollectionCacheConfig(10000, 600))
  val graphOp = new GraphOperation(config)
  // NOTE(review): uses the global ExecutionContext for all Future composition;
  // consider an injected EC if this runs inside a server with its own pools.
  import scala.concurrent.ExecutionContext.Implicits.global

  /** Builds the un-sharded bucket key "&lt;q&gt;.&lt;ts&gt;.&lt;dimension&gt;" for a ranking key. */
  private def makeBucketKey(rankingKey: RankingKey): String = {
    val eq = rankingKey.eq
    val tq = eq.tq
    s"${tq.q}.${tq.ts}.${eq.dimension}"
  }

  // "", "age.32", "age.gender.32.M"
  /** Prefixes the bucket key with a shard index: "&lt;shardIdx&gt;.&lt;q&gt;.&lt;ts&gt;.&lt;dimension&gt;". */
  private def makeBucketShardKey(shardIdx: Int, rankingKey: RankingKey): String = {
    s"$shardIdx.${makeBucketKey(rankingKey)}"
  }

  /**
   * indexProps: ["time_unit", "time_value", "score"]
   *
   * Fetches the top-k ranking for a single key; None only if the batched
   * variant returns an empty sequence.
   */
  override def getTopK(key: RankingKey, k: Int): Option[RankingResult] = {
    getTopK(Seq(key), k).headOption.map(_._2)
  }

  /**
   * Fetches the top-k ranking for each key by querying all shard edges and
   * taking the first k (item, score) pairs per key.
   *
   * NOTE(review): the RankingResult total score is hard-coded to 0d here, and
   * this blocks up to 10 seconds on the combined queries.
   */
  override def getTopK(keys: Seq[RankingKey], k: Int): Seq[(RankingKey, RankingResult)] = {
    val futures = for {
      key <- keys
    } yield {
      getEdges(key).map { edges =>
        key -> RankingResult(0d, toWithScoreLs(edges).take(k))
      }
    }

    Await.result(Future.sequence(futures), 10 seconds)
  }

  /** Single-key convenience wrapper around the batched update. */
  override def update(key: RankingKey, value: RankingValueMap, k: Int): Unit = {
    update(Seq((key, value)), k)
  }

  /**
   * Merges each key's incoming scores into its stored ranking, keeps the top k,
   * inserts the changed items' edges, then deletes stale edges.
   *
   * Flow per key: read all existing edges (duplicate="raw", i.e. including
   * duplicates) -> merge with incoming scores -> insertBulk new/updated edges ->
   * delete superseded and cut-off edges. Blocks up to 10 seconds for the batch.
   */
  override def update(values: Seq[(RankingKey, RankingValueMap)], k: Int): Unit = {
    val futures = {
      for {
        (key, value) <- values
      } yield {
        // prepare dimension bucket edge
        // NOTE(review): this call itself blocks (it Awaits internally), so the
        // preparation for every key happens synchronously before its update future.
        checkAndPrepareDimensionBucket(key)

        val future = getEdges(key, "raw").flatMap { edges =>
          val prevRankingSeq = toWithScoreLs(edges)
          // Collapse duplicate edges per item, keeping the highest score.
          val prevRankingMap: Map[String, Double] = prevRankingSeq.groupBy(_._1).map(_._2.sortBy(-_._2).head)
          // NOTE(review): mapValues is lazy in Scala 2.12 — re-evaluated on each
          // access; harmless here but worth knowing.
          val currentRankingMap: Map[String, Double] = value.mapValues(_.score)
          // Incoming scores override stored ones; keep only the top k overall.
          val mergedRankingSeq = (prevRankingMap ++ currentRankingMap).toSeq.sortBy(-_._2).take(k)
          val mergedRankingMap = mergedRankingSeq.toMap

          // Re-shard the surviving ranking, but only (re)insert items that were
          // part of this batch — unchanged survivors keep their existing edges.
          val bucketRankingSeq = mergedRankingSeq.groupBy { case (itemId, score) =>
            // 0-index
            GraphUtil.transformHash(MurmurHash3.stringHash(itemId)) % BUCKET_SHARD_COUNT
          }.map { case (shardIdx, groupedRanking) =>
            shardIdx -> groupedRanking.filter { case (itemId, _) => currentRankingMap.contains(itemId) }
          }.toSeq

          // Insert first, then delete, so a failure between the two leaves
          // duplicates (cleaned up by "first"-duplicate reads) rather than gaps.
          insertBulk(key, bucketRankingSeq).flatMap { _ =>
            // Old edges for items just re-inserted with a new score...
            val duplicatedItems = prevRankingMap.filterKeys(s => currentRankingMap.contains(s))
            // ...plus edges for items that fell out of the top k.
            val cutoffItems = prevRankingMap.filterKeys(s => !mergedRankingMap.contains(s))
            val deleteItems = duplicatedItems ++ cutoffItems

            // Pair each raw edge JSON with its item id so we can delete the
            // exact edge objects returned by getEdges.
            val keyWithEdgesLs = prevRankingSeq.map(_._1).zip(edges)
            val deleteEdges = keyWithEdgesLs.filter{ case (s, _) => deleteItems.contains(s) }.map(_._2)

            deleteAll(deleteEdges)
          }
        }

        future
      }
    }

    Await.result(Future.sequence(futures), 10 seconds)
  }

  /**
   * Extracts (itemId, score) pairs from getEdges result JSON.
   * "to" may be a JSON string or any other JSON value (rendered via toString);
   * "score" is parsed from its JSON rendering into a Double.
   */
  private def toWithScoreLs(edges: List[JsValue]): List[(String, Double)] = {
    for {
      edgeJson <- edges
      to = (edgeJson \ "to").as[JsValue]
      score = (edgeJson \ "score").as[JsValue].toString().toDouble
    } yield {
      val toValue = to match {
        case s: JsString => s.as[String]
        case _ => to.toString()
      }
      toValue -> score
    }
  }

  /**
   * POSTs one insertBulk payload containing an edge per (shard, item, score).
   * Fails the future with RuntimeException on any non-200 response.
   *
   * NOTE(review): `findById(...).get` throws if the policy is missing — an
   * unknown policyId surfaces as a NoSuchElementException before any HTTP call.
   */
  private def insertBulk(key: RankingKey, newRankingSeq: Seq[(Int, Seq[(String, Double)])]): Future[Boolean] = {
    val labelName = counterModel.findById(key.policyId).get.action + labelPostfix
    // One shared timestamp for the whole bulk so the batch is internally consistent.
    val timestamp: Long = System.currentTimeMillis
    val payload = Json.toJson {
      for {
        (shardIdx, rankingSeq) <- newRankingSeq
        (itemId, score) <- rankingSeq
      } yield {
        val srcId = makeBucketShardKey(shardIdx, key)
        Json.obj(
          "timestamp" -> timestamp,
          "from" -> srcId,
          "to" -> itemId,
          "label" -> labelName,
          "props" -> Json.obj(
            "time_unit" -> key.eq.tq.q.toString,
            "time_value" -> key.eq.tq.ts,
            "date_time" -> key.eq.tq.dateTime,
            "score" -> score
          )
        )
      }
    }

    wsClient.url(s"$s2graphUrl/graphs/edges/insertBulk").post(payload).map { resp =>
      resp.status match {
        case HttpStatus.SC_OK =>
          true
        case _ =>
          throw new RuntimeException(s"failed insertBulk. errCode: ${resp.status}, body: ${resp.body}, query: $payload")
      }
    }
  }

  /**
   * Deletes the given edge JSONs in batches of 50 via /graphs/edges/delete.
   * Unlike insertBulk, a failed batch is logged and mapped to false rather than
   * failing the future; the result is true only if every batch succeeded.
   */
  private def deleteAll(edges: List[JsValue]): Future[Boolean] = {
    // /graphs/edges/delete
    val futures = {
      for {
        groupedEdges <- edges.grouped(50)
      } yield {
        val payload = Json.toJson(groupedEdges)
        wsClient.url(s"$s2graphUrl/graphs/edges/delete").post(payload).map { resp =>
          resp.status match {
            case HttpStatus.SC_OK =>
              true
            case _ =>
              log.error(s"failed delete. errCode: ${resp.status}, body: ${resp.body}, query: $payload")
              false
          }
        }
      }
    }.toSeq

    Future.sequence(futures).map { seq =>
      seq.forall(x => x)
    }
  }

  /** select and delete */
  // Removes every edge of the key's ranking; blocks up to 10 seconds.
  override def delete(key: RankingKey): Unit = {
    val future = getEdges(key).flatMap { edges =>
      deleteAll(edges)
    }
    Await.result(future, 10 second)
  }

  /**
   * Queries all shard vertices of the key for its top-K edges via getEdges.
   *
   * @param duplicate s2graph duplicate policy: "first" (default) collapses
   *                  duplicate edges, "raw" returns them all (used by update so
   *                  stale duplicates can be found and deleted).
   * The interval pins time_unit/time_value to exactly this key's time bucket;
   * limit -1 means no limit. Fails the future on any non-200 response.
   */
  private def getEdges(key: RankingKey, duplicate: String="first"): Future[List[JsValue]] = {
    val labelName = counterModel.findById(key.policyId).get.action + labelPostfix

    // Quoted, comma-joined vertex ids for all shards of this key.
    val ids = {
      (0 until BUCKET_SHARD_COUNT).map { shardIdx =>
        s""""${makeBucketShardKey(shardIdx, key)}""""
      }
    }.mkString(",")

    val strJs =
      s"""
         |{
         |    "srcVertices": [
         |        {
         |            "serviceName": "$SERVICE_NAME",
         |            "columnName": "$BUCKET_COLUMN_NAME",
         |            "ids": [$ids]
         |        }
         |    ],
         |    "steps": [
         |        {
         |            "step": [
         |                {
         |                    "label": "$labelName",
         |                    "duplicate": "$duplicate",
         |                    "direction": "out",
         |                    "offset": 0,
         |                    "limit": -1,
         |                    "interval": {
         |                        "from": {"time_unit": "${key.eq.tq.q.toString}", "time_value": ${key.eq.tq.ts}},
         |                        "to": {"time_unit": "${key.eq.tq.q.toString}", "time_value": ${key.eq.tq.ts}}
         |                    },
         |                    "scoring": {"score": 1}
         |                }
         |            ]
         |        }
         |    ]
         |}
       """.stripMargin
    log.debug(strJs)

    val payload = Json.parse(strJs)
    wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(payload).map { resp =>
      resp.status match {
        case HttpStatus.SC_OK =>
          (resp.json \ "results").asOpt[List[JsValue]].getOrElse(Nil)
        case _ =>
          throw new RuntimeException(s"failed getEdges. errCode: ${resp.status}, body: ${resp.body}, query: $payload")
      }
    }
  }

  /** True if this policy's "&lt;action&gt;_topK" label exists in the graph schema. */
  private def existsLabel(policy: Counter, useCache: Boolean = true): Boolean = {
    val action = policy.action
    val counterLabelName = action + labelPostfix

    Label.findByName(counterLabelName, useCache).nonEmpty
  }

  /**
   * Ensures the dimension -> shard-vertex edges exist for this ranking key,
   * creating all BUCKET_SHARD_COUNT of them if absent.
   *
   * Uses checkEdges on only the LAST shard edge as a sentinel for "all shards
   * exist" (they are always inserted together below). The result is memoized in
   * prepareCache; HTTP failures are recovered to None, which this method maps
   * to false (presumably so a later call retries — confirm withCache does not
   * cache the None).
   */
  private def checkAndPrepareDimensionBucket(rankingKey: RankingKey): Boolean = {
    val dimension = rankingKey.eq.dimension
    val bucketKey = makeBucketKey(rankingKey)
    // Fixed label linking a dimension vertex to its bucket shard vertices.
    val labelName = "s2counter_topK_bucket"

    val prepared = prepareCache.withCache(s"$dimension:$bucketKey") {
      val checkReqJs = Json.arr(
        Json.obj(
          "label" -> labelName,
          "direction" -> "out",
          "from" -> dimension,
          "to" -> makeBucketShardKey(BUCKET_SHARD_COUNT - 1, rankingKey)
        )
      )

      val future = wsClient.url(s"$s2graphReadOnlyUrl/graphs/checkEdges").post(checkReqJs).map { resp =>
        resp.status match {
          case HttpStatus.SC_OK =>
            val checkRespJs = resp.json
            // Non-empty result means the sentinel edge (and hence the bucket) exists.
            if (checkRespJs.as[Seq[JsValue]].nonEmpty) {
              true
            } else {
              false
            }
          case _ =>
            // throw exception
            throw new Exception(s"failed checkEdges. ${resp.body} ${resp.status}")
        }
      }.flatMap {
        case true => Future.successful(Some(true))
        case false =>
          // Bucket missing: insert one dimension->shard edge per shard index.
          val insertReqJsLs = {
            for {
              i <- 0 until BUCKET_SHARD_COUNT
            } yield {
              Json.obj(
                "timestamp" -> rankingKey.eq.tq.ts,
                "from" -> dimension,
                "to" -> makeBucketShardKey(i, rankingKey),
                "label" -> labelName,
                "props" -> Json.obj(
                  "time_unit" -> rankingKey.eq.tq.q.toString,
                  "date_time" -> rankingKey.eq.tq.dateTime
                )
              )
            }
          }
          wsClient.url(s"$s2graphUrl/graphs/edges/insert").post(Json.toJson(insertReqJsLs)).map { resp =>
            resp.status match {
              case HttpStatus.SC_OK =>
                Some(true)
              case _ =>
                // throw exception
                throw new Exception(s"failed insertEdges. ${resp.body} ${resp.status}")
            }
          }
      }.recover {
        // Swallow HTTP/parse failures into None so preparation is best-effort.
        case e: Exception =>
          None
      }

      Await.result(future, 10 second)
    }
    prepared.getOrElse(false)
  }

  /**
   * Creates the counter's "&lt;action&gt;_topK" label (schema) if it does not exist.
   *
   * Requires the underlying action label (or, for rate counters, the rate
   * policy's action label) to already exist — its target column type is copied
   * into the new top-K label. Optionally applies an hTableTTL derived from the
   * policy's dailyTtl (days -> seconds).
   *
   * @throws Exception if the source label cannot be found.
   */
  override def prepare(policy: Counter): Unit = {
    val service = policy.service
    val action = policy.action

    val graphLabel = {
      policy.rateActionId match {
        case Some(rateId) =>
          // Rate counters rank by another policy's action label.
          counterModel.findById(rateId, useCache = false).flatMap { ratePolicy =>
            Label.findByName(ratePolicy.action)
          }
        case None =>
          Label.findByName(action)
      }
    }
    if (graphLabel.isEmpty) {
      throw new Exception(s"label not found. $service.$action")
    }

    if (!existsLabel(policy, useCache = false)) {
      // find input label to specify target column
      val inputLabelName = policy.rateActionId.flatMap { id =>
        counterModel.findById(id, useCache = false).map(_.action)
      }.getOrElse(action)
      val label = graphLabel.get

      val counterLabelName = action + labelPostfix
      // Label schema: bucket(string) -> target column of the source label,
      // indexed by [time_unit, time_value, score] for top-K range scans.
      val defaultJson =
        s"""
           |{
           |  "label": "$counterLabelName",
           |  "srcServiceName": "$SERVICE_NAME",
           |  "srcColumnName": "$BUCKET_COLUMN_NAME",
           |  "srcColumnType": "string",
           |  "tgtServiceName": "$service",
           |  "tgtColumnName": "${label.tgtColumnName}",
           |  "tgtColumnType": "${label.tgtColumnType}",
           |  "indices": [
           |    {"name": "time", "propNames": ["time_unit", "time_value", "score"]}
           |  ],
           |  "props": [
           |    {"name": "time_unit", "dataType": "string", "defaultValue": ""},
           |    {"name": "time_value", "dataType": "long", "defaultValue": 0},
           |    {"name": "date_time", "dataType": "long", "defaultValue": 0},
           |    {"name": "score", "dataType": "float", "defaultValue": 0.0}
           |  ],
           |  "hTableName": "${policy.hbaseTable.get}"
           |}
         """.stripMargin
      val json = policy.dailyTtl.map(ttl => ttl * 24 * 60 * 60) match {
        case Some(ttl) =>
          Json.parse(defaultJson).as[JsObject] + ("hTableTTL" -> Json.toJson(ttl))
        case None =>
          Json.parse(defaultJson)
      }

      graphOp.createLabel(json)
    }
  }

  /** Drops the counter's top-K label if it exists. */
  override def destroy(policy: Counter): Unit = {
    val action = policy.action

    if (existsLabel(policy, useCache = false)) {
      val counterLabelName = action + labelPostfix

      graphOp.deleteLabel(counterLabelName)
    }
  }

  /** True if this counter's top-K label already exists (cached lookup). */
  override def ready(policy: Counter): Boolean = {
    existsLabel(policy)
  }
}