ATLAS-3566: improvements in upgrade patches, to avoid full-scan

Signed-off-by: Sarath Subramanian <sarath@apache.org>
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java
index 2af50ba..8351942 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java
@@ -18,11 +18,22 @@
 package org.apache.atlas.repository.patches;
 
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
 import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
 
 public class ClassificationTextPatch extends AtlasPatchHandler {
@@ -57,16 +68,54 @@
         }
 
         @Override
-        protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
-            processItem(vertexId, vertex, typeName, entityType);
-        }
-
-        @Override
         protected void prepareForExecution() {
             //do nothing
         }
 
-        protected void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
+        /**
+         * Submits, once per vertex, the id of every vertex found on the out-side of an
+         * edge coming into a classification vertex (the vertices to be re-processed).
+         */
+        @Override
+        public void submitVerticesToUpdate(WorkItemManager manager) {
+            final Set<Long>  submittedIds = new HashSet<>();
+            final AtlasGraph atlasGraph   = getGraph();
+
+            for (AtlasClassificationType type : getTypeRegistry().getAllClassificationTypes()) {
+                final String classificationName = type.getTypeName();
+                int          found              = 0;
+
+                LOG.info("finding classification of type {}", classificationName);
+
+                // Query only vertices whose type-name property matches this classification type.
+                Iterable<AtlasVertex> matches = atlasGraph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, classificationName).vertices();
+
+                for (AtlasVertex classificationVertex : matches) {
+                    for (AtlasEdge incoming : classificationVertex.getEdges(AtlasEdgeDirection.IN)) {
+                        Long entityVertexId = (Long) incoming.getOutVertex().getId();
+
+                        // Set.add() returns false for an id that was already submitted.
+                        if (submittedIds.add(entityVertexId)) {
+                            manager.checkProduce(entityVertexId);
+                        }
+                    }
+
+                    found++;
+                }
+
+                LOG.info("found {} classification of type {}", found, classificationName);
+            }
+
+            // Size of the de-duplicated set, i.e. the number of ids handed to the manager.
+            LOG.info("found {} entities with classifications", submittedIds.size());
+        }
+
+        @Override
+        protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
+            processItem(vertexId, vertex, typeName, entityType);
+        }
+
+        private void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
             }
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java b/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
index 3eedb98..5a9ac2a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
@@ -18,7 +18,6 @@
 package org.apache.atlas.repository.patches;
 
 import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.pc.WorkItemBuilder;
@@ -30,23 +29,57 @@
 import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
 public abstract class ConcurrentPatchProcessor {
     private static final Logger LOG = LoggerFactory.getLogger(ConcurrentPatchProcessor.class);
 
-    private static final String     NUM_WORKERS_PROPERTY = "atlas.patch.numWorkers";
-    private static final String     BATCH_SIZE_PROPERTY  = "atlas.patch.batchSize";
-    private static final String     ATLAS_SOLR_SHARDS    = "ATLAS_SOLR_SHARDS";
-    private static final String     WORKER_NAME_PREFIX   = "patchWorkItem";
-    private static final int        NUM_WORKERS;
-    private static final int        BATCH_SIZE;
-    private final EntityGraphMapper entityGraphMapper;
+    private static final String NUM_WORKERS_PROPERTY = "atlas.patch.numWorkers";
+    private static final String BATCH_SIZE_PROPERTY  = "atlas.patch.batchSize";
+    private static final String ATLAS_SOLR_SHARDS    = "ATLAS_SOLR_SHARDS";
+    private static final String WORKER_NAME_PREFIX   = "patchWorkItem";
+    private static final int    NUM_WORKERS;
+    private static final int    BATCH_SIZE;
+
+    private final EntityGraphMapper        entityGraphMapper;
+    private final AtlasGraph               graph;
+    private final GraphBackedSearchIndexer indexer;
+    private final AtlasTypeRegistry        typeRegistry;
+
+    static {
+        // Fallback values, kept when the configuration cannot be read.
+        int numWorkers = 3;
+        int batchSize  = 300;
+        try {
+            Configuration config = ApplicationProperties.get();
+
+            // Without an explicit setting, the worker count is 3 x ATLAS_SOLR_SHARDS (which defaults to 1).
+            numWorkers = config.getInt(NUM_WORKERS_PROPERTY, config.getInt(ATLAS_SOLR_SHARDS, 1) * 3);
+            batchSize  = config.getInt(BATCH_SIZE_PROPERTY, 300);
+            LOG.info("ConcurrentPatchProcessor: {}={}, {}={}", NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
+        } catch (Exception e) {
+            LOG.error("Error retrieving configuration.", e);
+        }
+
+        NUM_WORKERS = numWorkers;
+        BATCH_SIZE  = batchSize;
+    }
+
+    public ConcurrentPatchProcessor(PatchContext context) {
+        this.graph             = context.getGraph();
+        this.indexer           = context.getIndexer();
+        this.typeRegistry      = context.getTypeRegistry();
+        this.entityGraphMapper = context.getEntityGraphMapper();
+    }
+
+    public EntityGraphMapper getEntityGraphMapper() {
+        return entityGraphMapper;
+    }
 
     public AtlasGraph getGraph() {
         return graph;
@@ -60,56 +93,21 @@
         return typeRegistry;
     }
 
-    private final AtlasGraph graph;
-    private final GraphBackedSearchIndexer indexer;
-    private final AtlasTypeRegistry typeRegistry;
-
-    static {
-        int numWorkers = 3;
-        int batchSize  = 300;
-
-        try {
-            numWorkers = ApplicationProperties.get().getInt(NUM_WORKERS_PROPERTY, getDefaultNumWorkers());
-            batchSize  = ApplicationProperties.get().getInt(BATCH_SIZE_PROPERTY, 300);
-
-            LOG.info("UniqueAttributePatch: {}={}, {}={}", NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
-        } catch (Exception e) {
-            LOG.error("Error retrieving configuration.", e);
-        }
-
-        NUM_WORKERS = numWorkers;
-        BATCH_SIZE  = batchSize;
-    }
-
-    private static int getDefaultNumWorkers() throws AtlasException {
-        return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3;
-    }
-
-    public ConcurrentPatchProcessor(PatchContext context) {
-        this.graph             = context.getGraph();
-        this.indexer           = context.getIndexer();
-        this.typeRegistry      = context.getTypeRegistry();
-        this.entityGraphMapper = context.getEntityGraphMapper();
-    }
-
-    public EntityGraphMapper getEntityGraphMapper() {
-        return entityGraphMapper;
-    }
     public void apply() throws AtlasBaseException {
         prepareForExecution();
         execute();
     }
 
+    protected abstract void prepareForExecution() throws AtlasBaseException;
+    protected abstract void submitVerticesToUpdate(WorkItemManager manager);
+    protected abstract void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException;
+
     private void execute() {
-        Iterable<Object> iterable = graph.query().vertexIds();
-        WorkItemManager manager = new WorkItemManager(
-                new ConsumerBuilder(graph, typeRegistry, this), WORKER_NAME_PREFIX,
-                BATCH_SIZE, NUM_WORKERS, false);
+        WorkItemManager manager = new WorkItemManager(new ConsumerBuilder(graph, typeRegistry, this),
+                                                      WORKER_NAME_PREFIX, BATCH_SIZE, NUM_WORKERS, false);
+
         try {
-            for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
-                Object vertexId = iter.next();
-                submitForProcessing((Long) vertexId, manager);
-            }
+            submitVerticesToUpdate(manager);
 
             manager.drain();
         } finally {
@@ -121,10 +119,6 @@
         }
     }
 
-    private void submitForProcessing(Long vertexId, WorkItemManager manager) {
-        manager.checkProduce(vertexId);
-    }
-
     private static class ConsumerBuilder implements WorkItemBuilder<Consumer, Long> {
         private final AtlasTypeRegistry typeRegistry;
         private final AtlasGraph graph;
@@ -228,7 +222,4 @@
             }
         }
     }
