blob: 486cfb0db2007a823f24e408f13c7ea6d595947e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive.hook;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.HiveITBase;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.hook.events.BaseHiveEvent;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.io.File;
import java.text.ParseException;
import java.util.*;
import static org.apache.atlas.AtlasClient.NAME;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class HiveHookIT extends HiveITBase {
// Logger for this integration-test class.
private static final Logger LOG = LoggerFactory.getLogger(HiveHookIT.class);
// Partition value (dt=...) shared by all partitioned-table tests below.
private static final String PART_FILE = "2015-01-01";
@Test
public void testCreateDatabase() throws Exception {
    // Verifies that CREATE DATABASE registers a hive_db entity with its DBPROPERTIES,
    // and that drop + re-create yields exactly one entity per database name.
    String dbName = "db" + random();

    runCommand("create database " + dbName + " WITH DBPROPERTIES ('p1'='v1', 'p2'='v2')");

    String      dbId     = assertDatabaseIsRegistered(dbName);
    AtlasEntity dbEntity = atlasClientV2.getEntityByGuid(dbId).getEntity();
    Map<?, ?>   params   = (Map<?, ?>) dbEntity.getAttribute(ATTRIBUTE_PARAMETERS); // wildcard type instead of raw Map

    Assert.assertNotNull(params);
    Assert.assertEquals(params.size(), 2);
    Assert.assertEquals(params.get("p1"), "v1");
    Assert.assertEquals(params.get("p2"), "v2"); // verify both declared properties, not just p1

    //There should be just one entity per dbname
    runCommand("drop database " + dbName);
    assertDBIsNotRegistered(dbName);

    runCommand("create database " + dbName);
    dbId = assertDatabaseIsRegistered(dbName);

    //assert on qualified name: <dbName>@<clusterName>, lower-cased
    dbEntity = atlasClientV2.getEntityByGuid(dbId).getEntity();

    Assert.assertEquals(dbEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), dbName.toLowerCase() + "@" + CLUSTER_NAME);
}
@Test
public void testCreateTable() throws Exception {
// Verifies that CREATE TABLE registers the table, its columns (with owner inherited
// from the table), its storage descriptor and the owning database in Atlas.
String tableName = tableName();
String dbName = createDatabase();
String colName = columnName();
runCommand("create table " + dbName + "." + tableName + "(" + colName + " int, name string)");
String tableId = assertTableIsRegistered(dbName, tableName);
String colId = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName), colName)); //there is only one instance of column registered
AtlasEntity colEntity = atlasClientV2.getEntityByGuid(colId).getEntity();
// column qualified name is <db>.<table>.<col>@<cluster>, all lower-cased
Assert.assertEquals(colEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), String.format("%s.%s.%s@%s", dbName.toLowerCase(), tableName.toLowerCase(), colName.toLowerCase(), CLUSTER_NAME));
Assert.assertNotNull(colEntity.getAttribute(ATTRIBUTE_TABLE));
AtlasObjectId tblObjId = toAtlasObjectId(colEntity.getAttribute(ATTRIBUTE_TABLE));
Assert.assertEquals(tblObjId.getGuid(), tableId);
//assert that column.owner = table.owner
AtlasEntity tblEntity1 = atlasClientV2.getEntityByGuid(tableId).getEntity();
AtlasEntity colEntity1 = atlasClientV2.getEntityByGuid(colId).getEntity();
assertEquals(tblEntity1.getAttribute(ATTRIBUTE_OWNER), colEntity1.getAttribute(ATTRIBUTE_OWNER));
//create table where db is not registered
tableName = createTable();
tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
AtlasEntity tblEntity2 = atlasClientV2.getEntityByGuid(tableId).getEntity();
Assert.assertEquals(tblEntity2.getAttribute(ATTRIBUTE_TABLE_TYPE), TableType.MANAGED_TABLE.name());
Assert.assertEquals(tblEntity2.getAttribute(ATTRIBUTE_COMMENT), "table comment");
String entityName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
Assert.assertEquals(tblEntity2.getAttribute(AtlasClient.NAME), tableName.toLowerCase());
Assert.assertEquals(tblEntity2.getAttribute(ATTRIBUTE_QUALIFIED_NAME), entityName);
Table t = hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, tableName);
// DDL_TIME is stored in seconds by the metastore; convert to millis before comparing
long createTime = Long.parseLong(t.getMetadata().getProperty(hive_metastoreConstants.DDL_TIME)) * MILLIS_CONVERT_FACTOR;
verifyTimestamps(tblEntity2, ATTRIBUTE_CREATE_TIME, createTime);
verifyTimestamps(tblEntity2, ATTRIBUTE_LAST_ACCESS_TIME, createTime);
final AtlasObjectId sdEntity = toAtlasObjectId(tblEntity2.getAttribute(ATTRIBUTE_STORAGEDESC));
Assert.assertNotNull(sdEntity);
// Assert.assertEquals(((Id) sdRef.getAttribute(HiveMetaStoreBridge.TABLE))._getId(), tableId);
//Create table where database doesn't exist, will create database instance as well
assertDatabaseIsRegistered(DEFAULT_DB);
}
/**
 * Asserts that the given timestamp attribute is present on the entity and,
 * when {@code expectedTime > 0}, that it matches the expected value exactly.
 */
private void verifyTimestamps(AtlasEntity ref, String property, long expectedTime) throws ParseException {
    Object createTime = ref.getAttribute(property);

    Assert.assertNotNull(createTime);

    if (expectedTime > 0) {
        // TestNG convention is assertEquals(actual, expected) — actual value first,
        // so that failure messages report the roles correctly.
        Assert.assertEquals(createTime, expectedTime);
    }
}
// Convenience overload: only checks that the timestamp attribute is present
// (expectedTime of 0 skips the exact-value comparison).
private void verifyTimestamps(AtlasEntity ref, String property) throws ParseException {
verifyTimestamps(ref, property, 0);
}
//ATLAS-1321: Disable problematic tests. Need to revisit and fix them later
@Test(enabled = false)
public void testCreateExternalTable() throws Exception {
// Verifies that creating an EXTERNAL table registers the table plus a hive_process
// whose input is the external HDFS location and whose user/timestamps are populated.
String tableName = tableName();
String colName = columnName();
String pFile = createTestDFSPath("parentPath");
String query = String.format("create EXTERNAL table %s.%s(%s, %s) location '%s'", DEFAULT_DB , tableName , colName + " int", "name string", pFile);
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, tableName, null, true);
String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, getTableProcessQualifiedName(DEFAULT_DB, tableName), null);
AtlasEntity processsEntity = atlasClientV2.getEntityByGuid(processId).getEntity();
// the process should be attributed to the user running the test
assertEquals(processsEntity.getAttribute("userName"), UserGroupInformation.getCurrentUser().getShortUserName());
verifyTimestamps(processsEntity, "startTime");
verifyTimestamps(processsEntity, "endTime");
validateHDFSPaths(processsEntity, INPUTS, pFile);
}
/**
 * Builds a single-element set of Hive {@link ReadEntity} for the given input name.
 * DFS directories are normalized to a lower-cased path string; other entity types
 * use the cluster-qualified table name. For TABLE inputs the Hive Table object is
 * attached so downstream code can read table metadata.
 */
private Set<ReadEntity> getInputs(String inputName, Entity.Type entityType) throws HiveException {
    ReadEntity entity = new ReadEntity();

    if (Entity.Type.DFS_DIR.equals(entityType)) {
        entity.setName(lower(new Path(inputName).toString()));
        entity.setTyp(Entity.Type.DFS_DIR);
    } else {
        entity.setName(getQualifiedTblName(inputName));
        entity.setTyp(entityType);
    }

    if (entityType == Entity.Type.TABLE) {
        entity.setT(hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, inputName));
    }

    // Plain LinkedHashSet instead of double-brace initialization: the anonymous
    // subclass created by {{ ... }} retains a reference to the enclosing test instance.
    Set<ReadEntity> inputs = new LinkedHashSet<>();
    inputs.add(entity);
    return inputs;
}
/**
 * Builds a single-element set of Hive {@link WriteEntity} for the given output name.
 * DFS/local directories are normalized to a lower-cased path string; other entity
 * types use the cluster-qualified table name. For TABLE outputs the Hive Table
 * object is attached.
 */
private Set<WriteEntity> getOutputs(String inputName, Entity.Type entityType) throws HiveException {
    WriteEntity entity = new WriteEntity();

    if (Entity.Type.DFS_DIR.equals(entityType) || Entity.Type.LOCAL_DIR.equals(entityType)) {
        entity.setName(lower(new Path(inputName).toString()));
        entity.setTyp(entityType);
    } else {
        entity.setName(getQualifiedTblName(inputName));
        entity.setTyp(entityType);
    }

    if (entityType == Entity.Type.TABLE) {
        entity.setT(hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, inputName));
    }

    // Plain LinkedHashSet instead of double-brace initialization (avoids the
    // anonymous inner class that captures the enclosing test instance).
    Set<WriteEntity> outputs = new LinkedHashSet<>();
    outputs.add(entity);
    return outputs;
}
// Compares the process entity's registered output ids against the expected Hive write entities.
private void validateOutputTables(AtlasEntity processEntity, Set<WriteEntity> expectedTables) throws Exception {
    List<AtlasObjectId> outputIds = toAtlasObjectIdList(processEntity.getAttribute(ATTRIBUTE_OUTPUTS));

    validateTables(outputIds, expectedTables);
}
// Compares the process entity's registered input ids against the expected Hive read entities.
private void validateInputTables(AtlasEntity processEntity, Set<ReadEntity> expectedTables) throws Exception {
    List<AtlasObjectId> inputIds = toAtlasObjectIdList(processEntity.getAttribute(ATTRIBUTE_INPUTS));

    validateTables(inputIds, expectedTables);
}
/**
 * Asserts that the Atlas entities referenced by {@code tableIds} correspond to the
 * expected Hive entities: same count, and every registered qualified name starts
 * with some expected entity name (qualified names carry a cluster suffix, hence
 * the prefix match). Either argument may be null, in which case the other must be empty.
 */
