/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive.bridge;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.hook.events.BaseHiveEvent;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hook.AtlasHookException;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations;
import org.apache.atlas.utils.AtlasPathExtractorUtil;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.atlas.utils.HdfsNameServiceResolver;
import org.apache.atlas.utils.AtlasConfigurationUtil;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.utils.PathExtractorContext;
import org.apache.commons.cli.ParseException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.*;
/**
* A bridge utility that imports metadata from the Hive metastore
* and registers it in Atlas.
*/
public class HiveMetaStoreBridge {
private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridge.class);
public static final String CONF_PREFIX = "atlas.hook.hive.";
public static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
public static final String HIVE_USERNAME = "atlas.hook.hive.default.username";
public static final String HIVE_METADATA_NAMESPACE = "atlas.metadata.namespace";
public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION = CONF_PREFIX + "aws_s3.atlas.model.version";
public static final String DEFAULT_CLUSTER_NAME = "primary";
public static final String TEMP_TABLE_PREFIX = "_temp-";
public static final String ATLAS_ENDPOINT = "atlas.rest.address";
public static final String SEP = ":".intern();
public static final String HDFS_PATH = "hdfs_path";
public static final String DEFAULT_METASTORE_CATALOG = "hive";
public static final String HIVE_TABLE_DB_EDGE_LABEL = "__hive_table.db";
public static final String HOOK_HIVE_PAGE_LIMIT = CONF_PREFIX + "page.limit";
public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2 = "v2";
private static final int EXIT_CODE_SUCCESS = 0;
private static final int EXIT_CODE_FAILED = 1;
private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
private static int pageLimit = 10000;
private final String metadataNamespace;
private final Hive hiveClient;
private final AtlasClientV2 atlasClientV2;
private final boolean convertHdfsPathToLowerCase;
private String awsS3AtlasModelVersion = null;
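// Command-line entry point; see printUsage() for the supported options. Connects to the Atlas server
// configured via 'atlas.rest.address' in atlas-application.properties (default: http://localhost:21000/).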
public static void main(String[] args) {
int exitCode = EXIT_CODE_FAILED;
AtlasClientV2 atlasClientV2 = null;
try {
Options options = new Options();
options.addOption("d", "database", true, "Database name");
options.addOption("t", "table", true, "Table name");
options.addOption("f", "filename", true, "Filename");
options.addOption("failOnError", false, "failOnError");
options.addOption("deleteNonExisting", false, "Delete database and table entities in Atlas if not present in Hive");
CommandLine cmd = new BasicParser().parse(options, args);
boolean failOnError = cmd.hasOption("failOnError");
boolean deleteNonExisting = cmd.hasOption("deleteNonExisting");
LOG.info("delete non existing flag : {} ", deleteNonExisting);
String databaseToImport = cmd.getOptionValue("d");
String tableToImport = cmd.getOptionValue("t");
String fileToImport = cmd.getOptionValue("f");
Configuration atlasConf = ApplicationProperties.get();
String[] atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT);
if (atlasEndpoint == null || atlasEndpoint.length == 0) {
atlasEndpoint = new String[] { DEFAULT_ATLAS_URL };
}
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
atlasClientV2 = new AtlasClientV2(atlasEndpoint, basicAuthUsernamePassword);
} else {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), atlasEndpoint);
}
HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2);
if (deleteNonExisting) {
hiveMetaStoreBridge.deleteEntitiesForNonExistingHiveMetadata(failOnError);
} else if (StringUtils.isNotEmpty(fileToImport)) {
File f = new File(fileToImport);
if (f.exists() && f.canRead()) {
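// Each line of the import file is expected to be "<database>:<table>", or just "<database>" to import all of its tables (see printUsage()).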
try (BufferedReader br = new BufferedReader(new FileReader(f))) {
String line;
while ((line = br.readLine()) != null) {
String[] val = line.split(":");
if (ArrayUtils.isNotEmpty(val)) {
databaseToImport = val[0];
if (val.length > 1) {
tableToImport = val[1];
} else {
tableToImport = "";
}
hiveMetaStoreBridge.importHiveMetadata(databaseToImport, tableToImport, failOnError);
}
}
}
exitCode = EXIT_CODE_SUCCESS;
} else {
LOG.error("Failed to read the input file: " + fileToImport);
}
} else {
hiveMetaStoreBridge.importHiveMetadata(databaseToImport, tableToImport, failOnError);
}
exitCode = EXIT_CODE_SUCCESS;
} catch(ParseException e) {
LOG.error("Failed to parse arguments. Error: ", e.getMessage());
printUsage();
} catch(Exception e) {
LOG.error("Import failed", e);
} finally {
if( atlasClientV2 !=null) {
atlasClientV2.close();
}
}
System.exit(exitCode);
}
private static void printUsage() {
System.out.println();
System.out.println();
System.out.println("Usage 1: import-hive.sh [-d <database> OR --database <database>] " );
System.out.println(" Imports specified database and its tables ...");
System.out.println();
System.out.println("Usage 2: import-hive.sh [-d <database> OR --database <database>] [-t <table> OR --table <table>]");
System.out.println(" Imports specified table within that database ...");
System.out.println();
System.out.println("Usage 3: import-hive.sh");
System.out.println(" Imports all databases and tables...");
System.out.println();
System.out.println("Usage 4: import-hive.sh -f <filename>");
System.out.println(" Imports all databases and tables in the file...");
System.out.println(" Format:");
System.out.println(" database1:tbl1");
System.out.println(" database1:tbl2");
System.out.println(" database2:tbl2");
System.out.println("Usage 5: import-hive.sh [-deleteNonExisting] " );
System.out.println(" Deletes databases and tables which are not in Hive ...");
System.out.println();
}
/**
* Construct a HiveMetaStoreBridge.
* @param atlasProperties Atlas application properties
* @param hiveConf {@link HiveConf} for Hive component in the cluster
* @param atlasClientV2 Atlas client used to register entities
*/
public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf hiveConf, AtlasClientV2 atlasClientV2) throws Exception {
this.metadataNamespace = getMetadataNamespace(atlasProperties);
this.hiveClient = Hive.get(hiveConf);
this.atlasClientV2 = atlasClientV2;
this.convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
this.awsS3AtlasModelVersion = atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION, HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);
pageLimit = atlasProperties.getInteger(HOOK_HIVE_PAGE_LIMIT, 10000);
}
/**
* Construct a HiveMetaStoreBridge.
* @param atlasProperties Atlas application properties
* @param hiveConf {@link HiveConf} for Hive component in the cluster
*/
public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf hiveConf) throws Exception {
this(atlasProperties, hiveConf, null);
}
HiveMetaStoreBridge(String metadataNamespace, Hive hiveClient, AtlasClientV2 atlasClientV2) {
this(metadataNamespace, hiveClient, atlasClientV2, true);
}
HiveMetaStoreBridge(String metadataNamespace, Hive hiveClient, AtlasClientV2 atlasClientV2, boolean convertHdfsPathToLowerCase) {
this.metadataNamespace = metadataNamespace;
this.hiveClient = hiveClient;
this.atlasClientV2 = atlasClientV2;
this.convertHdfsPathToLowerCase = convertHdfsPathToLowerCase;
}
public String getMetadataNamespace(Configuration config) {
return AtlasConfigurationUtil.getRecentString(config, HIVE_METADATA_NAMESPACE, getClusterName(config));
}
private String getClusterName(Configuration config) {
return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
}
public String getMetadataNamespace() {
return metadataNamespace;
}
public Hive getHiveClient() {
return hiveClient;
}
public boolean isConvertHdfsPathToLowerCase() {
return convertHdfsPathToLowerCase;
}
@VisibleForTesting
public void importHiveMetadata(String databaseToImport, String tableToImport, boolean failOnError) throws Exception {
LOG.info("Importing Hive metadata");
importDatabases(failOnError, databaseToImport, tableToImport);
}
private void importDatabases(boolean failOnError, String databaseToImport, String tableToImport) throws Exception {
List<String> databaseNames = null;
if (StringUtils.isEmpty(databaseToImport) && StringUtils.isEmpty(tableToImport)) {
//when both database and table to import are empty, import all
databaseNames = hiveClient.getAllDatabases();
} else if (StringUtils.isEmpty(databaseToImport) && StringUtils.isNotEmpty(tableToImport)) {
//when database is empty and table is not, then check table has database name in it and import that db and table
if (isTableWithDatabaseName(tableToImport)) {
String[] val = tableToImport.split("\\.");
if (val.length > 1) {
databaseToImport = val[0];
tableToImport = val[1];
}
databaseNames = hiveClient.getDatabasesByPattern(databaseToImport);
} else {
databaseNames = hiveClient.getAllDatabases();
}
} else {
//when database to import has some value then, import that db and all table under it.
databaseNames = hiveClient.getDatabasesByPattern(databaseToImport);
}
if(!CollectionUtils.isEmpty(databaseNames)) {
LOG.info("Found {} databases", databaseNames.size());
for (String databaseName : databaseNames) {
AtlasEntityWithExtInfo dbEntity = registerDatabase(databaseName);
if (dbEntity != null) {
importTables(dbEntity.getEntity(), databaseName, tableToImport, failOnError);
}
}
} else {
LOG.info("No database found");
}
}
/**
* Imports tables of the given database.
* @param dbEntity Atlas entity of the database
* @param databaseName name of the Hive database
* @param tblName table name or pattern to import; when empty, all tables of the database are imported
* @param failOnError whether to rethrow the first import failure
* @return number of tables imported
* @throws Exception
*/
private int importTables(AtlasEntity dbEntity, String databaseName, String tblName, final boolean failOnError) throws Exception {
int tablesImported = 0;
final List<String> tableNames;
if (StringUtils.isEmpty(tblName)) {
tableNames = hiveClient.getAllTables(databaseName);
} else {
tableNames = hiveClient.getTablesByPattern(databaseName, tblName);
}
if(!CollectionUtils.isEmpty(tableNames)) {
LOG.info("Found {} tables to import in database {}", tableNames.size(), databaseName);
try {
for (String tableName : tableNames) {
int imported = importTable(dbEntity, databaseName, tableName, failOnError);
tablesImported += imported;
}
} finally {
if (tablesImported == tableNames.size()) {
LOG.info("Successfully imported {} tables from database {}", tablesImported, databaseName);
} else {
LOG.error("Imported {} of {} tables from database {}. Please check logs for errors during import", tablesImported, tableNames.size(), databaseName);
}
}
} else {
LOG.info("No tables to import in database {}", databaseName);
}
return tablesImported;
}
@VisibleForTesting
public int importTable(AtlasEntity dbEntity, String databaseName, String tableName, final boolean failOnError) throws Exception {
try {
Table table = hiveClient.getTable(databaseName, tableName);
AtlasEntityWithExtInfo tableEntity = registerTable(dbEntity, table);
if (table.getTableType() == TableType.EXTERNAL_TABLE) {
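// For external tables, also register a hive_process lineage entity that links the table's data location (HDFS/S3 path entity) as input and the table as output.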
String processQualifiedName = getTableProcessQualifiedName(metadataNamespace, table);
AtlasEntityWithExtInfo processEntity = findProcessEntity(processQualifiedName);
if (processEntity == null) {
String tableLocationString = isConvertHdfsPathToLowerCase() ? lower(table.getDataLocation().toString()) : table.getDataLocation().toString();
Path location = table.getDataLocation();
String query = getCreateTableString(table, tableLocationString);
PathExtractorContext pathExtractorCtx = new PathExtractorContext(getMetadataNamespace(), isConvertHdfsPathToLowerCase(), awsS3AtlasModelVersion);
AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(location, pathExtractorCtx);
AtlasEntity pathInst = entityWithExtInfo.getEntity();
AtlasEntity tableInst = tableEntity.getEntity();
AtlasEntity processInst = new AtlasEntity(HiveDataTypes.HIVE_PROCESS.getName());
long now = System.currentTimeMillis();
processInst.setAttribute(ATTRIBUTE_QUALIFIED_NAME, processQualifiedName);
processInst.setAttribute(ATTRIBUTE_NAME, query);
processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
processInst.setRelationshipAttribute(ATTRIBUTE_INPUTS, Collections.singletonList(AtlasTypeUtil.getAtlasRelatedObjectId(pathInst, RELATIONSHIP_DATASET_PROCESS_INPUTS)));
processInst.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(AtlasTypeUtil.getAtlasRelatedObjectId(tableInst, RELATIONSHIP_PROCESS_DATASET_OUTPUTS)));
String userName = table.getOwner();
if (StringUtils.isEmpty(userName)) {
userName = ApplicationProperties.get().getString(HIVE_USERNAME, "hive");
}
processInst.setAttribute(ATTRIBUTE_USER_NAME, userName);
processInst.setAttribute(ATTRIBUTE_START_TIME, now);
processInst.setAttribute(ATTRIBUTE_END_TIME, now);
processInst.setAttribute(ATTRIBUTE_OPERATION_TYPE, "CREATETABLE");
processInst.setAttribute(ATTRIBUTE_QUERY_TEXT, query);
processInst.setAttribute(ATTRIBUTE_QUERY_ID, query);
processInst.setAttribute(ATTRIBUTE_QUERY_PLAN, "{}");
processInst.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(query));
AtlasEntitiesWithExtInfo createTableProcess = new AtlasEntitiesWithExtInfo();
createTableProcess.addEntity(processInst);
if (pathExtractorCtx.getKnownEntities() != null) {
pathExtractorCtx.getKnownEntities().values().forEach(entity -> createTableProcess.addEntity(entity));
} else {
createTableProcess.addEntity(pathInst);
}
registerInstances(createTableProcess);
} else {
LOG.info("Process {} is already registered", processQualifiedName);
}
}
return 1;
} catch (Exception e) {
LOG.error("Import failed for hive_table {}", tableName, e);
if (failOnError) {
throw e;
}
return 0;
}
}
/**
* Registers the database in Atlas: creates the entity if it is not already registered, otherwise updates it.
* @param databaseName name of the Hive database
* @return the registered database entity, or null if the database does not exist in Hive
* @throws Exception
*/
private AtlasEntityWithExtInfo registerDatabase(String databaseName) throws Exception {
AtlasEntityWithExtInfo ret = null;
Database db = hiveClient.getDatabase(databaseName);
if (db != null) {
ret = findDatabase(metadataNamespace, databaseName);
if (ret == null) {
ret = registerInstance(new AtlasEntityWithExtInfo(toDbEntity(db)));
} else {
LOG.info("Database {} is already registered - id={}. Updating it.", databaseName, ret.getEntity().getGuid());
ret.setEntity(toDbEntity(db, ret.getEntity()));
updateInstance(ret);
}
}
return ret;
}
private AtlasEntityWithExtInfo registerTable(AtlasEntity dbEntity, Table table) throws AtlasHookException {
try {
AtlasEntityWithExtInfo ret;
AtlasEntityWithExtInfo tableEntity = findTableEntity(table);
if (tableEntity == null) {
tableEntity = toTableEntity(dbEntity, table);
ret = registerInstance(tableEntity);
} else {
LOG.info("Table {}.{} is already registered with id {}. Updating entity.", table.getDbName(), table.getTableName(), tableEntity.getEntity().getGuid());
ret = toTableEntity(dbEntity, table, tableEntity);
updateInstance(ret);
}
return ret;
} catch (Exception e) {
throw new AtlasHookException("HiveMetaStoreBridge.registerTable() failed.", e);
}
}
/**
* Registers an entity in Atlas.
* @param entity entity to create
* @return the created entity as fetched back from Atlas
* @throws Exception
*/
private AtlasEntityWithExtInfo registerInstance(AtlasEntityWithExtInfo entity) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("creating {} entity: {}", entity.getEntity().getTypeName(), entity);
}
AtlasEntityWithExtInfo ret = null;
EntityMutationResponse response = atlasClientV2.createEntity(entity);
List<AtlasEntityHeader> createdEntities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);
if (CollectionUtils.isNotEmpty(createdEntities)) {
for (AtlasEntityHeader createdEntity : createdEntities) {
if (ret == null) {
ret = atlasClientV2.getEntityByGuid(createdEntity.getGuid());
LOG.info("Created {} entity: name={}, guid={}", ret.getEntity().getTypeName(), ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), ret.getEntity().getGuid());
} else if (ret.getEntity(createdEntity.getGuid()) == null) {
AtlasEntityWithExtInfo newEntity = atlasClientV2.getEntityByGuid(createdEntity.getGuid());
ret.addReferredEntity(newEntity.getEntity());
if (MapUtils.isNotEmpty(newEntity.getReferredEntities())) {
for (Map.Entry<String, AtlasEntity> entry : newEntity.getReferredEntities().entrySet()) {
ret.addReferredEntity(entry.getKey(), entry.getValue());
}
}
LOG.info("Created {} entity: name={}, guid={}", newEntity.getEntity().getTypeName(), newEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), newEntity.getEntity().getGuid());
}
}
}
clearRelationshipAttributes(ret);
return ret;
}
/**
* Registers entities in Atlas.
* @param entities entities to create
* @return the created entities as fetched back from Atlas
* @throws Exception
*/
private AtlasEntitiesWithExtInfo registerInstances(AtlasEntitiesWithExtInfo entities) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("creating {} entities: {}", entities.getEntities().size(), entities);
}
AtlasEntitiesWithExtInfo ret = null;
EntityMutationResponse response = atlasClientV2.createEntities(entities);
List<AtlasEntityHeader> createdEntities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);
if (CollectionUtils.isNotEmpty(createdEntities)) {
ret = new AtlasEntitiesWithExtInfo();
for (AtlasEntityHeader createdEntity : createdEntities) {
AtlasEntityWithExtInfo entity = atlasClientV2.getEntityByGuid(createdEntity.getGuid());
ret.addEntity(entity.getEntity());
if (MapUtils.isNotEmpty(entity.getReferredEntities())) {
for (Map.Entry<String, AtlasEntity> entry : entity.getReferredEntities().entrySet()) {
ret.addReferredEntity(entry.getKey(), entry.getValue());
}
}
LOG.info("Created {} entity: name={}, guid={}", entity.getEntity().getTypeName(), entity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), entity.getEntity().getGuid());
}
}
clearRelationshipAttributes(ret);
return ret;
}
private void updateInstance(AtlasEntityWithExtInfo entity) throws AtlasServiceException {
if (LOG.isDebugEnabled()) {
LOG.debug("updating {} entity: {}", entity.getEntity().getTypeName(), entity);
}
atlasClientV2.updateEntity(entity);
LOG.info("Updated {} entity: name={}, guid={}", entity.getEntity().getTypeName(), entity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), entity.getEntity().getGuid());
}
/**
* Create a Hive Database entity
* @param hiveDB The Hive {@link Database} object from which to map properties
* @return new Hive Database AtlasEntity
* @throws HiveException
*/
private AtlasEntity toDbEntity(Database hiveDB) throws HiveException {
return toDbEntity(hiveDB, null);
}
private AtlasEntity toDbEntity(Database hiveDB, AtlasEntity dbEntity) {
if (dbEntity == null) {
dbEntity = new AtlasEntity(HiveDataTypes.HIVE_DB.getName());
}
String dbName = getDatabaseName(hiveDB);
dbEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getDBQualifiedName(metadataNamespace, dbName));
dbEntity.setAttribute(ATTRIBUTE_NAME, dbName);
dbEntity.setAttribute(ATTRIBUTE_DESCRIPTION, hiveDB.getDescription());
dbEntity.setAttribute(ATTRIBUTE_OWNER, hiveDB.getOwnerName());
dbEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
dbEntity.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(hiveDB.getLocationUri()));
dbEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveDB.getParameters());
if (hiveDB.getOwnerType() != null) {
dbEntity.setAttribute(ATTRIBUTE_OWNER_TYPE, OWNER_TYPE_TO_ENUM_VALUE.get(hiveDB.getOwnerType().getValue()));
}
return dbEntity;
}
public static String getDatabaseName(Database hiveDB) {
String dbName = hiveDB.getName().toLowerCase();
String catalogName = hiveDB.getCatalogName() != null ? hiveDB.getCatalogName().toLowerCase() : null;
if (StringUtils.isNotEmpty(catalogName) && !StringUtils.equals(catalogName, DEFAULT_METASTORE_CATALOG)) {
dbName = catalogName + SEP + dbName;
}
return dbName;
}
/**
* Create a new table instance in Atlas
* @param database {@link AtlasEntity} of the Hive database to which this table belongs
* @param hiveTable reference to the Hive {@link Table} from which to map properties
* @return newly created Hive table entity
* @throws AtlasHookException
*/
private AtlasEntityWithExtInfo toTableEntity(AtlasEntity database, Table hiveTable) throws AtlasHookException {
return toTableEntity(database, hiveTable, null);
}
private AtlasEntityWithExtInfo toTableEntity(AtlasEntity database, final Table hiveTable, AtlasEntityWithExtInfo table) throws AtlasHookException {
if (table == null) {
table = new AtlasEntityWithExtInfo(new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName()));
}
AtlasEntity tableEntity = table.getEntity();
String tableQualifiedName = getTableQualifiedName(metadataNamespace, hiveTable);
long createTime = BaseHiveEvent.getTableCreateTime(hiveTable);
long lastAccessTime = hiveTable.getLastAccessTime() > 0 ? hiveTable.getLastAccessTime() : createTime;
tableEntity.setRelationshipAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasRelatedObjectId(database, RELATIONSHIP_HIVE_TABLE_DB));
tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName);
tableEntity.setAttribute(ATTRIBUTE_NAME, hiveTable.getTableName().toLowerCase());
tableEntity.setAttribute(ATTRIBUTE_OWNER, hiveTable.getOwner());
tableEntity.setAttribute(ATTRIBUTE_CREATE_TIME, createTime);
tableEntity.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime);
tableEntity.setAttribute(ATTRIBUTE_RETENTION, hiveTable.getRetention());
tableEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveTable.getParameters());
tableEntity.setAttribute(ATTRIBUTE_COMMENT, hiveTable.getParameters().get(ATTRIBUTE_COMMENT));
tableEntity.setAttribute(ATTRIBUTE_TABLE_TYPE, hiveTable.getTableType().name());
tableEntity.setAttribute(ATTRIBUTE_TEMPORARY, hiveTable.isTemporary());
if (hiveTable.getViewOriginalText() != null) {
tableEntity.setAttribute(ATTRIBUTE_VIEW_ORIGINAL_TEXT, hiveTable.getViewOriginalText());
}
if (hiveTable.getViewExpandedText() != null) {
tableEntity.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, hiveTable.getViewExpandedText());
}
AtlasEntity sdEntity = toStorageDescEntity(hiveTable.getSd(), tableQualifiedName, getStorageDescQFName(tableQualifiedName), AtlasTypeUtil.getObjectId(tableEntity));
List<AtlasEntity> partKeys = toColumns(hiveTable.getPartitionKeys(), tableEntity, RELATIONSHIP_HIVE_TABLE_PART_KEYS);
List<AtlasEntity> columns = toColumns(hiveTable.getCols(), tableEntity, RELATIONSHIP_HIVE_TABLE_COLUMNS);
tableEntity.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, AtlasTypeUtil.getAtlasRelatedObjectId(sdEntity, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
tableEntity.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS, AtlasTypeUtil.getAtlasRelatedObjectIds(partKeys, RELATIONSHIP_HIVE_TABLE_PART_KEYS));
tableEntity.setRelationshipAttribute(ATTRIBUTE_COLUMNS, AtlasTypeUtil.getAtlasRelatedObjectIds(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS));
table.addReferredEntity(database);
table.addReferredEntity(sdEntity);
if (partKeys != null) {
for (AtlasEntity partKey : partKeys) {
table.addReferredEntity(partKey);
}
}
if (columns != null) {
for (AtlasEntity column : columns) {
table.addReferredEntity(column);
}
}
table.setEntity(tableEntity);
return table;
}
private AtlasEntity toStorageDescEntity(StorageDescriptor storageDesc, String tableQualifiedName, String sdQualifiedName, AtlasObjectId tableId ) throws AtlasHookException {
AtlasEntity ret = new AtlasEntity(HiveDataTypes.HIVE_STORAGEDESC.getName());
ret.setRelationshipAttribute(ATTRIBUTE_TABLE, AtlasTypeUtil.getAtlasRelatedObjectId(tableId, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName);
ret.setAttribute(ATTRIBUTE_PARAMETERS, storageDesc.getParameters());
ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(storageDesc.getLocation()));
ret.setAttribute(ATTRIBUTE_INPUT_FORMAT, storageDesc.getInputFormat());
ret.setAttribute(ATTRIBUTE_OUTPUT_FORMAT, storageDesc.getOutputFormat());
ret.setAttribute(ATTRIBUTE_COMPRESSED, storageDesc.isCompressed());
ret.setAttribute(ATTRIBUTE_NUM_BUCKETS, storageDesc.getNumBuckets());
ret.setAttribute(ATTRIBUTE_STORED_AS_SUB_DIRECTORIES, storageDesc.isStoredAsSubDirectories());
if (storageDesc.getBucketCols().size() > 0) {
ret.setAttribute(ATTRIBUTE_BUCKET_COLS, storageDesc.getBucketCols());
}
if (storageDesc.getSerdeInfo() != null) {
SerDeInfo serdeInfo = storageDesc.getSerdeInfo();
LOG.debug("serdeInfo = {}", serdeInfo);
// SkewedInfo skewedInfo = storageDesc.getSkewedInfo();
AtlasStruct serdeInfoStruct = new AtlasStruct(HiveDataTypes.HIVE_SERDE.getName());
serdeInfoStruct.setAttribute(ATTRIBUTE_NAME, serdeInfo.getName());
serdeInfoStruct.setAttribute(ATTRIBUTE_SERIALIZATION_LIB, serdeInfo.getSerializationLib());
serdeInfoStruct.setAttribute(ATTRIBUTE_PARAMETERS, serdeInfo.getParameters());
ret.setAttribute(ATTRIBUTE_SERDE_INFO, serdeInfoStruct);
}
if (CollectionUtils.isNotEmpty(storageDesc.getSortCols())) {
List<AtlasStruct> sortColsStruct = new ArrayList<>();
for (Order sortcol : storageDesc.getSortCols()) {
String hiveOrderName = HiveDataTypes.HIVE_ORDER.getName();
AtlasStruct colStruct = new AtlasStruct(hiveOrderName);
colStruct.setAttribute("col", sortcol.getCol());
colStruct.setAttribute("order", sortcol.getOrder());
sortColsStruct.add(colStruct);
}
ret.setAttribute(ATTRIBUTE_SORT_COLS, sortColsStruct);
}
return ret;
}
private List<AtlasEntity> toColumns(List<FieldSchema> schemaList, AtlasEntity table, String relationshipType) throws AtlasHookException {
List<AtlasEntity> ret = new ArrayList<>();
int columnPosition = 0;
for (FieldSchema fs : schemaList) {
LOG.debug("Processing field {}", fs);
AtlasEntity column = new AtlasEntity(HiveDataTypes.HIVE_COLUMN.getName());
column.setRelationshipAttribute(ATTRIBUTE_TABLE, AtlasTypeUtil.getAtlasRelatedObjectId(table, relationshipType));
column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getColumnQualifiedName((String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), fs.getName()));
column.setAttribute(ATTRIBUTE_NAME, fs.getName());
column.setAttribute(ATTRIBUTE_OWNER, table.getAttribute(ATTRIBUTE_OWNER));
column.setAttribute(ATTRIBUTE_COL_TYPE, fs.getType());
column.setAttribute(ATTRIBUTE_COL_POSITION, columnPosition++);
column.setAttribute(ATTRIBUTE_COMMENT, fs.getComment());
ret.add(column);
}
return ret;
}
/**
* Gets the Atlas entity for the database
* @param metadataNamespace metadata namespace (cluster name) of the Hive component
* @param databaseName name of the Hive database
* @return database entity from Atlas if it exists, else null
* @throws Exception
*/
private AtlasEntityWithExtInfo findDatabase(String metadataNamespace, String databaseName) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Searching Atlas for database {}", databaseName);
}
String typeName = HiveDataTypes.HIVE_DB.getName();
return findEntity(typeName, getDBQualifiedName(metadataNamespace, databaseName), true, true);
}
/**
* Gets Atlas Entity for the table
*
* @param hiveTable
* @return table entity from Atlas if exists, else null
* @throws Exception
*/
private AtlasEntityWithExtInfo findTableEntity(Table hiveTable) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Searching Atlas for table {}.{}", hiveTable.getDbName(), hiveTable.getTableName());
}
String typeName = HiveDataTypes.HIVE_TABLE.getName();
String tblQualifiedName = getTableQualifiedName(getMetadataNamespace(), hiveTable.getDbName(), hiveTable.getTableName());
return findEntity(typeName, tblQualifiedName, true, true);
}
private AtlasEntityWithExtInfo findProcessEntity(String qualifiedName) throws Exception{
if (LOG.isDebugEnabled()) {
LOG.debug("Searching Atlas for process {}", qualifiedName);
}
String typeName = HiveDataTypes.HIVE_PROCESS.getName();
return findEntity(typeName, qualifiedName , true , true);
}
private AtlasEntityWithExtInfo findEntity(final String typeName, final String qualifiedName , boolean minExtInfo, boolean ignoreRelationship) throws AtlasServiceException {
AtlasEntityWithExtInfo ret = null;
try {
ret = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName), minExtInfo, ignoreRelationship);
} catch (AtlasServiceException e) {
if(e.getStatus() == ClientResponse.Status.NOT_FOUND) {
return null;
}
throw e;
}
return ret;
}
private String getCreateTableString(Table table, String location){
String colString = "";
List<FieldSchema> colList = table.getAllCols();
if (colList != null) {
for (FieldSchema col : colList) {
colString += col.getName() + " " + col.getType() + ",";
}
if (colList.size() > 0) {
colString = colString.substring(0, colString.length() - 1);
colString = "(" + colString + ")";
}
}
String query = "create external table " + table.getTableName() + colString + " location '" + location + "'";
return query;
}
private String lower(String str) {
if (StringUtils.isEmpty(str)) {
return "";
}
return str.toLowerCase().trim();
}
/**
* Construct the qualified name used to uniquely identify a Table instance in Atlas.
* @param metadataNamespace Metadata namespace of the cluster to which the Hive component belongs
* @param table hive table for which the qualified name is needed
* @return Unique qualified name to identify the Table instance in Atlas.
*/
private static String getTableQualifiedName(String metadataNamespace, Table table) {
return getTableQualifiedName(metadataNamespace, table.getDbName(), table.getTableName(), table.isTemporary());
}
private String getHdfsPathQualifiedName(String hdfsPath) {
return String.format("%s@%s", hdfsPath, metadataNamespace);
}
/**
* Construct the qualified name used to uniquely identify a Database instance in Atlas.
* @param metadataNamespace Name of the cluster to which the Hive component belongs
* @param dbName Name of the Hive database
* @return Unique qualified name to identify the Database instance in Atlas.
*/
public static String getDBQualifiedName(String metadataNamespace, String dbName) {
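// e.g. getDBQualifiedName("primary", "Sales") returns "sales@primary"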
return String.format("%s@%s", dbName.toLowerCase(), metadataNamespace);
}
/**
* Construct the qualified name used to uniquely identify a Table instance in Atlas.
* @param metadataNamespace Name of the cluster to which the Hive component belongs
* @param dbName Name of the Hive database to which the Table belongs
* @param tableName Name of the Hive table
* @param isTemporaryTable is this a temporary table
* @return Unique qualified name to identify the Table instance in Atlas.
*/
public static String getTableQualifiedName(String metadataNamespace, String dbName, String tableName, boolean isTemporaryTable) {
String tableTempName = tableName;
if (isTemporaryTable) {
if (SessionState.get() != null && SessionState.get().getSessionId() != null) {
tableTempName = tableName + TEMP_TABLE_PREFIX + SessionState.get().getSessionId();
} else {
tableTempName = tableName + TEMP_TABLE_PREFIX + RandomStringUtils.random(10);
}
}
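// e.g. "sales.customers@primary"; temporary tables get a "_temp-<session id>" suffix on the table name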
return String.format("%s.%s@%s", dbName.toLowerCase(), tableTempName.toLowerCase(), metadataNamespace);
}
public static String getTableProcessQualifiedName(String metadataNamespace, Table table) {
String tableQualifiedName = getTableQualifiedName(metadataNamespace, table);
long createdTime = getTableCreatedTime(table);
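// e.g. "sales.customers@primary:1595859875000" - table qualified name + SEP + table create time in millis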
return tableQualifiedName + SEP + createdTime;
}
/**
* Construct the qualified name used to uniquely identify a Table instance in Atlas.
* @param metadataNamespace Metadata namespace of the cluster to which the Hive component belongs
* @param dbName Name of the Hive database to which the Table belongs
* @param tableName Name of the Hive table
* @return Unique qualified name to identify the Table instance in Atlas.
*/
public static String getTableQualifiedName(String metadataNamespace, String dbName, String tableName) {
return getTableQualifiedName(metadataNamespace, dbName, tableName, false);
}
public static String getStorageDescQFName(String tableQualifiedName) {
return tableQualifiedName + "_storage";
}
public static String getColumnQualifiedName(final String tableQualifiedName, final String colName) {
final String[] parts = tableQualifiedName.split("@");
final String tableName = parts[0];
final String metadataNamespace = parts[1];
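// e.g. ("sales.customers@primary", "ID") returns "sales.customers.id@primary"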
return String.format("%s.%s@%s", tableName, colName.toLowerCase(), metadataNamespace);
}
public static long getTableCreatedTime(Table table) {
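// Hive metastore stores the table create time in seconds; convert to milliseconds for Atlas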
return table.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR;
}
private void clearRelationshipAttributes(AtlasEntitiesWithExtInfo entities) {
if (entities != null) {
if (entities.getEntities() != null) {
for (AtlasEntity entity : entities.getEntities()) {
clearRelationshipAttributes(entity);
}
}
if (entities.getReferredEntities() != null) {
clearRelationshipAttributes(entities.getReferredEntities().values());
}
}
}
private void clearRelationshipAttributes(AtlasEntityWithExtInfo entity) {
if (entity != null) {
clearRelationshipAttributes(entity.getEntity());
if (entity.getReferredEntities() != null) {
clearRelationshipAttributes(entity.getReferredEntities().values());
}
}
}
private void clearRelationshipAttributes(Collection<AtlasEntity> entities) {
if (entities != null) {
for (AtlasEntity entity : entities) {
clearRelationshipAttributes(entity);
}
}
}
private void clearRelationshipAttributes(AtlasEntity entity) {
if (entity != null && entity.getRelationshipAttributes() != null) {
entity.getRelationshipAttributes().clear();
}
}
private boolean isTableWithDatabaseName(String tableName) {
return tableName.contains(".");
}
private List<AtlasEntityHeader> getAllDatabaseInCluster() throws AtlasServiceException {
List<AtlasEntityHeader> entities = new ArrayList<>();
final int pageSize = pageLimit;
SearchParameters.FilterCriteria fc = new SearchParameters.FilterCriteria();
fc.setAttributeName(ATTRIBUTE_CLUSTER_NAME);
fc.setAttributeValue(metadataNamespace);
fc.setOperator(SearchParameters.Operator.EQ);
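// Page through basic-search results; a page smaller than pageSize indicates the last page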
for (int i = 0; ; i++) {
int offset = pageSize * i;
LOG.info("Retrieving databases: offset={}, pageSize={}", offset, pageSize);
AtlasSearchResult searchResult = atlasClientV2.basicSearch(HIVE_TYPE_DB, fc,null, null, true, pageSize, offset);
List<AtlasEntityHeader> entityHeaders = searchResult == null ? null : searchResult.getEntities();
int dbCount = entityHeaders == null ? 0 : entityHeaders.size();
LOG.info("Retrieved {} databases of {} cluster", dbCount, metadataNamespace);
if (dbCount > 0) {
entities.addAll(entityHeaders);
}
if (dbCount < pageSize) { // last page
break;
}
}
return entities;
}
private List<AtlasEntityHeader> getAllTablesInDb(String databaseGuid) throws AtlasServiceException {
List<AtlasEntityHeader> entities = new ArrayList<>();
final int pageSize = pageLimit;
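// Page through relationship-search results over the __hive_table.db edge, using the same paging scheme as getAllDatabaseInCluster()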
for (int i = 0; ; i++) {
int offset = pageSize * i;
LOG.info("Retrieving tables: offset={}, pageSize={}", offset, pageSize);
AtlasSearchResult searchResult = atlasClientV2.relationshipSearch(databaseGuid, HIVE_TABLE_DB_EDGE_LABEL, null, null, true, pageSize, offset);
List<AtlasEntityHeader> entityHeaders = searchResult == null ? null : searchResult.getEntities();
int tableCount = entityHeaders == null ? 0 : entityHeaders.size();
LOG.info("Retrieved {} tables of {} database", tableCount, databaseGuid);
if (tableCount > 0) {
entities.addAll(entityHeaders);
}
if (tableCount < pageSize) { // last page
break;
}
}
return entities;
}
public String getHiveDatabaseName(String qualifiedName) {
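// e.g. "sales@primary" returns "sales"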
if (StringUtils.isNotEmpty(qualifiedName)) {
String[] split = qualifiedName.split("@");
if (split.length > 0) {
return split[0];
}
}
return null;
}
public String getHiveTableName(String qualifiedName, boolean isTemporary) {
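// e.g. ("sales.customers@primary", false) returns "customers"; ("sales.customers_temp-1234@primary", true) also returns "customers"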
if (StringUtils.isNotEmpty(qualifiedName)) {
String tableName = StringUtils.substringBetween(qualifiedName, ".", "@");
if (!isTemporary) {
return tableName;
} else {
if (StringUtils.isNotEmpty(tableName)) {
String[] splitTemp = tableName.split(TEMP_TABLE_PREFIX);
if (splitTemp.length > 0) {
return splitTemp[0];
}
}
}
}
return null;
}
private void deleteByGuid(List<String> guidTodelete) throws AtlasServiceException {
if (CollectionUtils.isNotEmpty(guidTodelete)) {
for (String guid : guidTodelete) {
EntityMutationResponse response = atlasClientV2.deleteEntityByGuid(guid);
if (response.getDeletedEntities().size() < 1) {
LOG.info("Entity with guid : {} is not deleted", guid);
} else {
LOG.info("Entity with guid : {} is deleted", guid);
}
}
} else {
LOG.info("No Entity to delete from Atlas");
}
}
public void deleteEntitiesForNonExistingHiveMetadata(boolean failOnError) throws Exception {
//fetch databases from Atlas
List<AtlasEntityHeader> dbs = null;
try {
dbs = getAllDatabaseInCluster();
LOG.info("Total Databases in cluster {} : {} ", metadataNamespace, dbs.size());
} catch (AtlasServiceException e) {
LOG.error("Failed to retrieve database entities for cluster {} from Atlas", metadataNamespace, e);
if (failOnError) {
throw e;
}
}
if (CollectionUtils.isNotEmpty(dbs)) {
//iterate all dbs to check if exists in hive
for (AtlasEntityHeader db : dbs) {
String dbGuid = db.getGuid();
String hiveDbName = getHiveDatabaseName((String) db.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
if (StringUtils.isEmpty(hiveDbName)) {
LOG.error("Failed to get database from qualifiedName: {}, guid: {} ", db.getAttribute(ATTRIBUTE_QUALIFIED_NAME), dbGuid);
continue;
}
List<AtlasEntityHeader> tables;
try {
tables = getAllTablesInDb(dbGuid);
LOG.info("Total Tables in database {} : {} ", hiveDbName, tables.size());
} catch (AtlasServiceException e) {
LOG.error("Failed to retrieve table entities for database {} from Atlas", hiveDbName, e);
if (failOnError) {
throw e;
}
continue;
}
List<String> guidsToDelete = new ArrayList<>();
if (!hiveClient.databaseExists(hiveDbName)) {
//table guids
if (CollectionUtils.isNotEmpty(tables)) {
for (AtlasEntityHeader table : tables) {
guidsToDelete.add(table.getGuid());
}
}
//db guid
guidsToDelete.add(db.getGuid());
LOG.info("Added database {}.{} and its {} tables to delete", metadataNamespace, hiveDbName, tables.size());
} else {
//iterate all table of db to check if it exists
if (CollectionUtils.isNotEmpty(tables)) {
for (AtlasEntityHeader table : tables) {
String hiveTableName = getHiveTableName((String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), true);
if (StringUtils.isEmpty(hiveTableName)) {
LOG.error("Failed to get table from qualifiedName: {}, guid: {} ", table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), table.getGuid());
continue;
}
try {
hiveClient.getTable(hiveDbName, hiveTableName, true);
} catch (InvalidTableException e) { //table doesn't exists
LOG.info("Added table {}.{} to delete", hiveDbName, hiveTableName);
guidsToDelete.add(table.getGuid());
} catch (HiveException e) {
LOG.error("Failed to get table {}.{} from Hive", hiveDbName, hiveTableName, e);
if (failOnError) {
throw e;
}
}
}
}
}
//delete entities
if (CollectionUtils.isNotEmpty(guidsToDelete)) {
try {
deleteByGuid(guidsToDelete);
} catch (AtlasServiceException e) {
LOG.error("Failed to delete Atlas entities for database {}", hiveDbName, e);
if (failOnError) {
throw e;
}
}
}
}
} else {
LOG.info("No database found in service.");
}
}
}