blob: f3010f7068fb0a78da8a5c0bf7e9d4a7ce71bd3a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.core
import java.util
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{ExecutorService, Executors}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.commons.configuration.{BaseConfiguration, Configuration}
import org.apache.s2graph.core.index.IndexProvider
import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy
import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
import org.apache.s2graph.core.storage.rocks.RocksStorage
import org.apache.s2graph.core.storage.{MutateResponse, OptimisticEdgeFetcher, Storage}
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.utils.{Extensions, Importer, logger}
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies
import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph}
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent._
import scala.util.Try
object S2Graph {
type HashKey = (Int, Int, Int, Int, Boolean)
type FilterHashKey = (Int, Int)
val DefaultScore = 1.0
val FetchAllLimit = 10000000
val DefaultFetchLimit = 1000
private val DefaultConfigs: Map[String, AnyRef] = Map(
"hbase.zookeeper.quorum" -> "localhost",
"hbase.table.name" -> "s2graph",
"hbase.table.compression.algorithm" -> "gz",
"phase" -> "dev",
"db.default.driver" -> "org.h2.Driver",
"db.default.url" -> "jdbc:h2:file:./var/metastore;MODE=MYSQL",
"db.default.password" -> "graph",
"db.default.user" -> "graph",
"cache.max.size" -> java.lang.Integer.valueOf(0),
"cache.ttl.seconds" -> java.lang.Integer.valueOf(-1),
"resource.cache.max.size" -> java.lang.Integer.valueOf(1000),
"resource.cache.ttl.seconds" -> java.lang.Integer.valueOf(-1),
"hbase.client.retries.number" -> java.lang.Integer.valueOf(20),
"hbase.rpcs.buffered_flush_interval" -> java.lang.Short.valueOf(100.toShort),
"hbase.rpc.timeout" -> java.lang.Integer.valueOf(600000),
"max.retry.number" -> java.lang.Integer.valueOf(100),
"lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10),
"max.back.off" -> java.lang.Integer.valueOf(100),
"back.off.timeout" -> java.lang.Integer.valueOf(1000),
"hbase.fail.prob" -> java.lang.Double.valueOf(-0.1),
"delete.all.fetch.size" -> java.lang.Integer.valueOf(1000),
"delete.all.fetch.count" -> java.lang.Integer.valueOf(200),
"future.cache.max.size" -> java.lang.Integer.valueOf(100000),
"future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
"future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
"future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
"query.future.cache.max.size" -> java.lang.Integer.valueOf(1000),
"query.future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
"query.future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
"query.future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
"s2graph.storage.backend" -> "hbase",
"query.hardlimit" -> java.lang.Integer.valueOf(100000),
"hbase.zookeeper.znode.parent" -> "/hbase",
"query.log.sample.rate" -> Double.box(0.05)
)
var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)
val numOfThread = Runtime.getRuntime.availableProcessors()
val threadPool = Executors.newFixedThreadPool(numOfThread)
val ec = ExecutionContext.fromExecutor(threadPool)
val resourceManagerEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numOfThread))
val DefaultServiceName = ""
val DefaultColumnName = "vertex"
val DefaultLabelName = "_s2graph"
var hbaseExecutor: ExecutorService = _
val graphStrategies: TraversalStrategies =
TraversalStrategies.GlobalCache.getStrategies(classOf[Graph]).addStrategies(S2GraphStepStrategy.instance)
def toTypeSafeConfig(configuration: Configuration): Config = {
val m = new mutable.HashMap[String, AnyRef]()
for {
key <- configuration.getKeys
value = configuration.getProperty(key)
} {
m.put(key, value)
}
val config = ConfigFactory.parseMap(m).withFallback(DefaultConfig)
config
}
def fromTypeSafeConfig(config: Config): Configuration = {
val configuration = new BaseConfiguration()
for {
e <- config.entrySet()
} {
configuration.setProperty(e.getKey, e.getValue.unwrapped())
}
configuration
}
def open(configuration: Configuration): S2GraphLike = {
new S2Graph(configuration)(ec)
}
def initStorage(graph: S2GraphLike, config: Config)(ec: ExecutionContext): Storage = {
val storageBackend = config.getString("s2graph.storage.backend")
logger.info(s"[InitStorage]: $storageBackend")
storageBackend match {
case "hbase" =>
hbaseExecutor =
if (config.getString("hbase.zookeeper.quorum") == "localhost")
AsynchbaseStorage.initLocalHBase(config)
else
null
new AsynchbaseStorage(graph, config)
case "rocks" => new RocksStorage(graph, config)
case _ => throw new RuntimeException("not supported storage.")
}
}
def parseCacheConfig(config: Config, prefix: String): Config = {
import scala.collection.JavaConversions._
val kvs = new java.util.HashMap[String, AnyRef]()
for {
entry <- config.entrySet()
(k, v) = (entry.getKey, entry.getValue) if k.startsWith(prefix)
} yield {
val newKey = k.replace(prefix, "")
kvs.put(newKey, v.unwrapped())
}
ConfigFactory.parseMap(kvs)
}
def initMutators(graph: S2GraphLike): Unit = {
val management = graph.management
ServiceColumn.findAll().foreach { column =>
management.updateVertexMutator(column, column.options)
}
Label.findAll().foreach { label =>
management.updateEdgeMutator(label, label.options)
}
}
def initFetchers(graph: S2GraphLike): Unit = {
val management = graph.management
ServiceColumn.findAll().foreach { column =>
management.updateVertexFetcher(column, column.options)
}
Label.findAll().foreach { label =>
management.updateEdgeFetcher(label, label.options)
}
}
def loadFetchersAndMutators(graph: S2GraphLike): Unit = {
initFetchers(graph)
initMutators(graph)
}
}
class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2GraphLike {
var apacheConfiguration: Configuration = _
def dbSession() = scalikejdbc.AutoSession
def this(apacheConfiguration: Configuration)(ec: ExecutionContext) = {
this(S2Graph.toTypeSafeConfig(apacheConfiguration))(ec)
this.apacheConfiguration = apacheConfiguration
}
private val running = new AtomicBoolean(true)
override val config = _config.withFallback(S2Graph.DefaultConfig)
val storageBackend = Try {
config.getString("s2graph.storage.backend")
}.getOrElse("hbase")
Schema.apply(config)
Schema.loadCache()
override val management = new Management(this)
override val resourceManager: ResourceManager = new ResourceManager(this, config)(S2Graph.resourceManagerEc)
override val indexProvider = IndexProvider.apply(config)
override val elementBuilder = new GraphElementBuilder(this)
override val traversalHelper = new TraversalHelper(this)
private def confWithFallback(conf: Config): Config = {
conf.withFallback(config)
}
val defaultStorage: Storage = S2Graph.initStorage(this, config)(ec)
for {
entry <- config.entrySet() if S2Graph.DefaultConfigs.contains(entry.getKey)
(k, v) = (entry.getKey, entry.getValue)
} logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}")
/**
* TODO: we need to some way to handle malformed configuration for storage.
*/
val storagePool: scala.collection.mutable.Map[String, Storage] = {
val labels = Label.findAll()
val services = Service.findAll()
val labelConfigs = labels.flatMap(_.toStorageConfig)
val serviceConfigs = services.flatMap(_.toStorageConfig)
val configs = (labelConfigs ++ serviceConfigs).map { conf =>
confWithFallback(conf)
}.toSet
val pools = new java.util.HashMap[Config, Storage]()
configs.foreach { config =>
pools.put(config, S2Graph.initStorage(this, config)(ec))
}
val m = new java.util.concurrent.ConcurrentHashMap[String, Storage]()
labels.foreach { label =>
if (label.storageConfigOpt.isDefined) {
m += (s"label:${label.label}" -> pools(label.storageConfigOpt.get))
}
}
services.foreach { service =>
if (service.storageConfigOpt.isDefined) {
m += (s"service:${service.serviceName}" -> pools(service.storageConfigOpt.get))
}
}
m
}
override def getStorage(service: Service): Storage = {
storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage)
}
override def getStorage(label: Label): Storage = {
storagePool.getOrElse(s"label:${label.label}", defaultStorage)
}
/* Currently, each getter on Fetcher and Mutator missing proper implementation
* Please discuss what is proper way to maintain resources here and provide
* right implementation(S2GRAPH-213).
* */
override def getVertexFetcher(column: ServiceColumn): VertexFetcher = {
resourceManager.getOrElseUpdateVertexFetcher(column)
.getOrElse(defaultStorage.vertexFetcher)
}
override def getEdgeFetcher(label: Label): EdgeFetcher = {
resourceManager.getOrElseUpdateEdgeFetcher(label)
.getOrElse(defaultStorage.edgeFetcher)
}
override def getAllVertexFetchers(): Seq[VertexFetcher] = {
resourceManager.getAllVertexFetchers()
}
override def getAllEdgeFetchers(): Seq[EdgeFetcher] = {
resourceManager.getAllEdgeFetchers()
}
override def getVertexMutator(column: ServiceColumn): VertexMutator = {
resourceManager.getOrElseUpdateVertexMutator(column)
.getOrElse(defaultStorage.vertexMutator)
}
override def getEdgeMutator(label: Label): EdgeMutator = {
resourceManager.getOrElseUpdateEdgeMutator(label)
.getOrElse(defaultStorage.edgeMutator)
}
//TODO:
override def getOptimisticEdgeFetcher(label: Label): OptimisticEdgeFetcher = {
// getStorage(label).optimisticEdgeFetcher
null
}
//TODO:
override def flushStorage(): Unit = {
storagePool.foreach { case (_, storage) =>
/* flush is blocking */
storage.flush()
}
}
override def shutdown(modelDataDelete: Boolean = false): Unit =
if (running.compareAndSet(true, false)) {
flushStorage()
Schema.shutdown(modelDataDelete)
defaultStorage.shutdown()
localLongId.set(0l)
}
def searchVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]] = {
val matchedVerticesFuture = indexProvider.fetchVertexIdsAsyncRaw(queryParam).map { vids =>
(queryParam.vertexIds ++ vids).distinct.map(vid => elementBuilder.newVertex(vid))
}
if (queryParam.fetchProp) matchedVerticesFuture.flatMap(vs => getVertices(queryParam.copy(vertexIds = vs.map(_.id))))
else matchedVerticesFuture
}
override def getVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]] = {
val vertexIdsWithIdx = queryParam.vertexIds.zipWithIndex
val futures = vertexIdsWithIdx.groupBy { case (vId, idx) => vId.column }.map { case (serviceColumn, vertexGroup) =>
val (vertexIds, indices) = vertexGroup.unzip
getVertexFetcher(serviceColumn).fetchVertices(queryParam.copy(vertexIds = vertexIds)).map(_.zip(indices))
}
Future.sequence(futures).map { ls =>
ls.flatten.toSeq.sortBy(_._2).map(_._1)
}
}
override def checkEdges(edges: Seq[S2EdgeLike]): Future[StepResult] = {
val futures = for {
edge <- edges
} yield {
getOptimisticEdgeFetcher(edge.innerLabel).fetchSnapshotEdgeInner(edge).map { case (edgeOpt, _) =>
edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, edge.innerLabel))
}
}
Future.sequence(futures).map { edgeWithScoreLs =>
val edgeWithScores = edgeWithScoreLs.flatten
StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil)
}
}
override def mutateVertices(vertices: Seq[S2VertexLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
def mutateVertices(storage: Storage)(zkQuorum: String, vertices: Seq[S2VertexLike],
withWait: Boolean = false): Future[Seq[MutateResponse]] = {
val futures = vertices.map { vertex =>
getVertexMutator(vertex.serviceColumn).mutateVertex(zkQuorum, vertex, withWait)
}
Future.sequence(futures)
}
val verticesWithIdx = vertices.zipWithIndex
val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
mutateVertices(getStorage(service))(service.cluster, vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2)))
}
indexProvider.mutateVerticesAsync(vertices)
Future.sequence(futures).map{ ls =>
ls.flatten.toSeq.sortBy(_._2).map(_._1)
}
}
override def mutateEdges(edges: Seq[S2EdgeLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
val edgeWithIdxs = edges.zipWithIndex
val (strongEdges, weakEdges) =
edgeWithIdxs.partition { case (edge, idx) =>
val e = edge
e.innerLabel.consistencyLevel == "strong" && e.getOp() != GraphUtil.operations("insertBulk")
}
val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) =>
val futures = edgeWithIdxs.groupBy(_._1.innerLabel).map { case (label, edgeGroup) =>
val mutator = getEdgeMutator(label)
val edges = edgeGroup.map(_._1)
val idxs = edgeGroup.map(_._2)
/* multiple edges with weak consistency level will be processed as batch */
mutator.mutateWeakEdges(zkQuorum, edges, withWait)
}
Future.sequence(futures)
}
val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.getOp() == GraphUtil.operations("deleteAll") }
val deleteAllFutures = strongDeleteAll.map { case (edge, idx) =>
deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.innerLabel), edge.getDir(), edge.ts).map(idx -> _)
}
val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.innerLabel }.map { case (label, edgeGroup) =>
val edges = edgeGroup.map(_._1)
val idxs = edgeGroup.map(_._2)
val mutator = getEdgeMutator(label)
val zkQuorum = label.hbaseZkAddr
mutator.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets =>
idxs.zip(rets)
}
}
for {
weak <- Future.sequence(weakEdgesFutures)
deleteAll <- Future.sequence(deleteAllFutures)
strong <- Future.sequence(strongEdgesFutures)
} yield {
(deleteAll ++ weak.flatten.flatten ++ strong.flatten).sortBy(_._1).map(r => new MutateResponse(r._2))
}
}
override def mutateElements(elements: Seq[GraphElement],
withWait: Boolean = false): Future[Seq[MutateResponse]] = {
val edgeBuffer = ArrayBuffer[(S2EdgeLike, Int)]()
val vertexBuffer = ArrayBuffer[(S2VertexLike, Int)]()
elements.zipWithIndex.foreach {
case (e: S2EdgeLike, idx: Int) => edgeBuffer.append((e, idx))
case (v: S2VertexLike, idx: Int) => vertexBuffer.append((v, idx))
case any@_ => logger.error(s"Unknown type: ${any}")
}
val edgeFutureWithIdx = mutateEdges(edgeBuffer.map(_._1), withWait).map { result =>
edgeBuffer.map(_._2).zip(result)
}
val vertexFutureWithIdx = mutateVertices(vertexBuffer.map(_._1), withWait).map { result =>
vertexBuffer.map(_._2).zip(result)
}
val graphFuture = for {
edgesMutated <- edgeFutureWithIdx
verticesMutated <- vertexFutureWithIdx
} yield (edgesMutated ++ verticesMutated).sortBy(_._1).map(_._2)
graphFuture
}
override def getEdges(q: Query): Future[StepResult] = {
Try {
if (q.steps.isEmpty) {
// TODO: this should be get vertex query.
fallback
} else {
val filterOutFuture = q.queryOption.filterOutQuery match {
case None => fallback
case Some(filterOutQuery) => traversalHelper.getEdgesStepInner(filterOutQuery)
}
for {
stepResult <- traversalHelper.getEdgesStepInner(q)
filterOutInnerResult <- filterOutFuture
} yield {
if (filterOutInnerResult.isEmpty) stepResult
else StepResult.filterOut(this, q.queryOption, stepResult, filterOutInnerResult)
}
}
} recover {
case e: Exception =>
logger.error(s"getEdgesAsync: $e", e)
fallback
} get
}
override def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = {
Try {
if (mq.queries.isEmpty) fallback
else {
val filterOutFuture = mq.queryOption.filterOutQuery match {
case None => fallback
case Some(filterOutQuery) => traversalHelper.getEdgesStepInner(filterOutQuery)
}
val multiQueryFutures = Future.sequence(mq.queries.map { query => getEdges(query) })
for {
multiQueryResults <- multiQueryFutures
filterOutInnerResult <- filterOutFuture
} yield {
StepResult.merges(mq.queryOption, multiQueryResults, mq.weights, filterOutInnerResult)
}
}
} recover {
case e: Exception =>
logger.error(s"getEdgesAsync: $e", e)
fallback
} get
}
override def deleteAllAdjacentEdges(srcVertices: Seq[S2VertexLike],
labels: Seq[Label],
dir: Int,
ts: Long): Future[Boolean] = {
val requestTs = ts
val vertices = srcVertices
/* create query per label */
val queries = for {
label <- labels
} yield {
val queryParam = QueryParam(labelName = label.label, direction = GraphUtil.fromDirection(dir),
offset = 0, limit = DeleteAllFetchSize, duplicatePolicy = DuplicatePolicy.Raw)
val step = Step(List(queryParam))
Query(vertices, Vector(step))
}
val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) {
traversalHelper.fetchAndDeleteAll(queries, requestTs)
} { case (allDeleted, deleteSuccess) =>
allDeleted && deleteSuccess
}.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }
retryFuture onFailure {
case ex =>
logger.error(s"[Error]: deleteAllAdjacentEdges failed.")
}
retryFuture
}
override def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = {
val edgesWithIdx = edges.zipWithIndex
val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) =>
getEdgeMutator(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
}
Future.sequence(futures).map { ls =>
ls.flatten.toSeq.sortBy(_._2).map(_._1)
}
}
override def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): Future[MutateResponse] = {
val label = edge.innerLabel
val mutator = getEdgeMutator(label)
mutator.updateDegree(label.hbaseZkAddr, edge, degreeVal)
}
override def getVertex(vertexId: VertexId): Option[S2VertexLike] = {
val queryParam = VertexQueryParam(vertexIds = Seq(vertexId))
Await.result(getVertices(queryParam).map { vertices => vertices.headOption }, WaitTimeout)
}
override def fetchEdges(vertex: S2VertexLike, labelNameWithDirs: Seq[(String, String)]): util.Iterator[Edge] = {
Await.result(traversalHelper.fetchEdgesAsync(vertex, labelNameWithDirs), WaitTimeout)
}
override def edgesAsync(vertex: S2VertexLike, direction: Direction, labelNames: String*): Future[util.Iterator[Edge]] = {
val labelNameWithDirs =
if (labelNames.isEmpty) {
// TODO: Let's clarify direction
if (direction == Direction.BOTH) {
Label.findBySrcColumnId(vertex.id.colId).map(l => l.label -> Direction.OUT.name) ++
Label.findByTgtColumnId(vertex.id.colId).map(l => l.label -> Direction.IN.name)
} else if (direction == Direction.IN) {
Label.findByTgtColumnId(vertex.id.colId).map(l => l.label -> direction.name)
} else {
Label.findBySrcColumnId(vertex.id.colId).map(l => l.label -> direction.name)
}
} else {
direction match {
case Direction.BOTH =>
Seq(Direction.OUT, Direction.IN).flatMap { dir => labelNames.map(_ -> dir.name()) }
case _ => labelNames.map(_ -> direction.name())
}
}
traversalHelper.fetchEdgesAsync(vertex, labelNameWithDirs.distinct)
}
def isRunning(): Boolean = running.get()
}