blob: 979e7299245d5b34b6b6f33749d51a31ec8cb13e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive.hook;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.HiveITBase;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.io.File;
import java.text.ParseException;
import java.util.*;
import static org.apache.atlas.AtlasClient.NAME;
import static org.apache.atlas.hive.hook.HiveHook.IO_SEP;
import static org.apache.atlas.hive.hook.HiveHook.SEP;
import static org.apache.atlas.hive.hook.HiveHook.entityComparator;
import static org.apache.atlas.hive.hook.HiveHook.getProcessQualifiedName;
import static org.apache.atlas.hive.hook.HiveHook.lower;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class HiveHookIT extends HiveITBase {
// Logger used by the helper methods of this test class.
private static final Logger LOG = LoggerFactory.getLogger(HiveHookIT.class);
// Value used for the "dt" partition column in the partition-related tests.
private static final String PART_FILE = "2015-01-01";
/**
 * Creates a database with two properties and checks that they are registered, then drops and
 * re-creates the database and checks the qualified name of the newly registered entity.
 */
@Test
public void testCreateDatabase() throws Exception {
    final String database = "db" + random();
    runCommand("create database " + database + " WITH DBPROPERTIES ('p1'='v1', 'p2'='v2')");

    final String firstGuid = assertDatabaseIsRegistered(database);
    final Referenceable firstEntity = atlasClient.getEntity(firstGuid);
    final Map properties = (Map) firstEntity.get(HiveMetaStoreBridge.PARAMETERS);
    Assert.assertNotNull(properties);
    Assert.assertEquals(properties.size(), 2);
    Assert.assertEquals(properties.get("p1"), "v1");

    // There should be just one entity per dbname
    runCommand("drop database " + database);
    assertDBIsNotRegistered(database);
    runCommand("create database " + database);
    final String secondGuid = assertDatabaseIsRegistered(database);

    // assert on qualified name
    final Referenceable secondEntity = atlasClient.getEntity(secondGuid);
    final String expectedQualifiedName = database.toLowerCase() + "@" + CLUSTER_NAME;
    Assert.assertEquals(secondEntity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), expectedQualifiedName);
}
/** Builds a database name made of the prefix "db" followed by a random suffix. */
private String dbName() {
    return new StringBuilder("db").append(random()).toString();
}
/**
 * Creates a database under a freshly generated name.
 *
 * @return the name of the database that was created
 */
private String createDatabase() throws Exception {
    final String generatedName = dbName();
    final String ddl = "create database " + generatedName;
    runCommand(ddl);
    return generatedName;
}
/** Builds a column name made of the prefix "col" followed by a random suffix. */
private String columnName() {
    return new StringBuilder("col").append(random()).toString();
}
/**
 * Creates a non-partitioned table.
 *
 * @return the name of the table that was created
 */
private String createTable() throws Exception {
    final boolean partitioned = false;
    return createTable(partitioned);
}
/**
 * Creates a table with columns (id int, name string) and a table comment.
 *
 * @param isPartitioned when true the table is partitioned by a string column "dt"
 * @return the name of the table that was created
 */
private String createTable(boolean isPartitioned) throws Exception {
    final String name = tableName();
    final StringBuilder ddl = new StringBuilder("create table ")
            .append(name)
            .append("(id int, name string) comment 'table comment' ");
    if (isPartitioned) {
        ddl.append(" partitioned by(dt string)");
    }
    runCommand(ddl.toString());
    return name;
}
/**
 * Creates a table with columns (id int, name string) and a table comment.
 *
 * @param isExternal    when true the table is created EXTERNAL with a location under a test DFS path
 * @param isPartitioned when true the table is partitioned by a string column "dt"
 * @param isTemporary   when true the table is created TEMPORARY
 * @return the name of the table that was created
 */
private String createTable(boolean isExternal, boolean isPartitioned, boolean isTemporary) throws Exception {
    final String name = tableName();
    final String locationClause = isExternal
            ? " location '" + createTestDFSPath("someTestPath") + "'"
            : "";

    final StringBuilder ddl = new StringBuilder("create ");
    if (isExternal) {
        ddl.append(" EXTERNAL ");
    }
    if (isTemporary) {
        ddl.append("TEMPORARY ");
    }
    ddl.append("table ").append(name).append("(id int, name string) comment 'table comment' ");
    if (isPartitioned) {
        ddl.append(" partitioned by(dt string)");
    }
    ddl.append(locationClause);

    runCommand(ddl.toString());
    return name;
}
/**
 * Creates one table in a newly created database and one in the default database, then checks
 * the column, table and storage-descriptor entities registered for them.
 */
@Test
public void testCreateTable() throws Exception {
String tableName = tableName();
String dbName = createDatabase();
String colName = columnName();
runCommand("create table " + dbName + "." + tableName + "(" + colName + " int, name string)");
String tableId = assertTableIsRegistered(dbName, tableName);
//there is only one instance of column registered
String colId = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName), colName));
Referenceable colEntity = atlasClient.getEntity(colId);
// The column's qualified name is expected in lower case as db.table.column@cluster.
Assert.assertEquals(colEntity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), String.format("%s.%s.%s@%s", dbName.toLowerCase(),
tableName.toLowerCase(), colName.toLowerCase(), CLUSTER_NAME));
// The column must point back to the table it belongs to.
Assert.assertNotNull(colEntity.get(HiveMetaStoreBridge.TABLE));
Assert.assertEquals(((Id) colEntity.get(HiveMetaStoreBridge.TABLE))._getId(), tableId);
//assert that column.owner = table.owner
Referenceable tableRef = atlasClient.getEntity(tableId);
assertEquals(tableRef.get(AtlasClient.OWNER), colEntity.get(AtlasClient.OWNER));
//create table where db is not registered
tableName = createTable();
tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
tableRef = atlasClient.getEntity(tableId);
Assert.assertEquals(tableRef.get(HiveMetaStoreBridge.TABLE_TYPE_ATTR), TableType.MANAGED_TABLE.name());
Assert.assertEquals(tableRef.get(HiveMetaStoreBridge.COMMENT), "table comment");
String entityName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
Assert.assertEquals(tableRef.get(AtlasClient.NAME), tableName.toLowerCase());
Assert.assertEquals(tableRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), entityName);
Table t = hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, tableName);
// Expected timestamp: the table's DDL_TIME property scaled by MILLIS_CONVERT_FACTOR.
long createTime = Long.parseLong(t.getMetadata().getProperty(hive_metastoreConstants.DDL_TIME)) * HiveMetaStoreBridge.MILLIS_CONVERT_FACTOR;
verifyTimestamps(tableRef, HiveMetaStoreBridge.CREATE_TIME, createTime);
verifyTimestamps(tableRef, HiveMetaStoreBridge.LAST_ACCESS_TIME, createTime);
// The storage descriptor must also point back to the table.
final Referenceable sdRef = (Referenceable) tableRef.get(HiveMetaStoreBridge.STORAGE_DESC);
Assert.assertEquals(sdRef.get(HiveMetaStoreBridge.STORAGE_IS_STORED_AS_SUB_DIRS), false);
Assert.assertNotNull(sdRef.get(HiveMetaStoreBridge.TABLE));
Assert.assertEquals(((Id) sdRef.get(HiveMetaStoreBridge.TABLE))._getId(), tableId);
//Creating a table whose database was not registered beforehand should register the database entity as well
assertDatabaseIsRegistered(DEFAULT_DB);
}
/**
 * Verifies that a timestamp attribute is present, parseable and, optionally, equal to an
 * expected value.
 *
 * @param ref          entity whose attribute is checked
 * @param property     name of the attribute holding the formatted timestamp
 * @param expectedTime expected time in milliseconds since the epoch; a value that is not
 *                     positive skips the equality check
 * @throws ParseException if the attribute value cannot be parsed with the type system's date format
 */
private void verifyTimestamps(Referenceable ref, String property, long expectedTime) throws ParseException {
    String createTimeStr = (String) ref.get(property);
    // Check for null before parsing so that a missing attribute fails as an assertion, not as an NPE.
    Assert.assertNotNull(createTimeStr, "Missing timestamp attribute: " + property);
    Date createDate = TypeSystem.getInstance().getDateFormat().parse(createTimeStr);
    if (expectedTime > 0) {
        // TestNG's assertEquals takes (actual, expected).
        Assert.assertEquals(createDate.getTime(), expectedTime);
    }
}
/**
 * Verifies a timestamp attribute without comparing it to an expected value.
 *
 * @param ref      entity whose attribute is checked
 * @param property name of the attribute holding the formatted timestamp
 */
private void verifyTimestamps(Referenceable ref, String property) throws ParseException {
    final long noExpectedTime = 0;
    verifyTimestamps(ref, property, noExpectedTime);
}
//ATLAS-1321: Disable problematic tests. Need to revisit and fix them later
/**
 * Creates an external table at a test DFS location and checks the table, the process that
 * created it, the process timestamps and the HDFS path recorded as the process input.
 */
@Test(enabled = false)
public void testCreateExternalTable() throws Exception {
String tableName = tableName();
String colName = columnName();
String pFile = createTestDFSPath("parentPath");
final String query = String.format("create EXTERNAL table %s.%s( %s, %s) location '%s'", DEFAULT_DB , tableName , colName + " int", "name string", pFile);
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, tableName, null, true);
// Look the process up by the qualified name derived from the table.
String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getTableProcessQualifiedName(DEFAULT_DB, tableName), null);
Referenceable processReference = atlasClient.getEntity(processId);
// The process must be attributed to the user running the test.
assertEquals(processReference.get("userName"), UserGroupInformation.getCurrentUser().getShortUserName());
verifyTimestamps(processReference, "startTime");
verifyTimestamps(processReference, "endTime");
validateHDFSPaths(processReference, INPUTS, pFile);
}
/**
 * Builds a single-element, mutable set holding the read entity expected for a Hive input.
 *
 * @param inputName  a path when {@code entityType} is DFS_DIR, otherwise a table name
 *                   (qualified through {@link #getQualifiedTblName(String)})
 * @param entityType the Hive entity type to set on the entity
 * @return an insertion-ordered set containing the single entity
 * @throws HiveException if the Hive table lookup for a TABLE entity fails
 */
private Set<ReadEntity> getInputs(String inputName, Entity.Type entityType) throws HiveException {
    final ReadEntity entity = new ReadEntity();
    if (Entity.Type.DFS_DIR.equals(entityType)) {
        entity.setName(lower(new Path(inputName).toString()));
    } else {
        entity.setName(getQualifiedTblName(inputName));
    }
    // Inside the DFS_DIR branch entityType equals DFS_DIR, so one assignment covers both branches.
    entity.setTyp(entityType);
    if (entityType == Entity.Type.TABLE) {
        entity.setT(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, inputName));
    }
    // A plain LinkedHashSet avoids the anonymous subclass (and captured outer instance) of
    // double-brace initialization.
    final Set<ReadEntity> inputs = new LinkedHashSet<>();
    inputs.add(entity);
    return inputs;
}
/**
 * Builds a single-element, mutable set holding the write entity expected for a Hive output.
 *
 * @param inputName  a path when {@code entityType} is DFS_DIR or LOCAL_DIR, otherwise a table
 *                   name (qualified through {@link #getQualifiedTblName(String)})
 * @param entityType the Hive entity type to set on the entity
 * @return an insertion-ordered set containing the single entity
 * @throws HiveException if the Hive table lookup for a TABLE entity fails
 */
