| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.atlas.query |
| |
| import java.util |
| import java.util.Date |
| |
| import com.thinkaurelius.titan.core.TitanVertex |
| import com.tinkerpop.blueprints.{Vertex, Direction} |
| import org.apache.atlas.AtlasException |
| import org.apache.atlas.query.Expressions.{ComparisonExpression, ExpressionException} |
| import org.apache.atlas.query.TypeUtils.FieldInfo |
| import org.apache.atlas.repository.graph.{GraphHelper, GraphBackedMetadataRepository} |
| import org.apache.atlas.typesystem.persistence.Id |
| import org.apache.atlas.typesystem.types.DataTypes._ |
| import org.apache.atlas.typesystem.types._ |
| import org.apache.atlas.typesystem.{ITypedInstance, ITypedReferenceableInstance} |
| |
| import scala.collection.JavaConversions._ |
| import scala.collection.mutable |
| import scala.collection.mutable.ArrayBuffer |
| |
/**
 * Represents the Bridge between the QueryProcessor and the Graph Persistence scheme used.
 * Some of the behaviors captured are:
 * - how type and id information is stored in the Vertex that represents an [[ITypedReferenceableInstance]]
 * - how edges representing trait and attribute relationships are labelled.
 * - how attribute names are mapped to Property Keys in Vertices.
 *
 * This is a work in progress.
 */
trait GraphPersistenceStrategies {
    /**
     * Name of attribute used to store typeName in vertex
     */
    def typeAttributeName: String

    /**
     * Name of attribute used to store super type names in vertex.
     */
    def superTypeAttributeName: String

    /**
     * Name of attribute used to store guid in vertex
     */
    def idAttributeName: String

    /**
     * Name of attribute used to store state in vertex
     */
    def stateAttributeName: String

    /**
     * Given a dataType and a reference attribute, how is edge labeled
     */
    def edgeLabel(iDataType: IDataType[_], aInfo: AttributeInfo): String

    /**
     * Given a dataType and the name of a trait, how is the trait edge labeled
     */
    def traitLabel(cls: IDataType[_], traitName: String): String

    /**
     * Direction name used when going from an instance to its trait; "out" unless overridden.
     */
    def instanceToTraitEdgeDirection: String = "out"

    /**
     * The opposite of [[instanceToTraitEdgeDirection]]: "out" becomes "in" and "in" becomes "out".
     * Any other value is returned unchanged.
     */
    def traitToInstanceEdgeDirection: String = instanceToTraitEdgeDirection match {
        case "out" => "in"
        case "in" => "out"
        case other => other
    }

    /**
     * The propertyKey used to store the attribute in a Graph Vertex.
     * @param dataType the type that the attribute is looked up on
     * @param aInfo the attribute
     * @return the property key
     */
    def fieldNameInVertex(dataType: IDataType[_], aInfo: AttributeInfo): String

    /**
     * from a vertex for an [[ITypedReferenceableInstance]] get the traits that it has.
     * @param v the instance vertex
     * @return the names of the traits
     */
    def traitNames(v: TitanVertex): java.util.List[String]

    /**
     * Edge label for a [[FieldInfo]], chosen by which of its components are null:
     * - no reverse type and no trait name: the attribute edge label on the FieldInfo's dataType
     * - a reverse type and no trait name: the attribute edge label on the reverse type
     * - a trait name, with no attribute and no reverse type: the trait edge label
     *
     * A FieldInfo matching none of these shapes results in a [[scala.MatchError]].
     */
    def edgeLabel(fInfo: FieldInfo): String = fInfo match {
        case FieldInfo(dataType, aInfo, null, null) => edgeLabel(dataType, aInfo)
        case FieldInfo(dataType, aInfo, reverseDataType, null) => edgeLabel(reverseDataType, aInfo)
        case FieldInfo(dataType, null, null, traitName) => traitLabel(dataType, traitName)
    }

    def fieldPrefixInSelect: String

    /**
     * extract the Id from a Vertex.
     * @param dataTypeNm the dataType of the instance that the given vertex represents
     * @param v the vertex
     * @return the Id
     */
    def getIdFromVertex(dataTypeNm: String, v: TitanVertex): Id

    /**
     * Build a value of the given dataType from an object handed back by the graph.
     */
    def constructInstance[U](dataType: IDataType[U], v: java.lang.Object): U

