blob: 1190114c38e315c6ea19629008382358f6526b85 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.query
import java.util
import org.apache.atlas.repository.graphdb.AtlasGraph
import org.apache.atlas.query.Expressions._
import org.apache.atlas.typesystem.ITypedStruct
import org.apache.atlas.typesystem.json.{InstanceSerialization, Serialization}
import org.apache.atlas.typesystem.persistence.StructInstance
import org.apache.atlas.typesystem.types.DataTypes.{MapType, PrimitiveType}
import org.apache.atlas.typesystem.types.{DataTypes, StructType, TypeSystem}
/**
* Represents a Query to compute the closure based on a relationship between entities of a particular type.
* For e.g. Database Tables are related to each other to capture the '''Lineage''' of data in a Table based
* on other Tables.
*
* A Closure Query is specified by the following information:
* - The Type whose instances are in a closure relationship. For e.g. 'Table'
* - The Closure relation. This is specified as an ''Attribute path''. For e.g. if we have the following model:
* {{{
* class Table {
* name : String,
* ...
* }
*
* class LoadTableProcess {
* name : String,
* inputTables : List[Table],
* outputTable : Table,
* ...
* }
* }}}
* A ''LoadTableProcess'' instance captures the relationship between the data in an output Table and a set of input Tables.
* In order to compute the '''Lineage''' of a Table, the ''Attribute path'' that relates 2 Tables is
* '''[(LoadTableProcess,outputTable), inputTables]'''. This list is saying that for any Table I want to connect to other
* tables via the LoadTableProcess.outputTable attribute, and then via the inputTables attribute. So each entry in the
* Attribute Path represents an attribute in an object. For reverse relations the Type and attribute must be specified,
* as in '(LoadTableProcess,outputTable)', whereas for forward relations the attribute name is sufficient.
* - The depth of the traversal. Certain times you are not interested in the complete closure, but to only
* discover related instances up to a certain depth. Specify the depth as number of hops, or you can ask for the
* complete closure.
* - You can ask for certain attributes to be returned. For e.g. you may only want the Table name, owner and
* creationDate. By default only the Ids of the related instances are returned.
* - For each pair of related instances, you can optionally ask for the Path of the relation to be returned. This is
* returned as a list of ''Id''s.
*
* Given these 5 things the ClosureQuery can be executed; it returns a GremlinQueryResult of the Closure Query.
*/
trait ClosureQuery {

    /** Alias given to the source side of a related pair; also the prefix of its selected attribute columns. */
    val SRC_PREFIX = TypeUtils.GraphResultStruct.SRC_PREFIX

    /** Alias given to the destination side of a related pair; also the prefix of its selected attribute columns. */
    val DEST_PREFIX = TypeUtils.GraphResultStruct.DEST_PREFIX

    /**
     * One step of the closure relation: either a forward attribute ([[Relation]]) or a
     * reverse hop through an attribute of another type ([[ReverseRelation]]).
     */
    sealed trait PathAttribute {

        /**
         * Expression for this step when it starts the path.
         * A reverse relation is encoded as the field id `typeName->attributeName`.
         */
        def toExpr : Expression = this match {
            case r : Relation => fieldId(r.attributeName)
            case rr : ReverseRelation => fieldId(s"${rr.typeName}->${rr.attributeName}")
        }

        /**
         * Field name for this step when it is chained onto an earlier step.
         * Note that a reverse relation contributes only its type name here.
         */
        def toFieldName : String = this match {
            case r : Relation => r.attributeName
            case rr : ReverseRelation => rr.typeName
        }
    }

    /** A hop that reaches instances of `typeName` whose `attributeName` refers back to the current instance. */
    case class ReverseRelation(typeName : String, attributeName : String) extends PathAttribute

    /** A hop that follows `attributeName` of the current instance. */
    case class Relation(attributeName : String) extends PathAttribute

    /**
     * Type on whose instances the closure needs to be computed.
     * @return the name of that type
     */
    def closureType : String

    /**
     * Specifies how instances are related, as an ordered list of path steps.
     * Must be non-empty: `pathExpr` takes the head of this list.
     */
    def closureRelation : List[PathAttribute]