private Set<WriteEntity> getOutputs(String inputName, Entity.Type entityType) throws HiveException {
    final WriteEntity entity = new WriteEntity();
    final boolean isDirectory =
            Entity.Type.DFS_DIR.equals(entityType) || Entity.Type.LOCAL_DIR.equals(entityType);
    if (isDirectory) {
        entity.setName(lower(new Path(inputName).toString()));
    } else {
        entity.setName(getQualifiedTblName(inputName));
    }
    // Both branches of the original set the same type; do it once.
    entity.setTyp(entityType);
    if (entityType == Entity.Type.TABLE) {
        entity.setT(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, inputName));
    }
    // A plain LinkedHashSet avoids the anonymous subclass (and captured outer instance) of
    // double-brace initialization.
    final Set<WriteEntity> outputs = new LinkedHashSet<>();
    outputs.add(entity);
    return outputs;
}
/**
 * Checks the entities referenced by the process's OUTPUTS attribute against the expected outputs.
 *
 * @param processReference the process entity to inspect
 * @param expectedTables   the Hive write entities the process is expected to reference
 */
private void validateOutputTables(Referenceable processReference, Set<WriteEntity> expectedTables) throws Exception {
    final String attribute = OUTPUTS;
    validateTables(processReference, attribute, expectedTables);
}
/**
 * Checks the entities referenced by the process's INPUTS attribute against the expected inputs.
 *
 * @param processReference the process entity to inspect
 * @param expectedTables   the Hive read entities the process is expected to reference
 */
private void validateInputTables(Referenceable processReference, Set<ReadEntity> expectedTables) throws Exception {
    final String attribute = INPUTS;
    validateTables(processReference, attribute, expectedTables);
}
/**
 * Compares, position by position, the entities referenced by a process attribute with the
 * expected Hive entities. Only expected entities of type TABLE or DFS_DIR are checked; their
 * name must equal the qualified name of the entity referenced at the same position.
 *
 * @param processReference the process entity to inspect
 * @param attrName         name of the attribute holding the list of referenced ids
 * @param expectedTables   expected entities, in the order they should appear
 */
private void validateTables(Referenceable processReference, String attrName, Set<? extends Entity> expectedTables) throws Exception {
    List<Id> registeredRefs = (List<Id>) processReference.get(attrName);
    int position = 0;
    for (Entity expected : expectedTables) {
        final Entity.Type type = expected.getType();
        final boolean shouldCheck = Entity.Type.TABLE.equals(type) || Entity.Type.DFS_DIR.equals(type);
        if (shouldCheck) {
            Referenceable registered = atlasClient.getEntity(registeredRefs.get(position)._getId());
            LOG.debug("Validating output {} {} ", position, registered);
            Assert.assertEquals(registered.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), expected.getName());
        }
        // Skipped entities still occupy a position, exactly as with the index loop.
        position++;
    }
}
/**
 * Asserts that a column is registered, without any additional predicate.
 *
 * @param colName qualified name of the column
 * @return the value returned by the two-argument overload
 */
private String assertColumnIsRegistered(String colName) throws Exception {
    final AssertPredicate noPredicate = null;
    return assertColumnIsRegistered(colName, noPredicate);
}
/**
 * Asserts that a hive column entity with the given qualified name is registered.
 *
 * @param colName         qualified name of the column
 * @param assertPredicate optional predicate passed through to the entity lookup; may be null
 * @return the value returned by {@code assertEntityIsRegistered}
 */
private String assertColumnIsRegistered(String colName, AssertPredicate assertPredicate) throws Exception {
    LOG.debug("Searching for column {}", colName);
    final String columnType = HiveDataTypes.HIVE_COLUMN.getName();
    return assertEntityIsRegistered(columnType, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, colName, assertPredicate);
}
/**
 * Asserts that a hive storage descriptor entity is registered. The qualified name is
 * lower-cased before the lookup.
 *
 * @param sdQFName        qualified name of the storage descriptor
 * @param assertPredicate optional predicate passed through to the entity lookup; may be null
 * @return the value returned by {@code assertEntityIsRegistered}
 */
private String assertSDIsRegistered(String sdQFName, AssertPredicate assertPredicate) throws Exception {
    final String lowerCasedName = sdQFName.toLowerCase();
    LOG.debug("Searching for sd {}", lowerCasedName);
    final String sdType = HiveDataTypes.HIVE_STORAGEDESC.getName();
    return assertEntityIsRegistered(sdType, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, lowerCasedName, assertPredicate);
}
/**
 * Asserts that no hive column entity with the given qualified name is registered.
 *
 * @param colName qualified name of the column
 */
private void assertColumnIsNotRegistered(String colName) throws Exception {
    LOG.debug("Searching for column {}", colName);
    final String columnType = HiveDataTypes.HIVE_COLUMN.getName();
    assertEntityIsNotRegistered(columnType, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, colName);
}
/**
 * Runs a create-table-as-select and checks that both the process and the new table are registered.
 */
@Test
public void testCTAS() throws Exception {
    final String sourceTable = createTable();
    final String targetTable = "table" + random();
    final String ctasQuery = "create table " + targetTable + " as select * from " + sourceTable;
    runCommand(ctasQuery);

    final HiveHook.HiveEventContext ctasEvent = constructEvent(
            ctasQuery,
            HiveOperation.CREATETABLE_AS_SELECT,
            getInputs(sourceTable, Entity.Type.TABLE),
            getOutputs(targetTable, Entity.Type.TABLE));
    assertProcessIsRegistered(ctasEvent);
    assertTableIsRegistered(DEFAULT_DB, targetTable);
}
/**
 * Builds a hook event context carrying the given query, operation, inputs and outputs.
 *
 * @param query   the query string
 * @param op      the Hive operation
 * @param inputs  read entities of the event; may be null
 * @param outputs write entities of the event; may be null
 * @return the populated event context
 */
private HiveHook.HiveEventContext constructEvent(String query, HiveOperation op, Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
    final HiveHook.HiveEventContext context = new HiveHook.HiveEventContext();

    context.setQueryStr(query);
    context.setOperation(op);

    context.setInputs(inputs);
    context.setOutputs(outputs);

    return context;
}
/**
 * Creates a table whose row format defines null as the empty string and checks that the table
 * is registered.
 */
@Test
public void testEmptyStringAsValue() throws Exception{
    final String name = tableName();
    final String rowFormat = "(id int, name string) row format delimited lines terminated by '\n' null defined as ''";
    final String ddl = "create table " + name + rowFormat;
    runCommand(ddl);
    assertTableIsRegistered(DEFAULT_DB, name);
}
/**
 * Runs a CTAS query, drops its output table, runs the same CTAS again and checks that the
 * second run registers a process with a different id.
 */
@Test
public void testDropAndRecreateCTASOutput() throws Exception {
String tableName = createTable();
String ctasTableName = "table" + random();
String query = "create table " + ctasTableName + " as select * from " + tableName;
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, ctasTableName);
Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
Set<WriteEntity> outputs = getOutputs(ctasTableName, Entity.Type.TABLE);
final HiveHook.HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
String processId = assertProcessIsRegistered(hiveEventContext);
// Drop the CTAS output and make sure it is no longer registered.
final String drpquery = String.format("drop table %s ", ctasTableName);
runCommandWithDelay(drpquery, 100);
assertTableIsNotRegistered(DEFAULT_DB, ctasTableName);
// Re-run the same CTAS; the output table is created again.
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, ctasTableName);
// Rebuild the expected outputs from the re-created table.
outputs = getOutputs(ctasTableName, Entity.Type.TABLE);
String process2Id = assertProcessIsRegistered(hiveEventContext, inputs, outputs);
assertNotEquals(process2Id, processId);
// NOTE(review): this loads the first process (processId), not process2Id - confirm that is intended.
Referenceable processRef = atlasClient.getEntity(processId);
validateOutputTables(processRef, outputs);
}
/**
 * Creates a view over a table and checks that the create-view process and the view are registered.
 */
@Test
public void testCreateView() throws Exception {
    final String baseTable = createTable();
    final String view = tableName();
    final String createViewQuery = "create view " + view + " as select * from " + baseTable;
    runCommand(createViewQuery);

    final Set<ReadEntity> viewInputs = getInputs(baseTable, Entity.Type.TABLE);
    final Set<WriteEntity> viewOutputs = getOutputs(view, Entity.Type.TABLE);
    assertProcessIsRegistered(constructEvent(createViewQuery, HiveOperation.CREATEVIEW, viewInputs, viewOutputs));
    assertTableIsRegistered(DEFAULT_DB, view);
}
/**
 * Creates a view over one table, alters it to select from a second table, and checks the
 * input and output lineage graphs of the view after each step.
 */
@Test
public void testAlterViewAsSelect() throws Exception {
//Create the view from table1
String table1Name = createTable();
String viewName = tableName();
String query = "create view " + viewName + " as select * from " + table1Name;
runCommand(query);
String table1Id = assertTableIsRegistered(DEFAULT_DB, table1Name);
assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATEVIEW, getInputs(table1Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE)));
String viewId = assertTableIsRegistered(DEFAULT_DB, viewName);
//Check lineage which includes table1
String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName);
JSONObject response = atlasClient.getInputGraph(datasetName);
JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices");
assertTrue(vertices.has(viewId));
assertTrue(vertices.has(table1Id));
//Alter the view from table2
String table2Name = createTable();
query = "alter view " + viewName + " as select * from " + table2Name;
runCommand(query);
//Check if alter view process is registered
assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATEVIEW, getInputs(table2Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE)));
String table2Id = assertTableIsRegistered(DEFAULT_DB, table2Name);
// The view must keep the same entity id after being altered.
Assert.assertEquals(assertTableIsRegistered(DEFAULT_DB, viewName), viewId);
datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName);
response = atlasClient.getInputGraph(datasetName);
vertices = response.getJSONObject("values").getJSONObject("vertices");
assertTrue(vertices.has(viewId));
//This is through the alter view process
assertTrue(vertices.has(table2Id));
//This is through the Create view process
assertTrue(vertices.has(table1Id));
//Outputs don't exist
response = atlasClient.getOutputGraph(datasetName);
vertices = response.getJSONObject("values").getJSONObject("vertices");
Assert.assertEquals(vertices.length(), 0);
}
/**
 * Creates a local data file through {@link #file(String)} and returns its absolute path
 * prefixed with the "pfile://" scheme.
 *
 * @param path tag used to build the file name
 * @return the "pfile://" URI string of the created file
 */
private String createTestDFSFile(String path) throws Exception {
    final String absolutePath = file(path);
    return "pfile://" + absolutePath;
}
/**
 * Loads a local file into a non-partitioned table and checks that the load process is registered.
 */
@Test
public void testLoadLocalPath() throws Exception {
    final String targetTable = createTable(false);
    final String localFile = file("load");
    final String loadQuery = "load data local inpath 'file://" + localFile + "' into table " + targetTable;
    runCommand(loadQuery);

    final Set<ReadEntity> noInputs = null;
    final Set<WriteEntity> loadOutputs = getOutputs(targetTable, Entity.Type.TABLE);
    assertProcessIsRegistered(constructEvent(loadQuery, HiveOperation.LOAD, noInputs, loadOutputs));
}
/**
 * Loads a local file into a partition of a partitioned table and checks that the load process
 * is registered.
 */
@Test
public void testLoadLocalPathIntoPartition() throws Exception {
    final String targetTable = createTable(true);
    final String localFile = file("load");
    final String partitionSpec = " partition(dt = '" + PART_FILE + "')";
    final String loadQuery = "load data local inpath 'file://" + localFile + "' into table " + targetTable + partitionSpec;
    runCommand(loadQuery);

    final Set<ReadEntity> noInputs = null;
    final Set<WriteEntity> loadOutputs = getOutputs(targetTable, Entity.Type.TABLE);
    assertProcessIsRegistered(constructEvent(loadQuery, HiveOperation.LOAD, noInputs, loadOutputs));
}
/**
 * Loads two DFS files, one after the other, into the same partition of an external partitioned
 * table and validates the process inputs and outputs after each load.
 */
