SENTRY-432: Fixing updateLog bug that can cause a missed entry during first few updates
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
index 5fe89a8..cc849b9 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
@@ -58,6 +58,21 @@
           }
           retVal.getAuthzPermUpdate().add(update.toThrift());
         }
+        if (LOGGER.isDebugEnabled()) {
+          StringBuilder permSeq = new StringBuilder("<");
+          for (PermissionsUpdate permUpdate : permUpdates) {
+            permSeq.append(permUpdate.getSeqNum()).append(",");
+          }
+          permSeq.append(">");
+          StringBuilder pathSeq = new StringBuilder("<");
+          for (PathsUpdate pathUpdate : pathUpdates) {
+            pathSeq.append(pathUpdate.getSeqNum()).append(",");
+          }
+          pathSeq.append(">");
+          LOGGER.debug("#### Updates requested from HDFS ["
+              + "permReq=" + permSeqNum + ", permResp=" + permSeq + "] "
+              + "[pathReq=" + pathSeqNum + ", pathResp=" + pathSeq + "]");
+        }
       } catch (Exception e) {
         LOGGER.error("Error Sending updates to downstream Cache", e);
         throw new TException(e);
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
index 2815880..f321d3d 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
@@ -169,17 +169,26 @@
 
   private void appendToUpdateLog(K update) {
     synchronized (updateLog) {
+      boolean logCompacted = false;
       if (updateLogSize > 0) {
         if (update.hasFullImage() || (updateLog.size() == updateLogSize)) {
           // Essentially a log compaction
           updateLog.clear();
           updateLog.add(update.hasFullImage() ? update
               : createFullImageUpdate(update.getSeqNum()));
+          logCompacted = true;
         } else {
           updateLog.add(update);
         }
       }
       lastCommittedSeqNum.set(update.getSeqNum());
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("#### Appending to Update Log ["
+            + "type=" + update.getClass() + ", "
+            + "lastCommit=" + lastCommittedSeqNum.get() + ", "
+            + "lastSeen=" + lastSeenSeqNum.get() + ", "
+            + "logCompacted=" + logCompacted + "]");
+      }
     }
   }
 
