ATLAS-1665: export optimization to reduce file-size and export-time
Signed-off-by: Madhan Neethiraj <madhan@apache.org>
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
index 4e3895d..0e277b1 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
@@ -196,6 +196,7 @@
}
sb.append("AtlasEntity{");
+ super.toString(sb);
sb.append("guid='").append(guid).append('\'');
sb.append(", status=").append(status);
sb.append(", createdBy='").append(createdBy).append('\'');
@@ -207,7 +208,6 @@
AtlasBaseTypeDef.dumpObjects(classifications, sb);
sb.append(']');
sb.append(", ");
- super.toString(sb);
sb.append('}');
return sb;
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
index 6c88510..12e8bb1 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
@@ -17,14 +17,6 @@
*/
package org.apache.atlas.repository.store.graph.v1;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
@@ -34,12 +26,26 @@
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.EntityResolver;
-import org.apache.atlas.type.*;
+import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasMapType;
+import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityGraphDiscoveryV1.class);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 86fd6ad..fa4c051 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -159,13 +159,14 @@
int progressReportedAtCount = 0;
while (entityStream.hasNext()) {
- AtlasEntity entity = entityStream.next();
+ AtlasEntityWithExtInfo entityWithExtInfo = entityStream.getNextEntityWithExtInfo();
+ AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
if(entity == null || processedGuids.contains(entity.getGuid())) {
continue;
}
- AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entity, entityStream);
+ AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream);
EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true);
@@ -177,7 +178,7 @@
updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
- if ((processedGuids.size() - progressReportedAtCount) > 10) {
+ if ((processedGuids.size() - progressReportedAtCount) > 1000) {
progressReportedAtCount = processedGuids.size();
LOG.info("bulkImport(): in progress.. number of entities imported: {}", progressReportedAtCount);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java
index 5d9a7d4..eb860ff 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java
@@ -24,9 +24,9 @@
import java.util.Iterator;
public class AtlasEntityStream implements EntityStream {
- private final AtlasEntitiesWithExtInfo entitiesWithExtInfo;
- private final EntityStream entityStream;
- private Iterator<AtlasEntity> iterator;
+ protected final AtlasEntitiesWithExtInfo entitiesWithExtInfo;
+ protected final EntityStream entityStream;
+ private Iterator<AtlasEntity> iterator;
public AtlasEntityStream(AtlasEntity entity) {
@@ -49,6 +49,12 @@
this.entityStream = entityStream;
}
+ /**
+  * Builds a stream over the entities contained in a single entity-with-ext-info payload.
+  *
+  * @param entityWithExtInfo payload that is wrapped into an {@code AtlasEntitiesWithExtInfo}; its entities are iterated
+  * @param entityStream      stream that {@code getByGuid()} delegates to when it is non-null
+  */
+ public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo, EntityStream entityStream) {
+     final AtlasEntitiesWithExtInfo wrapped = new AtlasEntitiesWithExtInfo(entityWithExtInfo);
+
+     this.entityStream        = entityStream;
+     this.entitiesWithExtInfo = wrapped;
+     this.iterator            = wrapped.getEntities().iterator();
+ }
+
@Override
public boolean hasNext() {
return iterator.hasNext();
@@ -66,7 +72,7 @@
@Override
public AtlasEntity getByGuid(String guid) {
- return entityStream != null ? entityStream.getByGuid(guid) : entitiesWithExtInfo.getEntity(guid);
+ // A wrapped stream, when present, answers the lookup; the local payload is consulted only when there is none.
+ return entityStream != null ? entityStream.getByGuid(guid) : entitiesWithExtInfo.getEntity(guid);
}
@Override
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
index 8cb36ac..69140e6 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
@@ -18,17 +18,29 @@
package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasEntityHeader;
-
-import java.util.List;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream {
- public AtlasEntityStreamForImport(AtlasEntity entity) {
- super(entity);
+ /**
+  * Creates an import stream for one entity-with-ext-info payload.
+  * Both arguments are passed unchanged to the {@code AtlasEntityStream} constructor.
+  */
+ public AtlasEntityStreamForImport(AtlasEntityWithExtInfo entityWithExtInfo, EntityStream entityStream) {
+ super(entityWithExtInfo, entityStream);
}
- public AtlasEntityStreamForImport(AtlasEntity entity, EntityStream entityStream) {
- super(entity, entityStream);
+ /**
+  * Advances the stream and wraps the next entity together with this stream's ext-info.
+  *
+  * @return the wrapped entity, or {@code null} when {@code next()} yields {@code null}
+  */
+ @Override
+ public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
+     final AtlasEntity nextEntity = next();
+
+     if (nextEntity == null) {
+         return null;
+     }
+
+     return new AtlasEntityWithExtInfo(nextEntity, super.entitiesWithExtInfo);
+ }
+
+ /**
+  * Looks up an entity by guid, preferring this stream's own payload over the wrapped stream.
+  *
+  * @param guid guid of the entity to find
+  * @return the local entity if present; otherwise the wrapped stream's answer, or {@code null} when no stream is wrapped
+  */
+ @Override
+ public AtlasEntity getByGuid(String guid) {
+     final AtlasEntity local = super.entitiesWithExtInfo.getEntity(guid);
+
+     if (local != null || entityStream == null) {
+         return local;
+     }
+
+     return entityStream.getByGuid(guid);
}
@Override
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
index 73994b9..0f711db 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
@@ -18,7 +18,11 @@
package org.apache.atlas.repository.store.graph.v1;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+
public interface EntityImportStream extends EntityStream {
+ /**
+  * Returns the next entity of the stream together with its ext-info.
+  * NOTE(review): the implementations touched by this change can return null (no next entity, or a read failure),
+  * so callers should null-check the result.
+  */
+ AtlasEntityWithExtInfo getNextEntityWithExtInfo();
+
void onImportComplete(String guid);
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java
index 4c43921..3444bfd 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java
@@ -18,7 +18,6 @@
package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasObjectId;
public interface EntityStream {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java
index 241f6d0..68d7f11 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java
@@ -19,9 +19,7 @@
import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasObjectId;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
index 4743b73..d3413c2 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
@@ -38,11 +38,11 @@
case ENTITIES_FOR_TAG_METRIC:
return "g.V().has('__typeName', T.in, g.V().has('__type', 'typeSystem').filter{it.getProperty('__type.category').name() == 'TRAIT'}.'__type.name'.toSet()).groupCount{it.getProperty('__typeName')}.cap.toList()";
case EXPORT_BY_GUID_FULL:
- return "g.V('__guid', startGuid).bothE().bothV().has('__guid').__guid.dedup().toList()";
+ return "g.V('__guid', startGuid).bothE().bothV().has('__guid').transform{[__guid:it.__guid,isProcess:(it.__superTypeNames != null) ? it.__superTypeNames.contains('Process') : false ]}.dedup().toList()";
case EXPORT_BY_GUID_CONNECTED_IN_EDGE:
- return "g.V('__guid', startGuid).inE().outV().has('__guid').__guid.dedup().toList()";
+ return "g.V('__guid', startGuid).inE().outV().has('__guid').transform{[__guid:it.__guid,isProcess:(it.__superTypeNames != null) ? it.__superTypeNames.contains('Process') : false ]}.dedup().toList()";
case EXPORT_BY_GUID_CONNECTED_OUT_EDGE:
- return "g.V('__guid', startGuid).outE().inV().has('__guid').__guid.dedup().toList()";
+ return "g.V('__guid', startGuid).outE().inV().has('__guid').transform{[__guid:it.__guid,isProcess:(it.__superTypeNames != null) ? it.__superTypeNames.contains('Process') : false ]}.dedup().toList()";
case EXPORT_TYPE_STARTS_WITH:
return "g.V().has('__typeName',typeName).filter({it.getProperty(attrName).startsWith(attrValue)}).has('__guid').__guid.toList()";
case EXPORT_TYPE_ENDS_WITH:
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
index e123ff7..54faee0 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
@@ -25,6 +25,7 @@
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
@@ -55,14 +56,7 @@
import java.util.Map;
import java.util.Set;
-import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_FETCH_TYPE;
-import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_ATTR_MATCH_TYPE;
-import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
-import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_CONNECTED;
-import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_STARTS_WITH;
-import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_CONTAINS;
-import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_MATCHES;
-import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_ENDS_WITH;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.*;
public class ExportService {
private static final Logger LOG = LoggerFactory.getLogger(ExportService.class);
@@ -119,18 +113,22 @@
}
try {
- List<AtlasEntity> entities = getStartingEntity(item, context);
+ List<AtlasEntityWithExtInfo> entities = getStartingEntity(item, context);
- for (AtlasEntity entity: entities) {
- processEntity(entity, context, TraversalDirection.UNKNOWN);
+ for (AtlasEntityWithExtInfo entityWithExtInfo : entities) {
+ processEntity(entityWithExtInfo.getEntity().getGuid(), context);
}
- while (!context.guidsToProcessIsEmpty()) {
- String guid = context.guidsToProcessRemove(0);
- TraversalDirection direction = context.guidDirection.get(guid);
- AtlasEntity entity = entityGraphRetriever.toAtlasEntity(guid);
+ while (!context.guidsToProcess.isEmpty()) {
+ while (!context.guidsToProcess.isEmpty()) {
+ String guid = context.guidsToProcess.remove(0);
+ processEntity(guid, context);
+ }
- processEntity(entity, context, direction);
+ if (!context.guidsLineageToProcess.isEmpty()) {
+ context.guidsToProcess.addAll(context.guidsLineageToProcess);
+ context.guidsLineageToProcess.clear();
+ }
}
} catch (AtlasBaseException excp) {
context.result.setOperationStatus(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS);
@@ -143,11 +141,11 @@
}
}
- private List<AtlasEntity> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException {
- List<AtlasEntity> ret = new ArrayList<>();
+ private List<AtlasEntityWithExtInfo> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException {
+ List<AtlasEntityWithExtInfo> ret = new ArrayList<>();
if (StringUtils.isNotEmpty(item.getGuid())) {
- AtlasEntity entity = entityGraphRetriever.toAtlasEntity(item);
+ AtlasEntityWithExtInfo entity = entityGraphRetriever.toAtlasEntityWithExtInfo(item);
if (entity != null) {
ret = Collections.singletonList(entity);
@@ -188,17 +186,17 @@
context.bindings.put("attrName", attribute.getQualifiedName());
context.bindings.put("attrValue", attrValue);
- List<String> guids = executeGremlinQuery(queryTemplate, context);
+ List<String> guids = executeGremlinQueryForGuids(queryTemplate, context);
if (CollectionUtils.isNotEmpty(guids)) {
for (String guid : guids) {
- AtlasEntity entity = entityGraphRetriever.toAtlasEntity(guid);
+ AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
- if (entity == null) {
+ if (entityWithExtInfo == null) {
continue;
}
- ret.add(entity);
+ ret.add(entityWithExtInfo);
}
}
@@ -211,24 +209,37 @@
return ret;
}
- private void processEntity(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
+ /**
+  * Exports the entity identified by {@code guid} unless that guid is already in {@code context.guidsProcessed}.
+  * The entity is fetched together with its referred entities, handed to addEntity(), and its type name and
+  * classifications are passed to addTypesAsNeeded()/addClassificationsAsNeeded(). Connected entities are then
+  * collected for the entity and for each referred entity, using the direction stored in context.guidDirection.
+  * A guid whose entity cannot be retrieved is logged and skipped.
+  *
+  * @param guid    guid of the entity to export
+  * @param context export state shared across the traversal
+  * @throws AtlasBaseException if one of the delegated calls fails
+  */
+ private void processEntity(String guid, ExportContext context) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
- LOG.debug("==> processEntity({})", AtlasTypeUtil.getAtlasObjectId(entity));
+ LOG.debug("==> processEntity({})", guid);
}
- if (!context.guidsProcessed.contains(entity.getGuid())) {
- context.guidsProcessed.add(entity.getGuid());
- context.result.getData().getEntityCreationOrder().add(entity.getGuid());
+ if (!context.guidsProcessed.contains(guid)) {
+ TraversalDirection direction = context.guidDirection.get(guid);
+ AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
+ AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
- addTypesAsNeeded(entity.getTypeName(), context);
- addClassificationsAsNeeded(entity, context);
- addEntity(entity, context);
+ if (entity == null) {
+ // getStartingEntity() skips null retrieval results; do the same here rather than fail with a NullPointerException
+ LOG.warn("processEntity({}): entity could not be retrieved, skipping", guid);
+ } else {
+ context.result.getData().getEntityCreationOrder().add(entity.getGuid());
- getConntedEntitiesBasedOnOption(entity, context, direction);
+ addEntity(entityWithExtInfo, context);
+ addTypesAsNeeded(entity.getTypeName(), context);
+ addClassificationsAsNeeded(entity, context);
+
+ context.guidsProcessed.add(entity.getGuid());
+ getConntedEntitiesBasedOnOption(entity, context, direction);
+
+ Map<String, AtlasEntity> referredEntities = entityWithExtInfo.getReferredEntities();
+
+ if (referredEntities != null) {
+ for (AtlasEntity e : referredEntities.values()) {
+ addTypesAsNeeded(e.getTypeName(), context);
+ addClassificationsAsNeeded(e, context);
+ getConntedEntitiesBasedOnOption(e, context, direction);
+ }
+
+ // referred entities were handed to addEntity() as part of this payload; mark them processed as well
+ context.guidsProcessed.addAll(referredEntities.keySet());
+ }
+ }
}
if (LOG.isDebugEnabled()) {
- LOG.debug("<== processEntity({})", AtlasTypeUtil.getAtlasObjectId(entity));
+ LOG.debug("<== processEntity({})", guid);
}
}
@@ -245,7 +256,7 @@
}
private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
- if (direction == TraversalDirection.UNKNOWN) {
+ if (direction == null || direction == TraversalDirection.UNKNOWN) {
getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD);
} else {
if (isProcessEntity(entity)) {
@@ -272,41 +283,35 @@
String query = getQueryForTraversalDirection(direction);
if (LOG.isDebugEnabled()) {
- LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcessSize(), query);
+ LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query);
}
context.bindings.clear();
context.bindings.put("startGuid", entity.getGuid());
- List<String> guids = executeGremlinQuery(query, context);
+ List<HashMap<String, Object>> result = executeGremlinQuery(query, context);
- if (CollectionUtils.isEmpty(guids)) {
+ if (CollectionUtils.isEmpty(result)) {
continue;
}
- for (String guid : guids) {
+ for (HashMap<String, Object> hashMap : result) {
+ String guid = (String) hashMap.get("__guid");
TraversalDirection currentDirection = context.guidDirection.get(guid);
+ boolean isLineage = (boolean) hashMap.get("isProcess");
if (currentDirection == null) {
- context.guidDirection.put(guid, direction);
+ context.addToBeProcessed(isLineage, guid, direction);
- if (!context.guidsToProcessContains(guid)) {
- context.guidsToProcessAdd(guid);
- }
} else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) {
- context.guidDirection.put(guid, direction);
-
// the entity should be reprocessed to get inward entities
context.guidsProcessed.remove(guid);
-
- if (!context.guidsToProcessContains(guid)) {
- context.guidsToProcessAdd(guid);
- }
+ context.addToBeProcessed(isLineage, guid, direction);
}
}
if (LOG.isDebugEnabled()) {
- LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), guids.size(), context.guidsToProcessSize());
+ LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size());
}
}
}
@@ -324,7 +329,7 @@
private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) {
if (LOG.isDebugEnabled()) {
- LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcessSize());
+ LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
}
String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
@@ -332,36 +337,38 @@
context.bindings.clear();
context.bindings.put("startGuid", entity.getGuid());
- List<String> result = executeGremlinQuery(query, context);
+ List<HashMap<String, Object>> result = executeGremlinQuery(query, context);
- if (result == null) {
+ if (CollectionUtils.isEmpty(result)) {
return;
}
- for (String guid : result) {
- if (!context.guidsProcessed.contains(guid)) {
- if (!context.guidsToProcessContains(guid)) {
- context.guidsToProcessAdd(guid);
- }
+ for (HashMap<String, Object> hashMap : result) {
+ String guid = (String) hashMap.get("__guid");
+ boolean isLineage = (boolean) hashMap.get("isProcess");
- context.guidDirection.put(guid, TraversalDirection.BOTH);
+ if (!context.guidsProcessed.contains(guid)) {
+ context.addToBeProcessed(isLineage, guid, TraversalDirection.BOTH);
}
}
if (LOG.isDebugEnabled()) {
- LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcessSize());
+ LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size());
}
}
- private void addEntity(AtlasEntity entity, ExportContext context) throws AtlasBaseException {
+ /**
+  * Hands the entity (with its referred entities) to the export sink and updates the result metrics:
+  * one "entity:<typeName>" counter for the main entity and for every referred entity, then
+  * "entity:withExtInfo" once, followed by a progress report.
+  */
+ private void addEntity(AtlasEntityWithExtInfo entity, ExportContext context) throws AtlasBaseException {
context.sink.add(entity);
- context.result.incrementMeticsCounter(String.format("entity:%s", entity.getTypeName()));
- context.result.incrementMeticsCounter("entities");
-
- if (context.guidsProcessed.size() % 10 == 0) {
- LOG.info("export(): in progress.. number of entities exported: {}", context.guidsProcessed.size());
+ final String mainTypeCounter = String.format("entity:%s", entity.getEntity().getTypeName());
+
+ context.result.incrementMeticsCounter(mainTypeCounter);
+
+ final Map<String, AtlasEntity> referredEntities = entity.getReferredEntities();
+
+ if (referredEntities != null) {
+ for (AtlasEntity referred : referredEntities.values()) {
+ context.result.incrementMeticsCounter(String.format("entity:%s", referred.getTypeName()));
+ }
}
+
+ context.result.incrementMeticsCounter("entity:withExtInfo");
+ context.reportProgress();
}
private void addClassificationsAsNeeded(AtlasEntity entity, ExportContext context) {
@@ -394,7 +401,16 @@
}
}
- private List<String> executeGremlinQuery(String query, ExportContext context) {
+ /**
+  * Runs a gremlin query with the bindings currently set on the context and returns the raw result rows.
+  *
+  * @param query   gremlin script to execute
+  * @param context supplies the script engine and the bindings
+  * @return the script result cast to a list of maps, or {@code null} if the script failed
+  */
+ private List<HashMap<String, Object>> executeGremlinQuery(String query, ExportContext context) {
+     try {
+         return (List<HashMap<String, Object>>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false);
+     } catch (ScriptException e) {
+         // the "{}" placeholder is needed for the query to appear in the message; the trailing exception is logged with its stack trace
+         LOG.error("Script execution failed for query: {}", query, e);
+         return null;
+     }
+ }
+
+ private List<String> executeGremlinQueryForGuids(String query, ExportContext context) {
try {
return (List<String>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false);
} catch (ScriptException e) {
@@ -403,7 +419,6 @@
}
}
-
private enum TraversalDirection {
UNKNOWN,
INWARD,
@@ -432,11 +447,57 @@
}
}
+ /**
+  * Insertion-ordered collection without duplicates: the list keeps the order, the set answers membership checks.
+  * Declared static because it never uses the enclosing instance.
+  *
+  * @param <T> element type; elements are compared the way {@link HashSet} compares them
+  */
+ private static class UniqueList<T> {
+     private final List<T> list = new ArrayList<>();
+     private final Set<T>  set  = new HashSet<>();
+
+     /** Appends {@code e} unless it is already present. */
+     public void add(T e) {
+         // Set.add() reports whether the element was new, which avoids a separate contains() lookup
+         if (set.add(e)) {
+             list.add(e);
+         }
+     }
+
+     /** Appends, in their order, all elements of {@code uniqueList} that are not already present. */
+     public void addAll(UniqueList<T> uniqueList) {
+         for (T item : uniqueList.list) {
+             add(item);
+         }
+     }
+
+     /**
+      * Removes and returns the element at {@code index}.
+      *
+      * @throws IndexOutOfBoundsException if {@code index} is out of range
+      */
+     public T remove(int index) {
+         T e = list.remove(index);
+
+         set.remove(e);
+
+         return e;
+     }
+
+     /** Returns whether {@code e} is currently present. */
+     public boolean contains(T e) {
+         return set.contains(e);
+     }
+
+     /** Returns the number of elements. */
+     public int size() {
+         return list.size();
+     }
+
+     /** Returns whether there are no elements. */
+     public boolean isEmpty() {
+         return list.isEmpty();
+     }
+
+     /** Removes all elements. */
+     public void clear() {
+         list.clear();
+         set.clear();
+     }
+ }
+
+
private class ExportContext {
final Set<String> guidsProcessed = new HashSet<>();
- private final List<String> guidsToProcessList = new ArrayList<>();
- private final Set<String> guidsToProcessSet = new HashSet<>();
+ final UniqueList<String> guidsToProcess = new UniqueList<>();
+ final UniqueList<String> guidsLineageToProcess = new UniqueList<>();
final Map<String, TraversalDirection> guidDirection = new HashMap<>();
final AtlasExportResult result;
final ZipSink sink;
@@ -446,6 +507,8 @@
private final ExportFetchType fetchType;
private final String matchType;
+ private int progressReportCount = 0;
+
ExportContext(AtlasExportResult result, ZipSink sink) {
this.result = result;
this.sink = sink;
@@ -481,33 +544,30 @@
}
public void clear() {
- guidsToProcessList.clear();
- guidsToProcessSet.clear();
+ // reset both pending queues: lineage guids left behind would survive clear() and be merged into guidsToProcess later
+ guidsToProcess.clear();
+ guidsLineageToProcess.clear();
guidsProcessed.clear();
guidDirection.clear();
+ // reportProgress() compares against guidsProcessed.size(), which is 0 again after the clear above
+ progressReportCount = 0;
}
- public boolean guidsToProcessIsEmpty() {
- return this.guidsToProcessList.isEmpty();
+ /**
+  * Queues a guid for processing and records the traversal direction for it.
+  * Guids flagged with {@code isSuperTypeProcess} go to guidsLineageToProcess, all others to guidsToProcess.
+  */
+ public void addToBeProcessed(boolean isSuperTypeProcess, String guid, TraversalDirection direction) {
+ final UniqueList<String> targetQueue = isSuperTypeProcess ? guidsLineageToProcess : guidsToProcess;
+
+ targetQueue.add(guid);
+ guidDirection.put(guid, direction);
}
- public String guidsToProcessRemove(int i) {
- String s = this.guidsToProcessList.remove(i);
- guidsToProcessSet.remove(s);
- return s;
- }
+ /**
+  * Logs the number of processed guids once more than 1000 guids have been processed since the last report.
+  */
+ public void reportProgress() {
- public int guidsToProcessSize() {
- return this.guidsToProcessList.size();
- }
- public boolean guidsToProcessContains(String guid) {
- return guidsToProcessSet.contains(guid);
- }
-
- public void guidsToProcessAdd(String guid) {
- this.guidsToProcessList.add(guid);
- guidsToProcessSet.add(guid);
+ final int processedCount = guidsProcessed.size();
+
+ if ((processedCount - progressReportCount) <= 1000) {
+ return;
+ }
+
+ progressReportCount = processedCount;
+
+ LOG.info("export(): in progress.. number of entities exported: {}", processedCount);
}
}
}
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java
index 37d9eb5..c197d41 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java
@@ -45,6 +45,11 @@
saveToZip(entity.getGuid(), jsonData);
}
+ /**
+  * Converts the entity together with its ext-info to JSON and saves it to the zip under the entity's guid.
+  *
+  * @param entityWithExtInfo entity payload to store; its main entity supplies the entry name
+  * @throws AtlasBaseException propagated from the JSON conversion or the zip write
+  */
+ public void add(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
+     final String serialized = convertToJSON(entityWithExtInfo);
+     final String entryName  = entityWithExtInfo.getEntity().getGuid();
+
+     saveToZip(entryName, serialized);
+ }
+
public void setResult(AtlasExportResult result) throws AtlasBaseException {
String jsonData = convertToJSON(result);
saveToZip(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME, jsonData);
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
index a69f7fa..661542f 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
@@ -17,17 +17,19 @@
*/
package org.apache.atlas.web.resources;
-import org.apache.atlas.model.instance.AtlasEntityHeader;
-import org.codehaus.jackson.type.TypeReference;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -57,7 +59,7 @@
public AtlasTypesDef getTypesDef() throws AtlasBaseException {
final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString();
- String s = getFromCache(fileName);
+ // NOTE(review): getFromCache() now returns the raw map value (null instead of "" when the entry is missing) —
+ // confirm convertFromJson() tolerates null; the (String) cast is redundant since getFromCache() is declared to return String
+ String s = (String) getFromCache(fileName);
return convertFromJson(AtlasTypesDef.class, s);
}
@@ -104,9 +106,10 @@
return this.creationOrder;
}
- public AtlasEntity getEntity(String guid) throws AtlasBaseException {
- String s = getFromCache(guid);
- return convertFromJson(AtlasEntity.class, s);
+ /**
+  * Reads the cached JSON stored under {@code guid} and converts it to an entity with its ext-info.
+  *
+  * @param guid cache key of the entity
+  * @throws AtlasBaseException propagated from the JSON conversion
+  */
+ public AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException {
+ final String cachedJson = getFromCache(guid);
+
+ return convertFromJson(AtlasEntityWithExtInfo.class, cachedJson);
}
private <T> T convertFromJson(TypeReference clazz, String jsonData) throws AtlasBaseException {
@@ -136,9 +139,7 @@
}
private String getFromCache(String entryName) {
- if(!guidEntityJsonMap.containsKey(entryName)) return "";
-
- return guidEntityJsonMap.get(entryName).toString();
+ // Returns the stored value as-is; a missing entry is no longer mapped to "".
+ // NOTE(review): presumably this yields null for an unknown entryName (standard Map.get contract) — verify callers handle it
+ return guidEntityJsonMap.get(entryName);
}
public void close() {
@@ -158,8 +159,15 @@
@Override
public AtlasEntity next() {
+ // delegate to the ext-info variant and unwrap; a null result is passed through unchanged
+ final AtlasEntityWithExtInfo wrapped = getNextEntityWithExtInfo();
+
+ if (wrapped == null) {
+ return null;
+ }
+
+ return wrapped.getEntity();
+ }
+
+ @Override
+ public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
try {
- return getEntity(this.iterator.next());
+ return getEntityWithExtInfo(this.iterator.next());
} catch (AtlasBaseException e) {
e.printStackTrace();
return null;
@@ -186,10 +194,16 @@
}
}
+ /**
+  * Returns the main entity cached under {@code guid}, or {@code null} when the cache has no such key.
+  *
+  * @throws AtlasBaseException propagated from {@code getEntityWithExtInfo()}
+  */
+ private AtlasEntity getEntity(String guid) throws AtlasBaseException {
+     if (!guidEntityJsonMap.containsKey(guid)) {
+         return null;
+     }
+
+     return getEntityWithExtInfo(guid).getEntity();
+ }
+
@Override
public void onImportComplete(String guid) {
- if(guid != null) {
- guidEntityJsonMap.remove(guid);
- }
+ // Drops the cached JSON of an imported entity.
+ // NOTE(review): the null guard was removed — harmless if guidEntityJsonMap is a java.util.HashMap (null keys allowed); confirm the map type
+ guidEntityJsonMap.remove(guid);
}
}