test cases passed.
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index 1818d10..3d3821e 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hbase.util.{Base64, Bytes}
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.util.ToolRunner
+import org.apache.s2graph.core.S2GraphConfigs
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
 import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
 import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
@@ -55,7 +56,7 @@
   def toHBaseConfig(zkQuorum: String, tableName: String): Configuration = {
     val hbaseConf = HBaseConfiguration.create()
 
-    hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
+    hbaseConf.set(S2GraphConfigs.HBaseConfigs.HBASE_ZOOKEEPER_QUORUM, zkQuorum)
     hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
 
     hbaseConf
@@ -164,7 +165,6 @@
            restorePath: String,
            tableNames: Seq[String],
            columnFamily: String = "e",
-           elementType: String = "IndexEdge",
            batchSize: Int = 1000,
            labelMapping: Map[String, String] = Map.empty,
            buildDegree: Boolean = false): RDD[Seq[Cell]] = {
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 8c88ee8..6a84e00 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -299,7 +299,7 @@
 
     if (inputDF.isStreaming) writeStream(df.writeStream)
     else {
-      conf.options.getOrElse("writeMethod", "mutate") match {
+      conf.options.getOrElse(S2_SINK_WRITE_METHOD, "mutate") match {
         case "mutate" => writeBatchWithMutate(df)
         case "bulk" =>
           val runLoadIncrementalHFiles = conf.options.getOrElse("runLoadIncrementalHFiles", "true").toBoolean
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index 9c80e58..e848ab3 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -112,28 +112,33 @@
 }
 
 class S2GraphSource(conf: TaskConf) extends Source(conf) {
-
-  override def mandatoryOptions: Set[String] = Set("hbase.rootdir", "restore.path", "hbase.table.names")
+  import org.apache.s2graph.spark.sql.streaming.S2SourceConfigs._
+  override def mandatoryOptions: Set[String] = Set(
+    S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR,
+    S2_SOURCE_BULKLOAD_RESTORE_PATH,
+    S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES
+  )
 
   override def toDF(ss: SparkSession): DataFrame = {
     val mergedConf = TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf) ++
       TaskConf.parseLocalCacheConfigs(conf)
     val config = Management.toConfig(mergedConf)
 
-    val snapshotPath = conf.options("hbase.rootdir")
-    val restorePath = conf.options("restore.path")
-    val tableNames = conf.options("hbase.table.names").split(",")
-    val columnFamily = conf.options.getOrElse("hbase.table.cf", "e")
-    val batchSize = conf.options.getOrElse("scan.batch.size", "1000").toInt
+    val snapshotPath = conf.options(S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR)
+    val restorePath = conf.options(S2_SOURCE_BULKLOAD_RESTORE_PATH)
+    val tableNames = conf.options(S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES).split(",")
+    val columnFamily = conf.options.getOrElse(S2_SOURCE_BULKLOAD_HBASE_TABLE_CF, "e")
+    val batchSize = conf.options.getOrElse(S2_SOURCE_BULKLOAD_SCAN_BATCH_SIZE, "1000").toInt
+
     val labelMapping = Map.empty[String, String]
     val buildDegree =
       if (columnFamily == "v") false
-      else conf.options.getOrElse("build.degree", "false").toBoolean
-    val elementType = conf.options.getOrElse("element.type", "IndexEdge")
+      else conf.options.getOrElse(S2_SOURCE_BULKLOAD_BUILD_DEGREE, "false").toBoolean
+    val elementType = conf.options.getOrElse(S2_SOURCE_ELEMENT_TYPE, "IndexEdge")
     val schema = if (columnFamily == "v") Schema.VertexSchema else Schema.EdgeSchema
 
     val cells = HFileGenerator.tableSnapshotDump(ss, config, snapshotPath,
-      restorePath, tableNames, columnFamily, elementType, batchSize, labelMapping, buildDegree)
+      restorePath, tableNames, columnFamily, batchSize, labelMapping, buildDegree)
 
     implicit val reader = new S2GraphCellReader(elementType)
     implicit val writer = new RowDataFrameWriter
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index ea42828..51e13f0 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -19,27 +19,28 @@
 
 package org.apache.s2graph.s2jobs.task
 
+import org.apache.s2graph.core.S2GraphConfigs
 import org.apache.s2graph.s2jobs.Logger
-import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+//import org.apache.s2graph.s2jobs.loader.GraphFileOptions
 
 object TaskConf {
-  def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = {
-    val args = taskConf.options.filterKeys(GraphFileOptions.OptionKeys)
-      .flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray
-
-    GraphFileOptions.toOption(args)
-  }
+//  def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = {
+//    val args = taskConf.options.filterKeys(GraphFileOptions.OptionKeys)
+//      .flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray
+//
+//    GraphFileOptions.toOption(args)
+//  }
 
   def parseHBaseConfigs(taskConf: TaskConf): Map[String, Any] = {
-    taskConf.options.filterKeys(_.startsWith("hbase."))
+    taskConf.options.filterKeys(S2GraphConfigs.HBaseConfigs.DEFAULTS.keySet)
   }
 
   def parseMetaStoreConfigs(taskConf: TaskConf): Map[String, Any] = {
-    taskConf.options.filterKeys(_.startsWith("db."))
+    taskConf.options.filterKeys(S2GraphConfigs.DBConfigs.DEFAULTS.keySet)
   }
 
   def parseLocalCacheConfigs(taskConf: TaskConf): Map[String, Any] = {
-    taskConf.options.filterKeys(_.startsWith("cache.")).mapValues(_.toInt)
+    taskConf.options.filterKeys(S2GraphConfigs.CacheConfigs.DEFAULTS.keySet).mapValues(_.toInt)
   }
 }
 
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
index d778478..561a810 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
@@ -24,23 +24,14 @@
 import scala.util.Try
 
 object S2SinkConfigs {
-  // Common
-
-  // meta storage Configurations.
-//  val DB_DEFAULT_DRIVER = "db.default.driver"
-//  val DB_DEFAULT_URL = "db.default.url"
-//  val DB_DEFAULT_PASSWORD = "db.default.password"
-//  val DB_DEFAULT_USER = "db.default.user"
-//
-//  val HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"
-
-
   val DEFAULT_GROUPED_SIZE = "100"
   val DEFAULT_WAIT_TIME_SECONDS = "5"
 
   val S2_SINK_PREFIX = "s2.spark.sql.streaming.sink"
   val S2_SINK_BULKLOAD_PREFIX = "s2.spark.sql.bulkload.sink"
 
+  val S2_SINK_WRITE_METHOD = s"$S2_SINK_PREFIX.writeMethod"
+
   val S2_SINK_QUERY_NAME = s"$S2_SINK_PREFIX.queryname"
   val S2_SINK_LOG_PATH = s"$S2_SINK_PREFIX.logpath"
   val S2_SINK_CHECKPOINT_LOCATION = "checkpointlocation"
@@ -58,6 +49,7 @@
   val S2_SINK_BULKLOAD_HBASE_INCREMENTAL_LOAD = s"$S2_SINK_BULKLOAD_PREFIX.hbase.incrementalLoad"
   val S2_SINK_BULKLOAD_HBASE_COMPRESSION = s"$S2_SINK_BULKLOAD_PREFIX.hbase.compression"
 
+  //
   val S2_SINK_BULKLOAD_AUTO_EDGE_CREATE= s"$S2_SINK_BULKLOAD_PREFIX.auto.edge.create"
   val S2_SINK_BULKLOAD_BUILD_DEGREE = s"$S2_SINK_BULKLOAD_PREFIX.build.degree"
   val S2_SINK_BULKLOAD_LABEL_MAPPING = s"$S2_SINK_BULKLOAD_PREFIX.label.mapping"
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SourceConfigs.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SourceConfigs.scala
new file mode 100644
index 0000000..d4553b0
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SourceConfigs.scala
@@ -0,0 +1,21 @@
+package org.apache.s2graph.spark.sql.streaming
+
+object S2SourceConfigs {
+  val S2_SOURCE_PREFIX = "s2.spark.sql.streaming.source"
+  val S2_SOURCE_BULKLOAD_PREFIX = "s2.spark.sql.bulkload.source"
+
+  //
+  // vertex/indexedge/snapshotedge
+  val S2_SOURCE_ELEMENT_TYPE = s"$S2_SOURCE_PREFIX.element.type"
+
+  // HBASE HFILE BULK
+  val S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR = s"$S2_SOURCE_BULKLOAD_PREFIX.hbase.rootdir"
+  val S2_SOURCE_BULKLOAD_RESTORE_PATH = s"$S2_SOURCE_BULKLOAD_PREFIX.restore.path"
+  val S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES = s"$S2_SOURCE_BULKLOAD_PREFIX.hbase.table.names"
+  val S2_SOURCE_BULKLOAD_HBASE_TABLE_CF = s"$S2_SOURCE_BULKLOAD_PREFIX.hbase.table.cf"
+  val S2_SOURCE_BULKLOAD_SCAN_BATCH_SIZE = s"$S2_SOURCE_BULKLOAD_PREFIX.scan.batch.size"
+
+  // BULKLOAD
+  val S2_SOURCE_BULKLOAD_BUILD_DEGREE = s"$S2_SOURCE_BULKLOAD_PREFIX.build.degree"
+
+}
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index f97dce4..d30b7a3 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@ -35,11 +35,27 @@
 
 class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
   import org.apache.s2graph.core.S2GraphConfigs._
-  import S2SinkConfigs._
+
   protected val bulkloadOptions = new TaskConf("bulkloadOptions", "test", options = Map(
-    DBConfigs.DEFAULT_DB_DEFAULT_URL -> "jdbc:h2:file:./var/metastore_jobs;MODE=MYSQL"
+    DBConfigs.DB_DEFAULT_DRIVER -> "org.h2.Driver",
+    DBConfigs.DB_DEFAULT_URL -> "jdbc:h2:file:./var/metastore_jobs;MODE=MYSQL",
+    DBConfigs.DB_DEFAULT_USER -> "sa",
+    DBConfigs.DB_DEFAULT_PASSWORD -> "sa",
+    HBaseConfigs.HBASE_ZOOKEEPER_QUORUM -> "localhost",
+
+    S2SinkConfigs.S2_SINK_WRITE_METHOD -> "bulk",
+
+    S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_TABLE_NAME -> "s2graph",
+    S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_NUM_REGIONS -> "3",
+    S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_TEMP_DIR -> "/tmp/bulkload_tmp",
+    S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_INCREMENTAL_LOAD -> "true",
+    S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_COMPRESSION -> "NONE",
+
+    S2SinkConfigs.S2_SINK_BULKLOAD_AUTO_EDGE_CREATE -> "false",
+    S2SinkConfigs.S2_SINK_BULKLOAD_BUILD_DEGREE -> "false",
+    S2SinkConfigs.S2_SINK_BULKLOAD_LABEL_MAPPING -> ""
   ))
-  protected val options = GraphFileOptions(
+  protected val options: GraphFileOptions = GraphFileOptions(
     input = "/tmp/test.txt",
     tempDir = "/tmp/bulkload_tmp",
     output = "/tmp/s2graph_bulkload",
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
index 6b21cfc..0b2f5fa 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
@@ -22,14 +22,14 @@
 import org.apache.s2graph.s2jobs.task.TaskConf
 
 class S2GraphHelperTest extends BaseSparkTest {
-  test("toGraphFileOptions") {
-    val args = options.toCommand.grouped(2).map { kv =>
-          kv.head -> kv.last
-      }.toMap ++ Map("db.default.url" -> "jdbc://localhost:3306/mysql")
-
-    println(args)
-    val taskConf = TaskConf("dummy", "sink", Nil, args)
-    val graphFileOptions = TaskConf.toGraphFileOptions(taskConf)
-    println(graphFileOptions)
-  }
+//  test("toGraphFileOptions") {
+//    val args = options.toCommand.grouped(2).map { kv =>
+//          kv.head -> kv.last
+//      }.toMap ++ Map("db.default.url" -> "jdbc://localhost:3306/mysql")
+//
+//    println(args)
+//    val taskConf = TaskConf("dummy", "sink", Nil, args)
+//    val graphFileOptions = TaskConf.toGraphFileOptions(taskConf)
+//    println(graphFileOptions)
+//  }
 }
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
index c4a9033..3918c84 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
@@ -50,12 +50,7 @@
   test("S2graphSink writeBatchWithBulkload") {
     val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
     val df = toDataFrame(Seq(bulkEdgeString))
-    val args = Map("writeMethod" -> "bulk") ++
-      options.toCommand.grouped(2).map { kv =>
-        kv.head -> kv.last
-      }.toMap
-
-    val conf = TaskConf("test", "sql", Seq("input"), args)
+    val conf = bulkloadOptions
 
     val sink = new S2GraphSink("testQuery", conf)
     sink.write(df)
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
index 4715bd6..0c2cbb7 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
@@ -49,13 +49,7 @@
 
     val input = df.collect().flatMap(reader.read(s2)(_))
 
-    val args = options.toCommand.grouped(2).map { kv =>
-      kv.head -> kv.last
-    }.toMap ++ Map("writeMethod" -> "bulk", "runLoadIncrementalHFiles" -> "true")
-
-    val conf = TaskConf("test", "sql", Seq("input"), args)
-
-    val sink = new S2GraphSink("testQuery", conf)
+    val sink = new S2GraphSink("testQuery", bulkloadOptions)
     sink.write(df)
 
     // 2. create snapshot if snapshot is not exist to test TableSnapshotInputFormat.
@@ -70,15 +64,20 @@
     }
 
     // 3. Decode S2GraphSource to parse HFile
-    val metaAndHBaseArgs = options.toConfigParams
+    val metaAndHBaseArgs = {
+      TaskConf.parseHBaseConfigs(bulkloadOptions) ++
+        TaskConf.parseMetaStoreConfigs(bulkloadOptions)
+    }.mapValues(_.toString)
+
     val hbaseConfig = HBaseConfiguration.create(spark.sparkContext.hadoopConfiguration)
+    import org.apache.s2graph.spark.sql.streaming.S2SourceConfigs._
 
     val dumpArgs = Map(
-      "hbase.rootdir" -> hbaseConfig.get("hbase.rootdir"),
-      "restore.path" -> "/tmp/hbase_restore",
-      "hbase.table.names" -> s"${snapshotTableName}",
-      "hbase.table.cf" -> columnFamily,
-      "element.type" -> elementType
+      S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR -> hbaseConfig.get("hbase.rootdir"),
+      S2_SOURCE_BULKLOAD_RESTORE_PATH -> "/tmp/hbase_restore",
+      S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES -> s"${snapshotTableName}",
+      S2_SOURCE_BULKLOAD_HBASE_TABLE_CF -> columnFamily,
+      S2_SOURCE_ELEMENT_TYPE -> elementType
     ) ++ metaAndHBaseArgs
 
     val dumpConf = TaskConf("dump", "sql", Seq("input"), dumpArgs)