ATLAS-3566: improvements in upgrade patches to avoid full graph scans
Signed-off-by: Sarath Subramanian <sarath@apache.org>
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java
index 2af50ba..8351942 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java
@@ -18,11 +18,22 @@
package org.apache.atlas.repository.patches;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
public class ClassificationTextPatch extends AtlasPatchHandler {
@@ -57,16 +68,54 @@
}
@Override
- protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
- processItem(vertexId, vertex, typeName, entityType);
- }
-
- @Override
protected void prepareForExecution() {
//do nothing
}
- protected void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
+ @Override
+ public void submitVerticesToUpdate(WorkItemManager manager) {
+ AtlasTypeRegistry typeRegistry = getTypeRegistry();
+ AtlasGraph graph = getGraph();
+ Set<Long> vertexIds = new HashSet<>();
+
+ for (AtlasClassificationType classificationType : typeRegistry.getAllClassificationTypes()) {
+ LOG.info("finding classification of type {}", classificationType.getTypeName());
+
+ Iterable<AtlasVertex> iterable = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, classificationType.getTypeName()).vertices();
+ int count = 0;
+
+ for (Iterator<AtlasVertex> iter = iterable.iterator(); iter.hasNext(); ) {
+ AtlasVertex classificationVertex = iter.next();
+ Iterable<AtlasEdge> edges = classificationVertex.getEdges(AtlasEdgeDirection.IN);
+
+ for (AtlasEdge edge : edges) {
+ AtlasVertex entityVertex = edge.getOutVertex();
+ Long vertexId = (Long) entityVertex.getId();
+
+ if (vertexIds.contains(vertexId)) {
+ continue;
+ }
+
+ vertexIds.add(vertexId);
+
+ manager.checkProduce(vertexId);
+ }
+
+ count++;
+ }
+
+ LOG.info("found {} classification of type {}", count, classificationType.getTypeName());
+ }
+
+ LOG.info("found {} entities with classifications", vertexIds.size());
+ }
+
+ @Override
+ protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
+ processItem(vertexId, vertex, typeName, entityType);
+ }
+
+ private void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
if(LOG.isDebugEnabled()) {
LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java b/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
index 3eedb98..5a9ac2a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
@@ -18,7 +18,6 @@
package org.apache.atlas.repository.patches;
import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.pc.WorkItemBuilder;
@@ -30,23 +29,57 @@
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
public abstract class ConcurrentPatchProcessor {
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentPatchProcessor.class);
- private static final String NUM_WORKERS_PROPERTY = "atlas.patch.numWorkers";
- private static final String BATCH_SIZE_PROPERTY = "atlas.patch.batchSize";
- private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS";
- private static final String WORKER_NAME_PREFIX = "patchWorkItem";
- private static final int NUM_WORKERS;
- private static final int BATCH_SIZE;
- private final EntityGraphMapper entityGraphMapper;
+ private static final String NUM_WORKERS_PROPERTY = "atlas.patch.numWorkers";
+ private static final String BATCH_SIZE_PROPERTY = "atlas.patch.batchSize";
+ private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS";
+ private static final String WORKER_NAME_PREFIX = "patchWorkItem";
+ private static final int NUM_WORKERS;
+ private static final int BATCH_SIZE;
+
+ private final EntityGraphMapper entityGraphMapper;
+ private final AtlasGraph graph;
+ private final GraphBackedSearchIndexer indexer;
+ private final AtlasTypeRegistry typeRegistry;
+
+ static {
+ int numWorkers = 3;
+ int batchSize = 300;
+
+ try {
+ Configuration config = ApplicationProperties.get();
+
+ numWorkers = config.getInt(NUM_WORKERS_PROPERTY, config.getInt(ATLAS_SOLR_SHARDS, 1) * 3);
+ batchSize = config.getInt(BATCH_SIZE_PROPERTY, 300);
+
+ LOG.info("UniqueAttributePatch: {}={}, {}={}", NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
+ } catch (Exception e) {
+ LOG.error("Error retrieving configuration.", e);
+ }
+
+ NUM_WORKERS = numWorkers;
+ BATCH_SIZE = batchSize;
+ }
+
+ public ConcurrentPatchProcessor(PatchContext context) {
+ this.graph = context.getGraph();
+ this.indexer = context.getIndexer();
+ this.typeRegistry = context.getTypeRegistry();
+ this.entityGraphMapper = context.getEntityGraphMapper();
+ }
+
+ public EntityGraphMapper getEntityGraphMapper() {
+ return entityGraphMapper;
+ }
public AtlasGraph getGraph() {
return graph;
@@ -60,56 +93,21 @@
return typeRegistry;
}
- private final AtlasGraph graph;
- private final GraphBackedSearchIndexer indexer;
- private final AtlasTypeRegistry typeRegistry;
-
- static {
- int numWorkers = 3;
- int batchSize = 300;
-
- try {
- numWorkers = ApplicationProperties.get().getInt(NUM_WORKERS_PROPERTY, getDefaultNumWorkers());
- batchSize = ApplicationProperties.get().getInt(BATCH_SIZE_PROPERTY, 300);
-
- LOG.info("UniqueAttributePatch: {}={}, {}={}", NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
- } catch (Exception e) {
- LOG.error("Error retrieving configuration.", e);
- }
-
- NUM_WORKERS = numWorkers;
- BATCH_SIZE = batchSize;
- }
-
- private static int getDefaultNumWorkers() throws AtlasException {
- return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3;
- }
-
- public ConcurrentPatchProcessor(PatchContext context) {
- this.graph = context.getGraph();
- this.indexer = context.getIndexer();
- this.typeRegistry = context.getTypeRegistry();
- this.entityGraphMapper = context.getEntityGraphMapper();
- }
-
- public EntityGraphMapper getEntityGraphMapper() {
- return entityGraphMapper;
- }
public void apply() throws AtlasBaseException {
prepareForExecution();
execute();
}
+ protected abstract void prepareForExecution() throws AtlasBaseException;
+ protected abstract void submitVerticesToUpdate(WorkItemManager manager);
+ protected abstract void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException;
+
private void execute() {
- Iterable<Object> iterable = graph.query().vertexIds();
- WorkItemManager manager = new WorkItemManager(
- new ConsumerBuilder(graph, typeRegistry, this), WORKER_NAME_PREFIX,
- BATCH_SIZE, NUM_WORKERS, false);
+ WorkItemManager manager = new WorkItemManager(new ConsumerBuilder(graph, typeRegistry, this),
+ WORKER_NAME_PREFIX, BATCH_SIZE, NUM_WORKERS, false);
+
try {
- for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
- Object vertexId = iter.next();
- submitForProcessing((Long) vertexId, manager);
- }
+ submitVerticesToUpdate(manager);
manager.drain();
} finally {
@@ -121,10 +119,6 @@
}
}
- private void submitForProcessing(Long vertexId, WorkItemManager manager) {
- manager.checkProduce(vertexId);
- }
-
private static class ConsumerBuilder implements WorkItemBuilder<Consumer, Long> {
private final AtlasTypeRegistry typeRegistry;
private final AtlasGraph graph;
@@ -228,7 +222,4 @@
}
}
}
-
- protected abstract void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException;
- protected abstract void prepareForExecution() throws AtlasBaseException;
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
index bd5e32b..2b58119 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
@@ -17,25 +17,24 @@
*/
package org.apache.atlas.repository.patches;
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.IndexException;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
-import org.apache.atlas.repository.graphdb.AtlasCardinality;
-import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
-import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
+import java.util.Iterator;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
@@ -66,45 +65,45 @@
}
public static class UniqueAttributePatchProcessor extends ConcurrentPatchProcessor {
- private static final String NUM_WORKERS_PROPERTY = "atlas.patch.unique_attribute_patch.numWorkers";
- private static final String BATCH_SIZE_PROPERTY = "atlas.patch.unique_attribute_patch.batchSize";
- private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS";
- private static final int NUM_WORKERS;
- private static final int BATCH_SIZE;
-
- static {
- int numWorkers = 3;
- int batchSize = 300;
-
- try {
- numWorkers = ApplicationProperties.get().getInt(NUM_WORKERS_PROPERTY, getDefaultNumWorkers());
- batchSize = ApplicationProperties.get().getInt(BATCH_SIZE_PROPERTY, 300);
-
- LOG.info("UniqueAttributePatch: {}={}, {}={}", NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
- } catch (Exception e) {
- LOG.error("Error retrieving configuration.", e);
- }
-
- NUM_WORKERS = numWorkers;
- BATCH_SIZE = batchSize;
- }
-
public UniqueAttributePatchProcessor(PatchContext context) {
super(context);
}
@Override
- protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
- //process the vertex
- processItem(vertexId, vertex, typeName, entityType);
- }
-
- @Override
protected void prepareForExecution() {
//create the new attribute for all unique attributes.
createIndexForUniqueAttributes();
}
+ @Override
+ public void submitVerticesToUpdate(WorkItemManager manager) {
+ AtlasTypeRegistry typeRegistry = getTypeRegistry();
+ AtlasGraph graph = getGraph();
+
+ for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
+ LOG.info("finding entities of type {}", entityType.getTypeName());
+
+ Iterable<Object> iterable = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, entityType.getTypeName()).vertexIds();
+ int count = 0;
+
+ for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
+ Object vertexId = iter.next();
+
+ manager.checkProduce((Long) vertexId);
+
+ count++;
+ }
+
+ LOG.info("found {} entities of type {}", count, entityType.getTypeName());
+ }
+ }
+
+ @Override
+ protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
+ //process the vertex
+ processItem(vertexId, vertex, typeName, entityType);
+ }
+
private void createIndexForUniqueAttributes() {
for (AtlasEntityType entityType : getTypeRegistry().getAllEntityTypes()) {
@@ -157,10 +156,6 @@
}
}
- private static int getDefaultNumWorkers() throws AtlasException {
- return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3;
- }
-
protected void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);