blob: 5db02ccb1c66c0a050ceb9446d38c0da16f1b6d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.core.storage.rocks
import java.util.Base64
import com.typesafe.config.Config
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core._
import org.apache.s2graph.core.schema.{Label, ServiceColumn}
import org.apache.s2graph.core.storage.rocks.RocksHelper.{GetRequest, RocksRPC, ScanWithRange}
import org.apache.s2graph.core.storage.serde.StorageSerializable
import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageReadable, StorageSerDe}
import org.apache.s2graph.core.types.{HBaseType, VertexId}
import org.rocksdb.RocksDB
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}
class RocksStorageReadable(val graph: S2GraphLike,
val config: Config,
val db: RocksDB,
val vdb: RocksDB,
val serDe: StorageSerDe,
override val io: StorageIO) extends StorageReadable {
private val table = Array.emptyByteArray
private val qualifier = Array.emptyByteArray
private def buildRequest(queryRequest: QueryRequest, edge: S2EdgeLike): RocksRPC = {
queryRequest.queryParam.tgtVertexInnerIdOpt match {
case None => // indexEdges
val queryParam = queryRequest.queryParam
val indexEdgeOpt = edge.edgesWithIndex.filter(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq).headOption
val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can`t find index for query $queryParam"))
val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
val labelWithDirBytes = indexEdge.labelWithDir.bytes
val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
val (startKey, stopKey) =
if (queryParam.intervalOpt.isDefined) {
val _startKey = queryParam.cursorOpt match {
case Some(cursor) => Base64.getDecoder.decode(cursor)
case None => Bytes.add(baseKey, intervalMaxBytes)
}
(_startKey, Bytes.add(baseKey, intervalMinBytes))
} else {
val _startKey = queryParam.cursorOpt match {
case Some(cursor) => Base64.getDecoder.decode(cursor)
case None => baseKey
}
(_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
}
Right(ScanWithRange(SKeyValue.EdgeCf, startKey, stopKey, queryParam.innerOffset, queryParam.innerLimit))
case Some(tgtId) => // snapshotEdge
val kv = serDe.snapshotEdgeSerializer(graph.elementBuilder.toRequestEdge(queryRequest, Nil).toSnapshotEdge).toKeyValues.head
Left(GetRequest(SKeyValue.EdgeCf, kv.row))
}
}
private def buildRequest(queryRequest: QueryRequest, vertex: S2VertexLike): RocksRPC = {
val startKey = vertex.id.bytes
val stopKey = Bytes.add(startKey, Array.fill(1)(Byte.MaxValue))
Right(ScanWithRange(SKeyValue.VertexCf, startKey, stopKey, 0, Byte.MaxValue))
// val kv = serDe.vertexSerializer(vertex).toKeyValues.head
// Left(GetRequest(SKeyValue.VertexCf, kv.row))
}
override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
val futures = for {
queryRequest <- queryRequests
} yield {
val parentEdges = prevStepEdges.getOrElse(queryRequest.vertex.id, Nil)
val edge = graph.elementBuilder.toRequestEdge(queryRequest, parentEdges)
val rpc = buildRequest(queryRequest, edge)
fetchKeyValues(rpc).map { kvs =>
val queryParam = queryRequest.queryParam
val stepResult = io.toEdges(kvs, queryRequest, queryRequest.prevStepScore, false, parentEdges)
val edgeWithScores = stepResult.edgeWithScores.filter { case edgeWithScore =>
val edge = edgeWithScore.edge
val duration = queryParam.durationOpt.getOrElse((Long.MinValue, Long.MaxValue))
edge.ts >= duration._1 && edge.ts < duration._2
}
stepResult.copy(edgeWithScores = edgeWithScores)
}
}
Future.sequence(futures)
}
private def fetchKeyValues(rpc: RocksRPC)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
rpc match {
case Left(GetRequest(cf, key)) =>
val _db = if (Bytes.equals(cf, SKeyValue.VertexCf)) vdb else db
val v = _db.get(key)
val kvs =
if (v == null) Seq.empty
else Seq(SKeyValue(table, key, cf, qualifier, v, System.currentTimeMillis()))
Future.successful(kvs)
case Right(ScanWithRange(cf, startKey, stopKey, offset, limit)) =>
val _db = if (Bytes.equals(cf, SKeyValue.VertexCf)) vdb else db
val kvs = new ArrayBuffer[SKeyValue]()
val iter = _db.newIterator()
try {
var idx = 0
iter.seek(startKey)
val (startOffset, len) = (offset, limit)
while (iter.isValid && Bytes.compareTo(iter.key, stopKey) <= 0 && idx < startOffset + len) {
if (idx >= startOffset) {
kvs += SKeyValue(table, iter.key, cf, qualifier, iter.value, System.currentTimeMillis())
}
iter.next()
idx += 1
}
} finally {
iter.close()
}
Future.successful(kvs)
}
}
override def fetchEdgesAll()(implicit ec: ExecutionContext) = {
val edges = new ArrayBuffer[S2EdgeLike]()
Label.findAll().groupBy(_.hbaseTableName).toSeq.foreach { case (hTableName, labels) =>
val distinctLabels = labels.toSet
val iter = db.newIterator()
try {
iter.seekToFirst()
while (iter.isValid) {
val kv = SKeyValue(table, iter.key(), SKeyValue.EdgeCf, qualifier, iter.value, System.currentTimeMillis())
serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None)
.filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree)
.foreach { edge =>
edges += edge
}
iter.next()
}
} finally {
iter.close()
}
}
Future.successful(edges)
}
override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
import scala.collection.mutable
val vertices = new ArrayBuffer[S2VertexLike]()
ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.foreach { case (hTableName, columns) =>
val distinctColumns = columns.toSet
val iter = vdb.newIterator()
val buffer = mutable.ListBuffer.empty[SKeyValue]
var oldVertexIdBytes = Array.empty[Byte]
var minusPos = 0
try {
iter.seekToFirst()
while (iter.isValid) {
val row = iter.key()
if (!Bytes.equals(oldVertexIdBytes, 0, oldVertexIdBytes.length - minusPos, row, 0, row.length - 1)) {
if (buffer.nonEmpty)
serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None)
.filter(v => distinctColumns(v.serviceColumn))
.foreach { vertex =>
vertices += vertex
}
oldVertexIdBytes = row
minusPos = 1
buffer.clear()
}
val kv = SKeyValue(table, iter.key(), SKeyValue.VertexCf, qualifier, iter.value(), System.currentTimeMillis())
buffer += kv
iter.next()
}
if (buffer.nonEmpty)
serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None)
.filter(v => distinctColumns(v.serviceColumn))
.foreach { vertex =>
vertices += vertex
}
} finally {
iter.close()
}
}
Future.successful(vertices)
}
override def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext) = {
fetchKeyValues(buildRequest(queryRequest, edge))
}
override def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext) = {
fetchKeyValues(buildRequest(queryRequest, vertex))
}
}