    /**
     * Map the symbol of a comparison expression to the Gremlin comparison token.
     * An [[ExpressionException]] is thrown for any symbol other than =, !=, >, >=, < and <=.
     */
    def gremlinCompOp(op: ComparisonExpression): String = op.symbol match {
        case "=" => "T.eq"
        case "!=" => "T.neq"
        case ">" => "T.gt"
        case ">=" => "T.gte"
        case "<" => "T.lt"
        case "<=" => "T.lte"
        case _ => throw new ExpressionException(op, "Comparison operator not supported in Gremlin")
    }

    /**
     * Type test closure for the given dataType, applied to "it.object".
     */
    def loopObjectExpression(dataType: IDataType[_]): String =
        _typeTestExpression(dataType.getName, "it.object")

    /**
     * True exactly when [[collectTypeInstancesIntoVar]] is false; the preStatements are not inspected.
     */
    def addGraphVertexPrefix(preStatements: Traversable[String]): Boolean = !collectTypeInstancesIntoVar

    /**
     * Controls behavior of how instances of a Type are discovered.
     * - query is generated in a way that indexes are exercised using a local set variable across multiple lookups
     * - query is generated using an 'or' expression.
     *
     * '''This is a very bad idea: controlling query execution behavior via query generation.''' But our current
     * knowledge seems to indicate we have no choice. See
     * [[https://groups.google.com/forum/#!topic/gremlin-users/n1oV86yr4yU discussion in Gremlin group]].
     * Also this seems a fragile solution, dependent on the memory requirements of the Set variable.
     * For now enabling via the '''collectTypeInstancesIntoVar''' behavior setting. Reverting back would require
     * setting this to false.
     *
     * Long term have to get to the bottom of Gremlin:
     * - there doesn't seem to be a way to see the physical query plan. Maybe we should directly interface with Titan.
     * - At least from a querying perspective a columnar db may be a better route. Daniel Abadi did some good work
     * on showing how to use a columnar store as a Graph Db.
     *
     * @return true to collect the instances into a set variable, false to use a filter expression
     */
    def collectTypeInstancesIntoVar: Boolean = true

    /**
     * The statements that select the instances of typeName: the multi step (set variable) form when
     * [[collectTypeInstancesIntoVar]] is true, a single filter step otherwise.
     *
     * @param typeName name of the type to test for
     * @param intSeq source of the number used in the generated variable name
     */
    def typeTestExpression(typeName: String, intSeq: IntSequence): Seq[String] =
        if (collectTypeInstancesIntoVar) {
            typeTestExpressionMultiStep(typeName, intSeq)
        } else {
            typeTestExpressionUsingFilter(typeName)
        }

    private def typeTestExpressionUsingFilter(typeName: String): Seq[String] =
        Seq(s"""filter${_typeTestExpression(typeName, "it")}""")

    /**
     * Closure text that holds when the element referred to by itRef has typeName as its type name, or
     * has a super type names attribute that contains typeName. The text is returned on a single line.
     */
    private def _typeTestExpression(typeName: String, itRef: String): String = {
        // Strip CR and LF explicitly. The line ends inside the literal need not be the same as the
        // "line.separator" of the running JVM (LF-only text on a CR LF platform), in which case removing
        // only that separator would leave the line breaks in the generated expression.
        s"""{(${itRef}.'${typeAttributeName}' == '${typeName}') |
           |(${itRef}.'${superTypeAttributeName}' ?
           |${itRef}.'${superTypeAttributeName}'.contains('${typeName}') : false)}""".
            stripMargin.replace("\r", "").replace("\n", "")
    }

    /**
     * Four statements: declare a set variable, fill it with the vertices whose type name is typeName,
     * fill it with the vertices whose super type names match typeName, then start a pipeline from it.
     */
    private def typeTestExpressionMultiStep(typeName: String, intSeq: IntSequence): Seq[String] = {
        val varName = s"_var_${intSeq.next}"
        Seq(
            newSetVar(varName),
            fillVarWithTypeInstances(typeName, varName),
            fillVarWithSubTypeInstances(typeName, varName),
            s"$varName._()"
        )
    }

    private def newSetVar(varName: String): String = s"$varName = [] as Set"

    private def fillVarWithTypeInstances(typeName: String, fillVar: String): String =
        s"""g.V().has("${typeAttributeName}", "${typeName}").fill($fillVar)"""

