package org.apache.atlas.query
import java.util
import java.util.Date
import com.thinkaurelius.titan.core.TitanVertex
import com.tinkerpop.blueprints.{Vertex, Direction}
import org.apache.atlas.AtlasException
import org.apache.atlas.query.Expressions.{ComparisonExpression, ExpressionException}
import org.apache.atlas.query.TypeUtils.FieldInfo
import org.apache.atlas.repository.graph.{GraphHelper, GraphBackedMetadataRepository}
import org.apache.atlas.typesystem.persistence.Id
import org.apache.atlas.typesystem.types.DataTypes._
import org.apache.atlas.typesystem.types._
import org.apache.atlas.typesystem.types.cache.TypeCache
import org.apache.atlas.typesystem.{ITypedInstance, ITypedReferenceableInstance}
import org.elasticsearch.common.collect.ImmutableList
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
 * Represents the bridge between the QueryProcessor and the Graph Persistence scheme used.
 * Some of the behaviors captured are:
 * - how type and id information is stored in the Vertex that represents an [[ITypedReferenceableInstance]];
 * - how edges representing trait and attribute relationships are labelled;
 * - how attribute names are mapped to Property Keys in Vertices.
 *
 * This is a work in progress.
 */
trait GraphPersistenceStrategies {

    /**
     * Name of the vertex property used to store an instance's type name.
     */
    def typeAttributeName: String

    /**
     * Name of the vertex property used to store an instance's super type names.
     */
    def superTypeAttributeName: String

    /**
     * Name of the vertex property used to store an instance's guid.
     */
    def idAttributeName : String

    /**
     * Name of the vertex property used to store an instance's state.
     */
    def stateAttributeName : String

    /**
     * Given a dataType and a reference attribute, how the corresponding edge is labelled.
     */
    def edgeLabel(iDataType: IDataType[_], aInfo: AttributeInfo): String

    /**
     * Label of the edge linking an instance of `cls` to its trait `traitName`.
     */
    def traitLabel(cls: IDataType[_], traitName: String): String

    /**
     * Direction ("out" by default) of the edge from an instance to one of its traits.
     */
    def instanceToTraitEdgeDirection : String = "out"

    /**
     * The reverse of [[instanceToTraitEdgeDirection]]; values other than "out"/"in" are returned unchanged.
     */
    def traitToInstanceEdgeDirection: String = instanceToTraitEdgeDirection match {
        case "out" => "in"
        case "in" => "out"
        case x => x
    }

    /**
     * The propertyKey used to store the attribute in a Graph Vertex.
     *
     * @param dataType the type that owns the attribute
     * @param aInfo    the attribute being stored
     * @return the vertex property key for the attribute
     */
    def fieldNameInVertex(dataType: IDataType[_], aInfo: AttributeInfo): String

    /**
     * From a vertex for an [[ITypedReferenceableInstance]], get the names of the traits it has.
     *
     * @param v the instance vertex
     * @return the trait names
     */
    def traitNames(v: TitanVertex): java.util.List[String]

    /**
     * Edge label for a resolved field reference.
     *  - attribute with no reverse type: labelled against the owning `dataType`;
     *  - attribute with a reverse type: labelled against that reverse type;
     *  - trait reference (no attribute): the trait label.
     *
     * @throws IllegalArgumentException if `fInfo` has none of these shapes
     */
    def edgeLabel(fInfo: FieldInfo): String = fInfo match {
        case FieldInfo(dataType, aInfo, null, null) => edgeLabel(dataType, aInfo)
        case FieldInfo(dataType, aInfo, reverseDataType, null) => edgeLabel(reverseDataType, aInfo)
        case FieldInfo(dataType, null, null, traitName) => traitLabel(dataType, traitName)
        // Previously an opaque MatchError; make the unsupported combination explicit.
        case _ => throw new IllegalArgumentException(s"Cannot determine edge label for $fInfo")
    }

    /**
     * Prefix used for field references in select expressions.
     */
    def fieldPrefixInSelect: String

    /**
     * Extract the Id from a Vertex.
     *
     * @param dataTypeNm the name of the dataType of the instance that the given vertex represents
     * @param v          the instance vertex
     * @return the instance's [[Id]]
     */
    def getIdFromVertex(dataTypeNm: String, v: TitanVertex): Id

