blob: 1c3d9a40eda66142a54ca5920557bf47b7eed731 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.atlas.hive.hook;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.io.File;
import java.util.Map;
/**
 * Integration test for the Atlas Hive hook: Hive commands are executed through a
 * {@link Driver} and the resulting metadata is looked up through an {@link AtlasClient}.
 */
public class HiveHookIT {
public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HiveHookIT.class);
// Fallback Atlas endpoint, used when HiveMetaStoreBridge.ATLAS_ENDPOINT is not configured.
private static final String DGI_URL = "http://localhost:21000/";
// Cluster name used in the search queries and in the expected qualified names.
private static final String CLUSTER_NAME = "test";
// Database that tables land in when a statement does not name one.
public static final String DEFAULT_DB = "default";
// Hive driver used to execute the commands under test.
private Driver driver;
// Atlas client used to search for and fetch registered entities.
private AtlasClient dgiCLient;
// Hive session started in setUp().
private SessionState ss;
public void setUp() throws Exception {
//Set-up hive session
HiveConf conf = new HiveConf();
driver = new Driver(conf);
ss = new SessionState(conf, System.getProperty(""));
ss = SessionState.start(ss);
Configuration configuration = ApplicationProperties.get();
dgiCLient = new AtlasClient(configuration.getString(HiveMetaStoreBridge.ATLAS_ENDPOINT, DGI_URL));
private void runCommand(String cmd) throws Exception {
public void testCreateDatabase() throws Exception {
String dbName = "db" + random();
runCommand("create database " + dbName + " WITH DBPROPERTIES ('p1'='v1', 'p2'='v2')");
String dbId = assertDatabaseIsRegistered(dbName);
Referenceable definition = dgiCLient.getEntity(dbId);
Map params = (Map) definition.get("parameters");
Assert.assertEquals(params.size(), 2);
Assert.assertEquals(params.get("p1"), "v1");
//There should be just one entity per dbname
runCommand("drop database " + dbName);
runCommand("create database " + dbName);
String dbid = assertDatabaseIsRegistered(dbName);
//assert on qualified name
Referenceable dbEntity = dgiCLient.getEntity(dbid);
Assert.assertEquals(dbEntity.get("qualifiedName"), dbName.toLowerCase() + "@" + CLUSTER_NAME);
private String dbName() {
return "db" + random();
private String createDatabase() throws Exception {
String dbName = dbName();
runCommand("create database " + dbName);
return dbName;
private String tableName() {
return "table" + random();
private String createTable() throws Exception {
return createTable(true);
private String createTable(boolean partition) throws Exception {
String tableName = tableName();
runCommand("create table " + tableName + "(id int, name string) comment 'table comment' " + (partition ?
" partitioned by(dt string)" : ""));
return tableName;
public void testCreateTable() throws Exception {
String tableName = tableName();
String dbName = createDatabase();
String colName = "col" + random();
runCommand("create table " + dbName + "." + tableName + "(" + colName + " int, name string)");
assertTableIsRegistered(dbName, tableName);
//there is only one instance of column registered
String colId = assertColumnIsRegistered(colName);
Referenceable colEntity = dgiCLient.getEntity(colId);
Assert.assertEquals(colEntity.get("qualifiedName"), String.format("%s.%s.%s@%s", dbName.toLowerCase(),
tableName.toLowerCase(), colName.toLowerCase(), CLUSTER_NAME));
tableName = createTable();
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
Referenceable tableRef = dgiCLient.getEntity(tableId);
Assert.assertEquals(tableRef.get(HiveDataModelGenerator.COMMENT), "table comment");
String entityName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
Assert.assertEquals(tableRef.get(HiveDataModelGenerator.NAME), entityName);
Assert.assertEquals(tableRef.get("name"), "default." + tableName.toLowerCase() + "@" + CLUSTER_NAME);
final Referenceable sdRef = (Referenceable) tableRef.get("sd");
Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_IS_STORED_AS_SUB_DIRS), false);
//Create table where database doesn't exist, will create database instance as well
public void testRenameTable() throws Exception {
String tableName = createTable();
String newTableName = tableName();
runCommand(String.format("alter table %s rename to %s", tableName, newTableName));
assertTableIsRegistered(DEFAULT_DB, newTableName);
assertTableIsNotRegistered(DEFAULT_DB, tableName);
private String assertColumnIsRegistered(String colName) throws Exception {
LOG.debug("Searching for column {}", colName);
String query =
String.format("%s where name = '%s'", HiveDataTypes.HIVE_COLUMN.getName(), colName.toLowerCase());
return assertEntityIsRegistered(query);
public void testCTAS() throws Exception {
String tableName = createTable();
String ctasTableName = "table" + random();
String query = "create table " + ctasTableName + " as select * from " + tableName;
assertTableIsRegistered(DEFAULT_DB, ctasTableName);
public void testCreateView() throws Exception {
String tableName = createTable();
String viewName = tableName();
String query = "create view " + viewName + " as select * from " + tableName;
assertTableIsRegistered(DEFAULT_DB, viewName);
public void testLoadData() throws Exception {
String tableName = createTable(false);
String loadFile = file("load");
String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName;
public void testInsert() throws Exception {
String tableName = createTable();
String insertTableName = createTable();
String query =
"insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from " + tableName
+ " where dt = '2015-01-01'";
String partId = assertPartitionIsRegistered(DEFAULT_DB, insertTableName, "2015-01-01");
Referenceable partitionEntity = dgiCLient.getEntity(partId);
String.format("%s.%s.%s@%s", "default", insertTableName.toLowerCase(), "2015-01-01", CLUSTER_NAME));
private String random() {
return RandomStringUtils.randomAlphanumeric(10);
private String file(String tag) throws Exception {
String filename = "./target/" + tag + "-data-" + random();
File file = new File(filename);
return file.getAbsolutePath();
private String mkdir(String tag) throws Exception {
String filename = "./target/" + tag + "-data-" + random();
File file = new File(filename);
return file.getAbsolutePath();
public void testExportImport() throws Exception {
String tableName = createTable(false);
String filename = "pfile://" + mkdir("export");
String query = "export table " + tableName + " to \"" + filename + "\"";
tableName = createTable(false);
query = "import table " + tableName + " from '" + filename + "'";
public void testSelect() throws Exception {
String tableName = createTable();
String query = "select * from " + tableName;
String pid = assertProcessIsRegistered(query);
Referenceable processEntity = dgiCLient.getEntity(pid);
Assert.assertEquals(processEntity.get("name"), query.toLowerCase());
//single entity per query
query = "SELECT * from " + tableName.toUpperCase();
@Test(enabled = false)
public void testAlterTable() throws Exception {
String tableName = createTable();
String newName = tableName();
String query = "alter table " + tableName + " rename to " + newName;
assertTableIsRegistered(DEFAULT_DB, newName);
assertTableIsNotRegistered(DEFAULT_DB, tableName);
@Test(enabled = false)
public void testAlterView() throws Exception {
String tableName = createTable();
String viewName = tableName();
String newName = tableName();
String query = "create view " + viewName + " as select * from " + tableName;
query = "alter view " + viewName + " rename to " + newName;
assertTableIsRegistered(DEFAULT_DB, newName);
assertTableIsNotRegistered(DEFAULT_DB, viewName);
private String assertProcessIsRegistered(String queryStr) throws Exception {
// String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(),
// normalize(queryStr));
// assertEntityIsRegistered(dslQuery, true);
//todo replace with DSL
String typeName = HiveDataTypes.HIVE_PROCESS.getName();
String gremlinQuery =
String.format("g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()", typeName, typeName,
return assertEntityIsRegistered(gremlinQuery);
private String normalize(String str) {
if (StringUtils.isEmpty(str)) {
return null;
return StringEscapeUtils.escapeJava(str.toLowerCase());
private void assertTableIsNotRegistered(String dbName, String tableName) throws Exception {
LOG.debug("Searching for table {}.{}", dbName, tableName);
String query = String.format(
"%s as t where tableName = '%s', db where name = '%s' and clusterName = '%s'" + " select t",
HiveDataTypes.HIVE_TABLE.getName(), tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
private String assertTableIsRegistered(String dbName, String tableName) throws Exception {
LOG.debug("Searching for table {}.{}", dbName, tableName);
String query = String.format(
"%s as t where tableName = '%s', db where name = '%s' and clusterName = '%s'" + " select t",
HiveDataTypes.HIVE_TABLE.getName(), tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
return assertEntityIsRegistered(query, "t");
private String assertDatabaseIsRegistered(String dbName) throws Exception {
LOG.debug("Searching for database {}", dbName);
String query = String.format("%s where name = '%s' and clusterName = '%s'", HiveDataTypes.HIVE_DB.getName(),
dbName.toLowerCase(), CLUSTER_NAME);
return assertEntityIsRegistered(query);
private String assertPartitionIsRegistered(String dbName, String tableName, String value) throws Exception {
String typeName = HiveDataTypes.HIVE_PARTITION.getName();
LOG.debug("Searching for partition of {}.{} with values {}", dbName, tableName, value);
String dslQuery = String.format("%s as p where values = ['%s'], table where tableName = '%s', "
+ "db where name = '%s' and clusterName = '%s' select p", typeName, value,
tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
return assertEntityIsRegistered(dslQuery, "p");
private String assertEntityIsRegistered(final String query, String... arg) throws Exception {
waitFor(2000, new Predicate() {
public boolean evaluate() throws Exception {
JSONArray results =;
return results.length() == 1;
String column = (arg.length > 0) ? arg[0] : "_col_0";
JSONArray results =;
JSONObject row = results.getJSONObject(0);
if (row.has("__guid")) {
return row.getString("__guid");
} else if (row.has("$id$")) {
return row.getJSONObject("$id$").getString("id");
} else {
return row.getJSONObject(column).getString("id");
private void assertEntityIsNotRegistered(String dslQuery) throws Exception {
JSONArray results = dgiCLient.searchByDSL(dslQuery);
Assert.assertEquals(results.length(), 0);
public void testLineage() throws Exception {
String table1 = createTable(false);
String db2 = createDatabase();
String table2 = tableName();
String query = String.format("create table %s.%s as select * from %s", db2, table2, table1);
String table1Id = assertTableIsRegistered(DEFAULT_DB, table1);
String table2Id = assertTableIsRegistered(db2, table2);
String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, db2, table2);
JSONObject response = dgiCLient.getInputGraph(datasetName);
JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices");
datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, table1);
response = dgiCLient.getOutputGraph(datasetName);
vertices = response.getJSONObject("values").getJSONObject("vertices");
/**
 * A condition that {@code waitFor} polls until it becomes true.
 */
public interface Predicate {

    /**
     * Perform a predicate evaluation.
     *
     * @return the boolean result of the evaluation.
     * @throws Exception thrown if the predicate evaluation could not evaluate.
     */
    boolean evaluate() throws Exception;
}
* Wait for a condition, expressed via a {@link Predicate} to become true.
* @param timeout maximum time in milliseconds to wait for the predicate to become true.
* @param predicate predicate waiting on.
protected void waitFor(int timeout, Predicate predicate) throws Exception {
ParamChecker.notNull(predicate, "predicate");
long mustEnd = System.currentTimeMillis() + timeout;
boolean eval;
while (!(eval = predicate.evaluate()) && System.currentTimeMillis() < mustEnd) {"Waiting up to {} msec", mustEnd - System.currentTimeMillis());
if (!eval) {
throw new Exception("Waiting timed out after " + timeout + " msec");