| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.atlas.hive.hook; |
| |
| import org.apache.atlas.ApplicationProperties; |
| import org.apache.atlas.AtlasClient; |
| import org.apache.atlas.utils.ParamChecker; |
| import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; |
| import org.apache.atlas.hive.model.HiveDataModelGenerator; |
| import org.apache.atlas.hive.model.HiveDataTypes; |
| import org.apache.atlas.typesystem.Referenceable; |
| import org.apache.commons.configuration.Configuration; |
| import org.apache.commons.lang.RandomStringUtils; |
| import org.apache.commons.lang.StringEscapeUtils; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.hadoop.hive.conf.HiveConf; |
| import org.apache.hadoop.hive.metastore.TableType; |
| import org.apache.hadoop.hive.ql.Driver; |
| import org.apache.hadoop.hive.ql.session.SessionState; |
| import org.codehaus.jettison.json.JSONArray; |
| import org.codehaus.jettison.json.JSONObject; |
| import org.slf4j.Logger; |
| import org.testng.Assert; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.Test; |
| |
| import java.io.File; |
| import java.util.Map; |
| |
| public class HiveHookIT { |
| public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HiveHookIT.class); |
| |
| private static final String DGI_URL = "http://localhost:21000/"; |
| private static final String CLUSTER_NAME = "test"; |
| public static final String DEFAULT_DB = "default"; |
| private Driver driver; |
| private AtlasClient dgiCLient; |
| private SessionState ss; |
| |
| @BeforeClass |
| public void setUp() throws Exception { |
| //Set-up hive session |
| HiveConf conf = new HiveConf(); |
| driver = new Driver(conf); |
| ss = new SessionState(conf, System.getProperty("user.name")); |
| ss = SessionState.start(ss); |
| SessionState.setCurrentSessionState(ss); |
| |
| Configuration configuration = ApplicationProperties.get(); |
| dgiCLient = new AtlasClient(configuration.getString(HiveMetaStoreBridge.ATLAS_ENDPOINT, DGI_URL)); |
| } |
| |
| private void runCommand(String cmd) throws Exception { |
| ss.setCommandType(null); |
| driver.run(cmd); |
| } |
| |
| @Test |
| public void testCreateDatabase() throws Exception { |
| String dbName = "db" + random(); |
| runCommand("create database " + dbName + " WITH DBPROPERTIES ('p1'='v1', 'p2'='v2')"); |
| String dbId = assertDatabaseIsRegistered(dbName); |
| |
| Referenceable definition = dgiCLient.getEntity(dbId); |
| Map params = (Map) definition.get("parameters"); |
| Assert.assertNotNull(params); |
| Assert.assertEquals(params.size(), 2); |
| Assert.assertEquals(params.get("p1"), "v1"); |
| |
| //There should be just one entity per dbname |
| runCommand("drop database " + dbName); |
| runCommand("create database " + dbName); |
| String dbid = assertDatabaseIsRegistered(dbName); |
| |
| //assert on qualified name |
| Referenceable dbEntity = dgiCLient.getEntity(dbid); |
| Assert.assertEquals(dbEntity.get("qualifiedName"), dbName.toLowerCase() + "@" + CLUSTER_NAME); |
| |
| } |
| |
| private String dbName() { |
| return "db" + random(); |
| } |
| |
| private String createDatabase() throws Exception { |
| String dbName = dbName(); |
| runCommand("create database " + dbName); |
| return dbName; |
| } |
| |
| private String tableName() { |
| return "table" + random(); |
| } |
| |
| private String createTable() throws Exception { |
| return createTable(true); |
| } |
| |
| private String createTable(boolean partition) throws Exception { |
| String tableName = tableName(); |
| runCommand("create table " + tableName + "(id int, name string) comment 'table comment' " + (partition ? |
| " partitioned by(dt string)" : "")); |
| return tableName; |
| } |
| |
| @Test |
| public void testCreateTable() throws Exception { |
| String tableName = tableName(); |
| String dbName = createDatabase(); |
| String colName = "col" + random(); |
| runCommand("create table " + dbName + "." + tableName + "(" + colName + " int, name string)"); |
| assertTableIsRegistered(dbName, tableName); |
| |
| //there is only one instance of column registered |
| String colId = assertColumnIsRegistered(colName); |
| Referenceable colEntity = dgiCLient.getEntity(colId); |
| Assert.assertEquals(colEntity.get("qualifiedName"), String.format("%s.%s.%s@%s", dbName.toLowerCase(), |
| tableName.toLowerCase(), colName.toLowerCase(), CLUSTER_NAME)); |
| |
| tableName = createTable(); |
| String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); |
| Referenceable tableRef = dgiCLient.getEntity(tableId); |
| Assert.assertEquals(tableRef.get("tableType"), TableType.MANAGED_TABLE.name()); |
| Assert.assertEquals(tableRef.get(HiveDataModelGenerator.COMMENT), "table comment"); |
| String entityName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName); |
| Assert.assertEquals(tableRef.get(HiveDataModelGenerator.NAME), entityName); |
| Assert.assertEquals(tableRef.get("name"), "default." + tableName.toLowerCase() + "@" + CLUSTER_NAME); |
| |
| final Referenceable sdRef = (Referenceable) tableRef.get("sd"); |
| Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_IS_STORED_AS_SUB_DIRS), false); |
| |
| //Create table where database doesn't exist, will create database instance as well |
| assertDatabaseIsRegistered(DEFAULT_DB); |
| } |
| |
| @Test |
| public void testRenameTable() throws Exception { |
| String tableName = createTable(); |
| String newTableName = tableName(); |
| runCommand(String.format("alter table %s rename to %s", tableName, newTableName)); |
| |
| assertTableIsRegistered(DEFAULT_DB, newTableName); |
| assertTableIsNotRegistered(DEFAULT_DB, tableName); |
| } |
| |
| |
| private String assertColumnIsRegistered(String colName) throws Exception { |
| LOG.debug("Searching for column {}", colName); |
| String query = |
| String.format("%s where name = '%s'", HiveDataTypes.HIVE_COLUMN.getName(), colName.toLowerCase()); |
| return assertEntityIsRegistered(query); |
| } |
| |
| @Test |
| public void testCTAS() throws Exception { |
| String tableName = createTable(); |
| String ctasTableName = "table" + random(); |
| String query = "create table " + ctasTableName + " as select * from " + tableName; |
| runCommand(query); |
| |
| assertProcessIsRegistered(query); |
| assertTableIsRegistered(DEFAULT_DB, ctasTableName); |
| } |
| |
| @Test |
| public void testCreateView() throws Exception { |
| String tableName = createTable(); |
| String viewName = tableName(); |
| String query = "create view " + viewName + " as select * from " + tableName; |
| runCommand(query); |
| |
| assertProcessIsRegistered(query); |
| assertTableIsRegistered(DEFAULT_DB, viewName); |
| } |
| |
| @Test |
| public void testLoadData() throws Exception { |
| String tableName = createTable(false); |
| |
| String loadFile = file("load"); |
| String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName; |
| runCommand(query); |
| |
| assertProcessIsRegistered(query); |
| } |
| |
| @Test |
| public void testInsert() throws Exception { |
| String tableName = createTable(); |
| String insertTableName = createTable(); |
| String query = |
| "insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from " + tableName |
| + " where dt = '2015-01-01'"; |
| |
| runCommand(query); |
| assertProcessIsRegistered(query); |
| String partId = assertPartitionIsRegistered(DEFAULT_DB, insertTableName, "2015-01-01"); |
| Referenceable partitionEntity = dgiCLient.getEntity(partId); |
| Assert.assertEquals(partitionEntity.get("qualifiedName"), |
| String.format("%s.%s.%s@%s", "default", insertTableName.toLowerCase(), "2015-01-01", CLUSTER_NAME)); |
| } |
| |
| private String random() { |
| return RandomStringUtils.randomAlphanumeric(10); |
| } |
| |
| private String file(String tag) throws Exception { |
| String filename = "./target/" + tag + "-data-" + random(); |
| File file = new File(filename); |
| file.createNewFile(); |
| return file.getAbsolutePath(); |
| } |
| |
| private String mkdir(String tag) throws Exception { |
| String filename = "./target/" + tag + "-data-" + random(); |
| File file = new File(filename); |
| file.mkdirs(); |
| return file.getAbsolutePath(); |
| } |
| |
| @Test |
| public void testExportImport() throws Exception { |
| String tableName = createTable(false); |
| |
| String filename = "pfile://" + mkdir("export"); |
| String query = "export table " + tableName + " to \"" + filename + "\""; |
| runCommand(query); |
| assertProcessIsRegistered(query); |
| |
| tableName = createTable(false); |
| |
| query = "import table " + tableName + " from '" + filename + "'"; |
| runCommand(query); |
| assertProcessIsRegistered(query); |
| } |
| |
| @Test |
| public void testSelect() throws Exception { |
| String tableName = createTable(); |
| String query = "select * from " + tableName; |
| runCommand(query); |
| String pid = assertProcessIsRegistered(query); |
| Referenceable processEntity = dgiCLient.getEntity(pid); |
| Assert.assertEquals(processEntity.get("name"), query.toLowerCase()); |
| |
| //single entity per query |
| query = "SELECT * from " + tableName.toUpperCase(); |
| runCommand(query); |
| assertProcessIsRegistered(query); |
| } |
| |
| @Test(enabled = false) |
| public void testAlterTable() throws Exception { |
| String tableName = createTable(); |
| String newName = tableName(); |
| String query = "alter table " + tableName + " rename to " + newName; |
| runCommand(query); |
| |
| assertTableIsRegistered(DEFAULT_DB, newName); |
| assertTableIsNotRegistered(DEFAULT_DB, tableName); |
| } |
| |
| @Test(enabled = false) |
| public void testAlterView() throws Exception { |
| String tableName = createTable(); |
| String viewName = tableName(); |
| String newName = tableName(); |
| String query = "create view " + viewName + " as select * from " + tableName; |
| runCommand(query); |
| |
| query = "alter view " + viewName + " rename to " + newName; |
| runCommand(query); |
| |
| assertTableIsRegistered(DEFAULT_DB, newName); |
| assertTableIsNotRegistered(DEFAULT_DB, viewName); |
| } |
| |
| private String assertProcessIsRegistered(String queryStr) throws Exception { |
| // String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(), |
| // normalize(queryStr)); |
| // assertEntityIsRegistered(dslQuery, true); |
| //todo replace with DSL |
| String typeName = HiveDataTypes.HIVE_PROCESS.getName(); |
| String gremlinQuery = |
| String.format("g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()", typeName, typeName, |
| normalize(queryStr)); |
| return assertEntityIsRegistered(gremlinQuery); |
| } |
| |
| private String normalize(String str) { |
| if (StringUtils.isEmpty(str)) { |
| return null; |
| } |
| return StringEscapeUtils.escapeJava(str.toLowerCase()); |
| } |
| |
| private void assertTableIsNotRegistered(String dbName, String tableName) throws Exception { |
| LOG.debug("Searching for table {}.{}", dbName, tableName); |
| String query = String.format( |
| "%s as t where tableName = '%s', db where name = '%s' and clusterName = '%s'" + " select t", |
| HiveDataTypes.HIVE_TABLE.getName(), tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME); |
| assertEntityIsNotRegistered(query); |
| } |
| |
| private String assertTableIsRegistered(String dbName, String tableName) throws Exception { |
| LOG.debug("Searching for table {}.{}", dbName, tableName); |
| String query = String.format( |
| "%s as t where tableName = '%s', db where name = '%s' and clusterName = '%s'" + " select t", |
| HiveDataTypes.HIVE_TABLE.getName(), tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME); |
| return assertEntityIsRegistered(query, "t"); |
| } |
| |
| private String assertDatabaseIsRegistered(String dbName) throws Exception { |
| LOG.debug("Searching for database {}", dbName); |
| String query = String.format("%s where name = '%s' and clusterName = '%s'", HiveDataTypes.HIVE_DB.getName(), |
| dbName.toLowerCase(), CLUSTER_NAME); |
| return assertEntityIsRegistered(query); |
| } |
| |
| private String assertPartitionIsRegistered(String dbName, String tableName, String value) throws Exception { |
| String typeName = HiveDataTypes.HIVE_PARTITION.getName(); |
| |
| LOG.debug("Searching for partition of {}.{} with values {}", dbName, tableName, value); |
| String dslQuery = String.format("%s as p where values = ['%s'], table where tableName = '%s', " |
| + "db where name = '%s' and clusterName = '%s' select p", typeName, value, |
| tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME); |
| |
| return assertEntityIsRegistered(dslQuery, "p"); |
| } |
| |
| private String assertEntityIsRegistered(final String query, String... arg) throws Exception { |
| waitFor(2000, new Predicate() { |
| @Override |
| public boolean evaluate() throws Exception { |
| JSONArray results = dgiCLient.search(query); |
| return results.length() == 1; |
| } |
| }); |
| |
| String column = (arg.length > 0) ? arg[0] : "_col_0"; |
| |
| JSONArray results = dgiCLient.search(query); |
| JSONObject row = results.getJSONObject(0); |
| if (row.has("__guid")) { |
| return row.getString("__guid"); |
| } else if (row.has("$id$")) { |
| return row.getJSONObject("$id$").getString("id"); |
| } else { |
| return row.getJSONObject(column).getString("id"); |
| } |
| } |
| |
| private void assertEntityIsNotRegistered(String dslQuery) throws Exception { |
| JSONArray results = dgiCLient.searchByDSL(dslQuery); |
| Assert.assertEquals(results.length(), 0); |
| } |
| |
| @Test |
| public void testLineage() throws Exception { |
| String table1 = createTable(false); |
| |
| String db2 = createDatabase(); |
| String table2 = tableName(); |
| |
| String query = String.format("create table %s.%s as select * from %s", db2, table2, table1); |
| runCommand(query); |
| String table1Id = assertTableIsRegistered(DEFAULT_DB, table1); |
| String table2Id = assertTableIsRegistered(db2, table2); |
| |
| String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, db2, table2); |
| JSONObject response = dgiCLient.getInputGraph(datasetName); |
| JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices"); |
| Assert.assertTrue(vertices.has(table1Id)); |
| Assert.assertTrue(vertices.has(table2Id)); |
| |
| datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, table1); |
| response = dgiCLient.getOutputGraph(datasetName); |
| vertices = response.getJSONObject("values").getJSONObject("vertices"); |
| Assert.assertTrue(vertices.has(table1Id)); |
| Assert.assertTrue(vertices.has(table2Id)); |
| } |
| |
| public interface Predicate { |
| |
| /** |
| * Perform a predicate evaluation. |
| * |
| * @return the boolean result of the evaluation. |
| * @throws Exception thrown if the predicate evaluation could not evaluate. |
| */ |
| boolean evaluate() throws Exception; |
| } |
| |
| /** |
| * Wait for a condition, expressed via a {@link Predicate} to become true. |
| * |
| * @param timeout maximum time in milliseconds to wait for the predicate to become true. |
| * @param predicate predicate waiting on. |
| */ |
| protected void waitFor(int timeout, Predicate predicate) throws Exception { |
| ParamChecker.notNull(predicate, "predicate"); |
| long mustEnd = System.currentTimeMillis() + timeout; |
| |
| boolean eval; |
| while (!(eval = predicate.evaluate()) && System.currentTimeMillis() < mustEnd) { |
| LOG.info("Waiting up to {} msec", mustEnd - System.currentTimeMillis()); |
| Thread.sleep(100); |
| } |
| if (!eval) { |
| throw new Exception("Waiting timed out after " + timeout + " msec"); |
| } |
| } |
| } |