SENTRY-432: Fixing updateLog bug that can cause a missed entry during first few updates
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
index 5fe89a8..cc849b9 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
@@ -58,6 +58,21 @@
}
retVal.getAuthzPermUpdate().add(update.toThrift());
}
+ if (LOGGER.isDebugEnabled()) {
+ StringBuilder permSeq = new StringBuilder("<");
+ for (PermissionsUpdate permUpdate : permUpdates) {
+ permSeq.append(permUpdate.getSeqNum()).append(",");
+ }
+ permSeq.append(">");
+ StringBuilder pathSeq = new StringBuilder("<");
+ for (PathsUpdate pathUpdate : pathUpdates) {
+ pathSeq.append(pathUpdate.getSeqNum()).append(",");
+ }
+ pathSeq.append(">");
+ LOGGER.debug("#### Updates requested from HDFS ["
+ + "permReq=" + permSeqNum + ", permResp=" + permSeq + "] "
+ + "[pathReq=" + pathSeqNum + ", pathResp=" + pathSeq + "]");
+ }
} catch (Exception e) {
LOGGER.error("Error Sending updates to downstream Cache", e);
throw new TException(e);
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
index 2815880..f321d3d 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
@@ -169,17 +169,26 @@
private void appendToUpdateLog(K update) {
synchronized (updateLog) {
+ boolean logCompacted = false;
if (updateLogSize > 0) {
if (update.hasFullImage() || (updateLog.size() == updateLogSize)) {
// Essentially a log compaction
updateLog.clear();
updateLog.add(update.hasFullImage() ? update
: createFullImageUpdate(update.getSeqNum()));
+ logCompacted = true;
} else {
updateLog.add(update);
}
}
lastCommittedSeqNum.set(update.getSeqNum());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("#### Appending to Update Log ["
+ + "type=" + update.getClass() + ", "
+ + "lastCommit=" + lastCommittedSeqNum.get() + ", "
+ + "lastSeen=" + lastSeenSeqNum.get() + ", "
+ + "logCompacted=" + logCompacted + "]");
+ }
}
}
@@ -192,6 +201,14 @@
List<K> retVal = new LinkedList<K>();
synchronized (updateLog) {
long currSeqNum = lastCommittedSeqNum.get();
+ if (LOGGER.isDebugEnabled() && (updateable != null)) {
+ LOGGER.debug("#### GetAllUpdatesFrom ["
+ + "type=" + updateable.getClass() + ", "
+ + "reqSeqNum=" + seqNum + ", "
+ + "lastCommit=" + lastCommittedSeqNum.get() + ", "
+ + "lastSeen=" + lastSeenSeqNum.get() + ", "
+ + "updateLogSize=" + updateLog.size() + "]");
+ }
if (updateLogSize == 0) {
// no updatelog configured..
return retVal;
@@ -227,21 +244,10 @@
} else {
// increment iterator to requested seqNum
Iterator<K> iter = updateLog.iterator();
- K u = null;
while (iter.hasNext()) {
- u = iter.next();
- if (u.getSeqNum() == seqNum) {
- break;
- }
- }
- // add all updates from requestedSeq
- // to committedSeqNum
- for (long seq = seqNum; seq <= currSeqNum; seq ++) {
- retVal.add(u);
- if (iter.hasNext()) {
- u = iter.next();
- } else {
- break;
+ K elem = iter.next();
+ if (elem.getSeqNum() >= seqNum) {
+ retVal.add(elem);
}
}
}
diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
index d571df8..0c55bb1 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
@@ -38,16 +38,16 @@
static class DummyUpdate implements Update {
private long seqNum = 0;
private boolean hasFullUpdate = false;
- private String stuff;
+ private String state;
public DummyUpdate(long seqNum, boolean hasFullUpdate) {
this.seqNum = seqNum;
this.hasFullUpdate = hasFullUpdate;
}
- public String getStuff() {
- return stuff;
+ public String getState() {
+ return state;
}
- public DummyUpdate setStuff(String stuff) {
- this.stuff = stuff;
+ public DummyUpdate setState(String stuff) {
+ this.state = stuff;
return this;
}
@Override
@@ -72,7 +72,7 @@
@Override
public void updatePartial(Iterable<DummyUpdate> update, ReadWriteLock lock) {
for (DummyUpdate u : update) {
- state.add(u.getStuff());
+ state.add(u.getState());
lastUpdatedSeqNum = u.seqNum;
}
}
@@ -81,7 +81,7 @@
public Updateable<DummyUpdate> updateFull(DummyUpdate update) {
DummyUpdatable retVal = new DummyUpdatable();
retVal.lastUpdatedSeqNum = update.seqNum;
- retVal.state = Lists.newArrayList(update.stuff.split(","));
+ retVal.state = Lists.newArrayList(update.state.split(","));
return retVal;
}
@@ -93,7 +93,7 @@
@Override
public DummyUpdate createFullImageUpdate(long currSeqNum) {
DummyUpdate retVal = new DummyUpdate(currSeqNum, true);
- retVal.stuff = Joiner.on(",").join(state);
+ retVal.state = Joiner.on(",").join(state);
return retVal;
}
@@ -111,7 +111,7 @@
@Override
public DummyUpdate retrieveFullImage(long currSeqNum) {
DummyUpdate retVal = new DummyUpdate(currSeqNum, true);
- retVal.stuff = state;
+ retVal.state = state;
return retVal;
}
}
@@ -125,12 +125,12 @@
Assert.assertEquals(-2, updateForwarder.getLastUpdatedSeqNum());
List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
Assert.assertTrue(allUpdates.size() == 1);
- Assert.assertEquals("a,b,c", allUpdates.get(0).getStuff());
+ Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
// If the current process has restarted the input seqNum will be > currSeq
allUpdates = updateForwarder.getAllUpdatesFrom(100);
Assert.assertTrue(allUpdates.size() == 1);
- Assert.assertEquals("a,b,c", allUpdates.get(0).getStuff());
+ Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
Assert.assertEquals(-2, allUpdates.get(0).getSeqNum());
allUpdates = updateForwarder.getAllUpdatesFrom(-1);
Assert.assertEquals(0, allUpdates.size());
@@ -142,15 +142,44 @@
imageRetreiver.setState("a,b,c");
UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
new DummyUpdatable(), imageRetreiver, 5);
- updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setStuff("d"));
+ updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
while(!updateForwarder.areAllUpdatesCommited()) {
Thread.sleep(100);
}
Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum());
List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
Assert.assertEquals(2, allUpdates.size());
- Assert.assertEquals("a,b,c", allUpdates.get(0).getStuff());
- Assert.assertEquals("d", allUpdates.get(1).getStuff());
+ Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
+ Assert.assertEquals("d", allUpdates.get(1).getState());
+ }
+
+ // This happens when we the first update from HMS is a -1 (If the heartbeat
+ // thread checks Sentry's current seqNum before any update has come in)..
+ // This will lead the first and second entries in the updatelog to differ
+ // by more than +1..
+ @Test
+ public void testUpdateReceiveWithNullImageRetriver() throws Exception {
+ UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
+ new DummyUpdatable(), null, 5);
+ updateForwarder.handleUpdateNotification(new DummyUpdate(-1, true).setState("a"));
+ while(!updateForwarder.areAllUpdatesCommited()) {
+ Thread.sleep(100);
+ }
+ List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(1);
+ Assert.assertEquals("a", allUpdates.get(0).getState());
+ updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("b"));
+ while(!updateForwarder.areAllUpdatesCommited()) {
+ Thread.sleep(100);
+ }
+ updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("c"));
+ while(!updateForwarder.areAllUpdatesCommited()) {
+ Thread.sleep(100);
+ }
+ Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
+ allUpdates = updateForwarder.getAllUpdatesFrom(0);
+ Assert.assertEquals(2, allUpdates.size());
+ Assert.assertEquals("b", allUpdates.get(0).getState());
+ Assert.assertEquals("c", allUpdates.get(1).getState());
}
@Test
@@ -159,7 +188,7 @@
imageRetreiver.setState("a,b,c");
UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
new DummyUpdatable(), imageRetreiver, 5);
- updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setStuff("d"));
+ updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
while(!updateForwarder.areAllUpdatesCommited()) {
Thread.sleep(100);
}
@@ -167,8 +196,8 @@
List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
Assert.assertEquals(2, allUpdates.size());
- updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setStuff("e"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setStuff("f"));
+ updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e"));
+ updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f"));
while(!updateForwarder.areAllUpdatesCommited()) {
Thread.sleep(100);
@@ -176,23 +205,23 @@
Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
allUpdates = updateForwarder.getAllUpdatesFrom(0);
Assert.assertEquals(4, allUpdates.size());
- Assert.assertEquals("a,b,c", allUpdates.get(0).getStuff());
+ Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
Assert.assertEquals(4, allUpdates.get(0).getSeqNum());
- Assert.assertEquals("d", allUpdates.get(1).getStuff());
+ Assert.assertEquals("d", allUpdates.get(1).getState());
Assert.assertEquals(5, allUpdates.get(1).getSeqNum());
- Assert.assertEquals("e", allUpdates.get(2).getStuff());
+ Assert.assertEquals("e", allUpdates.get(2).getState());
Assert.assertEquals(6, allUpdates.get(2).getSeqNum());
- Assert.assertEquals("f", allUpdates.get(3).getStuff());
+ Assert.assertEquals("f", allUpdates.get(3).getState());
Assert.assertEquals(7, allUpdates.get(3).getSeqNum());
- updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setStuff("g"));
+ updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g"));
while(!updateForwarder.areAllUpdatesCommited()) {
Thread.sleep(100);
}
Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum());
allUpdates = updateForwarder.getAllUpdatesFrom(8);
Assert.assertEquals(1, allUpdates.size());
- Assert.assertEquals("g", allUpdates.get(0).getStuff());
+ Assert.assertEquals("g", allUpdates.get(0).getState());
}
@Test
@@ -201,13 +230,13 @@
imageRetreiver.setState("a,b,c");
UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
new DummyUpdatable(), imageRetreiver, 5);
- updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setStuff("d"));
+ updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
while(!updateForwarder.areAllUpdatesCommited()) {
Thread.sleep(100);
}
- updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setStuff("e"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setStuff("f"));
+ updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e"));
+ updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f"));
while(!updateForwarder.areAllUpdatesCommited()) {
Thread.sleep(100);
@@ -215,29 +244,29 @@
Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
Assert.assertEquals(4, allUpdates.size());
- Assert.assertEquals("f", allUpdates.get(3).getStuff());
+ Assert.assertEquals("f", allUpdates.get(3).getState());
Assert.assertEquals(7, allUpdates.get(3).getSeqNum());
- updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setStuff("g"));
+ updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g"));
while(!updateForwarder.areAllUpdatesCommited()) {
Thread.sleep(100);
}
Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum());
allUpdates = updateForwarder.getAllUpdatesFrom(8);
Assert.assertEquals(1, allUpdates.size());
- Assert.assertEquals("g", allUpdates.get(0).getStuff());
+ Assert.assertEquals("g", allUpdates.get(0).getState());
imageRetreiver.setState("a,b,c,d,e,f,g,h");
// New update comes with SeqNum = 1
- updateForwarder.handleUpdateNotification(new DummyUpdate(1, false).setStuff("h"));
+ updateForwarder.handleUpdateNotification(new DummyUpdate(1, false).setState("h"));
while(!updateForwarder.areAllUpdatesCommited()) {
Thread.sleep(100);
}
// NN plugin asks for next update
allUpdates = updateForwarder.getAllUpdatesFrom(9);
Assert.assertEquals(1, allUpdates.size());
- Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getStuff());
+ Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState());
Assert.assertEquals(1, allUpdates.get(0).getSeqNum());
}
@@ -247,7 +276,7 @@
imageRetreiver.setState("a,b,c");
UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
new DummyUpdatable(), imageRetreiver, 5);
- updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setStuff("d"));
+ updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
while(!updateForwarder.areAllUpdatesCommited()) {
Thread.sleep(100);
}
@@ -255,12 +284,12 @@
List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
Assert.assertEquals(2, allUpdates.size());
- updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setStuff("e"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setStuff("f"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setStuff("g"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(9, false).setStuff("h"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(10, false).setStuff("i"));
- updateForwarder.handleUpdateNotification(new DummyUpdate(11, false).setStuff("j"));
+ updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e"));
+ updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f"));
+ updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g"));
+ updateForwarder.handleUpdateNotification(new DummyUpdate(9, false).setState("h"));
+ updateForwarder.handleUpdateNotification(new DummyUpdate(10, false).setState("i"));
+ updateForwarder.handleUpdateNotification(new DummyUpdate(11, false).setState("j"));
while(!updateForwarder.areAllUpdatesCommited()) {
Thread.sleep(100);
@@ -268,11 +297,11 @@
Assert.assertEquals(11, updateForwarder.getLastUpdatedSeqNum());
allUpdates = updateForwarder.getAllUpdatesFrom(0);
Assert.assertEquals(3, allUpdates.size());
- Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getStuff());
+ Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState());
Assert.assertEquals(9, allUpdates.get(0).getSeqNum());
- Assert.assertEquals("i", allUpdates.get(1).getStuff());
+ Assert.assertEquals("i", allUpdates.get(1).getState());
Assert.assertEquals(10, allUpdates.get(1).getSeqNum());
- Assert.assertEquals("j", allUpdates.get(2).getStuff());
+ Assert.assertEquals("j", allUpdates.get(2).getState());
Assert.assertEquals(11, allUpdates.get(2).getSeqNum());
}
}