| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.s2graph.core |
| |
| import java.util |
| |
| import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNameTooLongException, LabelNotExistException} |
| import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} |
| import org.apache.s2graph.core.mysqls._ |
| import org.apache.s2graph.core.types.HBaseType._ |
| import org.apache.s2graph.core.types._ |
| import org.apache.s2graph.core.JSONParser._ |
| import play.api.libs.json._ |
| |
| import scala.util.Try |
| |
| /** |
| * This is designed to be bridge between rest to s2core. |
| * s2core never use this for finding models. |
| */ |
| object Management { |
| |
| import scala.collection.JavaConversions._ |
| |
| def newProp(name: String, defaultValue: String, datatType: String): Prop = { |
| new Prop(name, defaultValue, datatType) |
| } |
| |
| def newIndex(name: String, propNames: java.util.List[String], options: String): Index = { |
| new Index(name, propNames, options = Option(options)) |
| } |
| |
| object JsonModel { |
| |
| case class Prop(name: String, defaultValue: String, datatType: String) |
| |
| object Prop extends ((String, String, String) => Prop) |
| |
| case class Index(name: String, propNames: Seq[String], direction: Option[Int] = None, options: Option[String] = None) |
| } |
| |
| import HBaseType._ |
| |
| val LABEL_NAME_MAX_LENGTH = 100 |
| val DefaultCompressionAlgorithm = "gz" |
| |
| def findService(serviceName: String) = { |
| Service.findByName(serviceName, useCache = false) |
| } |
| |
| def deleteService(serviceName: String) = { |
| Service.findByName(serviceName).foreach { service => |
| // service.deleteAll() |
| } |
| } |
| |
| def updateHTable(labelName: String, newHTableName: String): Try[Int] = Try { |
| val targetLabel = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(s"Target label $labelName does not exist.")) |
| if (targetLabel.hTableName == newHTableName) throw new InvalidHTableException(s"New HTable name is already in use for target label.") |
| |
| Label.updateHTableName(targetLabel.label, newHTableName) |
| } |
| |
| |
| def createServiceColumn(serviceName: String, |
| columnName: String, |
| columnType: String, |
| props: Seq[Prop], |
| schemaVersion: String = DEFAULT_VERSION) = { |
| |
| Model withTx { implicit session => |
| val serviceOpt = Service.findByName(serviceName, useCache = false) |
| serviceOpt match { |
| case None => throw new RuntimeException(s"create service $serviceName has not been created.") |
| case Some(service) => |
| val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, useCache = false) |
| for { |
| Prop(propName, defaultValue, dataType) <- props |
| } yield { |
| ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType, useCache = false) |
| } |
| } |
| } |
| } |
| |
| def deleteColumn(serviceName: String, columnName: String, schemaVersion: String = DEFAULT_VERSION) = { |
| Model withTx { implicit session => |
| val service = Service.findByName(serviceName, useCache = false).getOrElse(throw new RuntimeException("Service not Found")) |
| val serviceColumns = ServiceColumn.find(service.id.get, columnName, useCache = false) |
| val columnNames = serviceColumns.map { serviceColumn => |
| ServiceColumn.delete(serviceColumn.id.get) |
| serviceColumn.columnName |
| } |
| |
| columnNames.getOrElse(throw new RuntimeException("column not found")) |
| } |
| } |
| |
| def findLabel(labelName: String, useCache: Boolean = false): Option[Label] = { |
| Label.findByName(labelName, useCache = useCache) |
| } |
| |
| def deleteLabel(labelName: String) = { |
| Model withTx { implicit session => |
| Label.findByName(labelName, useCache = false).foreach { label => |
| Label.deleteAll(label) |
| } |
| labelName |
| } |
| } |
| |
| def markDeletedLabel(labelName: String) = { |
| Model withTx { implicit session => |
| Label.findByName(labelName, useCache = false).foreach { label => |
| // rename & delete_at column filled with current time |
| Label.markDeleted(label) |
| } |
| labelName |
| } |
| } |
| |
| def addIndex(labelStr: String, indices: Seq[Index]): Try[Label] = { |
| Model withTx { implicit session => |
| val label = Label.findByName(labelStr).getOrElse(throw LabelNotExistException(s"$labelStr not found")) |
| val labelMetaMap = label.metaPropsInvMap |
| |
| indices.foreach { index => |
| val metaSeq = index.propNames.map { name => labelMetaMap(name).seq } |
| LabelIndex.findOrInsert(label.id.get, index.name, metaSeq.toList, "none", index.direction, index.options) |
| } |
| |
| label |
| } |
| } |
| |
| def addProp(labelStr: String, prop: Prop) = { |
| Model withTx { implicit session => |
| val labelOpt = Label.findByName(labelStr) |
| val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr not found")) |
| |
| LabelMeta.findOrInsert(label.id.get, prop.name, prop.defaultValue, prop.datatType) |
| } |
| } |
| |
| def addProps(labelStr: String, props: Seq[Prop]) = { |
| Model withTx { implicit session => |
| val labelOpt = Label.findByName(labelStr) |
| val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr not found")) |
| |
| props.map { |
| case Prop(propName, defaultValue, dataType) => |
| LabelMeta.findOrInsert(label.id.get, propName, defaultValue, dataType) |
| } |
| } |
| } |
| |
| def addVertexProp(serviceName: String, |
| columnName: String, |
| propsName: String, |
| propsType: String, |
| schemaVersion: String = DEFAULT_VERSION): ColumnMeta = { |
| val result = for { |
| service <- Service.findByName(serviceName, useCache = false) |
| serviceColumn <- ServiceColumn.find(service.id.get, columnName) |
| } yield { |
| ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType) |
| } |
| result.getOrElse({ |
| throw new RuntimeException(s"add property on vertex failed") |
| }) |
| } |
| |
| def getServiceLabel(label: String): Option[Label] = { |
| Label.findByName(label, useCache = true) |
| } |
| |
| /** |
| * |
| */ |
| |
| def toLabelWithDirectionAndOp(label: Label, direction: String): Option[LabelWithDirection] = { |
| for { |
| labelId <- label.id |
| dir = GraphUtil.toDirection(direction) |
| } yield LabelWithDirection(labelId, dir) |
| } |
| |
| def tryOption[A, R](key: A, f: A => Option[R]) = { |
| f(key) match { |
| case None => throw new GraphExceptions.InternalException(s"$key is not found in DB. create $key first.") |
| case Some(r) => r |
| } |
| } |
| |
| def toProps(column: ServiceColumn, js: JsObject): Seq[(Int, InnerValLike)] = { |
| |
| val props = for { |
| (k, v) <- js.fields |
| meta <- column.metasInvMap.get(k) |
| } yield { |
| val innerVal = jsValueToInnerVal(v, meta.dataType, column.schemaVersion).getOrElse( |
| throw new RuntimeException(s"$k is not defined. create schema for vertex.")) |
| |
| (meta.seq.toInt, innerVal) |
| } |
| props |
| |
| } |
| |
| def toProps(label: Label, js: Seq[(String, JsValue)]): Seq[(LabelMeta, InnerValLike)] = { |
| val props = for { |
| (k, v) <- js |
| meta <- label.metaPropsInvMap.get(k) |
| innerVal <- jsValueToInnerVal(v, meta.dataType, label.schemaVersion) |
| } yield (meta, innerVal) |
| |
| props |
| } |
| |
| /** |
| * update label name. |
| */ |
| def updateLabelName(oldLabelName: String, newLabelName: String) = { |
| Model withTx { implicit session => |
| for { |
| old <- Label.findByName(oldLabelName, useCache = false) |
| } { |
| Label.findByName(newLabelName, useCache = false) match { |
| case None => |
| Label.updateName(oldLabelName, newLabelName) |
| case Some(_) => |
| throw new RuntimeException(s"$newLabelName already exist") |
| } |
| } |
| } |
| } |
| |
| /** |
| * swap label names. |
| */ |
| def swapLabelNames(leftLabel: String, rightLabel: String) = { |
| Model withTx { implicit session => |
| val tempLabel = "_" + leftLabel + "_" |
| Label.updateName(leftLabel, tempLabel) |
| Label.updateName(rightLabel, leftLabel) |
| Label.updateName(tempLabel, rightLabel) |
| } |
| } |
| } |
| |
| class Management(graph: S2Graph) { |
| import Management._ |
| import scala.collection.JavaConversions._ |
| |
| def createStorageTable(zkAddr: String, |
| tableName: String, |
| cfs: List[String], |
| regionMultiplier: Int, |
| ttl: Option[Int], |
| compressionAlgorithm: String = DefaultCompressionAlgorithm, |
| replicationScopeOpt: Option[Int] = None, |
| totalRegionCount: Option[Int] = None): Unit = { |
| graph.defaultStorage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm, replicationScopeOpt, totalRegionCount) |
| } |
| |
| |
| /** HBase specific code */ |
| def createService(serviceName: String, |
| cluster: String, |
| hTableName: String, |
| preSplitSize: Int, |
| hTableTTL: Int, |
| compressionAlgorithm: String): Service = { |
| createService(serviceName, cluster, hTableName, preSplitSize, |
| Option(hTableTTL).filter(_ > -1), compressionAlgorithm).get |
| } |
| |
| def createService(serviceName: String, |
| cluster: String, hTableName: String, |
| preSplitSize: Int, hTableTTL: Option[Int], |
| compressionAlgorithm: String = DefaultCompressionAlgorithm): Try[Service] = { |
| |
| Model withTx { implicit session => |
| val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL.orElse(Some(Integer.MAX_VALUE)), compressionAlgorithm, useCache = false) |
| /** create hbase table for service */ |
| graph.getStorage(service).createTable(service.cluster, service.hTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm) |
| service |
| } |
| } |
| |
| def createServiceColumn(serviceName: String, |
| columnName: String, |
| columnType: String, |
| props: java.util.List[Prop], |
| schemaVersion: String = DEFAULT_VERSION): ServiceColumn = { |
| |
| val serviceColumnTry = Model withTx { implicit session => |
| val serviceOpt = Service.findByName(serviceName, useCache = false) |
| serviceOpt match { |
| case None => throw new RuntimeException(s"create service $serviceName has not been created.") |
| case Some(service) => |
| val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, useCache = false) |
| for { |
| Prop(propName, defaultValue, dataType) <- props |
| } yield { |
| ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType, useCache = false) |
| } |
| serviceColumn |
| } |
| } |
| |
| serviceColumnTry.get |
| } |
| |
| def createLabel(labelName: String, |
| srcColumn: ServiceColumn, |
| tgtColumn: ServiceColumn, |
| isDirected: Boolean, |
| serviceName: String, |
| indices: java.util.List[Index], |
| props: java.util.List[Prop], |
| consistencyLevel: String, |
| hTableName: String, |
| hTableTTL: Int, |
| schemaVersion: String, |
| compressionAlgorithm: String, |
| options: String): Label = { |
| import scala.collection.JavaConversions._ |
| |
| createLabel(labelName, |
| srcColumn.service.serviceName, srcColumn.columnName, srcColumn.columnType, |
| tgtColumn.service.serviceName, tgtColumn.columnName, tgtColumn.columnType, |
| isDirected, serviceName, indices, props, consistencyLevel, |
| Option(hTableName), Option(hTableTTL).filter(_ > -1), |
| schemaVersion, false, compressionAlgorithm, Option(options) |
| ).get |
| } |
| |
| /** HBase specific code */ |
| def createLabel(label: String, |
| srcServiceName: String, |
| srcColumnName: String, |
| srcColumnType: String, |
| tgtServiceName: String, |
| tgtColumnName: String, |
| tgtColumnType: String, |
| isDirected: Boolean = true, |
| serviceName: String, |
| indices: Seq[Index], |
| props: Seq[Prop], |
| consistencyLevel: String = "weak", |
| hTableName: Option[String] = None, |
| hTableTTL: Option[Int] = None, |
| schemaVersion: String = DEFAULT_VERSION, |
| isAsync: Boolean = false, |
| compressionAlgorithm: String = "gz", |
| options: Option[String] = None): Try[Label] = { |
| |
| if (label.length > LABEL_NAME_MAX_LENGTH ) throw new LabelNameTooLongException(s"Label name ${label} too long.( max length : ${LABEL_NAME_MAX_LENGTH}} )") |
| if (hTableName.isEmpty && hTableTTL.isDefined) throw new RuntimeException("if want to specify ttl, give hbaseTableName also") |
| |
| val labelOpt = Label.findByName(label, useCache = false) |
| Model withTx { implicit session => |
| if (labelOpt.isDefined) throw new LabelAlreadyExistException(s"Label name ${label} already exist.") |
| |
| /** create all models */ |
| val newLabel = Label.insertAll(label, |
| srcServiceName, srcColumnName, srcColumnType, |
| tgtServiceName, tgtColumnName, tgtColumnType, |
| isDirected, serviceName, indices, props, consistencyLevel, |
| hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options) |
| |
| /** create hbase table */ |
| val storage = graph.getStorage(newLabel) |
| val service = newLabel.service |
| storage.createTable(service.cluster, newLabel.hbaseTableName, List("e", "v"), service.preSplitSize, newLabel.hTableTTL, newLabel.compressionAlgorithm) |
| |
| newLabel |
| } |
| } |
| |
| /** |
| * label |
| */ |
| /** |
| * copy label when if oldLabel exist and newLabel do not exist. |
| * copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster. |
| */ |
| def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]): Try[Label] = { |
| val old = Label.findByName(oldLabelName, useCache = false).getOrElse(throw new LabelNotExistException(s"Old label $oldLabelName not exists.")) |
| |
| val allProps = old.metas(useCache = false).map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) } |
| val allIndices = old.indices(useCache = false).map { index => Index(index.name, index.propNames, index.dir, index.options) } |
| |
| createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType, |
| old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType, |
| old.isDirected, old.serviceName, |
| allIndices, allProps, |
| old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm, old.options) |
| } |
| |
| def buildGlobalVertexIndex(name: String, propNames: java.util.List[String]): GlobalIndex = |
| buildGlobalIndex(GlobalIndex.VertexType, name, propNames) |
| |
| def buildGlobalVertexIndex(name: String, propNames: Seq[String]): GlobalIndex = |
| buildGlobalIndex(GlobalIndex.VertexType, name, propNames) |
| |
| def buildGlobalEdgeIndex(name: String, propNames: java.util.List[String]): GlobalIndex = |
| buildGlobalIndex(GlobalIndex.EdgeType, name, propNames) |
| |
| def buildGlobalEdgeIndex(name: String, propNames: Seq[String]): GlobalIndex = |
| buildGlobalIndex(GlobalIndex.EdgeType, name, propNames) |
| |
| def buildGlobalIndex(elementType: String, name: String, propNames: Seq[String]): GlobalIndex = { |
| GlobalIndex.findBy(elementType, name, false) match { |
| case None => |
| GlobalIndex.insert(elementType, name, propNames) |
| GlobalIndex.findBy(elementType, name, false).get |
| case Some(oldIndex) => oldIndex |
| } |
| } |
| |
| def getCurrentStorageInfo(labelName: String): Try[Map[String, String]] = for { |
| label <- Try(Label.findByName(labelName, useCache = false).get) |
| } yield { |
| val storage = graph.getStorage(label) |
| storage.info |
| } |
| |
| def truncateStorage(labelName: String): Unit = { |
| Try(Label.findByName(labelName, useCache = false)).map { labelOpt => |
| labelOpt.map { label => |
| val storage = graph.getStorage(label) |
| val zkAddr = label.service.cluster |
| storage.truncateTable(zkAddr, label.hbaseTableName) |
| } |
| } |
| } |
| |
| def deleteStorage(labelName: String): Unit = { |
| Try(Label.findByName(labelName, useCache = false)).map { labelOpt => |
| labelOpt.map { label => |
| val storage = graph.getStorage(label) |
| val zkAddr = label.service.cluster |
| storage.deleteTable(zkAddr, label.hbaseTableName) |
| } |
| } |
| } |
| } |
| |