blob: d6541380201bf3c387dee90abc84c2a696524d9d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.pherf.util;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.mapreduce.index.automation.PhoenixMRJobSubmitter;
import org.apache.phoenix.pherf.PherfConstants;
import org.apache.phoenix.pherf.configuration.*;
import org.apache.phoenix.pherf.jmx.MonitorManager;
import org.apache.phoenix.pherf.result.DataLoadThreadTime;
import org.apache.phoenix.pherf.result.DataLoadTimeSummary;
import org.apache.phoenix.pherf.rules.RulesApplier;
import org.apache.phoenix.pherf.util.GoogleChartGenerator.Node;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
/**
 * Pherf helper that opens Phoenix JDBC connections (thick driver via ZooKeeper or thin driver
 * via a Query Server URL) and runs SQL/DDL statements and metadata lookups through them.
 * <p>
 * Driver selection, the ZooKeeper string, the Query Server URL and the row count override are
 * static fields, so they are shared by all users of this class. A single instance is handed
 * out by {@code create()}.
 */
public class PhoenixUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixUtil.class);
// ZooKeeper string appended to "jdbc:phoenix:" for thick-driver connections; set by useThickDriver().
private static String zookeeper;
// Only read and written through getRowCountOverride()/setRowCountOverride() in this class.
private static int rowCountOverride = 0;
// Default test flag passed along by the getConnection overloads that do not take one explicitly.
private boolean testEnabled;
// Lazily created shared instance returned by create().
private static PhoenixUtil instance;
// true -> getConnection() builds a thin-driver URL from queryServerUrl; false -> thick driver.
private static boolean useThinDriver;
// Query Server URL embedded in the thin-driver JDBC URL; set by useThinDriver(String).
private static String queryServerUrl;
// Substring looked for (case-insensitively) in a scenario DDL to decide whether to wait for an index build.
private static final String ASYNC_KEYWORD = "ASYNC";
// Polling interval used while waiting for the ASYNC index job.
private static final int ONE_MIN_IN_MS = 60000;
// NOTE(review): not referenced anywhere in the code visible here -- confirm whether it is still needed.
private static String CurrentSCN = null;
/**
 * Builds an instance with the test flag switched off.
 */
private PhoenixUtil() {
    this(false);
}
/**
 * Builds an instance with an explicit test flag.
 *
 * @param testEnabled default test flag used by the {@code getConnection} overloads that do
 *                    not take the flag as an argument
 */
private PhoenixUtil(final boolean testEnabled) {
    this.testEnabled = testEnabled;
}
/**
 * Returns the shared instance, creating it with the test flag off if it does not exist yet.
 *
 * @return the shared {@link PhoenixUtil}
 */
public static PhoenixUtil create() {
    return create(false);
}
/**
 * Returns the shared instance, creating it on first use.
 *
 * @param testEnabled test flag for a newly created instance; ignored when an instance
 *                    already exists
 * @return the shared {@link PhoenixUtil}
 */
public static PhoenixUtil create(final boolean testEnabled) {
    if (instance == null) {
        instance = new PhoenixUtil(testEnabled);
    }
    return instance;
}
/**
 * Switches connection creation to the thin driver and records the Query Server URL.
 *
 * @param queryServerUrl Query Server URL to embed in the thin-driver JDBC URL; must not be null
 * @throws NullPointerException if {@code queryServerUrl} is null; the current driver
 *                              configuration is left untouched in that case
 */
public static void useThinDriver(String queryServerUrl) {
    // Validate before mutating anything so a null URL cannot flip the driver flag on its own.
    PhoenixUtil.queryServerUrl = Objects.requireNonNull(queryServerUrl);
    PhoenixUtil.useThinDriver = true;
}
/**
 * @return the Query Server URL last passed to {@code useThinDriver(String)}, or null if none was set
 */
public static String getQueryServerUrl() {
    return queryServerUrl;
}
/**
 * @return {@code true} when connections are created through the thin driver
 */