    private def fillVarWithSubTypeInstances(typeName: String, fillVar: String): String =
        s"""g.V().has("${superTypeAttributeName}", "${typeName}").fill($fillVar)"""
}
| |
/**
 * The [[GraphPersistenceStrategies]] implementation defined in this file. It fixes the vertex property
 * names and the edge label formats, and rebuilds typed instances from Titan vertices.
 */
object GraphPersistenceStrategy1 extends GraphPersistenceStrategies {
    val typeAttributeName: String = "typeName"
    val superTypeAttributeName: String = "superTypeNames"
    val idAttributeName: String = "guid"
    val stateAttributeName: String = "state"

    /** Attribute edge label: "__", the type name, a dot, the attribute name. */
    def edgeLabel(dataType: IDataType[_], aInfo: AttributeInfo): String = s"__${dataType.getName}.${aInfo.name}"

    /** Edge label for a property name: the name prefixed with "__". */
    def edgeLabel(propertyName: String): String = s"__${propertyName}"

    val fieldPrefixInSelect: String = "it"

    /** Trait edge label: the type name, a dot, the trait name. */
    def traitLabel(cls: IDataType[_], traitName: String): String = s"${cls.getName}.$traitName"

    def fieldNameInVertex(dataType: IDataType[_], aInfo: AttributeInfo): String =
        GraphHelper.getQualifiedFieldName(dataType, aInfo.name)

    /**
     * Id made of the graph id of the vertex (as a String), the literal 0 and the given type name.
     */
    def getIdFromVertex(dataTypeNm: String, v: TitanVertex): Id =
        new Id(v.getId.toString, 0, dataTypeNm)

    /**
     * Trait names read from the comma separated "traitNames" property of the vertex.
     *
     * @return the names; empty when the property is absent. Empty names (for instance from an empty
     *         property value) are dropped, as they cannot name a trait.
     */
    def traitNames(v: TitanVertex): java.util.List[String] = {
        val names: Seq[String] = Option(v.getProperty[String]("traitNames"))
            .map(_.split(",").filter(_.nonEmpty).toSeq)
            .getOrElse(Seq.empty[String])
        // converted to a java.util.List by the JavaConversions import of this file
        names
    }

    /**
     * Build a value of dataType from what the graph returned.
     * - primitive, array and enum values are converted directly
     * - for the Id struct type, the type name and guid properties of the vertex are copied into the struct
     * - other structs and traits are loaded attribute by attribute from the vertex
     * - classes are loaded together with their traits, see [[constructClassInstance]]
     *
     * For struct, trait and class types v is cast to a TitanVertex.
     *
     * @throws UnsupportedOperationException for any other type category
     */
    def constructInstance[U](dataType: IDataType[U], v: AnyRef): U = {
        dataType.getTypeCategory match {
            case DataTypes.TypeCategory.PRIMITIVE | DataTypes.TypeCategory.ARRAY | DataTypes.TypeCategory.ENUM =>
                dataType.convert(v, Multiplicity.OPTIONAL)
            case DataTypes.TypeCategory.STRUCT
                if dataType.getName == TypeSystem.getInstance().getIdType.getName =>
                val idType = TypeSystem.getInstance().getIdType
                val idStruct = dataType.asInstanceOf[StructType].createInstance()
                val vertex = v.asInstanceOf[TitanVertex]
                idStruct.set(idType.typeNameAttrName, vertex.getProperty[java.lang.String](typeAttributeName))
                idStruct.set(idType.idAttrName, vertex.getProperty[java.lang.String](idAttributeName))
                dataType.convert(idStruct, Multiplicity.OPTIONAL)
            case DataTypes.TypeCategory.STRUCT =>
                val sType = dataType.asInstanceOf[StructType]
                val sInstance = sType.createInstance()
                loadStructInstance(sType, sInstance, v.asInstanceOf[TitanVertex])
                dataType.convert(sInstance, Multiplicity.OPTIONAL)
            case DataTypes.TypeCategory.TRAIT =>
                val tType = dataType.asInstanceOf[TraitType]
                val tInstance = tType.createInstance()
                /*
                 * this is not right, we should load the Instance associated with this trait.
                 * for now just loading the trait struct.
                 */
                loadStructInstance(tType, tInstance, v.asInstanceOf[TitanVertex])
                dataType.convert(tInstance, Multiplicity.OPTIONAL)
            case DataTypes.TypeCategory.CLASS =>
                val cInstance = constructClassInstance(dataType.asInstanceOf[ClassType], v.asInstanceOf[TitanVertex])
                dataType.convert(cInstance, Multiplicity.OPTIONAL)
            case _ => throw new UnsupportedOperationException(s"load for ${dataType} not supported")
        }
    }

