ATLAS-3825: updated import business-metadata to set business attribute values on appropriate entities

(cherry picked from commit 86ba342fe6baee08c6aaa20e50de4c3b8e2ae873)
diff --git a/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java b/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java
index 0ee54e9..047d497 100644
--- a/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java
+++ b/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java
@@ -95,6 +95,10 @@
             this(parentObjectName, childObjectName, importStatus, "",-1);
         }
 
+        public ImportInfo( ImportStatus importStatus, String remarks) {
+            this("","", importStatus, remarks, -1);
+        }
+
         public ImportInfo( ImportStatus importStatus, String remarks, Integer rowNumber) {
             this("","", importStatus, remarks, rowNumber);
         }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 89076c1..bf1629c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -41,7 +41,6 @@
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
-import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
@@ -1551,28 +1550,33 @@
     @GraphTransaction
     public BulkImportResponse bulkCreateOrUpdateBusinessAttributes(InputStream inputStream, String fileName) throws AtlasBaseException {
         BulkImportResponse ret = new BulkImportResponse();
+
         try {
             if (StringUtils.isBlank(fileName)) {
                 throw new AtlasBaseException(AtlasErrorCode.FILE_NAME_NOT_FOUND, fileName);
             }
-            List<String[]> fileData = FileUtils.readFileData(fileName, inputStream);
 
+            List<String[]>           fileData              = FileUtils.readFileData(fileName, inputStream);
             Map<String, AtlasEntity> attributesToAssociate = getBusinessMetadataDefList(fileData, ret);
 
-            for (Map.Entry<String, AtlasEntity> entry : attributesToAssociate.entrySet()) {
-                AtlasEntity entity = entry.getValue();
-                try{
+            for (AtlasEntity entity : attributesToAssociate.values()) {
+                try {
                     addOrUpdateBusinessAttributes(entity.getGuid(), entity.getBusinessAttributes(), true);
-                    BulkImportResponse.ImportInfo successImportInfo = new BulkImportResponse.ImportInfo(entity.getAttribute(Constants.QUALIFIED_NAME).toString(), entity.getBusinessAttributes().toString());
+
+                    BulkImportResponse.ImportInfo successImportInfo = new BulkImportResponse.ImportInfo(entity.getGuid(), entity.getBusinessAttributes().toString());
+
                     ret.setSuccessImportInfoList(successImportInfo);
-                }catch(Exception e){
-                    LOG.error("Error occurred while updating BusinessMetadata Attributes for Entity "+entity.getAttribute(Constants.QUALIFIED_NAME).toString());
-                    BulkImportResponse.ImportInfo failedImportInfo = new BulkImportResponse.ImportInfo(entity.getAttribute(Constants.QUALIFIED_NAME).toString(), entity.getBusinessAttributes().toString(), BulkImportResponse.ImportStatus.FAILED, e.getMessage());
-                    ret.setFailedImportInfoList(failedImportInfo);
+                }catch (Exception e) {
+                    LOG.error("Error occurred while updating BusinessMetadata Attributes for Entity " + entity.getGuid());
+
+                    BulkImportResponse.ImportInfo failedImportInfo = new BulkImportResponse.ImportInfo(entity.getGuid(), entity.getBusinessAttributes().toString(), BulkImportResponse.ImportStatus.FAILED, e.getMessage());
+
+                    ret.getFailedImportInfoList().add(failedImportInfo);
                 }
             }
         } catch (IOException e) {
-            LOG.error("An Exception occurred while uploading the file : "+e.getMessage());
+            LOG.error("An Exception occurred while uploading the file {}", fileName, e);
+
             throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_UPLOAD, e);
         }
 
@@ -1580,105 +1584,133 @@
     }
 
     private Map<String, AtlasEntity> getBusinessMetadataDefList(List<String[]> fileData, BulkImportResponse bulkImportResponse) throws AtlasBaseException {
-        Map<String, AtlasEntity> ret = new HashMap<>();
-        Map<String, Map<String, Object>> newBMAttributes = new HashMap<>();
-        Map<String, AtlasVertex> vertexCache = new HashMap<>();
-        Map<String, Object> attribute = new HashMap<>();
+        Map<String, AtlasEntity> ret           = new HashMap<>();
+        Map<String, AtlasVertex> vertexCache   = new HashMap<>();
+        List<String>             failedMsgList = new ArrayList<>();
 
         for (int lineIndex = 0; lineIndex < fileData.size(); lineIndex++) {
-            List<String> failedTermMsgList = new ArrayList<>();
-            AtlasEntity atlasEntity = new AtlasEntity();
             String[] record = fileData.get(lineIndex);
-            if (missingFieldsCheck(record, bulkImportResponse, lineIndex+1)) {
-                continue;
-            }
-            String typeName = record[FileUtils.TYPENAME_COLUMN_INDEX];
-            String uniqueAttrValue = record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX];
-            String bmAttribute = record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX];
-            String bmAttributeValue = record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX];
-            String uniqueAttrName = Constants.QUALIFIED_NAME;
-            if (record.length > FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX) {
-                uniqueAttrName = typeName+"."+record[FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX];
-            }
 
