ATLAS-4424: Enhanced the import-hive utility to create export zip files and run bulk import
Signed-off-by: Sidharth Mishra <sidmishra@apache.org>
diff --git a/addons/hive-bridge/src/bin/import-hive.sh b/addons/hive-bridge/src/bin/import-hive.sh
index c353937..ebe6976 100755
--- a/addons/hive-bridge/src/bin/import-hive.sh
+++ b/addons/hive-bridge/src/bin/import-hive.sh
@@ -140,24 +140,34 @@
-d) IMPORT_ARGS="$IMPORT_ARGS -d $1"; shift;;
-t) IMPORT_ARGS="$IMPORT_ARGS -t $1"; shift;;
-f) IMPORT_ARGS="$IMPORT_ARGS -f $1"; shift;;
+ -o) IMPORT_ARGS="$IMPORT_ARGS -o $1"; shift;;
+ -i) IMPORT_ARGS="$IMPORT_ARGS -i";;
+ -h) export HELP_OPTION="true"; IMPORT_ARGS="$IMPORT_ARGS -h";;
--database) IMPORT_ARGS="$IMPORT_ARGS --database $1"; shift;;
--table) IMPORT_ARGS="$IMPORT_ARGS --table $1"; shift;;
--filename) IMPORT_ARGS="$IMPORT_ARGS --filename $1"; shift;;
+ --output) IMPORT_ARGS="$IMPORT_ARGS --output $1"; shift;;
+ --ignoreBulkImport) IMPORT_ARGS="$IMPORT_ARGS --ignoreBulkImport";;
+ --help) export HELP_OPTION="true"; IMPORT_ARGS="$IMPORT_ARGS --help";;
-deleteNonExisting) IMPORT_ARGS="$IMPORT_ARGS -deleteNonExisting";;
"") break;;
- *) JVM_ARGS="$JVM_ARGS $option"
+ *) IMPORT_ARGS="$IMPORT_ARGS $option"
esac
done
JAVA_PROPERTIES="${JAVA_PROPERTIES} ${JVM_ARGS}"
-echo "Log file for import is $LOGFILE"
+if [ -z "${HELP_OPTION}" ]; then
+ echo "Log file for import is $LOGFILE"
+fi
"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.hive.bridge.HiveMetaStoreBridge $IMPORT_ARGS
RETVAL=$?
-[ $RETVAL -eq 0 ] && echo Hive metadata imported successfully!
-[ $RETVAL -ne 0 ] && echo Failed to import Hive metadata! Check logs at: $LOGFILE for details.
+if [ -z "${HELP_OPTION}" ]; then
+    [ $RETVAL -eq 0 ] && echo Hive metadata imported successfully!
+    [ $RETVAL -eq 1 ] && echo Failed to import Hive metadata! Check logs at: $LOGFILE for details.
+fi
exit $RETVAL
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index c361ac6..28365bc 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -20,7 +20,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientResponse;
-import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClientV2;
@@ -43,12 +42,14 @@
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.utils.PathExtractorContext;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.MissingArgumentException;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.ArrayUtils;
@@ -105,10 +106,27 @@
public static final String HIVE_TABLE_DB_EDGE_LABEL = "__hive_table.db";
public static final String HOOK_HIVE_PAGE_LIMIT = CONF_PREFIX + "page.limit";
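+    // command-line options accepted by import-hive.sh; see printUsage() for the semantics of each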
+ static final String OPTION_OUTPUT_FILEPATH_SHORT = "o";
+ static final String OPTION_OUTPUT_FILEPATH_LONG = "output";
+ static final String OPTION_IGNORE_BULK_IMPORT_SHORT = "i";
+ static final String OPTION_IGNORE_BULK_IMPORT_LONG = "ignoreBulkImport";
+ static final String OPTION_DATABASE_SHORT = "d";
+ static final String OPTION_DATABASE_LONG = "database";
+ static final String OPTION_TABLE_SHORT = "t";
+ static final String OPTION_TABLE_LONG = "table";
+ static final String OPTION_IMPORT_DATA_FILE_SHORT = "f";
+ static final String OPTION_IMPORT_DATA_FILE_LONG = "filename";
+ static final String OPTION_FAIL_ON_ERROR = "failOnError";
+ static final String OPTION_DELETE_NON_EXISTING = "deleteNonExisting";
+ static final String OPTION_HELP_SHORT = "h";
+ static final String OPTION_HELP_LONG = "help";
+
public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2 = "v2";
- private static final int EXIT_CODE_SUCCESS = 0;
- private static final int EXIT_CODE_FAILED = 1;
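+    // exit codes: 0 = success, 1 = import failed, 2 = invalid command-line arguments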
+ private static final int EXIT_CODE_SUCCESS = 0;
+ private static final int EXIT_CODE_FAILED = 1;
+ private static final int EXIT_CODE_INVALID_ARG = 2;
+
private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
private static int pageLimit = 10000;
@@ -122,84 +140,63 @@
public static void main(String[] args) {
int exitCode = EXIT_CODE_FAILED;
AtlasClientV2 atlasClientV2 = null;
+ Options acceptedCliOptions = prepareCommandLineOptions();
try {
- Options options = new Options();
- options.addOption("d", "database", true, "Database name");
- options.addOption("t", "table", true, "Table name");
- options.addOption("f", "filename", true, "Filename");
- options.addOption("failOnError", false, "failOnError");
- options.addOption("deleteNonExisting", false, "Delete database and table entities in Atlas if not present in Hive");
+ CommandLine cmd = new BasicParser().parse(acceptedCliOptions, args);
+ List<String> argsNotProcessed = cmd.getArgList();
- CommandLine cmd = new BasicParser().parse(options, args);
- boolean failOnError = cmd.hasOption("failOnError");
- boolean deleteNonExisting = cmd.hasOption("deleteNonExisting");
- LOG.info("delete non existing flag : {} ", deleteNonExisting);
-
- String databaseToImport = cmd.getOptionValue("d");
- String tableToImport = cmd.getOptionValue("t");
- String fileToImport = cmd.getOptionValue("f");
- Configuration atlasConf = ApplicationProperties.get();
- String[] atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT);
-
- if (atlasEndpoint == null || atlasEndpoint.length == 0) {
- atlasEndpoint = new String[] { DEFAULT_ATLAS_URL };
+            if (CollectionUtils.isNotEmpty(argsNotProcessed)) {
+                throw new ParseException("Unrecognized arguments: " + argsNotProcessed);
}
-
- if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
- String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
-
- atlasClientV2 = new AtlasClientV2(atlasEndpoint, basicAuthUsernamePassword);
- } else {
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
- atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), atlasEndpoint);
- }
-
- HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2);
-
- if (deleteNonExisting) {
- hiveMetaStoreBridge.deleteEntitiesForNonExistingHiveMetadata(failOnError);
+ if (cmd.hasOption(OPTION_HELP_SHORT)) {
+ printUsage(acceptedCliOptions);
exitCode = EXIT_CODE_SUCCESS;
- } else if (StringUtils.isNotEmpty(fileToImport)) {
- File f = new File(fileToImport);
+ } else {
+ Configuration atlasConf = ApplicationProperties.get();
+ String[] atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT);
- if (f.exists() && f.canRead()) {
- BufferedReader br = new BufferedReader(new FileReader(f));
- String line = null;
-
- while((line = br.readLine()) != null) {
- String val[] = line.split(":");
-
- if (ArrayUtils.isNotEmpty(val)) {
- databaseToImport = val[0];
-
- if (val.length > 1) {
- tableToImport = val[1];
- } else {
- tableToImport = "";
- }
-
- hiveMetaStoreBridge.importHiveMetadata(databaseToImport, tableToImport, failOnError);
- }
- }
-
- exitCode = EXIT_CODE_SUCCESS;
- } else {
- LOG.error("Failed to read the input file: " + fileToImport);
- exitCode = EXIT_CODE_FAILED;
+ if (atlasEndpoint == null || atlasEndpoint.length == 0) {
+ atlasEndpoint = new String[] { DEFAULT_ATLAS_URL };
}
- } else {
- hiveMetaStoreBridge.importHiveMetadata(databaseToImport, tableToImport, failOnError);
- exitCode = EXIT_CODE_SUCCESS;
- }
+ if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
+ String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
+
+ atlasClientV2 = new AtlasClientV2(atlasEndpoint, basicAuthUsernamePassword);
+ } else {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+ atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), atlasEndpoint);
+ }
+
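+                // -o/--output selects the zip-export path (HiveMetaStoreBridgeV2); without it, metadata is imported directly into Atlas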
+ boolean createZip = cmd.hasOption(OPTION_OUTPUT_FILEPATH_LONG);
+
+ if (createZip) {
+ HiveMetaStoreBridgeV2 hiveMetaStoreBridgeV2 = new HiveMetaStoreBridgeV2(atlasConf, new HiveConf(), atlasClientV2);
+
+ if (hiveMetaStoreBridgeV2.exportDataToZipAndRunAtlasImport(cmd)) {
+ exitCode = EXIT_CODE_SUCCESS;
+ }
+ } else {
+ HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2);
+
+ if (hiveMetaStoreBridge.importDataDirectlyToAtlas(cmd)) {
+ exitCode = EXIT_CODE_SUCCESS;
+ }
+ }
+ }
} catch(ParseException e) {
- LOG.error("Failed to parse arguments. Error: ", e.getMessage());
- printUsage();
+ LOG.error("Invalid argument. Error: {}", e.getMessage());
+ System.out.println("Invalid argument. Error: " + e.getMessage());
+ exitCode = EXIT_CODE_INVALID_ARG;
+
+ if (!(e instanceof MissingArgumentException)) {
+ printUsage(acceptedCliOptions);
+ }
} catch(Exception e) {
- LOG.error("Import failed", e);
+ LOG.error("Import Failed", e);
} finally {
if( atlasClientV2 !=null) {
atlasClientV2.close();
@@ -209,26 +206,48 @@
System.exit(exitCode);
}
- private static void printUsage() {
+ private static Options prepareCommandLineOptions() {
+ Options acceptedCliOptions = new Options();
+
+        return acceptedCliOptions.addOption(OPTION_OUTPUT_FILEPATH_SHORT, OPTION_OUTPUT_FILEPATH_LONG, true, "Output path or file for the export zip")
+                .addOption(OPTION_IGNORE_BULK_IMPORT_SHORT, OPTION_IGNORE_BULK_IMPORT_LONG, false, "Only create the export zip; skip the bulk import into Atlas")
+ .addOption(OPTION_DATABASE_SHORT, OPTION_DATABASE_LONG, true, "Database name")
+ .addOption(OPTION_TABLE_SHORT, OPTION_TABLE_LONG, true, "Table name")
+ .addOption(OPTION_IMPORT_DATA_FILE_SHORT, OPTION_IMPORT_DATA_FILE_LONG, true, "Filename")
+ .addOption(OPTION_FAIL_ON_ERROR, false, "failOnError")
+ .addOption(OPTION_DELETE_NON_EXISTING, false, "Delete database and table entities in Atlas if not present in Hive")
+ .addOption(OPTION_HELP_SHORT, OPTION_HELP_LONG, false, "Print this help message");
+ }
+
+ private static void printUsage(Options options) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("import-hive.sh", options);
System.out.println();
+ System.out.println("Usage options:");
+ System.out.println(" Usage 1: import-hive.sh [-d <database> OR --database <database>] " );
+ System.out.println(" Imports specified database and its tables ...");
System.out.println();
- System.out.println("Usage 1: import-hive.sh [-d <database> OR --database <database>] " );
- System.out.println(" Imports specified database and its tables ...");
+ System.out.println(" Usage 2: import-hive.sh [-d <database> OR --database <database>] [-t <table> OR --table <table>]");
+ System.out.println(" Imports specified table within that database ...");
System.out.println();
- System.out.println("Usage 2: import-hive.sh [-d <database> OR --database <database>] [-t <table> OR --table <table>]");
- System.out.println(" Imports specified table within that database ...");
+ System.out.println(" Usage 3: import-hive.sh");
+ System.out.println(" Imports all databases and tables...");
System.out.println();
- System.out.println("Usage 3: import-hive.sh");
- System.out.println(" Imports all databases and tables...");
+ System.out.println(" Usage 4: import-hive.sh -f <filename>");
+ System.out.println(" Imports all databases and tables in the file...");
+ System.out.println(" Format:");
+ System.out.println(" database1:tbl1");
+ System.out.println(" database1:tbl2");
+ System.out.println(" database2:tbl2");
System.out.println();
- System.out.println("Usage 4: import-hive.sh -f <filename>");
- System.out.println(" Imports all databases and tables in the file...");
- System.out.println(" Format:");
- System.out.println(" database1:tbl1");
- System.out.println(" database1:tbl2");
- System.out.println(" database2:tbl2");
- System.out.println("Usage 5: import-hive.sh [-deleteNonExisting] " );
- System.out.println(" Deletes databases and tables which are not in Hive ...");
+ System.out.println(" Usage 5: import-hive.sh [-deleteNonExisting] " );
+ System.out.println(" Deletes databases and tables which are not in Hive ...");
+ System.out.println();
+ System.out.println(" Usage 6: import-hive.sh -o <output Path or file> [-f <filename>] [-d <database> OR --database <database>] [-t <table> OR --table <table>]");
+ System.out.println(" To create zip file with exported data and import the zip file at Atlas ...");
+ System.out.println();
+ System.out.println(" Usage 7: import-hive.sh -i -o <output Path or file> [-f <filename>] [-d <database> OR --database <database>] [-t <table> OR --table <table>]");
+ System.out.println(" To create zip file with exported data without importing to Atlas which can be imported later ...");
System.out.println();
}
@@ -286,6 +305,54 @@
return convertHdfsPathToLowerCase;
}
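+
+    /**
+     * Imports Hive metadata directly into Atlas, preserving the utility's behavior
+     * when no output zip (-o) is requested.
+     */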
+ public boolean importDataDirectlyToAtlas(CommandLine cmd) throws Exception {
+ LOG.info("Importing Hive metadata");
+ boolean ret = false;
+
+ String databaseToImport = cmd.getOptionValue(OPTION_DATABASE_SHORT);
+ String tableToImport = cmd.getOptionValue(OPTION_TABLE_SHORT);
+ String fileToImport = cmd.getOptionValue(OPTION_IMPORT_DATA_FILE_SHORT);
+
+ boolean failOnError = cmd.hasOption(OPTION_FAIL_ON_ERROR);
+ boolean deleteNonExisting = cmd.hasOption(OPTION_DELETE_NON_EXISTING);
+
+ LOG.info("delete non existing flag : {} ", deleteNonExisting);
+
+ if (deleteNonExisting) {
+ deleteEntitiesForNonExistingHiveMetadata(failOnError);
+ ret = true;
+ } else if (StringUtils.isNotEmpty(fileToImport)) {
+ File f = new File(fileToImport);
+
+ if (f.exists() && f.canRead()) {
+                // use try-with-resources so the reader is closed even if an import fails
+                try (BufferedReader br = new BufferedReader(new FileReader(f))) {
+                    String line;
+
+                    while ((line = br.readLine()) != null) {
+                        String[] val = line.split(":");
+
+                        if (ArrayUtils.isNotEmpty(val)) {
+                            databaseToImport = val[0];
+
+                            if (val.length > 1) {
+                                tableToImport = val[1];
+                            } else {
+                                tableToImport = "";
+                            }
+
+                            importDatabases(failOnError, databaseToImport, tableToImport);
+                        }
+                    }
+                }
+ ret = true;
+ } else {
+ LOG.error("Failed to read the input file: " + fileToImport);
+ }
+ } else {
+ importDatabases(failOnError, databaseToImport, tableToImport);
+ ret = true;
+ }
+ return ret;
+ }
@VisibleForTesting
public void importHiveMetadata(String databaseToImport, String tableToImport, boolean failOnError) throws Exception {
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeV2.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeV2.java
new file mode 100644
index 0000000..0627c0e
--- /dev/null
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeV2.java
@@ -0,0 +1,1036 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.hive.bridge;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.hive.hook.events.BaseHiveEvent;
+import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.hook.AtlasHookException;
+import org.apache.atlas.utils.AtlasPathExtractorUtil;
+import org.apache.atlas.utils.HdfsNameServiceResolver;
+import org.apache.atlas.utils.AtlasConfigurationUtil;
+import org.apache.atlas.utils.PathExtractorContext;
+import org.apache.atlas.utils.LruCache;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasStruct;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.commons.cli.MissingArgumentException;
+import org.apache.commons.collections.CollectionUtils;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.OutputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.*;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import static org.apache.atlas.hive.hook.events.BaseHiveEvent.*;
+
+/**
+ * A bridge utility that exports metadata from the Hive Metastore into a zip file
+ * which can then be imported into Atlas.
+ */
+public class HiveMetaStoreBridgeV2 {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridgeV2.class);
+
+ private static final String OPTION_DATABASE_SHORT = "d";
+ private static final String OPTION_TABLE_SHORT = "t";
+ private static final String OPTION_IMPORT_DATA_FILE_SHORT = "f";
+ private static final String OPTION_OUTPUT_FILEPATH_SHORT = "o";
+ private static final String OPTION_IGNORE_BULK_IMPORT_SHORT = "i";
+
+ public static final String CONF_PREFIX = "atlas.hook.hive.";
+ public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
+ public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION = CONF_PREFIX + "aws_s3.atlas.model.version";
+
+ public static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
+ public static final String HIVE_USERNAME = "atlas.hook.hive.default.username";
+ public static final String HIVE_METADATA_NAMESPACE = "atlas.metadata.namespace";
+ public static final String DEFAULT_CLUSTER_NAME = "primary";
+ public static final String TEMP_TABLE_PREFIX = "_temp-";
+ public static final String SEP = ":".intern();
+ public static final String DEFAULT_METASTORE_CATALOG = "hive";
+ public static final String HOOK_HIVE_PAGE_LIMIT = CONF_PREFIX + "page.limit";
+
+ private static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2 = "v2";
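+    // written as the zip file comment; the Atlas zipDirect importer reads these counts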
+ private static final String ZIP_FILE_COMMENT_FORMAT = "{\"entitiesCount\":%d, \"total\":%d}";
+ private static final int DEFAULT_PAGE_LIMIT = 10000;
+ private static final String DEFAULT_ZIP_FILE_NAME = "import-hive-output.zip";
+ private static final String ZIP_ENTRY_ENTITIES = "entities.json";
+ private static final String TYPES_DEF_JSON = "atlas-typesdef.json";
+
+ private static final String JSON_ARRAY_START = "[";
+ private static final String JSON_COMMA = ",";
+ private static final String JSON_EMPTY_OBJECT = "{}";
+ private static final String JSON_ARRAY_END = "]";
+
+ private static int pageLimit = DEFAULT_PAGE_LIMIT;
+ private String awsS3AtlasModelVersion = null;
+
+ private final String metadataNamespace;
+ private final Hive hiveClient;
+ private final AtlasClientV2 atlasClientV2;
+ private final boolean convertHdfsPathToLowerCase;
+
+ private ZipOutputStream zipOutputStream;
+ private String outZipFileName;
+ private int totalProcessedEntities = 0;
+
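+    // entityLRUCache avoids writing the same entity into the zip twice;
+    // qualifiedNameGuidMap lets exported entities reuse GUIDs that already exist in Atlas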
+ private final Map<String, AtlasEntityWithExtInfo> entityLRUCache = new LruCache<>(10000, 0);
+ private final Map<Table, AtlasEntity> hiveTablesAndAtlasEntity = new HashMap<>();
+ private final Map<String, AtlasEntity> dbEntities = new HashMap<>();
+ private final List<Map<String, String>> databaseAndTableListToImport = new ArrayList<>();
+ private final Map<String, String> qualifiedNameGuidMap = new HashMap<>();
+
+    /**
+     * Construct a HiveMetaStoreBridgeV2.
+     * @param atlasProperties Atlas application properties
+     * @param hiveConf {@link HiveConf} for Hive component in the cluster
+     * @param atlasClientV2 client used to look up existing entities and to run the bulk import
+     */
+ public HiveMetaStoreBridgeV2(Configuration atlasProperties, HiveConf hiveConf, AtlasClientV2 atlasClientV2) throws Exception {
+ this.metadataNamespace = getMetadataNamespace(atlasProperties);
+ this.hiveClient = Hive.get(hiveConf);
+ this.atlasClientV2 = atlasClientV2;
+ this.convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
+ this.awsS3AtlasModelVersion = atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION, HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);
+
+        pageLimit = atlasProperties.getInteger(HOOK_HIVE_PAGE_LIMIT, DEFAULT_PAGE_LIMIT);
+ }
+
+ public boolean exportDataToZipAndRunAtlasImport(CommandLine cmd) throws MissingArgumentException, IOException, HiveException, AtlasBaseException {
+ boolean ret = true;
+ boolean failOnError = cmd.hasOption("failOnError");
+
+ String databaseToImport = cmd.getOptionValue(OPTION_DATABASE_SHORT);
+ String tableToImport = cmd.getOptionValue(OPTION_TABLE_SHORT);
+ String importDataFile = cmd.getOptionValue(OPTION_IMPORT_DATA_FILE_SHORT);
+ String outputFileOrPath = cmd.getOptionValue(OPTION_OUTPUT_FILEPATH_SHORT);
+
+ boolean ignoreBulkImport = cmd.hasOption(OPTION_IGNORE_BULK_IMPORT_SHORT);
+
+ validateOutputFileOrPath(outputFileOrPath);
+
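+        // export sequence: write the typesdef entry, stream entities into entities.json,
+        // close the zip, then (unless -i/--ignoreBulkImport was given) bulk-import the zip into Atlas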
+ try {
+ initializeZipStream();
+
+ if (isValidImportDataFile(importDataFile)) {
+ File f = new File(importDataFile);
+
+                // use try-with-resources so the reader is closed even if an export fails
+                try (BufferedReader br = new BufferedReader(new FileReader(f))) {
+                    String line;
+
+                    while ((line = br.readLine()) != null) {
+                        String[] val = line.split(":");
+
+                        if (ArrayUtils.isNotEmpty(val)) {
+                            databaseToImport = val[0];
+
+                            if (val.length > 1) {
+                                tableToImport = val[1];
+                            } else {
+                                tableToImport = "";
+                            }
+
+                            importHiveDatabases(databaseToImport, tableToImport, failOnError);
+                        }
+                    }
+                }
+ } else {
+ importHiveDatabases(databaseToImport, tableToImport, failOnError);
+ }
+
+ importHiveTables(failOnError);
+ importHiveColumns(failOnError);
+ } finally {
+ endWritingAndZipStream();
+ }
+
+ if (!ignoreBulkImport) {
+ runAtlasImport();
+ }
+
+ return ret;
+ }
+
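+    // accepts an existing directory (the default zip name is created inside it) or a
+    // new *.zip path whose parent directory exists; anything else is rejected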
+ private void validateOutputFileOrPath(String outputFileOrPath) throws MissingArgumentException {
+ if (StringUtils.isBlank(outputFileOrPath)) {
+ throw new MissingArgumentException("Output Path/File can't be empty");
+ }
+
+ File fileOrDirToImport = new File(outputFileOrPath);
+ if (fileOrDirToImport.exists()) {
+ if (fileOrDirToImport.isDirectory()) {
+ this.outZipFileName = outputFileOrPath + File.separator + DEFAULT_ZIP_FILE_NAME;
+ LOG.info("The default output zip file {} will be created at {}", DEFAULT_ZIP_FILE_NAME, outputFileOrPath);
+ } else {
+ throw new MissingArgumentException("output file: " + outputFileOrPath + " already present");
+ }
+ } else if (fileOrDirToImport.getParentFile().isDirectory() && outputFileOrPath.endsWith(".zip")) {
+ LOG.info("The mentioned output zip file {} will be created", outputFileOrPath);
+ this.outZipFileName = outputFileOrPath;
+ } else {
+ throw new MissingArgumentException("Invalid File/Path");
+ }
+ }
+
+ private boolean isValidImportDataFile(String importDataFile) throws MissingArgumentException {
+ boolean ret = false;
+ if (StringUtils.isNotBlank(importDataFile)) {
+ File dataFile = new File(importDataFile);
+
+ if (!dataFile.exists() || !dataFile.canRead()) {
+ throw new MissingArgumentException("Invalid import data file");
+ }
+ ret = true;
+ }
+
+ return ret;
+ }
+
+ private void initializeZipStream() throws IOException, AtlasBaseException {
+ this.zipOutputStream = new ZipOutputStream(getOutputStream(this.outZipFileName));
+
+ storeTypesDefToZip(new AtlasTypesDef());
+
+ startWritingEntitiesToZip();
+ }
+
+ private void storeTypesDefToZip(AtlasTypesDef typesDef) throws AtlasBaseException {
+ String jsonData = AtlasType.toJson(typesDef);
+ saveToZip(TYPES_DEF_JSON, jsonData);
+ }
+
+ private void saveToZip(String fileName, String jsonData) throws AtlasBaseException {
+ try {
+ ZipEntry e = new ZipEntry(fileName);
+ zipOutputStream.putNextEntry(e);
+ writeBytes(jsonData);
+ zipOutputStream.closeEntry();
+ } catch (IOException e) {
+ throw new AtlasBaseException(String.format("Error writing file %s.", fileName), e);
+ }
+ }
+
+ private void startWritingEntitiesToZip() throws IOException {
+ zipOutputStream.putNextEntry(new ZipEntry(ZIP_ENTRY_ENTITIES));
+ writeBytes(JSON_ARRAY_START);
+ }
+
+    private String getDatabaseToImport(String tableWithDatabase) {
+        String ret = null;
+        String[] val = tableWithDatabase.split("\\.");
+        if (val.length > 1) {
+            ret = val[0];
+        }
+        return ret;
+    }
+
+    private String getTableToImport(String tableWithDatabase) {
+        String ret = null;
+        String[] val = tableWithDatabase.split("\\.");
+        if (val.length > 1) {
+            ret = val[1];
+        }
+        return ret;
+    }
+
+ private void importHiveDatabases(String databaseToImport, String tableWithDatabaseToImport, boolean failOnError) throws HiveException, AtlasBaseException {
+ LOG.info("Importing Hive Databases");
+
+ List<String> databaseNames = null;
+
+ if (StringUtils.isEmpty(databaseToImport) && StringUtils.isNotEmpty(tableWithDatabaseToImport)) {
+ if (isTableWithDatabaseName(tableWithDatabaseToImport)) {
+ databaseToImport = getDatabaseToImport(tableWithDatabaseToImport);
+ tableWithDatabaseToImport = getTableToImport(tableWithDatabaseToImport);
+ }
+ }
+
+ if (StringUtils.isEmpty(databaseToImport)) {
+ //when database to import is empty, import all
+ databaseNames = hiveClient.getAllDatabases();
+ } else {
+            //when a database is specified, import that database and all tables under it
+ databaseNames = hiveClient.getDatabasesByPattern(databaseToImport);
+ }
+
+ if (!CollectionUtils.isEmpty(databaseNames)) {
+ LOG.info("Found {} databases", databaseNames.size());
+ for (String databaseName : databaseNames) {
+ try {
+ if (!dbEntities.containsKey(databaseName)) {
+ LOG.info("Importing Hive Database {}", databaseName);
+ AtlasEntityWithExtInfo dbEntity = writeDatabase(databaseName);
+ if (dbEntity != null) {
+ dbEntities.put(databaseName, dbEntity.getEntity());
+ }
+ }
+ databaseAndTableListToImport.add(Collections.singletonMap(databaseName, tableWithDatabaseToImport));
+ } catch (IOException e) {
+ LOG.error("Import failed for hive database {}", databaseName, e);
+
+ if (failOnError) {
+ throw new AtlasBaseException(e.getMessage(), e);
+ }
+ }
+ }
+ } else {
+ LOG.error("No database found");
+ if (failOnError) {
+ throw new AtlasBaseException("No database found");
+ }
+ }
+ }
+
+ private void writeEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws IOException {
+ if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
+ Iterator<Map.Entry<String, AtlasEntity>> itr = entityWithExtInfo.getReferredEntities().entrySet().iterator();
+ while (itr.hasNext()) {
+ Map.Entry<String, AtlasEntity> eachEntity = itr.next();
+ if (eachEntity.getValue().getTypeName().equalsIgnoreCase(HiveDataTypes.HIVE_DB.getName())) {
+ itr.remove();
+ }
+ }
+ }
+
+ if (!entityLRUCache.containsKey(entityWithExtInfo.getEntity().getGuid())) {
+ entityLRUCache.put(entityWithExtInfo.getEntity().getGuid(), entityWithExtInfo);
+ writeBytes(AtlasType.toJson(entityWithExtInfo) + JSON_COMMA);
+ }
+ totalProcessedEntities++;
+ }
+
+ private void endWritingAndZipStream() throws IOException {
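+        // writeEntity() leaves a trailing comma after every entity, so append an empty
+        // JSON object before closing the array to keep entities.json well-formed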
+ writeBytes(JSON_EMPTY_OBJECT);
+ writeBytes(JSON_ARRAY_END);
+ setStreamSize(totalProcessedEntities);
+ close();
+ }
+
+ private void flush() {
+ try {
+ zipOutputStream.flush();
+ } catch (IOException e) {
+ LOG.error("Error: Flush: ", e);
+ }
+ }
+
+ private void close() throws IOException {
+ zipOutputStream.flush();
+ zipOutputStream.closeEntry();
+ zipOutputStream.close();
+ }
+
+ private void writeBytes(String payload) throws IOException {
+ zipOutputStream.write(payload.getBytes());
+ }
+
+ private OutputStream getOutputStream(String fileToWrite) throws IOException {
+ return FileUtils.openOutputStream(new File(fileToWrite));
+ }
+
+ public String getMetadataNamespace(Configuration config) {
+ return AtlasConfigurationUtil.getRecentString(config, HIVE_METADATA_NAMESPACE, getClusterName(config));
+ }
+
+ private String getClusterName(Configuration config) {
+ return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
+ }
+
+ public String getMetadataNamespace() {
+ return metadataNamespace;
+ }
+
+ public boolean isConvertHdfsPathToLowerCase() {
+ return convertHdfsPathToLowerCase;
+ }
+
+    /**
+     * Writes table entities to the zip for each database collected in databaseAndTableListToImport
+     * @param failOnError whether to abort on the first table that fails to export
+     * @throws HiveException
+     * @throws AtlasBaseException
+     */
+ public void importHiveTables(boolean failOnError) throws HiveException, AtlasBaseException {
+ LOG.info("Importing Hive Tables");
+
+ int tablesImported = 0;
+
+ if (CollectionUtils.isNotEmpty(databaseAndTableListToImport) && MapUtils.isNotEmpty(dbEntities)) {
+ for (Map<String, String> eachEntry : databaseAndTableListToImport) {
+ final List<Table> tableObjects;
+
+ String databaseName = eachEntry.keySet().iterator().next();
+
+ if (StringUtils.isEmpty(eachEntry.values().iterator().next())) {
+ tableObjects = hiveClient.getAllTableObjects(databaseName);
+
+ populateQualifiedNameGuidMap(HiveDataTypes.HIVE_DB.getName(), (String) dbEntities.get(databaseName).getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+ } else {
+ List<String> tableNames = hiveClient.getTablesByPattern(databaseName, eachEntry.values().iterator().next());
+ tableObjects = new ArrayList<>();
+
+ for (String tableName : tableNames) {
+ Table table = hiveClient.getTable(databaseName, tableName);
+ tableObjects.add(table);
+ populateQualifiedNameGuidMap(HiveDataTypes.HIVE_TABLE.getName(), getTableQualifiedName(metadataNamespace, table));
+ }
+ }
+
+ if (!CollectionUtils.isEmpty(tableObjects)) {
+ LOG.info("Found {} tables to import in database {}", tableObjects.size(), databaseName);
+
+ try {
+ for (Table table : tableObjects) {
+ int imported = importTable(dbEntities.get(databaseName), table, failOnError);
+
+ tablesImported += imported;
+ }
+ } finally {
+ if (tablesImported == tableObjects.size()) {
+ LOG.info("Successfully imported {} tables from database {}", tablesImported, databaseName);
+ } else {
+ LOG.error("Imported {} of {} tables from database {}. Please check logs for errors during import",
+ tablesImported, tableObjects.size(), databaseName);
+ }
+ }
+ } else {
+ LOG.error("No tables to import in database {}", databaseName);
+ if (failOnError) {
+ throw new AtlasBaseException("No tables to import in database - " + databaseName);
+ }
+ }
+ }
+ }
+
+ dbEntities.clear();
+ }
+
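+    // preloads GUIDs of entities already present in Atlas so that the subsequent
+    // zipDirect import updates them in place instead of creating duplicates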
+ private void populateQualifiedNameGuidMap(String typeName, String qualifiedName) {
+ try {
+ AtlasEntitiesWithExtInfo entitiesWithExtInfo = atlasClientV2.getEntitiesByAttribute(typeName, Collections.singletonList(Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName)), true, false);
+
+ if (entitiesWithExtInfo != null && entitiesWithExtInfo.getEntities() != null) {
+ for (AtlasEntity entity : entitiesWithExtInfo.getEntities()) {
+ qualifiedNameGuidMap.put((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), entity.getGuid());
+
+ for(Map.Entry<String, AtlasEntity> eachEntry : entitiesWithExtInfo.getReferredEntities().entrySet()) {
+ qualifiedNameGuidMap.put((String) eachEntry.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME), eachEntry.getKey());
+ }
+
+ if (typeName.equals(HiveDataTypes.HIVE_DB.getName())) {
+ for (String eachRelatedGuid : getAllRelatedGuids(entity)) {
+ AtlasEntityWithExtInfo relatedEntity = atlasClientV2.getEntityByGuid(eachRelatedGuid, true, false);
+
+ qualifiedNameGuidMap.put((String) relatedEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), relatedEntity.getEntity().getGuid());
+ for (Map.Entry<String, AtlasEntity> eachEntry : relatedEntity.getReferredEntities().entrySet()) {
+ qualifiedNameGuidMap.put((String) eachEntry.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME), eachEntry.getKey());
+ }
+ }
+ }
+ }
+ }
+ } catch (AtlasServiceException e) {
+ LOG.info("Unable to load the related entities for type {} and qualified name {} from Atlas", typeName, qualifiedName, e);
+ }
+ }
+
+ private Set<String> getAllRelatedGuids(AtlasEntity entity) {
+ Set<String> relGuidsSet = new HashSet<>();
+
+ for (Object o : entity.getRelationshipAttributes().values()) {
+ if (o instanceof AtlasObjectId) {
+ relGuidsSet.add(((AtlasObjectId) o).getGuid());
+ } else if (o instanceof List) {
+ for (Object id : (List) o) {
+ if (id instanceof AtlasObjectId) {
+ relGuidsSet.add(((AtlasObjectId) id).getGuid());
+ }
+ if (id instanceof Map) {
+ relGuidsSet.add((String) ((Map) id).get("guid"));
+ }
+ }
+ }
+ }
+
+ return relGuidsSet;
+ }
+
+ public void importHiveColumns(boolean failOnError) throws AtlasBaseException {
+ LOG.info("Importing Hive Columns");
+
+ if (MapUtils.isEmpty(hiveTablesAndAtlasEntity)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No hive table present to import columns");
+ }
+
+ return;
+ }
+
+ for (Map.Entry<Table, AtlasEntity> eachTable : hiveTablesAndAtlasEntity.entrySet()) {
+ int columnsImported = 0;
+ List<AtlasEntity> columnEntities = new ArrayList<>();
+
+ try {
+ List<AtlasEntity> partKeys = toColumns(eachTable.getKey().getPartitionKeys(), eachTable.getValue(), RELATIONSHIP_HIVE_TABLE_PART_KEYS);
+ List<AtlasEntity> columns = toColumns(eachTable.getKey().getCols(), eachTable.getValue(), RELATIONSHIP_HIVE_TABLE_COLUMNS);
+
+                columnEntities.addAll(partKeys);
+                columnEntities.addAll(columns);
+
+ for (AtlasEntity eachColumnEntity : columnEntities) {
+ writeEntityToZip(new AtlasEntityWithExtInfo(eachColumnEntity));
+ columnsImported++;
+ }
+ } catch (IOException e) {
+ LOG.error("Column Import failed for hive table {}", eachTable.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME), e);
+
+ if (failOnError) {
+ throw new AtlasBaseException(e.getMessage(), e);
+ }
+ } finally {
+ if (columnsImported == columnEntities.size()) {
+ LOG.info("Successfully imported {} columns for table {}", columnsImported, eachTable.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+ } else {
+ LOG.error("Imported {} of {} columns for table {}. Please check logs for errors during import", columnsImported, columnEntities.size(), eachTable.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+ }
+ }
+ }
+
+ }
+
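+    // triggers the Atlas bulk import of the generated zip; type definitions are not
+    // updated since the zip carries an empty typesdef entry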
+ private void runAtlasImport() {
+ AtlasImportRequest request = new AtlasImportRequest();
+ request.setOption(AtlasImportRequest.UPDATE_TYPE_DEFINITION_KEY, "false");
+ request.setOption(AtlasImportRequest.OPTION_KEY_FORMAT, AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT);
+
+ try {
+ AtlasImportResult importResult = atlasClientV2.importData(request, this.outZipFileName);
+
+ if (importResult.getOperationStatus() == AtlasImportResult.OperationStatus.SUCCESS) {
+ LOG.info("Successfully imported the zip file {} at Atlas and imported {} entities. Number of entities to be imported {}.", this.outZipFileName, importResult.getProcessedEntities().size(), totalProcessedEntities);
+ } else {
+ LOG.error("Failed to import or get the status of import for the zip file {} at Atlas. Number of entities to be imported {}.", this.outZipFileName, totalProcessedEntities);
+ }
+ } catch (AtlasServiceException e) {
+ LOG.error("Failed to import or get the status of import for the zip file {} at Atlas. Number of entities to be imported {}.", this.outZipFileName, totalProcessedEntities, e);
+ }
+ }
+
+ public int importTable(AtlasEntity dbEntity, Table table, final boolean failOnError) throws AtlasBaseException {
+ try {
+ AtlasEntityWithExtInfo tableEntity = writeTable(dbEntity, table);
+
+ hiveTablesAndAtlasEntity.put(table, tableEntity.getEntity());
+
+ if (table.getTableType() == TableType.EXTERNAL_TABLE) {
+ String processQualifiedName = getTableProcessQualifiedName(metadataNamespace, table);
+ String tableLocationString = isConvertHdfsPathToLowerCase() ? lower(table.getDataLocation().toString()) : table.getDataLocation().toString();
+ Path location = table.getDataLocation();
+ String query = getCreateTableString(table, tableLocationString);
+
+ PathExtractorContext pathExtractorCtx = new PathExtractorContext(getMetadataNamespace(), isConvertHdfsPathToLowerCase(), awsS3AtlasModelVersion);
+ AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(location, pathExtractorCtx);
+ AtlasEntity pathInst = entityWithExtInfo.getEntity();
+ AtlasEntity tableInst = tableEntity.getEntity();
+ AtlasEntity processInst = new AtlasEntity(HiveDataTypes.HIVE_PROCESS.getName());
+
+ long now = System.currentTimeMillis();
+
+ processInst.setGuid(getGuid(processQualifiedName));
+ processInst.setAttribute(ATTRIBUTE_QUALIFIED_NAME, processQualifiedName);
+ processInst.setAttribute(ATTRIBUTE_NAME, query);
+ processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
+ processInst.setRelationshipAttribute(ATTRIBUTE_INPUTS, Collections.singletonList(AtlasTypeUtil.getAtlasRelatedObjectId(pathInst, RELATIONSHIP_DATASET_PROCESS_INPUTS)));
+ processInst.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(AtlasTypeUtil.getAtlasRelatedObjectId(tableInst, RELATIONSHIP_PROCESS_DATASET_OUTPUTS)));
+ String userName = table.getOwner();
+ if (StringUtils.isEmpty(userName)) {
+ userName = ApplicationProperties.get().getString(HIVE_USERNAME, "hive");
+ }
+ processInst.setAttribute(ATTRIBUTE_USER_NAME, userName);
+ processInst.setAttribute(ATTRIBUTE_START_TIME, now);
+ processInst.setAttribute(ATTRIBUTE_END_TIME, now);
+ processInst.setAttribute(ATTRIBUTE_OPERATION_TYPE, "CREATETABLE");
+ processInst.setAttribute(ATTRIBUTE_QUERY_TEXT, query);
+ processInst.setAttribute(ATTRIBUTE_QUERY_ID, query);
+ processInst.setAttribute(ATTRIBUTE_QUERY_PLAN, "{}");
+ processInst.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(query));
+
+ AtlasEntitiesWithExtInfo createTableProcess = new AtlasEntitiesWithExtInfo();
+
+ createTableProcess.addEntity(processInst);
+
+ if (pathExtractorCtx.getKnownEntities() != null) {
+ pathExtractorCtx.getKnownEntities().values().forEach(entity -> createTableProcess.addEntity(entity));
+ } else {
+ createTableProcess.addEntity(pathInst);
+ }
+
+ writeEntitiesToZip(createTableProcess);
+ }
+
+ return 1;
+ } catch (Exception e) {
+ LOG.error("Import failed for hive_table {}", table.getTableName(), e);
+
+ if (failOnError) {
+ throw new AtlasBaseException(e.getMessage(), e);
+ }
+
+ return 0;
+ }
+ }
+
+    /**
+     * Write the database entity to the zip file
+     * @param databaseName name of the Hive database to write
+     * @return AtlasEntityWithExtInfo for the database, or null if the database does not exist
+     * @throws HiveException
+     * @throws IOException
+     */
+ private AtlasEntityWithExtInfo writeDatabase(String databaseName) throws HiveException, IOException {
+ AtlasEntityWithExtInfo ret = null;
+ Database db = hiveClient.getDatabase(databaseName);
+
+ if (db != null) {
+ ret = new AtlasEntityWithExtInfo(toDbEntity(db));
+ writeEntityToZip(ret);
+ }
+
+ return ret;
+ }
+
+ private AtlasEntityWithExtInfo writeTable(AtlasEntity dbEntity, Table table) throws AtlasHookException {
+ try {
+ AtlasEntityWithExtInfo tableEntity = toTableEntity(dbEntity, table);
+ writeEntityToZip(tableEntity);
+
+ return tableEntity;
+ } catch (Exception e) {
+            throw new AtlasHookException("HiveMetaStoreBridgeV2.writeTable() failed.", e);
+ }
+ }
+
+    /**
+     * Write an entity to the zip file
+     * @param entity entity (with extended info) to write
+     * @throws IOException
+     */
+ private void writeEntityToZip(AtlasEntityWithExtInfo entity) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing {} entity: {}", entity.getEntity().getTypeName(), entity);
+ }
+
+ writeEntity(entity);
+ clearRelationshipAttributes(entity.getEntity());
+ flush();
+ }
+
+    /**
+     * Write a set of entities to the zip file
+     * @param entities entities to write
+     * @throws IOException
+     */
+ private void writeEntitiesToZip(AtlasEntitiesWithExtInfo entities) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing {} entities: {}", entities.getEntities().size(), entities);
+ }
+
+ for (AtlasEntity entity : entities.getEntities()) {
+ writeEntity(new AtlasEntityWithExtInfo(entity));
+ }
+
+ flush();
+ clearRelationshipAttributes(entities);
+ }
+
+    /**
+     * Create a Hive database entity
+     * @param hiveDB The Hive {@link Database} object from which to map properties
+     * @return new Hive database AtlasEntity
+     */
+ private AtlasEntity toDbEntity(Database hiveDB) {
+ return toDbEntity(hiveDB, null);
+ }
+
+ private AtlasEntity toDbEntity(Database hiveDB, AtlasEntity dbEntity) {
+ if (dbEntity == null) {
+ dbEntity = new AtlasEntity(HiveDataTypes.HIVE_DB.getName());
+ }
+
+ String dbName = getDatabaseName(hiveDB);
+
+ String qualifiedName = getDBQualifiedName(metadataNamespace, dbName);
+ dbEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+
+ dbEntity.setGuid(getGuid(true, qualifiedName));
+
+ dbEntity.setAttribute(ATTRIBUTE_NAME, dbName);
+ dbEntity.setAttribute(ATTRIBUTE_DESCRIPTION, hiveDB.getDescription());
+ dbEntity.setAttribute(ATTRIBUTE_OWNER, hiveDB.getOwnerName());
+
+ dbEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
+ dbEntity.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(hiveDB.getLocationUri()));
+ dbEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveDB.getParameters());
+
+ if (hiveDB.getOwnerType() != null) {
+ dbEntity.setAttribute(ATTRIBUTE_OWNER_TYPE, OWNER_TYPE_TO_ENUM_VALUE.get(hiveDB.getOwnerType().getValue()));
+ }
+
+ return dbEntity;
+ }
+
+ private String getDBGuidFromAtlas(String dBQualifiedName) {
+ String guid = null;
+ try {
+ guid = atlasClientV2.getEntityHeaderByAttribute(HiveDataTypes.HIVE_DB.getName(), Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, dBQualifiedName)).getGuid();
+ } catch (AtlasServiceException e) {
+ LOG.warn("Failed to get DB guid from Atlas with qualified name {}", dBQualifiedName, e);
+ }
+ return guid;
+ }
+
+ public static String getDatabaseName(Database hiveDB) {
+ String dbName = hiveDB.getName().toLowerCase();
+ String catalogName = hiveDB.getCatalogName() != null ? hiveDB.getCatalogName().toLowerCase() : null;
+
+ if (StringUtils.isNotEmpty(catalogName) && !StringUtils.equals(catalogName, DEFAULT_METASTORE_CATALOG)) {
+ dbName = catalogName + SEP + dbName;
+ }
+
+ return dbName;
+ }
+
+    /**
+     * Create a Hive table entity
+     * @param database AtlasEntity of the Hive database to which this table belongs
+     * @param hiveTable reference to the Hive {@link Table} from which to map properties
+     * @return AtlasEntityWithExtInfo containing the new table entity
+     * @throws AtlasHookException
+     */
+ private AtlasEntityWithExtInfo toTableEntity(AtlasEntity database, final Table hiveTable) throws AtlasHookException {
+ AtlasEntityWithExtInfo table = new AtlasEntityWithExtInfo(new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName()));
+
+ AtlasEntity tableEntity = table.getEntity();
+ String tableQualifiedName = getTableQualifiedName(metadataNamespace, hiveTable);
+ long createTime = BaseHiveEvent.getTableCreateTime(hiveTable);
+ long lastAccessTime = hiveTable.getLastAccessTime() > 0 ? hiveTable.getLastAccessTime() : createTime;
+
+ tableEntity.setGuid(getGuid(tableQualifiedName));
+ tableEntity.setRelationshipAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasRelatedObjectId(database, RELATIONSHIP_HIVE_TABLE_DB));
+ tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName);
+ tableEntity.setAttribute(ATTRIBUTE_NAME, hiveTable.getTableName().toLowerCase());
+ tableEntity.setAttribute(ATTRIBUTE_OWNER, hiveTable.getOwner());
+
+ tableEntity.setAttribute(ATTRIBUTE_CREATE_TIME, createTime);
+ tableEntity.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime);
+ tableEntity.setAttribute(ATTRIBUTE_RETENTION, hiveTable.getRetention());
+ tableEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveTable.getParameters());
+ tableEntity.setAttribute(ATTRIBUTE_COMMENT, hiveTable.getParameters().get(ATTRIBUTE_COMMENT));
+ tableEntity.setAttribute(ATTRIBUTE_TABLE_TYPE, hiveTable.getTableType().name());
+ tableEntity.setAttribute(ATTRIBUTE_TEMPORARY, hiveTable.isTemporary());
+
+ if (hiveTable.getViewOriginalText() != null) {
+ tableEntity.setAttribute(ATTRIBUTE_VIEW_ORIGINAL_TEXT, hiveTable.getViewOriginalText());
+ }
+
+ if (hiveTable.getViewExpandedText() != null) {
+ tableEntity.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, hiveTable.getViewExpandedText());
+ }
+
+ AtlasEntity sdEntity = toStorageDescEntity(hiveTable.getSd(), getStorageDescQFName(tableQualifiedName), AtlasTypeUtil.getObjectId(tableEntity));
+
+ tableEntity.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, AtlasTypeUtil.getAtlasRelatedObjectId(sdEntity, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
+
+ table.addReferredEntity(database);
+ table.addReferredEntity(sdEntity);
+ table.setEntity(tableEntity);
+
+ return table;
+ }
+
+ private AtlasEntity toStorageDescEntity(StorageDescriptor storageDesc, String sdQualifiedName, AtlasObjectId tableId) {
+ AtlasEntity ret = new AtlasEntity(HiveDataTypes.HIVE_STORAGEDESC.getName());
+
+ ret.setGuid(getGuid(sdQualifiedName));
+ ret.setRelationshipAttribute(ATTRIBUTE_TABLE, AtlasTypeUtil.getAtlasRelatedObjectId(tableId, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
+ ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName);
+ ret.setAttribute(ATTRIBUTE_PARAMETERS, storageDesc.getParameters());
+ ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(storageDesc.getLocation()));
+ ret.setAttribute(ATTRIBUTE_INPUT_FORMAT, storageDesc.getInputFormat());
+ ret.setAttribute(ATTRIBUTE_OUTPUT_FORMAT, storageDesc.getOutputFormat());
+ ret.setAttribute(ATTRIBUTE_COMPRESSED, storageDesc.isCompressed());
+ ret.setAttribute(ATTRIBUTE_NUM_BUCKETS, storageDesc.getNumBuckets());
+ ret.setAttribute(ATTRIBUTE_STORED_AS_SUB_DIRECTORIES, storageDesc.isStoredAsSubDirectories());
+
+ if (storageDesc.getBucketCols().size() > 0) {
+ ret.setAttribute(ATTRIBUTE_BUCKET_COLS, storageDesc.getBucketCols());
+ }
+
+ if (storageDesc.getSerdeInfo() != null) {
+ SerDeInfo serdeInfo = storageDesc.getSerdeInfo();
+
+ LOG.info("serdeInfo = {}", serdeInfo);
+ AtlasStruct serdeInfoStruct = new AtlasStruct(HiveDataTypes.HIVE_SERDE.getName());
+
+ serdeInfoStruct.setAttribute(ATTRIBUTE_NAME, serdeInfo.getName());
+ serdeInfoStruct.setAttribute(ATTRIBUTE_SERIALIZATION_LIB, serdeInfo.getSerializationLib());
+ serdeInfoStruct.setAttribute(ATTRIBUTE_PARAMETERS, serdeInfo.getParameters());
+
+ ret.setAttribute(ATTRIBUTE_SERDE_INFO, serdeInfoStruct);
+ }
+
+ if (CollectionUtils.isNotEmpty(storageDesc.getSortCols())) {
+ List<AtlasStruct> sortColsStruct = new ArrayList<>();
+
+ for (Order sortcol : storageDesc.getSortCols()) {
+ String hiveOrderName = HiveDataTypes.HIVE_ORDER.getName();
+ AtlasStruct colStruct = new AtlasStruct(hiveOrderName);
+ colStruct.setAttribute("col", sortcol.getCol());
+ colStruct.setAttribute("order", sortcol.getOrder());
+
+ sortColsStruct.add(colStruct);
+ }
+
+ ret.setAttribute(ATTRIBUTE_SORT_COLS, sortColsStruct);
+ }
+
+ return ret;
+ }
+
+ private List<AtlasEntity> toColumns(List<FieldSchema> schemaList, AtlasEntity table, String relationshipType) {
+ List<AtlasEntity> ret = new ArrayList<>();
+
+ int columnPosition = 0;
+ for (FieldSchema fs : schemaList) {
+ LOG.debug("Processing field {}", fs);
+
+ AtlasEntity column = new AtlasEntity(HiveDataTypes.HIVE_COLUMN.getName());
+
+ String columnQualifiedName = getColumnQualifiedName((String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), fs.getName());
+
+ column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, columnQualifiedName);
+ column.setGuid(getGuid(columnQualifiedName));
+
+ column.setRelationshipAttribute(ATTRIBUTE_TABLE, AtlasTypeUtil.getAtlasRelatedObjectId(table, relationshipType));
+
+ column.setAttribute(ATTRIBUTE_NAME, fs.getName());
+ column.setAttribute(ATTRIBUTE_OWNER, table.getAttribute(ATTRIBUTE_OWNER));
+ column.setAttribute(ATTRIBUTE_COL_TYPE, fs.getType());
+ column.setAttribute(ATTRIBUTE_COL_POSITION, columnPosition++);
+ column.setAttribute(ATTRIBUTE_COMMENT, fs.getComment());
+
+ ret.add(column);
+ }
+ return ret;
+ }
+
+    private String getCreateTableString(Table table, String location) {
+ String colString = "";
+ List<FieldSchema> colList = table.getAllCols();
+
+ if (colList != null) {
+ for (FieldSchema col : colList) {
+ colString += col.getName() + " " + col.getType() + ",";
+ }
+
+ if (colList.size() > 0) {
+ colString = colString.substring(0, colString.length() - 1);
+ colString = "(" + colString + ")";
+ }
+ }
+
+ String query = "create external table " + table.getTableName() + colString + " location '" + location + "'";
+
+ return query;
+ }
+
+ private String lower(String str) {
+ if (StringUtils.isEmpty(str)) {
+ return "";
+ }
+
+ return str.toLowerCase().trim();
+ }
+
+ /**
+ * Construct the qualified name used to uniquely identify a Table instance in Atlas.
+ * @param metadataNamespace Metadata namespace of the cluster to which the Hive component belongs
+ * @param table hive table for which the qualified name is needed
+ * @return Unique qualified name to identify the Table instance in Atlas.
+ */
+ private static String getTableQualifiedName(String metadataNamespace, Table table) {
+ return getTableQualifiedName(metadataNamespace, table.getDbName(), table.getTableName(), table.isTemporary());
+ }
+
+ /**
+ * Construct the qualified name used to uniquely identify a Database instance in Atlas.
+ * @param metadataNamespace Name of the cluster to which the Hive component belongs
+ * @param dbName Name of the Hive database
+ * @return Unique qualified name to identify the Database instance in Atlas.
+ */
+ public static String getDBQualifiedName(String metadataNamespace, String dbName) {
+ return String.format("%s@%s", dbName.toLowerCase(), metadataNamespace);
+ }
+
+ /**
+ * Construct the qualified name used to uniquely identify a Table instance in Atlas.
+ * @param metadataNamespace Name of the cluster to which the Hive component belongs
+ * @param dbName Name of the Hive database to which the Table belongs
+ * @param tableName Name of the Hive table
+ * @param isTemporaryTable is this a temporary table
+ * @return Unique qualified name to identify the Table instance in Atlas.
+ */
+ public static String getTableQualifiedName(String metadataNamespace, String dbName, String tableName, boolean isTemporaryTable) {
+ String tableTempName = tableName;
+
+ if (isTemporaryTable) {
+ if (SessionState.get() != null && SessionState.get().getSessionId() != null) {
+ tableTempName = tableName + TEMP_TABLE_PREFIX + SessionState.get().getSessionId();
+ } else {
+ tableTempName = tableName + TEMP_TABLE_PREFIX + RandomStringUtils.random(10);
+ }
+ }
+
+ return String.format("%s.%s@%s", dbName.toLowerCase(), tableTempName.toLowerCase(), metadataNamespace);
+ }
+
+ public static String getTableProcessQualifiedName(String metadataNamespace, Table table) {
+ String tableQualifiedName = getTableQualifiedName(metadataNamespace, table);
+ long createdTime = getTableCreatedTime(table);
+
+ return tableQualifiedName + SEP + createdTime;
+ }
+
+ public static String getStorageDescQFName(String tableQualifiedName) {
+ return tableQualifiedName + "_storage";
+ }
+
+ public static String getColumnQualifiedName(final String tableQualifiedName, final String colName) {
+ final String[] parts = tableQualifiedName.split("@");
+ final String tableName = parts[0];
+ final String metadataNamespace = parts[1];
+
+ return String.format("%s.%s@%s", tableName, colName.toLowerCase(), metadataNamespace);
+ }
+
+    public static long getTableCreatedTime(Table table) {
+        // cast before multiplying to avoid int overflow when converting seconds to millis
+        return (long) table.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR;
+    }
+
+ private void clearRelationshipAttributes(AtlasEntitiesWithExtInfo entities) {
+ if (entities != null) {
+ if (entities.getEntities() != null) {
+ for (AtlasEntity entity : entities.getEntities()) {
+                    clearRelationshipAttributes(entity);
+ }
+ }
+
+ if (entities.getReferredEntities() != null) {
+ clearRelationshipAttributes(entities.getReferredEntities().values());
+ }
+ }
+ }
+
+ private void clearRelationshipAttributes(Collection<AtlasEntity> entities) {
+ if (entities != null) {
+ for (AtlasEntity entity : entities) {
+ clearRelationshipAttributes(entity);
+ }
+ }
+ }
+
+ private void clearRelationshipAttributes(AtlasEntity entity) {
+ if (entity != null && entity.getRelationshipAttributes() != null) {
+ entity.getRelationshipAttributes().clear();
+ }
+ }
+
+    private boolean isTableWithDatabaseName(String tableName) {
+        return tableName.contains(".");
+    }
+
+ private String getGuid(String qualifiedName) {
+ return getGuid(false, qualifiedName);
+ }
+
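+    // GUID resolution order: GUIDs preloaded from Atlas, then (for databases only) a
+    // direct Atlas lookup, and finally a freshly generated GUID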
+ private String getGuid(boolean isDBType, String qualifiedName) {
+ String guid = null;
+
+ if (qualifiedNameGuidMap.containsKey(qualifiedName)) {
+ guid = qualifiedNameGuidMap.get(qualifiedName);
+ } else if (isDBType) {
+ guid = getDBGuidFromAtlas(qualifiedName);
+ }
+
+ if (StringUtils.isBlank(guid)) {
+ guid = generateGuid();
+ }
+
+ return guid;
+ }
+
+ private String generateGuid() {
+ return UUID.randomUUID().toString();
+ }
+
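+    // recorded in the zip file comment so the importer knows how many entities to expect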
+ public void setStreamSize(long size) {
+ zipOutputStream.setComment(String.format(ZIP_FILE_COMMENT_FORMAT, size, -1));
+ }
+}
\ No newline at end of file
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
index 2c18704..cbc1aa9 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
@@ -17,7 +17,6 @@
*/
package org.apache.atlas.model.impexp;
-
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -50,10 +49,10 @@
public static final String OPTION_KEY_BATCH_SIZE = "batchSize";
public static final String OPTION_KEY_FORMAT = "format";
public static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect";
- public static final String START_POSITION_KEY = "startPosition";
+ public static final String START_POSITION_KEY = "startPosition";
+ public static final String UPDATE_TYPE_DEFINITION_KEY = "updateTypeDefinition";
private static final String START_GUID_KEY = "startGuid";
private static final String FILE_NAME_KEY = "fileName";
- private static final String UPDATE_TYPE_DEFINITION_KEY = "updateTypeDefinition";
private static final String OPTION_KEY_STREAM_SIZE = "size";
private Map<String, String> options;
diff --git a/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java
index 3430fda..1db5f20 100644
--- a/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java
+++ b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java
@@ -21,12 +21,7 @@
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.atlas.model.AtlasBaseModelObject;
import org.apache.atlas.model.impexp.MigrationStatus;
-import org.apache.commons.lang.StringUtils;
-
-import java.io.Serializable;
-import java.util.Date;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
@@ -36,27 +31,43 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public class MigrationImportStatus extends MigrationStatus {
private String name;
+ private String fileHash;
public MigrationImportStatus() {
}
public MigrationImportStatus(String name) {
- this.name = name;
+ this.name = name;
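+ // No separate hash supplied; reuse the name so hash-keyed lookups still work.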
+ this.fileHash = name;
+ }
+
+ public MigrationImportStatus(String name, String fileHash) {
+ this.name = name;
+ this.fileHash = fileHash;
}
public String getName() {
return name;
}
+ public String getFileHash() {
+ return fileHash;
+ }
+
public void setName(String name) {
this.name = name;
}
+ public void setFileHash(String fileHash) {
+ this.fileHash = fileHash;
+ }
+
@Override
public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(", name=").append(name);
- sb.append(super.toString());
+ final StringBuilder sb = new StringBuilder("MigrationImportStatus{");
+ sb.append("name='").append(name).append('\'');
+ sb.append(", fileHash='").append(fileHash).append('\'');
+ sb.append('}');
return sb.toString();
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
index a22c687..5b22f9c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
@@ -24,9 +24,12 @@
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.util.Date;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getEncodedProperty;
@@ -50,7 +53,12 @@
public void init(String fileToImport) {
- this.status = new MigrationImportStatus(fileToImport);
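+ // Identify the migration by an MD5 hash of the file contents rather than by
+ // file name alone.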
+ try (FileInputStream fis = new FileInputStream(fileToImport)) {
+ this.status = new MigrationImportStatus(fileToImport, DigestUtils.md5Hex(fis));
+ } catch (IOException e) {
+ LOG.error("Unable to create migration status for file: {}", fileToImport, e);
+ }
+
if (!this.migrationStatusVertexManagement.exists(fileToImport)) {
return;
}
@@ -59,21 +67,28 @@
}
public MigrationImportStatus getCreate(String fileName) {
- return getCreate(new MigrationImportStatus(fileName));
+ MigrationImportStatus create = null;
+ try (FileInputStream fis = new FileInputStream(fileName)) {
+ create = getCreate(new MigrationImportStatus(fileName, DigestUtils.md5Hex(fis)));
+ } catch (IOException e) {
+ LOG.error("Exception occurred while creating migration import", e);
+ }
+
+ return create;
}
public MigrationImportStatus getCreate(MigrationImportStatus status) {
try {
this.status = this.migrationStatusVertexManagement.createOrUpdate(status);
} catch (Exception ex) {
- LOG.error("DataMigrationStatusService: Setting status: {}: Resulted in error!", status.getName(), ex);
+ LOG.error("DataMigrationStatusService: Setting status: {}: Resulted in error!", status.getFileHash(), ex);
}
return this.status;
}
public MigrationImportStatus getStatus() {
- if (this.status != null && this.migrationStatusVertexManagement.exists(this.status.getName())) {
+ if (this.status != null && this.migrationStatusVertexManagement.exists(this.status.getFileHash())) {
return getCreate(this.status);
}
@@ -89,8 +104,8 @@
return;
}
- MigrationImportStatus status = getByName(this.status.getName());
- this.migrationStatusVertexManagement.delete(status.getName());
+ MigrationImportStatus status = getByName(this.status.getFileHash());
+ this.migrationStatusVertexManagement.delete(status.getFileHash());
this.status = null;
}
@@ -118,7 +133,7 @@
}
public MigrationImportStatus createOrUpdate(MigrationImportStatus status) {
- this.vertex = findByNameInternal(status.getName());
+ this.vertex = findByNameInternal(status.getFileHash());
if (this.vertex == null) {
this.vertex = graph.addVertex();
@@ -192,7 +207,7 @@
private void updateVertex(AtlasVertex vertex, MigrationImportStatus status) {
try {
- setEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, status.getName());
+ setEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, status.getFileHash());
setEncodedProperty(vertex, PROPERTY_KEY_START_TIME,
(status.getStartTime() != null)
@@ -213,7 +228,7 @@
MigrationImportStatus ret = new MigrationImportStatus();
try {
- ret.setName(getEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, String.class));
+ ret.setFileHash(getEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, String.class));
Long dateValue = getEncodedProperty(vertex, PROPERTY_KEY_START_TIME, Long.class);
if (dateValue != null) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
index bfb1148..2b3f179 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
@@ -26,7 +26,9 @@
import org.apache.atlas.model.migration.MigrationImportStatus;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.impexp.ImportService;
+import org.apache.atlas.repository.impexp.ZipExportFileNames;
import org.apache.atlas.type.AtlasType;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.commons.lang.ArrayUtils;
@@ -196,7 +198,13 @@
}
private MigrationImportStatus getCreateMigrationStatus(String fileName, int streamSize) {
- MigrationImportStatus status = new MigrationImportStatus(fileName);
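+ // Same content-hash identity used by DataMigrationStatusService: the status
+ // is keyed on the MD5 of the zip being imported.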
+ MigrationImportStatus status = null;
+ try (FileInputStream fis = new FileInputStream(fileName)) {
+ status = new MigrationImportStatus(fileName, DigestUtils.md5Hex(fis));
+ } catch (IOException e) {
+ LOG.error("Exception occurred while creating migration import", e);
+ }
+
status.setTotalCount(streamSize);
MigrationImportStatus statusRetrieved = dataMigrationStatusService.getCreate(status);
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
index f1dc990..6d368f4 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
@@ -19,13 +19,14 @@
import com.google.inject.Inject;
import org.apache.atlas.TestModules;
-import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.migration.MigrationImportStatus;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.migration.DataMigrationStatusService;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.util.Date;
import static org.testng.Assert.assertEquals;
@@ -38,12 +39,13 @@
AtlasGraph atlasGraph;
@Test
- public void createUpdateDelete() {
+ public void createUpdateDelete() throws IOException {
final String STATUS_DONE = "DONE";
DataMigrationStatusService dataMigrationStatusService = new DataMigrationStatusService(atlasGraph);
- MigrationImportStatus expected = new MigrationImportStatus("/tmp/defg.zip");
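+ // The single-argument constructor reuses the name as the file hash, so this
+ // dummy value populates both fields.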
+ MigrationImportStatus expected = new MigrationImportStatus("DUMMY-HASH");
+
expected.setTotalCount(3333);
expected.setCurrentIndex(20);
expected.setStartTime(new Date());
@@ -51,7 +53,7 @@
MigrationImportStatus ret = dataMigrationStatusService.getCreate(expected);
assertNotNull(ret);
- assertEquals(ret.getName(), expected.getName());
+ assertEquals(ret.getFileHash(), expected.getFileHash());
assertEquals(ret.getStartTime(), expected.getStartTime());
assertEquals(ret.getTotalCount(), expected.getTotalCount());
assertEquals(ret.getCurrentIndex(), expected.getCurrentIndex());
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 135b94b..0580f7f 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -831,13 +831,17 @@
}
private void addToImportOperationAudits(AtlasImportResult result) throws AtlasBaseException {
- List<AtlasObjectId> objectIds = result.getExportResult().getRequest().getItemsToExport();
-
Map<String, Object> optionMap = new HashMap<>();
optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
String params = AtlasJson.toJson(optionMap);
- auditImportExportOperations(objectIds, AuditOperation.IMPORT, params);
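+ // An import with no originating export request is audited with the
+ // processed-entity count instead of the exported item list.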
+ if (result.getExportResult().getRequest() == null) {
+ int resultCount = result.getProcessedEntities().size();
+ auditService.add(AuditOperation.IMPORT, params, AtlasJson.toJson(result.getMetrics()), resultCount);
+ } else {
+ List<AtlasObjectId> objectIds = result.getExportResult().getRequest().getItemsToExport();
+ auditImportExportOperations(objectIds, AuditOperation.IMPORT, params);
+ }
}
private void addToExportOperationAudits(boolean isSuccessful, AtlasExportResult result) throws AtlasBaseException {