/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive.bridge;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.hive.hook.events.BaseHiveEvent;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hook.AtlasHookException;
import org.apache.atlas.utils.AtlasPathExtractorUtil;
import org.apache.atlas.utils.HdfsNameServiceResolver;
import org.apache.atlas.utils.AtlasConfigurationUtil;
import org.apache.atlas.utils.PathExtractorContext;
import org.apache.atlas.utils.LruCache;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.commons.cli.MissingArgumentException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.OutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.*;
/**
 * A bridge utility that exports metadata from the Hive Meta Store into a zip file,
 * which can then be imported into Atlas.
 */
public class HiveMetaStoreBridgeV2 {
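/*
 * Illustrative usage sketch (not part of this class): shows how a driver, for example a main()
 * method or a wrapper script, might wire this exporter together. The short option names mirror
 * the OPTION_* constants below and the "failOnError" flag checked in
 * exportDataToZipAndRunAtlasImport(); the long option names, descriptions and the
 * AtlasClientV2/commons-cli calls are assumptions about the caller, not code in this file.
 * Exception handling is omitted.
 *
 *   Configuration atlasConf   = ApplicationProperties.get();
 *   AtlasClientV2 atlasClient = new AtlasClientV2(new String[]{"http://localhost:21000"},
 *                                                 new String[]{"admin", "admin"});
 *   Options options = new Options();
 *   options.addOption("d", "database", true, "database to export");
 *   options.addOption("t", "table", true, "table to export");
 *   options.addOption("f", "filename", true, "file with database:table pairs to export");
 *   options.addOption("o", "output", true, "output path or zip file name");
 *   options.addOption("i", "ignoreBulkImport", false, "skip the bulk import into Atlas");
 *   options.addOption("failOnError", false, "abort on the first export error");
 *   CommandLine cmd = new BasicParser().parse(options, args);
 *
 *   HiveMetaStoreBridgeV2 bridge = new HiveMetaStoreBridgeV2(atlasConf, new HiveConf(), atlasClient);
 *   bridge.exportDataToZipAndRunAtlasImport(cmd);
 */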
private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridgeV2.class);
private static final String OPTION_DATABASE_SHORT = "d";
private static final String OPTION_TABLE_SHORT = "t";
private static final String OPTION_IMPORT_DATA_FILE_SHORT = "f";
private static final String OPTION_OUTPUT_FILEPATH_SHORT = "o";
private static final String OPTION_IGNORE_BULK_IMPORT_SHORT = "i";
public static final String CONF_PREFIX = "atlas.hook.hive.";
public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION = CONF_PREFIX + "aws_s3.atlas.model.version";
public static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
public static final String HIVE_USERNAME = "atlas.hook.hive.default.username";
public static final String HIVE_METADATA_NAMESPACE = "atlas.metadata.namespace";
public static final String DEFAULT_CLUSTER_NAME = "primary";
public static final String TEMP_TABLE_PREFIX = "_temp-";
public static final String SEP = ":".intern();
public static final String DEFAULT_METASTORE_CATALOG = "hive";
public static final String HOOK_HIVE_PAGE_LIMIT = CONF_PREFIX + "page.limit";
private static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2 = "v2";
private static final String ZIP_FILE_COMMENT_FORMAT = "{\"entitiesCount\":%d, \"total\":%d}";
private static final int DEFAULT_PAGE_LIMIT = 10000;
private static final String DEFAULT_ZIP_FILE_NAME = "import-hive-output.zip";
private static final String ZIP_ENTRY_ENTITIES = "entities.json";
private static final String TYPES_DEF_JSON = "atlas-typesdef.json";
private static final String JSON_ARRAY_START = "[";
private static final String JSON_COMMA = ",";
private static final String JSON_EMPTY_OBJECT = "{}";
private static final String JSON_ARRAY_END = "]";
private static int pageLimit = DEFAULT_PAGE_LIMIT;
private String awsS3AtlasModelVersion = null;
private final String metadataNamespace;
private final Hive hiveClient;
private final AtlasClientV2 atlasClientV2;
private final boolean convertHdfsPathToLowerCase;
private ZipOutputStream zipOutputStream;
private String outZipFileName;
private int totalProcessedEntities = 0;
private final Map<String, AtlasEntityWithExtInfo> entityLRUCache = new LruCache<>(10000, 0);
private final Map<Table, AtlasEntity> hiveTablesAndAtlasEntity = new HashMap<>();
private final Map<String, AtlasEntity> dbEntities = new HashMap<>();
private final List<Map<String, String>> databaseAndTableListToImport = new ArrayList<>();
private final Map<String, String> qualifiedNameGuidMap = new HashMap<>();
/**
 * Construct a HiveMetaStoreBridgeV2.
 * @param atlasProperties Atlas application properties
 * @param hiveConf {@link HiveConf} for the Hive component in the cluster
 * @param atlasClientV2 Atlas client used to look up existing entities and to run the bulk import
 */
public HiveMetaStoreBridgeV2(Configuration atlasProperties, HiveConf hiveConf, AtlasClientV2 atlasClientV2) throws Exception {
this.metadataNamespace = getMetadataNamespace(atlasProperties);
this.hiveClient = Hive.get(hiveConf);
this.atlasClientV2 = atlasClientV2;
this.convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
this.awsS3AtlasModelVersion = atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION, HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);
pageLimit = atlasProperties.getInteger(HOOK_HIVE_PAGE_LIMIT, DEFAULT_PAGE_LIMIT);
}
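/**
 * Exports Hive metadata into the output zip and, unless the ignore-bulk-import option is set,
 * runs an Atlas bulk import of that zip. Databases and tables to export come from the -d/-t
 * options or, when -f is given, from a file of "database:table" lines. The zip stream is
 * always closed, even if the export fails part-way through.
 */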
public boolean exportDataToZipAndRunAtlasImport(CommandLine cmd) throws MissingArgumentException, IOException, HiveException, AtlasBaseException {
boolean ret = true;
boolean failOnError = cmd.hasOption("failOnError");
String databaseToImport = cmd.getOptionValue(OPTION_DATABASE_SHORT);
String tableToImport = cmd.getOptionValue(OPTION_TABLE_SHORT);
String importDataFile = cmd.getOptionValue(OPTION_IMPORT_DATA_FILE_SHORT);
String outputFileOrPath = cmd.getOptionValue(OPTION_OUTPUT_FILEPATH_SHORT);
boolean ignoreBulkImport = cmd.hasOption(OPTION_IGNORE_BULK_IMPORT_SHORT);
validateOutputFileOrPath(outputFileOrPath);
try {
initializeZipStream();
if (isValidImportDataFile(importDataFile)) {
File f = new File(importDataFile);
try (BufferedReader br = new BufferedReader(new FileReader(f))) {
String line;
while ((line = br.readLine()) != null) {
String[] val = line.split(":");
if (ArrayUtils.isNotEmpty(val)) {
databaseToImport = val[0];
tableToImport = val.length > 1 ? val[1] : "";
importHiveDatabases(databaseToImport, tableToImport, failOnError);
}
}
}
} else {
importHiveDatabases(databaseToImport, tableToImport, failOnError);
}
importHiveTables(failOnError);
importHiveColumns(failOnError);
} finally {
endWritingAndZipStream();
}
if (!ignoreBulkImport) {
runAtlasImport();
}
return ret;
}
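/**
 * Resolves the output zip location: an existing directory gets the default zip file name
 * appended, a not-yet-existing *.zip path under an existing directory is used as-is, and an
 * already existing file (or any other path) is rejected.
 */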
private void validateOutputFileOrPath(String outputFileOrPath) throws MissingArgumentException {
if (StringUtils.isBlank(outputFileOrPath)) {
throw new MissingArgumentException("Output Path/File can't be empty");
}
File fileOrDirToImport = new File(outputFileOrPath);
if (fileOrDirToImport.exists()) {
if (fileOrDirToImport.isDirectory()) {
this.outZipFileName = outputFileOrPath + File.separator + DEFAULT_ZIP_FILE_NAME;
LOG.info("The default output zip file {} will be created at {}", DEFAULT_ZIP_FILE_NAME, outputFileOrPath);
} else {
throw new MissingArgumentException("output file: " + outputFileOrPath + " already present");
}
} else if (fileOrDirToImport.getParentFile().isDirectory() && outputFileOrPath.endsWith(".zip")) {
LOG.info("The mentioned output zip file {} will be created", outputFileOrPath);
this.outZipFileName = outputFileOrPath;
} else {
throw new MissingArgumentException("Invalid File/Path");
}
}
private boolean isValidImportDataFile(String importDataFile) throws MissingArgumentException {
boolean ret = false;
if (StringUtils.isNotBlank(importDataFile)) {
File dataFile = new File(importDataFile);
if (!dataFile.exists() || !dataFile.canRead()) {
throw new MissingArgumentException("Invalid import data file");
}
ret = true;
}
return ret;
}
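/**
 * Opens the output zip and writes the layout consumed by the ZIP_DIRECT import request issued
 * in runAtlasImport(): an (empty) atlas-typesdef.json entry followed by an entities.json entry
 * that is filled as a streamed JSON array while databases, tables and columns are processed.
 */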
private void initializeZipStream() throws IOException, AtlasBaseException {
this.zipOutputStream = new ZipOutputStream(getOutputStream(this.outZipFileName));
storeTypesDefToZip(new AtlasTypesDef());
startWritingEntitiesToZip();
}
private void storeTypesDefToZip(AtlasTypesDef typesDef) throws AtlasBaseException {
String jsonData = AtlasType.toJson(typesDef);
saveToZip(TYPES_DEF_JSON, jsonData);
}
private void saveToZip(String fileName, String jsonData) throws AtlasBaseException {
try {
ZipEntry e = new ZipEntry(fileName);
zipOutputStream.putNextEntry(e);
writeBytes(jsonData);
zipOutputStream.closeEntry();
} catch (IOException e) {
throw new AtlasBaseException(String.format("Error writing file %s.", fileName), e);
}
}
private void startWritingEntitiesToZip() throws IOException {
zipOutputStream.putNextEntry(new ZipEntry(ZIP_ENTRY_ENTITIES));
writeBytes(JSON_ARRAY_START);
}
private String getDatabaseToImport(String tableWithDatabase) {
String ret = null;
String[] val = tableWithDatabase.split("\\.");
if (val.length > 1) {
ret = val[0];
}
return ret;
}
private String getTableToImport(String tableWithDatabase) {
String ret = null;
String[] val = tableWithDatabase.split("\\.");
if (val.length > 1) {
ret = val[1];
}
return ret;
}
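/**
 * Resolves which databases to export (all databases, a name pattern, or the database part of a
 * "db.table" argument), writes a hive_db entity for each one and records the database/table
 * selection for the subsequent table import pass.
 */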
private void importHiveDatabases(String databaseToImport, String tableWithDatabaseToImport, boolean failOnError) throws HiveException, AtlasBaseException {
LOG.info("Importing Hive Databases");
List<String> databaseNames = null;
if (StringUtils.isEmpty(databaseToImport) && StringUtils.isNotEmpty(tableWithDatabaseToImport)) {
if (isTableWithDatabaseName(tableWithDatabaseToImport)) {
databaseToImport = getDatabaseToImport(tableWithDatabaseToImport);
tableWithDatabaseToImport = getTableToImport(tableWithDatabaseToImport);
}
}
if (StringUtils.isEmpty(databaseToImport)) {
//when database to import is empty, import all
databaseNames = hiveClient.getAllDatabases();
} else {
//when database to import has some value then, import that db and all table under it.
databaseNames = hiveClient.getDatabasesByPattern(databaseToImport);
}
if (!CollectionUtils.isEmpty(databaseNames)) {
LOG.info("Found {} databases", databaseNames.size());
for (String databaseName : databaseNames) {
try {
if (!dbEntities.containsKey(databaseName)) {
LOG.info("Importing Hive Database {}", databaseName);
AtlasEntityWithExtInfo dbEntity = writeDatabase(databaseName);
if (dbEntity != null) {
dbEntities.put(databaseName, dbEntity.getEntity());
}
}
databaseAndTableListToImport.add(Collections.singletonMap(databaseName, tableWithDatabaseToImport));
} catch (IOException e) {
LOG.error("Import failed for hive database {}", databaseName, e);
if (failOnError) {
throw new AtlasBaseException(e.getMessage(), e);
}
}
}
} else {
LOG.error("No database found");
if (failOnError) {
throw new AtlasBaseException("No database found");
}
}
}
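/**
 * Streams a single entity into the entities.json zip entry. Referred hive_db entities are
 * dropped (databases are written separately) and an LRU cache keyed by GUID prevents the same
 * entity from being written twice.
 */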
private void writeEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws IOException {
if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
Iterator<Map.Entry<String, AtlasEntity>> itr = entityWithExtInfo.getReferredEntities().entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, AtlasEntity> eachEntity = itr.next();
if (eachEntity.getValue().getTypeName().equalsIgnoreCase(HiveDataTypes.HIVE_DB.getName())) {
itr.remove();
}
}
}
if (!entityLRUCache.containsKey(entityWithExtInfo.getEntity().getGuid())) {
entityLRUCache.put(entityWithExtInfo.getEntity().getGuid(), entityWithExtInfo);
writeBytes(AtlasType.toJson(entityWithExtInfo) + JSON_COMMA);
}
totalProcessedEntities++;
}
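/**
 * Terminates the streamed JSON array with an empty object (absorbing the trailing comma left
 * by writeEntity) and the closing bracket, records the processed entity count in the zip file
 * comment and closes the stream.
 */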
private void endWritingAndZipStream() throws IOException {
writeBytes(JSON_EMPTY_OBJECT);
writeBytes(JSON_ARRAY_END);
setStreamSize(totalProcessedEntities);
close();
}
private void flush() {
try {
zipOutputStream.flush();
} catch (IOException e) {
LOG.error("Error: Flush: ", e);
}
}
private void close() throws IOException {
zipOutputStream.flush();
zipOutputStream.closeEntry();
zipOutputStream.close();
}
private void writeBytes(String payload) throws IOException {
// write as UTF-8 so the zip content does not depend on the platform default charset
zipOutputStream.write(payload.getBytes(StandardCharsets.UTF_8));
}
private OutputStream getOutputStream(String fileToWrite) throws IOException {
return FileUtils.openOutputStream(new File(fileToWrite));
}
public String getMetadataNamespace(Configuration config) {
return AtlasConfigurationUtil.getRecentString(config, HIVE_METADATA_NAMESPACE, getClusterName(config));
}
private String getClusterName(Configuration config) {
return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
}
public String getMetadataNamespace() {
return metadataNamespace;
}
public boolean isConvertHdfsPathToLowerCase() {
return convertHdfsPathToLowerCase;
}
/**
 * Imports Hive tables for the databases collected in databaseAndTableListToImport
 * @param failOnError whether to abort on the first table that fails to import
 * @throws HiveException if table metadata cannot be read from the Hive Meta Store
 * @throws AtlasBaseException if a table fails to import and failOnError is true
 */
public void importHiveTables(boolean failOnError) throws HiveException, AtlasBaseException {
LOG.info("Importing Hive Tables");
int tablesImported = 0;
if (CollectionUtils.isNotEmpty(databaseAndTableListToImport) && MapUtils.isNotEmpty(dbEntities)) {
for (Map<String, String> eachEntry : databaseAndTableListToImport) {
final List<Table> tableObjects;
String databaseName = eachEntry.keySet().iterator().next();
if (StringUtils.isEmpty(eachEntry.values().iterator().next())) {
tableObjects = hiveClient.getAllTableObjects(databaseName);
populateQualifiedNameGuidMap(HiveDataTypes.HIVE_DB.getName(), (String) dbEntities.get(databaseName).getAttribute(ATTRIBUTE_QUALIFIED_NAME));
} else {
List<String> tableNames = hiveClient.getTablesByPattern(databaseName, eachEntry.values().iterator().next());
tableObjects = new ArrayList<>();
for (String tableName : tableNames) {
Table table = hiveClient.getTable(databaseName, tableName);
tableObjects.add(table);
populateQualifiedNameGuidMap(HiveDataTypes.HIVE_TABLE.getName(), getTableQualifiedName(metadataNamespace, table));
}
}
if (!CollectionUtils.isEmpty(tableObjects)) {
LOG.info("Found {} tables to import in database {}", tableObjects.size(), databaseName);
try {
for (Table table : tableObjects) {
int imported = importTable(dbEntities.get(databaseName), table, failOnError);
tablesImported += imported;
}
} finally {
if (tablesImported == tableObjects.size()) {
LOG.info("Successfully imported {} tables from database {}", tablesImported, databaseName);
} else {
LOG.error("Imported {} of {} tables from database {}. Please check logs for errors during import",
tablesImported, tableObjects.size(), databaseName);
}
}
} else {
LOG.error("No tables to import in database {}", databaseName);
if (failOnError) {
throw new AtlasBaseException("No tables to import in database - " + databaseName);
}
}
}
}
dbEntities.clear();
}
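/**
 * Looks up entities already present in Atlas for the given type and qualified name and caches
 * their qualifiedName-to-GUID mappings (including referred and, for hive_db, related entities)
 * so that re-exported entities keep their existing GUIDs instead of being duplicated on import.
 */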
private void populateQualifiedNameGuidMap(String typeName, String qualifiedName) {
try {
AtlasEntitiesWithExtInfo entitiesWithExtInfo = atlasClientV2.getEntitiesByAttribute(typeName, Collections.singletonList(Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName)), true, false);
if (entitiesWithExtInfo != null && entitiesWithExtInfo.getEntities() != null) {
for (AtlasEntity entity : entitiesWithExtInfo.getEntities()) {
qualifiedNameGuidMap.put((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), entity.getGuid());
for(Map.Entry<String, AtlasEntity> eachEntry : entitiesWithExtInfo.getReferredEntities().entrySet()) {
qualifiedNameGuidMap.put((String) eachEntry.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME), eachEntry.getKey());
}
if (typeName.equals(HiveDataTypes.HIVE_DB.getName())) {
for (String eachRelatedGuid : getAllRelatedGuids(entity)) {
AtlasEntityWithExtInfo relatedEntity = atlasClientV2.getEntityByGuid(eachRelatedGuid, true, false);
qualifiedNameGuidMap.put((String) relatedEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), relatedEntity.getEntity().getGuid());
for (Map.Entry<String, AtlasEntity> eachEntry : relatedEntity.getReferredEntities().entrySet()) {
qualifiedNameGuidMap.put((String) eachEntry.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME), eachEntry.getKey());
}
}
}
}
}
} catch (AtlasServiceException e) {
LOG.info("Unable to load the related entities for type {} and qualified name {} from Atlas", typeName, qualifiedName, e);
}
}
private Set<String> getAllRelatedGuids(AtlasEntity entity) {
Set<String> relGuidsSet = new HashSet<>();
for (Object o : entity.getRelationshipAttributes().values()) {
if (o instanceof AtlasObjectId) {
relGuidsSet.add(((AtlasObjectId) o).getGuid());
} else if (o instanceof List) {
for (Object id : (List) o) {
if (id instanceof AtlasObjectId) {
relGuidsSet.add(((AtlasObjectId) id).getGuid());
}
if (id instanceof Map) {
relGuidsSet.add((String) ((Map) id).get("guid"));
}
}
}
}
return relGuidsSet;
}
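/**
 * Writes hive_column entities (partition keys and regular columns) for every table collected
 * during the table import pass.
 * @param failOnError whether to abort on the first column that fails to import
 */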
public void importHiveColumns(boolean failOnError) throws AtlasBaseException {
LOG.info("Importing Hive Columns");
if (MapUtils.isEmpty(hiveTablesAndAtlasEntity)) {
if (LOG.isDebugEnabled()) {
LOG.debug("No hive table present to import columns");
}
return;
}
for (Map.Entry<Table, AtlasEntity> eachTable : hiveTablesAndAtlasEntity.entrySet()) {
int columnsImported = 0;
List<AtlasEntity> columnEntities = new ArrayList<>();
try {
List<AtlasEntity> partKeys = toColumns(eachTable.getKey().getPartitionKeys(), eachTable.getValue(), RELATIONSHIP_HIVE_TABLE_PART_KEYS);
List<AtlasEntity> columns = toColumns(eachTable.getKey().getCols(), eachTable.getValue(), RELATIONSHIP_HIVE_TABLE_COLUMNS);
columnEntities.addAll(partKeys);
columnEntities.addAll(columns);
for (AtlasEntity eachColumnEntity : columnEntities) {
writeEntityToZip(new AtlasEntityWithExtInfo(eachColumnEntity));
columnsImported++;
}
} catch (IOException e) {
LOG.error("Column Import failed for hive table {}", eachTable.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME), e);
if (failOnError) {
throw new AtlasBaseException(e.getMessage(), e);
}
} finally {
if (columnsImported == columnEntities.size()) {
LOG.info("Successfully imported {} columns for table {}", columnsImported, eachTable.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME));
} else {
LOG.error("Imported {} of {} columns for table {}. Please check logs for errors during import", columnsImported, columnEntities.size(), eachTable.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME));
}
}
}
}
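/**
 * Submits the generated zip to Atlas as a ZIP_DIRECT format import request, without updating
 * type definitions, and logs the outcome.
 */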
private void runAtlasImport() {
AtlasImportRequest request = new AtlasImportRequest();
request.setOption(AtlasImportRequest.UPDATE_TYPE_DEFINITION_KEY, "false");
request.setOption(AtlasImportRequest.OPTION_KEY_FORMAT, AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT);
try {
AtlasImportResult importResult = atlasClientV2.importData(request, this.outZipFileName);
if (importResult.getOperationStatus() == AtlasImportResult.OperationStatus.SUCCESS) {
LOG.info("Successfully imported the zip file {} at Atlas and imported {} entities. Number of entities to be imported {}.", this.outZipFileName, importResult.getProcessedEntities().size(), totalProcessedEntities);
} else {
LOG.error("Failed to import or get the status of import for the zip file {} at Atlas. Number of entities to be imported {}.", this.outZipFileName, totalProcessedEntities);
}
} catch (AtlasServiceException e) {
LOG.error("Failed to import or get the status of import for the zip file {} at Atlas. Number of entities to be imported {}.", this.outZipFileName, totalProcessedEntities, e);
}
}
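/**
 * Writes the hive_table entity (with its storage descriptor) and, for external tables, a
 * hive_process lineage entity linking the table's data location to the table via a synthesized
 * "create external table" query.
 * @return 1 if the table was exported, 0 if it failed and failOnError is false
 */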
public int importTable(AtlasEntity dbEntity, Table table, final boolean failOnError) throws AtlasBaseException {
try {
AtlasEntityWithExtInfo tableEntity = writeTable(dbEntity, table);
hiveTablesAndAtlasEntity.put(table, tableEntity.getEntity());
if (table.getTableType() == TableType.EXTERNAL_TABLE) {
String processQualifiedName = getTableProcessQualifiedName(metadataNamespace, table);
String tableLocationString = isConvertHdfsPathToLowerCase() ? lower(table.getDataLocation().toString()) : table.getDataLocation().toString();
Path location = table.getDataLocation();
String query = getCreateTableString(table, tableLocationString);
PathExtractorContext pathExtractorCtx = new PathExtractorContext(getMetadataNamespace(), isConvertHdfsPathToLowerCase(), awsS3AtlasModelVersion);
AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(location, pathExtractorCtx);
AtlasEntity pathInst = entityWithExtInfo.getEntity();
AtlasEntity tableInst = tableEntity.getEntity();
AtlasEntity processInst = new AtlasEntity(HiveDataTypes.HIVE_PROCESS.getName());
long now = System.currentTimeMillis();
processInst.setGuid(getGuid(processQualifiedName));
processInst.setAttribute(ATTRIBUTE_QUALIFIED_NAME, processQualifiedName);
processInst.setAttribute(ATTRIBUTE_NAME, query);
processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
processInst.setRelationshipAttribute(ATTRIBUTE_INPUTS, Collections.singletonList(AtlasTypeUtil.getAtlasRelatedObjectId(pathInst, RELATIONSHIP_DATASET_PROCESS_INPUTS)));
processInst.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(AtlasTypeUtil.getAtlasRelatedObjectId(tableInst, RELATIONSHIP_PROCESS_DATASET_OUTPUTS)));
String userName = table.getOwner();
if (StringUtils.isEmpty(userName)) {
userName = ApplicationProperties.get().getString(HIVE_USERNAME, "hive");
}
processInst.setAttribute(ATTRIBUTE_USER_NAME, userName);
processInst.setAttribute(ATTRIBUTE_START_TIME, now);
processInst.setAttribute(ATTRIBUTE_END_TIME, now);
processInst.setAttribute(ATTRIBUTE_OPERATION_TYPE, "CREATETABLE");
processInst.setAttribute(ATTRIBUTE_QUERY_TEXT, query);
processInst.setAttribute(ATTRIBUTE_QUERY_ID, query);
processInst.setAttribute(ATTRIBUTE_QUERY_PLAN, "{}");
processInst.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(query));
AtlasEntitiesWithExtInfo createTableProcess = new AtlasEntitiesWithExtInfo();
createTableProcess.addEntity(processInst);
if (pathExtractorCtx.getKnownEntities() != null) {
pathExtractorCtx.getKnownEntities().values().forEach(entity -> createTableProcess.addEntity(entity));
} else {
createTableProcess.addEntity(pathInst);
}
writeEntitiesToZip(createTableProcess);
}
return 1;
} catch (Exception e) {
LOG.error("Import failed for hive_table {}", table.getTableName(), e);
if (failOnError) {
throw new AtlasBaseException(e.getMessage(), e);
}
return 0;
}
}
/**
 * Write the hive_db entity for the given database to the zip file
 * @param databaseName name of the Hive database
 * @return the entity that was written, or null if the database does not exist
 * @throws HiveException if the database cannot be read from the Hive Meta Store
 * @throws IOException if writing to the zip file fails
 */
private AtlasEntityWithExtInfo writeDatabase(String databaseName) throws HiveException, IOException {
AtlasEntityWithExtInfo ret = null;
Database db = hiveClient.getDatabase(databaseName);
if (db != null) {
ret = new AtlasEntityWithExtInfo(toDbEntity(db));
writeEntityToZip(ret);
}
return ret;
}
private AtlasEntityWithExtInfo writeTable(AtlasEntity dbEntity, Table table) throws AtlasHookException {
try {
AtlasEntityWithExtInfo tableEntity = toTableEntity(dbEntity, table);
writeEntityToZip(tableEntity);
return tableEntity;
} catch (Exception e) {
throw new AtlasHookException("HiveMetaStoreBridgeV2.registerTable() failed.", e);
}
}
/**
 * Write an entity to the zip file and clear its relationship attributes afterwards
 * @param entity entity to write
 * @throws IOException if writing to the zip file fails
 */
private void writeEntityToZip(AtlasEntityWithExtInfo entity) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Writing {} entity: {}", entity.getEntity().getTypeName(), entity);
}
writeEntity(entity);
clearRelationshipAttributes(entity.getEntity());
flush();
}
/**
 * Write a batch of entities to the zip file
 * @param entities entities to write
 * @throws IOException if writing to the zip file fails
 */
private void writeEntitiesToZip(AtlasEntitiesWithExtInfo entities) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Writing {} entities: {}", entities.getEntities().size(), entities);
}
for (AtlasEntity entity : entities.getEntities()) {
writeEntity(new AtlasEntityWithExtInfo(entity));
}
flush();
clearRelationshipAttributes(entities);
}
/**
 * Create a hive_db entity
 * @param hiveDB the Hive {@link Database} object from which to map properties
 * @return new hive_db AtlasEntity
 */
private AtlasEntity toDbEntity(Database hiveDB) {
return toDbEntity(hiveDB, null);
}
private AtlasEntity toDbEntity(Database hiveDB, AtlasEntity dbEntity) {
if (dbEntity == null) {
dbEntity = new AtlasEntity(HiveDataTypes.HIVE_DB.getName());
}
String dbName = getDatabaseName(hiveDB);
String qualifiedName = getDBQualifiedName(metadataNamespace, dbName);
dbEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
dbEntity.setGuid(getGuid(true, qualifiedName));
dbEntity.setAttribute(ATTRIBUTE_NAME, dbName);
dbEntity.setAttribute(ATTRIBUTE_DESCRIPTION, hiveDB.getDescription());
dbEntity.setAttribute(ATTRIBUTE_OWNER, hiveDB.getOwnerName());
dbEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
dbEntity.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(hiveDB.getLocationUri()));
dbEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveDB.getParameters());
if (hiveDB.getOwnerType() != null) {
dbEntity.setAttribute(ATTRIBUTE_OWNER_TYPE, OWNER_TYPE_TO_ENUM_VALUE.get(hiveDB.getOwnerType().getValue()));
}
return dbEntity;
}
private String getDBGuidFromAtlas(String dBQualifiedName) {
String guid = null;
try {
guid = atlasClientV2.getEntityHeaderByAttribute(HiveDataTypes.HIVE_DB.getName(), Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, dBQualifiedName)).getGuid();
} catch (AtlasServiceException e) {
LOG.warn("Failed to get DB guid from Atlas with qualified name {}", dBQualifiedName, e);
}
return guid;
}
public static String getDatabaseName(Database hiveDB) {
String dbName = hiveDB.getName().toLowerCase();
String catalogName = hiveDB.getCatalogName() != null ? hiveDB.getCatalogName().toLowerCase() : null;
if (StringUtils.isNotEmpty(catalogName) && !StringUtils.equals(catalogName, DEFAULT_METASTORE_CATALOG)) {
dbName = catalogName + SEP + dbName;
}
return dbName;
}
/**
 * Create a hive_table entity for a Hive table, along with its storage descriptor
 * @param database the hive_db {@link AtlasEntity} to which this table belongs
 * @param hiveTable reference to the Hive {@link Table} from which to map properties
 * @return newly created hive_table AtlasEntity with its referred entities
 * @throws AtlasHookException if the table entity cannot be built
 */
private AtlasEntityWithExtInfo toTableEntity(AtlasEntity database, final Table hiveTable) throws AtlasHookException {
AtlasEntityWithExtInfo table = new AtlasEntityWithExtInfo(new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName()));
AtlasEntity tableEntity = table.getEntity();
String tableQualifiedName = getTableQualifiedName(metadataNamespace, hiveTable);
long createTime = BaseHiveEvent.getTableCreateTime(hiveTable);
long lastAccessTime = hiveTable.getLastAccessTime() > 0 ? hiveTable.getLastAccessTime() : createTime;
tableEntity.setGuid(getGuid(tableQualifiedName));
tableEntity.setRelationshipAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasRelatedObjectId(database, RELATIONSHIP_HIVE_TABLE_DB));
tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName);
tableEntity.setAttribute(ATTRIBUTE_NAME, hiveTable.getTableName().toLowerCase());
tableEntity.setAttribute(ATTRIBUTE_OWNER, hiveTable.getOwner());
tableEntity.setAttribute(ATTRIBUTE_CREATE_TIME, createTime);
tableEntity.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime);
tableEntity.setAttribute(ATTRIBUTE_RETENTION, hiveTable.getRetention());
tableEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveTable.getParameters());
tableEntity.setAttribute(ATTRIBUTE_COMMENT, hiveTable.getParameters().get(ATTRIBUTE_COMMENT));
tableEntity.setAttribute(ATTRIBUTE_TABLE_TYPE, hiveTable.getTableType().name());
tableEntity.setAttribute(ATTRIBUTE_TEMPORARY, hiveTable.isTemporary());
if (hiveTable.getViewOriginalText() != null) {
tableEntity.setAttribute(ATTRIBUTE_VIEW_ORIGINAL_TEXT, hiveTable.getViewOriginalText());
}
if (hiveTable.getViewExpandedText() != null) {
tableEntity.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, hiveTable.getViewExpandedText());
}
AtlasEntity sdEntity = toStorageDescEntity(hiveTable.getSd(), getStorageDescQFName(tableQualifiedName), AtlasTypeUtil.getObjectId(tableEntity));
tableEntity.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, AtlasTypeUtil.getAtlasRelatedObjectId(sdEntity, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
table.addReferredEntity(database);
table.addReferredEntity(sdEntity);
table.setEntity(tableEntity);
return table;
}
private AtlasEntity toStorageDescEntity(StorageDescriptor storageDesc, String sdQualifiedName, AtlasObjectId tableId) {
AtlasEntity ret = new AtlasEntity(HiveDataTypes.HIVE_STORAGEDESC.getName());
ret.setGuid(getGuid(sdQualifiedName));
ret.setRelationshipAttribute(ATTRIBUTE_TABLE, AtlasTypeUtil.getAtlasRelatedObjectId(tableId, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName);
ret.setAttribute(ATTRIBUTE_PARAMETERS, storageDesc.getParameters());
ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(storageDesc.getLocation()));
ret.setAttribute(ATTRIBUTE_INPUT_FORMAT, storageDesc.getInputFormat());
ret.setAttribute(ATTRIBUTE_OUTPUT_FORMAT, storageDesc.getOutputFormat());
ret.setAttribute(ATTRIBUTE_COMPRESSED, storageDesc.isCompressed());
ret.setAttribute(ATTRIBUTE_NUM_BUCKETS, storageDesc.getNumBuckets());
ret.setAttribute(ATTRIBUTE_STORED_AS_SUB_DIRECTORIES, storageDesc.isStoredAsSubDirectories());
if (storageDesc.getBucketCols().size() > 0) {
ret.setAttribute(ATTRIBUTE_BUCKET_COLS, storageDesc.getBucketCols());
}
if (storageDesc.getSerdeInfo() != null) {
SerDeInfo serdeInfo = storageDesc.getSerdeInfo();
LOG.info("serdeInfo = {}", serdeInfo);
AtlasStruct serdeInfoStruct = new AtlasStruct(HiveDataTypes.HIVE_SERDE.getName());
serdeInfoStruct.setAttribute(ATTRIBUTE_NAME, serdeInfo.getName());
serdeInfoStruct.setAttribute(ATTRIBUTE_SERIALIZATION_LIB, serdeInfo.getSerializationLib());
serdeInfoStruct.setAttribute(ATTRIBUTE_PARAMETERS, serdeInfo.getParameters());
ret.setAttribute(ATTRIBUTE_SERDE_INFO, serdeInfoStruct);
}
if (CollectionUtils.isNotEmpty(storageDesc.getSortCols())) {
List<AtlasStruct> sortColsStruct = new ArrayList<>();
for (Order sortcol : storageDesc.getSortCols()) {
String hiveOrderName = HiveDataTypes.HIVE_ORDER.getName();
AtlasStruct colStruct = new AtlasStruct(hiveOrderName);
colStruct.setAttribute("col", sortcol.getCol());
colStruct.setAttribute("order", sortcol.getOrder());
sortColsStruct.add(colStruct);
}
ret.setAttribute(ATTRIBUTE_SORT_COLS, sortColsStruct);
}
return ret;
}
private List<AtlasEntity> toColumns(List<FieldSchema> schemaList, AtlasEntity table, String relationshipType) {
List<AtlasEntity> ret = new ArrayList<>();
int columnPosition = 0;
for (FieldSchema fs : schemaList) {
LOG.debug("Processing field {}", fs);
AtlasEntity column = new AtlasEntity(HiveDataTypes.HIVE_COLUMN.getName());
String columnQualifiedName = getColumnQualifiedName((String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), fs.getName());
column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, columnQualifiedName);
column.setGuid(getGuid(columnQualifiedName));
column.setRelationshipAttribute(ATTRIBUTE_TABLE, AtlasTypeUtil.getAtlasRelatedObjectId(table, relationshipType));
column.setAttribute(ATTRIBUTE_NAME, fs.getName());
column.setAttribute(ATTRIBUTE_OWNER, table.getAttribute(ATTRIBUTE_OWNER));
column.setAttribute(ATTRIBUTE_COL_TYPE, fs.getType());
column.setAttribute(ATTRIBUTE_COL_POSITION, columnPosition++);
column.setAttribute(ATTRIBUTE_COMMENT, fs.getComment());
ret.add(column);
}
return ret;
}
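/**
 * Builds a representative "create external table ..." statement, used as the query text of the
 * lineage process entity written for external tables.
 */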
private String getCreateTableString(Table table, String location){
String colString = "";
List<FieldSchema> colList = table.getAllCols();
if (colList != null) {
for (FieldSchema col : colList) {
colString += col.getName() + " " + col.getType() + ",";
}
if (colList.size() > 0) {
colString = colString.substring(0, colString.length() - 1);
colString = "(" + colString + ")";
}
}
String query = "create external table " + table.getTableName() + colString + " location '" + location + "'";
return query;
}
private String lower(String str) {
if (StringUtils.isEmpty(str)) {
return "";
}
return str.toLowerCase().trim();
}
/**
* Construct the qualified name used to uniquely identify a Table instance in Atlas.
* @param metadataNamespace Metadata namespace of the cluster to which the Hive component belongs
* @param table hive table for which the qualified name is needed
* @return Unique qualified name to identify the Table instance in Atlas.
*/
private static String getTableQualifiedName(String metadataNamespace, Table table) {
return getTableQualifiedName(metadataNamespace, table.getDbName(), table.getTableName(), table.isTemporary());
}
/**
* Construct the qualified name used to uniquely identify a Database instance in Atlas.
* @param metadataNamespace Name of the cluster to which the Hive component belongs
* @param dbName Name of the Hive database
* @return Unique qualified name to identify the Database instance in Atlas.
*/
public static String getDBQualifiedName(String metadataNamespace, String dbName) {
return String.format("%s@%s", dbName.toLowerCase(), metadataNamespace);
}
/**
* Construct the qualified name used to uniquely identify a Table instance in Atlas.
* @param metadataNamespace Name of the cluster to which the Hive component belongs
* @param dbName Name of the Hive database to which the Table belongs
* @param tableName Name of the Hive table
* @param isTemporaryTable is this a temporary table
* @return Unique qualified name to identify the Table instance in Atlas.
*/
public static String getTableQualifiedName(String metadataNamespace, String dbName, String tableName, boolean isTemporaryTable) {
String tableTempName = tableName;
if (isTemporaryTable) {
if (SessionState.get() != null && SessionState.get().getSessionId() != null) {
tableTempName = tableName + TEMP_TABLE_PREFIX + SessionState.get().getSessionId();
} else {
tableTempName = tableName + TEMP_TABLE_PREFIX + RandomStringUtils.random(10);
}
}
return String.format("%s.%s@%s", dbName.toLowerCase(), tableTempName.toLowerCase(), metadataNamespace);
}
public static String getTableProcessQualifiedName(String metadataNamespace, Table table) {
String tableQualifiedName = getTableQualifiedName(metadataNamespace, table);
long createdTime = getTableCreatedTime(table);
return tableQualifiedName + SEP + createdTime;
}
public static String getStorageDescQFName(String tableQualifiedName) {
return tableQualifiedName + "_storage";
}
public static String getColumnQualifiedName(final String tableQualifiedName, final String colName) {
final String[] parts = tableQualifiedName.split("@");
final String tableName = parts[0];
final String metadataNamespace = parts[1];
return String.format("%s.%s@%s", tableName, colName.toLowerCase(), metadataNamespace);
}
public static long getTableCreatedTime(Table table) {
return table.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR;
}
private void clearRelationshipAttributes(AtlasEntitiesWithExtInfo entities) {
if (entities != null) {
if (entities.getEntities() != null) {
for (AtlasEntity entity : entities.getEntities()) {
clearRelationshipAttributes(entity);
}
}
if (entities.getReferredEntities() != null) {
clearRelationshipAttributes(entities.getReferredEntities().values());
}
}
}
private void clearRelationshipAttributes(Collection<AtlasEntity> entities) {
if (entities != null) {
for (AtlasEntity entity : entities) {
clearRelationshipAttributes(entity);
}
}
}
private void clearRelationshipAttributes(AtlasEntity entity) {
if (entity != null && entity.getRelationshipAttributes() != null) {
entity.getRelationshipAttributes().clear();
}
}
private boolean isTableWithDatabaseName(String tableName) {
boolean ret = false;
if (tableName.contains(".")) {
ret = true;
}
return ret;
}
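/**
 * Returns the GUID to assign to an entity: a GUID previously fetched from Atlas for the same
 * qualified name (looked up on demand for databases), falling back to a freshly generated UUID
 * when the entity does not yet exist in Atlas.
 */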
private String getGuid(String qualifiedName) {
return getGuid(false, qualifiedName);
}
private String getGuid(boolean isDBType, String qualifiedName) {
String guid = null;
if (qualifiedNameGuidMap.containsKey(qualifiedName)) {
guid = qualifiedNameGuidMap.get(qualifiedName);
} else if (isDBType) {
guid = getDBGuidFromAtlas(qualifiedName);
}
if (StringUtils.isBlank(guid)) {
guid = generateGuid();
}
return guid;
}
private String generateGuid() {
return UUID.randomUUID().toString();
}
public void setStreamSize(long size) {
zipOutputStream.setComment(String.format(ZIP_FILE_COMMENT_FORMAT, size, -1));
}
}