    /**
     * Builds an instance of `dataType` from `v`. `v` is presumably either a vertex or a raw property
     * value depending on the type category; see the implementing strategy.
     */
    def constructInstance[U](dataType: IDataType[U], v: java.lang.Object): U

    /**
     * Maps a comparison operator symbol to the corresponding Gremlin `T` token.
     *
     * @throws ExpressionException for operators that have no Gremlin equivalent
     */
    def gremlinCompOp(op: ComparisonExpression): String = op.symbol match {
        case "=" => "T.eq"
        case "!=" => "T.neq"
        case ">" => "T.gt"
        case ">=" => "T.gte"
        case "<" => "T.lt"
        case "<=" => "T.lte"
        case _ => throw new ExpressionException(op, "Comparison operator not supported in Gremlin")
    }

    /**
     * Type-test closure that checks `it.object` (rather than `it`) against `dataType`.
     */
    def loopObjectExpression(dataType: IDataType[_]): String = {
        _typeTestExpression(dataType.getName, "it.object")
    }

    /**
     * True unless type instances are collected into a variable. `preStatements` is currently not consulted.
     */
    def addGraphVertexPrefix(preStatements : Traversable[String]): Boolean = !collectTypeInstancesIntoVar

    /**
     * Controls how instances of a Type are discovered:
     *  - when true, the query is generated so that indexes are exercised, using a local Set variable that
     *    is filled across multiple lookups;
     *  - when false, the query is generated using an 'or' expression.
     * [[filterBySubTypes]] takes precedence over this setting.
     *
     * '''This is a very bad idea: controlling query execution behavior via query generation.''' But our
     * current knowledge seems to indicate we have no choice. See
     * [[https://groups.google.com/forum/#!topic/gremlin-users/n1oV86yr4yU discussion in Gremlin group]].
     * This is also a fragile solution, dependent on the memory requirements of the Set variable.
     * The behavior is selected via the '''collectTypeInstancesIntoVar''' setting; setting it to false reverts it.
     *
     * Long term we have to get to the bottom of Gremlin:
     *  - there doesn't seem to be a way to see the physical query plan. Maybe we should directly interface
     *    with Titan.
     *  - At least from a querying perspective, a columnar db may be a better route. Daniel Abadi did some
     *    good work on showing how to use a columnar store as a Graph Db.
     *
     * @return whether type instances are collected into a Set variable
     */
    def collectTypeInstancesIntoVar: Boolean = false

    /**
     * When true, a type test is a single `has(typeAttributeName, T.in, [...])` step listing the type and
     * all of its sub types.
     */
    def filterBySubTypes: Boolean = true

    /**
     * Gremlin steps that restrict the traversal to instances of `typeName` (including sub types).
     *
     * @param typeName the type to test for
     * @param intSeq   source of unique suffixes for generated variable names
     * @return the Gremlin statements/steps implementing the test
     */
    def typeTestExpression(typeName : String, intSeq : IntSequence) : Seq[String] = {
        if (filterBySubTypes) {
            typeTestExpressionUsingInFilter(typeName)
        } else if (collectTypeInstancesIntoVar) {
            typeTestExpressionMultiStep(typeName, intSeq)
        } else {
            typeTestExpressionUsingFilter(typeName)
        }
    }

    /** A single `filter{...}` step using the 'or' type-test closure. */
    private def typeTestExpressionUsingFilter(typeName : String) : Seq[String] = {
        Seq(s"filter${_typeTestExpression(typeName, "it")}")
    }

    /**
     * Closure testing that `itRef` has type `typeName` or lists it among its super types.
     * Built on one line so the result does not depend on the source file's line endings.
     */
    private def _typeTestExpression(typeName: String, itRef: String): String = {
        val typeCheck = s"(${itRef}.'${typeAttributeName}' == '${typeName}')"
        val superTypeCheck = s"(${itRef}.'${superTypeAttributeName}' ?" +
            s"${itRef}.'${superTypeAttributeName}'.contains('${typeName}') : false)"
        s"{$typeCheck |$superTypeCheck}"
    }

    /**
     * Fills a fresh Set variable with the type's instances and its sub types' instances, then starts the
     * traversal from that set.
     */
    private def typeTestExpressionMultiStep(typeName : String, intSeq : IntSequence) : Seq[String] = {
        // A unique suffix per call so several type tests in one query don't share a variable.
        val varName = s"_var_${intSeq.next}"
        Seq(
            newSetVar(varName),
            fillVarWithTypeInstances(typeName, varName),
            fillVarWithSubTypeInstances(typeName, varName),
            s"$varName._()"
        )
    }