    /**
     * Load every attribute in the field mapping of dataType from the vertex into typInstance.
     */
    def loadStructInstance(dataType: IConstructableType[_, _ <: ITypedInstance],
                           typInstance: ITypedInstance, v: TitanVertex): Unit = {
        dataType.fieldMapping().fields.foreach { case (_, aInfo) =>
            loadAttribute(dataType, aInfo, typInstance, v)
        }
    }

    /**
     * Create an instance of the class type for the vertex, with the traits named on the vertex, and load
     * the attributes of each trait (from the vertex at the end of the trait edge) and of the class itself.
     *
     * A trait that is named on the vertex but has no outgoing trait edge makes the edge iterator fail
     * (NoSuchElementException).
     */
    def constructClassInstance(dataType: ClassType, v: TitanVertex): ITypedReferenceableInstance = {
        val id = getIdFromVertex(dataType.name, v)
        val tNms = traitNames(v)
        val cInstance = dataType.createInstance(id, tNms: _*)
        // load traits
        tNms.foreach { tNm =>
            val traitEdges = v.getEdges(Direction.OUT, traitLabel(dataType, tNm))
            val tVertex = traitEdges.iterator().next().getVertex(Direction.IN).asInstanceOf[TitanVertex]
            val tType = TypeSystem.getInstance().getDataType[TraitType](classOf[TraitType], tNm)
            val tInstance = cInstance.getTrait(tNm).asInstanceOf[ITypedInstance]
            loadStructInstance(tType, tInstance, tVertex)
        }
        loadStructInstance(dataType, cInstance, v)
        cInstance
    }

    /**
     * Load one attribute into the instance, dispatching on the type category of the attribute.
     *
     * @throws UnsupportedOperationException for map and trait attributes
     */
    def loadAttribute(dataType: IDataType[_], aInfo: AttributeInfo, i: ITypedInstance, v: TitanVertex): Unit = {
        aInfo.dataType.getTypeCategory match {
            case DataTypes.TypeCategory.PRIMITIVE => loadPrimitiveAttribute(dataType, aInfo, i, v)
            case DataTypes.TypeCategory.ENUM => loadEnumAttribute(dataType, aInfo, i, v)
            case DataTypes.TypeCategory.ARRAY => loadArrayAttribute(dataType, aInfo, i, v)
            case DataTypes.TypeCategory.MAP | DataTypes.TypeCategory.TRAIT =>
                throw new UnsupportedOperationException(s"load for ${aInfo.dataType()} not supported")
            case DataTypes.TypeCategory.STRUCT | DataTypes.TypeCategory.CLASS =>
                loadStructAttribute(dataType, aInfo, i, v)
        }
    }

    /**
     * Set an enum attribute from the Integer stored on the vertex. Nothing is set when the vertex has
     * no value for the attribute (unboxing the missing value used to fail with a NullPointerException).
     */
    private def loadEnumAttribute(dataType: IDataType[_], aInfo: AttributeInfo, i: ITypedInstance, v: TitanVertex)
    : Unit = {
        val fName = fieldNameInVertex(dataType, aInfo)
        Option(v.getProperty[java.lang.Integer](fName)).foreach(ordinal => i.setInt(aInfo.name, ordinal))
    }