private void validateTables(List<AtlasObjectId> tableIds, Set<? extends Entity> expectedTables) throws Exception {
    if (tableIds == null) {
        Assert.assertTrue(CollectionUtils.isEmpty(expectedTables));
    } else if (expectedTables == null) {
        Assert.assertTrue(CollectionUtils.isEmpty(tableIds));
    } else {
        Assert.assertEquals(tableIds.size(), expectedTables.size());

        List<String> entityQualifiedNames = new ArrayList<>(tableIds.size());
        List<String> expectedTableNames   = new ArrayList<>(expectedTables.size());

        for (AtlasObjectId tableId : tableIds) {
            AtlasEntity atlasEntity = atlasClientV2.getEntityByGuid(tableId.getGuid()).getEntity();

            entityQualifiedNames.add((String) atlasEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
        }

        // enhanced for-loop instead of an explicit Iterator
        for (Entity hiveEntity : expectedTables) {
            expectedTableNames.add(hiveEntity.getName());
        }

        for (String entityQualifiedName : entityQualifiedNames) {
            boolean found = false;

            for (String expectedTableName : expectedTableNames) {
                if (entityQualifiedName.startsWith(expectedTableName)) {
                    found = true;
                    break;
                }
            }

            assertTrue(found, "Table name '" + entityQualifiedName + "' does not start with any name in the expected list " + expectedTableNames);
        }
    }
}
// Asserts that a hive_column with the given qualified name exists; returns its guid.
private String assertColumnIsRegistered(String colName) throws Exception {
return assertColumnIsRegistered(colName, null);
}
// Asserts that a hive_column with the given qualified name exists, optionally
// applying an extra predicate on the found entity; returns the column's guid.
private String assertColumnIsRegistered(String colName, AssertPredicate assertPredicate) throws Exception {
LOG.debug("Searching for column {}", colName);
return assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN.getName(), ATTRIBUTE_QUALIFIED_NAME, colName, assertPredicate);
}
// Asserts that a hive_storagedesc with the given qualified name (lower-cased) exists;
// returns its guid. An optional predicate is applied on the found entity.
private String assertSDIsRegistered(String sdQFName, AssertPredicate assertPredicate) throws Exception {
    String qualifiedName = sdQFName.toLowerCase();

    LOG.debug("Searching for sd {}", qualifiedName);

    return assertEntityIsRegistered(HiveDataTypes.HIVE_STORAGEDESC.getName(), ATTRIBUTE_QUALIFIED_NAME, qualifiedName, assertPredicate);
}
// Asserts that no hive_column with the given qualified name exists in Atlas.
private void assertColumnIsNotRegistered(String colName) throws Exception {
LOG.debug("Searching for column {}", colName);
assertEntityIsNotRegistered(HiveDataTypes.HIVE_COLUMN.getName(), ATTRIBUTE_QUALIFIED_NAME, colName);
}
@Test
public void testCTAS() throws Exception {
    // CREATE TABLE ... AS SELECT must register both the new table and a lineage
    // process linking the source table to it.
    String sourceTable = createTable();
    String ctasTable   = "table" + random();
    String ctasQuery   = "create table " + ctasTable + " as select * from " + sourceTable;

    runCommand(ctasQuery);

    Set<ReadEntity>  expectedInputs  = getInputs(sourceTable, Entity.Type.TABLE);
    Set<WriteEntity> expectedOutputs = getOutputs(ctasTable, Entity.Type.TABLE);

    assertProcessIsRegistered(constructEvent(ctasQuery, HiveOperation.CREATETABLE_AS_SELECT, expectedInputs, expectedOutputs));
    assertTableIsRegistered(DEFAULT_DB, ctasTable);
}
// Builds a HiveEventContext describing one executed Hive query: the query text,
// its operation type, and the read/write entity sets the hook would have seen.
private HiveEventContext constructEvent(String query, HiveOperation op, Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
    HiveEventContext context = new HiveEventContext();

    context.setQueryStr(query);
    context.setOperation(op);
    context.setInputs(inputs);
    context.setOutputs(outputs);

    return context;
}
@Test
public void testEmptyStringAsValue() throws Exception {
    // A DDL containing an empty-string literal ('') must still be processed by the
    // hook and result in a registered table.
    String tableName   = tableName();
    String createQuery = "create table " + tableName + "(id int, name string) row format delimited lines terminated by '\n' null defined as ''";

    runCommand(createQuery);

    assertTableIsRegistered(DEFAULT_DB, tableName);
}
@Test
public void testDropAndRecreateCTASOutput() throws Exception {
// Dropping and re-running the same CTAS must create a NEW process entity
// (the old process still references the deleted output table).
String tableName = createTable();
String ctasTableName = "table" + random();
String query = "create table " + ctasTableName + " as select * from " + tableName;
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, ctasTableName);
Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
Set<WriteEntity> outputs = getOutputs(ctasTableName, Entity.Type.TABLE);
HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
String processId = assertProcessIsRegistered(hiveEventContext);
String drpquery = String.format("drop table %s ", ctasTableName);
runCommandWithDelay(drpquery, 100);
assertTableIsNotRegistered(DEFAULT_DB, ctasTableName);
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, ctasTableName);
// re-resolve outputs: the recreated table is a new entity
outputs = getOutputs(ctasTableName, Entity.Type.TABLE);
String process2Id = assertProcessIsRegistered(hiveEventContext, inputs, outputs);
// the re-created CTAS must map to a different process than the original
assertNotEquals(process2Id, processId);
AtlasEntity processsEntity = atlasClientV2.getEntityByGuid(processId).getEntity();
validateOutputTables(processsEntity, outputs);
}
@Test
public void testCreateView() throws Exception {
    // CREATE VIEW must register the view as a table entity plus a lineage process
    // from the underlying table to the view.
    String sourceTable = createTable();
    String viewName    = tableName();
    String viewQuery   = "create view " + viewName + " as select * from " + sourceTable;

    runCommand(viewQuery);

    Set<ReadEntity>  viewInputs  = getInputs(sourceTable, Entity.Type.TABLE);
    Set<WriteEntity> viewOutputs = getOutputs(viewName, Entity.Type.TABLE);

    assertProcessIsRegistered(constructEvent(viewQuery, HiveOperation.CREATEVIEW, viewInputs, viewOutputs));
    assertTableIsRegistered(DEFAULT_DB, viewName);
}
@Test
public void testAlterViewAsSelect() throws Exception {
    // Verifies that ALTER VIEW AS SELECT updates the existing view entity (same guid)
    // and that input lineage accumulates both the original and the new source table,
    // while output lineage for the view stays empty.

    //Create the view from table1
    String table1Name = createTable();
    String viewName   = tableName();
    String query      = "create view " + viewName + " as select * from " + table1Name;

    runCommand(query);

    String table1Id = assertTableIsRegistered(DEFAULT_DB, table1Name);

    assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATEVIEW, getInputs(table1Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE)));

    String viewId = assertTableIsRegistered(DEFAULT_DB, viewName);

    //Check lineage which includes table1
    String tableId = assertTableIsRegistered(DEFAULT_DB, viewName);

    AtlasLineageInfo               inputLineageInfo = atlasClientV2.getLineageInfo(tableId, AtlasLineageInfo.LineageDirection.INPUT, 0);
    Map<String, AtlasEntityHeader> entityMap        = inputLineageInfo.getGuidEntityMap();

    assertTrue(entityMap.containsKey(viewId));
    assertTrue(entityMap.containsKey(table1Id));

    //Alter the view from table2
    String table2Name = createTable();

    query = "alter view " + viewName + " as select * from " + table2Name;

    runCommand(query);

    //Check if alter view process is registered
    assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATEVIEW, getInputs(table2Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE)));

    String table2Id = assertTableIsRegistered(DEFAULT_DB, table2Name);

    // the view keeps the same guid after ALTER VIEW
    Assert.assertEquals(assertTableIsRegistered(DEFAULT_DB, viewName), viewId);

    String tableId1 = assertTableIsRegistered(DEFAULT_DB, viewName);

    AtlasLineageInfo               inputLineageInfo1 = atlasClientV2.getLineageInfo(tableId1, AtlasLineageInfo.LineageDirection.INPUT, 0);
    Map<String, AtlasEntityHeader> entityMap1        = inputLineageInfo1.getGuidEntityMap();

    assertTrue(entityMap1.containsKey(viewId));

    //This is through the alter view process
    assertTrue(entityMap1.containsKey(table2Id));

    //This is through the Create view process
    assertTrue(entityMap1.containsKey(table1Id));

    //Outputs dont exist
    AtlasLineageInfo               outputLineageInfo = atlasClientV2.getLineageInfo(tableId1, AtlasLineageInfo.LineageDirection.OUTPUT, 0);
    Map<String, AtlasEntityHeader> entityMap2        = outputLineageInfo.getGuidEntityMap();

    assertEquals(entityMap2.size(), 0);
}
// Creates a test file under the given path and returns its pseudo-distributed-FS URI.
private String createTestDFSFile(String path) throws Exception {
    String localPath = file(path);

    return "pfile://" + localPath;
}
@Test
public void testLoadLocalPath() throws Exception {
    // LOAD DATA LOCAL INPATH must register a LOAD process whose only output is the
    // target table (local file inputs are not tracked as lineage inputs).
    String targetTable = createTable(false);
    String localFile   = file("load");
    String loadQuery   = "load data local inpath 'file://" + localFile + "' into table " + targetTable;

    runCommand(loadQuery);

    assertProcessIsRegistered(constructEvent(loadQuery, HiveOperation.LOAD, null, getOutputs(targetTable, Entity.Type.TABLE)));
}
@Test
public void testLoadLocalPathIntoPartition() throws Exception {
    // Same as testLoadLocalPath, but targeting a specific partition of a
    // partitioned table.
    String targetTable = createTable(true);
    String localFile   = file("load");
    String loadQuery   = "load data local inpath 'file://" + localFile + "' into table " + targetTable + " partition(dt = '" + PART_FILE + "')";

    runCommand(loadQuery);

    assertProcessIsRegistered(constructEvent(loadQuery, HiveOperation.LOAD, null, getOutputs(targetTable, Entity.Type.TABLE)));
}
@Test
public void testLoadDFSPathPartitioned() throws Exception {
// LOAD DATA INPATH (DFS source) into a partition must register a LOAD process
// with the DFS path as input; a second load from a different path must update
// the same process with the accumulated inputs.
String tableName = createTable(true, true, false);
assertTableIsRegistered(DEFAULT_DB, tableName);
String loadFile = createTestDFSFile("loadDFSFile");
String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')";
runCommand(query);
Set<WriteEntity> outputs = getOutputs(tableName, Entity.Type.TABLE);
Set<ReadEntity> inputs = getInputs(loadFile, Entity.Type.DFS_DIR);
// the hook sees both the table and the touched partition as outputs
Set<WriteEntity> partitionOps = new LinkedHashSet<>(outputs);
partitionOps.addAll(getOutputs(DEFAULT_DB + "@" + tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION));
AtlasEntity processReference = validateProcess(constructEvent(query, HiveOperation.LOAD, inputs, partitionOps), inputs, outputs);
validateHDFSPaths(processReference, INPUTS, loadFile);
validateOutputTables(processReference, outputs);
String loadFile2 = createTestDFSFile("loadDFSFile1");
query = "load data inpath '" + loadFile2 + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')";
runCommand(query);
Set<ReadEntity> process2Inputs = getInputs(loadFile2, Entity.Type.DFS_DIR);
// second load updates the SAME process: expected inputs are the union of both loads
Set<ReadEntity> expectedInputs = new LinkedHashSet<>();
expectedInputs.addAll(process2Inputs);
expectedInputs.addAll(inputs);
validateProcess(constructEvent(query, HiveOperation.LOAD, expectedInputs, partitionOps), expectedInputs, outputs);
}
// Returns the cluster-qualified name for a table in the default DB.
// Names that already contain '@' are treated as fully qualified and returned as-is.
private String getQualifiedTblName(String inputTable) {
    if (inputTable == null || inputTable.contains("@")) {
        return inputTable;
    }

    return HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, inputTable);
}
/**
 * Asserts that a process entity is registered for the given event and that its
 * registered inputs/outputs match the expected Hive entities (a null expectation
 * means the corresponding side must be empty). Returns the process entity.
 */