@Test
public void testLoadDFSPathPartitioned() throws Exception {
// external, partitioned, not temporary
String tableName = createTable(true, true, false);
assertTableIsRegistered(DEFAULT_DB, tableName);
final String loadFile = createTestDFSFile("loadDFSFile");
String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')";
runCommand(query);
final Set<WriteEntity> outputs = getOutputs(tableName, Entity.Type.TABLE);
final Set<ReadEntity> inputs = getInputs(loadFile, Entity.Type.DFS_DIR);
// The event carries the table plus the partition as outputs; validation expects the table only.
final Set<WriteEntity> partitionOps = new LinkedHashSet<>(outputs);
partitionOps.addAll(getOutputs(DEFAULT_DB + "@" + tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION));
Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.LOAD, inputs, partitionOps), inputs, outputs);
validateHDFSPaths(processReference, INPUTS, loadFile);
validateOutputTables(processReference, outputs);
// Second load from a different file into the same partition.
final String loadFile2 = createTestDFSFile("loadDFSFile1");
query = "load data inpath '" + loadFile2 + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')";
runCommand(query);
// Expected inputs now list the new file first, followed by the file from the first load.
Set<ReadEntity> process2Inputs = getInputs(loadFile2, Entity.Type.DFS_DIR);
Set<ReadEntity> expectedInputs = new LinkedHashSet<>();
expectedInputs.addAll(process2Inputs);
expectedInputs.addAll(inputs);
validateProcess(constructEvent(query, HiveOperation.LOAD, expectedInputs, partitionOps), expectedInputs, outputs);
}
/**
 * Returns the qualified name for a table in the default database. A null input, or an input
 * that already contains "@", is returned unchanged.
 *
 * @param inputTable table name, possibly already qualified; may be null
 * @return the qualified table name, or the input itself when it is null or contains "@"
 */
private String getQualifiedTblName(String inputTable) {
    if (inputTable == null || inputTable.contains("@")) {
        return inputTable;
    }
    return HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, inputTable);
}
/**
 * Asserts that the process for an event is registered and that its INPUTS and OUTPUTS
 * attributes match the expected entities. A null expectation means the attribute itself must
 * be null.
 *
 * @param event        the event describing the process
 * @param inputTables  expected inputs, or null when the process must have none
 * @param outputTables expected outputs, or null when the process must have none
 * @return the registered process entity
 */
private Referenceable validateProcess(HiveHook.HiveEventContext event, Set<ReadEntity> inputTables, Set<WriteEntity> outputTables) throws Exception {
    final String guid = assertProcessIsRegistered(event, inputTables, outputTables);
    final Referenceable registered = atlasClient.getEntity(guid);

    if (inputTables != null) {
        final List<?> actualInputs = (List<?>) registered.get(INPUTS);
        Assert.assertEquals(actualInputs.size(), inputTables.size());
        validateInputTables(registered, inputTables);
    } else {
        Assert.assertNull(registered.get(INPUTS));
    }

    if (outputTables != null) {
        final List<?> actualOutputs = (List<?>) registered.get(OUTPUTS);
        Assert.assertEquals(actualOutputs.size(), outputTables.size());
        validateOutputTables(registered, outputTables);
    } else {
        Assert.assertNull(registered.get(OUTPUTS));
    }

    return registered;
}
/**
 * Validates a process using the event's own inputs and outputs as the expectation.
 *
 * @param event the event describing the process
 * @return the registered process entity
 */
private Referenceable validateProcess(HiveHook.HiveEventContext event) throws Exception {
    final Set<ReadEntity> eventInputs = event.getInputs();
    final Set<WriteEntity> eventOutputs = event.getOutputs();
    return validateProcess(event, eventInputs, eventOutputs);
}
/**
 * Inserts into a table from a join of two input tables, checks the process and the ordering of
 * the inputs inside its qualified name, then re-runs the query and checks that the same process
 * entity is returned.
 */
@Test
public void testInsertIntoTable() throws Exception {
String inputTable1Name = createTable();
String inputTable2Name = createTable();
String insertTableName = createTable();
assertTableIsRegistered(DEFAULT_DB, inputTable1Name);
assertTableIsRegistered(DEFAULT_DB, insertTableName);
String query = "insert into " + insertTableName + " select t1.id, t1.name from " + inputTable2Name + " as t2, " + inputTable1Name + " as t1 where t1.id=t2.id";
runCommand(query);
final Set<ReadEntity> inputs = getInputs(inputTable1Name, Entity.Type.TABLE);
inputs.addAll(getInputs(inputTable2Name, Entity.Type.TABLE));
Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
(outputs.iterator().next()).setWriteType(WriteEntity.WriteType.INSERT);
HiveHook.HiveEventContext event = constructEvent(query, HiveOperation.QUERY, inputs, outputs);
// Expected inputs ordered by HiveHook.entityComparator.
Set<ReadEntity> expectedInputs = new TreeSet<ReadEntity>(entityComparator) {{
addAll(inputs);
}};
assertTableIsRegistered(DEFAULT_DB, insertTableName);
Referenceable processRef1 = validateProcess(event, expectedInputs, outputs);
//Test sorting of tbl names
SortedSet<String> sortedTblNames = new TreeSet<>();
sortedTblNames.add(inputTable1Name.toLowerCase());
sortedTblNames.add(inputTable2Name.toLowerCase());
//Verify sorted order of inputs in qualified name
// Expected layout: "QUERY", then (qualified name, created time) per input in sorted order,
// then IO_SEP and SEP, then the write type, qualified name and created time of the output.
Assert.assertEquals(
processRef1.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME),
Joiner.on(SEP).join("QUERY",
getQualifiedTblName(sortedTblNames.first()),
HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, sortedTblNames.first())).getTime(),
getQualifiedTblName(sortedTblNames.last()),
HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, sortedTblNames.last())).getTime())
+ IO_SEP + SEP
+ Joiner.on(SEP).
join(WriteEntity.WriteType.INSERT.name(),
getQualifiedTblName(insertTableName),
HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, insertTableName)).getTime())
);
//Rerun same query. Should result in same process
runCommandWithDelay(query, 1000);
Referenceable processRef2 = validateProcess(event, expectedInputs, outputs);
Assert.assertEquals(processRef1.getId()._getId(), processRef2.getId()._getId());
}
/**
 * Runs an insert-overwrite into a local directory and validates the resulting process, which
 * is expected to have the table as input and no outputs.
 */
@Test
public void testInsertIntoLocalDir() throws Exception {
    final String sourceTable = createTable();
    final File localTarget = File.createTempFile("hiverandom", ".tmp");
    final String insertQuery = "insert overwrite LOCAL DIRECTORY '" + localTarget.getAbsolutePath()
            + "' select id, name from " + sourceTable;
    runCommand(insertQuery);

    final Set<ReadEntity> queryInputs = getInputs(sourceTable, Entity.Type.TABLE);
    final Set<WriteEntity> noOutputs = null;
    validateProcess(constructEvent(insertQuery, HiveOperation.QUERY, queryInputs, noOutputs));
    assertTableIsRegistered(DEFAULT_DB, sourceTable);
}
/**
 * Runs an insert-overwrite into a DFS directory three times (twice to the same path, once to a
 * new path) and checks that every run resolves to the same process entity.
 */
@Test
public void testUpdateProcess() throws Exception {
String tableName = createTable();
String pFile1 = createTestDFSPath("somedfspath1");
String query =
"insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName;
runCommand(query);
Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
final Set<WriteEntity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR);
outputs.iterator().next().setWriteType(WriteEntity.WriteType.PATH_WRITE);
final HiveHook.HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.QUERY, inputs, outputs);
Referenceable processReference = validateProcess(hiveEventContext);
validateHDFSPaths(processReference, OUTPUTS, pFile1);
assertTableIsRegistered(DEFAULT_DB, tableName);
validateInputTables(processReference, inputs);
//Rerun same query with same HDFS path
runCommandWithDelay(query, 1000);
assertTableIsRegistered(DEFAULT_DB, tableName);
Referenceable process2Reference = validateProcess(hiveEventContext);
validateHDFSPaths(process2Reference, OUTPUTS, pFile1);
Assert.assertEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
//Rerun same query with a new HDFS path. Will result in same process since HDFS paths is not part of qualified name for QUERY operations
final String pFile2 = createTestDFSPath("somedfspath2");
query = "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName;
runCommandWithDelay(query, 1000);
assertTableIsRegistered(DEFAULT_DB, tableName);
// Expected outputs: the new path first, then the outputs of the earlier runs.
Set<WriteEntity> p3Outputs = new LinkedHashSet<WriteEntity>() {{
addAll(getOutputs(pFile2, Entity.Type.DFS_DIR));
addAll(outputs);
}};
Referenceable process3Reference = validateProcess(constructEvent(query, HiveOperation.QUERY, inputs, p3Outputs));
validateHDFSPaths(process3Reference, OUTPUTS, pFile2);
Assert.assertEquals(process3Reference.getId()._getId(), processReference.getId()._getId());
}
/**
 * Runs an insert-overwrite from one partition of a partitioned table into two different DFS
 * directories and checks that both runs resolve to the same process entity.
 */
@Test
public void testInsertIntoDFSDirPartitioned() throws Exception {
//Test with partitioned table
String tableName = createTable(true);
String pFile1 = createTestDFSPath("somedfspath1");
String query =
"insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName + " where dt = '" + PART_FILE + "'";
runCommand(query);
Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
final Set<WriteEntity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR);
outputs.iterator().next().setWriteType(WriteEntity.WriteType.PATH_WRITE);
// The event carries the table plus the partition as inputs; validation expects the table only.
final Set<ReadEntity> partitionIps = new LinkedHashSet<>(inputs);
partitionIps.addAll(getInputs(DEFAULT_DB + "@" + tableName + "@dt='" + PART_FILE + "'", Entity.Type.PARTITION));
Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, outputs), inputs, outputs);
//Rerun same query with different HDFS path. Should not create another process and should update it.
final String pFile2 = createTestDFSPath("somedfspath2");
query =
"insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName + " where dt = '" + PART_FILE + "'";
runCommand(query);
final Set<WriteEntity> pFile2Outputs = getOutputs(pFile2, Entity.Type.DFS_DIR);
pFile2Outputs.iterator().next().setWriteType(WriteEntity.WriteType.PATH_WRITE);
//Now the process has 2 paths - one older with deleted reference to partition and another with the latest partition
Set<WriteEntity> p2Outputs = new LinkedHashSet<WriteEntity>() {{
addAll(pFile2Outputs);
addAll(outputs);
}};
Referenceable process2Reference = validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, pFile2Outputs), inputs, p2Outputs);
validateHDFSPaths(process2Reference, OUTPUTS, pFile2);
Assert.assertEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
}
//Disabling test as temporary table is not captured by hiveHook(https://issues.apache.org/jira/browse/ATLAS-1274)
/**
 * Inserts from a regular table into a temporary table and validates the resulting process and
 * the registration of both tables.
 */
@Test(enabled = false)
public void testInsertIntoTempTable() throws Exception {
String tableName = createTable();
// not external, not partitioned, temporary
String insertTableName = createTable(false, false, true);
assertTableIsRegistered(DEFAULT_DB, tableName);
assertTableIsNotRegistered(DEFAULT_DB, insertTableName, true);
String query =
"insert into " + insertTableName + " select id, name from " + tableName;
runCommand(query);
Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
// The expected output name is the table name followed by TEMP_TABLE_PREFIX and the session id.
outputs.iterator().next().setName(getQualifiedTblName(insertTableName + HiveMetaStoreBridge.TEMP_TABLE_PREFIX + SessionState.get().getSessionId()));
outputs.iterator().next().setWriteType(WriteEntity.WriteType.INSERT);
validateProcess(constructEvent(query, HiveOperation.QUERY, inputs, outputs));
assertTableIsRegistered(DEFAULT_DB, tableName);
assertTableIsRegistered(DEFAULT_DB, insertTableName, null, true);
}
/**
 * Inserts from one partition of a partitioned table into a partition of another partitioned
 * table and validates the resulting process against the table-level inputs and outputs.
 */