@@ -192,6 +201,14 @@
     List<K> retVal = new LinkedList<K>();
     synchronized (updateLog) {
       long currSeqNum = lastCommittedSeqNum.get();
+      if (LOGGER.isDebugEnabled() && (updateable != null)) {
+        LOGGER.debug("#### GetAllUpdatesFrom ["
+            + "type=" + updateable.getClass() + ", "
+            + "reqSeqNum=" + seqNum + ", "
+            + "lastCommit=" + lastCommittedSeqNum.get() + ", "
+            + "lastSeen=" + lastSeenSeqNum.get() + ", "
+            + "updateLogSize=" + updateLog.size() + "]");
+      }
       if (updateLogSize == 0) {
         // no updatelog configured..
         return retVal;
@@ -227,21 +244,10 @@
       } else {
         // increment iterator to requested seqNum
         Iterator<K> iter = updateLog.iterator();
-        K u = null;
         while (iter.hasNext()) {
-          u = iter.next();
-          if (u.getSeqNum() == seqNum) {
-            break;
-          }
-        }
-        // add all updates from requestedSeq
-        // to committedSeqNum
-        for (long seq = seqNum; seq <= currSeqNum; seq ++) {
-          retVal.add(u);
-          if (iter.hasNext()) {
-            u = iter.next();
-          } else {
-            break;
+          K elem = iter.next();
+          if (elem.getSeqNum() >= seqNum) {
+            retVal.add(elem);
           }
         }
       }
diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
index d571df8..0c55bb1 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
@@ -38,16 +38,16 @@
   static class DummyUpdate implements Update {
     private long seqNum = 0;
     private boolean hasFullUpdate = false;
-    private String stuff;
+    private String state;
     public DummyUpdate(long seqNum, boolean hasFullUpdate) {
       this.seqNum = seqNum;
       this.hasFullUpdate = hasFullUpdate;
     }
-    public String getStuff() {
-      return stuff;
+    public String getState() {
+      return state;
     }
-    public DummyUpdate setStuff(String stuff) {
-      this.stuff = stuff;
+    public DummyUpdate setState(String stuff) {
+      this.state = stuff;
       return this;
     }
     @Override
@@ -72,7 +72,7 @@
     @Override
     public void updatePartial(Iterable<DummyUpdate> update, ReadWriteLock lock) {
       for (DummyUpdate u : update) {
-        state.add(u.getStuff());
+        state.add(u.getState());
         lastUpdatedSeqNum = u.seqNum;
       }
     }
@@ -81,7 +81,7 @@
     public Updateable<DummyUpdate> updateFull(DummyUpdate update) {
       DummyUpdatable retVal = new DummyUpdatable();
       retVal.lastUpdatedSeqNum = update.seqNum;
-      retVal.state = Lists.newArrayList(update.stuff.split(","));
+      retVal.state = Lists.newArrayList(update.state.split(","));
       return retVal;
     }
 
@@ -93,7 +93,7 @@
     @Override
     public DummyUpdate createFullImageUpdate(long currSeqNum) {
       DummyUpdate retVal = new DummyUpdate(currSeqNum, true);
-      retVal.stuff = Joiner.on(",").join(state);
+      retVal.state = Joiner.on(",").join(state);
       return retVal;
     }
 
@@ -111,7 +111,7 @@
     @Override
     public DummyUpdate retrieveFullImage(long currSeqNum) {
       DummyUpdate retVal = new DummyUpdate(currSeqNum, true);
-      retVal.stuff = state;
+      retVal.state = state;
       return retVal;
     }
   }
@@ -125,12 +125,12 @@
     Assert.assertEquals(-2, updateForwarder.getLastUpdatedSeqNum());
     List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
     Assert.assertTrue(allUpdates.size() == 1);
-    Assert.assertEquals("a,b,c", allUpdates.get(0).getStuff());
+    Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
 
     // If the current process has restarted the input seqNum will be > currSeq
     allUpdates = updateForwarder.getAllUpdatesFrom(100);
     Assert.assertTrue(allUpdates.size() == 1);
-    Assert.assertEquals("a,b,c", allUpdates.get(0).getStuff());
+    Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
     Assert.assertEquals(-2, allUpdates.get(0).getSeqNum());
     allUpdates = updateForwarder.getAllUpdatesFrom(-1);
     Assert.assertEquals(0, allUpdates.size());
@@ -142,15 +142,44 @@
     imageRetreiver.setState("a,b,c");
     UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
         new DummyUpdatable(), imageRetreiver, 5);
-    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setStuff("d"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
     while(!updateForwarder.areAllUpdatesCommited()) {
       Thread.sleep(100);
     }
     Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum());
     List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
     Assert.assertEquals(2, allUpdates.size());
-    Assert.assertEquals("a,b,c", allUpdates.get(0).getStuff());
-    Assert.assertEquals("d", allUpdates.get(1).getStuff());
+    Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
+    Assert.assertEquals("d", allUpdates.get(1).getState());
+  }
+
+  // This happens when we the first update from HMS is a -1 (If the heartbeat
+  // thread checks Sentry's current seqNum before any update has come in)..
+  // This will lead the first and second entries in the updatelog to differ
+  // by more than +1..
+  @Test
+  public void testUpdateReceiveWithNullImageRetriver() throws Exception {
+    UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
+        new DummyUpdatable(), null, 5);
+    updateForwarder.handleUpdateNotification(new DummyUpdate(-1, true).setState("a"));
+    while(!updateForwarder.areAllUpdatesCommited()) {
+      Thread.sleep(100);
+    }
+    List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(1);
+    Assert.assertEquals("a", allUpdates.get(0).getState());
+    updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("b"));
+    while(!updateForwarder.areAllUpdatesCommited()) {
+      Thread.sleep(100);
+    }
+    updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("c"));
+    while(!updateForwarder.areAllUpdatesCommited()) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
+    allUpdates = updateForwarder.getAllUpdatesFrom(0);
+    Assert.assertEquals(2, allUpdates.size());
+    Assert.assertEquals("b", allUpdates.get(0).getState());
+    Assert.assertEquals("c", allUpdates.get(1).getState());
   }
 
   @Test
@@ -159,7 +188,7 @@
     imageRetreiver.setState("a,b,c");
     UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
         new DummyUpdatable(), imageRetreiver, 5);
