| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.s2graph.counter.core.v2 |
| |
| import com.typesafe.config.Config |
| import org.apache.http.HttpStatus |
| import org.apache.s2graph.core.mysqls.Label |
| import org.apache.s2graph.counter.config.S2CounterConfig |
| import org.apache.s2graph.counter.core |
| import org.apache.s2graph.counter.core.ExactCounter.ExactValueMap |
| import org.apache.s2graph.counter.core._ |
| import org.apache.s2graph.counter.models.Counter |
| import org.apache.s2graph.counter.util.CartesianProduct |
| import org.slf4j.LoggerFactory |
| import play.api.libs.json._ |
| import scala.concurrent.duration._ |
| import scala.concurrent.{Await, ExecutionContext, Future} |
| |
/**
 * Companion of the [[ExactStorageGraph]] case class: holds the response model,
 * its JSON codec and the HTTP client used for all graph requests.
 */
object ExactStorageGraph {
  /** One element of the JSON array returned by the graph `incrementCount` endpoint. */
  case class RespGraph(success: Boolean, result: Long)

  // Implicit definitions get an explicit type so that implicit resolution does not
  // depend on the inferred type of the macro expansion.
  implicit val respGraphFormat: Format[RespGraph] = Json.format[RespGraph]

  // using play-ws without play app
  private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder()
  // NOTE(review): this client is created once and never closed in this file.
  private val wsClient = new play.api.libs.ws.ning.NingWSClient(builder.build)
}
| |
/**
 * [[ExactStorage]] implementation that reads and writes exact counter values
 * through the HTTP endpoints of an s2graph server.
 *
 * @param config configuration used to build the [[S2CounterConfig]] and the [[GraphOperation]]
 */
case class ExactStorageGraph(config: Config) extends ExactStorage {
  // Brings RespGraph, its JSON format and the shared WS client into scope.
  import ExactStorageGraph._

  private val log = LoggerFactory.getLogger(getClass)
  private val s2config = new S2CounterConfig(config)

  // Target service / column written into the counter label definition.
  private val SERVICE_NAME = "s2counter"
  private val COLUMN_NAME = "bucket"
  // A counter label is named "<policy.action>" followed by this suffix.
  private val labelPostfix = "_counts"

  // Base URL for write requests and base URL for read-only queries.
  val s2graphUrl = s2config.GRAPH_URL
  val s2graphReadOnlyUrl = s2config.GRAPH_READONLY_URL
  val graphOp = new GraphOperation(config)
| |
/**
 * Sends one `incrementCount` request per `(key, qualifier)` pair to the graph server
 * and returns, for every key, the value reported by the server for each qualifier.
 *
 * The call blocks the current thread for at most 10 seconds while waiting for the reply.
 * Responses are paired with requests by position; if the server returns fewer
 * elements than were sent, the trailing requests are absent from the result.
 *
 * NOTE(review): the `success` flag of each response element is ignored, as in the
 * previous implementation - confirm whether failed increments should be excluded.
 *
 * @param policy counter policy; its action determines the target label
 * @param counts increments to apply, grouped by key
 * @return a strict map from key to (qualifier -> value returned by the server)
 * @throws RuntimeException if the server answers with a status other than 200
 * @throws java.util.concurrent.TimeoutException if no reply arrives within 10 seconds
 */
override def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap] = {
  import scala.concurrent.ExecutionContext.Implicits.global

  val (keyWithEq, reqJsLs) = toIncrementCountRequests(policy, counts)
    .map { case (key, eq, reqJs) => ((key, eq), reqJs) }
    .unzip

  val future = wsClient.url(s"$s2graphUrl/graphs/edges/incrementCount").post(Json.toJson(reqJsLs)).map { resp =>
    resp.status match {
      case HttpStatus.SC_OK =>
        val respSeq = resp.json.as[Seq[RespGraph]]

        keyWithEq.zip(respSeq)
          .map { case ((key, eq), RespGraph(_, result)) => (key, (eq, result)) }
          .groupBy(_._1)
          // `mapValues` would return a lazy, non-serializable view that is
          // recomputed on every access; build a strict map instead.
          .map { case (key, pairs) => key -> pairs.map(_._2).toMap }
      case _ =>
        throw new RuntimeException(s"update failed: $policy $counts")
    }
  }
  Await.result(future, 10.seconds)
}
| |
/** No-op: this storage does nothing for the given keys. */
def delete(policy: Counter, keys: Seq[ExactKeyTrait]): Unit = ()
| |
/**
 * Expands every `(key, values)` entry into one `incrementCount` edge request per qualifier.
 *
 * @param policy counter policy; the request label is `policy.action + labelPostfix`
 * @param counts increments grouped by key
 * @return one `(key, qualifier, request JSON)` triple per qualifier of every key
 */
