blob: 580acd721f86a62865f1252224ddc3a82d963c2a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.core.storage.serde.snapshotedge.tall
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core.schema.{Label, LabelMeta, ServiceColumn}
import org.apache.s2graph.core.storage.serde.StorageDeserializable._
import org.apache.s2graph.core.storage.CanSKeyValue
import org.apache.s2graph.core.types._
import org.apache.s2graph.core._
import org.apache.s2graph.core.storage.serde.Deserializable
import org.apache.s2graph.core.utils.logger
class SnapshotEdgeDeserializable(graph: S2GraphLike) extends Deserializable[SnapshotEdge] {
val builder = graph.elementBuilder
def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
val statusCode = byte >> 4
val op = byte & ((1 << 4) - 1)
(statusCode.toByte, op.toByte)
}
override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
cacheElementOpt: Option[SnapshotEdge]): Option[SnapshotEdge] = {
try {
val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
assert(kvs.size == 1)
val kv = kvs.head
val version = kv.timestamp
var pos = 0
val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, HBaseType.DEFAULT_VERSION)
pos += srcIdLen
val isTallSchema = pos + 5 != kv.row.length
var tgtVertexId = TargetVertexId(ServiceColumn.Default, srcVertexId.innerId)
if (isTallSchema) {
val (tgtId, tgtBytesLen) = InnerVal.fromBytes(kv.row, pos, kv.row.length, HBaseType.DEFAULT_VERSION)
tgtVertexId = TargetVertexId(ServiceColumn.Default, tgtId)
pos += tgtBytesLen
}
val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
pos += 4
val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
pos += 1
if (!isInverted) None
else {
val label = Label.findById(labelWithDir.labelId)
val schemaVer = label.schemaVersion
// val srcVertexId = SourceVertexId(ServiceColumn.Default, srcIdAndTgtId.srcInnerId)
// val tgtVertexId = SourceVertexId(ServiceColumn.Default, tgtId.tgtInnerId)
var pos = 0
val (statusCode, op) = statusCodeWithOp(kv.value(pos))
pos += 1
val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer, label)
val kvsMap = props.toMap
val tsInnerVal = kvsMap(LabelMeta.timestamp).innerVal
val ts = tsInnerVal.toString.toLong
pos = endAt
val _pendingEdgeOpt =
if (pos == kv.value.length) None
else {
val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos))
pos += 1
// val versionNum = Bytes.toLong(kv.value, pos, 8)
// pos += 8
val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer, label)
pos = endAt
val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
val pendingEdge =
builder.newEdge(
builder.newVertex(srcVertexId, version),
builder.newVertex(tgtVertexId, version),
label, labelWithDir.dir, pendingEdgeOp,
version, pendingEdgeProps.toMap,
statusCode = pendingEdgeStatusCode, lockTs = lockTs, tsInnerValOpt = Option(tsInnerVal))
Option(pendingEdge)
}
val snapshotEdge = builder.newSnapshotEdge(
builder.newVertex(srcVertexId, ts),
builder.newVertex(tgtVertexId, ts),
label, labelWithDir.dir, op, version, props.toMap, statusCode = statusCode,
pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
Option(snapshotEdge)
}
} catch {
case e: Exception =>
logger.error("#" * 100, e)
None
}
}
}