blob: 4c1b6b459391204464724c900d2e4544b5624c0e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.pherf.util;
import com.google.gson.Gson;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.phoenix.coprocessor.TaskRegionObserver;
import org.apache.phoenix.mapreduce.index.automation.PhoenixMRJobSubmitter;
import org.apache.phoenix.pherf.PherfConstants;
import org.apache.phoenix.pherf.configuration.Column;
import org.apache.phoenix.pherf.configuration.DataTypeMapping;
import org.apache.phoenix.pherf.configuration.Ddl;
import org.apache.phoenix.pherf.configuration.Query;
import org.apache.phoenix.pherf.configuration.QuerySet;
import org.apache.phoenix.pherf.configuration.Scenario;
import org.apache.phoenix.pherf.result.DataLoadTimeSummary;
import org.apache.phoenix.pherf.rules.DataValue;
import org.apache.phoenix.pherf.rules.RulesApplier;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.sql.Array;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
public class PhoenixUtil {
public static final String ASYNC_KEYWORD = "ASYNC";
public static final Gson GSON = new Gson();
private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixUtil.class);
private static String zookeeper;
private static int rowCountOverride = 0;
private boolean testEnabled;
private static PhoenixUtil instance;
private static boolean useThinDriver;
private static String queryServerUrl;
private static final int ONE_MIN_IN_MS = 60000;
private static String CurrentSCN = null;
private PhoenixUtil() {
this(false);
}
private PhoenixUtil(final boolean testEnabled) {
this.testEnabled = testEnabled;
}
public static PhoenixUtil create() {
return create(false);
}
public static PhoenixUtil create(final boolean testEnabled) {
instance = instance != null ? instance : new PhoenixUtil(testEnabled);
return instance;
}
public static void useThinDriver(String queryServerUrl) {
PhoenixUtil.useThinDriver = true;
PhoenixUtil.queryServerUrl = Objects.requireNonNull(queryServerUrl);
}
public static String getQueryServerUrl() {
return PhoenixUtil.queryServerUrl;
}
public static boolean isThinDriver() {
return PhoenixUtil.useThinDriver;
}
public static Gson getGSON() {
return GSON;
}
public Connection getConnection() throws Exception {
return getConnection(null);
}
public Connection getConnection(String tenantId) throws Exception {
return getConnection(tenantId, testEnabled, null);
}
public Connection getConnection(String tenantId,
Properties properties) throws Exception {
Map<String, String> propertyHashMap = getPropertyHashMap(properties);
return getConnection(tenantId, testEnabled, propertyHashMap);
}
public Connection getConnection(String tenantId,
Map<String, String> propertyHashMap) throws Exception {
return getConnection(tenantId, testEnabled, propertyHashMap);
}
public Connection getConnection(String tenantId, boolean testEnabled,
Map<String, String> propertyHashMap) throws Exception {
if (useThinDriver) {
if (null == queryServerUrl) {
throw new IllegalArgumentException("QueryServer URL must be set before" +
" initializing connection");
}
Properties props = new Properties();
if (null != tenantId) {
props.setProperty("TenantId", tenantId);
LOGGER.debug("\nSetting tenantId to " + tenantId);
}
String url = "jdbc:phoenix:thin:url=" + queryServerUrl + ";serialization=PROTOBUF";
return DriverManager.getConnection(url, props);
} else {
if (null == zookeeper) {
throw new IllegalArgumentException(
"Zookeeper must be set before initializing connection!");
}
Properties props = new Properties();
if (null != tenantId) {
props.setProperty("TenantId", tenantId);
LOGGER.debug("\nSetting tenantId to " + tenantId);
}
if (propertyHashMap != null) {
for (Map.Entry<String, String> phxProperty: propertyHashMap.entrySet()) {
props.setProperty(phxProperty.getKey(), phxProperty.getValue());
LOGGER.debug("Setting connection property "
+ phxProperty.getKey() + " to "
+ phxProperty.getValue());
}
}
String url = "jdbc:phoenix:" + zookeeper + (testEnabled ? ";test=true" : "");
return DriverManager.getConnection(url, props);
}
}
private Map<String, String> getPropertyHashMap(Properties props) {
Map<String, String> propsMaps = new HashMap<>();
for (String prop : props.stringPropertyNames()) {
propsMaps.put(prop, props.getProperty(prop));
}
return propsMaps;
}
public boolean executeStatement(String sql, Scenario scenario) throws Exception {
Connection connection = null;
boolean result = false;
try {
connection = getConnection(scenario.getTenantId());
result = executeStatement(sql, connection);
} finally {
if (connection != null) {
connection.close();
}
}
return result;
}
/**
* Execute statement
*
* @param sql
* @param connection
* @return
* @throws SQLException
*/
public boolean executeStatementThrowException(String sql, Connection connection)
throws SQLException {
boolean result = false;
PreparedStatement preparedStatement = null;
try {
preparedStatement = connection.prepareStatement(sql);
result = preparedStatement.execute();
connection.commit();
} finally {
if(preparedStatement != null) {
preparedStatement.close();
}
}
return result;
}
public boolean executeStatement(String sql, Connection connection) throws SQLException{
boolean result = false;
PreparedStatement preparedStatement = null;
try {
preparedStatement = connection.prepareStatement(sql);
result = preparedStatement.execute();
connection.commit();
} finally {
try {
if (preparedStatement != null) {
preparedStatement.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
return result;
}
@SuppressWarnings("unused")
public boolean executeStatement(PreparedStatement preparedStatement, Connection connection) {
boolean result = false;
try {
result = preparedStatement.execute();
connection.commit();
} catch (SQLException e) {
e.printStackTrace();
}
return result;
}
/**
* Delete existing tables with schema name set as {@link PherfConstants#PHERF_SCHEMA_NAME} with regex comparison
*
* @param regexMatch
* @throws SQLException
* @throws Exception
*/
public void deleteTables(String regexMatch) throws Exception {
regexMatch = regexMatch.toUpperCase().replace("ALL", ".*");
Connection conn = getConnection();
try {
ResultSet resultSet = getTableMetaData(PherfConstants.PHERF_SCHEMA_NAME, null, conn);
while (resultSet.next()) {
String tableName = resultSet.getString(TABLE_SCHEM) == null ? resultSet
.getString(TABLE_NAME) : resultSet
.getString(TABLE_SCHEM)
+ "."
+ resultSet.getString(TABLE_NAME);
if (tableName.matches(regexMatch)) {
LOGGER.info("\nDropping " + tableName);
try {
executeStatementThrowException("DROP TABLE "
+ tableName + " CASCADE", conn);
} catch (org.apache.phoenix.schema.TableNotFoundException tnf) {
LOGGER.error("Table might be already be deleted via cascade. Schema: "
+ tnf.getSchemaName()
+ " Table: "
+ tnf.getTableName());
}
}
}
} finally {
conn.close();
}
}
public void dropChildView(RegionCoprocessorEnvironment taskRegionEnvironment, int depth) {
TaskRegionObserver.SelfHealingTask task =
new TaskRegionObserver.SelfHealingTask(
taskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
for (int i = 0; i < depth; i++) {
task.run();
}
}
public ResultSet getTableMetaData(String schemaName, String tableName, Connection connection)
throws SQLException {
DatabaseMetaData dbmd = connection.getMetaData();
ResultSet resultSet = dbmd.getTables(null, schemaName, tableName, null);
return resultSet;
}
public ResultSet getColumnsMetaData(String schemaName, String tableName, Connection connection)
throws SQLException {
DatabaseMetaData dbmd = connection.getMetaData();
ResultSet resultSet = dbmd.getColumns(null, schemaName.toUpperCase(), tableName.toUpperCase(), null);
return resultSet;
}
public synchronized List<Column> getColumnsFromPhoenix(String schemaName, String tableName,
Connection connection) throws SQLException {
List<Column> columnList = new ArrayList<>();
ResultSet resultSet = null;
try {
resultSet = getColumnsMetaData(schemaName, tableName, connection);
while (resultSet.next()) {
Column column = new Column();
column.setName(resultSet.getString("COLUMN_NAME"));
column.setType(DataTypeMapping.valueOf(resultSet.getString("TYPE_NAME").replace(" ", "_")));
column.setLength(resultSet.getInt("COLUMN_SIZE"));
columnList.add(column);
LOGGER.debug(String.format("getColumnsMetaData for column name : %s", column.getName()));
}
} finally {
if (null != resultSet) {
resultSet.close();
}
}
return Collections.unmodifiableList(columnList);
}
/**
* Execute all querySet DDLs first based on tenantId if specified. This is executed
* first since we don't want to run DDLs in parallel to executing queries.
*
* @param querySet
* @throws Exception
*/
public void executeQuerySetDdls(QuerySet querySet) throws Exception {
for (Query query : querySet.getQuery()) {
if (null != query.getDdl()) {
Connection conn = null;
try {
LOGGER.info("\nExecuting DDL:" + query.getDdl() + " on tenantId:" + query
.getTenantId());
executeStatement(query.getDdl(),
conn = getConnection(query.getTenantId()));
} finally {
if (null != conn) {
conn.close();
}
}
}
}
}
/**
* Executes any ddl defined at the scenario level. This is executed before we commence
* the data load.
*
* @throws Exception
*/
public void executeScenarioDdl(List<Ddl> ddls, String tenantId, DataLoadTimeSummary dataLoadTimeSummary) throws Exception {
if (null != ddls) {
Connection conn = null;
try {
for (Ddl ddl : ddls) {
LOGGER.info("\nExecuting DDL:" + ddl + " on tenantId:" +tenantId);
long startTime = EnvironmentEdgeManager.currentTimeMillis();
executeStatement(ddl.toString(), conn = getConnection(tenantId));
if (ddl.getStatement().toUpperCase().contains(ASYNC_KEYWORD)) {
waitForAsyncIndexToFinish(ddl.getTableName());
}
dataLoadTimeSummary.add(ddl.getTableName(), 0,
(int)(EnvironmentEdgeManager.currentTimeMillis() - startTime));
}
} finally {
if (null != conn) {
conn.close();
}
}
}
}
/**
* Waits for ASYNC index to build
* @param tableName
* @throws InterruptedException
*/
public void waitForAsyncIndexToFinish(String tableName) throws InterruptedException {
//Wait for up to 15 mins for ASYNC index build to start
boolean jobStarted = false;
for (int i=0; i<15; i++) {
if (isYarnJobInProgress(tableName)) {
jobStarted = true;
break;
}
Thread.sleep(ONE_MIN_IN_MS);
}
if (jobStarted == false) {
throw new IllegalStateException("ASYNC index build did not start within 15 mins");
}
// Wait till ASYNC index job finishes to get approximate job E2E time
for (;;) {
if (!isYarnJobInProgress(tableName))
break;
Thread.sleep(ONE_MIN_IN_MS);
}
}
/**
* Checks if a YARN job with the specific table name is in progress
* @param tableName
* @return
*/
boolean isYarnJobInProgress(String tableName) {
try {
LOGGER.info("Fetching YARN apps...");
Set<String> response = new PhoenixMRJobSubmitter().getSubmittedYarnApps();
for (String str : response) {
LOGGER.info("Runnng YARN app: " + str);
if (str.toUpperCase().contains(tableName.toUpperCase())) {
return true;
}
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
public static String getZookeeper() {
return zookeeper;
}
public static void setZookeeper(String zookeeper) {
LOGGER.info("Setting zookeeper: " + zookeeper);
useThickDriver(zookeeper);
}
public static void useThickDriver(String zookeeper) {
PhoenixUtil.useThinDriver = false;
PhoenixUtil.zookeeper = Objects.requireNonNull(zookeeper);
}
public static int getRowCountOverride() {
return rowCountOverride;
}
public static void setRowCountOverride(int rowCountOverride) {
PhoenixUtil.rowCountOverride = rowCountOverride;
}
/**
* Update Phoenix table stats
*
* @param tableName
* @throws Exception
*/
public void updatePhoenixStats(String tableName, Scenario scenario) throws Exception {
LOGGER.info("Updating stats for " + tableName);
executeStatement("UPDATE STATISTICS " + tableName, scenario);
}
public String getExplainPlan(Query query) throws SQLException {
return getExplainPlan(query, null, null);
}
/**
* Get explain plan for a query
*
* @param query
* @param ruleApplier
* @param scenario
* @return
* @throws SQLException
*/
public String getExplainPlan(Query query, Scenario scenario, RulesApplier ruleApplier) throws SQLException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement statement = null;
StringBuilder buf = new StringBuilder();
try {
conn = getConnection(query.getTenantId());
String explainQuery;
if (scenario != null && ruleApplier != null) {
explainQuery = query.getDynamicStatement(ruleApplier, scenario);
}
else {
explainQuery = query.getStatement();
}
statement = conn.prepareStatement("EXPLAIN " + explainQuery);
rs = statement.executeQuery();
while (rs.next()) {
buf.append(rs.getString(1).trim().replace(",", "-"));
}
statement.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (rs != null) rs.close();
if (statement != null) statement.close();
if (conn != null) conn.close();
}
return buf.toString();
}
public PreparedStatement buildStatement(RulesApplier rulesApplier, Scenario scenario, List<Column> columns,
PreparedStatement statement, SimpleDateFormat simpleDateFormat) throws Exception {
int count = 1;
for (Column column : columns) {
DataValue dataValue = rulesApplier.getDataForRule(scenario, column);
switch (column.getType()) {
case VARCHAR:
if (dataValue.getValue().equals("")) {
statement.setNull(count, Types.VARCHAR);
} else {
statement.setString(count, dataValue.getValue());
}
break;
case JSON:
if (dataValue.getValue().equals("")) {
statement.setNull(count, Types.VARBINARY);
} else {
statement.setString(count, dataValue.getValue());
}
break;
case CHAR:
if (dataValue.getValue().equals("")) {
statement.setNull(count, Types.CHAR);
} else {
statement.setString(count, dataValue.getValue());
}
break;
case DECIMAL:
if (dataValue.getValue().equals("")) {
statement.setNull(count, Types.DECIMAL);
} else {
statement.setBigDecimal(count, new BigDecimal(dataValue.getValue()));
}
break;
case INTEGER:
if (dataValue.getValue().equals("")) {
statement.setNull(count, Types.INTEGER);
} else {
statement.setInt(count, Integer.parseInt(dataValue.getValue()));
}
break;
case UNSIGNED_LONG:
if (dataValue.getValue().equals("")) {
statement.setNull(count, Types.OTHER);
} else {
statement.setLong(count, Long.parseLong(dataValue.getValue()));
}
break;
case BIGINT:
if (dataValue.getValue().equals("")) {
statement.setNull(count, Types.BIGINT);
} else {
statement.setLong(count, Long.parseLong(dataValue.getValue()));
}
break;
case TINYINT:
if (dataValue.getValue().equals("")) {
statement.setNull(count, Types.TINYINT);
} else {
statement.setLong(count, Integer.parseInt(dataValue.getValue()));
}
break;
case DATE:
if (dataValue.getValue().equals("")) {
statement.setNull(count, Types.DATE);
} else {
Date
date =
new java.sql.Date(simpleDateFormat.parse(dataValue.getValue()).getTime());
statement.setDate(count, date);
}
break;
case VARCHAR_ARRAY:
if (dataValue.getValue().equals("")) {
statement.setNull(count, Types.ARRAY);
} else {
Array
arr =
statement.getConnection().createArrayOf("VARCHAR", dataValue.getValue().split(","));
statement.setArray(count, arr);
}
break;
case VARBINARY:
if (dataValue.getValue().equals("")) {
statement.setNull(count, Types.VARBINARY);
} else {
statement.setBytes(count, dataValue.getValue().getBytes());
}
break;
case TIMESTAMP:
if (dataValue.getValue().equals("")) {
statement.setNull(count, Types.TIMESTAMP);
} else {
java.sql.Timestamp
ts =
new java.sql.Timestamp(simpleDateFormat.parse(dataValue.getValue()).getTime());
statement.setTimestamp(count, ts);
}
break;
default:
break;
}
count++;
}
return statement;
}
public String buildSql(final List<Column> columns, final String tableName) {
StringBuilder builder = new StringBuilder();
builder.append("upsert into ");
builder.append(tableName);
builder.append(" (");
int count = 1;
for (Column column : columns) {
builder.append(column.getName());
if (count < columns.size()) {
builder.append(",");
} else {
builder.append(")");
}
count++;
}
builder.append(" VALUES (");
for (int i = 0; i < columns.size(); i++) {
if (i < columns.size() - 1) {
builder.append("?,");
} else {
builder.append("?)");
}
}
return builder.toString();
}
public org.apache.hadoop.hbase.util.Pair<Long, Long> getResults(
Query query,
ResultSet rs,
String queryIteration,
boolean isSelectCountStatement,
Long queryStartTime) throws Exception {
Long resultRowCount = 0L;
while (rs.next()) {
if (isSelectCountStatement) {
resultRowCount = rs.getLong(1);
} else {
resultRowCount++;
}
long queryElapsedTime = EnvironmentEdgeManager.currentTimeMillis() - queryStartTime;
if (queryElapsedTime >= query.getTimeoutDuration()) {
LOGGER.error("Query " + queryIteration + " exceeded timeout of "
+ query.getTimeoutDuration() + " ms at " + queryElapsedTime + " ms.");
return new org.apache.hadoop.hbase.util.Pair(resultRowCount, queryElapsedTime);
}
}
return new org.apache.hadoop.hbase.util.Pair(resultRowCount, EnvironmentEdgeManager.currentTimeMillis() - queryStartTime);
}
}