@Test
public void testInsertIntoPartition() throws Exception {
final boolean isPartitionedTable = true;
String tableName = createTable(isPartitionedTable);
String insertTableName = createTable(isPartitionedTable);
String query =
"insert into " + insertTableName + " partition(dt = '"+ PART_FILE + "') select id, name from " + tableName
+ " where dt = '"+ PART_FILE + "'";
runCommand(query);
final Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
final Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
outputs.iterator().next().setWriteType(WriteEntity.WriteType.INSERT);
// The event carries the tables plus partition entities; validation expects the tables only.
final Set<ReadEntity> partitionIps = new LinkedHashSet<ReadEntity>() {
{
addAll(inputs);
add(getPartitionInput());
}
};
final Set<WriteEntity> partitionOps = new LinkedHashSet<WriteEntity>() {
{
addAll(outputs);
add(getPartitionOutput());
}
};
validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, partitionOps), inputs, outputs);
assertTableIsRegistered(DEFAULT_DB, tableName);
assertTableIsRegistered(DEFAULT_DB, insertTableName);
//TODO -Add update test case
}
/**
 * Creates an empty data file under "./target/" whose name combines the tag with a random suffix.
 *
 * @param tag prefix used in the file name
 * @return the absolute path of the file
 */
private String file(String tag) throws Exception {
    final File dataFile = new File("./target/" + tag + "-data-" + random());
    dataFile.createNewFile();
    return dataFile.getAbsolutePath();
}
/**
 * Exports a non-partitioned table to a directory and imports it into another table, twice,
 * validating the export and import processes after each step.
 */
@Test
public void testExportImportUnPartitionedTable() throws Exception {
String tableName = createTable(false);
assertTableIsRegistered(DEFAULT_DB, tableName);
String filename = "pfile://" + mkdir("exportUnPartitioned");
String query = "export table " + tableName + " to \"" + filename + "\"";
runCommand(query);
Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.EXPORT, inputs, outputs));
validateHDFSPaths(processReference, OUTPUTS, filename);
validateInputTables(processReference, inputs);
//Import
String importTableName = createTable(false);
assertTableIsRegistered(DEFAULT_DB, importTableName);
query = "import table " + importTableName + " from '" + filename + "'";
runCommand(query);
outputs = getOutputs(importTableName, Entity.Type.TABLE);
validateProcess(constructEvent(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs));
//Should create another process
filename = "pfile://" + mkdir("export2UnPartitioned");
query = "export table " + tableName + " to \"" + filename + "\"";
runCommand(query);
inputs = getInputs(tableName, Entity.Type.TABLE);
outputs = getOutputs(filename, Entity.Type.DFS_DIR);
validateProcess(constructEvent(query, HiveOperation.EXPORT, inputs, outputs));
//Importing again should create another process
query = "import table " + importTableName + " from '" + filename + "'";
runCommand(query);
outputs = getOutputs(importTableName, Entity.Type.TABLE);
validateProcess(constructEvent(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs));
}
/**
 * Exports a partitioned table and imports it into another partitioned table, twice, validating
 * the export and import processes after each step.
 */
@Test
public void testExportImportPartitionedTable() throws Exception {
    boolean isPartitionedTable = true;
    final String tableName = createTable(isPartitionedTable);
    assertTableIsRegistered(DEFAULT_DB, tableName);

    //Add a partition
    String partFile = "pfile://" + mkdir("partition");
    String query = "alter table " + tableName + " add partition (dt='"+ PART_FILE + "') location '" + partFile + "'";
    runCommand(query);

    String filename = "pfile://" + mkdir("export");
    query = "export table " + tableName + " to \"" + filename + "\"";
    runCommand(query);

    final Set<ReadEntity> expectedExportInputs = getInputs(tableName, Entity.Type.TABLE);
    final Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);

    //Note that export has only partition as input in this case
    final Set<ReadEntity> partitionIps = getInputs(DEFAULT_DB + "@" + tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION);
    partitionIps.addAll(expectedExportInputs);

    Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.EXPORT, partitionIps, outputs), expectedExportInputs, outputs);
    validateHDFSPaths(processReference, OUTPUTS, filename);

    //Import
    String importTableName = createTable(true);
    // Assert on the newly created import table (the original re-checked the source table here).
    assertTableIsRegistered(DEFAULT_DB, importTableName);

    query = "import table " + importTableName + " from '" + filename + "'";
    runCommand(query);

    final Set<ReadEntity> expectedImportInputs = getInputs(filename, Entity.Type.DFS_DIR);
    final Set<WriteEntity> importOutputs = getOutputs(importTableName, Entity.Type.TABLE);

    // The event carries the partition plus the table as outputs; validation expects the table only.
    final Set<WriteEntity> partitionOps = getOutputs(DEFAULT_DB + "@" + importTableName + "@dt=" + PART_FILE, Entity.Type.PARTITION);
    partitionOps.addAll(importOutputs);

    validateProcess(constructEvent(query, HiveOperation.IMPORT, expectedImportInputs , partitionOps), expectedImportInputs, importOutputs);

    //Export should update same process
    filename = "pfile://" + mkdir("export2");
    query = "export table " + tableName + " to \"" + filename + "\"";
    runCommand(query);

    final Set<WriteEntity> outputs2 = getOutputs(filename, Entity.Type.DFS_DIR);
    // Expected outputs: the new export directory first, then the one from the first export.
    Set<WriteEntity> p3Outputs = new LinkedHashSet<WriteEntity>() {{
        addAll(outputs2);
        addAll(outputs);
    }};

    validateProcess(constructEvent(query, HiveOperation.EXPORT, partitionIps, outputs2), expectedExportInputs, p3Outputs);

    // Drop the imported partition before importing again.
    query = "alter table " + importTableName + " drop partition (dt='"+ PART_FILE + "')";
    runCommand(query);

    //Import should update same process
    query = "import table " + importTableName + " from '" + filename + "'";
    runCommandWithDelay(query, 1000);

    final Set<ReadEntity> importInputs = getInputs(filename, Entity.Type.DFS_DIR);
    // Expected inputs: the second export directory first, then the one from the first import.
    final Set<ReadEntity> expectedImport2Inputs = new LinkedHashSet<ReadEntity>() {{
        addAll(importInputs);
        addAll(expectedImportInputs);
    }};

    validateProcess(constructEvent(query, HiveOperation.IMPORT, importInputs, partitionOps), expectedImport2Inputs, importOutputs);
}
/**
 * Runs plain SELECT statements, once with the table name as created and once
 * upper-cased, and asserts that no process is registered for either.
 */
@Test
public void testIgnoreSelect() throws Exception {
    String tableName = createTable();
    String query = "select * from " + tableName;
    runCommand(query);

    Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
    assertProcessIsNotRegistered(constructEvent(query, HiveOperation.QUERY, inputs, null));

    //check with uppercase table name
    query = "SELECT * from " + tableName.toUpperCase();
    runCommand(query);

    // Build the event from the query that was just run, instead of re-checking the
    // event of the first query.
    // NOTE(review): inputs are reused on the assumption that Hive resolves the
    // upper-cased name to the same table - confirm.
    assertProcessIsNotRegistered(constructEvent(query, HiveOperation.QUERY, inputs, null));
}
/**
 * Renames a table and asserts that the first entry of the renamed entity's
 * "aliases" attribute is the lower-cased original table name.
 */
@Test
public void testAlterTableRenameAliasRegistered() throws Exception{
    String tableName = createTable(false);
    assertTableIsRegistered(DEFAULT_DB, tableName);

    String newTableName = tableName();
    String query = String.format("alter table %s rename to %s", tableName, newTableName);
    runCommand(query);

    String newTableGuid = assertTableIsRegistered(DEFAULT_DB, newTableName);
    Map<String, Object> valueMap = atlasClient.getEntity(newTableGuid).getValuesMap();
    Iterable<String> aliasList = (Iterable<String>) valueMap.get("aliases");

    // Real test assertions: the Java 'assert' keyword is skipped unless the JVM runs with -ea.
    Assert.assertNotNull(aliasList, "aliases attribute is missing on renamed table " + newTableName);
    Assert.assertTrue(aliasList.iterator().hasNext(), "aliases attribute is empty on renamed table " + newTableName);

    String aliasTableName = aliasList.iterator().next();
    Assert.assertEquals(aliasTableName, tableName.toLowerCase());
}
/**
 * Renames a partitioned table into a different database and checks that the
 * column and storage-descriptor GUIDs are unchanged, that traits attached
 * before the rename are still listed, that the old table is no longer
 * registered, and that the renamed table keeps its create time and has a
 * location containing the new table name.
 */
@Test
public void testAlterTableRename() throws Exception {
    String tableName = createTable(true);
    final String newDBName = createDatabase();

    Referenceable tableEntity = atlasClient.getEntity(assertTableIsRegistered(DEFAULT_DB, tableName));
    final String createTime = (String) tableEntity.get(HiveMetaStoreBridge.CREATE_TIME);
    Assert.assertNotNull(createTime);

    String oldTableQName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
    String columnGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(oldTableQName, NAME));
    String sdGuid = assertSDIsRegistered(HiveMetaStoreBridge.getStorageDescQFName(oldTableQName), null);
    assertDatabaseIsRegistered(newDBName);

    // Attach a trait to the column, the storage descriptor and the partition key column
    String colTraitDetails = createTrait(columnGuid);
    String sdTraitDetails = createTrait(sdGuid);
    String partColumnGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(oldTableQName, "dt"));
    String partColTraitDetails = createTrait(partColumnGuid);

    final String newTableName = tableName();
    String renameQuery = String.format("alter table %s.%s rename to %s.%s", DEFAULT_DB, tableName, newDBName, newTableName);
    runCommandWithDelay(renameQuery, 1000);

    String newTableQName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, newTableName);

    String newColGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(newTableQName, NAME));
    Assert.assertEquals(newColGuid, columnGuid);

    // NOTE(review): this looks up the OLD table name inside the NEW database, a name
    // that was never registered; the old DEFAULT_DB name may have been intended - confirm.
    String oldNameInNewDb = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, tableName);
    assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(oldNameInNewDb, NAME));

    assertTrait(columnGuid, colTraitDetails);

    String newSdGuid = assertSDIsRegistered(HiveMetaStoreBridge.getStorageDescQFName(newTableQName), null);
    Assert.assertEquals(newSdGuid, sdGuid);

    assertTrait(sdGuid, sdTraitDetails);
    assertTrait(partColumnGuid, partColTraitDetails);

    assertTableIsNotRegistered(DEFAULT_DB, tableName);

    assertTableIsRegistered(newDBName, newTableName, new AssertPredicate() {
        @Override
        public void assertOnEntity(final Referenceable entity) throws Exception {
            Referenceable storageDesc = (Referenceable) entity.get(HiveMetaStoreBridge.STORAGE_DESC);
            String sdLocation = (String) storageDesc.get(HiveMetaStoreBridge.LOCATION);
            assertTrue(sdLocation.contains(newTableName));
            Assert.assertEquals(entity.get(HiveMetaStoreBridge.CREATE_TIME), createTime);
        }
    });
}
/**
 * Returns the columns of the given table whose entity state is ACTIVE.
 *
 * @param dbName    database holding the table
 * @param tableName table whose columns are read
 * @return the table's columns, without the ones that are not ACTIVE
 */
private List<Referenceable> getColumns(String dbName, String tableName) throws Exception {
    Referenceable table = atlasClient.getEntity(assertTableIsRegistered(dbName, tableName));
    List<Referenceable> allColumns = (List<Referenceable>) table.get(HiveMetaStoreBridge.COLUMNS);

    // With soft delete the deleted columns come back too, so skip anything not ACTIVE.
    List<Referenceable> result = new ArrayList<>();
    for (Referenceable column : allColumns) {
        if (column.getId().getState() != Id.EntityState.ACTIVE) {
            continue;
        }
        result.add(column);
    }
    return result;
}
/**
 * Creates a new trait type with a randomised name and adds an instance of it
 * to the entity with the given GUID.
 *
 * @param guid GUID of the entity that receives the trait
 * @return the name of the trait type that was created
 */
