ATLAS-1919: Refactored ZipSink to record committed guids; ExportService now skips entities already written to the sink
Signed-off-by: Madhan Neethiraj <madhan@apache.org>
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 3538cfd..8f45e9f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -433,6 +433,10 @@
}
private void addEntity(AtlasEntityWithExtInfo entity, ExportContext context) throws AtlasBaseException {
+ if(context.sink.hasEntity(entity.getEntity().getGuid())) {
+ return;
+ }
+
context.sink.add(entity);
context.result.incrementMeticsCounter(String.format("entity:%s", entity.getEntity().getTypeName()));
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java
index 4bb04da..17ebbf1 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java
@@ -27,7 +27,9 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
@@ -35,6 +37,8 @@
private static final Logger LOG = LoggerFactory.getLogger(ZipSink.class);
private ZipOutputStream zipOutputStream;
+ final Set<String> guids = new HashSet<>();
+
public ZipSink(OutputStream outputStream) {
zipOutputStream = new ZipOutputStream(outputStream);
@@ -43,11 +47,13 @@
public void add(AtlasEntity entity) throws AtlasBaseException {
String jsonData = convertToJSON(entity);
saveToZip(entity.getGuid(), jsonData);
+ recordAddedEntityGuids(entity);
}
public void add(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
String jsonData = convertToJSON(entityWithExtInfo);
saveToZip(entityWithExtInfo.getEntity().getGuid(), jsonData);
+ recordAddedEntityGuids(entityWithExtInfo);
}
public void setResult(AtlasExportResult result) throws AtlasBaseException {
@@ -100,4 +106,19 @@
zipOutputStream.write(payload.getBytes());
zipOutputStream.closeEntry();
}
+
+ public boolean hasEntity(String guid) {
+ return guids.contains(guid);
+ }
+
+ private void recordAddedEntityGuids(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+ guids.add(entityWithExtInfo.getEntity().getGuid());
+ if(entityWithExtInfo.getReferredEntities() != null) {
+ guids.addAll(entityWithExtInfo.getReferredEntities().keySet());
+ }
+ }
+
+ private void recordAddedEntityGuids(AtlasEntity entity) {
+ guids.add(entity.getGuid());
+ }
}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
index 635caf7..e8bbeb5 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
@@ -21,6 +21,7 @@
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.type.AtlasType;
import org.testng.Assert;
@@ -35,11 +36,15 @@
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
+import static org.testng.Assert.*;
+
public class ZipSinkTest {
private ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
private ZipSink zipSink;
private List<String> defaultExportOrder = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
private AtlasExportResult defaultExportResult;
+ private String knownEntityGuidFormat = "111-222-333-%s";
+
private void initZipSinkWithExportOrder() throws AtlasBaseException {
zipSink = new ZipSink(byteArrayOutputStream);
@@ -80,7 +85,7 @@
@Test
public void correctInit_succeeds() throws AtlasBaseException {
initZipSinkWithExportOrder();
- Assert.assertTrue(true);
+ assertTrue(true);
Assert.assertNotNull(zipSink);
}
@@ -95,11 +100,11 @@
Assert.assertNull(zis.getNextEntry());
} catch (IOException e) {
- Assert.assertTrue(false);
+ assertTrue(false);
}
} catch (AtlasBaseException e) {
- Assert.assertTrue(false, "No exception should be thrown.");
+ assertTrue(false, "No exception should be thrown.");
}
}
@@ -109,7 +114,7 @@
ZipInputStream zis = getZipInputStreamForDefaultExportOrder();
ZipEntry ze = zis.getNextEntry();
- Assert.assertEquals(ze.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString());
+ assertEquals(ze.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString());
}
@Test
@@ -118,28 +123,80 @@
ZipInputStream zis = getZipInputStreamForDefaultExportOrder();
zis.getNextEntry();
- Assert.assertEquals(getZipEntryAsStream(zis).replace("\"", "'"), "['a','b','c','d']");
+ assertEquals(getZipEntryAsStream(zis).replace("\"", "'"), "['a','b','c','d']");
}
@Test
public void zipWithExactlyTwoEntries_ContentsVerified() throws AtlasBaseException, IOException {
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
- useZipSinkToCreateZipWithTwoEntries(byteOutputStream);
+ useZipSinkToCreateEntries(byteOutputStream);
ByteArrayInputStream bis = new ByteArrayInputStream(byteOutputStream.toByteArray());
ZipInputStream zipStream = new ZipInputStream(bis);
ZipEntry entry = zipStream.getNextEntry();
- Assert.assertEquals(getZipEntryAsStream(zipStream), "[\"a\",\"b\",\"c\",\"d\"]");
- Assert.assertEquals(entry.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString());
+ assertEquals(getZipEntryAsStream(zipStream), "[\"a\",\"b\",\"c\",\"d\"]");
+ assertEquals(entry.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString());
entry = zipStream.getNextEntry();
- Assert.assertEquals(entry.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString());
- Assert.assertTrue(compareJsonWithObject(getZipEntryAsStream(zipStream), defaultExportResult));
+ assertEquals(entry.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString());
+ assertTrue(compareJsonWithObject(getZipEntryAsStream(zipStream), defaultExportResult));
}
- private void useZipSinkToCreateZipWithTwoEntries(ByteArrayOutputStream byteOutputStream) throws AtlasBaseException {
+ @Test
+ public void recordsEntityEntries() throws AtlasBaseException {
+ ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
+ ZipSink zs = new ZipSink(byteOutputStream);
+
+ AtlasEntity entity = new AtlasEntity();
+ entity.setGuid(String.format(knownEntityGuidFormat, 0));
+
+ zs.add(entity);
+ assertTrue(zs.hasEntity(String.format(knownEntityGuidFormat, 0)));
+
+ zs.close();
+ }
+
+ @Test
+ public void recordsEntityWithExtInfoEntries() throws AtlasBaseException {
+ final int max_entries = 3;
+ ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
+ ZipSink zs = new ZipSink(byteOutputStream);
+
+ AtlasEntity entity = new AtlasEntity();
+ entity.setGuid(String.format(knownEntityGuidFormat, 0));
+
+ AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = new AtlasEntity.AtlasEntityWithExtInfo(entity);
+ addReferredEntities(entityWithExtInfo, max_entries);
+
+ zs.add(entityWithExtInfo);
+ for (int i = 0; i <= max_entries; i++) {
+ String g = String.format(knownEntityGuidFormat, i);
+ assertTrue(zs.hasEntity(g));
+ }
+
+ zs.close();
+ }
+
+ private void addReferredEntities(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, int maxEntries) {
+
+ for (int i = 1; i <= maxEntries; i++) {
+ AtlasEntity entity1 = new AtlasEntity();
+ entity1.setGuid(String.format(knownEntityGuidFormat, i));
+ entityWithExtInfo.addReferredEntity(entity1);
+ }
+ }
+
+ @Test
+ public void recordsDoesNotRecordEntityEntries() throws AtlasBaseException {
+ initZipSinkWithExportOrder();
+
+ assertNotNull(zipSink);
+ assertFalse(zipSink.hasEntity(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString()));
+ }
+
+ private void useZipSinkToCreateEntries(ByteArrayOutputStream byteOutputStream) throws AtlasBaseException {
ZipSink zs = new ZipSink(byteOutputStream);
zs.setExportOrder(defaultExportOrder);
zs.setResult(getDefaultExportResult());