-            if (validateTypeName(typeName, bulkImportResponse, lineIndex+1)) {
+            boolean missingFields = record.length < FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX ||
+                                    StringUtils.isBlank(record[FileUtils.TYPENAME_COLUMN_INDEX]) ||
+                                    StringUtils.isBlank(record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX]) ||
+                                    StringUtils.isBlank(record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX]) ||
+                                    StringUtils.isBlank(record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX]);
+
+            if (missingFields){
+                failedMsgList.add("Line #" + (lineIndex + 1) + ": missing fields. " + Arrays.toString(record));
+
                 continue;
             }
 
-            String vertexKey = typeName + "_" + uniqueAttrName + "_" + uniqueAttrValue;
-            AtlasVertex atlasVertex = vertexCache.get(vertexKey);
-            if (atlasVertex == null) {
-                atlasVertex = AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(graph, typeName, uniqueAttrName, uniqueAttrValue);
-            }
-
-            if (atlasVertex == null) {
-                LOG.error("Provided UniqueAttributeValue is not valid : " + uniqueAttrValue + " at line #" + (lineIndex + 1));
-                failedTermMsgList.add("Provided UniqueAttributeValue is not valid : " + uniqueAttrValue + " at line #" + (lineIndex + 1));
-            }
-
-            vertexCache.put(vertexKey, atlasVertex);
-            String[] businessAttributeName = bmAttribute.split(FileUtils.ESCAPE_CHARACTER + ".");
-            if (validateBMAttributeName(uniqueAttrValue,bmAttribute,bulkImportResponse,lineIndex+1)) {
-                continue;
-            }
-
-            String bMName = businessAttributeName[0];
-            String bMAttributeName = businessAttributeName[1];
+            String          typeName   = record[FileUtils.TYPENAME_COLUMN_INDEX];
             AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
-            if (validateBMAttribute(uniqueAttrValue, businessAttributeName, entityType, bulkImportResponse,lineIndex+1)) {
+
+            if (entityType == null) {
+                failedMsgList.add("Line #" + (lineIndex + 1) + ": invalid entity-type '" + typeName + "'");
+
                 continue;
             }
 
-            AtlasBusinessAttribute atlasBusinessAttribute = entityType.getBusinessAttributes().get(bMName).get(bMAttributeName);
-            if (atlasBusinessAttribute.getAttributeType().getTypeCategory() == TypeCategory.ARRAY) {
-                AtlasArrayType arrayType = (AtlasArrayType) atlasBusinessAttribute.getAttributeType();
-                List attributeValueData;
+            String uniqueAttrValue  = record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX];
+            String bmAttribute      = record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX];
+            String bmAttributeValue = record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX];
+            String uniqueAttrName   = AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME;
 