public static boolean isThinDriver() {
    return useThinDriver;
}
/**
 * Opens a connection without a tenant id.
 *
 * @return a new JDBC connection; the caller is responsible for closing it
 * @throws Exception if the connection cannot be created
 */
public Connection getConnection() throws Exception {
    final String noTenant = null;
    return getConnection(noTenant);
}
/**
 * Opens a connection for the given tenant using this instance's test flag and no extra
 * connection properties.
 *
 * @param tenantId tenant id to set on the connection, or null for none
 * @return a new JDBC connection; the caller is responsible for closing it
 * @throws Exception if the connection cannot be created
 */
public Connection getConnection(String tenantId) throws Exception {
    final Map<String, String> noExtraProperties = null;
    return getConnection(tenantId, this.testEnabled, noExtraProperties);
}
/**
 * Opens a connection for the given tenant using this instance's test flag.
 *
 * @param tenantId        tenant id to set on the connection, or null for none
 * @param phoenixProperty extra connection properties, or null for none
 * @return a new JDBC connection; the caller is responsible for closing it
 * @throws Exception if the connection cannot be created
 */
public Connection getConnection(String tenantId, Map<String, String> phoenixProperty) throws Exception {
    return getConnection(tenantId, this.testEnabled, phoenixProperty);
}
/**
 * Opens a JDBC connection through whichever driver is currently selected.
 * <p>
 * Thin driver: the URL is {@code jdbc:phoenix:thin:url=<queryServerUrl>;serialization=PROTOBUF}
 * and only the tenant id is passed as a property; {@code phoenixProperty} and
 * {@code testEnabled} are not used on this path.
 * Thick driver: the URL is {@code jdbc:phoenix:<zookeeper>} with {@code ;test=true} appended
 * when {@code testEnabled} is set, and every entry of {@code phoenixProperty} is copied into
 * the connection properties.
 *
 * @param tenantId        tenant id to set as the {@code TenantId} property, or null for none
 * @param testEnabled     whether to append {@code ;test=true} to the thick-driver URL
 * @param phoenixProperty extra connection properties for the thick driver, or null for none
 * @return a new JDBC connection; the caller is responsible for closing it
 * @throws IllegalArgumentException if the Query Server URL (thin) or ZooKeeper string (thick)
 *                                  has not been set
 * @throws Exception                if the driver fails to open the connection
 */
public Connection getConnection(String tenantId, boolean testEnabled, Map<String, String> phoenixProperty) throws Exception {
    // Read the static flag once so both decisions below agree with each other.
    final boolean thin = useThinDriver;
    if (thin) {
        if (null == queryServerUrl) {
            throw new IllegalArgumentException("QueryServer URL must be set before" +
                    " initializing connection");
        }
    } else if (null == zookeeper) {
        throw new IllegalArgumentException(
                "Zookeeper must be set before initializing connection!");
    }

    // The tenant id is passed the same way for both drivers.
    Properties props = new Properties();
    if (null != tenantId) {
        props.setProperty("TenantId", tenantId);
        LOGGER.debug("\nSetting tenantId to {}", tenantId);
    }

    if (thin) {
        String thinUrl = "jdbc:phoenix:thin:url=" + queryServerUrl + ";serialization=PROTOBUF";
        return DriverManager.getConnection(thinUrl, props);
    }

    if (phoenixProperty != null) {
        for (Map.Entry<String, String> phxProperty : phoenixProperty.entrySet()) {
            props.setProperty(phxProperty.getKey(), phxProperty.getValue());
            // Goes through the logger instead of stdout, like every other message in this class.
            LOGGER.info("Setting connection property {} to {}",
                    phxProperty.getKey(), phxProperty.getValue());
        }
    }
    String thickUrl = "jdbc:phoenix:" + zookeeper + (testEnabled ? ";test=true" : "");
    return DriverManager.getConnection(thickUrl, props);
}
/**
 * Executes a statement on a fresh connection opened for the scenario's tenant and closes the
 * connection afterwards.
 *
 * @param sql      statement text to execute
 * @param scenario scenario whose tenant id selects the connection
 * @return the value returned by {@link PreparedStatement#execute()}
 * @throws Exception if the connection cannot be opened or the statement fails
 */