    /** A single `has(typeAttributeName, T.in, [...])` step over `typeName` and its sub types. */
    private def typeTestExpressionUsingInFilter(typeName: String): Seq[String] = {
        val filters = mutable.Map[TypeCache.TYPE_FILTER, String]()
        filters.put(TypeCache.TYPE_FILTER.SUPERTYPE, typeName)
        // Presumably the types declaring typeName as a super type; typeName itself must match as well.
        val subTypes: Seq[String] = TypeSystem.getInstance().getTypeNames(filters)
        val typeNames = (typeName +: subTypes).distinct
        Seq(s"""has("${typeAttributeName}", T.in, ${typeNames.mkString("['", "','", "']")})""")
    }

    private def newSetVar(varName : String) = s"$varName = [] as Set"

    private def fillVarWithTypeInstances(typeName : String, fillVar : String) = {
        s"""g.V().has("${typeAttributeName}", "${typeName}").fill($fillVar)"""
    }

    private def fillVarWithSubTypeInstances(typeName : String, fillVar : String) = {
        s"""g.V().has("${superTypeAttributeName}", "${typeName}").fill($fillVar)"""
    }
}
/**
 * Default [[GraphPersistenceStrategies]]: fixed vertex property names ("typeName", "superTypeNames", "guid",
 * "state"), attribute edge labels of the form `__<typeName>.<attributeName>`, and loaders that rebuild typed
 * instances from Titan vertices.
 */
object GraphPersistenceStrategy1 extends GraphPersistenceStrategies {
    val typeAttributeName: String = "typeName"
    val superTypeAttributeName: String = "superTypeNames"
    val idAttributeName: String = "guid"
    val stateAttributeName: String = "state"

    def edgeLabel(dataType: IDataType[_], aInfo: AttributeInfo): String = s"__${dataType.getName}.${aInfo.name}"

    def edgeLabel(propertyName: String): String = s"__${propertyName}"

    val fieldPrefixInSelect: String = "it"

    def traitLabel(cls: IDataType[_], traitName: String): String = s"$traitName"

    def fieldNameInVertex(dataType: IDataType[_], aInfo: AttributeInfo): String =
        GraphHelper.getQualifiedFieldName(dataType, aInfo.name)

    /** Uses the graph vertex id (not the guid property) as the instance id; the `0` is presumably the version. */
    def getIdFromVertex(dataTypeNm: String, v: TitanVertex): Id =
        new Id(v.getId.toString, 0, dataTypeNm)

    /**
     * Trait names stored as a comma separated "traitNames" vertex property.
     * A missing or empty property yields an empty list (instead of a single "" trait name).
     */
    def traitNames(v: TitanVertex): java.util.List[String] = {
        val s = v.getProperty[String]("traitNames")
        if (s == null || s.isEmpty) new util.ArrayList[String]()
        else util.Arrays.asList(s.split(","): _*)
    }

    /**
     * Builds an instance of `dataType` from `v`. Primitive, array and enum values are converted directly;
     * struct, trait and class instances are loaded from `v`, which must then be a [[TitanVertex]].
     *
     * @throws UnsupportedOperationException for any other type category
     */
    def constructInstance[U](dataType: IDataType[U], v: AnyRef): U = {
        dataType.getTypeCategory match {
            case DataTypes.TypeCategory.PRIMITIVE | DataTypes.TypeCategory.ARRAY | DataTypes.TypeCategory.ENUM =>
                dataType.convert(v, Multiplicity.OPTIONAL)
            case DataTypes.TypeCategory.STRUCT
                if dataType.getName == TypeSystem.getInstance().getIdType.getName => {
                // An Id struct is filled from the vertex's type-name and guid properties.
                val idType = TypeSystem.getInstance().getIdType
                val sInstance = dataType.asInstanceOf[StructType].createInstance()
                val tV = v.asInstanceOf[TitanVertex]
                sInstance.set(idType.typeNameAttrName, tV.getProperty[java.lang.String](typeAttributeName))
                sInstance.set(idType.idAttrName, tV.getProperty[java.lang.String](idAttributeName))
                dataType.convert(sInstance, Multiplicity.OPTIONAL)
            }
            case DataTypes.TypeCategory.STRUCT => {
                val sType = dataType.asInstanceOf[StructType]
                val sInstance = sType.createInstance()
                loadStructInstance(sType, sInstance, v.asInstanceOf[TitanVertex])
                dataType.convert(sInstance, Multiplicity.OPTIONAL)
            }
            case DataTypes.TypeCategory.TRAIT => {
                val tType = dataType.asInstanceOf[TraitType]
                val tInstance = tType.createInstance()
                // This is not right: we should load the Instance associated with this trait.
                // For now just the trait struct is loaded.
                loadStructInstance(tType, tInstance, v.asInstanceOf[TitanVertex])
                dataType.convert(tInstance, Multiplicity.OPTIONAL)
            }
            case DataTypes.TypeCategory.CLASS => {
                val cInstance = constructClassInstance(dataType.asInstanceOf[ClassType], v.asInstanceOf[TitanVertex])
                dataType.convert(cInstance, Multiplicity.OPTIONAL)
            }
            case x => throw new UnsupportedOperationException(s"load for ${dataType} not supported")
        }
    }

