blob: 2cfc77e09c12c2998ef69d5d309989ef4b2d7d1e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.transaction;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.TestUtils;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
import org.apache.omid.tso.TSOServer;
import org.apache.omid.tso.TSOServerConfig;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.testng.Assert.fail;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Guice;
import com.google.inject.Injector;
public class TestSnapshotFilter {
private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotFilter.class);
private static final String TEST_FAMILY = "test-fam";
private static final int MAX_VERSIONS = 3;
private AbstractTransactionManager tm;
private Injector injector;
private Admin admin;
private Configuration hbaseConf;
private HBaseTestingUtility hbaseTestUtil;
private MiniHBaseCluster hbaseCluster;
private TSOServer tso;
private CommitTable commitTable;
private PostCommitActions syncPostCommitter;
private Connection connection;
@BeforeClass
public void setupTestSnapshotFilter() throws Exception {
TSOServerConfig tsoConfig = new TSOServerConfig();
tsoConfig.setPort(5679);
tsoConfig.setConflictMapSize(1);
tsoConfig.setWaitStrategy("LOW_CPU");
injector = Guice.createInjector(new TSOForSnapshotFilterTestModule(tsoConfig));
hbaseConf = injector.getInstance(Configuration.class);
hbaseConf.setBoolean("omid.server.side.filter", true);
hbaseConf.setInt("hbase.hconnection.threads.core", 5);
hbaseConf.setInt("hbase.hconnection.threads.max", 10);
// Tunn down handler threads in regionserver
hbaseConf.setInt("hbase.regionserver.handler.count", 10);
// Set to random port
hbaseConf.setInt("hbase.master.port", 0);
hbaseConf.setInt("hbase.master.info.port", 0);
hbaseConf.setInt("hbase.regionserver.port", 0);
hbaseConf.setInt("hbase.regionserver.info.port", 0);
HBaseCommitTableConfig hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
setupHBase();
connection = ConnectionFactory.createConnection(hbaseConf);
admin = connection.getAdmin();
createRequiredHBaseTables(hBaseTimestampStorageConfig, hBaseCommitTableConfig);
setupTSO();
commitTable = injector.getInstance(CommitTable.class);
}
private void setupHBase() throws Exception {
LOG.info("--------------------------------------------------------------------------------------------------");
LOG.info("Setting up HBase");
LOG.info("--------------------------------------------------------------------------------------------------");
hbaseTestUtil = new HBaseTestingUtility(hbaseConf);
LOG.info("--------------------------------------------------------------------------------------------------");
LOG.info("Creating HBase MiniCluster");
LOG.info("--------------------------------------------------------------------------------------------------");
hbaseCluster = hbaseTestUtil.startMiniCluster(1);
}
private void createRequiredHBaseTables(HBaseTimestampStorageConfig timestampStorageConfig,
HBaseCommitTableConfig hBaseCommitTableConfig) throws IOException {
createTableIfNotExists(timestampStorageConfig.getTableName(), timestampStorageConfig.getFamilyName().getBytes());
createTableIfNotExists(hBaseCommitTableConfig.getTableName(), hBaseCommitTableConfig.getCommitTableFamily(), hBaseCommitTableConfig.getLowWatermarkFamily());
}
private void createTableIfNotExists(String tableName, byte[]... families) throws IOException {
if (!admin.tableExists(TableName.valueOf(tableName))) {
LOG.info("Creating {} table...", tableName);
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
for (byte[] family : families) {
HColumnDescriptor datafam = new HColumnDescriptor(family);
datafam.setMaxVersions(MAX_VERSIONS);
desc.addFamily(datafam);
}
int priority = Coprocessor.PRIORITY_HIGHEST;
desc.addCoprocessor(OmidSnapshotFilter.class.getName(),null,++priority,null);
desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation",null,++priority,null);
admin.createTable(desc);
try {
hbaseTestUtil.waitTableAvailable(TableName.valueOf(tableName),5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void setupTSO() throws IOException, InterruptedException {
tso = injector.getInstance(TSOServer.class);
tso.startAndWait();
TestUtils.waitForSocketListening("localhost", 5679, 100);
Thread.currentThread().setName("UnitTest(s) thread");
}
@AfterClass
public void cleanupTestSnapshotFilter() throws Exception {
teardownTSO();
hbaseCluster.shutdown();
}
private void teardownTSO() throws IOException, InterruptedException {
tso.stopAndWait();
TestUtils.waitForSocketNotListening("localhost", 5679, 1000);
}
@BeforeMethod
public void setupTestSnapshotFilterIndividualTest() throws Exception {
tm = spy((AbstractTransactionManager) newTransactionManager());
}
private TransactionManager newTransactionManager() throws Exception {
HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
hbaseOmidClientConf.setConnectionString("localhost:5679");
hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
CommitTable.Client commitTableClient = commitTable.getClient();
syncPostCommitter =
spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
return HBaseTransactionManager.builder(hbaseOmidClientConf)
.postCommitter(syncPostCommitter)
.commitTableClient(commitTableClient)
.build();
}
@Test(timeOut = 60_000)
public void testGetFirstResult() throws Throwable {
byte[] rowName1 = Bytes.toBytes("row1");
byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
byte[] colName1 = Bytes.toBytes("col1");
byte[] dataValue1 = Bytes.toBytes("testWrite-1");
String TEST_TABLE = "testGetFirstResult";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put row1 = new Put(rowName1);
row1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, row1);
tm.commit(tx1);
Transaction tx2 = tm.begin();
Get get = new Get(rowName1);
Result result = tt.get(tx2, get);
assertTrue(!result.isEmpty(), "Result should not be empty!");
long tsRow = result.rawCells()[0].getTimestamp();
assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
tm.commit(tx2);
Transaction tx3 = tm.begin();
Put put3 = new Put(rowName1);
put3.addColumn(famName1, colName1, dataValue1);
tt.put(tx3, put3);
tm.commit(tx3);
Transaction tx4 = tm.begin();
Get get2 = new Get(rowName1);
Result result2 = tt.get(tx4, get2);
assertTrue(!result2.isEmpty(), "Result should not be empty!");
long tsRow2 = result2.rawCells()[0].getTimestamp();
assertEquals(tsRow2, tx3.getTransactionId(), "Reading differnt version");
tm.commit(tx4);
tt.close();
}
// This test will fail if filtering is done before snapshot filtering
@Test(timeOut = 60_000)
public void testServerSideSnapshotFiltering() throws Throwable {
byte[] rowName1 = Bytes.toBytes("row1");
byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
byte[] colName1 = Bytes.toBytes("col1");
byte[] dataValue1 = Bytes.toBytes("testWrite-1");
byte[] dataValue2 = Bytes.toBytes("testWrite-2");
String TEST_TABLE = "testServerSideSnapshotFiltering";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
put1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, put1);
tm.commit(tx1);
Transaction tx2 = tm.begin();
Put put2 = new Put(rowName1);
put2.addColumn(famName1, colName1, dataValue2);
tt.put(tx2, put2);
Transaction tx3 = tm.begin();
Get get = new Get(rowName1);
// If snapshot filtering is not done in the server then the first value is
// "testWrite-2" and the whole row will be filtered out.
SingleColumnValueFilter filter = new SingleColumnValueFilter(
famName1,
colName1,
CompareFilter.CompareOp.EQUAL,
new SubstringComparator("testWrite-1"));
get.setFilter(filter);
Result results = tt.get(tx3, get);
assertTrue(results.size() == 1);
}
// This test will fail if filtering is done before snapshot filtering
@Test(timeOut = 60_000)
public void testServerSideSnapshotScannerFiltering() throws Throwable {
byte[] rowName1 = Bytes.toBytes("row1");
byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
byte[] colName1 = Bytes.toBytes("col1");
byte[] dataValue1 = Bytes.toBytes("testWrite-1");
byte[] dataValue2 = Bytes.toBytes("testWrite-2");
String TEST_TABLE = "testServerSideSnapshotFiltering";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
put1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, put1);
tm.commit(tx1);
Transaction tx2 = tm.begin();
Put put2 = new Put(rowName1);
put2.addColumn(famName1, colName1, dataValue2);
// tt.put(tx2, put2);
Transaction tx3 = tm.begin();
// If snapshot filtering is not done in the server then the first value is
// "testWrite-2" and the whole row will be filtered out.
SingleColumnValueFilter filter = new SingleColumnValueFilter(
famName1,
colName1,
CompareFilter.CompareOp.EQUAL,
new SubstringComparator("testWrite-1"));
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner iterableRS = tt.getScanner(tx3, scan);
Result result = iterableRS.next();
assertTrue(result.size() == 1);
}
@Test(timeOut = 60_000)
public void testGetWithFamilyDelete() throws Throwable {
byte[] rowName1 = Bytes.toBytes("row1");
byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
byte[] famName2 = Bytes.toBytes("test-fam2");
byte[] colName1 = Bytes.toBytes("col1");
byte[] colName2 = Bytes.toBytes("col2");
byte[] dataValue1 = Bytes.toBytes("testWrite-1");
String TEST_TABLE = "testGetWithFamilyDelete";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY), famName2);
TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
put1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, put1);
tm.commit(tx1);
Transaction tx2 = tm.begin();
Put put2 = new Put(rowName1);
put2.addColumn(famName2, colName2, dataValue1);
tt.put(tx2, put2);
tm.commit(tx2);
Transaction tx3 = tm.begin();
Delete d = new Delete(rowName1);
d.addFamily(famName2);
tt.delete(tx3, d);
Transaction tx4 = tm.begin();
Get get = new Get(rowName1);
Filter filter1 = new FilterList(FilterList.Operator.MUST_PASS_ONE,
new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))),
new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(famName2)));
get.setFilter(filter1);
Result result = tt.get(tx4, get);
assertTrue(result.size() == 2, "Result should be 2");
try {
tm.commit(tx3);
} catch (RollbackException e) {
if (!tm.isLowLatency())
fail();
}
Transaction tx5 = tm.begin();
result = tt.get(tx5, get);
if (!tm.isLowLatency())
assertTrue(result.size() == 1, "Result should be 1");
tt.close();
}
@Test(timeOut = 60_000)
public void testReadFromCommitTable() throws Exception {
final byte[] rowName1 = Bytes.toBytes("row1");
byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
byte[] colName1 = Bytes.toBytes("col1");
byte[] dataValue1 = Bytes.toBytes("testWrite-1");
final String TEST_TABLE = "testReadFromCommitTable";
final byte[] famName2 = Bytes.toBytes("test-fam2");
final CountDownLatch readAfterCommit = new CountDownLatch(1);
final CountDownLatch postCommitBegin = new CountDownLatch(1);
final AtomicBoolean readFailed = new AtomicBoolean(false);
final AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager();
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY), famName2);
doAnswer(new Answer<ListenableFuture<Void>>() {
@Override
public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
LOG.info("Releasing readAfterCommit barrier");
readAfterCommit.countDown();
LOG.info("Waiting postCommitBegin barrier");
postCommitBegin.await();
ListenableFuture<Void> result = (ListenableFuture<Void>) invocation.callRealMethod();
return result;
}
}).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
Thread readThread = new Thread("Read Thread") {
@Override
public void run() {
try {
LOG.info("Waiting readAfterCommit barrier");
readAfterCommit.await();
Transaction tx4 = tm.begin();
TTable tt = new TTable(connection, TEST_TABLE);
Get get = new Get(rowName1);
Filter filter1 = new FilterList(FilterList.Operator.MUST_PASS_ONE,
new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))),
new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(famName2)));
get.setFilter(filter1);
Result result = tt.get(tx4, get);
if (result.size() == 2) {
readFailed.set(false);
}
else {
readFailed.set(false);
}
postCommitBegin.countDown();
} catch (Throwable e) {
readFailed.set(false);
LOG.error("Error whilst reading", e);
}
}
};
readThread.start();
TTable table = new TTable(connection, TEST_TABLE);
final HBaseTransaction t1 = (HBaseTransaction) tm.begin();
Put put1 = new Put(rowName1);
put1.addColumn(famName1, colName1, dataValue1);
table.put(t1, put1);
tm.commit(t1);
readThread.join();
assertFalse(readFailed.get(), "Read should have succeeded");
}
@Test(timeOut = 60_000)
public void testGetWithFilter() throws Throwable {
byte[] rowName1 = Bytes.toBytes("row1");
byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
byte[] famName2 = Bytes.toBytes("test-fam2");
byte[] colName1 = Bytes.toBytes("col1");
byte[] colName2 = Bytes.toBytes("col2");
byte[] dataValue1 = Bytes.toBytes("testWrite-1");
String TEST_TABLE = "testGetWithFilter";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY), famName2);
TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
put1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, put1);
tm.commit(tx1);
Transaction tx2 = tm.begin();
Put put2 = new Put(rowName1);
put2.addColumn(famName2, colName2, dataValue1);
tt.put(tx2, put2);
tm.commit(tx2);
Transaction tx3 = tm.begin();
Get get = new Get(rowName1);
Filter filter1 = new FilterList(FilterList.Operator.MUST_PASS_ONE,
new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))),
new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(famName2)));
get.setFilter(filter1);
Result result = tt.get(tx3, get);
assertTrue(result.size() == 2, "Result should be 2");
Filter filter2 = new FilterList(FilterList.Operator.MUST_PASS_ONE,
new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))));
get.setFilter(filter2);
result = tt.get(tx3, get);
assertTrue(result.size() == 1, "Result should be 2");
tm.commit(tx3);
tt.close();
}
@Test(timeOut = 60_000)
public void testGetSecondResult() throws Throwable {
byte[] rowName1 = Bytes.toBytes("row1");
byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
byte[] colName1 = Bytes.toBytes("col1");
byte[] dataValue1 = Bytes.toBytes("testWrite-1");
String TEST_TABLE = "testGetSecondResult";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
put1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, put1);
tm.commit(tx1);
Transaction tx2 = tm.begin();
Put put2 = new Put(rowName1);
put2.addColumn(famName1, colName1, dataValue1);
tt.put(tx2, put2);
Transaction tx3 = tm.begin();
Get get = new Get(rowName1);
Result result = tt.get(tx3, get);
assertTrue(!result.isEmpty(), "Result should not be empty!");
long tsRow = result.rawCells()[0].getTimestamp();
assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
tm.commit(tx3);
tt.close();
}
@Test(timeOut = 60_000)
public void testScanFirstResult() throws Throwable {
byte[] rowName1 = Bytes.toBytes("row1");
byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
byte[] colName1 = Bytes.toBytes("col1");
byte[] dataValue1 = Bytes.toBytes("testWrite-1");
String TEST_TABLE = "testScanFirstResult";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put row1 = new Put(rowName1);
row1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, row1);
tm.commit(tx1);
Transaction tx2 = tm.begin();
ResultScanner iterableRS = tt.getScanner(tx2, new Scan().setStartRow(rowName1).setStopRow(rowName1));
Result result = iterableRS.next();
long tsRow = result.rawCells()[0].getTimestamp();
assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
assertFalse(iterableRS.next() != null);
tm.commit(tx2);
Transaction tx3 = tm.begin();
Put put3 = new Put(rowName1);
put3.addColumn(famName1, colName1, dataValue1);
tt.put(tx3, put3);
tm.commit(tx3);
Transaction tx4 = tm.begin();
ResultScanner iterableRS2 = tt.getScanner(tx4, new Scan().setStartRow(rowName1).setStopRow(rowName1));
Result result2 = iterableRS2.next();
long tsRow2 = result2.rawCells()[0].getTimestamp();
assertEquals(tsRow2, tx3.getTransactionId(), "Reading differnt version");
assertFalse(iterableRS2.next() != null);
tm.commit(tx4);
tt.close();
}
@Test(timeOut = 60_000)
public void testScanWithFilter() throws Throwable {
byte[] rowName1 = Bytes.toBytes("row1");
byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
byte[] famName2 = Bytes.toBytes("test-fam2");
byte[] colName1 = Bytes.toBytes("col1");
byte[] colName2 = Bytes.toBytes("col2");
byte[] dataValue1 = Bytes.toBytes("testWrite-1");
String TEST_TABLE = "testScanWithFilter";
createTableIfNotExists(TEST_TABLE, famName1, famName2);
TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
put1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, put1);
tm.commit(tx1);
Transaction tx2 = tm.begin();
Put put2 = new Put(rowName1);
put2.addColumn(famName2, colName2, dataValue1);
tt.put(tx2, put2);
tm.commit(tx2);
Transaction tx3 = tm.begin();
Scan scan = new Scan();
scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ONE,
new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY)))));
scan.setStartRow(rowName1).setStopRow(rowName1);
ResultScanner iterableRS = tt.getScanner(tx3, scan);
Result result = iterableRS.next();
assertTrue(result.containsColumn(famName1, colName1));
assertFalse(result.containsColumn(famName2, colName2));
scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ONE,
new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))),
new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(famName2))));
iterableRS = tt.getScanner(tx3, scan);
result = iterableRS.next();
assertTrue(result.containsColumn(famName1, colName1));
assertTrue(result.containsColumn(famName2, colName2));
tm.commit(tx3);
tt.close();
}
@Test(timeOut = 60_000)
public void testScanSecondResult() throws Throwable {
byte[] rowName1 = Bytes.toBytes("row1");
byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
byte[] colName1 = Bytes.toBytes("col1");
byte[] dataValue1 = Bytes.toBytes("testWrite-1");
String TEST_TABLE = "testScanSecondResult";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
put1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, put1);
tm.commit(tx1);
Transaction tx2 = tm.begin();
Put put2 = new Put(rowName1);
put2.addColumn(famName1, colName1, dataValue1);
tt.put(tx2, put2);
Transaction tx3 = tm.begin();
ResultScanner iterableRS = tt.getScanner(tx3, new Scan().setStartRow(rowName1).setStopRow(rowName1));
Result result = iterableRS.next();
long tsRow = result.rawCells()[0].getTimestamp();
assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
assertFalse(iterableRS.next() != null);
tm.commit(tx3);
tt.close();
}
@Test (timeOut = 60_000)
public void testScanFewResults() throws Throwable {
byte[] rowName1 = Bytes.toBytes("row1");
byte[] rowName2 = Bytes.toBytes("row2");
byte[] rowName3 = Bytes.toBytes("row3");
byte[] famName = Bytes.toBytes(TEST_FAMILY);
byte[] colName1 = Bytes.toBytes("col1");
byte[] colName2 = Bytes.toBytes("col2");
byte[] dataValue1 = Bytes.toBytes("testWrite-1");
byte[] dataValue2 = Bytes.toBytes("testWrite-2");
String TEST_TABLE = "testScanFewResults";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
put1.addColumn(famName, colName1, dataValue1);
tt.put(tx1, put1);
tm.commit(tx1);
Transaction tx2 = tm.begin();
Put put2 = new Put(rowName2);
put2.addColumn(famName, colName2, dataValue2);
tt.put(tx2, put2);
tm.commit(tx2);
Transaction tx3 = tm.begin();
ResultScanner iterableRS = tt.getScanner(tx3, new Scan().setStartRow(rowName1).setStopRow(rowName3));
Result result = iterableRS.next();
long tsRow = result.rawCells()[0].getTimestamp();
assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
result = iterableRS.next();
tsRow = result.rawCells()[0].getTimestamp();
assertEquals(tsRow, tx2.getTransactionId(), "Reading differnt version");
assertFalse(iterableRS.next() != null);
tm.commit(tx3);
tt.close();
}
@Test (timeOut = 60_000)
public void testScanFewResultsDifferentTransaction() throws Throwable {
byte[] rowName1 = Bytes.toBytes("row1");
byte[] rowName2 = Bytes.toBytes("row2");
byte[] rowName3 = Bytes.toBytes("row3");
byte[] famName = Bytes.toBytes(TEST_FAMILY);
byte[] colName1 = Bytes.toBytes("col1");
byte[] colName2 = Bytes.toBytes("col2");
byte[] dataValue1 = Bytes.toBytes("testWrite-1");
byte[] dataValue2 = Bytes.toBytes("testWrite-2");
String TEST_TABLE = "testScanFewResultsDifferentTransaction";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
put1.addColumn(famName, colName1, dataValue1);
tt.put(tx1, put1);
Put put2 = new Put(rowName2);
put2.addColumn(famName, colName2, dataValue2);
tt.put(tx1, put2);
tm.commit(tx1);
Transaction tx2 = tm.begin();
put2 = new Put(rowName2);
put2.addColumn(famName, colName2, dataValue2);
tt.put(tx2, put2);
tm.commit(tx2);
Transaction tx3 = tm.begin();
ResultScanner iterableRS = tt.getScanner(tx3, new Scan().setStartRow(rowName1).setStopRow(rowName3));
Result result = iterableRS.next();
long tsRow = result.rawCells()[0].getTimestamp();
assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
result = iterableRS.next();
tsRow = result.rawCells()[0].getTimestamp();
assertEquals(tsRow, tx2.getTransactionId(), "Reading differnt version");
assertFalse(iterableRS.next() != null);
tm.commit(tx3);
tt.close();
}
@Test (timeOut = 60_000)
public void testScanFewResultsSameTransaction() throws Throwable {
byte[] rowName1 = Bytes.toBytes("row1");
byte[] rowName2 = Bytes.toBytes("row2");
byte[] rowName3 = Bytes.toBytes("row3");
byte[] famName = Bytes.toBytes(TEST_FAMILY);
byte[] colName1 = Bytes.toBytes("col1");
byte[] colName2 = Bytes.toBytes("col2");
byte[] dataValue1 = Bytes.toBytes("testWrite-1");
byte[] dataValue2 = Bytes.toBytes("testWrite-2");
String TEST_TABLE = "testScanFewResultsSameTransaction";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
put1.addColumn(famName, colName1, dataValue1);
tt.put(tx1, put1);
Put put2 = new Put(rowName2);
put2.addColumn(famName, colName2, dataValue2);
tt.put(tx1, put2);
tm.commit(tx1);
Transaction tx2 = tm.begin();
put2 = new Put(rowName2);
put2.addColumn(famName, colName2, dataValue2);
tt.put(tx2, put2);
Transaction tx3 = tm.begin();
ResultScanner iterableRS = tt.getScanner(tx3, new Scan().setStartRow(rowName1).setStopRow(rowName3));
Result result = iterableRS.next();
long tsRow = result.rawCells()[0].getTimestamp();
assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
result = iterableRS.next();
tsRow = result.rawCells()[0].getTimestamp();
assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
assertFalse(iterableRS.next() != null);
tm.commit(tx3);
tt.close();
}
@Test (timeOut = 60_000)
public void testFilterCommitCacheInSnapshot() throws Throwable {
String TEST_TABLE = "testFilterCommitCacheInSnapshot";
byte[] rowName = Bytes.toBytes("row1");
byte[] famName = Bytes.toBytes(TEST_FAMILY);
createTableIfNotExists(TEST_TABLE, famName);
TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put = new Put(rowName);
for (int i = 0; i < 200; ++i) {
byte[] dataValue1 = Bytes.toBytes("some data");
byte[] colName = Bytes.toBytes("col" + i);
put.addColumn(famName, colName, dataValue1);
}
tt.put(tx1, put);
tm.commit(tx1);
Transaction tx3 = tm.begin();
Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
SnapshotFilterImpl snapshotFilter = spy(new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
tm.getCommitTableClient()));
Filter newFilter = TransactionFilters.getVisibilityFilter(null,
snapshotFilter, (HBaseTransaction) tx3);
Table rawTable = connection.getTable(TableName.valueOf(TEST_TABLE));
Scan scan = new Scan();
ResultScanner scanner = rawTable.getScanner(scan);
for(Result row: scanner) {
for(Cell cell: row.rawCells()) {
newFilter.filterKeyValue(cell);
}
}
verify(snapshotFilter, Mockito.times(0))
.getTSIfInSnapshot(any(Cell.class),any(HBaseTransaction.class), any(Map.class));
tm.commit(tx3);
tt.close();
}
@Test (timeOut = 60_000)
public void testFilterCommitCacheNotInSnapshot() throws Throwable {
String TEST_TABLE = "testFilterCommitCacheNotInSnapshot";
byte[] rowName = Bytes.toBytes("row1");
byte[] famName = Bytes.toBytes(TEST_FAMILY);
createTableIfNotExists(TEST_TABLE, famName);
TTable tt = new TTable(connection, TEST_TABLE);
//add some uncommitted values
Transaction tx1 = tm.begin();
Put put = new Put(rowName);
for (int i = 0; i < 200; ++i) {
byte[] dataValue1 = Bytes.toBytes("some data");
byte[] colName = Bytes.toBytes("col" + i);
put.addColumn(famName, colName, dataValue1);
}
tt.put(tx1, put);
//try to scan from tx
Transaction tx = tm.begin();
Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
SnapshotFilterImpl snapshotFilter = spy(new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
tm.getCommitTableClient()));
Filter newFilter = TransactionFilters.getVisibilityFilter(null,
snapshotFilter, (HBaseTransaction) tx);
Table rawTable = connection.getTable(TableName.valueOf(TEST_TABLE));
Scan scan = new Scan();
ResultScanner scanner = rawTable.getScanner(scan);
for(Result row: scanner) {
for(Cell cell: row.rawCells()) {
newFilter.filterKeyValue(cell);
}
}
verify(snapshotFilter, Mockito.times(1))
.getTSIfInSnapshot(any(Cell.class),any(HBaseTransaction.class), any(Map.class));
tt.close();
}
}