// blob: 6edbcb5fc9d66afe78ded8459023ef271e994053 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.transaction;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.NetworkUtils;
import org.apache.omid.TestUtils;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.InMemoryCommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
import org.apache.omid.tools.hbase.OmidTableManager;
import org.apache.omid.tso.TSOMockModule;
import org.apache.omid.tso.TSOServer;
import org.apache.omid.tso.TSOServerConfig;
import org.apache.omid.tso.TSOServerConfig.TIMESTAMP_TYPE;
import org.apache.omid.tso.client.OmidClientConfiguration;
import org.apache.omid.tso.client.TSOClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.ITestContext;
import org.testng.annotations.AfterGroups;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeGroups;
import org.testng.annotations.BeforeMethod;
import com.google.inject.Guice;
import com.google.inject.Injector;
/**
 * Base class for Omid tests that share one TSO server and one HBase mini-cluster
 * across all test methods of the {@code sharedHBase} TestNG group.
 *
 * <p>{@link #beforeGroups(ITestContext)} starts the TSO and the mini-cluster and publishes
 * the TSO server, the TSO client, the client configuration and the in-memory commit table
 * as attributes of the {@link ITestContext}. {@link #afterMethod()} drops and re-creates the
 * test table and the commit table after each test method, and
 * {@link #afterGroups(ITestContext)} releases everything again.
 */
public abstract class OmidTestBase {

    private static final Logger LOG = LoggerFactory.getLogger(OmidTestBase.class);

    /** HBase testing utility that owns the mini-cluster; assigned in {@link #beforeGroups(ITestContext)}. */
    static HBaseTestingUtility hBaseUtils;
    /** Handle returned when the mini-cluster is started; non-null means there is a cluster to shut down. */
    private static MiniHBaseCluster hbaseCluster;
    /** Configuration the mini-cluster was started with; also handed to the transaction managers. */
    static Configuration hbaseConf;
    /** Connection created from {@link #hbaseConf}; closed in {@link #afterGroups(ITestContext)}. */
    static Connection connection;

    protected static final String TEST_TABLE = "test";
    protected static final String TEST_FAMILY = "data";
    static final String TEST_FAMILY2 = "data2";

    /** Port the TSO server listens on; picked in {@link #beforeGroups(ITestContext)}. */
    public static int port;

    /**
     * Commit table configuration obtained from the injector in {@link #beforeGroups(ITestContext)}.
     * It is an instance field, so it may still be {@code null} when {@link #afterMethod()} runs;
     * that method checks for it.
     */
    private HBaseCommitTableConfig hBaseCommitTableConfig;

    /**
     * Names the current thread after the test method about to run, so that log lines can be
     * attributed to a test.
     *
     * @param method the test method about to be executed
     */
    @BeforeMethod(alwaysRun = true)
    public void beforeClass(Method method) throws Exception {
        Thread.currentThread().setName("UnitTest-" + method.getName());
    }