-
-    protected abstract void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException;
-    protected abstract void prepareForExecution() throws AtlasBaseException;
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
index bd5e32b..2b58119 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
@@ -17,25 +17,24 @@
  */
 package org.apache.atlas.repository.patches;
 
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.IndexException;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
-import org.apache.atlas.repository.graphdb.AtlasCardinality;
-import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
-import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.graphdb.*;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.Iterator;
 
 import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
 import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
@@ -66,45 +65,45 @@
     }
 
     public static class UniqueAttributePatchProcessor extends ConcurrentPatchProcessor {
-        private static final String NUM_WORKERS_PROPERTY = "atlas.patch.unique_attribute_patch.numWorkers";
-        private static final String BATCH_SIZE_PROPERTY  = "atlas.patch.unique_attribute_patch.batchSize";
-        private static final String ATLAS_SOLR_SHARDS    = "ATLAS_SOLR_SHARDS";
-        private static final int    NUM_WORKERS;
-        private static final int    BATCH_SIZE;
-
-        static {
-            int numWorkers = 3;
-            int batchSize  = 300;
-
-            try {
-                numWorkers = ApplicationProperties.get().getInt(NUM_WORKERS_PROPERTY, getDefaultNumWorkers());
-                batchSize  = ApplicationProperties.get().getInt(BATCH_SIZE_PROPERTY, 300);
-
-                LOG.info("UniqueAttributePatch: {}={}, {}={}", NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
-            } catch (Exception e) {
-                LOG.error("Error retrieving configuration.", e);
-            }
-
-            NUM_WORKERS = numWorkers;
-            BATCH_SIZE  = batchSize;
-        }
-
         public UniqueAttributePatchProcessor(PatchContext context) {
             super(context);
         }
 
         @Override
-        protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
-            //process the vertex
-            processItem(vertexId, vertex, typeName, entityType);
-        }
-
-        @Override
         protected void prepareForExecution() {
             //create the new attribute for all unique attributes.
             createIndexForUniqueAttributes();
         }
 
+        /**
+         * Submits to the manager the id of every vertex whose type-name property matches a registered entity type.
+         */
+        @Override
+        public void submitVerticesToUpdate(WorkItemManager manager) {
+            final AtlasGraph atlasGraph = getGraph();
+
+            for (AtlasEntityType type : getTypeRegistry().getAllEntityTypes()) {
+                final String entityTypeName = type.getTypeName();
+                int          submitted      = 0;
+
+                LOG.info("finding entities of type {}", entityTypeName);
+
+                // One query per entity type; each matching vertex id is handed to the manager.
+                for (Object id : atlasGraph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, entityTypeName).vertexIds()) {
+                    manager.checkProduce((Long) id);
+                    submitted++;
+                }
+
+                LOG.info("found {} entities of type {}", submitted, entityTypeName);
+            }
+        }
+
+        @Override
+        protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
+            //process the vertex
+            processItem(vertexId, vertex, typeName, entityType);
+        }
+
         private void createIndexForUniqueAttributes() {
             for (AtlasEntityType entityType : getTypeRegistry().getAllEntityTypes()) {
 
@@ -157,10 +156,6 @@
             }
         }
 
-        private static int getDefaultNumWorkers() throws AtlasException {
-            return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3;
-        }
-
         protected void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
             LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);