private AtlasEntity validateProcess(HiveEventContext event, Set<ReadEntity> inputTables, Set<WriteEntity> outputTables) throws Exception {
    String      processId     = assertProcessIsRegistered(event, inputTables, outputTables);
    AtlasEntity processEntity = atlasClientV2.getEntityByGuid(processId).getEntity();

    List<AtlasObjectId> registeredInputs  = toAtlasObjectIdList(processEntity.getAttribute(INPUTS));
    List<AtlasObjectId> registeredOutputs = toAtlasObjectIdList(processEntity.getAttribute(OUTPUTS));

    if (inputTables == null) {
        Assert.assertTrue(CollectionUtils.isEmpty(registeredInputs));
    } else {
        Assert.assertEquals(registeredInputs.size(), inputTables.size());

        validateInputTables(processEntity, inputTables);
    }

    if (outputTables == null) {
        Assert.assertTrue(CollectionUtils.isEmpty(registeredOutputs));
    } else {
        Assert.assertEquals(registeredOutputs.size(), outputTables.size());

        validateOutputTables(processEntity, outputTables);
    }

    return processEntity;
}
// Convenience overload: validates the process against the event's own input/output sets.
private AtlasEntity validateProcess(HiveEventContext event) throws Exception {
    Set<ReadEntity>  eventInputs  = event.getInputs();
    Set<WriteEntity> eventOutputs = event.getOutputs();

    return validateProcess(event, eventInputs, eventOutputs);
}
@Test
public void testInsertIntoTable() throws Exception {
// Verifies INSERT ... SELECT over two source tables: a process is registered with
// both inputs, the process qualified name lists inputs in sorted order, and
// re-running the identical query maps to the SAME process entity.
String inputTable1Name = createTable();
String inputTable2Name = createTable();
String insertTableName = createTable();
assertTableIsRegistered(DEFAULT_DB, inputTable1Name);
assertTableIsRegistered(DEFAULT_DB, insertTableName);
String query = "insert into " + insertTableName + " select t1.id, t1.name from " + inputTable2Name + " as t2, " + inputTable1Name + " as t1 where t1.id=t2.id";
runCommand(query);
Set<ReadEntity> inputs = getInputs(inputTable1Name, Entity.Type.TABLE);
inputs.addAll(getInputs(inputTable2Name, Entity.Type.TABLE));
Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
(outputs.iterator().next()).setWriteType(WriteEntity.WriteType.INSERT);
HiveEventContext event = constructEvent(query, HiveOperation.QUERY, inputs, outputs);
// sort inputs the same way the hook does, so qualified-name comparison is stable
Set<ReadEntity> expectedInputs = new TreeSet<ReadEntity>(entityComparator) {{
addAll(inputs);
}};
assertTableIsRegistered(DEFAULT_DB, insertTableName);
AtlasEntity processEntity1 = validateProcess(event, expectedInputs, outputs);
//Test sorting of tbl names
SortedSet<String> sortedTblNames = new TreeSet<>();
sortedTblNames.add(inputTable1Name.toLowerCase());
sortedTblNames.add(inputTable2Name.toLowerCase());
//Verify sorted order of inputs in qualified name:
// QUERY:<in1>:<createTime1>:<in2>:<createTime2>->:INSERT:<out>:<createTime>
Assert.assertEquals(processEntity1.getAttribute(ATTRIBUTE_QUALIFIED_NAME),
Joiner.on(SEP).join("QUERY",
getQualifiedTblName(sortedTblNames.first()),
HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, sortedTblNames.first())),
getQualifiedTblName(sortedTblNames.last()),
HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, sortedTblNames.last())))
+ IO_SEP + SEP
+ Joiner.on(SEP).
join(WriteEntity.WriteType.INSERT.name(),
getQualifiedTblName(insertTableName),
HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, insertTableName)))
);
//Rerun same query. Should result in same process
runCommandWithDelay(query, 1000);
AtlasEntity processEntity2 = validateProcess(event, expectedInputs, outputs);
Assert.assertEquals(processEntity1.getGuid(), processEntity2.getGuid());
}
@Test
public void testInsertIntoLocalDir() throws Exception {
    // INSERT OVERWRITE LOCAL DIRECTORY must register a QUERY process whose input is
    // the source table; local directory outputs are not tracked as lineage outputs.
    String tableName       = createTable();
    File   randomLocalPath = File.createTempFile("hiverandom", ".tmp");

    randomLocalPath.deleteOnExit(); // clean up the temp target when the JVM exits

    String query = "insert overwrite LOCAL DIRECTORY '" + randomLocalPath.getAbsolutePath() + "' select id, name from " + tableName;

    runCommand(query);

    validateProcess(constructEvent(query, HiveOperation.QUERY, getInputs(tableName, Entity.Type.TABLE), null));

    assertTableIsRegistered(DEFAULT_DB, tableName);
}
@Test
public void testUpdateProcess() throws Exception {
// Verifies that INSERT OVERWRITE DIRECTORY registers a QUERY process, and that
// re-running it — even with a different HDFS target path — updates the SAME
// process entity (HDFS paths are not part of the QUERY qualified name).
String tableName = createTable();
String pFile1 = createTestDFSPath("somedfspath1");
String query = "insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName;
runCommand(query);
Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
Set<WriteEntity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR);
outputs.iterator().next().setWriteType(WriteEntity.WriteType.PATH_WRITE);
HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.QUERY, inputs, outputs);
AtlasEntity processEntity = validateProcess(hiveEventContext);
validateHDFSPaths(processEntity, OUTPUTS, pFile1);
assertTableIsRegistered(DEFAULT_DB, tableName);
validateInputTables(processEntity, inputs);
//Rerun same query with same HDFS path
runCommandWithDelay(query, 1000);
assertTableIsRegistered(DEFAULT_DB, tableName);
AtlasEntity process2Entity = validateProcess(hiveEventContext);
validateHDFSPaths(process2Entity, OUTPUTS, pFile1);
Assert.assertEquals(process2Entity.getGuid(), processEntity.getGuid());
//Rerun same query with a new HDFS path. Will result in same process since HDFS paths is not part of qualified name for QUERY operations
String pFile2 = createTestDFSPath("somedfspath2");
query = "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName;
runCommandWithDelay(query, 1000);
assertTableIsRegistered(DEFAULT_DB, tableName);
// the updated process accumulates outputs from both runs
Set<WriteEntity> p3Outputs = new LinkedHashSet<WriteEntity>() {{
addAll(getOutputs(pFile2, Entity.Type.DFS_DIR));
addAll(outputs);
}};
AtlasEntity process3Entity = validateProcess(constructEvent(query, HiveOperation.QUERY, inputs, p3Outputs));
validateHDFSPaths(process3Entity, OUTPUTS, pFile2);
Assert.assertEquals(process3Entity.getGuid(), processEntity.getGuid());
}
@Test
public void testInsertIntoDFSDirPartitioned() throws Exception {
//Test with partitioned table: the partition appears among the hook's inputs,
//and re-running with a different HDFS target updates the SAME process.
String tableName = createTable(true);
String pFile1 = createTestDFSPath("somedfspath1");
String query = "insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName + " where dt = '" + PART_FILE + "'";
runCommand(query);
Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
Set<WriteEntity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR);
outputs.iterator().next().setWriteType(WriteEntity.WriteType.PATH_WRITE);
// the hook sees both the table and the read partition as inputs
Set<ReadEntity> partitionIps = new LinkedHashSet<>(inputs);
partitionIps.addAll(getInputs(DEFAULT_DB + "@" + tableName + "@dt='" + PART_FILE + "'", Entity.Type.PARTITION));
AtlasEntity processEntity = validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, outputs), inputs, outputs);
//Rerun same query with different HDFS path. Should not create another process and should update it.
String pFile2 = createTestDFSPath("somedfspath2");
query = "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName + " where dt = '" + PART_FILE + "'";
runCommand(query);
Set<WriteEntity> pFile2Outputs = getOutputs(pFile2, Entity.Type.DFS_DIR);
pFile2Outputs.iterator().next().setWriteType(WriteEntity.WriteType.PATH_WRITE);
//Now the process has 2 paths - one older with deleted reference to partition and another with the the latest partition
Set<WriteEntity> p2Outputs = new LinkedHashSet<WriteEntity>() {{
addAll(pFile2Outputs);
addAll(outputs);
}};
AtlasEntity process2Entity = validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, pFile2Outputs), inputs, p2Outputs);
validateHDFSPaths(process2Entity, OUTPUTS, pFile2);
Assert.assertEquals(process2Entity.getGuid(), processEntity.getGuid());
}
//Disabling test as temporary table is not captured by hiveHook(https://issues.apache.org/jira/browse/ATLAS-1274)
@Test(enabled = false)
public void testInsertIntoTempTable() throws Exception {
// Would verify INSERT into a temporary table: the temp table should only become
// registered (as temporary) once the insert process is captured.
String tableName = createTable();
String insertTableName = createTable(false, false, true);
assertTableIsRegistered(DEFAULT_DB, tableName);
assertTableIsNotRegistered(DEFAULT_DB, insertTableName, true);
String query = "insert into " + insertTableName + " select id, name from " + tableName;
runCommand(query);
Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
outputs.iterator().next().setWriteType(WriteEntity.WriteType.INSERT);
validateProcess(constructEvent(query, HiveOperation.QUERY, inputs, outputs));
assertTableIsRegistered(DEFAULT_DB, tableName);
assertTableIsRegistered(DEFAULT_DB, insertTableName, null, true);
}
@Test
public void testInsertIntoPartition() throws Exception {
    // Verifies partition-to-partition INSERT: the hook sees the table plus the
    // touched partitions on both sides, while the validated lineage is expressed
    // at table granularity.
    boolean isPartitionedTable = true;
    String  tableName          = createTable(isPartitionedTable);
    String  insertTableName    = createTable(isPartitionedTable);
    String  query              = "insert into " + insertTableName + " partition(dt = '" + PART_FILE + "') select id, name from " + tableName + " where dt = '" + PART_FILE + "'";

    runCommand(query);

    Set<ReadEntity>  inputs  = getInputs(tableName, Entity.Type.TABLE);
    Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);

    outputs.iterator().next().setWriteType(WriteEntity.WriteType.INSERT);

    // Plain LinkedHashSet construction instead of double-brace initialization
    // (avoids anonymous inner classes capturing the enclosing test instance).
    Set<ReadEntity> partitionIps = new LinkedHashSet<>(inputs);

    partitionIps.add(getPartitionInput());

    Set<WriteEntity> partitionOps = new LinkedHashSet<>(outputs);

    partitionOps.add(getPartitionOutput());

    validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, partitionOps), inputs, outputs);

    assertTableIsRegistered(DEFAULT_DB, tableName);
    assertTableIsRegistered(DEFAULT_DB, insertTableName);

    //TODO -Add update test case
}
@Test
public void testExportImportUnPartitionedTable() throws Exception {
// Verifies EXPORT/IMPORT lineage for an unpartitioned table; exporting to a new
// path and re-importing must each register a separate process.
String tableName = createTable(false);
assertTableIsRegistered(DEFAULT_DB, tableName);
String filename = "pfile://" + mkdir("exportUnPartitioned");
String query = "export table " + tableName + " to \"" + filename + "\"";
runCommand(query);
Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
AtlasEntity processEntity = validateProcess(constructEvent(query, HiveOperation.EXPORT, inputs, outputs));
validateHDFSPaths(processEntity, OUTPUTS, filename);
validateInputTables(processEntity, inputs);
//Import
String importTableName = createTable(false);
assertTableIsRegistered(DEFAULT_DB, importTableName);
query = "import table " + importTableName + " from '" + filename + "'";
runCommand(query);
outputs = getOutputs(importTableName, Entity.Type.TABLE);
validateProcess(constructEvent(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs));
//Should create another process
filename = "pfile://" + mkdir("export2UnPartitioned");
query = "export table " + tableName + " to \"" + filename + "\"";
runCommand(query);
inputs = getInputs(tableName, Entity.Type.TABLE);
outputs = getOutputs(filename, Entity.Type.DFS_DIR);
validateProcess(constructEvent(query, HiveOperation.EXPORT, inputs, outputs));
//import again should create another process
query = "import table " + importTableName + " from '" + filename + "'";
runCommand(query);
outputs = getOutputs(importTableName, Entity.Type.TABLE);
validateProcess(constructEvent(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs));
}
@Test
public void testExportImportPartitionedTable() throws Exception {
    // Verifies EXPORT/IMPORT lineage for a partitioned table: export and import each
    // register a process, and re-export/re-import to a new path must UPDATE the same
    // process with accumulated inputs/outputs.
    boolean isPartitionedTable = true;
    String  tableName          = createTable(isPartitionedTable);

    assertTableIsRegistered(DEFAULT_DB, tableName);

    //Add a partition
    String partFile = "pfile://" + mkdir("partition");
    String query    = "alter table " + tableName + " add partition (dt='" + PART_FILE + "') location '" + partFile + "'";

    runCommand(query);

    String filename = "pfile://" + mkdir("export");

    query = "export table " + tableName + " to \"" + filename + "\"";

    runCommand(query);

    Set<ReadEntity>  expectedExportInputs = getInputs(tableName, Entity.Type.TABLE);
    Set<WriteEntity> outputs              = getOutputs(filename, Entity.Type.DFS_DIR);
    Set<ReadEntity>  partitionIps         = getInputs(DEFAULT_DB + "@" + tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION); //Note that export has only partition as input in this case

    partitionIps.addAll(expectedExportInputs);

    AtlasEntity processEntity = validateProcess(constructEvent(query, HiveOperation.EXPORT, partitionIps, outputs), expectedExportInputs, outputs);

    validateHDFSPaths(processEntity, OUTPUTS, filename);

    //Import
    String importTableName = createTable(true);

    assertTableIsRegistered(DEFAULT_DB, importTableName); // fixed: previously re-asserted the export table by mistake

    query = "import table " + importTableName + " from '" + filename + "'";

    runCommand(query);

    Set<ReadEntity>  expectedImportInputs = getInputs(filename, Entity.Type.DFS_DIR);
    Set<WriteEntity> importOutputs        = getOutputs(importTableName, Entity.Type.TABLE);
    Set<WriteEntity> partitionOps         = getOutputs(DEFAULT_DB + "@" + importTableName + "@dt=" + PART_FILE, Entity.Type.PARTITION);

    partitionOps.addAll(importOutputs);

    validateProcess(constructEvent(query, HiveOperation.IMPORT, expectedImportInputs, partitionOps), expectedImportInputs, importOutputs);

    //Export should update same process
    filename = "pfile://" + mkdir("export2");
    query    = "export table " + tableName + " to \"" + filename + "\"";

    runCommand(query);

    Set<WriteEntity> outputs2 = getOutputs(filename, Entity.Type.DFS_DIR);

    // plain LinkedHashSet instead of double-brace initialization
    Set<WriteEntity> p3Outputs = new LinkedHashSet<>(outputs2);

    p3Outputs.addAll(outputs);

    validateProcess(constructEvent(query, HiveOperation.EXPORT, partitionIps, outputs2), expectedExportInputs, p3Outputs);

    query = "alter table " + importTableName + " drop partition (dt='" + PART_FILE + "')";

    runCommand(query);

    //Import should update same process
    query = "import table " + importTableName + " from '" + filename + "'";

    runCommandWithDelay(query, 1000);

    Set<ReadEntity> importInputs          = getInputs(filename, Entity.Type.DFS_DIR);
    Set<ReadEntity> expectedImport2Inputs = new LinkedHashSet<>(importInputs);

    expectedImport2Inputs.addAll(expectedImportInputs);

    validateProcess(constructEvent(query, HiveOperation.IMPORT, importInputs, partitionOps), expectedImport2Inputs, importOutputs);
}
@Test
public void testIgnoreSelect() throws Exception {
    String tableName = createTable();
    String query = "select * from " + tableName;

    runCommand(query);

    Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
    HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.QUERY, inputs, null);

    // Plain SELECT queries must not be registered as lineage processes.
    assertProcessIsNotRegistered(hiveEventContext);

    // Check with uppercase table name.
    query = "SELECT * from " + tableName.toUpperCase();

    runCommand(query);

    // BUGFIX: build a context for the uppercase query. Previously the first
    // context was asserted a second time, so the uppercase case was never
    // actually exercised.
    HiveEventContext upperCaseEventContext = constructEvent(query, HiveOperation.QUERY, inputs, null);

    assertProcessIsNotRegistered(upperCaseEventContext);
}
@Test
public void testAlterTableRenameAliasRegistered() throws Exception{
    String tableName = createTable(false);
    String tableGuid = assertTableIsRegistered(DEFAULT_DB, tableName);
    String newTableName = tableName();
    String query = String.format("alter table %s rename to %s", tableName, newTableName);

    runCommand(query);

    // The rename must keep the same entity: the guid under the new name
    // matches the guid registered for the old name.
    String newTableGuid = assertTableIsRegistered(DEFAULT_DB, newTableName);

    assertEquals(tableGuid, newTableGuid);

    AtlasEntity atlasEntity = atlasClientV2.getEntityByGuid(newTableGuid).getEntity();
    Map<String, Object> valueMap = atlasEntity.getAttributes();
    Iterable<String> aliasList = (Iterable<String>) valueMap.get("aliases");
    String aliasTableName = aliasList.iterator().next();

    // BUGFIX: use a real test assertion instead of the Java 'assert' keyword,
    // which is silently skipped unless the JVM runs with -ea.
    assertEquals(aliasTableName, tableName.toLowerCase());
}
/**
 * Renames a partitioned table into a different database and verifies that the
 * table, its storage descriptor and its columns keep their guids and traits.
 */
