blob: a03a5811433892b56b2fe29c53d84e57fc25a86a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.transaction;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.HBaseShims;
import org.apache.omid.TestUtils;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
import org.apache.omid.tso.TSOServer;
import org.apache.omid.tso.TSOServerConfig;
import org.apache.omid.tso.client.OmidClientConfiguration;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Guice;
import com.google.inject.Injector;
public class TestCompaction {
private static final Logger LOG = LoggerFactory.getLogger(TestCompaction.class);
private static final String TEST_FAMILY = "test-fam";
private static final String TEST_QUALIFIER = "test-qual";
private final byte[] fam = Bytes.toBytes(TEST_FAMILY);
private final byte[] qual = Bytes.toBytes(TEST_QUALIFIER);
private final byte[] data = Bytes.toBytes("testWrite-1");
private static final int MAX_VERSIONS = 3;
private Random randomGenerator;
private AbstractTransactionManager tm;
private Injector injector;
private Admin admin;
private Configuration hbaseConf;
private HBaseTestingUtility hbaseTestUtil;
private MiniHBaseCluster hbaseCluster;
private TSOServer tso;
private CommitTable commitTable;
private PostCommitActions syncPostCommitter;
private static Connection connection;
@BeforeClass
public void setupTestCompation() throws Exception {
TSOServerConfig tsoConfig = new TSOServerConfig();
tsoConfig.setPort(1234);
tsoConfig.setConflictMapSize(1);
tsoConfig.setWaitStrategy("LOW_CPU");
injector = Guice.createInjector(new TSOForHBaseCompactorTestModule(tsoConfig));
hbaseConf = injector.getInstance(Configuration.class);
HBaseCommitTableConfig hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
// settings required for #testDuplicateDeletes()
hbaseConf.setInt("hbase.hstore.compaction.min", 2);
hbaseConf.setInt("hbase.hstore.compaction.max", 2);
setupHBase();
connection = ConnectionFactory.createConnection(hbaseConf);
admin = connection.getAdmin();
createRequiredHBaseTables(hBaseTimestampStorageConfig, hBaseCommitTableConfig);
setupTSO();
commitTable = injector.getInstance(CommitTable.class);
}
private void setupHBase() throws Exception {
LOG.info("--------------------------------------------------------------------------------------------------");
LOG.info("Setting up HBase");
LOG.info("--------------------------------------------------------------------------------------------------");
hbaseTestUtil = new HBaseTestingUtility(hbaseConf);
LOG.info("--------------------------------------------------------------------------------------------------");
LOG.info("Creating HBase MiniCluster");
LOG.info("--------------------------------------------------------------------------------------------------");
hbaseCluster = hbaseTestUtil.startMiniCluster(1);
}
private void createRequiredHBaseTables(HBaseTimestampStorageConfig timestampStorageConfig,
HBaseCommitTableConfig hBaseCommitTableConfig) throws IOException {
createTableIfNotExists(timestampStorageConfig.getTableName(), timestampStorageConfig.getFamilyName().getBytes());
createTableIfNotExists(hBaseCommitTableConfig.getTableName(), hBaseCommitTableConfig.getCommitTableFamily(), hBaseCommitTableConfig.getLowWatermarkFamily());
}
private void createTableIfNotExists(String tableName, byte[]... families) throws IOException {
if (!admin.tableExists(TableName.valueOf(tableName))) {
LOG.info("Creating {} table...", tableName);
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
for (byte[] family : families) {
HColumnDescriptor datafam = new HColumnDescriptor(family);
datafam.setMaxVersions(MAX_VERSIONS);
desc.addFamily(datafam);
}
desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation",null,Coprocessor.PRIORITY_HIGHEST,null);
admin.createTable(desc);
for (byte[] family : families) {
CompactorUtil.enableOmidCompaction(connection, TableName.valueOf(tableName), family);
}
}
}
private void setupTSO() throws IOException, InterruptedException {
tso = injector.getInstance(TSOServer.class);
tso.startAsync();
tso.awaitRunning();
TestUtils.waitForSocketListening("localhost", 1234, 100);
Thread.currentThread().setName("UnitTest(s) thread");
}
@AfterClass
public void cleanupTestCompation() throws Exception {
teardownTSO();
hbaseCluster.shutdown();
}
private void teardownTSO() throws IOException, InterruptedException {
tso.stopAsync();
tso.awaitTerminated();
TestUtils.waitForSocketNotListening("localhost", 1234, 1000);
}
@BeforeMethod
public void setupTestCompactionIndividualTest() throws Exception {
randomGenerator = new Random(0xfeedcafeL);
tm = spy((AbstractTransactionManager) newTransactionManager());
}
private TransactionManager newTransactionManager() throws Exception {
HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
hbaseOmidClientConf.setConnectionString("localhost:1234");
hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
CommitTable.Client commitTableClient = commitTable.getClient();
syncPostCommitter =
spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
return HBaseTransactionManager.builder(hbaseOmidClientConf)
.postCommitter(syncPostCommitter)
.commitTableClient(commitTableClient)
.build();
}
@Test
public void testShadowCellsAboveLWMSurviveCompaction() throws Exception {
String TEST_TABLE = "testShadowCellsAboveLWMSurviveCompaction";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
byte[] rowId = Bytes.toBytes("row");
// Create 3 transactions modifying the same cell in a particular row
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
Put put1 = new Put(rowId);
put1.addColumn(fam, qual, Bytes.toBytes("testValue 1"));
txTable.put(tx1, put1);
tm.commit(tx1);
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Put put2 = new Put(rowId);
put2.addColumn(fam, qual, Bytes.toBytes("testValue 2"));
txTable.put(tx2, put2);
tm.commit(tx2);
HBaseTransaction tx3 = (HBaseTransaction) tm.begin();
Put put3 = new Put(rowId);
put3.addColumn(fam, qual, Bytes.toBytes("testValue 3"));
txTable.put(tx3, put3);
tm.commit(tx3);
// Before compaction, the three timestamped values for the cell should be there
TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put cell of Tx1 should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell of Tx1 should be there");
assertTrue(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Put cell of Tx2 cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Put shadow cell of Tx2 should be there");
assertTrue(CellUtils.hasCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
"Put cell of Tx3 cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
"Put shadow cell of Tx3 should be there");
// Compact
compactWithLWM(0, TEST_TABLE);
// After compaction, the three timestamped values for the cell should be there
assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put cell of Tx1 should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell of Tx1 should be there");
assertTrue(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Put cell of Tx2 cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Put shadow cell of Tx2 should be there");
assertTrue(CellUtils.hasCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
"Put cell of Tx3 cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
"Put shadow cell of Tx3 should be there");
}
@Test(timeOut = 60_000)
public void testStandardTXsWithShadowCellsAndWithSTBelowAndAboveLWMArePresevedAfterCompaction() throws Throwable {
String TEST_TABLE = "testStandardTXsWithShadowCellsAndWithSTBelowAndAboveLWMArePresevedAfterCompaction";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
final int ROWS_TO_ADD = 5;
long fakeAssignedLowWatermark = 0L;
for (int i = 0; i < ROWS_TO_ADD; ++i) {
long rowId = randomGenerator.nextLong();
Transaction tx = tm.begin();
if (i == (ROWS_TO_ADD / 2)) {
fakeAssignedLowWatermark = tx.getTransactionId();
LOG.info("AssignedLowWatermark " + fakeAssignedLowWatermark);
}
Put put = new Put(Bytes.toBytes(rowId));
put.addColumn(fam, qual, data);
txTable.put(tx, put);
tm.commit(tx);
}
LOG.info("Flushing table {}", TEST_TABLE);
admin.flush(TableName.valueOf(TEST_TABLE));
// Return a LWM that triggers compaction & stays between 1 and the max start timestamp assigned to previous TXs
LOG.info("Regions in table {}: {}", TEST_TABLE, hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).size());
OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).get(0)
.getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
CommitTable commitTable = injector.getInstance(CommitTable.class);
CommitTable.Client commitTableClient = spy(commitTable.getClient());
SettableFuture<Long> f = SettableFuture.create();
f.set(fakeAssignedLowWatermark);
doReturn(f).when(commitTableClient).readLowWatermark();
omidCompactor.commitTableClient = commitTableClient;
LOG.info("Compacting table {}", TEST_TABLE);
admin.majorCompact(TableName.valueOf(TEST_TABLE));
LOG.info("Sleeping for 3 secs");
Thread.sleep(3000);
LOG.info("Waking up after 3 secs");
// No rows should have been discarded after compacting
assertEquals(rowCount(TEST_TABLE, fam), ROWS_TO_ADD, "Rows in table after compacting should be " + ROWS_TO_ADD);
}
@Test(timeOut = 60_000)
public void testTXWithoutShadowCellsAndWithSTBelowLWMGetsShadowCellHealedAfterCompaction() throws Exception {
String TEST_TABLE = "testTXWithoutShadowCellsAndWithSTBelowLWMGetsShadowCellHealedAfterCompaction";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
// The following line emulates a crash after commit that is observed in (*) below
doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
HBaseTransaction problematicTx = (HBaseTransaction) tm.begin();
long row = randomGenerator.nextLong();
// Test shadow cell are created properly
Put put = new Put(Bytes.toBytes(row));
put.addColumn(fam, qual, data);
txTable.put(problematicTx, put);
try {
tm.commit(problematicTx);
} catch (Exception e) { // (*) Crash
// Do nothing
}
assertTrue(CellUtils.hasCell(Bytes.toBytes(row), fam, qual, problematicTx.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
"Cell should be there");
assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(row), fam, qual, problematicTx.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
"Shadow cell should not be there");
// Return a LWM that triggers compaction and has all the possible start timestamps below it
LOG.info("Regions in table {}: {}", TEST_TABLE, hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).size());
OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).get(0)
.getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
CommitTable commitTable = injector.getInstance(CommitTable.class);
CommitTable.Client commitTableClient = spy(commitTable.getClient());
SettableFuture<Long> f = SettableFuture.create();
f.set(Long.MAX_VALUE);
doReturn(f).when(commitTableClient).readLowWatermark();
omidCompactor.commitTableClient = commitTableClient;
LOG.info("Flushing table {}", TEST_TABLE);
admin.flush(TableName.valueOf(TEST_TABLE));
LOG.info("Compacting table {}", TEST_TABLE);
admin.majorCompact(TableName.valueOf(TEST_TABLE));
LOG.info("Sleeping for 3 secs");
Thread.sleep(3000);
LOG.info("Waking up after 3 secs");
assertTrue(CellUtils.hasCell(Bytes.toBytes(row), fam, qual, problematicTx.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
"Cell should be there");
assertTrue(CellUtils.hasShadowCell(Bytes.toBytes(row), fam, qual, problematicTx.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
"Shadow cell should not be there");
}
@Test(timeOut = 60_000)
public void testNeverendingTXsWithSTBelowAndAboveLWMAreDiscardedAndPreservedRespectivelyAfterCompaction()
throws Throwable {
String
TEST_TABLE =
"testNeverendingTXsWithSTBelowAndAboveLWMAreDiscardedAndPreservedRespectivelyAfterCompaction";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
// The KV in this transaction should be discarded
HBaseTransaction neverendingTxBelowLowWatermark = (HBaseTransaction) tm.begin();
long rowId = randomGenerator.nextLong();
Put put = new Put(Bytes.toBytes(rowId));
put.addColumn(fam, qual, data);
txTable.put(neverendingTxBelowLowWatermark, put);
assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTxBelowLowWatermark.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
"Cell should be there");
assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTxBelowLowWatermark.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
"Shadow cell should not be there");
// The KV in this transaction should be added without the shadow cells
HBaseTransaction neverendingTxAboveLowWatermark = (HBaseTransaction) tm.begin();
long anotherRowId = randomGenerator.nextLong();
put = new Put(Bytes.toBytes(anotherRowId));
put.addColumn(fam, qual, data);
txTable.put(neverendingTxAboveLowWatermark, put);
assertTrue(CellUtils.hasCell(Bytes.toBytes(anotherRowId), fam, qual, neverendingTxAboveLowWatermark.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
"Cell should be there");
assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(anotherRowId), fam, qual, neverendingTxAboveLowWatermark.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
"Shadow cell should not be there");
assertEquals(rowCount(TEST_TABLE, fam), 2, "Rows in table before flushing should be 2");
LOG.info("Flushing table {}", TEST_TABLE);
admin.flush(TableName.valueOf(TEST_TABLE));
assertEquals(rowCount(TEST_TABLE, fam), 2, "Rows in table after flushing should be 2");
// Return a LWM that triggers compaction and stays between both ST of TXs, so assign 1st TX's start timestamp
LOG.info("Regions in table {}: {}", TEST_TABLE, hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).size());
OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).get(0)
.getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
CommitTable commitTable = injector.getInstance(CommitTable.class);
CommitTable.Client commitTableClient = spy(commitTable.getClient());
SettableFuture<Long> f = SettableFuture.create();
f.set(neverendingTxBelowLowWatermark.getStartTimestamp());
doReturn(f).when(commitTableClient).readLowWatermark();
omidCompactor.commitTableClient = commitTableClient;
LOG.info("Compacting table {}", TEST_TABLE);
admin.majorCompact(TableName.valueOf(TEST_TABLE));
LOG.info("Sleeping for 3 secs");
Thread.sleep(3000);
LOG.info("Waking up after 3 secs");
// One row should have been discarded after compacting
assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table after compacting");
// The row from the TX below the LWM should not be there (nor its Shadow Cell)
assertFalse(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTxBelowLowWatermark.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
"Cell should not be there");
assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTxBelowLowWatermark.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
"Shadow cell should not be there");
// The row from the TX above the LWM should be there without the Shadow Cell
assertTrue(CellUtils.hasCell(Bytes.toBytes(anotherRowId), fam, qual, neverendingTxAboveLowWatermark.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
"Cell should be there");
assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(anotherRowId), fam, qual, neverendingTxAboveLowWatermark.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
"Shadow cell should not be there");
}
@Test(timeOut = 60_000)
public void testRowsUnalteredWhenCommitTableCannotBeReached() throws Throwable {
String TEST_TABLE = "testRowsUnalteredWhenCommitTableCannotBeReached";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
// The KV in this transaction should be discarded but in the end should remain there because
// the commit table won't be accessed (simulating an error on access)
HBaseTransaction neverendingTx = (HBaseTransaction) tm.begin();
long rowId = randomGenerator.nextLong();
Put put = new Put(Bytes.toBytes(rowId));
put.addColumn(fam, qual, data);
txTable.put(neverendingTx, put);
assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
"Cell should be there");
assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
"Shadow cell should not be there");
assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one rows in table before flushing");
LOG.info("Flushing table {}", TEST_TABLE);
admin.flush(TableName.valueOf(TEST_TABLE));
assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one rows in table after flushing");
// Break access to CommitTable functionality in Compactor
LOG.info("Regions in table {}: {}", TEST_TABLE, hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).size());
OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).get(0)
.getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
CommitTable commitTable = injector.getInstance(CommitTable.class);
CommitTable.Client commitTableClient = spy(commitTable.getClient());
SettableFuture<Long> f = SettableFuture.create();
f.setException(new IOException("Unable to read"));
doReturn(f).when(commitTableClient).readLowWatermark();
omidCompactor.commitTableClient = commitTableClient;
LOG.info("Compacting table {}", TEST_TABLE);
admin.majorCompact(TableName.valueOf(TEST_TABLE)); // Should trigger the error when accessing CommitTable funct.
LOG.info("Sleeping for 3 secs");
Thread.sleep(3000);
LOG.info("Waking up after 3 secs");
// All rows should be there after the failed compaction
assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table after compacting");
assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
"Cell should be there");
assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
"Shadow cell should not be there");
}
@Test(timeOut = 60_000)
public void testOriginalTableParametersAreAvoidedAlsoWhenCompacting() throws Throwable {
String TEST_TABLE = "testOriginalTableParametersAreAvoidedAlsoWhenCompacting";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
long rowId = randomGenerator.nextLong();
for (int versionCount = 0; versionCount <= (2 * MAX_VERSIONS); versionCount++) {
Transaction tx = tm.begin();
Put put = new Put(Bytes.toBytes(rowId));
put.addColumn(fam, qual, Bytes.toBytes("testWrite-" + versionCount));
txTable.put(tx, put);
tm.commit(tx);
}
Transaction tx = tm.begin();
Get get = new Get(Bytes.toBytes(rowId));
get.setMaxVersions(2 * MAX_VERSIONS);
assertEquals(get.getMaxVersions(), (2 * MAX_VERSIONS), "Max versions should be set to " + (2 * MAX_VERSIONS));
get.addColumn(fam, qual);
Result result = txTable.get(tx, get);
tm.commit(tx);
List<Cell> column = result.getColumnCells(fam, qual);
assertEquals(column.size(), 1, "There should be only one version in the result");
assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table before flushing");
LOG.info("Flushing table {}", TEST_TABLE);
admin.flush(TableName.valueOf(TEST_TABLE));
assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table after flushing");
// Return a LWM that triggers compaction
compactEverything(TEST_TABLE);
// One row should have been discarded after compacting
assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table after compacting");
tx = tm.begin();
get = new Get(Bytes.toBytes(rowId));
get.setMaxVersions(2 * MAX_VERSIONS);
assertEquals(get.getMaxVersions(), (2 * MAX_VERSIONS), "Max versions should be set to " + (2 * MAX_VERSIONS));
get.addColumn(fam, qual);
result = txTable.get(tx, get);
tm.commit(tx);
column = result.getColumnCells(fam, qual);
assertEquals(column.size(), 1, "There should be only one version in the result");
assertEquals(Bytes.toString(CellUtil.cloneValue(column.get(0))), "testWrite-" + (2 * MAX_VERSIONS),
"Values don't match");
}
// manually flush the regions on the region server.
// flushing like this prevents compaction running
// directly after the flush, which we want to avoid.
private void manualFlush(String tableName) throws Throwable {
LOG.info("Manually flushing all regions and waiting 2 secs");
HBaseShims.flushAllOnlineRegions(hbaseTestUtil.getHBaseCluster().getRegionServer(0),
TableName.valueOf(tableName));
TimeUnit.SECONDS.sleep(2);
}
@Test(timeOut = 60_000)
public void testOldCellsAreDiscardedAfterCompaction() throws Exception {
String TEST_TABLE = "testOldCellsAreDiscardedAfterCompaction";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
byte[] rowId = Bytes.toBytes("row");
// Create 3 transactions modifying the same cell in a particular row
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
Put put1 = new Put(rowId);
put1.addColumn(fam, qual, Bytes.toBytes("testValue 1"));
txTable.put(tx1, put1);
tm.commit(tx1);
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Put put2 = new Put(rowId);
put2.addColumn(fam, qual, Bytes.toBytes("testValue 2"));
txTable.put(tx2, put2);
tm.commit(tx2);
HBaseTransaction tx3 = (HBaseTransaction) tm.begin();
Put put3 = new Put(rowId);
put3.addColumn(fam, qual, Bytes.toBytes("testValue 3"));
txTable.put(tx3, put3);
tm.commit(tx3);
// Before compaction, the three timestamped values for the cell should be there
TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put cell of Tx1 should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell of Tx1 should be there");
assertTrue(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Put cell of Tx2 cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Put shadow cell of Tx2 should be there");
assertTrue(CellUtils.hasCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
"Put cell of Tx3 cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
"Put shadow cell of Tx3 should be there");
// Compact
HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
// After compaction, only the last value for the cell should have survived
assertFalse(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put cell of Tx1 should not be there");
assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell of Tx1 should not be there");
assertFalse(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Put cell of Tx2 should not be there");
assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Put shadow cell of Tx2 should not be there");
assertTrue(CellUtils.hasCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
"Put cell of Tx3 cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
"Put shadow cell of Tx3 should be there");
// A new transaction after compaction should read the last value written
HBaseTransaction newTx1 = (HBaseTransaction) tm.begin();
Get newGet1 = new Get(rowId);
newGet1.addColumn(fam, qual);
Result result = txTable.get(newTx1, newGet1);
assertEquals(Bytes.toBytes("testValue 3"), result.getValue(fam, qual));
// Write a new value
Put newPut1 = new Put(rowId);
newPut1.addColumn(fam, qual, Bytes.toBytes("new testValue 1"));
txTable.put(newTx1, newPut1);
// Start a second new transaction
HBaseTransaction newTx2 = (HBaseTransaction) tm.begin();
// Commit first of the new tx
tm.commit(newTx1);
// The second transaction should still read the previous value
Get newGet2 = new Get(rowId);
newGet2.addColumn(fam, qual);
result = txTable.get(newTx2, newGet2);
assertEquals(Bytes.toBytes("testValue 3"), result.getValue(fam, qual));
tm.commit(newTx2);
// Only two values -the new written by newTx1 and the last value
// for the cell after compaction- should have survived
assertFalse(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put cell of Tx1 should not be there");
assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell of Tx1 should not be there");
assertFalse(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Put cell of Tx2 should not be there");
assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Put shadow cell of Tx2 should not be there");
assertTrue(CellUtils.hasCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
"Put cell of Tx3 cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
"Put shadow cell of Tx3 should be there");
assertTrue(CellUtils.hasCell(rowId, fam, qual, newTx1.getStartTimestamp(), getter),
"Put cell of NewTx1 cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, newTx1.getStartTimestamp(), getter),
"Put shadow cell of NewTx1 should be there");
}
/**
* Tests a case where a temporary failure to flush causes the compactor to crash
*/
@Test(timeOut = 60_000)
public void testDuplicateDeletes() throws Throwable {
String TEST_TABLE = "testDuplicateDeletes";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
// jump through hoops to trigger a minor compaction.
// a minor compaction will only run if there are enough
// files to be compacted, but that is less than the number
// of total files, in which case it will run a major
// compaction. The issue this is testing only shows up
// with minor compaction, as only Deletes can be duplicate
// and major compactions filter them out.
byte[] firstRow = "FirstRow".getBytes();
HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
Put put0 = new Put(firstRow);
put0.addColumn(fam, qual, Bytes.toBytes("testWrite-1"));
txTable.put(tx0, put0);
tm.commit(tx0);
// create the first hfile
manualFlush(TEST_TABLE);
// write a row, it won't be committed
byte[] rowToBeCompactedAway = "compactMe".getBytes();
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
Put put1 = new Put(rowToBeCompactedAway);
put1.addColumn(fam, qual, Bytes.toBytes("testWrite-1"));
txTable.put(tx1, put1);
txTable.flushCommits();
// write a row to trigger the double delete problem
byte[] row = "iCauseErrors".getBytes();
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Put put2 = new Put(row);
put2.addColumn(fam, qual, Bytes.toBytes("testWrite-1"));
txTable.put(tx2, put2);
tm.commit(tx2);
HBaseTransaction tx3 = (HBaseTransaction) tm.begin();
Put put3 = new Put(row);
put3.addColumn(fam, qual, Bytes.toBytes("testWrite-1"));
txTable.put(tx3, put3);
txTable.flushCommits();
// cause a failure on HBaseTM#preCommit();
Set<HBaseCellId> writeSet = tx3.getWriteSet();
assertEquals(1, writeSet.size());
List<HBaseCellId> newWriteSet = new ArrayList<>();
final AtomicBoolean flushFailing = new AtomicBoolean(true);
for (HBaseCellId id : writeSet) {
TTable failableHTable = spy(id.getTable());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation)
throws Throwable {
if (flushFailing.get()) {
throw new RetriesExhaustedWithDetailsException(new ArrayList<Throwable>(),
new ArrayList<Row>(), new ArrayList<String>());
} else {
invocation.callRealMethod();
}
return null;
}
}).when(failableHTable).flushCommits();
newWriteSet.add(new HBaseCellId(failableHTable,
id.getRow(), id.getFamily(),
id.getQualifier(), id.getTimestamp()));
}
writeSet.clear();
writeSet.addAll(newWriteSet);
try {
tm.commit(tx3);
fail("Shouldn't succeed");
} catch (TransactionException tme) {
flushFailing.set(false);
tm.rollback(tx3);
}
// create second hfile,
// it should contain multiple deletes
manualFlush(TEST_TABLE);
// create loads of files
byte[] anotherRow = "someotherrow".getBytes();
HBaseTransaction tx4 = (HBaseTransaction) tm.begin();
Put put4 = new Put(anotherRow);
put4.addColumn(fam, qual, Bytes.toBytes("testWrite-1"));
txTable.put(tx4, put4);
tm.commit(tx4);
// create third hfile
manualFlush(TEST_TABLE);
// trigger minor compaction and give it time to run
setCompactorLWM(tx4.getStartTimestamp(), TEST_TABLE);
admin.compact(TableName.valueOf(TEST_TABLE));
Thread.sleep(3000);
// check if the cell that should be compacted, is compacted
assertFalse(CellUtils.hasCell(rowToBeCompactedAway, fam, qual, tx1.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
"Cell should not be be there");
}
@Test(timeOut = 60_000)
public void testNonOmidCFIsUntouched() throws Throwable {
String TEST_TABLE = "testNonOmidCFIsUntouched";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
admin.disableTable(TableName.valueOf(TEST_TABLE));
byte[] nonOmidCF = Bytes.toBytes("nonOmidCF");
byte[] nonOmidQual = Bytes.toBytes("nonOmidCol");
HColumnDescriptor nonomidfam = new HColumnDescriptor(nonOmidCF);
nonomidfam.setMaxVersions(MAX_VERSIONS);
admin.addColumn(TableName.valueOf(TEST_TABLE), nonomidfam);
admin.enableTable(TableName.valueOf(TEST_TABLE));
byte[] rowId = Bytes.toBytes("testRow");
Transaction tx = tm.begin();
Put put = new Put(rowId);
put.addColumn(fam, qual, Bytes.toBytes("testValue"));
txTable.put(tx, put);
Put nonTxPut = new Put(rowId);
nonTxPut.addColumn(nonOmidCF, nonOmidQual, Bytes.toBytes("nonTxVal"));
txTable.getHTable().put(nonTxPut);
txTable.flushCommits(); // to make sure it left the client
Get g = new Get(rowId);
Result result = txTable.getHTable().get(g);
assertEquals(result.getColumnCells(nonOmidCF, nonOmidQual).size(), 1, "Should be there, precompact");
assertEquals(result.getColumnCells(fam, qual).size(), 1, "Should be there, precompact");
compactEverything(TEST_TABLE);
result = txTable.getHTable().get(g);
assertEquals(result.getColumnCells(nonOmidCF, nonOmidQual).size(), 1, "Should be there, postcompact");
assertEquals(result.getColumnCells(fam, qual).size(), 0, "Should not be there, postcompact");
}
// ----------------------------------------------------------------------------------------------------------------
// Tests on tombstones and non-transactional Deletes
// ----------------------------------------------------------------------------------------------------------------
/**
* Test that when a major compaction runs, cells that were deleted non-transactionally dissapear
*/
@Test(timeOut = 60_000)
public void testACellDeletedNonTransactionallyDoesNotAppearWhenAMajorCompactionOccurs() throws Throwable {
String TEST_TABLE = "testACellDeletedNonTransactionallyDoesNotAppearWhenAMajorCompactionOccurs";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
Table table = txTable.getHTable();
// Write first a value transactionally
HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
byte[] rowId = Bytes.toBytes("row1");
Put p0 = new Put(rowId);
p0.addColumn(fam, qual, Bytes.toBytes("testValue-0"));
txTable.put(tx0, p0);
tm.commit(tx0);
// Then perform a non-transactional Delete
Delete d = new Delete(rowId);
d.addColumn(fam, qual);
table.delete(d);
// Trigger a major compaction
HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
// Then perform a non-tx (raw) scan...
Scan scan = new Scan();
scan.setRaw(true);
ResultScanner scannerResults = table.getScanner(scan);
// ...and test the deleted cell is not there anymore
assertNull(scannerResults.next(), "There should be no results in scan results");
table.close();
}
/**
* Test that when a minor compaction runs, cells that were deleted non-transactionally are preserved. This is to
* allow users still access the cells when doing "improper" operations on a transactional table
*/
@Test(timeOut = 60_000)
public void testACellDeletedNonTransactionallyIsPreservedWhenMinorCompactionOccurs() throws Throwable {
String TEST_TABLE = "testACellDeletedNonTransactionallyIsPreservedWhenMinorCompactionOccurs";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
Table table = txTable.getHTable();
// Configure the environment to create a minor compaction
// Write first a value transactionally
HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
byte[] rowId = Bytes.toBytes("row1");
Put p0 = new Put(rowId);
p0.addColumn(fam, qual, Bytes.toBytes("testValue-0"));
txTable.put(tx0, p0);
tm.commit(tx0);
// create the first hfile
manualFlush(TEST_TABLE);
// Write another value transactionally
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
Put p1 = new Put(rowId);
p1.addColumn(fam, qual, Bytes.toBytes("testValue-1"));
txTable.put(tx1, p1);
tm.commit(tx1);
// create the second hfile
manualFlush(TEST_TABLE);
// Write yet another value transactionally
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Put p2 = new Put(rowId);
p2.addColumn(fam, qual, Bytes.toBytes("testValue-2"));
txTable.put(tx2, p2);
tm.commit(tx2);
// create a third hfile
manualFlush(TEST_TABLE);
// Then perform a non-transactional Delete
Delete d = new Delete(rowId);
d.addColumn(fam, qual);
table.delete(d);
// create the fourth hfile
manualFlush(TEST_TABLE);
// Trigger the minor compaction
HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
setCompactorLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
admin.compact(TableName.valueOf(TEST_TABLE));
Thread.sleep(5000);
// Then perform a non-tx (raw) scan...
Scan scan = new Scan();
scan.setRaw(true);
ResultScanner scannerResults = table.getScanner(scan);
// ...and test the deleted cell is still there
int count = 0;
Result scanResult;
List<Cell> listOfCellsScanned = new ArrayList<>();
while ((scanResult = scannerResults.next()) != null) {
listOfCellsScanned = scanResult.listCells(); // equivalent to rawCells()
count++;
}
assertEquals(count, 1, "There should be only one result in scan results");
assertEquals(listOfCellsScanned.size(), 3, "There should be 3 cell entries in scan results (2 puts, 1 del)");
boolean wasDeletedCellFound = false;
int numberOfDeletedCellsFound = 0;
for (Cell cell : listOfCellsScanned) {
if (CellUtil.isDelete(cell)) {
wasDeletedCellFound = true;
numberOfDeletedCellsFound++;
}
}
assertTrue(wasDeletedCellFound, "We should have found a non-transactionally deleted cell");
assertEquals(numberOfDeletedCellsFound, 1, "There should be only only one deleted cell");
table.close();
}
/**
* Test that when a minor compaction runs, tombstones are not cleaned up
*/
@Test(timeOut = 60_000)
public void testTombstonesAreNotCleanedUpWhenMinorCompactionOccurs() throws Throwable {
String TEST_TABLE = "testTombstonesAreNotCleanedUpWhenMinorCompactionOccurs";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
// Configure the environment to create a minor compaction
HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
byte[] rowId = Bytes.toBytes("case1");
Put p = new Put(rowId);
p.addColumn(fam, qual, Bytes.toBytes("testValue-0"));
txTable.put(tx0, p);
tm.commit(tx0);
// create the first hfile
manualFlush(TEST_TABLE);
// Create the tombstone
HBaseTransaction deleteTx = (HBaseTransaction) tm.begin();
Delete d = new Delete(rowId);
d.addColumn(fam, qual);
txTable.delete(deleteTx, d);
tm.commit(deleteTx);
// create the second hfile
manualFlush(TEST_TABLE);
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
Put p1 = new Put(rowId);
p1.addColumn(fam, qual, Bytes.toBytes("testValue-11"));
txTable.put(tx1, p1);
tm.commit(tx1);
// create the third hfile
manualFlush(TEST_TABLE);
HBaseTransaction lastTx = (HBaseTransaction) tm.begin();
Put p2 = new Put(rowId);
p2.addColumn(fam, qual, Bytes.toBytes("testValue-222"));
txTable.put(lastTx, p2);
tm.commit(lastTx);
// Trigger the minor compaction
HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
setCompactorLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
admin.compact(TableName.valueOf(TEST_TABLE));
Thread.sleep(5000);
// Checks on results after compaction
TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
assertFalse(CellUtils.hasCell(rowId, fam, qual, tx0.getStartTimestamp(), getter), "Put cell should be there");
assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx0.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter), "Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam, qual, deleteTx.getStartTimestamp(), getter),
"Delete cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, deleteTx.getStartTimestamp(), getter),
"Delete shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam, qual, lastTx.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, lastTx.getStartTimestamp(), getter),
"Put shadow cell should be there");
}
/**
* Test that when compaction runs, tombstones are cleaned up case1: 1 put (ts < lwm) then tombstone (ts > lwm)
*/
@Test(timeOut = 60_000)
public void testTombstonesAreCleanedUpCase1() throws Exception {
String TEST_TABLE = "testTombstonesAreCleanedUpCase1";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
byte[] rowId = Bytes.toBytes("case1");
Put p = new Put(rowId);
p.addColumn(fam, qual, Bytes.toBytes("testValue"));
txTable.put(tx1, p);
tm.commit(tx1);
HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
setCompactorLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Delete d = new Delete(rowId);
d.addColumn(fam, qual);
txTable.delete(tx2, d);
tm.commit(tx2);
TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Delete cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Delete shadow cell should be there");
}
/**
* Test that when compaction runs, tombstones are cleaned up case2: 1 put (ts < lwm) then tombstone (ts < lwm)
*/
@Test(timeOut = 60_000)
public void testTombstonesAreCleanedUpCase2() throws Exception {
String TEST_TABLE = "testTombstonesAreCleanedUpCase2";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
byte[] rowId = Bytes.toBytes("case2");
Put p = new Put(rowId);
p.addColumn(fam, qual, Bytes.toBytes("testValue"));
txTable.put(tx1, p);
tm.commit(tx1);
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Delete d = new Delete(rowId);
d.addColumn(fam, qual);
txTable.delete(tx2, d);
tm.commit(tx2);
HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
assertFalse(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put cell shouldn't be there");
assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell shouldn't be there");
assertFalse(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Delete cell shouldn't be there");
assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Delete shadow cell shouldn't be there");
}
//Cell level conflict detection
@Test(timeOut = 60_000)
public void testFamiliyDeleteTombstonesAreCleanedUpCellCF() throws Exception {
String TEST_TABLE = "testFamiliyDeleteTombstonesAreCleanedUpCellCF";
byte[] fam2 = Bytes.toBytes("2");
byte[] fam3 = Bytes.toBytes("3");
byte[] fam4 = Bytes.toBytes("4");
createTableIfNotExists(TEST_TABLE, fam2, fam3, fam4);
TTable txTable = new TTable(connection, TEST_TABLE);
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
byte[] rowId = Bytes.toBytes("case2");
byte[] qual2 = Bytes.toBytes("qual2");
Put p = new Put(rowId);
p.addColumn(fam2, qual, Bytes.toBytes("testValue"));
p.addColumn(fam2, qual2 , Bytes.toBytes("testValue"));
p.addColumn(fam3, qual, Bytes.toBytes("testValue"));
p.addColumn(fam3, qual2 , Bytes.toBytes("testValue"));
p.addColumn(fam4, qual, Bytes.toBytes("testValue"));
p.addColumn(fam4, qual2 , Bytes.toBytes("testValue"));
txTable.put(tx1, p);
byte[] rowId2 = Bytes.toBytes("case22");
Put p2 = new Put(rowId2);
p2.addColumn(fam3, qual, Bytes.toBytes("testValue2"));
txTable.put(tx1, p2);
tm.commit(tx1);
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Delete d = new Delete(rowId);
d.addFamily(fam3);
txTable.delete(tx2, d);
tm.commit(tx2);
HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
assertTrue(CellUtils.hasCell(rowId, fam2, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam2, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam2, qual2, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam2, qual2, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam3, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam3, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam3, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam3, qual2, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam4, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam4, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam4, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam4, qual2, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam3, CellUtils.FAMILY_DELETE_QUALIFIER, tx2.getStartTimestamp(), getter),
"Delete cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam3, CellUtils.FAMILY_DELETE_QUALIFIER, tx2.getStartTimestamp(), getter),
"Delete shadow cell should be there");
//Do major compaction
compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
assertTrue(CellUtils.hasCell(rowId, fam2, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam2, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam2, qual2, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam2, qual2, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertFalse(CellUtils.hasCell(rowId, fam3, qual, tx1.getStartTimestamp(), getter),
"Put cell shouldn't be there");
assertFalse(CellUtils.hasShadowCell(rowId, fam3, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell shouldn't be there");
assertFalse(CellUtils.hasCell(rowId, fam3, qual2, tx1.getStartTimestamp(), getter),
"Put cell shouldn't be there");
assertFalse(CellUtils.hasShadowCell(rowId, fam3, qual2, tx1.getStartTimestamp(), getter),
"Put shadow cell shouldn't be there");
assertTrue(CellUtils.hasCell(rowId, fam4, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam4, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam4, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam4, qual2, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertFalse(CellUtils.hasCell(rowId, fam3, CellUtils.FAMILY_DELETE_QUALIFIER, tx2.getStartTimestamp(), getter),
"Delete cell shouldn't be there");
assertFalse(CellUtils.hasShadowCell(rowId, fam3, CellUtils.FAMILY_DELETE_QUALIFIER, tx2.getStartTimestamp(), getter),
"Delete shadow cell shouldn't be there");
}
//Row level conflict detection
@Test(timeOut = 60_000)
public void testFamiliyDeleteTombstonesAreCleanedUpRowCF() throws Exception {
String TEST_TABLE = "testFamiliyDeleteTombstonesAreCleanedUpRowCF";
((HBaseTransactionManager) tm).setConflictDetectionLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
byte[] fam2 = Bytes.toBytes("2");
byte[] fam3 = Bytes.toBytes("3");
byte[] fam4 = Bytes.toBytes("4");
createTableIfNotExists(TEST_TABLE, fam2, fam3, fam4);
TTable txTable = new TTable(connection, TEST_TABLE);
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
byte[] rowId = Bytes.toBytes("case2");
byte[] qual2 = Bytes.toBytes("qual2");
Put p = new Put(rowId);
p.addColumn(fam2, qual, Bytes.toBytes("testValue"));
p.addColumn(fam2, qual2 , Bytes.toBytes("testValue"));
p.addColumn(fam3, qual, Bytes.toBytes("testValue"));
p.addColumn(fam3, qual2 , Bytes.toBytes("testValue"));
p.addColumn(fam4, qual, Bytes.toBytes("testValue"));
p.addColumn(fam4, qual2 , Bytes.toBytes("testValue"));
txTable.put(tx1, p);
byte[] rowId2 = Bytes.toBytes("case22");
Put p2 = new Put(rowId2);
p2.addColumn(fam3, qual, Bytes.toBytes("testValue2"));
txTable.put(tx1, p2);
tm.commit(tx1);
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Delete d = new Delete(rowId);
d.addFamily(fam3);
txTable.delete(tx2, d);
tm.commit(tx2);
HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
assertTrue(CellUtils.hasCell(rowId, fam2, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam2, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam2, qual2, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam2, qual2, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam3, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam3, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam3, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam3, qual2, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam4, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam4, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam4, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam4, qual2, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam3, CellUtils.FAMILY_DELETE_QUALIFIER, tx2.getStartTimestamp(), getter),
"Delete cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam3, CellUtils.FAMILY_DELETE_QUALIFIER, tx2.getStartTimestamp(), getter),
"Delete shadow cell should be there");
//Do major compaction
compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
assertTrue(CellUtils.hasCell(rowId, fam2, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam2, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam2, qual2, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam2, qual2, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertFalse(CellUtils.hasCell(rowId, fam3, qual, tx1.getStartTimestamp(), getter),
"Put cell shouldn't be there");
assertFalse(CellUtils.hasShadowCell(rowId, fam3, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell shouldn't be there");
assertFalse(CellUtils.hasCell(rowId, fam3, qual2, tx1.getStartTimestamp(), getter),
"Put cell shouldn't be there");
assertFalse(CellUtils.hasShadowCell(rowId, fam3, qual2, tx1.getStartTimestamp(), getter),
"Put shadow cell shouldn't be there");
assertTrue(CellUtils.hasCell(rowId, fam4, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam4, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertTrue(CellUtils.hasCell(rowId, fam4, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam4, qual2, tx1.getStartTimestamp(), getter),
"Put shadow cell should be there");
assertFalse(CellUtils.hasCell(rowId, fam3, CellUtils.FAMILY_DELETE_QUALIFIER, tx2.getStartTimestamp(), getter),
"Delete cell shouldn't be there");
assertFalse(CellUtils.hasShadowCell(rowId, fam3, CellUtils.FAMILY_DELETE_QUALIFIER, tx2.getStartTimestamp(), getter),
"Delete shadow cell shouldn't be there");
}
/**
* Test that when compaction runs, tombstones are cleaned up case3: 1 put (ts < lwm) then tombstone (ts < lwm) not
* committed
*/
@Test(timeOut = 60_000)
public void testTombstonesAreCleanedUpCase3() throws Exception {
String TEST_TABLE = "testTombstonesAreCleanedUpCase3";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
byte[] rowId = Bytes.toBytes("case3");
Put p = new Put(rowId);
p.addColumn(fam, qual, Bytes.toBytes("testValue"));
txTable.put(tx1, p);
tm.commit(tx1);
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Delete d = new Delete(rowId);
d.addColumn(fam, qual);
txTable.delete(tx2, d);
HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell shouldn't be there");
assertFalse(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Delete cell shouldn't be there");
assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Delete shadow cell shouldn't be there");
}
/**
* Test that when compaction runs, tombstones are cleaned up case4: 1 put (ts < lwm) then tombstone (ts > lwm) not
* committed
*/
@Test(timeOut = 60_000)
public void testTombstonesAreCleanedUpCase4() throws Exception {
String TEST_TABLE = "testTombstonesAreCleanedUpCase4";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
byte[] rowId = Bytes.toBytes("case4");
Put p = new Put(rowId);
p.addColumn(fam, qual, Bytes.toBytes("testValue"));
txTable.put(tx1, p);
tm.commit(tx1);
HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Delete d = new Delete(rowId);
d.addColumn(fam, qual);
txTable.delete(tx2, d);
compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Put shadow cell shouldn't be there");
assertTrue(CellUtils.hasCell(rowId, fam, qual,tx2.getStartTimestamp(), getter),
"Delete cell should be there");
assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Delete shadow cell shouldn't be there");
}
/**
* Test that when compaction runs, tombstones are cleaned up case5: tombstone (ts < lwm)
*/
@Test(timeOut = 60_000)
public void testTombstonesAreCleanedUpCase5() throws Exception {
String TEST_TABLE = "testTombstonesAreCleanedUpCase5";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
byte[] rowId = Bytes.toBytes("case5");
Delete d = new Delete(rowId);
d.addColumn(fam, qual);
txTable.delete(tx1, d);
tm.commit(tx1);
HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
assertFalse(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Delete cell shouldn't be there");
assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Delete shadow cell shouldn't be there");
}
/**
* Test that when compaction runs, tombstones are cleaned up case6: tombstone (ts < lwm), then put (ts < lwm)
*/
@Test(timeOut = 60_000)
public void testTombstonesAreCleanedUpCase6() throws Exception {
String TEST_TABLE = "testTombstonesAreCleanedUpCase6";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
byte[] rowId = Bytes.toBytes("case6");
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
Delete d = new Delete(rowId);
d.addColumn(fam, qual);
txTable.delete(tx1, d);
tm.commit(tx1);
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Put p = new Put(rowId);
p.addColumn(fam, qual, Bytes.toBytes("testValue"));
txTable.put(tx2, p);
tm.commit(tx2);
HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
assertFalse(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Delete cell shouldn't be there");
assertFalse(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
"Delete shadow cell shouldn't be there");
assertTrue(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Put cell should be there");
assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
"Put shadow cell shouldn't be there");
}
@Test(timeOut = 60_000)
public void testCommitTableNoInvalidation() throws Exception {
String TEST_TABLE = "testCommitTableInvalidation";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable txTable = new TTable(connection, TEST_TABLE);
byte[] rowId = Bytes.toBytes("row");
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
Put p = new Put(rowId);
p.addColumn(fam, qual, Bytes.toBytes("testValue"));
txTable.put(tx1, p);
HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
try {
//give compaction time to invalidate
Thread.sleep(1000);
tm.commit(tx1);
} catch (RollbackException e) {
fail(" Should have not been invalidated");
}
}
private void setCompactorLWM(long lwm, String tableName) throws Exception {
OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(tableName)).get(0)
.getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
CommitTable commitTable = injector.getInstance(CommitTable.class);
CommitTable.Client commitTableClient = spy(commitTable.getClient());
SettableFuture<Long> f = SettableFuture.create();
f.set(lwm);
doReturn(f).when(commitTableClient).readLowWatermark();
omidCompactor.commitTableClient = commitTableClient;
}
private void compactEverything(String tableName) throws Exception {
compactWithLWM(Long.MAX_VALUE, tableName);
}
private void compactWithLWM(long lwm, String tableName) throws Exception {
admin.flush(TableName.valueOf(tableName));
LOG.info("Regions in table {}: {}", tableName, hbaseCluster.getRegions(Bytes.toBytes(tableName)).size());
setCompactorLWM(lwm, tableName);
LOG.info("Compacting table {}", tableName);
admin.majorCompact(TableName.valueOf(tableName));
LOG.info("Sleeping for 3 secs");
Thread.sleep(3000);
LOG.info("Waking up after 3 secs");
}
private static long rowCount(String tableName, byte[] family) throws Throwable {
Scan scan = new Scan();
scan.addFamily(family);
Table table = connection.getTable(TableName.valueOf(tableName));
try (ResultScanner scanner = table.getScanner(scan)) {
int count = 0;
while (scanner.next() != null) {
count++;
}
return count;
}
}
}