private def toIncrementCountRequests(policy: Counter,
                                     counts: Seq[(ExactKeyTrait, ExactValueMap)])
: Seq[(ExactKeyTrait, core.ExactQualifier, JsValue)] = {
  val counterLabel = policy.action + labelPostfix
  // A single timestamp is taken once and shared by every request of the batch.
  val now = System.currentTimeMillis()

  counts.flatMap { case (exactKey, values) =>
    values.map { case (eq, value) =>
      val requestJs = Json.obj(
        "timestamp" -> now,
        "operation" -> "incrementCount",
        "from" -> exactKey.itemKey,
        "to" -> eq.dimension,
        "label" -> counterLabel,
        "props" -> Json.obj(
          "_count" -> value,
          "time_unit" -> eq.tq.q.toString,
          "time_value" -> eq.tq.ts
        )
      )
      (exactKey, eq, requestJs)
    }
  }
}
| |
/**
 * Range query: fetches the counts of `items` for every time range and every
 * combination of the requested dimension values, grouped per key by
 * `(time unit, dimension key/values)`.
 *
 * The request is built with the JSON API instead of string interpolation so that
 * dimension values, service names and column names are always escaped correctly.
 *
 * If the counter label does not exist, or the server answers with a status other
 * than 200, a warning is logged and an empty result is returned.
 *
 * @param policy    counter policy; the queried label is `policy.action + labelPostfix`
 * @param items     source vertex ids to query
 * @param timeRange `(from, to)` qualifier pairs; the upper `time_value` sent is `to.ts + 1`
 * @param dimQuery  dimension name -> accepted values; the cartesian product is queried
 * @return one [[FetchedCountsGrouped]] per key found in the response
 */
