/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.metastore.utils;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLTransactionRollbackException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Scanner;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.DatabaseProduct;
import org.apache.hadoop.hive.metastore.IMetaStoreSchemaInfo;
import org.apache.hadoop.hive.metastore.MetaStoreSchemaInfoFactory;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hive.metastore.DatabaseProduct.determineDatabaseProduct;
/**
* Utility methods for creating and destroying txn database/schema, plus methods for
* querying against metastore tables.
* Placed here in a separate class so it can be shared across unit tests.
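 *
 * A typical lifecycle in a test looks roughly like this (a sketch; obtaining the
 * Configuration via {@code MetastoreConf.newMetastoreConf()} is one option):
 * <pre>{@code
 * Configuration conf = MetastoreConf.newMetastoreConf();
 * TestTxnDbUtil.setConfValues(conf); // use DbTxnManager, enable concurrency
 * TestTxnDbUtil.prepDb(conf);        // create the txn tables (idempotent)
 * // ... exercise the code under test ...
 * TestTxnDbUtil.cleanDb(conf);       // truncate the txn tables for the next test
 * }</pre>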
*/
public final class TestTxnDbUtil {
  private static final Logger LOG = LoggerFactory.getLogger(TestTxnDbUtil.class);
private static final String TXN_MANAGER = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
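  // Number of consecutive deadlock retries performed by prepDb(); reset in its finally block.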
private static int deadlockCnt = 0;
private TestTxnDbUtil() {
throw new UnsupportedOperationException("Can't initialize class");
}
/**
 * Set up the configuration so it will use the DbTxnManager and enable concurrency support,
 * so that transaction and lock information is stored in the metastore database.
 *
 * @param conf Configuration to add these values to
*/
public static void setConfValues(Configuration conf) {
MetastoreConf.setVar(conf, ConfVars.HIVE_TXN_MANAGER, TXN_MANAGER);
MetastoreConf.setBoolVar(conf, ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
}
/**
* Prepares the metastore database for unit tests.
* Runs the latest init schema against the database configured in the CONNECT_URL_KEY param.
 * Ignores "already exists" errors (table, index etc.), so it can be called multiple times for the same database.
* @param conf Metastore configuration
* @throws Exception Initialization failure
*/
public static synchronized void prepDb(Configuration conf) throws Exception {
LOG.info("Creating transactional tables");
Connection conn = null;
Statement stmt = null;
try {
conn = getConnection(conf);
String s = conn.getMetaData().getDatabaseProductName();
DatabaseProduct dbProduct = determineDatabaseProduct(s, conf);
stmt = conn.createStatement();
if (checkDbPrepared(stmt)) {
return;
}
String schemaRootPath = getSchemaRootPath();
IMetaStoreSchemaInfo metaStoreSchemaInfo =
MetaStoreSchemaInfoFactory.get(conf, schemaRootPath, dbProduct.getHiveSchemaPostfix());
String initFile = metaStoreSchemaInfo.generateInitFileName(null);
try (InputStream is = new FileInputStream(
metaStoreSchemaInfo.getMetaStoreScriptDir() + File.separator + initFile)) {
LOG.info("Reinitializing the metastore db with {} on the database {}", initFile,
MetastoreConf.getVar(conf, ConfVars.CONNECT_URL_KEY));
importSQL(stmt, is);
}
} catch (SQLException e) {
try {
if (conn != null) {
conn.rollback();
}
} catch (SQLException re) {
LOG.error("Error rolling back: " + re.getMessage());
}
      // This might be a deadlock; if so, retry
if (e instanceof SQLTransactionRollbackException && deadlockCnt++ < 5) {
LOG.warn("Caught deadlock, retrying db creation");
prepDb(conf);
} else {
throw e;
}
} finally {
deadlockCnt = 0;
closeResources(conn, stmt, null);
}
}
  /**
   * Checks whether the transactional tables are already there, so we don't run the whole
   * init script again for the same database.
   */
  private static boolean checkDbPrepared(Statement stmt) {
try {
stmt.execute("SELECT * FROM \"TXNS\"");
} catch (SQLException e) {
return false;
}
return true;
}
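  /**
   * Executes the statements of the given SQL script one by one. Statements are split on a
   * ";" at the end of a line, and "--" comment lines are skipped. Errors whose SQLSTATE marks
   * an already existing object are ignored, which keeps the import idempotent.
   */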
private static void importSQL(Statement stmt, InputStream in) throws SQLException {
Set<String> knownErrors = getAlreadyExistsErrorCodes();
Scanner s = new Scanner(in, "UTF-8");
s.useDelimiter("(;(\r)?\n)|(--.*\n)");
while (s.hasNext()) {
String line = s.next();
if (line.trim().length() > 0) {
try {
stmt.execute(line);
} catch (SQLException e) {
if (knownErrors.contains(e.getSQLState())) {
LOG.debug("Ignoring sql error {}", e.getMessage());
} else {
throw e;
}
}
}
}
}
private static Set<String> getAlreadyExistsErrorCodes() {
// function already exists, table already exists, index already exists, duplicate key
Set<String> knownErrors = new HashSet<>();
// derby
knownErrors.addAll(Arrays.asList("X0Y68", "X0Y32", "X0Y44", "42Z93", "23505"));
// postgres
knownErrors.addAll(Arrays.asList("42P07", "42P16", "42710"));
// mssql
knownErrors.addAll(Arrays.asList("S0000", "S0001", "23000"));
// mysql
knownErrors.addAll(Arrays.asList("42S01", "HY000"));
// oracle
knownErrors.addAll(Arrays.asList("42000"));
return knownErrors;
}
private static Set<String> getTableNotExistsErrorCodes() {
Set<String> knownErrors = new HashSet<>();
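    // "table or view does not exist" codes for derby, postgres, mysql, mssql and oracle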
knownErrors.addAll(Arrays.asList("42X05", "42P01", "42S02", "S0002", "42000"));
return knownErrors;
}
private static String getSchemaRootPath() {
String hiveRoot = System.getProperty("hive.root");
if (StringUtils.isNotEmpty(hiveRoot)) {
return ensurePathEndsInSlash(hiveRoot) + "standalone-metastore/metastore-server/target/tmp/";
} else {
return ensurePathEndsInSlash(System.getProperty("test.tmp.dir", "target/tmp"));
}
}
private static String ensurePathEndsInSlash(String path) {
if (path == null) {
throw new NullPointerException("Path cannot be null");
}
if (path.endsWith(File.separator)) {
return path;
} else {
return path + File.separator;
}
}
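  /**
   * Truncates the transactional system tables and re-seeds the sequence tables, so every
   * test starts from a clean state. The notification and primary-key sequence tables are
   * deliberately left untouched (see the comment near the end of the method).
   */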
public static void cleanDb(Configuration conf) throws Exception {
LOG.info("Cleaning transactional tables");
boolean success = true;
Connection conn = null;
Statement stmt = null;
try {
conn = getConnection(conf);
stmt = conn.createStatement();
      if (!checkDbPrepared(stmt)) {
// Nothing to clean
return;
}
// We want to try these, whether they succeed or fail.
success &= truncateTable(conn, conf, stmt, "TXN_COMPONENTS");
success &= truncateTable(conn, conf, stmt, "COMPLETED_TXN_COMPONENTS");
success &= truncateTable(conn, conf, stmt, "MIN_HISTORY_WRITE_ID");
success &= truncateTable(conn, conf, stmt, "TXNS");
success &= truncateTable(conn, conf, stmt, "TXN_TO_WRITE_ID");
success &= truncateTable(conn, conf, stmt, "NEXT_WRITE_ID");
success &= truncateTable(conn, conf, stmt, "HIVE_LOCKS");
success &= truncateTable(conn, conf, stmt, "NEXT_LOCK_ID");
success &= truncateTable(conn, conf, stmt, "COMPACTION_QUEUE");
success &= truncateTable(conn, conf, stmt, "NEXT_COMPACTION_QUEUE_ID");
success &= truncateTable(conn, conf, stmt, "COMPLETED_COMPACTIONS");
success &= truncateTable(conn, conf, stmt, "AUX_TABLE");
success &= truncateTable(conn, conf, stmt, "WRITE_SET");
success &= truncateTable(conn, conf, stmt, "REPL_TXN_MAP");
success &= truncateTable(conn, conf, stmt, "MATERIALIZATION_REBUILD_LOCKS");
success &= truncateTable(conn, conf, stmt, "MIN_HISTORY_LEVEL");
success &= truncateTable(conn, conf, stmt, "COMPACTION_METRICS_CACHE");
try {
String dbProduct = conn.getMetaData().getDatabaseProductName();
DatabaseProduct databaseProduct = determineDatabaseProduct(dbProduct, conf);
try {
resetTxnSequence(databaseProduct, stmt);
stmt.executeUpdate("INSERT INTO \"NEXT_LOCK_ID\" VALUES(1)");
stmt.executeUpdate("INSERT INTO \"NEXT_COMPACTION_QUEUE_ID\" VALUES(1)");
} catch (SQLException e) {
if (!databaseProduct.isTableNotExistsError(e)) {
LOG.error("Error initializing sequence values", e);
throw e;
}
}
} catch (SQLException e) {
LOG.error("Unable determine database product ", e);
throw e;
}
      /*
       * Don't clean NOTIFICATION_LOG, SEQUENCE_TABLE and NOTIFICATION_SEQUENCE, as they are
       * used by other, non-txn tables to generate primary keys. If these tables were cleaned
       * while the tables depending on them were not, subsequent inserts into those tables
       * would fail with duplicate-key errors.
       */
} finally {
closeResources(conn, stmt, null);
}
    if (success) {
return;
}
throw new RuntimeException("Failed to clean up txn tables");
}
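  /** Runs the database-specific statements that reset the transaction id sequence. */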
private static void resetTxnSequence(DatabaseProduct databaseProduct, Statement stmt) throws SQLException {
for (String s : databaseProduct.getResetTxnSequenceStmts()) {
stmt.execute(s);
}
}
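  /**
   * Truncates a single table using the database-specific truncate statement.
   * @return true if the table was truncated or does not exist; false on any other
   *         failure, which is logged but not rethrown
   */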
private static boolean truncateTable(Connection conn, Configuration conf, Statement stmt, String name) throws SQLException {
String dbProduct = conn.getMetaData().getDatabaseProductName();
DatabaseProduct databaseProduct = determineDatabaseProduct(dbProduct, conf);
try {
      // We cannot use an actual TRUNCATE because of foreign keys, but we don't expect much data during tests
      String s = databaseProduct.getTruncateStatement(name);
      stmt.execute(s);
      LOG.debug("Successfully truncated table {}", name);
      return true;
    } catch (SQLException e) {
      if (databaseProduct.isTableNotExistsError(e)) {
        LOG.debug("Not truncating {} because it doesn't exist", name);
        return true;
      }
      LOG.error("Unable to truncate table {}", name, e);
}
return false;
}
/**
* A tool to count the number of partitions, tables,
* and databases locked by a particular lockId.
 *
 * @param conf Metastore configuration
 * @param lockId lock id to look for lock components
*
* @return number of components, or 0 if there is no lock
*/
public static int countLockComponents(Configuration conf, long lockId) throws Exception {
Connection conn = null;
PreparedStatement stmt = null;
ResultSet rs = null;
try {
conn = getConnection(conf);
stmt = conn.prepareStatement("SELECT count(*) FROM hive_locks WHERE hl_lock_ext_id = ?");
stmt.setLong(1, lockId);
rs = stmt.executeQuery();
if (!rs.next()) {
return 0;
}
return rs.getInt(1);
} finally {
closeResources(conn, stmt, rs);
}
}
  /**
   * Utility method used to run COUNT queries like "SELECT COUNT(*) FROM ..." against metastore tables.
   * @param conf Metastore configuration
   * @param countQuery count query text
   * @return the count returned by the query, or 0 if it returns no rows
   * @throws Exception on connection or query failure
   */
public static int countQueryAgent(Configuration conf, String countQuery) throws Exception {
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
try {
conn = getConnection(conf);
stmt = conn.createStatement();
rs = stmt.executeQuery(countQuery);
if (!rs.next()) {
return 0;
}
return rs.getInt(1);
} finally {
closeResources(conn, stmt, rs);
}
}
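  /**
   * Runs the given query and renders the whole result set as text, one row per line and
   * columns separated by a single space, including a header row. For example,
   * {@code queryToString(conf, "SELECT * FROM \"TXNS\"")} dumps the contents of the TXNS table.
   */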
public static String queryToString(Configuration conf, String query) throws Exception {
return queryToString(conf, query, true);
}
public static String queryToString(Configuration conf, String query, boolean includeHeader) throws Exception {
return queryToString(conf, query, includeHeader, " ");
}
public static String queryToCsv(Configuration conf, String query) throws Exception {
return queryToString(conf, query, true, ",");
}
public static String queryToCsv(Configuration conf, String query, boolean includeHeader) throws Exception {
return queryToString(conf, query, includeHeader, ",");
}
public static String queryToString(Configuration conf, String query, boolean includeHeader, String columnSeparator)
throws Exception {
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
StringBuilder sb = new StringBuilder();
try {
conn = getConnection(conf);
stmt = conn.createStatement();
rs = stmt.executeQuery(query);
ResultSetMetaData rsmd = rs.getMetaData();
      if (includeHeader) {
for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
sb.append(rsmd.getColumnName(colPos)).append(columnSeparator);
}
sb.append('\n');
}
      while (rs.next()) {
for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
sb.append(rs.getObject(colPos)).append(columnSeparator);
}
sb.append('\n');
}
} finally {
closeResources(conn, stmt, rs);
}
return sb.toString();
}
  /**
   * Executes an arbitrary update statement. This is only for testing; it does not use the
   * connection pool from TxnHandler!
   * @param conf Metastore configuration
   * @param query update statement to execute
   * @throws Exception on connection or execution failure
   */
public static void executeUpdate(Configuration conf, String query)
throws Exception {
Connection conn = null;
Statement stmt = null;
try {
conn = getConnection(conf);
stmt = conn.createStatement();
stmt.executeUpdate(query);
} finally {
closeResources(conn, stmt, null);
}
}
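  /**
   * Opens a plain JDBC connection (auto-commit enabled) using the driver, URL and credentials
   * from the Metastore configuration, and runs the database product's prepare-txn statement,
   * if there is one. Note that this bypasses the connection pool used by TxnHandler.
   */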
public static Connection getConnection(Configuration conf) throws Exception {
String jdbcDriver = MetastoreConf.getVar(conf, ConfVars.CONNECTION_DRIVER);
    Driver driver = (Driver) Class.forName(jdbcDriver).getDeclaredConstructor().newInstance();
Properties prop = new Properties();
String driverUrl = MetastoreConf.getVar(conf, ConfVars.CONNECT_URL_KEY);
String user = MetastoreConf.getVar(conf, ConfVars.CONNECTION_USER_NAME);
    String passwd = MetastoreConf.getPassword(conf, ConfVars.PWD);
prop.setProperty("user", user);
prop.setProperty("password", passwd);
Connection conn = driver.connect(driverUrl, prop);
conn.setAutoCommit(true);
DatabaseProduct dbProduct = determineDatabaseProduct(conn.getMetaData().getDatabaseProductName(), conf);
String initSql = dbProduct.getPrepareTxnStmt();
if (initSql != null) {
try (Statement stmt = conn.createStatement()) {
stmt.execute(initSql);
}
}
return conn;
}
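  /**
   * Rolls back the connection and closes the given JDBC resources. Any SQLException is logged
   * and swallowed, so this is safe to call from finally blocks.
   */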
  public static void closeResources(Connection conn, Statement stmt, ResultSet rs) {
    if (rs != null) {
      try {
        rs.close();
      } catch (SQLException e) {
        LOG.error("Error closing ResultSet: {}", e.getMessage());
      }
    }
    if (stmt != null) {
      try {
        stmt.close();
      } catch (SQLException e) {
        LOG.error("Error closing Statement: {}", e.getMessage());
      }
    }
    if (conn != null) {
      try {
        conn.rollback();
      } catch (SQLException e) {
        LOG.error("Error rolling back: {}", e.getMessage());
      }
      try {
        conn.close();
      } catch (SQLException e) {
        LOG.error("Error closing Connection: {}", e.getMessage());
      }
    }
  }
}