ATLAS-4627: Delete existing entities in Atlas that are no longer present in Hive, database- and table-wise, through the import-hive command.
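
The -deleteNonExisting option of import-hive.sh previously always scanned every
database in the cluster. With this change, when -d/--database (and optionally
-t/--table) is passed together with -deleteNonExisting, only the matching Atlas
database or table entity is looked up by qualifiedName and considered for
deletion when it is no longer present in Hive.

Illustrative invocations (placeholders, not real names; see Usage 8 below):

    import-hive.sh -deleteNonExisting -d <database>
    import-hive.sh -deleteNonExisting -d <database> -t <table>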

Signed-off-by: Pinal Shah <pinal.shah@freestoneinfotech.com>
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 28365bc..49c721c 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -249,6 +249,9 @@
         System.out.println("    Usage 7: import-hive.sh -i -o <output Path or file> [-f <filename>] [-d <database> OR --database <database>] [-t <table> OR --table <table>]");
         System.out.println("        To create zip file with exported data without importing to Atlas which can be imported later ...");
         System.out.println();
+        System.out.println("    Usage 8: import-hive.sh [-d <database> OR --database <database>] [-t <table> OR --table <table>] [-deleteNonExisting] ");
+        System.out.println("        Delete database and table wise which are not present in ATLAS and present in HIVE ...");
+        System.out.println();
     }
 
     /**
@@ -319,7 +322,7 @@
         LOG.info("delete non existing flag : {} ", deleteNonExisting);
 
         if (deleteNonExisting) {
-            deleteEntitiesForNonExistingHiveMetadata(failOnError);
+            deleteEntitiesForNonExistingHiveMetadata(failOnError, databaseToImport, tableToImport);
             ret = true;
         } else if (StringUtils.isNotEmpty(fileToImport)) {
             File f = new File(fileToImport);
@@ -1161,13 +1164,18 @@
         }
     }
 
-    public void deleteEntitiesForNonExistingHiveMetadata(boolean failOnError) throws Exception {
+    public void deleteEntitiesForNonExistingHiveMetadata(boolean failOnError, String databaseToDelete, String tableToDelete) throws Exception {
 
         //fetch databases from Atlas
         List<AtlasEntityHeader> dbs = null;
         try {
-            dbs = getAllDatabaseInCluster();
-            LOG.info("Total Databases in cluster {} : {} ", metadataNamespace, dbs.size());
+            if (StringUtils.isNotEmpty(databaseToDelete)) {
+                dbs = getSingleDatabaseInCluster(databaseToDelete);
+            } else {
+                dbs = getAllDatabaseInCluster();
+                LOG.info("Total Databases in cluster {} : {} ", metadataNamespace, dbs.size());
+            }
+
         } catch (AtlasServiceException e) {
             LOG.error("Failed to retrieve database entities for cluster {} from Atlas", metadataNamespace, e);
             if (failOnError) {
@@ -1189,8 +1197,13 @@
 
                 List<AtlasEntityHeader> tables;
                 try {
-                    tables = getAllTablesInDb(dbGuid);
-                    LOG.info("Total Tables in database {} : {} ", hiveDbName, tables.size());
+                    if (StringUtils.isNotEmpty(tableToDelete)) {
+                        tables = getSingleTableInCluster(hiveDbName, tableToDelete);
+                    } else {
+                        tables = getAllTablesInDb(dbGuid);
+                        LOG.info("Total Tables in database {} : {} ", hiveDbName, tables.size());
+                    }
+
                 } catch (AtlasServiceException e) {
                     LOG.error("Failed to retrieve table entities for database {} from Atlas", hiveDbName, e);
                     if (failOnError) {
@@ -1261,4 +1274,40 @@
         }
 
     }
+
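+    // Looks up, by qualifiedName, the single database entity in Atlas for the given database name.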
+    private List<AtlasEntityHeader> getSingleDatabaseInCluster(String databaseName) throws AtlasServiceException {
+
+        String dbQualifiedName = getDBQualifiedName(metadataNamespace, databaseName.toLowerCase());
+
+        SearchParameters.FilterCriteria fc = new SearchParameters.FilterCriteria();
+        fc.setAttributeName(ATTRIBUTE_QUALIFIED_NAME);
+        fc.setAttributeValue(dbQualifiedName);
+        fc.setOperator(SearchParameters.Operator.EQ);
+        fc.setCondition(SearchParameters.FilterCriteria.Condition.AND);
+        LOG.info("Searching for database : {}", dbQualifiedName);
+
+        AtlasSearchResult searchResult = atlasClientV2.basicSearch(HIVE_TYPE_DB, fc, null, null, true, 25, 0);
+
+        List<AtlasEntityHeader> entityHeaders = searchResult == null ? null : searchResult.getEntities();
+        return entityHeaders;
+    }
+
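+    // Looks up, by qualifiedName, the single table entity in Atlas for the given database and table name.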
+    private List<AtlasEntityHeader> getSingleTableInCluster(String databaseName, String tableName) throws AtlasServiceException {
+
+        String tableQualifiedName = getTableQualifiedName(metadataNamespace, databaseName.toLowerCase(), tableName.toLowerCase());
+
+        SearchParameters.FilterCriteria fc = new SearchParameters.FilterCriteria();
+        fc.setAttributeName(ATTRIBUTE_QUALIFIED_NAME);
+        fc.setAttributeValue(tableQualifiedName);
+        fc.setOperator(SearchParameters.Operator.EQ);
+        fc.setCondition(SearchParameters.FilterCriteria.Condition.AND);
+        LOG.info("Searching for table : {}", tableQualifiedName);
+
+        AtlasSearchResult searchResult = atlasClientV2.basicSearch(HIVE_TYPE_TABLE, fc, null, null, true, 25, 0);
+
+        List<AtlasEntityHeader> entityHeaders = searchResult == null ? null : searchResult.getEntities();
+        return entityHeaders;
+    }
 }
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
index ae7ab1a..aca60a2 100644
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
@@ -22,8 +22,12 @@
 import org.apache.atlas.AtlasClientV2;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.discovery.SearchParameters;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.instance.EntityMutations;
 import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -47,6 +51,7 @@
 import java.util.List;
 
 import static org.apache.atlas.hive.hook.events.BaseHiveEvent.*;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.mock;
@@ -57,6 +62,12 @@
     private static final String TEST_DB_NAME       = "default";
     public  static final String METADATA_NAMESPACE = "primary";
     public  static final String TEST_TABLE_NAME    = "test_table";
+    public  static final String TEST_DB_NAME_2     = "enr_edl";
+    public  static final String TEST_DB_NAME_3     = "dummy";
+    public  static final String TEST_TABLE_NAME_2  = "testing_enr_edl_1";
+    public  static final String TEST_TABLE_NAME_3  = "testing_enr_edl_2";
+    public  static final String TEST_TABLE_NAME_4  = "testing_dummy_1";
+    public  static final String TEST_TABLE_NAME_5  = "testing_dummy_2";
 
     @Mock
     private Hive hiveClient;
@@ -324,4 +335,103 @@
             return attrValue.equals(((AtlasEntity) o).getAttribute(attrName));
         }
     }
+
+    @Test
+    public void testDeleteEntitiesForNonExistingHiveMetadata() throws Exception {
+
+        String DB1_GUID = "72e06b34-9151-4023-aa9d-b82103a50e76";
+        String DB2_GUID = "98w06b34-9151-4023-aa9d-b82103a50w67";
+        String DB1_TABLE1_GUID = "82e06b34-9151-4023-aa9d-b82103a50e77";
+        String DB1_TABLE2_GUID = "66e06b34-9151-4023-aa9d-b82103a50e55";
+        String DB2_TABLE1_GUID = "99q06b34-9151-4023-aa9d-b82103a50i22";
+        String DB2_TABLE2_GUID = "48z06b34-9151-4023-aa9d-b82103a50n39";
+
+        // A GUID present in both Hive and Atlas means the table/entity exists when the import-hive script is run.
+        // 1) When both DB 1 and table 1 are passed to the import-hive command, only that single table is deleted from DB 1.
+
+        AtlasEntityHeader atlasEntityHeader = new AtlasEntityHeader(HIVE_TYPE_DB, DB1_GUID,
+                Collections.singletonMap(AtlasClient.QUALIFIED_NAME, HiveMetaStoreBridge.getDBQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME_2)));
+        AtlasSearchResult atlasSearchResult = new AtlasSearchResult();
+        atlasSearchResult.setEntities(Collections.singletonList(atlasEntityHeader));
+
+        SearchParameters.FilterCriteria filterCriteria = new SearchParameters.FilterCriteria();
+        filterCriteria.setAttributeName(ATTRIBUTE_CLUSTER_NAME);
+        filterCriteria.setAttributeValue(METADATA_NAMESPACE);
+        filterCriteria.setOperator(SearchParameters.Operator.EQ);
+
+        when(atlasClientV2.basicSearch(HIVE_TYPE_DB, filterCriteria, null, TEST_DB_NAME_2, true, 1, 100))
+                .thenReturn(atlasSearchResult);
+
+        AtlasEntityHeader atlasEntityHeader1 = new AtlasEntityHeader(HIVE_TYPE_TABLE, DB1_TABLE1_GUID,
+                Collections.singletonMap(AtlasClient.QUALIFIED_NAME, HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME_2, TEST_TABLE_NAME_2)));
+        AtlasEntityHeader atlasEntityHeader2 = new AtlasEntityHeader(HIVE_TYPE_TABLE, DB1_TABLE2_GUID,
+                Collections.singletonMap(AtlasClient.QUALIFIED_NAME, HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME_2, TEST_TABLE_NAME_3)));
+        AtlasSearchResult atlasSearchResult1 = new AtlasSearchResult();
+        atlasSearchResult1.setEntities(Arrays.asList(atlasEntityHeader1, atlasEntityHeader2));
+
+        SearchParameters.FilterCriteria filterCriteria1 = new SearchParameters.FilterCriteria();
+        filterCriteria1.setAttributeName(ATTRIBUTE_QUALIFIED_NAME);
+        filterCriteria1.setAttributeValue(HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME_2.toLowerCase(), TEST_TABLE_NAME_2.toLowerCase()));
+        filterCriteria1.setOperator(SearchParameters.Operator.EQ);
+        filterCriteria1.setCondition(SearchParameters.FilterCriteria.Condition.AND);
+
+        when(atlasClientV2.basicSearch(HIVE_TYPE_TABLE, filterCriteria1, null, null, true, 25, 0))
+                .thenReturn(atlasSearchResult1);
+
+        EntityMutationResponse entityMutationResponse1 = new EntityMutationResponse();
+        entityMutationResponse1.setMutatedEntities(Collections.singletonMap(EntityMutations.EntityOperation.DELETE, Arrays.asList(atlasEntityHeader1)));
+        when(atlasClientV2.deleteEntityByGuid(DB1_TABLE1_GUID)).thenReturn(entityMutationResponse1);
+
+        HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(METADATA_NAMESPACE, hiveClient, atlasClientV2);
+        hiveMetaStoreBridge.deleteEntitiesForNonExistingHiveMetadata(true, TEST_DB_NAME_2, TEST_TABLE_NAME_2);
+
+        assertEquals(DB1_TABLE1_GUID, entityMutationResponse1.getMutatedEntities().get(EntityMutations.EntityOperation.DELETE).get(0).getGuid());
+
+        // 2) When both DB 2 and table 1 are passed, only that single table is deleted from DB 2.
+
+        AtlasEntityHeader atlasEntityHeader3 = new AtlasEntityHeader(HIVE_TYPE_DB, DB2_GUID,
+                Collections.singletonMap(AtlasClient.QUALIFIED_NAME, HiveMetaStoreBridge.getDBQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME_3)));
+        AtlasSearchResult atlasSearchResult2 = new AtlasSearchResult();
+        atlasSearchResult2.setEntities(Collections.singletonList(atlasEntityHeader3));
+
+        when(atlasClientV2.basicSearch(HIVE_TYPE_DB, filterCriteria, null, TEST_DB_NAME_3, true, 1, 100))
+                .thenReturn(atlasSearchResult2);
+
+        AtlasSearchResult atlasSearchResult3 = new AtlasSearchResult();
+        atlasSearchResult3.setEntities(Arrays.asList(new AtlasEntityHeader(HIVE_TYPE_TABLE, DB2_TABLE1_GUID,
+                        Collections.singletonMap(AtlasClient.QUALIFIED_NAME, HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME_3, TEST_TABLE_NAME_4))),
+                new AtlasEntityHeader(HIVE_TYPE_TABLE, DB2_TABLE2_GUID,
+                        Collections.singletonMap(AtlasClient.QUALIFIED_NAME, HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME_3, TEST_TABLE_NAME_5)))));
+
+        SearchParameters.FilterCriteria filterCriteria2 = new SearchParameters.FilterCriteria();
+        filterCriteria2.setAttributeName(ATTRIBUTE_QUALIFIED_NAME);
+        filterCriteria2.setAttributeValue(HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME_3.toLowerCase(), TEST_TABLE_NAME_4.toLowerCase()));
+        filterCriteria2.setOperator(SearchParameters.Operator.EQ);
+        filterCriteria2.setCondition(SearchParameters.FilterCriteria.Condition.AND);
+
+        when(atlasClientV2.basicSearch(HIVE_TYPE_TABLE, filterCriteria2, null, null, true, 25, 0))
+                .thenReturn(atlasSearchResult3);
+
+        EntityMutationResponse entityMutationResponse2 = new EntityMutationResponse();
+        entityMutationResponse2.setMutatedEntities(Collections.singletonMap(EntityMutations.EntityOperation.DELETE, Arrays.asList(new AtlasEntityHeader(HIVE_TYPE_TABLE, DB2_TABLE1_GUID,
+                Collections.singletonMap(AtlasClient.QUALIFIED_NAME, HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME_3, TEST_TABLE_NAME_4))))));
+        when(atlasClientV2.deleteEntityByGuid(DB2_TABLE1_GUID)).thenReturn(entityMutationResponse2);
+
+        hiveMetaStoreBridge.deleteEntitiesForNonExistingHiveMetadata(true, TEST_DB_NAME_3, TEST_TABLE_NAME_4);
+
+        assertEquals(DB2_TABLE1_GUID, entityMutationResponse2.getMutatedEntities().get(EntityMutations.EntityOperation.DELETE).get(0).getGuid());
+
+        // 3) When only DB 1 is passed, all tables in that database are deleted.
+
+        EntityMutationResponse entityMutationResponse3 = new EntityMutationResponse();
+        entityMutationResponse3.setMutatedEntities(Collections.singletonMap(EntityMutations.EntityOperation.DELETE, Arrays.asList(atlasEntityHeader1, atlasEntityHeader2)));
+        when(atlasClientV2.deleteEntityByGuid(DB1_TABLE1_GUID)).thenReturn(entityMutationResponse3);
+        when(atlasClientV2.deleteEntityByGuid(DB1_TABLE2_GUID)).thenReturn(entityMutationResponse3);
+        hiveMetaStoreBridge.deleteEntitiesForNonExistingHiveMetadata(true, TEST_DB_NAME_2, null);
+
+        assertEquals(DB1_TABLE1_GUID, entityMutationResponse3.getMutatedEntities().get(EntityMutations.EntityOperation.DELETE).get(0).getGuid());
+        assertEquals(DB1_TABLE2_GUID, entityMutationResponse3.getMutatedEntities().get(EntityMutations.EntityOperation.DELETE).get(1).getGuid());
+    }
+
+
 }