fixes #917 Added read locks (#953)
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java
index 056fa8d..d87975a 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java
@@ -22,8 +22,11 @@
 import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.accumulo.util.ColumnConstants;
 import org.apache.fluo.accumulo.util.NotificationUtil;
+import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
+import org.apache.fluo.accumulo.values.DelReadLockValue;
 import org.apache.fluo.accumulo.values.LockValue;
+import org.apache.fluo.accumulo.values.ReadLockValue;
 import org.apache.fluo.accumulo.values.WriteValue;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
@@ -104,6 +107,15 @@
       if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.ACK_PREFIX) {
         type = "ACK";
       }
+      if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
+        if (ReadLockUtil.isDelete(ts)) {
+          type = "DEL_RLOCK";
+        } else {
+          type = "RLOCK";
+        }
+        ts = ReadLockUtil.decodeTs(ts);
+      }
+
 
       StringBuilder sb = new StringBuilder();
 
@@ -115,6 +127,10 @@
       } else if (type.equals("LOCK")) {
         // TODO can Value be made to extend Bytes w/o breaking API?
         val = new LockValue(entry.getValue().get()).toString();
+      } else if (type.equals("RLOCK")) {
+        val = new ReadLockValue(entry.getValue().get()).toString();
+      } else if (type.equals("DEL_RLOCK")) {
+        val = new DelReadLockValue(entry.getValue().get()).toString();
       } else {
         encNonAscii(sb, entry.getValue().get());
         val = sb.toString();
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
index 9ca23e9..1814dfa 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
@@ -35,8 +35,10 @@
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.util.ZookeeperUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
+import org.apache.fluo.accumulo.values.DelReadLockValue;
 import org.apache.fluo.accumulo.values.WriteValue;
 
 /**
@@ -159,6 +161,7 @@
     boolean oldestSeen = false;
     boolean sawAck = false;
     long firstWrite = -1;
+    long lastReadLockDeleteTs = -1;
 
     truncationTime = -1;
 
@@ -243,6 +246,38 @@
         } else if (complete) {
           completeTxs.remove(txDoneTs);
         }
+      } else if (colType == ColumnConstants.RLOCK_PREFIX) {
+        boolean keep = false;
+        long rlts = ReadLockUtil.decodeTs(ts);
+        boolean isDelete = ReadLockUtil.isDelete(ts);
+
+        if (isDelete) {
+          lastReadLockDeleteTs = rlts;
+        }
+
+        if (rlts > invalidationTime) {
+          if (isFullMajc) {
+            if (isDelete) {
+              if (DelReadLockValue.isRollback(source.getTopValue().get())) {
+                // can drop rolled back read lock delete markers on any full majc, do not need to consider gcTimestamp
+                keep = false;
+              } else {
+                long rlockCommitTs =
+                    DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
+                keep = rlockCommitTs >= gcTimestamp;
+              }
+            } else {
+              keep = lastReadLockDeleteTs != rlts;
+            }
+          } else {
+            // can drop deleted read lock entries.. keep the delete entry.
+            keep = isDelete || lastReadLockDeleteTs != rlts;
+          }
+        }
+
+        if (keep) {
+          keys.add(source.getTopKey(), source.getTopValue());
+        }
       } else if (colType == ColumnConstants.LOCK_PREFIX) {
         if (ts > invalidationTime) {
           keys.add(source.getTopKey(), source.getTopValue());
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java
new file mode 100644
index 0000000..dd4de54
--- /dev/null
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.iterators;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ReadLockUtil;
+
+import static org.apache.fluo.accumulo.util.ColumnConstants.ACK_PREFIX;
+import static org.apache.fluo.accumulo.util.ColumnConstants.DATA_PREFIX;
+import static org.apache.fluo.accumulo.util.ColumnConstants.DEL_LOCK_PREFIX;
+import static org.apache.fluo.accumulo.util.ColumnConstants.LOCK_PREFIX;
+import static org.apache.fluo.accumulo.util.ColumnConstants.RLOCK_PREFIX;
+import static org.apache.fluo.accumulo.util.ColumnConstants.TIMESTAMP_MASK;
+import static org.apache.fluo.accumulo.util.ColumnConstants.TX_DONE_PREFIX;
+import static org.apache.fluo.accumulo.util.ColumnConstants.WRITE_PREFIX;
+
+public class OpenReadLockIterator implements SortedKeyValueIterator<Key, Value> {
+
+  private TimestampSkippingIterator source;
+
+  private Key lastDelete;
+
+  private void findTop() throws IOException {
+    while (source.hasTop()) {
+
+      long colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+
+      if (colType == TX_DONE_PREFIX || colType == WRITE_PREFIX || colType == DEL_LOCK_PREFIX) {
+        source.skipToPrefix(source.getTopKey(), RLOCK_PREFIX);
+        continue;
+      } else if (colType == RLOCK_PREFIX) {
+        if (ReadLockUtil.isDelete(source.getTopKey())) {
+          lastDelete.set(source.getTopKey());
+        } else {
+          if (lastDelete.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
+            long ts1 = ReadLockUtil.decodeTs(source.getTopKey().getTimestamp() & TIMESTAMP_MASK);
+            long ts2 = ReadLockUtil.decodeTs(lastDelete.getTimestamp() & TIMESTAMP_MASK);
+
+            if (ts1 != ts2) {
+              // found a read lock that is not suppressed by a delete read lock entry
+              return;
+            }
+          } else {
+            // found a read lock that is not suppressed by a delete read lock entry
+            return;
+          }
+        }
+        source.next();
+        continue;
+      } else if (colType == DATA_PREFIX || colType == LOCK_PREFIX || colType == ACK_PREFIX) {
+        source.skipColumn(source.getTopKey());
+        continue;
+      } else {
+        throw new IllegalArgumentException("Unknown column type " + source.getTopKey());
+      }
+    }
+  }
+
+  @Override
+  public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options,
+      IteratorEnvironment env) throws IOException {
+    this.source = new TimestampSkippingIterator(source);
+  }
+
+  @Override
+  public boolean hasTop() {
+    return source.hasTop();
+  }
+
+  @Override
+  public void next() throws IOException {
+    source.next();
+    findTop();
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+      throws IOException {
+
+    lastDelete = new Key();
+
+    Collection<ByteSequence> fams;
+    if (columnFamilies.isEmpty() && !inclusive) {
+      fams = SnapshotIterator.NOTIFY_CF_SET;
+      inclusive = false;
+    } else {
+      fams = columnFamilies;
+    }
+
+    source.seek(range, fams, inclusive);
+    findTop();
+  }
+
+  @Override
+  public Key getTopKey() {
+    return source.getTopKey();
+  }
+
+  @Override
+  public Value getTopValue() {
+    return source.getTopValue();
+  }
+
+  @Override
+  public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
+    throw new UnsupportedOperationException();
+  }
+
+}
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
index a5a160d..b6f9a48 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
@@ -28,6 +28,8 @@
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ReadLockUtil;
+import org.apache.fluo.accumulo.values.DelReadLockValue;
 import org.apache.fluo.accumulo.values.WriteValue;
 
 /**
@@ -37,6 +39,7 @@
   private static final String TIMESTAMP_OPT = "timestampOpt";
   private static final String CHECK_ACK_OPT = "checkAckOpt";
   private static final String NTFY_TIMESTAMP_OPT = "ntfyTsOpt";
+  private static final String READ_LOCK_OPT = "readLock";
 
   private TimestampSkippingIterator source;
   private long snaptime;
@@ -44,6 +47,7 @@
   boolean hasTop = false;
   boolean checkAck = false;
   long ntfyTimestamp = -1;
+  boolean readlock;
 
   public static void setSnaptime(IteratorSetting cfg, long time) {
     if (time < 0 || (ColumnConstants.PREFIX_MASK & time) != 0) {
@@ -52,8 +56,12 @@
     cfg.addOption(TIMESTAMP_OPT, time + "");
   }
 
+  public static void setReadlock(IteratorSetting cfg) {
+    cfg.addOption(READ_LOCK_OPT, Boolean.TRUE.toString());
+  }
+
   public static void enableAckCheck(IteratorSetting cfg, long timestamp) {
-    cfg.addOption(CHECK_ACK_OPT, "true");
+    cfg.addOption(CHECK_ACK_OPT, Boolean.TRUE.toString());
     cfg.addOption(NTFY_TIMESTAMP_OPT, timestamp + "");
   }
 
@@ -66,6 +74,8 @@
       this.checkAck = Boolean.parseBoolean(options.get(CHECK_ACK_OPT));
       this.ntfyTimestamp = Long.parseLong(options.get(NTFY_TIMESTAMP_OPT));
     }
+
+    this.readlock = Boolean.parseBoolean(options.getOrDefault(READ_LOCK_OPT, "false"));
   }
 
   @Override
@@ -140,7 +150,58 @@
           }
         }
 
-        source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.LOCK_PREFIX);
+        if (readlock) {
+          source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.LOCK_PREFIX);
+        } else {
+          source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.RLOCK_PREFIX);
+        }
+      } else if (colType == ColumnConstants.RLOCK_PREFIX) {
+
+        long lastDeleteTs = -1;
+        long rlts = ReadLockUtil.decodeTs(ts);
+
+        if (!readlock) {
+          while (rlts > invalidationTime && colType == ColumnConstants.RLOCK_PREFIX) {
+            if (ReadLockUtil.isDelete(ts)) {
+              // ignore rolled back read locks, these should never prevent a write lock
+              if (!DelReadLockValue.isRollback(source.getTopValue().get())) {
+                if (rlts >= snaptime) {
+                  hasTop = true;
+                  return;
+                } else {
+                  long rlockCommitTs =
+                      DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
+                  if (rlockCommitTs > snaptime) {
+                    hasTop = true;
+                    return;
+                  }
+                }
+              }
+
+
+              lastDeleteTs = rlts;
+            } else {
+              if (rlts != lastDeleteTs) {
+                // this read lock is active
+                hasTop = true;
+                return;
+              }
+            }
+
+            source.next();
+            if (source.hasTop()) {
+              colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+              ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
+              rlts = ReadLockUtil.decodeTs(ts);
+            } else {
+              break;
+            }
+          }
+        }
+
+        if (source.hasTop() && (colType == ColumnConstants.RLOCK_PREFIX)) {
+          source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.LOCK_PREFIX);
+        }
       } else if (colType == ColumnConstants.LOCK_PREFIX) {
         if (ts > invalidationTime) {
           // nothing supersedes this lock, therefore the column is locked
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
index 962f172..cae383f 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
@@ -129,6 +129,9 @@
           continue;
         }
 
+      } else if (colType == ColumnConstants.RLOCK_PREFIX) {
+        source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
+        continue;
       } else if (colType == ColumnConstants.LOCK_PREFIX) {
         if (ts > invalidationTime) {
           // nothing supersedes this lock, therefore the column is locked
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
index 3c6939c..27e63ef 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
@@ -22,6 +22,7 @@
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.ByteSequence;
@@ -38,6 +39,9 @@
 
   @VisibleForTesting
   static final String TIMESTAMP_OPT = "timestampOpt";
+
+  static final String RETURN_READLOCK_PRESENT_OPT = "rrlpOpt";
+
   private static final ByteSequence NOTIFY_CF_BS =
       new ArrayByteSequence(ColumnConstants.NOTIFY_CF.toArray());
 
@@ -47,10 +51,35 @@
   private long snaptime;
   private boolean hasTop = false;
 
+  private boolean returnReadLockPresent = false;
+
   private final Key curCol = new Key();
 
+  private Key readLockIgnore;
+  private Key readLockKey;
+  private Value readLockValue;
+
+  private void rememberReadLock(Key key, Value val) {
+    Preconditions.checkState(readLockKey == null && readLockValue == null);
+    if (readLockIgnore == null
+        || !key.equals(readLockIgnore, PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
+      readLockKey = new Key(key);
+      readLockValue = new Value(val);
+    }
+  }
+
+  private void ignoreReadLock(Key key) {
+    readLockIgnore = key;
+  }
+
+  private void clearReadLock() {
+    readLockKey = null;
+    readLockValue = null;
+    readLockIgnore = null;
+  }
+
   private void findTop() throws IOException {
-    outer: while (source.hasTop()) {
+    outer: while (source.hasTop() && readLockKey == null) {
       long invalidationTime = -1;
       long dataPointer = -1;
 
@@ -89,6 +118,18 @@
           if (ts > invalidationTime) {
             invalidationTime = ts;
           }
+          if (returnReadLockPresent) {
+            source.skipToPrefix(curCol, ColumnConstants.RLOCK_PREFIX);
+          } else {
+            source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
+          }
+          continue;
+
+        } else if (colType == ColumnConstants.RLOCK_PREFIX) {
+          if (returnReadLockPresent) {
+            rememberReadLock(source.getTopKey(), source.getTopValue());
+          }
+
           source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
           continue;
         } else if (colType == ColumnConstants.LOCK_PREFIX) {
@@ -142,21 +183,26 @@
       IteratorEnvironment env) throws IOException {
     this.source = new TimestampSkippingIterator(source);
     this.snaptime = Long.parseLong(options.get(TIMESTAMP_OPT));
+    this.returnReadLockPresent =
+        Boolean.parseBoolean(options.getOrDefault(RETURN_READLOCK_PRESENT_OPT, "false"));
     // TODO could require client to send version as a sanity check
   }
 
   @Override
   public boolean hasTop() {
-    return hasTop && source.hasTop();
+    return hasTop && (readLockKey != null || source.hasTop());
   }
 
   @Override
   public void next() throws IOException {
-    curCol.set(source.getTopKey());
-    source.skipColumn(curCol);
+    if (readLockKey != null) {
+      clearReadLock();
+    } else {
+      curCol.set(source.getTopKey());
+      source.skipColumn(curCol);
 
-    findTop();
-
+      findTop();
+    }
   }
 
   @Override
@@ -167,16 +213,28 @@
     Collection<ByteSequence> cols;
     boolean inc;
 
+    clearReadLock();
+
     // handle continue case
     hasTop = true;
     if (range.getStartKey() != null && range.getStartKey().getTimestamp() != Long.MAX_VALUE
         && !range.isStartKeyInclusive()) {
-      Key nextCol = range.getStartKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS);
-      if (range.afterEndKey(nextCol)) {
-        hasTop = false;
-        return;
+
+      if ((range.getStartKey().getTimestamp()
+          & ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
+        Key currCol = new Key(range.getStartKey());
+        currCol.setTimestamp(Long.MAX_VALUE);
+        newRange = new Range(currCol, true, range.getEndKey(), range.isEndKeyInclusive());
+        ignoreReadLock(currCol);
       } else {
-        newRange = new Range(nextCol, true, range.getEndKey(), range.isEndKeyInclusive());
+
+        Key nextCol = range.getStartKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS);
+        if (range.afterEndKey(nextCol)) {
+          hasTop = false;
+          return;
+        } else {
+          newRange = new Range(nextCol, true, range.getEndKey(), range.isEndKeyInclusive());
+        }
       }
     } else {
       newRange = range;
@@ -196,12 +254,20 @@
 
   @Override
   public Key getTopKey() {
-    return source.getTopKey();
+    if (readLockKey != null) {
+      return readLockKey;
+    } else {
+      return source.getTopKey();
+    }
   }
 
   @Override
   public Value getTopValue() {
-    return source.getTopValue();
+    if (readLockValue != null) {
+      return readLockValue;
+    } else {
+      return source.getTopValue();
+    }
   }
 
   @Override
@@ -215,4 +281,8 @@
     }
     cfg.addOption(TIMESTAMP_OPT, time + "");
   }
+
+  public static void setReturnReadLockPresent(IteratorSetting cfg, boolean rrlp) {
+    cfg.addOption(RETURN_READLOCK_PRESENT_OPT, rrlp + "");
+  }
 }
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
index 4da1a48..c38b9e6 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
@@ -26,6 +26,7 @@
   public static final long TX_DONE_PREFIX = 0x6000000000000000L;
   public static final long WRITE_PREFIX = 0x4000000000000000L;
   public static final long DEL_LOCK_PREFIX = 0x2000000000000000L;
+  public static final long RLOCK_PREFIX = 0x0000000000000000L;
   public static final long LOCK_PREFIX = 0xe000000000000000L;
   public static final long ACK_PREFIX = 0xc000000000000000L;
   public static final long DATA_PREFIX = 0xa000000000000000L;
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ReadLockUtil.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ReadLockUtil.java
new file mode 100644
index 0000000..654ebc1
--- /dev/null
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ReadLockUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.accumulo.core.data.Key;
+
+public class ReadLockUtil {
+  private static final long DEL_MASK = 0x0000000000000001L;
+
+  public static boolean isDelete(Key k) {
+    return isDelete(k.getTimestamp());
+  }
+
+  public static boolean isDelete(long ts) {
+    return (ts & DEL_MASK) == DEL_MASK;
+  }
+
+  public static long encodeTs(long ts, boolean isDelete) {
+    Preconditions.checkArgument((ts & ColumnConstants.PREFIX_MASK) == 0);
+    return ts << 1 | (isDelete ? 1 : 0);
+  }
+
+  public static long decodeTs(Key k) {
+    return decodeTs(k.getTimestamp());
+  }
+
+  public static long decodeTs(long ts) {
+    return ts >> 1;
+  }
+}
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/values/DelReadLockValue.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/values/DelReadLockValue.java
new file mode 100644
index 0000000..58bf760
--- /dev/null
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/values/DelReadLockValue.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.values;
+
+import org.apache.fluo.accumulo.util.ByteArrayUtil;
+
+public class DelReadLockValue {
+
+  long commitTs = -1;
+  boolean rollback;
+
+  public DelReadLockValue(byte[] value) {
+    this.rollback = isRollback(value);
+    if (!this.rollback) {
+      this.commitTs = getCommitTimestamp(value);
+    }
+  }
+
+
+  public static byte[] encodeRollback() {
+    byte[] ba = new byte[1];
+    ba[0] = (byte) 1;
+    return ba;
+  }
+
+  public static byte[] encodeCommit(long commitTs) {
+    byte[] ba = new byte[9];
+    ba[0] = (byte) 0;
+    ByteArrayUtil.encode(ba, 1, commitTs);
+    return ba;
+  }
+
+  public static boolean isRollback(byte[] data) {
+    return (data[0] & 0x01) == 1;
+  }
+
+  public static long getCommitTimestamp(byte[] data) {
+    return ByteArrayUtil.decodeLong(data, 1);
+  }
+
+  @Override
+  public String toString() {
+    return commitTs + (rollback ? " ABORT" : " COMMIT");
+  }
+}
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/values/ReadLockValue.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/values/ReadLockValue.java
new file mode 100644
index 0000000..73451ae
--- /dev/null
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/values/ReadLockValue.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.values;
+
+import java.util.List;
+
+import org.apache.fluo.accumulo.util.ByteArrayUtil;
+import org.apache.fluo.accumulo.util.LongUtil;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+
+import static org.apache.fluo.accumulo.format.FluoFormatter.encNonAscii;
+
+public class ReadLockValue {
+  private final Bytes prow;
+  private final Column pcol;
+  private final Long transactor;
+
+  public ReadLockValue(byte[] enc) {
+    List<Bytes> fields = ByteArrayUtil.split(enc);
+
+    if (fields.size() != 5) {
+      throw new IllegalArgumentException("more fields than expected");
+    }
+
+    this.prow = fields.get(0);
+    this.pcol = new Column(fields.get(1), fields.get(2), fields.get(3));
+    this.transactor = ByteArrayUtil.decodeLong(fields.get(4).toArray());
+  }
+
+  public Bytes getPrimaryRow() {
+    return prow;
+  }
+
+  public Column getPrimaryColumn() {
+    return pcol;
+  }
+
+  public Long getTransactor() {
+    return transactor;
+  }
+
+  public static byte[] encode(Bytes prow, Column pcol, Long transactor) {
+    return ByteArrayUtil.concat(prow, pcol.getFamily(), pcol.getQualifier(), pcol.getVisibility(),
+        Bytes.of(ByteArrayUtil.encode(transactor)));
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    encNonAscii(sb, prow);
+    sb.append(" ");
+    encNonAscii(sb, pcol.getFamily());
+    sb.append(" ");
+    encNonAscii(sb, pcol.getQualifier());
+    sb.append(" ");
+    encNonAscii(sb, pcol.getVisibility());
+    sb.append(" ");
+    sb.append(LongUtil.toMaxRadixString(transactor));
+
+    return sb.toString();
+  }
+}
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/CountingIterator.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/CountingIterator.java
new file mode 100644
index 0000000..a2517be
--- /dev/null
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/CountingIterator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.iterators;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedMapIterator;
+
+public class CountingIterator extends SortedMapIterator {
+
+  public static class Counter {
+    public int nextCalls;
+    public int seeks;
+
+    public void reset() {
+      nextCalls = 0;
+      seeks = 0;
+    }
+  }
+
+  private Counter counter;
+
+  CountingIterator(Counter counter, SortedMap<Key, Value> data) {
+    super(data);
+    this.counter = counter;
+  }
+
+  @Override
+  public void next() throws IOException {
+    super.next();
+    if (counter != null) {
+      counter.nextCalls++;
+    }
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+      throws IOException {
+    super.seek(range, columnFamilies, inclusive);
+    if (counter != null) {
+      counter.seeks++;
+    }
+  }
+}
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIteratorTest.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIteratorTest.java
index a1600d8..591fdce 100644
--- a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIteratorTest.java
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIteratorTest.java
@@ -597,4 +597,104 @@
 
     Assert.assertEquals(expected, output);
   }
+
+  @Test
+  public void testDeletedReadLocks() {
+
+    TestData input = new TestData();
+
+    input.add("0 f q RLOCK 42", "0 f q");
+    input.add("0 f q DEL_RLOCK 42", "50");
+    input.add("0 f q RLOCK 45", "0 f q");
+    input.add("0 f q DEL_RLOCK 45", "53");
+    input.add("0 f q RLOCK 49", "0 f q");
+    input.add("0 f q DEL_RLOCK 49", "ROLLBACK");
+
+    for (long oldestActiveTs : new long[] {20, 40, 42, 45, 46, 49, 50, 51, 52, 53, 54, 70}) {
+      TestData expected = new TestData();
+      if (oldestActiveTs <= 53) {
+        expected.add("0 f q DEL_RLOCK 45", "53");
+      }
+      if (oldestActiveTs <= 50) {
+        expected.add("0 f q DEL_RLOCK 42", "50");
+      }
+
+      TestData output = new TestData(newGCI(input, oldestActiveTs, true));
+      Assert.assertEquals(expected, output);
+
+
+      expected.add("0 f q DEL_RLOCK 45", "53");
+      expected.add("0 f q DEL_RLOCK 42", "50");
+      expected.add("0 f q DEL_RLOCK 49", "ROLLBACK");
+
+      output = new TestData(newGCI(input, oldestActiveTs, false));
+      Assert.assertEquals(expected, output);
+    }
+
+    input.add("0 f q RLOCK 47", "0 f q");
+    input.add("0 f q RLOCK 40", "0 f q");
+
+    for (long oldestActiveTs : new long[] {20, 40, 42, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54,
+        70}) {
+      TestData expected = new TestData();
+      expected.add("0 f q RLOCK 47", "0 f q");
+      expected.add("0 f q RLOCK 40", "0 f q");
+
+      if (oldestActiveTs <= 53) {
+        expected.add("0 f q DEL_RLOCK 45", "53");
+      }
+      if (oldestActiveTs <= 50) {
+        expected.add("0 f q DEL_RLOCK 42", "50");
+      }
+
+      TestData output = new TestData(newGCI(input, oldestActiveTs, true));
+      Assert.assertEquals(expected, output);
+
+      expected.add("0 f q DEL_RLOCK 45", "53");
+      expected.add("0 f q DEL_RLOCK 42", "50");
+      expected.add("0 f q DEL_RLOCK 49", "ROLLBACK");
+      output = new TestData(newGCI(input, oldestActiveTs, false));
+      Assert.assertEquals(expected, output);
+    }
+  }
+
+  @Test
+  public void testInvalidatedReadLocks() {
+    // a write or delete lock invalidates all read locks, so they can be dropped on partial or full
+    // compactions
+    TestData input = new TestData();
+
+    input.add("0 f q RLOCK 42", "0 f q");
+    input.add("0 f q DEL_RLOCK 42", "50");
+    input.add("0 f q RLOCK 45", "0 f q");
+    input.add("0 f q DEL_RLOCK 45", "53");
+    input.add("0 f q RLOCK 60", "0 f q");
+    input.add("0 f q DEL_RLOCK 60", "70");
+    input.add("1 f q RLOCK 42", "0 f q");
+    input.add("1 f q DEL_RLOCK 42", "50");
+
+    TestData input2 = new TestData(input);
+    input2.add("0 f q WRITE 56", "55");
+    input2.add("0 f q DATA 55", "19");
+
+    TestData expected = new TestData();
+    expected.add("0 f q WRITE 56", "55");
+    expected.add("0 f q DATA 55", "19");
+
+    // invalidation time or oldest active should get rid of all read locks
+    TestData output = new TestData(newGCI(input2, 75, true));
+    Assert.assertEquals(expected, output);
+
+    // only invalidation should get rid of read locks in following three cases
+    expected.add("0 f q DEL_RLOCK 60", "70");
+    expected.add("1 f q DEL_RLOCK 42", "50"); // should not be invalidated, in diff column
+    output = new TestData(newGCI(input2, 75, false));
+    Assert.assertEquals(expected, output);
+
+    output = new TestData(newGCI(input2, 30, true));
+    Assert.assertEquals(expected, output);
+
+    output = new TestData(newGCI(input2, 30, false));
+    Assert.assertEquals(expected, output);
+  }
 }
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/OpenReadLockIteratorTest.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/OpenReadLockIteratorTest.java
new file mode 100644
index 0000000..08b4ee0
--- /dev/null
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/OpenReadLockIteratorTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.iterators;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.SortedMapIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class OpenReadLockIteratorTest {
+  OpenReadLockIterator newORLI(TestData input) {
+    OpenReadLockIterator si = new OpenReadLockIterator();
+
+    IteratorEnvironment env = TestIteratorEnv.create(IteratorScope.scan, true);
+
+    try {
+      SortedKeyValueIterator<Key, Value> source = new SortedMapIterator(input.data);
+      si.init(source, Collections.emptyMap(), env);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return si;
+  }
+
+  @Test
+  public void testBasic() {
+    TestData input = new TestData();
+
+    input.add("0 f q LOCK 11", "1 f q");
+    input.add("0 f q WRITE 16", "11");
+    input.add("0 f q DATA 11", "15");
+    input.add("0 f q DEL_RLOCK 25", "36");
+    input.add("0 f q RLOCK 25", " 0 f q");
+    input.add("0 f q RLOCK 23", " 0 f q");
+    input.add("0 f q DEL_RLOCK 21", "30");
+    input.add("0 f q RLOCK 21", " 0 f q");
+
+    input.add("1 f q1 TX_DONE 16", "11");
+    input.add("1 f q LOCK 11", "1 f q");
+    input.add("1 f q WRITE 16", "11");
+    input.add("1 f q DATA 11", "15");
+    input.add("1 f q RLOCK 25", " 0 f q");
+    input.add("1 f q RLOCK 23", " 0 f q");
+    input.add("1 f q DEL_RLOCK 21", "30");
+    input.add("1 f q RLOCK 21", " 0 f q");
+    input.add("1 f q ACK 11", "");
+
+    input.add("3 f q LOCK 11", "1 f q");
+    input.add("3 f q WRITE 16", "11");
+    input.add("3 f q DATA 11", "15");
+    input.add("3 f q DEL_RLOCK 25", "36");
+    input.add("3 f q RLOCK 25", " 0 f q");
+    input.add("3 f q DEL_RLOCK 23", "33");
+    input.add("3 f q RLOCK 23", " 0 f q");
+    input.add("3 f q DEL_RLOCK 21", "30");
+    input.add("3 f q RLOCK 21", " 0 f q");
+
+    TestData expected = new TestData();
+    expected.add("0 f q RLOCK 23", " 0 f q");
+    expected.add("1 f q RLOCK 25", " 0 f q");
+    expected.add("1 f q RLOCK 23", " 0 f q");
+
+    TestData output = new TestData(newORLI(input));
+    Assert.assertEquals(expected, output);
+
+    output = new TestData(newORLI(input), new Range(), true);
+    Assert.assertEquals(expected, output);
+  }
+}
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/PrewriteIteratorTest.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/PrewriteIteratorTest.java
index 959f224..ad601f5 100644
--- a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/PrewriteIteratorTest.java
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/PrewriteIteratorTest.java
@@ -21,12 +21,29 @@
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.iterators.SortedMapIterator;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class PrewriteIteratorTest {
 
+  PrewriteIterator newPI(TestData input, long snapTime, boolean readlock) {
+    PrewriteIterator ni = new PrewriteIterator();
+
+    IteratorEnvironment env = TestIteratorEnv.create(IteratorScope.scan, false);
+
+    try {
+      IteratorSetting cfg = new IteratorSetting(10, PrewriteIterator.class);
+      PrewriteIterator.setSnaptime(cfg, snapTime);
+      if (readlock) {
+        PrewriteIterator.setReadlock(cfg);
+      }
+      ni.init(input.getIterator(), cfg.getOptions(), env);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return ni;
+  }
+
   PrewriteIterator newPI(TestData input, long snapTime) {
     PrewriteIterator ni = new PrewriteIterator();
 
@@ -35,7 +52,7 @@
     try {
       IteratorSetting cfg = new IteratorSetting(10, PrewriteIterator.class);
       PrewriteIterator.setSnaptime(cfg, snapTime);
-      ni.init(new SortedMapIterator(input.data), cfg.getOptions(), env);
+      ni.init(input.getIterator(), cfg.getOptions(), env);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -51,7 +68,7 @@
       IteratorSetting cfg = new IteratorSetting(10, PrewriteIterator.class);
       PrewriteIterator.setSnaptime(cfg, snapTime);
       PrewriteIterator.enableAckCheck(cfg, ntfyTime);
-      ni.init(new SortedMapIterator(input.data), cfg.getOptions(), env);
+      ni.init(input.getIterator(), cfg.getOptions(), env);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -238,4 +255,219 @@
     output = new TestData(newPI(input, 117), Range.exact("0", "f", "q"));
     Assert.assertEquals(0, output.data.size());
   }
+
+  @Test
+  public void testReadLockPreventsWriteLock() {
+
+    TestData[] initialInputs = new TestData[4];
+
+    initialInputs[0] = new TestData();
+
+    initialInputs[1] = new TestData();
+    initialInputs[1].add("0 f q WRITE 10", "5");
+    initialInputs[1].add("0 f q LOCK 5", "0 f q");
+    initialInputs[1].add("0 f q DATA 5", "15");
+
+    initialInputs[2] = new TestData();
+    initialInputs[2].add("0 f q WRITE 10", "5");
+    initialInputs[2].add("0 f q DATA 5", "15");
+
+    initialInputs[3] = new TestData();
+    initialInputs[3].add("0 f q DEL_LOCK 5", "ABORT");
+
+    for (TestData input : initialInputs) {
+
+      input.add("0 f q RLOCK 42", "0 f q");
+
+      TestData expected = new TestData();
+      expected.add("0 f q RLOCK 42", "0 f q");
+
+      for (int ts : new int[] {40, 45}) {
+        TestData output = new TestData(newPI(input, ts), Range.exact("0", "f", "q"));
+        Assert.assertEquals(expected, output);
+      }
+
+      input.add("0 f q DEL_RLOCK 42", "50");
+
+      expected = new TestData();
+      expected.add("0 f q DEL_RLOCK 42", "50");
+
+      for (int ts : new int[] {40, 45}) {
+        TestData output = new TestData(newPI(input, ts), Range.exact("0", "f", "q"));
+        Assert.assertEquals(expected, output);
+      }
+
+      TestData output = new TestData(newPI(input, 117), Range.exact("0", "f", "q"));
+      Assert.assertEquals(0, output.data.size());
+
+      input.add("0 f q RLOCK 30", "0 f q");
+
+      expected = new TestData();
+      expected.add("0 f q RLOCK 30", "0 f q");
+
+      output = new TestData(newPI(input, 117), Range.exact("0", "f", "q"));
+      Assert.assertEquals(expected, output);
+
+      input.add("0 f q DEL_RLOCK 30", "60");
+
+      output = new TestData(newPI(input, 117), Range.exact("0", "f", "q"));
+      Assert.assertEquals(0, output.data.size());
+
+      output = new TestData(newPI(input, 55), Range.exact("0", "f", "q"));
+      expected = new TestData();
+      expected.add("0 f q DEL_RLOCK 30", "60");
+      Assert.assertEquals(expected, output);
+
+      expected = new TestData();
+      expected.add("0 f q DEL_RLOCK 42", "50");
+
+      for (int ts : new int[] {20, 40, 45}) {
+        output = new TestData(newPI(input, ts), Range.exact("0", "f", "q"));
+        Assert.assertEquals(expected, output);
+      }
+
+    }
+  }
+
+  @Test
+  public void testOnlyDelReadLocks() {
+    //Garbage collection iter may drop read locks and leave del_read_lock entries.  Ensure this case works as expected.
+    TestData input = new TestData();
+    input.add("0 f q DEL_RLOCK 42", "50");
+    input.add("0 f q DEL_RLOCK 39", "44");
+
+    input.add("0 f q WRITE 10", "5");
+    input.add("0 f q LOCK 5", "0 f q");
+    input.add("0 f q DATA 5", "15");
+
+    for (long startTs : new long[] {31, 40, 47}) {
+      TestData expected = new TestData();
+      expected.add("0 f q DEL_RLOCK 42", "50");
+
+      TestData output = new TestData(newPI(input, startTs), Range.exact("0", "f", "q"));
+
+      Assert.assertEquals(expected, output);
+    }
+
+    TestData output = new TestData(newPI(input, 55), Range.exact("0", "f", "q"));
+    Assert.assertEquals(0, output.data.size());
+  }
+
+  @Test
+  public void testWriteLockPreventsReadLock() {
+    for (int i = 0; i < 2; i++) {
+      TestData input = new TestData();
+
+      if (i == 1) {
+        // this should have no impact
+        input.add("0 f q RLOCK 13", "0 f q");
+      }
+
+      input.add("0 f q LOCK 5", "0 f q");
+      input.add("0 f q DATA 5", "15");
+
+      TestData output = new TestData(newPI(input, 117, true), Range.exact("0", "f", "q"));
+      TestData expected = new TestData();
+      expected.add("0 f q LOCK 5", "0 f q");
+      Assert.assertEquals(expected, output);
+
+      input.add("0 f q WRITE 10", "5");
+      output = new TestData(newPI(input, 117, true), Range.exact("0", "f", "q"));
+      Assert.assertEquals(0, output.data.size());
+
+      output = new TestData(newPI(input, 11, true), Range.exact("0", "f", "q"));
+      Assert.assertEquals(0, output.data.size());
+
+      output = new TestData(newPI(input, 7, true), Range.exact("0", "f", "q"));
+      expected = new TestData();
+      expected.add("0 f q WRITE 10", "5");
+      Assert.assertEquals(expected, output);
+
+      output = new TestData(newPI(input, 4, true), Range.exact("0", "f", "q"));
+      Assert.assertEquals(expected, output);
+    }
+  }
+
+  @Test
+  public void testManyReadLocks() {
+    TestData input = new TestData();
+
+    input.add("0 f q WRITE 10", "5");
+    input.add("0 f q DATA 5", "15");
+
+    for (int i = 10; i < 2010; i += 2) {
+      input.add("0 f q DEL_RLOCK " + i, "" + (i + 1));
+      input.add("0 f q RLOCK " + i, "0 f q");
+    }
+
+    TestData output = new TestData(newPI(input, 3000, false), Range.exact("0", "f", "q"));
+    // scans all read locks looking for an active lock
+    Assert.assertEquals(2001, input.counter.nextCalls);
+    Assert.assertEquals(0, output.data.size());
+
+    // read locks do not need to scan for other read locks... this checks that read locks are
+    // skipped
+    input.counter.reset();
+    output = new TestData(newPI(input, 3000, true), Range.exact("0", "f", "q"));
+    // read locks should not scan everything looking for a read lock
+    Assert.assertEquals(12, input.counter.nextCalls); // skipping will read 11 before seeking
+    Assert.assertEquals(0, output.data.size());
+
+    // This write invalidates the 2000 read locks, so should skip read locks
+    input.add("0 f q WRITE 2500", "2490");
+    input.add("0 f q DATA 2490", "16");
+    input.counter.reset();
+    output = new TestData(newPI(input, 3000, false), Range.exact("0", "f", "q"));
+    Assert.assertEquals(13, input.counter.nextCalls);
+    Assert.assertEquals(0, output.data.size());
+  }
+
+  @Test
+  public void testGarbageCollectedReadLocks() {
+    // After a partial compaction the garbage collection iterator will drop read lock entries, but
+    // not del_read_locks entries. This test ensures that prewrite works w/ this.
+
+    TestData input = new TestData();
+
+    input.add("0 f q DEL_RLOCK 42", "50");
+    input.add("0 f q DEL_RLOCK 44", "51");
+
+
+    TestData output = new TestData(newPI(input, 47), Range.exact("0", "f", "q"));
+    TestData expected = new TestData();
+
+    expected.add("0 f q DEL_RLOCK 44", "51");
+
+    Assert.assertEquals(expected, output);
+
+    output = new TestData(newPI(input, 43), Range.exact("0", "f", "q"));
+    Assert.assertEquals(expected, output);
+  }
+
+  @Test
+  public void testAbortedReadLock() {
+    // A read lock that was aborted or rolledback should not prevent a write lock
+    TestData input = new TestData();
+
+    input.add("0 f q DEL_RLOCK 55", "ABORT");
+    input.add("0 f q DEL_RLOCK 42", "50");
+
+    for (int i = 0; i < 2; i++) {
+      for (long startTs : new long[] {31, 40, 47}) {
+        TestData expected = new TestData();
+        expected.add("0 f q DEL_RLOCK 42", "50");
+
+        TestData output = new TestData(newPI(input, startTs), Range.exact("0", "f", "q"));
+
+        Assert.assertEquals(expected, output);
+      }
+
+      for (long startTs : new long[] {51, 55, 56}) {
+        TestData output = new TestData(newPI(input, startTs), Range.exact("0", "f", "q"));
+        Assert.assertEquals(0, output.data.size());
+      }
+      //add this for 2nd iteration, should ignore because delete rolls back
+      input.add("0 f q RLOCK 55", "0 f q");
+    }
+  }
 }
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
index 4605923..4e0c27d 100644
--- a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
@@ -31,10 +31,11 @@
 import org.junit.Test;
 
 public class SnapshotIteratorTest {
-  SnapshotIterator newSI(TestData input, long startTs) {
+  SnapshotIterator newSI(TestData input, long startTs, boolean returnReadLocks) {
     SnapshotIterator si = new SnapshotIterator();
 
     Map<String, String> options = new HashMap<>();
+    options.put(SnapshotIterator.RETURN_READLOCK_PRESENT_OPT, returnReadLocks + "");
     options.put(SnapshotIterator.TIMESTAMP_OPT, startTs + "");
 
     IteratorEnvironment env = TestIteratorEnv.create(IteratorScope.scan, true);
@@ -48,6 +49,10 @@
     return si;
   }
 
+  SnapshotIterator newSI(TestData input, long startTs) {
+    return newSI(input, startTs, true);
+  }
+
   @Test
   public void testBasic() {
     TestData input = new TestData();
@@ -61,13 +66,11 @@
     TestData output = new TestData(newSI(input, 6));
     Assert.assertEquals(0, output.data.size());
 
-    output = new TestData(newSI(input, 11));
     TestData expected = new TestData().add("0 f q DATA 9", "14");
-    Assert.assertEquals(expected, output);
+    checkInput(input, expected, 11);
 
-    output = new TestData(newSI(input, 17));
     expected = new TestData().add("0 f q DATA 11", "15");
-    Assert.assertEquals(expected, output);
+    checkInput(input, expected, 17);
   }
 
   @Test
@@ -85,13 +88,11 @@
     output = new TestData(newSI(input, 15));
     Assert.assertEquals(0, output.data.size());
 
-    output = new TestData(newSI(input, 17));
     TestData expected = new TestData().add("0 f q DATA 11", "15");
-    Assert.assertEquals(expected, output);
+    checkInput(input, expected, 17);
 
-    output = new TestData(newSI(input, 22));
     expected = new TestData().add("0 f q LOCK 21", "1 f q");
-    Assert.assertEquals(expected, output);
+    checkInput(input, expected, 22);
   }
 
   @Test
@@ -115,13 +116,11 @@
     output = new TestData(newSI(input, 15));
     Assert.assertEquals(0, output.data.size());
 
-    output = new TestData(newSI(input, 17));
     TestData expected = new TestData().add("0 f q DATA 11", "15");
-    Assert.assertEquals(expected, output);
+    checkInput(input, expected, 17);
 
-    output = new TestData(newSI(input, 23));
     expected = new TestData().add("0 f q DATA 11", "15");
-    Assert.assertEquals(expected, output);
+    checkInput(input, expected, 23);
 
 
     // test case where there is newer lock thats not invalidated by DEL_LOCK
@@ -137,17 +136,14 @@
     output = new TestData(newSI(input, 6));
     Assert.assertEquals(0, output.data.size());
 
-    output = new TestData(newSI(input, 17));
     expected = new TestData().add("0 f q DATA 11", "15");
-    Assert.assertEquals(expected, output);
+    checkInput(input, expected, 17);
 
-    output = new TestData(newSI(input, 19));
     expected = new TestData().add("0 f q DATA 11", "15");
-    Assert.assertEquals(expected, output);
+    checkInput(input, expected, 19);
 
-    output = new TestData(newSI(input, 23));
     expected = new TestData().add("0 f q LOCK 21", "1 f q");
-    Assert.assertEquals(expected, output);
+    checkInput(input, expected, 23);
   }
 
   @Test
@@ -253,7 +249,54 @@
     for (Range range : ranges) {
       checkManyColumnData(input, numToWrite, range);
     }
+  }
 
+  @Test
+  public void testReadLock() {
+    TestData input = new TestData();
+
+    input.add("0 f q WRITE 16", "11 DELETE");
+    input.add("0 f q DATA 11", "15");
+    input.add("0 f q DEL_RLOCK 5", "6");
+    input.add("0 f q RLOCK 5", " 0 f q");
+
+    input.add("1 f q WRITE 16", "11");
+    input.add("1 f q DATA 11", "15");
+
+    input.add("2 f q WRITE 16", "11");
+    input.add("2 f q DATA 11", "17");
+    input.add("2 f q DEL_RLOCK 5", "6");
+    input.add("2 f q RLOCK 5", " 0 f q");
+
+
+    TestData expected = new TestData();
+    expected.add("0 f q DATA 11", "15");
+    expected.add("1 f q DATA 11", "15");
+    expected.add("2 f q DATA 11", "17");
+
+
+    checkInput(input, expected, 20, false);
+
+    expected.add("0 f q DEL_RLOCK 5", "6");
+    expected.add("2 f q DEL_RLOCK 5", "6");
+
+    checkInput(input, expected, 20);
+
+  }
+
+  private void checkInput(TestData input, TestData expected, long startTs) {
+    checkInput(input, expected, startTs, true);
+  }
+
+  private void checkInput(TestData input, TestData expected, long startTs,
+      boolean returnReadLocks) {
+    // run test with a single seek followed by many next calls
+    TestData output = new TestData(newSI(input, startTs, returnReadLocks), new Range());
+    Assert.assertEquals(expected, output);
+
+    // run test reseeking after each key
+    output = new TestData(newSI(input, startTs, returnReadLocks), new Range(), true);
+    Assert.assertEquals(expected, output);
   }
 
   private void checkManyColumnData(TestData input, int numToWrite, Range range) throws IOException {
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
index 2ea42b3..262df15 100644
--- a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
@@ -28,8 +28,11 @@
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.fluo.accumulo.format.FluoFormatter;
+import org.apache.fluo.accumulo.iterators.CountingIterator.Counter;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
+import org.apache.fluo.accumulo.values.DelReadLockValue;
 import org.apache.fluo.accumulo.values.LockValue;
 import org.apache.fluo.accumulo.values.WriteValue;
 import org.apache.fluo.api.data.Bytes;
@@ -37,6 +40,7 @@
 
 public class TestData {
   TreeMap<Key, Value> data = new TreeMap<>();
+  Counter counter = new Counter();
 
   TestData() {}
 
@@ -44,19 +48,29 @@
     data.putAll(td.data);
   }
 
-  TestData(SortedKeyValueIterator<Key, Value> iter, Range range) {
+  TestData(SortedKeyValueIterator<Key, Value> iter, Range range, boolean reseek) {
     try {
       iter.seek(range, new HashSet<ByteSequence>(), false);
 
       while (iter.hasTop()) {
         data.put(iter.getTopKey(), iter.getTopValue());
-        iter.next();
+        if (reseek) {
+          iter.seek(
+              new Range(iter.getTopKey(), false, range.getEndKey(), range.isEndKeyInclusive()),
+              new HashSet<ByteSequence>(), false);
+        } else {
+          iter.next();
+        }
       }
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
 
+  TestData(SortedKeyValueIterator<Key, Value> iter, Range range) {
+    this(iter, range, false);
+  }
+
   TestData(SortedKeyValueIterator<Key, Value> iter) {
     this(iter, new Range());
   }
@@ -116,6 +130,21 @@
           val = DelLockValue.encodeCommit(commitTs, value.contains("PRIMARY"));
         }
         break;
+      case "RLOCK":
+        ts = ReadLockUtil.encodeTs(ts, false);
+        ts |= ColumnConstants.RLOCK_PREFIX;
+        break;
+      case "DEL_RLOCK":
+        ts = ReadLockUtil.encodeTs(ts, true);
+        ts |= ColumnConstants.RLOCK_PREFIX;
+
+        if (value.contains("ROLLBACK") || value.contains("ABORT")) {
+          val = DelReadLockValue.encodeRollback();
+        } else {
+          long commitTs = Long.parseLong(value.split("\\s+")[0]);
+          val = DelReadLockValue.encodeCommit(commitTs);
+        }
+        break;
       case "ntfy":
         break;
       default:
@@ -163,4 +192,8 @@
   public int hashCode() {
     return Objects.hashCode(data);
   }
+
+  public SortedKeyValueIterator<Key, Value> getIterator() {
+    return new CountingIterator(counter, data);
+  }
 }
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/AbstractSnapshotBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/AbstractSnapshotBase.java
index 69780b2..1ba7f9b 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/AbstractSnapshotBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/AbstractSnapshotBase.java
@@ -49,6 +49,15 @@
    */
   private Map<String, Bytes> s2bCache = new WeakHashMap<String, Bytes>();
 
+  public AbstractSnapshotBase() {}
+
+  /**
+   * @since 1.2.0
+   */
+  protected AbstractSnapshotBase(AbstractSnapshotBase other) {
+    this.s2bCache = other.s2bCache;
+  }
+
   Bytes s2bConv(CharSequence cs) {
     Objects.requireNonNull(cs);
     if (cs instanceof String) {
@@ -64,6 +73,7 @@
     }
   }
 
+  @Override
   public Bytes get(Bytes row, Column column, Bytes defaultValue) {
     Bytes ret = get(row, column);
     if (ret == null) {
@@ -73,19 +83,23 @@
     return ret;
   }
 
+  @Override
   public Map<Column, Bytes> get(Bytes row, Column... columns) {
     return get(row, ImmutableSet.copyOf(columns));
   }
 
+  @Override
   public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Column... columns) {
     return get(rows, ImmutableSet.copyOf(columns));
   }
 
+  @Override
   public Map<RowColumn, String> gets(Collection<RowColumn> rowColumns) {
     Map<RowColumn, Bytes> bytesMap = get(rowColumns);
     return Maps.transformValues(bytesMap, b -> b.toString());
   }
 
+  @Override
   public Map<String, Map<Column, String>> gets(Collection<? extends CharSequence> rows,
       Set<Column> columns) {
     Map<Bytes, Map<Column, Bytes>> rcvs = get(Collections2.transform(rows, this::s2bConv), columns);
@@ -97,11 +111,13 @@
     return ret;
   }
 
+  @Override
   public Map<String, Map<Column, String>> gets(Collection<? extends CharSequence> rows,
       Column... columns) {
     return gets(rows, ImmutableSet.copyOf(columns));
   }
 
+  @Override
   public String gets(CharSequence row, Column column) {
     Bytes val = get(s2bConv(row), column);
     if (val == null) {
@@ -110,6 +126,7 @@
     return val.toString();
   }
 
+  @Override
   public String gets(CharSequence row, Column column, String defaultValue) {
     Bytes val = get(s2bConv(row), column);
     if (val == null) {
@@ -119,11 +136,13 @@
     return val.toString();
   }
 
+  @Override
   public Map<Column, String> gets(CharSequence row, Set<Column> columns) {
     Map<Column, Bytes> values = get(s2bConv(row), columns);
     return Maps.transformValues(values, b -> b.toString());
   }
 
+  @Override
   public Map<Column, String> gets(CharSequence row, Column... columns) {
     return gets(row, ImmutableSet.copyOf(columns));
   }
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/AbstractTransactionBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/AbstractTransactionBase.java
index 8e44590..731aaeb 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/AbstractTransactionBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/AbstractTransactionBase.java
@@ -27,14 +27,17 @@
 public abstract class AbstractTransactionBase extends AbstractSnapshotBase
     implements TransactionBase {
 
+  @Override
   public void delete(CharSequence row, Column col) {
     delete(s2bConv(row), col);
   }
 
+  @Override
   public void set(CharSequence row, Column col, CharSequence value) throws AlreadySetException {
     set(s2bConv(row), col, Bytes.of(value));
   }
 
+  @Override
   public void setWeakNotification(CharSequence row, Column col) {
     setWeakNotification(s2bConv(row), col);
   }
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
index 51c29b9..6f4d1ff 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
@@ -114,6 +114,7 @@
    * </pre>
    *
    * @return A scanner builder.
+   * @see TransactionBase#withReadLock()
    */
 
   ScannerBuilder scanner();
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/TransactionBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/TransactionBase.java
index 06303b6..bb53da5 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/TransactionBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/TransactionBase.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -60,4 +60,26 @@
    * encoded using UTF-8.
    */
   void setWeakNotification(CharSequence row, Column col);
+
+  /**
+   * Normally when a Fluo transaction reads data and does not write to it, it will not collide with
+   * other transactions making concurrent writes.  When this method is called, all reads will
+   * acquire a read lock. These read locks cause collisions with transactions doing concurrent
+   * writes. However, multiple transactions can get concurrent read locks on the same row+col
+   * without colliding.
+   *
+   * <p>
+   * Scanning with read locks is not supported. Attempting to call {@code withReadLock().scanner()}
+   * will throw an {@link UnsupportedOperationException}. This is because there are an infinite
+   * amount of keys within a range and read locks can not be obtained on them all.
+   *
+   * <p>
+   * A transaction that only acquires read locks will do nothing at commit time. In this case no
+   * read locks are actually written and no collisions will ever occur.
+   *
+   * @since 1.2.0
+   */
+  default SnapshotBase withReadLock() {
+    throw new UnsupportedOperationException("Read locks not supported by this implementation");
+  }
 }
diff --git a/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java b/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java
index 5998e89..20377a9 100644
--- a/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java
+++ b/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java
@@ -34,7 +34,7 @@
     JCommander jcommand = new JCommander(options);
     jcommand.parse(args.split(" "));
     ScanUtil.ScanOpts opts = options.getScanOpts();
-    return new SnapshotScanner.Opts(ScanUtil.getSpan(opts), ScanUtil.getColumns(opts));
+    return new SnapshotScanner.Opts(ScanUtil.getSpan(opts), ScanUtil.getColumns(opts), false);
   }
 
   @Test
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
index f3068e9..4172386 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
@@ -24,24 +24,33 @@
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.client.ConditionalWriter.Status;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.fluo.accumulo.iterators.OpenReadLockIterator;
 import org.apache.fluo.accumulo.iterators.PrewriteIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
+import org.apache.fluo.accumulo.values.DelReadLockValue;
 import org.apache.fluo.accumulo.values.LockValue;
+import org.apache.fluo.accumulo.values.ReadLockValue;
+import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.core.util.ByteUtil;
 import org.apache.fluo.core.util.ColumnUtil;
 import org.apache.fluo.core.util.ConditionalFlutation;
 import org.apache.fluo.core.util.FluoCondition;
 import org.apache.fluo.core.util.SpanUtil;
 
+import static org.apache.fluo.accumulo.util.ColumnConstants.PREFIX_MASK;
 import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
 
 /**
@@ -52,38 +61,61 @@
 
 public class LockResolver {
 
-  private static Map<PrimaryRowColumn, List<Entry<Key, Value>>> groupLocksByPrimary(
-      List<Entry<Key, Value>> locks) {
-    Map<PrimaryRowColumn, List<Entry<Key, Value>>> groupedLocks = new HashMap<>();
+  private static Map<PrimaryRowColumn, List<LockInfo>> groupLocksByPrimary(List<LockInfo> locks) {
+    Map<PrimaryRowColumn, List<LockInfo>> groupedLocks = new HashMap<>();
     Map<PrimaryRowColumn, Long> transactorIds = new HashMap<>();
 
-    for (Entry<Key, Value> lock : locks) {
-      LockValue lockVal = new LockValue(lock.getValue().get());
-      PrimaryRowColumn prc =
-          new PrimaryRowColumn(lockVal.getPrimaryRow(), lockVal.getPrimaryColumn(),
-              lock.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK);
+    for (LockInfo lockInfo : locks) {
 
-      List<Entry<Key, Value>> lockList = groupedLocks.get(prc);
-      if (lockList == null) {
-        lockList = new ArrayList<>();
-        groupedLocks.put(prc, lockList);
-      }
+      PrimaryRowColumn prc = new PrimaryRowColumn(lockInfo.prow, lockInfo.pcol, lockInfo.lockTs);
+
+      List<LockInfo> lockList = groupedLocks.computeIfAbsent(prc, k -> new ArrayList<>());
 
       Long trid = transactorIds.get(prc);
       if (trid == null) {
-        transactorIds.put(prc, lockVal.getTransactor());
-      } else if (!trid.equals(lockVal.getTransactor())) {
+        transactorIds.put(prc, lockInfo.transactorId);
+      } else if (!trid.equals(lockInfo.transactorId)) {
         // sanity check.. its assumed that all locks w/ the same PrimaryRowColumn should have the
         // same transactor id as well
-        throw new IllegalStateException("transactor ids not equals " + prc + " " + lock.getKey()
-            + " " + trid + " " + lockVal.getTransactor());
+        throw new IllegalStateException("transactor ids not equals " + prc + " "
+            + lockInfo.entry.getKey() + " " + trid + " " + lockInfo.transactorId);
       }
 
-      lockList.add(lock);
+      lockList.add(lockInfo);
     }
 
     return groupedLocks;
+  }
 
+
+  private static class LockInfo {
+
+    final Bytes prow;
+    final Column pcol;
+    final Long transactorId;
+    final long lockTs;
+    final boolean isReadLock;
+    final Entry<Key, Value> entry;
+
+    public LockInfo(Entry<Key, Value> kve) {
+      long rawTs = kve.getKey().getTimestamp();
+      this.entry = kve;
+      if ((rawTs & ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
+        this.lockTs = ReadLockUtil.decodeTs(rawTs);
+        ReadLockValue rlv = new ReadLockValue(kve.getValue().get());
+        this.prow = rlv.getPrimaryRow();
+        this.pcol = rlv.getPrimaryColumn();
+        this.transactorId = rlv.getTransactor();
+        this.isReadLock = true;
+      } else {
+        this.lockTs = kve.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
+        LockValue lv = new LockValue(kve.getValue().get());
+        this.prow = lv.getPrimaryRow();
+        this.pcol = lv.getPrimaryColumn();
+        this.transactorId = lv.getTransactor();
+        this.isReadLock = false;
+      }
+    }
   }
 
   /**
@@ -99,7 +131,7 @@
    * @return true if all locks passed in were resolved (rolled forward or back)
    */
   static boolean resolveLocks(Environment env, long startTs, TxStats stats,
-      List<Entry<Key, Value>> locks, long startTime) {
+      List<Entry<Key, Value>> locksKVs, long startTime) {
     // check if transactor is still alive
 
     int numResolved = 0;
@@ -110,7 +142,10 @@
 
     TransactorCache transactorCache = env.getSharedResources().getTransactorCache();
 
-    List<Entry<Key, Value>> locksToRecover;
+    List<LockInfo> locks = new ArrayList<>();
+    locksKVs.forEach(e -> locks.add(new LockInfo(e)));
+
+    List<LockInfo> locksToRecover;
     if (System.currentTimeMillis() - startTime > env.getConfiguration()
         .getTransactionRollbackTime()) {
       locksToRecover = locks;
@@ -118,38 +153,32 @@
       timedOut = true;
     } else {
       locksToRecover = new ArrayList<>(locks.size());
-      for (Entry<Key, Value> entry : locks) {
-
-        Long transactorId = new LockValue(entry.getValue().get()).getTransactor();
-        long lockTs = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
-
-        if (transactorCache.checkTimedout(transactorId, lockTs)) {
-          locksToRecover.add(entry);
+      for (LockInfo lockInfo : locks) {
+        if (transactorCache.checkTimedout(lockInfo.transactorId, lockInfo.lockTs)) {
+          locksToRecover.add(lockInfo);
           stats.incrementTimedOutLocks();
-        } else if (!transactorCache.checkExists(transactorId)) {
-          locksToRecover.add(entry);
+        } else if (!transactorCache.checkExists(lockInfo.transactorId)) {
+          locksToRecover.add(lockInfo);
           stats.incrementDeadLocks();
         }
       }
     }
 
-    Map<PrimaryRowColumn, List<Entry<Key, Value>>> groupedLocks =
-        groupLocksByPrimary(locksToRecover);
+    Map<PrimaryRowColumn, List<LockInfo>> groupedLocks = groupLocksByPrimary(locksToRecover);
 
     if (timedOut) {
-      Set<Entry<PrimaryRowColumn, List<Entry<Key, Value>>>> es = groupedLocks.entrySet();
+      Set<Entry<PrimaryRowColumn, List<LockInfo>>> es = groupedLocks.entrySet();
 
-      for (Entry<PrimaryRowColumn, List<Entry<Key, Value>>> entry : es) {
+      for (Entry<PrimaryRowColumn, List<LockInfo>> entry : es) {
         long lockTs = entry.getKey().startTs;
-        Long transactorId = new LockValue(entry.getValue().get(0).getValue().get()).getTransactor();
+        Long transactorId = entry.getValue().get(0).transactorId;
         transactorCache.addTimedoutTransactor(transactorId, lockTs, startTime);
       }
     }
 
     TxInfoCache txiCache = env.getSharedResources().getTxInfoCache();
-
-    Set<Entry<PrimaryRowColumn, List<Entry<Key, Value>>>> es = groupedLocks.entrySet();
-    for (Entry<PrimaryRowColumn, List<Entry<Key, Value>>> group : es) {
+    Set<Entry<PrimaryRowColumn, List<LockInfo>>> es = groupedLocks.entrySet();
+    for (Entry<PrimaryRowColumn, List<LockInfo>> group : es) {
       TxInfo txInfo = txiCache.getTransactionInfo(group.getKey());
       switch (txInfo.status) {
         case COMMITTED:
@@ -182,18 +211,24 @@
   }
 
   private static void rollback(Environment env, long startTs, PrimaryRowColumn prc,
-      List<Entry<Key, Value>> value, Map<ByteSequence, Mutation> mutations) {
-    for (Entry<Key, Value> entry : value) {
-      if (isPrimary(prc, entry.getKey())) {
+      List<LockInfo> value, Map<ByteSequence, Mutation> mutations) {
+    for (LockInfo lockInfo : value) {
+      if (isPrimary(prc, lockInfo.entry.getKey())) {
         continue;
       }
 
-      long lockTs = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
-      Mutation mut = getMutation(entry.getKey().getRowData(), mutations);
-      Key k = entry.getKey();
-      mut.put(k.getColumnFamilyData().toArray(), k.getColumnQualifierData().toArray(),
-          k.getColumnVisibilityParsed(), ColumnConstants.DEL_LOCK_PREFIX | lockTs,
-          DelLockValue.encodeRollback(false, true));
+      Mutation mut = getMutation(lockInfo.entry.getKey().getRowData(), mutations);
+      Key k = lockInfo.entry.getKey();
+      if (lockInfo.isReadLock) {
+        mut.put(k.getColumnFamilyData().toArray(), k.getColumnQualifierData().toArray(),
+            k.getColumnVisibilityParsed(),
+            ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(lockInfo.lockTs, true),
+            DelReadLockValue.encodeRollback());
+      } else {
+        mut.put(k.getColumnFamilyData().toArray(), k.getColumnQualifierData().toArray(),
+            k.getColumnVisibilityParsed(), ColumnConstants.DEL_LOCK_PREFIX | lockInfo.lockTs,
+            DelLockValue.encodeRollback(false, true));
+      }
     }
 
   }
@@ -224,26 +259,31 @@
     }
   }
 
-  private static void commitColumns(Environment env, PrimaryRowColumn prc,
-      List<Entry<Key, Value>> value, long commitTs, Map<ByteSequence, Mutation> mutations) {
-    for (Entry<Key, Value> entry : value) {
-      if (isPrimary(prc, entry.getKey())) {
+  private static void commitColumns(Environment env, PrimaryRowColumn prc, List<LockInfo> value,
+      long commitTs, Map<ByteSequence, Mutation> mutations) {
+    for (LockInfo lockInfo : value) {
+      if (isPrimary(prc, lockInfo.entry.getKey())) {
         continue;
       }
 
-      long lockTs = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
+      long lockTs = lockInfo.lockTs;
       // TODO may be that a stronger sanity check that could be done here
       if (commitTs < lockTs) {
         throw new IllegalStateException(
-            "bad commitTs : " + entry.getKey() + " (" + commitTs + "<" + lockTs + ")");
+            "bad commitTs : " + lockInfo.entry.getKey() + " (" + commitTs + "<" + lockTs + ")");
       }
 
-      Mutation mut = getMutation(entry.getKey().getRowData(), mutations);
-      Column col = SpanUtil.toRowColumn(entry.getKey()).getColumn();
+      Mutation mut = getMutation(lockInfo.entry.getKey().getRowData(), mutations);
+      Column col = ColumnUtil.convert(lockInfo.entry.getKey());
 
-      LockValue lv = new LockValue(entry.getValue().get());
-      ColumnUtil.commitColumn(env, lv.isTrigger(), false, col, lv.isWrite(), lv.isDelete(), lockTs,
-          commitTs, env.getConfiguredObservers().getObservedColumns(STRONG), mut);
+      if (lockInfo.isReadLock) {
+        ColumnUtil.commitColumn(env, false, false, col, false, false, true, lockTs, commitTs,
+            env.getConfiguredObservers().getObservedColumns(STRONG), mut);
+      } else {
+        LockValue lv = new LockValue(lockInfo.entry.getValue().get());
+        ColumnUtil.commitColumn(env, lv.isTrigger(), false, col, lv.isWrite(), lv.isDelete(), false,
+            lockTs, commitTs, env.getConfiguredObservers().getObservedColumns(STRONG), mut);
+      }
     }
 
   }
@@ -263,4 +303,43 @@
     return prc.prow.equals(ByteUtil.toBytes(k.getRowData()))
         && prc.pcol.equals(SpanUtil.toRowColumn(k).getColumn());
   }
+
+  static List<Entry<Key, Value>> getOpenReadLocks(Environment env,
+      Map<Bytes, Set<Column>> rowColsToCheck) throws Exception {
+
+    List<Range> ranges = new ArrayList<>();
+
+    for (Entry<Bytes, Set<Column>> e1 : rowColsToCheck.entrySet()) {
+      for (Column col : e1.getValue()) {
+        Key start = SpanUtil.toKey(new RowColumn(e1.getKey(), col));
+        Key end = new Key(start);
+        end.setTimestamp(ColumnConstants.LOCK_PREFIX | ColumnConstants.TIMESTAMP_MASK);
+        ranges.add(new Range(start, true, end, false));
+      }
+    }
+
+    BatchScanner bscanner = null;
+    try {
+      bscanner = env.getConnector().createBatchScanner(env.getTable(), env.getAuthorizations(), 1);
+
+      bscanner.setRanges(ranges);
+      IteratorSetting iterCfg = new IteratorSetting(10, OpenReadLockIterator.class);
+
+      bscanner.addScanIterator(iterCfg);
+
+      List<Entry<Key, Value>> ret = new ArrayList<>();
+      for (Entry<Key, Value> entry : bscanner) {
+        if ((entry.getKey().getTimestamp() & PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
+          ret.add(entry);
+        }
+      }
+
+      return ret;
+
+    } finally {
+      if (bscanner != null) {
+        bscanner.close();
+      }
+    }
+  }
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
index 37e0df5..48250c0 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
@@ -19,6 +19,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -51,9 +52,10 @@
   private List<Range> rangesToScan = new ArrayList<>();
   private Function<ByteSequence, Bytes> rowConverter;
   private Function<Key, Column> columnConverter;
+  private Map<Bytes, Set<Column>> readLocksSeen;
 
   ParallelSnapshotScanner(Collection<Bytes> rows, Set<Column> columns, Environment env,
-      long startTs, TxStats stats) {
+      long startTs, TxStats stats, Map<Bytes, Set<Column>> readLocksSeen) {
     this.rows = rows;
     this.columns = columns;
     this.env = env;
@@ -61,10 +63,11 @@
     this.stats = stats;
     this.rowConverter = new CachedBytesConverter(rows);
     this.columnConverter = new CachedColumnConverter(columns);
+    this.readLocksSeen = readLocksSeen;
   }
 
-  ParallelSnapshotScanner(Collection<RowColumn> cells, Environment env, long startTs,
-      TxStats stats) {
+  ParallelSnapshotScanner(Collection<RowColumn> cells, Environment env, long startTs, TxStats stats,
+      Map<Bytes, Set<Column>> readLocksSeen) {
     for (RowColumn rc : cells) {
       byte[] r = rc.getRow().toArray();
       byte[] cf = rc.getColumn().getFamily().toArray();
@@ -83,6 +86,7 @@
     this.stats = stats;
     this.rowConverter = ByteUtil::toBytes;
     this.columnConverter = ColumnUtil::convert;
+    this.readLocksSeen = readLocksSeen;
   }
 
   private BatchScanner setupBatchScanner() {
@@ -101,7 +105,7 @@
 
     if (rangesToScan.size() > 0) {
       scanner.setRanges(rangesToScan);
-      SnapshotScanner.setupScanner(scanner, Collections.<Column>emptySet(), startTs);
+      SnapshotScanner.setupScanner(scanner, Collections.<Column>emptySet(), startTs, true);
     } else if (rows != null) {
       List<Range> ranges = new ArrayList<>(rows.size());
 
@@ -111,7 +115,7 @@
 
       scanner.setRanges(ranges);
 
-      SnapshotScanner.setupScanner(scanner, columns, startTs);
+      SnapshotScanner.setupScanner(scanner, columns, startTs, true);
     } else {
       return null;
     }
@@ -176,13 +180,9 @@
         if (colType == ColumnConstants.LOCK_PREFIX) {
           locks.add(entry);
         } else if (colType == ColumnConstants.DATA_PREFIX) {
-          Map<Column, Bytes> cols = ret.get(row);
-          if (cols == null) {
-            cols = new HashMap<>();
-            ret.put(row, cols);
-          }
-
-          cols.put(col, Bytes.of(entry.getValue().get()));
+          ret.computeIfAbsent(row, k -> new HashMap<>()).put(col, Bytes.of(entry.getValue().get()));
+        } else if (colType == ColumnConstants.RLOCK_PREFIX) {
+          readLocksSeen.computeIfAbsent(row, k -> new HashSet<>()).add(col);
         } else {
           throw new IllegalArgumentException("Unexpected column type " + colType);
         }
@@ -191,5 +191,4 @@
       bs.close();
     }
   }
-
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/ReadLockSnapshot.java b/modules/core/src/main/java/org/apache/fluo/core/impl/ReadLockSnapshot.java
new file mode 100644
index 0000000..eb1ec01
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/ReadLockSnapshot.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.client.AbstractSnapshotBase;
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+
+public class ReadLockSnapshot extends AbstractSnapshotBase implements SnapshotBase {
+
+  private TransactionImpl txi;
+
+  ReadLockSnapshot(TransactionImpl txi) {
+    super(txi);
+    this.txi = txi;
+  }
+
+  @Override
+  public Bytes get(Bytes row, Column column) {
+    txi.setReadLock(row, column);
+    return txi.get(row, column);
+  }
+
+  @Override
+  public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
+    for (Column column : columns) {
+      txi.setReadLock(row, column);
+    }
+    return txi.get(row, columns);
+  }
+
+  @Override
+  public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
+    for (Bytes row : rows) {
+      for (Column column : columns) {
+        txi.setReadLock(row, column);
+      }
+    }
+    return txi.get(rows, columns);
+  }
+
+  @Override
+  public Map<RowColumn, Bytes> get(Collection<RowColumn> rowColumns) {
+    for (RowColumn rowColumn : rowColumns) {
+      txi.setReadLock(rowColumn.getRow(), rowColumn.getColumn());
+    }
+    return txi.get(rowColumns);
+  }
+
+  @Override
+  public ScannerBuilder scanner() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getStartTimestamp() {
+    return txi.getStartTimestamp();
+  }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
index cd2e008..a1db35b 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
+import java.util.function.Consumer;
 
 import com.google.common.collect.ImmutableSet;
 import org.apache.accumulo.core.client.IteratorSetting;
@@ -49,10 +50,12 @@
   public static final class Opts {
     private final Span span;
     private final Collection<Column> columns;
+    private final boolean showReadLocks;
 
-    public Opts(Span span, Collection<Column> columns) {
+    public Opts(Span span, Collection<Column> columns, boolean showReadLocks) {
       this.span = span;
       this.columns = ImmutableSet.copyOf(columns);
+      this.showReadLocks = showReadLocks;
     }
 
     public Span getSpan() {
@@ -62,12 +65,17 @@
     public Collection<Column> getColumns() {
       return columns;
     }
+
+    public boolean getShowReadLocks() {
+      return showReadLocks;
+    }
   }
 
   private final long startTs;
   private final Environment env;
   private final TxStats stats;
   private final Opts config;
+  private Consumer<Entry<Key, Value>> locksSeen;
 
   static final long INITIAL_WAIT_TIME = 50;
   // TODO make configurable
@@ -75,7 +83,8 @@
 
 
 
-  static void setupScanner(ScannerBase scanner, Collection<Column> columns, long startTs) {
+  static void setupScanner(ScannerBase scanner, Collection<Column> columns, long startTs,
+      boolean showReadLocks) {
     for (Column col : columns) {
       if (col.isQualifierSet()) {
         scanner.fetchColumn(ByteUtil.toText(col.getFamily()), ByteUtil.toText(col.getQualifier()));
@@ -86,6 +95,7 @@
 
     IteratorSetting iterConf = new IteratorSetting(10, SnapshotIterator.class);
     SnapshotIterator.setSnaptime(iterConf, startTs);
+    SnapshotIterator.setReturnReadLockPresent(iterConf, showReadLocks);
     scanner.addScanIterator(iterConf);
   }
 
@@ -111,7 +121,7 @@
       scanner.clearScanIterators();
       scanner.setRange(SpanUtil.toRange(snapIterConfig.getSpan()));
 
-      setupScanner(scanner, snapIterConfig.getColumns(), startTs);
+      setupScanner(scanner, snapIterConfig.getColumns(), startTs, snapIterConfig.showReadLocks);
 
       this.iterator = scanner.iterator();
     }
@@ -137,7 +147,7 @@
     }
 
     private void resetScanner(Span span) {
-      snapIterConfig = new Opts(span, snapIterConfig.columns);
+      snapIterConfig = new Opts(span, snapIterConfig.columns, snapIterConfig.showReadLocks);
       setUpIterator();
     }
 
@@ -145,6 +155,8 @@
 
       // read ahead a little bit looking for other locks to resolve
 
+      locksSeen.accept(lockEntry);
+
       long startTime = System.currentTimeMillis();
       long waitTime = INITIAL_WAIT_TIME;
 
@@ -164,6 +176,7 @@
 
           if (colType == ColumnConstants.LOCK_PREFIX) {
             locks.add(entry);
+            locksSeen.accept(lockEntry);
           }
 
           amountRead += entry.getKey().getSize() + entry.getValue().getSize();
@@ -215,6 +228,8 @@
         } else if (colType == ColumnConstants.DATA_PREFIX) {
           stats.incrementEntriesReturned(1);
           return entry;
+        } else if (colType == ColumnConstants.RLOCK_PREFIX) {
+          return entry;
         } else {
           throw new IllegalArgumentException("Unexpected column type " + colType);
         }
@@ -227,11 +242,13 @@
     }
   }
 
-  SnapshotScanner(Environment env, Opts config, long startTs, TxStats stats) {
+  SnapshotScanner(Environment env, Opts config, long startTs, TxStats stats,
+      Consumer<Entry<Key, Value>> locksSeen) {
     this.env = env;
     this.config = config;
     this.startTs = startTs;
     this.stats = stats;
+    this.locksSeen = locksSeen;
   }
 
   @Override
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index aa5e7da..0abdafe 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -26,10 +26,12 @@
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
+import java.util.function.Consumer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -51,10 +53,14 @@
 import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.accumulo.iterators.PrewriteIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
+import org.apache.fluo.accumulo.values.DelReadLockValue;
 import org.apache.fluo.accumulo.values.LockValue;
+import org.apache.fluo.accumulo.values.ReadLockValue;
 import org.apache.fluo.api.client.AbstractTransactionBase;
 import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.SnapshotBase;
 import org.apache.fluo.api.client.scanner.ScannerBuilder;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
@@ -71,13 +77,17 @@
 import org.apache.fluo.core.exceptions.StaleScanException;
 import org.apache.fluo.core.impl.scanner.ScannerBuilderImpl;
 import org.apache.fluo.core.oracle.Stamp;
+import org.apache.fluo.core.util.ByteUtil;
 import org.apache.fluo.core.util.ColumnUtil;
 import org.apache.fluo.core.util.ConditionalFlutation;
 import org.apache.fluo.core.util.FluoCondition;
 import org.apache.fluo.core.util.Flutation;
 import org.apache.fluo.core.util.Hex;
 import org.apache.fluo.core.util.SpanUtil;
+import org.apache.fluo.core.util.UtilWaitThread;
 
+import static org.apache.fluo.accumulo.util.ColumnConstants.PREFIX_MASK;
+import static org.apache.fluo.accumulo.util.ColumnConstants.RLOCK_PREFIX;
 import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
 import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
 
@@ -92,15 +102,21 @@
       Bytes.of("special delete object f804266bf94935edd45ae3e6c287b93c1814295c");
   private static final Bytes NTFY_VAL =
       Bytes.of("special ntfy value ce0c523e6e4dc093be8a2736b82eca1b95f97ed4");
+  private static final Bytes RLOCK_VAL =
+      Bytes.of("special rlock value 94da84e7796ff3b23b779805d820a33f1997cb8b");
 
   private static boolean isWrite(Bytes val) {
-    return val != NTFY_VAL;
+    return val != NTFY_VAL && val != RLOCK_VAL;
   }
 
   private static boolean isDelete(Bytes val) {
     return val == DELETE;
   }
 
+  private static boolean isReadLock(Bytes val) {
+    return val == RLOCK_VAL;
+  }
+
   private static enum TxStatus {
     OPEN, COMMIT_STARTED, COMMITTED, CLOSED
   }
@@ -110,7 +126,9 @@
   private final Map<Bytes, Set<Column>> weakNotifications = new HashMap<>();
   private final Set<Column> observedColumns;
   private final Environment env;
-  final Map<Bytes, Set<Column>> columnsRead = new HashMap<>();
+  private final Map<Bytes, Set<Column>> columnsRead = new HashMap<>();
+  // Tracks row columns that were observed to have had a read lock in the past.
+  private final Map<Bytes, Set<Column>> readLocksSeen = new HashMap<>();
   private final TxStats stats;
   private Notification notification;
   private Notification weakNotification;
@@ -170,7 +188,8 @@
   @Override
   public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
     checkIfOpen();
-    return getImpl(row, columns);
+    return getImpl(row, columns, kve -> {
+    });
   }
 
   @Override
@@ -183,7 +202,8 @@
 
     env.getSharedResources().getVisCache().validate(columns);
 
-    ParallelSnapshotScanner pss = new ParallelSnapshotScanner(rows, columns, env, startTs, stats);
+    ParallelSnapshotScanner pss =
+        new ParallelSnapshotScanner(rows, columns, env, startTs, stats, readLocksSeen);
 
     Map<Bytes, Map<Column, Bytes>> ret = pss.scan();
 
@@ -202,7 +222,8 @@
       return Collections.emptyMap();
     }
 
-    ParallelSnapshotScanner pss = new ParallelSnapshotScanner(rowColumns, env, startTs, stats);
+    ParallelSnapshotScanner pss =
+        new ParallelSnapshotScanner(rowColumns, env, startTs, stats, readLocksSeen);
 
     Map<Bytes, Map<Column, Bytes>> scan = pss.scan();
     Map<RowColumn, Bytes> ret = new HashMap<>();
@@ -217,7 +238,8 @@
     return ret;
   }
 
-  private Map<Column, Bytes> getImpl(Bytes row, Set<Column> columns) {
+  private Map<Column, Bytes> getImpl(Bytes row, Set<Column> columns,
+      Consumer<Entry<Key, Value>> locksSeen) {
 
     // TODO push visibility filtering to server side?
 
@@ -241,19 +263,26 @@
           cols.add(column);
         }
       }
-      opts = new SnapshotScanner.Opts(Span.exact(row), columns);
+      opts = new SnapshotScanner.Opts(Span.exact(row), columns, true);
     } else {
-      opts = new SnapshotScanner.Opts(Span.exact(row), columns);
+      opts = new SnapshotScanner.Opts(Span.exact(row), columns, true);
     }
 
     Map<Column, Bytes> ret = new HashMap<>();
+    Set<Column> readLockCols = null;
 
-    for (Entry<Key, Value> kve : new SnapshotScanner(env, opts, startTs, stats)) {
+    for (Entry<Key, Value> kve : new SnapshotScanner(env, opts, startTs, stats, locksSeen)) {
+
       Column col = ColumnUtil.convert(kve.getKey());
-      if (shouldCopy) {
-        if (columns.contains(col)) {
-          ret.put(col, Bytes.of(kve.getValue().get()));
+      if (shouldCopy && !columns.contains(col)) {
+        continue;
+      }
+
+      if ((kve.getKey().getTimestamp() & PREFIX_MASK) == RLOCK_PREFIX) {
+        if (readLockCols == null) {
+          readLockCols = readLocksSeen.computeIfAbsent(row, k -> new HashSet<>());
         }
+        readLockCols.add(col);
       } else {
         ret.put(col, Bytes.of(kve.getValue().get()));
       }
@@ -280,6 +309,31 @@
     colsRead.addAll(columns);
   }
 
+  void setReadLock(Bytes row, Column col) {
+    checkIfOpen();
+    Objects.requireNonNull(row);
+    Objects.requireNonNull(col);
+
+    if (col.getFamily().equals(ColumnConstants.NOTIFY_CF)) {
+      throw new IllegalArgumentException(ColumnConstants.NOTIFY_CF + " is a reserved family");
+    }
+
+    env.getSharedResources().getVisCache().validate(col);
+
+    Map<Column, Bytes> colUpdates = updates.computeIfAbsent(row, k -> new HashMap<>());
+    Bytes curVal = colUpdates.get(col);
+    if (curVal != null && (isWrite(curVal) || isDelete(curVal))) {
+      throw new AlreadySetException("Attemped read lock after write lock " + row + " " + col);
+    }
+
+    colUpdates.put(col, RLOCK_VAL);
+  }
+
+  @Override
+  public SnapshotBase withReadLock() {
+    return new ReadLockSnapshot(this);
+  }
+
   @Override
   public void set(Bytes row, Column col, Bytes value) throws AlreadySetException {
     checkIfOpen();
@@ -293,13 +347,7 @@
 
     env.getSharedResources().getVisCache().validate(col);
 
-    // TODO copy?
-
-    Map<Column, Bytes> colUpdates = updates.get(row);
-    if (colUpdates == null) {
-      colUpdates = new HashMap<>();
-      updates.put(row, colUpdates);
-    }
+    Map<Column, Bytes> colUpdates = updates.computeIfAbsent(row, k -> new HashMap<>());
 
     Bytes curVal = colUpdates.get(col);
     if (curVal != null && isWrite(curVal)) {
@@ -346,6 +394,10 @@
       PrewriteIterator.enableAckCheck(iterConf, notification.getTimestamp());
     }
 
+    if (isReadLock(val)) {
+      PrewriteIterator.setReadlock(iterConf);
+    }
+
     Condition cond = new FluoCondition(env, col).setIterators(iterConf);
 
     if (cm == null) {
@@ -358,8 +410,13 @@
       cm.put(col, ColumnConstants.DATA_PREFIX | startTs, val.toArray());
     }
 
-    cm.put(col, ColumnConstants.LOCK_PREFIX | startTs, LockValue.encode(primaryRow, primaryColumn,
-        isWrite(val), isDelete(val), isTriggerRow, getTransactorID()));
+    if (isReadLock(val)) {
+      cm.put(col, ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, false),
+          ReadLockValue.encode(primaryRow, primaryColumn, getTransactorID()));
+    } else {
+      cm.put(col, ColumnConstants.LOCK_PREFIX | startTs, LockValue.encode(primaryRow, primaryColumn,
+          isWrite(val), isDelete(val), isTriggerRow, getTransactorID()));
+    }
 
     return cm;
   }
@@ -497,7 +554,7 @@
    *
    * @param cd Commit data
    */
-  private void readUnread(CommitData cd) throws Exception {
+  private void readUnread(CommitData cd, Consumer<Entry<Key, Value>> locksSeen) throws Exception {
     // TODO make async
     // TODO need to keep track of ranges read (not ranges passed in, but actual data read... user
     // may not iterate over entire range
@@ -517,10 +574,86 @@
     }
 
     for (Entry<Bytes, Set<Column>> entry : columnsToRead.entrySet()) {
-      getImpl(entry.getKey(), entry.getValue());
+      getImpl(entry.getKey(), entry.getValue(), locksSeen);
     }
   }
 
+  private void checkForOrphanedReadLocks(CommitData cd, Map<Bytes, Set<Column>> locksResolved)
+      throws Exception {
+
+    if (readLocksSeen.size() == 0) {
+      return;
+    }
+
+    Map<Bytes, Set<Column>> rowColsToCheck = new HashMap<>();
+
+    for (Entry<Bytes, Set<Column>> entry : cd.getRejected().entrySet()) {
+
+      Set<Column> resolvedColumns =
+          locksResolved.getOrDefault(entry.getKey(), Collections.emptySet());
+
+      Set<Column> colsToCheck = null;
+      Set<Column> readLockCols = readLocksSeen.get(entry.getKey());
+      if (readLockCols != null) {
+        for (Column candidate : Sets.intersection(readLockCols, entry.getValue())) {
+          if (resolvedColumns.contains(candidate)) {
+            // A write lock was seen and this is probably what caused the collision, no need to
+            // check this column for read locks.
+            continue;
+          }
+
+          if (!isReadLock(updates.getOrDefault(entry.getKey(), Collections.emptyMap())
+              .getOrDefault(candidate, EMPTY_BS))) {
+            if (colsToCheck == null) {
+              colsToCheck = new HashSet<>();
+            }
+            colsToCheck.add(candidate);
+          }
+        }
+
+        if (colsToCheck != null) {
+          rowColsToCheck.put(entry.getKey(), colsToCheck);
+        }
+      }
+    }
+
+    if (rowColsToCheck.size() > 0) {
+
+      long startTime = System.currentTimeMillis();
+      long waitTime = SnapshotScanner.INITIAL_WAIT_TIME;
+
+      boolean resolved = false;
+
+      List<Entry<Key, Value>> openReadLocks = LockResolver.getOpenReadLocks(env, rowColsToCheck);
+
+      startTime = System.currentTimeMillis();
+
+      while (!resolved) {
+        resolved = LockResolver.resolveLocks(env, startTs, stats, openReadLocks, startTime);
+        if (!resolved) {
+          UtilWaitThread.sleep(waitTime);
+          stats.incrementLockWaitTime(waitTime);
+          waitTime = Math.min(SnapshotScanner.MAX_WAIT_TIME, waitTime * 2);
+
+          openReadLocks = LockResolver.getOpenReadLocks(env, rowColsToCheck);
+        }
+      }
+    }
+  }
+
+  private void checkForOrphanedLocks(CommitData cd) throws Exception {
+
+    Map<Bytes, Set<Column>> locksSeen = new HashMap<>();
+
+    readUnread(cd, kve -> {
+      Bytes row = ByteUtil.toBytes(kve.getKey().getRowData());
+      Column col = ColumnUtil.convert(kve.getKey());
+      locksSeen.computeIfAbsent(row, k -> new HashSet<>()).add(col);
+    });
+
+    checkForOrphanedReadLocks(cd, locksSeen);
+  }
+
   private boolean checkForAckCollision(ConditionalMutation cm) {
     Bytes row = Bytes.of(cm.getRow());
 
@@ -792,8 +925,8 @@
       stats.incrementEntriesSet(cols.size());
     }
 
-    Bytes primRow;
-    Column primCol;
+    Bytes primRow = null;
+    Column primCol = null;
 
     if (primary != null) {
       primRow = primary.getRow();
@@ -805,9 +938,23 @@
       primRow = notification.getRow();
       primCol = notification.getColumn();
     } else {
-      primRow = updates.keySet().iterator().next();
-      Map<Column, Bytes> colSet = updates.get(primRow);
-      primCol = colSet.keySet().iterator().next();
+
+      outer: for (Entry<Bytes, Map<Column, Bytes>> entry : updates.entrySet()) {
+        for (Entry<Column, Bytes> entry2 : entry.getValue().entrySet()) {
+          if (!isReadLock(entry2.getValue())) {
+            primRow = entry.getKey();
+            primCol = entry2.getKey();
+            break outer;
+          }
+        }
+      }
+
+      if (primRow == null) {
+        // there are only read locks, so nothing to write
+        deleteWeakRow();
+        commitCallback.committed();
+        return;
+      }
     }
 
     // get a primary column
@@ -825,7 +972,6 @@
     final ConditionalMutation pcm =
         prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol, isTriggerRow(cd.prow));
 
-
     ListenableFuture<Iterator<Result>> future = cd.acw.apply(Collections.singletonList(pcm));
     Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) {
       @Override
@@ -881,7 +1027,7 @@
       cd.addPrimaryToRejected();
       getStats().setRejected(cd.getRejected());
       // TODO do async
-      readUnread(cd);
+      checkForOrphanedLocks(cd);
       if (checkForAckCollision(pcm)) {
         cd.commitObserver.alreadyAcknowledged();
       } else {
@@ -913,7 +1059,6 @@
 
     cd.acceptedRows = new HashSet<>();
 
-
     ListenableFuture<Iterator<Result>> future = cd.bacw.apply(mutations);
     Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) {
       @Override
@@ -940,7 +1085,7 @@
       env.getSharedResources().getSyncCommitExecutor().execute(new SynchronousCommitTask(cd) {
         @Override
         protected void runCommitStep(CommitData cd) throws Exception {
-          readUnread(cd);
+          checkForOrphanedLocks(cd);
           rollbackOtherLocks(cd);
         }
       });
@@ -957,7 +1102,6 @@
     }
   }
 
-
   private void rollbackOtherLocks(CommitData cd) throws Exception {
     // roll back locks
 
@@ -968,9 +1112,14 @@
     ArrayList<Mutation> mutations = new ArrayList<>(cd.acceptedRows.size());
     for (Bytes row : cd.acceptedRows) {
       m = new Flutation(env, row);
-      for (Column col : updates.get(row).keySet()) {
-        m.put(col, ColumnConstants.DEL_LOCK_PREFIX | startTs,
-            DelLockValue.encodeRollback(false, true));
+      for (Entry<Column, Bytes> entry : updates.get(row).entrySet()) {
+        if (isReadLock(entry.getValue())) {
+          m.put(entry.getKey(), ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, true),
+              DelReadLockValue.encodeRollback());
+        } else {
+          m.put(entry.getKey(), ColumnConstants.DEL_LOCK_PREFIX | startTs,
+              DelLockValue.encodeRollback(false, true));
+        }
       }
       mutations.add(m);
     }
@@ -1004,7 +1153,6 @@
     }, env.getSharedResources().getAsyncCommitExecutor());
   }
 
-
   private void beginSecondCommitPhase(CommitData cd, Stamp commitStamp) throws Exception {
     if (startTs < commitStamp.getGcTimestamp()) {
       rollbackOtherLocks(cd);
@@ -1065,7 +1213,6 @@
       }
     }
 
-
     ListenableFuture<Void> future =
         env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations.values());
     Futures.addCallback(future, new CommitCallback<Void>(cd) {
@@ -1088,8 +1235,7 @@
     final ConditionalMutation delLockMutation = new ConditionalFlutation(env, cd.prow, lockCheck);
 
     ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), isDelete(cd.pval),
-        startTs, commitTs, observedColumns, delLockMutation);
-
+        isReadLock(cd.pval), startTs, commitTs, observedColumns, delLockMutation);
 
     ListenableFuture<Iterator<Result>> future =
         cd.acw.apply(Collections.singletonList(delLockMutation));
@@ -1146,7 +1292,6 @@
     }
   }
 
-
   private void postCommitPrimary(CommitData cd, long commitTs, Status mutationStatus)
       throws Exception {
     if (mutationStatus != Status.ACCEPTED) {
@@ -1170,7 +1315,7 @@
         ColumnUtil.commitColumn(env,
             isTriggerRow && colUpdates.getKey().equals(notification.getColumn()), false,
             colUpdates.getKey(), isWrite(colUpdates.getValue()), isDelete(colUpdates.getValue()),
-            startTs, commitTs, observedColumns, m);
+            isReadLock(colUpdates.getValue()), startTs, commitTs, observedColumns, m);
       }
 
       mutations.add(m);
@@ -1217,6 +1362,8 @@
   }
 
   public SnapshotScanner newSnapshotScanner(Span span, Collection<Column> columns) {
-    return new SnapshotScanner(env, new SnapshotScanner.Opts(span, columns), startTs, stats);
+    return new SnapshotScanner(env, new SnapshotScanner.Opts(span, columns, false), startTs, stats,
+        kve -> {
+        });
   }
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
index a4c35c3..21406dc 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
@@ -20,8 +20,10 @@
 import java.util.Set;
 
 import com.google.common.collect.Iterators;
