ATLAS-1851: update import API to support resume from a specific entity/position

Signed-off-by: Madhan Neethiraj <madhan@apache.org>
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
index 4f2c1fb..b19f709 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
@@ -19,6 +19,7 @@
 
 import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
 import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonIgnore;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
@@ -41,6 +42,8 @@
 public class AtlasImportRequest implements Serializable {
     private static final long   serialVersionUID = 1L;
     public  static final String TRANSFORMS_KEY   = "transforms";
+    private static final String START_POSITION_KEY = "startPosition";
+    private static final String START_GUID_KEY = "startGuid";
 
     private Map<String, String> options;
 
@@ -70,4 +73,22 @@
     public String toString() {
         return toString(new StringBuilder()).toString();
     }
+
+    @JsonIgnore
+    public String getStartGuid() {
+        if (this.options == null || !this.options.containsKey(START_GUID_KEY)) {
+            return null;
+        }
+
+        return (String) this.options.get(START_GUID_KEY);
+    }
+
+    @JsonIgnore
+    public String getStartPosition() {
+        if (this.options == null || !this.options.containsKey(START_POSITION_KEY)) {
+            return null;
+        }
+
+        return (String) this.options.get(START_GUID_KEY);
+    }
  }
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index 8a7e358..4ffbb88 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -71,8 +71,10 @@
             source.setImportTransform(ImportTransforms.fromJson(transforms));
             startTimestamp = System.currentTimeMillis();
             processTypes(source.getTypesDef(), result);
+            setStartPosition(request, source);
             processEntities(source, result);
 
+
             result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS);
         } catch (AtlasBaseException excp) {
             LOG.error("import(user={}, from={}): failed", userName, requestingIP, excp);
@@ -90,6 +92,14 @@
         return result;
     }
 
+    private void setStartPosition(AtlasImportRequest request, ZipSource source) throws AtlasBaseException {
+        if(request.getStartGuid() != null) {
+            source.setPositionUsingEntityGuid(request.getStartGuid());
+        } else if(request.getStartPosition() != null) {
+            source.setPosition(Integer.parseInt(request.getStartPosition()));
+        }
+    }
+
     public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP)
             throws AtlasBaseException {
         String fileName = (String) request.getOptions().get("FILENAME");
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
index 76451c9..aa1477f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
@@ -23,6 +23,7 @@
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
+import org.apache.commons.lang.StringUtils;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
@@ -49,6 +50,7 @@
     private Iterator<String>     iterator;
     private Map<String, String>  guidEntityJsonMap;
     private ImportTransforms     importTransform;
+    private int currentPosition;
 
     public ZipSource(InputStream inputStream) throws IOException {
         this(inputStream, null);
@@ -190,6 +192,7 @@
     @Override
     public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
         try {
+            currentPosition++;
             return getEntityWithExtInfo(this.iterator.next());
         } catch (AtlasBaseException e) {
             e.printStackTrace();
@@ -227,8 +230,43 @@
         return null;
     }
 
+    public int size() {
+        return this.creationOrder.size();
+    }
+
     @Override
     public void onImportComplete(String guid) {
         guidEntityJsonMap.remove(guid);
     }
+
+
+    @Override
+    public void setPosition(int index) {
+        currentPosition = index;
+        reset();
+        for (int i = 0; i < creationOrder.size() && i <= index; i++) {
+            iterator.next();
+        }
+    }
+
+    @Override
+    public void setPositionUsingEntityGuid(String guid) {
+        if(StringUtils.isBlank(guid)) {
+            return;
+        }
+
+        int index = creationOrder.indexOf(guid);
+        if (index == -1) {
+            return;
+        }
+
+        setPosition(index);
+    }
+
+    @Override
+    public int getPosition() {
+        return currentPosition;
+    }
+
+
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 27c0b5d..75e9132 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -160,7 +160,8 @@
         ret.setGuidAssignments(new HashMap<String, String>());
 
         Set<String> processedGuids          = new HashSet<>();
-        int         progressReportedAtCount = 0;
+        int         streamSize              = entityStream.size();
+        float       currentPercent          = 0f;
 
         while (entityStream.hasNext()) {
             AtlasEntityWithExtInfo entityWithExtInfo = entityStream.getNextEntityWithExtInfo();
@@ -173,16 +174,8 @@
             AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream);
 
             EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true);
-
-            updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
-            updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
-            updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
-
-            if ((processedGuids.size() - progressReportedAtCount) > 1000) {
-                progressReportedAtCount = processedGuids.size();
-
-                LOG.info("bulkImport(): in progress.. number of entities imported: {}", progressReportedAtCount);
-            }
+            currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids,
+                    entityStream.getPosition(), streamSize, currentPercent);
 
             if (resp.getGuidAssignments() != null) {
                 ret.getGuidAssignments().putAll(resp.getGuidAssignments());
@@ -192,12 +185,47 @@
         }
 
         importResult.getProcessedEntities().addAll(processedGuids);