public boolean executeStatement(String sql, Scenario scenario) throws Exception {
    // try-with-resources keeps a failure in close() from hiding the statement's own exception.
    try (Connection connection = getConnection(scenario.getTenantId())) {
        return executeStatement(sql, connection);
    }
}
/**
 * Prepares and executes a statement on the given connection, then commits.
 * Any {@link SQLException}, including one raised while closing the statement, is propagated.
 *
 * @param sql        statement text to execute
 * @param connection open connection to run the statement on; not closed by this method
 * @return the value returned by {@link PreparedStatement#execute()}
 * @throws SQLException if preparing, executing, committing or closing fails
 */
public boolean executeStatementThrowException(String sql, Connection connection)
        throws SQLException {
    // try-with-resources: a close() failure is attached as suppressed instead of replacing
    // the exception that actually caused the statement to fail.
    try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
        boolean result = preparedStatement.execute();
        connection.commit();
        return result;
    }
}
/**
 * Prepares and executes a statement on the given connection, then commits.
 * Failures while closing the statement are logged and otherwise ignored; failures while
 * preparing, executing or committing are propagated.
 *
 * @param sql        statement text to execute
 * @param connection open connection to run the statement on; not closed by this method
 * @return the value returned by {@link PreparedStatement#execute()}
 * @throws SQLException if preparing, executing or committing fails
 */
public boolean executeStatement(String sql, Connection connection) throws SQLException {
    boolean result = false;
    PreparedStatement preparedStatement = null;
    try {
        preparedStatement = connection.prepareStatement(sql);
        result = preparedStatement.execute();
        connection.commit();
    } finally {
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                // Closing is best-effort here; report through the logger rather than stderr.
                LOGGER.warn("Failed to close statement", e);
            }
        }
    }
    return result;
}
/**
 * Executes an already prepared statement and commits on the given connection.
 * A {@link SQLException} is logged and not rethrown; in that case the value computed so far
 * is returned ({@code false} if {@code execute()} itself failed).
 *
 * @param preparedStatement statement to execute; not closed by this method
 * @param connection        connection to commit on; not closed by this method
 * @return the value returned by {@link PreparedStatement#execute()}, or {@code false} if it threw
 */
@SuppressWarnings("unused")
public boolean executeStatement(PreparedStatement preparedStatement, Connection connection) {
    boolean result = false;
    try {
        result = preparedStatement.execute();
        connection.commit();
    } catch (SQLException e) {
        // Signature has no throws clause, so the failure is reported via the logger.
        LOGGER.error("Failed to execute prepared statement", e);
    }
    return result;
}
/**
 * Drops (with CASCADE) every table under schema {@link PherfConstants#PHERF_SCHEMA_NAME}
 * whose qualified name matches the given regular expression.
 * <p>
 * The expression is upper-cased and every occurrence of {@code ALL} in it is replaced by
 * {@code .*} before matching. The name matched against is {@code SCHEMA.TABLE}, or just the
 * table name when the schema column is null. A {@code TableNotFoundException} raised by a
 * drop is logged and skipped.
 *
 * @param regexMatch regular expression (or {@code ALL}) selecting the tables to drop
 * @throws SQLException if reading metadata or dropping a table fails
 * @throws Exception    if the connection cannot be opened
 */