private String createTrait(String guid) throws AtlasServiceException, JSONException {
    // Valid type names in v2 must consist of a letter followed by letters, digits or '_'.
    final String piiTraitName = "PII_Trait" + random();
    atlasClient.createTraitType(piiTraitName);
    atlasClient.addTrait(guid, new Struct(piiTraitName));
    return piiTraitName;
}
/**
 * Asserts that the entity with the given GUID lists the given trait.
 *
 * Uses a membership check rather than reading the first list element, so an
 * empty list fails as an assertion (not an IndexOutOfBoundsException) and the
 * check does not depend on the order in which traits are returned.
 *
 * @param guid      GUID of the entity to inspect
 * @param traitName trait name expected among the entity's traits
 */
private void assertTrait(String guid, String traitName) throws AtlasServiceException, JSONException {
    List<String> traits = atlasClient.listTraits(guid);
    Assert.assertNotNull(traits, "no traits returned for entity " + guid);
    Assert.assertTrue(traits.contains(traitName), "trait " + traitName + " not found on entity " + guid + ", traits: " + traits);
}
/**
 * Adds a string column to a freshly created table and checks that the column
 * is registered and that the table then has three active columns.
 */
@Test
public void testAlterTableAddColumn() throws Exception {
    String tableName = createTable();
    String addedColumn = columnName();

    runCommand("alter table " + tableName + " add columns (" + addedColumn + " string)");

    String tableQName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
    assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tableQName, addedColumn));

    // Verify the number of columns present in the table
    final List<Referenceable> activeColumns = getColumns(DEFAULT_DB, tableName);
    Assert.assertEquals(activeColumns.size(), 3);
}
// ATLAS-1321: Disable problematic tests. Need to revisit and fix them later
/**
 * Replaces the table's columns with a single "name" column and checks that
 * the "id" column is no longer registered and only "name" remains active.
 */
@Test(enabled = false)
public void testAlterTableDropColumn() throws Exception {
    String tableName = createTable();
    final String droppedColumn = "id";

    runCommand("alter table " + tableName + " replace columns (name string)");

    String tableQName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
    assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tableQName, droppedColumn));

    // Verify the number of columns present in the table
    final List<Referenceable> activeColumns = getColumns(DEFAULT_DB, tableName);
    assertEquals(activeColumns.size(), 1);
    assertEquals(activeColumns.get(0).get(NAME), "name");
}
/**
 * Runs a sequence of "alter table ... change [column]" statements against one
 * table (rename, type change, rename with comment, move to first position,
 * move after "id") and, after each one, checks the active column count, that
 * the old column name is gone and that the new column is registered with the
 * expected attribute or position.
 */
@Test
public void testAlterTableChangeColumn() throws Exception {
    final String tableName = createTable();
    final String tableQName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);

    // Step 1: change the name only
    String previousName = NAME;
    String currentName = "name1";
    String alterQuery = String.format("alter table %s change %s %s string", tableName, previousName, currentName);
    runCommandWithDelay(alterQuery, 1000);

    assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tableQName, previousName));
    assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tableQName, currentName));

    // Verify the number of columns present in the table
    List<Referenceable> activeColumns = getColumns(DEFAULT_DB, tableName);
    Assert.assertEquals(activeColumns.size(), 2);

    // Step 2: change name and column type
    previousName = "name1";
    currentName = "name2";
    final String newColType = "int";
    alterQuery = String.format("alter table %s change column %s %s %s", tableName, previousName, currentName, newColType);
    runCommandWithDelay(alterQuery, 1000);

    activeColumns = getColumns(DEFAULT_DB, tableName);
    Assert.assertEquals(activeColumns.size(), 2);

    String currentQName = HiveMetaStoreBridge.getColumnQualifiedName(tableQName, currentName);
    assertColumnIsRegistered(currentQName, new AssertPredicate() {
        @Override
        public void assertOnEntity(Referenceable entity) throws Exception {
            assertEquals(entity.get("type"), "int");
        }
    });
    assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tableQName, previousName));

    // Step 3: change name and add a comment
    previousName = "name2";
    currentName = "name3";
    final String comment = "added comment";
    alterQuery = String.format("alter table %s change column %s %s %s COMMENT '%s' after id", tableName, previousName,
            currentName, newColType, comment);
    runCommandWithDelay(alterQuery, 1000);

    activeColumns = getColumns(DEFAULT_DB, tableName);
    Assert.assertEquals(activeColumns.size(), 2);

    assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tableQName, previousName));
    currentQName = HiveMetaStoreBridge.getColumnQualifiedName(tableQName, currentName);
    assertColumnIsRegistered(currentQName, new AssertPredicate() {
        @Override
        public void assertOnEntity(Referenceable entity) throws Exception {
            assertEquals(entity.get(HiveMetaStoreBridge.COMMENT), comment);
        }
    });

    // Step 4: move the column to the first position
    previousName = "name3";
    currentName = "name4";
    alterQuery = String.format("alter table %s change column %s %s %s first", tableName, previousName, currentName,
            newColType);
    runCommandWithDelay(alterQuery, 1000);

    activeColumns = getColumns(DEFAULT_DB, tableName);
    Assert.assertEquals(activeColumns.size(), 2);

    assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tableQName, previousName));
    currentQName = HiveMetaStoreBridge.getColumnQualifiedName(tableQName, currentName);
    assertColumnIsRegistered(currentQName);

    final String expectedFirstColumn = currentName;
    assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
        @Override
        public void assertOnEntity(Referenceable entity) throws Exception {
            List<Referenceable> tableColumns = (List<Referenceable>) entity.get(HiveMetaStoreBridge.COLUMNS);
            assertEquals(tableColumns.get(0).get(NAME), expectedFirstColumn);
            assertEquals(tableColumns.get(1).get(NAME), "id");
        }
    });

    // Step 5: move the column again, this time after "id"
    previousName = "name4";
    currentName = "name5";
    alterQuery = String.format("alter table %s change column %s %s %s after id", tableName, previousName, currentName, newColType);
    runCommandWithDelay(alterQuery, 1000);

    activeColumns = getColumns(DEFAULT_DB, tableName);
    Assert.assertEquals(activeColumns.size(), 2);

    assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tableQName, previousName));
    currentQName = HiveMetaStoreBridge.getColumnQualifiedName(tableQName, currentName);
    assertColumnIsRegistered(currentQName);

    // Check column position
    final String expectedSecondColumn = currentName;
    assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
        @Override
        public void assertOnEntity(Referenceable entity) throws Exception {
            List<Referenceable> tableColumns = (List<Referenceable>) entity.get(HiveMetaStoreBridge.COLUMNS);
            assertEquals(tableColumns.get(1).get(NAME), expectedSecondColumn);
            assertEquals(tableColumns.get(0).get(NAME), "id");
        }
    });
}
/*
 * This test is disabled by default.
 * Reason: Atlas uses Hive version 1.2.x, and the Hive patch HIVE-13112, which enables column level
 * lineage, is not committed in Hive version 1.2.x. The test fails if the lineage information is not
 * available from Hive. Once the patch for HIVE-13112 is committed to Hive branch 1.2.x, the test can
 * be enabled. Track HIVE-14706 for the status of column lineage availability in the latest Hive
 * versions, i.e. 2.1.x.
 */
@Test(enabled = false)
public void testColumnLevelLineage() throws Exception {
    String sourceTable = "table" + random();
    runCommand("create table " + sourceTable + "(a int, b int)");

    String sourceTableGUID = assertTableIsRegistered(DEFAULT_DB, sourceTable);
    String sourceTableQName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, sourceTable);
    String a_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(sourceTableQName, "a"));
    String b_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(sourceTableQName, "b"));

    String ctasTableName = "table" + random();
    String ctasQuery = "create table " + ctasTableName + " as " +
            "select sum(a+b) as a, count(*) as b from " + sourceTable;
    runCommand(ctasQuery);

    String ctasTableQName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, ctasTableName);
    String dest_a_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(ctasTableQName, "a"));
    String dest_b_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(ctasTableQName, "b"));

    final Set<ReadEntity> inputs = getInputs(sourceTable, Entity.Type.TABLE);
    final Set<WriteEntity> outputs = getOutputs(ctasTableName, Entity.Type.TABLE);
    HiveHook.HiveEventContext event = constructEvent(ctasQuery, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
    assertProcessIsRegistered(event);
    assertTableIsRegistered(DEFAULT_DB, ctasTableName);

    String processQName = sortEventsAndGetProcessQualifiedName(event);

    // Lineage of destination column "a": expected inputs are source columns a and b
    List<String> expectedInputsForA = Arrays.asList(a_guid, b_guid);
    String lineageProcessNameA = processQName + ":" + "a";
    LOG.debug("Searching for column lineage process {} ", lineageProcessNameA);
    String lineageGuidA = assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, lineageProcessNameA, null);

    List<Id> inputIdsForA = (List<Id>) atlasClient.getEntity(lineageGuidA).get("inputs");
    List<String> actualInputsForA = new ArrayList<>();
    for (Id inputId : inputIdsForA) {
        actualInputsForA.add(inputId._getId());
    }
    Collections.sort(actualInputsForA);
    Collections.sort(expectedInputsForA);
    Assert.assertEquals(actualInputsForA, expectedInputsForA);

    // Lineage of destination column "b": expected input is the source table itself
    List<String> expectedInputsForB = Arrays.asList(sourceTableGUID);
    String lineageProcessNameB = processQName + ":" + "b";
    LOG.debug("Searching for column lineage process {} ", lineageProcessNameB);
    String lineageGuidB = assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, lineageProcessNameB, null);

    List<Id> inputIdsForB = (List<Id>) atlasClient.getEntity(lineageGuidB).get("inputs");
    List<String> actualInputsForB = new ArrayList<>();
    for (Id inputId : inputIdsForB) {
        actualInputsForB.add(inputId._getId());
    }
    Collections.sort(actualInputsForB);
    Collections.sort(expectedInputsForB);
    Assert.assertEquals(actualInputsForB, expectedInputsForB);

    // Test lineage API response
    JSONObject graphForA = atlasClient.getInputGraphForEntity(dest_a_guid);
    JSONObject verticesForA = graphForA.getJSONObject("values").getJSONObject("vertices");
    JSONObject destAVertex = (JSONObject) verticesForA.get(dest_a_guid);
    JSONObject srcAVertex = (JSONObject) verticesForA.get(a_guid);
    JSONObject srcBVertex = (JSONObject) verticesForA.get(b_guid);
    Assert.assertNotNull(destAVertex);
    Assert.assertNotNull(srcAVertex);
    Assert.assertNotNull(srcBVertex);

    JSONObject graphForB = atlasClient.getInputGraphForEntity(dest_b_guid);
    JSONObject verticesForB = graphForB.getJSONObject("values").getJSONObject("vertices");
    JSONObject destBVertex = (JSONObject) verticesForB.get(dest_b_guid);
    JSONObject srcTableVertex = (JSONObject) verticesForB.get(sourceTableGUID);
    Assert.assertNotNull(destBVertex);
    Assert.assertNotNull(srcTableVertex);
}
/**
 * Truncates a table, validates the TRUNCATETABLE process (no inputs, the table
 * as output) and inspects the table's input lineage graph.
 */