    /**
     * The maximum hops between related instances. A [[None]] implies there is no maximum.
     *
     * NOTE(review): nothing in this trait reads `depth`; `expr` builds the loop without
     * referring to it. Confirm whether the bound is expected to be applied elsewhere.
     * @return the optional hop limit
     */
    def depth : Option[Int]

    /**
     * The attributes to return for the instances. These will be prefixed by 'src_' and 'dest_' in the
     * output rows. When [[None]], the aliased instance itself is selected instead (see `selectExpr`).
     * @return the optional list of attribute names
     */
    def selectAttributes : Option[List[String]]

    /**
     * Specifies if the Path should be returned.
     * @return true when `expr` should be turned into a path expression
     */
    def withPath : Boolean

    /** Persistence strategy passed to the query processor by `evaluate()`. */
    def persistenceStrategy: GraphPersistenceStrategies

    /** Graph passed to the query processor by `evaluate()`. */
    def g: AtlasGraph[_,_]

    /**
     * Folds `closureRelation` into a single expression: the first step supplies the
     * starting expression and every later step is appended as a field access.
     */
    def pathExpr : Expressions.Expression = {
        closureRelation.tail.foldLeft(closureRelation.head.toExpr)((b,a) => b.field(a.toFieldName))
    }

    /**
     * Select-list entries for one side of the pair.
     * @param alias the alias of that side (`SRC_PREFIX` or `DEST_PREFIX` in `expr`)
     * @return one expression per requested attribute, each aliased as `alias_attribute`;
     *         or the aliased instance itself when no attributes were requested
     */
    def selectExpr(alias : String) : List[Expression] = {
        selectAttributes.map { _.map { a =>
            fieldId(alias).field(a).as(s"${alias}_$a")
        }
        }.getOrElse(List(fieldId(alias)))
    }

    /**
     * Hook to allow a filter to be added for the closureType.
     * @param expr the expression selecting instances of `closureType`
     * @return the expression to use as the source side; this default returns `expr` unchanged
     */
    def srcCondition(expr : Expression) : Expression = expr

    /**
     * Builds the complete closure expression: the (optionally filtered) source class aliased
     * as `SRC_PREFIX`, looped over `pathExpr`, aliased as `DEST_PREFIX`, followed by the
     * select list of both sides. The result is wrapped as a path expression when `withPath` is set.
     */
    def expr : Expressions.Expression = {
        val e = srcCondition(Expressions._class(closureType)).as(SRC_PREFIX).loop(pathExpr).as(DEST_PREFIX).
            select((selectExpr(SRC_PREFIX) ++ selectExpr(DEST_PREFIX)):_*)
        if (withPath) e.path else e
    }

    /**
     * Evaluates `expr` against `g` using `persistenceStrategy`.
     * @return the result produced by the query processor
     */
    def evaluate(): GremlinQueryResult = {
        QueryProcessor.evaluate(expr, g, persistenceStrategy)
    }