-                if(arrayType.getElementType() instanceof AtlasEnumType){
-                    attributeValueData = AtlasGraphUtilsV2.assignEnumValues(bmAttributeValue, (AtlasEnumType) arrayType.getElementType(), failedTermMsgList, lineIndex+1);
-                }else{
-                    attributeValueData = assignMultipleValues(bmAttributeValue, arrayType.getElementTypeName(), failedTermMsgList, lineIndex+1);
+            if (record.length > FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX) {
+                uniqueAttrName = record[FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX];
+            }
+
+            AtlasAttribute uniqueAttribute = entityType.getAttribute(uniqueAttrName);
+
+            if (uniqueAttribute == null) {
+                failedMsgList.add("Line #" + (lineIndex + 1) + ": attribute '" + uniqueAttrName + "' not found in entity-type '" + typeName + "'");
+
+                continue;
+            }
+
+            if (!uniqueAttribute.getAttributeDef().getIsUnique()) {
+                failedMsgList.add("Line #" + (lineIndex + 1) + ": attribute '" + uniqueAttrName + "' is not an unique attribute in entity-type '" + typeName + "'");
+
+                continue;
+            }
+
+            String      vertexKey = uniqueAttribute.getVertexPropertyName() + "_" + uniqueAttrValue;
+            AtlasVertex vertex    = vertexCache.get(vertexKey);
+
+            if (vertex == null) {
+                vertex = AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(graph, typeName, uniqueAttribute.getVertexUniquePropertyName(), uniqueAttrValue);
+
+                if (vertex == null) {
+                    failedMsgList.add("Line #" + (lineIndex + 1) + ": no " + typeName + " entity found with " + uniqueAttrName + "=" + uniqueAttrValue);
+
+                    continue;
                 }
-                attribute.put(bmAttribute, attributeValueData);
-            } else {
-                attribute.put(bmAttribute, bmAttributeValue);
+
+                vertexCache.put(vertexKey, vertex);
             }
 
-            if(failedMsgCheck(uniqueAttrValue,bmAttribute, failedTermMsgList, bulkImportResponse, lineIndex+1)) {
+            AtlasBusinessAttribute businessAttribute = entityType.getBusinesAAttribute(bmAttribute);
+
+            if (businessAttribute == null) {
+                failedMsgList.add("Line #" + (lineIndex + 1) + ": invalid business attribute name '" + bmAttribute + "'");
+
                 continue;
             }
 
-            if(ret.containsKey(vertexKey)) {
-                atlasEntity = ret.get(vertexKey);
-                atlasEntity.setBusinessAttribute(bMName, bMAttributeName, attribute.get(bmAttribute));
-                ret.put(vertexKey, atlasEntity);
+            final Object attrValue;
+
+            if (businessAttribute.getAttributeType().getTypeCategory() == TypeCategory.ARRAY) {
+                AtlasArrayType arrayType = (AtlasArrayType) businessAttribute.getAttributeType();
+                List           arrayValue;
+
+                if (arrayType.getElementType() instanceof AtlasEnumType) {
+                    arrayValue = AtlasGraphUtilsV2.assignEnumValues(bmAttributeValue, (AtlasEnumType) arrayType.getElementType(), failedMsgList, lineIndex+1);
+                } else {
+                    arrayValue = assignMultipleValues(bmAttributeValue, arrayType.getElementTypeName(), failedMsgList, lineIndex+1);
+                }
+
+                attrValue = arrayValue;
             } else {
-                String guid = GraphHelper.getGuid(atlasVertex);
-                atlasEntity.setGuid(guid);
-                atlasEntity.setTypeName(typeName);
-                atlasEntity.setAttribute(Constants.QUALIFIED_NAME,uniqueAttrValue);
-                newBMAttributes = entityRetriever.getBusinessMetadata(atlasVertex) != null ? entityRetriever.getBusinessMetadata(atlasVertex) : newBMAttributes;
-                atlasEntity.setBusinessAttributes(newBMAttributes);
-                atlasEntity.setBusinessAttribute(bMName, bMAttributeName, attribute.get(bmAttribute));
-                ret.put(vertexKey, atlasEntity);
+                attrValue = bmAttributeValue;
+            }
+
+            if (ret.containsKey(vertexKey)) {
+                AtlasEntity entity = ret.get(vertexKey);
+
+                entity.setBusinessAttribute(businessAttribute.getDefinedInType().getTypeName(), businessAttribute.getName(), attrValue);
+            } else {
+                AtlasEntity                      entity             = new AtlasEntity();
+                String                           guid               = GraphHelper.getGuid(vertex);
+                Map<String, Map<String, Object>> businessAttributes = entityRetriever.getBusinessMetadata(vertex);
+
+                entity.setGuid(guid);
+                entity.setTypeName(typeName);
+                entity.setAttribute(uniqueAttribute.getName(), uniqueAttrValue);
+
+                if (businessAttributes == null) {
+                    businessAttributes = new HashMap<>();
+                }
+
+                entity.setBusinessAttributes(businessAttributes);
+                entity.setBusinessAttribute(businessAttribute.getDefinedInType().getTypeName(), businessAttribute.getName(), attrValue);
+
+                ret.put(vertexKey, entity);
             }
         }
-        return ret;
-    }
 
-    private boolean validateTypeName(String typeName, BulkImportResponse bulkImportResponse, int lineIndex) {
-        boolean ret = false;
+        for (String failedMsg : failedMsgList) {
+            LOG.error(failedMsg);
 
-        if(!typeRegistry.getAllEntityDefNames().contains(typeName)){
-            ret = true;
-            LOG.error("Invalid entity-type: " + typeName + " at line #" + lineIndex);
-            String failedTermMsgs = "Invalid entity-type: " + typeName + " at line #" + lineIndex;
-            BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
+            BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(BulkImportResponse.ImportStatus.FAILED, failedMsg);
+
             bulkImportResponse.getFailedImportInfoList().add(importInfo);
         }
+
         return ret;
     }
 
+
     private List assignMultipleValues(String bmAttributeValues, String elementTypeName, List failedTermMsgList, int lineIndex) {
 
         String[] arr = bmAttributeValues.split(FileUtils.ESCAPE_CHARACTER + FileUtils.PIPE_CHARACTER);
@@ -1731,44 +1763,4 @@
         }
         return missingFieldsCheck;
     }
-
-    private boolean validateBMAttributeName(String uniqueAttrValue, String bmAttribute, BulkImportResponse bulkImportResponse, int lineIndex) {
-        boolean ret = false;
-        String[] businessAttributeName = bmAttribute.split(FileUtils.ESCAPE_CHARACTER + ".");
-        if(businessAttributeName.length < 2){
-            LOG.error("Provided businessAttributeName is not in proper format : " + bmAttribute + " at line #" + lineIndex);
-            String failedTermMsgs = "Provided businessAttributeName is not in proper format : " + bmAttribute + " at line #" + lineIndex;
-            BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(uniqueAttrValue, bmAttribute,  BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
-            bulkImportResponse.getFailedImportInfoList().add(importInfo);
-            ret = true;
-        }
-        return ret;
-    }
-
-    private boolean validateBMAttribute(String uniqueAttrValue,String[] businessAttributeName, AtlasEntityType entityType, BulkImportResponse bulkImportResponse, int lineIndex) {
-        boolean ret = false;
-        String bMName = businessAttributeName[0];
-        String bMAttributeName = businessAttributeName[1];
-
-        if(entityType.getBusinessAttributes(bMName) == null ||
-                entityType.getBusinessAttributes(bMName).get(bMAttributeName) == null){
-            ret = true;
-            LOG.error("Provided businessAttributeName is not valid : " + bMName+"."+bMAttributeName + " at line #" + lineIndex);
-            String failedTermMsgs = "Provided businessAttributeName is not valid : " + bMName+"."+bMAttributeName + " at line #" + lineIndex;
-            BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(uniqueAttrValue, bMName+"."+bMAttributeName, BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
-            bulkImportResponse.getFailedImportInfoList().add(importInfo);
-        }
-        return ret;
-    }
-
-    private boolean failedMsgCheck(String uniqueAttrValue, String bmAttribute, List<String> failedTermMsgList,BulkImportResponse bulkImportResponse,int lineIndex) {
-        boolean ret = false;
-        if(!failedTermMsgList.isEmpty()){
-            ret = true;
-            String failedTermMsg = StringUtils.join(failedTermMsgList, "\n");
-            BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(uniqueAttrValue, bmAttribute, BulkImportResponse.ImportStatus.FAILED, failedTermMsg, lineIndex);
-            bulkImportResponse.getFailedImportInfoList().add(importInfo);
-        }
-        return ret;
-    }
 }