// blob: e680f99f164192b34c657f3cf601e5731f53e5e1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.transaction;
import static com.google.common.base.Charsets.UTF_8;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.DEFAULT_COMMIT_TABLE_CF_NAME;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import com.google.inject.Guice;
import com.google.inject.Injector;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.TestUtils;
import org.apache.omid.committable.hbase.KeyGenerator;
import org.apache.omid.committable.hbase.KeyGeneratorImplementations;
import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
import org.apache.omid.tools.hbase.OmidTableManager;
import org.apache.omid.tso.TSOMockModule;
import org.apache.omid.tso.TSOServer;
import org.apache.omid.tso.TSOServerConfig;
import org.apache.omid.tso.client.OmidClientConfiguration;
import org.apache.omid.tso.client.TSOClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.ITestContext;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestOmidLLRaces {
static HBaseTestingUtility hBaseUtils;
private static MiniHBaseCluster hbaseCluster;
static Configuration hbaseConf;
static Connection connection;
private static final String TEST_FAMILY = "data";
static final String TEST_FAMILY2 = "data2";
private static final String TEST_TABLE = "test";
private static final byte[] row1 = Bytes.toBytes("test-is-committed1");
private static final byte[] row2 = Bytes.toBytes("test-is-committed2");
private static final byte[] family = Bytes.toBytes("data");
private static final byte[] qualifier = Bytes.toBytes("testdata");
private static final byte[] data1 = Bytes.toBytes("testWrite-1");
private static final Logger LOG = LoggerFactory.getLogger(TestOmidLLRaces.class);
private TSOClient client;
@BeforeClass
public void setup() throws Exception {
// TSO Setup
TSOServerConfig tsoConfig = new TSOServerConfig();
tsoConfig.setPort(1234);
tsoConfig.setConflictMapSize(1000);
tsoConfig.setLowLatency(true);
tsoConfig.setWaitStrategy("LOW_CPU");
Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));
LOG.info("Starting TSO");
TSOServer tso = injector.getInstance(TSOServer.class);
HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
tso.startAsync();
tso.awaitRunning();
TestUtils.waitForSocketListening("localhost", 1234, 100);
LOG.info("Finished loading TSO");
OmidClientConfiguration clientConf = new OmidClientConfiguration();
clientConf.setConnectionString("localhost:1234");
// Create the associated Handler
client = TSOClient.newInstance(clientConf);
// ------------------------------------------------------------------------------------------------------------
// HBase setup
// ------------------------------------------------------------------------------------------------------------
LOG.info("Creating HBase minicluster");
hbaseConf = HBaseConfiguration.create();
hbaseConf.setInt("hbase.hregion.memstore.flush.size", 10_000 * 1024);
hbaseConf.setInt("hbase.regionserver.nbreservationblocks", 1);
hbaseConf.setInt(HBASE_CLIENT_RETRIES_NUMBER, 3);
File tempFile = File.createTempFile("OmidTest", "");
tempFile.deleteOnExit();
hbaseConf.set("hbase.rootdir", tempFile.getAbsolutePath());
hbaseConf.setBoolean("hbase.localcluster.assign.random.ports",true);
hBaseUtils = new HBaseTestingUtility(hbaseConf);
hbaseCluster = hBaseUtils.startMiniCluster(1);
connection = ConnectionFactory.createConnection(hbaseConf);
hBaseUtils.createTable(TableName.valueOf(hBaseTimestampStorageConfig.getTableName()),
new byte[][]{hBaseTimestampStorageConfig.getFamilyName().getBytes()},
Integer.MAX_VALUE);
createTestTable();
createCommitTable();
LOG.info("HBase minicluster is up");
}
private void createCommitTable() throws IOException {
String[] args = new String[]{OmidTableManager.COMMIT_TABLE_COMMAND_NAME, "-numRegions", "1"};
OmidTableManager omidTableManager = new OmidTableManager(args);
omidTableManager.executeActionsOnHBase(hbaseConf);
}
private void createTestTable() throws IOException {
HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
HTableDescriptor test_table_desc = new HTableDescriptor(TableName.valueOf(TEST_TABLE));
HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
HColumnDescriptor datafam2 = new HColumnDescriptor(TEST_FAMILY2);
datafam.setMaxVersions(Integer.MAX_VALUE);
datafam2.setMaxVersions(Integer.MAX_VALUE);
test_table_desc.addFamily(datafam);
test_table_desc.addFamily(datafam2);
admin.createTable(test_table_desc);
}
protected TransactionManager newTransactionManagerHBaseCommitTable(TSOClient tsoClient) throws Exception {
HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
clientConf.setConnectionString("localhost:1234");
clientConf.setHBaseConfiguration(hbaseConf);
return HBaseTransactionManager.builder(clientConf)
.tsoClient(tsoClient).build();
}
@Test(timeOut = 30_000)
public void testIsCommitted() throws Exception {
AbstractTransactionManager tm = (AbstractTransactionManager)newTransactionManagerHBaseCommitTable(client);
Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
tm.getCommitTableClient());
TTable table = spy(new TTable(htable, snapshotFilter, false));
HBaseTransaction t1 = (HBaseTransaction) tm.begin();
Put put = new Put(row1);
put.addColumn(family, qualifier, data1);
table.put(t1, put);
tm.commit(t1);
HBaseTransaction t2 = (HBaseTransaction) tm.begin();
put = new Put(row2);
put.addColumn(family, qualifier, data1);
table.put(t2, put);
table.flushCommits();
HBaseTransaction t3 = (HBaseTransaction) tm.begin();
put = new Put(row2);
put.addColumn(family, qualifier, data1);
table.put(t3, put);
tm.commit(t3);
HBaseCellId hBaseCellId1 = new HBaseCellId(table, row1, family, qualifier, t1.getStartTimestamp());
HBaseCellId hBaseCellId2 = new HBaseCellId(table, row2, family, qualifier, t2.getStartTimestamp());
HBaseCellId hBaseCellId3 = new HBaseCellId(table, row2, family, qualifier, t3.getStartTimestamp());
assertTrue(snapshotFilter.isCommitted(hBaseCellId1, 0, false), "row1 should be committed");
assertFalse(snapshotFilter.isCommitted(hBaseCellId2, 0, false), "row2 should not be committed for kv2");
assertTrue(snapshotFilter.isCommitted(hBaseCellId3, 0, false), "row2 should be committed for kv3");
assertTrue(tm.isLowLatency());
}
@Test(timeOut = 30_000)
public void testInvalidation(ITestContext context) throws Exception {
AbstractTransactionManager tm = (AbstractTransactionManager)newTransactionManagerHBaseCommitTable(client);
Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
tm.getCommitTableClient());
TTable table = spy(new TTable(htable, snapshotFilter, false));
HBaseTransaction t1 = (HBaseTransaction) tm.begin();
Put put = new Put(row1);
put.addColumn(family, qualifier, data1);
table.put(t1, put);
HBaseTransaction t2 = (HBaseTransaction) tm.begin();
Get get = new Get(row1);
get.addColumn(family, qualifier);
table.get(t2,get);
//assert there is an invalidation marker:
Table commitTable = connection.getTable(TableName.valueOf("OMID_COMMIT_TABLE"));
KeyGenerator keygen = KeyGeneratorImplementations.defaultKeyGenerator();
byte[] row = keygen.startTimestampToKey(t1.getStartTimestamp());
Get getInvalidation = new Get(row);
getInvalidation.addColumn(Bytes.toBytes(DEFAULT_COMMIT_TABLE_CF_NAME),"IT".getBytes(UTF_8));
Result res = commitTable.get(getInvalidation);
int val = Bytes.toInt(res.getValue(Bytes.toBytes(DEFAULT_COMMIT_TABLE_CF_NAME), "IT".getBytes(UTF_8)));
assertTrue(val == 1);
boolean gotInvalidated = false;
try {
tm.commit(t1);
} catch (RollbackException e) {
gotInvalidated = true;
}
assertTrue(gotInvalidated);
tm.commit(t2);
Thread.sleep(1000);
res = commitTable.get(getInvalidation);
assertTrue(res.isEmpty());
assertTrue(tm.isLowLatency());
}
}