[OMID-145] add commit cache to compation
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
index 9fdf937..70c1d27 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
@@ -21,6 +21,7 @@
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
+import org.apache.commons.collections4.map.LRUMap;
 import org.apache.omid.HBaseShims;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.committable.CommitTable.Client;
@@ -44,7 +45,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
+
 import java.util.SortedMap;
 import java.util.concurrent.ExecutionException;
 
@@ -63,6 +64,7 @@
 
     private boolean hasMoreRows = false;
     private List<Cell> currentRowWorthValues = new ArrayList<Cell>();
+    private final LRUMap<Long ,Optional<CommitTimestamp>> commitCache;
 
     public CompactorScanner(ObserverContext<RegionCoprocessorEnvironment> e,
                             InternalScanner internalScanner,
@@ -76,6 +78,7 @@
         this.lowWatermark = getLowWatermarkFromCommitTable();
         // Obtain the table in which the scanner is going to operate
         this.hRegion = HBaseShims.getRegionCoprocessorRegion(e.getEnvironment());
+        commitCache = new LRUMap<>(1000);
         LOG.info("Scanner cleaning up uncommitted txs older than LW [{}] in region [{}]",
                 lowWatermark, hRegion.getRegionInfo());
     }
@@ -226,9 +229,14 @@
     }
 
     private Optional<CommitTimestamp> queryCommitTimestamp(Cell cell) throws IOException {
+        Optional<CommitTimestamp> cachedValue = commitCache.get(cell.getTimestamp());
+        if (cachedValue != null) {
+            return cachedValue;
+        }
         try {
             Optional<CommitTimestamp> ct = commitTableClient.getCommitTimestamp(cell.getTimestamp()).get();
             if (ct.isPresent()) {
+                commitCache.put(cell.getTimestamp(), ct);
                 return Optional.of(ct.get());
             } else {
                 Get g = new Get(CellUtil.cloneRow(cell));
@@ -240,8 +248,11 @@
                 g.setTimeStamp(cell.getTimestamp());
                 Result r = hRegion.get(g);
                 if (r.containsColumn(family, qualifier)) {
-                    return Optional.of(new CommitTimestamp(SHADOW_CELL,
+                    Optional<CommitTimestamp> retval = Optional.of(new CommitTimestamp(SHADOW_CELL,
                             Bytes.toLong(r.getValue(family, qualifier)), true));
+                    commitCache.put(cell.getTimestamp(), retval);
+                    return retval;
+
                 }
             }
         } catch (InterruptedException e) {
@@ -250,7 +261,7 @@
         } catch (ExecutionException e) {
             throw new IOException("Error getting commit timestamp from commit table", e);
         }
-
+        commitCache.put(cell.getTimestamp(), Optional.<CommitTimestamp>absent());
         return Optional.absent();
     }