@Test
public void testTruncateTable() throws Exception {
    String tableName = createTable(false);
    String truncateQuery = String.format("truncate table %s", tableName);
    runCommand(truncateQuery);

    Set<WriteEntity> truncateOutputs = getOutputs(tableName, Entity.Type.TABLE);
    String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
    validateProcess(constructEvent(truncateQuery, HiveOperation.TRUNCATETABLE, null, truncateOutputs));

    // Check lineage
    String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
    JSONObject lineageVertices = atlasClient.getInputGraph(datasetName)
            .getJSONObject("values")
            .getJSONObject("vertices");

    // Below should be assertTrue - Fix https://issues.apache.org/jira/browse/ATLAS-653
    Assert.assertFalse(lineageVertices.has(tableId));
}
/**
 * Changes the type of partition column "dt" to int and checks that the column
 * entity reports the new type and that it is the table's only partition key.
 */
@Test
public void testAlterTablePartitionColumnType() throws Exception {
    String tableName = createTable(true, true, false);
    final String newType = "int";

    runCommand(String.format("ALTER TABLE %s PARTITION COLUMN (dt %s)", tableName, newType));

    String tableQName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
    String dtQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(tableQName, "dt");

    final String dtColId = assertColumnIsRegistered(dtQualifiedName, new AssertPredicate() {
        @Override
        public void assertOnEntity(Referenceable column) throws Exception {
            Assert.assertEquals(column.get("type"), newType);
        }
    });

    assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
        @Override
        public void assertOnEntity(Referenceable table) throws Exception {
            final List<Referenceable> keys = (List<Referenceable>) table.get("partitionKeys");
            Assert.assertEquals(keys.size(), 1);
            Assert.assertEquals(keys.get(0).getId()._getId(), dtColId);
        }
    });
}
/**
 * Creates a table through driverWithoutContext, asserts it is not registered,
 * then alters it through runCommand and asserts that the table and its renamed
 * column are registered.
 */
@Test
public void testAlterTableWithoutHookConf() throws Exception {
    String tableName = tableName();

    driverWithoutContext.run("create table " + tableName + " (id int, name string)");
    assertTableIsNotRegistered(DEFAULT_DB, tableName);

    runCommand("alter table " + tableName + " change id id_new string");
    assertTableIsRegistered(DEFAULT_DB, tableName);

    String tableQName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
    assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tableQName, "id_new"));
}
/**
 * Attaches a trait to column "id", renames the column to "id_new" and checks
 * that the column keeps its GUID and still carries the trait.
 */
@Test
public void testTraitsPreservedOnColumnRename() throws Exception {
    String dbName = createDatabase();
    String tableName = tableName();
    runCommand(String.format("create table %s.%s (id int, name string)", dbName, tableName));

    String tableQName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName);
    String originalGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tableQName, "id"));
    String traitName = createTrait(originalGuid);

    String oldColName = "id";
    String newColName = "id_new";
    runCommand(String.format("alter table %s.%s change %s %s string", dbName, tableName, oldColName, newColName));

    String renamedGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tableQName, "id_new"));
    assertEquals(renamedGuid, originalGuid);
    assertTrue(atlasClient.getEntity(renamedGuid).getTraits().contains(traitName));
}
/**
 * Creates a view, renames it, and checks that only the new name is registered.
 */
@Test
public void testAlterViewRename() throws Exception {
    String baseTable = createTable();
    String viewName = tableName();
    String renamedView = tableName();

    runCommand("create view " + viewName + " as select * from " + baseTable);
    runCommand("alter view " + viewName + " rename to " + renamedView);

    assertTableIsRegistered(DEFAULT_DB, renamedView);
    assertTableIsNotRegistered(DEFAULT_DB, viewName);
}
/**
 * Changes a table's location and checks that the storage descriptor reports
 * the new path and that the table's process entity has it among its inputs.
 */
@Test
public void testAlterTableLocation() throws Exception {
    // It's an external table, so the HDFS location should also be registered as an entity
    String tableName = createTable(true, true, false);
    final String testPath = createTestDFSPath("testBaseDir");

    runCommand("alter table " + tableName + " set location '" + testPath + "'");

    assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
        @Override
        public void assertOnEntity(Referenceable tableRef) throws Exception {
            Referenceable storageDesc = (Referenceable) tableRef.get(HiveMetaStoreBridge.STORAGE_DESC);
            String registeredLocation = (String) storageDesc.get(HiveMetaStoreBridge.LOCATION);
            Assert.assertEquals(new Path(registeredLocation).toString(), new Path(testPath).toString());
        }
    });

    String processQualifiedName = getTableProcessQualifiedName(DEFAULT_DB, tableName);
    String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(),
            AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQualifiedName, null);

    validateHDFSPaths(atlasClient.getEntity(processId), INPUTS, testPath);
}
/**
 * Sets a table's file format to ORC and checks the storage descriptor's input
 * format, output format, serialization library and "serialization.format"
 * serde parameter.
 */
@Test
public void testAlterTableFileFormat() throws Exception {
    String tableName = createTable();
    final String testFormat = "orc";

    runCommand("alter table " + tableName + " set FILEFORMAT " + testFormat);

    assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
        @Override
        public void assertOnEntity(Referenceable tableRef) throws Exception {
            Referenceable storageDesc = (Referenceable) tableRef.get(HiveMetaStoreBridge.STORAGE_DESC);

            Assert.assertEquals(storageDesc.get(HiveMetaStoreBridge.STORAGE_DESC_INPUT_FMT),
                    "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
            Assert.assertEquals(storageDesc.get(HiveMetaStoreBridge.STORAGE_DESC_OUTPUT_FMT),
                    "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat");

            Assert.assertNotNull(storageDesc.get("serdeInfo"));
            Struct serde = (Struct) storageDesc.get("serdeInfo");
            Assert.assertEquals(serde.get("serializationLib"), "org.apache.hadoop.hive.ql.io.orc.OrcSerde");

            Assert.assertNotNull(serde.get(HiveMetaStoreBridge.PARAMETERS));
            Map<String, String> serdeParams = (Map<String, String>) serde.get(HiveMetaStoreBridge.PARAMETERS);
            Assert.assertEquals(serdeParams.get("serialization.format"), "1");
        }
    });

    // The "alter table ... STORED AS <format>" variant is not exercised here: Hive does not
    // support it, see https://issues.apache.org/jira/browse/HIVE-9576
}
/**
 * Runs the bucket/sort alteration twice on the same table: 5 buckets on "id",
 * then 2 buckets on "id" and the NAME column.
 */
@Test
public void testAlterTableBucketingClusterSort() throws Exception {
    String tableName = createTable();

    ImmutableList<String> singleColumn = ImmutableList.of("id");
    runBucketSortQuery(tableName, 5, singleColumn, singleColumn);

    ImmutableList<String> twoColumns = ImmutableList.of("id", NAME);
    runBucketSortQuery(tableName, 2, twoColumns, twoColumns);
}
/**
 * Issues "alter table ... CLUSTERED BY ... SORTED BY ... INTO n BUCKETS" and
 * verifies the registered table against the requested bucketing properties.
 *
 * @param tableName  table to alter
 * @param numBuckets number of buckets placed in the statement
 * @param bucketCols columns for the CLUSTERED BY clause
 * @param sortCols   columns for the SORTED BY clause
 */
private void runBucketSortQuery(String tableName, final int numBuckets, final ImmutableList<String> bucketCols,
                                final ImmutableList<String> sortCols) throws Exception {
    // ImmutableList.toString() is "[a, b]"; the brackets are stripped to get "a, b".
    String clusteredBy = stripListBrackets(bucketCols.toString());
    String sortedBy = stripListBrackets(sortCols.toString());

    runCommand(String.format("alter table %s CLUSTERED BY (%s) SORTED BY (%s) INTO %s BUCKETS",
            tableName, clusteredBy, sortedBy, numBuckets));

    assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
        @Override
        public void assertOnEntity(Referenceable entity) throws Exception {
            verifyBucketSortingProperties(entity, numBuckets, bucketCols, sortCols);
        }
    });
}
/**
 * Strips '[' characters and then ']' characters from both ends of the string.
 *
 * @param listElements text such as the toString() of a list
 * @return the text without the surrounding brackets
 */
private String stripListBrackets(String listElements) {
    String withoutOpening = StringUtils.strip(listElements, "[");
    return StringUtils.strip(withoutOpening, "]");
}
/**
 * Checks the storage descriptor of a table entity for the expected number of
 * buckets, bucket columns and sort columns (each sort entry must have order 1).
 *
 * @param tableRef       table entity to inspect
 * @param numBuckets     expected number of buckets
 * @param bucketColNames expected bucket columns
 * @param sortcolNames   expected sort columns, in order
 */
private void verifyBucketSortingProperties(Referenceable tableRef, int numBuckets,
                                           ImmutableList<String> bucketColNames,
                                           ImmutableList<String> sortcolNames) throws Exception {
    Referenceable storageDesc = (Referenceable) tableRef.get(HiveMetaStoreBridge.STORAGE_DESC);

    scala.math.BigInt registeredBuckets = (scala.math.BigInt) storageDesc.get(HiveMetaStoreBridge.STORAGE_NUM_BUCKETS);
    Assert.assertEquals(registeredBuckets.intValue(), numBuckets);
    Assert.assertEquals(storageDesc.get("bucketCols"), bucketColNames);

    List<Struct> sortOrders = (List<Struct>) storageDesc.get("sortCols");
    Assert.assertNotNull(sortOrders);
    Assert.assertEquals(sortOrders.size(), sortcolNames.size());

    for (int idx = 0; idx < sortcolNames.size(); idx++) {
        Struct sortOrder = sortOrders.get(idx);
        Assert.assertEquals(sortOrder.get("col"), sortcolNames.get(idx));
        Assert.assertEquals(((scala.math.BigInt) sortOrder.get("order")).intValue(), 1);
    }
}
/**
 * Sets serde properties on a table twice: first with one property, then with
 * a second property added to the same map.
 */
@Test
public void testAlterTableSerde() throws Exception {
    // SERDE PROPERTIES
    String tableName = createTable();

    Map<String, String> serdeProps = new HashMap<String, String>();
    serdeProps.put("key1", "value1");
    runSerdePropsQuery(tableName, serdeProps);

    // Add another property
    serdeProps.put("key2", "value2");
    runSerdePropsQuery(tableName, serdeProps);
}
/**
 * Drops a table and checks that the table and its "id" and NAME columns are
 * no longer registered.
 */
@Test
public void testDropTable() throws Exception {
    // Test deletion of a table and its corresponding columns
    String tableName = createTable(true, true, false);
    assertTableIsRegistered(DEFAULT_DB, tableName);

    String tableQName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
    String idColumnQName = HiveMetaStoreBridge.getColumnQualifiedName(tableQName, "id");
    String nameColumnQName = HiveMetaStoreBridge.getColumnQualifiedName(tableQName, NAME);
    assertColumnIsRegistered(idColumnQName);
    assertColumnIsRegistered(nameColumnQName);

    final String dropQuery = String.format("drop table %s ", tableName);
    runCommandWithDelay(dropQuery, 1000);

    assertColumnIsNotRegistered(idColumnQName);
    assertColumnIsNotRegistered(nameColumnQName);
    assertTableIsNotRegistered(DEFAULT_DB, tableName);
}
/**
 * Builds a WriteEntity of type PARTITION whose name is PART_FILE.
 *
 * @return the partition write entity
 */
private WriteEntity getPartitionOutput() {
    final WriteEntity partitionOutput = new WriteEntity();
    partitionOutput.setName(PART_FILE);
    partitionOutput.setTyp(Entity.Type.PARTITION);
    return partitionOutput;
}
/**
 * Builds a ReadEntity of type PARTITION whose name is PART_FILE.
 *
 * @return the partition read entity
 */
private ReadEntity getPartitionInput() {
    final ReadEntity partitionInput = new ReadEntity();
    partitionInput.setName(PART_FILE);
    partitionInput.setTyp(Entity.Type.PARTITION);
    return partitionInput;
}
/**
 * Creates a database holding several tables, drops it with CASCADE and checks
 * that the tables, the columns of the first table and the database are no
 * longer registered.
 *
 * The tables are created inside the new database with an explicit
 * "db.table" name and verified as registered before the drop, so the
 * "not registered" assertions afterwards are about entities that existed.
 */