@Test
public void testAlterTableRename() throws Exception {
String tableName = createTable(true);
String newDBName = createDatabase();
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
AtlasEntity tableEntity = atlasClientV2.getEntityByGuid(tableId).getEntity();
String createTime = String.valueOf(tableEntity.getAttribute(ATTRIBUTE_CREATE_TIME));
Assert.assertNotNull(createTime);
String columnGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), NAME));
String sdGuid = assertSDIsRegistered(HiveMetaStoreBridge.getStorageDescQFName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName)), null);
assertDatabaseIsRegistered(newDBName);
// Attach traits to the column, storage descriptor and partition column so we
// can verify they survive the rename.
String colTraitDetails = createTrait(columnGuid); //Add trait to column
String sdTraitDetails = createTrait(sdGuid); //Add trait to sd
String partColumnGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "dt"));
String partColTraitDetails = createTrait(partColumnGuid); //Add trait to part col keys
String newTableName = tableName();
// Rename moves the table across databases: default db -> newDBName.
String query = String.format("alter table %s rename to %s", DEFAULT_DB + "." + tableName, newDBName + "." + newTableName);
runCommandWithDelay(query, 1000);
// The column keeps its guid under the new qualified name; the old qualified
// name must no longer resolve.
String newColGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, newTableName), NAME));
Assert.assertEquals(newColGuid, columnGuid);
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, tableName), NAME));
assertTrait(columnGuid, colTraitDetails);
// Same invariants for the storage descriptor and the partition column.
String newSdGuid = assertSDIsRegistered(HiveMetaStoreBridge.getStorageDescQFName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, newTableName)), null);
Assert.assertEquals(newSdGuid, sdGuid);
assertTrait(sdGuid, sdTraitDetails);
assertTrait(partColumnGuid, partColTraitDetails);
assertTableIsNotRegistered(DEFAULT_DB, tableName);
assertTableIsRegistered(newDBName, newTableName, new AssertPredicate() {
@Override
public void assertOnEntity(final AtlasEntity entity) throws Exception {
AtlasObjectId sd = toAtlasObjectId(entity.getAttribute(ATTRIBUTE_STORAGEDESC));
assertNotNull(sd);
}
});
}
/**
 * Returns the ACTIVE column entities of the given table.
 * With soft delete enabled, the table still references deleted columns, so
 * anything not in ACTIVE status is filtered out.
 */
private List<AtlasEntity> getColumns(String dbName, String tableName) throws Exception {
    String tableGuid = assertTableIsRegistered(dbName, tableName);
    AtlasEntityWithExtInfo tableWithExtInfo = atlasClientV2.getEntityByGuid(tableGuid);
    AtlasEntity table = tableWithExtInfo.getEntity();
    List<AtlasObjectId> columnIds = toAtlasObjectIdList(table.getAttribute(ATTRIBUTE_COLUMNS));
    List<AtlasEntity> result = new ArrayList<>();

    for (AtlasObjectId columnId : columnIds) {
        AtlasEntity column = tableWithExtInfo.getEntity(columnId.getGuid());

        if (column.getStatus() != AtlasEntity.Status.ACTIVE) {
            continue;
        }

        result.add(column);
    }

    return result;
}
/**
 * Creates a new PII-style trait (classification) type with a random name and
 * attaches it to the entity identified by {@code guid}.
 *
 * @return the generated trait name, for later verification via assertTrait()
 */