    /** Loads every attribute of `dataType` from `v` into `typInstance`. */
    def loadStructInstance(dataType: IConstructableType[_, _ <: ITypedInstance],
                           typInstance: ITypedInstance, v: TitanVertex): Unit = {
        dataType.fieldMapping().fields.values().foreach { aInfo =>
            loadAttribute(dataType, aInfo, typInstance, v)
        }
    }

    /**
     * Rebuilds a class instance from its vertex: id, traits (each from the vertex reached over its trait
     * edge) and attributes.
     *
     * @throws NoSuchElementException if a listed trait has no trait edge
     */
    def constructClassInstance(dataType: ClassType, v: TitanVertex): ITypedReferenceableInstance = {
        val id = getIdFromVertex(dataType.getName, v)
        val tNms = traitNames(v)
        val cInstance = dataType.createInstance(id, tNms: _*)
        // load traits
        tNms.foreach { tNm =>
            val tLabel = traitLabel(dataType, tNm)
            val tVertex = firstOutVertex(v, tLabel).getOrElse(
                throw new NoSuchElementException(s"No '$tLabel' edge found for trait $tNm of instance $id"))
            val tType = TypeSystem.getInstance().getDataType[TraitType](classOf[TraitType], tNm)
            val tInstance = cInstance.getTrait(tNm).asInstanceOf[ITypedInstance]
            loadStructInstance(tType, tInstance, tVertex)
        }
        loadStructInstance(dataType, cInstance, v)
        cInstance
    }

    /**
     * Loads one attribute of `dataType` from `v` into `i`, dispatching on the attribute's type category.
     *
     * @throws UnsupportedOperationException for map, trait and unknown attribute categories
     */
    def loadAttribute(dataType: IDataType[_], aInfo: AttributeInfo, i: ITypedInstance, v: TitanVertex): Unit = {
        aInfo.dataType.getTypeCategory match {
            case DataTypes.TypeCategory.PRIMITIVE => loadPrimitiveAttribute(dataType, aInfo, i, v)
            case DataTypes.TypeCategory.ENUM => loadEnumAttribute(dataType, aInfo, i, v)
            case DataTypes.TypeCategory.ARRAY => loadArrayAttribute(dataType, aInfo, i, v)
            case DataTypes.TypeCategory.STRUCT | DataTypes.TypeCategory.CLASS =>
                loadStructAttribute(dataType, aInfo, i, v)
            case _ =>
                throw new UnsupportedOperationException(s"load for ${aInfo.dataType()} not supported")
        }
    }

    /** The vertex at the far end of the first outgoing `label` edge of `v`, if any. */
    private def firstOutVertex(v: TitanVertex, label: String): Option[TitanVertex] = {
        val edges = v.getEdges(Direction.OUT, label).iterator()
        if (edges.hasNext) Some(edges.next().getVertex(Direction.IN).asInstanceOf[TitanVertex]) else None
    }

    /**
     * Invokes `set` with the vertex property `key` only when it is present; absent (optional) attributes
     * stay unset instead of being unboxed from null into a primitive setter.
     */
    private def withProperty[T <: AnyRef](v: TitanVertex, key: String)(set: T => Unit): Unit = {
        val value = v.getProperty[T](key)
        if (value != null) set(value)
    }

