ATLAS-4473: GlossaryTerms Bulk Create Performance Improvement

Signed-off-by: Pinal Shah <pinal.shah@freestoneinfotech.com>
diff --git a/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java b/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java
index 9c84598..f81b538 100644
--- a/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java
+++ b/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java
@@ -18,6 +18,7 @@
 package org.apache.atlas.glossary;
 
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContext;
 import org.apache.atlas.SortOrder;
 import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.bulkimport.BulkImportResponse;
@@ -30,6 +31,8 @@
 import org.apache.atlas.model.glossary.relations.AtlasRelatedTermHeader;
 import org.apache.atlas.model.glossary.relations.AtlasTermCategorizationHeader;
 import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.ogm.DataAccess;
 import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
@@ -38,6 +41,7 @@
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.util.FileUtils;
 import org.apache.atlas.utils.AtlasJson;
+import org.apache.atlas.utils.AtlasPerfMetrics;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -50,6 +54,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -64,9 +69,16 @@
 
 @Service
 public class GlossaryService {
-    private static final Logger  LOG                 = LoggerFactory.getLogger(GlossaryService.class);
-    private static final boolean DEBUG_ENABLED       = LOG.isDebugEnabled();
-    private static final String  QUALIFIED_NAME_ATTR = "qualifiedName";
+    private static final Logger  LOG                              = LoggerFactory.getLogger(GlossaryService.class);
+    private static final boolean DEBUG_ENABLED                    = LOG.isDebugEnabled();
+    private static final String  ATLAS_GLOSSARY_TERM              = "AtlasGlossaryTerm";
+    private static final String  NAME_ATTR                        = "name";
+    private static final String  QUALIFIED_NAME_ATTR              = "qualifiedName";
+    private static final String  GLOSSARY_QUALIFIED_NAME_PROPERTY = "AtlasGlossary." + QUALIFIED_NAME_ATTR;
+    private static final String  GLOSSARY_CATEGORY_NAME_PROPERTY  = "AtlasGlossaryCategory.name";
+    private static final String  GLOSSARY_TERM_NAME_PROPERTY      = ATLAS_GLOSSARY_TERM + "." + NAME_ATTR;
+    private static final String  TERM_UNIQUE_QUALIFIED_NAME_PROPERTY = ATLAS_GLOSSARY_TERM + ".__u_" + QUALIFIED_NAME_ATTR;
+    private static final String  GLOSSARY_TERM_ANCHOR_EDGE_LABEL  = "r:AtlasGlossaryTermAnchor";
 
     private final DataAccess                dataAccess;
     private final GlossaryTermUtils         glossaryTermUtils;
@@ -76,6 +88,9 @@
 
     private static final char[] invalidNameChars = { '@', '.' };
 
+    private static final Map<String, String> glossaryGuidQualifiedNameCache = new HashMap<>();
+    private static final Map<String, String> categoryGuidNameCache          = new HashMap<>();
+
     @Inject
     public GlossaryService(DataAccess dataAccess, final AtlasRelationshipStore relationshipStore,
                            final AtlasTypeRegistry typeRegistry, AtlasEntityChangeNotifier entityChangeNotifier) {
@@ -327,65 +342,77 @@
     }
 
     @GraphTransaction
-    public AtlasGlossaryTerm createTerm(AtlasGlossaryTerm glossaryTerm) throws AtlasBaseException {
+    public AtlasGlossaryTerm createTerm(AtlasGlossaryTerm term) throws AtlasBaseException {
         if (DEBUG_ENABLED) {
-            LOG.debug("==> GlossaryService.create({})", glossaryTerm);
+            LOG.debug("==> GlossaryService.create({})", term);
         }
-        if (Objects.isNull(glossaryTerm)) {
+
+        if (Objects.isNull(term)) {
             throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "GlossaryTerm definition missing");
         }
-        if (Objects.isNull(glossaryTerm.getAnchor())) {
+
+        if (Objects.isNull(term.getAnchor())) {
             throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ANCHOR);
         }
-        if (StringUtils.isEmpty(glossaryTerm.getName())) {
+
+        if (StringUtils.isEmpty(term.getName())) {
             throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_QUALIFIED_NAME_CANT_BE_DERIVED);
         }
 
-        if (isNameInvalid(glossaryTerm.getName())){
+        if (isNameInvalid(term.getName())){
             throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME);
         } else {
             // Derive the qualifiedName
-            String        anchorGlossaryGuid = glossaryTerm.getAnchor().getGlossaryGuid();
-            AtlasGlossary glossary           = dataAccess.load(getGlossarySkeleton(anchorGlossaryGuid));
-            glossaryTerm.setQualifiedName(glossaryTerm.getName() + "@" + glossary.getQualifiedName());
+            String anchorGlossaryGuid    = term.getAnchor().getGlossaryGuid();
+            String glossaryQualifiedName = getGlossaryQualifiedName(anchorGlossaryGuid);
+
+            if (StringUtils.isEmpty(glossaryQualifiedName)) {
+                throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_QUALIFIED_NAME_CANT_BE_DERIVED);
+            }
+
+            term.setQualifiedName(term.getName() + "@" + glossaryQualifiedName);
 
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Derived qualifiedName = {}", glossaryTerm.getQualifiedName());
+                LOG.debug("Derived qualifiedName = {}", term.getQualifiedName());
             }
         }
 
         // This might fail for the case when the term's qualifiedName has been updated and the duplicate request comes in with old name
-        if (termExists(glossaryTerm)) {
-            throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, glossaryTerm.getQualifiedName());
+        if (termExists2(term)) {
+            throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, term.getQualifiedName());
         }
 