-        LOG.info("bulkImport(): done. Number of entities imported: {}", processedGuids.size());
+        LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size());
 
         return ret;
     }
 
-    private void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
+    private float updateImportMetrics(AtlasEntityWithExtInfo currentEntity,
+                                      EntityMutationResponse resp,
+                                      AtlasImportResult importResult,
+                                      Set<String> processedGuids,
+                                      int currentIndex, int streamSize, float currentPercent) {
+
+        updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
+        updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
+        updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+
+        String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)",
+                                            currentEntity.getEntity().getTypeName(),
+                                            currentIndex,
+                                            currentEntity.getEntity().getGuid());
+
+        return updateImportProgress(LOG, currentIndex + 1, streamSize, currentPercent, lastEntityImported);
+    }
+
+    private static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent,
+                                              String additionalInfo) {
+        final double tolerance = 0.000001;
+        final int MAX_PERCENT = 100;
+
+        float percent = (float) ((currentIndex * MAX_PERCENT)/streamSize);
+        boolean updateLog = Double.compare(percent, currentPercent) > tolerance;
+        float updatedPercent = (MAX_PERCENT < streamSize) ? percent :
+                                ((updateLog) ? ++currentPercent : currentPercent);
+
+        if (updateLog) {
+            log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), streamSize, additionalInfo);
+        }
+
+        return updatedPercent;
+    }
+
+    private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
         if (list == null) {
             return;
         }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
index 69140e6..90ae15d 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java
@@ -21,12 +21,15 @@
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 
 public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream {
+    private int currentPosition = 0;
+
     public AtlasEntityStreamForImport(AtlasEntityWithExtInfo entityWithExtInfo, EntityStream entityStream) {
         super(entityWithExtInfo, entityStream);
     }
 
     @Override
     public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
+        currentPosition++;
         AtlasEntity entity = next();
 
         return entity != null ? new AtlasEntityWithExtInfo(entity, super.entitiesWithExtInfo) : null;
@@ -44,6 +47,25 @@
     }
 
     @Override
+    public int size() {
+        return 1;
+    }
+
+    @Override
+    public void setPosition(int position) {
+        // not applicable for a single entity stream
+    }
+
+    @Override
+    public int getPosition() {
+        return currentPosition;
+    }
+
+    @Override
+    public void setPositionUsingEntityGuid(String guid) {
+    }
+
+    @Override
     public void onImportComplete(String guid) {
 
     }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
index 0f711db..d4b6c55 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java
@@ -22,6 +22,12 @@
 
 public interface EntityImportStream extends EntityStream {
 
+    int size();
+    void setPosition(int position);
+    int getPosition();
+
+    void setPositionUsingEntityGuid(String guid);
+
     AtlasEntityWithExtInfo getNextEntityWithExtInfo();
 
     void onImportComplete(String guid);
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
index be9c20b..8a57dcd 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
@@ -21,6 +21,7 @@
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.testng.Assert;
+import org.testng.ITestContext;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -29,7 +30,9 @@
 import java.io.IOException;
 import java.util.List;
 
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.AssertJUnit.assertTrue;
 
@@ -41,6 +44,11 @@
         return new Object[][] {{ new ZipSource(fs) }};
     }
 
+    @DataProvider(name = "sales")
+    public static Object[][] getDataFromQuickStart_v1_Sales(ITestContext context) throws IOException {
+        return getZipSource("sales-v1-full.zip");
+    }
+
     @Test
     public void improperInit_ReturnsNullCreationOrder() throws IOException, AtlasBaseException {
         byte bytes[] = new byte[10];
@@ -108,7 +116,27 @@
             assertEquals(e.getGuid(), creationOrder.get(i));
         }
 
-        Assert.assertFalse(zipSource.hasNext());
+        assertFalse(zipSource.hasNext());
+    }
+
+    @Test(dataProvider = "sales")
+    public void iteratorSetPositionBehavor(ZipSource zipSource) throws IOException, AtlasBaseException {
+        Assert.assertTrue(zipSource.hasNext());
+
+        List<String> creationOrder = zipSource.getCreationOrder();
+        int moveToPosition_2 = 2;
+        zipSource.setPosition(moveToPosition_2);
+
+        assertEquals(zipSource.getPosition(), moveToPosition_2);
+        assertTrue(zipSource.getPosition() < creationOrder.size());
+
+        assertTrue(zipSource.hasNext());
+        for (int i = 1; i < 4; i++) {
+            zipSource.next();
+            assertEquals(zipSource.getPosition(), moveToPosition_2 + i);
+        }
+
+        assertTrue(zipSource.hasNext());
     }
 
     @Test(dataProvider = "zipFileStocks")