public void deleteTables(String regexMatch) throws Exception {
    // Compile once instead of re-parsing the expression for every table row.
    final java.util.regex.Pattern tablePattern =
            java.util.regex.Pattern.compile(regexMatch.toUpperCase().replace("ALL", ".*"));
    // Both the connection and the metadata ResultSet are closed, even when a drop fails.
    try (Connection conn = getConnection();
         ResultSet resultSet = getTableMetaData(PherfConstants.PHERF_SCHEMA_NAME, null, conn)) {
        while (resultSet.next()) {
            String schema = resultSet.getString(TABLE_SCHEM);
            String table = resultSet.getString(TABLE_NAME);
            String tableName = (schema == null) ? table : schema + "." + table;
            if (!tablePattern.matcher(tableName).matches()) {
                continue;
            }
            LOGGER.info("\nDropping " + tableName);
            try {
                executeStatementThrowException("DROP TABLE "
                        + tableName + " CASCADE", conn);
            } catch (org.apache.phoenix.schema.TableNotFoundException tnf) {
                LOGGER.error("Table might be already be deleted via cascade. Schema: "
                        + tnf.getSchemaName()
                        + " Table: "
                        + tnf.getTableName());
            }
        }
    }
}
/**
 * Looks up table metadata through {@link DatabaseMetaData#getTables}, passing null for the
 * catalog and for the table types.
 *
 * @param schemaName schema name pattern handed to the driver
 * @param tableName  table name pattern handed to the driver
 * @param connection open connection to query
 * @return the metadata result set; the caller is responsible for closing it
 * @throws SQLException if the metadata lookup fails
 */
public ResultSet getTableMetaData(String schemaName, String tableName, Connection connection)
        throws SQLException {
    return connection.getMetaData().getTables(null, schemaName, tableName, null);
}
/**
 * Looks up column metadata through {@link DatabaseMetaData#getColumns}, upper-casing the
 * schema and table names and passing null for the catalog and the column name pattern.
 *
 * @param schemaName schema name; must not be null because it is upper-cased here
 * @param tableName  table name; must not be null because it is upper-cased here
 * @param connection open connection to query
 * @return the metadata result set; the caller is responsible for closing it
 * @throws SQLException if the metadata lookup fails
 */
public ResultSet getColumnsMetaData(String schemaName, String tableName, Connection connection)
        throws SQLException {
    final DatabaseMetaData metaData = connection.getMetaData();
    final String upperSchema = schemaName.toUpperCase();
    final String upperTable = tableName.toUpperCase();
    return metaData.getColumns(null, upperSchema, upperTable, null);
}
/**
 * Reads the columns of a table from JDBC metadata and converts each row into a
 * {@link Column} carrying its name, type and length.
 * <p>
 * The type is resolved with {@code DataTypeMapping.valueOf} after replacing spaces in the
 * reported {@code TYPE_NAME} with underscores.
 *
 * @param schemaName schema of the table
 * @param tableName  name of the table
 * @param connection open connection to query; not closed by this method
 * @return an unmodifiable list of the table's columns, in metadata order
 * @throws SQLException if reading the metadata fails
 */
public synchronized List<Column> getColumnsFromPhoenix(String schemaName, String tableName,
        Connection connection) throws SQLException {
    final List<Column> columns = new ArrayList<>();
    ResultSet rows = null;
    try {
        rows = getColumnsMetaData(schemaName, tableName, connection);
        while (rows.next()) {
            final Column current = new Column();
            current.setName(rows.getString("COLUMN_NAME"));
            final String mappingName = rows.getString("TYPE_NAME").replace(" ", "_");
            current.setType(DataTypeMapping.valueOf(mappingName));
            current.setLength(rows.getInt("COLUMN_SIZE"));
            columns.add(current);
        }
    } finally {
        if (rows != null) {
            rows.close();
        }
    }
    return Collections.unmodifiableList(columns);
}
/**
 * Runs the DDL of every query in the set that defines one, each on its own connection
 * opened for that query's tenant id. Meant to run before the queries themselves so that DDL
 * does not execute in parallel with them.
 *
 * @param querySet query set whose per-query DDLs should be executed
 * @throws Exception if a connection cannot be opened or a DDL fails
 */
