/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.s2jobs.task

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
import org.apache.s2graph.core.{GraphElement, S2EdgeLike, S2VertexLike}
import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
import org.apache.s2graph.s2jobs.{BaseSparkTest, S2GraphHelper, Schema}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

class SourceTest extends BaseSparkTest {
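
  // Converts bulk-format strings into GraphElements and wraps them in a DataFrame with the given schema.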
  // TODO: props to valid string.
  def toDataFrame(elements: Seq[String], schema: StructType): DataFrame = {
    val ls = elements.flatMap(s2.elementBuilder.toGraphElement(_)).map { element =>
      S2GraphHelper.graphElementToSparkSqlRow(s2, element)
    }
    val rdd = spark.sparkContext.parallelize(ls)
    spark.sqlContext.createDataFrame(rdd, schema)
  }
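
  // Round-trip check: writes the bulk-format data through S2GraphSink, snapshots the HBase table,
  // reads the snapshot back through S2GraphSource, and returns the (input, output) element sequences.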
  def runCheck(data: Seq[String],
               schema: StructType,
               columnFamily: String,
               elementType: String): (Seq[GraphElement], Seq[GraphElement]) = {
    val snapshotTableName = options.tableName + "-snapshot"
    val df = toDataFrame(data, schema)
    val reader = new RowBulkFormatReader
    val input = df.collect().flatMap(reader.read(s2)(_))
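
    // 1. Write the bulk-format DataFrame into HBase through S2GraphSink.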
    val sink = new S2GraphSink("testQuery", bulkloadOptions)
    sink.write(df)

    // 2. Create a snapshot of the table so the source can read it through TableSnapshotInputFormat;
    //    delete any stale snapshot with the same name first.
    s2.defaultStorage.management.asInstanceOf[AsynchbaseStorageManagement].withAdmin(s2.config) { admin =>
      import scala.collection.JavaConverters._
      val set = admin.listSnapshots(snapshotTableName).asScala.toList.map(_.getName).toSet
      if (set(snapshotTableName))
        admin.deleteSnapshot(snapshotTableName)

      admin.snapshot(snapshotTableName, TableName.valueOf(options.tableName))
    }

    // 3. Read the snapshot back through S2GraphSource, which parses the HFiles into rows.
    val metaAndHBaseArgs = {
      TaskConf.parseHBaseConfigs(bulkloadOptions) ++
        TaskConf.parseMetaStoreConfigs(bulkloadOptions)
    }.mapValues(_.toString)

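    // Resolve hbase.rootdir from the Spark Hadoop configuration so the source can locate the snapshot files.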
    val hbaseConfig = HBaseConfiguration.create(spark.sparkContext.hadoopConfiguration)

    import org.apache.s2graph.spark.sql.streaming.S2SourceConfigs._
    val dumpArgs = Map(
      S2_SOURCE_BULKLOAD_HBASE_ROOT_DIR -> hbaseConfig.get("hbase.rootdir"),
      S2_SOURCE_BULKLOAD_RESTORE_PATH -> "/tmp/hbase_restore",
      S2_SOURCE_BULKLOAD_HBASE_TABLE_NAMES -> s"${snapshotTableName}",
      S2_SOURCE_BULKLOAD_HBASE_TABLE_CF -> columnFamily,
      S2_SOURCE_ELEMENT_TYPE -> elementType
    ) ++ metaAndHBaseArgs

    val dumpConf = TaskConf("dump", "sql", Seq("input"), dumpArgs)
    val source = new S2GraphSource(dumpConf)
    val realDF = source.toDF(spark)
    realDF.printSchema()

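    // Convert the rows read from the snapshot back into GraphElements so they can be compared with the input.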
    val output = realDF.collect().flatMap(reader.read(s2)(_))

    (input, output)
  }

  test("S2GraphSource edge toDF") {
    val column = initTestVertexSchema(s2)
    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)

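    // Bulk-load edge lines are tab-separated: timestamp, operation, element type, source id, target id, label name, JSON props.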
    val bulkEdges = Seq(
      s"1416236400000\tinsert\tedge\ta\tb\t${label.label}\t{}",
      s"1416236400000\tinsert\tedge\ta\tc\t${label.label}\t{}"
    )

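    // Start from an empty HBase table so only the edges above are compared.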
    s2.defaultStorage.truncateTable(s2Config, label.hTableName)

    val (_inputEdges, _outputEdges) = runCheck(bulkEdges, Schema.EdgeSchema, "e", "IndexEdge")
    val inputEdges = _inputEdges.sortBy(_.asInstanceOf[S2EdgeLike].tgtVertex.innerId.toIdString())
    val outputEdges = _outputEdges.sortBy(_.asInstanceOf[S2EdgeLike].tgtVertex.innerId.toIdString())

    inputEdges.foreach { e => println(s"[Input]: $e") }
    outputEdges.foreach { e => println(s"[Output]: $e") }

    inputEdges shouldBe outputEdges
  }

  test("S2GraphSource vertex toDF") {
    val column = initTestVertexSchema(s2)
    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)

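    // Bulk-load vertex lines are tab-separated: timestamp, operation, element type, vertex id, service name, column name, JSON props.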
    val bulkVertices = Seq(
      s"1416236400000\tinsert\tvertex\tc\t${column.service.serviceName}\t${column.columnName}\t{}",
      s"1416236400000\tinsert\tvertex\td\t${column.service.serviceName}\t${column.columnName}\t{}"
    )

    val (_inputVertices, _outputVertices) = runCheck(bulkVertices, Schema.VertexSchema, "v", "Vertex")
    val inputVertices = _inputVertices.sortBy(_.asInstanceOf[S2VertexLike].innerId.toIdString())
    val outputVertices = _outputVertices.sortBy(_.asInstanceOf[S2VertexLike].innerId.toIdString())

    inputVertices.foreach { v => println(s"[Input]: $v") }
    outputVertices.foreach { v => println(s"[Output]: $v") }

    inputVertices shouldBe outputVertices
  }
}