blob: 27747f6e8e1647d400c980dc61a35bf8bbd4224a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.s2jobs
import org.apache.s2graph.core._
import org.apache.s2graph.core.storage.SKeyValue
import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
import org.apache.s2graph.core.schema.{Label, LabelMeta}
import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
import play.api.libs.json.Json
/**
 * Conversions between graph elements, [[DegreeKey]]s and the storage-level
 * key-values that carry a degree count.
 */
object DegreeKey {
  // Imported locally so the object does not depend on a file-level import.
  import scala.util.control.NonFatal

  /**
   * Derives the degree key of a graph element.
   *
   * Only [[S2Edge]] instances produce a key; any other element yields `None`.
   *
   * @param s2           graph instance; not read by this method, kept so the signature stays stable
   * @param element      element to derive the key from
   * @param labelMapping rename table from the edge's label name to the label name to emit;
   *                     labels without an entry keep their own name
   * @return a key made of the edge's source vertex id (as a string), the possibly renamed
   *         label name and the edge direction, or `None` for non-edge elements
   */
  def fromGraphElement(s2: S2Graph,
                       element: GraphElement,
                       labelMapping: Map[String, String] = Map.empty): Option[DegreeKey] =
    element match {
      case e: S2Edge =>
        val originalLabel = e.innerLabel.label
        val newLabel = labelMapping.getOrElse(originalLabel, originalLabel)
        // The key is never null, so Some is used instead of the null-checking Option(...).
        Some(DegreeKey(e.srcVertex.innerIdVal.toString, newLabel, e.getDirection()))
      case _ => None
    }

  /**
   * Builds an edge that carries `count` as its degree property.
   *
   * The edge uses the same vertex for both endpoints and has two properties,
   * `LabelMeta.timestamp` and `LabelMeta.degree`, both stamped with the current
   * wall-clock time.
   *
   * @param s2        graph instance whose element builder creates the vertex and the edge
   * @param degreeKey vertex id, label name and direction the count belongs to
   * @param count     degree value to store
   * @return the newly built edge
   * @throws RuntimeException if the label cannot be found by name, or if the vertex id
   *                          cannot be converted into an inner value of the column type
   */
  def toEdge(s2: S2Graph, degreeKey: DegreeKey, count: Long): S2EdgeLike = {
    val labelName = degreeKey.labelName
    val direction = degreeKey.direction
    val vertexId = degreeKey.vertexIdStr

    val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB."))
    // NOTE(review): presumably a lookup from direction name to direction code that fails on
    // unknown names -- confirm against GraphUtil.
    val dir = GraphUtil.directions(direction)
    val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse {
      throw new RuntimeException(s"$vertexId can not be converted into innerval")
    }

    // NOTE(review): the id is parsed with the column type of srcColumnWithDir(dir) while the
    // vertex itself is created on label.srcColumn -- confirm this is intended for every direction.
    val vertex = s2.elementBuilder.newVertex(SourceVertexId(label.srcColumn, innerVal))

    val ts = System.currentTimeMillis()
    val propsWithTs = Map(
      LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion),
      LabelMeta.degree -> InnerValLikeWithTs.withLong(count, ts, label.schemaVersion)
    )

    s2.elementBuilder.newEdge(vertex, vertex, label, dir, propsWithTs = propsWithTs)
  }

  /**
   * Serializes the degree edge for `degreeKey` into storage key-values.
   *
   * The edge is built with [[toEdge]] and each of its index edges is run through the
   * index-edge serializer of the storage that belongs to the index edge's label.
   *
   * This is best-effort: any non-fatal failure (for example an unknown label or an
   * unconvertible vertex id) results in an empty sequence instead of an exception.
   * Fatal errors and thread interruption are propagated to the caller.
   *
   * @param s2        graph instance providing the element builder and storage
   * @param degreeKey vertex id, label name and direction the count belongs to
   * @param count     degree value to store
   * @return the serialized key-values, or an empty sequence if the conversion failed
   */
  def toSKeyValue(s2: S2Graph,
                  degreeKey: DegreeKey,
                  count: Long): Seq[SKeyValue] =
    try {
      val edge = toEdge(s2, degreeKey, count)
      edge.edgesWithIndex.flatMap { indexEdge =>
        s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues
      }
    } catch {
      // NonFatal lets InterruptedException, VM errors and control-flow throwables through,
      // so a failed key is skipped without hiding conditions the caller must see.
      case NonFatal(_) => Nil
    }

  /**
   * Same as [[toSKeyValue]], with every result converted into an HBase `KeyValue`
   * built from its row, column family, qualifier, timestamp and value.
   *
   * @param s2        graph instance providing the element builder and storage
   * @param degreeKey vertex id, label name and direction the count belongs to
   * @param count     degree value to store
   * @return the HBase key-values, or an empty sequence if the conversion failed
   */
  def toKeyValue(s2: S2Graph, degreeKey: DegreeKey, count: Long): Seq[HKeyValue] =
    toSKeyValue(s2, degreeKey, count).map { skv =>
      new HKeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, skv.value)
    }
}
/**
 * Identifies a single degree counter.
 *
 * @param vertexIdStr id of the vertex the counter belongs to, in string form
 * @param labelName   name of the label the counted edges belong to
 * @param direction   direction of the counted edges, given by name
 */
case class DegreeKey(
  vertexIdStr: String,
  labelName: String,
  direction: String
)