/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.s2graph.s2jobs

import java.io.{File, PrintWriter}
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
import org.apache.s2graph.core.schema.{Label, Service, ServiceColumn}
import org.apache.s2graph.core.{Management, S2Graph}
import org.apache.s2graph.core.types.HBaseType
import org.apache.s2graph.s2jobs.loader.GraphFileOptions
import org.apache.s2graph.s2jobs.task.TaskConf
import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import scala.util.Try

class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
import org.apache.s2graph.core.S2GraphConfigs._
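
// Task options for the S2 sink's "bulk" write path: HFiles are written under
// the configured temp dir and handed to HBase's incremental bulkload instead
// of going through the online mutate API.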
protected val bulkloadOptions = new TaskConf("bulkloadOptions", "test", options = Map(
DBConfigs.DB_DEFAULT_DRIVER -> "org.h2.Driver",
DBConfigs.DB_DEFAULT_URL -> "jdbc:h2:file:./var/metastore_jobs;MODE=MYSQL",
DBConfigs.DB_DEFAULT_USER -> "sa",
DBConfigs.DB_DEFAULT_PASSWORD -> "sa",
HBaseConfigs.HBASE_ZOOKEEPER_QUORUM -> "localhost",
S2SinkConfigs.S2_SINK_WRITE_METHOD -> "bulk",
S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_TABLE_NAME -> "s2graph",
S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_NUM_REGIONS -> "3",
S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_TEMP_DIR -> "/tmp/bulkload_tmp",
S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_INCREMENTAL_LOAD -> "true",
S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_COMPRESSION -> "NONE",
S2SinkConfigs.S2_SINK_BULKLOAD_AUTO_EDGE_CREATE -> "false",
S2SinkConfigs.S2_SINK_BULKLOAD_BUILD_DEGREE -> "false",
S2SinkConfigs.S2_SINK_BULKLOAD_LABEL_MAPPING -> ""
))

protected val options: GraphFileOptions = GraphFileOptions(
input = "/tmp/test.txt",
tempDir = "/tmp/bulkload_tmp",
output = "/tmp/s2graph_bulkload",
zkQuorum = "localhost",
dbUrl = "jdbc:h2:file:./var/metastore_jobs;MODE=MYSQL",
dbUser = "sa",
dbPassword = "sa",
dbDriver = "org.h2.Driver",
tableName = "s2graph",
maxHFilePerRegionServer = 1,
numRegions = 3,
compressionAlgorithm = "NONE",
buildDegree = false,
autoEdgeCreate = false)

protected val s2Config = Management.toConfig(options.toConfigParams)
protected val tableName = options.tableName
protected val schemaVersion = HBaseType.DEFAULT_VERSION
protected val compressionAlgorithm: String = options.compressionAlgorithm
protected var s2: S2Graph = _
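
// A single bulkload record in the TSV format the file loader consumes:
// timestamp, operation, element type, vertex id, service name, column name,
// and a JSON object of property values (a device profile keyed by imei).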
private val testLines = Seq(
"20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
)

override def beforeAll(): Unit = {
// initialize spark context.
super.beforeAll()
s2 = S2GraphHelper.getS2Graph(s2Config, true)
S2Graph.loadFetchersAndMutators(s2)
deleteRecursively(new File(options.output))
deleteRecursively(new File(options.tempDir))

// Register a dummy service for each HBase table name used by the tests so
// schema lookups succeed; failures (e.g. the service already exists) are
// swallowed by the Try.
def createDummyHbaseTable(hTableName: String): Unit = {
Try {
val hTables = Service.findAll().map(_.hTableName).toSet
if (!hTables(hTableName)) {
s2.management.createService(serviceName = s"_dummy_${hTableName}", cluster = "localhost",
hTableName = hTableName, preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz")
}
}
}
Seq(
"s2graph",
"_test_cases",
"test-htable",
"label_with_ttl",
"label_without_ttl",
"label_with_ttl_copied",
"label_without_ttl_copied",
"s2graph_with_ttl-dev",
"s2graph_without_ttl-dev",
"s2graph_label_test_copied"
).foreach(createDummyHbaseTable)

initTestDataFile()
}

override def afterAll(): Unit = {
super.afterAll()
if (s2 != null) {
s2.shutdown()
}
}

def initTestDataFile(): Unit = {
deleteRecursively(new File(options.input))
writeToFile(options.input)(testLines)
}

def initTestEdgeSchema(s2: S2Graph, tableName: String,
schemaVersion: String = HBaseType.DEFAULT_VERSION,
compressionAlgorithm: String = "none"): Label = {
import scala.collection.JavaConverters._
/* initialize model for test */
val management = s2.management
val service = management.createService(serviceName = "s2graph", cluster = "localhost",
hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz")
val serviceColumn = management.createServiceColumn(service.serviceName, "user", "string", Nil)
// Ignore the failure if the "friends" label already exists from a prior run.
Try {
management.createLabel("friends",
serviceColumn.service.serviceName, serviceColumn.columnName, serviceColumn.columnType,
serviceColumn.service.serviceName, serviceColumn.columnName, serviceColumn.columnType,
serviceName = service.serviceName, indices = Nil,
props = Seq(Prop("since", "0", "long"), Prop("score", "0", "integer")),
isDirected = true, consistencyLevel = "strong", hTableName = Option(tableName),
hTableTTL = None, schemaVersion = schemaVersion, compressionAlgorithm = compressionAlgorithm, options = None)
}
Label.findByName("friends").getOrElse(throw new IllegalArgumentException("friends label is not initialized."))
}

def initTestVertexSchema(s2: S2Graph): ServiceColumn = {
/* initialize model for test */
val management = s2.management
val service = management.createService(serviceName = "device_profile", cluster = "localhost",
hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz")
management.createServiceColumn(service.serviceName, "imei", "string",
Seq(
Prop(name = "first_time", defaultValue = "''", dataType = "string"),
Prop(name = "last_time", defaultValue = "''", dataType = "string"),
Prop(name = "total_active_days", defaultValue = "-1", dataType = "integer"),
Prop(name = "query_amount", defaultValue = "-1", dataType = "integer"),
Prop(name = "active_months", defaultValue = "-1", dataType = "integer"),
Prop(name = "fua", defaultValue = "''", dataType = "string"),
Prop(name = "location_often_province", defaultValue = "''", dataType = "string"),
Prop(name = "location_often_city", defaultValue = "''", dataType = "string"),
Prop(name = "location_often_days", defaultValue = "-1", dataType = "integer"),
Prop(name = "location_last_province", defaultValue = "''", dataType = "string"),
Prop(name = "location_last_city", defaultValue = "''", dataType = "string"),
Prop(name = "fimei_legality", defaultValue = "-1", dataType = "integer")
))
}

def writeToFile(fileName: String)(lines: Seq[String]): Unit = {
val writer = new PrintWriter(fileName)
// Close the writer even if a write throws, so the file handle is not leaked.
try lines.foreach(line => writer.write(line + "\n"))
finally writer.close()
}

// Recursively delete a file or directory tree; throw if any delete fails.
def deleteRecursively(file: File): Unit = {
if (file.isDirectory) file.listFiles.foreach(deleteRecursively)
if (file.exists && !file.delete) throw new Exception(s"Unable to delete ${file.getAbsolutePath}")
}
}
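
// A minimal sketch of a concrete suite built on this base class (hypothetical,
// for illustration only; the class name and assertion below are not part of
// the original test suite).
class ExampleSchemaSpec extends BaseSparkTest {
test("friends label should be initialized") {
val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
label.label shouldBe "friends"
}
}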