import org.apache.s2graph.core.schema.LabelMeta
import org.apache.s2graph.core._
import org.apache.s2graph.core.utils.logger
import scala.concurrent.{ExecutionContext, Future}
class MutationHelper(storage: Storage) {
val serDe = storage.serDe
val io =
val fetcher = storage.fetcher
val mutator = storage.mutator
val conflictResolver = storage.conflictResolver
private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
mutator.writeToStorage(cluster, kvs, withWait)
def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
requestTs: Long,
retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = {
if (stepInnerResult.isEmpty) Future.successful(true)
else {
val head = stepInnerResult.edgeWithScores.head
val zkQuorum = head.edge.innerLabel.hbaseZkAddr
val futures = for {
edgeWithScore <- stepInnerResult.edgeWithScores
} yield {
val edge = edgeWithScore.edge
val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge) = SKeyValue.Put))
val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
serDe.indexEdgeSerializer(indexEdge) = SKeyValue.Delete)) ++
io.buildIncrementsAsync(indexEdge, -1L)
/* reverted direction */
val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
serDe.indexEdgeSerializer(indexEdge) = SKeyValue.Delete)) ++
io.buildIncrementsAsync(indexEdge, -1L)
val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
writeToStorage(zkQuorum, mutations, withWait = true)
Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
if (vertex.op == GraphUtil.operations("delete")) {
serDe.vertexSerializer(vertex) = SKeyValue.Delete)), withWait)
} else if (vertex.op == GraphUtil.operations("deleteAll")) {"deleteAll for vertex is truncated. $vertex")
Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time
} else {
writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait)
def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
val mutations = _edges.flatMap { edge =>
val (_, edgeUpdate) =
if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
else S2Edge.buildOperation(None, Seq(edge))
val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false)
io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
writeToStorage(zkQuorum, mutations, withWait).map { ret => { case (edge, idx) =>
idx -> ret.isSuccess
def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
def mutateEdgesInner(edges: Seq[S2EdgeLike],
checkConsistency: Boolean,
withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
// TODO:: remove after code review: unreachable code
if (!checkConsistency) {
val futures = { edge =>
val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
val mutations =
io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
writeToStorage(zkQuorum, mutations, withWait)
Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
} else {
fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
val edgeWithIdxs = _edges.zipWithIndex
val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
(edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
} toSeq
val mutateEdges = { case ((_, _, _), edgeGroup) =>
val edges =
val idxs =
// After deleteAll, process others
val mutateEdgeFutures = edges.toList match {
case head :: tail =>
val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait)
//TODO: decide what we will do on failure on vertex put
val puts = io.buildVertexPutsAsync(head)
val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
Seq(edgeFuture, vertexFuture)
case Nil => Nil
val composed = for {
// deleteRet <- Future.sequence(deleteAllFutures)
mutateRet <- Future.sequence(mutateEdgeFutures)
} yield mutateRet { ret => => idx -> ret) }
Future.sequence(mutateEdges).map { squashedRets =>
squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = {
val futures = for {
edge <- edges
} yield {
val kvs = for {
relEdge <- edge.relatedEdges
edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
} yield {
val countWithTs = edge.propertyValueInner(LabelMeta.count)
val countVal = countWithTs.innerVal.toString().toLong
io.buildIncrementsCountAsync(edgeWithIndex, countVal).head
writeToStorage(zkQuorum, kvs, withWait = withWait)
def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = {
val kvs = io.buildDegreePuts(edge, degreeVal)
mutator.writeToStorage(zkQuorum, kvs, withWait = true)