/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.metastore;
import static org.apache.commons.lang3.StringUtils.join;
import static org.apache.commons.lang3.StringUtils.normalizeSpace;
import static org.apache.commons.lang3.StringUtils.repeat;
import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.throwMetaOrRuntimeException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import javax.jdo.PersistenceManager;
import javax.jdo.Query;
import javax.jdo.Transaction;
import javax.jdo.datastore.JDOConnection;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.AggregateStatsCache.AggrColStats;
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.DatabaseType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
import org.apache.hadoop.hive.metastore.api.HiveObjectType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo;
import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint;
import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint;
import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.model.MConstraint;
import org.apache.hadoop.hive.metastore.model.MCreationMetadata;
import org.apache.hadoop.hive.metastore.model.MDatabase;
import org.apache.hadoop.hive.metastore.model.MNotificationLog;
import org.apache.hadoop.hive.metastore.model.MNotificationNextId;
import org.apache.hadoop.hive.metastore.model.MPartitionColumnPrivilege;
import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
import org.apache.hadoop.hive.metastore.model.MPartitionPrivilege;
import org.apache.hadoop.hive.metastore.model.MTableColumnStatistics;
import org.apache.hadoop.hive.metastore.model.MWMResourcePlan;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree.FilterBuilder;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LeafNode;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LogicalOperator;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree.Operator;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hive.common.util.BloomFilter;
import org.datanucleus.store.rdbms.query.ForwardQueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
/**
* This class contains the optimizations for MetaStore that rely on direct SQL access to
* the underlying database. It should use ANSI SQL and be compatible with common databases
* such as MySQL (note that MySQL doesn't use full ANSI mode by default), Postgres, etc.
*
* As of now, only the partition retrieval is done this way to improve job startup time;
* JDOQL partition retrieval is still present so as not to limit the ORM solution we have
* to SQL stores only. There's always a way to do without direct SQL.
*/
class MetaStoreDirectSql {
private static final int NO_BATCHING = -1, DETECT_BATCHING = 0;
private static final Logger LOG = LoggerFactory.getLogger(MetaStoreDirectSql.class);
private final PersistenceManager pm;
private final Configuration conf;
private final String schema;
/**
* We want to avoid db-specific code in this class and stick with ANSI SQL. However:
* 1) MySQL and Postgres are differently ANSI-incompatible (MySQL by default doesn't support
* quoted identifiers, and Postgres contravenes ANSI by coercing unquoted ones to lower case).
* MySQL's way of working around this is simpler (just set ANSI quotes mode on), so we will
* use that. MySQL detection is done by actually issuing the set-ansi-quotes command.
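* (For MySQL this is typically {@code SET @@session.sql_mode=ANSI_QUOTES}; see prepareTxn.)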
*
* Use sparingly; we don't want to devolve into another DataNucleus...
*/
private final DatabaseProduct dbType;
private final int batchSize;
private final boolean convertMapNullsToEmptyStrings;
private final String defaultPartName;
/**
* Whether direct SQL can be used with the current datastore backing {@link #pm}.
*/
private final boolean isCompatibleDatastore;
private final boolean isAggregateStatsCacheEnabled;
private final ImmutableMap<String, String> fieldnameToTableName;
private AggregateStatsCache aggrStatsCache;
private DirectSqlUpdateStat updateStat;
/**
* This method returns a comma-separated string consisting of the String values of a given list.
* This is used for preparing "SOMETHING_ID in (...)" to use in SQL queries.
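* For example, a list of ids {@code [1L, 2L, 3L]} yields the string {@code "1,2,3"}.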
* @param objectIds the objectId collection
* @return The concatenated list
* @throws MetaException If the list contains wrong data
*/
public static <T> String getIdListForIn(List<T> objectIds) throws MetaException {
return objectIds.stream()
.map(Object::toString)
.collect(Collectors.joining(","));
}
@java.lang.annotation.Target(java.lang.annotation.ElementType.FIELD)
@java.lang.annotation.Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
private @interface TableName {}
// Table names with schema name, if necessary
@TableName
private String DBS, TBLS, PARTITIONS, DATABASE_PARAMS, PARTITION_PARAMS, SORT_COLS, SD_PARAMS,
SDS, SERDES, SKEWED_STRING_LIST_VALUES, SKEWED_VALUES, BUCKETING_COLS, SKEWED_COL_NAMES,
SKEWED_COL_VALUE_LOC_MAP, COLUMNS_V2, PARTITION_KEYS, SERDE_PARAMS, PART_COL_STATS, KEY_CONSTRAINTS,
TAB_COL_STATS, PARTITION_KEY_VALS, PART_PRIVS, PART_COL_PRIVS, SKEWED_STRING_LIST, CDS,
TBL_COL_PRIVS;
public MetaStoreDirectSql(PersistenceManager pm, Configuration conf, String schema) {
this.pm = pm;
this.conf = conf;
this.schema = schema;
DatabaseProduct dbType = DatabaseProduct.determineDatabaseProduct(getProductName(pm), conf);
this.dbType = dbType;
int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_PARTITION_BATCH_SIZE);
if (batchSize == DETECT_BATCHING) {
batchSize = dbType.needsInBatching() ? 1000 : NO_BATCHING;
}
this.batchSize = batchSize;
this.updateStat = new DirectSqlUpdateStat(pm, conf, dbType, batchSize);
ImmutableMap.Builder<String, String> fieldNameToTableNameBuilder =
new ImmutableMap.Builder<>();
for (java.lang.reflect.Field f : this.getClass().getDeclaredFields()) {
if (f.getAnnotation(TableName.class) == null) {
continue;
}
try {
String value = getFullyQualifiedName(schema, f.getName());
f.set(this, value);
fieldNameToTableNameBuilder.put(f.getName(), value);
} catch (IllegalArgumentException | IllegalAccessException e) {
throw new RuntimeException("Internal error, cannot set " + f.getName(), e);
}
}
convertMapNullsToEmptyStrings =
MetastoreConf.getBoolVar(conf, ConfVars.ORM_RETRIEVE_MAPNULLS_AS_EMPTY_STRINGS);
defaultPartName = MetastoreConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME);
String jdoIdFactory = MetastoreConf.getVar(conf, ConfVars.IDENTIFIER_FACTORY);
if (!"datanucleus1".equalsIgnoreCase(jdoIdFactory)) {
LOG.warn("Underlying metastore does not use 'datanucleus1' for its ORM naming scheme."
+ " Disabling directSQL as it uses hand-written SQL that relies on that scheme.");
isCompatibleDatastore = false;
} else {
boolean isInTest = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST);
isCompatibleDatastore = (!isInTest || ensureDbInit()) && runTestQuery();
if (isCompatibleDatastore) {
LOG.debug("Using direct SQL, underlying DB is " + dbType);
}
}
isAggregateStatsCacheEnabled = MetastoreConf.getBoolVar(
conf, ConfVars.AGGREGATE_STATS_CACHE_ENABLED);
if (isAggregateStatsCacheEnabled) {
aggrStatsCache = AggregateStatsCache.getInstance(conf);
}
// Now use the table names to create the mapping.
// note that some of the optional single-valued fields are not present
fieldnameToTableName =
fieldNameToTableNameBuilder
.put("createTime", PARTITIONS + ".\"CREATE_TIME\"")
.put("lastAccessTime", PARTITIONS + ".\"LAST_ACCESS_TIME\"")
.put("writeId", PARTITIONS + ".\"WRITE_ID\"")
.put("sd.location", SDS + ".\"LOCATION\"")
.put("sd.inputFormat", SDS + ".\"INPUT_FORMAT\"")
.put("sd.outputFormat", SDS + ".\"OUTPUT_FORMAT\"")
.put("sd.storedAsSubDirectories", SDS + ".\"IS_STOREDASSUBDIRECTORIES\"")
.put("sd.compressed", SDS + ".\"IS_COMPRESSED\"")
.put("sd.numBuckets", SDS + ".\"NUM_BUCKETS\"")
.put("sd.serdeInfo.name", SERDES + ".\"NAME\"")
.put("sd.serdeInfo.serializationLib", SERDES + ".\"SLIB\"")
.put("PART_ID", PARTITIONS + ".\"PART_ID\"")
.put("SD_ID", SDS + ".\"SD_ID\"")
.put("SERDE_ID", SERDES + ".\"SERDE_ID\"")
.put("CD_ID", SDS + ".\"CD_ID\"")
.build();
}
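/** Quotes and qualifies a table name; e.g. schema "APP" and table "TBLS" yield "APP"."TBLS". */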
private static String getFullyQualifiedName(String schema, String tblName) {
return ((schema == null || schema.isEmpty()) ? "" : "\"" + schema + "\".")
+ "\"" + tblName + "\"";
}
public MetaStoreDirectSql(PersistenceManager pm, Configuration conf) {
this(pm, conf, "");
}
static String getProductName(PersistenceManager pm) {
JDOConnection jdoConn = pm.getDataStoreConnection();
try {
return ((Connection)jdoConn.getNativeConnection()).getMetaData().getDatabaseProductName();
} catch (Throwable t) {
LOG.warn("Error retrieving product name", t);
return null;
} finally {
jdoConn.close(); // We must release the connection before we call other pm methods.
}
}
private boolean ensureDbInit() {
Transaction tx = pm.currentTransaction();
boolean doCommit = false;
if (!tx.isActive()) {
tx.begin();
doCommit = true;
}
LinkedList<Query> initQueries = new LinkedList<>();
try {
// Force the underlying db to initialize.
initQueries.add(pm.newQuery(MDatabase.class, "name == ''"));
initQueries.add(pm.newQuery(MTableColumnStatistics.class, "dbName == ''"));
initQueries.add(pm.newQuery(MPartitionColumnStatistics.class, "dbName == ''"));
initQueries.add(pm.newQuery(MConstraint.class, "childIntegerIndex < 0"));
initQueries.add(pm.newQuery(MNotificationLog.class, "dbName == ''"));
initQueries.add(pm.newQuery(MNotificationNextId.class, "nextEventId < -1"));
initQueries.add(pm.newQuery(MWMResourcePlan.class, "name == ''"));
initQueries.add(pm.newQuery(MCreationMetadata.class, "dbName == ''"));
initQueries.add(pm.newQuery(MPartitionPrivilege.class, "principalName == ''"));
initQueries.add(pm.newQuery(MPartitionColumnPrivilege.class, "principalName == ''"));
for (Query q : initQueries) {
q.execute();
}
return true;
} catch (Exception ex) {
doCommit = false;
LOG.warn("Database initialization failed; direct SQL is disabled", ex);
tx.rollback();
return false;
} finally {
if (doCommit) {
tx.commit();
}
for (Query q : initQueries) {
try {
q.closeAll();
} catch (Throwable t) {
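// Ignore; closing the init queries is best-effort cleanup.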
}
}
}
}
private boolean runTestQuery() {
Transaction tx = pm.currentTransaction();
boolean doCommit = false;
if (!tx.isActive()) {
tx.begin();
doCommit = true;
}
Query query = null;
// Run a self-test query. If it doesn't work, we will self-disable. What a PITA...
String selfTestQuery = "select \"DB_ID\" from " + DBS + "";
try {
prepareTxn();
query = pm.newQuery("javax.jdo.query.SQL", selfTestQuery);
query.execute();
return true;
} catch (Throwable t) {
doCommit = false;
LOG.warn("Self-test query [" + selfTestQuery + "] failed; direct SQL is disabled", t);
tx.rollback();
return false;
} finally {
if (doCommit) {
tx.commit();
}
if (query != null) {
query.closeAll();
}
}
}
public String getSchema() {
return schema;
}
public boolean isCompatibleDatastore() {
return isCompatibleDatastore;
}
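/** Executes a SQL statement with no result set directly on the native JDBC connection. */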
private void executeNoResult(final String queryText) throws SQLException {
JDOConnection jdoConn = pm.getDataStoreConnection();
Statement statement = null;
boolean doTrace = LOG.isDebugEnabled();
try {
long start = doTrace ? System.nanoTime() : 0;
statement = ((Connection)jdoConn.getNativeConnection()).createStatement();
statement.execute(queryText);
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, doTrace ? System.nanoTime() : 0);
} finally {
if(statement != null){
statement.close();
}
jdoConn.close(); // We must release the connection before we call other pm methods.
}
}
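/**
* Gets a Database object, including its parameters, using direct SQL.
* @return the database, or null if it does not exist in the given catalog
*/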
public Database getDatabase(String catName, String dbName) throws MetaException{
Query queryDbSelector = null;
Query queryDbParams = null;
try {
dbName = dbName.toLowerCase();
catName = catName.toLowerCase();
String queryTextDbSelector = "select "
+ "\"DB_ID\", \"NAME\", \"DB_LOCATION_URI\", \"DESC\", "
+ "\"OWNER_NAME\", \"OWNER_TYPE\", \"CTLG_NAME\", \"CREATE_TIME\", \"DB_MANAGED_LOCATION_URI\", "
+ "\"TYPE\", \"DATACONNECTOR_NAME\", \"REMOTE_DBNAME\" "
+ "FROM " + DBS
+ " where \"NAME\" = ? and \"CTLG_NAME\" = ? ";
Object[] params = new Object[] { dbName, catName };
queryDbSelector = pm.newQuery("javax.jdo.query.SQL", queryTextDbSelector);
if (LOG.isTraceEnabled()) {
LOG.trace("getDatabase:query instantiated : " + queryTextDbSelector
+ " with param [" + params[0] + "]");
}
List<Object[]> sqlResult = executeWithArray(
queryDbSelector, params, queryTextDbSelector);
if ((sqlResult == null) || sqlResult.isEmpty()) {
return null;
}
assert(sqlResult.size() == 1);
if (sqlResult.get(0) == null) {
return null;
}
Object[] dbline = sqlResult.get(0);
Long dbid = MetastoreDirectSqlUtils.extractSqlLong(dbline[0]);
String queryTextDbParams = "select \"PARAM_KEY\", \"PARAM_VALUE\" "
+ " from " + DATABASE_PARAMS + " "
+ " WHERE \"DB_ID\" = ? "
+ " AND \"PARAM_KEY\" IS NOT NULL";
params[0] = dbid;
queryDbParams = pm.newQuery("javax.jdo.query.SQL", queryTextDbParams);
if (LOG.isTraceEnabled()) {
LOG.trace("getDatabase:query2 instantiated : " + queryTextDbParams
+ " with param [" + params[0] + "]");
}
Map<String,String> dbParams = new HashMap<String,String>();
List<Object[]> sqlResult2 = MetastoreDirectSqlUtils.ensureList(executeWithArray(
queryDbParams, params, queryTextDbParams));
if (!sqlResult2.isEmpty()) {
for (Object[] line : sqlResult2) {
dbParams.put(MetastoreDirectSqlUtils.extractSqlString(line[0]), MetastoreDirectSqlUtils
.extractSqlString(line[1]));
}
}
Database db = new Database();
db.setName(MetastoreDirectSqlUtils.extractSqlString(dbline[1]));
db.setLocationUri(MetastoreDirectSqlUtils.extractSqlString(dbline[2]));
db.setDescription(MetastoreDirectSqlUtils.extractSqlString(dbline[3]));
db.setOwnerName(MetastoreDirectSqlUtils.extractSqlString(dbline[4]));
String type = MetastoreDirectSqlUtils.extractSqlString(dbline[5]);
db.setOwnerType(
(null == type || type.trim().isEmpty()) ? null : PrincipalType.valueOf(type));
db.setCatalogName(MetastoreDirectSqlUtils.extractSqlString(dbline[6]));
if (dbline[7] != null) {
db.setCreateTime(MetastoreDirectSqlUtils.extractSqlInt(dbline[7]));
}
db.setManagedLocationUri(MetastoreDirectSqlUtils.extractSqlString(dbline[8]));
String dbType = MetastoreDirectSqlUtils.extractSqlString(dbline[9]);
if (dbType != null && dbType.equalsIgnoreCase(DatabaseType.REMOTE.name())) {
db.setType(DatabaseType.REMOTE);
db.setConnector_name(MetastoreDirectSqlUtils.extractSqlString(dbline[10]));
db.setRemote_dbname(MetastoreDirectSqlUtils.extractSqlString(dbline[11]));
} else {
db.setType(DatabaseType.NATIVE);
}
db.setParameters(MetaStoreServerUtils.trimMapNulls(dbParams,convertMapNullsToEmptyStrings));
if (LOG.isDebugEnabled()){
LOG.debug("getDatabase: directsql returning db " + db.getName()
+ " locn["+db.getLocationUri() +"] desc [" +db.getDescription()
+ "] owner [" + db.getOwnerName() + "] ownertype ["+ db.getOwnerType() +"]");
}
return db;
} finally {
if (queryDbSelector != null){
queryDbSelector.closeAll();
}
if (queryDbParams != null){
queryDbParams.closeAll();
}
}
}
/**
* Get table names by using direct SQL queries.
* @param catName catalog name
* @param dbName Metastore database name
* @param tableType Table type, or null if we want to get all tables
* @param limit Maximum number of table names to return; a negative value means no limit
* @return list of table names
*/
public List<String> getTables(String catName, String dbName, TableType tableType, int limit)
throws MetaException {
String queryText = "SELECT " + TBLS + ".\"TBL_NAME\""
+ " FROM " + TBLS + " "
+ " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
+ " WHERE " + DBS + ".\"NAME\" = ? AND " + DBS + ".\"CTLG_NAME\" = ? "
+ (tableType == null ? "" : "AND " + TBLS + ".\"TBL_TYPE\" = ? ") ;
List<String> pms = new ArrayList<>();
pms.add(dbName);
pms.add(catName);
if (tableType != null) {
pms.add(tableType.toString());
}
Query<?> queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
List<String> tableNames = executeWithArray(
queryParams, pms.toArray(), queryText, limit);
List<String> results = new ArrayList<String>(tableNames);
queryParams.closeAll();
return results;
}
/**
* Get the names of materialized views eligible for rewriting by using direct SQL queries.
*
* @param dbName Metastore database name
* @return list of materialized view names
*/
public List<String> getMaterializedViewsForRewriting(String dbName) throws MetaException {
String queryText = "SELECT " + TBLS + ".\"TBL_NAME\""
+ " FROM " + TBLS + " "
+ " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
+ " WHERE " + DBS + ".\"NAME\" = ? AND " + TBLS + ".\"TBL_TYPE\" = ? " ;
List<String> pms = new ArrayList<String>();
pms.add(dbName);
pms.add(TableType.MATERIALIZED_VIEW.toString());
Query<?> queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
List<String> mvs = executeWithArray(
queryParams, pms.toArray(), queryText);
List<String> results = new ArrayList<String>(mvs);
queryParams.closeAll();
return results;
}
/**
* Get partition names by using direct SQL queries.
* @param filter filter to use with direct sql
* @param partitionKeys partition columns
* @param defaultPartName default partition name
* @param order the specification for ordering partition names
* @param max maximum number of partition names to return
* @return list of partition names
*/
public List<String> getPartitionNamesViaSql(SqlFilterForPushdown filter, List<FieldSchema> partitionKeys,
String defaultPartName, String order, Integer max) throws MetaException {
boolean doTrace = LOG.isDebugEnabled();
List<Object[]> orderSpecs = MetaStoreUtils.makeOrderSpecs(order);
String catName = filter.catName.toLowerCase(), dbName = filter.dbName.toLowerCase(),
tblName = filter.tableName.toLowerCase(), sqlFilter = filter.filter;
List<Object> paramsForFilter = filter.params;
List<String> joins = filter.joins;
if (joins.isEmpty()) {
for (int i = 0; i < partitionKeys.size(); i++) {
joins.add(null);
}
}
StringBuilder orderColumns = new StringBuilder(), orderClause = new StringBuilder();
int i = 0;
List<Object> paramsForOrder = new ArrayList<Object>();
boolean dbHasJoinCastBug = dbType.hasJoinOperationOrderBug();
for (Object[] orderSpec: orderSpecs) {
int partColIndex = (int)orderSpec[0];
String orderAlias = "ODR" + (i++);
String tableValue, tableAlias;
if (joins.get(partColIndex) == null) {
tableAlias = "ORDER" + partColIndex;
joins.set(partColIndex, "inner join " + PARTITION_KEY_VALS + " \"" + tableAlias
+ "\" on \"" + tableAlias + "\".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+ " and \"" + tableAlias + "\".\"INTEGER_IDX\" = " + partColIndex);
tableValue = " \"" + tableAlias + "\".\"PART_KEY_VAL\" ";
} else {
tableAlias = "FILTER" + partColIndex;
tableValue = " \"" + tableAlias + "\".\"PART_KEY_VAL\" ";
}
String tableColumn = tableValue;
String colType = partitionKeys.get(partColIndex).getType();
PartitionFilterGenerator.FilterType type =
PartitionFilterGenerator.FilterType.fromType(colType);
if (type == PartitionFilterGenerator.FilterType.Date) {
tableValue = dbType.toDate(tableValue);
} else if (type == PartitionFilterGenerator.FilterType.Integral) {
tableValue = "CAST(" + tableColumn + " AS decimal(21,0))";
}
String tableValue0 = tableValue;
tableValue = " (case when " + tableColumn + " <> ?";
paramsForOrder.add(defaultPartName);
if (dbHasJoinCastBug) {
tableValue += (" and " + TBLS + ".\"TBL_NAME\" = ? and " + DBS + ".\"NAME\" = ? and "
+ DBS + ".\"CTLG_NAME\" = ? and "
+ "\"" + tableAlias + "\".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\" and "
+ "\"" + tableAlias + "\".\"INTEGER_IDX\" = " + partColIndex);
paramsForOrder.add(tblName);
paramsForOrder.add(dbName);
paramsForOrder.add(catName);
}
tableValue += " then " + tableValue0 + " else null end) AS \"" + orderAlias + "\" ";
orderColumns.append(tableValue).append(",");
orderClause.append(" \"").append(orderAlias).append("\" ")
.append((String)orderSpec[1]).append(",");
}
for (int j = 0; j < joins.size(); j++) {
if (joins.get(j) == null) {
joins.remove(j--);
}
}
if (orderClause.length() > 0) {
orderClause.setLength(orderClause.length() - 1);
orderColumns.setLength(orderColumns.length() - 1);
}
String orderCls = " order by " +
(orderClause.length() > 0 ? orderClause.toString() : "\"PART_NAME\" asc");
String columns = orderColumns.length() > 0 ? (", " + orderColumns.toString()) : "";
String queryText =
"select " + PARTITIONS + ".\"PART_NAME\"" + columns + " from " + PARTITIONS + " "
+ " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\" "
+ " and " + TBLS + ".\"TBL_NAME\" = ? "
+ " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
+ " and " + DBS + ".\"NAME\" = ? "
+ join(joins, ' ')
+ " where " + DBS + ".\"CTLG_NAME\" = ? "
+ (StringUtils.isBlank(sqlFilter) ? "" : (" and " + sqlFilter)) + orderCls;
Object[] params = new Object[paramsForFilter.size() + paramsForOrder.size() + 3];
i = 0;
for (; i < paramsForOrder.size(); i++) {
params[i] = paramsForOrder.get(i);
}
params[i] = tblName;
params[i+1] = dbName;
params[i+2] = catName;
for (int j = 0; j < paramsForFilter.size(); j++) {
params[i + j + 3] = paramsForFilter.get(j);
}
Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
List<String> partNames = new LinkedList<String>();
int limit = (max == null ? -1 : max);
try {
long start = doTrace ? System.nanoTime() : 0;
List<Object> sqlResult = executeWithArray(query, params, queryText, limit);
long queryTime = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, queryTime);
for (Object result : sqlResult) {
Object obj = !columns.isEmpty() ? ((Object[]) result)[0] : result;
partNames.add((String)obj);
}
} finally {
query.closeAll();
}
return partNames;
}
/**
* Gets partitions by using direct SQL queries.
* @param catName Metastore catalog name.
* @param dbName Metastore db name.
* @param tblName Metastore table name.
* @param partNames Partition names to get.
* @return List of partitions.
*/
public List<Partition> getPartitionsViaSqlFilter(final String catName, final String dbName,
final String tblName, List<String> partNames)
throws MetaException {
if (partNames.isEmpty()) {
return Collections.emptyList();
}
return Batchable.runBatched(batchSize, partNames, new Batchable<String, Partition>() {
@Override
public List<Partition> run(List<String> input) throws MetaException {
String filter = "" + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")";
List<Long> partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, tblName,
filter, input, Collections.<String>emptyList(), null);
if (partitionIds.isEmpty()) {
return Collections.emptyList(); // no partitions, bail early.
}
return getPartitionsFromPartitionIds(catName, dbName, tblName, null, partitionIds, Collections.emptyList());
}
});
}
/**
* Gets partitions by using direct SQL queries.
* @param filter The filter.
* @param max The maximum number of partitions to return.
* @param isAcidTable True if the table is ACID
* @return List of partitions.
*/
public List<Partition> getPartitionsViaSqlFilter(String catName, String dbName, String tableName,
SqlFilterForPushdown filter, Integer max, boolean isAcidTable) throws MetaException {
List<Long> partitionIds = getPartitionIdsViaSqlFilter(catName,
dbName, tableName, filter.filter, filter.params,
filter.joins, max);
if (partitionIds.isEmpty()) {
return Collections.emptyList(); // no partitions, bail early.
}
return Batchable.runBatched(batchSize, partitionIds, new Batchable<Long, Partition>() {
@Override
public List<Partition> run(List<Long> input) throws MetaException {
return getPartitionsFromPartitionIds(catName, dbName,
tableName, null, input, Collections.emptyList(), isAcidTable);
}
});
}
/**
* This method can be used to return "partially-filled" partitions when clients are only interested in
* some fields of the Partition objects. The partitionFields parameter is a list of dot separated
* partition field names. For example, if a client is interested in only partition location,
* serializationLib, values and parameters it can specify sd.location, sd.serdeInfo.serializationLib,
* values, parameters in the partitionFields list. In such a case all the returned partitions will have
* only the requested fields set and the rest of the fields will remain unset. The implementation of this method
* runs queries only for the fields which are requested and pushes down the projection to the database to improve
* performance.
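*
* A minimal usage sketch (the variable names here are illustrative, not part of this API):
* <pre>
* // Fetch only the location and the partition values of each partition:
* List&lt;Partition&gt; parts = directSql.getPartitionsUsingProjectionAndFilterSpec(
*     tbl, Arrays.asList("sd.location", "values"), null, null, filterSpec, filter);
* </pre>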
*
* @param tbl Table whose partitions are being requested
* @param partitionFields List of dot separated field names. Each dot separated string represents nested levels. For
* instance sd.serdeInfo.serializationLib represents the serializationLib field of the StorageDescriptor
* for the partition
* @param includeParamKeyPattern The SQL regex pattern which is used to include the parameter keys. Can include _ or %
* When this pattern is set, only the partition parameter key-value pairs where the key matches
* the pattern will be returned. This is applied in conjunction with excludeParamKeyPattern if it is set.
* @param excludeParamKeyPattern The SQL regex pattern which is used to exclude the parameter keys. Can include _ or %
* When this pattern is set, all the partition parameters where key is NOT LIKE the pattern
* are returned. This is applied in conjunction with the includeParamKeyPattern if it is set.
* @param filterSpec The filterSpec from <code>GetPartitionsRequest</code> which includes the filter mode (BY_EXPR, BY_VALUES or BY_NAMES)
* and the list of filter strings to be used to filter the results
* @param filter SqlFilterForPushDown which is set in the <code>canUseDirectSql</code> method before this method is called.
* The filter is used only when the mode is BY_EXPR
* @return the list of partially-filled partitions matching the filter spec
* @throws MetaException
*/
public List<Partition> getPartitionsUsingProjectionAndFilterSpec(Table tbl,
final List<String> partitionFields, final String includeParamKeyPattern,
final String excludeParamKeyPattern, GetPartitionsFilterSpec filterSpec, SqlFilterForPushdown filter)
throws MetaException {
final String tblName = tbl.getTableName();
final String dbName = tbl.getDbName();
final String catName = tbl.getCatName();
List<Long> partitionIds = null;
if (filterSpec.isSetFilterMode()) {
List<String> filters = filterSpec.getFilters();
if (filters == null || filters.isEmpty()) {
throw new MetaException("Invalid filter expressions in the filter spec");
}
switch(filterSpec.getFilterMode()) {
case BY_EXPR:
partitionIds =
getPartitionIdsViaSqlFilter(catName, dbName, tblName, filter.filter, filter.params,
filter.joins, null);
break;
case BY_NAMES:
String partNamesFilter =
"" + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(filterSpec.getFilters().size())
+ ")";
partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, tblName, partNamesFilter,
filterSpec.getFilters(), Collections.EMPTY_LIST, null);
break;
case BY_VALUES:
// We are going to use SQL LIKE pattern syntax in the clause below, so the default
// wildcard string is _% and not .*
String partNameMatcher = MetaStoreUtils.makePartNameMatcher(tbl, filters, "_%");
String partNamesLikeFilter =
"" + PARTITIONS + ".\"PART_NAME\" LIKE (?)";
partitionIds =
getPartitionIdsViaSqlFilter(catName, dbName, tblName, partNamesLikeFilter, Arrays.asList(partNameMatcher),
Collections.EMPTY_LIST, null);
break;
default:
throw new MetaException("Unsupported filter mode " + filterSpec.getFilterMode());
}
} else {
// there is no filter mode. Fetch all the partition ids
partitionIds =
getPartitionIdsViaSqlFilter(catName, dbName, tblName, null, Collections.EMPTY_LIST,
Collections.EMPTY_LIST, null);
}
if (partitionIds.isEmpty()) {
return Collections.emptyList();
}
// check if table object has table type as view
Boolean isView = isViewTable(tbl);
if (isView == null) {
isView = isViewTable(catName, dbName, tblName);
}
PartitionProjectionEvaluator projectionEvaluator =
new PartitionProjectionEvaluator(pm, fieldnameToTableName, partitionFields,
convertMapNullsToEmptyStrings, isView, includeParamKeyPattern, excludeParamKeyPattern);
// Get full objects. For Oracle/etc. do it in batches.
return Batchable.runBatched(batchSize, partitionIds, new Batchable<Long, Partition>() {
@Override
public List<Partition> run(List<Long> input) throws MetaException {
return projectionEvaluator.getPartitionsUsingProjectionList(input);
}
});
}
public static class SqlFilterForPushdown {
private final List<Object> params = new ArrayList<>();
private final List<String> joins = new ArrayList<>();
private String filter;
private String catName;
private String dbName;
private String tableName;
// Whether null elements in joins should be compacted when generating the SQL filter.
private boolean compactJoins = true;
SqlFilterForPushdown() {
}
SqlFilterForPushdown(Table table, boolean compactJoins) {
this.catName = table.getCatName();
this.dbName = table.getDbName();
this.tableName = table.getTableName();
this.compactJoins = compactJoins;
}
}
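/**
* Populates {@code result} with the SQL filter text, parameters and joins derived from the
* given expression tree.
* @return true if the filter could be pushed down to SQL, false otherwise
*/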
public boolean generateSqlFilterForPushdown(String catName, String dbName, String tableName,
List<FieldSchema> partitionKeys, ExpressionTree tree, String defaultPartitionName,
SqlFilterForPushdown result) throws MetaException {
// Derby and Oracle do not interpret filters ANSI-properly in some cases and need a workaround.
assert partitionKeys != null;
boolean dbHasJoinCastBug = dbType.hasJoinOperationOrderBug();
result.tableName = tableName;
result.dbName = dbName;
result.catName = catName;
result.filter = PartitionFilterGenerator.generateSqlFilter(catName, dbName, tableName,
partitionKeys, tree, result.params, result.joins, dbHasJoinCastBug,
((defaultPartitionName == null) ? defaultPartName : defaultPartitionName),
dbType, schema, result.compactJoins);
return result.filter != null;
}
/**
* Gets all partitions of a table by using direct SQL queries.
* @param catName Metastore catalog name.
* @param dbName Metastore db name.
* @param tblName Metastore table name.
* @param max The maximum number of partitions to return.
* @return List of partitions.
*/
public List<Partition> getPartitions(String catName,
String dbName, String tblName, Integer max) throws MetaException {
List<Long> partitionIds = getPartitionIdsViaSqlFilter(catName, dbName,
tblName, null, Collections.<String>emptyList(), Collections.<String>emptyList(), max);
if (partitionIds.isEmpty()) {
return Collections.emptyList(); // no partitions, bail early.
}
// Get full objects. For Oracle/etc. do it in batches.
List<Partition> result = Batchable.runBatched(batchSize, partitionIds, new Batchable<Long, Partition>() {
@Override
public List<Partition> run(List<Long> input) throws MetaException {
return getPartitionsFromPartitionIds(catName, dbName, tblName, null, input, Collections.emptyList());
}
});
return result;
}
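/** Returns whether the table is a view when the type is set on the object, or null if it must be looked up. */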
private static Boolean isViewTable(Table t) {
return t.isSetTableType() ?
t.getTableType().equals(TableType.VIRTUAL_VIEW.toString()) : null;
}
private boolean isViewTable(String catName, String dbName, String tblName) throws MetaException {
Query query = null;
try {
String queryText = "select \"TBL_TYPE\" from " + TBLS + "" +
" inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" " +
" where " + TBLS + ".\"TBL_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + DBS + ".\"CTLG_NAME\" = ?";
Object[] params = new Object[] { tblName, dbName, catName };
query = pm.newQuery("javax.jdo.query.SQL", queryText);
query.setUnique(true);
Object result = executeWithArray(query, params, queryText);
return (result != null) && result.toString().equals(TableType.VIRTUAL_VIEW.toString());
} finally {
if (query != null) {
query.closeAll();
}
}
}
/**
* Get partition ids for the query using direct SQL queries, to avoid bazillion
* queries created by DN retrieving stuff for each object individually.
* @param catName MetaStore catalog name
* @param dbName MetaStore db name
* @param tblName MetaStore table name
* @param sqlFilter SQL filter to use. Better be SQL92-compliant.
* @param paramsForFilter params for ?-s in SQL filter text. Params must be in order.
* @param joinsForFilter if the filter needs additional join statement, they must be in
* this list. Better be SQL92-compliant.
* @param max The maximum number of partitions to return.
* @return List of partition ids.
*/
private List<Long> getPartitionIdsViaSqlFilter(
String catName, String dbName, String tblName, String sqlFilter,
List<? extends Object> paramsForFilter, List<String> joinsForFilter, Integer max)
throws MetaException {
boolean doTrace = LOG.isDebugEnabled();
final String dbNameLcase = dbName.toLowerCase();
final String tblNameLcase = tblName.toLowerCase();
final String catNameLcase = normalizeSpace(catName).toLowerCase();
// We have to be mindful of order during filtering if we are not returning all partitions.
String orderForFilter = (max != null) ? " order by \"PART_NAME\" asc" : "";
String queryText =
"select " + PARTITIONS + ".\"PART_ID\" from " + PARTITIONS + ""
+ " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\" "
+ " and " + TBLS + ".\"TBL_NAME\" = ? "
+ " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
+ " and " + DBS + ".\"NAME\" = ? "
+ join(joinsForFilter, ' ')
+ " where " + DBS + ".\"CTLG_NAME\" = ? "
+ (StringUtils.isBlank(sqlFilter) ? "" : (" and " + sqlFilter)) + orderForFilter;
Object[] params = new Object[paramsForFilter.size() + 3];
params[0] = tblNameLcase;
params[1] = dbNameLcase;
params[2] = catNameLcase;
for (int i = 0; i < paramsForFilter.size(); ++i) {
params[i + 3] = paramsForFilter.get(i);
}
long start = doTrace ? System.nanoTime() : 0;
Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
List<Object> sqlResult = executeWithArray(query, params, queryText, ((max == null) ? -1 : max.intValue()));
long queryTime = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, queryTime);
if (sqlResult.isEmpty()) {
return Collections.emptyList(); // no partitions, bail early.
}
List<Long> result = new ArrayList<>(sqlResult.size());
for (Object fields : sqlResult) {
result.add(MetastoreDirectSqlUtils.extractSqlLong(fields));
}
query.closeAll();
return result;
}
/** Should be called with the list short enough to not trip up Oracle/etc. */
private List<Partition> getPartitionsFromPartitionIds(String catName, String dbName, String tblName,
Boolean isView, List<Long> partIdList, List<String> projectionFields) throws MetaException {
return getPartitionsFromPartitionIds(catName, dbName, tblName, isView, partIdList, projectionFields, false);
}
/** Should be called with the list short enough to not trip up Oracle/etc. */
private List<Partition> getPartitionsFromPartitionIds(String catName, String dbName, String tblName,
Boolean isView, List<Long> partIdList, List<String> projectionFields,
boolean isAcidTable) throws MetaException {
boolean doTrace = LOG.isDebugEnabled();
int idStringWidth = (int)Math.ceil(Math.log10(partIdList.size())) + 1; // 1 for comma
int sbCapacity = partIdList.size() * idStringWidth;
// Get most of the fields for the IDs provided.
// Assume db and table names are the same for all partitions, as provided in arguments.
String partIds = getIdListForIn(partIdList);
String queryText =
"select " + PARTITIONS + ".\"PART_ID\", " + SDS + ".\"SD_ID\", " + SDS + ".\"CD_ID\"," + " "
+ SERDES + ".\"SERDE_ID\", " + PARTITIONS + ".\"CREATE_TIME\"," + " " + PARTITIONS
+ ".\"LAST_ACCESS_TIME\", " + SDS + ".\"INPUT_FORMAT\", " + SDS + ".\"IS_COMPRESSED\","
+ " " + SDS + ".\"IS_STOREDASSUBDIRECTORIES\", " + SDS + ".\"LOCATION\", " + SDS
+ ".\"NUM_BUCKETS\"," + " " + SDS + ".\"OUTPUT_FORMAT\", " + SERDES + ".\"NAME\", "
+ SERDES + ".\"SLIB\", " + PARTITIONS + ".\"WRITE_ID\"" + " from " + PARTITIONS + ""
+ " left outer join " + SDS + " on " + PARTITIONS + ".\"SD_ID\" = " + SDS
+ ".\"SD_ID\" " + " left outer join " + SERDES + " on " + SDS + ".\"SERDE_ID\" = "
+ SERDES + ".\"SERDE_ID\" " + "where \"PART_ID\" in (" + partIds
+ ") order by \"PART_NAME\" asc";
long start = doTrace ? System.nanoTime() : 0;
Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
List<Object[]> sqlResult = executeWithArray(query, null, queryText);
long queryTime = doTrace ? System.nanoTime() : 0;
Deadline.checkTimeout();
// Read all the fields and create partitions, SDs and serdes.
TreeMap<Long, Partition> partitions = new TreeMap<Long, Partition>();
TreeMap<Long, StorageDescriptor> sds = new TreeMap<Long, StorageDescriptor>();
TreeMap<Long, SerDeInfo> serdes = new TreeMap<Long, SerDeInfo>();
TreeMap<Long, List<FieldSchema>> colss = new TreeMap<Long, List<FieldSchema>>();
// Keep order by name, consistent with JDO.
ArrayList<Partition> orderedResult = new ArrayList<Partition>(partIdList.size());
// Prepare StringBuilder-s for "in (...)" lists to use in one-to-many queries.
StringBuilder sdSb = new StringBuilder(sbCapacity), serdeSb = new StringBuilder(sbCapacity);
StringBuilder colsSb = new StringBuilder(7); // We expect that there's only one field schema.
tblName = tblName.toLowerCase();
dbName = dbName.toLowerCase();
catName = normalizeSpace(catName).toLowerCase();
for (Object[] fields : sqlResult) {
// Here comes the ugly part...
long partitionId = MetastoreDirectSqlUtils.extractSqlLong(fields[0]);
Long sdId = MetastoreDirectSqlUtils.extractSqlLong(fields[1]);
Long colId = MetastoreDirectSqlUtils.extractSqlLong(fields[2]);
Long serdeId = MetastoreDirectSqlUtils.extractSqlLong(fields[3]);
Partition part = new Partition();
orderedResult.add(part);
// Set the collection fields; some code might not check presence before accessing them.
part.setParameters(new HashMap<>());
part.setValues(new ArrayList<String>());
part.setCatName(catName);
part.setDbName(dbName);
part.setTableName(tblName);
if (fields[4] != null) {
part.setCreateTime(MetastoreDirectSqlUtils.extractSqlInt(fields[4]));
}
if (fields[5] != null) {
part.setLastAccessTime(MetastoreDirectSqlUtils.extractSqlInt(fields[5]));
}
Long writeId = MetastoreDirectSqlUtils.extractSqlLong(fields[14]);
if (writeId != null && writeId>0) {
part.setWriteId(writeId);
} else {
part.setWriteId(-1L);
}
partitions.put(partitionId, part);
if (sdId == null) {
continue; // Probably a view.
}
assert serdeId != null;
// We assume each partition has a unique SD.
StorageDescriptor sd = new StorageDescriptor();
StorageDescriptor oldSd = sds.put(sdId, sd);
if (oldSd != null) {
throw new MetaException("Partitions reuse SDs; we don't expect that");
}
// Set the collection fields; some code might not check presence before accessing them.
sd.setSortCols(new ArrayList<Order>());
sd.setBucketCols(new ArrayList<String>());
sd.setParameters(new HashMap<String, String>());
sd.setSkewedInfo(new SkewedInfo(new ArrayList<String>(),
new ArrayList<List<String>>(), new HashMap<List<String>, String>()));
sd.setInputFormat((String)fields[6]);
Boolean tmpBoolean = MetastoreDirectSqlUtils.extractSqlBoolean(fields[7]);
if (tmpBoolean != null) {
sd.setCompressed(tmpBoolean);
}
tmpBoolean = MetastoreDirectSqlUtils.extractSqlBoolean(fields[8]);
if (tmpBoolean != null) {
sd.setStoredAsSubDirectories(tmpBoolean);
}
sd.setLocation((String)fields[9]);
if (fields[10] != null) {
sd.setNumBuckets(MetastoreDirectSqlUtils.extractSqlInt(fields[10]));
}
sd.setOutputFormat((String)fields[11]);
sdSb.append(sdId).append(",");
part.setSd(sd);
if (colId != null) {
List<FieldSchema> cols = colss.get(colId);
// We expect that colId will be the same for all (or many) SDs.
if (cols == null) {
cols = new ArrayList<FieldSchema>();
colss.put(colId, cols);
colsSb.append(colId).append(",");
}
sd.setCols(cols);
}
// We assume each SD has a unique serde.
SerDeInfo serde = new SerDeInfo();
SerDeInfo oldSerde = serdes.put(serdeId, serde);
if (oldSerde != null) {
throw new MetaException("SDs reuse serdes; we don't expect that");
}
serde.setParameters(new HashMap<String, String>());
serde.setName((String)fields[12]);
serde.setSerializationLib((String)fields[13]);
serdeSb.append(serdeId).append(",");
sd.setSerdeInfo(serde);
Deadline.checkTimeout();
}
query.closeAll();
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, queryTime);
// Now get all the one-to-many things. Start with partitions.
MetastoreDirectSqlUtils
.setPartitionParameters(PARTITION_PARAMS, convertMapNullsToEmptyStrings, pm, partIds, partitions);
MetastoreDirectSqlUtils.setPartitionValues(PARTITION_KEY_VALS, pm, partIds, partitions);
// Prepare IN (blah) lists for the following queries. Cut off the final ','s.
if (sdSb.length() == 0) {
assert serdeSb.length() == 0 && colsSb.length() == 0;
return orderedResult; // No SDs, probably a view.
}
String sdIds = trimCommaList(sdSb);
String serdeIds = trimCommaList(serdeSb);
String colIds = trimCommaList(colsSb);
if (!isAcidTable) {
// Get all the stuff for SD. Don't do empty-list check - we expect partitions to have SDs.
MetastoreDirectSqlUtils.setSDParameters(SD_PARAMS, convertMapNullsToEmptyStrings, pm, sds, sdIds);
}
boolean hasSkewedColumns = false;
if (!isAcidTable) {
MetastoreDirectSqlUtils.setSDSortCols(SORT_COLS, pm, sds, sdIds);
}
MetastoreDirectSqlUtils.setSDBucketCols(BUCKETING_COLS, pm, sds, sdIds);
if (!isAcidTable) {
// Skewed columns stuff.
hasSkewedColumns = MetastoreDirectSqlUtils.setSkewedColNames(SKEWED_COL_NAMES, pm, sds, sdIds);
}
// Assume we don't need to fetch the rest of the skewed column data if we have no columns.
if (hasSkewedColumns) {
// We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless.
MetastoreDirectSqlUtils
.setSkewedColValues(SKEWED_STRING_LIST_VALUES, SKEWED_VALUES, pm, sds, sdIds);
// We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless.
MetastoreDirectSqlUtils
.setSkewedColLocationMaps(SKEWED_COL_VALUE_LOC_MAP, SKEWED_STRING_LIST_VALUES, pm, sds, sdIds);
} // if (hasSkewedColumns)
// Get FieldSchema stuff if any.
if (!colss.isEmpty()) {
// We are skipping the CDS table here, as it seems to be totally useless.
MetastoreDirectSqlUtils.setSDCols(COLUMNS_V2, pm, colss, colIds);
}
// Finally, get all the stuff for serdes - just the params.
if (!isAcidTable) {
MetastoreDirectSqlUtils.setSerdeParams(SERDE_PARAMS, convertMapNullsToEmptyStrings, pm, serdes, serdeIds);
}
return orderedResult;
}
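/** Returns the number of partitions matching the pushed-down filter via a SQL count query. */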
public int getNumPartitionsViaSqlFilter(SqlFilterForPushdown filter) throws MetaException {
boolean doTrace = LOG.isDebugEnabled();
String catName = filter.catName.toLowerCase();
String dbName = filter.dbName.toLowerCase();
String tblName = filter.tableName.toLowerCase();
// Get number of partitions by doing count on PART_ID.
String queryText = "select count(" + PARTITIONS + ".\"PART_ID\") from " + PARTITIONS + ""
+ " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\" "
+ " and " + TBLS + ".\"TBL_NAME\" = ? "
+ " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
+ " and " + DBS + ".\"NAME\" = ? "
+ join(filter.joins, ' ')
+ " where " + DBS + ".\"CTLG_NAME\" = ? "
+ (filter.filter == null || filter.filter.trim().isEmpty() ? "" : (" and " + filter.filter));
Object[] params = new Object[filter.params.size() + 3];
params[0] = tblName;
params[1] = dbName;
params[2] = catName;
for (int i = 0; i < filter.params.size(); ++i) {
params[i + 3] = filter.params.get(i);
}
long start = doTrace ? System.nanoTime() : 0;
Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
query.setUnique(true);
int sqlResult = MetastoreDirectSqlUtils.extractSqlInt(query.executeWithArray(params));
long queryTime = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, queryTime);
query.closeAll();
return sqlResult;
}
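/** Drops the trailing comma from an "id," list builder; e.g. "1,2,3," becomes "1,2,3". */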
private static String trimCommaList(StringBuilder sb) {
if (sb.length() > 0) {
sb.setLength(sb.length() - 1);
}
return sb.toString();
}
private static class PartitionFilterGenerator extends TreeVisitor {
private final String catName;
private final String dbName;
private final String tableName;
private final List<FieldSchema> partitionKeys;
private final FilterBuilder filterBuffer;
private final List<Object> params;
private final List<String> joins;
private final boolean dbHasJoinCastBug;
private final String defaultPartName;
private final DatabaseProduct dbType;
private final String PARTITION_KEY_VALS, PARTITIONS, DBS, TBLS;
private PartitionFilterGenerator(String catName, String dbName, String tableName,
List<FieldSchema> partitionKeys, List<Object> params, List<String> joins,
boolean dbHasJoinCastBug, String defaultPartName, DatabaseProduct dbType, String schema) {
this.catName = catName;
this.dbName = dbName;
this.tableName = tableName;
this.partitionKeys = partitionKeys;
this.params = params;
this.joins = joins;
this.dbHasJoinCastBug = dbHasJoinCastBug;
this.filterBuffer = new FilterBuilder(false);
this.defaultPartName = defaultPartName;
this.dbType = dbType;
this.PARTITION_KEY_VALS = getFullyQualifiedName(schema, "PARTITION_KEY_VALS");
this.PARTITIONS = getFullyQualifiedName(schema, "PARTITIONS");
this.DBS = getFullyQualifiedName(schema, "DBS");
this.TBLS = getFullyQualifiedName(schema, "TBLS");
}
/**
* Generate the ANSI SQL92 filter for the given expression tree
* @param catName catalog name
* @param dbName db name
* @param tableName table name
* @param partitionKeys partition keys
* @param params the ordered parameters for the resulting expression
* @param joins the joins necessary for the resulting expression
* @return the SQL filter text, an empty string for an empty expression tree, or null if the filter cannot be pushed down
*/
private static String generateSqlFilter(String catName, String dbName, String tableName,
List<FieldSchema> partitionKeys, ExpressionTree tree, List<Object> params,
List<String> joins, boolean dbHasJoinCastBug, String defaultPartName,
DatabaseProduct dbType, String schema, boolean compactJoins) throws MetaException {
if (tree == null) {
// Consistent with other APIs like makeExpressionTree, null is returned to indicate that
// the filter could not be pushed down due to a parsing issue etc.
return null;
}
if (tree.getRoot() == null) {
return "";
}
PartitionFilterGenerator visitor = new PartitionFilterGenerator(
catName, dbName, tableName, partitionKeys,
params, joins, dbHasJoinCastBug, defaultPartName, dbType, schema);
tree.accept(visitor);
if (visitor.filterBuffer.hasError()) {
LOG.info("Unable to push down SQL filter: " + visitor.filterBuffer.getErrorMessage());
return null;
}
// Some joins might be null (see processNode for LeafNode), clean them up.
if (compactJoins) {
for (int i = 0; i < joins.size(); ++i) {
if (joins.get(i) != null) {
continue;
}
joins.remove(i--);
}
}
return "(" + visitor.filterBuffer.getFilter() + ")";
}
@Override
protected void beginTreeNode(TreeNode node) throws MetaException {
filterBuffer.append(" (");
}
@Override
protected void midTreeNode(TreeNode node) throws MetaException {
filterBuffer.append((node.getAndOr() == LogicalOperator.AND) ? " and " : " or ");
}
@Override
protected void endTreeNode(TreeNode node) throws MetaException {
filterBuffer.append(") ");
}
@Override
protected boolean shouldStop() {
return filterBuffer.hasError();
}
private static enum FilterType {
Integral,
String,
Date,
Invalid;
static FilterType fromType(String colTypeStr) {
if (colTypeStr.equals(ColumnType.STRING_TYPE_NAME)) {
return FilterType.String;
} else if (colTypeStr.equals(ColumnType.DATE_TYPE_NAME)) {
return FilterType.Date;
} else if (ColumnType.IntegralTypes.contains(colTypeStr)) {
return FilterType.Integral;
}
return FilterType.Invalid;
}
public static FilterType fromClass(Object value) {
if (value instanceof String) {
return FilterType.String;
} else if (value instanceof Long) {
return FilterType.Integral;
} else if (value instanceof java.sql.Date) {
return FilterType.Date;
}
return FilterType.Invalid;
}
}
@Override
public void visit(LeafNode node) throws MetaException {
int partColCount = partitionKeys.size();
int partColIndex = node.getPartColIndexForFilter(partitionKeys, filterBuffer);
if (filterBuffer.hasError()) {
return;
}
String colTypeStr = partitionKeys.get(partColIndex).getType();
FilterType colType = FilterType.fromType(colTypeStr);
if (colType == FilterType.Invalid) {
filterBuffer.setError("Filter pushdown not supported for type " + colTypeStr);
return;
}
FilterType valType = FilterType.fromClass(node.value);
Object nodeValue = node.value;
if (valType == FilterType.Invalid) {
filterBuffer.setError("Filter pushdown not supported for value " + node.value.getClass());
return;
}
// if Filter.g does date parsing for quoted strings, we'd need to verify there's no
// type mismatch when string col is filtered by a string that looks like date.
if (colType == FilterType.Date && valType == FilterType.String) {
// Filter.g cannot parse a quoted date; try to parse date here too.
try {
nodeValue = MetaStoreUtils.PARTITION_DATE_FORMAT.get().parse((String)nodeValue);
valType = FilterType.Date;
} catch (ParseException pe) { // do nothing, handled below - types will mismatch
}
}
// We format it so we are sure we are getting the right value
if (valType == FilterType.Date) {
// Format
nodeValue = MetaStoreUtils.PARTITION_DATE_FORMAT.get().format(nodeValue);
}
boolean isDefaultPartition = (valType == FilterType.String) && defaultPartName.equals(nodeValue);
if ((colType != valType) && (!isDefaultPartition)) {
// It's not clear how filtering for e.g. "stringCol > 5" should work (which side is
// to be coerced?). Let the expression evaluation sort this one out, not metastore.
filterBuffer.setError("Cannot push down filter for "
+ colTypeStr + " column and value " + nodeValue.getClass());
return;
}
if (joins.isEmpty()) {
// There's a fixed number of partition cols that we might have filters on. To avoid
// joining multiple times for one column (if there are several filters on it), we will
// keep numCols elements in the list, one for each column; we will fill it with nulls,
// put each join at a corresponding index when necessary, and remove nulls in the end.
for (int i = 0; i < partColCount; ++i) {
joins.add(null);
}
}
if (joins.get(partColIndex) == null) {
joins.set(partColIndex, "inner join " + PARTITION_KEY_VALS + " \"FILTER" + partColIndex
+ "\" on \"FILTER" + partColIndex + "\".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
+ " and \"FILTER" + partColIndex + "\".\"INTEGER_IDX\" = " + partColIndex);
}
// Build the filter and add parameters linearly; we are traversing leaf nodes LTR.
String tableValue = "\"FILTER" + partColIndex + "\".\"PART_KEY_VAL\"";
String nodeValue0 = "?";
if (node.isReverseOrder) {
params.add(nodeValue);
}
String tableColumn = tableValue;
if ((colType != FilterType.String) && (!isDefaultPartition)) {
// The underlying database field is varchar, we need to compare numbers.
if (colType == FilterType.Integral) {
tableValue = "cast(" + tableValue + " as decimal(21,0))";
} else if (colType == FilterType.Date) {
tableValue = dbType.toDate(tableValue);
}
// Workaround for HIVE_DEFAULT_PARTITION - ignore it like JDO does, for now.
String tableValue0 = tableValue;
tableValue = "(case when " + tableColumn + " <> ?";
params.add(defaultPartName);
if (dbHasJoinCastBug) {
// This is a workaround for DERBY-6358 and Oracle bug; it is pretty horrible.
tableValue += (" and " + TBLS + ".\"TBL_NAME\" = ? and " + DBS + ".\"NAME\" = ? and "
+ DBS + ".\"CTLG_NAME\" = ? and "
+ "\"FILTER" + partColIndex + "\".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\" and "
+ "\"FILTER" + partColIndex + "\".\"INTEGER_IDX\" = " + partColIndex);
params.add(tableName.toLowerCase());
params.add(dbName.toLowerCase());
params.add(catName.toLowerCase());
}
tableValue += " then " + tableValue0 + " else null end)";
if (valType == FilterType.Date) {
tableValue = dbType.toDate(tableValue);
}
}
if (!node.isReverseOrder) {
params.add(nodeValue);
}
// The following syntax is required for using LIKE clause wildcards '_' and '%' as literals.
if (node.operator == Operator.LIKE) {
nodeValue0 = nodeValue0 + " ESCAPE '\\' ";
}
filterBuffer.append(node.isReverseOrder
? "(" + nodeValue0 + " " + node.operator.getSqlOp() + " " + tableValue + ")"
: "(" + tableValue + " " + node.operator.getSqlOp() + " " + nodeValue0 + ")");
}
}
/**
* Retrieve the column statistics for the specified columns of the table. NULL
* is returned if the columns are not provided.
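*
* A minimal usage sketch (the argument values are illustrative):
* <pre>
* ColumnStatistics stats = directSql.getTableStats("hive", "db1", "tbl1",
*     Arrays.asList("col1", "col2"), "hive", false);
* </pre>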
* @param catName the catalog name of the table
* @param dbName the database name of the table
* @param tableName the table name
* @param colNames the list of the column names
* @param engine engine making the request
* @return the column statistics for the specified columns
* @throws MetaException
*/
public ColumnStatistics getTableStats(final String catName, final String dbName,
final String tableName, List<String> colNames, String engine,
boolean enableBitVector) throws MetaException {
if (colNames == null || colNames.isEmpty()) {
return null;
}
final boolean doTrace = LOG.isDebugEnabled();
final String queryText0 = "select " + getStatsList(enableBitVector) + " from " + TAB_COL_STATS
+ " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? "
+ " and \"ENGINE\" = ? and \"COLUMN_NAME\" in (";
Batchable<String, Object[]> b = new Batchable<String, Object[]>() {
@Override
public List<Object[]> run(List<String> input) throws MetaException {
String queryText = queryText0 + makeParams(input.size()) + ")";
Object[] params = new Object[input.size() + 4];
params[0] = catName;
params[1] = dbName;
params[2] = tableName;
params[3] = engine;
for (int i = 0; i < input.size(); ++i) {
params[i + 4] = input.get(i);
}
long start = doTrace ? System.nanoTime() : 0;
Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
Object qResult = executeWithArray(query, params, queryText);
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText0 + "...)", start, (doTrace ? System.nanoTime() : 0));
if (qResult == null) {
query.closeAll();
return null;
}
addQueryAfterUse(query);
return MetastoreDirectSqlUtils.ensureList(qResult);
}
};
List<Object[]> list = Batchable.runBatched(batchSize, colNames, b);
if (list.isEmpty()) {
return null;
}
ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName);
csd.setCatName(catName);
ColumnStatistics result = makeColumnStats(list, csd, 0, engine);
b.closeAllQueries();
return result;
}
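// Illustrative usage sketch (hypothetical caller and names, not part of this class):
//   ColumnStatistics stats = directSql.getTableStats("hive", "default", "web_logs",
//       Arrays.asList("user_id", "ts"), "hive", false);
//   // Returns null when no columns are requested or no stats rows exist; otherwise
//   // holds one ColumnStatisticsObj per column that has recorded statistics.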
public List<HiveObjectPrivilege> getTableAllColumnGrants(String catName, String dbName,
String tableName, String authorizer) throws MetaException {
Query query = null;
// These constants should match the SELECT clause of the query.
final int authorizerIndex = 0;
final int columnNameIndex = 1;
final int createTimeIndex = 2;
final int grantOptionIndex = 3;
final int grantorIndex = 4;
final int grantorTypeIndex = 5;
final int principalNameIndex = 6;
final int principalTypeIndex = 7;
final int privilegeIndex = 8;
// Retrieve the privileges from the object store, grabbing only the required fields.
String queryText = "select " +
TBL_COL_PRIVS + ".\"AUTHORIZER\", " +
TBL_COL_PRIVS + ".\"COLUMN_NAME\", " +
TBL_COL_PRIVS + ".\"CREATE_TIME\", " +
TBL_COL_PRIVS + ".\"GRANT_OPTION\", " +
TBL_COL_PRIVS + ".\"GRANTOR\", " +
TBL_COL_PRIVS + ".\"GRANTOR_TYPE\", " +
TBL_COL_PRIVS + ".\"PRINCIPAL_NAME\", " +
TBL_COL_PRIVS + ".\"PRINCIPAL_TYPE\", " +
TBL_COL_PRIVS + ".\"TBL_COL_PRIV\" " +
"FROM " + TBL_COL_PRIVS + " JOIN " + TBLS +
" ON " + TBL_COL_PRIVS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" +
" JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" " +
" WHERE " + TBLS + ".\"TBL_NAME\" = ?" +
" AND " + DBS + ".\"NAME\" = ?" +
" AND " + DBS + ".\"CTLG_NAME\" = ?";
// Build the parameters; they should match the WHERE clause of the query.
int numParams = authorizer != null ? 4 : 3;
Object[] params = new Object[numParams];
params[0] = tableName;
params[1] = dbName;
params[2] = catName;
if (authorizer != null) {
queryText = queryText + " AND " + TBL_COL_PRIVS + ".\"AUTHORIZER\" = ?";
params[3] = authorizer;
}
// Collect the results into a list that the caller can consume.
List<HiveObjectPrivilege> result = new ArrayList<>();
final boolean doTrace = LOG.isDebugEnabled();
long start = doTrace ? System.nanoTime() : 0;
query = pm.newQuery("javax.jdo.query.SQL", queryText);
try {
List<Object[]> queryResult = MetastoreDirectSqlUtils.ensureList(
executeWithArray(query, params, queryText));
long end = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
// If there are results, convert them into HiveObjectPrivilege objects and return them.
for (Object[] privLine : queryResult) {
String privAuthorizer = MetastoreDirectSqlUtils.extractSqlString(privLine[authorizerIndex]);
String principalName = MetastoreDirectSqlUtils.extractSqlString(privLine[principalNameIndex]);
PrincipalType ptype = PrincipalType.valueOf(
MetastoreDirectSqlUtils.extractSqlString(privLine[principalTypeIndex]));
String columnName = MetastoreDirectSqlUtils.extractSqlString(privLine[columnNameIndex]);
String privilege = MetastoreDirectSqlUtils.extractSqlString(privLine[privilegeIndex]);
int createTime = MetastoreDirectSqlUtils.extractSqlInt(privLine[createTimeIndex]);
String grantor = MetastoreDirectSqlUtils.extractSqlString(privLine[grantorIndex]);
PrincipalType grantorType =
PrincipalType.valueOf(
MetastoreDirectSqlUtils.extractSqlString(privLine[grantorTypeIndex]));
boolean grantOption = MetastoreDirectSqlUtils.extractSqlBoolean(privLine[grantOptionIndex]);
HiveObjectRef objectRef = new HiveObjectRef(HiveObjectType.COLUMN, dbName, tableName, null,
columnName);
objectRef.setCatName(catName);
PrivilegeGrantInfo grantInfo = new PrivilegeGrantInfo(privilege, createTime, grantor,
grantorType, grantOption);
result.add(new HiveObjectPrivilege(objectRef, principalName, ptype, grantInfo,
privAuthorizer));
}
} finally {
query.closeAll();
}
return result;
}
public AggrStats aggrColStatsForPartitions(String catName, String dbName, String tableName,
List<String> partNames, List<String> colNames, String engine,
boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean enableBitVector)
throws MetaException {
if (colNames.isEmpty() || partNames.isEmpty()) {
LOG.debug("Columns is empty or partNames is empty : Short-circuiting stats eval");
return new AggrStats(Collections.<ColumnStatisticsObj>emptyList(), 0); // Nothing to aggregate
}
long partsFound = 0;
List<ColumnStatisticsObj> colStatsList;
// Try to read from the cache first
if (isAggregateStatsCacheEnabled
&& (partNames.size() < aggrStatsCache.getMaxPartsPerCacheNode())) {
AggrColStats colStatsAggrCached;
List<ColumnStatisticsObj> colStatsAggrFromDB;
int maxPartsPerCacheNode = aggrStatsCache.getMaxPartsPerCacheNode();
double fpp = aggrStatsCache.getFalsePositiveProbability();
colStatsList = new ArrayList<ColumnStatisticsObj>();
// Bloom filter for the new node that we will eventually add to the cache
BloomFilter bloomFilter = createPartsBloomFilter(maxPartsPerCacheNode, fpp, partNames);
boolean computePartsFound = true;
for (String colName : colNames) {
// Check the cache first
colStatsAggrCached = aggrStatsCache.get(catName, dbName, tableName, colName, partNames);
if (colStatsAggrCached != null) {
colStatsList.add(colStatsAggrCached.getColStats());
partsFound = colStatsAggrCached.getNumPartsCached();
} else {
if (computePartsFound) {
partsFound = partsFoundForPartitions(catName, dbName, tableName, partNames, colNames, engine);
computePartsFound = false;
}
List<String> colNamesForDB = new ArrayList<>();
colNamesForDB.add(colName);
// Read aggregated stats for one column
colStatsAggrFromDB =
columnStatisticsObjForPartitions(catName, dbName, tableName, partNames, colNamesForDB, engine,
partsFound, useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector);
if (!colStatsAggrFromDB.isEmpty()) {
ColumnStatisticsObj colStatsAggr = colStatsAggrFromDB.get(0);
colStatsList.add(colStatsAggr);
// Update the cache to add this new aggregate node
aggrStatsCache.add(catName, dbName, tableName, colName, partsFound, colStatsAggr, bloomFilter);
}
}
}
} else {
partsFound = partsFoundForPartitions(catName, dbName, tableName, partNames, colNames, engine);
colStatsList =
columnStatisticsObjForPartitions(catName, dbName, tableName, partNames, colNames, engine, partsFound,
useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector);
}
LOG.debug("useDensityFunctionForNDVEstimation = " + useDensityFunctionForNDVEstimation
+ "\npartsFound = " + partsFound + "\nColumnStatisticsObj = "
+ Arrays.toString(colStatsList.toArray()));
return new AggrStats(colStatsList, partsFound);
}
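// Illustrative usage sketch (hypothetical caller and names, not part of this class):
//   AggrStats aggr = directSql.aggrColStatsForPartitions("hive", "db", "tbl",
//       Arrays.asList("ds=2023-01-01", "ds=2023-01-02"), Arrays.asList("c1"),
//       "hive", true, 0.0, false);
//   // aggr.getPartsFound() is the number of requested partitions that had stats for
//   // every requested column; aggr.getColStats() holds one aggregate per column.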
private BloomFilter createPartsBloomFilter(int maxPartsPerCacheNode, double fpp,
List<String> partNames) {
BloomFilter bloomFilter = new BloomFilter(maxPartsPerCacheNode, fpp);
for (String partName : partNames) {
bloomFilter.add(partName.getBytes());
}
return bloomFilter;
}
private long partsFoundForPartitions(
final String catName, final String dbName, final String tableName,
final List<String> partNames, List<String> colNames, String engine) throws MetaException {
assert !colNames.isEmpty() && !partNames.isEmpty();
final boolean doTrace = LOG.isDebugEnabled();
final String queryText0 = "select count(\"COLUMN_NAME\") from " + PART_COL_STATS + ""
+ " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? "
+ " and \"COLUMN_NAME\" in (%1$s) and \"PARTITION_NAME\" in (%2$s)"
+ " and \"ENGINE\" = ? "
+ " group by \"PARTITION_NAME\"";
List<Long> allCounts = Batchable.runBatched(batchSize, colNames, new Batchable<String, Long>() {
@Override
public List<Long> run(final List<String> inputColName) throws MetaException {
return Batchable.runBatched(batchSize, partNames, new Batchable<String, Long>() {
@Override
public List<Long> run(List<String> inputPartNames) throws MetaException {
long partsFound = 0;
String queryText = String.format(queryText0,
makeParams(inputColName.size()), makeParams(inputPartNames.size()));
long start = doTrace ? System.nanoTime() : 0;
Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
try {
Object qResult = executeWithArray(query, prepareParams(
catName, dbName, tableName, inputPartNames, inputColName, engine), queryText);
long end = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
ForwardQueryResult<?> fqr = (ForwardQueryResult<?>) qResult;
Iterator<?> iter = fqr.iterator();
while (iter.hasNext()) {
if (MetastoreDirectSqlUtils.extractSqlLong(iter.next()) == inputColName.size()) {
partsFound++;
}
}
return Lists.<Long>newArrayList(partsFound);
} finally {
query.closeAll();
}
}
});
}
});
long partsFound = 0;
for (Long val : allCounts) {
partsFound += val;
}
return partsFound;
}
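// Counting sketch: the query groups stats rows by PARTITION_NAME, so each group's count
// is the number of requested columns with stats in that partition. A partition is counted
// as "found" only when that count equals the number of requested columns; e.g. with
// colNames = {c1, c2}, a partition having stats for c1 only yields count 1 != 2 and is
// not counted.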
private List<ColumnStatisticsObj> columnStatisticsObjForPartitions(
final String catName, final String dbName, final String tableName, final List<String> partNames,
List<String> colNames, String engine, long partsFound, final boolean useDensityFunctionForNDVEstimation,
final double ndvTuner, final boolean enableBitVector) throws MetaException {
final boolean areAllPartsFound = (partsFound == partNames.size());
return Batchable.runBatched(batchSize, colNames, new Batchable<String, ColumnStatisticsObj>() {
@Override
public List<ColumnStatisticsObj> run(final List<String> inputColNames) throws MetaException {
return Batchable.runBatched(batchSize, partNames, new Batchable<String, ColumnStatisticsObj>() {
@Override
public List<ColumnStatisticsObj> run(List<String> inputPartNames) throws MetaException {
return columnStatisticsObjForPartitionsBatch(catName, dbName, tableName, inputPartNames,
inputColNames, engine, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector);
}
});
}
});
}
public List<ColStatsObjWithSourceInfo> getColStatsForAllTablePartitions(String catName, String dbName,
boolean enableBitVector) throws MetaException {
String queryText = "select \"TABLE_NAME\", \"PARTITION_NAME\", " + getStatsList(enableBitVector)
+ " from " + " " + PART_COL_STATS + " where \"DB_NAME\" = ? and \"CAT_NAME\" = ?";
long start = 0;
long end = 0;
Query query = null;
boolean doTrace = LOG.isDebugEnabled();
Object qResult = null;
start = doTrace ? System.nanoTime() : 0;
List<ColStatsObjWithSourceInfo> colStatsForDB = new ArrayList<ColStatsObjWithSourceInfo>();
try {
query = pm.newQuery("javax.jdo.query.SQL", queryText);
qResult = executeWithArray(query, new Object[] { dbName, catName }, queryText);
if (qResult == null) {
query.closeAll();
return colStatsForDB;
}
end = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
for (Object[] row : list) {
String tblName = (String) row[0];
String partName = (String) row[1];
ColumnStatisticsObj colStatObj = prepareCSObj(row, 2);
colStatsForDB.add(new ColStatsObjWithSourceInfo(colStatObj, catName, dbName, tblName, partName));
Deadline.checkTimeout();
}
} finally {
query.closeAll();
}
return colStatsForDB;
}
/** Should be called with the list short enough to not trip up Oracle/etc. */
private List<ColumnStatisticsObj> columnStatisticsObjForPartitionsBatch(String catName, String dbName,
String tableName, List<String> partNames, List<String> colNames, String engine,
boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean enableBitVector)
throws MetaException {
if (enableBitVector) {
return aggrStatsUseJava(catName, dbName, tableName, partNames, colNames, engine, areAllPartsFound,
useDensityFunctionForNDVEstimation, ndvTuner);
} else {
return aggrStatsUseDB(catName, dbName, tableName, partNames, colNames, engine, areAllPartsFound,
useDensityFunctionForNDVEstimation, ndvTuner);
}
}
private List<ColumnStatisticsObj> aggrStatsUseJava(String catName, String dbName, String tableName,
List<String> partNames, List<String> colNames, String engine, boolean areAllPartsFound,
boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
// 1. Get all the stats for colNames in partNames.
List<ColumnStatistics> partStats =
getPartitionStats(catName, dbName, tableName, partNames, colNames, engine, true);
// 2. Use the util function to aggregate the stats.
return MetaStoreServerUtils.aggrPartitionStats(partStats, catName, dbName, tableName, partNames, colNames,
areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner);
}
private List<ColumnStatisticsObj> aggrStatsUseDB(String catName, String dbName,
String tableName, List<String> partNames, List<String> colNames, String engine,
boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
// TODO: all the extrapolation logic should be moved out of this class,
// only mechanical data retrieval should remain here.
String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
+ "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
+ "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
+ "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
+ "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), "
// The following data is used to compute a partitioned table's NDV based
// on partition NDVs when useDensityFunctionForNDVEstimation = true. The global NDV
// cannot be derived exactly from partition NDVs, because the domains of column values
// in two partitions can overlap. If there is no overlap, the global NDV is the sum of
// the partition NDVs (the UpperBound). If there is full overlap (the domain of the
// column in every other partition is a subset of the domain in one partition), the
// global NDV equals the largest partition NDV (the LowerBound). Under a uniform
// distribution we can roughly estimate the global NDV by leveraging the min/max values,
// and we keep the estimate sensible by comparing it to the UpperBound (calculated by
// sum("NUM_DISTINCTS")) and the LowerBound (calculated by max("NUM_DISTINCTS")).
+ "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
+ "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+ "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
+ "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + ""
+ " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? ";
String queryText = null;
long start = 0;
long end = 0;
boolean doTrace = LOG.isDebugEnabled();
ForwardQueryResult<?> fqr = null;
// Check whether stats for all columns of all the partitions exist;
// if so, extrapolation is not needed.
if (areAllPartsFound) {
queryText = commonPrefix + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")"
+ " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
+ " and \"ENGINE\" = ? "
+ " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
start = doTrace ? System.nanoTime() : 0;
try (Query query = pm.newQuery("javax.jdo.query.SQL", queryText)) {
Object qResult = executeWithArray(query,
prepareParams(catName, dbName, tableName, partNames, colNames,
engine), queryText);
if (qResult == null) {
query.closeAll();
return Collections.emptyList();
}
end = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
List<ColumnStatisticsObj> colStats =
new ArrayList<ColumnStatisticsObj>(list.size());
for (Object[] row : list) {
colStats.add(prepareCSObjWithAdjustedNDV(row, 0,
useDensityFunctionForNDVEstimation, ndvTuner));
Deadline.checkTimeout();
}
return colStats;
} catch (Exception e) {
throwMetaOrRuntimeException(e);
return Collections.emptyList();
}
} else {
// Extrapolation is needed for some columns.
// In this case, stats for at least one column of some partition are missing.
// We extrapolate the missing partitions based on the other partitions.
List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", count(\"PARTITION_NAME\") "
+ " from " + PART_COL_STATS
+ " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? "
+ " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")"
+ " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
+ " and \"ENGINE\" = ? "
+ " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
start = doTrace ? System.nanoTime() : 0;
List<String> noExtraColumnNames = new ArrayList<String>();
Map<String, String[]> extraColumnNameTypeParts = new HashMap<String, String[]>();
try (Query query = pm.newQuery("javax.jdo.query.SQL", queryText)) {
Object qResult = executeWithArray(query,
prepareParams(catName, dbName, tableName, partNames, colNames,
engine), queryText);
end = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
if (qResult == null) {
query.closeAll();
return Collections.emptyList();
}
List<Object[]> list = MetastoreDirectSqlUtils.ensureList(qResult);
for (Object[] row : list) {
String colName = (String) row[0];
String colType = (String) row[1];
// Extrapolation is not needed for this column if
// count("PARTITION_NAME") == partNames.size();
// extrapolation is not possible for this column if
// count("PARTITION_NAME") < 2.
Long count = MetastoreDirectSqlUtils.extractSqlLong(row[2]);
if (count == partNames.size() || count < 2) {
noExtraColumnNames.add(colName);
} else {
extraColumnNameTypeParts.put(colName, new String[] {colType, String.valueOf(count)});
}
Deadline.checkTimeout();
}
} catch (Exception e) {
throwMetaOrRuntimeException(e);
}
// Extrapolation is not needed for the columns in noExtraColumnNames.
List<Object[]> list;
if (noExtraColumnNames.size() != 0) {
queryText = commonPrefix + " and \"COLUMN_NAME\" in ("
+ makeParams(noExtraColumnNames.size()) + ")" + " and \"PARTITION_NAME\" in ("
+ makeParams(partNames.size()) + ")"
+ " and \"ENGINE\" = ? "
+ " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
start = doTrace ? System.nanoTime() : 0;
try (Query query = pm.newQuery("javax.jdo.query.SQL", queryText)) {
Object qResult = executeWithArray(query,
prepareParams(catName, dbName, tableName, partNames, noExtraColumnNames, engine), queryText);
if (qResult == null) {
query.closeAll();
return Collections.emptyList();
}
list = MetastoreDirectSqlUtils.ensureList(qResult);
for (Object[] row : list) {
colStats.add(prepareCSObjWithAdjustedNDV(row, 0,
useDensityFunctionForNDVEstimation, ndvTuner));
Deadline.checkTimeout();
}
end = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
} catch (Exception e) {
throwMetaOrRuntimeException(e);
}
}
// Extrapolation is needed for the columns in extraColumnNameTypeParts.
// Give each partition a sequence number.
if (extraColumnNameTypeParts.size() != 0) {
Map<String, Integer> indexMap = new HashMap<String, Integer>();
for (int index = 0; index < partNames.size(); index++) {
indexMap.put(partNames.get(index), index);
}
// Get the sums for all columns at once to reduce the number of queries.
Map<String, Map<Integer, Object>> sumMap = new HashMap<String, Map<Integer, Object>>();
queryText = "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")"
+ " from " + PART_COL_STATS + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? "
+ " and \"COLUMN_NAME\" in (" + makeParams(extraColumnNameTypeParts.size()) + ")"
+ " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
+ " and \"ENGINE\" = ? "
+ " group by \"COLUMN_NAME\"";
start = doTrace ? System.nanoTime() : 0;
try (Query query = pm.newQuery("javax.jdo.query.SQL", queryText)) {
List<String> extraColumnNames = new ArrayList<String>();
extraColumnNames.addAll(extraColumnNameTypeParts.keySet());
Object qResult = executeWithArray(query,
prepareParams(catName, dbName, tableName, partNames,
extraColumnNames, engine), queryText);
if (qResult == null) {
query.closeAll();
return Collections.emptyList();
}
list = MetastoreDirectSqlUtils.ensureList(qResult);
// see the indexes for colstats in IExtrapolatePartStatus
Integer[] sumIndex = new Integer[] {6, 10, 11, 15};
for (Object[] row : list) {
Map<Integer, Object> indexToObject = new HashMap<Integer, Object>();
for (int ind = 1; ind < row.length; ind++) {
indexToObject.put(sumIndex[ind - 1], row[ind]);
}
// row[0] is the column name
sumMap.put((String) row[0], indexToObject);
Deadline.checkTimeout();
}
end = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
} catch (Exception e) {
throwMetaOrRuntimeException(e);
}
for (Map.Entry<String, String[]> entry : extraColumnNameTypeParts.entrySet()) {
Object[] row = new Object[IExtrapolatePartStatus.colStatNames.length + 2];
String colName = entry.getKey();
String colType = entry.getValue()[0];
Long sumVal = Long.parseLong(entry.getValue()[1]);
// fill in colname
row[0] = colName;
// fill in coltype
row[1] = colType;
// Use linear extrapolation; more sophisticated methods can be added in the
// future.
IExtrapolatePartStatus extrapolateMethod = new LinearExtrapolatePartStatus();
// fill in colstatus
Integer[] index = null;
boolean decimal = false;
if (colType.toLowerCase().startsWith("decimal")) {
index = IExtrapolatePartStatus.indexMaps.get("decimal");
decimal = true;
} else {
index = IExtrapolatePartStatus.indexMaps.get(colType.toLowerCase());
}
// If colType is not a known type (long, double, etc.), use the full
// ("default") index set.
if (index == null) {
index = IExtrapolatePartStatus.indexMaps.get("default");
}
for (int colStatIndex : index) {
String colStatName = IExtrapolatePartStatus.colStatNames[colStatIndex];
// if the aggregation type is sum, we do a scale-up
if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Sum) {
Object o = sumMap.get(colName).get(colStatIndex);
if (o == null) {
row[2 + colStatIndex] = null;
} else {
Long val = MetastoreDirectSqlUtils.extractSqlLong(o);
row[2 + colStatIndex] = val / sumVal * (partNames.size());
}
} else if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Min
|| IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Max) {
// if the aggregation type is min/max, we extrapolate from the
// left/right borders
if (!decimal) {
queryText = "select \"" + colStatName
+ "\",\"PARTITION_NAME\" from " + PART_COL_STATS
+ " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" + " and \"COLUMN_NAME\" = ?"
+ " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
+ " and \"ENGINE\" = ? "
+ " order by \"" + colStatName + "\"";
} else {
queryText = "select \"" + colStatName
+ "\",\"PARTITION_NAME\" from " + PART_COL_STATS
+ " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" + " and \"COLUMN_NAME\" = ?"
+ " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
+ " and \"ENGINE\" = ? "
+ " order by cast(\"" + colStatName + "\" as decimal)";
}
start = doTrace ? System.nanoTime() : 0;
try (Query query = pm.newQuery("javax.jdo.query.SQL", queryText)) {
Object qResult = executeWithArray(query,
prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName), engine), queryText);
if (qResult == null) {
query.closeAll();
return Collections.emptyList();
}
fqr = (ForwardQueryResult<?>) qResult;
Object[] min = (Object[]) (fqr.get(0));
Object[] max = (Object[]) (fqr.get(fqr.size() - 1));
end = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
query.closeAll();
if (min[0] == null || max[0] == null) {
row[2 + colStatIndex] = null;
} else {
row[2 + colStatIndex] = extrapolateMethod
.extrapolate(min, max, colStatIndex, indexMap);
}
} catch (Exception e) {
throwMetaOrRuntimeException(e);
}
} else {
// If the aggregation type is avg, we use the average of the existing values.
queryText = "select "
+ "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
+ "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+ "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")"
+ " from " + PART_COL_STATS + "" + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ?"
+ " and \"COLUMN_NAME\" = ?" + " and \"PARTITION_NAME\" in ("
+ makeParams(partNames.size()) + ")"
+ " and \"ENGINE\" = ? "
+ " group by \"COLUMN_NAME\"";
start = doTrace ? System.nanoTime() : 0;
try (Query query = pm.newQuery("javax.jdo.query.SQL", queryText)) {
Object qResult = executeWithArray(query,
prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName), engine), queryText);
if (qResult == null) {
query.closeAll();
return Collections.emptyList();
}
fqr = (ForwardQueryResult<?>) qResult;
Object[] avg = (Object[]) (fqr.get(0));
// colStatIndex values 12, 13 and 14 correspond to "AVG_LONG", "AVG_DOUBLE"
// and "AVG_DECIMAL" respectively.
row[2 + colStatIndex] = avg[colStatIndex - 12];
end = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end);
query.closeAll();
} catch (Exception e) {
throwMetaOrRuntimeException(e);
}
}
}
colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
Deadline.checkTimeout();
}
}
return colStats;
}
}
private ColumnStatisticsObj prepareCSObj(Object[] row, int i) throws MetaException {
ColumnStatisticsData data = new ColumnStatisticsData();
ColumnStatisticsObj cso = new ColumnStatisticsObj((String)row[i++], (String)row[i++], data);
Object llow = row[i++], lhigh = row[i++], dlow = row[i++], dhigh = row[i++],
declow = row[i++], dechigh = row[i++], nulls = row[i++], dist = row[i++], bitVector = row[i++],
avglen = row[i++], maxlen = row[i++], trues = row[i++], falses = row[i++];
StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data,
llow, lhigh, dlow, dhigh, declow, dechigh, nulls, dist, bitVector, avglen, maxlen, trues, falses);
return cso;
}
private ColumnStatisticsObj prepareCSObjWithAdjustedNDV(Object[] row, int i,
boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
ColumnStatisticsData data = new ColumnStatisticsData();
ColumnStatisticsObj cso = new ColumnStatisticsObj((String) row[i++], (String) row[i++], data);
Object llow = row[i++], lhigh = row[i++], dlow = row[i++], dhigh = row[i++],
declow = row[i++], dechigh = row[i++], nulls = row[i++], dist = row[i++],
avglen = row[i++], maxlen = row[i++], trues = row[i++], falses = row[i++],
avgLong = row[i++], avgDouble = row[i++], avgDecimal = row[i++], sumDist = row[i++];
StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data, llow, lhigh, dlow, dhigh,
declow, dechigh, nulls, dist, avglen, maxlen, trues, falses, avgLong, avgDouble,
avgDecimal, sumDist, useDensityFunctionForNDVEstimation, ndvTuner);
return cso;
}
private Object[] prepareParams(String catName, String dbName, String tableName,
List<String> partNames, List<String> colNames, String engine) throws MetaException {
Object[] params = new Object[colNames.size() + partNames.size() + 4];
int paramI = 0;
params[paramI++] = catName;
params[paramI++] = dbName;
params[paramI++] = tableName;
for (String colName : colNames) {
params[paramI++] = colName;
}
for (String partName : partNames) {
params[paramI++] = partName;
}
params[paramI++] = engine;
return params;
}
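// Example of the resulting parameter layout (matches the batched stats queries above):
//   prepareParams("hive", "db", "tbl", ["p1", "p2"], ["c1"], "hive")
//     -> { "hive", "db", "tbl", "c1", "p1", "p2", "hive" }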
public List<ColumnStatistics> getPartitionStats(
final String catName, final String dbName, final String tableName, final List<String> partNames,
List<String> colNames, String engine, boolean enableBitVector) throws MetaException {
if (colNames.isEmpty() || partNames.isEmpty()) {
return Collections.emptyList();
}
final boolean doTrace = LOG.isDebugEnabled();
final String queryText0 = "select \"PARTITION_NAME\", " + getStatsList(enableBitVector) + " from "
+ " " + PART_COL_STATS + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? and " +
"\"COLUMN_NAME\""
+ " in (%1$s) AND \"PARTITION_NAME\" in (%2$s) "
+ " and \"ENGINE\" = ? "
+ " order by \"PARTITION_NAME\"";
Batchable<String, Object[]> b = new Batchable<String, Object[]>() {
@Override
public List<Object[]> run(final List<String> inputColNames) throws MetaException {
Batchable<String, Object[]> b2 = new Batchable<String, Object[]>() {
@Override
public List<Object[]> run(List<String> inputPartNames) throws MetaException {
String queryText = String.format(queryText0,
makeParams(inputColNames.size()), makeParams(inputPartNames.size()));
long start = doTrace ? System.nanoTime() : 0;
Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
Object qResult = executeWithArray(query, prepareParams(
catName, dbName, tableName, inputPartNames, inputColNames, engine), queryText);
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText0, start, (doTrace ? System.nanoTime() : 0));
if (qResult == null) {
query.closeAll();
return Collections.emptyList();
}
addQueryAfterUse(query);
return MetastoreDirectSqlUtils.ensureList(qResult);
}
};
try {
return Batchable.runBatched(batchSize, partNames, b2);
} finally {
addQueryAfterUse(b2);
}
}
};
List<Object[]> list = Batchable.runBatched(batchSize, colNames, b);
List<ColumnStatistics> result = new ArrayList<ColumnStatistics>(
Math.min(list.size(), partNames.size()));
String lastPartName = null;
int from = 0;
try {
for (int i = 0; i <= list.size(); ++i) {
boolean isLast = i == list.size();
String partName = isLast ? null : (String) list.get(i)[0];
if (!isLast && partName.equals(lastPartName)) {
continue;
} else if (from != i) {
ColumnStatisticsDesc csd =
new ColumnStatisticsDesc(false, dbName, tableName);
csd.setCatName(catName);
csd.setPartName(lastPartName);
result.add(makeColumnStats(list.subList(from, i), csd, 1, engine));
}
lastPartName = partName;
from = i;
Deadline.checkTimeout();
}
} finally {
b.closeAllQueries();
}
return result;
}
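// Illustrative usage sketch (hypothetical caller and names): the result is one
// ColumnStatistics per partition, assembled from rows ordered by PARTITION_NAME:
//   List<ColumnStatistics> stats = directSql.getPartitionStats("hive", "db", "tbl",
//       Arrays.asList("ds=2023-01-01"), Arrays.asList("c1", "c2"), "hive", false);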
/** The common query part for table and partition stats */
private final String getStatsList(boolean enableBitVector) {
return "\"COLUMN_NAME\", \"COLUMN_TYPE\", \"LONG_LOW_VALUE\", \"LONG_HIGH_VALUE\", "
+ "\"DOUBLE_LOW_VALUE\", \"DOUBLE_HIGH_VALUE\", \"BIG_DECIMAL_LOW_VALUE\", "
+ "\"BIG_DECIMAL_HIGH_VALUE\", \"NUM_NULLS\", \"NUM_DISTINCTS\", "
+ (enableBitVector ? "\"BIT_VECTOR\", " : "\'\', ") + "\"AVG_COL_LEN\", "
+ "\"MAX_COL_LEN\", \"NUM_TRUES\", \"NUM_FALSES\", \"LAST_ANALYZED\" ";
}
private ColumnStatistics makeColumnStats(
List<Object[]> list, ColumnStatisticsDesc csd, int offset, String engine) throws MetaException {
ColumnStatistics result = new ColumnStatistics();
result.setStatsDesc(csd);
List<ColumnStatisticsObj> csos = new ArrayList<ColumnStatisticsObj>(list.size());
for (Object[] row : list) {
// LAST_ANALYZED is stored per column, but Thrift carries it once per stats object;
// take the lowest value for now, as nobody actually uses this field.
Object laObj = row[offset + 15];
if (laObj != null && (!csd.isSetLastAnalyzed() || csd.getLastAnalyzed() > MetastoreDirectSqlUtils
.extractSqlLong(laObj))) {
csd.setLastAnalyzed(MetastoreDirectSqlUtils.extractSqlLong(laObj));
}
csos.add(prepareCSObj(row, offset));
Deadline.checkTimeout();
}
result.setStatsObj(csos);
result.setEngine(engine);
return result;
}
private String makeParams(int size) {
// With size 0 the query will fail, but at least the query shows up in the debug output.
return (size == 0) ? "" : repeat(",?", size).substring(1);
}
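// Example: makeParams(3) yields "?,?,?" for use inside an IN (...) clause;
// repeat(",?", 3) gives ",?,?,?" and substring(1) strips the leading comma.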
@SuppressWarnings("unchecked")
private <T> T executeWithArray(Query query, Object[] params, String sql) throws MetaException {
return executeWithArray(query, params, sql, -1);
}
@SuppressWarnings("unchecked")
private <T> T executeWithArray(Query query, Object[] params, String sql, int limit) throws MetaException {
return MetastoreDirectSqlUtils.executeWithArray(query, params, sql, limit);
}
/**
* This runs the necessary logic to prepare for queries. It should be called once, after the
* txn on the DataNucleus connection is opened, and before any queries are issued. What it
* currently does is run db-specific logic, e.g. setting ANSI quotes mode for MySQL. The
* reason it must be used inside the txn is connection pooling: otherwise there is no way to
* guarantee that the effect will apply to the connection that executes the queries.
*/
public void prepareTxn() throws MetaException {
String stmt = dbType.getPrepareTxnStmt();
if (stmt == null) {
return;
}
try {
assert pm.currentTransaction().isActive(); // must be inside tx together with queries
executeNoResult(stmt);
} catch (SQLException sqlEx) {
throw new MetaException("Error setting ansi quotes: " + sqlEx.getMessage());
}
}
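// Illustrative: for MySQL the prepare statement is typically something like
//   SET @@session.sql_mode=ANSI_QUOTES
// (an assumption for illustration; the exact statement comes from
// dbType.getPrepareTxnStmt() and most databases return null, making this a no-op).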
public List<SQLForeignKey> getForeignKeys(String catName, String parent_db_name,
String parent_tbl_name, String foreign_db_name,
String foreign_tbl_name) throws MetaException {
List<SQLForeignKey> ret = new ArrayList<>();
String queryText =
"SELECT \"D2\".\"NAME\", \"T2\".\"TBL_NAME\", "
+ "CASE WHEN \"C2\".\"COLUMN_NAME\" IS NOT NULL THEN \"C2\".\"COLUMN_NAME\" "
+ "ELSE \"P2\".\"PKEY_NAME\" END, "
+ "" + DBS + ".\"NAME\", " + TBLS + ".\"TBL_NAME\", "
+ "CASE WHEN " + COLUMNS_V2 + ".\"COLUMN_NAME\" IS NOT NULL THEN " + COLUMNS_V2 + ".\"COLUMN_NAME\" "
+ "ELSE " + PARTITION_KEYS + ".\"PKEY_NAME\" END, "
+ "" + KEY_CONSTRAINTS + ".\"POSITION\", " + KEY_CONSTRAINTS + ".\"UPDATE_RULE\", " + KEY_CONSTRAINTS + ".\"DELETE_RULE\", "
+ "" + KEY_CONSTRAINTS + ".\"CONSTRAINT_NAME\" , \"KEY_CONSTRAINTS2\".\"CONSTRAINT_NAME\", " + KEY_CONSTRAINTS + ".\"ENABLE_VALIDATE_RELY\" "
+ " from " + TBLS + " "
+ " INNER JOIN " + KEY_CONSTRAINTS + " ON " + TBLS + ".\"TBL_ID\" = " + KEY_CONSTRAINTS + ".\"CHILD_TBL_ID\" "
+ " INNER JOIN " + KEY_CONSTRAINTS + " \"KEY_CONSTRAINTS2\" ON \"KEY_CONSTRAINTS2\".\"PARENT_TBL_ID\" = " + KEY_CONSTRAINTS + ".\"PARENT_TBL_ID\" "
+ " AND \"KEY_CONSTRAINTS2\".\"PARENT_CD_ID\" = " + KEY_CONSTRAINTS + ".\"PARENT_CD_ID\" AND "
+ " \"KEY_CONSTRAINTS2\".\"PARENT_INTEGER_IDX\" = " + KEY_CONSTRAINTS + ".\"PARENT_INTEGER_IDX\" "
+ " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
+ " INNER JOIN " + TBLS + " \"T2\" ON " + KEY_CONSTRAINTS + ".\"PARENT_TBL_ID\" = \"T2\".\"TBL_ID\" "
+ " INNER JOIN " + DBS + " \"D2\" ON \"T2\".\"DB_ID\" = \"D2\".\"DB_ID\" "
+ " LEFT OUTER JOIN " + COLUMNS_V2 + " ON " + COLUMNS_V2 + ".\"CD_ID\" = " + KEY_CONSTRAINTS + ".\"CHILD_CD_ID\" AND "
+ " " + COLUMNS_V2 + ".\"INTEGER_IDX\" = " + KEY_CONSTRAINTS + ".\"CHILD_INTEGER_IDX\" "
+ " LEFT OUTER JOIN " + PARTITION_KEYS + " ON " + TBLS + ".\"TBL_ID\" = " + PARTITION_KEYS + ".\"TBL_ID\" AND "
+ " " + PARTITION_KEYS + ".\"INTEGER_IDX\" = " + KEY_CONSTRAINTS + ".\"CHILD_INTEGER_IDX\" "
+ " LEFT OUTER JOIN " + COLUMNS_V2 + " \"C2\" ON \"C2\".\"CD_ID\" = " + KEY_CONSTRAINTS + ".\"PARENT_CD_ID\" AND "
+ " \"C2\".\"INTEGER_IDX\" = " + KEY_CONSTRAINTS + ".\"PARENT_INTEGER_IDX\" "
+ " LEFT OUTER JOIN " + PARTITION_KEYS + " \"P2\" ON \"P2\".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\" AND "
+ " \"P2\".\"INTEGER_IDX\" = " + KEY_CONSTRAINTS + ".\"PARENT_INTEGER_IDX\" "
+ " WHERE " + KEY_CONSTRAINTS + ".\"CONSTRAINT_TYPE\" = " + MConstraint.FOREIGN_KEY_CONSTRAINT
+ " AND \"KEY_CONSTRAINTS2\".\"CONSTRAINT_TYPE\" = " + MConstraint.PRIMARY_KEY_CONSTRAINT + " AND"
+ " " + DBS + ".\"CTLG_NAME\" = ? AND"
+ (foreign_db_name == null ? "" : " " + DBS + ".\"NAME\" = ? AND")
+ (foreign_tbl_name == null ? "" : " " + TBLS + ".\"TBL_NAME\" = ? AND")
+ (parent_tbl_name == null ? "" : " \"T2\".\"TBL_NAME\" = ? AND")
+ (parent_db_name == null ? "" : " \"D2\".\"NAME\" = ?") ;
queryText = queryText.trim();
if (queryText.endsWith("AND")) {
queryText = queryText.substring(0, queryText.length()-3);
}
List<String> pms = new ArrayList<String>();
pms.add(catName);
if (foreign_db_name != null) {
pms.add(foreign_db_name);
}
if (foreign_tbl_name != null) {
pms.add(foreign_tbl_name);
}
if (parent_tbl_name != null) {
pms.add(parent_tbl_name);
}
if (parent_db_name != null) {
pms.add(parent_db_name);
}
Query queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray(
queryParams, pms.toArray(), queryText));
if (!sqlResult.isEmpty()) {
for (Object[] line : sqlResult) {
int enableValidateRely = MetastoreDirectSqlUtils.extractSqlInt(line[11]);
boolean enable = (enableValidateRely & 4) != 0;
boolean validate = (enableValidateRely & 2) != 0;
boolean rely = (enableValidateRely & 1) != 0;
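// ENABLE_VALIDATE_RELY packs three flags into one int: bit value 4 = enable,
// 2 = validate, 1 = rely. E.g. a stored value of 5 (binary 101) decodes to
// enable=true, validate=false, rely=true.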
SQLForeignKey currKey = new SQLForeignKey(
MetastoreDirectSqlUtils.extractSqlString(line[0]),
MetastoreDirectSqlUtils.extractSqlString(line[1]),
MetastoreDirectSqlUtils.extractSqlString(line[2]),
MetastoreDirectSqlUtils.extractSqlString(line[3]),
MetastoreDirectSqlUtils.extractSqlString(line[4]),
MetastoreDirectSqlUtils.extractSqlString(line[5]),
MetastoreDirectSqlUtils.extractSqlInt(line[6]),
MetastoreDirectSqlUtils.extractSqlInt(line[7]),
MetastoreDirectSqlUtils.extractSqlInt(line[8]),
MetastoreDirectSqlUtils.extractSqlString(line[9]),
MetastoreDirectSqlUtils.extractSqlString(line[10]),
enable,
validate,
rely
);
currKey.setCatName(catName);
ret.add(currKey);
}
}
queryParams.closeAll();
return ret;
}
public List<SQLPrimaryKey> getPrimaryKeys(String catName, String db_name, String tbl_name)
throws MetaException {
List<SQLPrimaryKey> ret = new ArrayList<>();
String queryText =
"SELECT " + DBS + ".\"NAME\", " + TBLS + ".\"TBL_NAME\", "
+ "CASE WHEN " + COLUMNS_V2 + ".\"COLUMN_NAME\" IS NOT NULL THEN " + COLUMNS_V2 + ".\"COLUMN_NAME\" "
+ "ELSE " + PARTITION_KEYS + ".\"PKEY_NAME\" END, " + KEY_CONSTRAINTS + ".\"POSITION\", "
+ KEY_CONSTRAINTS + ".\"CONSTRAINT_NAME\", " + KEY_CONSTRAINTS + ".\"ENABLE_VALIDATE_RELY\", "
+ DBS + ".\"CTLG_NAME\""
+ " from " + TBLS + " "
+ " INNER JOIN " + KEY_CONSTRAINTS + " ON " + TBLS + ".\"TBL_ID\" = " + KEY_CONSTRAINTS + ".\"PARENT_TBL_ID\" "
+ " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
+ " LEFT OUTER JOIN " + COLUMNS_V2 + " ON " + COLUMNS_V2 + ".\"CD_ID\" = " + KEY_CONSTRAINTS + ".\"PARENT_CD_ID\" AND "
+ " " + COLUMNS_V2 + ".\"INTEGER_IDX\" = " + KEY_CONSTRAINTS + ".\"PARENT_INTEGER_IDX\" "
+ " LEFT OUTER JOIN " + PARTITION_KEYS + " ON " + TBLS + ".\"TBL_ID\" = " + PARTITION_KEYS + ".\"TBL_ID\" AND "
+ " " + PARTITION_KEYS + ".\"INTEGER_IDX\" = " + KEY_CONSTRAINTS + ".\"PARENT_INTEGER_IDX\" "
+ " WHERE " + KEY_CONSTRAINTS + ".\"CONSTRAINT_TYPE\" = "+ MConstraint.PRIMARY_KEY_CONSTRAINT + " AND"
+ " " + DBS + ".\"CTLG_NAME\" = ? AND"
+ (db_name == null ? "" : " " + DBS + ".\"NAME\" = ? AND")
+ (tbl_name == null ? "" : " " + TBLS + ".\"TBL_NAME\" = ? ") ;
queryText = queryText.trim();
if (queryText.endsWith("AND")) {
queryText = queryText.substring(0, queryText.length()-3);
}
List<String> pms = new ArrayList<>();
pms.add(catName);
if (db_name != null) {
pms.add(db_name);
}
if (tbl_name != null) {
pms.add(tbl_name);
}
Query queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray(
queryParams, pms.toArray(), queryText));
if (!sqlResult.isEmpty()) {
for (Object[] line : sqlResult) {
int enableValidateRely = MetastoreDirectSqlUtils.extractSqlInt(line[5]);
boolean enable = (enableValidateRely & 4) != 0;
boolean validate = (enableValidateRely & 2) != 0;
boolean rely = (enableValidateRely & 1) != 0;
SQLPrimaryKey currKey = new SQLPrimaryKey(
MetastoreDirectSqlUtils.extractSqlString(line[0]),
MetastoreDirectSqlUtils.extractSqlString(line[1]),
MetastoreDirectSqlUtils.extractSqlString(line[2]),
MetastoreDirectSqlUtils.extractSqlInt(line[3]), MetastoreDirectSqlUtils.extractSqlString(line[4]),
enable,
validate,
rely);
currKey.setCatName(MetastoreDirectSqlUtils.extractSqlString(line[6]));
ret.add(currKey);
}
}
queryParams.closeAll();
return ret;
}
public List<SQLUniqueConstraint> getUniqueConstraints(String catName, String db_name, String tbl_name)
throws MetaException {
List<SQLUniqueConstraint> ret = new ArrayList<SQLUniqueConstraint>();
String queryText =
"SELECT " + DBS + ".\"NAME\", " + TBLS + ".\"TBL_NAME\", "
+ "CASE WHEN " + COLUMNS_V2 + ".\"COLUMN_NAME\" IS NOT NULL THEN " + COLUMNS_V2 + ".\"COLUMN_NAME\" "
+ "ELSE " + PARTITION_KEYS + ".\"PKEY_NAME\" END, " + KEY_CONSTRAINTS + ".\"POSITION\", "
+ KEY_CONSTRAINTS + ".\"CONSTRAINT_NAME\", " + KEY_CONSTRAINTS + ".\"ENABLE_VALIDATE_RELY\" "
+ " from " + TBLS + " "
+ " INNER JOIN " + KEY_CONSTRAINTS + " ON " + TBLS + ".\"TBL_ID\" = " + KEY_CONSTRAINTS + ".\"PARENT_TBL_ID\" "
+ " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
+ " LEFT OUTER JOIN " + COLUMNS_V2 + " ON " + COLUMNS_V2 + ".\"CD_ID\" = " + KEY_CONSTRAINTS + ".\"PARENT_CD_ID\" AND "
+ " " + COLUMNS_V2 + ".\"INTEGER_IDX\" = " + KEY_CONSTRAINTS + ".\"PARENT_INTEGER_IDX\" "
+ " LEFT OUTER JOIN " + PARTITION_KEYS + " ON " + TBLS + ".\"TBL_ID\" = " + PARTITION_KEYS + ".\"TBL_ID\" AND "
+ " " + PARTITION_KEYS + ".\"INTEGER_IDX\" = " + KEY_CONSTRAINTS + ".\"PARENT_INTEGER_IDX\" "
+ " WHERE " + KEY_CONSTRAINTS + ".\"CONSTRAINT_TYPE\" = "+ MConstraint.UNIQUE_CONSTRAINT + " AND"
+ " " + DBS + ".\"CTLG_NAME\" = ? AND"
+ (db_name == null ? "" : " " + DBS + ".\"NAME\" = ? AND")
+ (tbl_name == null ? "" : " " + TBLS + ".\"TBL_NAME\" = ? ") ;
queryText = queryText.trim();
if (queryText.endsWith("AND")) {
queryText = queryText.substring(0, queryText.length()-3);
}
List<String> pms = new ArrayList<String>();
pms.add(catName);
if (db_name != null) {
pms.add(db_name);
}
if (tbl_name != null) {
pms.add(tbl_name);
}
Query queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray(
queryParams, pms.toArray(), queryText));
if (!sqlResult.isEmpty()) {
for (Object[] line : sqlResult) {
int enableValidateRely = MetastoreDirectSqlUtils.extractSqlInt(line[5]);
boolean enable = (enableValidateRely & 4) != 0;
boolean validate = (enableValidateRely & 2) != 0;
boolean rely = (enableValidateRely & 1) != 0;
ret.add(new SQLUniqueConstraint(
catName,
MetastoreDirectSqlUtils.extractSqlString(line[0]),
MetastoreDirectSqlUtils.extractSqlString(line[1]),
MetastoreDirectSqlUtils.extractSqlString(line[2]),
MetastoreDirectSqlUtils.extractSqlInt(line[3]), MetastoreDirectSqlUtils.extractSqlString(line[4]),
enable,
validate,
rely));
}
}
queryParams.closeAll();
return ret;
}
public List<SQLNotNullConstraint> getNotNullConstraints(String catName, String db_name, String tbl_name)
throws MetaException {
List<SQLNotNullConstraint> ret = new ArrayList<>();
String queryText =
"SELECT " + DBS + ".\"NAME\", " + TBLS + ".\"TBL_NAME\","
+ "CASE WHEN " + COLUMNS_V2 + ".\"COLUMN_NAME\" IS NOT NULL THEN " + COLUMNS_V2 + ".\"COLUMN_NAME\" "
+ "ELSE " + PARTITION_KEYS + ".\"PKEY_NAME\" END, "
+ "" + KEY_CONSTRAINTS + ".\"CONSTRAINT_NAME\", " + KEY_CONSTRAINTS + ".\"ENABLE_VALIDATE_RELY\" "
+ " from " + TBLS + " "
+ " INNER JOIN " + KEY_CONSTRAINTS + " ON " + TBLS + ".\"TBL_ID\" = " + KEY_CONSTRAINTS + ".\"PARENT_TBL_ID\" "
+ " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
+ " LEFT OUTER JOIN " + COLUMNS_V2 + " ON " + COLUMNS_V2 + ".\"CD_ID\" = " + KEY_CONSTRAINTS + ".\"PARENT_CD_ID\" AND "
+ " " + COLUMNS_V2 + ".\"INTEGER_IDX\" = " + KEY_CONSTRAINTS + ".\"PARENT_INTEGER_IDX\" "
+ " LEFT OUTER JOIN " + PARTITION_KEYS + " ON " + TBLS + ".\"TBL_ID\" = " + PARTITION_KEYS + ".\"TBL_ID\" AND "
+ " " + PARTITION_KEYS + ".\"INTEGER_IDX\" = " + KEY_CONSTRAINTS + ".\"PARENT_INTEGER_IDX\" "
+ " WHERE " + KEY_CONSTRAINTS + ".\"CONSTRAINT_TYPE\" = "+ MConstraint.NOT_NULL_CONSTRAINT + " AND"
+ " " + DBS + ".\"CTLG_NAME\" = ? AND"
+ (db_name == null ? "" : " " + DBS + ".\"NAME\" = ? AND")
+ (tbl_name == null ? "" : " " + TBLS + ".\"TBL_NAME\" = ? ") ;
queryText = queryText.trim();
if (queryText.endsWith("AND")) {
queryText = queryText.substring(0, queryText.length()-3);
}
List<String> pms = new ArrayList<>();
pms.add(catName);
if (db_name != null) {
pms.add(db_name);
}
if (tbl_name != null) {
pms.add(tbl_name);
}
Query queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray(
queryParams, pms.toArray(), queryText));
if (!sqlResult.isEmpty()) {
for (Object[] line : sqlResult) {
int enableValidateRely = MetastoreDirectSqlUtils.extractSqlInt(line[4]);
boolean enable = (enableValidateRely & 4) != 0;
boolean validate = (enableValidateRely & 2) != 0;
boolean rely = (enableValidateRely & 1) != 0;
ret.add(new SQLNotNullConstraint(
catName,
MetastoreDirectSqlUtils.extractSqlString(line[0]),
MetastoreDirectSqlUtils.extractSqlString(line[1]),
MetastoreDirectSqlUtils.extractSqlString(line[2]),
MetastoreDirectSqlUtils.extractSqlString(line[3]),
enable,
validate,
rely));
}
}
queryParams.closeAll();
return ret;
}
public List<SQLDefaultConstraint> getDefaultConstraints(String catName, String db_name, String tbl_name)
throws MetaException {
List<SQLDefaultConstraint> ret = new ArrayList<SQLDefaultConstraint>();
String queryText =
"SELECT " + DBS + ".\"NAME\", " + TBLS + ".\"TBL_NAME\","
+ "CASE WHEN " + COLUMNS_V2 + ".\"COLUMN_NAME\" IS NOT NULL THEN " + COLUMNS_V2 + ".\"COLUMN_NAME\" "
+ "ELSE " + PARTITION_KEYS + ".\"PKEY_NAME\" END, "
+ "" + KEY_CONSTRAINTS + ".\"CONSTRAINT_NAME\", " + KEY_CONSTRAINTS + ".\"ENABLE_VALIDATE_RELY\", "
+ "" + KEY_CONSTRAINTS + ".\"DEFAULT_VALUE\" "
+ " from " + TBLS + " "
+ " INNER JOIN " + KEY_CONSTRAINTS + " ON " + TBLS + ".\"TBL_ID\" = " + KEY_CONSTRAINTS + ".\"PARENT_TBL_ID\" "
+ " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
+ " LEFT OUTER JOIN " + COLUMNS_V2 + " ON " + COLUMNS_V2 + ".\"CD_ID\" = " + KEY_CONSTRAINTS + ".\"PARENT_CD_ID\" AND "
+ " " + COLUMNS_V2 + ".\"INTEGER_IDX\" = " + KEY_CONSTRAINTS + ".\"PARENT_INTEGER_IDX\" "
+ " LEFT OUTER JOIN " + PARTITION_KEYS + " ON " + TBLS + ".\"TBL_ID\" = " + PARTITION_KEYS + ".\"TBL_ID\" AND "
+ " " + PARTITION_KEYS + ".\"INTEGER_IDX\" = " + KEY_CONSTRAINTS + ".\"PARENT_INTEGER_IDX\" "
+ " WHERE " + KEY_CONSTRAINTS + ".\"CONSTRAINT_TYPE\" = "+ MConstraint.DEFAULT_CONSTRAINT+ " AND"
+ " " + DBS + ".\"CTLG_NAME\" = ? AND"
+ (db_name == null ? "" : " " + DBS + ".\"NAME\" = ? AND")
+ (tbl_name == null ? "" : " " + TBLS + ".\"TBL_NAME\" = ? ") ;
queryText = queryText.trim();
if (queryText.endsWith("AND")) {
queryText = queryText.substring(0, queryText.length()-3);
}
if (LOG.isDebugEnabled()){
LOG.debug("getDefaultConstraints: directsql : " + queryText);
}
List<String> pms = new ArrayList<>();
pms.add(catName);
if (db_name != null) {
pms.add(db_name);
}
if (tbl_name != null) {
pms.add(tbl_name);
}
Query queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray(
queryParams, pms.toArray(), queryText));
if (!sqlResult.isEmpty()) {
for (Object[] line : sqlResult) {
int enableValidateRely = MetastoreDirectSqlUtils.extractSqlInt(line[4]);
boolean enable = (enableValidateRely & 4) != 0;
boolean validate = (enableValidateRely & 2) != 0;
boolean rely = (enableValidateRely & 1) != 0;
SQLDefaultConstraint currConstraint = new SQLDefaultConstraint(
catName,
MetastoreDirectSqlUtils.extractSqlString(line[0]),
MetastoreDirectSqlUtils.extractSqlString(line[1]),
MetastoreDirectSqlUtils.extractSqlString(line[2]),
MetastoreDirectSqlUtils.extractSqlString(line[5]),
MetastoreDirectSqlUtils.extractSqlString(line[3]),
enable,
validate,
rely);
ret.add(currConstraint);
}
}
queryParams.closeAll();
return ret;
}
public List<SQLCheckConstraint> getCheckConstraints(String catName, String db_name, String tbl_name)
throws MetaException {
List<SQLCheckConstraint> ret = new ArrayList<SQLCheckConstraint>();
String queryText =
"SELECT " + DBS + ".\"NAME\", " + TBLS + ".\"TBL_NAME\","
+ "CASE WHEN " + COLUMNS_V2 + ".\"COLUMN_NAME\" IS NOT NULL THEN " + COLUMNS_V2 + ".\"COLUMN_NAME\" "
+ "ELSE " + PARTITION_KEYS + ".\"PKEY_NAME\" END, "
+ "" + KEY_CONSTRAINTS + ".\"CONSTRAINT_NAME\", " + KEY_CONSTRAINTS + ".\"ENABLE_VALIDATE_RELY\", "
+ "" + KEY_CONSTRAINTS + ".\"DEFAULT_VALUE\" "
+ " from " + TBLS + " "
+ " INNER JOIN " + KEY_CONSTRAINTS + " ON " + TBLS + ".\"TBL_ID\" = " + KEY_CONSTRAINTS + ".\"PARENT_TBL_ID\" "
+ " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
+ " LEFT OUTER JOIN " + COLUMNS_V2 + " ON " + COLUMNS_V2 + ".\"CD_ID\" = " + KEY_CONSTRAINTS + ".\"PARENT_CD_ID\" AND "
+ " " + COLUMNS_V2 + ".\"INTEGER_IDX\" = " + KEY_CONSTRAINTS + ".\"PARENT_INTEGER_IDX\" "
+ " LEFT OUTER JOIN " + PARTITION_KEYS + " ON " + TBLS + ".\"TBL_ID\" = " + PARTITION_KEYS + ".\"TBL_ID\" AND "
+ " " + PARTITION_KEYS + ".\"INTEGER_IDX\" = " + KEY_CONSTRAINTS + ".\"PARENT_INTEGER_IDX\" "
+ " WHERE " + KEY_CONSTRAINTS + ".\"CONSTRAINT_TYPE\" = "+ MConstraint.CHECK_CONSTRAINT+ " AND"
+ " " + DBS + ".\"CTLG_NAME\" = ? AND"
+ (db_name == null ? "" : " " + DBS + ".\"NAME\" = ? AND")
+ (tbl_name == null ? "" : " " + TBLS + ".\"TBL_NAME\" = ? ") ;
queryText = queryText.trim();
if (queryText.endsWith("AND")) {
queryText = queryText.substring(0, queryText.length()-3);
}
if (LOG.isDebugEnabled()){
LOG.debug("getCheckConstraints: directsql : " + queryText);
}
List<String> pms = new ArrayList<>();
pms.add(catName);
if (db_name != null) {
pms.add(db_name);
}
if (tbl_name != null) {
pms.add(tbl_name);
}
Query queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray(
queryParams, pms.toArray(), queryText));
if (!sqlResult.isEmpty()) {
for (Object[] line : sqlResult) {
int enableValidateRely = MetastoreDirectSqlUtils.extractSqlInt(line[4]);
boolean enable = (enableValidateRely & 4) != 0;
boolean validate = (enableValidateRely & 2) != 0;
boolean rely = (enableValidateRely & 1) != 0;
SQLCheckConstraint currConstraint = new SQLCheckConstraint(
catName,
MetastoreDirectSqlUtils.extractSqlString(line[0]),
MetastoreDirectSqlUtils.extractSqlString(line[1]),
MetastoreDirectSqlUtils.extractSqlString(line[2]),
MetastoreDirectSqlUtils.extractSqlString(line[5]),
MetastoreDirectSqlUtils.extractSqlString(line[3]),
enable,
validate,
rely);
ret.add(currConstraint);
}
}
queryParams.closeAll();
return ret;
}
/**
* Drop partitions by using direct SQL queries.
* @param catName Metastore catalog name.
* @param dbName Metastore db name.
* @param tblName Metastore table name.
* @param partNames Partition names to drop.
* @throws MetaException If there is an SQL exception during the execution, it is converted
*         to a MetaException
*/
public void dropPartitionsViaSqlFilter(final String catName, final String dbName,
final String tblName, List<String> partNames)
throws MetaException {
if (partNames.isEmpty()) {
return;
}
Batchable.runBatched(batchSize, partNames, new Batchable<String, Void>() {
@Override
public List<Void> run(List<String> input) throws MetaException {
String filter = "" + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")";
// Get partition ids
List<Long> partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, tblName,
filter, input, Collections.<String>emptyList(), null);
if (partitionIds.isEmpty()) {
return Collections.emptyList(); // no partitions, bail early.
}
dropPartitionsByPartitionIds(partitionIds);
return Collections.emptyList();
}
});
}
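// Illustrative usage sketch (hypothetical caller and names): partition names are
// resolved to PART_IDs in batches, and each batch is then deleted bottom-up
// (privileges, stats, params, key vals, partitions, then orphaned SDs/serdes/CDs):
//   directSql.dropPartitionsViaSqlFilter("hive", "db", "tbl",
//       Arrays.asList("ds=2023-01-01", "ds=2023-01-02"));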
/**
* Drops partitions. Should be called with a list short enough to not trip up Oracle/etc.
* @param partitionIdList The partition identifiers to drop
* @throws MetaException If there is an SQL exception during the execution, it is converted
*         to a MetaException
*/
private void dropPartitionsByPartitionIds(List<Long> partitionIdList) throws MetaException {
String queryText;
if (partitionIdList.isEmpty()) {
return;
}
String partitionIds = getIdListForIn(partitionIdList);
// Get the corresponding SD_ID-s, CD_ID-s, SERDE_ID-s
queryText =
"SELECT " + SDS + ".\"SD_ID\", " + SDS + ".\"CD_ID\", " + SDS + ".\"SERDE_ID\" "
+ "from " + SDS + " "
+ "INNER JOIN " + PARTITIONS + " ON " + PARTITIONS + ".\"SD_ID\" = " + SDS + ".\"SD_ID\" "
+ "WHERE " + PARTITIONS + ".\"PART_ID\" in (" + partitionIds + ")";
Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
List<Object[]> sqlResult = MetastoreDirectSqlUtils
.ensureList(executeWithArray(query, null, queryText));
List<Object> sdIdList = new ArrayList<>(partitionIdList.size());
List<Object> columnDescriptorIdList = new ArrayList<>(1);
List<Object> serdeIdList = new ArrayList<>(partitionIdList.size());
if (!sqlResult.isEmpty()) {
for (Object[] fields : sqlResult) {
sdIdList.add(MetastoreDirectSqlUtils.extractSqlLong(fields[0]));
Long colId = MetastoreDirectSqlUtils.extractSqlLong(fields[1]);
if (!columnDescriptorIdList.contains(colId)) {
columnDescriptorIdList.add(colId);
}
serdeIdList.add(MetastoreDirectSqlUtils.extractSqlLong(fields[2]));
}
}
query.closeAll();
try {
// Drop privileges
queryText = "delete from " + PART_PRIVS + " where \"PART_ID\" in (" + partitionIds + ")";
executeNoResult(queryText);
Deadline.checkTimeout();
// Drop column level privileges
queryText = "delete from " + PART_COL_PRIVS + " where \"PART_ID\" in (" + partitionIds + ")";
executeNoResult(queryText);
Deadline.checkTimeout();
// Drop partition statistics
queryText = "delete from " + PART_COL_STATS + " where \"PART_ID\" in (" + partitionIds + ")";
executeNoResult(queryText);
Deadline.checkTimeout();
// Drop the partition params
queryText = "delete from " + PARTITION_PARAMS + " where \"PART_ID\" in ("
+ partitionIds + ")";
executeNoResult(queryText);
Deadline.checkTimeout();
// Drop the partition key vals
queryText = "delete from " + PARTITION_KEY_VALS + " where \"PART_ID\" in ("
+ partitionIds + ")";
executeNoResult(queryText);
Deadline.checkTimeout();
// Drop the partitions
queryText = "delete from " + PARTITIONS + " where \"PART_ID\" in (" + partitionIds + ")";
executeNoResult(queryText);
Deadline.checkTimeout();
} catch (SQLException sqlException) {
LOG.warn("SQL error executing query while dropping partition", sqlException);
throw new MetaException("Encountered error while dropping partitions.");
}
dropStorageDescriptors(sdIdList);
Deadline.checkTimeout();
dropSerdes(serdeIdList);
Deadline.checkTimeout();
dropDanglingColumnDescriptors(columnDescriptorIdList);
}
/**
* Drops storage descriptors. Should be called with a list short enough to not trip up
* Oracle/etc.
* @param storageDescriptorIdList The storage descriptor identifiers to drop
* @throws MetaException If there is an SQL exception during the execution, it is converted
*         to a MetaException
*/
private void dropStorageDescriptors(List<Object> storageDescriptorIdList) throws MetaException {
if (storageDescriptorIdList.isEmpty()) {
return;
}
String queryText;
String sdIds = getIdListForIn(storageDescriptorIdList);
// Get the corresponding SKEWED_STRING_LIST_ID data
queryText =
"select " + SKEWED_VALUES + ".\"STRING_LIST_ID_EID\" "
+ "from " + SKEWED_VALUES + " "
+ "WHERE " + SKEWED_VALUES + ".\"SD_ID_OID\" in (" + sdIds + ")";
Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
List<Object[]> sqlResult = MetastoreDirectSqlUtils
.ensureList(executeWithArray(query, null, queryText));
List<Object> skewedStringListIdList = new ArrayList<>(0);
if (!sqlResult.isEmpty()) {
for (Object[] fields : sqlResult) {
skewedStringListIdList.add(MetastoreDirectSqlUtils.extractSqlLong(fields[0]));
}
}
query.closeAll();
String skewedStringListIds = getIdListForIn(skewedStringListIdList);
try {
// Drop the SD params
queryText = "delete from " + SD_PARAMS + " where \"SD_ID\" in (" + sdIds + ")";
executeNoResult(queryText);
Deadline.checkTimeout();
// Drop the sort cols
queryText = "delete from " + SORT_COLS + " where \"SD_ID\" in (" + sdIds + ")";
executeNoResult(queryText);
Deadline.checkTimeout();
// Drop the bucketing cols
queryText = "delete from " + BUCKETING_COLS + " where \"SD_ID\" in (" + sdIds + ")";
executeNoResult(queryText);
Deadline.checkTimeout();
// Drop the skewed string lists
if (skewedStringListIdList.size() > 0) {
// Drop the skewed string value loc map
queryText = "delete from " + SKEWED_COL_VALUE_LOC_MAP + " where \"SD_ID\" in ("
+ sdIds + ")";
executeNoResult(queryText);
Deadline.checkTimeout();
// Drop the skewed values
queryText = "delete from " + SKEWED_VALUES + " where \"SD_ID_OID\" in (" + sdIds + ")";
executeNoResult(queryText);
Deadline.checkTimeout();
// Drop the skewed string list values
queryText = "delete from " + SKEWED_STRING_LIST_VALUES + " where \"STRING_LIST_ID\" in ("
+ skewedStringListIds + ")";
executeNoResult(queryText);
Deadline.checkTimeout();
// Drop the skewed string list
queryText = "delete from " + SKEWED_STRING_LIST + " where \"STRING_LIST_ID\" in ("
+ skewedStringListIds + ")";
executeNoResult(queryText);
Deadline.checkTimeout();
}
// Drop the skewed cols
queryText = "delete from " + SKEWED_COL_NAMES + " where \"SD_ID\" in (" + sdIds + ")";
executeNoResult(queryText);
Deadline.checkTimeout();
// Drop the sds
queryText = "delete from " + SDS + " where \"SD_ID\" in (" + sdIds + ")";
executeNoResult(queryText);
} catch (SQLException sqlException) {
LOG.warn("SQL error executing query while dropping storage descriptor.", sqlException);
throw new MetaException("Encountered error while dropping storage descriptor.");
}
}
/**
* Drops serdes. Should be called with a list short enough to not trip up Oracle/etc.
* @param serdeIdList The serde identifiers to drop
* @throws MetaException If there is an SQL exception during the execution, it is converted
*         to a MetaException
*/
  private void dropSerdes(List<Object> serdeIdList) throws MetaException {
    if (serdeIdList.isEmpty()) {
      return;
    }
    String queryText;
    String serdeIds = getIdListForIn(serdeIdList);
try {
// Drop the serde params
queryText = "delete from " + SERDE_PARAMS + " where \"SERDE_ID\" in (" + serdeIds + ")";
executeNoResult(queryText);
Deadline.checkTimeout();
// Drop the serdes
queryText = "delete from " + SERDES + " where \"SERDE_ID\" in (" + serdeIds + ")";
executeNoResult(queryText);
} catch (SQLException sqlException) {
LOG.warn("SQL error executing query while dropping serde.", sqlException);
throw new MetaException("Encountered error while dropping serde.");
}
}
  /**
   * Checks whether the given column descriptors are still referenced by other SDs, and
   * removes the ones that are not. Should be called with a list short enough not to trip up
   * the IN-clause limits of Oracle and similar databases.
   * @param columnDescriptorIdList The column descriptor identifiers
   * @throws MetaException If an SQL exception occurs during execution, it is converted to a
   * MetaException
   */
private void dropDanglingColumnDescriptors(List<Object> columnDescriptorIdList)
throws MetaException {
if (columnDescriptorIdList.isEmpty()) {
return;
}
String queryText;
String colIds = getIdListForIn(columnDescriptorIdList);
// Drop column descriptor, if no relation left
queryText =
"SELECT " + SDS + ".\"CD_ID\", count(1) "
+ "from " + SDS + " "
+ "WHERE " + SDS + ".\"CD_ID\" in (" + colIds + ") "
+ "GROUP BY " + SDS + ".\"CD_ID\"";
    Query<?> query = pm.newQuery("javax.jdo.query.SQL", queryText);
List<Object[]> sqlResult = MetastoreDirectSqlUtils
.ensureList(executeWithArray(query, null, queryText));
    // With GROUP BY, count(1) is at least 1 for every returned row, so a CD_ID with no
    // remaining SDS references simply does not appear in the result. The dangling
    // descriptors are therefore exactly the input ids that are missing from the result set.
    List<Object> danglingColumnDescriptorIdList = new ArrayList<>(columnDescriptorIdList);
    for (Object[] fields : sqlResult) {
      danglingColumnDescriptorIdList.remove(MetastoreDirectSqlUtils.extractSqlLong(fields[0]));
    }
query.closeAll();
if (!danglingColumnDescriptorIdList.isEmpty()) {
try {
String danglingCDIds = getIdListForIn(danglingColumnDescriptorIdList);
// Drop the columns_v2
queryText = "delete from " + COLUMNS_V2 + " where \"CD_ID\" in (" + danglingCDIds + ")";
executeNoResult(queryText);
Deadline.checkTimeout();
// Drop the cols
queryText = "delete from " + CDS + " where \"CD_ID\" in (" + danglingCDIds + ")";
executeNoResult(queryText);
} catch (SQLException sqlException) {
LOG.warn("SQL error executing query while dropping dangling col descriptions", sqlException);
throw new MetaException("Encountered error while dropping col descriptions");
}
}
}
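  /**
   * The table types considered by the statistics-related queries below: managed tables and
   * materialized views.
   */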
  public static final Object[] STATS_TABLE_TYPES = new Object[] {
TableType.MANAGED_TABLE.toString(), TableType.MATERIALIZED_VIEW.toString()
};
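  /**
   * Returns the names of all managed tables and materialized views that have either
   * table-level or partition-level column statistics recorded. A table that has both kinds
   * of statistics is returned by both underlying queries, so the result may contain
   * duplicates.
   * @return The fully qualified (catalog, database, table) names
   * @throws MetaException If an SQL error occurs while running the queries
   */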
public List<org.apache.hadoop.hive.common.TableName> getTableNamesWithStats() throws MetaException {
// Could we also join with ACID tables to only get tables with outdated stats?
String queryText0 = "SELECT DISTINCT " + TBLS + ".\"TBL_NAME\", " + DBS + ".\"NAME\", "
+ DBS + ".\"CTLG_NAME\" FROM " + TBLS + " INNER JOIN " + DBS + " ON "
+ TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"";
String queryText1 = " WHERE " + TBLS + ".\"TBL_TYPE\" IN ("
+ makeParams(STATS_TABLE_TYPES.length) + ")";
List<org.apache.hadoop.hive.common.TableName> result = new ArrayList<>();
String queryText = queryText0 + " INNER JOIN " + TAB_COL_STATS
+ " ON " + TBLS + ".\"TBL_ID\" = " + TAB_COL_STATS + ".\"TBL_ID\"" + queryText1;
getStatsTableListResult(queryText, result);
queryText = queryText0 + " INNER JOIN " + PARTITIONS + " ON " + TBLS + ".\"TBL_ID\" = "
+ PARTITIONS + ".\"TBL_ID\"" + " INNER JOIN " + PART_COL_STATS + " ON " + PARTITIONS
+ ".\"PART_ID\" = " + PART_COL_STATS + ".\"PART_ID\"" + queryText1;
getStatsTableListResult(queryText, result);
return result;
}
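  /**
   * Collects, for every partition of the given table that has partition-level column
   * statistics, the names of the columns the statistics cover.
   *
   * <pre>{@code
   * // Hypothetical usage (catalog/db/table names are illustrative only):
   * Map<String, List<String>> partCols =
   *     directSql.getColAndPartNamesWithStats("hive", "default", "web_logs");
   * }</pre>
   *
   * @param catName The catalog name
   * @param dbName The database name
   * @param tableName The table name
   * @return A map from partition name to the list of column names with statistics
   * @throws MetaException If an SQL error occurs while running the query
   */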
public Map<String, List<String>> getColAndPartNamesWithStats(
String catName, String dbName, String tableName) throws MetaException {
// Could we also join with ACID tables to only get tables with outdated stats?
String queryText = "SELECT DISTINCT " + PARTITIONS + ".\"PART_NAME\", " + PART_COL_STATS
+ ".\"COLUMN_NAME\" FROM " + TBLS + " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = "
+ DBS + ".\"DB_ID\" INNER JOIN " + PARTITIONS + " ON " + TBLS + ".\"TBL_ID\" = "
+ PARTITIONS + ".\"TBL_ID\" INNER JOIN " + PART_COL_STATS + " ON " + PARTITIONS
+ ".\"PART_ID\" = " + PART_COL_STATS + ".\"PART_ID\" WHERE " + DBS + ".\"NAME\" = ? AND "
+ DBS + ".\"CTLG_NAME\" = ? AND " + TBLS + ".\"TBL_NAME\" = ? ORDER BY "
+ PARTITIONS + ".\"PART_NAME\"";
LOG.debug("Running {}", queryText);
Query<?> query = pm.newQuery("javax.jdo.query.SQL", queryText);
try {
List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray(
query, new Object[] { dbName, catName, tableName }, queryText));
Map<String, List<String>> result = new HashMap<>();
String lastPartName = null;
List<String> cols = null;
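      // The query orders rows by partition name, so consecutive rows belong to the same
      // partition; flush the accumulated column list whenever the partition name changes.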
for (Object[] line : sqlResult) {
String col = MetastoreDirectSqlUtils.extractSqlString(line[1]);
String part = MetastoreDirectSqlUtils.extractSqlString(line[0]);
if (!part.equals(lastPartName)) {
if (lastPartName != null) {
result.put(lastPartName, cols);
}
cols = cols == null ? new ArrayList<>() : new ArrayList<>(cols.size());
lastPartName = part;
}
cols.add(col);
}
if (lastPartName != null) {
result.put(lastPartName, cols);
}
return result;
} finally {
query.closeAll();
}
}
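  /**
   * Returns the names of all managed tables and materialized views, regardless of whether
   * any statistics are currently stored for them.
   * @return The fully qualified (catalog, database, table) names
   * @throws MetaException If an SQL error occurs while running the query
   */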
public List<org.apache.hadoop.hive.common.TableName> getAllTableNamesForStats() throws MetaException {
String queryText = "SELECT " + TBLS + ".\"TBL_NAME\", " + DBS + ".\"NAME\", "
+ DBS + ".\"CTLG_NAME\" FROM " + TBLS + " INNER JOIN " + DBS + " ON " + TBLS
+ ".\"DB_ID\" = " + DBS + ".\"DB_ID\""
+ " WHERE " + TBLS + ".\"TBL_TYPE\" IN (" + makeParams(STATS_TABLE_TYPES.length) + ")";
List<org.apache.hadoop.hive.common.TableName> result = new ArrayList<>();
getStatsTableListResult(queryText, result);
return result;
}
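  /**
   * Runs the given SQL query, binding STATS_TABLE_TYPES as its parameters, and appends each
   * resulting (table, database, catalog) row to the result list as a TableName.
   * @param queryText The SQL query to run; must select TBL_NAME, NAME and CTLG_NAME, in
   * that order
   * @param result The list the table names are appended to
   * @throws MetaException If an SQL error occurs while running the query
   */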
private void getStatsTableListResult(
String queryText, List<org.apache.hadoop.hive.common.TableName> result) throws MetaException {
LOG.debug("Running {}", queryText);
Query<?> query = pm.newQuery("javax.jdo.query.SQL", queryText);
try {
List<Object[]> sqlResult = MetastoreDirectSqlUtils
.ensureList(executeWithArray(query, STATS_TABLE_TYPES, queryText));
for (Object[] line : sqlResult) {
result.add(new org.apache.hadoop.hive.common.TableName(
MetastoreDirectSqlUtils.extractSqlString(line[2]), MetastoreDirectSqlUtils
.extractSqlString(line[1]), MetastoreDirectSqlUtils.extractSqlString(line[0])));
}
} finally {
query.closeAll();
}
}
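  /**
   * Takes an exclusive lock on the given metastore table for the duration of the current
   * transaction. Note that the "lock table ... in exclusive mode" syntax is not supported by
   * every backing database, so this should only be invoked against databases known to
   * accept it.
   * @param tableName The name of the metastore table to lock
   * @throws MetaException If the lock statement fails
   */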
public void lockDbTable(String tableName) throws MetaException {
String lockCommand = "lock table \"" + tableName + "\" in exclusive mode";
try {
executeNoResult(lockCommand);
} catch (SQLException sqle) {
throw new MetaException("Error while locking table " + tableName + ": " + sqle.getMessage());
}
}
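  /**
   * Deletes the COLUMN_STATS_ACCURATE marker from the parameters of all partitions of the
   * given table, effectively invalidating their column statistics state.
   * @param tblId The id of the table whose partition statistics state is cleared
   * @throws MetaException If an SQL error occurs while running the delete
   */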
  public void deleteColumnStatsState(long tblId) throws MetaException {
    // @formatter:off
    String queryText = ""
        + "delete from " + PARTITION_PARAMS + " "
        + " where "
        + " \"PART_ID\" in (select p.\"PART_ID\" from " + PARTITIONS + " p where"
        + " p.\"TBL_ID\" = " + tblId + ")"
        + " and \"PARAM_KEY\" = '" + StatsSetupConst.COLUMN_STATS_ACCURATE + "'";
    // @formatter:on
    try {
      executeNoResult(queryText);
    } catch (SQLException e) {
      throw new MetaException("Error removing column stat states: " + e.getMessage());
    }
}
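  /**
   * Updates the column statistics of a batch of partitions in one round trip. Counts the
   * statistics objects in the batch, reserves a contiguous block of statistics ids for them,
   * and delegates the actual update to the statistics updater.
   * @param partColStatsMap Map from partition name to the statistics to store for it
   * @param tbl The table the partitions belong to
   * @param listeners Transactional event listeners to notify about the update
   * @param validWriteIds The valid write id list for the current transaction
   * @param writeId The write id of the current operation
   * @return The result of the underlying update, keyed by partition name
   * @throws MetaException If the update fails
   */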
public Map<String, Map<String, String>> updatePartitionColumnStatisticsBatch(
Map<String, ColumnStatistics> partColStatsMap,
Table tbl,
List<TransactionalMetaStoreEventListener> listeners,
String validWriteIds, long writeId)
throws MetaException {
    long numStats = 0;
    for (ColumnStatistics colStats : partColStatsMap.values()) {
      numStats += colStats.getStatsObjSize();
    }
long csId = updateStat.getNextCSIdForMPartitionColumnStatistics(numStats);
return updateStat.updatePartitionColumnStatistics(partColStatsMap, tbl, csId, validWriteIds, writeId, listeners);
}
}