    /**
     * Converts the rows of a path query result into a graph structure made of a vertex map
     * (id -> vertex payload) and an edge map (id -> ids of the vertices that follow it on some path).
     *
     * @param res the result whose rows carry a path and a result struct
     *            (presumably produced by `evaluate()` on this query)
     * @return the graph result holding the query string of `res` and the populated graph struct
     * @throws ExpressionException if this query was not defined `withPath`
     */
    def graph(res: GremlinQueryResult) : GraphResult = {

        if (!withPath) {
            throw new ExpressionException(expr, "Graph requested for non Path Query")
        }

        import scala.collection.JavaConverters._

        val graphResType = TypeUtils.GraphResultStruct.createType(res.resultDataType.asInstanceOf[StructType])

        // The vertices attribute is a map; its value type is the struct used for each vertex payload.
        val vertexPayloadType = {
            val mT = graphResType.fieldMapping.fields.get(TypeUtils.GraphResultStruct.verticesAttrName).
                dataType().asInstanceOf[MapType]
            mT.getValueType.asInstanceOf[StructType]
        }

        def id(idObj : StructInstance) : String = idObj.getString(TypeSystem.ID_STRUCT_ID_ATTRNAME)

        // Builds a vertex payload: the id struct plus every other payload attribute, copied from
        // the result row column named `attrPrefix` + attribute name.
        def vertexStruct(idObj : StructInstance, resRow : ITypedStruct, attrPrefix : String) : StructInstance = {
            val vP = vertexPayloadType.createInstance()
            vP.set(TypeUtils.GraphResultStruct.vertexIdAttrName, idObj)
            vertexPayloadType.fieldMapping.fields.asScala.keys.
                filter(_ != TypeUtils.GraphResultStruct.vertexIdAttrName).foreach { a =>
                vP.set(a, resRow.get(s"${attrPrefix}$a"))
            }
            vP.asInstanceOf[StructInstance]
        }

        val instance = graphResType.createInstance()
        val vertices = new util.HashMap[String, AnyRef]()
        val edges = new util.HashMap[String,java.util.List[String]]()

        /**
         * foreach resultRow
         *   for each Path entry
         *     add an entry in the edges Map
         *   add an entry for the Src vertex to the vertex Map
         *   add an entry for the Dest vertex to the vertex Map
         */
        res.rows.asScala.map(_.asInstanceOf[StructInstance]).foreach { r =>

            val path = r.get(TypeUtils.ResultWithPathStruct.pathAttrName).asInstanceOf[java.util.List[_]].asScala
            val srcVertex = path.head.asInstanceOf[StructInstance]

            // Walk the path hop by hop, recording each (current -> next) edge once;
            // the fold yields the last vertex of the path, i.e. the destination.
            val destVertex = path.tail.foldLeft(srcVertex) { (currVertex, n) =>
                val nextVertex = n.asInstanceOf[StructInstance]
                val currId = id(currVertex)
                val iList = if (!edges.containsKey(currId)) {
                    val l = new util.ArrayList[String]()
                    edges.put(currId, l)
                    l
                } else {
                    edges.get(currId)
                }
                val nextId = id(nextVertex)
                if (!iList.contains(nextId)) {
                    iList.add(nextId)
                }
                nextVertex
            }

            // Both vertex payloads are read from the same result struct, distinguished by column prefix.
            val resultRow = r.get(TypeUtils.ResultWithPathStruct.resultAttrName).asInstanceOf[ITypedStruct]
            vertices.put(id(srcVertex), vertexStruct(srcVertex, resultRow, s"${SRC_PREFIX}_"))
            vertices.put(id(destVertex), vertexStruct(destVertex, resultRow, s"${DEST_PREFIX}_"))
        }

        instance.set(TypeUtils.GraphResultStruct.verticesAttrName, vertices)
        instance.set(TypeUtils.GraphResultStruct.edgesAttrName, edges)
        GraphResult(res.query, instance)
    }
}
/**
 * A [[ClosureQuery]] whose source side is restricted by an equality test: only instances whose
 * ''attributeToSelectInstance'' equals ''instanceValue'' are used as the starting point.
 *
 * @tparam T the value type of the attribute used to select the instance
 */
trait SingleInstanceClosureQuery[T] extends ClosureQuery {

    /** Name of the attribute that is compared against `instanceValue`. */
    def attributeToSelectInstance : String

    /** Primitive type of that attribute; used to build the literal for the comparison. */
    def attributeTyp : PrimitiveType[T]

    /** Value the selecting attribute must be equal to. */
    def instanceValue : T

    /**
     * Adds the filter `attributeToSelectInstance = instanceValue` to the source expression.
     * @param expr the expression selecting instances of the closure type
     * @return `expr` with the equality condition applied as a where clause
     */
    override def srcCondition(expr : Expression) : Expression = {
        val selector = Expressions.fieldId(attributeToSelectInstance)
        val expected = Expressions.literal(attributeTyp, instanceValue)
        expr.where(selector.`=`(expected))
    }
}
import scala.language.existentials;
/**
 * A ClosureQuery to compute '''Lineage''' for Hive tables. Assumes the Lineage relation is captured in a ''CTAS''
 * type, and the table relations are captured as attributes from a CTAS instance to Table instances.
 *
 * Starting from the selected table, each hop goes back to the CTAS instances that reference it through
 * their ''output'' attribute and then forward to the tables in their ''input'' attribute.
 *
 * @param tableTypeName The name of the Table Type.
 * @param attributeToSelectInstance The Table attribute that is matched against ''tableName'' to pick the start table.
 * @param tableName The value ''attributeToSelectInstance'' must have on the start table.
 * @param ctasTypeName The name of the Create Table As Select(CTAS) Type.
 * @param ctasInputTableAttribute The attribute in CTAS Type that associates it to the ''Input'' tables.
 * @param ctasOutputTableAttribute The attribute in CTAS Type that associates it to the ''Output'' tables.
 * @param depth depth as needed by the closure Query.
 * @param selectAttributes as needed by the closure Query.
 * @param withPath as needed by the closure Query.
 * @param persistenceStrategy as needed to evaluate the Closure Query.
 * @param g as needed to evaluate the Closure Query.
 */