@@ -123,6 +151,7 @@
             if(e.getTypeName().equals("hive_db")) {
                 Object o = e.getAttribute("qualifiedName");
                 String s = (String) o;
+
                 assertNotNull(e);
                 assertTrue(s.contains("@cl2"));
                 break;
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java
new file mode 100644
index 0000000..10becc1
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v1;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.reflect.Whitebox;
+import org.slf4j.Logger;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+public class AtlasEntityStoreV1BulkImportPercentTest {
+
+    private final int MAX_PERCENT = 100;
+    private List<Integer> percentHolder;
+    private Logger log;
+
+    public void setupPercentHolder(int max) {
+        percentHolder = new ArrayList<>();
+    }
+
+    @BeforeClass
+    void mockLog() {
+        log = mock(Logger.class);
+
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                Object[] args = invocationOnMock.getArguments();
+                Integer d = (Integer) args[1];
+                percentHolder.add(d.intValue());
+                return null;
+            }
+        }).when(log).info(anyString(), anyFloat(), anyInt(), anyString());
+    }
+
+    @Test
+    public void percentTest_Equal4() throws Exception {
+        runWithSize(4);
+        assertEqualsForPercentHolder(25.0, 50.0, 75.0, 100.0);
+    }
+
+    @Test
+    public void percentTest_Equal10() throws Exception {
+        runWithSize(10);
+
+        assertEqualsForPercentHolder(10.0, 20.0, 30.0, 40.0, 50, 60, 70, 80, 90, 100);
+    }
+
+    private void assertEqualsForPercentHolder(double... expected) {
+        assertEquals(percentHolder.size(), expected.length);
+        Object actual[] = percentHolder.toArray();
+        for (int i = 0; i < expected.length; i++) {
+            assertTrue((int) Double.compare((int) actual[i], expected[i]) == 0);
+        }
+    }
+
+    @Test
+    public void bulkImportPercentageTestLessThan100() throws Exception {
+        int streamSize = 20;
+
+        runWithSize(streamSize);
+        assertEqualsForPercentHolder(5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100);
+    }
+
+    @Test
+    public void percentTest_Equal101() throws Exception {
+        int streamSize = 101;
+
+        double[] expected = fillPercentHolderWith100();
+
+        runWithSize(streamSize);
+        assertEqualsForPercentHolder(expected);
+    }
+
+    @Test
+    public void percentTest_Equal200() throws Exception {
+        int streamSize = 200;
+
+        double[] expected = fillPercentHolderWith100();
+
+        runWithSize(streamSize);
+        assertEqualsForPercentHolder(expected);
+    }
+
+    @Test
+    public void percentTest_Equal202() throws Exception {
+        int streamSize = 202;
+
+        double[] expected = fillPercentHolderWith100();
+
+        runWithSize(streamSize);
+        assertEqualsForPercentHolder(expected);
+    }
+
+    @Test
+    public void percentTest_Equal1001() throws Exception {
+        int streamSize = 1001;
+        double[] expected = fillPercentHolderWith100();
+
+        runWithSize(streamSize);
+        assertEqualsForPercentHolder(expected);
+    }
+
+    @Test
+    public void percentTest_Equal4323() throws Exception {
+        int streamSize = 4323;
+
+        double[] expected = fillPercentHolderWith100();
+        runWithSize(streamSize);
+        assertEqualsForPercentHolder(expected);
+    }
+
+    @Test
+    public void percentTest_Equal269() throws Exception {
+        int streamSize = 269;
+
+        double[] expected = fillPercentHolderWith100();
+        runWithSize(streamSize);
+        assertEqualsForPercentHolder(expected);
+    }
+
+    private void runWithSize(int streamSize) throws Exception {
+        float currentPercent = 0;
+        setupPercentHolder(streamSize);
+        for (int currentIndex = 0; currentIndex < streamSize; currentIndex++) {
+            currentPercent = invokeBulkImportProgress(currentIndex + 1, streamSize, currentPercent);
+        }
+    }
+
+    private float invokeBulkImportProgress(int currentIndex, int streamSize, float currentPercent) throws Exception {
+        return Whitebox.invokeMethod(AtlasEntityStoreV1.class, "updateImportProgress", log, currentIndex, streamSize, currentPercent, "additional info");
+    }
+
+    private double[] fillPercentHolderWith100() {
+        double start = 1;
+        double expected[] = new double[MAX_PERCENT];
+        for (int i = 0; i < expected.length; i++) {
+            expected[i] = start;
+            start ++;
+        }
+        return expected;
+    }
+}