blob: 2bce1b2a0e03efbae31c7a0a4947958833e0b896 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive.hook;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.HiveITBase;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.ATTRIBUTE_DDL_QUERIES;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.testng.AssertJUnit.*;
public class HiveMetastoreHookIT extends HiveITBase {
private static final Logger LOG = LoggerFactory.getLogger(HiveMetastoreHookIT.class);
@Test (priority = 1)
public void testCreateDatabase() throws Exception {
String dbName = dbName();
String query = "CREATE DATABASE " + dbName;
runCommand(query);
String dbId = assertDatabaseIsRegistered(dbName);
AtlasEntity dbEntity = getAtlasEntity(dbId);
assertEquals(((List) dbEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES)).size(), 0);
}
@Test (priority = 2)
public void testAlterDatabase() throws Exception {
String dbName = dbName();
String query = "CREATE DATABASE " + dbName;
runCommand(query);
String dbId = assertDatabaseIsRegistered(dbName);
AtlasEntity dbEntity = getAtlasEntity(dbId);
assertNotNull(dbEntity);
// SET DBPROPERTIES
query = "ALTER DATABASE " + dbName + " SET DBPROPERTIES (\"prop1\"=\"val1\", \"prop2\"=\"val2\")";
runCommandWithDelay(query);
dbEntity = getAtlasEntity(dbId);
Map parameters = (Map) dbEntity.getAttribute("parameters");
assertEquals(((List) dbEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES)).size(), 0);
assertNotNull(parameters);
assertEquals(2, parameters.size());
// SET OWNER to 'hive'
query = "ALTER DATABASE " + dbName + " SET OWNER USER hive";
runCommandWithDelay(query);
dbEntity = getAtlasEntity(dbId);
assertEquals(((List) dbEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES)).size(), 0);
assertEquals(dbEntity.getAttribute("owner"), "hive");
assertEquals(dbEntity.getAttribute("ownerType"), "USER");
// SET LOCATION
String hdfsPath = "hdfs://localhost:8020/warehouse/tablespace/managed/dwx/new_db.db";
query = String.format("ALTER DATABASE %s SET LOCATION \"%s\"", dbName, hdfsPath);
runCommandWithDelay(query);
dbEntity = getAtlasEntity(dbId);
assertEquals(((List) dbEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES)).size(), 0);
String location = (String) dbEntity.getAttribute("location");
assertEquals(location, hdfsPath);
}
@Test (priority = 3)
public void testDropDatabase() throws Exception {
String dbName = dbName();
String query = "CREATE DATABASE " + dbName;
runCommand(query);
String dbId = assertDatabaseIsRegistered(dbName);
AtlasEntity dbEntity = getAtlasEntity(dbId);
assertNotNull(dbEntity);
query = "DROP DATABASE " + dbName;
runCommand(query);
assertDatabaseIsNotRegistered(dbName);
dbEntity = getAtlasEntity(dbId);
assertEquals(dbEntity.getStatus(), DELETED);
}
@Test (priority = 4)
public void testDropDatabaseWithTables() throws Exception {
String dbName = dbName();
String query = "CREATE DATABASE " + dbName;
runCommandWithDelay(query);
String dbId = assertDatabaseIsRegistered(dbName);
assertEquals(getAtlasEntity(dbId).getStatus(), ACTIVE);
String table1 = tableName();
runCommandWithDelay("CREATE TABLE " + dbName + "." + table1 + " (name string, age int, dob date)");
String table1Id = assertTableIsRegistered(dbName, table1);
assertEquals(getAtlasEntity(table1Id).getStatus(), ACTIVE);
String table2 = tableName();
runCommandWithDelay("CREATE TABLE " + dbName + "." + table2 + " (name string, age int, dob date)");
String table2Id = assertTableIsRegistered(dbName, table2);
assertEquals(getAtlasEntity(table2Id).getStatus(), ACTIVE);
query = "DROP DATABASE " + dbName + " CASCADE";
runCommandWithDelay(query);
assertDatabaseIsNotRegistered(dbName);
assertEquals(getAtlasEntity(dbId).getStatus(), DELETED);
assertEquals(getAtlasEntity(table1Id).getStatus(), DELETED);
assertEquals(getAtlasEntity(table2Id).getStatus(), DELETED);
}
@Test (priority = 5)
public void testCreateTable() throws Exception {
String dbName = dbName();
String query = "CREATE DATABASE " + dbName;
runCommand(query);
String dbId = assertDatabaseIsRegistered(dbName);
assertEquals(getAtlasEntity(dbId).getStatus(), ACTIVE);
String tableName = tableName();
runCommand("CREATE TABLE " + dbName + "." + tableName + " (name string, age int, dob date)");
String tblId = assertTableIsRegistered(dbName, tableName);
AtlasEntity tblEntity = getAtlasEntity(tblId);
assertEquals(((List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES)).size(), 0);
assertEquals(getAtlasEntity(tblId).getStatus(), ACTIVE);
}
@Test (priority = 6)
public void testCreateView() throws Exception {
String dbName = dbName();
String query = "CREATE DATABASE " + dbName;
runCommand(query);
String dbId = assertDatabaseIsRegistered(dbName);
assertEquals(getAtlasEntity(dbId).getStatus(), ACTIVE);
String tableName = tableName();
runCommand("CREATE TABLE " + dbName + "." + tableName + " (name string, age int, dob date)");
String tblId = assertTableIsRegistered(dbName, tableName);
assertEquals(getAtlasEntity(tblId).getStatus(), ACTIVE);
String viewName = tableName();
runCommand("CREATE VIEW " + dbName + "." + viewName + " AS SELECT * FROM " + dbName + "." + tableName);
tblId = assertTableIsRegistered(dbName, viewName);
AtlasEntity tblEntity = getAtlasEntity(tblId);
assertEquals(((List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES)).size(), 0);
assertEquals(getAtlasEntity(tblId).getStatus(), ACTIVE);
}
@Test (priority = 7)
public void testAlterTableProperties() throws Exception {
String dbName = dbName();
String query = "CREATE DATABASE " + dbName;
runCommand(query);
String dbId = assertDatabaseIsRegistered(dbName);
assertEquals(getAtlasEntity(dbId).getStatus(), ACTIVE);
assertEquals(((List) getAtlasEntity(dbId).getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES)).size(), 0);
String tableName = tableName();
runCommand("CREATE TABLE " + dbName + "." + tableName + " (name string, age int, dob date)");
String tblId = assertTableIsRegistered(dbName, tableName);
assertEquals(getAtlasEntity(tblId).getStatus(), ACTIVE);
assertEquals(((List) getAtlasEntity(tblId).getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES)).size(), 0);
// SET TBLPROPERTIES
query = "ALTER TABLE " + dbName + "." + tableName + " SET TBLPROPERTIES (\"prop1\"=\"val1\", \"prop2\"=\"val2\", \"prop3\"=\"val3\")";
runCommandWithDelay(query);
query = "ALTER TABLE " + dbName + "." + tableName + " SET TBLPROPERTIES (\"comment\" = \"sample comment\")";
runCommandWithDelay(query);
// SET SERDE
query = "ALTER TABLE " + dbName + "." + tableName + " SET SERDE \"org.apache.hadoop.hive.ql.io.orc.OrcSerde\" WITH SERDEPROPERTIES (\"prop1\"=\"val1\", \"prop2\"=\"val2\")";
runCommandWithDelay(query);
// SET SERDEPROPERTIES
query = "ALTER TABLE " + dbName + "." + tableName + " SET SERDEPROPERTIES (\"prop1\"=\"val1\", \"prop2\"=\"val2\")";
runCommandWithDelay(query);
AtlasEntity tableEntity = getAtlasEntity(tblId);
Map<String, Object> tableParameters = (Map) tableEntity.getAttribute("parameters");
assertEquals(tableParameters.get("comment"), "sample comment");
assertEquals(tableParameters.get("prop1"), "val1");
assertEquals(tableParameters.get("prop2"), "val2");
assertEquals(tableParameters.get("prop3"), "val3");
AtlasEntity sdEntity = getAtlasEntity((String) ((Map) tableEntity.getAttribute("sd")).get("guid"));
Map serdeInfo = (Map) sdEntity.getAttribute("serdeInfo");
Map serdeAttrs = (Map) serdeInfo.get("attributes");
assertEquals(serdeAttrs.get("serializationLib"), "org.apache.hadoop.hive.ql.io.orc.OrcSerde");
assertEquals(((Map) serdeAttrs.get("parameters")).get("prop1"), "val1");
assertEquals(((Map) serdeAttrs.get("parameters")).get("prop2"), "val2");
assertEquals(((List) tableEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES)).size(), 0);
}
@Test (priority = 8)
public void testAlterTableRenameTableName() throws Exception {
String dbName = dbName();
String query = "CREATE DATABASE " + dbName;
runCommand(query);
String dbId = assertDatabaseIsRegistered(dbName);
assertEquals(getAtlasEntity(dbId).getStatus(), ACTIVE);
String tableName = tableName();
runCommand("CREATE TABLE " + dbName + "." + tableName + " (name string, age int, dob date)");
String tblId = assertTableIsRegistered(dbName, tableName);
assertEquals(getAtlasEntity(tblId).getStatus(), ACTIVE);
// RENAME TABLE NAME
String newTableName = tableName + "_new";
query = "ALTER TABLE " + dbName + "." + tableName + " RENAME TO " + dbName + "." + newTableName;
runCommandWithDelay(query);
AtlasEntityWithExtInfo tableEntityWithExtInfo = getAtlasEntityWithExtInfo(tblId);
AtlasEntity tableEntity = tableEntityWithExtInfo.getEntity();
assertEquals(((List) tableEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES)).size(), 0);
// validate table rename in table entity
assertEquals(newTableName, tableEntity.getAttribute("name"));
assertTrue(((String) tableEntity.getAttribute("qualifiedName")).contains(newTableName));
// validate table rename in column and sd entity
for (AtlasEntity referredEntity : tableEntityWithExtInfo.getReferredEntities().values()) {
assertTrue(((String) referredEntity.getAttribute("qualifiedName")).contains(newTableName));
}
}
@Test (priority = 9)
public void testAlterTableRenameColumnName() throws Exception {
String dbName = dbName();
String query = "CREATE DATABASE " + dbName;
runCommand(query);
String dbId = assertDatabaseIsRegistered(dbName);
assertEquals(getAtlasEntity(dbId).getStatus(), ACTIVE);
String tableName = tableName();
runCommand("CREATE TABLE " + dbName + "." + tableName + " (col1 int, col2 int, col3 int)");
String tblId = assertTableIsRegistered(dbName, tableName);
AtlasEntityWithExtInfo tableEntityWithExtInfo = getAtlasEntityWithExtInfo(tblId);
assertEquals(tableEntityWithExtInfo.getEntity().getStatus(), ACTIVE);
String col1Id = getColumnId(tableEntityWithExtInfo, "col1");
String col2Id = getColumnId(tableEntityWithExtInfo, "col2");
// RENAME COLUMN NAME
query = "ALTER TABLE " + dbName + "." + tableName + " CHANGE col1 col11 int";
runCommandWithDelay(query);
AtlasEntity col1Entity = getAtlasEntity(col1Id);
assertEquals(col1Entity.getAttribute("name"), "col11");
assertTrue(((String) col1Entity.getAttribute("qualifiedName")).contains("col11"));
// CHANGE COLUMN NAME and DATATYPE
query = "ALTER TABLE " + dbName + "." + tableName + " CHANGE col2 col22 string";
runCommandWithDelay(query);
AtlasEntity col2Entity = getAtlasEntity(col2Id);
assertEquals(col2Entity.getAttribute("name"), "col22");
assertEquals(col2Entity.getAttribute("type"), "string");
assertEquals(((List) getAtlasEntity(tblId).getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES)).size(), 0);
}
@Test (priority = 10)
public void testDropTable() throws Exception {
String dbName = dbName();
String query = "CREATE DATABASE " + dbName;
runCommand(query);
String dbId = assertDatabaseIsRegistered(dbName);
assertEquals(getAtlasEntity(dbId).getStatus(), ACTIVE);
String tableName = tableName();
runCommand("CREATE TABLE " + dbName + "." + tableName + " (name string, age int, dob date)");
String tblId = assertTableIsRegistered(dbName, tableName);
assertEquals(getAtlasEntity(tblId).getStatus(), ACTIVE);
query = "DROP TABLE " + dbName + "." + tableName;
runCommandWithDelay(query);
assertEquals(getAtlasEntity(tblId).getStatus(), DELETED);
}
@Test (priority = 11)
public void testDropView() throws Exception {
String dbName = dbName();
String query = "CREATE DATABASE " + dbName;
runCommand(query);
String dbId = assertDatabaseIsRegistered(dbName);
assertEquals(getAtlasEntity(dbId).getStatus(), ACTIVE);
String tableName = tableName();
runCommand("CREATE TABLE " + dbName + "." + tableName + " (name string, age int, dob date)");
String tblId = assertTableIsRegistered(dbName, tableName);
assertEquals(getAtlasEntity(tblId).getStatus(), ACTIVE);
String viewName = tableName();
runCommand("CREATE VIEW " + dbName + "." + viewName + " AS SELECT * FROM " + dbName + "." + tableName);
tblId = assertTableIsRegistered(dbName, viewName);
assertEquals(getAtlasEntity(tblId).getStatus(), ACTIVE);
query = "DROP VIEW " + dbName + "." + viewName;
runCommandWithDelay(query);
assertEquals(getAtlasEntity(tblId).getStatus(), DELETED);
}
private String getColumnId(AtlasEntityWithExtInfo entityWithExtInfo, String columnName) {
String ret = null;
for (AtlasEntity entity : entityWithExtInfo.getReferredEntities().values()) {
if (entity.getTypeName().equals("hive_column") && entity.getAttribute("name").equals(columnName)) {
ret = entity.getGuid();
break;
}
}
return ret;
}
private AtlasEntity getAtlasEntity(String guid) throws AtlasServiceException {
return atlasClientV2.getEntityByGuid(guid).getEntity();
}
private AtlasEntityWithExtInfo getAtlasEntityWithExtInfo(String guid) throws AtlasServiceException {
return atlasClientV2.getEntityByGuid(guid);
}
protected void runCommand(String cmd) throws Exception {
runCommandWithDelay(driverWithoutContext, cmd, 0);
}
protected void runCommandWithDelay(String cmd) throws Exception {
int delayTimeInMs = 10000;
runCommandWithDelay(driverWithoutContext, cmd, delayTimeInMs);
}
}