blob: 31f4bdd365c5d06c94dac6275c3b63e19819573d [file] [log] [blame]
/*
* Copyright © 2014 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package co.cask.tephra.hbase98;
import co.cask.tephra.Transaction;
import co.cask.tephra.TransactionConflictException;
import co.cask.tephra.TransactionContext;
import co.cask.tephra.TransactionManager;
import co.cask.tephra.TransactionSystemClient;
import co.cask.tephra.TxConstants;
import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
import co.cask.tephra.inmemory.InMemoryTxSystemClient;
import co.cask.tephra.metrics.TxMetricsCollector;
import co.cask.tephra.persist.InMemoryTransactionStateStorage;
import co.cask.tephra.persist.TransactionStateStorage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Tests for TransactionAwareHTables.
*/
public class TransactionAwareHTableTest {
private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTableTest.class);
@ClassRule
public static TemporaryFolder tmpFolder = new TemporaryFolder();
private static HBaseTestingUtility testUtil;
private static HBaseAdmin hBaseAdmin;
private static TransactionStateStorage txStateStorage;
private static TransactionManager txManager;
private static Configuration conf;
private TransactionContext transactionContext;
private TransactionAwareHTable transactionAwareHTable;
private static final class TestBytes {
private static final byte[] table = Bytes.toBytes("testtable");
private static final byte[] family = Bytes.toBytes("f1");
private static final byte[] family2 = Bytes.toBytes("f2");
private static final byte[] qualifier = Bytes.toBytes("col1");
private static final byte[] qualifier2 = Bytes.toBytes("col2");
private static final byte[] row = Bytes.toBytes("row");
private static final byte[] row2 = Bytes.toBytes("row2");
private static final byte[] row3 = Bytes.toBytes("row3");
private static final byte[] row4 = Bytes.toBytes("row4");
private static final byte[] value = Bytes.toBytes("value");
private static final byte[] value2 = Bytes.toBytes("value2");
}
@BeforeClass
public static void setupBeforeClass() throws Exception {
conf = HBaseConfiguration.create();
conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
testUtil = new HBaseTestingUtility(conf);
testUtil.startMiniCluster();
conf = testUtil.getConfiguration();
hBaseAdmin = testUtil.getHBaseAdmin();
txStateStorage = new InMemoryTransactionStateStorage();
txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
txManager.startAndWait();
}
@AfterClass
public static void shutdownAfterClass() throws Exception {
testUtil.shutdownMiniCluster();
hBaseAdmin.close();
}
@Before
public void setupBeforeTest() throws Exception {
HTable hTable = createTable(TestBytes.table, new byte[][]{TestBytes.family});
transactionAwareHTable = new TransactionAwareHTable(hTable);
transactionContext = new TransactionContext(new InMemoryTxSystemClient(txManager), transactionAwareHTable);
}
@After
public void shutdownAfterTest() throws IOException {
hBaseAdmin.disableTable(TestBytes.table);
hBaseAdmin.deleteTable(TestBytes.table);
}
private HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
HTableDescriptor desc = getTableDescriptor(tableName, columnFamilies);
desc.addCoprocessor(TransactionProcessor.class.getName());
return doCreateTable(tableName, desc);
}
private HTable createNonTxTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
HTableDescriptor desc = getTableDescriptor(tableName, columnFamilies);
return doCreateTable(tableName, desc);
}
private HTable doCreateTable(byte[] tableName, HTableDescriptor desc) throws IOException, InterruptedException {
hBaseAdmin.createTable(desc);
testUtil.waitTableAvailable(tableName, 5000);
return new HTable(testUtil.getConfiguration(), tableName);
}
private HTableDescriptor getTableDescriptor(byte[] tableName, byte[][] columnFamilies) {
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
for (byte[] family : columnFamilies) {
HColumnDescriptor columnDesc = new HColumnDescriptor(family);
columnDesc.setMaxVersions(Integer.MAX_VALUE);
desc.addFamily(columnDesc);
}
return desc;
}
/**
* Test transactional put and get requests.
*
* @throws Exception
*/
@Test
public void testValidTransactionalPutAndGet() throws Exception {
transactionContext.start();
Put put = new Put(TestBytes.row);
put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
transactionAwareHTable.put(put);
transactionContext.finish();
transactionContext.start();
Result result = transactionAwareHTable.get(new Get(TestBytes.row));
transactionContext.finish();
byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
assertArrayEquals(TestBytes.value, value);
}
/**
* Test aborted put requests, that must be rolled back.
*
* @throws Exception
*/
@Test
public void testAbortedTransactionPutAndGet() throws Exception {
transactionContext.start();
Put put = new Put(TestBytes.row);
put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
transactionAwareHTable.put(put);
transactionContext.abort();
transactionContext.start();
Result result = transactionAwareHTable.get(new Get(TestBytes.row));
transactionContext.finish();
byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
assertArrayEquals(value, null);
}
/**
* Test transactional delete operations.
*
* @throws Exception
*/
@Test
public void testValidTransactionalDelete() throws Exception {
HTable hTable = createTable(Bytes.toBytes("TestValidTransactionalDelete"),
new byte[][]{TestBytes.family, TestBytes.family2});
try {
TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
txContext.start();
Put put = new Put(TestBytes.row);
put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
put.add(TestBytes.family2, TestBytes.qualifier, TestBytes.value2);
txTable.put(put);
txContext.finish();
txContext.start();
Result result = txTable.get(new Get(TestBytes.row));
txContext.finish();
byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
assertArrayEquals(TestBytes.value, value);
value = result.getValue(TestBytes.family2, TestBytes.qualifier);
assertArrayEquals(TestBytes.value2, value);
// test full row delete
txContext.start();
Delete delete = new Delete(TestBytes.row);
txTable.delete(delete);
txContext.finish();
txContext.start();
result = txTable.get(new Get(TestBytes.row));
txContext.finish();
assertTrue(result.isEmpty());
// test column delete
// load 10 rows
txContext.start();
int rowCount = 10;
for (int i = 0; i < rowCount; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
for (int j = 0; j < 10; j++) {
p.add(TestBytes.family, Bytes.toBytes(j), TestBytes.value);
}
txTable.put(p);
}
txContext.finish();
// verify loaded rows
txContext.start();
for (int i = 0; i < rowCount; i++) {
Get g = new Get(Bytes.toBytes("row" + i));
Result r = txTable.get(g);
assertFalse(r.isEmpty());
for (int j = 0; j < 10; j++) {
assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, Bytes.toBytes(j)));
}
}
txContext.finish();
// delete odds columns from odd rows and even columns from even rows
txContext.start();
for (int i = 0; i < rowCount; i++) {
Delete d = new Delete(Bytes.toBytes("row" + i));
for (int j = 0; j < 10; j++) {
if (i % 2 == j % 2) {
LOG.info("Deleting row={}, column={}", i, j);
d.deleteColumns(TestBytes.family, Bytes.toBytes(j));
}
}
txTable.delete(d);
}
txContext.finish();
// verify deleted columns
txContext.start();
for (int i = 0; i < rowCount; i++) {
Get g = new Get(Bytes.toBytes("row" + i));
Result r = txTable.get(g);
assertEquals(5, r.size());
for (Map.Entry<byte[], byte[]> entry : r.getFamilyMap(TestBytes.family).entrySet()) {
int col = Bytes.toInt(entry.getKey());
LOG.info("Got row={}, col={}", i, col);
// each row should only have the opposite mod (odd=even, even=odd)
assertNotEquals(i % 2, col % 2);
assertArrayEquals(TestBytes.value, entry.getValue());
}
}
txContext.finish();
// test family delete
// load 10 rows
txContext.start();
for (int i = 0; i < rowCount; i++) {
Put p = new Put(Bytes.toBytes("famrow" + i));
p.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
p.add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2);
txTable.put(p);
}
txContext.finish();
// verify all loaded rows
txContext.start();
for (int i = 0; i < rowCount; i++) {
Get g = new Get(Bytes.toBytes("famrow" + i));
Result r = txTable.get(g);
assertEquals(2, r.size());
assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, TestBytes.qualifier));
assertArrayEquals(TestBytes.value2, r.getValue(TestBytes.family2, TestBytes.qualifier2));
}
txContext.finish();
// delete family1 for even rows, family2 for odd rows
txContext.start();
for (int i = 0; i < rowCount; i++) {
Delete d = new Delete(Bytes.toBytes("famrow" + i));
d.deleteFamily((i % 2 == 0) ? TestBytes.family : TestBytes.family2);
txTable.delete(d);
}
txContext.finish();
// verify deleted families
txContext.start();
for (int i = 0; i < rowCount; i++) {
Get g = new Get(Bytes.toBytes("famrow" + i));
Result r = txTable.get(g);
assertEquals(1, r.size());
if (i % 2 == 0) {
assertNull(r.getValue(TestBytes.family, TestBytes.qualifier));
assertArrayEquals(TestBytes.value2, r.getValue(TestBytes.family2, TestBytes.qualifier2));
} else {
assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, TestBytes.qualifier));
assertNull(r.getValue(TestBytes.family2, TestBytes.qualifier2));
}
}
txContext.finish();
} finally {
hTable.close();
}
}
/**
* Test aborted transactional delete requests, that must be rolled back.
*
* @throws Exception
*/
@Test
public void testAbortedTransactionalDelete() throws Exception {
transactionContext.start();
Put put = new Put(TestBytes.row);
put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
transactionAwareHTable.put(put);
transactionContext.finish();
transactionContext.start();
Result result = transactionAwareHTable.get(new Get(TestBytes.row));
transactionContext.finish();
byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
assertArrayEquals(TestBytes.value, value);
transactionContext.start();
Delete delete = new Delete(TestBytes.row);
transactionAwareHTable.delete(delete);
transactionContext.abort();
transactionContext.start();
result = transactionAwareHTable.get(new Get(TestBytes.row));
transactionContext.finish();
value = result.getValue(TestBytes.family, TestBytes.qualifier);
assertArrayEquals(TestBytes.value, value);
}
@Test
public void testRowDelete() throws Exception {
HTable hTable = createTable(Bytes.toBytes("TestRowDelete"), new byte[][]{TestBytes.family, TestBytes.family2});
TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW);
try {
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
// Test 1: full row delete
txContext.start();
txTable.put(new Put(TestBytes.row)
.add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
.add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)
.add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
.add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
txContext.finish();
txContext.start();
Get get = new Get(TestBytes.row);
Result result = txTable.get(get);
assertFalse(result.isEmpty());
assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family, TestBytes.qualifier));
assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier2));
assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier));
assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family2, TestBytes.qualifier2));
txContext.finish();
// delete entire row
txContext.start();
txTable.delete(new Delete(TestBytes.row));
txContext.finish();
// verify row is now empty
txContext.start();
result = txTable.get(new Get(TestBytes.row));
assertTrue(result.isEmpty());
// verify row is empty for explicit column retrieval
result = txTable.get(new Get(TestBytes.row)
.addColumn(TestBytes.family, TestBytes.qualifier)
.addFamily(TestBytes.family2));
assertTrue(result.isEmpty());
// verify row is empty for scan
ResultScanner scanner = txTable.getScanner(new Scan(TestBytes.row));
assertNull(scanner.next());
scanner.close();
// verify row is empty for scan with explicit column
scanner = txTable.getScanner(new Scan(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2));
assertNull(scanner.next());
scanner.close();
txContext.finish();
// write swapped values to one column per family
txContext.start();
txTable.put(new Put(TestBytes.row)
.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2)
.add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value));
txContext.finish();
// verify new values appear
txContext.start();
result = txTable.get(new Get(TestBytes.row));
assertFalse(result.isEmpty());
assertEquals(2, result.size());
assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier));
assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier2));
scanner = txTable.getScanner(new Scan(TestBytes.row));
Result result1 = scanner.next();
assertNotNull(result1);
assertFalse(result1.isEmpty());
assertEquals(2, result1.size());
assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier));
assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier2));
scanner.close();
txContext.finish();
// Test 2: delete of first column family
txContext.start();
txTable.put(new Put(TestBytes.row2)
.add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
.add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)
.add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
.add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
txContext.finish();
txContext.start();
txTable.delete(new Delete(TestBytes.row2).deleteFamily(TestBytes.family));
txContext.finish();
txContext.start();
Result fam1Result = txTable.get(new Get(TestBytes.row2));
assertFalse(fam1Result.isEmpty());
assertEquals(2, fam1Result.size());
assertArrayEquals(TestBytes.value, fam1Result.getValue(TestBytes.family2, TestBytes.qualifier));
assertArrayEquals(TestBytes.value2, fam1Result.getValue(TestBytes.family2, TestBytes.qualifier2));
txContext.finish();
// Test 3: delete of second column family
txContext.start();
txTable.put(new Put(TestBytes.row3)
.add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
.add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)
.add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
.add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
txContext.finish();
txContext.start();
txTable.delete(new Delete(TestBytes.row3).deleteFamily(TestBytes.family2));
txContext.finish();
txContext.start();
Result fam2Result = txTable.get(new Get(TestBytes.row3));
assertFalse(fam2Result.isEmpty());
assertEquals(2, fam2Result.size());
assertArrayEquals(TestBytes.value, fam2Result.getValue(TestBytes.family, TestBytes.qualifier));
assertArrayEquals(TestBytes.value2, fam2Result.getValue(TestBytes.family, TestBytes.qualifier2));
txContext.finish();
// Test 4: delete specific rows in a range
txContext.start();
for (int i = 0; i < 10; i++) {
txTable.put(new Put(Bytes.toBytes("z" + i))
.add(TestBytes.family, TestBytes.qualifier, Bytes.toBytes(i))
.add(TestBytes.family2, TestBytes.qualifier2, Bytes.toBytes(i)));
}
txContext.finish();
txContext.start();
// delete odd rows
for (int i = 1; i < 10; i += 2) {
txTable.delete(new Delete(Bytes.toBytes("z" + i)));
}
txContext.finish();
txContext.start();
int cnt = 0;
ResultScanner zScanner = txTable.getScanner(new Scan(Bytes.toBytes("z0")));
Result res;
while ((res = zScanner.next()) != null) {
assertFalse(res.isEmpty());
assertArrayEquals(Bytes.toBytes("z" + cnt), res.getRow());
assertArrayEquals(Bytes.toBytes(cnt), res.getValue(TestBytes.family, TestBytes.qualifier));
assertArrayEquals(Bytes.toBytes(cnt), res.getValue(TestBytes.family2, TestBytes.qualifier2));
cnt += 2;
}
// Test 5: delete prior writes in the same transaction
txContext.start();
txTable.put(new Put(TestBytes.row4)
.add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
.add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
txTable.delete(new Delete(TestBytes.row4));
txContext.finish();
txContext.start();
Result row4Result = txTable.get(new Get(TestBytes.row4));
assertTrue(row4Result.isEmpty());
txContext.finish();
} finally {
txTable.close();
}
}
/**
* Expect an exception since a transaction hasn't been started.
*
* @throws Exception
*/
@Test(expected = IOException.class)
public void testTransactionlessFailure() throws Exception {
transactionAwareHTable.get(new Get(TestBytes.row));
}
/**
* Tests that each transaction can see its own persisted writes, while not seeing writes from other
* in-progress transactions.
*/
@Test
public void testReadYourWrites() throws Exception {
// In-progress tx1: started before our main transaction
HTable hTable1 = new HTable(testUtil.getConfiguration(), TestBytes.table);
TransactionAwareHTable txHTable1 = new TransactionAwareHTable(hTable1);
TransactionContext inprogressTxContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable1);
// In-progress tx2: started while our main transaction is running
HTable hTable2 = new HTable(testUtil.getConfiguration(), TestBytes.table);
TransactionAwareHTable txHTable2 = new TransactionAwareHTable(hTable2);
TransactionContext inprogressTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable2);
// create an in-progress write that should be ignored
byte[] col2 = Bytes.toBytes("col2");
inprogressTxContext1.start();
Put putCol2 = new Put(TestBytes.row);
byte[] valueCol2 = Bytes.toBytes("writing in progress");
putCol2.add(TestBytes.family, col2, valueCol2);
txHTable1.put(putCol2);
// start a tx and write a value to test reading in same tx
transactionContext.start();
Put put = new Put(TestBytes.row);
byte[] value = Bytes.toBytes("writing");
put.add(TestBytes.family, TestBytes.qualifier, value);
transactionAwareHTable.put(put);
// test that a write from a tx started after the first is not visible
inprogressTxContext2.start();
Put put2 = new Put(TestBytes.row);
byte[] value2 = Bytes.toBytes("writing2");
put2.add(TestBytes.family, TestBytes.qualifier, value2);
txHTable2.put(put2);
Get get = new Get(TestBytes.row);
Result row = transactionAwareHTable.get(get);
assertFalse(row.isEmpty());
byte[] col1Value = row.getValue(TestBytes.family, TestBytes.qualifier);
Assert.assertNotNull(col1Value);
Assert.assertArrayEquals(value, col1Value);
// write from in-progress transaction should not be visible
byte[] col2Value = row.getValue(TestBytes.family, col2);
assertNull(col2Value);
// commit in-progress transaction, should still not be visible
inprogressTxContext1.finish();
get = new Get(TestBytes.row);
row = transactionAwareHTable.get(get);
assertFalse(row.isEmpty());
col2Value = row.getValue(TestBytes.family, col2);
assertNull(col2Value);
transactionContext.finish();
inprogressTxContext2.abort();
}
@Test
public void testRowLevelConflictDetection() throws Exception {
TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
TxConstants.ConflictDetection.ROW);
TransactionContext txContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable1);
TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
TxConstants.ConflictDetection.ROW);
TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);
byte[] row1 = Bytes.toBytes("row1");
byte[] row2 = Bytes.toBytes("row2");
byte[] col1 = Bytes.toBytes("c1");
byte[] col2 = Bytes.toBytes("c2");
byte[] val1 = Bytes.toBytes("val1");
byte[] val2 = Bytes.toBytes("val2");
// test that concurrent writing to different rows succeeds
txContext1.start();
txTable1.put(new Put(row1).add(TestBytes.family, col1, val1));
txContext2.start();
txTable2.put(new Put(row2).add(TestBytes.family, col1, val2));
// should be no conflicts
txContext1.finish();
txContext2.finish();
transactionContext.start();
Result res = transactionAwareHTable.get(new Get(row1));
assertFalse(res.isEmpty());
Cell cell = res.getColumnLatestCell(TestBytes.family, col1);
assertNotNull(cell);
assertArrayEquals(val1, CellUtil.cloneValue(cell));
res = transactionAwareHTable.get(new Get(row2));
assertFalse(res.isEmpty());
cell = res.getColumnLatestCell(TestBytes.family, col1);
assertNotNull(cell);
assertArrayEquals(val2, CellUtil.cloneValue(cell));
transactionContext.finish();
// test that writing to different columns in the same row fails
txContext1.start();
txTable1.put(new Put(row1).add(TestBytes.family, col1, val2));
txContext2.start();
txTable2.put(new Put(row1).add(TestBytes.family, col2, val2));
txContext1.finish();
try {
txContext2.finish();
fail("txContext2 should have encountered a row-level conflict during commit");
} catch (TransactionConflictException tce) {
txContext2.abort();
}
transactionContext.start();
res = transactionAwareHTable.get(new Get(row1));
assertFalse(res.isEmpty());
cell = res.getColumnLatestCell(TestBytes.family, col1);
assertNotNull(cell);
// should now be val2
assertArrayEquals(val2, CellUtil.cloneValue(cell));
cell = res.getColumnLatestCell(TestBytes.family, col2);
// col2 should not be visible due to conflict
assertNull(cell);
transactionContext.finish();
// test that writing to the same column in the same row fails
txContext1.start();
txTable1.put(new Put(row2).add(TestBytes.family, col2, val1));
txContext2.start();
txTable2.put(new Put(row2).add(TestBytes.family, col2, val2));
txContext1.finish();
try {
txContext2.finish();
fail("txContext2 should have encountered a row and column level conflict during commit");
} catch (TransactionConflictException tce) {
txContext2.abort();
}
transactionContext.start();
res = transactionAwareHTable.get(new Get(row2));
assertFalse(res.isEmpty());
cell = res.getColumnLatestCell(TestBytes.family, col2);
assertNotNull(cell);
// should now be val1
assertArrayEquals(val1, CellUtil.cloneValue(cell));
transactionContext.finish();
// verify change set that is being reported only on rows
txContext1.start();
txTable1.put(new Put(row1).add(TestBytes.family, col1, val1));
txTable1.put(new Put(row2).add(TestBytes.family, col2, val2));
Collection<byte[]> changeSet = txTable1.getTxChanges();
assertNotNull(changeSet);
assertEquals(2, changeSet.size());
assertTrue(changeSet.contains(txTable1.getChangeKey(row1, null, null)));
assertTrue(changeSet.contains(txTable1.getChangeKey(row2, null, null)));
txContext1.finish();
}
@Test
public void testNoneLevelConflictDetection() throws Exception {
InMemoryTxSystemClient txClient = new InMemoryTxSystemClient(txManager);
TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
TxConstants.ConflictDetection.NONE);
TransactionContext txContext1 = new TransactionContext(txClient, txTable1);
TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
TxConstants.ConflictDetection.NONE);
TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
// overlapping writes to the same row + column should not conflict
txContext1.start();
txTable1.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
// changes should not be visible yet
txContext2.start();
Result row = txTable2.get(new Get(TestBytes.row));
assertTrue(row.isEmpty());
txTable2.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
// both commits should succeed
txContext1.finish();
txContext2.finish();
txContext1.start();
row = txTable1.get(new Get(TestBytes.row));
assertFalse(row.isEmpty());
assertArrayEquals(TestBytes.value2, row.getValue(TestBytes.family, TestBytes.qualifier));
txContext1.finish();
// transaction abort should still rollback changes
txContext1.start();
txTable1.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
txContext1.abort();
// changes to row2 should be rolled back
txContext2.start();
Result row2 = txTable2.get(new Get(TestBytes.row2));
assertTrue(row2.isEmpty());
txContext2.finish();
// transaction invalidate should still make changes invisible
txContext1.start();
Transaction tx1 = txContext1.getCurrentTransaction();
txTable1.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
assertNotNull(tx1);
txClient.invalidate(tx1.getWritePointer());
// changes to row2 should be rolled back
txContext2.start();
Result row3 = txTable2.get(new Get(TestBytes.row3));
assertTrue(row3.isEmpty());
txContext2.finish();
}
@Test
public void testCheckpoint() throws Exception {
// start a transaction, using checkpoints between writes
transactionContext.start();
transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
Transaction origTx = transactionContext.getCurrentTransaction();
transactionContext.checkpoint();
Transaction postCheckpointTx = transactionContext.getCurrentTransaction();
assertEquals(origTx.getTransactionId(), postCheckpointTx.getTransactionId());
assertNotEquals(origTx.getWritePointer(), postCheckpointTx.getWritePointer());
long[] checkpointPtrs = postCheckpointTx.getCheckpointWritePointers();
assertEquals(1, checkpointPtrs.length);
assertEquals(postCheckpointTx.getWritePointer(), checkpointPtrs[0]);
transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
transactionContext.checkpoint();
Transaction postCheckpointTx2 = transactionContext.getCurrentTransaction();
assertEquals(origTx.getTransactionId(), postCheckpointTx2.getTransactionId());
assertNotEquals(postCheckpointTx.getWritePointer(), postCheckpointTx2.getWritePointer());
long[] checkpointPtrs2 = postCheckpointTx2.getCheckpointWritePointers();
assertEquals(2, checkpointPtrs2.length);
assertEquals(postCheckpointTx.getWritePointer(), checkpointPtrs2[0]);
assertEquals(postCheckpointTx2.getWritePointer(), checkpointPtrs2[1]);
transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
// by default, all rows should be visible with Read-Your-Writes
verifyRow(transactionAwareHTable, TestBytes.row, TestBytes.value);
verifyRow(transactionAwareHTable, TestBytes.row2, TestBytes.value2);
verifyRow(transactionAwareHTable, TestBytes.row3, TestBytes.value);
// when disabling current write pointer, only the previous checkpoints should be visible
transactionContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
Get get = new Get(TestBytes.row);
verifyRow(transactionAwareHTable, get, TestBytes.value);
get = new Get(TestBytes.row2);
verifyRow(transactionAwareHTable, get, TestBytes.value2);
get = new Get(TestBytes.row3);
verifyRow(transactionAwareHTable, get, null);
// test scan results excluding current write pointer
Scan scan = new Scan();
ResultScanner scanner = transactionAwareHTable.getScanner(scan);
Result row = scanner.next();
assertNotNull(row);
assertArrayEquals(TestBytes.row, row.getRow());
assertEquals(1, row.size());
assertArrayEquals(TestBytes.value, row.getValue(TestBytes.family, TestBytes.qualifier));
row = scanner.next();
assertNotNull(row);
assertArrayEquals(TestBytes.row2, row.getRow());
assertEquals(1, row.size());
assertArrayEquals(TestBytes.value2, row.getValue(TestBytes.family, TestBytes.qualifier));
row = scanner.next();
assertNull(row);
scanner.close();
transactionContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT);
// check that writes are still not visible to other clients
TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);
txContext2.start();
verifyRow(txTable2, TestBytes.row, null);
verifyRow(txTable2, TestBytes.row2, null);
verifyRow(txTable2, TestBytes.row3, null);
txContext2.finish();
// commit transaction, verify writes are visible
transactionContext.finish();
txContext2.start();
verifyRow(txTable2, TestBytes.row, TestBytes.value);
verifyRow(txTable2, TestBytes.row2, TestBytes.value2);
verifyRow(txTable2, TestBytes.row3, TestBytes.value);
txContext2.finish();
txTable2.close();
}
@Test
public void testCheckpointRollback() throws Exception {
// start a transaction, using checkpoints between writes
transactionContext.start();
transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
transactionContext.checkpoint();
transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
transactionContext.checkpoint();
transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
transactionContext.abort();
transactionContext.start();
verifyRow(transactionAwareHTable, TestBytes.row, null);
verifyRow(transactionAwareHTable, TestBytes.row2, null);
verifyRow(transactionAwareHTable, TestBytes.row3, null);
Scan scan = new Scan();
ResultScanner scanner = transactionAwareHTable.getScanner(scan);
assertNull(scanner.next());
scanner.close();
transactionContext.finish();
}
@Test
public void testCheckpointInvalidate() throws Exception {
// start a transaction, using checkpoints between writes
transactionContext.start();
Transaction origTx = transactionContext.getCurrentTransaction();
transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
transactionContext.checkpoint();
Transaction checkpointTx1 = transactionContext.getCurrentTransaction();
transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
transactionContext.checkpoint();
Transaction checkpointTx2 = transactionContext.getCurrentTransaction();
transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
txClient.invalidate(transactionContext.getCurrentTransaction().getTransactionId());
// check that writes are not visible
TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
txContext2.start();
Transaction newTx = txContext2.getCurrentTransaction();
// all 3 writes pointers from the previous transaction should now be excluded
assertTrue(newTx.isExcluded(origTx.getWritePointer()));
assertTrue(newTx.isExcluded(checkpointTx1.getWritePointer()));
assertTrue(newTx.isExcluded(checkpointTx2.getWritePointer()));
verifyRow(txTable2, TestBytes.row, null);
verifyRow(txTable2, TestBytes.row2, null);
verifyRow(txTable2, TestBytes.row3, null);
Scan scan = new Scan();
ResultScanner scanner = txTable2.getScanner(scan);
assertNull(scanner.next());
scanner.close();
txContext2.finish();
}
private void verifyRow(HTableInterface table, byte[] rowkey, byte[] expectedValue) throws Exception {
verifyRow(table, new Get(rowkey), expectedValue);
}
private void verifyRow(HTableInterface table, Get get, byte[] expectedValue) throws Exception {
Result result = table.get(get);
if (expectedValue == null) {
assertTrue(result.isEmpty());
} else {
assertFalse(result.isEmpty());
if (get.hasFamilies()) {
byte[] family = get.getFamilyMap().keySet().iterator().next();
byte[] col = get.getFamilyMap().get(family).first();
assertArrayEquals(expectedValue, result.getValue(family, col));
} else {
assertArrayEquals(expectedValue, result.getValue(TestBytes.family, TestBytes.qualifier));
}
}
}
@Test
public void testExistingData() throws Exception {
byte[] row1 = Bytes.toBytes("row1");
byte[] row2 = Bytes.toBytes("row2");
byte[] row3 = Bytes.toBytes("row3");
byte[] col1 = Bytes.toBytes("c1");
byte[] col2 = Bytes.toBytes("c2");
byte[] val11 = Bytes.toBytes("val11");
byte[] val12 = Bytes.toBytes("val12");
byte[] val21 = Bytes.toBytes("val21");
byte[] val22 = Bytes.toBytes("val22");
byte[] val31 = Bytes.toBytes("val31");
long ttl = TimeUnit.DAYS.toMillis(6);
// Write some data to a non-transactional table
byte[] tableName = Bytes.toBytes("testExistingData");
try {
try (HTable noTxTable = createNonTxTable(tableName, new byte[][]{TestBytes.family})) {
long existingDataTimestamp1 = System.currentTimeMillis() - ttl / 2;
long existingDataTimestamp2 = System.currentTimeMillis() - ttl / 3;
noTxTable.put(new Put(row1).add(TestBytes.family, col1, existingDataTimestamp1, val11));
noTxTable.put(new Put(row1).add(TestBytes.family, col2, existingDataTimestamp2, val12));
noTxTable.put(new Put(row2).add(TestBytes.family, col1, existingDataTimestamp1, val21));
noTxTable.put(new Put(row2).add(TestBytes.family, col2, existingDataTimestamp2, val22));
noTxTable.flushCommits();
verifyRow(noTxTable, new Get(row1).addColumn(TestBytes.family, col1), val11);
verifyRow(noTxTable, new Get(row1).addColumn(TestBytes.family, col2), val12);
verifyRow(noTxTable, new Get(row2).addColumn(TestBytes.family, col1), val21);
verifyRow(noTxTable, new Get(row2).addColumn(TestBytes.family, col2), val22);
}
// Now try reading the data as a transactional table
// Attach co-processor to the table
HTableDescriptor tableDescriptor = hBaseAdmin.getTableDescriptor(tableName);
tableDescriptor.addCoprocessor(TransactionProcessor.class.getName());
hBaseAdmin.modifyTable(tableName, tableDescriptor);
setTtl(tableDescriptor, TestBytes.family, ttl);
InMemoryTxSystemClient txClient = new InMemoryTxSystemClient(txManager);
try (TransactionAwareHTable txTable = new TransactionAwareHTable(new HTable(conf, tableName))) {
TransactionContext txContext = new TransactionContext(txClient, txTable);
txContext.start();
txTable.put(new Put(row3).add(TestBytes.family, col1, val31));
txContext.finish();
txTable.flushCommits();
// Should be able to read existing data after major compaction
LOG.error("111111111 running major compaction");
majorCompact(tableDescriptor, TestBytes.family);
LOG.error("111111111 done running major compaction");
txContext = new TransactionContext(txClient, txTable);
txContext.start();
verifyRow(txTable, new Get(row3).addColumn(TestBytes.family, col1), val31);
verifyRow(txTable, new Get(row1).addColumn(TestBytes.family, col1), val11);
verifyRow(txTable, new Get(row1).addColumn(TestBytes.family, col2), val12);
verifyRow(txTable, new Get(row2).addColumn(TestBytes.family, col1), val21);
verifyRow(txTable, new Get(row2).addColumn(TestBytes.family, col2), val22);
txContext.finish();
setTtl(tableDescriptor, TestBytes.family, ttl / 2);
LOG.error("111111111 running major compaction");
majorCompact(tableDescriptor, TestBytes.family);
LOG.error("111111111 done running major compaction");
txContext = new TransactionContext(txClient, txTable);
txContext.start();
verifyRow(txTable, new Get(row3).addColumn(TestBytes.family, col1), val31);
verifyRow(txTable, new Get(row1).addColumn(TestBytes.family, col1), null);
verifyRow(txTable, new Get(row1).addColumn(TestBytes.family, col2), val12);
verifyRow(txTable, new Get(row2).addColumn(TestBytes.family, col1), null);
verifyRow(txTable, new Get(row2).addColumn(TestBytes.family, col2), val22);
txContext.finish();
setTtl(tableDescriptor, TestBytes.family, 1);
LOG.error("111111111 running major compaction");
majorCompact(tableDescriptor, TestBytes.family);
LOG.error("111111111 done running major compaction");
txContext = new TransactionContext(txClient, txTable);
txContext.start();
verifyRow(txTable, new Get(row3).addColumn(TestBytes.family, col1), null);
verifyRow(txTable, new Get(row1).addColumn(TestBytes.family, col1), null);
verifyRow(txTable, new Get(row1).addColumn(TestBytes.family, col2), null);
verifyRow(txTable, new Get(row2).addColumn(TestBytes.family, col1), null);
verifyRow(txTable, new Get(row2).addColumn(TestBytes.family, col2), null);
txContext.finish();
}
} finally {
if (hBaseAdmin.tableExists(tableName)) {
hBaseAdmin.disableTable(tableName);
hBaseAdmin.deleteTable(tableName);
}
}
}
private void setTtl(HTableDescriptor tableDescriptor, byte[] family, long ttl) throws Exception {
HColumnDescriptor familyDescriptor = new HColumnDescriptor(family);
familyDescriptor.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl));
tableDescriptor.addFamily(familyDescriptor);
hBaseAdmin.modifyTable(tableDescriptor.getTableName(), tableDescriptor);
}
private void majorCompact(HTableDescriptor tableDescriptor, byte[] family) throws Exception {
long before = System.currentTimeMillis();
testUtil.compact(tableDescriptor.getTableName(), true);
while (true) {
for (StoreFileInfo storeFileInfo : getStoreFiles(tableDescriptor, family)) {
LOG.error("Store file = {}, start time = {}, modification time = {}",
storeFileInfo.getPath().toUri(), before, storeFileInfo.getModificationTime());
if (storeFileInfo.getModificationTime() > before) {
return;
}
}
TimeUnit.MILLISECONDS.sleep(10);
}
}
private Set<StoreFileInfo> getStoreFiles(HTableDescriptor tableDescriptor, byte[] family) throws Exception {
Set<StoreFileInfo> storeFiles = new HashSet<>();
Path tableDir = FSUtils.getTableDir(testUtil.getDefaultRootDirPath(), tableDescriptor.getTableName());
for (HRegionInfo regionInfo : hBaseAdmin.getTableRegions(tableDescriptor.getTableName())) {
HRegionFileSystem hRegionFileSystem =
HRegionFileSystem.openRegionFromFileSystem(conf, testUtil.getTestFileSystem(), tableDir, regionInfo, true);
storeFiles.addAll(hRegionFileSystem.getStoreFiles(family));
}
return storeFiles;
}
}