blob: 7ece8c21b629832a2299c755bfbb5ed26f5f8ad3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.core
import java.util
import java.util.function.{BiConsumer, Consumer}
import org.apache.s2graph.core.S2Vertex.Props
import org.apache.s2graph.core.schema.{ColumnMeta, Label, Service, ServiceColumn}
import org.apache.s2graph.core.types.{InnerValLike, VertexId}
import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality
import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, T, Vertex, VertexProperty}
import play.api.libs.json.Json
import scala.collection.JavaConverters._
import scala.concurrent.Await
trait S2VertexLike extends Vertex with GraphElement {
val graph: S2GraphLike
val id: VertexId
val ts: Long
val props: Props
val op: Byte
val belongLabelIds: Seq[Int]
val innerId = id.innerId
val innerIdVal = innerId.value
val builder = new S2VertexBuilder(this)
def label(): String = serviceColumn.columnName
def schemaVer = serviceColumn.schemaVersion
def serviceColumn = ServiceColumn.findById(id.colId)
def columnName = serviceColumn.columnName
lazy val service = Service.findById(serviceColumn.serviceId)
lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.hTableName)
def defaultProps: util.HashMap[String, S2VertexProperty[_]] = builder.defaultProps
def toLogString(): String = {
val propsWithName = PostProcess.s2VertexPropsJsonString(this)
val (serviceName, columnName) =
if (!id.storeColId) ("", "")
else (serviceColumn.service.serviceName, serviceColumn.columnName)
Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName, Json.toJson(propsWithName)).mkString("\t")
}
def vertices(direction: Direction, edgeLabels: String*): util.Iterator[Vertex] = {
val arr = new util.ArrayList[Vertex]()
edges(direction, edgeLabels: _*).forEachRemaining(new Consumer[Edge] {
override def accept(edge: Edge): Unit = {
val s2Edge = edge.asInstanceOf[S2EdgeLike]
s2Edge.getDirection() match {
case "out" => arr.add(edge.inVertex())
case "in" => arr.add(edge.outVertex())
case _ => throw new IllegalStateException("only out/in direction can be found in S2Edge")
}
}
})
arr.iterator()
}
def edges(direction: Direction, labelNames: String*): util.Iterator[Edge] = {
val labelNameWithDirs =
if (labelNames.isEmpty) {
// TODO: Let's clarify direction
if (direction == Direction.BOTH) {
Label.findBySrcColumnId(id.colId).map(l => l.label -> Direction.OUT.name) ++
Label.findByTgtColumnId(id.colId).map(l => l.label -> Direction.IN.name)
} else if (direction == Direction.IN) {
Label.findByTgtColumnId(id.colId).map(l => l.label -> direction.name)
} else {
Label.findBySrcColumnId(id.colId).map(l => l.label -> direction.name)
}
} else {
direction match {
case Direction.BOTH =>
Seq(Direction.OUT, Direction.IN).flatMap { dir => labelNames.map(_ -> dir.name()) }
case _ => labelNames.map(_ -> direction.name())
}
}
graph.fetchEdges(this, labelNameWithDirs.distinct)
}
// do no save to storage
def propertyInner[V](cardinality: Cardinality, key: String, value: V, objects: AnyRef*): VertexProperty[V] = {
S2Property.assertValidProp(key, value)
cardinality match {
case Cardinality.single =>
val columnMeta = serviceColumn.metasInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Vertex."))
val newProps = new S2VertexProperty[V](this, columnMeta, key, value)
props.put(key, newProps)
newProps
case _ => throw new RuntimeException("only single cardinality is supported.")
}
}
def property[V](cardinality: Cardinality, key: String, value: V, objects: AnyRef*): VertexProperty[V] = {
S2Property.assertValidProp(key, value)
cardinality match {
case Cardinality.single =>
val columnMeta = serviceColumn.metasInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Vertex."))
val newProps = new S2VertexProperty[V](this, columnMeta, key, value)
props.put(key, newProps)
// FIXME: save to persistent for tp test
// graph.addVertexInner(this)
graph.addVertex(id, ts, props, op, belongLabelIds)
newProps
case _ => throw new RuntimeException("only single cardinality is supported.")
}
}
def addEdge(labelName: String, vertex: Vertex, kvs: AnyRef*): Edge = {
graph.addEdge(this, labelName, vertex, kvs: _*)
}
def property[V](key: String): VertexProperty[V] = {
if (props.containsKey(key)) {
props.get(key).asInstanceOf[S2VertexProperty[V]]
} else {
VertexProperty.empty()
}
}
def properties[V](keys: String*): util.Iterator[VertexProperty[V]] = {
val ls = new util.ArrayList[VertexProperty[V]]()
if (keys.isEmpty) {
props.forEach(new BiConsumer[String, VertexProperty[_]] {
override def accept(key: String, property: VertexProperty[_]): Unit = {
if (!ColumnMeta.reservedMetaNamesSet(key) && property.isPresent && key != T.id.name)
ls.add(property.asInstanceOf[VertexProperty[V]])
}
})
} else {
keys.foreach { key =>
val prop = property[V](key)
if (prop.isPresent) ls.add(prop)
}
}
ls.iterator
}
def remove(): Unit = {
if (graph.features().vertex().supportsRemoveVertices()) {
// remove edge
// TODO: remove related edges also.
implicit val ec = graph.ec
val verticesToDelete = Seq(builder.copyVertex(op = GraphUtil.operations("delete")))
val vertexFuture = graph.mutateVertices(verticesToDelete, withWait = true)
val future = for {
vertexSuccess <- vertexFuture
edges <- graph.edgesAsync(this, Direction.BOTH)
} yield {
edges.asScala.toSeq.foreach { edge => edge.remove() }
if (!vertexSuccess.forall(_.isSuccess)) throw new RuntimeException("Vertex.remove vertex delete failed.")
true
}
Await.result(future, graph.WaitTimeout)
} else {
throw Vertex.Exceptions.vertexRemovalNotSupported()
}
}
def propertyValue(key: String): Option[InnerValLike] =
S2VertexPropertyHelper.propertyValue(this, key)
def propertyValueInner(columnMeta: ColumnMeta): InnerValLike =
S2VertexPropertyHelper.propertyValueInner(this, columnMeta)
def propertyValues(keys: Seq[String] = Nil): Map[ColumnMeta, InnerValLike] = {
S2VertexPropertyHelper.propertyValuesInner(this, S2VertexPropertyHelper.toColumnMetas(this, keys))
}
def propertyValuesInner(columnMetas: Seq[ColumnMeta] = Nil): Map[ColumnMeta, InnerValLike] = {
S2VertexPropertyHelper.propertyValuesInner(this, columnMetas)
}
}