blob: d026e5b4c2c2f66bf9d1f03974a6420acb41ce81 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.core
import java.util
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNameTooLongException, LabelNotExistException}
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.types.HBaseType._
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.JSONParser._
import play.api.libs.json._
import scala.util.Try
/**
* This is designed to be bridge between rest to s2core.
* s2core never use this for finding models.
*/
object Management {
import HBaseType._
import scala.collection.JavaConversions._
val ZookeeperQuorum = "hbase.zookeeper.quorum"
val ColumnFamilies = "hbase.table.column.family"
val RegionMultiplier = "hbase.table.region.multiplier"
val Ttl = "hbase.table.ttl"
val CompressionAlgorithm = "hbase.table.compression.algorithm"
val ReplicationScope = "hbase.table.replication.scope"
val TotalRegionCount = "hbase.table.total.region.count"
val DefaultColumnFamilies = Seq("e", "v")
val DefaultCompressionAlgorithm = "gz"
val LABEL_NAME_MAX_LENGTH = 100
def newProp(name: String, defaultValue: String, datatType: String): Prop = {
new Prop(name, defaultValue, datatType)
}
def newIndex(name: String, propNames: java.util.List[String], options: String): Index = {
new Index(name, propNames, options = Option(options))
}
object JsonModel {
case class Prop(name: String, defaultValue: String, dataType: String, storeInGlobalIndex: Boolean = false)
object Prop extends ((String, String, String, Boolean) => Prop)
case class Index(name: String, propNames: Seq[String], direction: Option[Int] = None, options: Option[String] = None)
}
def findService(serviceName: String) = {
Service.findByName(serviceName, useCache = false)
}
def deleteService(serviceName: String) = {
Service.findByName(serviceName).foreach { service =>
// service.deleteAll()
}
}
def updateHTable(labelName: String, newHTableName: String): Try[Int] = Try {
val targetLabel = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(s"Target label $labelName does not exist."))
if (targetLabel.hTableName == newHTableName) throw new InvalidHTableException(s"New HTable name is already in use for target label.")
Label.updateHTableName(targetLabel.label, newHTableName)
}
def createServiceColumn(serviceName: String,
columnName: String,
columnType: String,
props: Seq[Prop],
schemaVersion: String = DEFAULT_VERSION) = {
Schema withTx { implicit session =>
val serviceOpt = Service.findByName(serviceName, useCache = false)
serviceOpt match {
case None => throw new RuntimeException(s"create service $serviceName has not been created.")
case Some(service) =>
val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, useCache = false)
for {
Prop(propName, defaultValue, dataType, storeInGlobalIndex) <- props
} yield {
ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType,
defaultValue,
storeInGlobalIndex = storeInGlobalIndex, useCache = false)
}
}
}
}
def deleteColumn(serviceName: String, columnName: String, schemaVersion: String = DEFAULT_VERSION) = {
Schema withTx { implicit session =>
val service = Service.findByName(serviceName, useCache = false).getOrElse(throw new RuntimeException("Service not Found"))
val serviceColumns = ServiceColumn.find(service.id.get, columnName, useCache = false)
val columnNames = serviceColumns.map { serviceColumn =>
ServiceColumn.delete(serviceColumn.id.get)
serviceColumn
}
columnNames.getOrElse(throw new RuntimeException("column not found"))
}
}
def findLabel(labelName: String, useCache: Boolean = false): Option[Label] = {
Label.findByName(labelName, useCache = useCache)
}
def deleteLabel(labelName: String): Try[Label] = {
Schema withTx { implicit session =>
val label = Label.findByName(labelName, useCache = false).getOrElse(throw GraphExceptions.LabelNotExistException(labelName))
Label.deleteAll(label)
label
}
}
def markDeletedLabel(labelName: String) = {
Schema withTx { implicit session =>
Label.findByName(labelName, useCache = false).foreach { label =>
// rename & delete_at column filled with current time
Label.markDeleted(label)
}
labelName
}
}
def addIndex(labelStr: String, indices: Seq[Index]): Try[Label] = {
Schema withTx { implicit session =>
val label = Label.findByName(labelStr).getOrElse(throw LabelNotExistException(s"$labelStr not found"))
val labelMetaMap = label.metaPropsInvMap
indices.foreach { index =>
val metaSeq = index.propNames.map { name => labelMetaMap(name).seq }
LabelIndex.findOrInsert(label.id.get, index.name, metaSeq.toList, "none", index.direction, index.options)
}
label
}
}
def addProp(labelStr: String, prop: Prop) = {
Schema withTx { implicit session =>
val labelOpt = Label.findByName(labelStr)
val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr not found"))
LabelMeta.findOrInsert(label.id.get, prop.name, prop.defaultValue, prop.dataType, prop.storeInGlobalIndex)
}
}
def addProps(labelStr: String, props: Seq[Prop]) = {
Schema withTx { implicit session =>
val labelOpt = Label.findByName(labelStr)
val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr not found"))
props.map {
case Prop(propName, defaultValue, dataType, storeInGlobalIndex) =>
LabelMeta.findOrInsert(label.id.get, propName, defaultValue, dataType, storeInGlobalIndex)
}
}
}
def addVertexProp(serviceName: String,
columnName: String,
propsName: String,
propsType: String,
defaultValue: String,
storeInGlobalIndex: Boolean = false,
schemaVersion: String = DEFAULT_VERSION): ColumnMeta = {
val result = for {
service <- Service.findByName(serviceName, useCache = false)
serviceColumn <- ServiceColumn.find(service.id.get, columnName)
} yield {
ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType, defaultValue, storeInGlobalIndex)
}
result.getOrElse({
throw new RuntimeException(s"add property on vertex failed")
})
}
def getServiceLabel(label: String): Option[Label] = {
Label.findByName(label, useCache = true)
}
/**
*
*/
def toLabelWithDirectionAndOp(label: Label, direction: String): Option[LabelWithDirection] = {
for {
labelId <- label.id
dir = GraphUtil.toDirection(direction)
} yield LabelWithDirection(labelId, dir)
}
def tryOption[A, R](key: A, f: A => Option[R]) = {
f(key) match {
case None => throw new GraphExceptions.InternalException(s"$key is not found in DB. create $key first.")
case Some(r) => r
}
}
def toProps(column: ServiceColumn, js: JsObject): Seq[(Int, InnerValLike)] = {
val props = for {
(k, v) <- js.fields
meta <- column.metasInvMap.get(k)
} yield {
val innerVal = jsValueToInnerVal(v, meta.dataType, column.schemaVersion).getOrElse(
throw new RuntimeException(s"$k is not defined. create schema for vertex."))
(meta.seq.toInt, innerVal)
}
props
}
def toProps(label: Label, js: Seq[(String, JsValue)]): Seq[(LabelMeta, InnerValLike)] = {
val props = for {
(k, v) <- js
meta <- label.metaPropsInvMap.get(k)
innerVal <- jsValueToInnerVal(v, meta.dataType, label.schemaVersion)
} yield (meta, innerVal)
props
}
/**
* update label name.
*/
def updateLabelName(oldLabelName: String, newLabelName: String) = {
Schema withTx { implicit session =>
for {
old <- Label.findByName(oldLabelName, useCache = false)
} {
Label.findByName(newLabelName, useCache = false) match {
case None =>
Label.updateName(oldLabelName, newLabelName)
case Some(_) =>
throw new RuntimeException(s"$newLabelName already exist")
}
}
}
}
/**
* swap label names.
*/
def swapLabelNames(leftLabel: String, rightLabel: String) = {
Schema withTx { implicit session =>
val tempLabel = "_" + leftLabel + "_"
Label.updateName(leftLabel, tempLabel)
Label.updateName(rightLabel, leftLabel)
Label.updateName(tempLabel, rightLabel)
}
}
def toConfig(params: Map[String, Any]): Config = {
import scala.collection.JavaConversions._
val filtered = params.filter { case (k, v) =>
v match {
case None => false
case _ => true
}
}.map { case (k, v) =>
val newV = v match {
case Some(value) => value
case _ => v
}
k -> newV
}
ConfigFactory.parseMap(filtered)
}
}
class Management(graph: S2GraphLike) {
import Management._
import scala.collection.JavaConversions._
def createStorageTable(zkAddr: String,
tableName: String,
cfs: List[String],
regionMultiplier: Int,
ttl: Option[Int],
compressionAlgorithm: String = DefaultCompressionAlgorithm,
replicationScopeOpt: Option[Int] = None,
totalRegionCount: Option[Int] = None): Unit = {
val config = toConfig(Map(
ZookeeperQuorum -> zkAddr,
// ColumnFamilies -> cfs,
RegionMultiplier -> regionMultiplier,
Ttl -> ttl,
CompressionAlgorithm -> compressionAlgorithm,
TotalRegionCount -> totalRegionCount
))
graph.defaultStorage.createTable(config, tableName)
}
/** HBase specific code */
def createService(serviceName: String,
cluster: String,
hTableName: String,
preSplitSize: Int,
hTableTTL: Int,
compressionAlgorithm: String): Service = {
createService(serviceName, cluster, hTableName, preSplitSize,
Option(hTableTTL).filter(_ > -1), compressionAlgorithm).get
}
def createService(serviceName: String,
cluster: String, hTableName: String,
preSplitSize: Int, hTableTTL: Option[Int],
compressionAlgorithm: String = DefaultCompressionAlgorithm): Try[Service] = {
Schema withTx { implicit session =>
val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL.orElse(Some(Integer.MAX_VALUE)), compressionAlgorithm, useCache = false)
val config = toConfig(Map(
ZookeeperQuorum -> service.cluster,
// ColumnFamilies -> List("e", "v"),
RegionMultiplier -> service.preSplitSize,
Ttl -> service.hTableTTL,
CompressionAlgorithm -> compressionAlgorithm
))
/* create hbase table for service */
graph.getStorage(service).createTable(config, service.hTableName)
service
}
}
// def createServiceColumn(serviceName: String,
// columnName: String,
// columnType: String,
// props: java.util.List[Prop],
// schemaVersion: String = DEFAULT_VERSION): ServiceColumn =
// createServiceColumn(serviceName, columnName, columnType, props.toSeq, schemaVersion)
def createServiceColumn(serviceName: String,
columnName: String,
columnType: String,
props: Seq[Prop],
schemaVersion: String = DEFAULT_VERSION): ServiceColumn = {
val serviceColumnTry = Schema withTx { implicit session =>
val serviceOpt = Service.findByName(serviceName, useCache = false)
serviceOpt match {
case None => throw new RuntimeException(s"create service $serviceName has not been created.")
case Some(service) =>
val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, useCache = false)
for {
Prop(propName, defaultValue, dataType, storeInGlobalIndex) <- props
} yield {
ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType, defaultValue,
storeInGlobalIndex = storeInGlobalIndex, useCache = false)
}
serviceColumn
}
}
serviceColumnTry.get
}
def createLabel(labelName: String,
srcColumn: ServiceColumn,
tgtColumn: ServiceColumn,
isDirected: Boolean,
serviceName: String,
indices: java.util.List[Index],
props: java.util.List[Prop],
consistencyLevel: String,
hTableName: String,
hTableTTL: Int,
schemaVersion: String,
compressionAlgorithm: String,
options: String): Label = {
import scala.collection.JavaConversions._
createLabel(labelName,
srcColumn.service.serviceName, srcColumn.columnName, srcColumn.columnType,
tgtColumn.service.serviceName, tgtColumn.columnName, tgtColumn.columnType,
isDirected, serviceName, indices, props, consistencyLevel,
Option(hTableName), Option(hTableTTL).filter(_ > -1),
schemaVersion, false, compressionAlgorithm, Option(options)
).get
}
/** HBase specific code */
def createLabel(label: String,
srcServiceName: String,
srcColumnName: String,
srcColumnType: String,
tgtServiceName: String,
tgtColumnName: String,
tgtColumnType: String,
isDirected: Boolean = true,
serviceName: String,
indices: Seq[Index],
props: Seq[Prop],
consistencyLevel: String = "weak",
hTableName: Option[String] = None,
hTableTTL: Option[Int] = None,
schemaVersion: String = DEFAULT_VERSION,
isAsync: Boolean = false,
compressionAlgorithm: String = "gz",
options: Option[String] = None): Try[Label] = {
if (label.length > LABEL_NAME_MAX_LENGTH ) throw new LabelNameTooLongException(s"Label name ${label} too long.( max length : ${LABEL_NAME_MAX_LENGTH}} )")
if (hTableName.isEmpty && hTableTTL.isDefined) throw new RuntimeException("if want to specify ttl, give hbaseTableName also")
val labelOpt = Label.findByName(label, useCache = false)
Schema withTx { implicit session =>
if (labelOpt.isDefined) throw new LabelAlreadyExistException(s"Label name ${label} already exist.")
/* create all models */
val newLabel = Label.insertAll(label,
srcServiceName, srcColumnName, srcColumnType,
tgtServiceName, tgtColumnName, tgtColumnType,
isDirected, serviceName, indices, props, consistencyLevel,
hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options)
/* create hbase table */
val storage = graph.getStorage(newLabel)
val service = newLabel.service
val config = toConfig(Map(
ZookeeperQuorum -> service.cluster,
// ColumnFamilies -> List("e", "v"),
RegionMultiplier -> service.preSplitSize,
Ttl -> newLabel.hTableTTL,
CompressionAlgorithm -> newLabel.compressionAlgorithm
))
storage.createTable(config, newLabel.hbaseTableName)
newLabel
}
}
/**
* label
*/
/**
* copy label when if oldLabel exist and newLabel do not exist.
* copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster.
*/
def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]): Try[Label] = {
val old = Label.findByName(oldLabelName, useCache = false).getOrElse(throw new LabelNotExistException(s"Old label $oldLabelName not exists."))
val allProps = old.metas(useCache = false).map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) }
val allIndices = old.indices(useCache = false).map { index => Index(index.name, index.propNames, index.dir, index.options) }
createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType,
old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType,
old.isDirected, old.serviceName,
allIndices, allProps,
old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm, old.options)
}
def enableVertexGlobalIndex(columnMeats: Seq[ColumnMeta]): Boolean = {
val successes = columnMeats.map { cm =>
ColumnMeta.updateStoreInGlobalIndex(cm.id.get, cm.storeInGlobalIndex)
}.map(_.isSuccess)
successes.forall(identity)
}
def enableEdgeGlobalIndex(labelMetas: Seq[LabelMeta]): Boolean = {
val successes = labelMetas.map { lm =>
LabelMeta.updateStoreInGlobalIndex(lm.id.get, lm.storeInGlobalIndex)
}.map(_.isSuccess)
successes.forall(identity)
}
// def buildGlobalVertexIndex(name: String, propNames: java.util.List[String]): GlobalIndex =
// buildGlobalIndex(GlobalIndex.VertexType, name, propNames)
//
// def buildGlobalVertexIndex(name: String, propNames: Seq[String]): GlobalIndex =
// buildGlobalIndex(GlobalIndex.VertexType, name, propNames)
//
// def buildGlobalEdgeIndex(name: String, propNames: java.util.List[String]): GlobalIndex =
// buildGlobalIndex(GlobalIndex.EdgeType, name, propNames)
//
// def buildGlobalEdgeIndex(name: String, propNames: Seq[String]): GlobalIndex =
// buildGlobalIndex(GlobalIndex.EdgeType, name, propNames)
//
// def buildGlobalIndex(elementType: String, name: String, propNames: Seq[String]): GlobalIndex = {
// GlobalIndex.findBy(elementType, name, false) match {
// case None =>
// GlobalIndex.insert(elementType, name, propNames)
// GlobalIndex.findBy(elementType, name, false).get
// case Some(oldIndex) => oldIndex
// }
// }
def getCurrentStorageInfo(labelName: String): Try[Map[String, String]] = for {
label <- Try(Label.findByName(labelName, useCache = false).get)
} yield {
val storage = graph.getStorage(label)
storage.info
}
def truncateStorage(labelName: String): Unit = {
Try(Label.findByName(labelName, useCache = false)).map { labelOpt =>
labelOpt.map { label =>
val storage = graph.getStorage(label)
val zkAddr = label.service.cluster
val config = toConfig(Map(ZookeeperQuorum -> zkAddr))
storage.truncateTable(config, label.hbaseTableName)
}
}
}
def deleteStorage(labelName: String): Unit = {
Try(Label.findByName(labelName, useCache = false)).map { labelOpt =>
labelOpt.map { label =>
val storage = graph.getStorage(label)
val zkAddr = label.service.cluster
val config = toConfig(Map(ZookeeperQuorum -> zkAddr))
storage.deleteTable(config, label.hbaseTableName)
}
}
}
}