@Test
public void testDropDatabaseWithCascade() throws Exception {
    //Test Deletion of database and its corresponding tables
    String dbName = "db" + random();
    runCommand("create database " + dbName + " WITH DBPROPERTIES ('p1'='v1')");

    final int numTables = 10;
    String[] tableNames = new String[numTables];
    for (int i = 0; i < numTables; i++) {
        tableNames[i] = tableName();
        runCommand(String.format("create table %s.%s (id int, %s string)", dbName, tableNames[i], NAME));
        assertTableIsRegistered(dbName, tableNames[i]);
    }

    final String query = String.format("drop database %s cascade", dbName);
    runCommand(query);

    //Verify columns are not registered for one of the tables
    String firstTableQName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]);
    assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(firstTableQName, "id"));
    assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(firstTableQName, NAME));

    for (int i = 0; i < numTables; i++) {
        assertTableIsNotRegistered(dbName, tableNames[i]);
    }
    assertDBIsNotRegistered(dbName);
}
/**
 * Creates a database, creates and drops several tables inside it one by one,
 * then drops the (now empty) database without CASCADE and checks that it is
 * no longer registered.
 *
 * Each table is created inside the new database with an explicit "db.table"
 * name and verified as registered before it is dropped, so the
 * "not registered" assertion is about an entity that existed in that database.
 */
@Test
public void testDropDatabaseWithoutCascade() throws Exception {
    //Test Deletion of database and its corresponding tables
    String dbName = "db" + random();
    runCommand("create database " + dbName + " WITH DBPROPERTIES ('p1'='v1')");

    final int numTables = 10;
    String[] tableNames = new String[numTables];
    for (int i = 0; i < numTables; i++) {
        tableNames[i] = tableName();
        runCommand(String.format("create table %s.%s (id int, %s string)", dbName, tableNames[i], NAME));
        assertTableIsRegistered(dbName, tableNames[i]);

        runCommand(String.format("drop table %s.%s", dbName, tableNames[i]));
        assertTableIsNotRegistered(dbName, tableNames[i]);
    }

    final String query = String.format("drop database %s", dbName);
    runCommand(query);
    assertDBIsNotRegistered(dbName);
}
/**
 * Runs "drop database if exists ... cascade" for a database name that is not
 * registered and checks that it is still not registered afterwards.
 */
@Test
public void testDropNonExistingDB() throws Exception {
    // Test deletion of a non existing DB
    final String missingDb = "nonexistingdb";
    assertDBIsNotRegistered(missingDb);

    runCommand(String.format("drop database if exists %s cascade", missingDb));

    // Should have no effect
    assertDBIsNotRegistered(missingDb);
}
/**
 * Runs "drop table if exists" for a table name that is not registered and
 * checks that it is still not registered afterwards.
 */
@Test
public void testDropNonExistingTable() throws Exception {
    // Test deletion of a non existing table
    final String missingTable = "nonexistingtable";
    assertTableIsNotRegistered(DEFAULT_DB, missingTable);

    runCommand(String.format("drop table if exists %s", missingTable));

    // Should have no effect
    assertTableIsNotRegistered(DEFAULT_DB, missingTable);
}
/**
 * Creates a view over a table, drops the view and checks that the view and
 * its "id" and NAME columns are no longer registered.
 */
@Test
public void testDropView() throws Exception {
    // Test deletion of a view and its corresponding columns
    String baseTable = createTable(true, true, false);
    String viewName = tableName();

    runCommand("create view " + viewName + " as select * from " + baseTable);
    assertTableIsRegistered(DEFAULT_DB, viewName);

    String viewQName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName);
    String idColumnQName = HiveMetaStoreBridge.getColumnQualifiedName(viewQName, "id");
    String nameColumnQName = HiveMetaStoreBridge.getColumnQualifiedName(viewQName, NAME);
    assertColumnIsRegistered(idColumnQName);
    assertColumnIsRegistered(nameColumnQName);

    runCommandWithDelay(String.format("drop view %s ", viewName), 1000);

    assertColumnIsNotRegistered(idColumnQName);
    assertColumnIsNotRegistered(nameColumnQName);
    assertTableIsNotRegistered(DEFAULT_DB, viewName);
}
/**
 * Sets the LazySimpleSerDe serde with the given properties on the table and
 * verifies the table's storage descriptor properties afterwards.
 *
 * @param tableName     table to alter
 * @param expectedProps serde properties to set and then verify
 */
private void runSerdePropsQuery(String tableName, Map<String, String> expectedProps) throws Exception {
    final String serdeLib = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";

    String alterQuery = String.format("alter table %s set SERDE '%s' WITH SERDEPROPERTIES (%s)",
            tableName, serdeLib, getSerializedProps(expectedProps));
    runCommand(alterQuery);

    verifyTableSdProperties(tableName, serdeLib, expectedProps);
}
/**
 * Renders a property map as a comma-separated list of 'key'='value' pairs,
 * in the map's iteration order.
 *
 * @param expectedProps properties to render
 * @return the rendered list; an empty string for an empty map
 */
private String getSerializedProps(Map<String, String> expectedProps) {
    StringBuilder serialized = new StringBuilder();
    String separator = "";
    for (Map.Entry<String, String> prop : expectedProps.entrySet()) {
        serialized.append(separator)
                .append("'").append(prop.getKey()).append("'")
                .append("=")
                .append("'").append(prop.getValue()).append("'");
        // Every pair after the first is preceded by a comma.
        separator = ",";
    }
    return serialized.toString();
}
/**
 * Sets a user as owner of a database and checks that the registered database
 * entity reports that owner.
 */
@Test
public void testAlterDBOwner() throws Exception {
    String dbName = createDatabase();
    assertDatabaseIsRegistered(dbName);

    final String owner = "testOwner";
    String alterQuery = String.format("alter database %s set OWNER %s %s", dbName, "USER", owner);
    runCommandWithDelay(alterQuery, 1000);

    assertDatabaseIsRegistered(dbName, new AssertPredicate() {
        @Override
        public void assertOnEntity(Referenceable entity) {
            assertEquals(entity.get(AtlasClient.OWNER), owner);
        }
    });
}
/**
 * Runs the shared alter-properties scenario against a new database using
 * DBPROPERTIES.
 */
@Test
public void testAlterDBProperties() throws Exception {
    String databaseName = createDatabase();
    testAlterProperties(Entity.Type.DATABASE, databaseName, "alter database %s %s DBPROPERTIES (%s)");
}
/**
 * Runs the shared alter-properties scenario against a freshly created table, using the
 * {@code TBLPROPERTIES} statement form.
 */
@Test
public void testAlterTableProperties() throws Exception {
    // Placeholders: entity name, operation (set/unset), serialized properties.
    testAlterProperties(Entity.Type.TABLE, createTable(), "alter table %s %s TBLPROPERTIES (%s)");
}
/**
 * Shared scenario for the alter-properties tests: sets two properties, adds a third one and,
 * for anything but a database, unsets all of them again. After every statement the registered
 * entity is checked through {@link #verifyEntityProperties}.
 *
 * @param entityType kind of entity being altered
 * @param entityName name of the database, table or view
 * @param fmtQuery   format string taking entity name, operation and property list, in that order
 * @throws Exception if a command or a verification fails
 */
private void testAlterProperties(Entity.Type entityType, String entityName, String fmtQuery) throws Exception {
    final String setOperation = "set";
    final String unsetOperation = "unset";

    final Map<String, String> expectedProps = new HashMap<>();
    expectedProps.put("testPropKey1", "testPropValue1");
    expectedProps.put("comment", "test comment");

    // Set the initial properties.
    String statement = String.format(fmtQuery, entityName, setOperation, getSerializedProps(expectedProps));
    runCommandWithDelay(statement, 1000);
    verifyEntityProperties(entityType, entityName, expectedProps, false);

    // Add another property on top of the existing ones.
    expectedProps.put("testPropKey2", "testPropValue2");
    statement = String.format(fmtQuery, entityName, setOperation, getSerializedProps(expectedProps));
    runCommandWithDelay(statement, 1000);
    verifyEntityProperties(entityType, entityName, expectedProps, false);

    // Unsetting database properties doesn't work ("alter database %s unset DBPROPERTIES"),
    // so the unset step is skipped for databases.
    if (entityType == Entity.Type.DATABASE) {
        return;
    }

    // Unset all the properties: the keys are rendered as 'k1','k2',...
    String quotedKeys = "'" + Joiner.on("','").skipNulls().join(expectedProps.keySet()) + "'";
    statement = String.format(fmtQuery, entityName, unsetOperation, quotedKeys);
    runCommandWithDelay(statement, 1000);
    verifyEntityProperties(entityType, entityName, expectedProps, true);
}
/**
 * Creates a view over a new table and runs the shared alter-properties scenario against the
 * view, using the {@code alter view ... TBLPROPERTIES} statement form.
 */
@Test
public void testAlterViewProperties() throws Exception {
    String baseTable = createTable();
    String viewName = tableName();
    runCommand(String.format("create view %s as select * from %s", viewName, baseTable));

    // The view is verified as a TABLE entity.
    testAlterProperties(Entity.Type.TABLE, viewName, "alter view %s %s TBLPROPERTIES (%s)");
}
/**
 * Asserts that the given table (in the default database) or database is registered and that
 * its parameters match the expectation. Entity types other than TABLE and DATABASE are ignored.
 *
 * @param type             kind of entity to look up
 * @param entityName       table or database name
 * @param expectedProps    properties to check
 * @param checkIfNotExists {@code true} to assert the properties are absent, {@code false} to
 *                         assert they are present with the expected values
 * @throws Exception if the lookup or the verification fails
 */
private void verifyEntityProperties(Entity.Type type, String entityName, final Map<String, String> expectedProps,
        final boolean checkIfNotExists) throws Exception {
    // The same property check applies to both entity kinds; only the lookup differs.
    final AssertPredicate propertiesCheck = new AssertPredicate() {
        @Override
        public void assertOnEntity(Referenceable entity) throws Exception {
            verifyProperties(entity, expectedProps, checkIfNotExists);
        }
    };

    switch (type) {
        case TABLE:
            assertTableIsRegistered(DEFAULT_DB, entityName, propertiesCheck);
            break;
        case DATABASE:
            assertDatabaseIsRegistered(entityName, propertiesCheck);
            break;
        default:
            // Nothing to verify for other entity types.
            break;
    }
}
/**
 * Asserts that the table in the default database is registered, that the serde info of its
 * storage descriptor names the given serialization library, and that the serde parameters
 * contain the expected properties.
 *
 * @param tableName     table to look up
 * @param serdeLib      expected value of the {@code serializationLib} attribute
 * @param expectedProps properties expected on the serde info
 * @throws Exception if the lookup or the verification fails
 */
private void verifyTableSdProperties(String tableName, final String serdeLib, final Map<String, String> expectedProps) throws Exception {
    AssertPredicate serdeCheck = new AssertPredicate() {
        @Override
        public void assertOnEntity(Referenceable tableRef) throws Exception {
            Referenceable storageDescriptor = (Referenceable) tableRef.get(HiveMetaStoreBridge.STORAGE_DESC);
            Struct serde = (Struct) storageDescriptor.get("serdeInfo");

            Assert.assertEquals(serde.get("serializationLib"), serdeLib);
            verifyProperties(serde, expectedProps, false);
        }
    };
    assertTableIsRegistered(DEFAULT_DB, tableName, serdeCheck);
}
/**
 * Checks the {@code parameters} attribute of the given struct against the expected properties.
 *
 * @param referenceable    struct whose parameters map is inspected
 * @param expectedProps    properties to check
 * @param checkIfNotExists {@code true} to assert that none of the expected keys is present
 *                         (skipped when either map is {@code null}); {@code false} to assert
 *                         that the parameters exist and hold every expected value
 */
