ATLAS-1948: export fix to correct the import order
Signed-off-by: Madhan Neethiraj <madhan@apache.org>
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 8f45e9f..de48573 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -98,7 +98,6 @@
AtlasExportResult.OperationStatus[] statuses = processItems(request, context);
processTypesDef(context);
-
updateSinkWithOperationMetrics(context, statuses, getOperationDuration(startTime));
} catch(Exception ex) {
LOG.error("Operation failed: ", ex);
@@ -113,6 +112,7 @@
}
private void updateSinkWithOperationMetrics(ExportContext context, AtlasExportResult.OperationStatus[] statuses, int duration) throws AtlasBaseException {
+ context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed);
context.sink.setExportOrder(context.result.getData().getEntityCreationOrder());
context.sink.setTypesDef(context.result.getData().getTypesDef());
clearContextData(context);
@@ -201,9 +201,10 @@
processEntity(guid, context);
}
- if (!context.guidsLineageToProcess.isEmpty()) {
- context.guidsToProcess.addAll(context.guidsLineageToProcess);
- context.guidsLineageToProcess.clear();
+ if (!context.lineageToProcess.isEmpty()) {
+ context.guidsToProcess.addAll(context.lineageToProcess);
+ context.lineageProcessed.addAll(context.lineageToProcess.getList());
+ context.lineageToProcess.clear();
}
}
} catch (AtlasBaseException excp) {
@@ -295,7 +296,9 @@
TraversalDirection direction = context.guidDirection.get(guid);
AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
- context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid());
+ if(!context.lineageProcessed.contains(guid)) {
+ context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid());
+ }
addEntity(entityWithExtInfo, context);
addTypes(entityWithExtInfo.getEntity(), context);
@@ -651,13 +654,18 @@
list.clear();
set.clear();
}
+
+        /**
+         * Returns a read-only view of the backing list.
+         *
+         * The view is unmodifiable so that callers cannot add or remove
+         * elements directly on the list and leave it out of step with the
+         * companion set that this class maintains alongside it.
+         *
+         * @return unmodifiable view of the elements held by this list
+         */
+        public List<T> getList() {
+            return java.util.Collections.unmodifiableList(list);
+        }
}
private class ExportContext {
final Set<String> guidsProcessed = new HashSet<>();
final UniqueList<String> guidsToProcess = new UniqueList<>();
- final UniqueList<String> guidsLineageToProcess = new UniqueList<>();
+ final UniqueList<String> lineageToProcess = new UniqueList<>();
+ final Set<String> lineageProcessed = new HashSet<>();
final Map<String, TraversalDirection> guidDirection = new HashMap<>();
final Set<String> entityTypes = new HashSet<>();
final Set<String> classificationTypes = new HashSet<>();
@@ -719,7 +727,7 @@
}
if(isSuperTypeProcess) {
- guidsLineageToProcess.add(guid);
+ lineageToProcess.add(guid);
}
guidDirection.put(guid, direction);
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
index edb816f..4c23582 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
@@ -196,7 +196,7 @@
AtlasEntity entity = getEntity(guid);
return entity;
} catch (AtlasBaseException e) {
- e.printStackTrace();
+ LOG.error("getByGuid: {} failed!", guid, e);
return null;
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 5ea4ff2..f340330 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -69,7 +69,7 @@
private final DeleteHandlerV1 deleteHandler;
private final AtlasTypeRegistry typeRegistry;
private final AtlasEntityChangeNotifier entityChangeNotifier;
- private final EntityGraphMapper entityGraphMapper;
+ private final EntityGraphMapper entityGraphMapper;
@Inject
public AtlasEntityStoreV1(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry,
@@ -77,7 +77,7 @@
this.deleteHandler = deleteHandler;
this.typeRegistry = typeRegistry;
this.entityChangeNotifier = entityChangeNotifier;
- this.entityGraphMapper = entityGraphMapper;
+ this.entityGraphMapper = entityGraphMapper;
}
@Override
@@ -123,7 +123,7 @@
@Override
@GraphTransaction
public AtlasEntityWithExtInfo getByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes)
- throws AtlasBaseException {
+ throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> getByUniqueAttribute({}, {})", entityType.getTypeName(), uniqAttributes);
}
@@ -136,7 +136,7 @@
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, entityType.getTypeName(),
- uniqAttributes.toString());
+ uniqAttributes.toString());
}
if (LOG.isDebugEnabled()) {
@@ -160,29 +160,36 @@
EntityMutationResponse ret = new EntityMutationResponse();
ret.setGuidAssignments(new HashMap<String, String>());
- Set<String> processedGuids = new HashSet<>();
- int streamSize = entityStream.size();
- float currentPercent = 0f;
+ Set<String> processedGuids = new HashSet<>();
+ float currentPercent = 0f;
- while (entityStream.hasNext()) {
- AtlasEntityWithExtInfo entityWithExtInfo = entityStream.getNextEntityWithExtInfo();
+ List<String> residualList = new ArrayList<>();
+ EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList);
+ while (entityImportStreamWithResidualList.hasNext()) {
+ AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
- if(entity == null || processedGuids.contains(entity.getGuid())) {
+ if (entity == null || processedGuids.contains(entity.getGuid())) {
continue;
}
AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream);
+ try {
+ EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true);
- EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true);
- currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids,
- entityStream.getPosition(), streamSize, currentPercent);
+ if (resp.getGuidAssignments() != null) {
+ ret.getGuidAssignments().putAll(resp.getGuidAssignments());
+ }
- if (resp.getGuidAssignments() != null) {
- ret.getGuidAssignments().putAll(resp.getGuidAssignments());
+ currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, entityStream.getPosition(),
+ entityImportStreamWithResidualList.getStreamSize(), currentPercent);
+
+ entityStream.onImportComplete(entity.getGuid());
+ } catch (AtlasBaseException e) {
+ if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) {
+ throw e;
+ }
}
-
- entityStream.onImportComplete(entity.getGuid());
}
importResult.getProcessedEntities().addAll(processedGuids);
@@ -191,20 +198,28 @@
return ret;
}
+    /**
+     * Records an entity for a later retry when its import failed with
+     * {@code INVALID_OBJECT_ID}.
+     *
+     * @param e           exception raised while importing the entity
+     * @param lineageList list that collects the GUIDs to be retried
+     * @param guid        GUID of the entity whose import failed
+     * @return {@code true} if the GUID was added to the list; {@code false}
+     *         if the error code is anything other than INVALID_OBJECT_ID,
+     *         in which case the list is left untouched
+     */
+    private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) {
+        boolean isInvalidObjectId = e.getAtlasErrorCode().getErrorCode()
+                                     .equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode());
+
+        if (isInvalidObjectId) {
+            lineageList.add(guid);
+        }
+
+        return isInvalidObjectId;
+    }
+
private float updateImportMetrics(AtlasEntityWithExtInfo currentEntity,
EntityMutationResponse resp,
AtlasImportResult importResult,
Set<String> processedGuids,
int currentIndex, int streamSize, float currentPercent) {
-
updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)",
- currentEntity.getEntity().getTypeName(),
- currentIndex,
- currentEntity.getEntity().getGuid());
+ currentEntity.getEntity().getTypeName(),
+ currentIndex,
+ currentEntity.getEntity().getGuid());
return updateImportProgress(LOG, currentIndex + 1, streamSize, currentPercent, lastEntityImported);
}
@@ -214,10 +229,10 @@
final double tolerance = 0.000001;
final int MAX_PERCENT = 100;
- float percent = (float) ((currentIndex * MAX_PERCENT)/streamSize);
+ float percent = (float) ((currentIndex * MAX_PERCENT) / streamSize);
boolean updateLog = Double.compare(percent, currentPercent) > tolerance;
float updatedPercent = (MAX_PERCENT < streamSize) ? percent :
- ((updateLog) ? ++currentPercent : currentPercent);
+ ((updateLog) ? ++currentPercent : currentPercent);
if (updateLog) {
log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), streamSize, additionalInfo);
@@ -232,7 +247,7 @@
}
for (AtlasEntityHeader h : list) {
- if(processedGuids.contains(h.getGuid())) {
+ if (processedGuids.contains(h.getGuid())) {
continue;
}
@@ -298,7 +313,7 @@
@Override
@GraphTransaction
public EntityMutationResponse updateEntityAttributeByGuid(String guid, String attrName, Object attrValue)
- throws AtlasBaseException {
+ throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> updateEntityAttributeByGuid({}, {}, {})", guid, attrName, attrValue);
}
@@ -490,8 +505,8 @@
List<AtlasClassification> updatedClassifications = new ArrayList<>();
for (AtlasClassification newClassification : newClassifications) {
- String classificationName = newClassification.getTypeName();
- AtlasClassification oldClassification = getClassification(guid, classificationName);
+ String classificationName = newClassification.getTypeName();
+ AtlasClassification oldClassification = getClassification(guid, classificationName);
if (oldClassification == null) {
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName);
@@ -704,7 +719,8 @@
/**
* Validate if classification is not already associated with the entities
- * @param guid unique entity id
+ *
+ * @param guid unique entity id
* @param classifications list of classifications to be associated
*/
private void validateEntityAssociations(String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
@@ -715,7 +731,7 @@
if (CollectionUtils.isNotEmpty(entityClassifications) && entityClassifications.contains(newClassification)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "entity: " + guid +
- ", already associated with classification: " + newClassification);
+ ", already associated with classification: " + newClassification);
}
}
}
@@ -734,4 +750,43 @@
return ret;
}
+
+    /**
+     * Iterates an {@link EntityImportStream} and, once that stream reports no
+     * further entities, continues with the GUIDs held in a residual list.
+     *
+     * The residual list is supplied by the caller and is read live, so GUIDs
+     * appended to it while iteration is in progress are also visited.
+     */
+    private static class EntityImportStreamWithResidualList {
+        private final EntityImportStream stream;
+        private final List<String>       residualList;
+        // Becomes true the first time hasNext() finds the underlying stream exhausted.
+        private boolean                  navigateResidualList     = false;
+        // Index of the next residual GUID to be served.
+        private int                      currentResidualListIndex = 0;
+
+        public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) {
+            this.stream       = stream;
+            this.residualList = residualList;
+        }
+
+        /**
+         * Returns the next entity. While residual GUIDs are being served, the
+         * underlying stream is first repositioned to the next residual GUID.
+         * Callers are expected to check {@link #hasNext()} beforehand; no
+         * bounds check is performed on the residual list here.
+         */
+        public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
+            if (navigateResidualList) {
+                String residualGuid = residualList.get(currentResidualListIndex++);
+
+                stream.setPositionUsingEntityGuid(residualGuid);
+            }
+
+            return stream.getNextEntityWithExtInfo();
+        }
+
+        /**
+         * Reports whether another entity is available, switching over to the
+         * residual list as soon as the underlying stream has nothing left.
+         */
+        public boolean hasNext() {
+            if (!navigateResidualList) {
+                if (stream.hasNext()) {
+                    return true;
+                }
+
+                navigateResidualList = true;
+            }
+
+            return currentResidualListIndex < residualList.size();
+        }
+
+        /**
+         * @return size of the underlying stream plus the current number of
+         *         residual GUIDs
+         */
+        public int getStreamSize() {
+            return stream.size() + residualList.size();
+        }
+    }
+
}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index de8e7ef..404225c 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -133,4 +133,22 @@
assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 8);
}
+
+    /**
+     * Data provider "ctas": supplies the zip source built from the
+     * {@code ctas.zip} test resource.
+     */
+    @DataProvider(name = "ctas")
+    public static Object[][] getDataFromCtas(ITestContext context) throws IOException {
+        Object[][] ctasZipSource = getZipSource("ctas.zip");
+
+        return ctasZipSource;
+    }
+
+    /**
+     * Loads the base and hive models, then runs an import of the "ctas"
+     * zip source with the default import request.
+     *
+     * This method makes no assertions of its own; it fails only if model
+     * loading or the import run throws.
+     */
+    @Test(dataProvider = "ctas")
+    public void importCTAS(ZipSource zipSource) throws IOException, AtlasBaseException {
+        loadModelFromJson("0010-base_model.json", typeDefStore, typeRegistry);
+        loadModelFromJson("0030-hive_model.json", typeDefStore, typeRegistry);
+
+        // Build the request once and pass it on, rather than creating a second, identical one.
+        AtlasImportRequest request = getDefaultImportRequest();
+        runImportWithParameters(getImportService(), request, zipSource);
+    }
+
+    /**
+     * Creates a new {@link ImportService} from this test's type definition
+     * store, entity store and type registry.
+     */
+    private ImportService getImportService() {
+        ImportService importService = new ImportService(typeDefStore, entityStore, typeRegistry);
+
+        return importService;
+    }
}
diff --git a/repository/src/test/resources/ctas.zip b/repository/src/test/resources/ctas.zip
new file mode 100644
index 0000000..a77966c
--- /dev/null
+++ b/repository/src/test/resources/ctas.zip
Binary files differ