SENTRY-2422: HMS synchronization is causing multiple entries of the same ID in SENTRY_HMS_NOTIFICATION_ID (Na Li, reviewed by Kalyan Kumar Kalvagadda)
diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index d2a46cb..63e5dfe 100644
--- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -3341,7 +3341,7 @@
               pm.setDetachAllOnCommit(false); // No need to detach objects
               deleteNotificationsSince(pm, notificationID + 1);
               // persist the notification ID
-              pm.makePersistent(new MSentryHmsNotification(notificationID));
+              persistUniqueNotificationIDCore(pm, notificationID);
 
               // persist the full snapshot
               long snapshotID = getCurrentAuthzPathsSnapshotID(pm);
@@ -4470,6 +4470,21 @@
   }
 
   /**
+   * Make sure the persisted notification ID is unique. There is no entries of duplicated value
+   * @param pm
+   * @param notificationId
+   * @return
+   */
+  MSentryHmsNotification persistUniqueNotificationIDCore(PersistenceManager pm, Long notificationId) {
+    Long maxNotificationId = getMaxPersistedIDCore(pm, MSentryHmsNotification.class, "notificationId", EMPTY_NOTIFICATION_ID);
+    if (notificationId <= maxNotificationId) {
+      return new MSentryHmsNotification(maxNotificationId);
+    }
+
+    return pm.makePersistent(new MSentryHmsNotification(notificationId));
+  }
+
+  /**
    * Set the notification ID of last processed HMS notification and remove all
    * subsequent notifications stored.
    */
@@ -4478,7 +4493,7 @@
     tm.executeTransaction(
             pm -> {
               deleteNotificationsSince(pm, notificationId + 1);
-              return pm.makePersistent(new MSentryHmsNotification(notificationId));
+              return persistUniqueNotificationIDCore(pm, notificationId);
             });
   }
 
@@ -4488,7 +4503,7 @@
   public void persistLastProcessedNotificationID(final Long notificationId) throws Exception {
     LOGGER.debug("Persisting Last Processed Notification ID {}", notificationId);
     tm.executeTransaction(
-            pm -> pm.makePersistent(new MSentryHmsNotification(notificationId)));
+            pm -> persistUniqueNotificationIDCore(pm, notificationId));
   }
 
   /**
diff --git a/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
index fd14963..f538988 100644
--- a/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
+++ b/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
@@ -18,6 +18,7 @@
 
 package org.apache.sentry.provider.db.service.persistent;
 
+import com.codahale.metrics.Gauge;
 import java.io.File;
 import java.time.Instant;
 import java.util.ArrayList;
@@ -2825,6 +2826,34 @@
   }
 
   @Test
+  public void testPersistOnlyUniqueNotificationId() throws Exception {
+    long notificationID = 1;
+
+    // Persist the same ID twice should not cause any issues
+    sentryStore.persistLastProcessedNotificationID(notificationID);
+    sentryStore.persistLastProcessedNotificationID(notificationID);
+
+    // Retrieving number of entries, and only unique values are stored
+    Gauge<Long> notificationGage = sentryStore.getLastNotificationIdGauge();
+    assertEquals((long)1, notificationGage.getValue().longValue());
+  }
+
+  @Test
+  public void testPersistOnlyMaxUniqueNotificationId() throws Exception {
+    long notificationID = 1;
+
+    // Persist the larger ID first, then less value
+    sentryStore.persistLastProcessedNotificationID(notificationID + 2);
+    sentryStore.persistLastProcessedNotificationID(notificationID);
+    sentryStore.persistLastProcessedNotificationID(notificationID + 1);
+
+    // Retrieving latest peristed ID should match with the max persisted ID
+    long latestID = sentryStore.getLastProcessedNotificationID();
+    assertEquals(notificationID + 2, latestID);
+  }
+
+
+  @Test
   public void testAddDeleteAuthzPathsMapping() throws Exception {
     long notificationID = 0;