-    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setStuff("d"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
     while(!updateForwarder.areAllUpdatesCommited()) {
       Thread.sleep(100);
     }
@@ -167,8 +196,8 @@
     List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
     Assert.assertEquals(2, allUpdates.size());
 
-    updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setStuff("e"));
-    updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setStuff("f"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f"));
 
     while(!updateForwarder.areAllUpdatesCommited()) {
       Thread.sleep(100);
@@ -176,23 +205,23 @@
     Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
     allUpdates = updateForwarder.getAllUpdatesFrom(0);
     Assert.assertEquals(4, allUpdates.size());
-    Assert.assertEquals("a,b,c", allUpdates.get(0).getStuff());
+    Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
     Assert.assertEquals(4, allUpdates.get(0).getSeqNum());
-    Assert.assertEquals("d", allUpdates.get(1).getStuff());
+    Assert.assertEquals("d", allUpdates.get(1).getState());
     Assert.assertEquals(5, allUpdates.get(1).getSeqNum());
-    Assert.assertEquals("e", allUpdates.get(2).getStuff());
+    Assert.assertEquals("e", allUpdates.get(2).getState());
     Assert.assertEquals(6, allUpdates.get(2).getSeqNum());
-    Assert.assertEquals("f", allUpdates.get(3).getStuff());
+    Assert.assertEquals("f", allUpdates.get(3).getState());
     Assert.assertEquals(7, allUpdates.get(3).getSeqNum());
 
-    updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setStuff("g"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g"));
     while(!updateForwarder.areAllUpdatesCommited()) {
       Thread.sleep(100);
     }
     Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum());
     allUpdates = updateForwarder.getAllUpdatesFrom(8);
     Assert.assertEquals(1, allUpdates.size());
-    Assert.assertEquals("g", allUpdates.get(0).getStuff());
+    Assert.assertEquals("g", allUpdates.get(0).getState());
   }
 
   @Test
@@ -201,13 +230,13 @@
     imageRetreiver.setState("a,b,c");
     UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
         new DummyUpdatable(), imageRetreiver, 5);
-    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setStuff("d"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
     while(!updateForwarder.areAllUpdatesCommited()) {
       Thread.sleep(100);
     }
 
-    updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setStuff("e"));
-    updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setStuff("f"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f"));
 
     while(!updateForwarder.areAllUpdatesCommited()) {
       Thread.sleep(100);
@@ -215,29 +244,29 @@
     Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
     List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
     Assert.assertEquals(4, allUpdates.size());
-    Assert.assertEquals("f", allUpdates.get(3).getStuff());
+    Assert.assertEquals("f", allUpdates.get(3).getState());
     Assert.assertEquals(7, allUpdates.get(3).getSeqNum());
 
-    updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setStuff("g"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g"));
     while(!updateForwarder.areAllUpdatesCommited()) {
       Thread.sleep(100);
     }
     Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum());
     allUpdates = updateForwarder.getAllUpdatesFrom(8);
     Assert.assertEquals(1, allUpdates.size());
-    Assert.assertEquals("g", allUpdates.get(0).getStuff());
+    Assert.assertEquals("g", allUpdates.get(0).getState());
 
     imageRetreiver.setState("a,b,c,d,e,f,g,h");
 
     // New update comes with SeqNum = 1
-    updateForwarder.handleUpdateNotification(new DummyUpdate(1, false).setStuff("h"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(1, false).setState("h"));
     while(!updateForwarder.areAllUpdatesCommited()) {
       Thread.sleep(100);
     }
     // NN plugin asks for next update
     allUpdates = updateForwarder.getAllUpdatesFrom(9);
     Assert.assertEquals(1, allUpdates.size());
-    Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getStuff());
+    Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState());
     Assert.assertEquals(1, allUpdates.get(0).getSeqNum());
   }
 
@@ -247,7 +276,7 @@
     imageRetreiver.setState("a,b,c");
     UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
         new DummyUpdatable(), imageRetreiver, 5);
-    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setStuff("d"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
     while(!updateForwarder.areAllUpdatesCommited()) {
       Thread.sleep(100);
     }
@@ -255,12 +284,12 @@
     List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
     Assert.assertEquals(2, allUpdates.size());
 
-    updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setStuff("e"));
-    updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setStuff("f"));
-    updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setStuff("g"));
-    updateForwarder.handleUpdateNotification(new DummyUpdate(9, false).setStuff("h"));
-    updateForwarder.handleUpdateNotification(new DummyUpdate(10, false).setStuff("i"));
-    updateForwarder.handleUpdateNotification(new DummyUpdate(11, false).setStuff("j"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(9, false).setState("h"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(10, false).setState("i"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(11, false).setState("j"));
 
     while(!updateForwarder.areAllUpdatesCommited()) {
       Thread.sleep(100);
@@ -268,11 +297,11 @@
     Assert.assertEquals(11, updateForwarder.getLastUpdatedSeqNum());
     allUpdates = updateForwarder.getAllUpdatesFrom(0);
     Assert.assertEquals(3, allUpdates.size());
-    Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getStuff());
+    Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState());
     Assert.assertEquals(9, allUpdates.get(0).getSeqNum());
-    Assert.assertEquals("i", allUpdates.get(1).getStuff());
+    Assert.assertEquals("i", allUpdates.get(1).getState());
     Assert.assertEquals(10, allUpdates.get(1).getSeqNum());
-    Assert.assertEquals("j", allUpdates.get(2).getStuff());
+    Assert.assertEquals("j", allUpdates.get(2).getState());
     Assert.assertEquals(11, allUpdates.get(2).getSeqNum());
   }
 }