override def get(policy: Counter,
                 items: Seq[String],
                 timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)],
                 dimQuery: Map[String, Set[String]])
                (implicit ex: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = {
  val labelName = policy.action + labelPostfix

  Label.findByName(labelName) match {
    case None =>
      // Same outcome as getInner: a missing counter label yields no counts
      // instead of a synchronous NoSuchElementException.
      log.warn(s"label not found: $labelName")
      Future.successful(Nil)

    case Some(label) =>
      // Every combination of the requested dimension values.
      val dimensions = {
        for {
          values <- CartesianProduct(dimQuery.values.map(ss => ss.toList).toList)
        } yield {
          dimQuery.keys.zip(values).toMap
        }
      }

      val stepJsLs = {
        for {
          (tqFrom, tqTo) <- timeRange
          dimension <- dimensions
        } yield {
          val eqFrom = core.ExactQualifier(tqFrom, dimension)
          val eqTo = core.ExactQualifier(tqTo, dimension)
          val interval = Json.obj(
            "from" -> Json.obj(
              "_to" -> eqFrom.dimension,
              "time_unit" -> eqFrom.tq.q.toString,
              "time_value" -> eqFrom.tq.ts
            ),
            "to" -> Json.obj(
              "_to" -> eqTo.dimension,
              "time_unit" -> eqTo.tq.q.toString,
              "time_value" -> (eqTo.tq.ts + 1)
            )
          )
          Json.obj(
            "direction" -> "out",
            "limit" -> -1,
            "duplicate" -> "raw",
            "label" -> labelName,
            "interval" -> interval
          )
        }
      }

      val reqJs = Json.obj(
        "srcVertices" -> Json.arr(
          Json.obj(
            "serviceName" -> policy.service,
            "columnName" -> label.srcColumnName,
            "ids" -> Json.toJson(items)
          )
        ),
        "steps" -> Json.arr(Json.obj("step" -> stepJsLs))
      )

      wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(reqJs).map { resp =>
        resp.status match {
          case HttpStatus.SC_OK =>
            (resp.json \ "results").as[Seq[JsValue]]
              .map(result => resultToExactKeyValues(policy, result))
              .groupBy(_._1)
              .toSeq
              .map { case (key, pairs) =>
                // Built strictly; `mapValues` would hand out a lazy view.
                val grouped = pairs.map(_._2).toMap.groupBy { case (eq, _) => (eq.tq.q, eq.dimKeyValues) }
                FetchedCountsGrouped(key, grouped)
              }
          case n: Int =>
            log.warn(s"getEdges status($n): $reqJs")
            Nil
        }
      }
  }
}
| |
/**
 * Converts one element of a `getEdges` response into `(key, (qualifier, count))`.
 *
 * @param policy counter policy used to build the key
 * @param result edge JSON with `from`, `to` and `props` (`_count`, `time_unit`, `time_value`)
 * @throws RuntimeException if `from` is neither a JSON string nor a JSON number
 */
private def resultToExactKeyValues(policy: Counter, result: JsValue): (ExactKeyTrait, (core.ExactQualifier, Long)) = {
  // The source id may be serialized as a string or as a number.
  val itemKey = result \ "from" match {
    case JsString(text) => text
    case number: JsNumber => number.as[Long].toString
    case other: JsValue => throw new RuntimeException(s"$other's type must be string or number")
  }
  val dim = (result \ "to").as[String]
  val edgeProps = result \ "props"
  val counted = (edgeProps \ "_count").as[Long]
  val unit = (edgeProps \ "time_unit").as[String]
  val ts = (edgeProps \ "time_value").as[Long]

  val exactKey = ExactKey(policy, itemKey, checkItemType = true)
  val qualifier = core.ExactQualifier(core.TimedQualifier(unit, ts), dim)
  (exactKey, (qualifier, counted))
}
| |
/**
 * Fetches the counts of a single key for the given exact qualifiers.
 *
 * One query step is sent per qualifier, with `limit` 1 and an interval whose
 * `from` and `to` bounds are identical. If the counter label does not exist
 * the result is empty; if the server answers with a status other than 200 a
 * warning is logged and the result is empty.
 *
 * @param policy counter policy; the queried label is `policy.action + labelPostfix`
 * @param key    key whose `itemKey` is used as the source vertex id
 * @param eqs    qualifiers to look up
 * @return one [[FetchedCounts]] per key found in the response
 */
private def getInner(policy: Counter, key: ExactKeyTrait, eqs: Seq[core.ExactQualifier])
                    (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = {
  val labelName = policy.action + labelPostfix

  Label.findByName(labelName) match {
    case Some(label) =>
      val src = Json.obj("serviceName" -> policy.service, "columnName" -> label.srcColumnName, "id" -> key.itemKey)
      val stepLs = eqs.map { eq =>
        // Both interval bounds are the same object, so it is built only once.
        val bound = Json.obj("_to" -> eq.dimension, "time_unit" -> eq.tq.q.toString, "time_value" -> eq.tq.ts)
        val interval = Json.obj("from" -> bound, "to" -> bound)
        Json.obj("limit" -> 1, "label" -> labelName, "interval" -> interval)
      }
      val step = Json.obj("step" -> stepLs)
      val query = Json.obj("srcVertices" -> Json.arr(src), "steps" -> Json.arr(step))

      wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(query).map { resp =>
        resp.status match {
          case HttpStatus.SC_OK =>
            (resp.json \ "results").as[Seq[JsValue]]
              .map(result => resultToExactKeyValues(policy, result))
              .groupBy(_._1)
              .toSeq
              // Strict map per key; `mapValues` would hand out a lazy view.
              .map { case (fetchedKey, pairs) => FetchedCounts(fetchedKey, pairs.map(_._2).toMap) }
          case status =>
            // Log the failure like the range query does instead of dropping it silently.
            log.warn(s"getEdges status($status): $query")
            Nil
        }
      }
    case None =>
      Future.successful(Nil)
  }
}
| |
/**
 * Exact-qualifier query: runs `getInner` once per `(key, qualifiers)` entry
 * and concatenates the results.
 *
 * @param policy  counter policy
 * @param queries keys with the qualifiers to fetch for each of them
 * @return the fetched counts of all keys, flattened into one sequence
 */
override def get(policy: Counter, queries: Seq[(ExactKeyTrait, Seq[core.ExactQualifier])])
                (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] =
  Future.traverse(queries) { case (key, eqs) =>
    getInner(policy, key, eqs)
  }.map(_.flatten)
| |
/**
 * Blob values are not supported by this storage.
 *
 * @throws RuntimeException always
 */
override def getBlobValue(policy: Counter, blobId: String): Option[String] =
  throw new RuntimeException("unsupported getBlobValue operation")
| |
/**
 * Blob values are not supported by this storage.
 *
 * @throws RuntimeException always
 */
override def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] =
  throw new RuntimeException("unsupported insertBlobValue operation")
| |
/**
 * Tells whether the counter label (`policy.action + labelPostfix`) can be found.
 *
 * @param policy   counter policy
 * @param useCache forwarded to `Label.findByName`
 */
private def existsLabel(policy: Counter, useCache: Boolean = true): Boolean =
  Label.findByName(policy.action + labelPostfix, useCache).isDefined
| |
/**
 * Creates the counter label for `policy` when it does not exist yet.
 *
 * The new label is named `policy.action + labelPostfix`. Its source column is
 * the target column of the original action label, and its target is the
 * `SERVICE_NAME` / `COLUMN_NAME` pair. When `policy.dailyTtl` is defined, it
 * is converted from days to seconds and sent as `hTableTTL`.
 *
 * The label definition is built with the JSON API rather than string
 * interpolation, so every value is escaped correctly.
 *
 * @param policy counter policy describing service, action, HBase table and optional TTL
 * @throws Exception if the action label cannot be found
 * @throws NoSuchElementException if the counter label must be created but
 *                                `policy.hbaseTable` is not defined
 */
override def prepare(policy: Counter): Unit = {
  val service = policy.service
  val action = policy.action

  if (Label.findByName(action).isEmpty) {
    throw new Exception(s"label not found. $service.$action")
  }

  if (!existsLabel(policy, useCache = false)) {
    // The uncached lookup can miss even though the cached one succeeded above;
    // fail with the same descriptive error instead of a bare Option.get.
    val label = Label.findByName(action, useCache = false).getOrElse {
      throw new Exception(s"label not found. $service.$action")
    }
    val hTableName = policy.hbaseTable.getOrElse {
      throw new NoSuchElementException(s"hbaseTable is not defined. $service.$action")
    }

    val counterLabelName = action + labelPostfix
    val defaultJson = Json.obj(
      "label" -> counterLabelName,
      "srcServiceName" -> service,
      "srcColumnName" -> label.tgtColumnName,
      "srcColumnType" -> label.tgtColumnType,
      "tgtServiceName" -> SERVICE_NAME,
      "tgtColumnName" -> COLUMN_NAME,
      "tgtColumnType" -> "string",
      "indices" -> Json.arr(
        Json.obj("name" -> "time", "propNames" -> Json.arr("_to", "time_unit", "time_value"))
      ),
      "props" -> Json.arr(
        Json.obj("name" -> "time_unit", "dataType" -> "string", "defaultValue" -> ""),
        Json.obj("name" -> "time_value", "dataType" -> "long", "defaultValue" -> 0)
      ),
      "hTableName" -> hTableName
    )
    // dailyTtl is expressed in days; hTableTTL is sent in seconds.
    val json = policy.dailyTtl.map(ttl => ttl * 24 * 60 * 60) match {
      case Some(ttl) =>
        defaultJson + ("hTableTTL" -> Json.toJson(ttl))
      case None =>
        defaultJson
    }

    graphOp.createLabel(json)
  }
}
| |
/**
 * Deletes the counter label of `policy` if an uncached lookup finds it.
 *
 * @param policy counter policy whose label (`policy.action + labelPostfix`) is removed
 */
override def destroy(policy: Counter): Unit = {
  val labelPresent = existsLabel(policy, useCache = false)
  if (labelPresent) {
    graphOp.deleteLabel(policy.action + labelPostfix)
  }
}
| |
/** Returns true when the counter label of `policy` can be found (default cache setting). */
override def ready(policy: Counter): Boolean =
  existsLabel(policy)
| |
/**
 * Range query without a dimension filter; not implemented by this storage.
 *
 * @throws NotImplementedError always
 */
override def get(policy: Counter, items: Seq[String], timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)])
                (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] =
  throw new NotImplementedError("Not implemented")
| } |