* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.s2graph.core
import java.lang.{Boolean => JBoolean, Long => JLong}
import java.util
import java.util.Optional
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{CompletableFuture, TimeUnit}
import com.typesafe.config.Config
import org.apache.commons.configuration.Configuration
import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
import org.apache.s2graph.core.features.{S2Features, S2GraphVariables}
import org.apache.s2graph.core.index.IndexProvider
import org.apache.s2graph.core.schema.{Label, LabelMeta, Service, ServiceColumn}
import{MutateResponse, OptimisticEdgeFetcher, Storage}
import org.apache.s2graph.core.types.VertexId
import org.apache.tinkerpop.gremlin.structure
import org.apache.tinkerpop.gremlin.structure.Edge.Exceptions
import org.apache.tinkerpop.gremlin.structure.Graph.Variables
import{GraphReader, GraphWriter, Io, Mapper}
import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Element, Graph, T, Transaction, Vertex}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
trait S2GraphLike extends Graph {
implicit val ec: ExecutionContext
var apacheConfiguration: Configuration
protected val localLongId = new AtomicLong()
protected val s2Features = new S2Features
val config: Config
val management: Management
val indexProvider: IndexProvider
val elementBuilder: GraphElementBuilder
val traversalHelper: TraversalHelper
val resourceManager: ResourceManager
lazy val MaxRetryNum: Int = config.getInt("max.retry.number")
lazy val MaxBackOff: Int = config.getInt("")
lazy val BackoffTimeout: Int = config.getInt("")
lazy val DeleteAllFetchCount: Int = config.getInt("delete.all.fetch.count")
lazy val DeleteAllFetchSize: Int = config.getInt("delete.all.fetch.size")
lazy val FailProb: Double = config.getDouble("")
lazy val LockExpireDuration: Int = config.getInt("lock.expire.time")
lazy val MaxSize: Int = config.getInt("future.cache.max.size")
lazy val ExpireAfterWrite: Int = config.getInt("future.cache.expire.after.write")
lazy val ExpireAfterAccess: Int = config.getInt("future.cache.expire.after.access")
lazy val WaitTimeout: Duration = Duration(600, TimeUnit.SECONDS)
override def features() = s2Features
def fallback = Future.successful(StepResult.Empty)
def defaultStorage: Storage
def getStorage(service: Service): Storage
def getStorage(label: Label): Storage
def getVertexFetcher(column: ServiceColumn): VertexFetcher
def getEdgeFetcher(label: Label): EdgeFetcher
def getAllVertexFetchers(): Seq[VertexFetcher]
def getAllEdgeFetchers(): Seq[EdgeFetcher]
/** optional */
def getOptimisticEdgeFetcher(label: Label): OptimisticEdgeFetcher
def getEdgeMutator(label: Label): EdgeMutator
def getVertexMutator(column: ServiceColumn): VertexMutator
def flushStorage(): Unit
def shutdown(modelDataDelete: Boolean = false): Unit
def getVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]]
def checkEdges(edges: Seq[S2EdgeLike]): Future[StepResult]
def checkEdgesJava(edges: util.List[S2EdgeLike]): CompletableFuture[StepResult] =
def mutateVertices(vertices: Seq[S2VertexLike], withWait: Boolean = false): Future[Seq[MutateResponse]]
def mutateVerticesJava(vertices: util.List[S2VertexLike], withWait: JBoolean): CompletableFuture[util.List[MutateResponse]] =
mutateVertices(vertices.asScala, withWait.booleanValue()).map(_.asJava).toJava.toCompletableFuture
def mutateEdges(edges: Seq[S2EdgeLike], withWait: Boolean = false): Future[Seq[MutateResponse]]
def mutateEdgesJava(edges: util.List[S2EdgeLike], withWait: JBoolean): CompletableFuture[util.List[MutateResponse]] =
mutateEdges(edges.asScala, withWait.booleanValue()).map(_.asJava).toJava.toCompletableFuture
def mutateElements(elements: Seq[GraphElement],
withWait: Boolean = false): Future[Seq[MutateResponse]]
def mutateElementsJava(elements: util.List[GraphElement], withWait: JBoolean): CompletableFuture[util.List[MutateResponse]] =
mutateElements(elements.asScala, withWait.booleanValue()).map(_.asJava).toJava.toCompletableFuture
def getEdges(q: Query): Future[StepResult]
def getEdgesJava(q: Query): CompletableFuture[StepResult] =
def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult]
def getEdgesMultiQueryJava(mq: MultiQuery): CompletableFuture[StepResult] =
def deleteAllAdjacentEdges(srcVertices: Seq[S2VertexLike],
labels: Seq[Label],
dir: Int,
ts: Long): Future[Boolean]
def deleteAllAdjacentEdgesJava(srcVertices: util.List[S2VertexLike], labels: util.List[Label], direction: Direction): CompletableFuture[JBoolean] =
deleteAllAdjacentEdges(srcVertices.asScala, labels.asScala, GraphUtil.toDirection(direction), System.currentTimeMillis()).map(JBoolean.valueOf(_)).toJava.toCompletableFuture
def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]]
def incrementCountsJava(edges: util.List[S2EdgeLike], withWait: JBoolean): CompletableFuture[util.List[MutateResponse]] =
incrementCounts(edges.asScala, withWait.booleanValue()).map(_.asJava).toJava.toCompletableFuture
def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): Future[MutateResponse]
def updateDegreeJava(edge: S2EdgeLike, degreeVal: JLong): CompletableFuture[MutateResponse] =
updateDegree(edge, degreeVal.longValue()).toJava.toCompletableFuture
def getVertex(vertexId: VertexId): Option[S2VertexLike]
def getVertexJava(vertexId: VertexId): Optional[S2VertexLike] =
def fetchEdges(vertex: S2VertexLike, labelNameWithDirs: Seq[(String, String)]): util.Iterator[Edge]
def fetchEdgesJava(vertex: S2VertexLike, labelNameWithDirs: util.List[(String, String)]): util.Iterator[Edge] =
fetchEdges(vertex, labelNameWithDirs.asScala)
def edgesAsync(vertex: S2VertexLike, direction: Direction, labelNames: String*): Future[util.Iterator[Edge]]
def edgesAsyncJava(vertex: S2VertexLike, direction: Direction, labelNames: String*): CompletableFuture[util.Iterator[Edge]] =
edgesAsync(vertex, direction, labelNames: _*).toJava.toCompletableFuture
/** Convert to Graph Element **/
def toVertex(serviceName: String,
columnName: String,
id: Any,
props: Map[String, Any] = Map.empty,
ts: Long = System.currentTimeMillis(),
operation: String = "insert"): S2VertexLike =
elementBuilder.toVertex(serviceName, columnName, id, props, ts, operation)
def toEdge(srcId: Any,
tgtId: Any,
labelName: String,
direction: String,
props: Map[String, Any] = Map.empty,
ts: Long = System.currentTimeMillis(),
operation: String = "insert"): S2EdgeLike =
elementBuilder.toEdge(srcId, tgtId, labelName, direction, props, ts, operation)
def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] =
elementBuilder.toGraphElement(s, labelMapping)
/** TinkerPop Interfaces **/
def vertices(ids: AnyRef*): util.Iterator[structure.Vertex] = {
val fetchVertices = { lastParam =>
if (lastParam.isInstanceOf[Boolean]) lastParam.asInstanceOf[Boolean]
else true
if (ids.isEmpty) {
//TODO: default storage need to be fixed.
val futures = (defaultStorage.vertexFetcher +: getAllVertexFetchers).map { vertexFetcher =>
val future = Future.sequence(futures)
Await.result(future, WaitTimeout).flatten.iterator
} else {
val vertexIds = ids.collect {
case s2Vertex: S2VertexLike =>
case vId: VertexId => vId
case vertex: Vertex =>[VertexId]
case other@_ => VertexId.fromString(other.toString)
if (fetchVertices) {
val queryParam = VertexQueryParam(vertexIds = vertexIds)
val future = getVertices(queryParam).map { vs =>
val ls = new util.ArrayList[structure.Vertex]()
Await.result(future, WaitTimeout)
} else { => elementBuilder.newVertex(vId)).iterator
def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = {
if (edgeIds.isEmpty) {
val futures = (defaultStorage.edgeFetcher +: getAllEdgeFetchers()).map { edgeFetcher =>
val future = Future.sequence(futures)
Await.result(future, WaitTimeout).flatten.iterator
} else {
Await.result(edgesAsync(edgeIds: _*), WaitTimeout)
def edgesAsync(edgeIds: AnyRef*): Future[util.Iterator[structure.Edge]] = {
val s2EdgeIds = edgeIds.collect {
case s2Edge: S2EdgeLike =>[EdgeId]
case id: EdgeId => id
case s: String => EdgeId.fromString(s)
val edgesToFetch = for {
id <- s2EdgeIds
} yield {
elementBuilder.toEdge(id.srcVertexId, id.tgtVertexId, id.labelName, id.direction)
checkEdges(edgesToFetch).map { stepResult =>
val ls = new util.ArrayList[structure.Edge]
stepResult.edgeWithScores.foreach { es => ls.add(es.edge) }
def tx(): Transaction = {
if (!features.graph.supportsTransactions) throw Graph.Exceptions.transactionsNotSupported
def variables(): Variables = new S2GraphVariables
def configuration(): Configuration = apacheConfiguration
def addVertex(label: String): Vertex = {
if (label == null) throw Element.Exceptions.labelCanNotBeNull
if (label.isEmpty) throw Element.Exceptions.labelCanNotBeEmpty
addVertex(Seq(T.label, label): _*)
def addVertex(kvs: AnyRef*): structure.Vertex = {
if (!features().vertex().supportsUserSuppliedIds() && kvs.contains( {
throw Vertex.Exceptions.userSuppliedIdsNotSupported
val kvsMap = S2Property.kvsToProps(kvs)
kvsMap.get( match {
case Some(idValue) if !S2Property.validType(idValue) =>
throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported()
case _ =>
kvsMap.foreach { case (k, v) => S2Property.assertValidProp(k, v) }
if (kvsMap.contains( && kvsMap(
throw Element.Exceptions.labelCanNotBeEmpty
val vertex = kvsMap.get( match {
case None => // do nothing
val id = localLongId.getAndIncrement()
elementBuilder.makeVertex(, kvsMap)
case Some(idValue) if S2Property.validType(idValue) =>
elementBuilder.makeVertex(idValue, kvsMap)
case _ =>
throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported
addVertex(, vertex.ts, vertex.props, vertex.op, vertex.belongLabelIds)
def addVertex(id: VertexId,
ts: Long = System.currentTimeMillis(),
props: S2Vertex.Props = S2Vertex.EmptyProps,
op: Byte = 0,
belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = {
val vertex = elementBuilder.newVertex(id, ts, props, op, belongLabelIds)
val future = mutateVertices(Seq(vertex), withWait = true).flatMap { rets =>
if (rets.forall(_.isSuccess)) {
indexProvider.mutateVerticesAsync(Seq(vertex)).map { ls =>
if (ls.forall(identity)) vertex
else {
throw new RuntimeException("indexVertex failed.")
else throw new RuntimeException("addVertex failed.")
Await.ready(future, WaitTimeout)
/* tp3 only */
def addEdge(srcVertex: S2VertexLike, labelName: String, tgtVertex: Vertex, kvs: AnyRef*): Edge = {
val containsId = kvs.contains(
tgtVertex match {
case otherV: S2VertexLike =>
if (!features().edge().supportsUserSuppliedIds() && containsId) {
throw Exceptions.userSuppliedIdsNotSupported()
val props = S2Property.kvsToProps(kvs)
props.foreach { case (k, v) => S2Property.assertValidProp(k, v) }
//TODO: direction, operation, _timestamp need to be reserved property key.
try {
val direction = props.get("direction").getOrElse("out").toString
val ts = props.get(
val operation = props.get("operation").map(_.toString).getOrElse("insert")
val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
val propsPlusTs = props ++ Map( -> ts)
val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
val edge = elementBuilder.newEdge(srcVertex, otherV, label, dir, op = op, version = ts, propsWithTs = propsWithTs)
val future = mutateEdges(Seq(edge), withWait = true).flatMap { rets =>
Await.ready(future, WaitTimeout)
} catch {
case e: LabelNotExistException => throw new java.lang.IllegalArgumentException(e)
case null => throw new java.lang.IllegalArgumentException
case _ => throw new RuntimeException("only S2Graph vertex can be used.")
def close(): Unit = {
def compute[C <: GraphComputer](aClass: Class[C]): C = ???
def compute(): GraphComputer = {
if (!features.graph.supportsComputer) {
throw Graph.Exceptions.graphComputerNotSupported
def io[I <: Io[_ <: GraphReader.ReaderBuilder[_ <: GraphReader], _ <: GraphWriter.WriterBuilder[_ <: GraphWriter], _ <: Mapper.Builder[_]]](builder: Io.Builder[I]): I = {
override def toString(): String = "[s2graph]"