Fixes #917: Added read locks (#953)
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java
index 056fa8d..d87975a 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java
@@ -22,8 +22,11 @@
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.NotificationUtil;
+import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
+import org.apache.fluo.accumulo.values.DelReadLockValue;
import org.apache.fluo.accumulo.values.LockValue;
+import org.apache.fluo.accumulo.values.ReadLockValue;
import org.apache.fluo.accumulo.values.WriteValue;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
@@ -104,6 +107,15 @@
if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.ACK_PREFIX) {
type = "ACK";
}
+ if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
+ if (ReadLockUtil.isDelete(ts)) {
+ type = "DEL_RLOCK";
+ } else {
+ type = "RLOCK";
+ }
+ ts = ReadLockUtil.decodeTs(ts);
+ }
+
StringBuilder sb = new StringBuilder();
@@ -115,6 +127,10 @@
} else if (type.equals("LOCK")) {
// TODO can Value be made to extend Bytes w/o breaking API?
val = new LockValue(entry.getValue().get()).toString();
+ } else if (type.equals("RLOCK")) {
+ val = new ReadLockValue(entry.getValue().get()).toString();
+ } else if (type.equals("DEL_RLOCK")) {
+ val = new DelReadLockValue(entry.getValue().get()).toString();
} else {
encNonAscii(sb, entry.getValue().get());
val = sb.toString();
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
index 9ca23e9..1814dfa 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
@@ -35,8 +35,10 @@
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.util.ZookeeperUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
+import org.apache.fluo.accumulo.values.DelReadLockValue;
import org.apache.fluo.accumulo.values.WriteValue;
/**
@@ -159,6 +161,7 @@
boolean oldestSeen = false;
boolean sawAck = false;
long firstWrite = -1;
+ long lastReadLockDeleteTs = -1;
truncationTime = -1;
@@ -243,6 +246,38 @@
} else if (complete) {
completeTxs.remove(txDoneTs);
}
+ } else if (colType == ColumnConstants.RLOCK_PREFIX) {
+ boolean keep = false;
+ long rlts = ReadLockUtil.decodeTs(ts);
+ boolean isDelete = ReadLockUtil.isDelete(ts);
+
+ if (isDelete) {
+ lastReadLockDeleteTs = rlts;
+ }
+
+ if (rlts > invalidationTime) {
+ if (isFullMajc) {
+ if (isDelete) {
+ if (DelReadLockValue.isRollback(source.getTopValue().get())) {
+ // can drop rolled back read lock delete markers on any full majc, do not need to consider gcTimestamp
+ keep = false;
+ } else {
+ long rlockCommitTs =
+ DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
+ keep = rlockCommitTs >= gcTimestamp;
+ }
+ } else {
+ keep = lastReadLockDeleteTs != rlts;
+ }
+ } else {
+ // can drop deleted read lock entries.. keep the delete entry.
+ keep = isDelete || lastReadLockDeleteTs != rlts;
+ }
+ }
+
+ if (keep) {
+ keys.add(source.getTopKey(), source.getTopValue());
+ }
} else if (colType == ColumnConstants.LOCK_PREFIX) {
if (ts > invalidationTime) {
keys.add(source.getTopKey(), source.getTopValue());
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java
new file mode 100644
index 0000000..dd4de54
--- /dev/null
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.iterators;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ReadLockUtil;
+
+import static org.apache.fluo.accumulo.util.ColumnConstants.ACK_PREFIX;
+import static org.apache.fluo.accumulo.util.ColumnConstants.DATA_PREFIX;
+import static org.apache.fluo.accumulo.util.ColumnConstants.DEL_LOCK_PREFIX;
+import static org.apache.fluo.accumulo.util.ColumnConstants.LOCK_PREFIX;
+import static org.apache.fluo.accumulo.util.ColumnConstants.RLOCK_PREFIX;
+import static org.apache.fluo.accumulo.util.ColumnConstants.TIMESTAMP_MASK;
+import static org.apache.fluo.accumulo.util.ColumnConstants.TX_DONE_PREFIX;
+import static org.apache.fluo.accumulo.util.ColumnConstants.WRITE_PREFIX;
+
+public class OpenReadLockIterator implements SortedKeyValueIterator<Key, Value> {
+
+ private TimestampSkippingIterator source;
+
+ private Key lastDelete;
+
+ private void findTop() throws IOException {
+ while (source.hasTop()) {
+
+ long colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+
+ if (colType == TX_DONE_PREFIX || colType == WRITE_PREFIX || colType == DEL_LOCK_PREFIX) {
+ source.skipToPrefix(source.getTopKey(), RLOCK_PREFIX);
+ continue;
+ } else if (colType == RLOCK_PREFIX) {
+ if (ReadLockUtil.isDelete(source.getTopKey())) {
+ lastDelete.set(source.getTopKey());
+ } else {
+ if (lastDelete.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
+ long ts1 = ReadLockUtil.decodeTs(source.getTopKey().getTimestamp() & TIMESTAMP_MASK);
+ long ts2 = ReadLockUtil.decodeTs(lastDelete.getTimestamp() & TIMESTAMP_MASK);
+
+ if (ts1 != ts2) {
+ // found a read lock that is not suppressed by a delete read lock entry
+ return;
+ }
+ } else {
+ // found a read lock that is not suppressed by a delete read lock entry
+ return;
+ }
+ }
+ source.next();
+ continue;
+ } else if (colType == DATA_PREFIX || colType == LOCK_PREFIX || colType == ACK_PREFIX) {
+ source.skipColumn(source.getTopKey());
+ continue;
+ } else {
+ throw new IllegalArgumentException("Unknown column type " + source.getTopKey());
+ }
+ }
+ }
+
+ @Override
+ public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options,
+ IteratorEnvironment env) throws IOException {
+ this.source = new TimestampSkippingIterator(source);
+ }
+
+ @Override
+ public boolean hasTop() {
+ return source.hasTop();
+ }
+
+ @Override
+ public void next() throws IOException {
+ source.next();
+ findTop();
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+ throws IOException {
+
+ lastDelete = new Key();
+
+ Collection<ByteSequence> fams;
+ if (columnFamilies.isEmpty() && !inclusive) {
+ fams = SnapshotIterator.NOTIFY_CF_SET;
+ inclusive = false;
+ } else {
+ fams = columnFamilies;
+ }
+
+ source.seek(range, fams, inclusive);
+ findTop();
+ }
+
+ @Override
+ public Key getTopKey() {
+ return source.getTopKey();
+ }
+
+ @Override
+ public Value getTopValue() {
+ return source.getTopValue();
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
+ throw new UnsupportedOperationException();
+ }
+
+}
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
index a5a160d..b6f9a48 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
@@ -28,6 +28,8 @@
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ReadLockUtil;
+import org.apache.fluo.accumulo.values.DelReadLockValue;
import org.apache.fluo.accumulo.values.WriteValue;
/**
@@ -37,6 +39,7 @@
private static final String TIMESTAMP_OPT = "timestampOpt";
private static final String CHECK_ACK_OPT = "checkAckOpt";
private static final String NTFY_TIMESTAMP_OPT = "ntfyTsOpt";
+ private static final String READ_LOCK_OPT = "readLock";
private TimestampSkippingIterator source;
private long snaptime;
@@ -44,6 +47,7 @@
boolean hasTop = false;
boolean checkAck = false;
long ntfyTimestamp = -1;
+ boolean readlock;
public static void setSnaptime(IteratorSetting cfg, long time) {
if (time < 0 || (ColumnConstants.PREFIX_MASK & time) != 0) {
@@ -52,8 +56,12 @@
cfg.addOption(TIMESTAMP_OPT, time + "");
}
+ public static void setReadlock(IteratorSetting cfg) {
+ cfg.addOption(READ_LOCK_OPT, Boolean.TRUE.toString());
+ }
+
public static void enableAckCheck(IteratorSetting cfg, long timestamp) {
- cfg.addOption(CHECK_ACK_OPT, "true");
+ cfg.addOption(CHECK_ACK_OPT, Boolean.TRUE.toString());
cfg.addOption(NTFY_TIMESTAMP_OPT, timestamp + "");
}
@@ -66,6 +74,8 @@
this.checkAck = Boolean.parseBoolean(options.get(CHECK_ACK_OPT));
this.ntfyTimestamp = Long.parseLong(options.get(NTFY_TIMESTAMP_OPT));
}
+
+ this.readlock = Boolean.parseBoolean(options.getOrDefault(READ_LOCK_OPT, "false"));
}
@Override
@@ -140,7 +150,58 @@
}
}
- source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.LOCK_PREFIX);
+ if (readlock) {
+ source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.LOCK_PREFIX);
+ } else {
+ source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.RLOCK_PREFIX);
+ }
+ } else if (colType == ColumnConstants.RLOCK_PREFIX) {
+
+ long lastDeleteTs = -1;
+ long rlts = ReadLockUtil.decodeTs(ts);
+
+ if (!readlock) {
+ while (rlts > invalidationTime && colType == ColumnConstants.RLOCK_PREFIX) {
+ if (ReadLockUtil.isDelete(ts)) {
+ // ignore rolled back read locks, these should never prevent a write lock
+ if (!DelReadLockValue.isRollback(source.getTopValue().get())) {
+ if (rlts >= snaptime) {
+ hasTop = true;
+ return;
+ } else {
+ long rlockCommitTs =
+ DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
+ if (rlockCommitTs > snaptime) {
+ hasTop = true;
+ return;
+ }
+ }
+ }
+
+
+ lastDeleteTs = rlts;
+ } else {
+ if (rlts != lastDeleteTs) {
+ // this read lock is active
+ hasTop = true;
+ return;
+ }
+ }
+
+ source.next();
+ if (source.hasTop()) {
+ colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+ ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
+ rlts = ReadLockUtil.decodeTs(ts);
+ } else {
+ break;
+ }
+ }
+ }
+
+ if (source.hasTop() && (colType == ColumnConstants.RLOCK_PREFIX)) {
+ source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.LOCK_PREFIX);
+ }
} else if (colType == ColumnConstants.LOCK_PREFIX) {
if (ts > invalidationTime) {
// nothing supersedes this lock, therefore the column is locked
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
index 962f172..cae383f 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java
@@ -129,6 +129,9 @@
continue;
}
+ } else if (colType == ColumnConstants.RLOCK_PREFIX) {
+ source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
+ continue;
} else if (colType == ColumnConstants.LOCK_PREFIX) {
if (ts > invalidationTime) {
// nothing supersedes this lock, therefore the column is locked
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
index 3c6939c..27e63ef 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
@@ -22,6 +22,7 @@
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
@@ -38,6 +39,9 @@
@VisibleForTesting
static final String TIMESTAMP_OPT = "timestampOpt";
+
+ static final String RETURN_READLOCK_PRESENT_OPT = "rrlpOpt";
+
private static final ByteSequence NOTIFY_CF_BS =
new ArrayByteSequence(ColumnConstants.NOTIFY_CF.toArray());
@@ -47,10 +51,35 @@
private long snaptime;
private boolean hasTop = false;
+ private boolean returnReadLockPresent = false;
+
private final Key curCol = new Key();
+ private Key readLockIgnore;
+ private Key readLockKey;
+ private Value readLockValue;
+
+ private void rememberReadLock(Key key, Value val) {
+ Preconditions.checkState(readLockKey == null && readLockValue == null);
+ if (readLockIgnore == null
+ || !key.equals(readLockIgnore, PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
+ readLockKey = new Key(key);
+ readLockValue = new Value(val);
+ }
+ }
+
+ private void ignoreReadLock(Key key) {
+ readLockIgnore = key;
+ }
+
+ private void clearReadLock() {
+ readLockKey = null;
+ readLockValue = null;
+ readLockIgnore = null;
+ }
+
private void findTop() throws IOException {
- outer: while (source.hasTop()) {
+ outer: while (source.hasTop() && readLockKey == null) {
long invalidationTime = -1;
long dataPointer = -1;
@@ -89,6 +118,18 @@
if (ts > invalidationTime) {
invalidationTime = ts;
}
+ if (returnReadLockPresent) {
+ source.skipToPrefix(curCol, ColumnConstants.RLOCK_PREFIX);
+ } else {
+ source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
+ }
+ continue;
+
+ } else if (colType == ColumnConstants.RLOCK_PREFIX) {
+ if (returnReadLockPresent) {
+ rememberReadLock(source.getTopKey(), source.getTopValue());
+ }
+
source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
continue;
} else if (colType == ColumnConstants.LOCK_PREFIX) {
@@ -142,21 +183,26 @@
IteratorEnvironment env) throws IOException {
this.source = new TimestampSkippingIterator(source);
this.snaptime = Long.parseLong(options.get(TIMESTAMP_OPT));
+ this.returnReadLockPresent =
+ Boolean.parseBoolean(options.getOrDefault(RETURN_READLOCK_PRESENT_OPT, "false"));
// TODO could require client to send version as a sanity check
}
@Override
public boolean hasTop() {
- return hasTop && source.hasTop();
+ return hasTop && (readLockKey != null || source.hasTop());
}
@Override
public void next() throws IOException {
- curCol.set(source.getTopKey());
- source.skipColumn(curCol);
+ if (readLockKey != null) {
+ clearReadLock();
+ } else {
+ curCol.set(source.getTopKey());
+ source.skipColumn(curCol);
- findTop();
-
+ findTop();
+ }
}
@Override
@@ -167,16 +213,28 @@
Collection<ByteSequence> cols;
boolean inc;
+ clearReadLock();
+
// handle continue case
hasTop = true;
if (range.getStartKey() != null && range.getStartKey().getTimestamp() != Long.MAX_VALUE
&& !range.isStartKeyInclusive()) {
- Key nextCol = range.getStartKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS);
- if (range.afterEndKey(nextCol)) {
- hasTop = false;
- return;
+
+ if ((range.getStartKey().getTimestamp()
+ & ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
+ Key currCol = new Key(range.getStartKey());
+ currCol.setTimestamp(Long.MAX_VALUE);
+ newRange = new Range(currCol, true, range.getEndKey(), range.isEndKeyInclusive());
+ ignoreReadLock(currCol);
} else {
- newRange = new Range(nextCol, true, range.getEndKey(), range.isEndKeyInclusive());
+
+ Key nextCol = range.getStartKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS);
+ if (range.afterEndKey(nextCol)) {
+ hasTop = false;
+ return;
+ } else {
+ newRange = new Range(nextCol, true, range.getEndKey(), range.isEndKeyInclusive());
+ }
}
} else {
newRange = range;
@@ -196,12 +254,20 @@
@Override
public Key getTopKey() {
- return source.getTopKey();
+ if (readLockKey != null) {
+ return readLockKey;
+ } else {
+ return source.getTopKey();
+ }
}
@Override
public Value getTopValue() {
- return source.getTopValue();
+ if (readLockValue != null) {
+ return readLockValue;
+ } else {
+ return source.getTopValue();
+ }
}
@Override
@@ -215,4 +281,8 @@
}
cfg.addOption(TIMESTAMP_OPT, time + "");
}
+
+ public static void setReturnReadLockPresent(IteratorSetting cfg, boolean rrlp) {
+ cfg.addOption(RETURN_READLOCK_PRESENT_OPT, rrlp + "");
+ }
}
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
index 4da1a48..c38b9e6 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
@@ -26,6 +26,7 @@
public static final long TX_DONE_PREFIX = 0x6000000000000000L;
public static final long WRITE_PREFIX = 0x4000000000000000L;
public static final long DEL_LOCK_PREFIX = 0x2000000000000000L;
+ public static final long RLOCK_PREFIX = 0x0000000000000000L;
public static final long LOCK_PREFIX = 0xe000000000000000L;
public static final long ACK_PREFIX = 0xc000000000000000L;
public static final long DATA_PREFIX = 0xa000000000000000L;
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ReadLockUtil.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ReadLockUtil.java
new file mode 100644
index 0000000..654ebc1
--- /dev/null
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ReadLockUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.accumulo.core.data.Key;
+
+public class ReadLockUtil {
+ private static final long DEL_MASK = 0x0000000000000001L;
+
+ public static boolean isDelete(Key k) {
+ return isDelete(k.getTimestamp());
+ }
+
+ public static boolean isDelete(long ts) {
+ return (ts & DEL_MASK) == DEL_MASK;
+ }
+
+ public static long encodeTs(long ts, boolean isDelete) {
+ Preconditions.checkArgument((ts & ColumnConstants.PREFIX_MASK) == 0);
+ return ts << 1 | (isDelete ? 1 : 0);
+ }
+
+ public static long decodeTs(Key k) {
+ return decodeTs(k.getTimestamp());
+ }
+
+ public static long decodeTs(long ts) {
+ return ts >> 1;
+ }
+}
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/values/DelReadLockValue.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/values/DelReadLockValue.java
new file mode 100644
index 0000000..58bf760
--- /dev/null
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/values/DelReadLockValue.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.values;
+
+import org.apache.fluo.accumulo.util.ByteArrayUtil;
+
+public class DelReadLockValue {
+
+ long commitTs = -1;
+ boolean rollback;
+
+ public DelReadLockValue(byte[] value) {
+ this.rollback = isRollback(value);
+ if (!this.rollback) {
+ this.commitTs = getCommitTimestamp(value);
+ }
+ }
+
+
+ public static byte[] encodeRollback() {
+ byte[] ba = new byte[1];
+ ba[0] = (byte) 1;
+ return ba;
+ }
+
+ public static byte[] encodeCommit(long commitTs) {
+ byte[] ba = new byte[9];
+ ba[0] = (byte) 0;
+ ByteArrayUtil.encode(ba, 1, commitTs);
+ return ba;
+ }
+
+ public static boolean isRollback(byte[] data) {
+ return (data[0] & 0x01) == 1;
+ }
+
+ public static long getCommitTimestamp(byte[] data) {
+ return ByteArrayUtil.decodeLong(data, 1);
+ }
+
+ @Override
+ public String toString() {
+ return commitTs + (rollback ? " ABORT" : " COMMIT");
+ }
+}
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/values/ReadLockValue.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/values/ReadLockValue.java
new file mode 100644
index 0000000..73451ae
--- /dev/null
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/values/ReadLockValue.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.values;
+
+import java.util.List;
+
+import org.apache.fluo.accumulo.util.ByteArrayUtil;
+import org.apache.fluo.accumulo.util.LongUtil;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+
+import static org.apache.fluo.accumulo.format.FluoFormatter.encNonAscii;
+
+public class ReadLockValue {
+ private final Bytes prow;
+ private final Column pcol;
+ private final Long transactor;
+
+ public ReadLockValue(byte[] enc) {
+ List<Bytes> fields = ByteArrayUtil.split(enc);
+
+ if (fields.size() != 5) {
+ throw new IllegalArgumentException("more fields than expected");
+ }
+
+ this.prow = fields.get(0);
+ this.pcol = new Column(fields.get(1), fields.get(2), fields.get(3));
+ this.transactor = ByteArrayUtil.decodeLong(fields.get(4).toArray());
+ }
+
+ public Bytes getPrimaryRow() {
+ return prow;
+ }
+
+ public Column getPrimaryColumn() {
+ return pcol;
+ }
+
+ public Long getTransactor() {
+ return transactor;
+ }
+
+ public static byte[] encode(Bytes prow, Column pcol, Long transactor) {
+ return ByteArrayUtil.concat(prow, pcol.getFamily(), pcol.getQualifier(), pcol.getVisibility(),
+ Bytes.of(ByteArrayUtil.encode(transactor)));
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ encNonAscii(sb, prow);
+ sb.append(" ");
+ encNonAscii(sb, pcol.getFamily());
+ sb.append(" ");
+ encNonAscii(sb, pcol.getQualifier());
+ sb.append(" ");
+ encNonAscii(sb, pcol.getVisibility());
+ sb.append(" ");
+ sb.append(LongUtil.toMaxRadixString(transactor));
+
+ return sb.toString();
+ }
+}
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/CountingIterator.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/CountingIterator.java
new file mode 100644
index 0000000..a2517be
--- /dev/null
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/CountingIterator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.iterators;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedMapIterator;
+
+public class CountingIterator extends SortedMapIterator {
+
+ public static class Counter {
+ public int nextCalls;
+ public int seeks;
+
+ public void reset() {
+ nextCalls = 0;
+ seeks = 0;
+ }
+ }
+
+ private Counter counter;
+
+ CountingIterator(Counter counter, SortedMap<Key, Value> data) {
+ super(data);
+ this.counter = counter;
+ }
+
+ @Override
+ public void next() throws IOException {
+ super.next();
+ if (counter != null) {
+ counter.nextCalls++;
+ }
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+ throws IOException {
+ super.seek(range, columnFamilies, inclusive);
+ if (counter != null) {
+ counter.seeks++;
+ }
+ }
+}
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIteratorTest.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIteratorTest.java
index a1600d8..591fdce 100644
--- a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIteratorTest.java
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIteratorTest.java
@@ -597,4 +597,104 @@
Assert.assertEquals(expected, output);
}
+
+ @Test
+ public void testDeletedReadLocks() {
+
+ TestData input = new TestData();
+
+ input.add("0 f q RLOCK 42", "0 f q");
+ input.add("0 f q DEL_RLOCK 42", "50");
+ input.add("0 f q RLOCK 45", "0 f q");
+ input.add("0 f q DEL_RLOCK 45", "53");
+ input.add("0 f q RLOCK 49", "0 f q");
+ input.add("0 f q DEL_RLOCK 49", "ROLLBACK");
+
+ for (long oldestActiveTs : new long[] {20, 40, 42, 45, 46, 49, 50, 51, 52, 53, 54, 70}) {
+ TestData expected = new TestData();
+ if (oldestActiveTs <= 53) {
+ expected.add("0 f q DEL_RLOCK 45", "53");
+ }
+ if (oldestActiveTs <= 50) {
+ expected.add("0 f q DEL_RLOCK 42", "50");
+ }
+
+ TestData output = new TestData(newGCI(input, oldestActiveTs, true));
+ Assert.assertEquals(expected, output);
+
+
+ expected.add("0 f q DEL_RLOCK 45", "53");
+ expected.add("0 f q DEL_RLOCK 42", "50");
+ expected.add("0 f q DEL_RLOCK 49", "ROLLBACK");
+
+ output = new TestData(newGCI(input, oldestActiveTs, false));
+ Assert.assertEquals(expected, output);
+ }
+
+ input.add("0 f q RLOCK 47", "0 f q");
+ input.add("0 f q RLOCK 40", "0 f q");
+
+ for (long oldestActiveTs : new long[] {20, 40, 42, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54,
+ 70}) {
+ TestData expected = new TestData();
+ expected.add("0 f q RLOCK 47", "0 f q");
+ expected.add("0 f q RLOCK 40", "0 f q");
+
+ if (oldestActiveTs <= 53) {
+ expected.add("0 f q DEL_RLOCK 45", "53");
+ }
+ if (oldestActiveTs <= 50) {
+ expected.add("0 f q DEL_RLOCK 42", "50");
+ }
+
+ TestData output = new TestData(newGCI(input, oldestActiveTs, true));
+ Assert.assertEquals(expected, output);
+
+ expected.add("0 f q DEL_RLOCK 45", "53");
+ expected.add("0 f q DEL_RLOCK 42", "50");
+ expected.add("0 f q DEL_RLOCK 49", "ROLLBACK");
+ output = new TestData(newGCI(input, oldestActiveTs, false));
+ Assert.assertEquals(expected, output);
+ }
+ }
+
+ @Test
+ public void testInvalidatedReadLocks() {
+ // a write or delete lock invalidates all read locks, so they can be dropped on partial or full
+ // compactions
+ TestData input = new TestData();
+
+ input.add("0 f q RLOCK 42", "0 f q");
+ input.add("0 f q DEL_RLOCK 42", "50");
+ input.add("0 f q RLOCK 45", "0 f q");
+ input.add("0 f q DEL_RLOCK 45", "53");
+ input.add("0 f q RLOCK 60", "0 f q");
+ input.add("0 f q DEL_RLOCK 60", "70");
+ input.add("1 f q RLOCK 42", "0 f q");
+ input.add("1 f q DEL_RLOCK 42", "50");
+
+ TestData input2 = new TestData(input);
+ input2.add("0 f q WRITE 56", "55");
+ input2.add("0 f q DATA 55", "19");
+
+ TestData expected = new TestData();
+ expected.add("0 f q WRITE 56", "55");
+ expected.add("0 f q DATA 55", "19");
+
+ // invalidation time or oldest active should get rid of all read locks
+ TestData output = new TestData(newGCI(input2, 75, true));
+ Assert.assertEquals(expected, output);
+
+ // only invalidation should get rid of read locks in following three cases
+ expected.add("0 f q DEL_RLOCK 60", "70");
+ expected.add("1 f q DEL_RLOCK 42", "50"); // should not be invalidated, in diff column
+ output = new TestData(newGCI(input2, 75, false));
+ Assert.assertEquals(expected, output);
+
+ output = new TestData(newGCI(input2, 30, true));
+ Assert.assertEquals(expected, output);
+
+ output = new TestData(newGCI(input2, 30, false));
+ Assert.assertEquals(expected, output);
+ }
}
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/OpenReadLockIteratorTest.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/OpenReadLockIteratorTest.java
new file mode 100644
index 0000000..08b4ee0
--- /dev/null
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/OpenReadLockIteratorTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.iterators;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.SortedMapIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class OpenReadLockIteratorTest {
+ OpenReadLockIterator newORLI(TestData input) {
+ OpenReadLockIterator si = new OpenReadLockIterator();
+
+ IteratorEnvironment env = TestIteratorEnv.create(IteratorScope.scan, true);
+
+ try {
+ SortedKeyValueIterator<Key, Value> source = new SortedMapIterator(input.data);
+ si.init(source, Collections.emptyMap(), env);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return si;
+ }
+
+ @Test
+ public void testBasic() {
+ TestData input = new TestData();
+
+ input.add("0 f q LOCK 11", "1 f q");
+ input.add("0 f q WRITE 16", "11");
+ input.add("0 f q DATA 11", "15");
+ input.add("0 f q DEL_RLOCK 25", "36");
+ input.add("0 f q RLOCK 25", " 0 f q");
+ input.add("0 f q RLOCK 23", " 0 f q");
+ input.add("0 f q DEL_RLOCK 21", "30");
+ input.add("0 f q RLOCK 21", " 0 f q");
+
+ input.add("1 f q1 TX_DONE 16", "11");
+ input.add("1 f q LOCK 11", "1 f q");
+ input.add("1 f q WRITE 16", "11");
+ input.add("1 f q DATA 11", "15");
+ input.add("1 f q RLOCK 25", " 0 f q");
+ input.add("1 f q RLOCK 23", " 0 f q");
+ input.add("1 f q DEL_RLOCK 21", "30");
+ input.add("1 f q RLOCK 21", " 0 f q");
+ input.add("1 f q ACK 11", "");
+
+ input.add("3 f q LOCK 11", "1 f q");
+ input.add("3 f q WRITE 16", "11");
+ input.add("3 f q DATA 11", "15");
+ input.add("3 f q DEL_RLOCK 25", "36");
+ input.add("3 f q RLOCK 25", " 0 f q");
+ input.add("3 f q DEL_RLOCK 23", "33");
+ input.add("3 f q RLOCK 23", " 0 f q");
+ input.add("3 f q DEL_RLOCK 21", "30");
+ input.add("3 f q RLOCK 21", " 0 f q");
+
+ TestData expected = new TestData();
+ expected.add("0 f q RLOCK 23", " 0 f q");
+ expected.add("1 f q RLOCK 25", " 0 f q");
+ expected.add("1 f q RLOCK 23", " 0 f q");
+
+ TestData output = new TestData(newORLI(input));
+ Assert.assertEquals(expected, output);
+
+ output = new TestData(newORLI(input), new Range(), true);
+ Assert.assertEquals(expected, output);
+ }
+}
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/PrewriteIteratorTest.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/PrewriteIteratorTest.java
index 959f224..ad601f5 100644
--- a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/PrewriteIteratorTest.java
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/PrewriteIteratorTest.java
@@ -21,12 +21,29 @@
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.iterators.SortedMapIterator;
import org.junit.Assert;
import org.junit.Test;
public class PrewriteIteratorTest {
+ PrewriteIterator newPI(TestData input, long snapTime, boolean readlock) {
+ PrewriteIterator ni = new PrewriteIterator();
+
+ IteratorEnvironment env = TestIteratorEnv.create(IteratorScope.scan, false);
+
+ try {
+ IteratorSetting cfg = new IteratorSetting(10, PrewriteIterator.class);
+ PrewriteIterator.setSnaptime(cfg, snapTime);
+ if (readlock) {
+ PrewriteIterator.setReadlock(cfg);
+ }
+ ni.init(input.getIterator(), cfg.getOptions(), env);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return ni;
+ }
+
PrewriteIterator newPI(TestData input, long snapTime) {
PrewriteIterator ni = new PrewriteIterator();
@@ -35,7 +52,7 @@
try {
IteratorSetting cfg = new IteratorSetting(10, PrewriteIterator.class);
PrewriteIterator.setSnaptime(cfg, snapTime);
- ni.init(new SortedMapIterator(input.data), cfg.getOptions(), env);
+ ni.init(input.getIterator(), cfg.getOptions(), env);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -51,7 +68,7 @@
IteratorSetting cfg = new IteratorSetting(10, PrewriteIterator.class);
PrewriteIterator.setSnaptime(cfg, snapTime);
PrewriteIterator.enableAckCheck(cfg, ntfyTime);
- ni.init(new SortedMapIterator(input.data), cfg.getOptions(), env);
+ ni.init(input.getIterator(), cfg.getOptions(), env);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -238,4 +255,219 @@
output = new TestData(newPI(input, 117), Range.exact("0", "f", "q"));
Assert.assertEquals(0, output.data.size());
}
+
+ @Test
+ public void testReadLockPreventsWriteLock() {
+
+ TestData[] initialInputs = new TestData[4];
+
+ initialInputs[0] = new TestData();
+
+ initialInputs[1] = new TestData();
+ initialInputs[1].add("0 f q WRITE 10", "5");
+ initialInputs[1].add("0 f q LOCK 5", "0 f q");
+ initialInputs[1].add("0 f q DATA 5", "15");
+
+ initialInputs[2] = new TestData();
+ initialInputs[2].add("0 f q WRITE 10", "5");
+ initialInputs[2].add("0 f q DATA 5", "15");
+
+ initialInputs[3] = new TestData();
+ initialInputs[3].add("0 f q DEL_LOCK 5", "ABORT");
+
+ for (TestData input : initialInputs) {
+
+ input.add("0 f q RLOCK 42", "0 f q");
+
+ TestData expected = new TestData();
+ expected.add("0 f q RLOCK 42", "0 f q");
+
+ for (int ts : new int[] {40, 45}) {
+ TestData output = new TestData(newPI(input, ts), Range.exact("0", "f", "q"));
+ Assert.assertEquals(expected, output);
+ }
+
+ input.add("0 f q DEL_RLOCK 42", "50");
+
+ expected = new TestData();
+ expected.add("0 f q DEL_RLOCK 42", "50");
+
+ for (int ts : new int[] {40, 45}) {
+ TestData output = new TestData(newPI(input, ts), Range.exact("0", "f", "q"));
+ Assert.assertEquals(expected, output);
+ }
+
+ TestData output = new TestData(newPI(input, 117), Range.exact("0", "f", "q"));
+ Assert.assertEquals(0, output.data.size());
+
+ input.add("0 f q RLOCK 30", "0 f q");
+
+ expected = new TestData();
+ expected.add("0 f q RLOCK 30", "0 f q");
+
+ output = new TestData(newPI(input, 117), Range.exact("0", "f", "q"));
+ Assert.assertEquals(expected, output);
+
+ input.add("0 f q DEL_RLOCK 30", "60");
+
+ output = new TestData(newPI(input, 117), Range.exact("0", "f", "q"));
+ Assert.assertEquals(0, output.data.size());
+
+ output = new TestData(newPI(input, 55), Range.exact("0", "f", "q"));
+ expected = new TestData();
+ expected.add("0 f q DEL_RLOCK 30", "60");
+ Assert.assertEquals(expected, output);
+
+ expected = new TestData();
+ expected.add("0 f q DEL_RLOCK 42", "50");
+
+ for (int ts : new int[] {20, 40, 45}) {
+ output = new TestData(newPI(input, ts), Range.exact("0", "f", "q"));
+ Assert.assertEquals(expected, output);
+ }
+
+ }
+ }
+
+ @Test
+ public void testOnlyDelReadLocks() {
+    // Garbage collection iterator may drop read locks and leave del_read_lock entries. Ensure this case works as expected.
+ TestData input = new TestData();
+ input.add("0 f q DEL_RLOCK 42", "50");
+ input.add("0 f q DEL_RLOCK 39", "44");
+
+ input.add("0 f q WRITE 10", "5");
+ input.add("0 f q LOCK 5", "0 f q");
+ input.add("0 f q DATA 5", "15");
+
+ for (long startTs : new long[] {31, 40, 47}) {
+ TestData expected = new TestData();
+ expected.add("0 f q DEL_RLOCK 42", "50");
+
+ TestData output = new TestData(newPI(input, startTs), Range.exact("0", "f", "q"));
+
+ Assert.assertEquals(expected, output);
+ }
+
+ TestData output = new TestData(newPI(input, 55), Range.exact("0", "f", "q"));
+ Assert.assertEquals(0, output.data.size());
+ }
+
+ @Test
+ public void testWriteLockPreventsReadLock() {
+ for (int i = 0; i < 2; i++) {
+ TestData input = new TestData();
+
+ if (i == 1) {
+ // this should have no impact
+ input.add("0 f q RLOCK 13", "0 f q");
+ }
+
+ input.add("0 f q LOCK 5", "0 f q");
+ input.add("0 f q DATA 5", "15");
+
+ TestData output = new TestData(newPI(input, 117, true), Range.exact("0", "f", "q"));
+ TestData expected = new TestData();
+ expected.add("0 f q LOCK 5", "0 f q");
+ Assert.assertEquals(expected, output);
+
+ input.add("0 f q WRITE 10", "5");
+ output = new TestData(newPI(input, 117, true), Range.exact("0", "f", "q"));
+ Assert.assertEquals(0, output.data.size());
+
+ output = new TestData(newPI(input, 11, true), Range.exact("0", "f", "q"));
+ Assert.assertEquals(0, output.data.size());
+
+ output = new TestData(newPI(input, 7, true), Range.exact("0", "f", "q"));
+ expected = new TestData();
+ expected.add("0 f q WRITE 10", "5");
+ Assert.assertEquals(expected, output);
+
+ output = new TestData(newPI(input, 4, true), Range.exact("0", "f", "q"));
+ Assert.assertEquals(expected, output);
+ }
+ }
+
+ @Test
+ public void testManyReadLocks() {
+ TestData input = new TestData();
+
+ input.add("0 f q WRITE 10", "5");
+ input.add("0 f q DATA 5", "15");
+
+ for (int i = 10; i < 2010; i += 2) {
+ input.add("0 f q DEL_RLOCK " + i, "" + (i + 1));
+ input.add("0 f q RLOCK " + i, "0 f q");
+ }
+
+ TestData output = new TestData(newPI(input, 3000, false), Range.exact("0", "f", "q"));
+ // scans all read locks looking for an active lock
+ Assert.assertEquals(2001, input.counter.nextCalls);
+ Assert.assertEquals(0, output.data.size());
+
+ // read locks do not need to scan for other read locks... this checks that read locks are
+ // skipped
+ input.counter.reset();
+ output = new TestData(newPI(input, 3000, true), Range.exact("0", "f", "q"));
+ // read locks should not scan everything looking for a read lock
+ Assert.assertEquals(12, input.counter.nextCalls); // skipping will read 11 before seeking
+ Assert.assertEquals(0, output.data.size());
+
+ // This write invalidates the 2000 read locks, so should skip read locks
+ input.add("0 f q WRITE 2500", "2490");
+ input.add("0 f q DATA 2490", "16");
+ input.counter.reset();
+ output = new TestData(newPI(input, 3000, false), Range.exact("0", "f", "q"));
+ Assert.assertEquals(13, input.counter.nextCalls);
+ Assert.assertEquals(0, output.data.size());
+ }
+
+ @Test
+ public void testGarbageCollectedReadLocks() {
+    // After a partial compaction the garbage collection iterator will drop read lock entries, but
+    // not del_read_lock entries. This test ensures that prewrite works w/ this.
+
+ TestData input = new TestData();
+
+ input.add("0 f q DEL_RLOCK 42", "50");
+ input.add("0 f q DEL_RLOCK 44", "51");
+
+
+ TestData output = new TestData(newPI(input, 47), Range.exact("0", "f", "q"));
+ TestData expected = new TestData();
+
+ expected.add("0 f q DEL_RLOCK 44", "51");
+
+ Assert.assertEquals(expected, output);
+
+ output = new TestData(newPI(input, 43), Range.exact("0", "f", "q"));
+ Assert.assertEquals(expected, output);
+ }
+
+ @Test
+ public void testAbortedReadLock() {
+    // A read lock that was aborted or rolled back should not prevent a write lock
+ TestData input = new TestData();
+
+ input.add("0 f q DEL_RLOCK 55", "ABORT");
+ input.add("0 f q DEL_RLOCK 42", "50");
+
+ for (int i = 0; i < 2; i++) {
+ for (long startTs : new long[] {31, 40, 47}) {
+ TestData expected = new TestData();
+ expected.add("0 f q DEL_RLOCK 42", "50");
+
+ TestData output = new TestData(newPI(input, startTs), Range.exact("0", "f", "q"));
+
+ Assert.assertEquals(expected, output);
+ }
+
+ for (long startTs : new long[] {51, 55, 56}) {
+ TestData output = new TestData(newPI(input, startTs), Range.exact("0", "f", "q"));
+ Assert.assertEquals(0, output.data.size());
+ }
+      // add this for 2nd iteration, should ignore because delete rolls back
+ input.add("0 f q RLOCK 55", "0 f q");
+ }
+ }
}
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
index 4605923..4e0c27d 100644
--- a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
@@ -31,10 +31,11 @@
import org.junit.Test;
public class SnapshotIteratorTest {
- SnapshotIterator newSI(TestData input, long startTs) {
+ SnapshotIterator newSI(TestData input, long startTs, boolean returnReadLocks) {
SnapshotIterator si = new SnapshotIterator();
Map<String, String> options = new HashMap<>();
+ options.put(SnapshotIterator.RETURN_READLOCK_PRESENT_OPT, returnReadLocks + "");
options.put(SnapshotIterator.TIMESTAMP_OPT, startTs + "");
IteratorEnvironment env = TestIteratorEnv.create(IteratorScope.scan, true);
@@ -48,6 +49,10 @@
return si;
}
+ SnapshotIterator newSI(TestData input, long startTs) {
+ return newSI(input, startTs, true);
+ }
+
@Test
public void testBasic() {
TestData input = new TestData();
@@ -61,13 +66,11 @@
TestData output = new TestData(newSI(input, 6));
Assert.assertEquals(0, output.data.size());
- output = new TestData(newSI(input, 11));
TestData expected = new TestData().add("0 f q DATA 9", "14");
- Assert.assertEquals(expected, output);
+ checkInput(input, expected, 11);
- output = new TestData(newSI(input, 17));
expected = new TestData().add("0 f q DATA 11", "15");
- Assert.assertEquals(expected, output);
+ checkInput(input, expected, 17);
}
@Test
@@ -85,13 +88,11 @@
output = new TestData(newSI(input, 15));
Assert.assertEquals(0, output.data.size());
- output = new TestData(newSI(input, 17));
TestData expected = new TestData().add("0 f q DATA 11", "15");
- Assert.assertEquals(expected, output);
+ checkInput(input, expected, 17);
- output = new TestData(newSI(input, 22));
expected = new TestData().add("0 f q LOCK 21", "1 f q");
- Assert.assertEquals(expected, output);
+ checkInput(input, expected, 22);
}
@Test
@@ -115,13 +116,11 @@
output = new TestData(newSI(input, 15));
Assert.assertEquals(0, output.data.size());
- output = new TestData(newSI(input, 17));
TestData expected = new TestData().add("0 f q DATA 11", "15");
- Assert.assertEquals(expected, output);
+ checkInput(input, expected, 17);
- output = new TestData(newSI(input, 23));
expected = new TestData().add("0 f q DATA 11", "15");
- Assert.assertEquals(expected, output);
+ checkInput(input, expected, 23);
// test case where there is newer lock thats not invalidated by DEL_LOCK
@@ -137,17 +136,14 @@
output = new TestData(newSI(input, 6));
Assert.assertEquals(0, output.data.size());
- output = new TestData(newSI(input, 17));
expected = new TestData().add("0 f q DATA 11", "15");
- Assert.assertEquals(expected, output);
+ checkInput(input, expected, 17);
- output = new TestData(newSI(input, 19));
expected = new TestData().add("0 f q DATA 11", "15");
- Assert.assertEquals(expected, output);
+ checkInput(input, expected, 19);
- output = new TestData(newSI(input, 23));
expected = new TestData().add("0 f q LOCK 21", "1 f q");
- Assert.assertEquals(expected, output);
+ checkInput(input, expected, 23);
}
@Test
@@ -253,7 +249,54 @@
for (Range range : ranges) {
checkManyColumnData(input, numToWrite, range);
}
+ }
+ @Test
+ public void testReadLock() {
+ TestData input = new TestData();
+
+ input.add("0 f q WRITE 16", "11 DELETE");
+ input.add("0 f q DATA 11", "15");
+ input.add("0 f q DEL_RLOCK 5", "6");
+ input.add("0 f q RLOCK 5", " 0 f q");
+
+ input.add("1 f q WRITE 16", "11");
+ input.add("1 f q DATA 11", "15");
+
+ input.add("2 f q WRITE 16", "11");
+ input.add("2 f q DATA 11", "17");
+ input.add("2 f q DEL_RLOCK 5", "6");
+ input.add("2 f q RLOCK 5", " 0 f q");
+
+
+ TestData expected = new TestData();
+ expected.add("0 f q DATA 11", "15");
+ expected.add("1 f q DATA 11", "15");
+ expected.add("2 f q DATA 11", "17");
+
+
+ checkInput(input, expected, 20, false);
+
+ expected.add("0 f q DEL_RLOCK 5", "6");
+ expected.add("2 f q DEL_RLOCK 5", "6");
+
+ checkInput(input, expected, 20);
+
+ }
+
+ private void checkInput(TestData input, TestData expected, long startTs) {
+ checkInput(input, expected, startTs, true);
+ }
+
+ private void checkInput(TestData input, TestData expected, long startTs,
+ boolean returnReadLocks) {
+ // run test with a single seek followed by many next calls
+ TestData output = new TestData(newSI(input, startTs, returnReadLocks), new Range());
+ Assert.assertEquals(expected, output);
+
+ // run test reseeking after each key
+ output = new TestData(newSI(input, startTs, returnReadLocks), new Range(), true);
+ Assert.assertEquals(expected, output);
}
private void checkManyColumnData(TestData input, int numToWrite, Range range) throws IOException {
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
index 2ea42b3..262df15 100644
--- a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
@@ -28,8 +28,11 @@
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.fluo.accumulo.format.FluoFormatter;
+import org.apache.fluo.accumulo.iterators.CountingIterator.Counter;
import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
+import org.apache.fluo.accumulo.values.DelReadLockValue;
import org.apache.fluo.accumulo.values.LockValue;
import org.apache.fluo.accumulo.values.WriteValue;
import org.apache.fluo.api.data.Bytes;
@@ -37,6 +40,7 @@
public class TestData {
TreeMap<Key, Value> data = new TreeMap<>();
+ Counter counter = new Counter();
TestData() {}
@@ -44,19 +48,29 @@
data.putAll(td.data);
}
- TestData(SortedKeyValueIterator<Key, Value> iter, Range range) {
+ TestData(SortedKeyValueIterator<Key, Value> iter, Range range, boolean reseek) {
try {
iter.seek(range, new HashSet<ByteSequence>(), false);
while (iter.hasTop()) {
data.put(iter.getTopKey(), iter.getTopValue());
- iter.next();
+ if (reseek) {
+ iter.seek(
+ new Range(iter.getTopKey(), false, range.getEndKey(), range.isEndKeyInclusive()),
+ new HashSet<ByteSequence>(), false);
+ } else {
+ iter.next();
+ }
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
+ TestData(SortedKeyValueIterator<Key, Value> iter, Range range) {
+ this(iter, range, false);
+ }
+
TestData(SortedKeyValueIterator<Key, Value> iter) {
this(iter, new Range());
}
@@ -116,6 +130,21 @@
val = DelLockValue.encodeCommit(commitTs, value.contains("PRIMARY"));
}
break;
+ case "RLOCK":
+ ts = ReadLockUtil.encodeTs(ts, false);
+ ts |= ColumnConstants.RLOCK_PREFIX;
+ break;
+ case "DEL_RLOCK":
+ ts = ReadLockUtil.encodeTs(ts, true);
+ ts |= ColumnConstants.RLOCK_PREFIX;
+
+ if (value.contains("ROLLBACK") || value.contains("ABORT")) {
+ val = DelReadLockValue.encodeRollback();
+ } else {
+ long commitTs = Long.parseLong(value.split("\\s+")[0]);
+ val = DelReadLockValue.encodeCommit(commitTs);
+ }
+ break;
case "ntfy":
break;
default:
@@ -163,4 +192,8 @@
public int hashCode() {
return Objects.hashCode(data);
}
+
+ public SortedKeyValueIterator<Key, Value> getIterator() {
+ return new CountingIterator(counter, data);
+ }
}
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/AbstractSnapshotBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/AbstractSnapshotBase.java
index 69780b2..1ba7f9b 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/AbstractSnapshotBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/AbstractSnapshotBase.java
@@ -49,6 +49,15 @@
*/
private Map<String, Bytes> s2bCache = new WeakHashMap<String, Bytes>();
+ public AbstractSnapshotBase() {}
+
+ /**
+ * @since 1.2.0
+ */
+ protected AbstractSnapshotBase(AbstractSnapshotBase other) {
+ this.s2bCache = other.s2bCache;
+ }
+
Bytes s2bConv(CharSequence cs) {
Objects.requireNonNull(cs);
if (cs instanceof String) {
@@ -64,6 +73,7 @@
}
}
+ @Override
public Bytes get(Bytes row, Column column, Bytes defaultValue) {
Bytes ret = get(row, column);
if (ret == null) {
@@ -73,19 +83,23 @@
return ret;
}
+ @Override
public Map<Column, Bytes> get(Bytes row, Column... columns) {
return get(row, ImmutableSet.copyOf(columns));
}
+ @Override
public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Column... columns) {
return get(rows, ImmutableSet.copyOf(columns));
}
+ @Override
public Map<RowColumn, String> gets(Collection<RowColumn> rowColumns) {
Map<RowColumn, Bytes> bytesMap = get(rowColumns);
return Maps.transformValues(bytesMap, b -> b.toString());
}
+ @Override
public Map<String, Map<Column, String>> gets(Collection<? extends CharSequence> rows,
Set<Column> columns) {
Map<Bytes, Map<Column, Bytes>> rcvs = get(Collections2.transform(rows, this::s2bConv), columns);
@@ -97,11 +111,13 @@
return ret;
}
+ @Override
public Map<String, Map<Column, String>> gets(Collection<? extends CharSequence> rows,
Column... columns) {
return gets(rows, ImmutableSet.copyOf(columns));
}
+ @Override
public String gets(CharSequence row, Column column) {
Bytes val = get(s2bConv(row), column);
if (val == null) {
@@ -110,6 +126,7 @@
return val.toString();
}
+ @Override
public String gets(CharSequence row, Column column, String defaultValue) {
Bytes val = get(s2bConv(row), column);
if (val == null) {
@@ -119,11 +136,13 @@
return val.toString();
}
+ @Override
public Map<Column, String> gets(CharSequence row, Set<Column> columns) {
Map<Column, Bytes> values = get(s2bConv(row), columns);
return Maps.transformValues(values, b -> b.toString());
}
+ @Override
public Map<Column, String> gets(CharSequence row, Column... columns) {
return gets(row, ImmutableSet.copyOf(columns));
}
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/AbstractTransactionBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/AbstractTransactionBase.java
index 8e44590..731aaeb 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/AbstractTransactionBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/AbstractTransactionBase.java
@@ -27,14 +27,17 @@
public abstract class AbstractTransactionBase extends AbstractSnapshotBase
implements TransactionBase {
+ @Override
public void delete(CharSequence row, Column col) {
delete(s2bConv(row), col);
}
+ @Override
public void set(CharSequence row, Column col, CharSequence value) throws AlreadySetException {
set(s2bConv(row), col, Bytes.of(value));
}
+ @Override
public void setWeakNotification(CharSequence row, Column col) {
setWeakNotification(s2bConv(row), col);
}
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
index 51c29b9..6f4d1ff 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
@@ -114,6 +114,7 @@
* </pre>
*
* @return A scanner builder.
+ * @see TransactionBase#withReadLock()
*/
ScannerBuilder scanner();
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/TransactionBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/TransactionBase.java
index 06303b6..bb53da5 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/TransactionBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/TransactionBase.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -60,4 +60,26 @@
* encoded using UTF-8.
*/
void setWeakNotification(CharSequence row, Column col);
+
+ /**
+ * Normally when a Fluo transaction reads data and does not write to it, it will not collide with
+ * other transactions making concurrent writes. When this method is called, all reads will
+ * acquire a read lock. These read locks cause collisions with transactions doing concurrent
+ * writes. However, multiple transactions can get concurrent read locks on the same row+col
+ * without colliding.
+ *
+ * <p>
+   * Scanning with read locks is not supported. Attempting to call {@code withReadLock().scanner()}
+   * will throw an {@link UnsupportedOperationException}. This is because there is an infinite
+   * number of keys within a range and read locks cannot be obtained on them all.
+ *
+ * <p>
+ * A transaction that only acquires read locks will do nothing at commit time. In this case no
+ * read locks are actually written and no collisions will ever occur.
+ *
+ * @since 1.2.0
+ */
+ default SnapshotBase withReadLock() {
+ throw new UnsupportedOperationException("Read locks not supported by this implementation");
+ }
}
diff --git a/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java b/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java
index 5998e89..20377a9 100644
--- a/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java
+++ b/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java
@@ -34,7 +34,7 @@
JCommander jcommand = new JCommander(options);
jcommand.parse(args.split(" "));
ScanUtil.ScanOpts opts = options.getScanOpts();
- return new SnapshotScanner.Opts(ScanUtil.getSpan(opts), ScanUtil.getColumns(opts));
+ return new SnapshotScanner.Opts(ScanUtil.getSpan(opts), ScanUtil.getColumns(opts), false);
}
@Test
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
index f3068e9..4172386 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
@@ -24,24 +24,33 @@
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.ConditionalWriter.Status;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
+import org.apache.fluo.accumulo.iterators.OpenReadLockIterator;
import org.apache.fluo.accumulo.iterators.PrewriteIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
+import org.apache.fluo.accumulo.values.DelReadLockValue;
import org.apache.fluo.accumulo.values.LockValue;
+import org.apache.fluo.accumulo.values.ReadLockValue;
+import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.core.util.ByteUtil;
import org.apache.fluo.core.util.ColumnUtil;
import org.apache.fluo.core.util.ConditionalFlutation;
import org.apache.fluo.core.util.FluoCondition;
import org.apache.fluo.core.util.SpanUtil;
+import static org.apache.fluo.accumulo.util.ColumnConstants.PREFIX_MASK;
import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
/**
@@ -52,38 +61,61 @@
public class LockResolver {
- private static Map<PrimaryRowColumn, List<Entry<Key, Value>>> groupLocksByPrimary(
- List<Entry<Key, Value>> locks) {
- Map<PrimaryRowColumn, List<Entry<Key, Value>>> groupedLocks = new HashMap<>();
+ private static Map<PrimaryRowColumn, List<LockInfo>> groupLocksByPrimary(List<LockInfo> locks) {
+ Map<PrimaryRowColumn, List<LockInfo>> groupedLocks = new HashMap<>();
Map<PrimaryRowColumn, Long> transactorIds = new HashMap<>();
- for (Entry<Key, Value> lock : locks) {
- LockValue lockVal = new LockValue(lock.getValue().get());
- PrimaryRowColumn prc =
- new PrimaryRowColumn(lockVal.getPrimaryRow(), lockVal.getPrimaryColumn(),
- lock.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK);
+ for (LockInfo lockInfo : locks) {
- List<Entry<Key, Value>> lockList = groupedLocks.get(prc);
- if (lockList == null) {
- lockList = new ArrayList<>();
- groupedLocks.put(prc, lockList);
- }
+ PrimaryRowColumn prc = new PrimaryRowColumn(lockInfo.prow, lockInfo.pcol, lockInfo.lockTs);
+
+ List<LockInfo> lockList = groupedLocks.computeIfAbsent(prc, k -> new ArrayList<>());
Long trid = transactorIds.get(prc);
if (trid == null) {
- transactorIds.put(prc, lockVal.getTransactor());
- } else if (!trid.equals(lockVal.getTransactor())) {
+ transactorIds.put(prc, lockInfo.transactorId);
+ } else if (!trid.equals(lockInfo.transactorId)) {
// sanity check.. its assumed that all locks w/ the same PrimaryRowColumn should have the
// same transactor id as well
- throw new IllegalStateException("transactor ids not equals " + prc + " " + lock.getKey()
- + " " + trid + " " + lockVal.getTransactor());
+ throw new IllegalStateException("transactor ids not equals " + prc + " "
+ + lockInfo.entry.getKey() + " " + trid + " " + lockInfo.transactorId);
}
- lockList.add(lock);
+ lockList.add(lockInfo);
}
return groupedLocks;
+ }
+
+ private static class LockInfo {
+
+ final Bytes prow;
+ final Column pcol;
+ final Long transactorId;
+ final long lockTs;
+ final boolean isReadLock;
+ final Entry<Key, Value> entry;
+
+ public LockInfo(Entry<Key, Value> kve) {
+ long rawTs = kve.getKey().getTimestamp();
+ this.entry = kve;
+ if ((rawTs & ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
+ this.lockTs = ReadLockUtil.decodeTs(rawTs);
+ ReadLockValue rlv = new ReadLockValue(kve.getValue().get());
+ this.prow = rlv.getPrimaryRow();
+ this.pcol = rlv.getPrimaryColumn();
+ this.transactorId = rlv.getTransactor();
+ this.isReadLock = true;
+ } else {
+ this.lockTs = kve.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
+ LockValue lv = new LockValue(kve.getValue().get());
+ this.prow = lv.getPrimaryRow();
+ this.pcol = lv.getPrimaryColumn();
+ this.transactorId = lv.getTransactor();
+ this.isReadLock = false;
+ }
+ }
}
/**
@@ -99,7 +131,7 @@
* @return true if all locks passed in were resolved (rolled forward or back)
*/
static boolean resolveLocks(Environment env, long startTs, TxStats stats,
- List<Entry<Key, Value>> locks, long startTime) {
+ List<Entry<Key, Value>> locksKVs, long startTime) {
// check if transactor is still alive
int numResolved = 0;
@@ -110,7 +142,10 @@
TransactorCache transactorCache = env.getSharedResources().getTransactorCache();
- List<Entry<Key, Value>> locksToRecover;
+ List<LockInfo> locks = new ArrayList<>();
+ locksKVs.forEach(e -> locks.add(new LockInfo(e)));
+
+ List<LockInfo> locksToRecover;
if (System.currentTimeMillis() - startTime > env.getConfiguration()
.getTransactionRollbackTime()) {
locksToRecover = locks;
@@ -118,38 +153,32 @@
timedOut = true;
} else {
locksToRecover = new ArrayList<>(locks.size());
- for (Entry<Key, Value> entry : locks) {
-
- Long transactorId = new LockValue(entry.getValue().get()).getTransactor();
- long lockTs = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
-
- if (transactorCache.checkTimedout(transactorId, lockTs)) {
- locksToRecover.add(entry);
+ for (LockInfo lockInfo : locks) {
+ if (transactorCache.checkTimedout(lockInfo.transactorId, lockInfo.lockTs)) {
+ locksToRecover.add(lockInfo);
stats.incrementTimedOutLocks();
- } else if (!transactorCache.checkExists(transactorId)) {
- locksToRecover.add(entry);
+ } else if (!transactorCache.checkExists(lockInfo.transactorId)) {
+ locksToRecover.add(lockInfo);
stats.incrementDeadLocks();
}
}
}
- Map<PrimaryRowColumn, List<Entry<Key, Value>>> groupedLocks =
- groupLocksByPrimary(locksToRecover);
+ Map<PrimaryRowColumn, List<LockInfo>> groupedLocks = groupLocksByPrimary(locksToRecover);
if (timedOut) {
- Set<Entry<PrimaryRowColumn, List<Entry<Key, Value>>>> es = groupedLocks.entrySet();
+ Set<Entry<PrimaryRowColumn, List<LockInfo>>> es = groupedLocks.entrySet();
- for (Entry<PrimaryRowColumn, List<Entry<Key, Value>>> entry : es) {
+ for (Entry<PrimaryRowColumn, List<LockInfo>> entry : es) {
long lockTs = entry.getKey().startTs;
- Long transactorId = new LockValue(entry.getValue().get(0).getValue().get()).getTransactor();
+ Long transactorId = entry.getValue().get(0).transactorId;
transactorCache.addTimedoutTransactor(transactorId, lockTs, startTime);
}
}
TxInfoCache txiCache = env.getSharedResources().getTxInfoCache();
-
- Set<Entry<PrimaryRowColumn, List<Entry<Key, Value>>>> es = groupedLocks.entrySet();
- for (Entry<PrimaryRowColumn, List<Entry<Key, Value>>> group : es) {
+ Set<Entry<PrimaryRowColumn, List<LockInfo>>> es = groupedLocks.entrySet();
+ for (Entry<PrimaryRowColumn, List<LockInfo>> group : es) {
TxInfo txInfo = txiCache.getTransactionInfo(group.getKey());
switch (txInfo.status) {
case COMMITTED:
@@ -182,18 +211,24 @@
}
private static void rollback(Environment env, long startTs, PrimaryRowColumn prc,
- List<Entry<Key, Value>> value, Map<ByteSequence, Mutation> mutations) {
- for (Entry<Key, Value> entry : value) {
- if (isPrimary(prc, entry.getKey())) {
+ List<LockInfo> value, Map<ByteSequence, Mutation> mutations) {
+ for (LockInfo lockInfo : value) {
+ if (isPrimary(prc, lockInfo.entry.getKey())) {
continue;
}
- long lockTs = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
- Mutation mut = getMutation(entry.getKey().getRowData(), mutations);
- Key k = entry.getKey();
- mut.put(k.getColumnFamilyData().toArray(), k.getColumnQualifierData().toArray(),
- k.getColumnVisibilityParsed(), ColumnConstants.DEL_LOCK_PREFIX | lockTs,
- DelLockValue.encodeRollback(false, true));
+ Mutation mut = getMutation(lockInfo.entry.getKey().getRowData(), mutations);
+ Key k = lockInfo.entry.getKey();
+ if (lockInfo.isReadLock) {
+ mut.put(k.getColumnFamilyData().toArray(), k.getColumnQualifierData().toArray(),
+ k.getColumnVisibilityParsed(),
+ ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(lockInfo.lockTs, true),
+ DelReadLockValue.encodeRollback());
+ } else {
+ mut.put(k.getColumnFamilyData().toArray(), k.getColumnQualifierData().toArray(),
+ k.getColumnVisibilityParsed(), ColumnConstants.DEL_LOCK_PREFIX | lockInfo.lockTs,
+ DelLockValue.encodeRollback(false, true));
+ }
}
}
@@ -224,26 +259,31 @@
}
}
- private static void commitColumns(Environment env, PrimaryRowColumn prc,
- List<Entry<Key, Value>> value, long commitTs, Map<ByteSequence, Mutation> mutations) {
- for (Entry<Key, Value> entry : value) {
- if (isPrimary(prc, entry.getKey())) {
+ private static void commitColumns(Environment env, PrimaryRowColumn prc, List<LockInfo> value,
+ long commitTs, Map<ByteSequence, Mutation> mutations) {
+ for (LockInfo lockInfo : value) {
+ if (isPrimary(prc, lockInfo.entry.getKey())) {
continue;
}
- long lockTs = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
+ long lockTs = lockInfo.lockTs;
// TODO may be that a stronger sanity check that could be done here
if (commitTs < lockTs) {
throw new IllegalStateException(
- "bad commitTs : " + entry.getKey() + " (" + commitTs + "<" + lockTs + ")");
+ "bad commitTs : " + lockInfo.entry.getKey() + " (" + commitTs + "<" + lockTs + ")");
}
- Mutation mut = getMutation(entry.getKey().getRowData(), mutations);
- Column col = SpanUtil.toRowColumn(entry.getKey()).getColumn();
+ Mutation mut = getMutation(lockInfo.entry.getKey().getRowData(), mutations);
+ Column col = ColumnUtil.convert(lockInfo.entry.getKey());
- LockValue lv = new LockValue(entry.getValue().get());
- ColumnUtil.commitColumn(env, lv.isTrigger(), false, col, lv.isWrite(), lv.isDelete(), lockTs,
- commitTs, env.getConfiguredObservers().getObservedColumns(STRONG), mut);
+ if (lockInfo.isReadLock) {
+ ColumnUtil.commitColumn(env, false, false, col, false, false, true, lockTs, commitTs,
+ env.getConfiguredObservers().getObservedColumns(STRONG), mut);
+ } else {
+ LockValue lv = new LockValue(lockInfo.entry.getValue().get());
+ ColumnUtil.commitColumn(env, lv.isTrigger(), false, col, lv.isWrite(), lv.isDelete(), false,
+ lockTs, commitTs, env.getConfiguredObservers().getObservedColumns(STRONG), mut);
+ }
}
}
@@ -263,4 +303,43 @@
return prc.prow.equals(ByteUtil.toBytes(k.getRowData()))
&& prc.pcol.equals(SpanUtil.toRowColumn(k).getColumn());
}
+
+ static List<Entry<Key, Value>> getOpenReadLocks(Environment env,
+ Map<Bytes, Set<Column>> rowColsToCheck) throws Exception {
+
+ List<Range> ranges = new ArrayList<>();
+
+ for (Entry<Bytes, Set<Column>> e1 : rowColsToCheck.entrySet()) {
+ for (Column col : e1.getValue()) {
+ Key start = SpanUtil.toKey(new RowColumn(e1.getKey(), col));
+ Key end = new Key(start);
+ end.setTimestamp(ColumnConstants.LOCK_PREFIX | ColumnConstants.TIMESTAMP_MASK);
+ ranges.add(new Range(start, true, end, false));
+ }
+ }
+
+ BatchScanner bscanner = null;
+ try {
+ bscanner = env.getConnector().createBatchScanner(env.getTable(), env.getAuthorizations(), 1);
+
+ bscanner.setRanges(ranges);
+ IteratorSetting iterCfg = new IteratorSetting(10, OpenReadLockIterator.class);
+
+ bscanner.addScanIterator(iterCfg);
+
+ List<Entry<Key, Value>> ret = new ArrayList<>();
+ for (Entry<Key, Value> entry : bscanner) {
+ if ((entry.getKey().getTimestamp() & PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
+ ret.add(entry);
+ }
+ }
+
+ return ret;
+
+ } finally {
+ if (bscanner != null) {
+ bscanner.close();
+ }
+ }
+ }
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
index 37e0df5..48250c0 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
@@ -19,6 +19,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -51,9 +52,10 @@
private List<Range> rangesToScan = new ArrayList<>();
private Function<ByteSequence, Bytes> rowConverter;
private Function<Key, Column> columnConverter;
+ private Map<Bytes, Set<Column>> readLocksSeen;
ParallelSnapshotScanner(Collection<Bytes> rows, Set<Column> columns, Environment env,
- long startTs, TxStats stats) {
+ long startTs, TxStats stats, Map<Bytes, Set<Column>> readLocksSeen) {
this.rows = rows;
this.columns = columns;
this.env = env;
@@ -61,10 +63,11 @@
this.stats = stats;
this.rowConverter = new CachedBytesConverter(rows);
this.columnConverter = new CachedColumnConverter(columns);
+ this.readLocksSeen = readLocksSeen;
}
- ParallelSnapshotScanner(Collection<RowColumn> cells, Environment env, long startTs,
- TxStats stats) {
+ ParallelSnapshotScanner(Collection<RowColumn> cells, Environment env, long startTs, TxStats stats,
+ Map<Bytes, Set<Column>> readLocksSeen) {
for (RowColumn rc : cells) {
byte[] r = rc.getRow().toArray();
byte[] cf = rc.getColumn().getFamily().toArray();
@@ -83,6 +86,7 @@
this.stats = stats;
this.rowConverter = ByteUtil::toBytes;
this.columnConverter = ColumnUtil::convert;
+ this.readLocksSeen = readLocksSeen;
}
private BatchScanner setupBatchScanner() {
@@ -101,7 +105,7 @@
if (rangesToScan.size() > 0) {
scanner.setRanges(rangesToScan);
- SnapshotScanner.setupScanner(scanner, Collections.<Column>emptySet(), startTs);
+ SnapshotScanner.setupScanner(scanner, Collections.<Column>emptySet(), startTs, true);
} else if (rows != null) {
List<Range> ranges = new ArrayList<>(rows.size());
@@ -111,7 +115,7 @@
scanner.setRanges(ranges);
- SnapshotScanner.setupScanner(scanner, columns, startTs);
+ SnapshotScanner.setupScanner(scanner, columns, startTs, true);
} else {
return null;
}
@@ -176,13 +180,9 @@
if (colType == ColumnConstants.LOCK_PREFIX) {
locks.add(entry);
} else if (colType == ColumnConstants.DATA_PREFIX) {
- Map<Column, Bytes> cols = ret.get(row);
- if (cols == null) {
- cols = new HashMap<>();
- ret.put(row, cols);
- }
-
- cols.put(col, Bytes.of(entry.getValue().get()));
+ ret.computeIfAbsent(row, k -> new HashMap<>()).put(col, Bytes.of(entry.getValue().get()));
+ } else if (colType == ColumnConstants.RLOCK_PREFIX) {
+ readLocksSeen.computeIfAbsent(row, k -> new HashSet<>()).add(col);
} else {
throw new IllegalArgumentException("Unexpected column type " + colType);
}
@@ -191,5 +191,4 @@
bs.close();
}
}
-
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/ReadLockSnapshot.java b/modules/core/src/main/java/org/apache/fluo/core/impl/ReadLockSnapshot.java
new file mode 100644
index 0000000..eb1ec01
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/ReadLockSnapshot.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.client.AbstractSnapshotBase;
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.client.scanner.ScannerBuilder;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+
+public class ReadLockSnapshot extends AbstractSnapshotBase implements SnapshotBase {
+
+ private TransactionImpl txi;
+
+ ReadLockSnapshot(TransactionImpl txi) {
+ super(txi);
+ this.txi = txi;
+ }
+
+ @Override
+ public Bytes get(Bytes row, Column column) {
+ txi.setReadLock(row, column);
+ return txi.get(row, column);
+ }
+
+ @Override
+ public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
+ for (Column column : columns) {
+ txi.setReadLock(row, column);
+ }
+ return txi.get(row, columns);
+ }
+
+ @Override
+ public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
+ for (Bytes row : rows) {
+ for (Column column : columns) {
+ txi.setReadLock(row, column);
+ }
+ }
+ return txi.get(rows, columns);
+ }
+
+ @Override
+ public Map<RowColumn, Bytes> get(Collection<RowColumn> rowColumns) {
+ for (RowColumn rowColumn : rowColumns) {
+ txi.setReadLock(rowColumn.getRow(), rowColumn.getColumn());
+ }
+ return txi.get(rowColumns);
+ }
+
+ @Override
+ public ScannerBuilder scanner() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getStartTimestamp() {
+ return txi.getStartTimestamp();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
index cd2e008..a1db35b 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
+import java.util.function.Consumer;
import com.google.common.collect.ImmutableSet;
import org.apache.accumulo.core.client.IteratorSetting;
@@ -49,10 +50,12 @@
public static final class Opts {
private final Span span;
private final Collection<Column> columns;
+ private final boolean showReadLocks;
- public Opts(Span span, Collection<Column> columns) {
+ public Opts(Span span, Collection<Column> columns, boolean showReadLocks) {
this.span = span;
this.columns = ImmutableSet.copyOf(columns);
+ this.showReadLocks = showReadLocks;
}
public Span getSpan() {
@@ -62,12 +65,17 @@
public Collection<Column> getColumns() {
return columns;
}
+
+ public boolean getShowReadLocks() {
+ return showReadLocks;
+ }
}
private final long startTs;
private final Environment env;
private final TxStats stats;
private final Opts config;
+ private Consumer<Entry<Key, Value>> locksSeen;
static final long INITIAL_WAIT_TIME = 50;
// TODO make configurable
@@ -75,7 +83,8 @@
- static void setupScanner(ScannerBase scanner, Collection<Column> columns, long startTs) {
+ static void setupScanner(ScannerBase scanner, Collection<Column> columns, long startTs,
+ boolean showReadLocks) {
for (Column col : columns) {
if (col.isQualifierSet()) {
scanner.fetchColumn(ByteUtil.toText(col.getFamily()), ByteUtil.toText(col.getQualifier()));
@@ -86,6 +95,7 @@
IteratorSetting iterConf = new IteratorSetting(10, SnapshotIterator.class);
SnapshotIterator.setSnaptime(iterConf, startTs);
+ SnapshotIterator.setReturnReadLockPresent(iterConf, showReadLocks);
scanner.addScanIterator(iterConf);
}
@@ -111,7 +121,7 @@
scanner.clearScanIterators();
scanner.setRange(SpanUtil.toRange(snapIterConfig.getSpan()));
- setupScanner(scanner, snapIterConfig.getColumns(), startTs);
+ setupScanner(scanner, snapIterConfig.getColumns(), startTs, snapIterConfig.showReadLocks);
this.iterator = scanner.iterator();
}
@@ -137,7 +147,7 @@
}
private void resetScanner(Span span) {
- snapIterConfig = new Opts(span, snapIterConfig.columns);
+ snapIterConfig = new Opts(span, snapIterConfig.columns, snapIterConfig.showReadLocks);
setUpIterator();
}
@@ -145,6 +155,8 @@
// read ahead a little bit looking for other locks to resolve
+ locksSeen.accept(lockEntry);
+
long startTime = System.currentTimeMillis();
long waitTime = INITIAL_WAIT_TIME;
@@ -164,6 +176,7 @@
if (colType == ColumnConstants.LOCK_PREFIX) {
locks.add(entry);
+        locksSeen.accept(entry);
}
amountRead += entry.getKey().getSize() + entry.getValue().getSize();
@@ -215,6 +228,8 @@
} else if (colType == ColumnConstants.DATA_PREFIX) {
stats.incrementEntriesReturned(1);
return entry;
+ } else if (colType == ColumnConstants.RLOCK_PREFIX) {
+ return entry;
} else {
throw new IllegalArgumentException("Unexpected column type " + colType);
}
@@ -227,11 +242,13 @@
}
}
- SnapshotScanner(Environment env, Opts config, long startTs, TxStats stats) {
+ SnapshotScanner(Environment env, Opts config, long startTs, TxStats stats,
+ Consumer<Entry<Key, Value>> locksSeen) {
this.env = env;
this.config = config;
this.startTs = startTs;
this.stats = stats;
+ this.locksSeen = locksSeen;
}
@Override
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index aa5e7da..0abdafe 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -26,10 +26,12 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
+import java.util.function.Consumer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -51,10 +53,14 @@
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.accumulo.iterators.PrewriteIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
+import org.apache.fluo.accumulo.values.DelReadLockValue;
import org.apache.fluo.accumulo.values.LockValue;
+import org.apache.fluo.accumulo.values.ReadLockValue;
import org.apache.fluo.api.client.AbstractTransactionBase;
import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.SnapshotBase;
import org.apache.fluo.api.client.scanner.ScannerBuilder;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
@@ -71,13 +77,17 @@
import org.apache.fluo.core.exceptions.StaleScanException;
import org.apache.fluo.core.impl.scanner.ScannerBuilderImpl;
import org.apache.fluo.core.oracle.Stamp;
+import org.apache.fluo.core.util.ByteUtil;
import org.apache.fluo.core.util.ColumnUtil;
import org.apache.fluo.core.util.ConditionalFlutation;
import org.apache.fluo.core.util.FluoCondition;
import org.apache.fluo.core.util.Flutation;
import org.apache.fluo.core.util.Hex;
import org.apache.fluo.core.util.SpanUtil;
+import org.apache.fluo.core.util.UtilWaitThread;
+import static org.apache.fluo.accumulo.util.ColumnConstants.PREFIX_MASK;
+import static org.apache.fluo.accumulo.util.ColumnConstants.RLOCK_PREFIX;
import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
@@ -92,15 +102,21 @@
Bytes.of("special delete object f804266bf94935edd45ae3e6c287b93c1814295c");
private static final Bytes NTFY_VAL =
Bytes.of("special ntfy value ce0c523e6e4dc093be8a2736b82eca1b95f97ed4");
+ private static final Bytes RLOCK_VAL =
+ Bytes.of("special rlock value 94da84e7796ff3b23b779805d820a33f1997cb8b");
private static boolean isWrite(Bytes val) {
- return val != NTFY_VAL;
+ return val != NTFY_VAL && val != RLOCK_VAL;
}
private static boolean isDelete(Bytes val) {
return val == DELETE;
}
+ private static boolean isReadLock(Bytes val) {
+ return val == RLOCK_VAL;
+ }
+
private static enum TxStatus {
OPEN, COMMIT_STARTED, COMMITTED, CLOSED
}
@@ -110,7 +126,9 @@
private final Map<Bytes, Set<Column>> weakNotifications = new HashMap<>();
private final Set<Column> observedColumns;
private final Environment env;
- final Map<Bytes, Set<Column>> columnsRead = new HashMap<>();
+ private final Map<Bytes, Set<Column>> columnsRead = new HashMap<>();
+ // Tracks row columns that were observed to have had a read lock in the past.
+ private final Map<Bytes, Set<Column>> readLocksSeen = new HashMap<>();
private final TxStats stats;
private Notification notification;
private Notification weakNotification;
@@ -170,7 +188,8 @@
@Override
public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
checkIfOpen();
- return getImpl(row, columns);
+ return getImpl(row, columns, kve -> {
+ });
}
@Override
@@ -183,7 +202,8 @@
env.getSharedResources().getVisCache().validate(columns);
- ParallelSnapshotScanner pss = new ParallelSnapshotScanner(rows, columns, env, startTs, stats);
+ ParallelSnapshotScanner pss =
+ new ParallelSnapshotScanner(rows, columns, env, startTs, stats, readLocksSeen);
Map<Bytes, Map<Column, Bytes>> ret = pss.scan();
@@ -202,7 +222,8 @@
return Collections.emptyMap();
}
- ParallelSnapshotScanner pss = new ParallelSnapshotScanner(rowColumns, env, startTs, stats);
+ ParallelSnapshotScanner pss =
+ new ParallelSnapshotScanner(rowColumns, env, startTs, stats, readLocksSeen);
Map<Bytes, Map<Column, Bytes>> scan = pss.scan();
Map<RowColumn, Bytes> ret = new HashMap<>();
@@ -217,7 +238,8 @@
return ret;
}
- private Map<Column, Bytes> getImpl(Bytes row, Set<Column> columns) {
+ private Map<Column, Bytes> getImpl(Bytes row, Set<Column> columns,
+ Consumer<Entry<Key, Value>> locksSeen) {
// TODO push visibility filtering to server side?
@@ -241,19 +263,26 @@
cols.add(column);
}
}
- opts = new SnapshotScanner.Opts(Span.exact(row), columns);
+ opts = new SnapshotScanner.Opts(Span.exact(row), columns, true);
} else {
- opts = new SnapshotScanner.Opts(Span.exact(row), columns);
+ opts = new SnapshotScanner.Opts(Span.exact(row), columns, true);
}
Map<Column, Bytes> ret = new HashMap<>();
+ Set<Column> readLockCols = null;
- for (Entry<Key, Value> kve : new SnapshotScanner(env, opts, startTs, stats)) {
+ for (Entry<Key, Value> kve : new SnapshotScanner(env, opts, startTs, stats, locksSeen)) {
+
Column col = ColumnUtil.convert(kve.getKey());
- if (shouldCopy) {
- if (columns.contains(col)) {
- ret.put(col, Bytes.of(kve.getValue().get()));
+ if (shouldCopy && !columns.contains(col)) {
+ continue;
+ }
+
+ if ((kve.getKey().getTimestamp() & PREFIX_MASK) == RLOCK_PREFIX) {
+ if (readLockCols == null) {
+ readLockCols = readLocksSeen.computeIfAbsent(row, k -> new HashSet<>());
}
+ readLockCols.add(col);
} else {
ret.put(col, Bytes.of(kve.getValue().get()));
}
@@ -280,6 +309,31 @@
colsRead.addAll(columns);
}
+ void setReadLock(Bytes row, Column col) {
+ checkIfOpen();
+ Objects.requireNonNull(row);
+ Objects.requireNonNull(col);
+
+ if (col.getFamily().equals(ColumnConstants.NOTIFY_CF)) {
+ throw new IllegalArgumentException(ColumnConstants.NOTIFY_CF + " is a reserved family");
+ }
+
+ env.getSharedResources().getVisCache().validate(col);
+
+ Map<Column, Bytes> colUpdates = updates.computeIfAbsent(row, k -> new HashMap<>());
+ Bytes curVal = colUpdates.get(col);
+ if (curVal != null && (isWrite(curVal) || isDelete(curVal))) {
+      throw new AlreadySetException("Attempted read lock after write lock " + row + " " + col);
+ }
+
+ colUpdates.put(col, RLOCK_VAL);
+ }
+
+ @Override
+ public SnapshotBase withReadLock() {
+ return new ReadLockSnapshot(this);
+ }
+
@Override
public void set(Bytes row, Column col, Bytes value) throws AlreadySetException {
checkIfOpen();
@@ -293,13 +347,7 @@
env.getSharedResources().getVisCache().validate(col);
- // TODO copy?
-
- Map<Column, Bytes> colUpdates = updates.get(row);
- if (colUpdates == null) {
- colUpdates = new HashMap<>();
- updates.put(row, colUpdates);
- }
+ Map<Column, Bytes> colUpdates = updates.computeIfAbsent(row, k -> new HashMap<>());
Bytes curVal = colUpdates.get(col);
if (curVal != null && isWrite(curVal)) {
@@ -346,6 +394,10 @@
PrewriteIterator.enableAckCheck(iterConf, notification.getTimestamp());
}
+ if (isReadLock(val)) {
+ PrewriteIterator.setReadlock(iterConf);
+ }
+
Condition cond = new FluoCondition(env, col).setIterators(iterConf);
if (cm == null) {
@@ -358,8 +410,13 @@
cm.put(col, ColumnConstants.DATA_PREFIX | startTs, val.toArray());
}
- cm.put(col, ColumnConstants.LOCK_PREFIX | startTs, LockValue.encode(primaryRow, primaryColumn,
- isWrite(val), isDelete(val), isTriggerRow, getTransactorID()));
+ if (isReadLock(val)) {
+ cm.put(col, ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, false),
+ ReadLockValue.encode(primaryRow, primaryColumn, getTransactorID()));
+ } else {
+ cm.put(col, ColumnConstants.LOCK_PREFIX | startTs, LockValue.encode(primaryRow, primaryColumn,
+ isWrite(val), isDelete(val), isTriggerRow, getTransactorID()));
+ }
return cm;
}
@@ -497,7 +554,7 @@
*
* @param cd Commit data
*/
- private void readUnread(CommitData cd) throws Exception {
+ private void readUnread(CommitData cd, Consumer<Entry<Key, Value>> locksSeen) throws Exception {
// TODO make async
// TODO need to keep track of ranges read (not ranges passed in, but actual data read... user
// may not iterate over entire range
@@ -517,10 +574,86 @@
}
for (Entry<Bytes, Set<Column>> entry : columnsToRead.entrySet()) {
- getImpl(entry.getKey(), entry.getValue());
+ getImpl(entry.getKey(), entry.getValue(), locksSeen);
}
}
+ private void checkForOrphanedReadLocks(CommitData cd, Map<Bytes, Set<Column>> locksResolved)
+ throws Exception {
+
+ if (readLocksSeen.size() == 0) {
+ return;
+ }
+
+ Map<Bytes, Set<Column>> rowColsToCheck = new HashMap<>();
+
+ for (Entry<Bytes, Set<Column>> entry : cd.getRejected().entrySet()) {
+
+ Set<Column> resolvedColumns =
+ locksResolved.getOrDefault(entry.getKey(), Collections.emptySet());
+
+ Set<Column> colsToCheck = null;
+ Set<Column> readLockCols = readLocksSeen.get(entry.getKey());
+ if (readLockCols != null) {
+ for (Column candidate : Sets.intersection(readLockCols, entry.getValue())) {
+ if (resolvedColumns.contains(candidate)) {
+ // A write lock was seen and this is probably what caused the collision, no need to
+ // check this column for read locks.
+ continue;
+ }
+
+ if (!isReadLock(updates.getOrDefault(entry.getKey(), Collections.emptyMap())
+ .getOrDefault(candidate, EMPTY_BS))) {
+ if (colsToCheck == null) {
+ colsToCheck = new HashSet<>();
+ }
+ colsToCheck.add(candidate);
+ }
+ }
+
+ if (colsToCheck != null) {
+ rowColsToCheck.put(entry.getKey(), colsToCheck);
+ }
+ }
+ }
+
+ if (rowColsToCheck.size() > 0) {
+
+ long startTime = System.currentTimeMillis();
+ long waitTime = SnapshotScanner.INITIAL_WAIT_TIME;
+
+ boolean resolved = false;
+
+ List<Entry<Key, Value>> openReadLocks = LockResolver.getOpenReadLocks(env, rowColsToCheck);
+
+ startTime = System.currentTimeMillis();
+
+ while (!resolved) {
+ resolved = LockResolver.resolveLocks(env, startTs, stats, openReadLocks, startTime);
+ if (!resolved) {
+ UtilWaitThread.sleep(waitTime);
+ stats.incrementLockWaitTime(waitTime);
+ waitTime = Math.min(SnapshotScanner.MAX_WAIT_TIME, waitTime * 2);
+
+ openReadLocks = LockResolver.getOpenReadLocks(env, rowColsToCheck);
+ }
+ }
+ }
+ }
+
+ private void checkForOrphanedLocks(CommitData cd) throws Exception {
+
+ Map<Bytes, Set<Column>> locksSeen = new HashMap<>();
+
+ readUnread(cd, kve -> {
+ Bytes row = ByteUtil.toBytes(kve.getKey().getRowData());
+ Column col = ColumnUtil.convert(kve.getKey());
+ locksSeen.computeIfAbsent(row, k -> new HashSet<>()).add(col);
+ });
+
+ checkForOrphanedReadLocks(cd, locksSeen);
+ }
+
private boolean checkForAckCollision(ConditionalMutation cm) {
Bytes row = Bytes.of(cm.getRow());
@@ -792,8 +925,8 @@
stats.incrementEntriesSet(cols.size());
}
- Bytes primRow;
- Column primCol;
+ Bytes primRow = null;
+ Column primCol = null;
if (primary != null) {
primRow = primary.getRow();
@@ -805,9 +938,23 @@
primRow = notification.getRow();
primCol = notification.getColumn();
} else {
- primRow = updates.keySet().iterator().next();
- Map<Column, Bytes> colSet = updates.get(primRow);
- primCol = colSet.keySet().iterator().next();
+
+ outer: for (Entry<Bytes, Map<Column, Bytes>> entry : updates.entrySet()) {
+ for (Entry<Column, Bytes> entry2 : entry.getValue().entrySet()) {
+ if (!isReadLock(entry2.getValue())) {
+ primRow = entry.getKey();
+ primCol = entry2.getKey();
+ break outer;
+ }
+ }
+ }
+
+ if (primRow == null) {
+ // there are only read locks, so nothing to write
+ deleteWeakRow();
+ commitCallback.committed();
+ return;
+ }
}
// get a primary column
@@ -825,7 +972,6 @@
final ConditionalMutation pcm =
prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol, isTriggerRow(cd.prow));
-
ListenableFuture<Iterator<Result>> future = cd.acw.apply(Collections.singletonList(pcm));
Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) {
@Override
@@ -881,7 +1027,7 @@
cd.addPrimaryToRejected();
getStats().setRejected(cd.getRejected());
// TODO do async
- readUnread(cd);
+ checkForOrphanedLocks(cd);
if (checkForAckCollision(pcm)) {
cd.commitObserver.alreadyAcknowledged();
} else {
@@ -913,7 +1059,6 @@
cd.acceptedRows = new HashSet<>();
-
ListenableFuture<Iterator<Result>> future = cd.bacw.apply(mutations);
Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) {
@Override
@@ -940,7 +1085,7 @@
env.getSharedResources().getSyncCommitExecutor().execute(new SynchronousCommitTask(cd) {
@Override
protected void runCommitStep(CommitData cd) throws Exception {
- readUnread(cd);
+ checkForOrphanedLocks(cd);
rollbackOtherLocks(cd);
}
});
@@ -957,7 +1102,6 @@
}
}
-
private void rollbackOtherLocks(CommitData cd) throws Exception {
// roll back locks
@@ -968,9 +1112,14 @@
ArrayList<Mutation> mutations = new ArrayList<>(cd.acceptedRows.size());
for (Bytes row : cd.acceptedRows) {
m = new Flutation(env, row);
- for (Column col : updates.get(row).keySet()) {
- m.put(col, ColumnConstants.DEL_LOCK_PREFIX | startTs,
- DelLockValue.encodeRollback(false, true));
+ for (Entry<Column, Bytes> entry : updates.get(row).entrySet()) {
+ if (isReadLock(entry.getValue())) {
+ m.put(entry.getKey(), ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, true),
+ DelReadLockValue.encodeRollback());
+ } else {
+ m.put(entry.getKey(), ColumnConstants.DEL_LOCK_PREFIX | startTs,
+ DelLockValue.encodeRollback(false, true));
+ }
}
mutations.add(m);
}
@@ -1004,7 +1153,6 @@
}, env.getSharedResources().getAsyncCommitExecutor());
}
-
private void beginSecondCommitPhase(CommitData cd, Stamp commitStamp) throws Exception {
if (startTs < commitStamp.getGcTimestamp()) {
rollbackOtherLocks(cd);
@@ -1065,7 +1213,6 @@
}
}
-
ListenableFuture<Void> future =
env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations.values());
Futures.addCallback(future, new CommitCallback<Void>(cd) {
@@ -1088,8 +1235,7 @@
final ConditionalMutation delLockMutation = new ConditionalFlutation(env, cd.prow, lockCheck);
ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), isDelete(cd.pval),
- startTs, commitTs, observedColumns, delLockMutation);
-
+ isReadLock(cd.pval), startTs, commitTs, observedColumns, delLockMutation);
ListenableFuture<Iterator<Result>> future =
cd.acw.apply(Collections.singletonList(delLockMutation));
@@ -1146,7 +1292,6 @@
}
}
-
private void postCommitPrimary(CommitData cd, long commitTs, Status mutationStatus)
throws Exception {
if (mutationStatus != Status.ACCEPTED) {
@@ -1170,7 +1315,7 @@
ColumnUtil.commitColumn(env,
isTriggerRow && colUpdates.getKey().equals(notification.getColumn()), false,
colUpdates.getKey(), isWrite(colUpdates.getValue()), isDelete(colUpdates.getValue()),
- startTs, commitTs, observedColumns, m);
+ isReadLock(colUpdates.getValue()), startTs, commitTs, observedColumns, m);
}
mutations.add(m);
@@ -1217,6 +1362,8 @@
}
public SnapshotScanner newSnapshotScanner(Span span, Collection<Column> columns) {
- return new SnapshotScanner(env, new SnapshotScanner.Opts(span, columns), startTs, stats);
+ return new SnapshotScanner(env, new SnapshotScanner.Opts(span, columns, false), startTs, stats,
+ kve -> {
+ });
}
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
index a4c35c3..21406dc 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
@@ -20,8 +20,10 @@
import java.util.Set;
import com.google.common.collect.Iterators;
+import org.apache.fluo.api.client.AbstractSnapshotBase;
import org.apache.fluo.api.client.AbstractTransactionBase;
import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.SnapshotBase;
import org.apache.fluo.api.client.scanner.ScannerBuilder;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.data.Bytes;
@@ -84,7 +86,6 @@
e -> encNonAscii(e.getKey()) + "=" + encNonAscii(e.getValue())));
}
-
private String toStringEncNonAsciiCC(Collection<Column> columns) {
return Iterators.toString(Iterators.transform(columns.iterator(), Hex::encNonAscii));
}
@@ -126,47 +127,66 @@
}
- @Override
- public Bytes get(Bytes row, Column column) {
- Bytes ret = tx.get(row, column);
+ private Bytes get(SnapshotBase snap, Bytes row, Column column, String prefix) {
+ Bytes ret = snap.get(row, column);
if (log.isTraceEnabled()) {
- log.trace("txid: {} get({}, {}) -> {}", txid, encNonAscii(row), toStringEncNonAscii(column),
- encNonAscii(ret));
+ log.trace("txid: {} {}get({}, {}) -> {}", txid, prefix, encNonAscii(row),
+ toStringEncNonAscii(column), encNonAscii(ret));
}
return ret;
}
- @Override
- public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
- Map<Column, Bytes> ret = tx.get(row, columns);
+ private Map<Column, Bytes> get(SnapshotBase snap, Bytes row, Set<Column> columns, String prefix) {
+ Map<Column, Bytes> ret = snap.get(row, columns);
if (log.isTraceEnabled()) {
- log.trace("txid: {} get({}, {}) -> {}", txid, encNonAscii(row),
+ log.trace("txid: {} {}get({}, {}) -> {}", txid, prefix, encNonAscii(row),
toStringEncNonAsciiCC(columns), toStringEncNonAsciiMCB(ret));
}
return ret;
}
- @Override
- public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
- Map<Bytes, Map<Column, Bytes>> ret = tx.get(rows, columns);
+ private Map<Bytes, Map<Column, Bytes>> get(SnapshotBase snap, Collection<Bytes> rows,
+ Set<Column> columns, String prefix) {
+ Map<Bytes, Map<Column, Bytes>> ret = snap.get(rows, columns);
if (log.isTraceEnabled()) {
- log.trace("txid: {} get({}, {}) -> {}", txid, toStringEncNonAsciiCB(rows),
+ log.trace("txid: {} {}get({}, {}) -> {}", txid, prefix, toStringEncNonAsciiCB(rows),
toStringEncNonAsciiCC(columns), toStringEncNonAsciiMBMCB(ret));
}
return ret;
}
- @Override
- public Map<RowColumn, Bytes> get(Collection<RowColumn> rowColumns) {
- Map<RowColumn, Bytes> ret = tx.get(rowColumns);
+
+ private Map<RowColumn, Bytes> get(SnapshotBase snap, Collection<RowColumn> rowColumns,
+ String prefix) {
+ Map<RowColumn, Bytes> ret = snap.get(rowColumns);
if (log.isTraceEnabled()) {
- log.trace("txid: {} get({}) -> {}", txid, toStringEncNonAsciiCRC(rowColumns),
+ log.trace("txid: {} {}get({}) -> {}", txid, prefix, toStringEncNonAsciiCRC(rowColumns),
toStringEncNonAsciiMRCB(ret));
}
return ret;
}
@Override
+ public Bytes get(Bytes row, Column column) {
+ return get(tx, row, column, "");
+ }
+
+ @Override
+ public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
+ return get(tx, row, columns, "");
+ }
+
+ @Override
+ public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
+ return get(tx, rows, columns, "");
+ }
+
+ @Override
+ public Map<RowColumn, Bytes> get(Collection<RowColumn> rowColumns) {
+ return get(tx, rowColumns, "");
+ }
+
+ @Override
public ScannerBuilder scanner() {
return new TracingScannerBuilder(tx.scanner(), txid);
}
@@ -303,4 +323,43 @@
public int getSize() {
return tx.getSize();
}
+
+ @Override
+ public SnapshotBase withReadLock() {
+ SnapshotBase rltx = tx.withReadLock();
+
+ return new AbstractSnapshotBase() {
+
+ @Override
+ public ScannerBuilder scanner() {
+ // this is an unsupported op and will throw an exception so don't bother w/ trace logging
+ return rltx.scanner();
+ }
+
+ @Override
+ public long getStartTimestamp() {
+ return rltx.getStartTimestamp();
+ }
+
+ @Override
+ public Map<RowColumn, Bytes> get(Collection<RowColumn> rowColumns) {
+ return TracingTransaction.this.get(rltx, rowColumns, "withReadLock().");
+ }
+
+ @Override
+ public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
+ return TracingTransaction.this.get(rltx, rows, columns, "withReadLock().");
+ }
+
+ @Override
+ public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
+ return TracingTransaction.this.get(rltx, row, columns, "withReadLock().");
+ }
+
+ @Override
+ public Bytes get(Bytes row, Column column) {
+ return TracingTransaction.this.get(rltx, row, column, "withReadLock().");
+ }
+ };
+ }
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
index bbc2d03..7d488ca 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
@@ -30,7 +30,9 @@
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
+import org.apache.fluo.accumulo.values.DelReadLockValue;
import org.apache.fluo.accumulo.values.WriteValue;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Bytes.BytesBuilder;
@@ -51,9 +53,13 @@
}
public static void commitColumn(Environment env, boolean isTrigger, boolean isPrimary, Column col,
- boolean isWrite, boolean isDelete, long startTs, long commitTs, Set<Column> observedColumns,
- Mutation m) {
- if (isWrite) {
+ boolean isWrite, boolean isDelete, boolean isReadlock, long startTs, long commitTs,
+ Set<Column> observedColumns, Mutation m) {
+ if (isReadlock) {
+ Flutation.put(env, m, col,
+ ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, true),
+ DelReadLockValue.encodeCommit(commitTs));
+ } else if (isWrite) {
Flutation.put(env, m, col, ColumnConstants.WRITE_PREFIX | commitTs,
WriteValue.encode(startTs, isPrimary, isDelete));
} else {
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
index 57299e9..4e481e4 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
@@ -17,6 +17,7 @@
import java.io.File;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
@@ -110,18 +111,22 @@
return TABLE_BASE + tableCounter.incrementAndGet();
}
- protected void printSnapshot() throws Exception {
+ protected void printSnapshot(Consumer<String> out) throws Exception {
try (Snapshot s = client.newSnapshot()) {
- System.out.println("== snapshot start ==");
+ out.accept("== snapshot start ==");
for (RowColumnValue rcv : s.scanner().build()) {
- System.out.println(rcv.getRow() + " " + rcv.getColumn() + "\t" + rcv.getValue());
+ out.accept(rcv.getRow() + " " + rcv.getColumn() + "\t" + rcv.getValue());
}
- System.out.println("=== snapshot end ===");
+ out.accept("=== snapshot end ===");
}
}
+ protected void printSnapshot() throws Exception {
+ printSnapshot(System.out::println);
+ }
+
@AfterClass
public static void tearDownAccumulo() throws Exception {
if (startedCluster) {
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
index 57b3923..a175500 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
@@ -31,6 +31,7 @@
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.NotificationUtil;
import org.apache.fluo.api.client.AbstractTransactionBase;
+import org.apache.fluo.api.client.SnapshotBase;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.client.scanner.ScannerBuilder;
import org.apache.fluo.api.data.Bytes;
@@ -199,4 +200,9 @@
public long getStartTimestamp() {
return tx.getStartTimestamp();
}
+
+ @Override
+ public SnapshotBase withReadLock() {
+ return tx.withReadLock();
+ }
}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
index bdb4b51..fc553f5 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
@@ -29,6 +29,7 @@
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.ZookeeperPath;
import org.apache.fluo.accumulo.util.ZookeeperUtil;
+import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.impl.TransactorNode;
@@ -176,6 +177,88 @@
Assert.assertEquals(0, countInTable("-DATA"));
}
+ private void increment(TransactionBase tx, String row, Column col) {
+ int count = Integer.parseInt(tx.gets(row, col, "0"));
+ tx.set(row, col, count + 1 + "");
+ }
+
+ @Test(timeout = 60000)
+ public void testReadLocks() throws Exception {
+
+ final Column altIdCol = new Column("info", "altId");
+
+ TestTransaction tx1 = new TestTransaction(env);
+ for (int i = 0; i < 10; i++) {
+ tx1.set(String.format("n:%03d", i), altIdCol, "" + (19 * (1 + i)));
+ }
+
+ tx1.done();
+
+ for (int i = 0; i < 50; i++) {
+ String row = String.format("n:%03d", i % 10);
+
+ TestTransaction tx = new TestTransaction(env);
+ String altId = tx.withReadLock().gets(row, altIdCol);
+
+ increment(tx, "a:" + altId, new Column("count", row));
+
+ tx.done();
+ }
+
+ Assert.assertEquals(50, countInTable("-DEL_RLOCK"));
+ Assert.assertEquals(50, countInTable("-RLOCK"));
+
+ TestTransaction tx2 = new TestTransaction(env);
+ for (int i = 0; i < 10; i++) {
+ String row = String.format("n:%03d", i);
+ String newAltId = (13 * (i + 1)) + "";
+ String currAltId = tx2.gets(row, altIdCol);
+
+
+ tx2.set(row, altIdCol, newAltId);
+
+ String count = tx2.gets("a:" + currAltId, new Column("count", row));
+ tx2.set("a:" + newAltId, new Column("count", row), count);
+ tx2.delete("a:" + currAltId, new Column("count", row));
+ }
+
+ tx2.done();
+
+ // all read locks should be garbage collected because of the writes after the read locks
+ conn.tableOperations().compact(table, null, null, true, true);
+
+ Assert.assertEquals(0, countInTable("-DEL_RLOCK"));
+ Assert.assertEquals(0, countInTable("-RLOCK"));
+
+ for (int i = 0; i < 50; i++) {
+ String row = String.format("n:%03d", i % 10);
+
+ TestTransaction tx = new TestTransaction(env);
+ String altId = tx.withReadLock().gets(row, altIdCol);
+
+ increment(tx, "a:" + altId, new Column("count", row));
+
+ tx.done();
+ }
+
+ TestTransaction tx3 = new TestTransaction(env);
+ for (int i = 0; i < 10; i++) {
+ String row = String.format("n:%03d", i);
+ String currAltId = tx3.gets(row, altIdCol);
+ Assert.assertEquals("10", tx3.gets("a:" + currAltId, new Column("count", row)));
+ }
+
+ tx3.done();
+
+ waitForGcTime(tx3.getStartTimestamp());
+ conn.tableOperations().compact(table, null, null, true, true);
+
+
+ // all read locks older than GC time should be dropped
+ Assert.assertEquals(0, countInTable("-DEL_RLOCK"));
+ Assert.assertEquals(0, countInTable("-RLOCK"));
+ }
+
private int countInTable(String str) throws TableNotFoundException {
int count = 0;
Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ReadLockFailureIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ReadLockFailureIT.java
new file mode 100644
index 0000000..489b9e0
--- /dev/null
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ReadLockFailureIT.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.integration.impl;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.fluo.accumulo.format.FluoFormatter;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.exceptions.CommitException;
+import org.apache.fluo.core.impl.TransactionImpl.CommitData;
+import org.apache.fluo.core.impl.TransactorNode;
+import org.apache.fluo.core.oracle.Stamp;
+import org.apache.fluo.integration.ITBaseImpl;
+import org.apache.fluo.integration.TestTransaction;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.fluo.integration.impl.ReadLockIT.addEdge;
+import static org.apache.fluo.integration.impl.ReadLockIT.setAlias;
+
+public class ReadLockFailureIT extends ITBaseImpl {
+
+ private void dumpTable(Consumer<String> out) throws TableNotFoundException {
+ Scanner scanner = conn.createScanner(getCurTableName(), Authorizations.EMPTY);
+ for (Entry<Key, Value> entry : scanner) {
+ out.accept(FluoFormatter.toString(entry));
+ }
+ }
+
+ private Set<String> getDerivedEdges() {
+ Set<String> derivedEdges = new HashSet<>();
+ try (Snapshot snap = client.newSnapshot()) {
+ snap.scanner().over(Span.prefix("d:")).build().stream().map(RowColumnValue::getsRow)
+ .map(r -> r.substring(2)).forEach(derivedEdges::add);
+ }
+ return derivedEdges;
+ }
+
+ private void expectCommitException(Consumer<Transaction> retryAction) {
+ try (Transaction tx = client.newTransaction()) {
+ retryAction.accept(tx);
+ tx.commit();
+ Assert.fail();
+ } catch (CommitException ce) {
+
+ }
+ }
+
+ private void retryOnce(Consumer<Transaction> retryAction) {
+
+ expectCommitException(retryAction);
+
+ try (Transaction tx = client.newTransaction()) {
+ retryAction.accept(tx);
+ tx.commit();
+ }
+ }
+
+ private void retryTwice(Consumer<Transaction> retryAction) {
+
+ expectCommitException(retryAction);
+ expectCommitException(retryAction);
+
+ try (Transaction tx = client.newTransaction()) {
+ retryAction.accept(tx);
+ tx.commit();
+ }
+ }
+
+
+ private TransactorNode partiallyCommit(Consumer<TransactionBase> action, boolean commitPrimary,
+ boolean closeTransactor) throws Exception {
+ TransactorNode t2 = new TransactorNode(env);
+ TestTransaction tx2 = new TestTransaction(env, t2);
+
+ action.accept(tx2);
+
+ CommitData cd = tx2.createCommitData();
+ Assert.assertTrue(tx2.preCommit(cd));
+
+ if (commitPrimary) {
+ Stamp commitTs = env.getSharedResources().getOracleClient().getStamp();
+ Assert.assertTrue(tx2.commitPrimaryColumn(cd, commitTs));
+ }
+
+ if (closeTransactor) {
+ t2.close();
+ }
+
+ return t2;
+ }
+
+ private void testBasicRollback(boolean closeTransactor) throws Exception {
+ try (Transaction tx = client.newTransaction()) {
+ setAlias(tx, "node1", "bob");
+ setAlias(tx, "node2", "joe");
+ setAlias(tx, "node3", "alice");
+ tx.commit();
+ }
+
+ try (Transaction tx = client.newTransaction()) {
+ addEdge(tx, "node1", "node2");
+ tx.commit();
+ }
+
+ TransactorNode tn =
+ partiallyCommit(tx -> addEdge(tx, "node1", "node3"), false, closeTransactor);
+
+ Assert.assertEquals(ImmutableSet.of("bob:joe", "joe:bob"), getDerivedEdges());
+
+ retryOnce(tx -> setAlias(tx, "node1", "bobby"));
+
+ Assert.assertEquals(ImmutableSet.of("bobby:joe", "joe:bobby"), getDerivedEdges());
+
+ retryOnce(tx -> setAlias(tx, "node3", "alex"));
+
+ Assert.assertEquals(ImmutableSet.of("bobby:joe", "joe:bobby"), getDerivedEdges());
+
+ if (!closeTransactor) {
+ tn.close();
+ }
+ }
+
+ @Test
+ public void testBasicRollback1() throws Exception {
+ testBasicRollback(true);
+ }
+
+ @Test
+ public void testBasicRollback2() throws Exception {
+ testBasicRollback(false);
+ }
+
+ private void testBasicRollforward(boolean closeTransactor) throws Exception {
+ try (Transaction tx = client.newTransaction()) {
+ setAlias(tx, "node1", "bob");
+ setAlias(tx, "node2", "joe");
+ setAlias(tx, "node3", "alice");
+ tx.commit();
+ }
+
+ try (Transaction tx = client.newTransaction()) {
+ addEdge(tx, "node1", "node2");
+ tx.commit();
+ }
+
+ TransactorNode tn = partiallyCommit(tx -> addEdge(tx, "node1", "node3"), true, closeTransactor);
+
+ retryOnce(tx -> setAlias(tx, "node1", "bobby"));
+
+ Assert.assertEquals(ImmutableSet.of("bobby:joe", "joe:bobby", "bobby:alice", "alice:bobby"),
+ getDerivedEdges());
+
+ retryOnce(tx -> setAlias(tx, "node3", "alex"));
+
+ Assert.assertEquals(ImmutableSet.of("bobby:joe", "joe:bobby", "bobby:alex", "alex:bobby"),
+ getDerivedEdges());
+
+ if (!closeTransactor) {
+ tn.close();
+ }
+ }
+
+ @Test
+ public void testBasicRollforward1() throws Exception {
+ testBasicRollforward(false);
+ }
+
+ @Test
+ public void testBasicRollforward2() throws Exception {
+ testBasicRollforward(true);
+ }
+
+ private void testParallelScan(boolean closeTransactor) throws Exception {
+ Column crCol = new Column("stat", "completionRatio");
+
+ try (Transaction tx = client.newTransaction()) {
+ tx.set("user5", crCol, "0.5");
+ tx.set("user6", crCol, "0.75");
+ tx.commit();
+ }
+
+ TransactorNode tn = partiallyCommit(tx -> {
+ // get multiple read locks with a parallel scan
+ Map<String, Map<Column, String>> ratios =
+ tx.withReadLock().gets(Arrays.asList("user5", "user6"), crCol);
+
+ double cr1 = Double.parseDouble(ratios.get("user5").get(crCol));
+      double cr2 = Double.parseDouble(ratios.get("user6").get(crCol));
+
+ tx.set("org1", crCol, (cr1 + cr2) / 2 + "");
+ }, false, closeTransactor);
+
+ retryTwice(tx -> {
+ Map<String, Map<Column, String>> ratios = tx.gets(Arrays.asList("user5", "user6"), crCol);
+
+ tx.set("user5", crCol, "0.51");
+ tx.set("user6", crCol, "0.76");
+ });
+
+ try (Snapshot snap = client.newSnapshot()) {
+ Assert.assertNull(snap.gets("org1", crCol));
+ Assert.assertEquals("0.51", snap.gets("user5", crCol));
+ Assert.assertEquals("0.76", snap.gets("user6", crCol));
+ }
+
+ if (!closeTransactor) {
+ tn.close();
+ }
+ }
+
+ @Test
+ public void testParallelScan1() throws Exception {
+ testParallelScan(true);
+ }
+
+ @Test
+ public void testParallelScan2() throws Exception {
+ testParallelScan(false);
+ }
+
+ private void testParallelScanRC(boolean closeTransactor) throws Exception {
+ // currently get w/ RowColumn has a different code path than other gets that take multiple rows
+ // and columns
+
+ Column crCol = new Column("stat", "completionRatio");
+
+ try (Transaction tx = client.newTransaction()) {
+ tx.set("user5", crCol, "0.5");
+ tx.set("user6", crCol, "0.75");
+ tx.commit();
+ }
+
+ TransactorNode tn = partiallyCommit(tx -> {
+ // get multiple read locks with a parallel scan
+ Map<RowColumn, String> ratios = tx.withReadLock()
+ .gets(Arrays.asList(new RowColumn("user5", crCol), new RowColumn("user6", crCol)));
+
+
+ double cr1 = Double.parseDouble(ratios.get(new RowColumn("user5", crCol)));
+ double cr2 = Double.parseDouble(ratios.get(new RowColumn("user6", crCol)));
+
+ tx.set("org1", crCol, (cr1 + cr2) / 2 + "");
+    }, false, closeTransactor);
+
+ retryTwice(tx -> {
+ Map<RowColumn, String> ratios =
+ tx.gets(Arrays.asList(new RowColumn("user5", crCol), new RowColumn("user6", crCol)));
+
+ tx.set("user5", crCol, "0.51");
+ tx.set("user6", crCol, "0.76");
+ });
+
+ try (Snapshot snap = client.newSnapshot()) {
+ Assert.assertNull(snap.gets("org1", crCol));
+ Assert.assertEquals("0.51", snap.gets("user5", crCol));
+ Assert.assertEquals("0.76", snap.gets("user6", crCol));
+ }
+
+ if (!closeTransactor) {
+ tn.close();
+ }
+ }
+
+ @Test
+ public void testParallelScanRC1() throws Exception {
+ testParallelScanRC(true);
+ }
+
+ @Test
+ public void testParallelScanRC2() throws Exception {
+ testParallelScanRC(false);
+ }
+
+ private void testWriteWoRead(boolean commitPrimary, boolean closeTransactor) throws Exception {
+    // Reads can cause locks to be recovered. This tests the case of a transaction that only does a
+    // write to a field that has an open read lock.
+
+ try (Transaction tx = client.newTransaction()) {
+ tx.set("r1", new Column("f1", "q1"), "v1");
+ tx.set("r2", new Column("f1", "q1"), "v2");
+ tx.commit();
+ }
+
+ TransactorNode transactor = partiallyCommit(tx -> {
+ String v1 = tx.withReadLock().gets("r1", new Column("f1", "q1"));
+ String v2 = tx.withReadLock().gets("r2", new Column("f1", "q1"));
+
+ tx.set("r3", new Column("f1", "qa"), v1 + ":" + v2);
+ }, commitPrimary, closeTransactor);
+
+ // TODO open an issue... does not really need to retry in this case
+ retryOnce(tx -> {
+ tx.set("r1", new Column("f1", "q1"), "v3");
+ });
+
+ try (Transaction tx = client.newTransaction()) {
+ if (commitPrimary) {
+ Assert.assertEquals("v1:v2", tx.gets("r3", new Column("f1", "qa")));
+ } else {
+ Assert.assertNull(tx.gets("r3", new Column("f1", "qa")));
+ }
+ Assert.assertEquals("v3", tx.gets("r1", new Column("f1", "q1")));
+ }
+
+ if (!closeTransactor) {
+ transactor.close();
+ }
+ }
+
+ @Test
+ public void testWriteWoRead1() throws Exception {
+ testWriteWoRead(false, false);
+ }
+
+ @Test
+ public void testWriteWoRead2() throws Exception {
+ testWriteWoRead(false, true);
+ }
+
+ @Test
+ public void testWriteWoRead3() throws Exception {
+ testWriteWoRead(true, false);
+ }
+
+ @Test
+ public void testWriteWoRead4() throws Exception {
+ testWriteWoRead(true, true);
+ }
+
+ private int countInTable(String str) throws TableNotFoundException {
+ int count = 0;
+ Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
+ for (String e : Iterables.transform(scanner, FluoFormatter::toString)) {
+ if (e.contains(str)) {
+ count++;
+ }
+ }
+
+ return count;
+ }
+
+ @Test
+ public void testFailDeletesReadLocks() throws Exception {
+ try (Transaction tx = client.newTransaction()) {
+ for (int i = 0; i < 20; i++) {
+ tx.set("r-" + i, new Column("f1", "q1"), "" + i);
+ }
+
+ tx.commit();
+ }
+
+ long startTs = 0;
+
+ try (Transaction tx1 = client.newTransaction()) {
+ tx1.set("r-5", new Column("f1", "q1"), "9");
+ try (Transaction tx2 = client.newTransaction()) {
+ tx1.commit();
+
+ int sum = 0;
+ for (int i = 0; i < 20; i++) {
+ sum += Integer.parseInt(tx2.withReadLock().gets("r-" + i, new Column("f1", "q1")));
+ }
+
+ tx2.set("sum1", new Column("f", "s"), "" + sum);
+ startTs = tx2.getStartTimestamp();
+ tx2.commit();
+ Assert.fail();
+ } catch (CommitException e) {
+
+ }
+ }
+
+ try (Snapshot snapshot = client.newSnapshot()) {
+ Assert.assertNull(snapshot.gets("sum1", new Column("f", "s")));
+ }
+
+ // ensure the failed tx deleted its read locks....
+ Assert.assertEquals(19, countInTable(startTs + "-RLOCK"));
+ Assert.assertEquals(19, countInTable(startTs + "-DEL_RLOCK"));
+
+ try (Transaction tx = client.newTransaction()) {
+ int sum = 0;
+ for (int i = 0; i < 20; i++) {
+ sum += Integer.parseInt(tx.withReadLock().gets("r-" + i, new Column("f1", "q1")));
+ }
+
+ tx.set("sum1", new Column("f", "s"), "" + sum);
+ tx.commit();
+ }
+
+ try (Snapshot snapshot = client.newSnapshot()) {
+ Assert.assertEquals("194", snapshot.gets("sum1", new Column("f", "s")));
+ }
+ }
+}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ReadLockIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ReadLockIT.java
new file mode 100644
index 0000000..b88d66e
--- /dev/null
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ReadLockIT.java
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.integration.impl;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.fluo.accumulo.format.FluoFormatter;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Loader;
+import org.apache.fluo.api.client.LoaderExecutor;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.exceptions.AlreadySetException;
+import org.apache.fluo.api.exceptions.CommitException;
+import org.apache.fluo.integration.ITBaseImpl;
+import org.apache.fluo.integration.TestTransaction;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.util.Arrays.asList;
+
+public class ReadLockIT extends ITBaseImpl {
+
+ private static final Column ALIAS_COL = new Column("node", "alias");
+
+ private void addEdge(String node1, String node2) {
+ try (Transaction tx = client.newTransaction()) {
+ addEdge(tx, node1, node2);
+ tx.commit();
+ }
+ }
+
+ static void addEdge(TransactionBase tx, String node1, String node2) {
+ Map<String, Map<Column, String>> aliases =
+ tx.withReadLock().gets(asList("r:" + node1, "r:" + node2), ALIAS_COL);
+ String alias1 = aliases.get("r:" + node1).get(ALIAS_COL);
+ String alias2 = aliases.get("r:" + node2).get(ALIAS_COL);
+
+ addEdge(tx, node1, node2, alias1, alias2);
+ }
+
+ static void addEdge(TransactionBase tx, String node1, String node2, String alias1,
+ String alias2) {
+ tx.set("d:" + alias1 + ":" + alias2, new Column("edge", node1 + ":" + node2), "");
+ tx.set("d:" + alias2 + ":" + alias1, new Column("edge", node2 + ":" + node1), "");
+
+ tx.set("r:" + node1 + ":" + node2, new Column("edge", "aliases"), alias1 + ":" + alias2);
+ tx.set("r:" + node2 + ":" + node1, new Column("edge", "aliases"), alias2 + ":" + alias1);
+ }
+
+ static void setAlias(TransactionBase tx, String node, String alias) {
+ tx.set("r:" + node, new Column("node", "alias"), alias);
+
+ CellScanner scanner = tx.scanner().over(Span.prefix("r:" + node + ":")).build();
+
+ for (RowColumnValue rcv : scanner) {
+ String otherNode = rcv.getsRow().split(":")[2];
+ String[] aliases = rcv.getsValue().split(":");
+
+ if (aliases.length != 2) {
+ throw new RuntimeException("bad alias " + rcv);
+ }
+
+ if (!alias.equals(aliases[0])) {
+ tx.delete("d:" + aliases[0] + ":" + aliases[1], new Column("edge", node + ":" + otherNode));
+ tx.delete("d:" + aliases[1] + ":" + aliases[0], new Column("edge", otherNode + ":" + node));
+
+ addEdge(tx, node, otherNode, alias, aliases[1]);
+ }
+ }
+ }
+
+ @Test
+ public void testConcurrentReadlocks() throws Exception {
+
+ try (Transaction tx = client.newTransaction()) {
+ setAlias(tx, "node1", "bob");
+ setAlias(tx, "node2", "joe");
+ setAlias(tx, "node3", "alice");
+ tx.commit();
+ }
+
+
+ TestTransaction tx1 = new TestTransaction(env);
+ setAlias(tx1, "node2", "jojo");
+
+ TestTransaction tx2 = new TestTransaction(env);
+ TestTransaction tx3 = new TestTransaction(env);
+
+ addEdge(tx2, "node1", "node2");
+ addEdge(tx3, "node1", "node3");
+
+ tx2.commit();
+ tx2.close();
+
+ tx3.commit();
+ tx3.close();
+
+ Assert.assertEquals(ImmutableSet.of("bob:joe", "joe:bob", "bob:alice", "alice:bob"),
+ getDerivedEdges());
+
+ try {
+ tx1.commit();
+ Assert.fail("Expected exception");
+ } catch (CommitException ce) {
+ }
+
+ Assert.assertEquals(ImmutableSet.of("bob:joe", "joe:bob", "bob:alice", "alice:bob"),
+ getDerivedEdges());
+ }
+
+ @Test
+ public void testWriteCausesReadLockToFail() throws Exception {
+ try (Transaction tx = client.newTransaction()) {
+ setAlias(tx, "node1", "bob");
+ setAlias(tx, "node2", "joe");
+ tx.commit();
+ }
+
+ TestTransaction tx1 = new TestTransaction(env);
+ setAlias(tx1, "node2", "jojo");
+
+ TestTransaction tx2 = new TestTransaction(env);
+
+ addEdge(tx2, "node1", "node2");
+
+ tx1.commit();
+ tx1.close();
+
+ Assert.assertEquals(0, getDerivedEdges().size());
+
+ try {
+ tx2.commit();
+ Assert.fail("Expected exception");
+ } catch (CommitException ce) {
+ }
+
+ // ensure the failed read lock on node1 is cleaned up
+ try (Transaction tx = client.newTransaction()) {
+ setAlias(tx, "node1", "fred");
+ tx.commit();
+ }
+
+ try (Transaction tx = client.newTransaction()) {
+ addEdge(tx, "node1", "node2");
+ tx.commit();
+ }
+
+ Assert.assertEquals(ImmutableSet.of("fred:jojo", "jojo:fred"), getDerivedEdges());
+ }
+
+ private void dumpRow(String row, Consumer<String> out) throws TableNotFoundException {
+ Scanner scanner = conn.createScanner(getCurTableName(), Authorizations.EMPTY);
+ scanner.setRange(Range.exact(row));
+ for (Entry<Key, Value> entry : scanner) {
+ out.accept(FluoFormatter.toString(entry));
+ }
+ }
+
+ private void dumpTable(Consumer<String> out) throws TableNotFoundException {
+ Scanner scanner = conn.createScanner(getCurTableName(), Authorizations.EMPTY);
+ for (Entry<Key, Value> entry : scanner) {
+ out.accept(FluoFormatter.toString(entry));
+ }
+ }
+
+ @Test
+ public void testWriteAfterReadLock() throws Exception {
+ try (Transaction tx = client.newTransaction()) {
+ setAlias(tx, "node1", "bob");
+ setAlias(tx, "node2", "joe");
+ setAlias(tx, "node3", "alice");
+
+ tx.commit();
+ }
+
+ addEdge("node1", "node2");
+ Assert.assertEquals(ImmutableSet.of("bob:joe", "joe:bob"), getDerivedEdges());
+
+ try (Transaction tx = client.newTransaction()) {
+ setAlias(tx, "node1", "bobby");
+ tx.commit();
+ }
+
+ addEdge("node1", "node3");
+ Assert.assertEquals(ImmutableSet.of("bobby:joe", "joe:bobby", "bobby:alice", "alice:bobby"),
+ getDerivedEdges());
+ }
+
+ @Test
+ public void testRandom() throws Exception {
+ int numNodes = 100;
+ int numEdges = 1000;
+ int numAliasChanges = 25;
+
+ Random rand = new Random();
+
+ Map<String, String> nodes = new HashMap<>();
+ while (nodes.size() < numNodes) {
+ nodes.put(String.format("n-%09d", rand.nextInt(1000000000)),
+ String.format("a-%09d", rand.nextInt(1000000000)));
+ }
+
+ List<String> nodesList = new ArrayList<>(nodes.keySet());
+ Set<String> edges = new HashSet<>();
+ while (edges.size() < numEdges) {
+ String n1 = nodesList.get(rand.nextInt(nodesList.size()));
+ String n2 = nodesList.get(rand.nextInt(nodesList.size()));
+ if (n1.equals(n2) || edges.contains(n2 + ":" + n1)) {
+ continue;
+ }
+
+ edges.add(n1 + ":" + n2);
+ }
+
+ try (LoaderExecutor le = client.newLoaderExecutor()) {
+ for (Entry<String, String> entry : nodes.entrySet()) {
+ le.execute((tx, ctx) -> setAlias(tx, entry.getKey(), entry.getValue()));
+ }
+ }
+
+ List<Loader> loadOps = new ArrayList<>();
+ for (String edge : edges) {
+ String[] enodes = edge.split(":");
+ loadOps.add((tx, ctx) -> {
+ try {
+ addEdge(tx, enodes[0], enodes[1]);
+ } catch (NullPointerException e) {
+ // TODO remove after finding bug
+ System.out.println(
+ " en0 " + enodes[0] + " en1 " + enodes[1] + " start ts " + tx.getStartTimestamp());
+ dumpRow("r:" + enodes[0], System.out::println);
+ dumpRow("r:" + enodes[1], System.out::println);
+ throw e;
+ }
+ });
+ }
+
+ Map<String, String> changes = new HashMap<>();
+ while (changes.size() < numAliasChanges) {
+ String node = nodesList.get(rand.nextInt(nodesList.size()));
+ String alias = String.format("a-%09d", rand.nextInt(1000000000));
+ changes.put(node, alias);
+ }
+
+ Map<String, String> nodes2 = new HashMap<>(nodes);
+ nodes2.putAll(changes);
+
+ changes.forEach((node, alias) -> {
+ loadOps.add((tx, ctx) -> setAlias(tx, node, alias));
+ });
+
+ Collections.shuffle(loadOps, rand);
+
+ FluoConfiguration conf = new FluoConfiguration(config);
+ conf.setLoaderThreads(20);
+ try (FluoClient client = FluoFactory.newClient(conf);
+ LoaderExecutor le = client.newLoaderExecutor()) {
+ loadOps.forEach(loader -> le.execute(loader));
+ }
+
+ Set<String> expectedEdges = new HashSet<>();
+ for (String edge : edges) {
+ String[] enodes = edge.split(":");
+ String alias1 = nodes2.get(enodes[0]);
+ String alias2 = nodes2.get(enodes[1]);
+
+ expectedEdges.add(alias1 + ":" + alias2);
+ expectedEdges.add(alias2 + ":" + alias1);
+ }
+
+ Set<String> actualEdges = getDerivedEdges();
+
+ if (!expectedEdges.equals(actualEdges)) {
+ Path dumpFile = Paths.get("target/ReadLockIT.txt");
+
+ try (BufferedWriter writer = Files.newBufferedWriter(dumpFile)) {
+
+ Consumer<String> out = s -> {
+ try {
+ writer.append(s);
+ writer.append("\n");
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ };
+
+
+ writer.append("Alias changes : \n");
+ Maps.difference(nodes, nodes2).entriesDiffering()
+ .forEach((k, v) -> out.accept(k + " " + v));
+
+ writer.append("expected - actual : \n");
+ Sets.difference(expectedEdges, actualEdges).forEach(out);
+ writer.append("\n");
+
+ writer.append("actual - expected : \n");
+ Sets.difference(actualEdges, expectedEdges).forEach(out);
+ writer.append("\n");
+
+ printSnapshot(out);
+ dumpTable(out);
+ }
+ Assert.fail("Did not produce expected graph, dumped info to " + dumpFile);
+ }
+ }
+
+ private Set<String> getDerivedEdges() {
+ Set<String> derivedEdges = new HashSet<>();
+ try (Snapshot snap = client.newSnapshot()) {
+ snap.scanner().over(Span.prefix("d:")).build().stream().map(RowColumnValue::getsRow)
+ .map(r -> r.substring(2)).forEach(derivedEdges::add);
+ }
+ return derivedEdges;
+ }
+
+ @Test(expected = AlreadySetException.class)
+ public void testReadAndWriteLockInSameTx() {
+ try (Transaction tx = client.newTransaction()) {
+ setAlias(tx, "node1", "bob");
+ setAlias(tx, "node2", "joe");
+
+
+ tx.commit();
+ }
+
+ try (Transaction tx = client.newTransaction()) {
+ setAlias(tx, "node1", "bobby");
+ // tries to get a read lock on node1 in same tx
+ addEdge(tx, "node1", "node2");
+ }
+ }
+
+ @Test(expected = AlreadySetException.class)
+ public void testReadAndDeleteInSameTx() {
+ try (Transaction tx = client.newTransaction()) {
+ tx.set("123456", new Column("f", "q"), "abc");
+ tx.commit();
+ }
+
+ try (Transaction tx = client.newTransaction()) {
+ tx.delete("123456", new Column("f", "q"));
+ // should fail here because we already have a write lock
+ String val = tx.withReadLock().gets("123456", new Column("f", "q"));
+ tx.set("123457", new Column("f", "q"), val + "7");
+ tx.commit();
+ }
+ }
+
+ private static final Column c1 = new Column("f1", "q1");
+ private static final Column c2 = new Column("f1", "q2");
+ private static final Column invCol = new Column("f1", "inv");
+
+ final private void ensureReadLocksSet(Consumer<TransactionBase> readLockOperation) {
+
+ try (Transaction txi = client.newTransaction()) {
+ txi.set("test1", c1, "45");
+ txi.set("test1", c2, "90");
+ txi.set("test2", c1, "30");
+ txi.set("test2", c2, "60");
+ txi.commit();
+ }
+
+
+ List<Consumer<TransactionBase>> writeLockOperations = ImmutableList.of(txw -> {
+ txw.set("test1", c1, "47");
+ }, txw -> {
+ txw.set("test1", c2, "94");
+ }, txw -> {
+ txw.set("test2", c1, "37");
+ }, txw -> {
+ txw.set("test2", c2, "74");
+ });
+
+ List<Transaction> writeTxs = new ArrayList<>();
+ for (Consumer<TransactionBase> wop : writeLockOperations) {
+ Transaction wtx = client.newTransaction();
+ wop.accept(wtx);
+ writeTxs.add(wtx);
+ }
+
+ try (Transaction txr = client.newTransaction()) {
+ readLockOperation.accept(txr);
+ txr.commit();
+ }
+
+ for (Transaction wtx : writeTxs) {
+ try {
+ wtx.commit();
+ Assert.fail();
+ } catch (CommitException ce) {
+ }
+ }
+
+ try (Snapshot snap = client.newSnapshot()) {
+ Assert.assertEquals("45", snap.gets("test1", c1));
+ Assert.assertEquals("90", snap.gets("test1", c2));
+ Assert.assertEquals("test1", snap.gets("45", invCol));
+ Assert.assertEquals("test1", snap.gets("90", invCol));
+ Assert.assertEquals("30", snap.gets("test2", c1));
+ Assert.assertEquals("60", snap.gets("test2", c2));
+ Assert.assertEquals("test2", snap.gets("30", invCol));
+ Assert.assertEquals("test2", snap.gets("60", invCol));
+ }
+ }
+
+ @Test
+ public void testGet() {
+ ensureReadLocksSet(txr -> {
+ // ensure these operations set read locks on the cells read
+ SnapshotBase rlSnap = txr.withReadLock();
+
+ txr.set(rlSnap.gets("test1", c1), invCol, "test1");
+ txr.set(rlSnap.gets("test1", c2), invCol, "test1");
+ txr.set(rlSnap.gets("test2", c1), invCol, "test2");
+ txr.set(rlSnap.gets("test2", c2), invCol, "test2");
+ });
+ }
+
+ @Test
+ public void testGetColumns() {
+ ensureReadLocksSet(txr -> {
+ // ensure this operation sets two read locks
+ Map<Column, String> vals = txr.withReadLock().gets("test1", c1, c2);
+ txr.set(vals.get(c1), invCol, "test1");
+ txr.set(vals.get(c2), invCol, "test1");
+ vals = txr.withReadLock().gets("test2", c1, c2);
+ txr.set(vals.get(c1), invCol, "test2");
+ txr.set(vals.get(c2), invCol, "test2");
+ });
+ }
+
+ @Test
+ public void testGetRowsColumns() {
+ ensureReadLocksSet(txr -> {
+ // ensure this operation sets two read locks
+ Map<String, Map<Column, String>> vals =
+ txr.withReadLock().gets(ImmutableList.of("test1", "test2"), c1, c2);
+ txr.set(vals.get("test1").get(c1), invCol, "test1");
+ txr.set(vals.get("test1").get(c2), invCol, "test1");
+ txr.set(vals.get("test2").get(c1), invCol, "test2");
+ txr.set(vals.get("test2").get(c2), invCol, "test2");
+ });
+ }
+
+ @Test
+ public void testGetRowColumns() {
+ ensureReadLocksSet(txr -> {
+ // ensure this operation sets two read locks
+ Map<RowColumn, String> vals =
+ txr.withReadLock().gets(ImmutableList.of(new RowColumn("test1", c1),
+ new RowColumn("test1", c2), new RowColumn("test2", c1), new RowColumn("test2", c2)));
+ txr.set(vals.get(new RowColumn("test1", c1)), invCol, "test1");
+ txr.set(vals.get(new RowColumn("test1", c2)), invCol, "test1");
+ txr.set(vals.get(new RowColumn("test2", c1)), invCol, "test2");
+ txr.set(vals.get(new RowColumn("test2", c2)), invCol, "test2");
+ });
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testScan() {
+ try (Transaction tx = client.newTransaction()) {
+ tx.withReadLock().scanner().build();
+ }
+ }
+
+ @Test
+ public void testOnlyReadLocks() {
+ try (Transaction tx = client.newTransaction()) {
+ tx.set("r1", new Column("q1", "f1"), "v1");
+ tx.set("r2", new Column("q1", "f1"), "v2");
+ tx.commit();
+ }
+
+ try (Transaction tx1 = client.newTransaction()) {
+ try (Transaction tx2 = client.newTransaction()) {
+ String v1 = tx2.withReadLock().gets("r1", new Column("q1", "f1"));
+ String v2 = tx2.withReadLock().gets("r2", new Column("q1", "f1"));
+
+ Assert.assertEquals("v1", v1);
+ Assert.assertEquals("v2", v2);
+
+ // commit should be a no-op because only read locks
+ tx2.commit();
+ }
+
+ tx1.set("r1", new Column("q1", "f1"), "v3");
+ tx1.set("r2", new Column("q1", "f1"), "v4");
+
+ // should not collide with read locks
+ tx1.commit();
+ }
+ try (Snapshot snap = client.newSnapshot()) {
+ String v1 = snap.gets("r1", new Column("q1", "f1"));
+ String v2 = snap.gets("r2", new Column("q1", "f1"));
+
+ Assert.assertEquals("v3", v1);
+ Assert.assertEquals("v4", v2);
+ }
+ }
+}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
index e5a8d02..0a6bc99 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
@@ -70,7 +70,7 @@
try (Snapshot snap = client.newSnapshot()) {
HashSet<RowColumnValue> actual = new HashSet<>();
- Iterables.addAll(actual, snap.scanner().over("r2").build());
+ Iterables.addAll(actual, snap.scanner().over(Span.exact("r2")).build());
Assert.assertEquals(expectedR2, actual);
actual.clear();
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
index 1736248..0835310 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
@@ -18,6 +18,7 @@
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
@@ -592,4 +593,72 @@
Assert.assertTrue(origLogMsgs, logMsgs.matches(pattern));
}
+
+ @Test
+ public void testReadLocks() {
+ Column c1 = new Column("f1", "q1");
+ Column c2 = new Column("f1", "q2");
+
+ try (Transaction tx = client.newTransaction()) {
+ tx.set("r1", c1, "v1");
+ tx.set("r1", c2, "v2");
+ tx.set("r2", c1, "v3");
+ tx.set("r2", c2, "v4");
+ tx.commit();
+ }
+
+ Logger logger = Logger.getLogger("fluo.tx");
+
+ StringWriter writer = new StringWriter();
+ WriterAppender appender =
+ new WriterAppender(new PatternLayout("%d{ISO8601} [%-8c{2}] %-5p: %m%n"), writer);
+
+ Level level = logger.getLevel();
+ boolean additivity = logger.getAdditivity();
+
+ try {
+ logger.setLevel(Level.TRACE);
+ logger.setAdditivity(false);
+ logger.addAppender(appender);
+
+ try (Transaction tx = client.newTransaction()) {
+ Assert.assertEquals("v1", tx.withReadLock().gets("r1", c1));
+ Assert.assertEquals(ImmutableMap.of(c1, "v3", c2, "v4"),
+ tx.withReadLock().gets("r2", c1, c2));
+ Assert.assertEquals(ImmutableMap.of(new RowColumn("r1", c2), "v2"),
+ tx.withReadLock().gets(Arrays.asList(new RowColumn("r1", c2))));
+ Map<String, Map<Column, String>> expected = new HashMap<>();
+ expected.computeIfAbsent("r1", k -> new HashMap<>()).put(c1, "v1");
+ expected.computeIfAbsent("r2", k -> new HashMap<>()).put(c1, "v3");
+ Map<String, Map<Column, String>> actual =
+ tx.withReadLock().gets(Arrays.asList("r1", "r2"), ImmutableSet.of(c1));
+ Assert.assertEquals(expected, actual);
+ tx.set("r3", c1, "345");
+ tx.commit();
+ }
+
+ } finally {
+ logger.removeAppender(appender);
+ logger.setAdditivity(additivity);
+ logger.setLevel(level);
+ }
+
+ String origLogMsgs = writer.toString();
+ String logMsgs = origLogMsgs.replace('\n', ' ');
+
+ String pattern = "";
+
+ pattern += ".*txid: (\\d+) begin\\(\\) thread: \\d+";
+ pattern += ".*txid: \\1 \\QwithReadLock().get(r1, f1 q1 ) -> v1\\E";
+ pattern +=
+ ".*txid: \\1 \\QwithReadLock().get(r2, [f1 q1 , f1 q2 ]) -> [f1 q1 =v3, f1 q2 =v4]\\E";
+ pattern += ".*txid: \\1 \\QwithReadLock().get([r1 f1 q2 ]) -> [r1 f1 q2 =v2]\\E";
+ pattern +=
+ ".*txid: \\1 \\QwithReadLock().get([r1, r2], [f1 q1 ]) -> [r1=[f1 q1 =v1], r2=[f1 q1 =v3]]\\E";
+ pattern += ".*txid: \\1 \\Qset(r3, f1 q1 , 345)\\E";
+ pattern += ".*txid: \\1 \\Qcommit()\\E -> SUCCESSFUL commitTs: \\d+";
+ pattern += ".*txid: \\1 \\Qclose()\\E.*";
+
+ Assert.assertTrue(origLogMsgs, logMsgs.matches(pattern));
+ }
}