blob: 21ce920daf296b97168da4dc0be2abf571e4697c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.loader.subscriber
import java.io.PrintWriter
import java.util
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.hadoop.util.ToolRunner
import org.apache.s2graph.core.{Management, PostProcess}
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
import org.apache.s2graph.core.schema.{Label, ServiceColumn}
import org.apache.s2graph.core.storage.CanSKeyValue
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
import org.apache.s2graph.core.types.HBaseType
import org.apache.s2graph.loader.subscriber.TransferToHFile.options
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import play.api.libs.json.Json
import scala.util.Try
class TransferToHFileTest extends FunSuite with Matchers with BeforeAndAfterAll {
  import TransferToHFile._
  import scala.collection.JavaConverters._

  private val master = "local[2]"
  private val appName = "example-spark"

  // Initialized in beforeAll; may remain null if Spark startup fails.
  private var sc: SparkContext = _

  /* TransferHFile parameters shared by every test case. */
  val options = GraphFileOptions(
    input = "/tmp/test.txt",
    tmpPath = "/tmp/s2graph",
    zkQuorum = "localhost",
    dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL",
    dbUser = "sa",
    dbPassword = "sa",
    tableName = "s2graph",
    maxHFilePerRegionServer = 1,
    numRegions = 3,
    compressionAlgorithm = "NONE",
    buildDegree = false,
    autoEdgeCreate = false)

  val s2Config = Management.toConfig(options.toConfigParams)
  val tableName = options.tableName
  val schemaVersion = HBaseType.DEFAULT_VERSION
  val compressionAlgorithm: String = options.compressionAlgorithm

  /** Starts a local SparkContext and initializes the shared graph helper. */
  override def beforeAll(): Unit = {
    val conf = new SparkConf()
      .setMaster(master)
      .setAppName(appName)
    sc = new SparkContext(conf)
    GraphSubscriberHelper.apply(s2Config)
  }

  /**
   * Shuts down the graph and the SparkContext.
   *
   * The SparkContext is stopped in a finally block so it is released even if
   * graph shutdown throws, and the helper is null-guarded in case beforeAll
   * failed before the graph was ever initialized (the original code would
   * NPE here and leak the SparkContext).
   */
  override def afterAll(): Unit = {
    try {
      Option(GraphSubscriberHelper.g).foreach(_.shutdown())
    } finally {
      if (sc != null) {
        sc.stop()
      }
    }
  }

  /**
   * Writes the given lines to `fileName`, one per line.
   *
   * The writer is closed in a finally block so the file handle is not leaked
   * when a write fails (the original leaked the PrintWriter on exception).
   */
  private def writeToFile(fileName: String)(lines: Seq[String]): Unit = {
    val writer = new PrintWriter(fileName)
    try {
      lines.foreach(line => writer.write(line + "\n"))
    } finally {
      writer.close()
    }
  }