    /** Enum attributes are read back as their Integer property value. */
    private def loadEnumAttribute(dataType: IDataType[_], aInfo: AttributeInfo, i: ITypedInstance, v: TitanVertex)
    : Unit = {
        val fName = fieldNameInVertex(dataType, aInfo)
        withProperty[java.lang.Integer](v, fName)(n => i.setInt(aInfo.name, n))
    }

    /**
     * Reads a primitive attribute from its vertex property; dates are stored as epoch-millisecond Longs.
     *
     * @throws UnsupportedOperationException for unsupported primitive types
     */
    private def loadPrimitiveAttribute(dataType: IDataType[_], aInfo: AttributeInfo,
                                       i: ITypedInstance, v: TitanVertex): Unit = {
        val fName = fieldNameInVertex(dataType, aInfo)
        val name = aInfo.name
        aInfo.dataType() match {
            case _: BooleanType => withProperty[java.lang.Boolean](v, fName)(b => i.setBoolean(name, b))
            case _: ByteType => withProperty[java.lang.Byte](v, fName)(b => i.setByte(name, b))
            case _: ShortType => withProperty[java.lang.Short](v, fName)(s => i.setShort(name, s))
            case _: IntType => withProperty[java.lang.Integer](v, fName)(n => i.setInt(name, n))
            case _: LongType => withProperty[java.lang.Long](v, fName)(n => i.setLong(name, n))
            case _: FloatType => withProperty[java.lang.Float](v, fName)(f => i.setFloat(name, f))
            case _: DoubleType => withProperty[java.lang.Double](v, fName)(d => i.setDouble(name, d))
            case _: StringType => withProperty[java.lang.String](v, fName)(s => i.setString(name, s))
            case _: DateType => withProperty[java.lang.Long](v, fName)(ms => i.setDate(name, new Date(ms)))
            case _ => throw new UnsupportedOperationException(s"load for ${aInfo.dataType()} not supported")
        }
    }

    /**
     * Reads an array attribute stored as a list-valued vertex property (under the same qualified key the
     * other loaders use). Only primitive and enum elements are supported.
     */
    private def loadArrayAttribute(dataType: IDataType[_], aInfo: AttributeInfo,
                                   i: ITypedInstance, v: TitanVertex): Unit = {
        val list = v.getProperty[java.util.List[_]](fieldNameInVertex(dataType, aInfo))
        if (list != null) {
            val elemType = aInfo.dataType.asInstanceOf[ArrayType].getElemType
            val values = new util.ArrayList[Any](list.size())
            list.foreach(listElement => values.add(mapVertexToCollectionEntry(v, aInfo, elemType, i, listElement)))
            i.set(aInfo.name, values)
        }
    }

    /**
     * Loads a struct- or class-valued attribute from the vertex at the end of its outgoing edge
     * (`edgeLbl`, or the attribute's default edge label). No edge means the attribute stays unset.
     */
    private def loadStructAttribute(dataType: IDataType[_], aInfo: AttributeInfo,
                                    i: ITypedInstance, v: TitanVertex, edgeLbl: Option[String] = None): Unit = {
        val eLabel = edgeLbl.getOrElse(edgeLabel(FieldInfo(dataType, aInfo, null)))
        firstOutVertex(v, eLabel).foreach { sVertex =>
            if (aInfo.dataType().getTypeCategory == DataTypes.TypeCategory.STRUCT) {
                val sType = aInfo.dataType().asInstanceOf[StructType]
                val sInstance = sType.createInstance()
                loadStructInstance(sType, sInstance, sVertex)
                i.set(aInfo.name, sInstance)
            } else {
                val cInstance = constructClassInstance(aInfo.dataType().asInstanceOf[ClassType], sVertex)
                i.set(aInfo.name, cInstance)
            }
        }
    }

    /**
     * Maps one stored array element to its in-memory value: primitive and enum elements are returned as-is.
     *
     * @throws UnsupportedOperationException for struct, trait, class or other element types (not loaded yet)
     */
    private def mapVertexToCollectionEntry(instanceVertex: TitanVertex, attributeInfo: AttributeInfo,
                                           elementType: IDataType[_], i: ITypedInstance, value: Any): Any = {
        elementType.getTypeCategory match {
            case DataTypes.TypeCategory.PRIMITIVE | DataTypes.TypeCategory.ENUM => value
            case _ =>
                throw new UnsupportedOperationException(s"load for ${attributeInfo.dataType()} not supported")
        }
    }
}