blob: fc553f5065d1a01047a8dca7ee4ad486ed430c35 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.integration.impl;
import java.util.Iterator;
import java.util.Map.Entry;
import com.google.common.collect.Iterables;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.accumulo.format.FluoFormatter;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.ZookeeperPath;
import org.apache.fluo.accumulo.util.ZookeeperUtil;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.impl.TransactorNode;
import org.apache.fluo.integration.BankUtil;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.fluo.integration.TestTransaction;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
/**
* Tests GarbageCollectionIterator class
*/
public class GarbageCollectionIteratorIT extends ITBaseImpl {
/**
 * JUnit rule that fails any test in this class that runs longer than {@code getTestTimeout()}
 * seconds. NOTE(review): getTestTimeout() is not defined in this file; presumably it is inherited
 * from ITBaseImpl - confirm.
 */
@Rule
public Timeout globalTimeout = Timeout.seconds(getTestTimeout());
/**
 * Blocks until the GC timestamp read from ZooKeeper is at least {@code expectedTime}.
 *
 * <p>
 * First asks the timestamp tracker to update its ZooKeeper node, then polls the GC timestamp,
 * sleeping 500 ms between reads. The loop has no limit of its own.
 *
 * @param expectedTime minimum GC timestamp to wait for
 * @throws Exception if updating the node, reading the timestamp, or sleeping fails
 */
private void waitForGcTime(long expectedTime) throws Exception {
  env.getSharedResources().getTimestampTracker().updateZkNode();
  // Re-read the GC timestamp before every check; sleep only while it is still too old.
  while (ZookeeperUtil.getGcTimestamp(config.getAppZookeepers()) < expectedTime) {
    Thread.sleep(500);
  }
}
/**
 * Checks that balances remain correct after a flush has had the chance to garbage collect old
 * versions.
 *
 * <p>
 * Sets three balances, performs four transfers, then opens {@code tx2} and waits for the GC
 * timestamp to reach its start timestamp. After flushing the table, {@link #verify(long)} checks
 * the remaining write entries and a fresh transaction checks the balances.
 *
 * @throws Exception if any Fluo, Accumulo, or ZooKeeper operation fails
 */
@Test
public void testVerifyAfterGC() throws Exception {
  TestTransaction tx1 = new TestTransaction(env);
  BankUtil.setBalance(tx1, "bob", 10);
  BankUtil.setBalance(tx1, "joe", 20);
  BankUtil.setBalance(tx1, "jill", 60);
  tx1.done();

  BankUtil.transfer(env, "joe", "jill", 1);
  BankUtil.transfer(env, "joe", "bob", 1);
  BankUtil.transfer(env, "bob", "joe", 2);
  BankUtil.transfer(env, "jill", "joe", 2);

  // tx2 stays open until the end of the test; the GC timestamp is expected to equal its start
  // timestamp (presumably because it is the oldest transaction still running - confirm).
  TestTransaction tx2 = new TestTransaction(env);
  waitForGcTime(tx2.getStartTimestamp());
  long oldestTs = ZookeeperUtil.getGcTimestamp(config.getAppZookeepers());
  Assert.assertEquals(tx2.getStartTs(), oldestTs);

  // Force a garbage collection
  conn.tableOperations().flush(table, null, null, true);
  verify(oldestTs);

  // Expected balances: bob 10 + 1 - 2, joe 20 - 1 - 1 + 2 + 2, jill 60 + 1 - 2
  TestTransaction tx3 = new TestTransaction(env);
  Assert.assertEquals(9, BankUtil.getBalance(tx3, "bob"));
  Assert.assertEquals(22, BankUtil.getBalance(tx3, "joe"));
  Assert.assertEquals(59, BankUtil.getBalance(tx3, "jill"));
  tx3.done();
  tx2.done();
}
/**
 * Checks that a deleted value stays readable to a transaction started before the delete, and that
 * the table is empty once the GC timestamp has passed the delete and a compaction has run.
 *
 * @throws Exception if any Fluo, Accumulo, or ZooKeeper operation fails
 */
@Test
public void testDeletedDataIsDropped() throws Exception {
  final Column docUri = new Column("doc", "uri");

  TestTransaction tx1 = new TestTransaction(env);
  tx1.set("001", docUri, "file:///abc.txt");
  tx1.done();

  // tx2 starts before the delete, tx4 after it
  TestTransaction tx2 = new TestTransaction(env);

  TestTransaction tx3 = new TestTransaction(env);
  tx3.delete("001", docUri);
  tx3.done();

  TestTransaction tx4 = new TestTransaction(env);

  waitForGcTime(tx2.getStartTimestamp());

  // Force a garbage collection
  conn.tableOperations().compact(table, null, null, true, true);

  // tx2 must still see the value that existed when it started
  Assert.assertEquals("file:///abc.txt", tx2.gets("001", docUri));
  tx2.done();

  Assert.assertNull(tx4.gets("001", docUri));

  // with tx2 finished, wait for the GC timestamp to reach tx4 and compact again
  waitForGcTime(tx4.getStartTimestamp());
  conn.tableOperations().compact(table, null, null, true, true);

  Assert.assertNull(tx4.gets("001", docUri));

  // nothing at all should remain in the table
  Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
  Assert.assertEquals(0, Iterables.size(scanner));

  tx4.done();
}
/**
 * Checks that locks and data left by a transaction that only pre-committed are removed by a flush
 * and a compaction after another transaction has read the same cells.
 *
 * <p>
 * Entries are counted by searching the formatted table contents for the markers {@code -LOCK},
 * {@code -DEL_LOCK} and {@code -DATA}.
 *
 * @throws Exception if any Fluo or Accumulo operation fails
 */
@Test
public void testRolledBackDataIsDropped() throws Exception {
  Column col1 = new Column("fam1", "q1");
  Column col2 = new Column("fam1", "q2");

  // write 10 rows x 2 columns, pre-commit only, then close the transactor
  TransactorNode t2 = new TransactorNode(env);
  TestTransaction tx2 = new TestTransaction(env, t2);
  for (int r = 0; r < 10; r++) {
    tx2.set(r + "", col1, "1" + r + "0");
    tx2.set(r + "", col2, "1" + r + "1");
  }
  CommitData cd = tx2.createCommitData();
  Assert.assertTrue(tx2.preCommit(cd));
  t2.close();

  // rollback data
  TestTransaction tx3 = new TestTransaction(env, t2);
  for (int r = 0; r < 10; r++) {
    tx3.gets(r + "", col1);
    tx3.gets(r + "", col2);
  }
  tx3.done();

  Assert.assertEquals(20, countInTable("-LOCK"));
  Assert.assertEquals(20, countInTable("-DEL_LOCK"));
  Assert.assertEquals(20, countInTable("-DATA"));

  // flush should drop locks and data
  conn.tableOperations().flush(table, null, null, true);

  Assert.assertEquals(0, countInTable("-LOCK"));
  Assert.assertEquals(20, countInTable("-DEL_LOCK"));
  Assert.assertEquals(0, countInTable("-DATA"));

  // compact should drop all del locks except for primary
  conn.tableOperations().compact(table, null, null, true, true);

  Assert.assertEquals(0, countInTable("-LOCK"));
  Assert.assertEquals(1, countInTable("-DEL_LOCK"));
  Assert.assertEquals(0, countInTable("-DATA"));
}
/**
 * Adds one to the integer stored as a string at the given row and column, treating a missing
 * value as {@code "0"}.
 *
 * @param tx transaction used for both the read and the write
 * @param row row holding the counter
 * @param col column holding the counter
 */
private void increment(TransactionBase tx, String row, Column col) {
  String stored = tx.gets(row, col, "0");
  int next = Integer.parseInt(stored) + 1;
  tx.set(row, col, Integer.toString(next));
}
/**
 * Checks that read lock entries are removed by compaction, both when a later write covers them
 * and when they are older than the GC timestamp.
 *
 * <p>
 * Entries are counted by searching the formatted table contents for the markers {@code -RLOCK}
 * and {@code -DEL_RLOCK}.
 *
 * @throws Exception if any Fluo, Accumulo, or ZooKeeper operation fails
 */
@Test(timeout = 60000)
public void testReadLocks() throws Exception {
  final Column altIdCol = new Column("info", "altId");

  // seed rows n:000 .. n:009 with alternate ids 19, 38, ..., 190
  TestTransaction seedTx = new TestTransaction(env);
  for (int n = 0; n < 10; n++) {
    seedTx.set(String.format("n:%03d", n), altIdCol, Integer.toString(19 * (n + 1)));
  }
  seedTx.done();

  // 50 transactions: read-lock a row's alternate id and bump the counter kept under that id
  for (int n = 0; n < 50; n++) {
    String nodeRow = String.format("n:%03d", n % 10);
    TestTransaction lockTx = new TestTransaction(env);
    String altId = lockTx.withReadLock().gets(nodeRow, altIdCol);
    increment(lockTx, "a:" + altId, new Column("count", nodeRow));
    lockTx.done();
  }

  Assert.assertEquals(50, countInTable("-DEL_RLOCK"));
  Assert.assertEquals(50, countInTable("-RLOCK"));

  // overwrite every alternate id and move its counter to the row for the new id
  TestTransaction rewriteTx = new TestTransaction(env);
  for (int n = 0; n < 10; n++) {
    String nodeRow = String.format("n:%03d", n);
    Column countCol = new Column("count", nodeRow);
    String newAltId = Integer.toString(13 * (n + 1));
    String oldAltId = rewriteTx.gets(nodeRow, altIdCol);
    rewriteTx.set(nodeRow, altIdCol, newAltId);
    String count = rewriteTx.gets("a:" + oldAltId, countCol);
    rewriteTx.set("a:" + newAltId, countCol, count);
    rewriteTx.delete("a:" + oldAltId, countCol);
  }
  rewriteTx.done();

  // all read locks should be garbage collected because of the writes after the read locks
  conn.tableOperations().compact(table, null, null, true, true);

  Assert.assertEquals(0, countInTable("-DEL_RLOCK"));
  Assert.assertEquals(0, countInTable("-RLOCK"));

  // take 50 more read locks, this time with no later write to the locked columns
  for (int n = 0; n < 50; n++) {
    String nodeRow = String.format("n:%03d", n % 10);
    TestTransaction lockTx = new TestTransaction(env);
    String altId = lockTx.withReadLock().gets(nodeRow, altIdCol);
    increment(lockTx, "a:" + altId, new Column("count", nodeRow));
    lockTx.done();
  }

  // each counter was bumped 5 times in each of the two rounds
  TestTransaction checkTx = new TestTransaction(env);
  for (int n = 0; n < 10; n++) {
    String nodeRow = String.format("n:%03d", n);
    String altId = checkTx.gets(nodeRow, altIdCol);
    Assert.assertEquals("10", checkTx.gets("a:" + altId, new Column("count", nodeRow)));
  }
  checkTx.done();

  waitForGcTime(checkTx.getStartTimestamp());
  conn.tableOperations().compact(table, null, null, true, true);

  // all read locks older than GC time should be dropped
  Assert.assertEquals(0, countInTable("-DEL_RLOCK"));
  Assert.assertEquals(0, countInTable("-RLOCK"));
}
/**
 * Counts the table entries whose {@link FluoFormatter} string form contains {@code str}.
 *
 * @param str substring to look for in each formatted entry
 * @return number of entries whose formatted form contains {@code str}
 * @throws TableNotFoundException if the scanner cannot be created for the table
 */
private int countInTable(String str) throws TableNotFoundException {
  Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
  int matches = 0;
  for (Entry<Key, Value> entry : scanner) {
    String formatted = FluoFormatter.toString(entry);
    if (formatted.contains(str)) {
      matches++;
    }
  }
  return matches;
}
/**
 * Checks that {@code ZookeeperUtil.getGcTimestamp} returns 0 initially and
 * {@code ZookeeperUtil.OLDEST_POSSIBLE} once the GC timestamp node has been deleted.
 *
 * <p>
 * The {@code ZookeeperUtil} logger is raised to FATAL for the duration of the test because an
 * error is expected. The previous level is restored in a {@code finally} block so that a failing
 * assertion cannot leave the logger silenced for later tests.
 *
 * @throws Exception if a ZooKeeper operation fails
 */
@Test
public void testGetOldestTimestamp() throws Exception {
  // we are expecting an error in this test
  Logger zkUtilLogger = Logger.getLogger(ZookeeperUtil.class);
  Level curLevel = zkUtilLogger.getLevel();
  zkUtilLogger.setLevel(Level.FATAL);
  try {
    // verify the initial GC timestamp
    Assert.assertEquals(0, ZookeeperUtil.getGcTimestamp(config.getAppZookeepers()));

    // delete the GC timestamp path
    env.getSharedResources().getCurator().delete().forPath(ZookeeperPath.ORACLE_GC_TIMESTAMP);

    // verify that oldest possible is returned
    Assert.assertEquals(ZookeeperUtil.OLDEST_POSSIBLE,
        ZookeeperUtil.getGcTimestamp(config.getAppZookeepers()));
  } finally {
    // set level back, even if the test failed
    zkUtilLogger.setLevel(curLevel);
  }
}
/**
 * Scans the whole table and asserts that, for each row/family/qualifier/visibility, every write
 * entry after the first one encountered has a timestamp of at least {@code oldestTs}.
 *
 * @param oldestTs lowest timestamp an additional write entry may carry
 * @throws TableNotFoundException if the scanner cannot be created for the table
 */
private void verify(long oldestTs) throws TableNotFoundException {
  Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);

  Key prevKey = null;
  int writesSeen = 0;

  for (Iterator<Entry<Key, Value>> it = scanner.iterator(); it.hasNext();) {
    Key key = it.next().getKey();

    // restart the count whenever the scan moves on to a different column
    boolean sameColumn =
        prevKey != null && prevKey.equals(key, PartialKey.ROW_COLFAM_COLQUAL_COLVIS);
    if (!sameColumn) {
      writesSeen = 0;
    }

    // the key timestamp carries both a column type prefix and the actual timestamp
    long colType = key.getTimestamp() & ColumnConstants.PREFIX_MASK;
    long ts = key.getTimestamp() & ColumnConstants.TIMESTAMP_MASK;

    if (colType == ColumnConstants.WRITE_PREFIX) {
      writesSeen++;
      if (writesSeen > 1) {
        Assert.assertTrue("Extra write had ts " + ts + " < " + oldestTs, ts >= oldestTs);
      }
    }

    prevKey = key;
  }
}
}