package org.apache.s2graph.core
import java.util
import java.util.function.{BiConsumer, Consumer}
import org.apache.s2graph.core.S2Vertex.Props
import org.apache.s2graph.core.schema.{ColumnMeta, Label, Service, ServiceColumn}
import org.apache.s2graph.core.types.{InnerValLike, VertexId}
import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality
import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, T, Vertex, VertexProperty}
import play.api.libs.json.Json
import scala.collection.JavaConverters._
import scala.concurrent.Await
trait S2VertexLike extends Vertex with GraphElement {
// ---- State supplied by the concrete vertex implementation ----
// Graph this vertex delegates edge fetches and mutations to (used by edges, property, addEdge, remove).
val graph: S2GraphLike
// Identifier of this vertex; its innerId, colId and storeColId members are read in this trait.
val id: VertexId
// Emitted as the first field of toLogString and forwarded to graph.addVertex.
// Presumably the vertex timestamp -- TODO confirm unit against callers.
val ts: Long
// Property container keyed by property name; written by propertyInner/property and
// read by property(key)/properties.
val props: Props
// Operation code; toLogString renders it through GraphUtil.fromOp.
val op: Byte
// Forwarded unchanged to graph.addVertex; not otherwise read in this trait.
val belongLabelIds: Seq[Int]
// ---- Values derived from the state above ----
// NOTE(review): these are strict vals, so they are evaluated while the trait initializes and
// read the abstract `id` at that point. This looks like it relies on implementations providing
// `id` as a constructor parameter (initialized before the trait body) -- confirm.
val innerId = id.innerId
val innerIdVal = innerId.value
// Builder bound to this vertex; used for defaultProps and for copyVertex in remove().
// NOTE(review): `this` is handed out during initialization -- confirm S2VertexBuilder does not
// read vertex state in its constructor.
val builder = new S2VertexBuilder(this)
/** Label of this vertex: the column name of its service column. */
def label(): String = serviceColumn.columnName
/** Schema version taken from the service column. */
def schemaVer = serviceColumn.schemaVersion
/**
 * Service column resolved from `id.colId` via `ServiceColumn.findById`.
 * Declared as a `def`, so the lookup is performed on every call.
 */
def serviceColumn = ServiceColumn.findById(id.colId)
/** Column name of the service column (same value that `label()` returns). */
def columnName = serviceColumn.columnName
/** Service owning the service column; resolved once, on first access. */
lazy val service = Service.findById(serviceColumn.serviceId)
/** The service's `cluster` and `hTableName`, exposed under HBase-oriented names. */
lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.hTableName)
/** Default properties as produced by `builder.defaultProps`. */
def defaultProps: util.HashMap[String, S2VertexProperty[_]] = builder.defaultProps
/**
 * Renders this vertex as a single tab-separated log line.
 *
 * Fields, in order: `ts`, the operation rendered by `GraphUtil.fromOp(op)`, the literal `"v"`,
 * `id.innerId`, service name, column name, and the properties serialized with `Json.toJson`.
 *
 * @return the fields joined with a tab character
 */
def toLogString(): String = {
val propsWithName = PostProcess.s2VertexPropsJsonString(this)
// Both names are left empty when the id does not store its column id.
// Note: the local `columnName` shadows the trait member of the same name inside this method.
val (serviceName, columnName) =
if (!id.storeColId) ("", "")
else (serviceColumn.service.serviceName, serviceColumn.columnName)
Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName, Json.toJson(propsWithName)).mkString("\t")
/**
 * Collects the vertices found at the other end of this vertex's edges.
 *
 * Every edge returned by `edges(direction, edgeLabels)` is inspected and one of its
 * endpoints is appended to a list, chosen by the edge's own stored direction.
 *
 * NOTE(review): the closing delimiters and the expression that turns `arr` into the declared
 * `util.Iterator[Vertex]` result are not present in the visible text -- confirm against VCS.
 *
 * @param direction  direction passed through to `edges`
 * @param edgeLabels label names passed through to `edges`
 * @throws IllegalStateException when an edge reports a direction other than "out" or "in"
 */