    /**
     * Starts the TSO server and the HBase mini-cluster and creates the tables the tests need.
     *
     * <p>The following attributes are stored in the context: {@code "tso"}, {@code "clientConf"},
     * {@code "commitTable"} and {@code "client"}.
     *
     * @param context TestNG context used to share the created objects with the tests
     * @throws Exception if the TSO or the mini-cluster cannot be started
     */
    @BeforeGroups(groups = "sharedHBase")
    public void beforeGroups(ITestContext context) throws Exception {
        // TSO Setup
        TSOServerConfig tsoConfig = new TSOServerConfig();
        port = NetworkUtils.getFreePort();
        tsoConfig.setPort(port);
        tsoConfig.setConflictMapSize(1000);
        tsoConfig.setWaitStrategy("LOW_CPU");
        tsoConfig.setTimestampType(TIMESTAMP_TYPE.INCREMENTAL.toString());
        Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));

        LOG.info("Starting TSO");
        TSOServer tso = injector.getInstance(TSOServer.class);
        hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
        HBaseTimestampStorageConfig hBaseTimestampStorageConfig =
                injector.getInstance(HBaseTimestampStorageConfig.class);
        tso.startAsync();
        tso.awaitRunning();
        TestUtils.waitForSocketListening("localhost", port, 100);
        LOG.info("Finished loading TSO");
        context.setAttribute("tso", tso);

        OmidClientConfiguration clientConf = new OmidClientConfiguration();
        clientConf.setConnectionString("localhost:" + port);
        context.setAttribute("clientConf", clientConf);

        InMemoryCommitTable commitTable = (InMemoryCommitTable) injector.getInstance(CommitTable.class);
        context.setAttribute("commitTable", commitTable);

        // Create the associated Handler
        TSOClient client = TSOClient.newInstance(clientConf);
        context.setAttribute("client", client);

        startHBase(hBaseTimestampStorageConfig);
    }

    /**
     * Starts a single-node HBase mini-cluster, opens the shared connection and creates the
     * timestamp, test and commit tables.
     */
    private void startHBase(HBaseTimestampStorageConfig timestampStorageConfig) throws Exception {
        LOG.info("Creating HBase minicluster");
        hbaseConf = HBaseConfiguration.create();
        hbaseConf.setInt("hbase.hregion.memstore.flush.size", 10_000 * 1024);
        hbaseConf.setInt("hbase.regionserver.nbreservationblocks", 1);
        hbaseConf.setInt(HBASE_CLIENT_RETRIES_NUMBER, 3);

        File tempFile = File.createTempFile("OmidTest", "");
        tempFile.deleteOnExit();
        hbaseConf.set("hbase.rootdir", tempFile.getAbsolutePath());
        hbaseConf.setBoolean("hbase.localcluster.assign.random.ports", true);

        hBaseUtils = new HBaseTestingUtility(hbaseConf);
        hbaseCluster = hBaseUtils.startMiniCluster(1);
        connection = ConnectionFactory.createConnection(hbaseConf);

        // Bytes.toBytes encodes as UTF-8, independent of the platform default charset.
        hBaseUtils.createTable(TableName.valueOf(timestampStorageConfig.getTableName()),
                               new byte[][]{Bytes.toBytes(timestampStorageConfig.getFamilyName())},
                               Integer.MAX_VALUE);
        createTestTable();
        createCommitTable();
        LOG.info("HBase minicluster is up");
    }

    /**
     * Creates {@link #TEST_TABLE} with the two test column families, each keeping
     * {@link Integer#MAX_VALUE} versions.
     *
     * @throws IOException if the table cannot be created
     */
    protected void createTestTable() throws IOException {
        HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
        HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(TEST_TABLE));
        for (String familyName : new String[]{TEST_FAMILY, TEST_FAMILY2}) {
            HColumnDescriptor family = new HColumnDescriptor(familyName);
            family.setMaxVersions(Integer.MAX_VALUE);
            tableDescriptor.addFamily(family);
        }
        admin.createTable(tableDescriptor);
    }

    /** Runs the {@link OmidTableManager} commit-table command with a single region. */
    private void createCommitTable() throws IOException {
        String[] args = new String[]{OmidTableManager.COMMIT_TABLE_COMMAND_NAME, "-numRegions", "1"};
        OmidTableManager omidTableManager = new OmidTableManager(args);
        omidTableManager.executeActionsOnHBase(hbaseConf);
    }

    /** Returns the {@code "tso"} context attribute, or {@code null} if it was never set. */
    private TSOServer getTSO(ITestContext context) {
        return (TSOServer) context.getAttribute("tso");
    }

    /** Returns the {@code "client"} context attribute, or {@code null} if it was never set. */
    TSOClient getClient(ITestContext context) {
        return (TSOClient) context.getAttribute("client");
    }

    /** Returns the {@code "commitTable"} context attribute, or {@code null} if it was never set. */
    InMemoryCommitTable getCommitTable(ITestContext context) {
        return (InMemoryCommitTable) context.getAttribute("commitTable");
    }

    /**
     * Builds the client configuration shared by every {@code newTransactionManager} overload:
     * it points at the local TSO port and carries the mini-cluster configuration.
     */
    private static HBaseOmidClientConfiguration newClientConfiguration() throws Exception {
        HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
        clientConf.setConnectionString("localhost:" + port);
        clientConf.setHBaseConfiguration(hbaseConf);
        return clientConf;
    }

    /**
     * Creates a transaction manager wired to the shared TSO client and in-memory commit table.
     *
     * @param context TestNG context holding the shared objects
     * @return a new transaction manager
     */
    protected TransactionManager newTransactionManager(ITestContext context) throws Exception {
        return newTransactionManager(context, getClient(context));
    }

    /**
     * Creates a transaction manager that uses the given post-commit actions.
     *
     * @param context           TestNG context holding the shared objects
     * @param postCommitActions post-commit actions handed to the builder
     * @return a new transaction manager
     */
    protected TransactionManager newTransactionManager(ITestContext context, PostCommitActions postCommitActions)
            throws Exception {
        return HBaseTransactionManager.builder(newClientConfiguration())
                .postCommitter(postCommitActions)
                .commitTableClient(getCommitTable(context).getClient())
                .commitTableWriter(getCommitTable(context).getWriter())
                .tsoClient(getClient(context)).build();
    }

    /**
     * Creates a transaction manager that talks to the TSO through the given client.
     *
     * @param context   TestNG context holding the shared commit table
     * @param tsoClient TSO client handed to the builder
     * @return a new transaction manager
     */
    protected TransactionManager newTransactionManager(ITestContext context, TSOClient tsoClient) throws Exception {
        return HBaseTransactionManager.builder(newClientConfiguration())
                .commitTableClient(getCommitTable(context).getClient())
                .commitTableWriter(getCommitTable(context).getWriter())
                .tsoClient(tsoClient).build();
    }

    /**
     * Creates a transaction manager that reads the commit table through the given client while
     * still writing through the shared in-memory commit table.
     *
     * @param context           TestNG context holding the shared objects
     * @param commitTableClient commit table client handed to the builder
     * @return a new transaction manager
     */
    protected TransactionManager newTransactionManager(ITestContext context, CommitTable.Client commitTableClient)
            throws Exception {
        return HBaseTransactionManager.builder(newClientConfiguration())
                .commitTableClient(commitTableClient)
                .commitTableWriter(getCommitTable(context).getWriter())
                .tsoClient(getClient(context)).build();
    }

    /**
     * Releases everything {@link #beforeGroups(ITestContext)} created: the shared HBase
     * connection, the mini-cluster, the TSO client and the TSO server.
     *
     * <p>The steps are chained with {@code finally} so that a failure in one of them does not
     * prevent the remaining resources from being released.
     *
     * @param context TestNG context holding the TSO server and client
     * @throws Exception if any of the shutdown steps fails
     */
    @AfterGroups(groups = "sharedHBase")
    public void afterGroups(ITestContext context) throws Exception {
        LOG.info("Tearing down OmidTestBase...");
        try {
            shutdownHBase();
        } finally {
            shutdownTso(context);
        }
    }

    /** Closes the shared connection and then shuts the mini-cluster down. */
    private static void shutdownHBase() throws Exception {
        try {
            if (connection != null) {
                connection.close();
                connection = null;
            }
        } finally {
            if (hbaseCluster != null) {
                hBaseUtils.shutdownMiniCluster();
            }
        }
    }

    /** Closes the TSO client, stops the TSO server and waits until its port is free. */
    private void shutdownTso(ITestContext context) throws Exception {
        try {
            TSOClient client = getClient(context);
            if (client != null) {
                client.close().get();
            }
        } finally {
            TSOServer tso = getTSO(context);
            if (tso != null) {
                tso.stopAsync();
                tso.awaitTerminated();
                TestUtils.waitForSocketNotListening("localhost", port, 1000);
            }
        }
    }

    /**
     * Resets HBase state after each test method by dropping and re-creating the test table and
     * the commit table. Failures are logged and deliberately not propagated.
     */
    @AfterMethod(groups = "sharedHBase", timeOut = 60_000)
    public void afterMethod() {
        try {
            LOG.info("tearing Down");
            Admin admin = hBaseUtils.getHBaseAdmin();
            deleteTable(admin, TableName.valueOf(TEST_TABLE));
            createTestTable();
            if (hBaseCommitTableConfig != null) {
                deleteTable(admin, TableName.valueOf(hBaseCommitTableConfig.getTableName()));
            }
            createCommitTable();
        } catch (Exception e) {
            LOG.error("Error tearing down", e);
        }
    }

    /**
     * Deletes the table if it exists, disabling it first when it is still enabled.
     *
     * @param admin     admin used to perform the operations
     * @param tableName table to delete
     * @throws IOException if any admin operation fails
     */
    void deleteTable(Admin admin, TableName tableName) throws IOException {
        if (!admin.tableExists(tableName)) {
            return;
        }
        if (!admin.isTableDisabled(tableName)) {
            admin.disableTable(tableName);
        }
        admin.deleteTable(tableName);
    }

    /**
     * Checks whether the latest cell of the given column holds the expected value.
     *
     * @param table table to read from
     * @param row   row key
     * @param fam   column family
     * @param col   column qualifier
     * @param value expected value
     * @return {@code true} if the latest cell exists and its value equals {@code value};
     *         {@code false} if the cell is missing, the value differs, or the read fails
     */
    static boolean verifyValue(Table table, byte[] row,
                               byte[] fam, byte[] col, byte[] value) {
        try {
            Get g = new Get(row).setMaxVersions(1);
            Result r = table.get(g);
            Cell cell = r.getColumnLatestCell(fam, col);
            if (cell == null) {
                // No cell for this column: there is nothing to compare (and nothing to clone).
                if (LOG.isTraceEnabled()) {
                    LOG.trace("No value for " + describeCell(table, row, fam, col)
                                      + " (" + Bytes.toString(value) + " expected)");
                }
                return false;
            }
            byte[] actual = CellUtil.cloneValue(cell);
            if (LOG.isTraceEnabled()) {
                LOG.trace("Value for " + describeCell(table, row, fam, col) + "=>" + Bytes.toString(actual)
                                  + " (" + Bytes.toString(value) + " expected)");
            }
            return Bytes.equals(actual, value);
        } catch (IOException e) {
            LOG.error("Error reading row " + describeCell(table, row, fam, col), e);
            return false;
        }
    }

    /** Formats a cell coordinate as {@code table:row:family:qualifier} for log messages. */
    private static String describeCell(Table table, byte[] row, byte[] fam, byte[] col) {
        return table.getName().getNameAsString() + ":" + Bytes.toString(row)
                + ":" + Bytes.toString(fam) + ":" + Bytes.toString(col);
    }

}