blob: 87106007cd55f8b1f41c61319d657a5e5ee4cd15 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.integration.impl;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.exceptions.CommitException;
import org.apache.fluo.api.observer.Observer.NotificationType;
import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.oracle.Stamp;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.fluo.integration.TestTransaction;
import org.apache.fluo.integration.TestUtil;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import static org.apache.fluo.integration.BankUtil.BALANCE;
public class FluoIT extends ITBaseImpl {
@Rule
public Timeout globalTimeout = Timeout.seconds(getTestTimeout());
public static class FluoITObserverProvider implements ObserverProvider {
@Override
public void provide(Registry or, Context ctx) {
or.forColumn(BALANCE, NotificationType.STRONG).useObserver((tx, row, col) -> {
Assert.fail();
});
}
}
@Override
protected Class<? extends ObserverProvider> getObserverProviderClass() {
return FluoITObserverProvider.class;
}
@Test
public void testFluoFactory() throws Exception {
try (FluoAdmin admin = FluoFactory.newAdmin(config)) {
Assert.assertNotNull(admin);
}
try (FluoClient client = FluoFactory.newClient(config)) {
Assert.assertNotNull(client);
Assert.assertNotNull(client.newLoaderExecutor());
try (Snapshot s = client.newSnapshot()) {
Assert.assertNotNull(s);
s.get(Bytes.of("test"), new Column(Bytes.of("cf"), Bytes.of("cq")));
}
}
}
@Test
public void testOverlap1() throws Exception {
// test transactions that overlap reads and both attempt to write
// TX1 starts
// TX2 starts
// TX1 reads/writes
// TX2 reads/writes
// TX2 commits -- succeeds
// TX1 commits -- fails
final TestTransaction tx = new TestTransaction(env);
tx.set("bob", BALANCE, "10");
tx.set("joe", BALANCE, "20");
tx.set("jill", BALANCE, "60");
tx.done();
final TestTransaction tx1 = new TestTransaction(env);
Assert.assertEquals("10", tx1.gets("bob", BALANCE));
Assert.assertEquals("20", tx1.gets("joe", BALANCE));
TestUtil.increment(tx1, "bob", BALANCE, -5);
TestUtil.increment(tx1, "joe", BALANCE, 5);
final TestTransaction tx2 = new TestTransaction(env);
Assert.assertEquals("10", tx2.gets("bob", BALANCE));
Assert.assertEquals("60", tx2.gets("jill", BALANCE));
TestUtil.increment(tx2, "bob", BALANCE, -5);
TestUtil.increment(tx2, "jill", BALANCE, 5);
tx2.done();
assertCommitFails(tx1);
final TestTransaction tx3 = new TestTransaction(env);
Assert.assertEquals("5", tx3.gets("bob", BALANCE));
Assert.assertEquals("20", tx3.gets("joe", BALANCE));
Assert.assertEquals("65", tx3.gets("jill", BALANCE));
tx3.done();
}
private void assertCommitFails(TestTransaction tx) {
try {
tx.done();
Assert.fail();
} catch (CommitException ce) {
// expected
}
}
private void assertAAck(TestTransaction tx) {
try {
tx.done();
Assert.fail();
} catch (AlreadyAcknowledgedException ce) {
// expected
}
}
@Test
public void testSnapshots() throws Exception {
// test the following case
// TX1 starts
// TX2 starts
// TX2 reads/writes
// TX2 commits
// TX1 reads -- should not see TX2 writes
final TestTransaction tx = new TestTransaction(env);
tx.set("bob", BALANCE, "10");
tx.set("joe", BALANCE, "20");
tx.set("jill", BALANCE, "60");
tx.set("jane", BALANCE, "0");
tx.done();
final TestTransaction tx1 = new TestTransaction(env);
final TestTransaction tx2 = new TestTransaction(env);
TestUtil.increment(tx2, "bob", BALANCE, -5);
TestUtil.increment(tx2, "joe", BALANCE, -5);
TestUtil.increment(tx2, "jill", BALANCE, 10);
Assert.assertEquals("10", tx1.gets("bob", BALANCE));
tx2.done();
final TestTransaction txd = new TestTransaction(env);
txd.delete("jane", BALANCE);
txd.done();
Assert.assertEquals("20", tx1.gets("joe", BALANCE));
Assert.assertEquals("60", tx1.gets("jill", BALANCE));
Assert.assertEquals("0", tx1.gets("jane", BALANCE));
tx1.set("bob", BALANCE, "5");
tx1.set("joe", BALANCE, "25");
assertCommitFails(tx1);
final TestTransaction tx3 = new TestTransaction(env);
final TestTransaction tx4 = new TestTransaction(env);
tx4.set("jane", BALANCE, "3");
tx4.done();
Assert.assertEquals("5", tx3.gets("bob", BALANCE));
Assert.assertEquals("15", tx3.gets("joe", BALANCE));
Assert.assertEquals("70", tx3.gets("jill", BALANCE));
Assert.assertNull(tx3.gets("jane", BALANCE));
tx3.done();
final TestTransaction tx5 = new TestTransaction(env);
Assert.assertEquals("5", tx5.gets("bob", BALANCE));
Assert.assertEquals("15", tx5.gets("joe", BALANCE));
Assert.assertEquals("70", tx5.gets("jill", BALANCE));
Assert.assertEquals("3", tx5.gets("jane", BALANCE));
tx5.done();
}
@Test
public void testAck() throws Exception {
// when two transactions run against the same observed column, only one should commit
final TestTransaction tx = new TestTransaction(env);
tx.set("bob", BALANCE, "10");
tx.set("joe", BALANCE, "20");
tx.set("jill", BALANCE, "60");
tx.done();
final TestTransaction tx1 = new TestTransaction(env, "joe", BALANCE);
tx1.gets("joe", BALANCE);
tx1.set("jill", BALANCE, "61");
final TestTransaction tx2 = new TestTransaction(env, "joe", BALANCE);
tx2.gets("joe", BALANCE);
tx2.set("bob", BALANCE, "11");
tx1.done();
assertAAck(tx2);
final TestTransaction tx3 = new TestTransaction(env);
Assert.assertEquals("10", tx3.gets("bob", BALANCE));
Assert.assertEquals("20", tx3.gets("joe", BALANCE));
Assert.assertEquals("61", tx3.gets("jill", BALANCE));
// update joe, so it can be acknowledged again
tx3.set("joe", BALANCE, "21");
tx3.done();
final TestTransaction tx4 = new TestTransaction(env, "joe", BALANCE);
tx4.gets("joe", BALANCE);
tx4.set("jill", BALANCE, "62");
final TestTransaction tx5 = new TestTransaction(env, "joe", BALANCE);
tx5.gets("joe", BALANCE);
tx5.set("bob", BALANCE, "11");
final TestTransaction tx7 = new TestTransaction(env, "joe", BALANCE);
// make the 2nd transaction to start commit 1st
tx5.done();
assertAAck(tx4);
final TestTransaction tx6 = new TestTransaction(env);
Assert.assertEquals("11", tx6.gets("bob", BALANCE));
Assert.assertEquals("21", tx6.gets("joe", BALANCE));
Assert.assertEquals("61", tx6.gets("jill", BALANCE));
tx6.done();
tx7.gets("joe", BALANCE);
tx7.set("bob", BALANCE, "15");
tx7.set("jill", BALANCE, "60");
assertAAck(tx7);
final TestTransaction tx8 = new TestTransaction(env);
Assert.assertEquals("11", tx8.gets("bob", BALANCE));
Assert.assertEquals("21", tx8.gets("joe", BALANCE));
Assert.assertEquals("61", tx8.gets("jill", BALANCE));
tx8.done();
}
@Test
public void testAck2() throws Exception {
final TestTransaction tx = new TestTransaction(env);
final Column addrCol = new Column("account", "addr");
tx.set("bob", BALANCE, "10");
tx.set("joe", BALANCE, "20");
tx.set("jill", BALANCE, "60");
tx.done();
final TestTransaction tx1 = new TestTransaction(env, "bob", BALANCE);
final TestTransaction tx2 = new TestTransaction(env, "bob", BALANCE);
final TestTransaction tx3 = new TestTransaction(env, "bob", BALANCE);
tx1.gets("bob", BALANCE);
tx2.gets("bob", BALANCE);
tx1.gets("bob", addrCol);
tx2.gets("bob", addrCol);
tx1.set("bob", addrCol, "1 loop pl");
tx2.set("bob", addrCol, "1 loop pl");
// this test overlaps the commits of two transactions w/ the same trigger
CommitData cd = tx1.createCommitData();
Assert.assertTrue(tx1.preCommit(cd));
assertCommitFails(tx2);
Stamp commitTs = env.getSharedResources().getOracleClient().getStamp();
Assert.assertTrue(tx1.commitPrimaryColumn(cd, commitTs));
tx1.finishCommit(cd, commitTs);
tx1.close();
tx3.set("bob", addrCol, "2 loop pl");
assertAAck(tx3);
}
@Test
public void testAck3() throws Exception {
final TestTransaction tx = new TestTransaction(env);
tx.set("bob", BALANCE, "10");
tx.set("joe", BALANCE, "20");
tx.set("jill", BALANCE, "60");
tx.done();
long notTS1 = TestTransaction.getNotificationTS(env, "bob", BALANCE);
// this transaction should create a second notification
final TestTransaction tx1 = new TestTransaction(env);
tx1.set("bob", BALANCE, "11");
tx1.done();
long notTS2 = TestTransaction.getNotificationTS(env, "bob", BALANCE);
Assert.assertTrue(notTS1 < notTS2);
// even though there were two notifications and TX is using 1st notification TS... only 1st TX
// should execute
// google paper calls this message collapsing
final TestTransaction tx3 = new TestTransaction(env, "bob", BALANCE, notTS1);
final TestTransaction tx2 = new TestTransaction(env, "bob", BALANCE, notTS1);
Assert.assertEquals("11", tx2.gets("bob", BALANCE));
tx2.done();
Assert.assertEquals("11", tx3.gets("bob", BALANCE));
assertAAck(tx3);
final TestTransaction tx4 = new TestTransaction(env, "bob", BALANCE, notTS2);
Assert.assertEquals("11", tx4.gets("bob", BALANCE));
assertAAck(tx4);
}
@Test
public void testWriteObserved() throws Exception {
// setting an acknowledged observed column in a transaction should not affect acknowledged
// status
final TestTransaction tx = new TestTransaction(env);
tx.set("bob", BALANCE, "10");
tx.set("joe", BALANCE, "20");
tx.set("jill", BALANCE, "60");
tx.done();
final TestTransaction tx2 = new TestTransaction(env, "joe", BALANCE);
tx2.gets("joe", BALANCE);
tx2.set("joe", BALANCE, "21");
tx2.set("bob", BALANCE, "11");
final TestTransaction tx1 = new TestTransaction(env, "joe", BALANCE);
tx1.gets("joe", BALANCE);
tx1.set("jill", BALANCE, "61");
tx1.done();
assertAAck(tx2);
final TestTransaction tx3 = new TestTransaction(env);
Assert.assertEquals("10", tx3.gets("bob", BALANCE));
Assert.assertEquals("20", tx3.gets("joe", BALANCE));
Assert.assertEquals("61", tx3.gets("jill", BALANCE));
tx3.done();
}
@Test
public void testVisibility() throws Exception {
aClient.securityOperations().changeUserAuthorizations(USER, new Authorizations("A", "B", "C"));
env.setAuthorizations(new Authorizations("A", "B", "C"));
Column balanceCol = new Column("account", "balance", "A|B");
final TestTransaction tx = new TestTransaction(env);
tx.set("bob", balanceCol, "10");
tx.set("joe", balanceCol, "20");
tx.set("jill", balanceCol, "60");
tx.done();
FluoConfiguration fc = new FluoConfiguration(config);
Environment env2 = new Environment(fc);
env2.setAuthorizations(new Authorizations("B"));
final TestTransaction tx2 = new TestTransaction(env2);
Assert.assertEquals("10", tx2.gets("bob", balanceCol));
Assert.assertEquals("20", tx2.gets("joe", balanceCol));
Assert.assertEquals("60", tx2.gets("jill", balanceCol));
tx2.done();
env2.close();
Environment env3 = new Environment(fc);
env3.setAuthorizations(new Authorizations("C"));
final TestTransaction tx3 = new TestTransaction(env3);
Assert.assertNull(tx3.gets("bob", balanceCol));
Assert.assertNull(tx3.gets("joe", balanceCol));
Assert.assertNull(tx3.gets("jill", balanceCol));
tx3.done();
env3.close();
}
@Test
public void testRange() throws Exception {
// setting an acknowledged observed column in a transaction should not affect acknowledged
// status
final TestTransaction tx = new TestTransaction(env);
tx.set("d00001", new Column("data", "content"),
"blah blah, blah http://a.com. Blah blah http://b.com. Blah http://c.com");
tx.set("d00001", new Column("outlink", "http://a.com"), "");
tx.set("d00001", new Column("outlink", "http://b.com"), "");
tx.set("d00001", new Column("outlink", "http://c.com"), "");
tx.set("d00002", new Column("data", "content"),
"blah blah, blah http://d.com. Blah blah http://e.com. Blah http://c.com");
tx.set("d00002", new Column("outlink", "http://d.com"), "");
tx.set("d00002", new Column("outlink", "http://e.com"), "");
tx.set("d00002", new Column("outlink", "http://c.com"), "");
tx.done();
final TestTransaction tx2 = new TestTransaction(env);
final TestTransaction tx3 = new TestTransaction(env);
tx3.set("d00001", new Column("data", "content"),
"blah blah, blah http://a.com. Blah http://c.com . Blah http://z.com");
tx3.set("d00001", new Column("outlink", "http://a.com"), "");
tx3.delete("d00001", new Column("outlink", "http://b.com"));
tx3.set("d00001", new Column("outlink", "http://c.com"), "");
tx3.set("d00001", new Column("outlink", "http://z.com"), "");
tx3.done();
HashSet<Column> columns = new HashSet<>();
CellScanner cellScanner = tx2.scanner().over("d00001").fetch(new Column("outlink")).build();
for (RowColumnValue rcv : cellScanner) {
columns.add(rcv.getColumn());
}
tx2.done();
HashSet<Column> expected = new HashSet<>();
expected.add(new Column("outlink", "http://a.com"));
expected.add(new Column("outlink", "http://b.com"));
expected.add(new Column("outlink", "http://c.com"));
Assert.assertEquals(expected, columns);
final TestTransaction tx4 = new TestTransaction(env);
columns.clear();
cellScanner = tx4.scanner().over("d00001").fetch(new Column("outlink")).build();
for (RowColumnValue rcv : cellScanner) {
columns.add(rcv.getColumn());
}
expected.add(new Column("outlink", "http://z.com"));
expected.remove(new Column("outlink", "http://b.com"));
Assert.assertEquals(expected, columns);
tx4.done();
}
@Test
public void testStringMethods() {
final TestTransaction tx = new TestTransaction(env);
Column ccol = new Column("doc", "content");
Column tcol = new Column("doc", "time");
tx.set("d:0001", ccol, "abc def");
tx.set("d:0001", tcol, "45");
tx.set("d:0002", ccol, "neb feg");
tx.set("d:0002", tcol, "97");
tx.set("d:0003", ccol, "xyz abc");
tx.set("d:0003", tcol, "42");
tx.done();
final TestTransaction tx2 = new TestTransaction(env);
Map<String, Map<Column, String>> map1 =
tx2.gets(Arrays.asList("d:0001", "d:0002"), Collections.singleton(ccol));
Map<String, ImmutableMap<Column, String>> expected1 = ImmutableMap.of("d:0001",
ImmutableMap.of(ccol, "abc def"), "d:0002", ImmutableMap.of(ccol, "neb feg"));
Assert.assertEquals(expected1, map1);
Assert.assertEquals("45", tx2.gets("d:0001", tcol));
Assert.assertEquals("xyz abc", tx2.gets("d:0003", ccol));
Map<Column, String> map2 = tx2.gets("d:0002", ImmutableSet.of(ccol, tcol));
Map<Column, String> expected2 = ImmutableMap.of(ccol, "neb feg", tcol, "97");
Assert.assertEquals(expected2, map2);
tx2.delete("d:0003", ccol);
tx2.delete("d:0003", tcol);
tx2.done();
final TestTransaction tx3 = new TestTransaction(env);
Assert.assertNull(tx3.gets("d:0003", ccol));
Assert.assertNull(tx3.gets("d:0003", tcol));
tx3.done();
}
}