private String createTrait(String guid) throws AtlasServiceException {
//add trait
//valid type names in v2 must consist of a letter followed by a sequence of letter, number, or _ characters
String traitName = "PII_Trait" + random();
AtlasClassificationDef piiTrait = AtlasTypeUtil.createTraitTypeDef(traitName, ImmutableSet.<String>of());
// Register the classification type first, then attach it to the entity.
atlasClientV2.createAtlasTypeDefs(new AtlasTypesDef(Collections.emptyList(), Collections.emptyList(), Collections.singletonList(piiTrait), Collections.emptyList()));
atlasClientV2.addClassifications(guid, Collections.singletonList(new AtlasClassification(piiTrait.getName())));
return traitName;
}
/**
 * Asserts that the entity identified by {@code guid} carries the expected trait.
 * NOTE(review): only index 0 is checked — assumes the entity has exactly one
 * classification attached; confirm if entities can carry more.
 */
private void assertTrait(String guid, String traitName) throws AtlasServiceException {
AtlasClassification.AtlasClassifications classifications = atlasClientV2.getClassifications(guid);
Assert.assertEquals(classifications.getList().get(0).getTypeName(), traitName);
}
/**
 * Adding a column must register a new column entity and grow the table's
 * active column list.
 */
@Test
public void testAlterTableAddColumn() throws Exception {
    String table = createTable();
    String newColumn = columnName();

    runCommand("alter table " + table + " add columns (" + newColumn + " string)");

    assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, table), newColumn));

    // id, name plus the newly added column.
    Assert.assertEquals(getColumns(DEFAULT_DB, table).size(), 3);
}
//ATLAS-1321: Disable problematic tests. Need to revisit and fix them later
/**
 * Replacing the column list drops "id"; only "name" should remain registered.
 */
@Test(enabled = false)
public void testAlterTableDropColumn() throws Exception {
String tableName = createTable();
String colDropped = "id";
String query = "alter table " + tableName + " replace columns (name string)";
runCommand(query);
// The dropped column's qualified name must no longer resolve.
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), colDropped));
//Verify the number of columns present in the table
List<AtlasEntity> columns = getColumns(DEFAULT_DB, tableName);
assertEquals(columns.size(), 1);
assertEquals(columns.get(0).getAttribute(NAME), "name");
}
/**
 * Exercises "alter table ... change column" variants in sequence: rename,
 * type change, rename+comment, and two position changes. After each step the
 * old column qualified name must be gone, the new one registered, and the
 * active column count must stay at 2.
 */
@Test
public void testAlterTableChangeColumn() throws Exception {
//Change name
String oldColName = NAME;
String newColName = "name1";
String tableName = createTable();
String query = String.format("alter table %s change %s %s string", tableName, oldColName, newColName);
runCommandWithDelay(query, 1000);
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName));
//Verify the number of columns present in the table
List<AtlasEntity> columns = getColumns(DEFAULT_DB, tableName);
Assert.assertEquals(columns.size(), 2);
//Change column type
oldColName = "name1";
newColName = "name2";
String newColType = "int";
query = String.format("alter table %s change column %s %s %s", tableName, oldColName, newColName, newColType);
runCommandWithDelay(query, 1000);
columns = getColumns(DEFAULT_DB, tableName);
Assert.assertEquals(columns.size(), 2);
String newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName);
// The registered column must reflect the new type.
assertColumnIsRegistered(newColQualifiedName, new AssertPredicate() {
@Override
public void assertOnEntity(AtlasEntity entity) throws Exception {
assertEquals(entity.getAttribute("type"), "int");
}
});
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
//Change name and add comment
oldColName = "name2";
newColName = "name3";
String comment = "added comment";
query = String.format("alter table %s change column %s %s %s COMMENT '%s' after id", tableName, oldColName, newColName, newColType, comment);
runCommandWithDelay(query, 1000);
columns = getColumns(DEFAULT_DB, tableName);
Assert.assertEquals(columns.size(), 2);
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName);
// The new comment must be carried over to the column entity.
assertColumnIsRegistered(newColQualifiedName, new AssertPredicate() {
@Override
public void assertOnEntity(AtlasEntity entity) throws Exception {
assertEquals(entity.getAttribute(ATTRIBUTE_COMMENT), comment);
}
});
//Change column position
oldColName = "name3";
newColName = "name4";
query = String.format("alter table %s change column %s %s %s first", tableName, oldColName, newColName, newColType);
runCommandWithDelay(query, 1000);
columns = getColumns(DEFAULT_DB, tableName);
Assert.assertEquals(columns.size(), 2);
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName);
assertColumnIsRegistered(newColQualifiedName);
// NOTE(review): finalNewColName is captured but never used inside the
// predicate below — only the column count is asserted.
String finalNewColName = newColName;
assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
@Override
public void assertOnEntity(AtlasEntity entity) throws Exception {
List<AtlasObjectId> columns = toAtlasObjectIdList(entity.getAttribute(ATTRIBUTE_COLUMNS));
assertEquals(columns.size(), 2);
}
}
);
//Change col position again
oldColName = "name4";
newColName = "name5";
query = String.format("alter table %s change column %s %s %s after id", tableName, oldColName, newColName, newColType);
runCommandWithDelay(query, 1000);
columns = getColumns(DEFAULT_DB, tableName);
Assert.assertEquals(columns.size(), 2);
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName);
assertColumnIsRegistered(newColQualifiedName);
//Check col position
// NOTE(review): finalNewColName2 is likewise unused in the predicate below.
String finalNewColName2 = newColName;
assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
@Override
public void assertOnEntity(AtlasEntity entity) throws Exception {
List<AtlasObjectId> columns = toAtlasObjectIdList(entity.getAttribute(ATTRIBUTE_COLUMNS));
assertEquals(columns.size(), 2);
}
}
);
}
/*
The test is disabled by default
Reason : Atlas uses Hive version 1.2.x and the Hive patch HIVE-13112 which enables column level lineage is not
committed in Hive version 1.2.x
This test will fail if the lineage information is not available from Hive
Once the patch for HIVE-13112 is committed to Hive branch 1.2.x, the test can be enabled
Please track HIVE-14706 to know the status of column lineage availability in latest Hive versions i.e 2.1.x
*/
@Test(enabled = false)
public void testColumnLevelLineage() throws Exception {
// CTAS from (a, b): dest column "a" = sum(a+b) should have both source
// columns as lineage inputs; dest column "b" = count(*) should have the
// source table itself as input.
String sourceTable = "table" + random();
runCommand("create table " + sourceTable + "(a int, b int)");
String sourceTableGUID = assertTableIsRegistered(DEFAULT_DB, sourceTable);
String a_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, sourceTable), "a"));
String b_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, sourceTable), "b"));
String ctasTableName = "table" + random();
String query = "create table " + ctasTableName + " as " + "select sum(a+b) as a, count(*) as b from " + sourceTable;
runCommand(query);
String dest_a_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, ctasTableName), "a"));
String dest_b_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, ctasTableName), "b"));
Set<ReadEntity> inputs = getInputs(sourceTable, Entity.Type.TABLE);
Set<WriteEntity> outputs = getOutputs(ctasTableName, Entity.Type.TABLE);
HiveEventContext event = constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
assertProcessIsRegistered(event);
assertTableIsRegistered(DEFAULT_DB, ctasTableName);
String processQName = sortEventsAndGetProcessQualifiedName(event);
// Column lineage processes are named "<processQName>:<destColumn>".
List<String> aLineageInputs = Arrays.asList(a_guid, b_guid);
String aLineageProcessName = processQName + ":" + "a";
LOG.debug("Searching for column lineage process {} ", aLineageProcessName);
String guid = assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), ATTRIBUTE_QUALIFIED_NAME, aLineageProcessName, null);
AtlasEntity colLineageEntity = atlasClientV2.getEntityByGuid(guid).getEntity();
List<AtlasObjectId> processInputs = toAtlasObjectIdList(colLineageEntity.getAttribute("inputs"));
List<String> processInputsAsString = new ArrayList<>();
for(AtlasObjectId input: processInputs){
processInputsAsString.add(input.getGuid());
}
// Sort both sides to compare as sets.
Collections.sort(processInputsAsString);
Collections.sort(aLineageInputs);
Assert.assertEquals(processInputsAsString, aLineageInputs);
List<String> bLineageInputs = Arrays.asList(sourceTableGUID);
String bLineageProcessName = processQName + ":" + "b";
LOG.debug("Searching for column lineage process {} ", bLineageProcessName);
String guid1 = assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), ATTRIBUTE_QUALIFIED_NAME, bLineageProcessName, null);
AtlasEntity colLineageEntity1 = atlasClientV2.getEntityByGuid(guid1).getEntity();
List<AtlasObjectId> bProcessInputs = toAtlasObjectIdList(colLineageEntity1.getAttribute("inputs"));
List<String> bProcessInputsAsString = new ArrayList<>();
for(AtlasObjectId input: bProcessInputs){
bProcessInputsAsString.add(input.getGuid());
}
Collections.sort(bProcessInputsAsString);
Collections.sort(bLineageInputs);
Assert.assertEquals(bProcessInputsAsString, bLineageInputs);
//Test lineage API response
// NOTE(review): entityMap from the v2 lineage call is never asserted on —
// only the v1 input-graph response below is checked.
AtlasLineageInfo atlasLineageInfoInput = atlasClientV2.getLineageInfo(dest_a_guid, AtlasLineageInfo.LineageDirection.INPUT,0);
Map<String, AtlasEntityHeader> entityMap = atlasLineageInfoInput.getGuidEntityMap();
JSONObject response = atlasClient.getInputGraphForEntity(dest_a_guid);
JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices");
JSONObject dest_a_val = vertices.getJSONObject(dest_a_guid);
JSONObject src_a_val = vertices.getJSONObject(a_guid);
JSONObject src_b_val = vertices.getJSONObject(b_guid);
Assert.assertNotNull(dest_a_val);
Assert.assertNotNull(src_a_val);
Assert.assertNotNull(src_b_val);
JSONObject b_response = atlasClient.getInputGraphForEntity(dest_b_guid);
JSONObject b_vertices = b_response.getJSONObject("values").getJSONObject("vertices");
JSONObject b_val = b_vertices.getJSONObject(dest_b_guid);
JSONObject src_tbl_val = b_vertices.getJSONObject(sourceTableGUID);
Assert.assertNotNull(b_val);
Assert.assertNotNull(src_tbl_val);
}
@Test
public void testTruncateTable() throws Exception {
    String tableName = createTable(false);
    String query = String.format("truncate table %s", tableName);

    runCommand(query);

    Set<WriteEntity> outputs = getOutputs(tableName, Entity.Type.TABLE);
    String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);

    // Truncate is registered as a process with no inputs, only the table output.
    validateProcess(constructEvent(query, HiveOperation.TRUNCATETABLE, null, outputs));

    // Check lineage: the table itself must appear in its own input-lineage
    // entity map (see https://issues.apache.org/jira/browse/ATLAS-653).
    // (Removed an unused 'datasetName' local and a stale comment claiming the
    // assertion below still needed to be switched to assertTrue — it already is.)
    AtlasLineageInfo atlasLineageInfoInput = atlasClientV2.getLineageInfo(tableId, AtlasLineageInfo.LineageDirection.INPUT, 0);
    Map<String, AtlasEntityHeader> entityMap = atlasLineageInfoInput.getGuidEntityMap();

    Assert.assertTrue(entityMap.containsKey(tableId));
}
/**
 * Changing a partition column's type must update the registered column entity
 * and keep it referenced from the table's partitionKeys attribute.
 */
