ATLAS-1919: Refactored ZipSink to record committed guids

Signed-off-by: Madhan Neethiraj <madhan@apache.org>
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 3538cfd..8f45e9f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -433,6 +433,10 @@
     }
 
     private void addEntity(AtlasEntityWithExtInfo entity, ExportContext context) throws AtlasBaseException {
+        if(context.sink.hasEntity(entity.getEntity().getGuid())) {
+            return;
+        }
+
         context.sink.add(entity);
 
         context.result.incrementMeticsCounter(String.format("entity:%s", entity.getEntity().getTypeName()));
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java
index 4bb04da..17ebbf1 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java
@@ -27,7 +27,9 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
@@ -35,6 +37,8 @@
     private static final Logger LOG = LoggerFactory.getLogger(ZipSink.class);
 
     private ZipOutputStream zipOutputStream;
+    final Set<String>       guids = new HashSet<>();
+
 
     public ZipSink(OutputStream outputStream) {
         zipOutputStream = new ZipOutputStream(outputStream);
@@ -43,11 +47,13 @@
     public void add(AtlasEntity entity) throws AtlasBaseException {
         String jsonData = convertToJSON(entity);
         saveToZip(entity.getGuid(), jsonData);
+        recordAddedEntityGuids(entity);
     }
 
     public void add(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
         String jsonData = convertToJSON(entityWithExtInfo);
         saveToZip(entityWithExtInfo.getEntity().getGuid(), jsonData);
+        recordAddedEntityGuids(entityWithExtInfo);
     }
 
     public void setResult(AtlasExportResult result) throws AtlasBaseException {
@@ -100,4 +106,19 @@
         zipOutputStream.write(payload.getBytes());
         zipOutputStream.closeEntry();
     }
+
+    public boolean hasEntity(String guid) {
+        return guids.contains(guid);
+    }
+
+    private void recordAddedEntityGuids(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+        guids.add(entityWithExtInfo.getEntity().getGuid());
+        if(entityWithExtInfo.getReferredEntities() != null) {
+            guids.addAll(entityWithExtInfo.getReferredEntities().keySet());
+        }
+    }
+
+    private void recordAddedEntityGuids(AtlasEntity entity) {
+        guids.add(entity.getGuid());
+    }
 }
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
index 635caf7..e8bbeb5 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
@@ -21,6 +21,7 @@
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.type.AtlasType;
 import org.testng.Assert;
@@ -35,11 +36,15 @@
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 
+import static org.testng.Assert.*;
+
 public class ZipSinkTest {
     private ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
     private ZipSink zipSink;
     private List<String> defaultExportOrder = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
     private AtlasExportResult defaultExportResult;
+    private String knownEntityGuidFormat = "111-222-333-%s";
+
 
     private void initZipSinkWithExportOrder() throws AtlasBaseException {
         zipSink = new ZipSink(byteArrayOutputStream);
@@ -80,7 +85,7 @@
     @Test
     public void correctInit_succeeds() throws AtlasBaseException {
         initZipSinkWithExportOrder();
-        Assert.assertTrue(true);
+        assertTrue(true);
         Assert.assertNotNull(zipSink);
     }
 
@@ -95,11 +100,11 @@
                 Assert.assertNull(zis.getNextEntry());
             } catch (IOException e) {
 
-                Assert.assertTrue(false);
+                assertTrue(false);
             }
         } catch (AtlasBaseException e) {
 
-            Assert.assertTrue(false, "No exception should be thrown.");
+            assertTrue(false, "No exception should be thrown.");
         }
     }
 
@@ -109,7 +114,7 @@
         ZipInputStream zis = getZipInputStreamForDefaultExportOrder();
         ZipEntry ze = zis.getNextEntry();
 
-        Assert.assertEquals(ze.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString());
+        assertEquals(ze.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString());
     }
 
     @Test
@@ -118,28 +123,80 @@
         ZipInputStream zis = getZipInputStreamForDefaultExportOrder();
         zis.getNextEntry();
 
-        Assert.assertEquals(getZipEntryAsStream(zis).replace("\"", "'"), "['a','b','c','d']");
+        assertEquals(getZipEntryAsStream(zis).replace("\"", "'"), "['a','b','c','d']");
     }
 
     @Test
     public void zipWithExactlyTwoEntries_ContentsVerified() throws AtlasBaseException, IOException {
 
         ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
-        useZipSinkToCreateZipWithTwoEntries(byteOutputStream);
+        useZipSinkToCreateEntries(byteOutputStream);
 
         ByteArrayInputStream bis = new ByteArrayInputStream(byteOutputStream.toByteArray());
         ZipInputStream zipStream = new ZipInputStream(bis);
         ZipEntry entry = zipStream.getNextEntry();
 
-        Assert.assertEquals(getZipEntryAsStream(zipStream), "[\"a\",\"b\",\"c\",\"d\"]");
-        Assert.assertEquals(entry.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString());
+        assertEquals(getZipEntryAsStream(zipStream), "[\"a\",\"b\",\"c\",\"d\"]");
+        assertEquals(entry.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString());
 
         entry = zipStream.getNextEntry();
-        Assert.assertEquals(entry.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString());
-        Assert.assertTrue(compareJsonWithObject(getZipEntryAsStream(zipStream), defaultExportResult));
+        assertEquals(entry.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString());
+        assertTrue(compareJsonWithObject(getZipEntryAsStream(zipStream), defaultExportResult));
     }
 
-    private void useZipSinkToCreateZipWithTwoEntries(ByteArrayOutputStream byteOutputStream) throws AtlasBaseException {
+    @Test
+    public void recordsEntityEntries() throws AtlasBaseException {
+        ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
+        ZipSink zs = new ZipSink(byteOutputStream);
+
+        AtlasEntity entity = new AtlasEntity();
+        entity.setGuid(String.format(knownEntityGuidFormat, 0));
+
+        zs.add(entity);
+        assertTrue(zs.hasEntity(String.format(knownEntityGuidFormat, 0)));
+
+        zs.close();
+    }
+
+    @Test
+    public void recordsEntityWithExtInfoEntries() throws AtlasBaseException {
+        final int max_entries = 3;
+        ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
+        ZipSink zs = new ZipSink(byteOutputStream);
+
+        AtlasEntity entity = new AtlasEntity();
+        entity.setGuid(String.format(knownEntityGuidFormat, 0));
+
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = new AtlasEntity.AtlasEntityWithExtInfo(entity);
+        addReferredEntities(entityWithExtInfo, max_entries);
+
+        zs.add(entityWithExtInfo);
+        for (int i = 0; i <= max_entries; i++) {
+            String g = String.format(knownEntityGuidFormat, i);
+            assertTrue(zs.hasEntity(g));
+        }
+
+        zs.close();
+    }
+
+    private void addReferredEntities(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, int maxEntries) {
+
+        for (int i = 1; i <= maxEntries; i++) {
+            AtlasEntity entity1 = new AtlasEntity();
+            entity1.setGuid(String.format(knownEntityGuidFormat, i));
+            entityWithExtInfo.addReferredEntity(entity1);
+        }
+    }
+
+    @Test
+    public void recordsDoesNotRecordEntityEntries() throws AtlasBaseException {
+        initZipSinkWithExportOrder();
+
+        assertNotNull(zipSink);
+        assertFalse(zipSink.hasEntity(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString()));
+    }
+
+    private void useZipSinkToCreateEntries(ByteArrayOutputStream byteOutputStream) throws AtlasBaseException {
         ZipSink zs = new ZipSink(byteOutputStream);
         zs.setExportOrder(defaultExportOrder);
         zs.setResult(getDefaultExportResult());