blob: b88d66e4aa4b5043ecca05a4761a7bbd758a3094 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.integration.impl;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.function.Consumer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.accumulo.format.FluoFormatter;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Loader;
import org.apache.fluo.api.client.LoaderExecutor;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.SnapshotBase;
import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.exceptions.AlreadySetException;
import org.apache.fluo.api.exceptions.CommitException;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.fluo.integration.TestTransaction;
import org.junit.Assert;
import org.junit.Test;
import static java.util.Arrays.asList;
public class ReadLockIT extends ITBaseImpl {
private static final Column ALIAS_COL = new Column("node", "alias");
private void addEdge(String node1, String node2) {
try (Transaction tx = client.newTransaction()) {
addEdge(tx, node1, node2);
tx.commit();
}
}
static void addEdge(TransactionBase tx, String node1, String node2) {
Map<String, Map<Column, String>> aliases =
tx.withReadLock().gets(asList("r:" + node1, "r:" + node2), ALIAS_COL);
String alias1 = aliases.get("r:" + node1).get(ALIAS_COL);
String alias2 = aliases.get("r:" + node2).get(ALIAS_COL);
addEdge(tx, node1, node2, alias1, alias2);
}
static void addEdge(TransactionBase tx, String node1, String node2, String alias1,
String alias2) {
tx.set("d:" + alias1 + ":" + alias2, new Column("edge", node1 + ":" + node2), "");
tx.set("d:" + alias2 + ":" + alias1, new Column("edge", node2 + ":" + node1), "");
tx.set("r:" + node1 + ":" + node2, new Column("edge", "aliases"), alias1 + ":" + alias2);
tx.set("r:" + node2 + ":" + node1, new Column("edge", "aliases"), alias2 + ":" + alias1);
}
static void setAlias(TransactionBase tx, String node, String alias) {
tx.set("r:" + node, new Column("node", "alias"), alias);
CellScanner scanner = tx.scanner().over(Span.prefix("r:" + node + ":")).build();
for (RowColumnValue rcv : scanner) {
String otherNode = rcv.getsRow().split(":")[2];
String[] aliases = rcv.getsValue().split(":");
if (aliases.length != 2) {
throw new RuntimeException("bad alias " + rcv);
}
if (!alias.equals(aliases[0])) {
tx.delete("d:" + aliases[0] + ":" + aliases[1], new Column("edge", node + ":" + otherNode));
tx.delete("d:" + aliases[1] + ":" + aliases[0], new Column("edge", otherNode + ":" + node));
addEdge(tx, node, otherNode, alias, aliases[1]);
}
}
}
@Test
public void testConcurrentReadlocks() throws Exception {
try (Transaction tx = client.newTransaction()) {
setAlias(tx, "node1", "bob");
setAlias(tx, "node2", "joe");
setAlias(tx, "node3", "alice");
tx.commit();
}
TestTransaction tx1 = new TestTransaction(env);
setAlias(tx1, "node2", "jojo");
TestTransaction tx2 = new TestTransaction(env);
TestTransaction tx3 = new TestTransaction(env);
addEdge(tx2, "node1", "node2");
addEdge(tx3, "node1", "node3");
tx2.commit();
tx2.close();
tx3.commit();
tx3.close();
Assert.assertEquals(ImmutableSet.of("bob:joe", "joe:bob", "bob:alice", "alice:bob"),
getDerivedEdges());
try {
tx1.commit();
Assert.fail("Expected exception");
} catch (CommitException ce) {
}
Assert.assertEquals(ImmutableSet.of("bob:joe", "joe:bob", "bob:alice", "alice:bob"),
getDerivedEdges());
}
@Test
public void testWriteCausesReadLockToFail() throws Exception {
try (Transaction tx = client.newTransaction()) {
setAlias(tx, "node1", "bob");
setAlias(tx, "node2", "joe");
tx.commit();
}
TestTransaction tx1 = new TestTransaction(env);
setAlias(tx1, "node2", "jojo");
TestTransaction tx2 = new TestTransaction(env);
addEdge(tx2, "node1", "node2");
tx1.commit();
tx1.close();
Assert.assertEquals(0, getDerivedEdges().size());
try {
tx2.commit();
Assert.fail("Expected exception");
} catch (CommitException ce) {
}
// ensure the failed read lock on node1 is cleaned up
try (Transaction tx = client.newTransaction()) {
setAlias(tx, "node1", "fred");
tx.commit();
}
try (Transaction tx = client.newTransaction()) {
addEdge(tx, "node1", "node2");
tx.commit();
}
Assert.assertEquals(ImmutableSet.of("fred:jojo", "jojo:fred"), getDerivedEdges());
}
private void dumpRow(String row, Consumer<String> out) throws TableNotFoundException {
Scanner scanner = conn.createScanner(getCurTableName(), Authorizations.EMPTY);
scanner.setRange(Range.exact(row));
for (Entry<Key, Value> entry : scanner) {
out.accept(FluoFormatter.toString(entry));
}
}
private void dumpTable(Consumer<String> out) throws TableNotFoundException {
Scanner scanner = conn.createScanner(getCurTableName(), Authorizations.EMPTY);
for (Entry<Key, Value> entry : scanner) {
out.accept(FluoFormatter.toString(entry));
}
}
@Test
public void testWriteAfterReadLock() throws Exception {
try (Transaction tx = client.newTransaction()) {
setAlias(tx, "node1", "bob");
setAlias(tx, "node2", "joe");
setAlias(tx, "node3", "alice");
tx.commit();
}
addEdge("node1", "node2");
Assert.assertEquals(ImmutableSet.of("bob:joe", "joe:bob"), getDerivedEdges());
try (Transaction tx = client.newTransaction()) {
setAlias(tx, "node1", "bobby");
tx.commit();
}
addEdge("node1", "node3");
Assert.assertEquals(ImmutableSet.of("bobby:joe", "joe:bobby", "bobby:alice", "alice:bobby"),
getDerivedEdges());
}
@Test
public void testRandom() throws Exception {
int numNodes = 100;
int numEdges = 1000;
int numAliasChanges = 25;
Random rand = new Random();
Map<String, String> nodes = new HashMap<>();
while (nodes.size() < numNodes) {
nodes.put(String.format("n-%09d", rand.nextInt(1000000000)),
String.format("a-%09d", rand.nextInt(1000000000)));
}
List<String> nodesList = new ArrayList<>(nodes.keySet());
Set<String> edges = new HashSet<>();
while (edges.size() < numEdges) {
String n1 = nodesList.get(rand.nextInt(nodesList.size()));
String n2 = nodesList.get(rand.nextInt(nodesList.size()));
if (n1.equals(n2) || edges.contains(n2 + ":" + n1)) {
continue;
}
edges.add(n1 + ":" + n2);
}
try (LoaderExecutor le = client.newLoaderExecutor()) {
for (Entry<String, String> entry : nodes.entrySet()) {
le.execute((tx, ctx) -> setAlias(tx, entry.getKey(), entry.getValue()));
}
}
List<Loader> loadOps = new ArrayList<>();
for (String edge : edges) {
String[] enodes = edge.split(":");
loadOps.add((tx, ctx) -> {
try {
addEdge(tx, enodes[0], enodes[1]);
} catch (NullPointerException e) {
// TOOD remove after finding bug
System.out.println(
" en0 " + enodes[0] + " en1 " + enodes[1] + " start ts " + tx.getStartTimestamp());
dumpRow("r:" + enodes[0], System.out::println);
dumpRow("r:" + enodes[1], System.out::println);
throw e;
}
});
}
Map<String, String> changes = new HashMap<>();
while (changes.size() < numAliasChanges) {
String node = nodesList.get(rand.nextInt(nodesList.size()));
String alias = String.format("a-%09d", rand.nextInt(1000000000));
changes.put(node, alias);
}
Map<String, String> nodes2 = new HashMap<>(nodes);
nodes2.putAll(changes);
changes.forEach((node, alias) -> {
loadOps.add((tx, ctx) -> setAlias(tx, node, alias));
});
Collections.shuffle(loadOps, rand);
FluoConfiguration conf = new FluoConfiguration(config);
conf.setLoaderThreads(20);
try (FluoClient client = FluoFactory.newClient(conf);
LoaderExecutor le = client.newLoaderExecutor()) {
loadOps.forEach(loader -> le.execute(loader));
}
Set<String> expectedEdges = new HashSet<>();
for (String edge : edges) {
String[] enodes = edge.split(":");
String alias1 = nodes2.get(enodes[0]);
String alias2 = nodes2.get(enodes[1]);
expectedEdges.add(alias1 + ":" + alias2);
expectedEdges.add(alias2 + ":" + alias1);
}
Set<String> actualEdges = getDerivedEdges();
if (!expectedEdges.equals(actualEdges)) {
Path dumpFile = Paths.get("target/ReadLockIT.txt");
try (BufferedWriter writer = Files.newBufferedWriter(dumpFile)) {
Consumer<String> out = s -> {
try {
writer.append(s);
writer.append("\n");
} catch (IOException e) {
throw new UncheckedIOException(e);
}
};
writer.append("Alias changes : \n");
Maps.difference(nodes, nodes2).entriesDiffering()
.forEach((k, v) -> out.accept(k + " " + v));
writer.append("expected - actual : \n");
Sets.difference(expectedEdges, actualEdges).forEach(out);
writer.append("\n");
writer.append("actual - expected : \n");
Sets.difference(actualEdges, expectedEdges).forEach(out);
writer.append("\n");
printSnapshot(out);
dumpTable(out);
}
Assert.fail("Did not produce expected graph, dumped info to " + dumpFile);
}
}
private Set<String> getDerivedEdges() {
Set<String> derivedEdges = new HashSet<>();
try (Snapshot snap = client.newSnapshot()) {
snap.scanner().over(Span.prefix("d:")).build().stream().map(RowColumnValue::getsRow)
.map(r -> r.substring(2)).forEach(derivedEdges::add);
}
return derivedEdges;
}
@Test(expected = AlreadySetException.class)
public void testReadAndWriteLockInSameTx() {
try (Transaction tx = client.newTransaction()) {
setAlias(tx, "node1", "bob");
setAlias(tx, "node2", "joe");
tx.commit();
}
try (Transaction tx = client.newTransaction()) {
setAlias(tx, "node1", "bobby");
// tries to get a read lock on node1 in same tx
addEdge(tx, "node1", "node2");
}
}
@Test(expected = AlreadySetException.class)
public void testReadAndDeleteInSameTx() {
try (Transaction tx = client.newTransaction()) {
tx.set("123456", new Column("f", "q"), "abc");
tx.commit();
}
try (Transaction tx = client.newTransaction()) {
tx.delete("123456", new Column("f", "q"));
// should fail here because already have write lock
String val = tx.withReadLock().gets("123456", new Column("f", "q"));
tx.set("123457", new Column("f", "q"), val + "7");
tx.commit();
}
}
private static final Column c1 = new Column("f1", "q1");
private static final Column c2 = new Column("f1", "q2");
private static final Column invCol = new Column("f1", "inv");
final private void ensureReadLocksSet(Consumer<TransactionBase> readLockOperation) {
try (Transaction txi = client.newTransaction()) {
txi.set("test1", c1, "45");
txi.set("test1", c2, "90");
txi.set("test2", c1, "30");
txi.set("test2", c2, "60");
txi.commit();
}
List<Consumer<TransactionBase>> writeLockOperations = ImmutableList.of(txw -> {
txw.set("test1", c1, "47");
}, txw -> {
txw.set("test1", c2, "94");
}, txw -> {
txw.set("test2", c1, "37");
}, txw -> {
txw.set("test2", c2, "74");
});
List<Transaction> writeTxs = new ArrayList<>();
for (Consumer<TransactionBase> wop : writeLockOperations) {
Transaction wtx = client.newTransaction();
wop.accept(wtx);
writeTxs.add(wtx);
}
try (Transaction txr = client.newTransaction()) {
readLockOperation.accept(txr);
txr.commit();
}
for (Transaction wtx : writeTxs) {
try {
wtx.commit();
Assert.fail();
} catch (CommitException ce) {
}
}
try (Snapshot snap = client.newSnapshot()) {
Assert.assertEquals("45", snap.gets("test1", c1));
Assert.assertEquals("90", snap.gets("test1", c2));
Assert.assertEquals("test1", snap.gets("45", invCol));
Assert.assertEquals("test1", snap.gets("90", invCol));
Assert.assertEquals("30", snap.gets("test2", c1));
Assert.assertEquals("60", snap.gets("test2", c2));
Assert.assertEquals("test2", snap.gets("30", invCol));
Assert.assertEquals("test2", snap.gets("60", invCol));
}
}
@Test
public void testGet() {
ensureReadLocksSet(txr -> {
// ensure this operation sets two read locks
SnapshotBase rlSnap = txr.withReadLock();
txr.set(rlSnap.gets("test1", c1), invCol, "test1");
txr.set(rlSnap.gets("test1", c2), invCol, "test1");
txr.set(rlSnap.gets("test2", c1), invCol, "test2");
txr.set(rlSnap.gets("test2", c2), invCol, "test2");
});
}
@Test
public void testGetColumns() {
ensureReadLocksSet(txr -> {
// ensure this operation sets two read locks
Map<Column, String> vals = txr.withReadLock().gets("test1", c1, c2);
txr.set(vals.get(c1), invCol, "test1");
txr.set(vals.get(c2), invCol, "test1");
vals = txr.withReadLock().gets("test2", c1, c2);
txr.set(vals.get(c1), invCol, "test2");
txr.set(vals.get(c2), invCol, "test2");
});
}
@Test
public void testGetRowsColumns() {
ensureReadLocksSet(txr -> {
// ensure this operation sets two read locks
Map<String, Map<Column, String>> vals =
txr.withReadLock().gets(ImmutableList.of("test1", "test2"), c1, c2);
txr.set(vals.get("test1").get(c1), invCol, "test1");
txr.set(vals.get("test1").get(c2), invCol, "test1");
txr.set(vals.get("test2").get(c1), invCol, "test2");
txr.set(vals.get("test2").get(c2), invCol, "test2");
});
}
@Test
public void testGetRowColumns() {
ensureReadLocksSet(txr -> {
// ensure this operation sets two read locks
Map<RowColumn, String> vals =
txr.withReadLock().gets(ImmutableList.of(new RowColumn("test1", c1),
new RowColumn("test1", c2), new RowColumn("test2", c1), new RowColumn("test2", c2)));
txr.set(vals.get(new RowColumn("test1", c1)), invCol, "test1");
txr.set(vals.get(new RowColumn("test1", c2)), invCol, "test1");
txr.set(vals.get(new RowColumn("test2", c1)), invCol, "test2");
txr.set(vals.get(new RowColumn("test2", c2)), invCol, "test2");
});
}
@Test(expected = UnsupportedOperationException.class)
public void testScan() {
try (Transaction tx = client.newTransaction()) {
tx.withReadLock().scanner().build();
}
}
@Test
public void testOnlyReadLocks() {
try (Transaction tx = client.newTransaction()) {
tx.set("r1", new Column("q1", "f1"), "v1");
tx.set("r2", new Column("q1", "f1"), "v2");
tx.commit();
}
try (Transaction tx1 = client.newTransaction()) {
try (Transaction tx2 = client.newTransaction()) {
String v1 = tx2.withReadLock().gets("r1", new Column("q1", "f1"));
String v2 = tx2.withReadLock().gets("r2", new Column("q1", "f1"));
Assert.assertEquals("v1", v1);
Assert.assertEquals("v2", v2);
// commit should be a no-op because only read locks
tx2.commit();
}
tx1.set("r1", new Column("q1", "f1"), "v3");
tx1.set("r2", new Column("q1", "f1"), "v4");
// should not collide with read locks
tx1.commit();
}
try (Snapshot snap = client.newSnapshot()) {
String v1 = snap.gets("r1", new Column("q1", "f1"));
String v2 = snap.gets("r2", new Column("q1", "f1"));
Assert.assertEquals("v3", v1);
Assert.assertEquals("v4", v2);
}
}
}