ATLAS-1851: update import API to support resume from a specific entity/position
Signed-off-by: Madhan Neethiraj <madhan@apache.org>
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
index 4f2c1fb..b19f709 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
@@ -19,6 +19,7 @@
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -41,6 +42,8 @@
public class AtlasImportRequest implements Serializable {
private static final long serialVersionUID = 1L;
public static final String TRANSFORMS_KEY = "transforms";
+ private static final String START_POSITION_KEY = "startPosition";
+ private static final String START_GUID_KEY = "startGuid";
private Map<String, String> options;
@@ -70,4 +73,22 @@
public String toString() {
return toString(new StringBuilder()).toString();
}
+
+ @JsonIgnore
+ public String getStartGuid() {
+ if (this.options == null || !this.options.containsKey(START_GUID_KEY)) {
+ return null;
+ }
+
+ return (String) this.options.get(START_GUID_KEY);
+ }
+
+ @JsonIgnore
+ public String getStartPosition() {
+ if (this.options == null || !this.options.containsKey(START_POSITION_KEY)) {
+ return null;
+ }
+
+ return (String) this.options.get(START_GUID_KEY);
+ }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index 8a7e358..4ffbb88 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -71,8 +71,10 @@
source.setImportTransform(ImportTransforms.fromJson(transforms));
startTimestamp = System.currentTimeMillis();
processTypes(source.getTypesDef(), result);
+ setStartPosition(request, source);
processEntities(source, result);
+
result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS);
} catch (AtlasBaseException excp) {
LOG.error("import(user={}, from={}): failed", userName, requestingIP, excp);
@@ -90,6 +92,14 @@
return result;
}
+ private void setStartPosition(AtlasImportRequest request, ZipSource source) throws AtlasBaseException {
+ if(request.getStartGuid() != null) {
+ source.setPositionUsingEntityGuid(request.getStartGuid());
+ } else if(request.getStartPosition() != null) {
+ source.setPosition(Integer.parseInt(request.getStartPosition()));
+ }
+ }
+
public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP)
throws AtlasBaseException {
String fileName = (String) request.getOptions().get("FILENAME");
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
index 76451c9..aa1477f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
@@ -23,6 +23,7 @@
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
+import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
@@ -49,6 +50,7 @@
private Iterator<String> iterator;
private Map<String, String> guidEntityJsonMap;
private ImportTransforms importTransform;
+ private int currentPosition;
public ZipSource(InputStream inputStream) throws IOException {
this(inputStream, null);
@@ -190,6 +192,7 @@
@Override
public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
try {
+ currentPosition++;
return getEntityWithExtInfo(this.iterator.next());
} catch (AtlasBaseException e) {
e.printStackTrace();
@@ -227,8 +230,43 @@
return null;
}
+ public int size() {
+ return this.creationOrder.size();
+ }
+
@Override
public void onImportComplete(String guid) {
guidEntityJsonMap.remove(guid);
}
+
+
+ @Override
+ public void setPosition(int index) {
+ currentPosition = index;
+ reset();
+ for (int i = 0; i < creationOrder.size() && i <= index; i++) {
+ iterator.next();
+ }
+ }
+
+ @Override
+ public void setPositionUsingEntityGuid(String guid) {
+ if(StringUtils.isBlank(guid)) {
+ return;
+ }
+
+ int index = creationOrder.indexOf(guid);
+ if (index == -1) {
+ return;
+ }
+
+ setPosition(index);
+ }
+
+ @Override
+ public int getPosition() {
+ return currentPosition;
+ }
+
+
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 27c0b5d..75e9132 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -160,7 +160,8 @@
ret.setGuidAssignments(new HashMap<String, String>());
Set<String> processedGuids = new HashSet<>();
- int progressReportedAtCount = 0;
+ int streamSize = entityStream.size();
+ float currentPercent = 0f;
while (entityStream.hasNext()) {
AtlasEntityWithExtInfo entityWithExtInfo = entityStream.getNextEntityWithExtInfo();
@@ -173,16 +174,8 @@
AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream);
EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true);
-
- updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
- updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
- updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
-
- if ((processedGuids.size() - progressReportedAtCount) > 1000) {
- progressReportedAtCount = processedGuids.size();
-
- LOG.info("bulkImport(): in progress.. number of entities imported: {}", progressReportedAtCount);
- }
+ currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids,
+ entityStream.getPosition(), streamSize, currentPercent);
if (resp.getGuidAssignments() != null) {
ret.getGuidAssignments().putAll(resp.getGuidAssignments());
@@ -192,12 +185,47 @@
}
importResult.getProcessedEntities().addAll(processedGuids);
- LOG.info("bulkImport(): done. Number of entities imported: {}", processedGuids.size());
+ LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size());
return ret;
}
- private void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
+ private float updateImportMetrics(AtlasEntityWithExtInfo currentEntity,
+ EntityMutationResponse resp,
+ AtlasImportResult importResult,
+ Set<String> processedGuids,
+ int currentIndex, int streamSize, float currentPercent) {
+
+ updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
+ updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
+ updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+
+ String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)",
+ currentEntity.getEntity().getTypeName(),
+ currentIndex,
+ currentEntity.getEntity().getGuid());
+
+ return updateImportProgress(LOG, currentIndex + 1, streamSize, currentPercent, lastEntityImported);
+ }
+
+ private static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent,
+ String additionalInfo) {
+ final double tolerance = 0.000001;
+ final int MAX_PERCENT = 100;
+
+ float percent = (float) ((currentIndex * MAX_PERCENT)/streamSize);
+ boolean updateLog = Double.compare(percent, currentPercent) > tolerance;
+ float updatedPercent = (MAX_PERCENT < streamSize) ? percent :
+ ((updateLog) ? ++currentPercent : currentPercent);
+
+ if (updateLog) {
+ log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), streamSize, additionalInfo);
+ }
+
+ return updatedPercent;
+ }
+
+ private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
if (list == null) {
return;
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
index 69140e6..90ae15d 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
@@ -21,12 +21,15 @@
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream {
+ private int currentPosition = 0;
+
public AtlasEntityStreamForImport(AtlasEntityWithExtInfo entityWithExtInfo, EntityStream entityStream) {
super(entityWithExtInfo, entityStream);
}
@Override
public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
+ currentPosition++;
AtlasEntity entity = next();
return entity != null ? new AtlasEntityWithExtInfo(entity, super.entitiesWithExtInfo) : null;
@@ -44,6 +47,25 @@
}
@Override
+ public int size() {
+ return 1;
+ }
+
+ @Override
+ public void setPosition(int position) {
+ // not applicable for a single entity stream
+ }
+
+ @Override
+ public int getPosition() {
+ return currentPosition;
+ }
+
+ @Override
+ public void setPositionUsingEntityGuid(String guid) {
+ }
+
+ @Override
public void onImportComplete(String guid) {
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
index 0f711db..d4b6c55 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
@@ -22,6 +22,12 @@
public interface EntityImportStream extends EntityStream {
+ int size();
+ void setPosition(int position);
+ int getPosition();
+
+ void setPositionUsingEntityGuid(String guid);
+
AtlasEntityWithExtInfo getNextEntityWithExtInfo();
void onImportComplete(String guid);
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
index be9c20b..8a57dcd 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
@@ -21,6 +21,7 @@
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.testng.Assert;
+import org.testng.ITestContext;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -29,7 +30,9 @@
import java.io.IOException;
import java.util.List;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;
@@ -41,6 +44,11 @@
return new Object[][] {{ new ZipSource(fs) }};
}
+ @DataProvider(name = "sales")
+ public static Object[][] getDataFromQuickStart_v1_Sales(ITestContext context) throws IOException {
+ return getZipSource("sales-v1-full.zip");
+ }
+
@Test
public void improperInit_ReturnsNullCreationOrder() throws IOException, AtlasBaseException {
byte bytes[] = new byte[10];
@@ -108,7 +116,27 @@
assertEquals(e.getGuid(), creationOrder.get(i));
}
- Assert.assertFalse(zipSource.hasNext());
+ assertFalse(zipSource.hasNext());
+ }
+
+ @Test(dataProvider = "sales")
+ public void iteratorSetPositionBehavor(ZipSource zipSource) throws IOException, AtlasBaseException {
+ Assert.assertTrue(zipSource.hasNext());
+
+ List<String> creationOrder = zipSource.getCreationOrder();
+ int moveToPosition_2 = 2;
+ zipSource.setPosition(moveToPosition_2);
+
+ assertEquals(zipSource.getPosition(), moveToPosition_2);
+ assertTrue(zipSource.getPosition() < creationOrder.size());
+
+ assertTrue(zipSource.hasNext());
+ for (int i = 1; i < 4; i++) {
+ zipSource.next();
+ assertEquals(zipSource.getPosition(), moveToPosition_2 + i);
+ }
+
+ assertTrue(zipSource.hasNext());
}
@Test(dataProvider = "zipFileStocks")
@@ -123,6 +151,7 @@
if(e.getTypeName().equals("hive_db")) {
Object o = e.getAttribute("qualifiedName");
String s = (String) o;
+
assertNotNull(e);
assertTrue(s.contains("@cl2"));
break;
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java
new file mode 100644
index 0000000..10becc1
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v1;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.reflect.Whitebox;
+import org.slf4j.Logger;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+public class AtlasEntityStoreV1BulkImportPercentTest {
+
+ private final int MAX_PERCENT = 100;
+ private List<Integer> percentHolder;
+ private Logger log;
+
+ public void setupPercentHolder(int max) {
+ percentHolder = new ArrayList<>();
+ }
+
+ @BeforeClass
+ void mockLog() {
+ log = mock(Logger.class);
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ Object[] args = invocationOnMock.getArguments();
+ Integer d = (Integer) args[1];
+ percentHolder.add(d.intValue());
+ return null;
+ }
+ }).when(log).info(anyString(), anyFloat(), anyInt(), anyString());
+ }
+
+ @Test
+ public void percentTest_Equal4() throws Exception {
+ runWithSize(4);
+ assertEqualsForPercentHolder(25.0, 50.0, 75.0, 100.0);
+ }
+
+ @Test
+ public void percentTest_Equal10() throws Exception {
+ runWithSize(10);
+
+ assertEqualsForPercentHolder(10.0, 20.0, 30.0, 40.0, 50, 60, 70, 80, 90, 100);
+ }
+
+ private void assertEqualsForPercentHolder(double... expected) {
+ assertEquals(percentHolder.size(), expected.length);
+ Object actual[] = percentHolder.toArray();
+ for (int i = 0; i < expected.length; i++) {
+ assertTrue((int) Double.compare((int) actual[i], expected[i]) == 0);
+ }
+ }
+
+ @Test
+ public void bulkImportPercentageTestLessThan100() throws Exception {
+ int streamSize = 20;
+
+ runWithSize(streamSize);
+ assertEqualsForPercentHolder(5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100);
+ }
+
+ @Test
+ public void percentTest_Equal101() throws Exception {
+ int streamSize = 101;
+
+ double[] expected = fillPercentHolderWith100();
+
+ runWithSize(streamSize);
+ assertEqualsForPercentHolder(expected);
+ }
+
+ @Test
+ public void percentTest_Equal200() throws Exception {
+ int streamSize = 200;
+
+ double[] expected = fillPercentHolderWith100();
+
+ runWithSize(streamSize);
+ assertEqualsForPercentHolder(expected);
+ }
+
+ @Test
+ public void percentTest_Equal202() throws Exception {
+ int streamSize = 202;
+
+ double[] expected = fillPercentHolderWith100();
+
+ runWithSize(streamSize);
+ assertEqualsForPercentHolder(expected);
+ }
+
+ @Test
+ public void percentTest_Equal1001() throws Exception {
+ int streamSize = 1001;
+ double[] expected = fillPercentHolderWith100();
+
+ runWithSize(streamSize);
+ assertEqualsForPercentHolder(expected);
+ }
+
+ @Test
+ public void percentTest_Equal4323() throws Exception {
+ int streamSize = 4323;
+
+ double[] expected = fillPercentHolderWith100();
+ runWithSize(streamSize);
+ assertEqualsForPercentHolder(expected);
+ }
+
+ @Test
+ public void percentTest_Equal269() throws Exception {
+ int streamSize = 269;
+
+ double[] expected = fillPercentHolderWith100();
+ runWithSize(streamSize);
+ assertEqualsForPercentHolder(expected);
+ }
+
+ private void runWithSize(int streamSize) throws Exception {
+ float currentPercent = 0;
+ setupPercentHolder(streamSize);
+ for (int currentIndex = 0; currentIndex < streamSize; currentIndex++) {
+ currentPercent = invokeBulkImportProgress(currentIndex + 1, streamSize, currentPercent);
+ }
+ }
+
+ private float invokeBulkImportProgress(int currentIndex, int streamSize, float currentPercent) throws Exception {
+ return Whitebox.invokeMethod(AtlasEntityStoreV1.class, "updateImportProgress", log, currentIndex, streamSize, currentPercent, "additional info");
+ }
+
+ private double[] fillPercentHolderWith100() {
+ double start = 1;
+ double expected[] = new double[MAX_PERCENT];
+ for (int i = 0; i < expected.length; i++) {
+ expected[i] = start;
+ start ++;
+ }
+ return expected;
+ }
+}