@Test
public void testAlterTablePartitionColumnType() throws Exception {
String tableName = createTable(true, true, false);
String newType = "int";
String query = String.format("ALTER TABLE %s PARTITION COLUMN (dt %s)", tableName, newType);
runCommand(query);
String colQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "dt");
// The "dt" partition column must now carry the new type.
String dtColId = assertColumnIsRegistered(colQualifiedName, new AssertPredicate() {
@Override
public void assertOnEntity(AtlasEntity column) throws Exception {
Assert.assertEquals(column.getAttribute("type"), newType);
}
});
// And the table must still reference the same column guid as its only partition key.
assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
@Override
public void assertOnEntity(AtlasEntity table) throws Exception {
final List<AtlasObjectId> partitionKeys = toAtlasObjectIdList(table.getAttribute("partitionKeys"));
Assert.assertEquals(partitionKeys.size(), 1);
Assert.assertEquals(partitionKeys.get(0).getGuid(), dtColId);
}
});
}
/**
 * A table created through a driver with no Atlas hook must not be registered;
 * a later alter through the hooked driver should register both the table and
 * the renamed column.
 */
@Test
public void testAlterTableWithoutHookConf() throws Exception {
    String table = tableName();

    driverWithoutContext.run("create table " + table + " (id int, name string)");

    assertTableIsNotRegistered(DEFAULT_DB, table);

    runCommand("alter table " + table + " change id id_new string");

    assertTableIsRegistered(DEFAULT_DB, table);

    String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, table);

    assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tableQualifiedName, "id_new"));
}
/**
 * Renaming a column must keep the column entity's guid and any traits
 * attached to it.
 */
@Test
public void testTraitsPreservedOnColumnRename() throws Exception {
String dbName = createDatabase();
String tableName = tableName();
String createQuery = String.format("create table %s.%s (id int, name string)", dbName, tableName);
runCommand(createQuery);
String tbqn = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName);
String guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tbqn, "id"));
// Attach a trait to the original column.
String trait = createTrait(guid);
String oldColName = "id";
String newColName = "id_new";
String query = String.format("alter table %s.%s change %s %s string", dbName, tableName, oldColName, newColName);
runCommand(query);
// Same guid under the new name, and the trait is still attached.
String guid2 = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tbqn, "id_new"));
assertEquals(guid2, guid);
assertTrue(atlasClient.getEntity(guid2).getTraits().contains(trait));
}
/**
 * Renaming a view moves its registration to the new name; the old name must
 * no longer resolve.
 */
@Test
public void testAlterViewRename() throws Exception {
    String table = createTable();
    String oldViewName = tableName();
    String newViewName = tableName();

    runCommand("create view " + oldViewName + " as select * from " + table);
    runCommand("alter view " + oldViewName + " rename to " + newViewName);

    assertTableIsRegistered(DEFAULT_DB, newViewName);
    assertTableIsNotRegistered(DEFAULT_DB, oldViewName);
}
/**
 * Changing an external table's location must keep the storage descriptor
 * reference intact and register the new HDFS path as an input of the table's
 * process entity.
 */
@Test
public void testAlterTableLocation() throws Exception {
//Its an external table, so the HDFS location should also be registered as an entity
String tableName = createTable(true, true, false);
String testPath = createTestDFSPath("testBaseDir");
String query = "alter table " + tableName + " set location '" + testPath + "'";
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
@Override
public void assertOnEntity(AtlasEntity tableRef) throws Exception {
AtlasObjectId sd = toAtlasObjectId(tableRef.getAttribute(ATTRIBUTE_STORAGEDESC));
assertNotNull(sd);
}
});
// The table's process entity should list the new path among its inputs.
String processQualifiedName = getTableProcessQualifiedName(DEFAULT_DB, tableName);
String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, processQualifiedName, null);
AtlasEntity processEntity = atlasClientV2.getEntityByGuid(processId).getEntity();
validateHDFSPaths(processEntity, INPUTS, testPath);
}
/**
 * Setting the file format to ORC must be reflected on the storage descriptor:
 * ORC input/output formats and the OrcSerde serialization library.
 */
@Test
public void testAlterTableFileFormat() throws Exception {
String tableName = createTable();
String testFormat = "orc";
String query = "alter table " + tableName + " set FILEFORMAT " + testFormat;
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
@Override
public void assertOnEntity(AtlasEntity tableRef) throws Exception {
AtlasObjectId sdObjectId = toAtlasObjectId(tableRef.getAttribute(ATTRIBUTE_STORAGEDESC));
AtlasEntity sdEntity = atlasClientV2.getEntityByGuid(sdObjectId.getGuid()).getEntity();
Assert.assertEquals(sdEntity.getAttribute(ATTRIBUTE_INPUT_FORMAT), "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
Assert.assertEquals(sdEntity.getAttribute(ATTRIBUTE_OUTPUT_FORMAT), "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat");
Assert.assertNotNull(sdEntity.getAttribute(ATTRIBUTE_SERDE_INFO));
AtlasStruct serdeInfo = toAtlasStruct(sdEntity.getAttribute(ATTRIBUTE_SERDE_INFO));
Assert.assertEquals(serdeInfo.getAttribute(ATTRIBUTE_SERIALIZATION_LIB), "org.apache.hadoop.hive.ql.io.orc.OrcSerde");
Assert.assertNotNull(serdeInfo.getAttribute(ATTRIBUTE_PARAMETERS));
Assert.assertEquals(((Map<String, String>) serdeInfo.getAttribute(ATTRIBUTE_PARAMETERS)).get("serialization.format"), "1");
}
});
/**
* Hive 'alter table stored as' is not supported - See https://issues.apache.org/jira/browse/HIVE-9576
* query = "alter table " + tableName + " STORED AS " + testFormat.toUpperCase();
* runCommand(query);
* tableRef = atlasClientV1.getEntity(tableId);
* sdRef = (AtlasEntity)tableRef.getAttribute(HiveMetaStoreBridge.STORAGE_DESC);
* Assert.assertEquals(sdRef.getAttribute(HiveMetaStoreBridge.STORAGE_DESC_INPUT_FMT), "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
* Assert.assertEquals(sdRef.getAttribute(HiveMetaStoreBridge.STORAGE_DESC_OUTPUT_FMT), "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat");
* Assert.assertEquals(((Map) sdRef.getAttribute(HiveMetaStoreBridge.PARAMETERS)).getAttribute("orc.compress"), "ZLIB");
*/
}
/**
 * Applies bucketing/sorting twice — first on a single column, then on two —
 * and verifies the storage descriptor each time.
 */
@Test
public void testAlterTableBucketingClusterSort() throws Exception {
    String table = createTable();

    List<String> singleColumn = Collections.singletonList("id");
    runBucketSortQuery(table, 5, singleColumn, singleColumn);

    List<String> twoColumns = Arrays.asList("id", NAME);
    runBucketSortQuery(table, 2, twoColumns, twoColumns);
}
/**
 * Alters the table's clustering/sorting and asserts the registered storage
 * descriptor reflects the new bucket count and column lists.
 *
 * @param numBuckets expected bucket count after the alter
 * @param bucketCols columns to cluster by
 * @param sortCols   columns to sort by (order asserted in verifyBucketSortingProperties)
 */
private void runBucketSortQuery(String tableName, final int numBuckets, final List<String> bucketCols, final List<String> sortCols) throws Exception {
String fmtQuery = "alter table %s CLUSTERED BY (%s) SORTED BY (%s) INTO %s BUCKETS";
// List.toString() yields "[a, b]"; strip the brackets to get a column list.
String query = String.format(fmtQuery, tableName, stripListBrackets(bucketCols.toString()), stripListBrackets(sortCols.toString()), numBuckets);
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
@Override
public void assertOnEntity(AtlasEntity entity) throws Exception {
verifyBucketSortingProperties(entity, numBuckets, bucketCols, sortCols);
}
});
}
/**
 * Removes the surrounding "[" and "]" produced by List.toString(), leaving a
 * plain comma-separated column list usable in a DDL statement.
 */
private String stripListBrackets(String listElements) {
    String withoutLeadingBracket = StringUtils.strip(listElements, "[");

    return StringUtils.strip(withoutLeadingBracket, "]");
}
/**
 * Verifies bucket count, bucket columns and sort columns on the table's
 * storage descriptor.
 * NOTE(review): every sort column is asserted to have order 1 (ascending) —
 * callers must not pass descending sort expectations.
 */
private void verifyBucketSortingProperties(AtlasEntity tableRef, int numBuckets, List<String> bucketColNames, List<String> sortcolNames) throws Exception {
AtlasObjectId sdObjectId = toAtlasObjectId(tableRef.getAttribute(ATTRIBUTE_STORAGEDESC));
AtlasEntity sdEntity = atlasClientV2.getEntityByGuid(sdObjectId.getGuid()).getEntity();
Assert.assertEquals((sdEntity.getAttribute(ATTRIBUTE_NUM_BUCKETS)), numBuckets);
Assert.assertEquals(sdEntity.getAttribute(ATTRIBUTE_BUCKET_COLS), bucketColNames);
// Sort columns are structs of (col, order); compare position by position.
List<AtlasStruct> hiveOrderStructList = toAtlasStructList(sdEntity.getAttribute(ATTRIBUTE_SORT_COLS));
Assert.assertNotNull(hiveOrderStructList);
Assert.assertEquals(hiveOrderStructList.size(), sortcolNames.size());
for (int i = 0; i < sortcolNames.size(); i++) {
AtlasStruct hiveOrderStruct = hiveOrderStructList.get(i);
Assert.assertNotNull(hiveOrderStruct);
Assert.assertEquals(hiveOrderStruct.getAttribute("col"), sortcolNames.get(i));
Assert.assertEquals(hiveOrderStruct.getAttribute("order"), 1);
}
}
/**
 * Sets one SERDE property, verifies it, then adds a second property and
 * verifies both are present.
 */
@Test
public void testAlterTableSerde() throws Exception {
    String table = createTable();

    Map<String, String> serdeProps = new HashMap<>();
    serdeProps.put("key1", "value1");

    runSerdePropsQuery(table, serdeProps);

    serdeProps.put("key2", "value2");

    runSerdePropsQuery(table, serdeProps);
}
/**
 * Dropping a table must deregister the table and all of its columns.
 */
@Test
public void testDropTable() throws Exception {
// Test deletion of a table and its corresponding columns.
String tableName = createTable(true, true, false);
assertTableIsRegistered(DEFAULT_DB, tableName);
assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "id"));
assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), NAME));
String query = String.format("drop table %s ", tableName);
runCommandWithDelay(query, 1000);
// Both columns and the table itself must be gone.
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "id"));
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), NAME));
assertTableIsNotRegistered(DEFAULT_DB, tableName);
}
/**
 * Builds a write entity representing the test partition (PART_FILE).
 */