def vertices(direction: Direction, edgeLabels: String*): util.Iterator[Vertex] = {
val arr = new util.ArrayList[Vertex]()
edges(direction, edgeLabels: _*).forEachRemaining(new Consumer[Edge] {
override def accept(edge: Edge): Unit = {
// Every edge is assumed to be an S2EdgeLike; any other Edge implementation fails this cast.
val s2Edge = edge.asInstanceOf[S2EdgeLike]
s2Edge.getDirection() match {
// An "out" edge contributes its in-vertex, an "in" edge its out-vertex.
case "out" => arr.add(edge.inVertex())
case "in" => arr.add(edge.outVertex())
case _ => throw new IllegalStateException("only out/in direction can be found in S2Edge")
/**
 * Fetches the edges attached to this vertex through `graph.fetchEdges`.
 *
 * A sequence of label-name pairs is built first and de-duplicated with `distinct`
 * before being handed to the graph.
 *
 *  - No label names given: labels are discovered from the schema by this vertex's column id.
 *    `BOTH` concatenates the source-column and target-column lookups, `IN` uses the
 *    target-column lookup, anything else uses the source-column lookup.
 *  - Label names given: the pairs are derived from `direction`, with `BOTH` expanded over
 *    `Direction.OUT` and `Direction.IN`.
 *
 * NOTE(review): the right-hand side of every `->` below, the bodies of the `flatMap` and of
 * `case _`, and all closing delimiters are missing from the visible text. The variable name
 * suggests each label is paired with a direction -- confirm against VCS before editing.
 *
 * @param direction  which side of the edge this vertex should be on
 * @param labelNames label names to restrict the fetch to; empty means schema discovery
 */
def edges(direction: Direction, labelNames: String*): util.Iterator[Edge] = {
val labelNameWithDirs =
if (labelNames.isEmpty) {
// TODO: Let's clarify direction
if (direction == Direction.BOTH) {
Label.findBySrcColumnId(id.colId).map(l => l.label -> ++
Label.findByTgtColumnId(id.colId).map(l => l.label ->
} else if (direction == Direction.IN) {
Label.findByTgtColumnId(id.colId).map(l => l.label ->
} else {
Label.findBySrcColumnId(id.colId).map(l => l.label ->
} else {
direction match {
case Direction.BOTH =>
Seq(Direction.OUT, Direction.IN).flatMap { dir => -> }
case _ => ->
graph.fetchEdges(this, labelNameWithDirs.distinct)
// Does not save to storage.
/**
 * Sets a property on this vertex in memory only: the new value is put into `props`
 * and no call to `graph` is made.
 *
 * NOTE(review): the expression producing the declared `VertexProperty[V]` result and the
 * closing delimiters are not present in the visible text -- confirm against VCS.
 *
 * @param cardinality must be `Cardinality.single`
 * @param key         property name; must be present in `serviceColumn.metasInvMap`
 * @param value       property value; checked together with `key` by `S2Property.assertValidProp`
 * @param objects     not read by the visible code
 * @throws RuntimeException when `key` is not configured on the vertex, or when the
 *                          cardinality is anything other than single
 */
def propertyInner[V](cardinality: Cardinality, key: String, value: V, objects: AnyRef*): VertexProperty[V] = {
S2Property.assertValidProp(key, value)
cardinality match {
case Cardinality.single =>
val columnMeta = serviceColumn.metasInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Vertex."))
val newProps = new S2VertexProperty[V](this, columnMeta, key, value)
props.put(key, newProps)
case _ => throw new RuntimeException("only single cardinality is supported.")
/**
 * Sets a property on this vertex and then passes the vertex's state to `graph.addVertex`.
 *
 * The first steps mirror `propertyInner` (validation, column-meta lookup, `props.put`);
 * the difference is the additional `graph.addVertex(id, ts, props, op, belongLabelIds)` call.
 *
 * NOTE(review): the expression producing the declared `VertexProperty[V]` result and the
 * closing delimiters are not present in the visible text -- confirm against VCS.
 *
 * @param cardinality must be `Cardinality.single`
 * @param key         property name; must be present in `serviceColumn.metasInvMap`
 * @param value       property value; checked together with `key` by `S2Property.assertValidProp`
 * @param objects     not read by the visible code
 * @throws RuntimeException when `key` is not configured on the vertex, or when the
 *                          cardinality is anything other than single
 */
def property[V](cardinality: Cardinality, key: String, value: V, objects: AnyRef*): VertexProperty[V] = {
S2Property.assertValidProp(key, value)
cardinality match {
case Cardinality.single =>
val columnMeta = serviceColumn.metasInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Vertex."))
val newProps = new S2VertexProperty[V](this, columnMeta, key, value)
props.put(key, newProps)
// FIXME: save to persistent for tp test
// graph.addVertexInner(this)
graph.addVertex(id, ts, props, op, belongLabelIds)
case _ => throw new RuntimeException("only single cardinality is supported.")
/**
 * Adds an edge starting at this vertex by delegating to `graph.addEdge`.
 *
 * @param labelName label of the new edge
 * @param vertex    the other vertex handed to the graph
 * @param kvs       extra arguments forwarded to the graph unchanged
 * @return the edge returned by `graph.addEdge`
 */
def addEdge(labelName: String, vertex: Vertex, kvs: AnyRef*): Edge = {
graph.addEdge(this, labelName, vertex, kvs: _*)
/**
 * Looks up a single property of this vertex by key.
 *
 * The visible code only shows the decision point: whether `props` contains `key`.
 * NOTE(review): both branch bodies and the closing delimiters are missing from the visible
 * text, so what is returned in either case cannot be stated from here -- restore from VCS.
 *
 * @param key property name to look up
 */
def property[V](key: String): VertexProperty[V] = {
if (props.containsKey(key)) {
} else {
/**
 * Collects properties of this vertex into a list.
 *
 *  - No keys given: every entry of `props` is visited; the filter skips keys contained in
 *    `ColumnMeta.reservedMetaNamesSet` and properties that are not present.
 *  - Keys given: each key is resolved with `property[V](key)` and added when present.
 *
 * NOTE(review): the filter condition on the `key !=` line is cut off, and the body of that
 * `if`, the expression producing the iterator result, and all closing delimiters are missing
 * from the visible text -- confirm against VCS.
 *
 * @param keys property names to return; empty means all non-reserved properties
 */
def properties[V](keys: String*): util.Iterator[VertexProperty[V]] = {
val ls = new util.ArrayList[VertexProperty[V]]()
if (keys.isEmpty) {
props.forEach(new BiConsumer[String, VertexProperty[_]] {
override def accept(key: String, property: VertexProperty[_]): Unit = {
if (!ColumnMeta.reservedMetaNamesSet(key) && property.isPresent && key !=
} else {
keys.foreach { key =>
val prop = property[V](key)
if (prop.isPresent) ls.add(prop)
/**
 * Removes this vertex, blocking the calling thread until the removal completes.
 *
 * When the graph reports support for vertex removal, a copy of this vertex carrying the
 * "delete" operation is sent through `graph.mutateVertices`, the edges in both directions
 * are fetched asynchronously, and each fetched edge is removed. The combined future is
 * awaited for at most `graph.WaitTimeout`.
 *
 * NOTE(review): the right-hand side of `implicit val ec =` and the closing delimiters are
 * missing from the visible text -- confirm against VCS.
 *
 * @throws RuntimeException when any vertex mutation result is not successful
 * @throws Exception the TinkerPop `vertexRemovalNotSupported` exception when the graph does
 *                   not support removing vertices
 */
def remove(): Unit = {
if (graph.features().vertex().supportsRemoveVertices()) {
// remove edge
// TODO: remove related edges also.
implicit val ec =
val verticesToDelete = Seq(builder.copyVertex(op = GraphUtil.operations("delete")))
// mutateVertices is invoked before the for-comprehension, so the vertex delete is already
// requested by the time the edge fetch is issued.
val vertexFuture = graph.mutateVertices(verticesToDelete, withWait = true)
val future = for {
vertexSuccess <- vertexFuture
edges <- graph.edgesAsync(this, Direction.BOTH)
} yield {
// Edges are removed first; the vertex result is only checked afterwards, so edge removal
// is attempted even when the vertex delete reported a failure.
edges.asScala.toSeq.foreach { edge => edge.remove() }
if (!vertexSuccess.forall(_.isSuccess)) throw new RuntimeException("Vertex.remove vertex delete failed.")
Await.result(future, graph.WaitTimeout)
} else {
throw Vertex.Exceptions.vertexRemovalNotSupported()
/**
 * Looks up the inner value stored on this vertex for a property key.
 *
 * Delegates to `S2VertexPropertyHelper.propertyValue`, passing this vertex and `key`.
 *
 * @param key property name to look up
 * @return the helper's result for this vertex and key
 */
def propertyValue(key: String): Option[InnerValLike] = {
val found: Option[InnerValLike] = S2VertexPropertyHelper.propertyValue(this, key)
found
}
/**
 * Resolves the inner value of this vertex for an already-resolved column meta.
 *
 * Delegates to `S2VertexPropertyHelper.propertyValueInner`, passing this vertex and `columnMeta`.
 *
 * @param columnMeta schema entry identifying the property
 * @return the helper's result for this vertex and column meta
 */
def propertyValueInner(columnMeta: ColumnMeta): InnerValLike = {
val resolved: InnerValLike = S2VertexPropertyHelper.propertyValueInner(this, columnMeta)
resolved
}
/**
 * Resolves inner values for a list of property names.
 *
 * The names are first converted to column metas with `S2VertexPropertyHelper.toColumnMetas`
 * and the result is passed to `S2VertexPropertyHelper.propertyValuesInner`.
 *
 * @param keys property names; defaults to `Nil` (how the helper treats an empty list is not
 *             visible here -- TODO confirm)
 * @return a map from column meta to inner value, as produced by the helper
 */
def propertyValues(keys: Seq[String] = Nil): Map[ColumnMeta, InnerValLike] = {
S2VertexPropertyHelper.propertyValuesInner(this, S2VertexPropertyHelper.toColumnMetas(this, keys))
/**
 * Resolves inner values for a list of column metas by delegating to
 * `S2VertexPropertyHelper.propertyValuesInner` with this vertex.
 *
 * @param columnMetas schema entries to resolve; defaults to `Nil` (how the helper treats an
 *                    empty list is not visible here -- TODO confirm)
 * @return a map from column meta to inner value, as produced by the helper
 */
def propertyValuesInner(columnMetas: Seq[ColumnMeta] = Nil): Map[ColumnMeta, InnerValLike] = {
S2VertexPropertyHelper.propertyValuesInner(this, columnMetas)