public void executeQuerySetDdls(QuerySet querySet) throws Exception {
    for (Query query : querySet.getQuery()) {
        if (query.getDdl() == null) {
            continue;
        }
        Connection ddlConnection = null;
        try {
            LOGGER.info("\nExecuting DDL:" + query.getDdl() + " on tenantId:" + query
                    .getTenantId());
            final String ddl = query.getDdl();
            ddlConnection = getConnection(query.getTenantId());
            executeStatement(ddl, ddlConnection);
        } finally {
            if (ddlConnection != null) {
                ddlConnection.close();
            }
        }
    }
}
/**
 * Executes the DDLs defined at scenario level, in order, before the data load starts.
 * <p>
 * Each DDL runs on its own connection, which is closed as soon as the statement has been
 * executed. When the DDL text contains {@code ASYNC} (case-insensitive), this method then
 * waits for the corresponding index build. The elapsed time per DDL, including that wait, is
 * added to {@code dataLoadTimeSummary} under the DDL's table name.
 *
 * @param ddls                DDLs to execute; null means nothing to do
 * @param tenantId            tenant id for the connections, or null for none
 * @param dataLoadTimeSummary summary that receives one timing entry per DDL
 * @throws Exception if a connection cannot be opened, a DDL fails, or the wait is interrupted
 */
public void executeScenarioDdl(List<Ddl> ddls, String tenantId, DataLoadTimeSummary dataLoadTimeSummary) throws Exception {
    if (null == ddls) {
        return;
    }
    for (Ddl ddl : ddls) {
        LOGGER.info("\nExecuting DDL:" + ddl + " on tenantId:" + tenantId);
        long startTime = EnvironmentEdgeManager.currentTimeMillis();
        // One connection per DDL, closed right away: previously only the connection of the
        // last DDL was closed and all earlier ones leaked.
        try (Connection conn = getConnection(tenantId)) {
            executeStatement(ddl.toString(), conn);
        }
        if (ddl.getStatement().toUpperCase().contains(ASYNC_KEYWORD)) {
            waitForAsyncIndexToFinish(ddl.getTableName());
        }
        dataLoadTimeSummary.add(ddl.getTableName(), 0,
                (int) (EnvironmentEdgeManager.currentTimeMillis() - startTime));
    }
}
/**
 * Blocks until the ASYNC index build for the given table has started and then finished.
 * <p>
 * The start is polled up to 15 times, one minute apart; completion is then polled once a
 * minute with no upper bound.
 *
 * @param tableName table name to look for among the submitted YARN applications
 * @throws InterruptedException  if the thread is interrupted while sleeping between polls
 * @throws IllegalStateException if no matching job is seen during the 15 start polls
 */
private void waitForAsyncIndexToFinish(String tableName) throws InterruptedException {
    // Phase 1: give the job up to 15 polls to show up.
    boolean started = false;
    for (int attempt = 0; attempt < 15 && !started; attempt++) {
        started = isYarnJobInProgress(tableName);
        if (!started) {
            Thread.sleep(ONE_MIN_IN_MS);
        }
    }
    if (!started) {
        throw new IllegalStateException("ASYNC index build did not start within 15 mins");
    }
    // Phase 2: keep polling until the job is gone, to approximate its end-to-end time.
    while (isYarnJobInProgress(tableName)) {
        Thread.sleep(ONE_MIN_IN_MS);
    }
}
/**
 * Checks whether any submitted YARN application reported by {@link PhoenixMRJobSubmitter}
 * contains the given table name (case-insensitive substring match).
 * <p>
 * Any exception raised while fetching or scanning the applications is logged and treated as
 * "no job in progress".
 *
 * @param tableName table name to look for
 * @return {@code true} if a matching application is found; {@code false} otherwise or on error
 */
boolean isYarnJobInProgress(String tableName) {
    try {
        LOGGER.info("Fetching YARN apps...");
        Set<String> response = new PhoenixMRJobSubmitter().getSubmittedYarnApps();
        // Loop-invariant, so upper-case the needle once.
        final String upperTableName = tableName.toUpperCase();
        for (String str : response) {
            LOGGER.info("Running YARN app: " + str);
            if (str.toUpperCase().contains(upperTableName)) {
                return true;
            }
        }
    } catch (Exception e) {
        // Best-effort check: report through the logger instead of stderr and fall through.
        LOGGER.error("Failed to check YARN apps for table " + tableName, e);
    }
    return false;
}
/**
 * @return the ZooKeeper string used for thick-driver connections, or null if none was set
 */