private WriteEntity getPartitionOutput() {
    WriteEntity output = new WriteEntity();

    output.setTyp(Entity.Type.PARTITION);
    output.setName(PART_FILE);

    return output;
}
/**
 * Builds a read entity representing the test partition (PART_FILE).
 */
private ReadEntity getPartitionInput() {
    ReadEntity input = new ReadEntity();

    input.setTyp(Entity.Type.PARTITION);
    input.setName(PART_FILE);

    return input;
}
/**
 * Dropping a database with CASCADE must deregister the database, all of its
 * tables and their columns.
 */
@Test
public void testDropDatabaseWithCascade() throws Exception {
//Test Deletion of database and its corresponding tables
String dbName = "db" + random();
runCommand("create database " + dbName + " WITH DBPROPERTIES ('p1'='v1')");
int numTables = 10;
String[] tableNames = new String[numTables];
// NOTE(review): no "use dbName" is issued before createTable() — confirm the
// helper creates tables inside dbName and not the current/default database,
// otherwise the cascade assertions below are vacuous.
for(int i = 0; i < numTables; i++) {
tableNames[i] = createTable(true, true, false);
}
String query = String.format("drop database %s cascade", dbName);
runCommand(query);
//Verify columns are not registered for one of the tables
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), "id"));
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), NAME));
for(int i = 0; i < numTables; i++) {
assertTableIsNotRegistered(dbName, tableNames[i]);
}
assertDBIsNotRegistered(dbName);
}
/**
 * Dropping an emptied database (tables dropped individually first) must
 * deregister the database.
 */
@Test
public void testDropDatabaseWithoutCascade() throws Exception {
//Test Deletion of database and its corresponding tables
String dbName = "db" + random();
runCommand("create database " + dbName + " WITH DBPROPERTIES ('p1'='v1')");
int numTables = 5;
String[] tableNames = new String[numTables];
// NOTE(review): as in testDropDatabaseWithCascade, no "use dbName" is issued
// before createTable() — confirm tables are actually created in dbName.
for(int i = 0; i < numTables; i++) {
tableNames[i] = createTable(true, true, false);
String query = String.format("drop table %s", tableNames[i]);
runCommand(query);
assertTableIsNotRegistered(dbName, tableNames[i]);
}
String query = String.format("drop database %s", dbName);
runCommand(query);
assertDBIsNotRegistered(dbName);
}
/**
 * Dropping a database that was never created must be a no-op for Atlas.
 */
@Test
public void testDropNonExistingDB() throws Exception {
    String dbName = "nonexistingdb";

    assertDBIsNotRegistered(dbName);

    runCommand(String.format("drop database if exists %s cascade", dbName));

    // Still not registered — the drop had no effect.
    assertDBIsNotRegistered(dbName);
}
/**
 * Dropping a table that was never created must be a no-op for Atlas.
 */
@Test
public void testDropNonExistingTable() throws Exception {
    String tableName = "nonexistingtable";

    assertTableIsNotRegistered(DEFAULT_DB, tableName);

    runCommand(String.format("drop table if exists %s", tableName));

    // Still not registered — the drop had no effect.
    assertTableIsNotRegistered(DEFAULT_DB, tableName);
}
/**
 * Dropping a view must deregister the view and its columns.
 */
@Test
public void testDropView() throws Exception {
// Test deletion of a view and its corresponding columns.
String tableName = createTable(true, true, false);
String viewName = tableName();
String query = "create view " + viewName + " as select * from " + tableName;
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, viewName);
assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), "id"));
assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), NAME));
query = String.format("drop view %s ", viewName);
runCommandWithDelay(query, 1000);
// Both columns and the view itself must be gone.
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), "id"));
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), NAME));
assertTableIsNotRegistered(DEFAULT_DB, viewName);
}
/**
 * Applies the given SERDE properties to the table and verifies they are
 * reflected on the registered storage descriptor.
 */
private void runSerdePropsQuery(String tableName, Map<String, String> expectedProps) throws Exception {
    String serdeLib = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";

    runCommand(String.format("alter table %s set SERDE '%s' WITH SERDEPROPERTIES (%s)", tableName, serdeLib, getSerializedProps(expectedProps)));

    verifyTableSdProperties(tableName, serdeLib, expectedProps);
}
/**
 * Serializes the given properties into the Hive DDL form {@code 'k1'='v1','k2'='v2'}.
 * Iteration order follows the map's iteration order, so callers control
 * ordering through the map implementation they pass in.
 * (Made static — the method uses no instance state.)
 */
static String getSerializedProps(Map<String, String> expectedProps) {
    StringBuilder sb = new StringBuilder();

    // Iterate entries instead of keySet()+get() to avoid a second lookup per key.
    for (Map.Entry<String, String> prop : expectedProps.entrySet()) {
        if (sb.length() > 0) {
            sb.append(",");
        }

        sb.append("'").append(prop.getKey()).append("'");
        sb.append("=");
        sb.append("'").append(prop.getValue()).append("'");
    }

    return sb.toString();
}
/**
 * Setting a database owner (as a USER principal) must be reflected in the
 * registered database entity's OWNER attribute.
 */
@Test
public void testAlterDBOwner() throws Exception {
String dbName = createDatabase();
assertDatabaseIsRegistered(dbName);
String owner = "testOwner";
String fmtQuery = "alter database %s set OWNER %s %s";
String query = String.format(fmtQuery, dbName, "USER", owner);
runCommandWithDelay(query, 1000);
assertDatabaseIsRegistered(dbName, new AssertPredicate() {
@Override
public void assertOnEntity(AtlasEntity entity) {
assertEquals(entity.getAttribute(AtlasClient.OWNER), owner);
}
});
}
/**
 * Runs the shared set/unset property scenario against a database's DBPROPERTIES.
 */
@Test
public void testAlterDBProperties() throws Exception {
    String db = createDatabase();

    testAlterProperties(Entity.Type.DATABASE, db, "alter database %s %s DBPROPERTIES (%s)");
}
@Test
public void testAlterTableProperties() throws Exception {
    // Exercise "alter table ... set TBLPROPERTIES (...)" and verify the properties in Atlas.
    final String tableName = createTable();

    testAlterProperties(Entity.Type.TABLE, tableName, "alter table %s %s TBLPROPERTIES (%s)");
}
/**
 * Sets properties on the given table/database via the supplied format query and verifies
 * they appear in Atlas; then, for tables only, unsets them all and verifies removal.
 *
 * @param entityType TABLE or DATABASE — selects which Atlas registration assertion runs
 * @param entityName name of the hive table/database to alter
 * @param fmtQuery   format string with three slots: entity name, set/unset keyword, props list
 */
private void testAlterProperties(Entity.Type entityType, String entityName, String fmtQuery) throws Exception {
String SET_OP = "set";
String UNSET_OP = "unset";
Map<String, String> expectedProps = new HashMap<String, String>() {{
put("testPropKey1", "testPropValue1");
put("comment", "test comment");
}};
String query = String.format(fmtQuery, entityName, SET_OP, getSerializedProps(expectedProps));
runCommandWithDelay(query, 1000);
// checkIfNotExists=false: assert every expected property is present with its value.
verifyEntityProperties(entityType, entityName, expectedProps, false);
expectedProps.put("testPropKey2", "testPropValue2");
//Add another property
query = String.format(fmtQuery, entityName, SET_OP, getSerializedProps(expectedProps));
runCommandWithDelay(query, 1000);
verifyEntityProperties(entityType, entityName, expectedProps, false);
if (entityType != Entity.Type.DATABASE) {
//Database unset properties doesnt work - alter database %s unset DBPROPERTIES doesnt work
//Unset all the props
// Builds a quoted, comma-separated key list — 'key1','key2' — for the UNSET clause.
StringBuilder sb = new StringBuilder("'");
query = String.format(fmtQuery, entityName, UNSET_OP, Joiner.on("','").skipNulls().appendTo(sb, expectedProps.keySet()).append('\''));
runCommandWithDelay(query, 1000);
// checkIfNotExists=true: assert the properties are now absent.
verifyEntityProperties(entityType, entityName, expectedProps, true);
}
}
@Test
public void testAlterViewProperties() throws Exception {
    // Views are represented as hive_table entities, so reuse the TABLE property checks.
    String baseTable = createTable();
    String viewName  = tableName();

    runCommand("create view " + viewName + " as select * from " + baseTable);

    testAlterProperties(Entity.Type.TABLE, viewName, "alter view %s %s TBLPROPERTIES (%s)");
}
/**
 * Asserts the named table or database is registered in Atlas and that its parameters
 * match (or, when checkIfNotExists is true, do not contain) the expected properties.
 * Entity types other than TABLE/DATABASE are ignored, as before.
 */
private void verifyEntityProperties(Entity.Type type, String entityName, final Map<String, String> expectedProps, final boolean checkIfNotExists) throws Exception {
    // Single shared predicate — both branches ran the identical check in the original switch.
    AssertPredicate propsCheck = new AssertPredicate() {
        @Override
        public void assertOnEntity(AtlasEntity entity) throws Exception {
            verifyProperties(entity, expectedProps, checkIfNotExists);
        }
    };

    if (type == Entity.Type.TABLE) {
        assertTableIsRegistered(DEFAULT_DB, entityName, propsCheck);
    } else if (type == Entity.Type.DATABASE) {
        assertDatabaseIsRegistered(entityName, propsCheck);
    }
}
/**
 * Asserts the table is registered and that its storage-descriptor reference is present.
 *
 * NOTE(review): serdeLib and expectedProps are accepted but never checked here — only the
 * presence of the storage-descriptor object id is asserted. Consider fetching the SD entity
 * and comparing its serde library/parameters; confirm whether the weaker check is intentional.
 */
private void verifyTableSdProperties(String tableName, final String serdeLib, final Map<String, String> expectedProps) throws Exception {
assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
@Override
public void assertOnEntity(AtlasEntity tableRef) throws Exception {
AtlasObjectId sdEntity = toAtlasObjectId(tableRef.getAttribute(ATTRIBUTE_STORAGEDESC));
assertNotNull(sdEntity);
}
});
}
/**
 * Checks an entity's "parameters" attribute against the expected properties.
 *
 * @param referenceable    the Atlas entity/struct whose parameters to inspect
 * @param expectedProps    properties to check
 * @param checkIfNotExists when true, assert the keys are absent; when false, assert each
 *                         key is present with the expected value
 */
