blob: 2dfb67a2b7c36028954304e284eab9da7326a244 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.query
import java.io.File
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Date, UUID}
import javax.script.{Bindings, ScriptEngine, ScriptEngineManager}
import com.thinkaurelius.titan.core.TitanGraph
import org.apache.atlas.repository.Constants
import org.apache.atlas.repository.graph.TitanGraphProvider
import org.apache.atlas.TestUtils
import org.apache.commons.io.FileUtils
import scala.collection.mutable.ArrayBuffer
object HiveTitanSample {
private var nextVertexId: AtomicInteger = new AtomicInteger(0)
private var nextEdgeId: AtomicInteger = new AtomicInteger(1000)
trait Vertex {
val _id: String
def id = _id
val __version = 0
val __guid = s"""${UUID.randomUUID()}""".stripMargin
def addEdge(to: Vertex, label: String, edges: ArrayBuffer[String]): Int = {
val edgeId = nextEdgeId.incrementAndGet();
edges +=
s"""{"_id" : "${edgeId}", "_type" : "edge", "_inV" : "${to.id}", "_outV" : "$id", "_label" : "$label"}"""
edgeId
}
def toGSon(vertices: ArrayBuffer[String],
edges: ArrayBuffer[String]): Unit = {
val sb = new StringBuilder
sb.append( s"""{"${Constants.ENTITY_TYPE_PROPERTY_KEY}" : "${this.getClass.getSimpleName}", "_type" : "vertex"""")
this.getClass.getDeclaredFields filter (_.getName != "traits") foreach { f =>
f.setAccessible(true)
val fV = f.get(this)
val convertedVal = fV match {
case _: String => s""""$fV""""
case ls: List[_] if isPrimitiveType(ls) =>
s"""["${ls.mkString(",")}"]"""
case d: Date => d.getTime
case _ => fV
}
convertedVal match {
case x: Vertex => addEdge(x, s"__${this.getClass.getSimpleName}.${f.getName}", edges)
case l: List[_] => val edgeList = l.map(x =>
s""""${addEdge(x.asInstanceOf[Vertex], s"__${this.getClass.getSimpleName}.${f.getName}", edges)}""""
)
if(l.head.isInstanceOf[Struct]) {
sb.append( s""", "${this.getClass.getSimpleName}.${f.getName}" : ${edgeList.mkString("[", ",", "]")}""")
}
case _ => sb.append( s""", "${f.getName}" : $convertedVal""")
sb.append( s""", "${this.getClass.getSimpleName}.${f.getName}" : $convertedVal""")
}
}
this.getClass.getDeclaredFields filter (_.getName == "traits") foreach { f =>
f.setAccessible(true)
var traits = f.get(this).asInstanceOf[Option[List[Trait]]]
if (traits.isDefined) {
val fV = traits.get.map(_.getClass.getSimpleName).mkString(",")
sb.append( s""", "${Constants.TRAIT_NAMES_PROPERTY_KEY}" : "$fV"""")
}
}
sb.append("}")
vertices += sb.toString()
}
def isPrimitiveType(ls: List[_]) : Boolean = {
ls.head match {
case _: String => true
case _: Byte => true
case _: Short => true
case _: Int => true
case _: Long => true
case _: Float => true
case _: Double => true
case _: BigDecimal => true
case _: BigInt => true
case _: Boolean => true
case default => false
}
}
}
trait Trait extends Vertex
trait Struct extends Vertex
trait Instance extends Vertex {
val traits: Option[List[Trait]]
override def toGSon(vertices: ArrayBuffer[String],
edges: ArrayBuffer[String]): Unit = {
super.toGSon(vertices, edges)
if (traits.isDefined) {
traits.get foreach { t =>
t.toGSon(vertices, edges)
addEdge(t, s"${t.getClass.getSimpleName}", edges)
}
}
}
}
case class JdbcAccess(_id: String = "" + nextVertexId.incrementAndGet()) extends Trait
case class PII(_id: String = "" + nextVertexId.incrementAndGet()) extends Trait
case class Dimension(_id: String = "" + nextVertexId.incrementAndGet()) extends Trait
case class Metric(_id: String = "" + nextVertexId.incrementAndGet()) extends Trait
case class ETL(_id: String = "" + nextVertexId.incrementAndGet()) extends Trait
case class DB(name: String, owner: String, createTime: Int, clusterName: String, traits: Option[List[Trait]] = None,
_id: String = "" + nextVertexId.incrementAndGet()) extends Instance
case class HiveOrder(col: String, order: Int,
_id: String = "" + nextVertexId.incrementAndGet()) extends Struct
case class StorageDescriptor(inputFormat: String, outputFormat: String,
sortCols: List[Struct], _id: String = "" + nextVertexId.incrementAndGet()) extends Struct {
override def toGSon(vertices: ArrayBuffer[String],
edges: ArrayBuffer[String]): Unit = {
sortCols.foreach(_.toGSon(vertices, edges))
super.toGSon(vertices, edges)
}
}
case class Column(name: String, dataType: String, sd: StorageDescriptor,
traits: Option[List[Trait]] = None,
_id: String = "" + nextVertexId.incrementAndGet()) extends Instance
case class Table(name: String, db: DB, sd: StorageDescriptor,
created: Date,
traits: Option[List[Trait]] = None,
_id: String = "" + nextVertexId.incrementAndGet()) extends Instance
case class TableDef(name: String, db: DB, sd: StorageDescriptor,
columns: List[(String, String, Option[List[Trait]])],
traits: Option[List[Trait]] = None,
created: Option[Date] = None) {
val createdDate : Date = created match {
case Some(x) => x
case None => new Date(TestUtils.TEST_DATE_IN_LONG)
}
val colDefs = columns map { c =>
Column(c._1, c._2, sd, c._3)
}
val tablDef = Table(name, db, sd, createdDate, traits)
def toGSon(vertices: ArrayBuffer[String],
edges: ArrayBuffer[String]): Unit = {
sd.toGSon(vertices, edges)
colDefs foreach {
_.toGSon(vertices, edges)
}
tablDef.toGSon(vertices, edges)
}
}
case class Partition(values: List[String], table: Table, traits: Option[List[Trait]] = None,
_id: String = "" + nextVertexId.incrementAndGet()) extends Instance
case class LoadProcess(name: String, inputTables: List[Vertex],
outputTable: Vertex,
traits: Option[List[Trait]] = None,
_id: String = "" + nextVertexId.incrementAndGet()) extends Instance
case class View(name: String, db: DB, inputTables: List[Vertex],
traits: Option[List[Trait]] = None,
_id: String = "" + nextVertexId.incrementAndGet()) extends Instance
val salesDB = DB("Sales", "John ETL", 1000, "test")
val salesFact = TableDef("sales_fact",
salesDB,
StorageDescriptor("TextInputFormat",
"TextOutputFormat", List(HiveOrder("customer_id", 0))),
List(
("time_id", "int", None),
("product_id", "int", None),
("customer_id", "int", None),
("created", "date", None),
("sales", "double", Some(List(Metric())))
))
val productDim = TableDef("product_dim",
salesDB,
StorageDescriptor("TextInputFormat",
"TextOutputFormat", List(HiveOrder("product_id", 0))),
List(
("product_id", "int", None),
("product_name", "string", None),
("brand_name", "string", None)
),
Some(List(Dimension())))
val timeDim = TableDef("time_dim",
salesDB,
StorageDescriptor("TextInputFormat",
"TextOutputFormat", List(HiveOrder("time_id", 0))),
List(
("time_id", "int", None),
("dayOfYear", "int", None),
("weekDay", "string", None)
),
Some(List(Dimension())))
val customerDim = TableDef("customer_dim",
salesDB,
StorageDescriptor("TextInputFormat",
"TextOutputFormat", List(HiveOrder("customer_id", 0))),
List(
("customer_id", "int", None),
("name", "int", None),
("address", "string", Some(List(PII())))
),
Some(List(Dimension())))
val reportingDB = DB("Reporting", "Jane BI", 1500, "test")
val salesFactDaily = TableDef("sales_fact_daily_mv",
reportingDB,
StorageDescriptor("TextInputFormat",
"TextOutputFormat", List(HiveOrder("customer_id", 0))),
List(
("time_id", "int", None),
("product_id", "int", None),
("customer_id", "int", None),
("sales", "double", Some(List(Metric())))
))
val loadSalesFactDaily = LoadProcess("loadSalesDaily",
List(salesFact.tablDef, timeDim.tablDef), salesFactDaily.tablDef,
Some(List(ETL())))
val productDimView = View("product_dim_view", reportingDB,
List(productDim.tablDef),
Some(List(Dimension(), JdbcAccess())))
val customerDimView = View("customer_dim_view", reportingDB,
List(customerDim.tablDef),
Some(List(Dimension(), JdbcAccess())))
val salesFactMonthly = TableDef("sales_fact_monthly_mv",
reportingDB,
StorageDescriptor("TextInputFormat",
"TextOutputFormat", List(HiveOrder("customer_id", 0))),
List(
("time_id", "int", None),
("product_id", "int", None),
("customer_id", "int", None),
("sales", "double", Some(List(Metric())))
))
val loadSalesFactMonthly = LoadProcess("loadSalesMonthly",
List(salesFactDaily.tablDef), salesFactMonthly.tablDef,
Some(List(ETL())))
val salesDailyPartition = Partition(List("2015-01-01"),salesFactDaily.tablDef)
val vertices: ArrayBuffer[String] = new ArrayBuffer[String]()
val edges: ArrayBuffer[String] = new ArrayBuffer[String]()
salesDB.toGSon(vertices, edges)
salesFact.toGSon(vertices, edges)
productDim.toGSon(vertices, edges)
timeDim.toGSon(vertices, edges)
customerDim.toGSon(vertices, edges)
reportingDB.toGSon(vertices, edges)
salesFactDaily.toGSon(vertices, edges)
loadSalesFactDaily.toGSon(vertices, edges)
productDimView.toGSon(vertices, edges)
customerDimView.toGSon(vertices, edges)
salesFactMonthly.toGSon(vertices, edges)
loadSalesFactMonthly.toGSon(vertices, edges)
salesDailyPartition.toGSon(vertices, edges)
def toGSon(): String = {
s"""{
"mode":"NORMAL",
"vertices": ${vertices.mkString("[\n\t", ",\n\t", "\n]")},
"edges": ${edges.mkString("[\n\t", ",\n\t", "\n]")}
}
""".stripMargin
}
def writeGson(fileName: String): Unit = {
FileUtils.writeStringToFile(new File(fileName), toGSon())
}
val GremlinQueries = List(
// 1. List all DBs
"""g.V.has("typeName", "DB")""",
// 2. List all DB nmes
"""g.V.has("typeName", "DB").name""",
// 3. List all Tables in Reporting DB
"""g.V.has("typeName", "DB").has("name", "Reporting").inE("Table.db").outV""",
"""g.V.has("typeName", "DB").as("db").inE("Table.db").outV.and(_().back("db").has("name", "Reporting"))""",
// 4. List all Tables in Reporting DB, list as D.name, Tbl.name
"""
g.V.has("typeName", "DB").has("name", "Reporting").as("db").inE("Table.db").outV.as("tbl").select{it.name}{it.name}
""".stripMargin,
// 5. List all tables that are Dimensions and have the TextInputFormat
"""
g.V.as("v").and(_().outE("Table.Dimension"), _().out("Table.sd").has("inputFormat", "TextInputFormat")).name
""".stripMargin,
// 6. List all tables that are Dimensions or have the TextInputFormat
"""
g.V.as("v").or(_().outE("Table.Dimension"), _().out("Table.sd").has("inputFormat", "TextInputFormat")).name
""".stripMargin,
// 7. List tables that have at least 1 PII column
"""
g.V.has("typeName", "Table").as("tab").out("Table.sd").in("Column.sd").as("column"). \
out("Column.PII").select.groupBy{it.getColumn("tab")}{it.getColumn("column")}{[ "c" : it.size]}.cap.scatter.filter{it.value.c > 0}. \
transform{it.key}.name """.stripMargin
// 7.a from Table as tab -> g.V.has("typeName", "Table").as("tab")
// 7.b sd.Column as column -> out("Table.sd").in("Column.sd").as("column")
// 7.c is PII -> out("Column.PII")
// 7.d select tab, column -> select{it}{it}
// 7.e groupBy tab compute count(column) as c
// 7.f where c > 0
// 7.a Alias(Type("Table"), "tab")
// 7b. Field("sd", Alias(Type("Table"), "tab"))
// Alias(Field("Column", Field("sd", Alias(Type("Table"), "tab"))), "column")
// 7.c Filter(is("PII"), Alias(Field("Column", Field("sd", Alias(Type("Table"), "tab"))), "column"))
// 7.d
)
}
object TestApp extends App with GraphUtils {
val g: TitanGraph = TitanGraphProvider.getGraphInstance
val manager: ScriptEngineManager = new ScriptEngineManager
val engine: ScriptEngine = manager.getEngineByName("gremlin-groovy")
val bindings: Bindings = engine.createBindings
bindings.put("g", g)
val hiveGraphFile = FileUtils.getTempDirectory().getPath + File.separator + System.nanoTime() + ".gson"
HiveTitanSample.writeGson(hiveGraphFile)
bindings.put("hiveGraphFile", hiveGraphFile)
try {
engine.eval("g.loadGraphSON(hiveGraphFile)", bindings)
println(engine.eval("g.V.typeName.toList()", bindings))
HiveTitanSample.GremlinQueries.foreach { q =>
println(q)
println("Result: " + engine.eval(q + ".toList()", bindings))
}
} finally {
g.shutdown()
}
}