fixes #949 Used TimestampSkipping iterator in RollbackCheckIterator (#962)
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
index 5129b1d..962f172 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
@@ -34,7 +34,7 @@
public class RollbackCheckIterator implements SortedKeyValueIterator<Key, Value> {
private static final String TIMESTAMP_OPT = "timestampOpt";
- private SortedKeyValueIterator<Key, Value> source;
+ private TimestampSkippingIterator source;
private long lockTime;
boolean hasTop = false;
@@ -50,7 +50,7 @@
@Override
public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options,
IteratorEnvironment env) throws IOException {
- this.source = source;
+ this.source = new TimestampSkippingIterator(source);
this.lockTime = Long.parseLong(options.get(TIMESTAMP_OPT));
}
@@ -95,7 +95,8 @@
long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
if (colType == ColumnConstants.TX_DONE_PREFIX) {
- // do nothing if TX_DONE
+ source.skipToPrefix(curCol, ColumnConstants.WRITE_PREFIX);
+ continue;
} else if (colType == ColumnConstants.WRITE_PREFIX) {
long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
@@ -107,6 +108,12 @@
hasTop = true;
return;
}
+
+ if (lockTime > timePtr) {
+ source.skipToPrefix(curCol, ColumnConstants.DEL_LOCK_PREFIX);
+ continue;
+ }
+
} else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
if (ts > invalidationTime) {
invalidationTime = ts;
@@ -117,6 +124,11 @@
return;
}
+ if (lockTime > ts) {
+ source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
+ continue;
+ }
+
} else if (colType == ColumnConstants.LOCK_PREFIX) {
if (ts > invalidationTime) {
// nothing supersedes this lock, therefore the column is locked