  /**
   * Creates the service/column/label schema used by the edge tests and
   * returns the "friends" label.
   *
   * Label creation is wrapped in Try because the label may already exist
   * from a previous test run; the result is intentionally discarded and the
   * label is looked up afterwards instead.
   */
  private def initTestEdgeSchema(): Label = {
    val management = GraphSubscriberHelper.management
    val service = management.createService(serviceName = "s2graph", cluster = "localhost",
      hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz")
    val serviceColumn = management.createServiceColumn(service.serviceName, "user", "string", Nil)

    Try {
      management.createLabel("friends", serviceColumn, serviceColumn, isDirected = true,
        serviceName = service.serviceName, indices = new java.util.ArrayList[Index],
        props = Seq(Prop("since", "0", "long"), Prop("score", "0", "integer")).asJava, consistencyLevel = "strong", hTableName = tableName,
        hTableTTL = -1, schemaVersion = schemaVersion, compressionAlgorithm = compressionAlgorithm, options = "")
    }

    Label.findByName("friends").getOrElse(throw new IllegalArgumentException("friends label is not initialized."))
  }

  /**
   * Creates the service and the "imei" service column (with its property
   * schema) used by the vertex tests, and returns the column.
   */
  private def initTestVertexSchema(): ServiceColumn = {
    val management = GraphSubscriberHelper.management
    val service = management.createService(serviceName = "s2graph", cluster = "localhost",
      hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz")

    management.createServiceColumn(service.serviceName, "imei", "string",
      Seq(
        Prop(name = "first_time", defaultValue = "''", dataType = "string"),
        Prop(name = "last_time", defaultValue = "''", dataType = "string"),
        Prop(name = "total_active_days", defaultValue = "-1", dataType = "integer"),
        Prop(name = "query_amount", defaultValue = "-1", dataType = "integer"),
        Prop(name = "active_months", defaultValue = "-1", dataType = "integer"),
        Prop(name = "fua", defaultValue = "''", dataType = "string"),
        Prop(name = "location_often_province", defaultValue = "''", dataType = "string"),
        Prop(name = "location_often_city", defaultValue = "''", dataType = "string"),
        Prop(name = "location_often_days", defaultValue = "-1", dataType = "integer"),
        Prop(name = "location_last_province", defaultValue = "''", dataType = "string"),
        Prop(name = "location_last_city", defaultValue = "''", dataType = "string"),
        Prop(name = "fimei_legality", defaultValue = "-1", dataType = "integer")
      ))
  }

  test("test generateKeyValues edge only.") {
    import org.apache.s2graph.core.storage.CanSKeyValue._

    val label = initTestEdgeSchema()
    /* end of initialize model */

    // Round-trip a bulk-format edge line through generateKeyValues and the
    // index-edge deserializer, then compare against direct parsing.
    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
    val input = sc.parallelize(Seq(bulkEdgeString))

    val kvs = TransferToHFile.generateKeyValues(sc, s2Config, input, options)
    val ls = kvs.map(kv => CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)).collect().toList

    val serDe = GraphSubscriberHelper.g.defaultStorage.serDe
    val bulkEdge = GraphSubscriberHelper.g.elementBuilder.toGraphElement(bulkEdgeString, options.labelMapping).get

    val indexEdges = ls.flatMap { kv =>
      serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None)
    }
    val indexEdge = indexEdges.head

    bulkEdge shouldBe (indexEdge)
  }

  test("test generateKeyValues vertex only.") {
    val serviceColumn = initTestVertexSchema()

    // Round-trip a bulk-format vertex line through generateKeyValues and the
    // vertex deserializer, then compare against direct parsing.
    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\ts2graph\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
    val bulkVertex = GraphSubscriberHelper.g.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get

    val input = sc.parallelize(Seq(bulkVertexString))
    val kvs = TransferToHFile.generateKeyValues(sc, s2Config, input, options)
    val ls = kvs.map(kv => CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)).collect().toList

    val serDe = GraphSubscriberHelper.g.defaultStorage.serDe
    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get

    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
      println(Json.prettyPrint(jsValue))
    }

    bulkVertex shouldBe (vertex)
  }

  test("test generateHFile vertex only.") {
    val serviceColumn = initTestVertexSchema()

    // Only verifies HFile generation completes without throwing; loading is
    // covered by the "test loader script." case below.
    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\ts2graph\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
    val input = sc.parallelize(Seq(bulkVertexString))

    val kvs = TransferToHFile.generateKeyValues(sc, s2Config, input, options)
    TransferToHFile.generateHFile(sc, s2Config, kvs, options)
  }

  test("test loader script.") {
    val serviceColumn = initTestVertexSchema()

    // End-to-end: write bulk input to disk, generate HFiles, bulk-load them
    // into HBase with LoadIncrementalHFiles, then read the vertex back.
    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\ts2graph\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
    val bulkVertexLs = Seq(bulkVertexString)
    writeToFile(options.input)(bulkVertexLs)

    val input = sc.parallelize(bulkVertexLs)

    GraphSubscriberHelper.apply(s2Config)
    val graph = GraphSubscriberHelper.g
    val vertex = graph.elementBuilder.toVertex(bulkVertexString).get

    val kvs = TransferToHFile.generateKeyValues(sc, s2Config, input, options)
    TransferToHFile.generateHFile(sc, s2Config, kvs, options)

    val hfileArgs = Array(options.tmpPath, options.tableName)
    val hbaseConfig = HBaseConfiguration.create()
    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)

    val vertexId = graph.elementBuilder.newVertexId("s2graph")("imei")("800188448586078")
    val vertexOpt = graph.getVertex(vertexId)

    vertexOpt.isDefined shouldBe (true)
    vertexOpt.get shouldBe (vertex)
  }
}