package subscriber
import com.daumkakao.s2graph.core._
import com.daumkakao.s2graph.core.Graph
import com.typesafe.config.{Config, ConfigFactory}
import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTableInterface, HConnection, HConnectionManager}
import org.apache.spark.{Accumulable, SparkContext}
import s2.spark.{HashMapParam, SparkApp, WithKafka}
import subscriber.parser.{GraphParser, GraphParsers}
import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import scala.concurrent.ExecutionContext
/**
 * Mutable, process-wide holder for the connection settings used by the subscriber job.
 *
 * Calling `apply` overwrites the three fields below with the supplied values,
 * falling back to local defaults when an argument is `None`.
 */
object GraphConfig {
// JDBC URL of the database; assigned in apply (default "jdbc:mysql://localhost:3306/graph").
var database = ""
// ZooKeeper quorum address; assigned in apply (default "localhost").
var zkQuorum = ""
// Kafka broker list; assigned in apply (default "localhost:9092").
var kafkaBrokers = ""
/**
 * Stores the given settings in this object's fields, substituting defaults for `None`.
 *
 * @param phase           deployment phase; not referenced in the lines visible here
 * @param dbUrl           JDBC URL, or `None` to use the localhost MySQL default
 * @param zkAddr          ZooKeeper quorum, or `None` to use "localhost"
 * @param kafkaBrokerList Kafka brokers, or `None` to use "localhost:9092"
 *
 * NOTE(review): the caller assigns the result to a `Config`, so this presumably
 * parses the string `s` with ConfigFactory and returns it — the return expression
 * is not visible in this view; confirm.
 */
def apply(phase: String, dbUrl: Option[String], zkAddr: Option[String], kafkaBrokerList: Option[String]) = {
database = dbUrl.getOrElse("jdbc:mysql://localhost:3306/graph")
zkQuorum = zkAddr.getOrElse("localhost")
kafkaBrokers = kafkaBrokerList.getOrElse("localhost:9092")
// Margin-stripped configuration text; only its section headings are visible here.
val s = s"""
|# Logger used by the framework:
|# Logger provided to your application:
|# DB
|# Query server
|# Local Cache
|# Kafka
| """.stripMargin
/**
 * Shared helpers for the subscriber job: argument normalisation, initialisation of
 * the graph configuration, Kafka reporting, and the store / storeBulk write paths
 * with their statistics bookkeeping.
 */
object GraphSubscriberHelper extends WithKafka {
// Accumulator over a (key -> count) map; individual updates are (key, amount) pairs.
type HashMapAccumulable = Accumulable[HashMap[String, Long], (String, Long)]
// Created lazily on first use, from the broker list currently held in GraphConfig.kafkaBrokers.
// NOTE(review): kafkaConf is presumably inherited from WithKafka — confirm.
lazy val producer = new Producer[String, String](kafkaConf(GraphConfig.kafkaBrokers))
// Assigned by apply(...) below; uninitialised (null) until then.
var config: Config = _
// 8 * 1024 * 1024; not referenced in the lines visible here.
private val writeBufferSize = 1024 * 1024 * 8
// Not referenced in the lines visible here. NOTE(review): presumably milliseconds between retries — confirm.
private val sleepPeriod = 10000
// Default number of attempts for storeRec inside storeBulk.
private val maxTryNum = 10
/**
 * Converts a raw string argument into an `Option`.
 *
 * The empty string and the literal "none" are the sentinels for "not provided".
 * A `null` reference is treated the same way, so callers never receive `Some(null)`.
 *
 * @param s raw argument value
 * @return `None` for `null`, "" or "none"; otherwise `Some(s)`
 */
def toOption(s: String): Option[String] =
  // Option(...) maps null to None before the sentinel values are filtered out.
  Option(s).filter(v => v != "" && v != "none")
/**
 * Initialises this helper from raw string arguments.
 *
 * Each argument except `phase` is passed through `toOption`, so "" and "none" select
 * GraphConfig's defaults. The resulting configuration is stored in `config`.
 *
 * NOTE(review): the remainder of this method is not visible in this view; the
 * commented-out code that follows suggests Graph is also initialised here — confirm.
 */
def apply(phase: String, dbUrl: String, zkQuorum: String, kafkaBrokerList: String) : Unit = {
config = GraphConfig(phase, toOption(dbUrl), toOption(zkQuorum), toOption(kafkaBrokerList))
// def apply(phase: String, dbUrl: Option[String], zkQuorum: Option[String], kafkaBrokerList: Option[String]): Unit = {
// Graph.apply(GraphConfig(phase, dbUrl, zkQuorum, kafkaBrokerList))(
// }
/**
 * Builds a Kafka message for the given key and optional value.
 *
 * The payload is `key`, followed by a tab and `value` when `value` is defined.
 *
 * @param key   first field of the payload
 * @param value optional second field of the payload
 * @param topic Kafka topic for the message; defaults to "report"
 *
 * NOTE(review): the statement that hands `kafkaMsg` to `producer` is not visible
 * in this view — confirm the message is actually sent.
 */
def report(key: String, value: Option[String], topic: String = "report") = {
// Drop the value when absent, then join the remaining fields with a tab.
val msg = Seq(Some(key), value).flatten.mkString("\t")
val kafkaMsg = new KeyedMessage[String, String](topic, msg)
* bulkMutates reads connection and table info from the database.
/**
 * Parses each message into a graph element and writes the batch through
 * `Graph.bulkMutates`, recording per-key statistics along the way.
 *
 * Statistic keys written here: "total", "parseOk", "errorParsing", "store", "storeFailed".
 *
 * @param msgs           raw messages, one graph element per entry
 * @param labelToReplace forwarded to `Graph.toGraphElement` together with each message
 * @param mapAccOpt      optional accumulator that receives every statistic update
 *
 * Any Throwable from the write is counted under "storeFailed", printed, and rethrown.
 *
 * NOTE(review): the value yielded per message and the final return expression are
 * not visible in this view; the declared result type suggests the local counts are
 * returned — confirm.
 */
def store(msgs: Seq[String], labelToReplace: Option[String] = None)(mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = {
val counts = HashMap[String, Long]()
// statFunc(key, n) adds n to both the local counts and the optional accumulator.
val statFunc = storeStat(counts)(mapAccOpt)_
val elements = (for (msg <- msgs) yield {
statFunc("total", 1)
val element = Graph.toGraphElement(msg, labelToReplace)
element match {
case Some(e) =>
statFunc("parseOk", 1)
case None =>
statFunc("errorParsing", 1)
* fetch cluster, table from database.
try {
Graph.bulkMutates(elements, mutateInPlace = true)
statFunc("store", elements.size)
} catch {
case e: Throwable =>
statFunc("storeFailed", elements.size)
println(s"[Exception]: $e")
// Rethrow so the caller sees the failure.
throw e
/**
 * Parses each message into an edge and writes the resulting puts to the HBase table
 * `tableName` obtained from the supplied connection, with retries.
 *
 * Statistic keys written here: "total", "parseOk", "errorParsing", "errorStore", "storeOk".
 *
 * @param conn           HBase connection used to obtain the table
 * @param tableName      name of the table to write to
 * @param msgs           raw messages, one graph element per entry
 * @param labelToReplace forwarded to `Graph.toGraphElement` together with each message
 * @param mapAccOpt      optional accumulator that receives every statistic update
 *
 * NOTE(review): several statements (the values yielded while parsing, the actual
 * table write/flush, the body of `finally`, the return expression) are not visible
 * in this view.
 */
def storeBulk(conn: HConnection, tableName: String)(msgs: Seq[String], labelToReplace: Option[String] = None)(mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = {
val counts = HashMap[String, Long]()
// statFunc(key, n) adds n to both the local counts and the optional accumulator.
val statFunc = storeStat(counts)(mapAccOpt) _
val edges = (for (msg <- msgs) yield {
statFunc("total", 1)
try {
// NOTE(review): as visible here, a Some(...) that is not an Edge matches neither case;
// the resulting MatchError would be caught by the catch below — confirm intent.
Graph.toGraphElement(msg, labelToReplace) match {
case Some(e) if e.isInstanceOf[Edge] =>
statFunc("parseOk", 1)
case None =>
statFunc("errorParsing", 1)
} catch {
case e: Throwable =>
// NOTE(review): println is called with two arguments; this looks like it relies on
// argument adaptation into a tuple — confirm the intended output.
System.err.println(s"$msg $e", e)
* don't use database for connection and table.
// throw all exception to caller.
// Attempts the write; on failure calls itself again with one fewer attempt remaining.
def storeRec(edges: List[Edge], tryNum: Int = maxTryNum): Unit = {
if (tryNum <= 0) {
// Out of attempts: count the whole batch as failed and surface the error.
statFunc("errorStore", edges.size)
throw new RuntimeException(s"retry failed after $maxTryNum")
val table = conn.getTable(tableName)
table.setAutoFlush(false, false)
try {
// we only care about insertBulk
// For every related edge, collect its bulk-insert puts followed by its vertex puts.
val puts = edges.flatMap { edge =>
edge.relatedEdges.flatMap { relatedEdge =>
relatedEdge.insertBulk() ++ relatedEdge.buildVertexPuts()
// NOTE(review): counts msgs.size rather than edges.size — confirm which is intended.
statFunc("storeOk", msgs.size)
} catch {
case e: Throwable =>
// The caught exception is not logged in the lines visible here.
storeRec(edges, tryNum - 1)
} finally {
/**
 * Adds `value` to the running total stored under `key` in `counts` and, when an
 * accumulator is supplied, forwards the same (key, value) increment to it.
 *
 * @param counts    local per-key totals, updated in place
 * @param mapAccOpt optional accumulator that also receives the increment
 * @param key       statistic name
 * @param value     amount to add
 */
def storeStat(counts: HashMap[String, Long])(mapAccOpt: Option[HashMapAccumulable])(key: String, value: Int) = {
  val updatedTotal = counts.getOrElse(key, 0L) + value
  counts.put(key, updatedTotal)
  // Nothing to forward when no accumulator was supplied.
  mapAccOpt.foreach { acc => acc += (key -> value) }
}
/**
 * Spark application that reads lines from an HDFS path and bulk-loads them into
 * HBase through GraphSubscriberHelper.storeBulk (see `run`).
 */
object GraphSubscriber extends SparkApp with WithKafka {
// Not referenced in the lines visible here. NOTE(review): presumably milliseconds — confirm.
val sleepPeriod = 5000
// Usage text listing the nine positional arguments that `run` reads.
// NOTE(review): the text contains typos ("predix", "ogirinalName"); it is left
// unchanged here because it is runtime string content.
val usages =
| * this job read edge format(TSV) from HDFS file system then bulk load edges into s2graph. assumes that newLabelName is already created by API.
| * params:
| * 1. hdfsPath: where is your data in hdfs. require full path with hdfs:// predix
| * 2. dbUrl: jdbc database connection string to specify database for meta.
| * 3. originalLabelName: label to copy with.
| * 4. newLabelName: label field will be replaced to this value.
| * 5. zkQuorum: target hbase zkQuorum where this job will publish data to.
| * 6. hTableName: target hbase physical table name where this job will publish data to.
| * 7. batchSize: how many edges will be batched for Put request to target hbase.
| * 8. kafkaBrokerList: using kafka as fallback queue. when something goes wrong during batch, data needs to be replay will be stored in kafka.
| * 9. kafkaTopic: fallback queue topic.
| * after this job finished, s2graph will have data with sequence corresponding newLabelName.
| * change this newLabelName to ogirinalName if you want to online replace of label.
| *
| */
/**
 * Job entry point: reads nine positional arguments, copies the label definition,
 * then writes the lines of the input file to HBase partition by partition in
 * batches of `batchSize`, collecting statistics in a Spark accumulator.
 *
 * A failure inside a batch is printed and handled per message (see the note in the
 * inner catch); a failure outside the batches is printed and rethrown.
 */
override def run() = {
* Main function
// Exactly nine arguments are expected; the handling of a wrong count is not visible in this view.
if (args.length != 9) {
val hdfsPath = args(0)
val dbUrl = args(1)
val oldLabelName = args(2)
val newLabelName = args(3)
val zkQuorum = args(4)
val hTableName = args(5)
// Throws NumberFormatException when the argument is not an integer.
val batchSize = args(6).toInt
val kafkaBrokerList = args(7)
val kafkaTopic = args(8)
val conf = sparkConf(s"$hdfsPath: GraphSubscriber")
val sc = new SparkContext(conf)
// Accumulator named "counter" over a key -> count map; values for the same key are merged with +.
val mapAcc = sc.accumulable(HashMap.empty[String, Long], "counter")(HashMapParam[String, Long](_ + _))
try {
import GraphSubscriberHelper._
// set local driver setting.
// System.getProperty returns null when the "phase" property is not set.
val phase = System.getProperty("phase")
GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
/** Copy the label when oldLabel exists; otherwise ignore.
 *  NOTE(review): the original comment read "newLabel done exist" — presumably "does not exist"; confirm. */
Management.copyLabel(oldLabelName, newLabelName, toOption(hTableName))
val msgs = sc.textFile(hdfsPath)
msgs.foreachPartition(partition => {
// set executor setting.
// The helper is initialised again inside the partition closure.
val phase = System.getProperty("phase")
GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", zkQuorum)
// One HBase connection per partition.
// NOTE(review): no conn.close() is visible in this view — confirm it is released.
val conn = HConnectionManager.createConnection(conf)
partition.grouped(batchSize).foreach { msgs =>
try {
val start = System.currentTimeMillis()
// val counts =
//, GraphSubscriberHelper.toOption(newLabelName))(Some(mapAcc))
val counts =
GraphSubscriberHelper.storeBulk(conn, hTableName)(msgs, GraphSubscriberHelper.toOption(newLabelName))(Some(mapAcc))
// NOTE(review): storeBulk already receives Some(mapAcc) and storeStat adds to it, so
// adding the returned counts again looks like double counting — confirm.
for ((k, v) <- counts) {
mapAcc += (k, v)
val duration = System.currentTimeMillis() - start
println(s"[Success]: store, $mapAcc, $duration, $zkQuorum, $hTableName")
} catch {
case e: Throwable =>
println(s"[Failed]: store $e")
// NOTE(review): the next line is garbled in this view; it presumably reports each
// message of the failed batch to kafkaTopic with the exception message — confirm.
msgs.foreach { msg =>, Some(e.getMessage()), topic = kafkaTopic)
logInfo(s"counter: $mapAcc")
println(s"Stats: ${mapAcc}")
} catch {
case e: Throwable =>
println(s"job failed with exception: $e")
// Rethrow so the job is reported as failed.
throw e