[OMID-145] add commit cache to compation
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
index 9fdf937..70c1d27 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
@@ -21,6 +21,7 @@
import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
+import org.apache.commons.collections4.map.LRUMap;
import org.apache.omid.HBaseShims;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.Client;
@@ -44,7 +45,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
+
import java.util.SortedMap;
import java.util.concurrent.ExecutionException;
@@ -63,6 +64,7 @@
private boolean hasMoreRows = false;
private List<Cell> currentRowWorthValues = new ArrayList<Cell>();
+ private final LRUMap<Long ,Optional<CommitTimestamp>> commitCache;
public CompactorScanner(ObserverContext<RegionCoprocessorEnvironment> e,
InternalScanner internalScanner,
@@ -76,6 +78,7 @@
this.lowWatermark = getLowWatermarkFromCommitTable();
// Obtain the table in which the scanner is going to operate
this.hRegion = HBaseShims.getRegionCoprocessorRegion(e.getEnvironment());
+ commitCache = new LRUMap<>(1000);
LOG.info("Scanner cleaning up uncommitted txs older than LW [{}] in region [{}]",
lowWatermark, hRegion.getRegionInfo());
}
@@ -226,9 +229,14 @@
}
private Optional<CommitTimestamp> queryCommitTimestamp(Cell cell) throws IOException {
+ Optional<CommitTimestamp> cachedValue = commitCache.get(cell.getTimestamp());
+ if (cachedValue != null) {
+ return cachedValue;
+ }
try {
Optional<CommitTimestamp> ct = commitTableClient.getCommitTimestamp(cell.getTimestamp()).get();
if (ct.isPresent()) {
+ commitCache.put(cell.getTimestamp(), ct);
return Optional.of(ct.get());
} else {
Get g = new Get(CellUtil.cloneRow(cell));
@@ -240,8 +248,11 @@
g.setTimeStamp(cell.getTimestamp());
Result r = hRegion.get(g);
if (r.containsColumn(family, qualifier)) {
- return Optional.of(new CommitTimestamp(SHADOW_CELL,
+ Optional<CommitTimestamp> retval = Optional.of(new CommitTimestamp(SHADOW_CELL,
Bytes.toLong(r.getValue(family, qualifier)), true));
+ commitCache.put(cell.getTimestamp(), retval);
+ return retval;
+
}
}
} catch (InterruptedException e) {
@@ -250,7 +261,7 @@
} catch (ExecutionException e) {
throw new IOException("Error getting commit timestamp from commit table", e);
}
-
+ commitCache.put(cell.getTimestamp(), Optional.<CommitTimestamp>absent());
return Optional.absent();
}