ATLAS-1684: export should include super-type definitions, import should preserve system attribute values
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
index 8dd3556..0439ada 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
@@ -233,6 +233,9 @@
}
AtlasVertex atlasVertex = AtlasGraphUtilsV1.findByGuid(entityId);
+ if(atlasVertex == null) {
+ return;
+ }
if (atlasVertex == null) {
LOG.warn("updateFullTextMapping(): no entity exists with guid {}", entityId);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index fa4c051..32b1ea8 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -170,10 +170,6 @@
EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true);
- if(CollectionUtils.isNotEmpty(entity.getClassifications())) {
- addClassifications(entity.getGuid(), entity.getClassifications());
- }
-
updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
@@ -567,6 +563,11 @@
context.addCreated(guid, entity, entityType, vertex);
}
+
+ // during import, update the system attributes
+ if (entityStream instanceof EntityImportStream) {
+ entityGraphMapper.updateSystemAttributes(vertex, entity);
+ }
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
index 29bda93..9d11aa5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
@@ -51,6 +51,7 @@
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,6 +113,28 @@
return ret;
}
+ public void updateSystemAttributes(AtlasVertex vertex, AtlasEntity entity) {
+ if (entity.getStatus() != null) {
+ AtlasGraphUtilsV1.setProperty(vertex, Constants.STATE_PROPERTY_KEY, entity.getStatus().name());
+ }
+
+ if (entity.getCreateTime() != null) {
+ AtlasGraphUtilsV1.setProperty(vertex, Constants.TIMESTAMP_PROPERTY_KEY, entity.getCreateTime().getTime());
+ }
+
+ if (entity.getUpdateTime() != null) {
+ AtlasGraphUtilsV1.setProperty(vertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, entity.getUpdateTime().getTime());
+ }
+
+ if (StringUtils.isNotEmpty(entity.getCreatedBy())) {
+ AtlasGraphUtilsV1.setProperty(vertex, Constants.CREATED_BY_KEY, entity.getCreatedBy());
+ }
+
+ if (StringUtils.isNotEmpty(entity.getUpdatedBy())) {
+ AtlasGraphUtilsV1.setProperty(vertex, Constants.MODIFIED_BY_KEY, entity.getUpdatedBy());
+ }
+ }
+
public EntityMutationResponse mapAttributesAndClassifications(EntityMutationContext context, final boolean isPartialUpdate, final boolean replaceClassifications) throws AtlasBaseException {
EntityMutationResponse resp = new EntityMutationResponse();
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
index 54faee0..ffdbfac 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
@@ -21,6 +21,7 @@
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasClassification;
@@ -29,13 +30,22 @@
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
+import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasArrayType;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasEnumType;
import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasMapType;
+import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.util.AtlasGremlinQueryProvider;
@@ -88,6 +98,32 @@
long endTime = System.currentTimeMillis();
+ AtlasTypesDef typesDef = context.result.getData().getTypesDef();
+
+ for (String entityType : context.entityTypes) {
+ AtlasEntityDef entityDef = typeRegistry.getEntityDefByName(entityType);
+
+ typesDef.getEntityDefs().add(entityDef);
+ }
+
+ for (String classificationType : context.classificationTypes) {
+ AtlasClassificationDef classificationDef = typeRegistry.getClassificationDefByName(classificationType);
+
+ typesDef.getClassificationDefs().add(classificationDef);
+ }
+
+ for (String structType : context.structTypes) {
+ AtlasStructDef structDef = typeRegistry.getStructDefByName(structType);
+
+ typesDef.getStructDefs().add(structDef);
+ }
+
+ for (String enumType : context.enumTypes) {
+ AtlasEnumDef enumDef = typeRegistry.getEnumDefByName(enumType);
+
+ typesDef.getEnumDefs().add(enumDef);
+ }
+
context.sink.setExportOrder(context.result.getData().getEntityCreationOrder());
context.sink.setTypesDef(context.result.getData().getTypesDef());
context.result.setData(null);
@@ -221,16 +257,14 @@
context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid());
addEntity(entityWithExtInfo, context);
- addTypesAsNeeded(entityWithExtInfo.getEntity().getTypeName(), context);
- addClassificationsAsNeeded(entityWithExtInfo.getEntity(), context);
+ addTypes(entityWithExtInfo.getEntity(), context);
context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction);
if(entityWithExtInfo.getReferredEntities() != null) {
for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
- addTypesAsNeeded(e.getTypeName(), context);
- addClassificationsAsNeeded(e, context);
+ addTypes(e, context);
getConntedEntitiesBasedOnOption(e, context, direction);
}
@@ -371,33 +405,114 @@
context.reportProgress();
}
- private void addClassificationsAsNeeded(AtlasEntity entity, ExportContext context) {
- AtlasExportResult result = context.result;
- AtlasTypesDef typesDef = result.getData().getTypesDef();
+ private void addTypes(AtlasEntity entity, ExportContext context) {
+ addEntityType(entity.getTypeName(), context);
if(CollectionUtils.isNotEmpty(entity.getClassifications())) {
for (AtlasClassification c : entity.getClassifications()) {
- if (typesDef.hasClassificationDef(c.getTypeName())) {
- continue;
- }
-
- AtlasClassificationDef cd = typeRegistry.getClassificationDefByName(c.getTypeName());
-
- typesDef.getClassificationDefs().add(cd);
- result.incrementMeticsCounter("typedef:classification");
+ addClassificationType(c.getTypeName(), context);
}
}
}
- private void addTypesAsNeeded(String typeName, ExportContext context) {
- AtlasExportResult result = context.result;
- AtlasTypesDef typesDef = result.getData().getTypesDef();
+ private void addType(String typeName, ExportContext context) {
+ AtlasType type = null;
- if(!typesDef.hasEntityDef(typeName)) {
- AtlasEntityDef typeDefinition = typeRegistry.getEntityDefByName(typeName);
+ try {
+ type = typeRegistry.getType(typeName);
- typesDef.getEntityDefs().add(typeDefinition);
- result.incrementMeticsCounter("typedef:" + typeDefinition.getName());
+ addType(type, context);
+ } catch (AtlasBaseException excp) {
+ LOG.error("unknown type {}", typeName);
+ }
+ }
+
+ private void addEntityType(String typeName, ExportContext context) {
+ if (!context.entityTypes.contains(typeName)) {
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+
+ addEntityType(entityType, context);
+ }
+ }
+
+ private void addClassificationType(String typeName, ExportContext context) {
+ if (!context.classificationTypes.contains(typeName)) {
+ AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(typeName);
+
+ addClassificationType(classificationType, context);
+ }
+ }
+
+ private void addType(AtlasType type, ExportContext context) {
+ if (type.getTypeCategory() == TypeCategory.PRIMITIVE) {
+ return;
+ }
+
+ if (type instanceof AtlasArrayType) {
+ AtlasArrayType arrayType = (AtlasArrayType)type;
+
+ addType(arrayType.getElementType(), context);
+ } else if (type instanceof AtlasMapType) {
+ AtlasMapType mapType = (AtlasMapType)type;
+
+ addType(mapType.getKeyType(), context);
+ addType(mapType.getValueType(), context);
+ } else if (type instanceof AtlasEntityType) {
+ addEntityType((AtlasEntityType)type, context);
+ } else if (type instanceof AtlasClassificationType) {
+ addClassificationType((AtlasClassificationType)type, context);
+ } else if (type instanceof AtlasStructType) {
+ addStructType((AtlasStructType)type, context);
+ } else if (type instanceof AtlasEnumType) {
+ addEnumType((AtlasEnumType)type, context);
+ }
+ }
+
+ private void addEntityType(AtlasEntityType entityType, ExportContext context) {
+ if (!context.entityTypes.contains(entityType.getTypeName())) {
+ context.entityTypes.add(entityType.getTypeName());
+
+ addAttributeTypes(entityType, context);
+
+ if (CollectionUtils.isNotEmpty(entityType.getAllSuperTypes())) {
+ for (String superType : entityType.getAllSuperTypes()) {
+ addEntityType(superType, context);
+ }
+ }
+ }
+ }
+
+ private void addClassificationType(AtlasClassificationType classificationType, ExportContext context) {
+ if (!context.classificationTypes.contains(classificationType.getTypeName())) {
+ context.classificationTypes.add(classificationType.getTypeName());
+
+ addAttributeTypes(classificationType, context);
+
+ if (CollectionUtils.isNotEmpty(classificationType.getAllSuperTypes())) {
+ for (String superType : classificationType.getAllSuperTypes()) {
+ addClassificationType(superType, context);
+ }
+ }
+ }
+ }
+
+ private void addStructType(AtlasStructType structType, ExportContext context) {
+ if (!context.structTypes.contains(structType.getTypeName())) {
+ context.structTypes.add(structType.getTypeName());
+
+ addAttributeTypes(structType, context);
+ }
+ }
+
+ private void addEnumType(AtlasEnumType enumType, ExportContext context) {
+ if (!context.enumTypes.contains(enumType.getTypeName())) {
+ context.enumTypes.add(enumType.getTypeName());
+ }
+ }
+
+ private void addAttributeTypes(AtlasStructType structType, ExportContext context) {
+ for (AtlasAttributeDef attributeDef : structType.getStructDef().getAttributeDefs()) {
+ addType(attributeDef.getTypeName(), context);
}
}
@@ -499,6 +614,10 @@
final UniqueList<String> guidsToProcess = new UniqueList<>();
final UniqueList<String> guidsLineageToProcess = new UniqueList<>();
final Map<String, TraversalDirection> guidDirection = new HashMap<>();
+ final Set<String> entityTypes = new HashSet<>();
+ final Set<String> classificationTypes = new HashSet<>();
+ final Set<String> structTypes = new HashSet<>();
+ final Set<String> enumTypes = new HashSet<>();
final AtlasExportResult result;
final ZipSink sink;
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java
index 857553d..eb81e3c 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java
@@ -120,13 +120,14 @@
private void processTypes(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) throws AtlasBaseException {
setGuidToEmpty(typeDefinitionMap);
+
AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(typeDefinitionMap, this.typeRegistry);
+
if (!typesToCreate.isEmpty()) {
typeDefStore.createTypesDef(typesToCreate);
- }
- typeDefStore.updateTypesDef(typeDefinitionMap);
- updateMetricsForTypesDef(typeDefinitionMap, result);
+ updateMetricsForTypesDef(typesToCreate, result);
+ }
}
private void updateMetricsForTypesDef(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) {