blob: d1eda5e4779fc6b7959aaf80ea188880612e9976 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.core
import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.{Executors, TimeUnit}
import java.util.function.Consumer
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.commons.configuration.{BaseConfiguration, Configuration}
import org.apache.s2graph.core.GraphExceptions.{FetchTimeoutException, LabelNotExistException}
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.features.S2GraphVariables
import org.apache.s2graph.core.index.{IndexProvider, LuceneIndexProvider}
import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy
import org.apache.s2graph.core.mysqls._
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
import org.apache.s2graph.core.storage.{SKeyValue, Storage}
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger}
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies
import org.apache.tinkerpop.gremlin.structure
import org.apache.tinkerpop.gremlin.structure.Edge.Exceptions
import org.apache.tinkerpop.gremlin.structure.Graph.{Features, Variables}
import org.apache.tinkerpop.gremlin.structure.io.{GraphReader, GraphWriter, Io, Mapper}
import org.apache.tinkerpop.gremlin.structure.{Edge, Element, Graph, T, Transaction, Vertex}
import play.api.libs.json.{JsObject, Json}
import scalikejdbc.DBSession
import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.util.{Random, Try}
object S2Graph {
type HashKey = (Int, Int, Int, Int, Boolean)
type FilterHashKey = (Int, Int)
val DefaultScore = 1.0
val FetchAllLimit = 10000000
val DefaultFetchLimit = 1000
private val DefaultConfigs: Map[String, AnyRef] = Map(
"hbase.zookeeper.quorum" -> "localhost",
"hbase.table.name" -> "s2graph",
"hbase.table.compression.algorithm" -> "gz",
"phase" -> "dev",
"db.default.driver" -> "org.h2.Driver",
"db.default.url" -> "jdbc:h2:file:./var/metastore;MODE=MYSQL",
"db.default.password" -> "graph",
"db.default.user" -> "graph",
"cache.max.size" -> java.lang.Integer.valueOf(0),
"cache.ttl.seconds" -> java.lang.Integer.valueOf(-1),
"hbase.client.retries.number" -> java.lang.Integer.valueOf(20),
"hbase.rpcs.buffered_flush_interval" -> java.lang.Short.valueOf(100.toShort),
"hbase.rpc.timeout" -> java.lang.Integer.valueOf(600000),
"max.retry.number" -> java.lang.Integer.valueOf(100),
"lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10),
"max.back.off" -> java.lang.Integer.valueOf(100),
"back.off.timeout" -> java.lang.Integer.valueOf(1000),
"hbase.fail.prob" -> java.lang.Double.valueOf(-0.1),
"delete.all.fetch.size" -> java.lang.Integer.valueOf(1000),
"delete.all.fetch.count" -> java.lang.Integer.valueOf(200),
"future.cache.max.size" -> java.lang.Integer.valueOf(100000),
"future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
"future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
"future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
"query.future.cache.max.size" -> java.lang.Integer.valueOf(1000),
"query.future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
"query.future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
"query.future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
"s2graph.storage.backend" -> "hbase",
"query.hardlimit" -> java.lang.Integer.valueOf(100000),
"hbase.zookeeper.znode.parent" -> "/hbase",
"query.log.sample.rate" -> Double.box(0.05)
)
var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)
val numOfThread = Runtime.getRuntime.availableProcessors()
val threadPool = Executors.newFixedThreadPool(numOfThread)
val ec = ExecutionContext.fromExecutor(threadPool)
val DefaultServiceName = ""
val DefaultColumnName = "vertex"
val DefaultLabelName = "_s2graph"
val graphStrategies: TraversalStrategies =
TraversalStrategies.GlobalCache.getStrategies(classOf[Graph]).addStrategies(S2GraphStepStrategy.instance)
def toTypeSafeConfig(configuration: Configuration): Config = {
val m = new mutable.HashMap[String, AnyRef]()
for {
key <- configuration.getKeys
value = configuration.getProperty(key)
} {
m.put(key, value)
}
val config = ConfigFactory.parseMap(m).withFallback(DefaultConfig)
config
}
def fromTypeSafeConfig(config: Config): Configuration = {
val configuration = new BaseConfiguration()
for {
e <- config.entrySet()
} {
configuration.setProperty(e.getKey, e.getValue.unwrapped())
}
configuration
}
def open(configuration: Configuration): S2Graph = {
new S2Graph(configuration)(ec)
}
def initStorage(graph: S2Graph, config: Config)(ec: ExecutionContext): Storage[_, _] = {
val storageBackend = config.getString("s2graph.storage.backend")
logger.info(s"[InitStorage]: $storageBackend")
storageBackend match {
case "hbase" => new AsynchbaseStorage(graph, config)(ec)
case _ => throw new RuntimeException("not supported storage.")
}
}
def parseCacheConfig(config: Config, prefix: String): Config = {
import scala.collection.JavaConversions._
val kvs = new java.util.HashMap[String, AnyRef]()
for {
entry <- config.entrySet()
(k, v) = (entry.getKey, entry.getValue) if k.startsWith(prefix)
} yield {
val newKey = k.replace(prefix, "")
kvs.put(newKey, v.unwrapped())
}
ConfigFactory.parseMap(kvs)
}
/** Global helper functions */
@tailrec
final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
if (range < sampleNumber || set.size == sampleNumber) set
else randomInt(sampleNumber, range, set + Random.nextInt(range))
}
def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = {
if (edges.size <= n) {
edges
} else {
val plainEdges = if (queryRequest.queryParam.offset == 0) {
edges.tail
} else edges
val randoms = randomInt(n, plainEdges.size)
var samples = List.empty[EdgeWithScore]
var idx = 0
plainEdges.foreach { e =>
if (randoms.contains(idx)) samples = e :: samples
idx += 1
}
samples
}
}
def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score }
edgeWithScores.map { edgeWithScore =>
edgeWithScore.copy(score = edgeWithScore.score / sum)
}
}
def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, S2Vertex), Boolean] = {
val vertices = for {
edgeWithScore <- edgeWithScoreLs
edge = edgeWithScore.edge
vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex
} yield (edge.labelWithDir, vertex) -> true
vertices.toMap
}
/** common methods for filter out, transform, aggregate queryResult */
def convertEdges(queryParam: QueryParam, edge: S2Edge, nextStepOpt: Option[Step]): Seq[S2Edge] = {
for {
convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree
} yield convertedEdge
}
def processTimeDecay(queryParam: QueryParam, edge: S2Edge) = {
/* process time decay */
val tsVal = queryParam.timeDecay match {
case None => 1.0
case Some(timeDecay) =>
val tsVal = try {
val innerValWithTsOpt = edge.propertyValue(timeDecay.labelMeta.name)
innerValWithTsOpt.map { innerValWithTs =>
val innerVal = innerValWithTs.innerVal
timeDecay.labelMeta.dataType match {
case InnerVal.LONG => innerVal.value match {
case n: BigDecimal => n.bigDecimal.longValue()
case _ => innerVal.toString().toLong
}
case _ => innerVal.toString().toLong
}
} getOrElse (edge.ts)
} catch {
case e: Exception =>
logger.error(s"processTimeDecay error. ${edge.toLogString}", e)
edge.ts
}
val timeDiff = queryParam.timestamp - tsVal
timeDecay.decay(timeDiff)
}
tsVal
}
def processDuplicates[R](queryParam: QueryParam,
duplicates: Seq[(FilterHashKey, R)])(implicit ev: WithScore[R]): Seq[(FilterHashKey, R)] = {
if (queryParam.label.consistencyLevel != "strong") {
//TODO:
queryParam.duplicatePolicy match {
case DuplicatePolicy.First => Seq(duplicates.head)
case DuplicatePolicy.Raw => duplicates
case DuplicatePolicy.CountSum =>
val countSum = duplicates.size
val (headFilterHashKey, headEdgeWithScore) = duplicates.head
Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, countSum))
case _ =>
val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + ev.score(current._2) }
val (headFilterHashKey, headEdgeWithScore) = duplicates.head
Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, scoreSum))
}
} else {
duplicates
}
}
def toHashKey(queryParam: QueryParam, edge: S2Edge, isDegree: Boolean): (HashKey, FilterHashKey) = {
val src = edge.srcVertex.innerId.hashCode()
val tgt = edge.tgtVertex.innerId.hashCode()
val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree)
val filterHashKey = (src, tgt)
(hashKey, filterHashKey)
}
def filterEdges(q: Query,
stepIdx: Int,
queryRequests: Seq[QueryRequest],
queryResultLsFuture: Future[Seq[StepResult]],
queryParams: Seq[QueryParam],
alreadyVisited: Map[(LabelWithDirection, S2Vertex), Boolean] = Map.empty,
buildLastStepInnerResult: Boolean = true,
parentEdges: Map[VertexId, Seq[EdgeWithScore]])
(implicit ec: scala.concurrent.ExecutionContext): Future[StepResult] = {
queryResultLsFuture.map { queryRequestWithResultLs =>
val (cursors, failCount) = {
val _cursors = ArrayBuffer.empty[Array[Byte]]
var _failCount = 0
queryRequestWithResultLs.foreach { stepResult =>
_cursors.append(stepResult.cursors: _*)
_failCount += stepResult.failCount
}
_cursors -> _failCount
}
if (queryRequestWithResultLs.isEmpty) StepResult.Empty.copy(failCount = failCount)
else {
val isLastStep = stepIdx == q.steps.size - 1
val queryOption = q.queryOption
val step = q.steps(stepIdx)
val currentStepResults = queryRequests.view.zip(queryRequestWithResultLs)
val shouldBuildInnerResults = !isLastStep || buildLastStepInnerResult
val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges)
if (shouldBuildInnerResults) {
val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
edgeWithScore
}
/* process step group by */
val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount)
} else {
val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
val edge = edgeWithScore.edge
val score = edgeWithScore.score
val label = edgeWithScore.label
/* Select */
val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns)
// val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
val newEdgeWithScore = edgeWithScore.copy(edge = newEdge)
/* OrderBy */
val orderByValues =
if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None)
else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys))
/* StepGroupBy */
val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys)
/* GroupBy */
val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys)
/* FilterOut */
val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields)
newEdgeWithScore.copy(orderByValues = orderByValues,
stepGroupByValues = stepGroupByValues,
groupByValues = groupByValues,
filterOutValues = filterOutValues)
}
/* process step group by */
val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
/* process ordered list */
val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil
/* process grouped list */
val grouped =
if (queryOption.groupBy.keys.isEmpty) Nil
else {
val agg = new scala.collection.mutable.HashMap[StepResult.GroupByKey, (Double, StepResult.Values)]()
results.groupBy { edgeWithScore =>
// edgeWithScore.groupByValues.map(_.map(_.toString))
edgeWithScore.groupByValues
}.foreach { case (k, ls) =>
val (scoreSum, merged) = StepResult.mergeOrdered(ls, Nil, queryOption)
val newScoreSum = scoreSum
/*
* watch out here. by calling toString on Any, we lose type information which will be used
* later for toJson.
*/
if (merged.nonEmpty) {
val newKey = merged.head.groupByValues
agg += ((newKey, (newScoreSum, merged)))
}
}
agg.toSeq.sortBy(_._2._1 * -1)
}
StepResult(edgeWithScores = ordered, grouped = grouped, degreeEdges = degrees, cursors = cursors, failCount = failCount)
}
}
}
}
private def toEdgeWithScores(queryRequest: QueryRequest,
stepResult: StepResult,
parentEdges: Map[VertexId, Seq[EdgeWithScore]]): Seq[EdgeWithScore] = {
val queryOption = queryRequest.query.queryOption
val queryParam = queryRequest.queryParam
val prevScore = queryRequest.prevStepScore
val labelWeight = queryRequest.labelWeight
val edgeWithScores = stepResult.edgeWithScores
val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent
val parents = if (shouldBuildParents) {
parentEdges.getOrElse(queryRequest.vertex.id, Nil).map { edgeWithScore =>
val edge = edgeWithScore.edge
val score = edgeWithScore.score
val label = edgeWithScore.label
/* Select */
val mergedPropsWithTs =
if (queryOption.selectColumns.isEmpty) {
edge.propertyValuesInner()
} else {
val initial = Map(LabelMeta.timestamp -> edge.propertyValueInner(LabelMeta.timestamp))
edge.propertyValues(queryOption.selectColumns) ++ initial
}
val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
edgeWithScore.copy(edge = newEdge)
}
} else Nil
// skip
if (queryOption.ignorePrevStepCache) stepResult.edgeWithScores
else {
val degreeScore = 0.0
val sampled =
if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
else edgeWithScores
val withScores = for {
edgeWithScore <- sampled
} yield {
val edge = edgeWithScore.edge
val edgeScore = edgeWithScore.score
val score = queryParam.scorePropagateOp match {
case "plus" => edgeScore + prevScore
case "divide" =>
if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
case _ => edgeScore * prevScore
}
val tsVal = processTimeDecay(queryParam, edge)
val newScore = degreeScore + score
// val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge
val newEdge = edge.copy(parentEdges = parents)
edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal)
}
val normalized =
if (queryParam.shouldNormalize) normalize(withScores)
else withScores
normalized
}
}
private def buildResult[R](query: Query,
stepIdx: Int,
stepResultLs: Seq[(QueryRequest, StepResult)],
parentEdges: Map[VertexId, Seq[EdgeWithScore]])
(createFunc: (EdgeWithScore, Seq[LabelMeta]) => R)
(implicit ev: WithScore[R]): ListBuffer[R] = {
import scala.collection._
val results = ListBuffer.empty[R]
val sequentialLs: ListBuffer[(HashKey, FilterHashKey, R, QueryParam)] = ListBuffer.empty
val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, R)]] = mutable.HashMap.empty
val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
var numOfDuplicates = 0
val queryOption = query.queryOption
val step = query.steps(stepIdx)
val excludeLabelWithDirSet = step.queryParams.filter(_.exclude).map(l => l.labelWithDir).toSet
val includeLabelWithDirSet = step.queryParams.filter(_.include).map(l => l.labelWithDir).toSet
stepResultLs.foreach { case (queryRequest, stepInnerResult) =>
val queryParam = queryRequest.queryParam
val label = queryParam.label
val shouldBeExcluded = excludeLabelWithDirSet.contains(queryParam.labelWithDir)
val shouldBeIncluded = includeLabelWithDirSet.contains(queryParam.labelWithDir)
val propsSelectColumns = (for {
column <- queryOption.propsSelectColumns
labelMeta <- label.metaPropsInvMap.get(column)
} yield labelMeta)
for {
edgeWithScore <- toEdgeWithScores(queryRequest, stepInnerResult, parentEdges)
} {
val edge = edgeWithScore.edge
val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false)
// params += (hashKey -> queryParam) //
/* check if this edge should be exlcuded. */
if (shouldBeExcluded) {
edgesToExclude.add(filterHashKey)
} else {
if (shouldBeIncluded) {
edgesToInclude.add(filterHashKey)
}
val newEdgeWithScore = createFunc(edgeWithScore, propsSelectColumns)
sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam))
duplicates.get(hashKey) match {
case None =>
val newLs = ListBuffer.empty[(FilterHashKey, R)]
newLs += (filterHashKey -> newEdgeWithScore)
duplicates += (hashKey -> newLs) //
case Some(old) =>
numOfDuplicates += 1
old += (filterHashKey -> newEdgeWithScore) //
}
}
}
}
if (numOfDuplicates == 0) {
// no duplicates at all.
for {
(hashKey, filterHashKey, edgeWithScore, _) <- sequentialLs
if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
} {
results += edgeWithScore
}
} else {
// need to resolve duplicates.
val seen = new mutable.HashSet[HashKey]()
for {
(hashKey, filterHashKey, edgeWithScore, queryParam) <- sequentialLs
if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
if !seen.contains(hashKey)
} {
// val queryParam = params(hashKey)
processDuplicates(queryParam, duplicates(hashKey)).foreach { case (_, duplicate) =>
if (ev.score(duplicate) >= queryParam.threshold) {
seen += hashKey
results += duplicate
}
}
}
}
results
}
}
@Graph.OptIns(value = Array(
new Graph.OptIn(value = Graph.OptIn.SUITE_PROCESS_STANDARD),
new Graph.OptIn(value = Graph.OptIn.SUITE_STRUCTURE_STANDARD)
))
@Graph.OptOuts(value = Array(
/* Process */
/* branch: passed all. */
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.branch.BranchTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.branch.ChooseTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.branch.OptionalTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.branch.RepeatTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.branch.UnionTest$Traversals", method = "*", reason = "no"),
// passed: all
/* filter */
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.AndTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.CoinTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.CyclicPathTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.DedupTest$Traversals", method = "*", reason = "no"),
// passed: all
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.DropTest$Traversals", method = "g_V_properties_drop", reason = "please find bug on this case."),
// passed: all, failed: g_V_properties_drop
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.FilterTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.IsTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.OrTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.SampleTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.SimplePathTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.TailTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.WhereTest$Traversals", method = "*", reason = "no"),
// passed: all,
/* map */
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.AddEdgeTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.AddVertexTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CoalesceTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ConstantTest$Traversals", method = "*", reason = "no"),
// passed: all
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_both_both_count", reason = "count differ very little. fix this."),
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_repeatXoutX_timesX3X_count", reason = "count differ very little. fix this."),
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_repeatXoutX_timesX8X_count", reason = "count differ very litter. fix this."),
// passed: all, failed: g_V_both_both_count, g_V_repeatXoutX_timesX3X_count, g_V_repeatXoutX_timesX8X_count
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.FlatMapTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.FoldTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.LoopsTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MapTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MapKeysTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MapValuesTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MatchTest$CountMatchTraversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MatchTest$GreedyMatchTraversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MaxTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MeanTest$Traversals", method = "*", reason = "no"),
// failed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MinTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SumTest$Traversals", method = "*", reason = "no"),
// failed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.PathTest$Traversals", method = "*", reason = "no"),
// passed: all
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "testProfileStrategyCallback", reason = "NullPointerException. fix this."),
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "g_V_whereXinXcreatedX_count_isX1XX_name_profile", reason = "java.lang.AssertionError: There should be 3 top-level metrics. fix this."),
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "g_V_whereXinXcreatedX_count_isX1XX_name_profileXmetricsX", reason = "expected 2, actual 6. fix this."),
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "grateful_V_out_out_profileXmetricsX", reason = "expected 8049, actual 8046. fix this."),
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "grateful_V_out_out_profile", reason = "expected 8049, actual 8046. fix this."),
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "modern_V_out_out_profileXmetricsX", reason = "expected 2, actual 6. fix this."),
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "modern_V_out_out_profile", reason = "expected 2, actual 6. fix this."),
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "testProfileStrategyCallbackSideEffect", reason = "NullPointerException. fix this."),
// failed: grateful_V_out_out_profileXmetricsX, g_V_repeat_both_profileXmetricsX, grateful_V_out_out_profile, g_V_repeat_both_profile
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProjectTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertiesTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.UnfoldTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ValueMapTest$Traversals", method = "*", reason = "no"),
// passed: all
/* sideEffect */
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.AggregateTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupTestV3d0$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupCountTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SackTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectCapTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectTest$Traversals", method = "*", reason = "no"),
// passed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StoreTest$Traversals", method = "*", reason = "no"),
// passed: all
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest$Traversals", method = "g_V_withSideEffectXsgX_repeatXbothEXcreatedX_subgraphXsgX_outVX_timesX5X_name_dedup", reason = "Expected 5, Actual 6."),
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest$Traversals", method = "g_V_withSideEffectXsgX_outEXknowsX_subgraphXsgX_name_capXsgX", reason = "Expected 3, Actual 6"),
// passed: all, failed: g_V_withSideEffectXsgX_repeatXbothEXcreatedX_subgraphXsgX_outVX_timesX5X_name_dedup, g_V_withSideEffectXsgX_outEXknowsX_subgraphXsgX_name_capXsgX
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.TreeTest$Traversals", method = "*", reason = "no"),
// passed: all
/* compliance */
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.CoreTraversalTest", method = "shouldThrowExceptionWhenIdsMixed", reason = "VertexId is not Element."),
// passed: all
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest", method = "*", reason = "not supported yet."),
// failed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.TranslationStrategyProcessTest", method = "*", reason = "no"),
// passed: all
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithSpecifiedId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithGeneratedCustomId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithGeneratedDefaultId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithGeneratedDefaultId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithGeneratedCustomId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithSpecifiedId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
// failed: shouldGenerateDefaultIdOnAddVWithSpecifiedId, shouldGenerateDefaultIdOnAddVWithGeneratedCustomId, shouldGenerateDefaultIdOnGraphAddVWithGeneratedDefaultId,
// shouldGenerateDefaultIdOnAddVWithGeneratedDefaultId, shouldGenerateDefaultIdOnGraphAddVWithGeneratedCustomId, shouldGenerateDefaultIdOnGraphAddVWithSpecifiedId
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategyProcessTest", method = "*", reason = "not supported yet."),
// failed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategyProcessTest", method = "*", reason = "no"),
// passed: all
new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.PartitionStrategyProcessTest", method = "*", reason = "not supported yet."),
// failed: all
// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SubgraphStrategyProcessTest", method = "*", reason = "no"),
// passed: all
/* Structure */
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method="shouldValidateIdEquality", reason="reference equals on EdgeId is not supported."),
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method="shouldValidateEquality", reason="reference equals on EdgeId is not supported."),
// passed: all, failed: none
// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.GraphConstructionTest", method="*", reason="no"),
// passed: all, failed: none
// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.PropertyTest", method="*", reason="no"),
// passed: all, failed: none
// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.VertexPropertyTest", method="*", reason="no"),
// passed: all, failed: none
// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.FeatureSupportTest", method="*", reason="no"),
// passed: all, failed: none
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.VertexTest$BasicVertexTest", method="shouldHaveExceptionConsistencyWhenAssigningSameIdOnEdge", reason="S2Vertex.addEdge behave as upsert."),
// passed: , failed: shouldHaveExceptionConsistencyWhenAssigningSameIdOnEdge
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdgeTest", method="shouldNotEvaluateToEqualDifferentId", reason="reference equals is not supported."),
// passed: all, failed: shouldNotEvaluateToEqualDifferentId
// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexTest", method="*", reason="no"),
// passed: all, failed: none
// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.detached.DetachedGraphTest", method="*", reason="no"),
// passed: all, failed: none, all ignored
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.detached.DetachedPropertyTest", method="shouldNotBeEqualPropertiesAsThereIsDifferentKey", reason="reference equals is not supported."),
// // passed: , failed: shouldNotBeEqualPropertiesAsThereIsDifferentKey
// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexPropertyTest", method="*", reason="no"),
// passed: all, failed: none
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.GraphTest", method="shouldRemoveVertices", reason="random label creation is not supported. all label need to be pre-configured."),
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.GraphTest", method="shouldHaveExceptionConsistencyWhenAssigningSameIdOnVertex", reason="Assigning the same ID to an Element update instead of throwing exception."),
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.GraphTest", method="shouldRemoveEdges", reason="random label creation is not supported. all label need to be pre-configured."),
// passed: , failed:
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdgeTest", method="shouldNotEvaluateToEqualDifferentId", reason="Assigning the same ID to an Element update instead of throwing exception."),
// passed: all, skip: shouldNotEvaluateToEqualDifferentId
// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertexPropertyTest", method="*", reason="no"),
// passed: all, failed: none
// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceGraphTest", method="*", reason="no"),
// passed: all, failed: none, all ignored
// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertexTest", method="*", reason="no"),
// passed: all, failed: none, all ignored
// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.star.StarGraphTest", method="*", reason="no"),
// passed: all,
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method="shouldGenerateDifferentGraph", specific="test(NormalDistribution{stdDeviation=2.0, mean=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.1)", reason="graphson-v2-embedded is not supported."),
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method="shouldGenerateDifferentGraph", specific="test(NormalDistribution{stdDeviation=2.0, mean=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.5)", reason="graphson-v2-embedded is not supported."),
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method="shouldGenerateDifferentGraph", specific="test(NormalDistribution{stdDeviation=2.0, mean=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.5)", reason="graphson-v2-embedded is not supported."),
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method="shouldGenerateDifferentGraph", specific="test(NormalDistribution{stdDeviation=2.0, mean=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.1)", reason="graphson-v2-embedded is not supported."),
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method="shouldGenerateDifferentGraph", specific="test(PowerLawDistribution{gamma=2.3, multiplier=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.25)", reason="graphson-v2-embedded is not supported."),
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method="shouldGenerateDifferentGraph", specific="test(PowerLawDistribution{gamma=2.3, multiplier=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.25)", reason="graphson-v2-embedded is not supported."),
// passed: all, except shouldGenerateDifferentGraph method.
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.algorithm.generator.DistributionGeneratorTest", method="*", reason="non-deterministic test."),
// all failed.
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.SerializationTest$GryoTest", method="shouldSerializeTree", reason="order of children is reversed. not sure why."),
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.SerializationTest$GraphSONTest", method="shouldSerializeTraversalMetrics", reason="expected 2, actual 3."),
// passed: all, failed: $GryoTest.shouldSerializeTree
// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoCustomTest", method="*", reason="no"),
// all ignored.
// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoPropertyTest", method="*", reason="no"),
// all passed.
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method="shouldReadWriteVertexWithBOTHEdges", specific="graphson-v2-embedded", reason="Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method="shouldReadWriteVertexWithINEdges", specific="graphson-v2-embedded", reason="Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method="shouldReadWriteDetachedVertexAsReferenceNoEdges", specific="graphson-v2-embedded", reason="Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method="shouldReadWriteVertexNoEdges", specific="graphson-v2-embedded", reason="Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method="shouldReadWriteVertexWithOUTEdges", specific="graphson-v2-embedded", reason="Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method="shouldReadWriteDetachedVertexNoEdges", specific="graphson-v2-embedded", reason="Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
// passed: all, except graphson-v2-embedded.
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method="shouldReadWriteDetachedEdgeAsReference", specific="graphson-v2-embedded", reason="no"),
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method="shouldReadWriteEdge", specific="graphson-v2-embedded", reason="no"),
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method="shouldReadWriteDetachedEdge", specific="graphson-v2-embedded", reason="no"),
// passed: all, except graphson-v2-embedded.
// TODO:
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoGraphTest", method="*", reason="no"), // all failed.
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoTest", method="*", reason="no")
// all failed.
))
class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph {
import S2Graph._
private var apacheConfiguration: Configuration = _
def dbSession() = scalikejdbc.AutoSession
def this(apacheConfiguration: Configuration)(ec: ExecutionContext) = {
this(S2Graph.toTypeSafeConfig(apacheConfiguration))(ec)
this.apacheConfiguration = apacheConfiguration
}
private val running = new AtomicBoolean(true)
val config = _config.withFallback(S2Graph.DefaultConfig)
Model.apply(config)
Model.loadCache()
val MaxRetryNum = config.getInt("max.retry.number")
val MaxBackOff = config.getInt("max.back.off")
val BackoffTimeout = config.getInt("back.off.timeout")
val DeleteAllFetchCount = config.getInt("delete.all.fetch.count")
val DeleteAllFetchSize = config.getInt("delete.all.fetch.size")
val FailProb = config.getDouble("hbase.fail.prob")
val LockExpireDuration = config.getInt("lock.expire.time")
val MaxSize = config.getInt("future.cache.max.size")
val ExpireAfterWrite = config.getInt("future.cache.expire.after.write")
val ExpireAfterAccess = config.getInt("future.cache.expire.after.access")
val WaitTimeout = Duration(600, TimeUnit.SECONDS)
val management = new Management(this)
def getManagement() = management
private val localLongId = new AtomicLong()
def nextLocalLongId = localLongId.getAndIncrement()
private def confWithFallback(conf: Config): Config = {
conf.withFallback(config)
}
/**
* TODO: we need to some way to handle malformed configuration for storage.
*/
val storagePool: scala.collection.mutable.Map[String, Storage[_, _]] = {
val labels = Label.findAll()
val services = Service.findAll()
val labelConfigs = labels.flatMap(_.toStorageConfig)
val serviceConfigs = services.flatMap(_.toStorageConfig)
val configs = (labelConfigs ++ serviceConfigs).map { conf =>
confWithFallback(conf)
}.toSet
val pools = new java.util.HashMap[Config, Storage[_, _]]()
configs.foreach { config =>
pools.put(config, S2Graph.initStorage(this, config)(ec))
}
val m = new java.util.concurrent.ConcurrentHashMap[String, Storage[_, _]]()
labels.foreach { label =>
if (label.storageConfigOpt.isDefined) {
m += (s"label:${label.label}" -> pools(label.storageConfigOpt.get))
}
}
services.foreach { service =>
if (service.storageConfigOpt.isDefined) {
m += (s"service:${service.serviceName}" -> pools(service.storageConfigOpt.get))
}
}
m
}
val defaultStorage: Storage[_, _] = S2Graph.initStorage(this, config)(ec)
/** QueryLevel FutureCache */
val queryFutureCache = new DeferCache[StepResult, Promise, Future](parseCacheConfig(config, "query."), empty = StepResult.Empty)
for {
entry <- config.entrySet() if S2Graph.DefaultConfigs.contains(entry.getKey)
(k, v) = (entry.getKey, entry.getValue)
} logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}")
val indexProvider = IndexProvider.apply(config)
def getStorage(service: Service): Storage[_, _] = {
storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage)
}
def getStorage(label: Label): Storage[_, _] = {
storagePool.getOrElse(s"label:${label.label}", defaultStorage)
}
def flushStorage(): Unit = {
storagePool.foreach { case (_, storage) =>
/* flush is blocking */
storage.flush()
}
}
def fallback = Future.successful(StepResult.Empty)
def checkEdges(edges: Seq[S2Edge]): Future[StepResult] = {
val futures = for {
edge <- edges
} yield {
fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) =>
edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, queryParam.label))
}
}
Future.sequence(futures).map { edgeWithScoreLs =>
val edgeWithScores = edgeWithScoreLs.flatten
StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil)
}
}
// def checkEdges(edges: Seq[Edge]): Future[StepResult] = storage.checkEdges(edges)
def getEdges(q: Query): Future[StepResult] = {
Try {
if (q.steps.isEmpty) {
// TODO: this should be get vertex query.
fallback
} else {
val filterOutFuture = q.queryOption.filterOutQuery match {
case None => fallback
case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
}
for {
stepResult <- getEdgesStepInner(q)
filterOutInnerResult <- filterOutFuture
} yield {
if (filterOutInnerResult.isEmpty) stepResult
else StepResult.filterOut(this, q.queryOption, stepResult, filterOutInnerResult)
}
}
} recover {
case e: Exception =>
logger.error(s"getEdgesAsync: $e", e)
fallback
} get
}
def getEdgesStepInner(q: Query, buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
Try {
if (q.steps.isEmpty) fallback
else {
val queryOption = q.queryOption
def fetch: Future[StepResult] = {
val startStepInnerResult = QueryResult.fromVertices(this, q)
q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) =>
for {
prevStepInnerResult <- prevStepInnerResultFuture
currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult, buildLastStepInnerResult)
} yield {
currentStepInnerResult.copy(
accumulatedCursors = prevStepInnerResult.accumulatedCursors :+ currentStepInnerResult.cursors,
failCount = currentStepInnerResult.failCount + prevStepInnerResult.failCount
)
}
}
}
fetch
}
} recover {
case e: Exception =>
logger.error(s"getEdgesAsync: $e", e)
fallback
} get
}
def fetchStep(orgQuery: Query,
stepIdx: Int,
stepInnerResult: StepResult,
buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty)
else {
val edgeWithScoreLs = stepInnerResult.edgeWithScores
val q = orgQuery
val queryOption = orgQuery.queryOption
val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None
val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold)
val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1)
val step = q.steps(stepIdx)
val alreadyVisited =
if (stepIdx == 0) Map.empty[(LabelWithDirection, S2Vertex), Boolean]
else alreadyVisitedVertices(stepInnerResult.edgeWithScores)
val initial = (Map.empty[S2Vertex, Double], Map.empty[S2Vertex, ArrayBuffer[EdgeWithScore]])
val (sums, grouped) = edgeWithScoreLs.foldLeft(initial) { case ((sum, group), edgeWithScore) =>
val key = edgeWithScore.edge.tgtVertex
val newScore = sum.getOrElse(key, 0.0) + edgeWithScore.score
val buffer = group.getOrElse(key, ArrayBuffer.empty[EdgeWithScore])
buffer += edgeWithScore
(sum + (key -> newScore), group + (key -> buffer))
}
val groupedByFiltered = sums.filter(t => t._2 >= prevStepThreshold)
val prevStepTgtVertexIdEdges = grouped.map(t => t._1.id -> t._2)
val nextStepSrcVertices = if (prevStepLimit >= 0) {
groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit)
} else {
groupedByFiltered.toSeq
}
val queryRequests = for {
(vertex, prevStepScore) <- nextStepSrcVertices
queryParam <- step.queryParams
} yield {
val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0)
val newPrevStepScore = if (queryOption.shouldPropagateScore) prevStepScore else 1.0
QueryRequest(q, stepIdx, vertex, queryParam, newPrevStepScore, labelWeight)
}
val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges)
filterEdges(orgQuery, stepIdx, queryRequests,
fetchedLs, orgQuery.steps(stepIdx).queryParams, alreadyVisited, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec)
}
}
/**
* responsible to fire parallel fetch call into storage and create future that will return merged result.
*
* @param queryRequests
* @param prevStepEdges
* @return
*/
def fetches(queryRequests: Seq[QueryRequest],
prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = {
val reqWithIdxs = queryRequests.zipWithIndex
val requestsPerLabel = reqWithIdxs.groupBy(t => t._1.queryParam.label)
val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) =>
for {
prev <- prevFuture
cur <- getStorage(label).fetches(reqWithIdxs.map(_._1), prevStepEdges)
} yield {
prev ++ reqWithIdxs.map(_._2).zip(cur).toMap
}
}
aggFuture.map { agg => agg.toSeq.sortBy(_._1).map(_._2) }
}
def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = {
Try {
if (mq.queries.isEmpty) fallback
else {
val filterOutFuture = mq.queryOption.filterOutQuery match {
case None => fallback
case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
}
val multiQueryFutures = Future.sequence(mq.queries.map { query => getEdges(query) })
for {
multiQueryResults <- multiQueryFutures
filterOutInnerResult <- filterOutFuture
} yield {
StepResult.merges(mq.queryOption, multiQueryResults, mq.weights, filterOutInnerResult)
}
}
} recover {
case e: Exception =>
logger.error(s"getEdgesAsync: $e", e)
fallback
} get
}
def fetchSnapshotEdge(edge: S2Edge): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = {
/* TODO: Fix this. currently fetchSnapshotEdge should not use future cache
* so use empty cacheKey.
* */
val queryParam = QueryParam(labelName = edge.innerLabel.label,
direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
cacheTTLInMillis = -1)
val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam))
val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
val storage = getStorage(edge.innerLabel)
storage.fetchSnapshotEdgeKeyValues(queryRequest).map { kvs =>
val (edgeOpt, kvOpt) =
if (kvs.isEmpty) (None, None)
else {
val snapshotEdgeOpt = storage.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil)
val _kvOpt = kvs.headOption
(snapshotEdgeOpt, _kvOpt)
}
(queryParam, edgeOpt, kvOpt)
} recoverWith { case ex: Throwable =>
logger.error(s"fetchQueryParam failed. fallback return.", ex)
throw FetchTimeoutException(s"${edge.toLogString}")
}
}
def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
val verticesWithIdx = vertices.zipWithIndex
val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
getStorage(service).getVertices(vertexGroup.map(_._1)).map(_.zip(vertexGroup.map(_._2)))
}
Future.sequence(futures).map { ls =>
ls.flatten.toSeq.sortBy(_._2).map(_._1)
}
}
/** mutate */
def deleteAllAdjacentEdges(srcVertices: Seq[S2Vertex],
labels: Seq[Label],
dir: Int,
ts: Long): Future[Boolean] = {
val requestTs = ts
val vertices = srcVertices
/* create query per label */
val queries = for {
label <- labels
} yield {
val queryParam = QueryParam(labelName = label.label, direction = GraphUtil.fromDirection(dir),
offset = 0, limit = DeleteAllFetchSize, duplicatePolicy = DuplicatePolicy.Raw)
val step = Step(List(queryParam))
Query(vertices, Vector(step))
}
// Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) {
val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) {
fetchAndDeleteAll(queries, requestTs)
} { case (allDeleted, deleteSuccess) =>
allDeleted && deleteSuccess
}.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }
retryFuture onFailure {
case ex =>
logger.error(s"[Error]: deleteAllAdjacentEdges failed.")
}
retryFuture
}
def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = {
val future = for {
stepInnerResultLs <- Future.sequence(queries.map(getEdgesStepInner(_, true)))
(allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs)
} yield {
// logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}")
(allDeleted, ret)
}
Extensions.retryOnFailure(MaxRetryNum) {
future
} {
logger.error(s"fetch and deleteAll failed.")
(true, false)
}
}
def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult],
requestTs: Long): Future[(Boolean, Boolean)] = {
stepInnerResultLs.foreach { stepInnerResult =>
if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.")
}
val futures = for {
stepInnerResult <- stepInnerResultLs
deleteStepInnerResult = buildEdgesToDelete(stepInnerResult, requestTs)
if deleteStepInnerResult.edgeWithScores.nonEmpty
} yield {
val head = deleteStepInnerResult.edgeWithScores.head
val label = head.edge.innerLabel
val ret = label.schemaVersion match {
case HBaseType.VERSION3 | HBaseType.VERSION4 =>
if (label.consistencyLevel == "strong") {
/*
* read: snapshotEdge on queryResult = O(N)
* write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
*/
mutateEdges(deleteStepInnerResult.edgeWithScores.map(_.edge), withWait = true).map(_.forall(identity))
} else {
getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
}
case _ =>
/*
* read: x
* write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
*/
getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
}
ret
}
if (futures.isEmpty) {
// all deleted.
Future.successful(true -> true)
} else {
Future.sequence(futures).map { rets => false -> rets.forall(identity) }
}
}
def buildEdgesToDelete(stepInnerResult: StepResult, requestTs: Long): StepResult = {
val filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore =>
(edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
}
if (filtered.isEmpty) StepResult.Empty
else {
val head = filtered.head
val label = head.edge.innerLabel
val edgeWithScoreLs = filtered.map { edgeWithScore =>
val edge = edgeWithScore.edge
val copiedEdge = label.consistencyLevel match {
case "strong" =>
edge.copyEdge(op = GraphUtil.operations("delete"),
version = requestTs, propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
case _ =>
edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
}
// val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
// case "strong" =>
// val edge = edgeWithScore.edge
// edge.property(LabelMeta.timestamp.name, requestTs)
// val _newPropsWithTs = edge.updatePropsWithTs()
//
// (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
// case _ =>
// val oldEdge = edgeWithScore.edge
// (oldEdge.op, oldEdge.version, oldEdge.updatePropsWithTs())
// }
//
// val copiedEdge =
// edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
// logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
edgeToDelete
}
//Degree edge?
StepResult(edgeWithScores = edgeWithScoreLs, grouped = Nil, degreeEdges = Nil, false)
}
}
// def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] =
// storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts)
def mutateElements(elements: Seq[GraphElement],
withWait: Boolean = false): Future[Seq[Boolean]] = {
val edgeBuffer = ArrayBuffer[(S2Edge, Int)]()
val vertexBuffer = ArrayBuffer[(S2Vertex, Int)]()
elements.zipWithIndex.foreach {
case (e: S2Edge, idx: Int) => edgeBuffer.append((e, idx))
case (v: S2Vertex, idx: Int) => vertexBuffer.append((v, idx))
case any@_ => logger.error(s"Unknown type: ${any}")
}
val edgeFutureWithIdx = mutateEdges(edgeBuffer.map(_._1), withWait).map { result =>
edgeBuffer.map(_._2).zip(result)
}
val vertexFutureWithIdx = mutateVertices(vertexBuffer.map(_._1), withWait).map { result =>
vertexBuffer.map(_._2).zip(result)
}
val graphFuture = for {
edgesMutated <- edgeFutureWithIdx
verticesMutated <- vertexFutureWithIdx
} yield (edgesMutated ++ verticesMutated).sortBy(_._1).map(_._2)
graphFuture
}
// def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait)
def mutateEdges(edges: Seq[S2Edge], withWait: Boolean = false): Future[Seq[Boolean]] = {
val edgeWithIdxs = edges.zipWithIndex
val (strongEdges, weakEdges) =
edgeWithIdxs.partition { case (edge, idx) =>
val e = edge
e.innerLabel.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk")
}
val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) =>
val futures = edgeWithIdxs.groupBy(_._1.innerLabel).map { case (label, edgeGroup) =>
val storage = getStorage(label)
val edges = edgeGroup.map(_._1)
val idxs = edgeGroup.map(_._2)
/* multiple edges with weak consistency level will be processed as batch */
val mutations = edges.flatMap { edge =>
val (_, edgeUpdate) =
if (edge.op == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
else S2Edge.buildOperation(None, Seq(edge))
val (bufferIncr, nonBufferIncr) = storage.increments(edgeUpdate.deepCopy)
if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false)
storage.buildVertexPutsAsync(edge) ++ storage.indexedEdgeMutations(edgeUpdate.deepCopy) ++ storage.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
}
storage.writeToStorage(zkQuorum, mutations, withWait).map { ret =>
idxs.map(idx => idx -> ret)
}
}
Future.sequence(futures)
}
val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.op == GraphUtil.operations("deleteAll") }
val deleteAllFutures = strongDeleteAll.map { case (edge, idx) =>
deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.innerLabel), edge.labelWithDir.dir, edge.ts).map(idx -> _)
}
val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.innerLabel }.map { case (label, edgeGroup) =>
val edges = edgeGroup.map(_._1)
val idxs = edgeGroup.map(_._2)
val storage = getStorage(label)
storage.mutateStrongEdges(edges, withWait = true).map { rets =>
idxs.zip(rets)
}
}
for {
weak <- Future.sequence(weakEdgesFutures)
deleteAll <- Future.sequence(deleteAllFutures)
strong <- Future.sequence(strongEdgesFutures)
} yield {
(deleteAll ++ weak.flatten.flatten ++ strong.flatten).sortBy(_._1).map(_._2)
}
}
def mutateVertices(vertices: Seq[S2Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = {
val verticesWithIdx = vertices.zipWithIndex
val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
getStorage(service).mutateVertices(vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2)))
}
Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
}
def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = {
val edgesWithIdx = edges.zipWithIndex
val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) =>
getStorage(label).incrementCounts(edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
}
Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
}
def updateDegree(edge: S2Edge, degreeVal: Long = 0): Future[Boolean] = {
val label = edge.innerLabel
val storage = getStorage(label)
val kvs = storage.buildDegreePuts(edge, degreeVal)
storage.writeToStorage(edge.innerLabel.service.cluster, kvs, withWait = true)
}
def isRunning(): Boolean = running.get()
def shutdown(modelDataDelete: Boolean = false): Unit =
if (running.compareAndSet(true, false)) {
flushStorage()
Model.shutdown(modelDataDelete)
defaultStorage.shutdown()
localLongId.set(0l)
}
def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try {
val parts = GraphUtil.split(s)
val logType = parts(2)
val element = if (logType == "edge" | logType == "e") {
/* current only edge is considered to be bulk loaded */
labelMapping.get(parts(5)) match {
case None =>
case Some(toReplace) =>
parts(5) = toReplace
}
toEdge(parts)
} else if (logType == "vertex" | logType == "v") {
toVertex(parts)
} else {
throw new GraphExceptions.JsonParseException("log type is not exist in log.")
}
element
} recover {
case e: Exception =>
logger.error(s"[toElement]: $e", e)
None
} get
def toVertex(s: String): Option[S2Vertex] = {
toVertex(GraphUtil.split(s))
}
def toEdge(s: String): Option[S2Edge] = {
toEdge(GraphUtil.split(s))
}
def toEdge(parts: Array[String]): Option[S2Edge] = Try {
val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
val tempDirection = if (parts.length >= 8) parts(7) else "out"
val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
val edge = toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation)
Option(edge)
} recover {
case e: Exception =>
logger.error(s"[toEdge]: $e", e)
throw e
} get
def toVertex(parts: Array[String]): Option[S2Vertex] = Try {
val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
val vertex = toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
Option(vertex)
} recover {
case e: Throwable =>
logger.error(s"[toVertex]: $e", e)
throw e
} get
def toEdge(srcId: Any,
tgtId: Any,
labelName: String,
direction: String,
props: Map[String, Any] = Map.empty,
ts: Long = System.currentTimeMillis(),
operation: String = "insert"): S2Edge = {
val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
val srcColumn = if (direction == "out") label.srcColumn else label.tgtColumn
val tgtColumn = if (direction == "out") label.tgtColumn else label.srcColumn
val srcVertexIdInnerVal = toInnerVal(srcId, srcColumn.columnType, label.schemaVersion)
val tgtVertexIdInnerVal = toInnerVal(tgtId, tgtColumn.columnType, label.schemaVersion)
val srcVertex = newVertex(SourceVertexId(srcColumn, srcVertexIdInnerVal), System.currentTimeMillis())
val tgtVertex = newVertex(TargetVertexId(tgtColumn, tgtVertexIdInnerVal), System.currentTimeMillis())
val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
new S2Edge(this, srcVertex, tgtVertex, label, dir, op = op, version = ts).copyEdgeWithState(propsWithTs)
}
def toVertex(serviceName: String,
columnName: String,
id: Any,
props: Map[String, Any] = Map.empty,
ts: Long = System.currentTimeMillis(),
operation: String = "insert"): S2Vertex = {
val service = Service.findByName(serviceName).getOrElse(throw new java.lang.IllegalArgumentException(s"$serviceName is not found."))
val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new java.lang.IllegalArgumentException(s"$columnName is not found."))
val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
val srcVertexId = id match {
case vid: VertexId => id.asInstanceOf[VertexId]
case _ => VertexId(column, toInnerVal(id, column.columnType, column.schemaVersion))
}
val propsInner = column.propsToInnerVals(props) ++
Map(ColumnMeta.timestamp -> InnerVal.withLong(ts, column.schemaVersion))
val vertex = new S2Vertex(this, srcVertexId, ts, S2Vertex.EmptyProps, op)
S2Vertex.fillPropsWithTs(vertex, propsInner)
vertex
}
/**
* helper to create new Edge instance from given parameters on memory(not actually stored in storage).
*
* Since we are using mutable map for property value(propsWithTs),
* we should make sure that reference for mutable map never be shared between multiple Edge instances.
* To guarantee this, we never create Edge directly, but rather use this helper which is available on S2Graph.
*
* Note that we are using following convention
* 1. `add*` for method that actually store instance into storage,
* 2. `new*` for method that only create instance on memory, but not store it into storage.
*
* @param srcVertex
* @param tgtVertex
* @param innerLabel
* @param dir
* @param op
* @param version
* @param propsWithTs
* @param parentEdges
* @param originalEdgeOpt
* @param pendingEdgeOpt
* @param statusCode
* @param lockTs
* @param tsInnerValOpt
* @return
*/
def newEdge(srcVertex: S2Vertex,
tgtVertex: S2Vertex,
innerLabel: Label,
dir: Int,
op: Byte = GraphUtil.defaultOpByte,
version: Long = System.currentTimeMillis(),
propsWithTs: S2Edge.State,
parentEdges: Seq[EdgeWithScore] = Nil,
originalEdgeOpt: Option[S2Edge] = None,
pendingEdgeOpt: Option[S2Edge] = None,
statusCode: Byte = 0,
lockTs: Option[Long] = None,
tsInnerValOpt: Option[InnerValLike] = None): S2Edge = {
val edge = S2Edge(
this,
srcVertex,
tgtVertex,
innerLabel,
dir,
op,
version,
S2Edge.EmptyProps,
parentEdges,
originalEdgeOpt,
pendingEdgeOpt,
statusCode,
lockTs,
tsInnerValOpt)
S2Edge.fillPropsWithTs(edge, propsWithTs)
edge
}
/**
* helper to create new SnapshotEdge instance from given parameters on memory(not actually stored in storage).
*
* Note that this is only available to S2Graph, not structure.Graph so only internal code should use this method.
* @param srcVertex
* @param tgtVertex
* @param label
* @param dir
* @param op
* @param version
* @param propsWithTs
* @param pendingEdgeOpt
* @param statusCode
* @param lockTs
* @param tsInnerValOpt
* @return
*/
private[core] def newSnapshotEdge(srcVertex: S2Vertex,
tgtVertex: S2Vertex,
label: Label,
dir: Int,
op: Byte,
version: Long,
propsWithTs: S2Edge.State,
pendingEdgeOpt: Option[S2Edge],
statusCode: Byte = 0,
lockTs: Option[Long],
tsInnerValOpt: Option[InnerValLike] = None): SnapshotEdge = {
val snapshotEdge = new SnapshotEdge(this, srcVertex, tgtVertex, label, dir, op, version, S2Edge.EmptyProps,
pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
S2Edge.fillPropsWithTs(snapshotEdge, propsWithTs)
snapshotEdge
}
def newVertexId(serviceName: String)(columnName: String)(id: Any): VertexId = {
val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
newVertexId(service, column, id)
}
/**
* helper to create S2Graph's internal VertexId instance with given parameters.
* @param service
* @param column
* @param id
* @return
*/
def newVertexId(service: Service,
column: ServiceColumn,
id: Any): VertexId = {
val innerVal = CanInnerValLike.anyToInnerValLike.toInnerVal(id)(column.schemaVersion)
new VertexId(column, innerVal)
}
def newVertex(id: VertexId,
ts: Long = System.currentTimeMillis(),
props: S2Vertex.Props = S2Vertex.EmptyProps,
op: Byte = 0,
belongLabelIds: Seq[Int] = Seq.empty): S2Vertex = {
val vertex = new S2Vertex(this, id, ts, S2Vertex.EmptyProps, op, belongLabelIds)
S2Vertex.fillPropsWithTs(vertex, props)
vertex
}
def getVertex(vertexId: VertexId): Option[S2Vertex] = {
val v = newVertex(vertexId)
Await.result(getVertices(Seq(v)).map { vertices => vertices.headOption }, WaitTimeout)
}
def fetchEdges(vertex: S2Vertex, labelNameWithDirs: Seq[(String, String)]): util.Iterator[Edge] = {
Await.result(fetchEdgesAsync(vertex, labelNameWithDirs), WaitTimeout)
}
def fetchEdgesAsync(vertex: S2Vertex, labelNameWithDirs: Seq[(String, String)]): Future[util.Iterator[Edge]] = {
val queryParams = labelNameWithDirs.map { case (l, direction) =>
QueryParam(labelName = l, direction = direction.toLowerCase)
}
val query = Query.toQuery(Seq(vertex), queryParams)
val queryRequests = queryParams.map { param => QueryRequest(query, 0, vertex, param) }
val ls = new util.ArrayList[Edge]()
fetches(queryRequests, Map.empty).map { stepResultLs =>
stepResultLs.foreach(_.edgeWithScores.foreach(es => ls.add(es.edge)))
ls.iterator()
}
// getEdges(query).map { stepResult =>
// val ls = new util.ArrayList[Edge]()
// stepResult.edgeWithScores.foreach(es => ls.add(es.edge))
// ls.iterator()
// }
}
/**
* used by graph.traversal().V()
* @param ids: array of VertexId values. note that last parameter can be used to control if actually fetch vertices from storage or not.
* since S2Graph use user-provided id as part of edge, it is possible to
* fetch edge without fetch start vertex. default is false which means we are not fetching vertices from storage.
* @return
*/
override def vertices(ids: AnyRef*): util.Iterator[structure.Vertex] = {
val fetchVertices = ids.lastOption.map { lastParam =>
if (lastParam.isInstanceOf[Boolean]) lastParam.asInstanceOf[Boolean]
else true
}.getOrElse(true)
if (ids.isEmpty) {
//TODO: default storage need to be fixed.
Await.result(defaultStorage.fetchVerticesAll(), WaitTimeout).iterator
} else {
val vertices = ids.collect {
case s2Vertex: S2Vertex => s2Vertex
case vId: VertexId => newVertex(vId)
case vertex: Vertex => newVertex(vertex.id().asInstanceOf[VertexId])
case other @ _ => newVertex(VertexId.fromString(other.toString))
}
if (fetchVertices) {
val future = getVertices(vertices).map { vs =>
val ls = new util.ArrayList[structure.Vertex]()
ls.addAll(vs)
ls.iterator()
}
Await.result(future, WaitTimeout)
} else {
vertices.iterator
}
}
}
override def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = {
if (edgeIds.isEmpty) {
// FIXME
Await.result(defaultStorage.fetchEdgesAll(), WaitTimeout).iterator
} else {
Await.result(edgesAsync(edgeIds: _*), WaitTimeout)
}
}
def edgesAsync(edgeIds: AnyRef*): Future[util.Iterator[structure.Edge]] = {
val s2EdgeIds = edgeIds.collect {
case s2Edge: S2Edge => s2Edge.id().asInstanceOf[EdgeId]
case id: EdgeId => id
case s: String => EdgeId.fromString(s)
}
val edgesToFetch = for {
id <- s2EdgeIds
} yield {
toEdge(id.srcVertexId, id.tgtVertexId, id.labelName, id.direction)
}
checkEdges(edgesToFetch).map { stepResult =>
val ls = new util.ArrayList[structure.Edge]
stepResult.edgeWithScores.foreach { es => ls.add(es.edge) }
ls.iterator()
}
}
override def tx(): Transaction = {
if (!features.graph.supportsTransactions) throw Graph.Exceptions.transactionsNotSupported
???
}
override def variables(): Variables = new S2GraphVariables
override def configuration(): Configuration = apacheConfiguration
override def addVertex(label: String): Vertex = {
if (label == null) throw Element.Exceptions.labelCanNotBeNull
if (label.isEmpty) throw Element.Exceptions.labelCanNotBeEmpty
addVertex(Seq(T.label, label): _*)
}
def makeVertex(idValue: AnyRef, kvsMap: Map[String, AnyRef]): S2Vertex = {
idValue match {
case vId: VertexId =>
toVertex(vId.column.service.serviceName, vId.column.columnName, vId, kvsMap)
case _ =>
val serviceColumnNames = kvsMap.getOrElse(T.label.toString, DefaultColumnName).toString
val names = serviceColumnNames.split(S2Vertex.VertexLabelDelimiter)
val (serviceName, columnName) =
if (names.length == 1) (DefaultServiceName, names(0))
else throw new RuntimeException("malformed data on vertex label.")
toVertex(serviceName, columnName, idValue, kvsMap)
}
}
override def addVertex(kvs: AnyRef*): structure.Vertex = {
if (!features().vertex().supportsUserSuppliedIds() && kvs.contains(T.id)) {
throw Vertex.Exceptions.userSuppliedIdsNotSupported
}
val kvsMap = S2Property.kvsToProps(kvs)
kvsMap.get(T.id.name()) match {
case Some(idValue) if !S2Property.validType(idValue) =>
throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported()
case _ =>
}
kvsMap.foreach { case (k, v) => S2Property.assertValidProp(k, v) }
if (kvsMap.contains(T.label.name()) && kvsMap(T.label.name).toString.isEmpty)
throw Element.Exceptions.labelCanNotBeEmpty
val vertex = kvsMap.get(T.id.name()) match {
case None => // do nothing
val id = nextLocalLongId
makeVertex(Long.box(id), kvsMap)
case Some(idValue) if S2Property.validType(idValue) =>
makeVertex(idValue, kvsMap)
case _ =>
throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported
}
addVertexInner(vertex)
vertex
}
def addVertex(id: VertexId,
ts: Long = System.currentTimeMillis(),
props: S2Vertex.Props = S2Vertex.EmptyProps,
op: Byte = 0,
belongLabelIds: Seq[Int] = Seq.empty): S2Vertex = {
val vertex = newVertex(id, ts, props, op, belongLabelIds)
val future = mutateVertices(Seq(vertex), withWait = true).map { rets =>
if (rets.forall(identity)) vertex
else throw new RuntimeException("addVertex failed.")
}
Await.ready(future, WaitTimeout)
vertex
}
def addVertexInner(vertex: S2Vertex): S2Vertex = {
val future = mutateVertices(Seq(vertex), withWait = true).flatMap { rets =>
if (rets.forall(identity)) {
indexProvider.mutateVerticesAsync(Seq(vertex))
} else throw new RuntimeException("addVertex failed.")
}
Await.ready(future, WaitTimeout)
vertex
}
/* tp3 only */
def addEdge(srcVertex: S2Vertex, labelName: String, tgtVertex: Vertex, kvs: AnyRef*): Edge = {
val containsId = kvs.contains(T.id)
tgtVertex match {
case otherV: S2Vertex =>
if (!features().edge().supportsUserSuppliedIds() && containsId) {
throw Exceptions.userSuppliedIdsNotSupported()
}
val props = S2Property.kvsToProps(kvs)
props.foreach { case (k, v) => S2Property.assertValidProp(k, v) }
//TODO: direction, operation, _timestamp need to be reserved property key.
try {
val direction = props.get("direction").getOrElse("out").toString
val ts = props.get(LabelMeta.timestamp.name).map(_.toString.toLong).getOrElse(System.currentTimeMillis())
val operation = props.get("operation").map(_.toString).getOrElse("insert")
val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
val edge = newEdge(srcVertex, otherV, label, dir, op = op, version = ts, propsWithTs = propsWithTs)
val future = mutateEdges(Seq(edge), withWait = true).flatMap { rets =>
indexProvider.mutateEdgesAsync(Seq(edge))
}
Await.ready(future, WaitTimeout)
edge
} catch {
case e: LabelNotExistException => throw new java.lang.IllegalArgumentException(e)
}
case null => throw new java.lang.IllegalArgumentException
case _ => throw new RuntimeException("only S2Graph vertex can be used.")
}
}
override def close(): Unit = {
shutdown()
}
override def compute[C <: GraphComputer](aClass: Class[C]): C = ???
override def compute(): GraphComputer = {
if (!features.graph.supportsComputer) {
throw Graph.Exceptions.graphComputerNotSupported
}
???
}
class S2GraphFeatures extends Features {
import org.apache.s2graph.core.{features => FS}
override def edge(): Features.EdgeFeatures = new FS.S2EdgeFeatures
override def graph(): Features.GraphFeatures = new FS.S2GraphFeatures
override def supports(featureClass: Class[_ <: Features.FeatureSet], feature: String): Boolean =
super.supports(featureClass, feature)
override def vertex(): Features.VertexFeatures = new FS.S2VertexFeatures
override def toString: String = {
s"FEATURES:\nEdgeFeatures:${edge}\nGraphFeatures:${graph}\nVertexFeatures:${vertex}"
}
}
private val s2Features = new S2GraphFeatures
override def features() = s2Features
override def toString(): String = "[s2graph]"
override def io[I <: Io[_ <: GraphReader.ReaderBuilder[_ <: GraphReader], _ <: GraphWriter.WriterBuilder[_ <: GraphWriter], _ <: Mapper.Builder[_]]](builder: Io.Builder[I]): I = {
builder.graph(this).registry(S2GraphIoRegistry.instance).create().asInstanceOf[I]
}
}