private void verifyProperties(Struct referenceable, Map<String, String> expectedProps, boolean checkIfNotExists) {
    Map<String, String> actualParams = (Map<String, String>) referenceable.get(HiveMetaStoreBridge.PARAMETERS);

    if (checkIfNotExists) {
        // Absence check: nothing to do when there is nothing to compare.
        if (expectedProps == null || actualParams == null) {
            return;
        }
        for (String key : expectedProps.keySet()) {
            Assert.assertFalse(actualParams.containsKey(key));
        }
        return;
    }

    // Presence check: every expected key must map to the expected value.
    Assert.assertNotNull(actualParams);
    for (Map.Entry<String, String> expected : expectedProps.entrySet()) {
        Assert.assertEquals(actualParams.get(expected.getKey()), expected.getValue());
    }
}
/**
 * Sorts the inputs and outputs of the event with {@code entityComparator} and computes the
 * process qualified name from them. A {@code null} input or output set stays {@code null}.
 *
 * @param event hook event context whose inputs and outputs are used
 * @return the qualified name returned by {@code getProcessQualifiedName}
 * @throws HiveException propagated from {@code getProcessQualifiedName}
 */
private String sortEventsAndGetProcessQualifiedName(final HiveHook.HiveEventContext event) throws HiveException{
    SortedSet<ReadEntity> orderedInputs = null;
    if (event.getInputs() != null) {
        orderedInputs = new TreeSet<ReadEntity>(entityComparator);
        orderedInputs.addAll(event.getInputs());
    }

    SortedSet<WriteEntity> orderedOutputs = null;
    if (event.getOutputs() != null) {
        orderedOutputs = new TreeSet<WriteEntity>(entityComparator);
        orderedOutputs.addAll(event.getOutputs());
    }

    return getProcessQualifiedName(hiveMetaStoreBridge, event, orderedInputs, orderedOutputs,
            getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
}
/**
 * Asserts that a hive process entity exists for the event and that the first entry of its
 * {@code recentQueries} attribute equals the lower-cased query string of the event.
 *
 * @param event hook event context describing the process
 * @return the value returned by {@code assertEntityIsRegistered}
 * @throws Exception any failure, logged and rethrown
 */
private String assertProcessIsRegistered(final HiveHook.HiveEventContext event) throws Exception {
    try {
        String qualifiedName = sortEventsAndGetProcessQualifiedName(event);
        LOG.debug("Searching for process with query {}", qualifiedName);

        AssertPredicate recentQueryMatches = new AssertPredicate() {
            @Override
            public void assertOnEntity(final Referenceable entity) throws Exception {
                List<String> queries = (List<String>) entity.get("recentQueries");
                Assert.assertEquals(queries.get(0), lower(event.getQueryStr()));
            }
        };
        return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(),
                AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName, recentQueryMatches);
    } catch (Exception e) {
        LOG.error("Exception : ", e);
        throw e;
    }
}
/**
 * Variant of the process assertion in which the data sets used for the qualified name come
 * from the supplied input and output sets instead of the event's own inputs and outputs.
 * The sorted hive inputs and outputs are still taken from the event.
 *
 * @param event      hook event context describing the process
 * @param inputTbls  entities used to build the input data sets; may be {@code null}
 * @param outputTbls entities used to build the output data sets; may be {@code null}
 * @return the value returned by {@code assertEntityIsRegistered}
 * @throws Exception any failure, logged and rethrown
 */
private String assertProcessIsRegistered(final HiveHook.HiveEventContext event, final Set<ReadEntity> inputTbls, final Set<WriteEntity> outputTbls) throws Exception {
    try {
        SortedSet<ReadEntity> orderedInputs = null;
        if (event.getInputs() != null) {
            orderedInputs = new TreeSet<ReadEntity>(entityComparator);
            orderedInputs.addAll(event.getInputs());
        }

        SortedSet<WriteEntity> orderedOutputs = null;
        if (event.getOutputs() != null) {
            orderedOutputs = new TreeSet<WriteEntity>(entityComparator);
            orderedOutputs.addAll(event.getOutputs());
        }

        String qualifiedName = getProcessQualifiedName(hiveMetaStoreBridge, event, orderedInputs, orderedOutputs,
                getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
        LOG.debug("Searching for process with query {}", qualifiedName);

        AssertPredicate recentQueryMatches = new AssertPredicate() {
            @Override
            public void assertOnEntity(final Referenceable entity) throws Exception {
                List<String> queries = (List<String>) entity.get("recentQueries");
                Assert.assertEquals(queries.get(0), lower(event.getQueryStr()));
            }
        };
        return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(),
                AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName, recentQueryMatches);
    } catch (Exception e) {
        LOG.error("Exception : ", e);
        throw e;
    }
}
/**
 * Picks the data set type name for a hive entity: the enum constant name of
 * {@code HIVE_TABLE} for table entities, the HDFS path type for everything else.
 *
 * @param entity hive entity to classify
 * @return the type name used when building the data set reference
 */
private String getDSTypeName(Entity entity) {
    // NOTE(review): this uses name() while other lookups in this file use getName() - confirm intended.
    if (Entity.Type.TABLE.equals(entity.getType())) {
        return HiveDataTypes.HIVE_TABLE.name();
    }
    return HiveMetaStoreBridge.HDFS_PATH;
}
/**
 * Builds a map, ordered by {@code entityComparator}, from each entity to a reference whose
 * qualified-name attribute is the entity name.
 *
 * @param inputTbls entities to convert; {@code null} yields an empty map
 * @param <T>       concrete entity type
 * @return sorted map from entity to its reference
 */
private <T extends Entity> SortedMap<T, Referenceable> getSortedProcessDataSets(Set<T> inputTbls) {
    SortedMap<T, Referenceable> dataSets = new TreeMap<>(entityComparator);
    if (inputTbls == null) {
        return dataSets;
    }

    for (final T entity : inputTbls) {
        Map<String, Object> attributes = new HashMap<>();
        attributes.put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, entity.getName());
        dataSets.put(entity, new Referenceable(getDSTypeName(entity), attributes));
    }
    return dataSets;
}
/**
 * Asserts that no hive process entity is registered for the given event.
 *
 * The process qualified name is derived from the event's sorted inputs and outputs via
 * {@link #sortEventsAndGetProcessQualifiedName}, the same helper the positive assertion uses.
 *
 * @param event hook event context describing the process
 * @throws Exception any failure while computing the qualified name or performing the lookup;
 *                   it is logged and rethrown so that the calling test does not pass silently
 */
private void assertProcessIsNotRegistered(HiveHook.HiveEventContext event) throws Exception {
    try {
        String processQFName = sortEventsAndGetProcessQualifiedName(event);
        LOG.debug("Searching for process with query {}", processQFName);
        assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName);
    } catch (Exception e) {
        LOG.error("Exception : ", e);
        // Rethrow, as the assertProcessIsRegistered overloads do; swallowing would hide the failure.
        throw e;
    }
}
/**
 * Asserts that no hive table entity is registered under the qualified name built from the
 * cluster name, database, table and temporary-table flag.
 *
 * @param dbName           database of the table
 * @param tableName        table name
 * @param isTemporaryTable flag passed on to the qualified-name computation
 * @throws Exception if the assertion fails
 */
private void assertTableIsNotRegistered(String dbName, String tableName, boolean isTemporaryTable) throws Exception {
    LOG.debug("Searching for table {}.{}", dbName, tableName);

    String qualifiedName =
            HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName, isTemporaryTable);
    assertEntityIsNotRegistered(HiveDataTypes.HIVE_TABLE.getName(),
            AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName);
}
/**
 * Asserts that no hive table entity is registered for the given non-temporary table.
 *
 * @param dbName    database of the table
 * @param tableName table name
 * @throws Exception if the assertion fails
 */
private void assertTableIsNotRegistered(String dbName, String tableName) throws Exception {
    // Same lookup as the three-argument overload with the temporary-table flag off.
    assertTableIsNotRegistered(dbName, tableName, false);
}
/**
 * Asserts that no hive database entity is registered under the qualified name built from the
 * cluster name and the database name.
 *
 * @param dbName database name
 * @throws Exception if the assertion fails
 */
private void assertDBIsNotRegistered(String dbName) throws Exception {
    LOG.debug("Searching for database {}", dbName);

    String qualifiedName = HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName);
    assertEntityIsNotRegistered(HiveDataTypes.HIVE_DB.getName(),
            AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName);
}
/**
 * Convenience overload of the table assertion that passes {@code false} as the fourth argument.
 *
 * @param dbName          database of the table
 * @param tableName       table name
 * @param assertPredicate additional check forwarded to the four-argument overload
 * @return the value returned by the four-argument overload
 * @throws Exception if the assertion fails
 */
private String assertTableIsRegistered(String dbName, String tableName, AssertPredicate assertPredicate) throws Exception {
    final boolean fourthArgument = false;
    return assertTableIsRegistered(dbName, tableName, assertPredicate, fourthArgument);
}
/**
 * Asserts, through {@code waitFor} with an argument of 80000, that looking up an entity by the
 * given attribute ends in a NOT_FOUND service error. A successful lookup, or a service error
 * with any other status, fails the predicate.
 *
 * @param typeName entity type to look up
 * @param property attribute name used for the lookup
 * @param value    attribute value used for the lookup
 * @throws Exception propagated from {@code waitFor}
 */
private void assertEntityIsNotRegistered(final String typeName, final String property, final String value) throws Exception {
    waitFor(80000, new Predicate() {
        @Override
        public void evaluate() throws Exception {
            boolean entityMissing = false;
            try {
                atlasClient.getEntity(typeName, property, value);
            } catch (AtlasServiceException e) {
                // Only a NOT_FOUND response counts as "entity is not registered".
                entityMissing = (e.getStatus() == ClientResponse.Status.NOT_FOUND);
            }

            if (entityMissing) {
                return;
            }
            fail(String.format("Entity was not supposed to exist for typeName = %s, attributeName = %s, "
                    + "attributeValue = %s", typeName, property, value));
        }
    });
}
/**
 * Creates a table in a second database with a create-table-as-select from a table in the
 * default database, then checks that both table ids appear among the vertices of the target
 * table's input graph and of the source table's output graph.
 */
@Test
public void testLineage() throws Exception {
    String sourceTable = createTable(false);
    String targetDb = createDatabase();
    String targetTable = tableName();
    runCommand(String.format("create table %s.%s as select * from %s", targetDb, targetTable, sourceTable));

    String sourceTableId = assertTableIsRegistered(DEFAULT_DB, sourceTable);
    String targetTableId = assertTableIsRegistered(targetDb, targetTable);

    // Input graph of the CTAS target.
    String targetQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, targetDb, targetTable);
    JSONObject inputGraph = atlasClient.getInputGraph(targetQualifiedName);
    JSONObject inputVertices = inputGraph.getJSONObject("values").getJSONObject("vertices");
    assertTrue(inputVertices.has(sourceTableId));
    assertTrue(inputVertices.has(targetTableId));

    // Output graph of the CTAS source.
    String sourceQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, sourceTable);
    JSONObject outputGraph = atlasClient.getOutputGraph(sourceQualifiedName);
    JSONObject outputVertices = outputGraph.getJSONObject("values").getJSONObject("vertices");
    assertTrue(outputVertices.has(sourceTableId));
    assertTrue(outputVertices.has(targetTableId));
}
/**
 * For ATLAS-448: runs the two "show" statements below and makes no assertions of its own;
 * the test passes as long as neither command throws.
 */
@Test
public void testNoopOperation() throws Exception {
    String[] showCommands = {"show compactions", "show transactions"};
    for (String command : showCommands) {
        runCommand(command);
    }
}
}