case class InputLineageClosureQuery(tableTypeName : String,
                                    attributeToSelectInstance : String,
                                    tableName : String,
                                    ctasTypeName : String,
                                    ctasInputTableAttribute : String,
                                    ctasOutputTableAttribute : String,
                                    depth : Option[Int],
                                    selectAttributes : Option[List[String]],
                                    withPath : Boolean,
                                    persistenceStrategy: GraphPersistenceStrategies,
                                    g: AtlasGraph[_,_]
                                   ) extends SingleInstanceClosureQuery[String] {

    // The closure is computed over tables, selected by a string-valued attribute.
    val closureType : String = tableTypeName
    val attributeTyp = DataTypes.STRING_TYPE
    val instanceValue : String = tableName

    // table <-(output)- CTAS -(input)-> table
    lazy val closureRelation = {
        val backToProducingCtas = ReverseRelation(ctasTypeName, ctasOutputTableAttribute)
        val forwardToInputs = Relation(ctasInputTableAttribute)
        List(backToProducingCtas, forwardToInputs)
    }
}
/**
 * A ClosureQuery to compute where a table is used based on the '''Lineage''' for Hive tables.
 * Assumes the Lineage relation is captured in a ''CTAS''
 * type, and the table relations are captured as attributes from a CTAS instance to Table instances.
 *
 * Starting from the selected table, each hop goes back to the CTAS instances that reference it through
 * their ''input'' attribute and then forward to the table in their ''output'' attribute.
 *
 * @param tableTypeName The name of the Table Type.
 * @param attributeToSelectInstance The Table attribute that is matched against ''tableName'' to pick the start table.
 * @param tableName The value ''attributeToSelectInstance'' must have on the start table.
 * @param ctasTypeName The name of the Create Table As Select(CTAS) Type.
 * @param ctasInputTableAttribute The attribute in CTAS Type that associates it to the ''Input'' tables.
 * @param ctasOutputTableAttribute The attribute in CTAS Type that associates it to the ''Output'' tables.
 * @param depth depth as needed by the closure Query.
 * @param selectAttributes as needed by the closure Query.
 * @param withPath as needed by the closure Query.
 * @param persistenceStrategy as needed to evaluate the Closure Query.
 * @param g as needed to evaluate the Closure Query.
 */
case class OutputLineageClosureQuery(tableTypeName : String,
                                     attributeToSelectInstance : String,
                                     tableName : String,
                                     ctasTypeName : String,
                                     ctasInputTableAttribute : String,
                                     ctasOutputTableAttribute : String,
                                     depth : Option[Int],
                                     selectAttributes : Option[List[String]],
                                     withPath : Boolean,
                                     persistenceStrategy: GraphPersistenceStrategies,
                                     g: AtlasGraph[_,_]
                                    ) extends SingleInstanceClosureQuery[String] {

    // The closure is computed over tables, selected by a string-valued attribute.
    val closureType : String = tableTypeName
    val attributeTyp = DataTypes.STRING_TYPE
    val instanceValue : String = tableName

    // table <-(input)- CTAS -(output)-> table
    lazy val closureRelation = {
        val backToConsumingCtas = ReverseRelation(ctasTypeName, ctasInputTableAttribute)
        val forwardToOutput = Relation(ctasOutputTableAttribute)
        List(backToConsumingCtas, forwardToOutput)
    }
}
/**
 * Pairs a query string with the struct produced for it, and offers two JSON renderings of that struct.
 *
 * @param query  the query the result belongs to
 * @param result the struct holding the result
 */
case class GraphResult(query: String, result : ITypedStruct) {

    /** Renders `result` through `Serialization.toJson`. */
    def toTypedJson = {
        Serialization.toJson(result)
    }

    /** Renders `result` through `InstanceSerialization.toJson`. */
    def toInstanceJson = {
        InstanceSerialization.toJson(result)
    }
}