ATLAS-4473: GlossaryTerms Bulk Create Performance Improvement
Signed-off-by: Pinal Shah <pinal.shah@freestoneinfotech.com>
diff --git a/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java b/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java
index 9c84598..f81b538 100644
--- a/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java
+++ b/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java
@@ -18,6 +18,7 @@
package org.apache.atlas.glossary;
import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContext;
import org.apache.atlas.SortOrder;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.bulkimport.BulkImportResponse;
@@ -30,6 +31,8 @@
import org.apache.atlas.model.glossary.relations.AtlasRelatedTermHeader;
import org.apache.atlas.model.glossary.relations.AtlasTermCategorizationHeader;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.ogm.DataAccess;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
@@ -38,6 +41,7 @@
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.FileUtils;
import org.apache.atlas.utils.AtlasJson;
+import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -50,6 +54,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -64,9 +69,16 @@
@Service
public class GlossaryService {
- private static final Logger LOG = LoggerFactory.getLogger(GlossaryService.class);
- private static final boolean DEBUG_ENABLED = LOG.isDebugEnabled();
- private static final String QUALIFIED_NAME_ATTR = "qualifiedName";
+ private static final Logger LOG = LoggerFactory.getLogger(GlossaryService.class);
+ private static final boolean DEBUG_ENABLED = LOG.isDebugEnabled();
+ private static final String ATLAS_GLOSSARY_TERM = "AtlasGlossaryTerm";
+ private static final String NAME_ATTR = "name";
+ private static final String QUALIFIED_NAME_ATTR = "qualifiedName";
+ private static final String GLOSSARY_QUALIFIED_NAME_PROPERTY = "AtlasGlossary." + QUALIFIED_NAME_ATTR;
+ private static final String GLOSSARY_CATEGORY_NAME_PROPERTY = "AtlasGlossaryCategory.name";
+ private static final String GLOSSARY_TERM_NAME_PROPERTY = ATLAS_GLOSSARY_TERM + "." + NAME_ATTR;
+ private static final String TERM_UNIQUE_QUALIFIED_NAME_PROPERTY = ATLAS_GLOSSARY_TERM + ".__u_" + QUALIFIED_NAME_ATTR;
+ private static final String GLOSSARY_TERM_ANCHOR_EDGE_LABEL = "r:AtlasGlossaryTermAnchor";
private final DataAccess dataAccess;
private final GlossaryTermUtils glossaryTermUtils;
@@ -76,6 +88,9 @@
private static final char[] invalidNameChars = { '@', '.' };
+ private static final Map<String, String> glossaryGuidQualifiedNameCache = new HashMap<>();
+ private static final Map<String, String> categoryGuidNameCache = new HashMap<>();
+
@Inject
public GlossaryService(DataAccess dataAccess, final AtlasRelationshipStore relationshipStore,
final AtlasTypeRegistry typeRegistry, AtlasEntityChangeNotifier entityChangeNotifier) {
@@ -327,65 +342,77 @@
}
+ /**
+ * Creates a single glossary term.
+ *
+ * Validates that the term, its anchor and its name are present and that the name passes
+ * isNameInvalid(), derives the qualifiedName as "name@glossaryQualifiedName", rejects the
+ * request when termExists2() reports a duplicate, then saves the term, processes its
+ * relations with RelationshipOperation.CREATE and re-loads it so the returned object
+ * reflects those relations.
+ *
+ * @param term term definition; must carry an anchor and a non-empty name
+ * @return the term as re-loaded from the store after its relations were processed
+ * @throws AtlasBaseException when validation fails, the glossary qualifiedName cannot be
+ *                            derived, or a term with the derived qualifiedName already exists
+ */
@GraphTransaction
- public AtlasGlossaryTerm createTerm(AtlasGlossaryTerm glossaryTerm) throws AtlasBaseException {
+ public AtlasGlossaryTerm createTerm(AtlasGlossaryTerm term) throws AtlasBaseException {
if (DEBUG_ENABLED) {
- LOG.debug("==> GlossaryService.create({})", glossaryTerm);
+ LOG.debug("==> GlossaryService.create({})", term);
}
- if (Objects.isNull(glossaryTerm)) {
+
+ if (Objects.isNull(term)) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "GlossaryTerm definition missing");
}
- if (Objects.isNull(glossaryTerm.getAnchor())) {
+
+ if (Objects.isNull(term.getAnchor())) {
throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ANCHOR);
}
- if (StringUtils.isEmpty(glossaryTerm.getName())) {
+
+ if (StringUtils.isEmpty(term.getName())) {
throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_QUALIFIED_NAME_CANT_BE_DERIVED);
}
- if (isNameInvalid(glossaryTerm.getName())){
+ if (isNameInvalid(term.getName())){
throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME);
} else {
// Derive the qualifiedName
- String anchorGlossaryGuid = glossaryTerm.getAnchor().getGlossaryGuid();
- AtlasGlossary glossary = dataAccess.load(getGlossarySkeleton(anchorGlossaryGuid));
- glossaryTerm.setQualifiedName(glossaryTerm.getName() + "@" + glossary.getQualifiedName());
+ String anchorGlossaryGuid = term.getAnchor().getGlossaryGuid();
+ String glossaryQualifiedName = getGlossaryQualifiedName(anchorGlossaryGuid);
+
+ if (StringUtils.isEmpty(glossaryQualifiedName)) {
+ throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_QUALIFIED_NAME_CANT_BE_DERIVED);
+ }
+
+ term.setQualifiedName(term.getName() + "@" + glossaryQualifiedName);
if (LOG.isDebugEnabled()) {
- LOG.debug("Derived qualifiedName = {}", glossaryTerm.getQualifiedName());
+ LOG.debug("Derived qualifiedName = {}", term.getQualifiedName());
}
}
// This might fail for the case when the term's qualifiedName has been updated and the duplicate request comes in with old name
- if (termExists(glossaryTerm)) {
- throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, glossaryTerm.getQualifiedName());
+ if (termExists2(term)) {
+ throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, term.getQualifiedName());
}
- AtlasGlossaryTerm storeObject = dataAccess.save(glossaryTerm);
- glossaryTermUtils.processTermRelations(storeObject, glossaryTerm, GlossaryUtils.RelationshipOperation.CREATE);
+ AtlasGlossaryTerm storeGlossaryTerm = dataAccess.save(term);
+
+ glossaryTermUtils.processTermRelations(storeGlossaryTerm, term, GlossaryUtils.RelationshipOperation.CREATE);
// Re-load term after handling relations
- storeObject = dataAccess.load(glossaryTerm);
- setInfoForRelations(storeObject);
+ storeGlossaryTerm = dataAccess.load(term);
+ setInfoForRelations(storeGlossaryTerm);
if (DEBUG_ENABLED) {
- LOG.debug("<== GlossaryService.create() : {}", storeObject);
+ LOG.debug("<== GlossaryService.create() : {}", storeGlossaryTerm);
}
- return storeObject;
+
+ return storeGlossaryTerm;
}
@GraphTransaction
- public List<AtlasGlossaryTerm> createTerms(List<AtlasGlossaryTerm> glossaryTerm) throws AtlasBaseException {
+ public List<AtlasGlossaryTerm> createTerms(List<AtlasGlossaryTerm> glossaryTerms) throws AtlasBaseException {
if (DEBUG_ENABLED) {
- LOG.debug("==> GlossaryService.create({})", glossaryTerm);
+ LOG.debug("==> GlossaryService.create({})", glossaryTerms);
}
- if (Objects.isNull(glossaryTerm)) {
+ if (Objects.isNull(glossaryTerms)) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "glossaryTerm(s) is null/empty");
}
List<AtlasGlossaryTerm> ret = new ArrayList<>();
- for (AtlasGlossaryTerm atlasGlossaryTerm : glossaryTerm) {
- ret.add(createTerm(atlasGlossaryTerm));
+ for (AtlasGlossaryTerm glossaryTerm : glossaryTerms) {
+ AtlasGlossaryTerm term = createTerm(glossaryTerm);
+
+ ret.add(term);
}
if (LOG.isDebugEnabled()) {
@@ -939,13 +966,64 @@
return Objects.nonNull(vertex);
}
+    /**
+     * Resolves a glossary's qualifiedName from its GUID, consulting the in-memory
+     * GUID-to-qualifiedName cache first and reading the glossary vertex only on a miss.
+     *
+     * @param glossaryGuid GUID of the glossary
+     * @return the cached or freshly read qualifiedName; null or empty when none could be resolved
+     */
+    private String getGlossaryQualifiedName(String glossaryGuid) {
+        String cachedName = glossaryGuidQualifiedNameCache.get(glossaryGuid);
+
+        if (StringUtils.isNotEmpty(cachedName)) {
+            return cachedName;
+        }
+
+        AtlasVertex glossaryVertex = AtlasGraphUtilsV2.findByGuid(glossaryGuid);
+
+        if (glossaryVertex == null) {
+            // No vertex for this GUID: return the (null or empty) cache result unchanged.
+            return cachedName;
+        }
+
+        String qualifiedName = glossaryVertex.getProperty(GLOSSARY_QUALIFIED_NAME_PROPERTY, String.class);
+
+        // Whatever was read (possibly null) is stored for later lookups of the same GUID.
+        glossaryGuidQualifiedNameCache.put(glossaryGuid, qualifiedName);
+
+        return qualifiedName;
+    }
+
+    /**
+     * Looks up a glossary category's name by GUID, preferring the in-memory
+     * GUID-to-name cache and reading the category vertex only on a cache miss.
+     *
+     * @param glossaryCategoryGuid GUID of the glossary category
+     * @return the cached or freshly read category name; null or empty when none could be resolved
+     */
+    private String getGlossaryCategoryName(String glossaryCategoryGuid) {
+        String categoryName = categoryGuidNameCache.get(glossaryCategoryGuid);
+
+        if (StringUtils.isNotEmpty(categoryName)) {
+            return categoryName;
+        }
+
+        AtlasVertex categoryVertex = AtlasGraphUtilsV2.findByGuid(glossaryCategoryGuid);
+
+        if (categoryVertex != null) {
+            categoryName = categoryVertex.getProperty(GLOSSARY_CATEGORY_NAME_PROPERTY, String.class);
+
+            // Whatever was read (possibly null) is stored for later lookups of the same GUID.
+            categoryGuidNameCache.put(glossaryCategoryGuid, categoryName);
+        }
+
+        return categoryName;
+    }
+
+ // Reports whether a glossary-term vertex with this term's qualifiedName is found by a unique-attribute lookup.
private boolean termExists(AtlasGlossaryTerm term) {
AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(atlasTypeRegistry.getEntityTypeByName(GlossaryUtils.ATLAS_GLOSSARY_TERM_TYPENAME), new HashMap<String, Object>() {{
put(QUALIFIED_NAME_ATTR, term.getQualifiedName());
}});
+
return Objects.nonNull(vertex);
}
+    /**
+     * Reports whether the term's anchor glossary already holds a term with the same
+     * qualifiedName, by walking the glossary vertex's outgoing term-anchor edges.
+     *
+     * @param term term whose anchor and qualifiedName are already set; the anchor must be
+     *             non-null (createTerm validates this before calling)
+     * @return true when a term with the same qualifiedName is anchored to the glossary;
+     *         false otherwise, including when the glossary GUID resolves to no vertex
+     */
+    private boolean termExists2(AtlasGlossaryTerm term) {
+        AtlasVertex glossaryVertex = AtlasGraphUtilsV2.findByGuid(term.getAnchor().getGlossaryGuid());
+
+        // findByGuid can yield no vertex (the sibling cache getters null-check it too);
+        // without a glossary vertex there are no anchored terms to collide with.
+        if (glossaryVertex == null) {
+            return false;
+        }
+
+        Iterable<AtlasEdge> glossaryTermEdges = glossaryVertex.getEdges(AtlasEdgeDirection.OUT, GLOSSARY_TERM_ANCHOR_EDGE_LABEL);
+
+        for (AtlasEdge glossaryTermEdge : glossaryTermEdges) {
+            String termQualifiedName = glossaryTermEdge.getInVertex().getProperty(TERM_UNIQUE_QUALIFIED_NAME_PROPERTY, String.class);
+
+            if (StringUtils.equals(termQualifiedName, term.getQualifiedName())) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
private boolean categoryExists(AtlasGlossaryCategory category) {
AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(atlasTypeRegistry.getEntityTypeByName(GlossaryUtils.ATLAS_GLOSSARY_CATEGORY_TYPENAME), new HashMap<String, Object>() {{
put(QUALIFIED_NAME_ATTR, category.getQualifiedName());
@@ -979,7 +1057,7 @@
private void setInfoForRelations(final AtlasGlossary ret) throws AtlasBaseException {
if (Objects.nonNull(ret.getTerms())) {
- setInfoForTerms(ret.getTerms());
+ setDisplayNameForTerms(ret.getTerms());
}
if (Objects.nonNull(ret.getCategories())) {
@@ -987,13 +1065,12 @@
}
}
- private void setInfoForRelations(final AtlasGlossaryTerm ret) throws AtlasBaseException {
- if (Objects.nonNull(ret.getCategories())) {
- setDisplayNameForTermCategories(ret.getCategories());
- }
- if (Objects.nonNull(ret.getRelatedTerms())) {
- for (Map.Entry<AtlasGlossaryTerm.Relation, Set<AtlasRelatedTermHeader>> entry : ret.getRelatedTerms().entrySet()) {
- setInfoForTerms(entry.getValue());
+ // Fills in display text on the term's category headers and on every set of related-term headers.
+ // getCategories() is passed through without a null check; setDisplayNameForCategories returns early on an empty set.
+ private void setInfoForRelations(final AtlasGlossaryTerm glossaryTerm) throws AtlasBaseException {
+ setDisplayNameForCategories(glossaryTerm.getCategories());
+
+ if (Objects.nonNull(glossaryTerm.getRelatedTerms())) {
+ for (Map.Entry<AtlasGlossaryTerm.Relation, Set<AtlasRelatedTermHeader>> entry : glossaryTerm.getRelatedTerms().entrySet()) {
+ setDisplayNameForTerms(entry.getValue());
}
}
}
@@ -1003,18 +1080,23 @@
setInfoForRelatedCategories(glossaryCategory.getChildrenCategories());
}
if (Objects.nonNull(glossaryCategory.getTerms())) {
- setInfoForTerms(glossaryCategory.getTerms());
+ setDisplayNameForTerms(glossaryCategory.getTerms());
}
}
- private void setDisplayNameForTermCategories(final Set<AtlasTermCategorizationHeader> categorizationHeaders) throws AtlasBaseException {
- List<AtlasGlossaryCategory> categories = categorizationHeaders
- .stream()
- .map(id -> getAtlasGlossaryCategorySkeleton(id.getCategoryGuid()))
- .collect(Collectors.toList());
- Map<String, AtlasGlossaryCategory> categoryMap = new HashMap<>();
- dataAccess.load(categories).forEach(c -> categoryMap.put(c.getGuid(), c));
- categorizationHeaders.forEach(c -> c.setDisplayText(categoryMap.get(c.getCategoryGuid()).getName()));
+    /**
+     * Sets the display text of each categorization header to the name of the category it
+     * references. Headers whose category name cannot be resolved are left untouched.
+     *
+     * @param categorizationHeaders headers to decorate; a null or empty set is a no-op
+     */
+    private void setDisplayNameForCategories(final Set<AtlasTermCategorizationHeader> categorizationHeaders) throws AtlasBaseException {
+        if (CollectionUtils.isNotEmpty(categorizationHeaders)) {
+            for (AtlasTermCategorizationHeader header : categorizationHeaders) {
+                String displayName = getGlossaryCategoryName(header.getCategoryGuid());
+
+                if (!StringUtils.isEmpty(displayName)) {
+                    header.setDisplayText(displayName);
+                }
+            }
+        }
}
private void setInfoForRelatedCategories(final Collection<AtlasRelatedCategoryHeader> categoryHeaders) throws AtlasBaseException {
@@ -1033,15 +1115,17 @@
}
}
- private void setInfoForTerms(final Collection<AtlasRelatedTermHeader> termHeaders) throws AtlasBaseException {
- List<AtlasGlossaryTerm> terms = termHeaders
- .stream()
- .map(id -> getAtlasGlossaryTermSkeleton(id.getTermGuid()))
- .collect(Collectors.toList());
- Map<String, AtlasGlossaryTerm> termMap = new HashMap<>();
- dataAccess.load(terms).iterator().forEachRemaining(t -> termMap.put(t.getGuid(), t));
+    /**
+     * Sets the display text of each related-term header to the name stored on the
+     * referenced term vertex.
+     *
+     * Headers whose GUID resolves to no vertex, or whose vertex has no name, are left
+     * untouched instead of failing the whole request.
+     *
+     * @param termHeaders headers to decorate; a null or empty collection is a no-op
+     */
+    private void setDisplayNameForTerms(final Collection<AtlasRelatedTermHeader> termHeaders) throws AtlasBaseException {
- termHeaders.forEach(t -> t.setDisplayText(getDisplayText(termMap.get(t.getTermGuid()))));
+        // Same guard as setDisplayNameForCategories: callers may hand over a null collection.
+        if (CollectionUtils.isEmpty(termHeaders)) {
+            return;
+        }
+
+        for (AtlasRelatedTermHeader termHeader : termHeaders) {
+            AtlasVertex termVertex = AtlasGraphUtilsV2.findByGuid(termHeader.getTermGuid());
+
+            // findByGuid can yield no vertex; skip the header rather than dereference null.
+            if (termVertex == null) {
+                continue;
+            }
+
+            String termDisplayText = termVertex.getProperty(GLOSSARY_TERM_NAME_PROPERTY, String.class);
+
+            if (StringUtils.isNotEmpty(termDisplayText)) {
+                termHeader.setDisplayText(termDisplayText);
+            }
+        }
}
public static boolean isNameInvalid(String name) {
diff --git a/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java b/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java
index d92daee..8a161e4 100644
--- a/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java
+++ b/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java
@@ -18,6 +18,7 @@
package org.apache.atlas.glossary;
import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContext;
import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.bulkimport.BulkImportResponse.ImportInfo;
import org.apache.atlas.exception.AtlasBaseException;
@@ -37,6 +38,7 @@
import org.apache.atlas.type.AtlasRelationshipType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.FileUtils;
+import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.ArrayUtils;
@@ -161,58 +163,62 @@
return StringUtils.equals(relatedObjectId.getRelationshipGuid(), storeObject.getRelationshipGuid());
}
- private void processTermAnchor(AtlasGlossaryTerm storeObject, AtlasGlossaryTerm updatedTerm, RelationshipOperation op) throws AtlasBaseException {
+ private void processTermAnchor(AtlasGlossaryTerm currentTerm, AtlasGlossaryTerm updatedTerm, RelationshipOperation op) throws AtlasBaseException {
if (Objects.isNull(updatedTerm.getAnchor()) && op != RelationshipOperation.DELETE) {
throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ANCHOR);
}
- AtlasGlossaryHeader existingAnchor = storeObject.getAnchor();
- AtlasGlossaryHeader updatedTermAnchor = updatedTerm.getAnchor();
+ AtlasGlossaryHeader currentTermGlossary = currentTerm.getAnchor(); // glossary_g1
+ AtlasGlossaryHeader updatedTermGlossary = updatedTerm.getAnchor(); // glossary_g2
+ String updatedTermGlossaryGuid = updatedTermGlossary.getGlossaryGuid();
+ String currentTermGlossaryGuid = currentTermGlossary.getGlossaryGuid();
switch (op) {
case CREATE:
- if (Objects.isNull(updatedTermAnchor.getGlossaryGuid())) {
+ if (Objects.isNull(updatedTermGlossaryGuid)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_NEW_ANCHOR_GUID);
} else {
if (DEBUG_ENABLED) {
- LOG.debug("Creating new term anchor, category = {}, glossary = {}", storeObject.getGuid(), updatedTerm.getAnchor().getGlossaryGuid());
+ LOG.debug("Creating new term anchor, category = {}, glossary = {}", currentTerm.getGuid(), updatedTerm.getAnchor().getGlossaryGuid());
}
- createRelationship(defineTermAnchorRelation(updatedTermAnchor.getGlossaryGuid(), storeObject.getGuid()));
+ if (!StringUtils.equals(updatedTermGlossaryGuid, currentTermGlossaryGuid)) {
+ createRelationship(defineTermAnchorRelation(updatedTermGlossaryGuid, currentTerm.getGuid()));
+ }
}
break;
case UPDATE:
- if (!Objects.equals(updatedTermAnchor, existingAnchor)) {
- if (Objects.isNull(updatedTermAnchor.getGlossaryGuid())) {
+ if (!Objects.equals(updatedTermGlossary, currentTermGlossary)) {
+ if (Objects.isNull(updatedTermGlossaryGuid)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_NEW_ANCHOR_GUID);
}
if (DEBUG_ENABLED) {
LOG.debug("Updating term anchor, currAnchor = {}, newAnchor = {} and term = {}",
- existingAnchor.getGlossaryGuid(),
- updatedTermAnchor.getGlossaryGuid(),
- storeObject.getName());
+ currentTermGlossaryGuid,
+ updatedTermGlossaryGuid,
+ currentTerm.getName());
}
- relationshipStore.deleteById(existingAnchor.getRelationGuid(), true);
+ relationshipStore.deleteById(currentTermGlossary.getRelationGuid(), true);
// Derive the qualifiedName when anchor changes
- String anchorGlossaryGuid = updatedTermAnchor.getGlossaryGuid();
+ String anchorGlossaryGuid = updatedTermGlossaryGuid;
AtlasGlossary glossary = dataAccess.load(getGlossarySkeleton(anchorGlossaryGuid));
- storeObject.setQualifiedName(storeObject.getName() + "@" + glossary.getQualifiedName());
+ currentTerm.setQualifiedName(currentTerm.getName() + "@" + glossary.getQualifiedName());
if (LOG.isDebugEnabled()) {
- LOG.debug("Derived qualifiedName = {}", storeObject.getQualifiedName());
+ LOG.debug("Derived qualifiedName = {}", currentTerm.getQualifiedName());
}
- createRelationship(defineTermAnchorRelation(updatedTermAnchor.getGlossaryGuid(), storeObject.getGuid()));
+ createRelationship(defineTermAnchorRelation(updatedTermGlossaryGuid, currentTerm.getGuid()));
}
break;
case DELETE:
- if (Objects.nonNull(existingAnchor)) {
+ if (Objects.nonNull(currentTermGlossary)) {
if (DEBUG_ENABLED) {
LOG.debug("Deleting term anchor");
}
- relationshipStore.deleteById(existingAnchor.getRelationGuid(), true);
+ relationshipStore.deleteById(currentTermGlossary.getRelationGuid(), true);
}
break;
}
@@ -221,6 +227,7 @@
private void processRelatedTerms(AtlasGlossaryTerm storeObject, AtlasGlossaryTerm updatedTerm, RelationshipOperation op) throws AtlasBaseException {
Map<AtlasGlossaryTerm.Relation, Set<AtlasRelatedTermHeader>> newRelatedTerms = updatedTerm.getRelatedTerms();
Map<AtlasGlossaryTerm.Relation, Set<AtlasRelatedTermHeader>> existingRelatedTerms = storeObject.getRelatedTerms();
+
switch (op) {
case CREATE:
for (Map.Entry<AtlasGlossaryTerm.Relation, Set<AtlasRelatedTermHeader>> entry : newRelatedTerms.entrySet()) {
@@ -313,6 +320,7 @@
private void processAssociatedCategories(AtlasGlossaryTerm storeObject, AtlasGlossaryTerm updatedTerm, RelationshipOperation op) throws AtlasBaseException {
Map<String, AtlasTermCategorizationHeader> newCategories = getAssociatedCategories(updatedTerm);
Map<String, AtlasTermCategorizationHeader> existingCategories = getAssociatedCategories(storeObject);
+
switch (op) {
case CREATE:
if (Objects.nonNull(newCategories)) {