| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.atlas.hive.hook; |
| |
| import com.fasterxml.jackson.databind.JsonNode; |
| import com.fasterxml.jackson.databind.node.ObjectNode; |
| import com.google.common.base.Joiner; |
| import com.sun.jersey.api.client.ClientResponse; |
| import org.apache.atlas.AtlasClient; |
| import org.apache.atlas.AtlasServiceException; |
| import org.apache.atlas.hive.HiveITBase; |
| import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; |
| import org.apache.atlas.hive.hook.events.BaseHiveEvent; |
| import org.apache.atlas.hive.model.HiveDataTypes; |
| import org.apache.atlas.model.instance.*; |
| import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; |
| import org.apache.atlas.model.lineage.AtlasLineageInfo; |
| import org.apache.atlas.model.typedef.AtlasClassificationDef; |
| import org.apache.atlas.model.typedef.AtlasEntityDef; |
| import org.apache.atlas.model.typedef.AtlasTypesDef; |
| import org.apache.atlas.type.AtlasTypeUtil; |
| import org.apache.commons.collections.CollectionUtils; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hive.conf.HiveConf; |
| import org.apache.hadoop.hive.metastore.TableType; |
| import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; |
| import org.apache.hadoop.hive.ql.Driver; |
| import org.apache.hadoop.hive.ql.hooks.Entity; |
| import org.apache.hadoop.hive.ql.hooks.ReadEntity; |
| import org.apache.hadoop.hive.ql.hooks.WriteEntity; |
| import org.apache.hadoop.hive.ql.metadata.HiveException; |
| import org.apache.hadoop.hive.ql.metadata.Table; |
| import org.apache.hadoop.hive.ql.plan.HiveOperation; |
| import org.apache.hadoop.hive.ql.session.SessionState; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.testng.Assert; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.Test; |
| |
| import java.text.ParseException; |
| import java.util.*; |
| |
| import static org.apache.atlas.AtlasClient.NAME; |
| import static org.apache.atlas.hive.hook.events.BaseHiveEvent.*; |
| import static org.testng.Assert.*; |
| import static org.testng.AssertJUnit.assertEquals; |
| |
| public class HiveHookIT extends HiveITBase { |
| private static final Logger LOG = LoggerFactory.getLogger(HiveHookIT.class); |
| |
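| // value of the 'dt' partition used by the partitioned-table tests |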
| private static final String PART_FILE = "2015-01-01"; |
| |
| private Driver driverWithNoHook; |
| |
| @BeforeClass |
| public void setUp() throws Exception { |
| // build a HiveConf with the HiveServer2 post-execution hook and the HiveMetastore event listener disabled |
| HiveConf conf = new HiveConf(); |
| conf.set("hive.exec.post.hooks", ""); |
| conf.set("hive.metastore.event.listeners", ""); |
| |
| SessionState ss = new SessionState(conf); |
| ss = SessionState.start(ss); |
| SessionState.setCurrentSessionState(ss); |
| |
| // 'driverWithNoHook' executes queries without notifying Atlas |
| driverWithNoHook = new Driver(conf); |
| |
| super.setUp(); |
| } |
| |
| @Test |
| public void testCreateDatabase() throws Exception { |
| String dbName = "db" + random(); |
| |
| runCommand("create database " + dbName + " WITH DBPROPERTIES ('p1'='v1', 'p2'='v2')"); |
| |
| String dbId = assertDatabaseIsRegistered(dbName); |
| AtlasEntity dbEntity = atlasClientV2.getEntityByGuid(dbId).getEntity(); |
| Map params = (Map) dbEntity.getAttribute(ATTRIBUTE_PARAMETERS); |
| List ddlQueries = (List) dbEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries); |
| Assert.assertEquals(ddlQueries.size(), 1); |
| |
| Assert.assertNotNull(params); |
| Assert.assertEquals(params.size(), 2); |
| Assert.assertEquals(params.get("p1"), "v1"); |
| |
| //There should be just one entity per dbname |
| runCommandWithDelay("drop database " + dbName, 3000); |
| assertDatabaseIsNotRegistered(dbName); |
| |
| runCommandWithDelay("create database " + dbName, 3000); |
| dbId = assertDatabaseIsRegistered(dbName); |
| |
| //assert on qualified name |
| dbEntity = atlasClientV2.getEntityByGuid(dbId).getEntity(); |
| |
| Assert.assertEquals(dbEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), dbName.toLowerCase() + "@" + CLUSTER_NAME); |
| } |
| |
| @Test |
| public void testPathEntityDefAvailable() throws Exception { |
| //Check that the 'Path' entity definition has been created |
| AtlasEntityDef pathEntityDef = atlasClientV2.getEntityDefByName("Path"); |
| assertNotNull(pathEntityDef); |
| } |
| |
| @Test |
| public void testCreateDatabaseWithLocation() throws Exception { |
| String dbName = dbName(); |
| String query = "CREATE DATABASE " + dbName; |
| |
| runCommand(query); |
| String dbId = assertDatabaseIsRegistered(dbName); |
| |
| //HDFS Location |
| String hdfsLocation = "hdfs://localhost:8020/warehouse/tablespace/external/hive/reports.db"; |
| alterDatabaseLocation(dbName, hdfsLocation); |
| assertDatabaseLocationRelationship(dbId); |
| |
| //S3 location |
| String s3Location = "s3://localhost:8020/warehouse/tablespace/external/hive/reports.db"; |
| alterDatabaseLocation(dbName, s3Location); |
| assertDatabaseLocationRelationship(dbId); |
| |
| //ABFS location |
| String abfsLocation = "abfs://localhost:8020/warehouse/tablespace/external/hive/reports.db"; |
| alterDatabaseLocation(dbName, abfsLocation); |
| assertDatabaseLocationRelationship(dbId); |
| } |
| |
| //helper: alter the database location and wait for the hook to process the event |
| public void alterDatabaseLocation(String dbName, String location) throws Exception { |
| int timeDelay = 5000; |
| String query = String.format("ALTER DATABASE %s SET LOCATION \"%s\"", dbName, location); |
| runCommandWithDelay(query, timeDelay); |
| } |
| |
| public void assertDatabaseLocationRelationship(String dbId) throws Exception { |
| AtlasEntity dbEntity = atlasClientV2.getEntityByGuid(dbId).getEntity(); |
| AtlasEntityDef pathEntityDef = atlasClientV2.getEntityDefByName("Path"); |
| |
| //Check if dbEntity has location attribute |
| assertTrue(dbEntity.hasAttribute(ATTRIBUTE_LOCATION)); |
| //Check if dbEntity has value for location attribute |
| assertNotNull(dbEntity.getAttribute(ATTRIBUTE_LOCATION)); |
| //Check if dbEntity has location relationship attribute |
| assertEquals(((List) dbEntity.getRelationshipAttribute(ATTRIBUTE_LOCATION_PATH)).size(), 1); |
| AtlasObjectId locationEntityObject = toAtlasObjectId(dbEntity.getRelationshipAttribute(ATTRIBUTE_LOCATION_PATH)); |
| //Check if location relationship attribute is subtype of "Path" |
| assertTrue(pathEntityDef.getSubTypes().contains(locationEntityObject.getTypeName())); |
| } |
| |
| @Test |
| public void testCreateTable() throws Exception { |
| String tableName = tableName(); |
| String dbName = createDatabase(); |
| String colName = columnName(); |
| |
| runCommand("create table " + dbName + "." + tableName + "(" + colName + " int, name string)"); |
| |
| String tableId = assertTableIsRegistered(dbName, tableName); |
| String colId = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName), colName)); //there is only one instance of column registered |
| AtlasEntity colEntity = atlasClientV2.getEntityByGuid(colId).getEntity(); |
| AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tableId).getEntity(); |
| |
| Assert.assertEquals(colEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), String.format("%s.%s.%s@%s", dbName.toLowerCase(), tableName.toLowerCase(), colName.toLowerCase(), CLUSTER_NAME)); |
| Assert.assertNotNull(colEntity.getAttribute(ATTRIBUTE_TABLE)); |
| |
| Assert.assertNotNull(tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES)); |
| Assert.assertEquals(((List)tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES)).size(), 1); |
| |
| AtlasObjectId tblObjId = toAtlasObjectId(colEntity.getAttribute(ATTRIBUTE_TABLE)); |
| |
| Assert.assertEquals(tblObjId.getGuid(), tableId); |
| |
| //assert that column.owner = table.owner |
| AtlasEntity tblEntity1 = atlasClientV2.getEntityByGuid(tableId).getEntity(); |
| AtlasEntity colEntity1 = atlasClientV2.getEntityByGuid(colId).getEntity(); |
| |
| assertEquals(tblEntity1.getAttribute(ATTRIBUTE_OWNER), colEntity1.getAttribute(ATTRIBUTE_OWNER)); |
| |
| //create table where db is not registered |
| tableName = createTable(); |
| tableId = assertTableIsRegistered(DEFAULT_DB, tableName); |
| |
| AtlasEntity tblEntity2 = atlasClientV2.getEntityByGuid(tableId).getEntity(); |
| |
| Assert.assertEquals(tblEntity2.getAttribute(ATTRIBUTE_TABLE_TYPE), TableType.MANAGED_TABLE.name()); |
| Assert.assertEquals(tblEntity2.getAttribute(ATTRIBUTE_COMMENT), "table comment"); |
| |
| String entityName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName); |
| |
| Assert.assertEquals(tblEntity2.getAttribute(AtlasClient.NAME), tableName.toLowerCase()); |
| Assert.assertEquals(tblEntity2.getAttribute(ATTRIBUTE_QUALIFIED_NAME), entityName); |
| |
| Table t = hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, tableName); |
| long createTime = Long.parseLong(t.getMetadata().getProperty(hive_metastoreConstants.DDL_TIME)) * MILLIS_CONVERT_FACTOR; |
| |
| verifyTimestamps(tblEntity2, ATTRIBUTE_CREATE_TIME, createTime); |
| verifyTimestamps(tblEntity2, ATTRIBUTE_LAST_ACCESS_TIME, createTime); |
| |
| final AtlasObjectId sdEntity = toAtlasObjectId(tblEntity2.getAttribute(ATTRIBUTE_STORAGEDESC)); |
| |
| Assert.assertNotNull(sdEntity); |
| |
| |
| //the create-table above in the default database should also have registered the database instance |
| assertDatabaseIsRegistered(DEFAULT_DB); |
| } |
| |
| private void verifyTimestamps(AtlasEntity ref, String property, long expectedTime) throws ParseException { |
| //Verify timestamps. |
| Object createTime = ref.getAttribute(property); |
| |
| Assert.assertNotNull(createTime); |
| |
| if (expectedTime > 0) { |
| Assert.assertEquals(expectedTime, createTime); |
| } |
| } |
| |
| private void verifyTimestamps(AtlasEntity ref, String property) throws ParseException { |
| verifyTimestamps(ref, property, 0); |
| } |
| |
| //ATLAS-1321: Disable problematic tests. Need to revisit and fix them later |
| @Test(enabled = false) |
| public void testCreateExternalTable() throws Exception { |
| String tableName = tableName(); |
| String colName = columnName(); |
| String pFile = createTestDFSPath("parentPath"); |
| String query = String.format("create EXTERNAL table %s.%s(%s, %s) location '%s'", DEFAULT_DB , tableName , colName + " int", "name string", pFile); |
| |
| runCommand(query); |
| |
| String tblId = assertTableIsRegistered(DEFAULT_DB, tableName, null, true); |
| AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity(); |
| List ddlList = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| assertNotNull(ddlList); |
| assertEquals(ddlList.size(), 1); |
| |
| String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, getTableProcessQualifiedName(DEFAULT_DB, tableName), null); |
| |
| AtlasEntity processEntity = atlasClientV2.getEntityByGuid(processId).getEntity(); |
| |
| assertEquals(processEntity.getAttribute("userName"), UserGroupInformation.getCurrentUser().getShortUserName()); |
| |
| verifyTimestamps(processEntity, "startTime"); |
| verifyTimestamps(processEntity, "endTime"); |
| |
| validateHDFSPaths(processEntity, INPUTS, pFile); |
| } |
| |
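| // Builds the single-element ReadEntity set the hook would receive for the given input: DFS paths |
| // are lowercased, table names are qualified, and TABLE entities get the Hive Table object attached. |
| // getOutputs() below mirrors this for WriteEntity. |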
| private Set<ReadEntity> getInputs(String inputName, Entity.Type entityType) throws HiveException { |
| final ReadEntity entity; |
| |
| if (Entity.Type.DFS_DIR.equals(entityType)) { |
| entity = new TestReadEntity(lower(new Path(inputName).toString()), entityType); |
| } else { |
| entity = new TestReadEntity(getQualifiedTblName(inputName), entityType); |
| } |
| |
| if (entityType == Entity.Type.TABLE) { |
| entity.setT(hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, inputName)); |
| } |
| |
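| // double-brace initialization: an anonymous LinkedHashSet subclass whose instance initializer adds the entity |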
| return new LinkedHashSet<ReadEntity>() {{ add(entity); }}; |
| } |
| |
| private Set<WriteEntity> getOutputs(String inputName, Entity.Type entityType) throws HiveException { |
| final WriteEntity entity; |
| |
| if (Entity.Type.DFS_DIR.equals(entityType) || Entity.Type.LOCAL_DIR.equals(entityType)) { |
| entity = new TestWriteEntity(lower(new Path(inputName).toString()), entityType); |
| } else { |
| entity = new TestWriteEntity(getQualifiedTblName(inputName), entityType); |
| } |
| |
| if (entityType == Entity.Type.TABLE) { |
| entity.setT(hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, inputName)); |
| } |
| |
| return new LinkedHashSet<WriteEntity>() {{ add(entity); }}; |
| } |
| |
| private void validateOutputTables(AtlasEntity processEntity, Set<WriteEntity> expectedTables) throws Exception { |
| validateTables(toAtlasObjectIdList(processEntity.getAttribute(ATTRIBUTE_OUTPUTS)), expectedTables); |
| } |
| |
| private void validateInputTables(AtlasEntity processEntity, Set<ReadEntity> expectedTables) throws Exception { |
| validateTables(toAtlasObjectIdList(processEntity.getAttribute(ATTRIBUTE_INPUTS)), expectedTables); |
| } |
| |
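| // Checks a process's registered inputs/outputs against the expected Hive entities: each Atlas |
| // qualifiedName must start with one of the expected entity names (qualifiedNames carry extra suffixes) |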
| private void validateTables(List<AtlasObjectId> tableIds, Set<? extends Entity> expectedTables) throws Exception { |
| if (tableIds == null) { |
| Assert.assertTrue(CollectionUtils.isEmpty(expectedTables)); |
| } else if (expectedTables == null) { |
| Assert.assertTrue(CollectionUtils.isEmpty(tableIds)); |
| } else { |
| Assert.assertEquals(tableIds.size(), expectedTables.size()); |
| |
| List<String> entityQualifiedNames = new ArrayList<>(tableIds.size()); |
| List<String> expectedTableNames = new ArrayList<>(expectedTables.size()); |
| |
| for (AtlasObjectId tableId : tableIds) { |
| AtlasEntity atlasEntity = atlasClientV2.getEntityByGuid(tableId.getGuid()).getEntity(); |
| |
| entityQualifiedNames.add((String) atlasEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); |
| } |
| |
| for (Iterator<? extends Entity> iterator = expectedTables.iterator(); iterator.hasNext(); ) { |
| Entity hiveEntity = iterator.next(); |
| |
| expectedTableNames.add(hiveEntity.getName()); |
| } |
| |
| for (String entityQualifiedName : entityQualifiedNames) { |
| boolean found = false; |
| |
| for (String expectedTableName : expectedTableNames) { |
| if (entityQualifiedName.startsWith(expectedTableName)) { |
| found = true; |
| |
| break; |
| } |
| } |
| |
| assertTrue(found, "Table name '" + entityQualifiedName + "' does not start with any name in the expected list " + expectedTableNames); |
| } |
| } |
| } |
| |
| private String assertColumnIsRegistered(String colName) throws Exception { |
| return assertColumnIsRegistered(colName, null); |
| } |
| |
| private String assertColumnIsRegistered(String colName, AssertPredicate assertPredicate) throws Exception { |
| LOG.debug("Searching for column {}", colName); |
| |
| return assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN.getName(), ATTRIBUTE_QUALIFIED_NAME, colName, assertPredicate); |
| } |
| |
| private String assertSDIsRegistered(String sdQFName, AssertPredicate assertPredicate) throws Exception { |
| LOG.debug("Searching for sd {}", sdQFName.toLowerCase()); |
| |
| return assertEntityIsRegistered(HiveDataTypes.HIVE_STORAGEDESC.getName(), ATTRIBUTE_QUALIFIED_NAME, sdQFName.toLowerCase(), assertPredicate); |
| } |
| |
| private void assertColumnIsNotRegistered(String colName) throws Exception { |
| LOG.debug("Searching for column {}", colName); |
| |
| assertEntityIsNotRegistered(HiveDataTypes.HIVE_COLUMN.getName(), ATTRIBUTE_QUALIFIED_NAME, colName); |
| } |
| |
| @Test |
| public void testCTAS() throws Exception { |
| String tableName = createTable(); |
| String ctasTableName = "table" + random(); |
| String query = "create table " + ctasTableName + " as select * from " + tableName; |
| |
| runCommand(query); |
| |
| final Set<ReadEntity> readEntities = getInputs(tableName, Entity.Type.TABLE); |
| final Set<WriteEntity> writeEntities = getOutputs(ctasTableName, Entity.Type.TABLE); |
| |
| HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, readEntities, |
| writeEntities); |
| AtlasEntity processEntity1 = validateProcess(hiveEventContext); |
| AtlasEntity processExecutionEntity1 = validateProcessExecution(processEntity1, hiveEventContext); |
| AtlasObjectId process = toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process.getGuid(), processEntity1.getGuid()); |
| |
| Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1); |
| assertTableIsRegistered(DEFAULT_DB, ctasTableName); |
| } |
| |
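| // Builds a minimal HiveEventContext (query string, operation, inputs, outputs) for the assertion helpers |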
| private HiveEventContext constructEvent(String query, HiveOperation op, Set<ReadEntity> inputs, Set<WriteEntity> outputs) { |
| HiveEventContext event = new HiveEventContext(); |
| |
| event.setQueryStr(query); |
| event.setOperation(op); |
| event.setInputs(inputs); |
| event.setOutputs(outputs); |
| |
| return event; |
| } |
| |
| @Test |
| public void testEmptyStringAsValue() throws Exception { |
| String tableName = tableName(); |
| String command = "create table " + tableName + "(id int, name string) row format delimited lines terminated by '\n' null defined as ''"; |
| |
| runCommandWithDelay(command, 3000); |
| |
| assertTableIsRegistered(DEFAULT_DB, tableName); |
| } |
| |
| @Test |
| public void testDropAndRecreateCTASOutput() throws Exception { |
| String tableName = createTable(); |
| String ctasTableName = "table" + random(); |
| String query = "create table " + ctasTableName + " as select * from " + tableName; |
| |
| runCommand(query); |
| |
| assertTableIsRegistered(DEFAULT_DB, ctasTableName); |
| |
| Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); |
| Set<WriteEntity> outputs = getOutputs(ctasTableName, Entity.Type.TABLE); |
| |
| HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs); |
| AtlasEntity processEntity1 = validateProcess(hiveEventContext); |
| AtlasEntity processExecutionEntity1 = validateProcessExecution(processEntity1, hiveEventContext); |
| AtlasObjectId process = toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process.getGuid(), processEntity1.getGuid()); |
| |
| String dropQuery = String.format("drop table %s ", ctasTableName); |
| |
| runCommandWithDelay(dropQuery, 5000); |
| |
| assertTableIsNotRegistered(DEFAULT_DB, ctasTableName); |
| |
| runCommand(query); |
| |
| String tblId = assertTableIsRegistered(DEFAULT_DB, ctasTableName); |
| AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity(); |
| List ddlList = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| assertNotNull(ddlList); |
| assertEquals(ddlList.size(), 1); |
| |
| outputs = getOutputs(ctasTableName, Entity.Type.TABLE); |
| |
| AtlasEntity processEntity2 = validateProcess(hiveEventContext); |
| AtlasEntity processExecutionEntity2 = validateProcessExecution(processEntity2, hiveEventContext); |
| AtlasObjectId process2 = toAtlasObjectId(processExecutionEntity2.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process2.getGuid(), processEntity2.getGuid()); |
| |
| assertNotEquals(processEntity1.getGuid(), processEntity2.getGuid()); |
| Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1); |
| Assert.assertEquals(numberOfProcessExecutions(processEntity2), 1); |
| |
| validateOutputTables(processEntity1, outputs); |
| } |
| |
| @Test |
| public void testCreateView() throws Exception { |
| String tableName = createTable(); |
| String viewName = tableName(); |
| String query = "create view " + viewName + " as select * from " + tableName; |
| |
| runCommand(query); |
| |
| HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.CREATEVIEW, getInputs(tableName, |
| Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE)); |
| AtlasEntity processEntity1 = validateProcess(hiveEventContext); |
| AtlasEntity processExecutionEntity1 = validateProcessExecution(processEntity1, hiveEventContext); |
| AtlasObjectId process1 = toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process1.getGuid(), processEntity1.getGuid()); |
| Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1); |
| assertTableIsRegistered(DEFAULT_DB, viewName); |
| |
| String viewId = assertTableIsRegistered(DEFAULT_DB, viewName); |
| AtlasEntity viewEntity = atlasClientV2.getEntityByGuid(viewId).getEntity(); |
| List ddlQueries = (List) viewEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries); |
| Assert.assertEquals(ddlQueries.size(), 1); |
| } |
| |
| @Test |
| public void testAlterViewAsSelect() throws Exception { |
| //Create the view from table1 |
| String table1Name = createTable(); |
| String viewName = tableName(); |
| String query = "create view " + viewName + " as select * from " + table1Name; |
| |
| runCommand(query); |
| |
| String table1Id = assertTableIsRegistered(DEFAULT_DB, table1Name); |
| |
| HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.CREATEVIEW, getInputs(table1Name, |
| Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE)); |
| String processId1 = assertProcessIsRegistered(hiveEventContext); |
| AtlasEntity processEntity1 = atlasClientV2.getEntityByGuid(processId1).getEntity(); |
| AtlasEntity processExecutionEntity1 = validateProcessExecution(processEntity1, hiveEventContext); |
| AtlasObjectId process1 = toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process1.getGuid(), processEntity1.getGuid()); |
| Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1); |
| |
| String viewId = assertTableIsRegistered(DEFAULT_DB, viewName); |
| |
| //Check lineage which includes table1 |
| String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName); |
| String tableId = assertTableIsRegistered(DEFAULT_DB, viewName); |
| AtlasLineageInfo inputLineageInfo = atlasClientV2.getLineageInfo(tableId, AtlasLineageInfo.LineageDirection.INPUT, 0); |
| Map<String, AtlasEntityHeader> entityMap = inputLineageInfo.getGuidEntityMap(); |
| |
| assertTrue(entityMap.containsKey(viewId)); |
| assertTrue(entityMap.containsKey(table1Id)); |
| |
| //Alter the view from table2 |
| String table2Name = createTable(); |
| |
| query = "alter view " + viewName + " as select * from " + table2Name; |
| |
| runCommand(query); |
| |
| HiveEventContext hiveEventContext2 = constructEvent(query, HiveOperation.CREATEVIEW, getInputs(table2Name, |
| Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE)); |
| String processId2 = assertProcessIsRegistered(hiveEventContext2); |
| AtlasEntity processEntity2 = atlasClientV2.getEntityByGuid(processId2).getEntity(); |
| AtlasEntity processExecutionEntity2 = validateProcessExecution(processEntity2, hiveEventContext2); |
| AtlasObjectId process2 = toAtlasObjectId(processExecutionEntity2.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process2.getGuid(), processEntity2.getGuid()); |
| Assert.assertEquals(numberOfProcessExecutions(processEntity2), 2); |
| Assert.assertEquals(processEntity1.getGuid(), processEntity2.getGuid()); |
| |
| String table2Id = assertTableIsRegistered(DEFAULT_DB, table2Name); |
| String viewId2 = assertTableIsRegistered(DEFAULT_DB, viewName); |
| |
| Assert.assertEquals(viewId2, viewId); |
| |
| AtlasEntity viewEntity = atlasClientV2.getEntityByGuid(viewId2).getEntity(); |
| List ddlQueries = (List) viewEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries); |
| Assert.assertEquals(ddlQueries.size(), 2); |
| |
| datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName); |
| |
| String tableId1 = assertTableIsRegistered(DEFAULT_DB, viewName); |
| AtlasLineageInfo inputLineageInfo1 = atlasClientV2.getLineageInfo(tableId1, AtlasLineageInfo.LineageDirection.INPUT, 0); |
| Map<String, AtlasEntityHeader> entityMap1 = inputLineageInfo1.getGuidEntityMap(); |
| |
| assertTrue(entityMap1.containsKey(viewId)); |
| |
| //This is through the alter view process |
| assertTrue(entityMap1.containsKey(table2Id)); |
| |
| //This is through the Create view process |
| assertTrue(entityMap1.containsKey(table1Id)); |
| |
| //Outputs don't exist |
| AtlasLineageInfo outputLineageInfo = atlasClientV2.getLineageInfo(tableId1, AtlasLineageInfo.LineageDirection.OUTPUT, 0); |
| Map<String, AtlasEntityHeader> entityMap2 = outputLineageInfo.getGuidEntityMap(); |
| |
| assertEquals(entityMap2.size(), 0); |
| } |
| |
| private String createTestDFSFile(String path) throws Exception { |
| return "pfile://" + file(path); |
| } |
| |
| @Test |
| public void testLoadLocalPath() throws Exception { |
| String tableName = createTable(false); |
| String loadFile = file("load"); |
| String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName; |
| |
| String tblId = assertTableIsRegistered(DEFAULT_DB, tableName); |
| |
| runCommand(query); |
| |
| AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity(); |
| List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries); |
| Assert.assertEquals(ddlQueries.size(), 1); |
| |
| assertProcessIsRegistered(constructEvent(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE))); |
| } |
| |
| @Test |
| public void testLoadLocalPathIntoPartition() throws Exception { |
| String tableName = createTable(true); |
| String loadFile = file("load"); |
| String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')"; |
| |
| String tblId = assertTableIsRegistered(DEFAULT_DB, tableName); |
| |
| runCommand(query); |
| |
| AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity(); |
| List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries); |
| Assert.assertEquals(ddlQueries.size(), 1); |
| |
| assertProcessIsRegistered(constructEvent(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE))); |
| } |
| |
| @Test |
| public void testLoadDFSPathPartitioned() throws Exception { |
| String tableName = createTable(true, true, false); |
| |
| assertTableIsRegistered(DEFAULT_DB, tableName); |
| |
| String loadFile = createTestDFSFile("loadDFSFile"); |
| String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')"; |
| |
| runCommand(query); |
| |
| Set<WriteEntity> outputs = getOutputs(tableName, Entity.Type.TABLE); |
| Set<ReadEntity> inputs = getInputs(loadFile, Entity.Type.DFS_DIR); |
| Set<WriteEntity> partitionOps = new LinkedHashSet<>(outputs); |
| |
| partitionOps.addAll(getOutputs(DEFAULT_DB + "@" + tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION)); |
| |
| AtlasEntity processReference = validateProcess(constructEvent(query, HiveOperation.LOAD, inputs, partitionOps), inputs, outputs); |
| |
| validateHDFSPaths(processReference, INPUTS, loadFile); |
| validateOutputTables(processReference, outputs); |
| |
| String loadFile2 = createTestDFSFile("loadDFSFile1"); |
| |
| query = "load data inpath '" + loadFile2 + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')"; |
| |
| runCommand(query); |
| |
| Set<ReadEntity> process2Inputs = getInputs(loadFile2, Entity.Type.DFS_DIR); |
| Set<ReadEntity> expectedInputs = new LinkedHashSet<>(); |
| |
| expectedInputs.addAll(process2Inputs); |
| expectedInputs.addAll(inputs); |
| |
| validateProcess(constructEvent(query, HiveOperation.LOAD, expectedInputs, partitionOps), expectedInputs, outputs); |
| } |
| |
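| // Returns the name unchanged if it already looks qualified (contains '@'); otherwise qualifies it |
| // against the default database and the cluster name |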
| private String getQualifiedTblName(String inputTable) { |
| String qualifiedName = inputTable; |
| |
| if (inputTable != null && !inputTable.contains("@")) { |
| qualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, inputTable); |
| } |
| |
| return qualifiedName; |
| } |
| |
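| // Asserts that a hive_process is registered for the event and that its inputs/outputs |
| // match the expected tables |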
| private AtlasEntity validateProcess(HiveEventContext event, Set<ReadEntity> inputTables, Set<WriteEntity> outputTables) throws Exception { |
| String processId = assertProcessIsRegistered(event, inputTables, outputTables); |
| AtlasEntity processEntity = atlasClientV2.getEntityByGuid(processId).getEntity(); |
| |
| validateInputTables(processEntity, inputTables); |
| validateOutputTables(processEntity, outputTables); |
| |
| return processEntity; |
| } |
| |
| private AtlasEntity validateProcess(HiveEventContext event) throws Exception { |
| return validateProcess(event, event.getInputs(), event.getOutputs()); |
| } |
| |
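| // Asserts that a hive_process_execution entity is registered for this run of the query and returns it |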
| private AtlasEntity validateProcessExecution(AtlasEntity hiveProcess, HiveEventContext event) throws Exception { |
| String processExecutionId = assertProcessExecutionIsRegistered(hiveProcess, event); |
| |
| return atlasClientV2.getEntityByGuid(processExecutionId).getEntity(); |
| } |
| |
| @Test |
| public void testInsertIntoTable() throws Exception { |
| String inputTable1Name = createTable(); |
| String inputTable2Name = createTable(); |
| String insertTableName = createTable(); |
| |
| assertTableIsRegistered(DEFAULT_DB, inputTable1Name); |
| assertTableIsRegistered(DEFAULT_DB, insertTableName); |
| |
| String query = "insert into " + insertTableName + " select t1.id, t1.name from " + inputTable2Name + " as t2, " + inputTable1Name + " as t1 where t1.id=t2.id"; |
| |
| runCommand(query); |
| |
| Set<ReadEntity> inputs = getInputs(inputTable1Name, Entity.Type.TABLE); |
| |
| inputs.addAll(getInputs(inputTable2Name, Entity.Type.TABLE)); |
| |
| Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE); |
| |
| (outputs.iterator().next()).setWriteType(WriteEntity.WriteType.INSERT); |
| |
| HiveEventContext event = constructEvent(query, HiveOperation.QUERY, inputs, outputs); |
| |
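| // entityComparator (inherited from the test base class) sorts inputs the same way the hook does |
| // when building the process qualifiedName, which the assertion below relies on |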
| Set<ReadEntity> expectedInputs = new TreeSet<ReadEntity>(entityComparator) {{ |
| addAll(inputs); |
| }}; |
| |
| String tblId = assertTableIsRegistered(DEFAULT_DB, insertTableName); |
| AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity(); |
| List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries); |
| Assert.assertEquals(ddlQueries.size(), 1); |
| |
| AtlasEntity processEntity1 = validateProcess(event, expectedInputs, outputs); |
| |
| //Test sorting of tbl names |
| SortedSet<String> sortedTblNames = new TreeSet<>(); |
| |
| sortedTblNames.add(inputTable1Name.toLowerCase()); |
| sortedTblNames.add(inputTable2Name.toLowerCase()); |
| |
| //Verify sorted order of inputs in qualified name |
| Assert.assertEquals(processEntity1.getAttribute(ATTRIBUTE_QUALIFIED_NAME), |
| Joiner.on(SEP).join("QUERY", |
| getQualifiedTblName(sortedTblNames.first()), |
| HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, sortedTblNames.first())), |
| getQualifiedTblName(sortedTblNames.last()), |
| HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, sortedTblNames.last()))) |
| + IO_SEP + SEP |
| + Joiner.on(SEP). |
| join(WriteEntity.WriteType.INSERT.name(), |
| getQualifiedTblName(insertTableName), |
| HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, insertTableName))) |
| ); |
| |
| //Rerun same query. Should result in same process |
| runCommandWithDelay(query, 3000); |
| |
| AtlasEntity processEntity2 = validateProcess(event, expectedInputs, outputs); |
| Assert.assertEquals(numberOfProcessExecutions(processEntity2), 2); |
| Assert.assertEquals(processEntity1.getGuid(), processEntity2.getGuid()); |
| } |
| |
| @Test |
| public void testInsertIntoTableProcessExecution() throws Exception { |
| String inputTable1Name = createTable(); |
| String inputTable2Name = createTable(); |
| String insertTableName = createTable(); |
| |
| assertTableIsRegistered(DEFAULT_DB, inputTable1Name); |
| assertTableIsRegistered(DEFAULT_DB, insertTableName); |
| |
| String query = "insert into " + insertTableName + " select t1.id, t1.name from " + inputTable2Name + " as t2, " + inputTable1Name + " as t1 where t1.id=t2.id"; |
| |
| runCommand(query); |
| |
| Set<ReadEntity> inputs = getInputs(inputTable1Name, Entity.Type.TABLE); |
| |
| inputs.addAll(getInputs(inputTable2Name, Entity.Type.TABLE)); |
| |
| Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE); |
| |
| (outputs.iterator().next()).setWriteType(WriteEntity.WriteType.INSERT); |
| |
| HiveEventContext event = constructEvent(query, HiveOperation.QUERY, inputs, outputs); |
| |
| Set<ReadEntity> expectedInputs = new TreeSet<ReadEntity>(entityComparator) {{ |
| addAll(inputs); |
| }}; |
| |
| assertTableIsRegistered(DEFAULT_DB, insertTableName); |
| |
| AtlasEntity processEntity1 = validateProcess(event, expectedInputs, outputs); |
| AtlasEntity processExecutionEntity1 = validateProcessExecution(processEntity1, event); |
| AtlasObjectId process = toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process.getGuid(), processEntity1.getGuid()); |
| |
| //Test sorting of tbl names |
| SortedSet<String> sortedTblNames = new TreeSet<>(); |
| |
| sortedTblNames.add(inputTable1Name.toLowerCase()); |
| sortedTblNames.add(inputTable2Name.toLowerCase()); |
| |
| //Verify sorted order of inputs in qualified name |
| Assert.assertEquals(processEntity1.getAttribute(ATTRIBUTE_QUALIFIED_NAME), |
| Joiner.on(SEP).join("QUERY", |
| getQualifiedTblName(sortedTblNames.first()), |
| HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, sortedTblNames.first())), |
| getQualifiedTblName(sortedTblNames.last()), |
| HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, sortedTblNames.last()))) |
| + IO_SEP + SEP |
| + Joiner.on(SEP). |
| join(WriteEntity.WriteType.INSERT.name(), |
| getQualifiedTblName(insertTableName), |
| HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, insertTableName))) |
| ); |
| |
| //Rerun same query. Should result in same process |
| runCommandWithDelay(query, 3000); |
| |
| AtlasEntity processEntity2 = validateProcess(event, expectedInputs, outputs); |
| AtlasEntity processExecutionEntity2 = validateProcessExecution(processEntity2, event); |
| process = toAtlasObjectId(processExecutionEntity2.getRelationshipAttribute(BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process.getGuid(), processEntity2.getGuid()); |
| Assert.assertEquals(processEntity1.getGuid(), processEntity2.getGuid()); |
| |
| String queryWithDifferentPredicate = "insert into " + insertTableName + " select t1.id, t1.name from " + |
| inputTable2Name + " as t2, " + inputTable1Name + " as t1 where t1.id=100"; |
| runCommandWithDelay(queryWithDifferentPredicate, 1000); |
| |
| HiveEventContext event3 = constructEvent(queryWithDifferentPredicate, HiveOperation.QUERY, inputs, outputs); |
| AtlasEntity processEntity3 = validateProcess(event3, expectedInputs, outputs); |
| AtlasEntity processExecutionEntity3 = validateProcessExecution(processEntity3, event3); |
| process = toAtlasObjectId(processExecutionEntity3.getRelationshipAttribute(BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process.getGuid(), processEntity3.getGuid()); |
| Assert.assertEquals(numberOfProcessExecutions(processEntity3), 3); |
| Assert.assertEquals(processEntity2.getGuid(), processEntity3.getGuid()); |
| } |
| |
| @Test |
| public void testInsertIntoLocalDir() throws Exception { |
| String tableName = createTable(); |
| String randomLocalPath = mkdir("hiverandom.tmp"); |
| String query = "insert overwrite LOCAL DIRECTORY '" + randomLocalPath + "' select id, name from " + tableName; |
| |
| runCommand(query); |
| |
| HiveEventContext event = constructEvent(query, HiveOperation.QUERY, |
| getInputs(tableName, Entity.Type.TABLE), null); |
| AtlasEntity hiveProcess = validateProcess(event); |
| AtlasEntity hiveProcessExecution = validateProcessExecution(hiveProcess, event); |
| AtlasObjectId process = toAtlasObjectId(hiveProcessExecution.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process.getGuid(), hiveProcess.getGuid()); |
| Assert.assertEquals(numberOfProcessExecutions(hiveProcess), 1); |
| |
| String tblId = assertTableIsRegistered(DEFAULT_DB, tableName); |
| |
| AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity(); |
| List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries); |
| Assert.assertEquals(ddlQueries.size(), 1); |
| } |
| |
| @Test |
| public void testUpdateProcess() throws Exception { |
| String tableName = createTable(); |
| String pFile1 = createTestDFSPath("somedfspath1"); |
| String query = "insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName; |
| |
| runCommand(query); |
| |
| Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); |
| Set<WriteEntity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR); |
| |
| outputs.iterator().next().setWriteType(WriteEntity.WriteType.PATH_WRITE); |
| |
| HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.QUERY, inputs, outputs); |
| AtlasEntity processEntity = validateProcess(hiveEventContext); |
| AtlasEntity processExecutionEntity1 = validateProcessExecution(processEntity, hiveEventContext); |
| AtlasObjectId process = toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process.getGuid(), processEntity.getGuid()); |
| |
| validateHDFSPaths(processEntity, OUTPUTS, pFile1); |
| |
| assertTableIsRegistered(DEFAULT_DB, tableName); |
| |
| validateInputTables(processEntity, inputs); |
| |
| //Rerun same query with same HDFS path |
| runCommandWithDelay(query, 3000); |
| |
| assertTableIsRegistered(DEFAULT_DB, tableName); |
| |
| AtlasEntity process2Entity = validateProcess(hiveEventContext); |
| AtlasEntity processExecutionEntity2 = validateProcessExecution(processEntity, hiveEventContext); |
| AtlasObjectId process2 = toAtlasObjectId(processExecutionEntity2.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process2.getGuid(), process2Entity.getGuid()); |
| |
| validateHDFSPaths(process2Entity, OUTPUTS, pFile1); |
| |
| Assert.assertEquals(process2Entity.getGuid(), processEntity.getGuid()); |
| |
| //Rerun same query with a new HDFS path. Will result in the same process since HDFS paths are not part of the qualified name for QUERY operations |
| String pFile2 = createTestDFSPath("somedfspath2"); |
| |
| query = "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName; |
| |
| runCommandWithDelay(query, 3000); |
| |
| String tblId = assertTableIsRegistered(DEFAULT_DB, tableName); |
| |
| AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity(); |
| List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries); |
| Assert.assertEquals(ddlQueries.size(), 1); |
| |
| Set<WriteEntity> p3Outputs = new LinkedHashSet<WriteEntity>() {{ |
| addAll(getOutputs(pFile2, Entity.Type.DFS_DIR)); |
| addAll(outputs); |
| }}; |
| |
| AtlasEntity process3Entity = validateProcess(constructEvent(query, HiveOperation.QUERY, inputs, p3Outputs)); |
| AtlasEntity processExecutionEntity3 = validateProcessExecution(processEntity, hiveEventContext); |
| AtlasObjectId process3 = toAtlasObjectId(processExecutionEntity3.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process3.getGuid(), process3Entity.getGuid()); |
| validateHDFSPaths(process3Entity, OUTPUTS, pFile2); |
| |
| Assert.assertEquals(numberOfProcessExecutions(process3Entity), 3); |
| Assert.assertEquals(process3Entity.getGuid(), processEntity.getGuid()); |
| } |
| |
| @Test |
| public void testInsertIntoDFSDirPartitioned() throws Exception { |
| //Test with partitioned table |
| String tableName = createTable(true); |
| String pFile1 = createTestDFSPath("somedfspath1"); |
| String query = "insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName + " where dt = '" + PART_FILE + "'"; |
| |
| runCommand(query); |
| |
| Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); |
| Set<WriteEntity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR); |
| |
| outputs.iterator().next().setWriteType(WriteEntity.WriteType.PATH_WRITE); |
| |
| Set<ReadEntity> partitionIps = new LinkedHashSet<>(inputs); |
| |
| partitionIps.addAll(getInputs(DEFAULT_DB + "@" + tableName + "@dt='" + PART_FILE + "'", Entity.Type.PARTITION)); |
| |
| AtlasEntity processEntity = validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, outputs), inputs, outputs); |
| |
| //Rerun same query with a different HDFS path. Should not create another process; the existing one should be updated. |
| |
| String pFile2 = createTestDFSPath("somedfspath2"); |
| query = "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName + " where dt = '" + PART_FILE + "'"; |
| |
| runCommand(query); |
| |
| Set<WriteEntity> pFile2Outputs = getOutputs(pFile2, Entity.Type.DFS_DIR); |
| |
| pFile2Outputs.iterator().next().setWriteType(WriteEntity.WriteType.PATH_WRITE); |
| |
| //Now the process has 2 paths - an older one whose partition reference was deleted, and another with the latest partition |
| Set<WriteEntity> p2Outputs = new LinkedHashSet<WriteEntity>() {{ |
| addAll(pFile2Outputs); |
| addAll(outputs); |
| }}; |
| |
| AtlasEntity process2Entity = validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, pFile2Outputs), inputs, p2Outputs); |
| |
| validateHDFSPaths(process2Entity, OUTPUTS, pFile2); |
| |
| Assert.assertEquals(process2Entity.getGuid(), processEntity.getGuid()); |
| } |
| |
| //Disabled: temporary tables are not captured by the Hive hook (https://issues.apache.org/jira/browse/ATLAS-1274) |
| @Test(enabled = false) |
| public void testInsertIntoTempTable() throws Exception { |
| String tableName = createTable(); |
| String insertTableName = createTable(false, false, true); |
| |
| assertTableIsRegistered(DEFAULT_DB, tableName); |
| assertTableIsNotRegistered(DEFAULT_DB, insertTableName, true); |
| |
| String query = "insert into " + insertTableName + " select id, name from " + tableName; |
| |
| runCommand(query); |
| |
| Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); |
| Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE); |
| |
| outputs.iterator().next().setWriteType(WriteEntity.WriteType.INSERT); |
| |
| HiveEventContext event = constructEvent(query, HiveOperation.QUERY, inputs, outputs); |
| AtlasEntity hiveProcess = validateProcess(event); |
| AtlasEntity hiveProcessExecution = validateProcessExecution(hiveProcess, event); |
| AtlasObjectId process = toAtlasObjectId(hiveProcessExecution.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process.getGuid(), hiveProcess.getGuid()); |
| Assert.assertEquals(numberOfProcessExecutions(hiveProcess), 1); |
| |
| assertTableIsRegistered(DEFAULT_DB, tableName); |
| assertTableIsRegistered(DEFAULT_DB, insertTableName, null, true); |
| } |
| |
| @Test |
| public void testInsertIntoPartition() throws Exception { |
| boolean isPartitionedTable = true; |
| String tableName = createTable(isPartitionedTable); |
| String insertTableName = createTable(isPartitionedTable); |
| String query = "insert into " + insertTableName + " partition(dt = '"+ PART_FILE + "') select id, name from " + tableName + " where dt = '"+ PART_FILE + "'"; |
| |
| runCommand(query); |
| |
| Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); |
| Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE); |
| |
| outputs.iterator().next().setWriteType(WriteEntity.WriteType.INSERT); |
| |
| Set<ReadEntity> partitionIps = new LinkedHashSet<ReadEntity>() { |
| { |
| addAll(inputs); |
| add(getPartitionInput()); |
| } |
| }; |
| |
| Set<WriteEntity> partitionOps = new LinkedHashSet<WriteEntity>() { |
| { |
| addAll(outputs); |
| add(getPartitionOutput()); |
| } |
| }; |
| |
| HiveEventContext event = constructEvent(query, HiveOperation.QUERY, partitionIps, partitionOps); |
| AtlasEntity hiveProcess = validateProcess(event, inputs, outputs); |
| AtlasEntity hiveProcessExecution = validateProcessExecution(hiveProcess, event); |
| AtlasObjectId process = toAtlasObjectId(hiveProcessExecution.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process.getGuid(), hiveProcess.getGuid()); |
| Assert.assertEquals(numberOfProcessExecutions(hiveProcess), 1); |
| assertTableIsRegistered(DEFAULT_DB, tableName); |
| |
| String tblId = assertTableIsRegistered(DEFAULT_DB, insertTableName); |
| AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity(); |
| List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries); |
| Assert.assertEquals(ddlQueries.size(), 1); |
| |
| //TODO - add update test case |
| } |
| |
| @Test |
| public void testExportImportUnPartitionedTable() throws Exception { |
| String tableName = createTable(false); |
| |
| String tblId = assertTableIsRegistered(DEFAULT_DB, tableName); |
| |
| String filename = "pfile://" + mkdir("exportUnPartitioned"); |
| String query = "export table " + tableName + " to \"" + filename + "\""; |
| |
| runCommand(query); |
| |
| AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity(); |
| List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries); |
| Assert.assertEquals(ddlQueries.size(), 1); |
| |
| Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); |
| Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR); |
| |
| HiveEventContext event = constructEvent(query, HiveOperation.EXPORT, inputs, outputs); |
| AtlasEntity processEntity = validateProcess(event); |
| AtlasEntity hiveProcessExecution = validateProcessExecution(processEntity, event); |
| AtlasObjectId process = toAtlasObjectId(hiveProcessExecution.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process.getGuid(), processEntity.getGuid()); |
| Assert.assertEquals(numberOfProcessExecutions(processEntity), 1); |
| validateHDFSPaths(processEntity, OUTPUTS, filename); |
| validateInputTables(processEntity, inputs); |
| |
| //Import |
| String importTableName = createTable(false); |
| |
| String importTblId = assertTableIsRegistered(DEFAULT_DB, importTableName); |
| |
| query = "import table " + importTableName + " from '" + filename + "'"; |
| |
| runCommand(query); |
| |
| AtlasEntity importTblEntity = atlasClientV2.getEntityByGuid(importTblId).getEntity(); |
| List importTblddlQueries = (List) importTblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(importTblddlQueries); |
| Assert.assertEquals(importTblddlQueries.size(), 1); |
| |
| outputs = getOutputs(importTableName, Entity.Type.TABLE); |
| |
| HiveEventContext event2 = constructEvent(query, HiveOperation.IMPORT, |
| getInputs(filename, Entity.Type.DFS_DIR), outputs); |
| AtlasEntity processEntity2 = validateProcess(event2); |
| AtlasEntity hiveProcessExecution2 = validateProcessExecution(processEntity2, event2); |
| AtlasObjectId process2 = toAtlasObjectId(hiveProcessExecution2.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process2.getGuid(), processEntity2.getGuid()); |
| |
| Assert.assertEquals(numberOfProcessExecutions(processEntity2), 1); |
| Assert.assertNotEquals(processEntity.getGuid(), processEntity2.getGuid()); |
| |
| //Should create another process |
| filename = "pfile://" + mkdir("export2UnPartitioned"); |
| query = "export table " + tableName + " to \"" + filename + "\""; |
| |
| runCommand(query); |
| |
| AtlasEntity tblEntity2 = atlasClientV2.getEntityByGuid(tblId).getEntity(); |
| List ddlQueries2 = (List) tblEntity2.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries2); |
| Assert.assertEquals(ddlQueries2.size(), 1); |
| |
| inputs = getInputs(tableName, Entity.Type.TABLE); |
| outputs = getOutputs(filename, Entity.Type.DFS_DIR); |
| |
| HiveEventContext event3 = constructEvent(query, HiveOperation.EXPORT, inputs, outputs); |
| AtlasEntity processEntity3 = validateProcess(event3); |
| AtlasEntity hiveProcessExecution3 = validateProcessExecution(processEntity3, event3); |
| AtlasObjectId process3 = toAtlasObjectId(hiveProcessExecution3.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process3.getGuid(), processEntity3.getGuid()); |
| |
| Assert.assertEquals(numberOfProcessExecutions(processEntity3), 1); |
| |
| // Should be a different process compared to the previous ones |
| Assert.assertNotEquals(processEntity.getGuid(), processEntity3.getGuid()); |
| Assert.assertNotEquals(processEntity2.getGuid(), processEntity3.getGuid()); |
| |
| //import again should create another process |
| query = "import table " + importTableName + " from '" + filename + "'"; |
| |
| runCommand(query); |
| |
| AtlasEntity tblEntity3 = atlasClientV2.getEntityByGuid(importTblId).getEntity(); |
| List ddlQueries3 = (List) tblEntity3.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries3); |
| Assert.assertEquals(ddlQueries3.size(), 1); |
| |
| outputs = getOutputs(importTableName, Entity.Type.TABLE); |
| |
| HiveEventContext event4 = constructEvent(query, HiveOperation.IMPORT, getInputs(filename, |
| Entity.Type.DFS_DIR), outputs); |
| AtlasEntity processEntity4 = validateProcess(event4); |
| AtlasEntity hiveProcessExecution4 = validateProcessExecution(processEntity4, event4); |
| AtlasObjectId process4 = toAtlasObjectId(hiveProcessExecution4.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process4.getGuid(), processEntity4.getGuid()); |
| |
| Assert.assertEquals(numberOfProcessExecutions(processEntity4), 1); |
| |
| // Should be a different process compared to the previous ones |
| Assert.assertNotEquals(processEntity.getGuid(), processEntity4.getGuid()); |
| Assert.assertNotEquals(processEntity2.getGuid(), processEntity4.getGuid()); |
| Assert.assertNotEquals(processEntity3.getGuid(), processEntity4.getGuid()); |
| } |
| |
| @Test |
| public void testExportImportPartitionedTable() throws Exception { |
| boolean isPartitionedTable = true; |
| String tableName = createTable(isPartitionedTable); |
| |
| String tblId = assertTableIsRegistered(DEFAULT_DB, tableName); |
| |
| //Add a partition |
| String partFile = "pfile://" + mkdir("partition"); |
| String query = "alter table " + tableName + " add partition (dt='"+ PART_FILE + "') location '" + partFile + "'"; |
| |
| runCommand(query); |
| |
| AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity(); |
| List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries); |
| Assert.assertEquals(ddlQueries.size(), 1); |
| |
| String filename = "pfile://" + mkdir("export"); |
| |
| query = "export table " + tableName + " to \"" + filename + "\""; |
| |
| runCommand(query); |
| |
| AtlasEntity tblEntity2 = atlasClientV2.getEntityByGuid(tblId).getEntity(); |
| List ddlQueries2 = (List) tblEntity2.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries2); |
| Assert.assertEquals(ddlQueries2.size(), 1); |
| |
| Set<ReadEntity> expectedExportInputs = getInputs(tableName, Entity.Type.TABLE); |
| Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR); |
| Set<ReadEntity> partitionIps = getInputs(DEFAULT_DB + "@" + tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION); //Note that export has only partition as input in this case |
| |
| partitionIps.addAll(expectedExportInputs); |
| |
| HiveEventContext event1 = constructEvent(query, HiveOperation.EXPORT, partitionIps, outputs); |
| AtlasEntity processEntity1 = validateProcess(event1, expectedExportInputs, outputs); |
| AtlasEntity hiveProcessExecution1 = validateProcessExecution(processEntity1, event1); |
| AtlasObjectId process1 = toAtlasObjectId(hiveProcessExecution1.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process1.getGuid(), processEntity1.getGuid()); |
| Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1); |
| |
| validateHDFSPaths(processEntity1, OUTPUTS, filename); |
| |
| //Import |
| String importTableName = createTable(true); |
| |
| String tblId2 = assertTableIsRegistered(DEFAULT_DB, tableName); |
| |
| query = "import table " + importTableName + " from '" + filename + "'"; |
| |
| runCommand(query); |
| |
| AtlasEntity tblEntity3 = atlasClientV2.getEntityByGuid(tblId2).getEntity(); |
| List ddlQueries3 = (List) tblEntity3.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries3); |
| Assert.assertEquals(ddlQueries3.size(), 1); |
| |
| Set<ReadEntity> expectedImportInputs = getInputs(filename, Entity.Type.DFS_DIR); |
| Set<WriteEntity> importOutputs = getOutputs(importTableName, Entity.Type.TABLE); |
| Set<WriteEntity> partitionOps = getOutputs(DEFAULT_DB + "@" + importTableName + "@dt=" + PART_FILE, Entity.Type.PARTITION); |
| |
| partitionOps.addAll(importOutputs); |
| |
| HiveEventContext event2 = constructEvent(query, HiveOperation.IMPORT, expectedImportInputs , partitionOps); |
| AtlasEntity processEntity2 = validateProcess(event2, expectedImportInputs, importOutputs); |
| AtlasEntity hiveProcessExecution2 = validateProcessExecution(processEntity2, event2); |
| AtlasObjectId process2 = toAtlasObjectId(hiveProcessExecution2.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process2.getGuid(), processEntity2.getGuid()); |
| Assert.assertEquals(numberOfProcessExecutions(processEntity2), 1); |
| Assert.assertNotEquals(processEntity1.getGuid(), processEntity2.getGuid()); |
| |
| //Export should update same process |
| filename = "pfile://" + mkdir("export2"); |
| query = "export table " + tableName + " to \"" + filename + "\""; |
| |
| runCommand(query); |
| |
| Set<WriteEntity> outputs2 = getOutputs(filename, Entity.Type.DFS_DIR); |
| Set<WriteEntity> p3Outputs = new LinkedHashSet<WriteEntity>() {{ |
| addAll(outputs2); |
| addAll(outputs); |
| }}; |
| |
| HiveEventContext event3 = constructEvent(query, HiveOperation.EXPORT, partitionIps, outputs2); |
| |
| // this process entity should return same as the processEntity1 since the inputs and outputs are the same, |
| // hence the qualifiedName will be the same |
| AtlasEntity processEntity3 = validateProcess(event3, expectedExportInputs, p3Outputs); |
| AtlasEntity hiveProcessExecution3 = validateProcessExecution(processEntity3, event3); |
| AtlasObjectId process3 = toAtlasObjectId(hiveProcessExecution3.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process3.getGuid(), processEntity3.getGuid()); |
| Assert.assertEquals(numberOfProcessExecutions(processEntity3), 2); |
| Assert.assertEquals(processEntity1.getGuid(), processEntity3.getGuid()); |
| |
| query = "alter table " + importTableName + " drop partition (dt='"+ PART_FILE + "')"; |
| |
| runCommand(query); |
| |
| //Import should update same process |
| query = "import table " + importTableName + " from '" + filename + "'"; |
| |
| runCommandWithDelay(query, 3000); |
| |
| Set<ReadEntity> importInputs = getInputs(filename, Entity.Type.DFS_DIR); |
| Set<ReadEntity> expectedImport2Inputs = new LinkedHashSet<ReadEntity>() {{ |
| addAll(importInputs); |
| addAll(expectedImportInputs); |
| }}; |
| |
| HiveEventContext event4 = constructEvent(query, HiveOperation.IMPORT, importInputs, partitionOps); |
| |
| // This process is going to be same as processEntity2 |
| AtlasEntity processEntity4 = validateProcess(event4, expectedImport2Inputs, importOutputs); |
| AtlasEntity hiveProcessExecution4 = validateProcessExecution(processEntity4, event4); |
| AtlasObjectId process4 = toAtlasObjectId(hiveProcessExecution4.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process4.getGuid(), processEntity4.getGuid()); |
| Assert.assertEquals(numberOfProcessExecutions(processEntity4), 2); |
| Assert.assertEquals(processEntity2.getGuid(), processEntity4.getGuid()); |
| Assert.assertNotEquals(processEntity1.getGuid(), processEntity4.getGuid()); |
| } |
| |
| @Test |
| public void testIgnoreSelect() throws Exception { |
| String tableName = createTable(); |
| String query = "select * from " + tableName; |
| |
| runCommand(query); |
| |
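| // A plain SELECT query produces no lineage, so the hook should not register a process entity for it |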
| Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); |
| HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.QUERY, inputs, null); |
| |
| assertProcessIsNotRegistered(hiveEventContext); |
| |
| //check with uppercase table name |
| query = "SELECT * from " + tableName.toUpperCase(); |
| |
| runCommand(query); |
| |
| assertProcessIsNotRegistered(hiveEventContext); |
| } |
| |
| @Test |
| public void testAlterTableRenameAliasRegistered() throws Exception { |
| String tableName = createTable(false); |
| String tableGuid = assertTableIsRegistered(DEFAULT_DB, tableName); |
| String newTableName = tableName(); |
| String query = String.format("alter table %s rename to %s", tableName, newTableName); |
| |
| runCommand(query); |
| |
| String newTableGuid = assertTableIsRegistered(DEFAULT_DB, newTableName); |
| |
| assertEquals(tableGuid, newTableGuid); |
| |
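| // The rename keeps the same entity (GUID); the old table name should now appear in the 'aliases' attribute |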
| AtlasEntity atlasEntity = atlasClientV2.getEntityByGuid(newTableGuid).getEntity(); |
| Map<String, Object> valueMap = atlasEntity.getAttributes(); |
| Iterable<String> aliasList = (Iterable<String>) valueMap.get("aliases"); |
| String aliasTableName = aliasList.iterator().next(); |
| |
| Assert.assertEquals(aliasTableName, tableName.toLowerCase()); |
| } |
| |
| @Test |
| public void testAlterTableRename() throws Exception { |
| String tableName = createTable(true); |
| String newDBName = createDatabase(); |
| String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); |
| AtlasEntity tableEntity = atlasClientV2.getEntityByGuid(tableId).getEntity(); |
| String createTime = String.valueOf(tableEntity.getAttribute(ATTRIBUTE_CREATE_TIME)); |
| |
| Assert.assertNotNull(createTime); |
| |
| String columnGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), NAME)); |
| String sdGuid = assertSDIsRegistered(HiveMetaStoreBridge.getStorageDescQFName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName)), null); |
| |
| assertDatabaseIsRegistered(newDBName); |
| |
| String colTraitDetails = createTrait(columnGuid); //Add trait to column |
| String sdTraitDetails = createTrait(sdGuid); //Add trait to sd |
| String partColumnGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "dt")); |
| String partColTraitDetails = createTrait(partColumnGuid); //Add trait to part col keys |
| String newTableName = tableName(); |
| String query = String.format("alter table %s rename to %s", DEFAULT_DB + "." + tableName, newDBName + "." + newTableName); |
| |
| runCommandWithDelay(query, 3000); |
| |
| String newColGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, newTableName), NAME)); |
| |
| Assert.assertEquals(newColGuid, columnGuid); |
| |
| assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, tableName), NAME)); |
| |
| assertTrait(columnGuid, colTraitDetails); |
| |
| String newSdGuid = assertSDIsRegistered(HiveMetaStoreBridge.getStorageDescQFName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, newTableName)), null); |
| |
| Assert.assertEquals(newSdGuid, sdGuid); |
| assertTrait(sdGuid, sdTraitDetails); |
| assertTrait(partColumnGuid, partColTraitDetails); |
| assertTableIsNotRegistered(DEFAULT_DB, tableName); |
| |
| String renamedTableId = assertTableIsRegistered(newDBName, newTableName, new AssertPredicate() { |
| @Override |
| public void assertOnEntity(final AtlasEntity entity) throws Exception { |
| AtlasObjectId sd = toAtlasObjectId(entity.getAttribute(ATTRIBUTE_STORAGEDESC)); |
| |
| assertNotNull(sd); |
| } |
| }); |
| |
| AtlasEntity renamedTableEntity = atlasClientV2.getEntityByGuid(renamedTableId).getEntity(); |
| List ddlQueries = (List) renamedTableEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries); |
| Assert.assertEquals(ddlQueries.size(), 2); |
| |
| } |
| |
| private List<AtlasEntity> getColumns(String dbName, String tableName) throws Exception { |
| String tableId = assertTableIsRegistered(dbName, tableName); |
| AtlasEntityWithExtInfo tblEntityWithExtInfo = atlasClientV2.getEntityByGuid(tableId); |
| AtlasEntity tableEntity = tblEntityWithExtInfo.getEntity(); |
| |
| //with soft delete, the deleted columns are returned as well. So, filter the deleted ones |
| List<AtlasObjectId> columns = toAtlasObjectIdList(tableEntity.getAttribute(ATTRIBUTE_COLUMNS)); |
| List<AtlasEntity> activeColumns = new ArrayList<>(); |
| |
| for (AtlasObjectId col : columns) { |
| AtlasEntity columnEntity = tblEntityWithExtInfo.getEntity(col.getGuid()); |
| |
| if (columnEntity.getStatus() == AtlasEntity.Status.ACTIVE) { |
| activeColumns.add(columnEntity); |
| } |
| } |
| |
| return activeColumns; |
| } |
| |
| private String createTrait(String guid) throws AtlasServiceException { |
| //add trait |
| //valid type names in v2 must consist of a letter followed by a sequence of letter, number, or _ characters |
| String traitName = "PII_Trait" + random(); |
| AtlasClassificationDef piiTrait = AtlasTypeUtil.createTraitTypeDef(traitName, Collections.<String>emptySet()); |
| |
| atlasClientV2.createAtlasTypeDefs(new AtlasTypesDef(Collections.emptyList(), Collections.emptyList(), Collections.singletonList(piiTrait), Collections.emptyList())); |
| atlasClientV2.addClassifications(guid, Collections.singletonList(new AtlasClassification(piiTrait.getName()))); |
| |
| return traitName; |
| } |
| |
| private void assertTrait(String guid, String traitName) throws AtlasServiceException { |
| AtlasClassification.AtlasClassifications classifications = atlasClientV2.getClassifications(guid); |
| |
| Assert.assertEquals(classifications.getList().get(0).getTypeName(), traitName); |
| } |
| |
| @Test |
| public void testAlterTableAddColumn() throws Exception { |
| String tableName = createTable(); |
| String column = columnName(); |
| String query = "alter table " + tableName + " add columns (" + column + " string)"; |
| |
| runCommand(query); |
| |
| assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), column)); |
| |
| //Verify the number of columns present in the table |
| List<AtlasEntity> columns = getColumns(DEFAULT_DB, tableName); |
| |
| Assert.assertEquals(columns.size(), 3); |
| |
| String tblId = assertTableIsRegistered(DEFAULT_DB, tableName); |
| AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity(); |
| List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries); |
| Assert.assertEquals(ddlQueries.size(), 2); |
| |
| } |
| |
| //ATLAS-1321: Disable problematic tests. Need to revisit and fix them later |
| @Test(enabled = false) |
| public void testAlterTableDropColumn() throws Exception { |
| String tableName = createTable(); |
| String colDropped = "id"; |
| String query = "alter table " + tableName + " replace columns (name string)"; |
| |
| runCommand(query); |
| |
| assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), colDropped)); |
| |
| //Verify the number of columns present in the table |
| List<AtlasEntity> columns = getColumns(DEFAULT_DB, tableName); |
| |
| assertEquals(columns.size(), 1); |
| assertEquals(columns.get(0).getAttribute(NAME), "name"); |
| |
| String tblId = assertTableIsRegistered(DEFAULT_DB, tableName); |
| AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity(); |
| List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries); |
| Assert.assertEquals(ddlQueries.size(), 2); |
| } |
| |
| @Test |
| public void testAlterTableChangeColumn() throws Exception { |
| //Change name |
| String oldColName = NAME; |
| String newColName = "name1"; |
| String tableName = createTable(); |
| String query = String.format("alter table %s change %s %s string", tableName, oldColName, newColName); |
| |
| runCommandWithDelay(query, 3000); |
| |
| assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); |
| assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName)); |
| |
| //Verify the number of columns present in the table |
| List<AtlasEntity> columns = getColumns(DEFAULT_DB, tableName); |
| |
| Assert.assertEquals(columns.size(), 2); |
| |
| String tblId = assertTableIsRegistered(DEFAULT_DB, tableName); |
| AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity(); |
| List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries); |
| Assert.assertEquals(ddlQueries.size(), 2); |
| |
| //Change column type |
| oldColName = "name1"; |
| newColName = "name2"; |
| |
| String newColType = "int"; |
| |
| query = String.format("alter table %s change column %s %s %s", tableName, oldColName, newColName, newColType); |
| |
| runCommandWithDelay(query, 3000); |
| |
| columns = getColumns(DEFAULT_DB, tableName); |
| |
| Assert.assertEquals(columns.size(), 2); |
| |
| String newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); |
| |
| assertColumnIsRegistered(newColQualifiedName, new AssertPredicate() { |
| @Override |
| public void assertOnEntity(AtlasEntity entity) throws Exception { |
| assertEquals(entity.getAttribute("type"), "int"); |
| } |
| }); |
| |
| assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); |
| |
| AtlasEntity tblEntity2 = atlasClientV2.getEntityByGuid(tblId).getEntity(); |
| List ddlQueries2 = (List) tblEntity2.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries2); |
| Assert.assertEquals(ddlQueries2.size(), 3); |
| |
| //Change name and add comment |
| oldColName = "name2"; |
| newColName = "name3"; |
| |
| String comment = "added comment"; |
| |
| query = String.format("alter table %s change column %s %s %s COMMENT '%s' after id", tableName, oldColName, newColName, newColType, comment); |
| |
| runCommandWithDelay(query, 3000); |
| |
| columns = getColumns(DEFAULT_DB, tableName); |
| |
| Assert.assertEquals(columns.size(), 2); |
| |
| assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); |
| |
| newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); |
| |
| assertColumnIsRegistered(newColQualifiedName, new AssertPredicate() { |
| @Override |
| public void assertOnEntity(AtlasEntity entity) throws Exception { |
| assertEquals(entity.getAttribute(ATTRIBUTE_COMMENT), comment); |
| } |
| }); |
| |
| //Change column position |
| oldColName = "name3"; |
| newColName = "name4"; |
| query = String.format("alter table %s change column %s %s %s first", tableName, oldColName, newColName, newColType); |
| |
| runCommandWithDelay(query, 3000); |
| |
| columns = getColumns(DEFAULT_DB, tableName); |
| |
| Assert.assertEquals(columns.size(), 2); |
| |
| assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); |
| |
| newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); |
| |
| assertColumnIsRegistered(newColQualifiedName); |
| |
| String finalNewColName = newColName; |
| |
| String tblId3 = assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { |
| @Override |
| public void assertOnEntity(AtlasEntity entity) throws Exception { |
| List<AtlasObjectId> columns = toAtlasObjectIdList(entity.getAttribute(ATTRIBUTE_COLUMNS)); |
| |
| assertEquals(columns.size(), 2); |
| } |
| } |
| ); |
| |
| AtlasEntity tblEntity3 = atlasClientV2.getEntityByGuid(tblId3).getEntity(); |
| List ddlQueries3 = (List) tblEntity3.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries3); |
| Assert.assertEquals(ddlQueries3.size(), 5); |
| |
| //Change col position again |
| oldColName = "name4"; |
| newColName = "name5"; |
| query = String.format("alter table %s change column %s %s %s after id", tableName, oldColName, newColName, newColType); |
| |
| runCommandWithDelay(query, 3000); |
| |
| columns = getColumns(DEFAULT_DB, tableName); |
| |
| Assert.assertEquals(columns.size(), 2); |
| |
| assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); |
| |
| newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); |
| |
| assertColumnIsRegistered(newColQualifiedName); |
| |
| //Check col position |
| String finalNewColName2 = newColName; |
| |
| String tblId4 = assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { |
| @Override |
| public void assertOnEntity(AtlasEntity entity) throws Exception { |
| List<AtlasObjectId> columns = toAtlasObjectIdList(entity.getAttribute(ATTRIBUTE_COLUMNS)); |
| |
| assertEquals(columns.size(), 2); |
| } |
| } |
| ); |
| |
| AtlasEntity tblEntity4 = atlasClientV2.getEntityByGuid(tblId4).getEntity(); |
| List ddlQueries4 = (List) tblEntity4.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries4); |
| Assert.assertEquals(ddlQueries4.size(), 6); |
| } |
| |
| /** |
| * Re-enabled this test since HIVE-14706 is fixed and the Hive version in use |
| * now sends us the column lineage information. |
| * @throws Exception |
| */ |
| @Test |
| public void testColumnLevelLineage() throws Exception { |
| String sourceTable = "table" + random(); |
| |
| runCommand("create table " + sourceTable + "(a int, b int)"); |
| |
| String sourceTableGUID = assertTableIsRegistered(DEFAULT_DB, sourceTable); |
| String a_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, sourceTable), "a")); |
| String b_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, sourceTable), "b")); |
| String ctasTableName = "table" + random(); |
| String query = "create table " + ctasTableName + " as " + "select sum(a+b) as a, count(*) as b from " + sourceTable; |
| |
| runCommand(query); |
| |
| String dest_a_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, ctasTableName), "a")); |
| String dest_b_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, ctasTableName), "b")); |
| |
| Set<ReadEntity> inputs = getInputs(sourceTable, Entity.Type.TABLE); |
| Set<WriteEntity> outputs = getOutputs(ctasTableName, Entity.Type.TABLE); |
| HiveEventContext event = constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs); |
| AtlasEntity processEntity1 = validateProcess(event); |
| AtlasEntity hiveProcessExecution1 = validateProcessExecution(processEntity1, event); |
| AtlasObjectId process1 = toAtlasObjectId(hiveProcessExecution1.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS)); |
| Assert.assertEquals(process1.getGuid(), processEntity1.getGuid()); |
| Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1); |
| |
| assertTableIsRegistered(DEFAULT_DB, ctasTableName); |
| |
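| // Each output column gets its own hive_column_lineage entity, named <processQualifiedName>:<outputColumnName> |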
| String processQName = sortEventsAndGetProcessQualifiedName(event); |
| List<String> aLineageInputs = Arrays.asList(a_guid, b_guid); |
| String aLineageProcessName = processQName + ":" + "a"; |
| |
| LOG.debug("Searching for column lineage process {} ", aLineageProcessName); |
| String guid = assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), ATTRIBUTE_QUALIFIED_NAME, aLineageProcessName, null); |
| |
| AtlasEntity colLineageEntity = atlasClientV2.getEntityByGuid(guid).getEntity(); |
| List<AtlasObjectId> processInputs = toAtlasObjectIdList(colLineageEntity.getAttribute("inputs")); |
| List<String> processInputsAsString = new ArrayList<>(); |
| |
| for(AtlasObjectId input: processInputs){ |
| processInputsAsString.add(input.getGuid()); |
| } |
| |
| Collections.sort(processInputsAsString); |
| Collections.sort(aLineageInputs); |
| |
| Assert.assertEquals(processInputsAsString, aLineageInputs); |
| |
| List<String> bLineageInputs = Arrays.asList(sourceTableGUID); |
| String bLineageProcessName = processQName + ":" + "b"; |
| |
| LOG.debug("Searching for column lineage process {} ", bLineageProcessName); |
| |
| String guid1 = assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), ATTRIBUTE_QUALIFIED_NAME, bLineageProcessName, null); |
| |
| |
| AtlasEntity colLineageEntity1 = atlasClientV2.getEntityByGuid(guid1).getEntity(); |
| List<AtlasObjectId> bProcessInputs = toAtlasObjectIdList(colLineageEntity1.getAttribute("inputs")); |
| List<String> bProcessInputsAsString = new ArrayList<>(); |
| |
| for(AtlasObjectId input: bProcessInputs){ |
| bProcessInputsAsString.add(input.getGuid()); |
| } |
| |
| Collections.sort(bProcessInputsAsString); |
| Collections.sort(bLineageInputs); |
| |
| Assert.assertEquals(bProcessInputsAsString, bLineageInputs); |
| |
| //Test lineage API response |
| AtlasLineageInfo atlasLineageInfoInput = atlasClientV2.getLineageInfo(dest_a_guid, AtlasLineageInfo.LineageDirection.INPUT, 0); |
| Map<String, AtlasEntityHeader> entityMap = atlasLineageInfoInput.getGuidEntityMap(); |
| |
| ObjectNode response = atlasClient.getInputGraphForEntity(dest_a_guid); |
| JsonNode vertices = response.get("values").get("vertices"); |
| JsonNode dest_a_val = vertices.get(dest_a_guid); |
| JsonNode src_a_val = vertices.get(a_guid); |
| JsonNode src_b_val = vertices.get(b_guid); |
| |
| Assert.assertNotNull(dest_a_val); |
| Assert.assertNotNull(src_a_val); |
| Assert.assertNotNull(src_b_val); |
| |
| ObjectNode b_response = atlasClient.getInputGraphForEntity(dest_b_guid); |
| JsonNode b_vertices = b_response.get("values").get("vertices"); |
| JsonNode b_val = b_vertices.get(dest_b_guid); |
| JsonNode src_tbl_val = b_vertices.get(sourceTableGUID); |
| |
| Assert.assertNotNull(b_val); |
| Assert.assertNotNull(src_tbl_val); |
| } |
| |
| @Test |
| public void testIgnoreTruncateTable() throws Exception { |
| String tableName = createTable(false); |
| String query = String.format("truncate table %s", tableName); |
| |
| runCommand(query); |
| |
| Set<WriteEntity> outputs = getOutputs(tableName, Entity.Type.TABLE); |
| HiveEventContext event = constructEvent(query, HiveOperation.TRUNCATETABLE, null, outputs); |
| |
| assertTableIsRegistered(DEFAULT_DB, tableName); |
| assertProcessIsNotRegistered(event); |
| } |
| |
| @Test |
| public void testAlterTablePartitionColumnType() throws Exception { |
| String tableName = createTable(true, true, false); |
| String newType = "int"; |
| String query = String.format("ALTER TABLE %s PARTITION COLUMN (dt %s)", tableName, newType); |
| |
| runCommand(query); |
| |
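| // The partition column entity should survive the type change, with its 'type' attribute updated to the new type |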
| String colQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "dt"); |
| String dtColId = assertColumnIsRegistered(colQualifiedName, new AssertPredicate() { |
| @Override |
| public void assertOnEntity(AtlasEntity column) throws Exception { |
| Assert.assertEquals(column.getAttribute("type"), newType); |
| } |
| }); |
| |
| assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { |
| @Override |
| public void assertOnEntity(AtlasEntity table) throws Exception { |
| final List<AtlasObjectId> partitionKeys = toAtlasObjectIdList(table.getAttribute("partitionKeys")); |
| Assert.assertEquals(partitionKeys.size(), 1); |
| Assert.assertEquals(partitionKeys.get(0).getGuid(), dtColId); |
| |
| } |
| }); |
| } |
| |
| @Test |
| public void testAlterTableWithoutHookConf() throws Exception { |
| String tableName = tableName(); |
| String createCommand = "create table " + tableName + " (id int, name string)"; |
| |
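| // Create the table via a driver with hooks disabled; Atlas should not know about it yet |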
| driverWithNoHook.run(createCommand); |
| |
| assertTableIsNotRegistered(DEFAULT_DB, tableName); |
| |
| String command = "alter table " + tableName + " change id id_new string"; |
| |
| runCommand(command); |
| |
| assertTableIsRegistered(DEFAULT_DB, tableName); |
| |
| String tbqn = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName); |
| |
| assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tbqn, "id_new")); |
| } |
| |
| @Test |
| public void testTraitsPreservedOnColumnRename() throws Exception { |
| String dbName = createDatabase(); |
| String tableName = tableName(); |
| String createQuery = String.format("create table %s.%s (id int, name string)", dbName, tableName); |
| |
| runCommand(createQuery); |
| |
| String tbqn = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName); |
| String guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tbqn, "id")); |
| String trait = createTrait(guid); |
| String oldColName = "id"; |
| String newColName = "id_new"; |
| String query = String.format("alter table %s.%s change %s %s string", dbName, tableName, oldColName, newColName); |
| |
| runCommand(query); |
| |
| String guid2 = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tbqn, "id_new")); |
| |
| assertEquals(guid2, guid); |
| |
| assertTrue(atlasClient.getEntity(guid2).getTraitNames().contains(trait)); |
| } |
| |
| @Test |
| public void testAlterViewRename() throws Exception { |
| String tableName = createTable(); |
| String viewName = tableName(); |
| String newName = tableName(); |
| String query = "create view " + viewName + " as select * from " + tableName; |
| |
| runCommandWithDelay(query, 5000); |
| |
| query = "alter view " + viewName + " rename to " + newName; |
| |
| runCommandWithDelay(query, 5000); |
| |
| assertTableIsNotRegistered(DEFAULT_DB, viewName); |
| |
| String viewId = assertTableIsRegistered(DEFAULT_DB, newName); |
| AtlasEntity viewEntity = atlasClientV2.getEntityByGuid(viewId).getEntity(); |
| List ddlQueries = (List) viewEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries); |
| Assert.assertEquals(ddlQueries.size(), 2); |
| } |
| |
| @Test |
| public void testAlterTableLocation() throws Exception { |
| //It's an external table, so the HDFS location should also be registered as an entity |
| String tableName = createTable(true, true, false); |
| String testPath = createTestDFSPath("testBaseDir"); |
| String query = "alter table " + tableName + " set location '" + testPath + "'"; |
| |
| runCommandWithDelay(query, 8000); |
| |
| String tblId = assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { |
| @Override |
| public void assertOnEntity(AtlasEntity tableRef) throws Exception { |
| AtlasObjectId sd = toAtlasObjectId(tableRef.getAttribute(ATTRIBUTE_STORAGEDESC)); |
| |
| assertNotNull(sd); |
| } |
| }); |
| |
| AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity(); |
| List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES); |
| |
| Assert.assertNotNull(ddlQueries); |
| Assert.assertEquals(ddlQueries.size(), 2); |
| |
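| // The ALTER ... LOCATION maps to the table's existing process entity (same qualifiedName), recording a second execution |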
| String processQualifiedName = getTableProcessQualifiedName(DEFAULT_DB, tableName); |
| String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, processQualifiedName, null); |
| AtlasEntity processEntity = atlasClientV2.getEntityByGuid(processId).getEntity(); |
| Assert.assertEquals(numberOfProcessExecutions(processEntity), 2); |
| //validateProcessExecution(processEntity, event); |
| validateHDFSPaths(processEntity, INPUTS, testPath); |
| } |
| |
| @Test |
| public void testAlterTableFileFormat() throws Exception { |
| String tableName = createTable(); |
| String testFormat = "orc"; |
| String query = "alter table " + tableName + " set FILEFORMAT " + testFormat; |
| |
| runCommand(query); |
| |
| assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { |
| @Override |
| public void assertOnEntity(AtlasEntity tableRef) throws Exception { |
| AtlasObjectId sdObjectId = toAtlasObjectId(tableRef.getAttribute(ATTRIBUTE_STORAGEDESC)); |
| AtlasEntity sdEntity = atlasClientV2.getEntityByGuid(sdObjectId.getGuid()).getEntity(); |
| |
| Assert.assertEquals(sdEntity.getAttribute(ATTRIBUTE_INPUT_FORMAT), "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"); |
| Assert.assertEquals(sdEntity.getAttribute(ATTRIBUTE_OUTPUT_FORMAT), "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"); |
| Assert.assertNotNull(sdEntity.getAttribute(ATTRIBUTE_SERDE_INFO)); |
| |
| AtlasStruct serdeInfo = toAtlasStruct(sdEntity.getAttribute(ATTRIBUTE_SERDE_INFO)); |
| |
| Assert.assertEquals(serdeInfo.getAttribute(ATTRIBUTE_SERIALIZATION_LIB), "org.apache.hadoop.hive.ql.io.orc.OrcSerde"); |
| Assert.assertNotNull(serdeInfo.getAttribute(ATTRIBUTE_PARAMETERS)); |
| Assert.assertEquals(((Map<String, String>) serdeInfo.getAttribute(ATTRIBUTE_PARAMETERS)).get("serialization.format"), "1"); |
| } |
| }); |
| |
| |
| /* |
| * Hive 'alter table stored as' is not supported - see https://issues.apache.org/jira/browse/HIVE-9576 |
| * query = "alter table " + tableName + " STORED AS " + testFormat.toUpperCase(); |
| * runCommand(query); |
| * |
| * tableRef = atlasClientV1.getEntity(tableId); |
| * sdRef = (AtlasEntity) tableRef.getAttribute(HiveMetaStoreBridge.STORAGE_DESC); |
| * Assert.assertEquals(sdRef.getAttribute(HiveMetaStoreBridge.STORAGE_DESC_INPUT_FMT), "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"); |
| * Assert.assertEquals(sdRef.getAttribute(HiveMetaStoreBridge.STORAGE_DESC_OUTPUT_FMT), "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"); |
| * Assert.assertEquals(((Map) sdRef.getAttribute(HiveMetaStoreBridge.PARAMETERS)).get("orc.compress"), "ZLIB"); |
| */ |
| } |
| |
| @Test |
| public void testAlterTableBucketingClusterSort() throws Exception { |
| String tableName = createTable(); |
| List<String> cols = Collections.singletonList("id"); |
| |
| runBucketSortQuery(tableName, 5, cols, cols); |
| |
| cols = Arrays.asList("id", NAME); |
| |
| runBucketSortQuery(tableName, 2, cols, cols); |
| } |
| |
| private void runBucketSortQuery(String tableName, final int numBuckets, final List<String> bucketCols, final List<String> sortCols) throws Exception { |
| String fmtQuery = "alter table %s CLUSTERED BY (%s) SORTED BY (%s) INTO %s BUCKETS"; |
| String query = String.format(fmtQuery, tableName, stripListBrackets(bucketCols.toString()), stripListBrackets(sortCols.toString()), numBuckets); |
| |
| runCommand(query); |
| |
| assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { |
| @Override |
| public void assertOnEntity(AtlasEntity entity) throws Exception { |
| verifyBucketSortingProperties(entity, numBuckets, bucketCols, sortCols); |
| } |
| }); |
| } |
| |
| private String stripListBrackets(String listElements) { |
| return StringUtils.strip(StringUtils.strip(listElements, "["), "]"); |
| } |
| |
| private void verifyBucketSortingProperties(AtlasEntity tableRef, int numBuckets, List<String> bucketColNames, List<String> sortcolNames) throws Exception { |
| AtlasObjectId sdObjectId = toAtlasObjectId(tableRef.getAttribute(ATTRIBUTE_STORAGEDESC)); |
| AtlasEntity sdEntity = atlasClientV2.getEntityByGuid(sdObjectId.getGuid()).getEntity(); |
| |
| Assert.assertEquals(sdEntity.getAttribute(ATTRIBUTE_NUM_BUCKETS), numBuckets); |
| Assert.assertEquals(sdEntity.getAttribute(ATTRIBUTE_BUCKET_COLS), bucketColNames); |
| |
| List<AtlasStruct> hiveOrderStructList = toAtlasStructList(sdEntity.getAttribute(ATTRIBUTE_SORT_COLS)); |
| |
| Assert.assertNotNull(hiveOrderStructList); |
| Assert.assertEquals(hiveOrderStructList.size(), sortcolNames.size()); |
| |
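| // Each sort column should be captured as a struct of column name and sort order; order 1 denotes ascending in Hive |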
| for (int i = 0; i < sortcolNames.size(); i++) { |
| AtlasStruct hiveOrderStruct = hiveOrderStructList.get(i); |
| |
| Assert.assertNotNull(hiveOrderStruct); |
| Assert.assertEquals(hiveOrderStruct.getAttribute("col"), sortcolNames.get(i)); |
| Assert.assertEquals(hiveOrderStruct.getAttribute("order"), 1); |
| } |
| } |
| |
| @Test |
| public void testAlterTableSerde() throws Exception { |
| //SERDE PROPERTIES |
| String tableName = createTable(); |
| Map<String, String> expectedProps = new HashMap<String, String>() {{ |
| put("key1", "value1"); |
| }}; |
| |
| runSerdePropsQuery(tableName, expectedProps); |
| |
| expectedProps.put("key2", "value2"); |
| |
| //Add another property |
| runSerdePropsQuery(tableName, expectedProps); |
| } |
| |
| @Test |
| public void testDropTable() throws Exception { |
| //Test deletion of a table and its corresponding columns |
| String tableName = createTable(true, true, false); |
| |
| assertTableIsRegistered(DEFAULT_DB, tableName); |
| assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "id")); |
| assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), NAME)); |
| |
| String query = String.format("drop table %s ", tableName); |
| |
| runCommandWithDelay(query, 3000); |
| |
| assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "id")); |
| assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), NAME)); |
| assertTableIsNotRegistered(DEFAULT_DB, tableName); |
| } |
| |
| private WriteEntity getPartitionOutput() { |
| TestWriteEntity partEntity = new TestWriteEntity(PART_FILE, Entity.Type.PARTITION); |
| |
| return partEntity; |
| } |
| |
| private ReadEntity getPartitionInput() { |
| ReadEntity partEntity = new TestReadEntity(PART_FILE, Entity.Type.PARTITION); |
| |
| return partEntity; |
| } |
| |
| @Test |
| public void testDropDatabaseWithCascade() throws Exception { |
| //Test deletion of a database and its corresponding tables |
| String dbName = "db" + random(); |
| |
| runCommand("create database " + dbName + " WITH DBPROPERTIES ('p1'='v1')"); |
| |
| int numTables = 10; |
| String[] tableNames = new String[numTables]; |
| |
| for(int i = 0; i < numTables; i++) { |
| tableNames[i] = createTable(true, true, false); |
| } |
| |
| String query = String.format("drop database %s cascade", dbName); |
| |
| runCommand(query); |
| |
| //Verify columns are not registered for one of the tables |
| assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), "id")); |
| assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), NAME)); |
| |
| for(int i = 0; i < numTables; i++) { |
| assertTableIsNotRegistered(dbName, tableNames[i]); |
| } |
| |
| assertDatabaseIsNotRegistered(dbName); |
| } |
| |
| @Test |
| public void testDropDatabaseWithoutCascade() throws Exception { |
| //Test deletion of a database and its corresponding tables |
| String dbName = "db" + random(); |
| |
| runCommand("create database " + dbName + " WITH DBPROPERTIES ('p1'='v1')"); |
| |
| int numTables = 5; |
| String[] tableNames = new String[numTables]; |
| |
| for(int i = 0; i < numTables; i++) { |
| tableNames[i] = createTable(true, true, false); |
| |
| String query = String.format("drop table %s", tableNames[i]); |
| |
| runCommand(query); |
| |
| assertTableIsNotRegistered(dbName, tableNames[i]); |
| } |
| |
| String query = String.format("drop database %s", dbName); |
| |
| runCommand(query); |
| |
| String dbQualifiedName = HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName); |
| |
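| // Give the hook time to process the DROP DATABASE event before querying Atlas |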
| Thread.sleep(10000); |
| |
| try { |
| atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_DB.getName(), Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName)); |
| } catch (AtlasServiceException e) { |
| if (e.getStatus() == ClientResponse.Status.NOT_FOUND) { |
| return; |
| } |
| } |
| |
| fail(String.format("Entity was not supposed to exist for typeName = %s, attributeName = %s, attributeValue = %s", HiveDataTypes.HIVE_DB.getName(), ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName)); |
| } |
| |
| @Test |
| public void testDropNonExistingDB() throws Exception { |
| //Test deletion of a non-existent database |
| String dbName = "nonexistingdb"; |
| |
| assertDatabaseIsNotRegistered(dbName); |
| |
| String query = String.format("drop database if exists %s cascade", dbName); |
| |
| runCommand(query); |
| |
| //Should have no effect |
| assertDatabaseIsNotRegistered(dbName); |
| } |
| |
| @Test |
| public void testDropNonExistingTable() throws Exception { |
| //Test deletion of a non-existent table |
| String tableName = "nonexistingtable"; |
| |
| assertTableIsNotRegistered(DEFAULT_DB, tableName); |
| |
| String query = String.format("drop table if exists %s", tableName); |
| |
| runCommand(query); |
| |
| //Should have no effect |
| assertTableIsNotRegistered(DEFAULT_DB, tableName); |
| } |
| |
| @Test |
| public void testDropView() throws Exception { |
| //Test deletion of a view and its corresponding columns |
| String tableName = createTable(true, true, false); |
| String viewName = tableName(); |
| String query = "create view " + viewName + " as select * from " + tableName; |
| |
| runCommandWithDelay(query, 3000); |
| |
| assertTableIsRegistered(DEFAULT_DB, viewName); |
| assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), "id")); |
| assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), NAME)); |
| |
| query = String.format("drop view %s ", viewName); |
| |
| runCommandWithDelay(query, 3000); |
| assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), "id")); |
| assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), NAME)); |
| assertTableIsNotRegistered(DEFAULT_DB, viewName); |
| } |
| |
| private void runSerdePropsQuery(String tableName, Map<String, String> expectedProps) throws Exception { |
| String serdeLib = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"; |
| String serializedProps = getSerializedProps(expectedProps); |
| String query = String.format("alter table %s set SERDE '%s' WITH SERDEPROPERTIES (%s)", tableName, serdeLib, serializedProps); |
| |
| runCommand(query); |
| |
| verifyTableSdProperties(tableName, serdeLib, expectedProps); |
| } |
| |
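| // Renders the map as "'k1'='v1','k2'='v2'" for use inside SERDEPROPERTIES/TBLPROPERTIES/DBPROPERTIES clauses |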
| private String getSerializedProps(Map<String, String> expectedProps) { |
| StringBuilder sb = new StringBuilder(); |
| |
| for(String expectedPropKey : expectedProps.keySet()) { |
| if(sb.length() > 0) { |
| sb.append(","); |
| } |
| |
| sb.append("'").append(expectedPropKey).append("'"); |
| sb.append("="); |
| sb.append("'").append(expectedProps.get(expectedPropKey)).append("'"); |
| } |
| |
| return sb.toString(); |
| } |
| |
| @Test |
| public void testAlterDBOwner() throws Exception { |
| String dbName = createDatabase(); |
| |
| assertDatabaseIsRegistered(dbName); |
| |
| String owner = "testOwner"; |
| String fmtQuery = "alter database %s set OWNER %s %s"; |
| String query = String.format(fmtQuery, dbName, "USER", owner); |
| |
| runCommandWithDelay(query, 3000); |
| |
| assertDatabaseIsRegistered(dbName, new AssertPredicate() { |
| @Override |
| public void assertOnEntity(AtlasEntity entity) { |
| assertEquals(entity.getAttribute(AtlasClient.OWNER), owner); |
| } |
| }); |
| } |
| |
| @Test |
| public void testAlterDBProperties() throws Exception { |
| String dbName = createDatabase(); |
| String fmtQuery = "alter database %s %s DBPROPERTIES (%s)"; |
| |
| testAlterProperties(Entity.Type.DATABASE, dbName, fmtQuery); |
| } |
| |
| @Test |
| public void testAlterTableProperties() throws Exception { |
| String tableName = createTable(); |
| String fmtQuery = "alter table %s %s TBLPROPERTIES (%s)"; |
| |
| testAlterProperties(Entity.Type.TABLE, tableName, fmtQuery); |
| } |
| |
| private void testAlterProperties(Entity.Type entityType, String entityName, String fmtQuery) throws Exception { |
| String SET_OP = "set"; |
| String UNSET_OP = "unset"; |
| Map<String, String> expectedProps = new HashMap<String, String>() {{ |
| put("testPropKey1", "testPropValue1"); |
| put("comment", "test comment"); |
| }}; |
| |
| String query = String.format(fmtQuery, entityName, SET_OP, getSerializedProps(expectedProps)); |
| |
| runCommandWithDelay(query, 3000); |
| |
| verifyEntityProperties(entityType, entityName, expectedProps, false); |
| |
| expectedProps.put("testPropKey2", "testPropValue2"); |
| //Add another property |
| |
| query = String.format(fmtQuery, entityName, SET_OP, getSerializedProps(expectedProps)); |
| |
| runCommandWithDelay(query, 3000); |
| |
| verifyEntityProperties(entityType, entityName, expectedProps, false); |
| |
| if (entityType != Entity.Type.DATABASE) { |
| //Hive does not support 'alter database %s unset DBPROPERTIES', so unset is only tested for tables |
| //Unset all the props |
| StringBuilder sb = new StringBuilder("'"); |
| |
| query = String.format(fmtQuery, entityName, UNSET_OP, Joiner.on("','").skipNulls().appendTo(sb, expectedProps.keySet()).append('\'')); |
| |
| runCommandWithDelay(query, 3000); |
| |
| verifyEntityProperties(entityType, entityName, expectedProps, true); |
| } |
| } |
| |
| @Test |
| public void testAlterViewProperties() throws Exception { |
| String tableName = createTable(); |
| String viewName = tableName(); |
| String query = "create view " + viewName + " as select * from " + tableName; |
| |
| runCommand(query); |
| |
| String fmtQuery = "alter view %s %s TBLPROPERTIES (%s)"; |
| |
| testAlterProperties(Entity.Type.TABLE, viewName, fmtQuery); |
| } |
| |
| private void verifyEntityProperties(Entity.Type type, String entityName, final Map<String, String> expectedProps, final boolean checkIfNotExists) throws Exception { |
| switch(type) { |
| case TABLE: |
| assertTableIsRegistered(DEFAULT_DB, entityName, new AssertPredicate() { |
| @Override |
| public void assertOnEntity(AtlasEntity entity) throws Exception { |
| verifyProperties(entity, expectedProps, checkIfNotExists); |
| } |
| }); |
| break; |
| case DATABASE: |
| assertDatabaseIsRegistered(entityName, new AssertPredicate() { |
| @Override |
| public void assertOnEntity(AtlasEntity entity) throws Exception { |
| verifyProperties(entity, expectedProps, checkIfNotExists); |
| } |
| }); |
| break; |
| } |
| } |
| |
| private void verifyTableSdProperties(String tableName, final String serdeLib, final Map<String, String> expectedProps) throws Exception { |
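| // Note: this currently only asserts that a storage descriptor exists; serdeLib and expectedProps are not verified against the SD attributes |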
| assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { |
| @Override |
| public void assertOnEntity(AtlasEntity tableRef) throws Exception { |
| AtlasObjectId sdEntity = toAtlasObjectId(tableRef.getAttribute(ATTRIBUTE_STORAGEDESC)); |
| |
| assertNotNull(sdEntity); |
| } |
| }); |
| } |
| |
| |
| private void verifyProperties(AtlasStruct referenceable, Map<String, String> expectedProps, boolean checkIfNotExists) { |
| Map<String, String> parameters = (Map<String, String>) referenceable.getAttribute(ATTRIBUTE_PARAMETERS); |
| |
| if (!checkIfNotExists) { |
| //Check if properties exist |
| Assert.assertNotNull(parameters); |
| for (String propKey : expectedProps.keySet()) { |
| Assert.assertEquals(parameters.get(propKey), expectedProps.get(propKey)); |
| } |
| } else { |
| //Check if properties dont exist |
| if (expectedProps != null && parameters != null) { |
| for (String propKey : expectedProps.keySet()) { |
| Assert.assertFalse(parameters.containsKey(propKey)); |
| } |
| } |
| } |
| } |
| |
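| // The process qualifiedName depends on the order of inputs and outputs, so both sides are sorted with the same comparator before computing it |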
| private String sortEventsAndGetProcessQualifiedName(final HiveEventContext event) throws HiveException { |
| SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator); |
| SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator); |
| |
| if (event.getInputs() != null) { |
| sortedHiveInputs.addAll(event.getInputs()); |
| } |
| |
| if (event.getOutputs() != null) { |
| sortedHiveOutputs.addAll(event.getOutputs()); |
| } |
| |
| return getProcessQualifiedName(hiveMetaStoreBridge, event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs())); |
| } |
| |
| private String assertProcessIsRegistered(final HiveEventContext event) throws Exception { |
| try { |
| String processQFName = sortEventsAndGetProcessQualifiedName(event); |
| |
| LOG.debug("Searching for process with query {}", processQFName); |
| |
| return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, processQFName, new AssertPredicate() { |
| @Override |
| public void assertOnEntity(final AtlasEntity entity) throws Exception { |
| List<String> recentQueries = (List<String>) entity.getAttribute(ATTRIBUTE_RECENT_QUERIES); |
| Assert.assertEquals(recentQueries.get(0), lower(event.getQueryStr())); |
| } |
| }); |
| } catch (Exception e) { |
| LOG.error("Exception : ", e); |
| throw e; |
| } |
| } |
| |
| private String assertProcessIsRegistered(final HiveEventContext event, final Set<ReadEntity> inputTbls, final Set<WriteEntity> outputTbls) throws Exception { |
| try { |
| SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator); |
| SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator); |
| |
| if (event.getInputs() != null) { |
| sortedHiveInputs.addAll(event.getInputs()); |
| } |
| |
| if (event.getOutputs() != null) { |
| sortedHiveOutputs.addAll(event.getOutputs()); |
| } |
| |
| String processQFName = getProcessQualifiedName(hiveMetaStoreBridge, event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls)); |
| |
| LOG.debug("Searching for process with query {}", processQFName); |
| |
| return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, processQFName, new AssertPredicate() { |
| @Override |
| public void assertOnEntity(final AtlasEntity entity) throws Exception { |
| List<String> recentQueries = (List<String>) entity.getAttribute(BaseHiveEvent.ATTRIBUTE_RECENT_QUERIES); |
| |
| Assert.assertEquals(recentQueries.get(0), lower(event.getQueryStr())); |
| } |
| }); |
| } catch(Exception e) { |
| LOG.error("Exception : ", e); |
| throw e; |
| } |
| } |
| |
| private String assertProcessExecutionIsRegistered(AtlasEntity hiveProcess, final HiveEventContext event) throws Exception { |
| try { |
| String guid = ""; |
| List<AtlasObjectId> processExecutions = toAtlasObjectIdList(hiveProcess.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS_EXECUTIONS)); |
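| // Walk all executions linked to this process and pick the one whose query text matches the event |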
| for (AtlasObjectId processExecution : processExecutions) { |
| AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2. |
| getEntityByGuid(processExecution.getGuid()); |
| AtlasEntity entity = atlasEntityWithExtInfo.getEntity(); |
| if (String.valueOf(entity.getAttribute(ATTRIBUTE_QUERY_TEXT)).equals(event.getQueryStr().toLowerCase().trim())) { |
| guid = entity.getGuid(); |
| } |
| } |
| |
| return assertEntityIsRegisteredViaGuid(guid, new AssertPredicate() { |
| @Override |
| public void assertOnEntity(final AtlasEntity entity) throws Exception { |
| String queryText = (String) entity.getAttribute(ATTRIBUTE_QUERY_TEXT); |
| Assert.assertEquals(queryText, event.getQueryStr().toLowerCase().trim()); |
| } |
| }); |
| } catch(Exception e) { |
| LOG.error("Exception : ", e); |
| throw e; |
| } |
| } |
| |
| |
| private String getDSTypeName(Entity entity) { |
| return Entity.Type.TABLE.equals(entity.getType()) ? HiveDataTypes.HIVE_TABLE.name() : HiveMetaStoreBridge.HDFS_PATH; |
| } |
| |
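| // Builds minimal AtlasEntity stubs (qualifiedName only) for each Hive entity, sorted with the same comparator used for process qualified names |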
| private <T extends Entity> SortedMap<T, AtlasEntity> getSortedProcessDataSets(Set<T> inputTbls) { |
| SortedMap<T, AtlasEntity> inputs = new TreeMap<>(entityComparator); |
| |
| if (inputTbls != null) { |
| for (final T tbl : inputTbls) { |
| AtlasEntity inputTableRef = new AtlasEntity(getDSTypeName(tbl), new HashMap<String, Object>() {{ |
| put(ATTRIBUTE_QUALIFIED_NAME, tbl.getName()); |
| }}); |
| |
| inputs.put(tbl, inputTableRef); |
| } |
| } |
| return inputs; |
| } |
| |
| private void assertProcessIsNotRegistered(HiveEventContext event) throws Exception { |
| try { |
| SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator); |
| SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator); |
| |
| if (event.getInputs() != null) { |
| sortedHiveInputs.addAll(event.getInputs()); |
| } |
| |
| if (event.getOutputs() != null) { |
| sortedHiveOutputs.addAll(event.getOutputs()); |
| } |
| |
| String processQFName = getProcessQualifiedName(hiveMetaStoreBridge, event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs())); |
| |
| LOG.debug("Searching for process with query {}", processQFName); |
| |
| assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, processQFName); |
| } catch(Exception e) { |
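| // Only checked exceptions are logged and swallowed here; TestNG assertion failures are Errors and still propagate |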
| LOG.error("Exception : ", e); |
| } |
| } |
| |
| private void assertTableIsNotRegistered(String dbName, String tableName, boolean isTemporaryTable) throws Exception { |
| LOG.debug("Searching for table {}.{}", dbName, tableName); |
| |
| String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName, isTemporaryTable); |
| |
| assertEntityIsNotRegistered(HiveDataTypes.HIVE_TABLE.getName(), ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName); |
| } |
| |
| private void assertTableIsNotRegistered(String dbName, String tableName) throws Exception { |
| LOG.debug("Searching for table {}.{}", dbName, tableName); |
| |
| String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName, false); |
| |
| assertEntityIsNotRegistered(HiveDataTypes.HIVE_TABLE.getName(), ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName); |
| } |
| |
| private String assertTableIsRegistered(String dbName, String tableName, AssertPredicate assertPredicate) throws Exception { |
| return assertTableIsRegistered(dbName, tableName, assertPredicate, false); |
| } |
| |
| @Test |
| public void testLineage() throws Exception { |
| String table1 = createTable(false); |
| String db2 = createDatabase(); |
| String table2 = tableName(); |
| String query = String.format("create table %s.%s as select * from %s", db2, table2, table1); |
| |
| runCommand(query); |
| |
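| // The CTAS should link table1 -> process -> table2, visible from both the INPUT and OUTPUT lineage directions |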
| String table1Id = assertTableIsRegistered(DEFAULT_DB, table1); |
| String table2Id = assertTableIsRegistered(db2, table2); |
| AtlasLineageInfo inputLineage = atlasClientV2.getLineageInfo(table2Id, AtlasLineageInfo.LineageDirection.INPUT, 0); |
| Map<String, AtlasEntityHeader> entityMap = inputLineage.getGuidEntityMap(); |
| |
| assertTrue(entityMap.containsKey(table1Id)); |
| assertTrue(entityMap.containsKey(table2Id)); |
| |
| AtlasLineageInfo inputLineage1 = atlasClientV2.getLineageInfo(table1Id, AtlasLineageInfo.LineageDirection.OUTPUT, 0); |
| Map<String, AtlasEntityHeader> entityMap1 = inputLineage1.getGuidEntityMap(); |
| |
| assertTrue(entityMap1.containsKey(table1Id)); |
| assertTrue(entityMap1.containsKey(table2Id)); |
| } |
| |
| //For ATLAS-448 |
| @Test |
| public void testNoopOperation() throws Exception { |
| runCommand("show compactions"); |
| runCommand("show transactions"); |
| } |
| |
| private String createDatabase() throws Exception { |
| String dbName = dbName(); |
| |
| runCommand("create database " + dbName); |
| |
| return dbName; |
| } |
| |
| private String columnName() { |
| return "col" + random(); |
| } |
| |
| private String createTable() throws Exception { |
| return createTable(false); |
| } |
| |
| private String createTable(boolean isPartitioned) throws Exception { |
| String tableName = tableName(); |
| |
| runCommand("create table " + tableName + "(id int, name string) comment 'table comment' " + (isPartitioned ? " partitioned by(dt string)" : "")); |
| |
| return tableName; |
| } |
| |
| private String createTable(boolean isExternal, boolean isPartitioned, boolean isTemporary) throws Exception { |
| String tableName = tableName(); |
| |
| String location = ""; |
| if (isExternal) { |
| location = " location '" + createTestDFSPath("someTestPath") + "'"; |
| } |
| |
| runCommandWithDelay("create " + (isExternal ? " EXTERNAL " : "") + (isTemporary ? "TEMPORARY " : "") + "table " + tableName + "(id int, name string) comment 'table comment' " + (isPartitioned ? " partitioned by(dt string)" : "") + location, 3000); |
| |
| return tableName; |
| } |
| |
| // ReadEntity class doesn't offer a constructor that takes (name, type). A hack to get the tests going! |
| private static class TestReadEntity extends ReadEntity { |
| private final String name; |
| private final Entity.Type type; |
| |
| public TestReadEntity(String name, Entity.Type type) { |
| this.name = name; |
| this.type = type; |
| } |
| |
| @Override |
| public String getName() { return name; } |
| |
| @Override |
| public Entity.Type getType() { return type; } |
| } |
| |
| // WriteEntity class doesn't offer a constructor that takes (name, type). A hack to get the tests going! |
| private static class TestWriteEntity extends WriteEntity { |
| private final String name; |
| private final Entity.Type type; |
| |
| public TestWriteEntity(String name, Entity.Type type) { |
| this.name = name; |
| this.type = type; |
| } |
| |
| @Override |
| public String getName() { return name; } |
| |
| @Override |
| public Entity.Type getType() { return type; } |
| } |
| |
| private int numberOfProcessExecutions(AtlasEntity hiveProcess) { |
| return toAtlasObjectIdList(hiveProcess.getRelationshipAttribute( |
| BaseHiveEvent.ATTRIBUTE_PROCESS_EXECUTIONS)).size(); |
| } |
| } |