All test cases passed.
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index 1818d10..3d3821e 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -33,6 +33,7 @@
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.util.ToolRunner
+import org.apache.s2graph.core.S2GraphConfigs
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
@@ -55,7 +56,7 @@
def toHBaseConfig(zkQuorum: String, tableName: String): Configuration = {
val hbaseConf = HBaseConfiguration.create()
- hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
+ hbaseConf.set(S2GraphConfigs.HBaseConfigs.HBASE_ZOOKEEPER_QUORUM, zkQuorum)
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
hbaseConf
@@ -164,7 +165,6 @@
restorePath: String,
tableNames: Seq[String],
columnFamily: String = "e",
- elementType: String = "IndexEdge",
batchSize: Int = 1000,
labelMapping: Map[String, String] = Map.empty,
buildDegree: Boolean = false): RDD[Seq[Cell]] = {
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 8c88ee8..6a84e00 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -299,7 +299,7 @@
if (inputDF.isStreaming) writeStream(df.writeStream)
else {
- conf.options.getOrElse("writeMethod", "mutate") match {
+ conf.options.getOrElse(S2_SINK_WRITE_METHOD, "mutate") match {
case "mutate" => writeBatchWithMutate(df)
case "bulk" =>
val runLoadIncrementalHFiles = conf.options.getOrElse("runLoadIncrementalHFiles", "true").toBoolean
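Note on the change above: the sink now looks up the write method through the prefixed constant instead of the bare "writeMethod" string. A minimal sketch of how a bulkload sink task might be configured under the new key; the task name, type, and inputs are placeholders that follow the existing test conventions, and the remaining bulkload options (table name, temp dir, ...) are omitted:

import org.apache.s2graph.s2jobs.task.TaskConf
import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._

// Sketch only: S2_SINK_WRITE_METHOD resolves to "s2.spark.sql.streaming.sink.writeMethod".
val sinkConf = TaskConf("toS2graph", "s2graph", Seq("input"),
  Map(S2_SINK_WRITE_METHOD -> "bulk"))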
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index 9c80e58..e848ab3 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -112,28 +112,33 @@
}
class S2GraphSource(conf: TaskConf) extends Source(conf) {
-
- override def mandatoryOptions: Set[String] = Set("hbase.rootdir", "restore.path", "hbase.table.names")
+ import org.apache.s2graph.spark.sql.streaming.S2SourceConfigs._
+ override def mandatoryOptions: Set[String] = Set(
+ S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR,
+ S2_SOURCE_BULKLOAD_RESTORE_PATH,
+ S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES
+ )
override def toDF(ss: SparkSession): DataFrame = {
val mergedConf = TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf) ++
TaskConf.parseLocalCacheConfigs(conf)
val config = Management.toConfig(mergedConf)
- val snapshotPath = conf.options("hbase.rootdir")
- val restorePath = conf.options("restore.path")
- val tableNames = conf.options("hbase.table.names").split(",")
- val columnFamily = conf.options.getOrElse("hbase.table.cf", "e")
- val batchSize = conf.options.getOrElse("scan.batch.size", "1000").toInt
+ val snapshotPath = conf.options(S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR)
+ val restorePath = conf.options(S2_SOURCE_BULKLOAD_RESTORE_PATH)
+ val tableNames = conf.options(S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES).split(",")
+ val columnFamily = conf.options.getOrElse(S2_SOURCE_BULKLOAD_HBASE_TABLE_CF, "e")
+ val batchSize = conf.options.getOrElse(S2_SOURCE_BULKLOAD_SCAN_BATCH_SIZE, "1000").toInt
+
val labelMapping = Map.empty[String, String]
val buildDegree =
if (columnFamily == "v") false
- else conf.options.getOrElse("build.degree", "false").toBoolean
- val elementType = conf.options.getOrElse("element.type", "IndexEdge")
+ else conf.options.getOrElse(S2_SOURCE_BULKLOAD_BUILD_DEGREE, "false").toBoolean
+ val elementType = conf.options.getOrElse(S2_SOURCE_ELEMENT_TYPE, "IndexEdge")
val schema = if (columnFamily == "v") Schema.VertexSchema else Schema.EdgeSchema
val cells = HFileGenerator.tableSnapshotDump(ss, config, snapshotPath,
- restorePath, tableNames, columnFamily, elementType, batchSize, labelMapping, buildDegree)
+ restorePath, tableNames, columnFamily, batchSize, labelMapping, buildDegree)
implicit val reader = new S2GraphCellReader(elementType)
implicit val writer = new RowDataFrameWriter
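For context, a hedged example of how an S2GraphSource task might be configured after this change; all values are placeholders and the snippet only illustrates the new key names (the three mandatory keys plus the optional ones read above):

import org.apache.s2graph.s2jobs.task.TaskConf
import org.apache.s2graph.spark.sql.streaming.S2SourceConfigs._

// Placeholder values; element type still defaults to "IndexEdge" when the key is absent.
val dumpConf = TaskConf("dump", "s2graph", Nil, Map(
  S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR    -> "hdfs://nameservice/hbase",
  S2_SOURCE_BULKLOAD_RESTORE_PATH      -> "/tmp/hbase_restore",
  S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES -> "s2graph-snapshot",
  S2_SOURCE_BULKLOAD_HBASE_TABLE_CF    -> "e",
  S2_SOURCE_BULKLOAD_SCAN_BATCH_SIZE   -> "1000",
  S2_SOURCE_ELEMENT_TYPE               -> "IndexEdge"
))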
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index ea42828..51e13f0 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -19,27 +19,28 @@
package org.apache.s2graph.s2jobs.task
+import org.apache.s2graph.core.S2GraphConfigs
import org.apache.s2graph.s2jobs.Logger
-import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+//import org.apache.s2graph.s2jobs.loader.GraphFileOptions
object TaskConf {
- def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = {
- val args = taskConf.options.filterKeys(GraphFileOptions.OptionKeys)
- .flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray
-
- GraphFileOptions.toOption(args)
- }
+// def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = {
+// val args = taskConf.options.filterKeys(GraphFileOptions.OptionKeys)
+// .flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray
+//
+// GraphFileOptions.toOption(args)
+// }
def parseHBaseConfigs(taskConf: TaskConf): Map[String, Any] = {
- taskConf.options.filterKeys(_.startsWith("hbase."))
+ taskConf.options.filterKeys(S2GraphConfigs.HBaseConfigs.DEFAULTS.keySet)
}
def parseMetaStoreConfigs(taskConf: TaskConf): Map[String, Any] = {
- taskConf.options.filterKeys(_.startsWith("db."))
+ taskConf.options.filterKeys(S2GraphConfigs.DBConfigs.DEFAULTS.keySet)
}
def parseLocalCacheConfigs(taskConf: TaskConf): Map[String, Any] = {
- taskConf.options.filterKeys(_.startsWith("cache.")).mapValues(_.toInt)
+ taskConf.options.filterKeys(S2GraphConfigs.CacheConfigs.DEFAULTS.keySet).mapValues(_.toInt)
}
}
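The parse* helpers now whitelist keys against the DEFAULTS maps in S2GraphConfigs instead of matching on a "hbase." / "db." / "cache." prefix, so only recognized settings are forwarded. A minimal, self-contained sketch of the idea (the literal key and values below are stand-ins, not the real DEFAULTS contents):

// A Set[String] is itself a (String => Boolean), so it can be passed directly
// to filterKeys; keys that merely share the prefix are now dropped.
val defaults = Map("hbase.zookeeper.quorum" -> "localhost")   // stand-in for HBaseConfigs.DEFAULTS
val options  = Map("hbase.zookeeper.quorum" -> "zk1,zk2",
                   "hbase.unknown.setting"  -> "ignored")

val hbaseConfs = options.filterKeys(defaults.keySet)
// => Map("hbase.zookeeper.quorum" -> "zk1,zk2")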
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
index d778478..561a810 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
@@ -24,23 +24,14 @@
import scala.util.Try
object S2SinkConfigs {
- // Common
-
- // meta storage Configurations.
-// val DB_DEFAULT_DRIVER = "db.default.driver"
-// val DB_DEFAULT_URL = "db.default.url"
-// val DB_DEFAULT_PASSWORD = "db.default.password"
-// val DB_DEFAULT_USER = "db.default.user"
-//
-// val HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"
-
-
val DEFAULT_GROUPED_SIZE = "100"
val DEFAULT_WAIT_TIME_SECONDS = "5"
val S2_SINK_PREFIX = "s2.spark.sql.streaming.sink"
val S2_SINK_BULKLOAD_PREFIX = "s2.spark.sql.bulkload.sink"
+ val S2_SINK_WRITE_METHOD = s"$S2_SINK_PREFIX.writeMethod"
+
val S2_SINK_QUERY_NAME = s"$S2_SINK_PREFIX.queryname"
val S2_SINK_LOG_PATH = s"$S2_SINK_PREFIX.logpath"
val S2_SINK_CHECKPOINT_LOCATION = "checkpointlocation"
@@ -58,6 +49,7 @@
val S2_SINK_BULKLOAD_HBASE_INCREMENTAL_LOAD = s"$S2_SINK_BULKLOAD_PREFIX.hbase.incrementalLoad"
val S2_SINK_BULKLOAD_HBASE_COMPRESSION = s"$S2_SINK_BULKLOAD_PREFIX.hbase.compression"
+ //
val S2_SINK_BULKLOAD_AUTO_EDGE_CREATE= s"$S2_SINK_BULKLOAD_PREFIX.auto.edge.create"
val S2_SINK_BULKLOAD_BUILD_DEGREE = s"$S2_SINK_BULKLOAD_PREFIX.build.degree"
val S2_SINK_BULKLOAD_LABEL_MAPPING = s"$S2_SINK_BULKLOAD_PREFIX.label.mapping"
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SourceConfigs.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SourceConfigs.scala
new file mode 100644
index 0000000..d4553b0
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SourceConfigs.scala
@@ -0,0 +1,21 @@
+package org.apache.s2graph.spark.sql.streaming
+
+object S2SourceConfigs {
+ val S2_SOURCE_PREFIX = "s2.spark.sql.streaming.source"
+ val S2_SOURCE_BULKLOAD_PREFIX = "s2.spark.sql.bulkload.source"
+
+ //
+ // vertex/indexedge/snapshotedge
+ val S2_SOURCE_ELEMENT_TYPE = s"$S2_SOURCE_PREFIX.element.type"
+
+ // HBASE HFILE BULK
+ val S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR = s"$S2_SOURCE_BULKLOAD_PREFIX.hbase.rootdir"
+ val S2_SOURCE_BULKLOAD_RESTORE_PATH = s"$S2_SOURCE_BULKLOAD_PREFIX.restore.path"
+ val S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES = s"$S2_SOURCE_BULKLOAD_PREFIX.hbase.table.names"
+ val S2_SOURCE_BULKLOAD_HBASE_TABLE_CF = s"$S2_SOURCE_BULKLOAD_PREFIX.hbase.table.cf"
+ val S2_SOURCE_BULKLOAD_SCAN_BATCH_SIZE = s"$S2_SOURCE_BULKLOAD_PREFIX.scan.batch.size"
+
+ // BULKLOAD
+ val S2_SOURCE_BULKLOAD_BUILD_DEGREE = s"$S2_SOURCE_BULKLOAD_PREFIX.build.degree"
+
+}
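The interpolated constants in the new file resolve to fully qualified option keys; for example:

import org.apache.s2graph.spark.sql.streaming.S2SourceConfigs

// Derived directly from the prefixes defined above.
assert(S2SourceConfigs.S2_SOURCE_ELEMENT_TYPE == "s2.spark.sql.streaming.source.element.type")
assert(S2SourceConfigs.S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR == "s2.spark.sql.bulkload.source.hbase.rootdir")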
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index f97dce4..d30b7a3 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@ -35,11 +35,27 @@
class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
import org.apache.s2graph.core.S2GraphConfigs._
- import S2SinkConfigs._
+
protected val bulkloadOptions = new TaskConf("bulkloadOptions", "test", options = Map(
- DBConfigs.DEFAULT_DB_DEFAULT_URL -> "jdbc:h2:file:./var/metastore_jobs;MODE=MYSQL"
+ DBConfigs.DB_DEFAULT_DRIVER -> "org.h2.Driver",
+ DBConfigs.DB_DEFAULT_URL -> "jdbc:h2:file:./var/metastore_jobs;MODE=MYSQL",
+ DBConfigs.DB_DEFAULT_USER -> "sa",
+ DBConfigs.DB_DEFAULT_PASSWORD -> "sa",
+ HBaseConfigs.HBASE_ZOOKEEPER_QUORUM -> "localhost",
+
+ S2SinkConfigs.S2_SINK_WRITE_METHOD -> "bulk",
+
+ S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_TABLE_NAME -> "s2graph",
+ S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_NUM_REGIONS -> "3",
+ S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_TEMP_DIR -> "/tmp/bulkload_tmp",
+ S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_INCREMENTAL_LOAD -> "true",
+ S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_COMPRESSION -> "NONE",
+
+ S2SinkConfigs.S2_SINK_BULKLOAD_AUTO_EDGE_CREATE -> "false",
+ S2SinkConfigs.S2_SINK_BULKLOAD_BUILD_DEGREE -> "false",
+ S2SinkConfigs.S2_SINK_BULKLOAD_LABEL_MAPPING -> ""
))
- protected val options = GraphFileOptions(
+ protected val options: GraphFileOptions = GraphFileOptions(
input = "/tmp/test.txt",
tempDir = "/tmp/bulkload_tmp",
output = "/tmp/s2graph_bulkload",
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
index 6b21cfc..0b2f5fa 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
@@ -22,14 +22,14 @@
import org.apache.s2graph.s2jobs.task.TaskConf
class S2GraphHelperTest extends BaseSparkTest {
- test("toGraphFileOptions") {
- val args = options.toCommand.grouped(2).map { kv =>
- kv.head -> kv.last
- }.toMap ++ Map("db.default.url" -> "jdbc://localhost:3306/mysql")
-
- println(args)
- val taskConf = TaskConf("dummy", "sink", Nil, args)
- val graphFileOptions = TaskConf.toGraphFileOptions(taskConf)
- println(graphFileOptions)
- }
+// test("toGraphFileOptions") {
+// val args = options.toCommand.grouped(2).map { kv =>
+// kv.head -> kv.last
+// }.toMap ++ Map("db.default.url" -> "jdbc://localhost:3306/mysql")
+//
+// println(args)
+// val taskConf = TaskConf("dummy", "sink", Nil, args)
+// val graphFileOptions = TaskConf.toGraphFileOptions(taskConf)
+// println(graphFileOptions)
+// }
}
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
index c4a9033..3918c84 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
@@ -50,12 +50,7 @@
test("S2graphSink writeBatchWithBulkload") {
val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
val df = toDataFrame(Seq(bulkEdgeString))
- val args = Map("writeMethod" -> "bulk") ++
- options.toCommand.grouped(2).map { kv =>
- kv.head -> kv.last
- }.toMap
-
- val conf = TaskConf("test", "sql", Seq("input"), args)
+ val conf = bulkloadOptions
val sink = new S2GraphSink("testQuery", conf)
sink.write(df)
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
index 4715bd6..0c2cbb7 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
@@ -49,13 +49,7 @@
val input = df.collect().flatMap(reader.read(s2)(_))
- val args = options.toCommand.grouped(2).map { kv =>
- kv.head -> kv.last
- }.toMap ++ Map("writeMethod" -> "bulk", "runLoadIncrementalHFiles" -> "true")
-
- val conf = TaskConf("test", "sql", Seq("input"), args)
-
- val sink = new S2GraphSink("testQuery", conf)
+ val sink = new S2GraphSink("testQuery", bulkloadOptions)
sink.write(df)
// 2. create snapshot if snapshot is not exist to test TableSnapshotInputFormat.
@@ -70,15 +64,20 @@
}
// 3. Decode S2GraphSource to parse HFile
- val metaAndHBaseArgs = options.toConfigParams
+ val metaAndHBaseArgs = {
+ TaskConf.parseHBaseConfigs(bulkloadOptions) ++
+ TaskConf.parseMetaStoreConfigs(bulkloadOptions)
+ }.mapValues(_.toString)
+
val hbaseConfig = HBaseConfiguration.create(spark.sparkContext.hadoopConfiguration)
+ import org.apache.s2graph.spark.sql.streaming.S2SourceConfigs._
val dumpArgs = Map(
- "hbase.rootdir" -> hbaseConfig.get("hbase.rootdir"),
- "restore.path" -> "/tmp/hbase_restore",
- "hbase.table.names" -> s"${snapshotTableName}",
- "hbase.table.cf" -> columnFamily,
- "element.type" -> elementType
+ S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR -> hbaseConfig.get("hbase.rootdir"),
+ S2_SOURCE_BULKLOAD_RESTORE_PATH -> "/tmp/hbase_restore",
+ S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES -> s"${snapshotTableName}",
+ S2_SOURCE_BULKLOAD_HBASE_TABLE_CF -> columnFamily,
+ S2_SOURCE_ELEMENT_TYPE -> elementType
) ++ metaAndHBaseArgs
val dumpConf = TaskConf("dump", "sql", Seq("input"), dumpArgs)