+import org.apache.fluo.api.client.AbstractSnapshotBase;
 import org.apache.fluo.api.client.AbstractTransactionBase;
 import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.SnapshotBase;
 import org.apache.fluo.api.client.scanner.ScannerBuilder;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.data.Bytes;
@@ -84,7 +86,6 @@
         e -> encNonAscii(e.getKey()) + "=" + encNonAscii(e.getValue())));
   }
 
-
   private String toStringEncNonAsciiCC(Collection<Column> columns) {
     return Iterators.toString(Iterators.transform(columns.iterator(), Hex::encNonAscii));
   }
@@ -126,47 +127,66 @@
 
   }
 
-  @Override
-  public Bytes get(Bytes row, Column column) {
-    Bytes ret = tx.get(row, column);
+  private Bytes get(SnapshotBase snap, Bytes row, Column column, String prefix) {
+    Bytes ret = snap.get(row, column);
     if (log.isTraceEnabled()) {
-      log.trace("txid: {} get({}, {}) -> {}", txid, encNonAscii(row), toStringEncNonAscii(column),
-          encNonAscii(ret));
+      log.trace("txid: {} {}get({}, {}) -> {}", txid, prefix, encNonAscii(row),
+          toStringEncNonAscii(column), encNonAscii(ret));
     }
     return ret;
   }
 
-  @Override
-  public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
-    Map<Column, Bytes> ret = tx.get(row, columns);
+  private Map<Column, Bytes> get(SnapshotBase snap, Bytes row, Set<Column> columns, String prefix) {
+    Map<Column, Bytes> ret = snap.get(row, columns);
     if (log.isTraceEnabled()) {
-      log.trace("txid: {} get({}, {}) -> {}", txid, encNonAscii(row),
+      log.trace("txid: {} {}get({}, {}) -> {}", txid, prefix, encNonAscii(row),
           toStringEncNonAsciiCC(columns), toStringEncNonAsciiMCB(ret));
     }
     return ret;
   }
 
-  @Override
-  public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
-    Map<Bytes, Map<Column, Bytes>> ret = tx.get(rows, columns);
+  private Map<Bytes, Map<Column, Bytes>> get(SnapshotBase snap, Collection<Bytes> rows,
+      Set<Column> columns, String prefix) {
+    Map<Bytes, Map<Column, Bytes>> ret = snap.get(rows, columns);
     if (log.isTraceEnabled()) {
-      log.trace("txid: {} get({}, {}) -> {}", txid, toStringEncNonAsciiCB(rows),
+      log.trace("txid: {} {}get({}, {}) -> {}", txid, prefix, toStringEncNonAsciiCB(rows),
           toStringEncNonAsciiCC(columns), toStringEncNonAsciiMBMCB(ret));
     }
     return ret;
   }
 
-  @Override
-  public Map<RowColumn, Bytes> get(Collection<RowColumn> rowColumns) {
-    Map<RowColumn, Bytes> ret = tx.get(rowColumns);
+
+  private Map<RowColumn, Bytes> get(SnapshotBase snap, Collection<RowColumn> rowColumns,
+      String prefix) {
+    Map<RowColumn, Bytes> ret = snap.get(rowColumns);
     if (log.isTraceEnabled()) {
-      log.trace("txid: {} get({}) -> {}", txid, toStringEncNonAsciiCRC(rowColumns),
+      log.trace("txid: {} {}get({}) -> {}", txid, prefix, toStringEncNonAsciiCRC(rowColumns),
           toStringEncNonAsciiMRCB(ret));
     }
     return ret;
   }
 
   @Override
+  public Bytes get(Bytes row, Column column) {
+    return get(tx, row, column, "");
+  }
+
+  @Override
+  public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
+    return get(tx, row, columns, "");
+  }
+
+  @Override
+  public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
+    return get(tx, rows, columns, "");
+  }
+
+  @Override
+  public Map<RowColumn, Bytes> get(Collection<RowColumn> rowColumns) {
+    return get(tx, rowColumns, "");
+  }
+
+  @Override
   public ScannerBuilder scanner() {
     return new TracingScannerBuilder(tx.scanner(), txid);
   }
@@ -303,4 +323,43 @@
   public int getSize() {
     return tx.getSize();
   }
+
+  @Override
+  public SnapshotBase withReadLock() {
+    SnapshotBase rltx = tx.withReadLock();
+
+    return new AbstractSnapshotBase() {
+
+      @Override
+      public ScannerBuilder scanner() {
+        // this is an unsupported op and will throw an exception so don't bother w/ trace logging
+        return rltx.scanner();
+      }
+
+      @Override
+      public long getStartTimestamp() {
+        return rltx.getStartTimestamp();
+      }
+
+      @Override
+      public Map<RowColumn, Bytes> get(Collection<RowColumn> rowColumns) {
+        return TracingTransaction.this.get(rltx, rowColumns, "withReadLock().");
+      }
+
+      @Override
+      public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
+        return TracingTransaction.this.get(rltx, rows, columns, "withReadLock().");
+      }
+
+      @Override
+      public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
+        return TracingTransaction.this.get(rltx, row, columns, "withReadLock().");
+      }
+
+      @Override
+      public Bytes get(Bytes row, Column column) {
+        return TracingTransaction.this.get(rltx, row, column, "withReadLock().");
+      }
+    };
+  }
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
index bbc2d03..7d488ca 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
@@ -30,7 +30,9 @@
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ReadLockUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
+import org.apache.fluo.accumulo.values.DelReadLockValue;
 import org.apache.fluo.accumulo.values.WriteValue;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Bytes.BytesBuilder;
@@ -51,9 +53,13 @@
   }
 
   public static void commitColumn(Environment env, boolean isTrigger, boolean isPrimary, Column col,
-      boolean isWrite, boolean isDelete, long startTs, long commitTs, Set<Column> observedColumns,
-      Mutation m) {
-    if (isWrite) {
+      boolean isWrite, boolean isDelete, boolean isReadlock, long startTs, long commitTs,
+      Set<Column> observedColumns, Mutation m) {
+    if (isReadlock) {
+      Flutation.put(env, m, col,
+          ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, true),
+          DelReadLockValue.encodeCommit(commitTs));
+    } else if (isWrite) {
       Flutation.put(env, m, col, ColumnConstants.WRITE_PREFIX | commitTs,
           WriteValue.encode(startTs, isPrimary, isDelete));
     } else {
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
index 57299e9..4e481e4 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
@@ -17,6 +17,7 @@
 
 import java.io.File;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
@@ -110,18 +111,22 @@
     return TABLE_BASE + tableCounter.incrementAndGet();
   }
 
-  protected void printSnapshot() throws Exception {
+  protected void printSnapshot(Consumer<String> out) throws Exception {
     try (Snapshot s = client.newSnapshot()) {
-      System.out.println("== snapshot start ==");
+      out.accept("== snapshot start ==");
 
       for (RowColumnValue rcv : s.scanner().build()) {
-        System.out.println(rcv.getRow() + " " + rcv.getColumn() + "\t" + rcv.getValue());
+        out.accept(rcv.getRow() + " " + rcv.getColumn() + "\t" + rcv.getValue());
       }
 
-      System.out.println("=== snapshot end ===");
+      out.accept("=== snapshot end ===");
     }
   }
 
+  protected void printSnapshot() throws Exception {
+    printSnapshot(System.out::println);
+  }
+
   @AfterClass
   public static void tearDownAccumulo() throws Exception {
     if (startedCluster) {
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
index 57b3923..a175500 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
@@ -31,6 +31,7 @@
 import org.apache.fluo.accumulo.util.ColumnConstants;
 import org.apache.fluo.accumulo.util.NotificationUtil;
 import org.apache.fluo.api.client.AbstractTransactionBase;
+import org.apache.fluo.api.client.SnapshotBase;
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.client.scanner.ScannerBuilder;
 import org.apache.fluo.api.data.Bytes;
@@ -199,4 +200,9 @@
   public long getStartTimestamp() {
     return tx.getStartTimestamp();
   }
+
+  @Override
+  public SnapshotBase withReadLock() {
+    return tx.withReadLock();
+  }
 }
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
index bdb4b51..fc553f5 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
@@ -29,6 +29,7 @@
 import org.apache.fluo.accumulo.util.ColumnConstants;
 import org.apache.fluo.accumulo.util.ZookeeperPath;
 import org.apache.fluo.accumulo.util.ZookeeperUtil;
+import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.core.impl.TransactionImpl.CommitData;
 import org.apache.fluo.core.impl.TransactorNode;
@@ -176,6 +177,88 @@
     Assert.assertEquals(0, countInTable("-DATA"));
   }
 
+  private void increment(TransactionBase tx, String row, Column col) {
+    int count = Integer.parseInt(tx.gets(row, col, "0"));
+    tx.set(row, col, count + 1 + "");
+  }
+
+  @Test(timeout = 60000)
+  public void testReadLocks() throws Exception {
+
+    final Column altIdCol = new Column("info", "altId");
+
+    TestTransaction tx1 = new TestTransaction(env);
+    for (int i = 0; i < 10; i++) {
+      tx1.set(String.format("n:%03d", i), altIdCol, "" + (19 * (1 + i)));
+    }
+
+    tx1.done();
+
+    for (int i = 0; i < 50; i++) {
+      String row = String.format("n:%03d", i % 10);
+
+      TestTransaction tx = new TestTransaction(env);
+      String altId = tx.withReadLock().gets(row, altIdCol);
+
+      increment(tx, "a:" + altId, new Column("count", row));
+
+      tx.done();
+    }
+
+    Assert.assertEquals(50, countInTable("-DEL_RLOCK"));
+    Assert.assertEquals(50, countInTable("-RLOCK"));
+
+    TestTransaction tx2 = new TestTransaction(env);
+    for (int i = 0; i < 10; i++) {
+      String row = String.format("n:%03d", i);
+      String newAltId = (13 * (i + 1)) + "";
+      String currAltId = tx2.gets(row, altIdCol);
+
+
+      tx2.set(row, altIdCol, newAltId);
+
+      String count = tx2.gets("a:" + currAltId, new Column("count", row));
+      tx2.set("a:" + newAltId, new Column("count", row), count);
+      tx2.delete("a:" + currAltId, new Column("count", row));
+    }
+
+    tx2.done();
+
+    // all read locks should be garbage collected because of the writes after the read locks
+    conn.tableOperations().compact(table, null, null, true, true);
+
+    Assert.assertEquals(0, countInTable("-DEL_RLOCK"));
+    Assert.assertEquals(0, countInTable("-RLOCK"));
+
+    for (int i = 0; i < 50; i++) {
+      String row = String.format("n:%03d", i % 10);
+
+      TestTransaction tx = new TestTransaction(env);
+      String altId = tx.withReadLock().gets(row, altIdCol);
+
+      increment(tx, "a:" + altId, new Column("count", row));
+
+      tx.done();
+    }
+
+    TestTransaction tx3 = new TestTransaction(env);
+    for (int i = 0; i < 10; i++) {
+      String row = String.format("n:%03d", i);
+      String currAltId = tx3.gets(row, altIdCol);
+      Assert.assertEquals("10", tx3.gets("a:" + currAltId, new Column("count", row)));
+    }
+
+    tx3.done();
+
+    waitForGcTime(tx3.getStartTimestamp());
+    conn.tableOperations().compact(table, null, null, true, true);
+
+
+    // all read locks older than GC time should be dropped
+    Assert.assertEquals(0, countInTable("-DEL_RLOCK"));
+    Assert.assertEquals(0, countInTable("-RLOCK"));
+  }
+
   private int countInTable(String str) throws TableNotFoundException {
     int count = 0;
     Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ReadLockFailureIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ReadLockFailureIT.java
new file mode 100644
index 0000000..489b9e0
--- /dev/null
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ReadLockFailureIT.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.integration.impl;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.fluo.accumulo.format.FluoFormatter;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.exceptions.CommitException;
+import org.apache.fluo.core.impl.TransactionImpl.CommitData;
+import org.apache.fluo.core.impl.TransactorNode;
+import org.apache.fluo.core.oracle.Stamp;
+import org.apache.fluo.integration.ITBaseImpl;
+import org.apache.fluo.integration.TestTransaction;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.fluo.integration.impl.ReadLockIT.addEdge;
+import static org.apache.fluo.integration.impl.ReadLockIT.setAlias;
+
+public class ReadLockFailureIT extends ITBaseImpl {
+
+  private void dumpTable(Consumer<String> out) throws TableNotFoundException {
+    Scanner scanner = conn.createScanner(getCurTableName(), Authorizations.EMPTY);
+    for (Entry<Key, Value> entry : scanner) {
+      out.accept(FluoFormatter.toString(entry));
+    }
+  }
+
+  private Set<String> getDerivedEdges() {
+    Set<String> derivedEdges = new HashSet<>();
+    try (Snapshot snap = client.newSnapshot()) {
+      snap.scanner().over(Span.prefix("d:")).build().stream().map(RowColumnValue::getsRow)
+          .map(r -> r.substring(2)).forEach(derivedEdges::add);
+    }
+    return derivedEdges;
+  }
+
+  private void expectCommitException(Consumer<Transaction> retryAction) {
+    try (Transaction tx = client.newTransaction()) {
+      retryAction.accept(tx);
+      tx.commit();
+      Assert.fail();
+    } catch (CommitException ce) {
+
+    }
+  }
+
+  private void retryOnce(Consumer<Transaction> retryAction) {
+
+    expectCommitException(retryAction);
+
+    try (Transaction tx = client.newTransaction()) {
+      retryAction.accept(tx);
+      tx.commit();
+    }
+  }
+
+  private void retryTwice(Consumer<Transaction> retryAction) {
+
+    expectCommitException(retryAction);
+    expectCommitException(retryAction);
+
+    try (Transaction tx = client.newTransaction()) {
+      retryAction.accept(tx);
+      tx.commit();
+    }
+  }
+
+
+  private TransactorNode partiallyCommit(Consumer<TransactionBase> action, boolean commitPrimary,
+      boolean closeTransactor) throws Exception {
+    TransactorNode t2 = new TransactorNode(env);
+    TestTransaction tx2 = new TestTransaction(env, t2);
+
+    action.accept(tx2);
+
+    CommitData cd = tx2.createCommitData();
+    Assert.assertTrue(tx2.preCommit(cd));
+
+    if (commitPrimary) {
+      Stamp commitTs = env.getSharedResources().getOracleClient().getStamp();
+      Assert.assertTrue(tx2.commitPrimaryColumn(cd, commitTs));
+    }
+
+    if (closeTransactor) {
+      t2.close();
+    }
+
+    return t2;
+  }
+
+  private void testBasicRollback(boolean closeTransactor) throws Exception {
+    try (Transaction tx = client.newTransaction()) {
+      setAlias(tx, "node1", "bob");
+      setAlias(tx, "node2", "joe");
+      setAlias(tx, "node3", "alice");
+      tx.commit();
+    }
+
+    try (Transaction tx = client.newTransaction()) {
+      addEdge(tx, "node1", "node2");
+      tx.commit();
+    }
+
+    TransactorNode tn =
+        partiallyCommit(tx -> addEdge(tx, "node1", "node3"), false, closeTransactor);
+
+    Assert.assertEquals(ImmutableSet.of("bob:joe", "joe:bob"), getDerivedEdges());
+
+    retryOnce(tx -> setAlias(tx, "node1", "bobby"));
+
+    Assert.assertEquals(ImmutableSet.of("bobby:joe", "joe:bobby"), getDerivedEdges());
+
+    retryOnce(tx -> setAlias(tx, "node3", "alex"));
+
+    Assert.assertEquals(ImmutableSet.of("bobby:joe", "joe:bobby"), getDerivedEdges());
+
+    if (!closeTransactor) {
+      tn.close();
+    }
+  }
+
+  @Test
+  public void testBasicRollback1() throws Exception {
+    testBasicRollback(true);
+  }
+
+  @Test
+  public void testBasicRollback2() throws Exception {
+    testBasicRollback(false);
+  }
+
+  private void testBasicRollforward(boolean closeTransactor) throws Exception {
+    try (Transaction tx = client.newTransaction()) {
+      setAlias(tx, "node1", "bob");
+      setAlias(tx, "node2", "joe");
+      setAlias(tx, "node3", "alice");
+      tx.commit();
+    }
+
+    try (Transaction tx = client.newTransaction()) {
+      addEdge(tx, "node1", "node2");
+      tx.commit();
+    }
+
+    TransactorNode tn = partiallyCommit(tx -> addEdge(tx, "node1", "node3"), true, closeTransactor);
+
+    retryOnce(tx -> setAlias(tx, "node1", "bobby"));
+
+    Assert.assertEquals(ImmutableSet.of("bobby:joe", "joe:bobby", "bobby:alice", "alice:bobby"),
+        getDerivedEdges());
+
+    retryOnce(tx -> setAlias(tx, "node3", "alex"));
+
+    Assert.assertEquals(ImmutableSet.of("bobby:joe", "joe:bobby", "bobby:alex", "alex:bobby"),
+        getDerivedEdges());
+
+    if (!closeTransactor) {
+      tn.close();
+    }
+  }
+
+  @Test
+  public void testBasicRollforward1() throws Exception {
+    testBasicRollforward(false);
+  }
+
+  @Test
+  public void testBasicRollforward2() throws Exception {
+    testBasicRollforward(true);
+  }
+
+  private void testParallelScan(boolean closeTransactor) throws Exception {
+    Column crCol = new Column("stat", "completionRatio");
+
+    try (Transaction tx = client.newTransaction()) {
+      tx.set("user5", crCol, "0.5");
+      tx.set("user6", crCol, "0.75");
+      tx.commit();
+    }
+
+    TransactorNode tn = partiallyCommit(tx -> {
+      // get multiple read locks with a parallel scan
+      Map<String, Map<Column, String>> ratios =
+          tx.withReadLock().gets(Arrays.asList("user5", "user6"), crCol);
+
+      double cr1 = Double.parseDouble(ratios.get("user5").get(crCol));
+      double cr2 = Double.parseDouble(ratios.get("user5").get(crCol));
+
+      tx.set("org1", crCol, (cr1 + cr2) / 2 + "");
+    }, false, closeTransactor);
+
+    retryTwice(tx -> {
+      Map<String, Map<Column, String>> ratios = tx.gets(Arrays.asList("user5", "user6"), crCol);
+
+      tx.set("user5", crCol, "0.51");
+      tx.set("user6", crCol, "0.76");
+    });
+
+    try (Snapshot snap = client.newSnapshot()) {
+      Assert.assertNull(snap.gets("org1", crCol));
+      Assert.assertEquals("0.51", snap.gets("user5", crCol));
+      Assert.assertEquals("0.76", snap.gets("user6", crCol));
+    }
+
+    if (!closeTransactor) {
+      tn.close();
+    }
+  }
+
+  @Test
+  public void testParallelScan1() throws Exception {
+    testParallelScan(true);
+  }
+
+  @Test
+  public void testParallelScan2() throws Exception {
+    testParallelScan(false);
+  }
+
+  private void testParallelScanRC(boolean closeTransactor) throws Exception {
+    // currently get w/ RowColumn has a different code path than other gets that take multiple rows
+    // and columns
+
+    Column crCol = new Column("stat", "completionRatio");
+
+    try (Transaction tx = client.newTransaction()) {
+      tx.set("user5", crCol, "0.5");
+      tx.set("user6", crCol, "0.75");
+      tx.commit();
+    }
+
+    TransactorNode tn = partiallyCommit(tx -> {
+      // get multiple read locks with a parallel scan
+      Map<RowColumn, String> ratios = tx.withReadLock()
+          .gets(Arrays.asList(new RowColumn("user5", crCol), new RowColumn("user6", crCol)));
+
+
+      double cr1 = Double.parseDouble(ratios.get(new RowColumn("user5", crCol)));
+      double cr2 = Double.parseDouble(ratios.get(new RowColumn("user6", crCol)));
+
+      tx.set("org1", crCol, (cr1 + cr2) / 2 + "");
+    }, false, true);
+
+    retryTwice(tx -> {
+      Map<RowColumn, String> ratios =
+          tx.gets(Arrays.asList(new RowColumn("user5", crCol), new RowColumn("user6", crCol)));
+
+      tx.set("user5", crCol, "0.51");
+      tx.set("user6", crCol, "0.76");
+    });
+
+    try (Snapshot snap = client.newSnapshot()) {
+      Assert.assertNull(snap.gets("org1", crCol));
+      Assert.assertEquals("0.51", snap.gets("user5", crCol));
+      Assert.assertEquals("0.76", snap.gets("user6", crCol));
+    }
+
+    if (!closeTransactor) {
+      tn.close();
+    }
+  }
+
+  @Test
+  public void testParallelScanRC1() throws Exception {
+    testParallelScanRC(true);
+  }
+
+  @Test
+  public void testParallelScanRC2() throws Exception {
+    testParallelScanRC(false);
+  }
+
+  private void testWriteWoRead(boolean commitPrimary, boolean closeTransactor) throws Exception {
+    // Reads can cause locks to be recovered. This test the case of a transactions that only does a
+    // write to a field that has an open read lock.
+
+    try (Transaction tx = client.newTransaction()) {
+      tx.set("r1", new Column("f1", "q1"), "v1");
+      tx.set("r2", new Column("f1", "q1"), "v2");
+      tx.commit();
+    }
+
+    TransactorNode transactor = partiallyCommit(tx -> {
+      String v1 = tx.withReadLock().gets("r1", new Column("f1", "q1"));
+      String v2 = tx.withReadLock().gets("r2", new Column("f1", "q1"));
+
+      tx.set("r3", new Column("f1", "qa"), v1 + ":" + v2);
+    }, commitPrimary, closeTransactor);
+
+    // TODO open an issue... does not really need to retry in this case
+    retryOnce(tx -> {
+      tx.set("r1", new Column("f1", "q1"), "v3");
+    });
+
+    try (Transaction tx = client.newTransaction()) {
+      if (commitPrimary) {
+        Assert.assertEquals("v1:v2", tx.gets("r3", new Column("f1", "qa")));
+      } else {
+        Assert.assertNull(tx.gets("r3", new Column("f1", "qa")));
+      }
+      Assert.assertEquals("v3", tx.gets("r1", new Column("f1", "q1")));
+    }
+
+    if (!closeTransactor) {
+      transactor.close();
+    }
+  }
+
+  @Test
+  public void testWriteWoRead1() throws Exception {
+    testWriteWoRead(false, false);
+  }
+
+  @Test
+  public void testWriteWoRead2() throws Exception {
+    testWriteWoRead(false, true);
+  }
+
+  @Test
+  public void testWriteWoRead3() throws Exception {
+    testWriteWoRead(true, false);
+  }
+
+  @Test
+  public void testWriteWoRead4() throws Exception {
+    testWriteWoRead(true, true);
+  }
+
+  private int countInTable(String str) throws TableNotFoundException {
+    int count = 0;
+    Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
+    for (String e : Iterables.transform(scanner, FluoFormatter::toString)) {
+      if (e.contains(str)) {
+        count++;
+      }
+    }
+
+    return count;
+  }
+
+  @Test
+  public void testFailDeletesReadLocks() throws Exception {
+    try (Transaction tx = client.newTransaction()) {
+      for (int i = 0; i < 20; i++) {
+        tx.set("r-" + i, new Column("f1", "q1"), "" + i);
+      }
+
+      tx.commit();
+    }
+
+    long startTs = 0;
+
+    try (Transaction tx1 = client.newTransaction()) {
+      tx1.set("r-5", new Column("f1", "q1"), "9");
+      try (Transaction tx2 = client.newTransaction()) {
+        tx1.commit();
+
+        int sum = 0;
+        for (int i = 0; i < 20; i++) {
+          sum += Integer.parseInt(tx2.withReadLock().gets("r-" + i, new Column("f1", "q1")));
+        }
+
+        tx2.set("sum1", new Column("f", "s"), "" + sum);
+        startTs = tx2.getStartTimestamp();
+        tx2.commit();
+        Assert.fail();
+      } catch (CommitException e) {
+
+      }
+    }
+
+    try (Snapshot snapshot = client.newSnapshot()) {
+      Assert.assertNull(snapshot.gets("sum1", new Column("f", "s")));
+    }
+
+    // ensure the failed tx deleted its read locks....
+    Assert.assertEquals(19, countInTable(startTs + "-RLOCK"));
+    Assert.assertEquals(19, countInTable(startTs + "-DEL_RLOCK"));
+
+    try (Transaction tx = client.newTransaction()) {
+      int sum = 0;
+      for (int i = 0; i < 20; i++) {
+        sum += Integer.parseInt(tx.withReadLock().gets("r-" + i, new Column("f1", "q1")));
+      }
+
+      tx.set("sum1", new Column("f", "s"), "" + sum);
+      tx.commit();
+    }
+
+    try (Snapshot snapshot = client.newSnapshot()) {
+      Assert.assertEquals("194", snapshot.gets("sum1", new Column("f", "s")));
+    }
+  }
+}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ReadLockIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ReadLockIT.java
new file mode 100644
index 0000000..b88d66e
--- /dev/null
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ReadLockIT.java
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.integration.impl;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.fluo.accumulo.format.FluoFormatter;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Loader;
+import org.apache.fluo.api.client.LoaderExecutor;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.exceptions.AlreadySetException;
+import org.apache.fluo.api.exceptions.CommitException;
+import org.apache.fluo.integration.ITBaseImpl;
+import org.apache.fluo.integration.TestTransaction;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.util.Arrays.asList;
+
+public class ReadLockIT extends ITBaseImpl {
+
+  private static final Column ALIAS_COL = new Column("node", "alias");
+
+  private void addEdge(String node1, String node2) {
+    try (Transaction tx = client.newTransaction()) {
+      addEdge(tx, node1, node2);
+      tx.commit();
+    }
+  }
+
+  static void addEdge(TransactionBase tx, String node1, String node2) {
+    Map<String, Map<Column, String>> aliases =
+        tx.withReadLock().gets(asList("r:" + node1, "r:" + node2), ALIAS_COL);
+    String alias1 = aliases.get("r:" + node1).get(ALIAS_COL);
+    String alias2 = aliases.get("r:" + node2).get(ALIAS_COL);
+
+    addEdge(tx, node1, node2, alias1, alias2);
+  }
+
+  static void addEdge(TransactionBase tx, String node1, String node2, String alias1,
+      String alias2) {
+    tx.set("d:" + alias1 + ":" + alias2, new Column("edge", node1 + ":" + node2), "");
+    tx.set("d:" + alias2 + ":" + alias1, new Column("edge", node2 + ":" + node1), "");
+
+    tx.set("r:" + node1 + ":" + node2, new Column("edge", "aliases"), alias1 + ":" + alias2);
+    tx.set("r:" + node2 + ":" + node1, new Column("edge", "aliases"), alias2 + ":" + alias1);
+  }
+
+  static void setAlias(TransactionBase tx, String node, String alias) {
+    tx.set("r:" + node, new Column("node", "alias"), alias);
+
+    CellScanner scanner = tx.scanner().over(Span.prefix("r:" + node + ":")).build();
+
+    for (RowColumnValue rcv : scanner) {
+      String otherNode = rcv.getsRow().split(":")[2];
+      String[] aliases = rcv.getsValue().split(":");
+
+      if (aliases.length != 2) {
+        throw new RuntimeException("bad alias " + rcv);
+      }
+
+      if (!alias.equals(aliases[0])) {
+        tx.delete("d:" + aliases[0] + ":" + aliases[1], new Column("edge", node + ":" + otherNode));
+        tx.delete("d:" + aliases[1] + ":" + aliases[0], new Column("edge", otherNode + ":" + node));
+
+        addEdge(tx, node, otherNode, alias, aliases[1]);
+      }
+    }
+  }
+
+  @Test
+  public void testConcurrentReadlocks() throws Exception {
+
+    try (Transaction tx = client.newTransaction()) {
+      setAlias(tx, "node1", "bob");
+      setAlias(tx, "node2", "joe");
+      setAlias(tx, "node3", "alice");
+      tx.commit();
+    }
+
+
+    TestTransaction tx1 = new TestTransaction(env);
+    setAlias(tx1, "node2", "jojo");
+
+    TestTransaction tx2 = new TestTransaction(env);
+    TestTransaction tx3 = new TestTransaction(env);
+
+    addEdge(tx2, "node1", "node2");
+    addEdge(tx3, "node1", "node3");
+
+    tx2.commit();
+    tx2.close();
+
+    tx3.commit();
+    tx3.close();
+
+    Assert.assertEquals(ImmutableSet.of("bob:joe", "joe:bob", "bob:alice", "alice:bob"),
+        getDerivedEdges());
+
+    try {
+      tx1.commit();
+      Assert.fail("Expected exception");
+    } catch (CommitException ce) {
+    }
+
+    Assert.assertEquals(ImmutableSet.of("bob:joe", "joe:bob", "bob:alice", "alice:bob"),
+        getDerivedEdges());
+  }
+
+  @Test
+  public void testWriteCausesReadLockToFail() throws Exception {
+    try (Transaction tx = client.newTransaction()) {
+      setAlias(tx, "node1", "bob");
+      setAlias(tx, "node2", "joe");
+      tx.commit();
+    }
+
+    TestTransaction tx1 = new TestTransaction(env);
+    setAlias(tx1, "node2", "jojo");
+
+    TestTransaction tx2 = new TestTransaction(env);
+
+    addEdge(tx2, "node1", "node2");
+
+    tx1.commit();
+    tx1.close();
+
+    Assert.assertEquals(0, getDerivedEdges().size());
+
+    try {
+      tx2.commit();
+      Assert.fail("Expected exception");
+    } catch (CommitException ce) {
+    }
+
+    // ensure the failed read lock on node1 is cleaned up
+    try (Transaction tx = client.newTransaction()) {
+      setAlias(tx, "node1", "fred");
+      tx.commit();
+    }
+
+    try (Transaction tx = client.newTransaction()) {
+      addEdge(tx, "node1", "node2");
+      tx.commit();
+    }
+
+    Assert.assertEquals(ImmutableSet.of("fred:jojo", "jojo:fred"), getDerivedEdges());
+  }
+
+  private void dumpRow(String row, Consumer<String> out) throws TableNotFoundException {
+    Scanner scanner = conn.createScanner(getCurTableName(), Authorizations.EMPTY);
+    scanner.setRange(Range.exact(row));
+    for (Entry<Key, Value> entry : scanner) {
+      out.accept(FluoFormatter.toString(entry));
+    }
+  }
+
+  private void dumpTable(Consumer<String> out) throws TableNotFoundException {
+    Scanner scanner = conn.createScanner(getCurTableName(), Authorizations.EMPTY);
+    for (Entry<Key, Value> entry : scanner) {
+      out.accept(FluoFormatter.toString(entry));
+    }
+  }
+
+  @Test
+  public void testWriteAfterReadLock() throws Exception {
+    try (Transaction tx = client.newTransaction()) {
+      setAlias(tx, "node1", "bob");
+      setAlias(tx, "node2", "joe");
+      setAlias(tx, "node3", "alice");
+
+      tx.commit();
+    }
+
+    addEdge("node1", "node2");
+    Assert.assertEquals(ImmutableSet.of("bob:joe", "joe:bob"), getDerivedEdges());
+
+    try (Transaction tx = client.newTransaction()) {
+      setAlias(tx, "node1", "bobby");
+      tx.commit();
+    }
+
+    addEdge("node1", "node3");
+    Assert.assertEquals(ImmutableSet.of("bobby:joe", "joe:bobby", "bobby:alice", "alice:bobby"),
+        getDerivedEdges());
+  }
+
+  @Test
+  public void testRandom() throws Exception {
+    int numNodes = 100;
+    int numEdges = 1000;
+    int numAliasChanges = 25;
+
+    Random rand = new Random();
+
+    Map<String, String> nodes = new HashMap<>();
+    while (nodes.size() < numNodes) {
+      nodes.put(String.format("n-%09d", rand.nextInt(1000000000)),
+          String.format("a-%09d", rand.nextInt(1000000000)));
+    }
+
+    List<String> nodesList = new ArrayList<>(nodes.keySet());
+    Set<String> edges = new HashSet<>();
+    while (edges.size() < numEdges) {
+      String n1 = nodesList.get(rand.nextInt(nodesList.size()));
+      String n2 = nodesList.get(rand.nextInt(nodesList.size()));
+      if (n1.equals(n2) || edges.contains(n2 + ":" + n1)) {
+        continue;
+      }
+
+      edges.add(n1 + ":" + n2);
+    }
+
+    try (LoaderExecutor le = client.newLoaderExecutor()) {
+      for (Entry<String, String> entry : nodes.entrySet()) {
+        le.execute((tx, ctx) -> setAlias(tx, entry.getKey(), entry.getValue()));
+      }
+    }
+
+    List<Loader> loadOps = new ArrayList<>();
+    for (String edge : edges) {
+      String[] enodes = edge.split(":");
+      loadOps.add((tx, ctx) -> {
+        try {
+          addEdge(tx, enodes[0], enodes[1]);
+        } catch (NullPointerException e) {
+          // TOOD remove after finding bug
+          System.out.println(
+              " en0 " + enodes[0] + " en1 " + enodes[1] + " start ts " + tx.getStartTimestamp());
+          dumpRow("r:" + enodes[0], System.out::println);
+          dumpRow("r:" + enodes[1], System.out::println);
+          throw e;
+        }
+      });
+    }
+
+    Map<String, String> changes = new HashMap<>();
+    while (changes.size() < numAliasChanges) {
+      String node = nodesList.get(rand.nextInt(nodesList.size()));
+      String alias = String.format("a-%09d", rand.nextInt(1000000000));
+      changes.put(node, alias);
+    }
+
+    Map<String, String> nodes2 = new HashMap<>(nodes);
+    nodes2.putAll(changes);
+
+    changes.forEach((node, alias) -> {
+      loadOps.add((tx, ctx) -> setAlias(tx, node, alias));
+    });
+
+    Collections.shuffle(loadOps, rand);
+
+    FluoConfiguration conf = new FluoConfiguration(config);
+    conf.setLoaderThreads(20);
+    try (FluoClient client = FluoFactory.newClient(conf);
+        LoaderExecutor le = client.newLoaderExecutor()) {
+      loadOps.forEach(loader -> le.execute(loader));
+    }
+
+    Set<String> expectedEdges = new HashSet<>();
+    for (String edge : edges) {
+      String[] enodes = edge.split(":");
+      String alias1 = nodes2.get(enodes[0]);
+      String alias2 = nodes2.get(enodes[1]);
+
+      expectedEdges.add(alias1 + ":" + alias2);
+      expectedEdges.add(alias2 + ":" + alias1);
+    }
+
+    Set<String> actualEdges = getDerivedEdges();
+
+    if (!expectedEdges.equals(actualEdges)) {
+      Path dumpFile = Paths.get("target/ReadLockIT.txt");
+
+      try (BufferedWriter writer = Files.newBufferedWriter(dumpFile)) {
+
+        Consumer<String> out = s -> {
+          try {
+            writer.append(s);
+            writer.append("\n");
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        };
+
+
+        writer.append("Alias changes : \n");
+        Maps.difference(nodes, nodes2).entriesDiffering()
+            .forEach((k, v) -> out.accept(k + " " + v));
+
+        writer.append("expected - actual : \n");
+        Sets.difference(expectedEdges, actualEdges).forEach(out);
+        writer.append("\n");
+
+        writer.append("actual - expected : \n");
+        Sets.difference(actualEdges, expectedEdges).forEach(out);
+        writer.append("\n");
+
+        printSnapshot(out);
+        dumpTable(out);
+      }
+      Assert.fail("Did not produce expected graph, dumped info to " + dumpFile);
+    }
+  }
+
+  private Set<String> getDerivedEdges() {
+    Set<String> derivedEdges = new HashSet<>();
+    try (Snapshot snap = client.newSnapshot()) {
+      snap.scanner().over(Span.prefix("d:")).build().stream().map(RowColumnValue::getsRow)
+          .map(r -> r.substring(2)).forEach(derivedEdges::add);
+    }
+    return derivedEdges;
+  }
+
+  @Test(expected = AlreadySetException.class)
+  public void testReadAndWriteLockInSameTx() {
+    try (Transaction tx = client.newTransaction()) {
+      setAlias(tx, "node1", "bob");
+      setAlias(tx, "node2", "joe");
+
+
+      tx.commit();
+    }
+
+    try (Transaction tx = client.newTransaction()) {
+      setAlias(tx, "node1", "bobby");
+      // tries to get a read lock on node1 in same tx
+      addEdge(tx, "node1", "node2");
+    }
+  }
+
+  @Test(expected = AlreadySetException.class)
+  public void testReadAndDeleteInSameTx() {
+    try (Transaction tx = client.newTransaction()) {
+      tx.set("123456", new Column("f", "q"), "abc");
+      tx.commit();
+    }
+
+    try (Transaction tx = client.newTransaction()) {
+      tx.delete("123456", new Column("f", "q"));
+      // should fail here because already have write lock
+      String val = tx.withReadLock().gets("123456", new Column("f", "q"));
+      tx.set("123457", new Column("f", "q"), val + "7");
+      tx.commit();
+    }
+  }
+
+  private static final Column c1 = new Column("f1", "q1");
+  private static final Column c2 = new Column("f1", "q2");
+  private static final Column invCol = new Column("f1", "inv");
+
+  final private void ensureReadLocksSet(Consumer<TransactionBase> readLockOperation) {
+
+    try (Transaction txi = client.newTransaction()) {
+      txi.set("test1", c1, "45");
+      txi.set("test1", c2, "90");
+      txi.set("test2", c1, "30");
+      txi.set("test2", c2, "60");
+      txi.commit();
+    }
+
+
+    List<Consumer<TransactionBase>> writeLockOperations = ImmutableList.of(txw -> {
+      txw.set("test1", c1, "47");
+    }, txw -> {
+      txw.set("test1", c2, "94");
+    }, txw -> {
+      txw.set("test2", c1, "37");
+    }, txw -> {
+      txw.set("test2", c2, "74");
+    });
+
+    List<Transaction> writeTxs = new ArrayList<>();
+    for (Consumer<TransactionBase> wop : writeLockOperations) {
+      Transaction wtx = client.newTransaction();
+      wop.accept(wtx);
+      writeTxs.add(wtx);
+    }
+
+    try (Transaction txr = client.newTransaction()) {
+      readLockOperation.accept(txr);
+      txr.commit();
+    }
+
+    for (Transaction wtx : writeTxs) {
+      try {
+        wtx.commit();
+        Assert.fail();
+      } catch (CommitException ce) {
+      }
+    }
+
+    try (Snapshot snap = client.newSnapshot()) {
+      Assert.assertEquals("45", snap.gets("test1", c1));
+      Assert.assertEquals("90", snap.gets("test1", c2));
+      Assert.assertEquals("test1", snap.gets("45", invCol));
+      Assert.assertEquals("test1", snap.gets("90", invCol));
+      Assert.assertEquals("30", snap.gets("test2", c1));
+      Assert.assertEquals("60", snap.gets("test2", c2));
+      Assert.assertEquals("test2", snap.gets("30", invCol));
+      Assert.assertEquals("test2", snap.gets("60", invCol));
+    }
+  }
+
+  @Test
+  public void testGet() {
+    ensureReadLocksSet(txr -> {
+      // ensure this operation sets two read locks
+      SnapshotBase rlSnap = txr.withReadLock();
+
+      txr.set(rlSnap.gets("test1", c1), invCol, "test1");
+      txr.set(rlSnap.gets("test1", c2), invCol, "test1");
+      txr.set(rlSnap.gets("test2", c1), invCol, "test2");
+      txr.set(rlSnap.gets("test2", c2), invCol, "test2");
+    });
+  }
+
+  @Test
+  public void testGetColumns() {
+    ensureReadLocksSet(txr -> {
+      // ensure this operation sets two read locks
+      Map<Column, String> vals = txr.withReadLock().gets("test1", c1, c2);
+      txr.set(vals.get(c1), invCol, "test1");
+      txr.set(vals.get(c2), invCol, "test1");
+      vals = txr.withReadLock().gets("test2", c1, c2);
+      txr.set(vals.get(c1), invCol, "test2");
+      txr.set(vals.get(c2), invCol, "test2");
+    });
+  }
+
+  @Test
+  public void testGetRowsColumns() {
+    ensureReadLocksSet(txr -> {
+      // ensure this operation sets two read locks
+      Map<String, Map<Column, String>> vals =
+          txr.withReadLock().gets(ImmutableList.of("test1", "test2"), c1, c2);
+      txr.set(vals.get("test1").get(c1), invCol, "test1");
+      txr.set(vals.get("test1").get(c2), invCol, "test1");
+      txr.set(vals.get("test2").get(c1), invCol, "test2");
+      txr.set(vals.get("test2").get(c2), invCol, "test2");
+    });
+  }
+
+  @Test
+  public void testGetRowColumns() {
+    ensureReadLocksSet(txr -> {
+      // ensure this operation sets two read locks
+      Map<RowColumn, String> vals =
+          txr.withReadLock().gets(ImmutableList.of(new RowColumn("test1", c1),
+              new RowColumn("test1", c2), new RowColumn("test2", c1), new RowColumn("test2", c2)));
+      txr.set(vals.get(new RowColumn("test1", c1)), invCol, "test1");
+      txr.set(vals.get(new RowColumn("test1", c2)), invCol, "test1");
+      txr.set(vals.get(new RowColumn("test2", c1)), invCol, "test2");
+      txr.set(vals.get(new RowColumn("test2", c2)), invCol, "test2");
+    });
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testScan() {
+    try (Transaction tx = client.newTransaction()) {
+      tx.withReadLock().scanner().build();
+    }
+  }
+
+  @Test
+  public void testOnlyReadLocks() {
+    try (Transaction tx = client.newTransaction()) {
+      tx.set("r1", new Column("q1", "f1"), "v1");
+      tx.set("r2", new Column("q1", "f1"), "v2");
+      tx.commit();
+    }
+
+    try (Transaction tx1 = client.newTransaction()) {
+      try (Transaction tx2 = client.newTransaction()) {
+        String v1 = tx2.withReadLock().gets("r1", new Column("q1", "f1"));
+        String v2 = tx2.withReadLock().gets("r2", new Column("q1", "f1"));
+
+        Assert.assertEquals("v1", v1);
+        Assert.assertEquals("v2", v2);
+
+        // commit should be a no-op because only read locks
+        tx2.commit();
+      }
+
+      tx1.set("r1", new Column("q1", "f1"), "v3");
+      tx1.set("r2", new Column("q1", "f1"), "v4");
+
+      // should not collide with read locks
+      tx1.commit();
+    }
+    try (Snapshot snap = client.newSnapshot()) {
+      String v1 = snap.gets("r1", new Column("q1", "f1"));
+      String v2 = snap.gets("r2", new Column("q1", "f1"));
+
+      Assert.assertEquals("v3", v1);
+      Assert.assertEquals("v4", v2);
+    }
+  }
+}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
index e5a8d02..0a6bc99 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
@@ -70,7 +70,7 @@
 
     try (Snapshot snap = client.newSnapshot()) {
       HashSet<RowColumnValue> actual = new HashSet<>();
-      Iterables.addAll(actual, snap.scanner().over("r2").build());
+      Iterables.addAll(actual, snap.scanner().over(Span.exact("r2")).build());
       Assert.assertEquals(expectedR2, actual);
 
       actual.clear();
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
index 1736248..0835310 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
@@ -18,6 +18,7 @@
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
@@ -592,4 +593,72 @@
     Assert.assertTrue(origLogMsgs, logMsgs.matches(pattern));
 
   }
+
+  @Test
+  public void testReadLocks() {
+    Column c1 = new Column("f1", "q1");
+    Column c2 = new Column("f1", "q2");
+
+    try (Transaction tx = client.newTransaction()) {
+      tx.set("r1", c1, "v1");
+      tx.set("r1", c2, "v2");
+      tx.set("r2", c1, "v3");
+      tx.set("r2", c2, "v4");
+      tx.commit();
+    }
+
+    Logger logger = Logger.getLogger("fluo.tx");
+
+    StringWriter writer = new StringWriter();
+    WriterAppender appender =
+        new WriterAppender(new PatternLayout("%d{ISO8601} [%-8c{2}] %-5p: %m%n"), writer);
+
+    Level level = logger.getLevel();
+    boolean additivity = logger.getAdditivity();
+
+    try {
+      logger.setLevel(Level.TRACE);
+      logger.setAdditivity(false);
+      logger.addAppender(appender);
+
+      try (Transaction tx = client.newTransaction()) {
+        Assert.assertEquals("v1", tx.withReadLock().gets("r1", c1));
+        Assert.assertEquals(ImmutableMap.of(c1, "v3", c2, "v4"),
+            tx.withReadLock().gets("r2", c1, c2));
+        Assert.assertEquals(ImmutableMap.of(new RowColumn("r1", c2), "v2"),
+            tx.withReadLock().gets(Arrays.asList(new RowColumn("r1", c2))));
+        Map<String, Map<Column, String>> expected = new HashMap<>();
+        expected.computeIfAbsent("r1", k -> new HashMap<>()).put(c1, "v1");
+        expected.computeIfAbsent("r2", k -> new HashMap<>()).put(c1, "v3");
+        Map<String, Map<Column, String>> actual =
+            tx.withReadLock().gets(Arrays.asList("r1", "r2"), ImmutableSet.of(c1));
+        Assert.assertEquals(expected, actual);
+        tx.set("r3", c1, "345");
+        tx.commit();
+      }
+
+    } finally {
+      logger.removeAppender(appender);
+      logger.setAdditivity(additivity);
+      logger.setLevel(level);
+    }
+
+    String origLogMsgs = writer.toString();
+    String logMsgs = origLogMsgs.replace('\n', ' ');
+
+    String pattern = "";
+
+    pattern += ".*txid: (\\d+) begin\\(\\) thread: \\d+";
+    pattern += ".*txid: \\1 \\QwithReadLock().get(r1, f1 q1 ) -> v1\\E";
+    pattern +=
+        ".*txid: \\1 \\QwithReadLock().get(r2, [f1 q1 , f1 q2 ]) -> [f1 q1 =v3, f1 q2 =v4]\\E";
+    pattern += ".*txid: \\1 \\QwithReadLock().get([r1 f1 q2 ]) -> [r1 f1 q2 =v2]\\E";
+    pattern +=
+        ".*txid: \\1 \\QwithReadLock().get([r1, r2], [f1 q1 ]) -> [r1=[f1 q1 =v1], r2=[f1 q1 =v3]]\\E";
+    pattern += ".*txid: \\1 \\Qset(r3, f1 q1 , 345)\\E";
+    pattern += ".*txid: \\1 \\Qcommit()\\E -> SUCCESSFUL commitTs: \\d+";
+    pattern += ".*txid: \\1 \\Qclose()\\E.*";
+
+    Assert.assertTrue(origLogMsgs, logMsgs.matches(pattern));
+  }
 }