-        AtlasGlossaryTerm storeObject = dataAccess.save(glossaryTerm);
-        glossaryTermUtils.processTermRelations(storeObject, glossaryTerm, GlossaryUtils.RelationshipOperation.CREATE);
+        AtlasGlossaryTerm storeGlossaryTerm = dataAccess.save(term);
+
+        glossaryTermUtils.processTermRelations(storeGlossaryTerm, term, GlossaryUtils.RelationshipOperation.CREATE);
 
         // Re-load term after handling relations
-        storeObject = dataAccess.load(glossaryTerm);
-        setInfoForRelations(storeObject);
+        storeGlossaryTerm = dataAccess.load(term);
+        setInfoForRelations(storeGlossaryTerm);
 
         if (DEBUG_ENABLED) {
-            LOG.debug("<== GlossaryService.create() : {}", storeObject);
+            LOG.debug("<== GlossaryService.create() : {}", storeGlossaryTerm);
         }
-        return storeObject;
+
+        return storeGlossaryTerm;
     }
 
     @GraphTransaction
-    public List<AtlasGlossaryTerm> createTerms(List<AtlasGlossaryTerm> glossaryTerm) throws AtlasBaseException {
+    public List<AtlasGlossaryTerm> createTerms(List<AtlasGlossaryTerm> glossaryTerms) throws AtlasBaseException {
         if (DEBUG_ENABLED) {
-            LOG.debug("==> GlossaryService.create({})", glossaryTerm);
+            LOG.debug("==> GlossaryService.create({})", glossaryTerms);
         }
 
-        if (Objects.isNull(glossaryTerm)) {
+        if (Objects.isNull(glossaryTerms)) {
             throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "glossaryTerm(s) is null/empty");
         }
 
 
         List<AtlasGlossaryTerm> ret = new ArrayList<>();
-        for (AtlasGlossaryTerm atlasGlossaryTerm : glossaryTerm) {
-            ret.add(createTerm(atlasGlossaryTerm));
+        for (AtlasGlossaryTerm glossaryTerm : glossaryTerms) {
+            AtlasGlossaryTerm term = createTerm(glossaryTerm);
+
+            ret.add(term);
         }
 
         if (LOG.isDebugEnabled()) {
@@ -939,13 +966,64 @@
         return Objects.nonNull(vertex);
     }
 
+    private String getGlossaryQualifiedName(String glossaryGuid) {
+        String ret = glossaryGuidQualifiedNameCache.get(glossaryGuid);
+
+        if (StringUtils.isEmpty(ret)) {
+            AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(glossaryGuid);
+
+            if (vertex != null) {
+                ret = vertex.getProperty(GLOSSARY_QUALIFIED_NAME_PROPERTY, String.class);
+
+                glossaryGuidQualifiedNameCache.put(glossaryGuid, ret);
+            }
+        }
+
+        return ret;
+    }
+
+    private String getGlossaryCategoryName(String glossaryCategoryGuid) {
+        String ret = categoryGuidNameCache.get(glossaryCategoryGuid);
+
+        if (StringUtils.isEmpty(ret)) {
+            AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(glossaryCategoryGuid);
+
+            if (vertex != null) {
+                ret = vertex.getProperty(GLOSSARY_CATEGORY_NAME_PROPERTY, String.class);
+
+                categoryGuidNameCache.put(glossaryCategoryGuid, ret);
+            }
+        }
+
+        return ret;
+    }
+
     private boolean termExists(AtlasGlossaryTerm term) {
         AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(atlasTypeRegistry.getEntityTypeByName(GlossaryUtils.ATLAS_GLOSSARY_TERM_TYPENAME), new HashMap<String, Object>() {{
             put(QUALIFIED_NAME_ATTR, term.getQualifiedName());
         }});
+
         return Objects.nonNull(vertex);
     }
 
+    private boolean termExists2(AtlasGlossaryTerm term) {
+        boolean             ret               = false;
+        AtlasVertex         glossaryVertex    = AtlasGraphUtilsV2.findByGuid(term.getAnchor().getGlossaryGuid());
+        Iterable<AtlasEdge> glossaryTermEdges = glossaryVertex.getEdges(AtlasEdgeDirection.OUT, GLOSSARY_TERM_ANCHOR_EDGE_LABEL);
+
+        for (Iterator<AtlasEdge> iter = glossaryTermEdges.iterator(); iter.hasNext(); ) {
+            AtlasVertex termVertex        = iter.next().getInVertex();
+            String      termQualifiedName = termVertex.getProperty(TERM_UNIQUE_QUALIFIED_NAME_PROPERTY, String.class);
+
+            if (StringUtils.equals(termQualifiedName, term.getQualifiedName())) {
+                ret = true;
+                break;
+            }
+        }
+
+        return ret;
+    }
+
     private boolean categoryExists(AtlasGlossaryCategory category) {
         AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(atlasTypeRegistry.getEntityTypeByName(GlossaryUtils.ATLAS_GLOSSARY_CATEGORY_TYPENAME), new HashMap<String, Object>() {{
             put(QUALIFIED_NAME_ATTR, category.getQualifiedName());
@@ -979,7 +1057,7 @@
 
     private void setInfoForRelations(final AtlasGlossary ret) throws AtlasBaseException {
         if (Objects.nonNull(ret.getTerms())) {
-            setInfoForTerms(ret.getTerms());
+            setDisplayNameForTerms(ret.getTerms());
         }
 
         if (Objects.nonNull(ret.getCategories())) {
@@ -987,13 +1065,12 @@
         }
     }
 
-    private void setInfoForRelations(final AtlasGlossaryTerm ret) throws AtlasBaseException {
-        if (Objects.nonNull(ret.getCategories())) {
-            setDisplayNameForTermCategories(ret.getCategories());
-        }
-        if (Objects.nonNull(ret.getRelatedTerms())) {
-            for (Map.Entry<AtlasGlossaryTerm.Relation, Set<AtlasRelatedTermHeader>> entry : ret.getRelatedTerms().entrySet()) {
-                setInfoForTerms(entry.getValue());
+    private void setInfoForRelations(final AtlasGlossaryTerm glossaryTerm) throws AtlasBaseException {
+        setDisplayNameForCategories(glossaryTerm.getCategories());
+
+        if (Objects.nonNull(glossaryTerm.getRelatedTerms())) {
+            for (Map.Entry<AtlasGlossaryTerm.Relation, Set<AtlasRelatedTermHeader>> entry : glossaryTerm.getRelatedTerms().entrySet()) {
+                setDisplayNameForTerms(entry.getValue());
             }
         }
     }
@@ -1003,18 +1080,23 @@
             setInfoForRelatedCategories(glossaryCategory.getChildrenCategories());
         }
         if (Objects.nonNull(glossaryCategory.getTerms())) {
-            setInfoForTerms(glossaryCategory.getTerms());
+            setDisplayNameForTerms(glossaryCategory.getTerms());
         }
     }
 
-    private void setDisplayNameForTermCategories(final Set<AtlasTermCategorizationHeader> categorizationHeaders) throws AtlasBaseException {
-        List<AtlasGlossaryCategory> categories = categorizationHeaders
-                                                         .stream()
-                                                         .map(id -> getAtlasGlossaryCategorySkeleton(id.getCategoryGuid()))
-                                                         .collect(Collectors.toList());
-        Map<String, AtlasGlossaryCategory> categoryMap = new HashMap<>();
-        dataAccess.load(categories).forEach(c -> categoryMap.put(c.getGuid(), c));
-        categorizationHeaders.forEach(c -> c.setDisplayText(categoryMap.get(c.getCategoryGuid()).getName()));
+    private void setDisplayNameForCategories(final Set<AtlasTermCategorizationHeader> categorizationHeaders) throws AtlasBaseException {
+        if (CollectionUtils.isEmpty(categorizationHeaders)) {
+            return;
+        }
+
+        for (AtlasTermCategorizationHeader termCategorizationHeader : categorizationHeaders) {
+            String categoryGuid = termCategorizationHeader.getCategoryGuid();
+            String categoryName = getGlossaryCategoryName(categoryGuid);
+
+            if (StringUtils.isNotEmpty(categoryName)) {
+                termCategorizationHeader.setDisplayText(categoryName);
+            }
+        }
     }
 
     private void setInfoForRelatedCategories(final Collection<AtlasRelatedCategoryHeader> categoryHeaders) throws AtlasBaseException {
@@ -1033,15 +1115,17 @@
         }
     }
 
-    private void setInfoForTerms(final Collection<AtlasRelatedTermHeader> termHeaders) throws AtlasBaseException {
-        List<AtlasGlossaryTerm> terms = termHeaders
-                                                .stream()
-                                                .map(id -> getAtlasGlossaryTermSkeleton(id.getTermGuid()))
-                                                .collect(Collectors.toList());
-        Map<String, AtlasGlossaryTerm> termMap = new HashMap<>();
-        dataAccess.load(terms).iterator().forEachRemaining(t -> termMap.put(t.getGuid(), t));
+    private void setDisplayNameForTerms(final Collection<AtlasRelatedTermHeader> termHeaders) throws AtlasBaseException {
 
-        termHeaders.forEach(t -> t.setDisplayText(getDisplayText(termMap.get(t.getTermGuid()))));
+        for (AtlasRelatedTermHeader termHeader : termHeaders) {
+            String      termGuid        = termHeader.getTermGuid();
+            AtlasVertex termVertex      = AtlasGraphUtilsV2.findByGuid(termGuid);
+            String      termDisplayText = termVertex.getProperty(GLOSSARY_TERM_NAME_PROPERTY, String.class);
+
+            if (StringUtils.isNotEmpty(termDisplayText)) {
+                termHeader.setDisplayText(termDisplayText);
+            }
+        }
     }
 
     public static boolean isNameInvalid(String name) {
diff --git a/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java b/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java
index d92daee..8a161e4 100644
--- a/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java
+++ b/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java
@@ -18,6 +18,7 @@
 package org.apache.atlas.glossary;
 
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContext;
 import org.apache.atlas.bulkimport.BulkImportResponse;
 import org.apache.atlas.bulkimport.BulkImportResponse.ImportInfo;
 import org.apache.atlas.exception.AtlasBaseException;
@@ -37,6 +38,7 @@
 import org.apache.atlas.type.AtlasRelationshipType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.util.FileUtils;
+import org.apache.atlas.utils.AtlasPerfMetrics;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.ArrayUtils;
@@ -161,58 +163,62 @@
         return StringUtils.equals(relatedObjectId.getRelationshipGuid(), storeObject.getRelationshipGuid());
     }
 
-    private void processTermAnchor(AtlasGlossaryTerm storeObject, AtlasGlossaryTerm updatedTerm, RelationshipOperation op) throws AtlasBaseException {
+    private void processTermAnchor(AtlasGlossaryTerm currentTerm, AtlasGlossaryTerm updatedTerm, RelationshipOperation op) throws AtlasBaseException {
         if (Objects.isNull(updatedTerm.getAnchor()) && op != RelationshipOperation.DELETE) {
             throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ANCHOR);
         }
 
-        AtlasGlossaryHeader existingAnchor    = storeObject.getAnchor();
-        AtlasGlossaryHeader updatedTermAnchor = updatedTerm.getAnchor();
+        AtlasGlossaryHeader currentTermGlossary     = currentTerm.getAnchor(); // glossary_g1
+        AtlasGlossaryHeader updatedTermGlossary     = updatedTerm.getAnchor(); // glossary_g2
+        String              updatedTermGlossaryGuid = updatedTermGlossary.getGlossaryGuid();
+        String              currentTermGlossaryGuid = currentTermGlossary.getGlossaryGuid();
 
         switch (op) {
             case CREATE:
-                if (Objects.isNull(updatedTermAnchor.getGlossaryGuid())) {
+                if (Objects.isNull(updatedTermGlossaryGuid)) {
                     throw new AtlasBaseException(AtlasErrorCode.INVALID_NEW_ANCHOR_GUID);
                 } else {
                     if (DEBUG_ENABLED) {
-                        LOG.debug("Creating new term anchor, category = {}, glossary = {}", storeObject.getGuid(), updatedTerm.getAnchor().getGlossaryGuid());
+                        LOG.debug("Creating new term anchor, category = {}, glossary = {}", currentTerm.getGuid(), updatedTerm.getAnchor().getGlossaryGuid());
                     }
 
-                    createRelationship(defineTermAnchorRelation(updatedTermAnchor.getGlossaryGuid(), storeObject.getGuid()));
+                    if (!StringUtils.equals(updatedTermGlossaryGuid, currentTermGlossaryGuid)) {
+                        createRelationship(defineTermAnchorRelation(updatedTermGlossaryGuid, currentTerm.getGuid()));
+                    }
                 }
                 break;
             case UPDATE:
-                if (!Objects.equals(updatedTermAnchor, existingAnchor)) {
-                    if (Objects.isNull(updatedTermAnchor.getGlossaryGuid())) {
+                if (!Objects.equals(updatedTermGlossary, currentTermGlossary)) {
+                    if (Objects.isNull(updatedTermGlossaryGuid)) {
                         throw new AtlasBaseException(AtlasErrorCode.INVALID_NEW_ANCHOR_GUID);
                     }
 
                     if (DEBUG_ENABLED) {
                         LOG.debug("Updating term anchor, currAnchor = {}, newAnchor = {} and term = {}",
-                                  existingAnchor.getGlossaryGuid(),
-                                  updatedTermAnchor.getGlossaryGuid(),
-                                  storeObject.getName());
+                                currentTermGlossaryGuid,
+                                updatedTermGlossaryGuid,
+                                  currentTerm.getName());
                     }
-                    relationshipStore.deleteById(existingAnchor.getRelationGuid(), true);
+                    relationshipStore.deleteById(currentTermGlossary.getRelationGuid(), true);
 
                     // Derive the qualifiedName when anchor changes
-                    String        anchorGlossaryGuid = updatedTermAnchor.getGlossaryGuid();
+                    String        anchorGlossaryGuid = updatedTermGlossaryGuid;
                     AtlasGlossary glossary           = dataAccess.load(getGlossarySkeleton(anchorGlossaryGuid));
-                    storeObject.setQualifiedName(storeObject.getName() + "@" + glossary.getQualifiedName());
+                    currentTerm.setQualifiedName(currentTerm.getName() + "@" + glossary.getQualifiedName());
 
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("Derived qualifiedName = {}", storeObject.getQualifiedName());
+                        LOG.debug("Derived qualifiedName = {}", currentTerm.getQualifiedName());
                     }
 
-                    createRelationship(defineTermAnchorRelation(updatedTermAnchor.getGlossaryGuid(), storeObject.getGuid()));
+                    createRelationship(defineTermAnchorRelation(updatedTermGlossaryGuid, currentTerm.getGuid()));
                 }
                 break;
             case DELETE:
-                if (Objects.nonNull(existingAnchor)) {
+                if (Objects.nonNull(currentTermGlossary)) {
                     if (DEBUG_ENABLED) {
                         LOG.debug("Deleting term anchor");
                     }
-                    relationshipStore.deleteById(existingAnchor.getRelationGuid(), true);
+                    relationshipStore.deleteById(currentTermGlossary.getRelationGuid(), true);
                 }
                 break;
         }
@@ -221,6 +227,7 @@
     private void processRelatedTerms(AtlasGlossaryTerm storeObject, AtlasGlossaryTerm updatedTerm, RelationshipOperation op) throws AtlasBaseException {
         Map<AtlasGlossaryTerm.Relation, Set<AtlasRelatedTermHeader>> newRelatedTerms      = updatedTerm.getRelatedTerms();
         Map<AtlasGlossaryTerm.Relation, Set<AtlasRelatedTermHeader>> existingRelatedTerms = storeObject.getRelatedTerms();
+
         switch (op) {
             case CREATE:
                 for (Map.Entry<AtlasGlossaryTerm.Relation, Set<AtlasRelatedTermHeader>> entry : newRelatedTerms.entrySet()) {
@@ -313,6 +320,7 @@
     private void processAssociatedCategories(AtlasGlossaryTerm storeObject, AtlasGlossaryTerm updatedTerm, RelationshipOperation op) throws AtlasBaseException {
         Map<String, AtlasTermCategorizationHeader> newCategories      = getAssociatedCategories(updatedTerm);
         Map<String, AtlasTermCategorizationHeader> existingCategories = getAssociatedCategories(storeObject);
+
         switch (op) {
             case CREATE:
                 if (Objects.nonNull(newCategories)) {