public static String getZookeeper() {
    return PhoenixUtil.zookeeper;
}
/**
 * Logs the new ZooKeeper string and switches to the thick driver with it.
 *
 * @param zookeeper ZooKeeper string; must not be null
 */
public static void setZookeeper(String zookeeper) {
    LOGGER.info("Setting zookeeper: {}", zookeeper);
    useThickDriver(zookeeper);
}
/**
 * Switches connection creation to the thick driver and records the ZooKeeper string.
 *
 * @param zookeeper ZooKeeper string appended to {@code jdbc:phoenix:}; must not be null
 * @throws NullPointerException if {@code zookeeper} is null; the current driver
 *                              configuration is left untouched in that case
 */
public static void useThickDriver(String zookeeper) {
    // Validate before mutating anything so a null value cannot flip the driver flag on its own.
    PhoenixUtil.zookeeper = Objects.requireNonNull(zookeeper);
    PhoenixUtil.useThinDriver = false;
}
/**
 * @return the current row count override (0 unless changed via the setter)
 */
public static int getRowCountOverride() {
    return PhoenixUtil.rowCountOverride;
}
/**
 * Stores a new row count override.
 *
 * @param rowCountOverride value to store
 */
public static void setRowCountOverride(int rowCountOverride) {
    PhoenixUtil.rowCountOverride = rowCountOverride;
}
/**
 * Runs {@code UPDATE STATISTICS} for a table on a connection opened for the scenario's tenant.
 *
 * @param tableName table whose statistics should be updated
 * @param scenario  scenario whose tenant id selects the connection
 * @throws Exception if the connection cannot be opened or the statement fails
 */
public void updatePhoenixStats(String tableName, Scenario scenario) throws Exception {
    LOGGER.info("Updating stats for {}", tableName);
    final String updateStatsSql = "UPDATE STATISTICS " + tableName;
    executeStatement(updateStatsSql, scenario);
}
/**
 * Returns the explain plan for the query's static statement (no scenario, no rules applier).
 *
 * @param query query to explain
 * @return the explain plan text
 * @throws SQLException declared by the delegate overload
 */
public String getExplainPlan(Query query) throws SQLException {
    final Scenario noScenario = null;
    final RulesApplier noRules = null;
    return getExplainPlan(query, noScenario, noRules);
}
/**
 * Runs {@code EXPLAIN} for a query on a connection opened for the query's tenant and returns
 * the plan as a single string.
 * <p>
 * When both {@code scenario} and {@code ruleApplier} are given, the query's dynamic statement
 * is explained; otherwise its static statement is used. Each plan row is trimmed, its commas
 * are replaced by dashes, and the rows are concatenated without a separator.
 * <p>
 * Failures are logged and not rethrown: the text collected up to that point (possibly empty)
 * is returned. This includes failures while closing the result set, statement or connection.
 *
 * @param query       query to explain
 * @param scenario    scenario used to build the dynamic statement, or null
 * @param ruleApplier rules applier used to build the dynamic statement, or null
 * @return the concatenated explain plan, or whatever was collected before a failure
 * @throws SQLException kept in the signature for existing callers
 */
public String getExplainPlan(Query query, Scenario scenario, RulesApplier ruleApplier) throws SQLException {
    StringBuilder buf = new StringBuilder();
    // try-with-resources closes result set, statement and connection in reverse order and
    // still closes the remaining ones when an earlier close() throws.
    try (Connection conn = getConnection(query.getTenantId())) {
        String explainQuery;
        if (scenario != null && ruleApplier != null) {
            explainQuery = query.getDynamicStatement(ruleApplier, scenario);
        } else {
            explainQuery = query.getStatement();
        }
        try (PreparedStatement statement = conn.prepareStatement("EXPLAIN " + explainQuery);
             ResultSet rs = statement.executeQuery()) {
            while (rs.next()) {
                buf.append(rs.getString(1).trim().replace(",", "-"));
            }
        }
    } catch (Exception e) {
        // Best-effort: the plan is informational, so report the problem and return what we have.
        LOGGER.error("Failed to get explain plan", e);
    }
    return buf.toString();
}
}