    /**
     * Set a primitive attribute from the vertex property of the matching boxed type. Dates are stored
     * as a Long that is handed to the Date constructor.
     *
     * Values that have to be unboxed (everything but String) are only set when the vertex has a value;
     * a missing value leaves the attribute untouched instead of failing with a NullPointerException.
     *
     * @throws UnsupportedOperationException for a primitive type that is not handled here
     */
    private def loadPrimitiveAttribute(dataType: IDataType[_], aInfo: AttributeInfo,
                                       i: ITypedInstance, v: TitanVertex): Unit = {
        val fName = fieldNameInVertex(dataType, aInfo)
        val attr = aInfo.name

        // the stored value, None when the vertex has no value under fName
        def stored[T]: Option[T] = Option(v.getProperty[T](fName))

        aInfo.dataType() match {
            case _: BooleanType => stored[java.lang.Boolean].foreach(b => i.setBoolean(attr, b))
            case _: ByteType => stored[java.lang.Byte].foreach(b => i.setByte(attr, b))
            case _: ShortType => stored[java.lang.Short].foreach(s => i.setShort(attr, s))
            case _: IntType => stored[java.lang.Integer].foreach(n => i.setInt(attr, n))
            case _: LongType => stored[java.lang.Long].foreach(l => i.setLong(attr, l))
            case _: FloatType => stored[java.lang.Float].foreach(f => i.setFloat(attr, f))
            case _: DoubleType => stored[java.lang.Double].foreach(d => i.setDouble(attr, d))
            // no unboxing involved: a missing value is passed on as null, as before
            case _: StringType => i.setString(attr, v.getProperty[java.lang.String](fName))
            case _: DateType => stored[java.lang.Long].foreach(millis => i.setDate(attr, new Date(millis)))
            case _ => throw new UnsupportedOperationException(s"load for ${aInfo.dataType()} not supported")
        }
    }

    /**
     * Set an array attribute from the list stored on the vertex, mapping each element through
     * [[mapVertexToCollectionEntry]]. Nothing is set when the vertex has no list for the attribute
     * (iterating the missing list used to fail with a NullPointerException).
     */
    private def loadArrayAttribute[T](dataType: IDataType[_], aInfo: AttributeInfo,
                                      i: ITypedInstance, v: TitanVertex): Unit = {
        // NOTE(review): the list is read under the plain attribute name, whereas the primitive and enum
        // loaders use fieldNameInVertex(dataType, aInfo) - confirm which key arrays are stored under.
        val list: java.util.List[_] = v.getProperty(aInfo.name)
        if (list != null) {
            val arrayType: DataTypes.ArrayType = aInfo.dataType.asInstanceOf[ArrayType]
            val values = new util.ArrayList[Any]
            list.foreach { listElement =>
                values.add(mapVertexToCollectionEntry(v, aInfo, arrayType.getElemType, i, listElement))
            }
            i.set(aInfo.name, values)
        }
    }

    /**
     * Set a struct or class attribute from the vertex at the end of the first outgoing edge with the
     * attribute's edge label (or edgeLbl when given). Struct attributes are loaded attribute by
     * attribute, anything else is loaded as a class instance.
     *
     * Nothing is set when there is no such edge (asking the empty edge iterator for its first element
     * used to fail with a NoSuchElementException).
     */
    private def loadStructAttribute(dataType: IDataType[_], aInfo: AttributeInfo,
                                    i: ITypedInstance, v: TitanVertex, edgeLbl: Option[String] = None): Unit = {
        val eLabel = edgeLbl.getOrElse(edgeLabel(FieldInfo(dataType, aInfo, null)))
        val edges = v.getEdges(Direction.OUT, eLabel).iterator()
        if (edges.hasNext) {
            val sVertex = edges.next().getVertex(Direction.IN).asInstanceOf[TitanVertex]
            if (aInfo.dataType().getTypeCategory == DataTypes.TypeCategory.STRUCT) {
                val sType = aInfo.dataType().asInstanceOf[StructType]
                val sInstance = sType.createInstance()
                loadStructInstance(sType, sInstance, sVertex)
                i.set(aInfo.name, sInstance)
            } else {
                val cInstance = constructClassInstance(aInfo.dataType().asInstanceOf[ClassType], sVertex)
                i.set(aInfo.name, cInstance)
            }
        }
    }

    /**
     * Value to put into a loaded collection for one stored element: primitive and enum elements are
     * used as they are.
     *
     * @throws UnsupportedOperationException for every other element category (struct, trait and class
     *                                       elements are not loaded yet)
     */
    private def mapVertexToCollectionEntry(instanceVertex: TitanVertex, attributeInfo: AttributeInfo, elementType: IDataType[_], i: ITypedInstance, value: Any): Any = {
        elementType.getTypeCategory match {
            case DataTypes.TypeCategory.PRIMITIVE | DataTypes.TypeCategory.ENUM => value
            case _ =>
                throw new UnsupportedOperationException(s"load for ${attributeInfo.dataType()} not supported")
        }
    }
}
| |