* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.s2graph.loader.subscriber
import com.typesafe.config.Config
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.regionserver.BloomType
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core._
import org.apache.s2graph.core.schema.{Label, LabelMeta}
import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
import org.apache.s2graph.loader.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
import org.apache.s2graph.spark.spark.SparkApp
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.hbase.async.PutRequest
import play.api.libs.json.Json
import scala.collection.JavaConversions._
object TransferToHFile extends SparkApp {
var options:GraphFileOptions = _
case class GraphFileOptions(input: String = "",
tmpPath: String = s"/tmp/bulkload",
zkQuorum: String = "",
tableName: String = "",
dbUrl: String = "",
dbUser: String = "",
dbPassword: String = "",
maxHFilePerRegionServer: Int = 1,
numRegions: Int = 3,
labelMapping: Map[String, String] = Map.empty[String, String],
autoEdgeCreate: Boolean = false,
buildDegree: Boolean = false,
incrementalLoad: Boolean = false,
compressionAlgorithm: String = "NONE") {
def toConfigParams = {
"hbase.zookeeper.quorum" -> zkQuorum,
"db.default.url" -> dbUrl,
"db.default.user" -> dbUser,
"db.default.password" -> dbPassword
val parser = new scopt.OptionParser[GraphFileOptions]("run") {
opt[String]('i', "input").required().action( (x, c) =>
c.copy(input = x) ).text("hdfs path for tsv file(bulk format)")
opt[String]('m', "tmpPath").required().action( (x, c) =>
c.copy(tmpPath = x) ).text("temp hdfs path for storing HFiles")
opt[String]('z', "zkQuorum").required().action( (x, c) =>
c.copy(zkQuorum = x) ).text("zookeeper config for hbase")
opt[String]('t', "table").required().action( (x, c) =>
c.copy(tableName = x) ).text("table name for this bulk upload.")
opt[String]('c', "dbUrl").required().action( (x, c) =>
c.copy(dbUrl = x)).text("jdbc connection url.")
opt[String]('u', "dbUser").required().action( (x, c) =>
c.copy(dbUser = x)).text("database user name.")
opt[String]('p', "dbPassword").required().action( (x, c) =>
c.copy(dbPassword = x)).text("database password.")
opt[Int]('h', "maxHFilePerRegionServer").action ( (x, c) =>
c.copy(maxHFilePerRegionServer = x)).text("maximum number of HFile per RegionServer."
opt[Int]('n', "numRegions").action ( (x, c) =>
c.copy(numRegions = x)).text("total numRegions(pre-split size) on table."
opt[String]('l', "labelMapping").action( (x, c) =>
c.copy(labelMapping = toLabelMapping(x)) ).text("mapping info to change the label from source (originalLabel:newLabel)")
opt[Boolean]('d', "buildDegree").action( (x, c) =>
c.copy(buildDegree = x)).text("generate degree values")
opt[Boolean]('a', "autoEdgeCreate").action( (x, c) =>
c.copy(autoEdgeCreate = x)).text("generate reverse edge automatically")
opt[Boolean]('c', "incrementalLoad").action( (x, c) =>
c.copy(incrementalLoad = x)).text("whether incremental bulkload which append data on existing table or not."
//TODO: Process AtomicIncrementRequest too.
/** build key values */
case class DegreeKey(vertexIdStr: String, labelName: String, direction: String)
private def toLabelMapping(lableMapping: String): Map[String, String] = {
(for {
token <- lableMapping.split(",")
inner = token.split(":") if inner.length == 2
} yield {
(inner.head, inner.last)
private def insertBulkForLoaderAsync(edge: S2Edge, createRelEdges: Boolean = true): List[PutRequest] = {
val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
buildPutRequests(edge.toSnapshotEdge) ++ relEdges.toList.flatMap { e =>
e.edgesWithIndex.flatMap { indexEdge => buildPutRequests(indexEdge) }
def buildDegrees(msgs: RDD[String], labelMapping: Map[String, String], edgeAutoCreate: Boolean) = {
val filtered = msgs.filter { case msg =>
val tokens = GraphUtil.split(msg)
tokens(2) == "e" || tokens(2) == "edge"
for {
msg <- filtered
tokens = GraphUtil.split(msg)
tempDirection = if (tokens.length == 7) "out" else tokens(7)
direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
reverseDirection = if (direction == "out") "in" else "out"
convertedLabelName = labelMapping.get(tokens(5)).getOrElse(tokens(5))
(vertexIdStr, vertexIdStrReversed) = (tokens(3), tokens(4))
degreeKey = DegreeKey(vertexIdStr, convertedLabelName, direction)
degreeKeyReversed = DegreeKey(vertexIdStrReversed, convertedLabelName, reverseDirection)
extra = if (edgeAutoCreate) List(degreeKeyReversed -> 1L) else Nil
output <- List(degreeKey -> 1L) ++ extra
} yield output
def buildPutRequests(snapshotEdge: SnapshotEdge): List[PutRequest] = {
val kvs = GraphSubscriberHelper.g.getStorage(snapshotEdge.label).serDe.snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList { kv => new PutRequest(kv.table, kv.row,, kv.qualifier, kv.value, kv.timestamp) }
def buildPutRequests(indexEdge: IndexEdge): List[PutRequest] = {
val kvs = GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.toList { kv => new PutRequest(kv.table, kv.row,, kv.qualifier, kv.value, kv.timestamp) }
def buildDegreePutRequests(vertexId: String, labelName: String, direction: String, degreeVal: Long): List[PutRequest] = {
val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB."))
val dir = GraphUtil.directions(direction)
val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse {
throw new RuntimeException(s"$vertexId can not be converted into innerval")
val vertex = GraphSubscriberHelper.builder.newVertex(SourceVertexId(label.srcColumn, innerVal))
val ts = System.currentTimeMillis()
val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
val edge = GraphSubscriberHelper.builder.newEdge(vertex, vertex, label, dir, propsWithTs = propsWithTs)
edge.edgesWithIndex.flatMap { indexEdge =>
GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge) { kv =>
new PutRequest(kv.table, kv.row,, Array.empty[Byte], Bytes.toBytes(degreeVal), kv.timestamp)
def toKeyValues(degreeKeyVals: Seq[(DegreeKey, Long)]): Iterator[KeyValue] = {
val kvs = for {
(key, value) <- degreeKeyVals
putRequest <- buildDegreePutRequests(key.vertexIdStr, key.labelName, key.direction, value)
} yield {
val p = putRequest
val kv = new KeyValue(p.key(),, p.qualifier, p.timestamp, p.value)
def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = {
val kvList = new java.util.ArrayList[KeyValue]
for (s <- strs) {
val elementList = GraphSubscriberHelper.g.elementBuilder.toGraphElement(s, labelMapping).toSeq
for (element <- elementList) {
if (element.isInstanceOf[S2Edge]) {
val edge = element.asInstanceOf[S2Edge]
val putRequestList = insertBulkForLoaderAsync(edge, autoEdgeCreate)
for (p <- putRequestList) {
val kv = new KeyValue(p.key(),, p.qualifier, p.timestamp, p.value)
} else if (element.isInstanceOf[S2Vertex]) {
val vertex = element.asInstanceOf[S2Vertex]
val putRequestList = GraphSubscriberHelper.g.getStorage(vertex.service).serDe.vertexSerializer(vertex) { kv =>
new PutRequest(kv.table, kv.row,, kv.qualifier, kv.value, kv.timestamp)
for (p <- putRequestList) {
val kv = new KeyValue(p.key(),, p.qualifier, p.timestamp, p.value)
def generateKeyValues(sc: SparkContext,
s2Config: Config,
input: RDD[String],
graphFileOptions: GraphFileOptions): RDD[KeyValue] = {
val kvs = input.mapPartitions { iter =>
toKeyValues(iter.toSeq, graphFileOptions.labelMapping, graphFileOptions.autoEdgeCreate)
if (!graphFileOptions.buildDegree) kvs
else {
kvs ++ buildDegrees(input, graphFileOptions.labelMapping, graphFileOptions.autoEdgeCreate).reduceByKey { (agg, current) =>
agg + current
}.mapPartitions { iter =>
def generateHFile(sc: SparkContext,
s2Config: Config,
kvs: RDD[KeyValue],
options: GraphFileOptions): Unit = {
val hbaseConfig = toHBaseConfig(options)
val startKeys =
if (options.incrementalLoad) {
// need hbase connection to existing table to figure out the ranges of regions.
getTableStartKeys(hbaseConfig, TableName.valueOf(options.tableName))
} else {
// otherwise we do not need to initialize Connection to hbase cluster.
// only numRegions determine region's pre-split.
getStartKeys(numRegions = options.numRegions)
val hbaseSc = new HBaseContext(sc, hbaseConfig)
def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
val k = new KeyFamilyQualifier(CellUtil.cloneRow(kv), CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv))
val v = CellUtil.cloneValue(kv)
Seq((k -> v)).toIterator
val compressionAlgorithmClass = Algorithm.valueOf(options.compressionAlgorithm).getName.toUpperCase
val familyOptions = new FamilyHFileWriteOptions(compressionAlgorithmClass,, 32768,
val familyOptionsMap = Map("e".getBytes("UTF-8") -> familyOptions, "v".getBytes("UTF-8") -> familyOptions)
hbaseSc.bulkLoad(kvs, TableName.valueOf(options.tableName), startKeys, flatMap, options.tmpPath, familyOptionsMap)
def getTableStartKeys(hbaseConfig: Configuration, tableName: TableName): Array[Array[Byte]] = {
val conn = ConnectionFactory.createConnection(hbaseConfig)
val regionLocator = conn.getRegionLocator(tableName)
def toHBaseConfig(graphFileOptions: GraphFileOptions): Configuration = {
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", graphFileOptions.zkQuorum)
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, graphFileOptions.tableName)
hbaseConf.set("hadoop.tmp.dir", s"/tmp/${graphFileOptions.tableName}")
def getStartKeys(numRegions: Int): Array[Array[Byte]] = {
val startKey = AsynchbaseStorageManagement.getStartKey(numRegions)
val endKey = AsynchbaseStorageManagement.getEndKey(numRegions)
if (numRegions < 3) {
throw new IllegalArgumentException("Must create at least three regions")
} else if (Bytes.compareTo(startKey, endKey) >= 0) {
throw new IllegalArgumentException("Start key must be smaller than end key")
val empty = new Array[Byte](0)
if (numRegions == 3) {
Array(empty, startKey, endKey)
} else {
val splitKeys: Array[Array[Byte]] = Bytes.split(startKey, endKey, numRegions - 3)
if (splitKeys == null || splitKeys.length != numRegions - 1) {
throw new IllegalArgumentException("Unable to split key range into enough regions")
Array(empty) ++ splitKeys.toSeq
override def run() = {
parser.parse(args, GraphFileOptions()) match {
case Some(o) => options = o
case None =>
throw new IllegalArgumentException("failed to parse options...")
println(s">>> Options: ${options}")
val s2Config = Management.toConfig(options.toConfigParams)
val conf = sparkConf(s"TransferToHFile")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryoserializer.buffer.mb", "24")
val sc = new SparkContext(conf)
val rdd = sc.textFile(options.input)
val merged = TransferToHFile.generateKeyValues(sc, s2Config, rdd, options)
generateHFile(sc, s2Config, merged, options)
// /* set up hbase init */
// val hbaseSc = new HBaseContext(sc, toHBaseConfig(options))
// def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
// val k = new KeyFamilyQualifier(CellUtil.cloneRow(kv), CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv))
// val v = CellUtil.cloneValue(kv)
// Seq((k -> v)).toIterator
// }
// val familyOptions = new FamilyHFileWriteOptions(Algorithm.LZ4.getName.toUpperCase,
//, 32768,
// val familyOptionsMap = Map("e".getBytes("UTF-8") -> familyOptions, "v".getBytes("UTF-8") -> familyOptions)
// val startKeys = getStartKeys(numRegions = options.numRegions)
// hbaseSc.bulkLoad(merged, TableName.valueOf(options.tableName), startKeys, flatMap, options.tmpPath, familyOptionsMap)