@SuppressWarnings("unchecked") // getAttribute returns Object; parameters is a String map in practice
private void verifyProperties(AtlasStruct referenceable, Map<String, String> expectedProps, boolean checkIfNotExists) {
    Map<String, String> parameters = (Map<String, String>) referenceable.getAttribute(ATTRIBUTE_PARAMETERS);

    if (checkIfNotExists) {
        // Properties must be absent; a null parameter map trivially satisfies this.
        if (expectedProps != null && parameters != null) {
            for (String propKey : expectedProps.keySet()) {
                Assert.assertFalse(parameters.containsKey(propKey));
            }
        }
    } else {
        // Every expected property must be present with the expected value.
        Assert.assertNotNull(parameters);

        for (Map.Entry<String, String> expected : expectedProps.entrySet()) {
            Assert.assertEquals(parameters.get(expected.getKey()), expected.getValue());
        }
    }
}
/**
 * Sorts the event's read/write entities with entityComparator and derives the hive_process
 * qualified name from them. A stable ordering is required because the qualified name is
 * built from the ordered inputs/outputs; null input/output sets are passed through as null.
 */
private String sortEventsAndGetProcessQualifiedName(final HiveEventContext event) throws HiveException{
SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator);
SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator);
if (event.getInputs() != null) {
sortedHiveInputs.addAll(event.getInputs());
}
if (event.getOutputs() != null) {
sortedHiveOutputs.addAll(event.getOutputs());
}
return getProcessQualifiedName(hiveMetaStoreBridge, event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
}
/**
 * Asserts that a hive_process entity exists for the given event and that its most recent
 * query matches the event's query string (lower-cased).
 *
 * @return the GUID of the registered process entity
 */
private String assertProcessIsRegistered(final HiveEventContext event) throws Exception {
    try {
        String processQFName = sortEventsAndGetProcessQualifiedName(event);

        LOG.debug("Searching for process with query {}", processQFName);

        AssertPredicate recentQueryCheck = new AssertPredicate() {
            @Override
            public void assertOnEntity(final AtlasEntity entity) throws Exception {
                List<String> recentQueries = (List<String>) entity.getAttribute(ATTRIBUTE_RECENT_QUERIES);

                Assert.assertEquals(recentQueries.get(0), lower(event.getQueryStr()));
            }
        };

        return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, processQFName, recentQueryCheck);
    } catch (Exception e) {
        LOG.error("Exception : ", e);
        throw e;
    }
}
/**
 * Asserts that a hive_process entity exists for the given event, deriving its qualified
 * name from the event's own (sorted) inputs/outputs but using the explicitly passed
 * inputTbls/outputTbls for the dataset maps. Also checks the most recent query string.
 *
 * NOTE(review): unlike the no-arg overload, the sorted sets and the dataset maps can come
 * from different entity collections here — presumably intentional for retry/rename cases;
 * confirm against callers.
 *
 * @return the GUID of the registered process entity
 */
private String assertProcessIsRegistered(final HiveEventContext event, final Set<ReadEntity> inputTbls, final Set<WriteEntity> outputTbls) throws Exception {
try {
SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator);
SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator);
if (event.getInputs() != null) {
sortedHiveInputs.addAll(event.getInputs());
}
if (event.getOutputs() != null) {
sortedHiveOutputs.addAll(event.getOutputs());
}
String processQFName = getProcessQualifiedName(hiveMetaStoreBridge, event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
LOG.debug("Searching for process with query {}", processQFName);
return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, processQFName, new AssertPredicate() {
@Override
public void assertOnEntity(final AtlasEntity entity) throws Exception {
List<String> recentQueries = (List<String>) entity.getAttribute(BaseHiveEvent.ATTRIBUTE_RECENT_QUERIES);
Assert.assertEquals(recentQueries.get(0), lower(event.getQueryStr()));
}
});
} catch(Exception e) {
LOG.error("Exception : ", e);
throw e;
}
}
/** Returns the Atlas type name for a process data set: hive_table for tables, hdfs_path otherwise. */
private String getDSTypeName(Entity entity) {
    if (Entity.Type.TABLE.equals(entity.getType())) {
        return HiveDataTypes.HIVE_TABLE.name();
    }

    return HiveMetaStoreBridge.HDFS_PATH;
}
/**
 * Maps each input entity to a minimal AtlasEntity reference (qualified name only),
 * sorted by entityComparator. Returns an empty sorted map for null input.
 */
private <T extends Entity> SortedMap<T, AtlasEntity> getSortedProcessDataSets(Set<T> inputTbls) {
    SortedMap<T, AtlasEntity> result = new TreeMap<>(entityComparator);

    if (inputTbls == null) {
        return result;
    }

    for (T entity : inputTbls) {
        // Plain map instead of double-brace initialization (avoids an anonymous subclass per entry).
        Map<String, Object> attributes = new HashMap<>();

        attributes.put(ATTRIBUTE_QUALIFIED_NAME, entity.getName());

        result.put(entity, new AtlasEntity(getDSTypeName(entity), attributes));
    }

    return result;
}
/**
 * Asserts that no hive_process entity exists for the given event.
 *
 * NOTE(review): exceptions raised while computing the name or querying Atlas are logged and
 * swallowed (matching the original); the AssertionError from assertEntityIsNotRegistered
 * still propagates because it is not an Exception.
 */
private void assertProcessIsNotRegistered(HiveEventContext event) throws Exception {
    try {
        // Reuse the shared helper — this block previously duplicated its sorting logic verbatim.
        String processQFName = sortEventsAndGetProcessQualifiedName(event);

        LOG.debug("Searching for process with query {}", processQFName);

        assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, processQFName);
    } catch(Exception e) {
        LOG.error("Exception : ", e);
    }
}
/** Asserts that no hive_table entity exists for the given (possibly temporary) table. */
private void assertTableIsNotRegistered(String dbName, String tableName, boolean isTemporaryTable) throws Exception {
    LOG.debug("Searching for table {}.{}", dbName, tableName);

    assertEntityIsNotRegistered(HiveDataTypes.HIVE_TABLE.getName(), ATTRIBUTE_QUALIFIED_NAME,
                                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName, isTemporaryTable));
}
/**
 * Asserts that no hive_table entity exists for the given (non-temporary) table.
 * Delegates to the three-argument overload instead of duplicating its body.
 */
private void assertTableIsNotRegistered(String dbName, String tableName) throws Exception {
    assertTableIsNotRegistered(dbName, tableName, false);
}
/** Asserts that no hive_db entity exists for the given database name. */
private void assertDBIsNotRegistered(String dbName) throws Exception {
    LOG.debug("Searching for database {}", dbName);

    assertEntityIsNotRegistered(HiveDataTypes.HIVE_DB.getName(), ATTRIBUTE_QUALIFIED_NAME,
                                HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName));
}
// Convenience overload: asserts a non-temporary table is registered and runs the given predicate.
private String assertTableIsRegistered(String dbName, String tableName, AssertPredicate assertPredicate) throws Exception {
return assertTableIsRegistered(dbName, tableName, assertPredicate, false);
}
/**
 * Polls (up to 80s) until looking up an entity of the given type by unique attribute yields
 * NOT_FOUND; fails if the entity can still be fetched (or the lookup errors differently).
 */
private void assertEntityIsNotRegistered(final String typeName, final String property, final String value) throws Exception {
    waitFor(80000, new Predicate() {
        @Override
        public void evaluate() throws Exception {
            boolean stillExists = true;

            try {
                atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property, value));
            } catch (AtlasServiceException e) {
                // Only a NOT_FOUND response proves the entity is gone.
                stillExists = (e.getStatus() != ClientResponse.Status.NOT_FOUND);
            }

            if (stillExists) {
                fail(String.format("Entity was not supposed to exist for typeName = %s, attributeName = %s, "
                        + "attributeValue = %s", typeName, property, value));
            }
        }
    });
}
@Test
public void testLineage() throws Exception {
    // CTAS across two databases; lineage must be navigable from both endpoints.
    String sourceTable = createTable(false);
    String targetDb    = createDatabase();
    String targetTable = tableName();

    runCommand(String.format("create table %s.%s as select * from %s", targetDb, targetTable, sourceTable));

    String sourceTableId = assertTableIsRegistered(DEFAULT_DB, sourceTable);
    String targetTableId = assertTableIsRegistered(targetDb, targetTable);

    // Upstream (INPUT) lineage of the target must contain both tables.
    AtlasLineageInfo upstream = atlasClientV2.getLineageInfo(targetTableId, AtlasLineageInfo.LineageDirection.INPUT, 0);
    Map<String, AtlasEntityHeader> upstreamEntities = upstream.getGuidEntityMap();

    assertTrue(upstreamEntities.containsKey(sourceTableId));
    assertTrue(upstreamEntities.containsKey(targetTableId));

    // Downstream (OUTPUT) lineage of the source must contain both tables as well.
    AtlasLineageInfo downstream = atlasClientV2.getLineageInfo(sourceTableId, AtlasLineageInfo.LineageDirection.OUTPUT, 0);
    Map<String, AtlasEntityHeader> downstreamEntities = downstream.getGuidEntityMap();

    assertTrue(downstreamEntities.containsKey(sourceTableId));
    assertTrue(downstreamEntities.containsKey(targetTableId));
}
//For ATLAS-448
@Test
public void testNoopOperation() throws Exception {
    // These statements produce no lineage; the hook must ignore them without raising errors.
    for (String command : new String[] { "show compactions", "show transactions" }) {
        runCommand(command);
    }
}
/** Generates a unique database name for a test run. */
private String dbName() {
    return String.format("db%s", random());
}
/** Creates a database with a random name and returns that name. */
private String createDatabase() throws Exception {
    final String dbName = dbName();

    runCommand("create database " + dbName);

    return dbName;
}
/** Generates a unique column name for a test run. */
private String columnName() {
    return String.format("col%s", random());
}
/** Creates a non-partitioned test table and returns its name. */
private String createTable() throws Exception {
    return createTable(false);
}
/**
 * Creates a test table (id int, name string), optionally partitioned by dt, and returns its name.
 */
private String createTable(boolean isPartitioned) throws Exception {
    String tableName = tableName();

    StringBuilder ddl = new StringBuilder("create table ")
            .append(tableName)
            .append("(id int, name string) comment 'table comment' ");

    if (isPartitioned) {
        ddl.append(" partitioned by(dt string)");
    }

    runCommand(ddl.toString());

    return tableName;
}
/**
 * Creates a test table with optional EXTERNAL/TEMPORARY modifiers and partitioning;
 * external tables are given a DFS location. Returns the table name.
 */
private String createTable(boolean isExternal, boolean isPartitioned, boolean isTemporary) throws Exception {
    String tableName = tableName();

    StringBuilder ddl = new StringBuilder("create ");

    if (isExternal) {
        ddl.append(" EXTERNAL ");
    }
    if (isTemporary) {
        ddl.append("TEMPORARY ");
    }

    ddl.append("table ").append(tableName).append("(id int, name string) comment 'table comment' ");

    if (isPartitioned) {
        ddl.append(" partitioned by(dt string)");
    }
    if (isExternal) {
        ddl.append(" location '").append(createTestDFSPath("someTestPath")).append("'");
    }

    runCommand(ddl.toString());

    return tableName;
}
}