blob: 0748efb891bcdb5e29b81609aeb9e413f7bbd9fc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.core.storage.serde
import org.apache.s2graph.core.schema.LabelMeta
import org.apache.s2graph.core._
import org.apache.s2graph.core.storage._
import org.apache.s2graph.core.utils.logger
import scala.concurrent.{ExecutionContext, Future}
class MutationHelper(storage: Storage) {
val serDe = storage.serDe
val io = storage.io
val fetcher = storage.fetcher
val mutator = storage.mutator
val conflictResolver = storage.conflictResolver
private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
mutator.writeToStorage(cluster, kvs, withWait)
def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
requestTs: Long,
retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = {
if (stepInnerResult.isEmpty) Future.successful(true)
else {
val head = stepInnerResult.edgeWithScores.head
val zkQuorum = head.edge.innerLabel.hbaseZkAddr
val futures = for {
edgeWithScore <- stepInnerResult.edgeWithScores
} yield {
val edge = edgeWithScore.edge
val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
io.buildIncrementsAsync(indexEdge, -1L)
}
/* reverted direction */
val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
io.buildIncrementsAsync(indexEdge, -1L)
}
val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
writeToStorage(zkQuorum, mutations, withWait = true)
}
Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
}
}
def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
if (vertex.op == GraphUtil.operations("delete")) {
writeToStorage(zkQuorum,
serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
} else if (vertex.op == GraphUtil.operations("deleteAll")) {
logger.info(s"deleteAll for vertex is truncated. $vertex")
Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time
} else {
writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait)
}
}
def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
val mutations = _edges.flatMap { edge =>
val (_, edgeUpdate) =
if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
else S2Edge.buildOperation(None, Seq(edge))
val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false)
io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
}
writeToStorage(zkQuorum, mutations, withWait).map { ret =>
_edges.zipWithIndex.map { case (edge, idx) =>
idx -> ret.isSuccess
}
}
}
def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
def mutateEdgesInner(edges: Seq[S2EdgeLike],
checkConsistency: Boolean,
withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
assert(edges.nonEmpty)
// TODO:: remove after code review: unreachable code
if (!checkConsistency) {
val futures = edges.map { edge =>
val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
val mutations =
io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
writeToStorage(zkQuorum, mutations, withWait)
}
Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
} else {
fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
}
}
}
val edgeWithIdxs = _edges.zipWithIndex
val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
(edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
} toSeq
val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
val edges = edgeGroup.map(_._1)
val idxs = edgeGroup.map(_._2)
// After deleteAll, process others
val mutateEdgeFutures = edges.toList match {
case head :: tail =>
val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait)
//TODO: decide what we will do on failure on vertex put
val puts = io.buildVertexPutsAsync(head)
val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
Seq(edgeFuture, vertexFuture)
case Nil => Nil
}
val composed = for {
// deleteRet <- Future.sequence(deleteAllFutures)
mutateRet <- Future.sequence(mutateEdgeFutures)
} yield mutateRet
composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) }
}
Future.sequence(mutateEdges).map { squashedRets =>
squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
}
}
def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = {
val futures = for {
edge <- edges
} yield {
val kvs = for {
relEdge <- edge.relatedEdges
edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
} yield {
val countWithTs = edge.propertyValueInner(LabelMeta.count)
val countVal = countWithTs.innerVal.toString().toLong
io.buildIncrementsCountAsync(edgeWithIndex, countVal).head
}
writeToStorage(zkQuorum, kvs, withWait = withWait)
}
Future.sequence(futures)
}
def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = {
val kvs = io.buildDegreePuts(edge, degreeVal)
mutator.writeToStorage(zkQuorum, kvs, withWait = true)
}
}