ATLAS-3825: updated import business-metadata to set business attribute values on appropriate entities
(cherry picked from commit 86ba342fe6baee08c6aaa20e50de4c3b8e2ae873)
diff --git a/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java b/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java
index 0ee54e9..047d497 100644
--- a/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java
+++ b/intg/src/main/java/org/apache/atlas/bulkimport/BulkImportResponse.java
@@ -95,6 +95,10 @@
this(parentObjectName, childObjectName, importStatus, "",-1);
}
+ public ImportInfo( ImportStatus importStatus, String remarks) {
+ this("","", importStatus, remarks, -1);
+ }
+
public ImportInfo( ImportStatus importStatus, String remarks, Integer rowNumber) {
this("","", importStatus, remarks, rowNumber);
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 89076c1..bf1629c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -41,7 +41,6 @@
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
-import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
@@ -1551,28 +1550,33 @@
@GraphTransaction
public BulkImportResponse bulkCreateOrUpdateBusinessAttributes(InputStream inputStream, String fileName) throws AtlasBaseException {
BulkImportResponse ret = new BulkImportResponse();
+
try {
if (StringUtils.isBlank(fileName)) {
throw new AtlasBaseException(AtlasErrorCode.FILE_NAME_NOT_FOUND, fileName);
}
- List<String[]> fileData = FileUtils.readFileData(fileName, inputStream);
+ List<String[]> fileData = FileUtils.readFileData(fileName, inputStream);
Map<String, AtlasEntity> attributesToAssociate = getBusinessMetadataDefList(fileData, ret);
- for (Map.Entry<String, AtlasEntity> entry : attributesToAssociate.entrySet()) {
- AtlasEntity entity = entry.getValue();
- try{
+ for (AtlasEntity entity : attributesToAssociate.values()) {
+ try {
addOrUpdateBusinessAttributes(entity.getGuid(), entity.getBusinessAttributes(), true);
- BulkImportResponse.ImportInfo successImportInfo = new BulkImportResponse.ImportInfo(entity.getAttribute(Constants.QUALIFIED_NAME).toString(), entity.getBusinessAttributes().toString());
+
+ BulkImportResponse.ImportInfo successImportInfo = new BulkImportResponse.ImportInfo(entity.getGuid(), entity.getBusinessAttributes().toString());
+
ret.setSuccessImportInfoList(successImportInfo);
- }catch(Exception e){
- LOG.error("Error occurred while updating BusinessMetadata Attributes for Entity "+entity.getAttribute(Constants.QUALIFIED_NAME).toString());
- BulkImportResponse.ImportInfo failedImportInfo = new BulkImportResponse.ImportInfo(entity.getAttribute(Constants.QUALIFIED_NAME).toString(), entity.getBusinessAttributes().toString(), BulkImportResponse.ImportStatus.FAILED, e.getMessage());
- ret.setFailedImportInfoList(failedImportInfo);
+ }catch (Exception e) {
+ LOG.error("Error occurred while updating BusinessMetadata Attributes for Entity " + entity.getGuid());
+
+ BulkImportResponse.ImportInfo failedImportInfo = new BulkImportResponse.ImportInfo(entity.getGuid(), entity.getBusinessAttributes().toString(), BulkImportResponse.ImportStatus.FAILED, e.getMessage());
+
+ ret.getFailedImportInfoList().add(failedImportInfo);
}
}
} catch (IOException e) {
- LOG.error("An Exception occurred while uploading the file : "+e.getMessage());
+ LOG.error("An Exception occurred while uploading the file {}", fileName, e);
+
throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_UPLOAD, e);
}
@@ -1580,105 +1584,133 @@
}
private Map<String, AtlasEntity> getBusinessMetadataDefList(List<String[]> fileData, BulkImportResponse bulkImportResponse) throws AtlasBaseException {
- Map<String, AtlasEntity> ret = new HashMap<>();
- Map<String, Map<String, Object>> newBMAttributes = new HashMap<>();
- Map<String, AtlasVertex> vertexCache = new HashMap<>();
- Map<String, Object> attribute = new HashMap<>();
+ Map<String, AtlasEntity> ret = new HashMap<>();
+ Map<String, AtlasVertex> vertexCache = new HashMap<>();
+ List<String> failedMsgList = new ArrayList<>();
for (int lineIndex = 0; lineIndex < fileData.size(); lineIndex++) {
- List<String> failedTermMsgList = new ArrayList<>();
- AtlasEntity atlasEntity = new AtlasEntity();
String[] record = fileData.get(lineIndex);
- if (missingFieldsCheck(record, bulkImportResponse, lineIndex+1)) {
- continue;
- }
- String typeName = record[FileUtils.TYPENAME_COLUMN_INDEX];
- String uniqueAttrValue = record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX];
- String bmAttribute = record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX];
- String bmAttributeValue = record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX];
- String uniqueAttrName = Constants.QUALIFIED_NAME;
- if (record.length > FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX) {
- uniqueAttrName = typeName+"."+record[FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX];
- }
- if (validateTypeName(typeName, bulkImportResponse, lineIndex+1)) {
+ boolean missingFields = record.length < FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX ||
+ StringUtils.isBlank(record[FileUtils.TYPENAME_COLUMN_INDEX]) ||
+ StringUtils.isBlank(record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX]) ||
+ StringUtils.isBlank(record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX]) ||
+ StringUtils.isBlank(record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX]);
+
+ if (missingFields){
+ failedMsgList.add("Line #" + (lineIndex + 1) + ": missing fields. " + Arrays.toString(record));
+
continue;
}
- String vertexKey = typeName + "_" + uniqueAttrName + "_" + uniqueAttrValue;
- AtlasVertex atlasVertex = vertexCache.get(vertexKey);
- if (atlasVertex == null) {
- atlasVertex = AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(graph, typeName, uniqueAttrName, uniqueAttrValue);
- }
-
- if (atlasVertex == null) {
- LOG.error("Provided UniqueAttributeValue is not valid : " + uniqueAttrValue + " at line #" + (lineIndex + 1));
- failedTermMsgList.add("Provided UniqueAttributeValue is not valid : " + uniqueAttrValue + " at line #" + (lineIndex + 1));
- }
-
- vertexCache.put(vertexKey, atlasVertex);
- String[] businessAttributeName = bmAttribute.split(FileUtils.ESCAPE_CHARACTER + ".");
- if (validateBMAttributeName(uniqueAttrValue,bmAttribute,bulkImportResponse,lineIndex+1)) {
- continue;
- }
-
- String bMName = businessAttributeName[0];
- String bMAttributeName = businessAttributeName[1];
+ String typeName = record[FileUtils.TYPENAME_COLUMN_INDEX];
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
- if (validateBMAttribute(uniqueAttrValue, businessAttributeName, entityType, bulkImportResponse,lineIndex+1)) {
+
+ if (entityType == null) {
+ failedMsgList.add("Line #" + (lineIndex + 1) + ": invalid entity-type '" + typeName + "'");
+
continue;
}
- AtlasBusinessAttribute atlasBusinessAttribute = entityType.getBusinessAttributes().get(bMName).get(bMAttributeName);
- if (atlasBusinessAttribute.getAttributeType().getTypeCategory() == TypeCategory.ARRAY) {
- AtlasArrayType arrayType = (AtlasArrayType) atlasBusinessAttribute.getAttributeType();
- List attributeValueData;
+ String uniqueAttrValue = record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX];
+ String bmAttribute = record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX];
+ String bmAttributeValue = record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX];
+ String uniqueAttrName = AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME;
- if(arrayType.getElementType() instanceof AtlasEnumType){
- attributeValueData = AtlasGraphUtilsV2.assignEnumValues(bmAttributeValue, (AtlasEnumType) arrayType.getElementType(), failedTermMsgList, lineIndex+1);
- }else{
- attributeValueData = assignMultipleValues(bmAttributeValue, arrayType.getElementTypeName(), failedTermMsgList, lineIndex+1);
+ if (record.length > FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX) {
+ uniqueAttrName = record[FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX];
+ }
+
+ AtlasAttribute uniqueAttribute = entityType.getAttribute(uniqueAttrName);
+
+ if (uniqueAttribute == null) {
+ failedMsgList.add("Line #" + (lineIndex + 1) + ": attribute '" + uniqueAttrName + "' not found in entity-type '" + typeName + "'");
+
+ continue;
+ }
+
+ if (!uniqueAttribute.getAttributeDef().getIsUnique()) {
+ failedMsgList.add("Line #" + (lineIndex + 1) + ": attribute '" + uniqueAttrName + "' is not an unique attribute in entity-type '" + typeName + "'");
+
+ continue;
+ }
+
+ String vertexKey = uniqueAttribute.getVertexPropertyName() + "_" + uniqueAttrValue;
+ AtlasVertex vertex = vertexCache.get(vertexKey);
+
+ if (vertex == null) {
+ vertex = AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(graph, typeName, uniqueAttribute.getVertexUniquePropertyName(), uniqueAttrValue);
+
+ if (vertex == null) {
+ failedMsgList.add("Line #" + (lineIndex + 1) + ": no " + typeName + " entity found with " + uniqueAttrName + "=" + uniqueAttrValue);
+
+ continue;
}
- attribute.put(bmAttribute, attributeValueData);
- } else {
- attribute.put(bmAttribute, bmAttributeValue);
+
+ vertexCache.put(vertexKey, vertex);
}
- if(failedMsgCheck(uniqueAttrValue,bmAttribute, failedTermMsgList, bulkImportResponse, lineIndex+1)) {
+ AtlasBusinessAttribute businessAttribute = entityType.getBusinesAAttribute(bmAttribute);
+
+ if (businessAttribute == null) {
+ failedMsgList.add("Line #" + (lineIndex + 1) + ": invalid business attribute name '" + bmAttribute + "'");
+
continue;
}
- if(ret.containsKey(vertexKey)) {
- atlasEntity = ret.get(vertexKey);
- atlasEntity.setBusinessAttribute(bMName, bMAttributeName, attribute.get(bmAttribute));
- ret.put(vertexKey, atlasEntity);
+ final Object attrValue;
+
+ if (businessAttribute.getAttributeType().getTypeCategory() == TypeCategory.ARRAY) {
+ AtlasArrayType arrayType = (AtlasArrayType) businessAttribute.getAttributeType();
+ List arrayValue;
+
+ if (arrayType.getElementType() instanceof AtlasEnumType) {
+ arrayValue = AtlasGraphUtilsV2.assignEnumValues(bmAttributeValue, (AtlasEnumType) arrayType.getElementType(), failedMsgList, lineIndex+1);
+ } else {
+ arrayValue = assignMultipleValues(bmAttributeValue, arrayType.getElementTypeName(), failedMsgList, lineIndex+1);
+ }
+
+ attrValue = arrayValue;
} else {
- String guid = GraphHelper.getGuid(atlasVertex);
- atlasEntity.setGuid(guid);
- atlasEntity.setTypeName(typeName);
- atlasEntity.setAttribute(Constants.QUALIFIED_NAME,uniqueAttrValue);
- newBMAttributes = entityRetriever.getBusinessMetadata(atlasVertex) != null ? entityRetriever.getBusinessMetadata(atlasVertex) : newBMAttributes;
- atlasEntity.setBusinessAttributes(newBMAttributes);
- atlasEntity.setBusinessAttribute(bMName, bMAttributeName, attribute.get(bmAttribute));
- ret.put(vertexKey, atlasEntity);
+ attrValue = bmAttributeValue;
+ }
+
+ if (ret.containsKey(vertexKey)) {
+ AtlasEntity entity = ret.get(vertexKey);
+
+ entity.setBusinessAttribute(businessAttribute.getDefinedInType().getTypeName(), businessAttribute.getName(), attrValue);
+ } else {
+ AtlasEntity entity = new AtlasEntity();
+ String guid = GraphHelper.getGuid(vertex);
+ Map<String, Map<String, Object>> businessAttributes = entityRetriever.getBusinessMetadata(vertex);
+
+ entity.setGuid(guid);
+ entity.setTypeName(typeName);
+ entity.setAttribute(uniqueAttribute.getName(), uniqueAttrValue);
+
+ if (businessAttributes == null) {
+ businessAttributes = new HashMap<>();
+ }
+
+ entity.setBusinessAttributes(businessAttributes);
+ entity.setBusinessAttribute(businessAttribute.getDefinedInType().getTypeName(), businessAttribute.getName(), attrValue);
+
+ ret.put(vertexKey, entity);
}
}
- return ret;
- }
- private boolean validateTypeName(String typeName, BulkImportResponse bulkImportResponse, int lineIndex) {
- boolean ret = false;
+ for (String failedMsg : failedMsgList) {
+ LOG.error(failedMsg);
- if(!typeRegistry.getAllEntityDefNames().contains(typeName)){
- ret = true;
- LOG.error("Invalid entity-type: " + typeName + " at line #" + lineIndex);
- String failedTermMsgs = "Invalid entity-type: " + typeName + " at line #" + lineIndex;
- BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
+ BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(BulkImportResponse.ImportStatus.FAILED, failedMsg);
+
bulkImportResponse.getFailedImportInfoList().add(importInfo);
}
+
return ret;
}
+
private List assignMultipleValues(String bmAttributeValues, String elementTypeName, List failedTermMsgList, int lineIndex) {
String[] arr = bmAttributeValues.split(FileUtils.ESCAPE_CHARACTER + FileUtils.PIPE_CHARACTER);
@@ -1731,44 +1763,4 @@
}
return missingFieldsCheck;
}
-
- private boolean validateBMAttributeName(String uniqueAttrValue, String bmAttribute, BulkImportResponse bulkImportResponse, int lineIndex) {
- boolean ret = false;
- String[] businessAttributeName = bmAttribute.split(FileUtils.ESCAPE_CHARACTER + ".");
- if(businessAttributeName.length < 2){
- LOG.error("Provided businessAttributeName is not in proper format : " + bmAttribute + " at line #" + lineIndex);
- String failedTermMsgs = "Provided businessAttributeName is not in proper format : " + bmAttribute + " at line #" + lineIndex;
- BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(uniqueAttrValue, bmAttribute, BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
- bulkImportResponse.getFailedImportInfoList().add(importInfo);
- ret = true;
- }
- return ret;
- }
-
- private boolean validateBMAttribute(String uniqueAttrValue,String[] businessAttributeName, AtlasEntityType entityType, BulkImportResponse bulkImportResponse, int lineIndex) {
- boolean ret = false;
- String bMName = businessAttributeName[0];
- String bMAttributeName = businessAttributeName[1];
-
- if(entityType.getBusinessAttributes(bMName) == null ||
- entityType.getBusinessAttributes(bMName).get(bMAttributeName) == null){
- ret = true;
- LOG.error("Provided businessAttributeName is not valid : " + bMName+"."+bMAttributeName + " at line #" + lineIndex);
- String failedTermMsgs = "Provided businessAttributeName is not valid : " + bMName+"."+bMAttributeName + " at line #" + lineIndex;
- BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(uniqueAttrValue, bMName+"."+bMAttributeName, BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
- bulkImportResponse.getFailedImportInfoList().add(importInfo);
- }
- return ret;
- }
-
- private boolean failedMsgCheck(String uniqueAttrValue, String bmAttribute, List<String> failedTermMsgList,BulkImportResponse bulkImportResponse,int lineIndex) {
- boolean ret = false;
- if(!failedTermMsgList.isEmpty()){
- ret = true;
- String failedTermMsg = StringUtils.join(failedTermMsgList, "\n");
- BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(uniqueAttrValue, bmAttribute, BulkImportResponse.ImportStatus.FAILED, failedTermMsg, lineIndex);
- bulkImportResponse.getFailedImportInfoList().add(importInfo);
- }
- return ret;
- }
}