Merge remote-tracking branch 'origin/master' into hadoop-next
Change-Id: Ib37056d1f273f6efb5b87c9b48512f8b14ab497b
diff --git a/be/src/gutil/walltime.cc b/be/src/gutil/walltime.cc
index b497500..04d7f4b 100644
--- a/be/src/gutil/walltime.cc
+++ b/be/src/gutil/walltime.cc
@@ -166,18 +166,6 @@
return true;
}
-WallTime WallTime_Now() {
-#if defined(__APPLE__)
- mach_timespec_t ts;
- walltime_internal::GetCurrentTime(&ts);
- return ts.tv_sec + ts.tv_nsec / static_cast<double>(1e9);
-#else
- timespec ts;
- clock_gettime(CLOCK_REALTIME, &ts);
- return ts.tv_sec + ts.tv_nsec / static_cast<double>(1e9);
-#endif // defined(__APPLE__)
-}
-
void StringAppendStrftime(string* dst,
const char* format,
time_t when,
diff --git a/be/src/gutil/walltime.h b/be/src/gutil/walltime.h
index 8d31627..2f04ebe 100644
--- a/be/src/gutil/walltime.h
+++ b/be/src/gutil/walltime.h
@@ -30,6 +30,12 @@
#include "common/logging.h"
#include "gutil/integral_types.h"
+#define NANOS_PER_SEC 1000000000ll
+#define NANOS_PER_MICRO 1000ll
+#define MICROS_PER_SEC 1000000ll
+#define MICROS_PER_MILLI 1000ll
+#define MILLIS_PER_SEC 1000ll
+
typedef double WallTime;
// Append result to a supplied string.
@@ -52,9 +58,6 @@
bool local,
WallTime* result);
-// Return current time in seconds as a WallTime.
-WallTime WallTime_Now();
-
typedef int64 MicrosecondsInt64;
typedef int64 NanosecondsInt64;
@@ -76,7 +79,7 @@
inline MicrosecondsInt64 GetCurrentTimeMicros() {
mach_timespec_t ts;
GetCurrentTime(&ts);
- return ts.tv_sec * 1e6 + ts.tv_nsec / 1e3;
+ return ts.tv_sec * MICROS_PER_SEC + ts.tv_nsec / NANOS_PER_MICRO;
}
inline int64_t GetMonoTimeNanos() {
@@ -91,7 +94,7 @@
}
inline MicrosecondsInt64 GetMonoTimeMicros() {
- return GetMonoTimeNanos() / 1e3;
+ return GetMonoTimeNanos() / NANOS_PER_MICRO;
}
inline MicrosecondsInt64 GetThreadCpuTimeMicros() {
@@ -117,7 +120,8 @@
return 0;
}
- return thread_info_data.user_time.seconds * 1e6 + thread_info_data.user_time.microseconds;
+ return thread_info_data.user_time.seconds * MICROS_PER_SEC +
+ thread_info_data.user_time.microseconds;
}
#else
@@ -125,13 +129,13 @@
inline MicrosecondsInt64 GetClockTimeMicros(clockid_t clock) {
timespec ts;
clock_gettime(clock, &ts);
- return ts.tv_sec * 1e6 + ts.tv_nsec / 1e3;
+ return ts.tv_sec * MICROS_PER_SEC + ts.tv_nsec / NANOS_PER_MICRO;
}
inline NanosecondsInt64 GetClockTimeNanos(clockid_t clock) {
timespec ts;
clock_gettime(clock, &ts);
- return ts.tv_sec * 1e9 + ts.tv_nsec;
+ return ts.tv_sec * NANOS_PER_SEC + ts.tv_nsec;
}
#endif // defined(__APPLE__)
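Note on the constants above: the old expressions such as ts.tv_sec * 1e9 + ts.tv_nsec were evaluated in double precision, whose 53-bit mantissa cannot represent present-day wall-clock nanosecond counts exactly, so low-order bits were silently rounded away before the implicit conversion back to int64. A minimal sketch of the effect (shown in Java; IEEE-754 double behavior matches the C++ code here):

    public class TimePrecisionSketch {
      public static void main(String[] args) {
        long secs = 1480000000L;  // a wall-clock timestamp from late 2016
        long nanos = 123456789L;
        // Integer arithmetic, as in the new NANOS_PER_SEC code path: exact.
        long exact = secs * 1000000000L + nanos;
        // Double arithmetic, as in the old "ts.tv_sec * 1e9 + ts.tv_nsec".
        long rounded = (long) (secs * 1e9 + nanos);
        // Prints a nonzero difference: the double result dropped low bits.
        System.out.println(exact - rounded);
      }
    }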
diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc
index e6a9084..afbb33d 100644
--- a/be/src/util/parquet-reader.cc
+++ b/be/src/util/parquet-reader.cc
@@ -86,22 +86,9 @@
}
string TypeMapping(Type::type t) {
- switch (t) {
- case Type::BOOLEAN:
- return "BOOLEAN";
- case Type::INT32:
- return "INT32";
- case Type::INT64:
- return "INT64";
- case Type::FLOAT:
- return "FLOAT";
- case Type::DOUBLE:
- return "DOUBLE";
- case Type::BYTE_ARRAY:
- return "BYTE_ARRAY";
- default:
- return "UNKNOWN";
- }
+ auto it = _Type_VALUES_TO_NAMES.find(t);
+ if (it != _Type_VALUES_TO_NAMES.end()) return it->second;
+ return "UNKNOWN";
}
void AppendSchema(const vector<SchemaElement>& schema, int level,
@@ -148,8 +135,8 @@
// def levels - with our RLE scheme it is not possible to determine how many values
// were actually written if the final run is a literal run, only if the final run is
// a repeated run (see util/rle-encoding.h for more details).
-void CheckDataPage(const ColumnChunk& col, const PageHeader& header,
- const uint8_t* page) {
+// Returns the number of rows specified by the header.
+int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_t* page) {
const uint8_t* data = page;
std::vector<uint8_t> decompressed_buffer;
if (col.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED) {
@@ -191,6 +178,8 @@
}
}
}
+
+ return header.data_page_header.num_values;
}
// Simple utility to read parquet files on local disk. This utility validates the
@@ -248,12 +237,14 @@
int total_page_header_size = 0;
int total_compressed_data_size = 0;
int total_uncompressed_data_size = 0;
- vector<int> column_sizes;
+ vector<int> column_byte_sizes;
+ vector<int> column_num_rows;
for (int i = 0; i < file_metadata.row_groups.size(); ++i) {
cerr << "Reading row group " << i << endl;
RowGroup& rg = file_metadata.row_groups[i];
- column_sizes.resize(rg.columns.size());
+ column_byte_sizes.resize(rg.columns.size());
+ column_num_rows.resize(rg.columns.size());
for (int c = 0; c < rg.columns.size(); ++c) {
cerr << " Reading column " << c << endl;
@@ -278,18 +269,23 @@
}
data += header_size;
- if (header.__isset.data_page_header) CheckDataPage(col, header, data);
+ if (header.__isset.data_page_header) {
+ column_num_rows[c] += CheckDataPage(col, header, data);
+ }
total_page_header_size += header_size;
- column_sizes[c] += header.compressed_page_size;
+ column_byte_sizes[c] += header.compressed_page_size;
total_compressed_data_size += header.compressed_page_size;
total_uncompressed_data_size += header.uncompressed_page_size;
data += header.compressed_page_size;
++pages_read;
}
- // Check that we ended exactly where we should have
+ // Check that we ended exactly where we should have.
assert(data == col_end);
+ // Check that all cols have the same number of rows.
+ assert(column_num_rows[0] == column_num_rows[c]);
}
+ num_rows += column_num_rows[0];
}
double compression_ratio =
(double)total_uncompressed_data_size / total_compressed_data_size;
@@ -306,9 +302,9 @@
<< "(" << (total_compressed_data_size / (double)file_len) << ")" << endl;
ss << " Column uncompressed size: " << total_uncompressed_data_size
<< "(" << compression_ratio << ")" << endl;
- for (int i = 0; i < column_sizes.size(); ++i) {
- ss << " " << "Col " << i << ": " << column_sizes[i]
- << "(" << (column_sizes[i] / (double)file_len) << ")" << endl;
+ for (int i = 0; i < column_byte_sizes.size(); ++i) {
+ ss << " " << "Col " << i << ": " << column_byte_sizes[i]
+ << "(" << (column_byte_sizes[i] / (double)file_len) << ")" << endl;
}
cerr << ss.str() << endl;
diff --git a/be/src/util/stopwatch.h b/be/src/util/stopwatch.h
index 0e73b6a..c1f85aa 100644
--- a/be/src/util/stopwatch.h
+++ b/be/src/util/stopwatch.h
@@ -170,7 +170,7 @@
// Now() can be called frequently (IMPALA-2407).
timespec ts;
clock_gettime(OsInfo::fast_clock(), &ts);
- return ts.tv_sec * 1e9 + ts.tv_nsec;
+ return ts.tv_sec * NANOS_PER_SEC + ts.tv_nsec;
#endif
}
diff --git a/be/src/util/time.h b/be/src/util/time.h
index 28a0f66..efe6a3b 100644
--- a/be/src/util/time.h
+++ b/be/src/util/time.h
@@ -39,11 +39,11 @@
}
inline int64_t MonotonicMillis() {
- return GetMonoTimeMicros() / 1e3;
+ return GetMonoTimeMicros() / MICROS_PER_MILLI;
}
inline int64_t MonotonicSeconds() {
- return GetMonoTimeMicros() / 1e6;
+ return GetMonoTimeMicros() / MICROS_PER_SEC;
}
@@ -52,7 +52,7 @@
/// a cluster. For more accurate timings on the local host use the monotonic functions
/// above.
inline int64_t UnixMillis() {
- return GetCurrentTimeMicros() / 1e3;
+ return GetCurrentTimeMicros() / MICROS_PER_MILLI;
}
/// Sleeps the current thread for at least duration_ms milliseconds.
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index 7cdd026..d971852 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -80,8 +80,8 @@
"The location of the debug webserver's SSL certificate file, in .pem format. If "
"empty, webserver SSL support is not enabled");
DEFINE_string(webserver_private_key_file, "", "The full path to the private key used as a"
- " counterpart to the public key contained in --ssl_server_certificate. If "
- "--ssl_server_certificate is set, this option must be set as well.");
+ " counterpart to the public key contained in --webserver_certificate_file. If "
+ "--webserver_certificate_file is set, this option must be set as well.");
DEFINE_string(webserver_private_key_password_cmd, "", "A Unix command whose output "
"returns the password used to decrypt the Webserver's certificate private key file "
"specified in --webserver_private_key_file. If the .PEM key file is not "
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index d11c1c3..8b3cc8c 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -141,7 +141,7 @@
# Remove spaces, trim minor versions, and convert to lowercase.
DISTRO_VERSION="$(tr -d ' \n' <<< "$DISTRO_VERSION" | cut -d. -f1 | tr "A-Z" "a-z")"
case "$DISTRO_VERSION" in
- centos6 | centos7 | debian7 | debian8 | sles12 | ubuntu* )
+ centos6 | centos7 | debian7 | debian8 | suselinux12 | ubuntu* )
KUDU_IS_SUPPORTED=true;;
esac
fi
diff --git a/bin/make_impala.sh b/bin/make_impala.sh
index 97525c3..70d088f 100755
--- a/bin/make_impala.sh
+++ b/bin/make_impala.sh
@@ -170,7 +170,6 @@
exit 0
fi
-MAKE_ARGS=-j${IMPALA_BUILD_THREADS:-4}
if [ $BUILD_FE_ONLY -eq 1 ]; then
${MAKE_CMD} ${MAKE_ARGS} fe
elif [ $BUILD_EVERYTHING -eq 1 ]; then
diff --git a/buildall.sh b/buildall.sh
index fc66acb..291f19e 100755
--- a/buildall.sh
+++ b/buildall.sh
@@ -59,7 +59,8 @@
BUILD_ASAN=0
BUILD_FE_ONLY=0
BUILD_TIDY=0
-MAKE_CMD=make
+# Export MAKE_CMD so it is visible in scripts that invoke make, e.g. copy-udfs-udas.sh
+export MAKE_CMD=make
LZO_CMAKE_ARGS=
# Defaults that can be picked up from the environment, but are overridable through the
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 012da42..33f0591 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -276,10 +276,10 @@
terminal BITAND, BITOR, BITXOR, BITNOT;
terminal EQUAL, NOT, NOTEQUAL, LESSTHAN, GREATERTHAN;
terminal FACTORIAL; // Placeholder terminal for postfix factorial operator
+terminal COMMENTED_PLAN_HINT_START, COMMENTED_PLAN_HINT_END;
terminal String IDENT;
terminal String EMPTY_IDENT;
terminal String NUMERIC_OVERFLOW;
-terminal String COMMENTED_PLAN_HINTS;
terminal BigDecimal INTEGER_LITERAL;
terminal BigDecimal DECIMAL_LITERAL;
terminal String STRING_LITERAL;
@@ -367,7 +367,8 @@
nonterminal Subquery subquery;
nonterminal JoinOperator join_operator;
nonterminal opt_inner, opt_outer;
-nonterminal ArrayList<String> opt_plan_hints;
+nonterminal PlanHint plan_hint;
+nonterminal List<PlanHint> opt_plan_hints, plan_hint_list;
nonterminal TypeDef type_def;
nonterminal Type type;
nonterminal Expr sign_chain_expr;
@@ -2300,41 +2301,41 @@
list.add(table);
RESULT = list;
:}
- | table_ref_list:list KW_CROSS KW_JOIN opt_plan_hints:hints table_ref:table
+ | table_ref_list:list KW_CROSS KW_JOIN opt_plan_hints:join_hints table_ref:table
opt_plan_hints:table_hints
{:
table.setJoinOp(JoinOperator.CROSS_JOIN);
// We will throw an AnalysisException if there are join hints so that we can provide
// a better error message than a parser exception.
- table.setJoinHints(hints);
+ table.setJoinHints(join_hints);
table.setTableHints(table_hints);
list.add(table);
RESULT = list;
:}
- | table_ref_list:list join_operator:op opt_plan_hints:hints table_ref:table
+ | table_ref_list:list join_operator:op opt_plan_hints:join_hints table_ref:table
opt_plan_hints:table_hints
{:
table.setJoinOp((JoinOperator) op);
- table.setJoinHints(hints);
+ table.setJoinHints(join_hints);
table.setTableHints(table_hints);
list.add(table);
RESULT = list;
:}
- | table_ref_list:list join_operator:op opt_plan_hints:hints table_ref:table
+ | table_ref_list:list join_operator:op opt_plan_hints:join_hints table_ref:table
opt_plan_hints:table_hints KW_ON expr:e
{:
table.setJoinOp((JoinOperator) op);
- table.setJoinHints(hints);
+ table.setJoinHints(join_hints);
table.setTableHints(table_hints);
table.setOnClause(e);
list.add(table);
RESULT = list;
:}
- | table_ref_list:list join_operator:op opt_plan_hints:hints table_ref:table
+ | table_ref_list:list join_operator:op opt_plan_hints:join_hints table_ref:table
opt_plan_hints:table_hints KW_USING LPAREN ident_list:colNames RPAREN
{:
table.setJoinOp((JoinOperator) op);
- table.setJoinHints(hints);
+ table.setJoinHints(join_hints);
table.setTableHints(table_hints);
table.setUsingClause(colNames);
list.add(table);
@@ -2381,30 +2382,42 @@
;
opt_plan_hints ::=
- COMMENTED_PLAN_HINTS:l
- {:
- ArrayList<String> hints = new ArrayList<String>();
- String[] tokens = l.split(",");
- for (String token: tokens) {
- String trimmedToken = token.trim();
- if (trimmedToken.length() > 0) hints.add(trimmedToken);
- }
- RESULT = hints;
- :}
+ COMMENTED_PLAN_HINT_START plan_hint_list:hints COMMENTED_PLAN_HINT_END
+ {: RESULT = hints; :}
/* legacy straight_join hint style */
| KW_STRAIGHT_JOIN
- {:
- ArrayList<String> hints = new ArrayList<String>();
- hints.add("straight_join");
- RESULT = hints;
- :}
+ {: RESULT = Lists.newArrayList(new PlanHint("straight_join")); :}
/* legacy plan-hint style */
- | LBRACKET ident_list:l RBRACKET
- {: RESULT = l; :}
+ | LBRACKET plan_hint_list:hints RBRACKET
+ {: RESULT = hints; :}
+ | /* empty */
+ {: RESULT = Lists.newArrayList(); :}
+ ;
+
+plan_hint ::=
+ KW_STRAIGHT_JOIN
+ {: RESULT = new PlanHint("straight_join"); :}
+ | IDENT:name
+ {: RESULT = new PlanHint(name); :}
+ | IDENT:name LPAREN ident_list:args RPAREN
+ {: RESULT = new PlanHint(name, args); :}
| /* empty */
{: RESULT = null; :}
;
+plan_hint_list ::=
+ plan_hint:hint
+ {:
+ ArrayList<PlanHint> hints = Lists.newArrayList(hint);
+ RESULT = hints;
+ :}
+ | plan_hint_list:hints COMMA plan_hint:hint
+ {:
+ if (hint != null) hints.add(hint);
+ RESULT = hints;
+ :}
+ ;
+
ident_list ::=
ident_or_default:ident
{:
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 1dacf48..902d100 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -64,7 +64,7 @@
private final List<PartitionKeyValue> partitionKeyValues_;
// User-supplied hints to control hash partitioning before the table sink in the plan.
- private final List<String> planHints_;
+ private List<PlanHint> planHints_ = Lists.newArrayList();
// False if the original insert statement had a query statement, true if we need to
// auto-generate one (for insert into tbl()) during analysis.
@@ -124,6 +124,14 @@
// clustering step.
private boolean hasClusteredHint_ = false;
+ // For every column of the target table that is referenced in the optional 'sortby()'
+ // hint, this list will contain the corresponding result expr from 'resultExprs_'.
+ // Before insertion, all rows will be sorted by these exprs. If the list is empty, no
+ // additional sorting by non-partitioning columns will be performed. For Hdfs tables,
+ // the 'sortby()' hint must not contain partition columns. For Kudu tables, it must not
+ // contain primary key columns.
+ private List<Expr> sortByExprs_ = Lists.newArrayList();
+
// Output expressions that produce the final results to write to the target table. May
// include casts. Set in prepareExpressions().
// If this is an INSERT on a non-Kudu table, it will contain one Expr for all
@@ -153,19 +161,19 @@
public static InsertStmt createInsert(WithClause withClause, TableName targetTable,
boolean overwrite, List<PartitionKeyValue> partitionKeyValues,
- List<String> planHints, QueryStmt queryStmt, List<String> columnPermutation) {
+ List<PlanHint> planHints, QueryStmt queryStmt, List<String> columnPermutation) {
return new InsertStmt(withClause, targetTable, overwrite, partitionKeyValues,
planHints, queryStmt, columnPermutation, false);
}
public static InsertStmt createUpsert(WithClause withClause, TableName targetTable,
- List<String> planHints, QueryStmt queryStmt, List<String> columnPermutation) {
+ List<PlanHint> planHints, QueryStmt queryStmt, List<String> columnPermutation) {
return new InsertStmt(withClause, targetTable, false, null, planHints, queryStmt,
columnPermutation, true);
}
protected InsertStmt(WithClause withClause, TableName targetTable, boolean overwrite,
- List<PartitionKeyValue> partitionKeyValues, List<String> planHints,
+ List<PartitionKeyValue> partitionKeyValues, List<PlanHint> planHints,
QueryStmt queryStmt, List<String> columnPermutation, boolean isUpsert) {
Preconditions.checkState(!isUpsert || (!overwrite && partitionKeyValues == null));
withClause_ = withClause;
@@ -173,7 +181,7 @@
originalTableName_ = targetTableName_;
overwrite_ = overwrite;
partitionKeyValues_ = partitionKeyValues;
- planHints_ = planHints;
+ planHints_ = (planHints != null) ? planHints : new ArrayList<PlanHint>();
queryStmt_ = queryStmt;
needsGeneratedQueryStatement_ = (queryStmt == null);
columnPermutation_ = columnPermutation;
@@ -210,6 +218,7 @@
hasShuffleHint_ = false;
hasNoShuffleHint_ = false;
hasClusteredHint_ = false;
+ sortByExprs_.clear();
resultExprs_.clear();
mentionedColumns_.clear();
primaryKeyExprs_.clear();
@@ -729,25 +738,27 @@
}
private void analyzePlanHints(Analyzer analyzer) throws AnalysisException {
- if (planHints_ == null) return;
if (!planHints_.isEmpty() && table_ instanceof HBaseTable) {
- throw new AnalysisException("INSERT hints are only supported for inserting into " +
- "Hdfs and Kudu tables.");
+ throw new AnalysisException(String.format("INSERT hints are only supported for " +
+ "inserting into Hdfs and Kudu tables: %s", getTargetTableName()));
}
boolean hasNoClusteredHint = false;
- for (String hint: planHints_) {
- if (hint.equalsIgnoreCase("SHUFFLE")) {
+ for (PlanHint hint: planHints_) {
+ if (hint.is("SHUFFLE")) {
hasShuffleHint_ = true;
analyzer.setHasPlanHints();
- } else if (hint.equalsIgnoreCase("NOSHUFFLE")) {
+ } else if (hint.is("NOSHUFFLE")) {
hasNoShuffleHint_ = true;
analyzer.setHasPlanHints();
- } else if (hint.equalsIgnoreCase("CLUSTERED")) {
+ } else if (hint.is("CLUSTERED")) {
hasClusteredHint_ = true;
analyzer.setHasPlanHints();
- } else if (hint.equalsIgnoreCase("NOCLUSTERED")) {
+ } else if (hint.is("NOCLUSTERED")) {
hasNoClusteredHint = true;
analyzer.setHasPlanHints();
+ } else if (hint.is("SORTBY")) {
+ analyzeSortByHint(hint);
+ analyzer.setHasPlanHints();
} else {
analyzer.addWarning("INSERT hint not recognized: " + hint);
}
@@ -761,6 +772,51 @@
}
}
+ private void analyzeSortByHint(PlanHint hint) throws AnalysisException {
+ // HBase tables don't support insert hints at all (must be enforced by the caller).
+ Preconditions.checkState(!(table_ instanceof HBaseTable));
+
+ if (isUpsert_) {
+ throw new AnalysisException("SORTBY hint is not supported in UPSERT statements.");
+ }
+
+ List<String> columnNames = hint.getArgs();
+ Preconditions.checkState(!columnNames.isEmpty());
+ for (String columnName: columnNames) {
+ // Make sure it's not a Kudu primary key column or Hdfs partition column.
+ if (table_ instanceof KuduTable) {
+ KuduTable kuduTable = (KuduTable) table_;
+ if (kuduTable.isPrimaryKeyColumn(columnName)) {
+ throw new AnalysisException(String.format("SORTBY hint column list must not " +
+ "contain Kudu primary key column: '%s'", columnName));
+ }
+ } else {
+ for (Column tableColumn: table_.getClusteringColumns()) {
+ if (tableColumn.getName().equals(columnName)) {
+ throw new AnalysisException(String.format("SORTBY hint column list must " +
+ "not contain Hdfs partition column: '%s'", columnName));
+ }
+ }
+ }
+
+ // Find the matching column in the target table's column list (by name) and store
+ // the corresponding result expr in sortByExprs_.
+ boolean foundColumn = false;
+ List<Column> columns = table_.getNonClusteringColumns();
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns.get(i).getName().equals(columnName)) {
+ sortByExprs_.add(resultExprs_.get(i));
+ foundColumn = true;
+ break;
+ }
+ }
+ if (!foundColumn) {
+ throw new AnalysisException(String.format("Could not find SORTBY hint column " +
+ "'%s' in table.", columnName));
+ }
+ }
+ }
+
@Override
public ArrayList<Expr> getResultExprs() { return resultExprs_; }
@@ -772,7 +828,7 @@
private String getOpName() { return isUpsert_ ? "UPSERT" : "INSERT"; }
- public List<String> getPlanHints() { return planHints_; }
+ public List<PlanHint> getPlanHints() { return planHints_; }
public TableName getTargetTableName() { return targetTableName_; }
public Table getTargetTable() { return table_; }
public void setTargetTable(Table table) { this.table_ = table; }
@@ -788,6 +844,7 @@
public boolean hasNoShuffleHint() { return hasNoShuffleHint_; }
public boolean hasClusteredHint() { return hasClusteredHint_; }
public ArrayList<Expr> getPrimaryKeyExprs() { return primaryKeyExprs_; }
+ public List<Expr> getSortByExprs() { return sortByExprs_; }
public List<String> getMentionedColumns() {
List<String> result = Lists.newArrayList();
@@ -812,6 +869,7 @@
resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
partitionKeyExprs_ = Expr.substituteList(partitionKeyExprs_, smap, analyzer, true);
primaryKeyExprs_ = Expr.substituteList(primaryKeyExprs_, smap, analyzer, true);
+ sortByExprs_ = Expr.substituteList(sortByExprs_, smap, analyzer, true);
}
@Override
@@ -840,8 +898,8 @@
}
strBuilder.append(" PARTITION (" + Joiner.on(", ").join(values) + ")");
}
- if (planHints_ != null) {
- strBuilder.append(" " + ToSqlUtils.getPlanHintsSql(planHints_));
+ if (!planHints_.isEmpty()) {
+ strBuilder.append(" " + ToSqlUtils.getPlanHintsSql(getPlanHints()));
}
if (!needsGeneratedQueryStatement_) {
strBuilder.append(" " + queryStmt_.toSql());
diff --git a/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java b/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
index 3ba2ad2..d5f0e70 100644
--- a/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
+++ b/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
@@ -137,12 +137,11 @@
}
}
- // Transform <COL> = NULL into IsNull expr; <String COL> = '' into IsNull expr and
- // <String COL> = 'String Value' into lower case.
+ // Transform <COL> = NULL into IsNull expr; <String COL> = '' into IsNull expr.
// The reason is that COL = NULL is allowed for selecting the NULL
// partition, but a COL = NULL predicate can never be true, so we
// need to transform such predicates before feeding them into the
- // partition pruner. Same logic goes to String transformation.
+ // partition pruner.
private List<Expr> transformPartitionConjuncts(Analyzer analyzer, List<Expr> conjuncts)
throws AnalysisException {
List<Expr> transformedConjuncts = Lists.newArrayList();
@@ -162,9 +161,6 @@
} else if (leftChild != null && stringChild != null) {
if (stringChild.getStringValue().isEmpty()) {
result = new IsNullPredicate(leftChild, false);
- } else {
- stringChild = new StringLiteral(stringChild.getStringValue().toLowerCase());
- result.setChild(1, stringChild);
}
}
}
diff --git a/fe/src/main/java/org/apache/impala/analysis/PlanHint.java b/fe/src/main/java/org/apache/impala/analysis/PlanHint.java
new file mode 100644
index 0000000..d16919f
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/PlanHint.java
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.impala.analysis;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Class to parse and store query plan hints, which can occur in various places inside SQL
+ * query statements. A hint consists of a name and an optional list of arguments.
+ */
+public class PlanHint {
+ /// The plan hint name.
+ private final String name_;
+
+ /// Optional list of arguments.
+ private final List<String> args_;
+
+ public PlanHint(String name) {
+ name_ = name;
+ args_ = Lists.newArrayList();
+ }
+
+ public PlanHint(String name, List<String> args) {
+ name_ = name;
+ args_ = args;
+ }
+
+ /// Check whether this hint's name equals a given string, ignoring case.
+ public boolean is(String s) { return name_.equalsIgnoreCase(s); }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null) return false;
+ if (getClass() != o.getClass()) return false;
+ PlanHint oh = (PlanHint) o;
+ return name_.equals(oh.name_) && args_.equals(oh.args_);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(name_);
+ if (!args_.isEmpty()) {
+ sb.append("(");
+ sb.append(Joiner.on(",").join(args_));
+ sb.append(")");
+ }
+ return sb.toString();
+ }
+
+ public List<String> getArgs() { return args_; }
+ public String toSql() { return toString(); }
+}
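For context, a hypothetical usage sketch of the new class (driver code invented for illustration, not part of this change): hints parsed from a comment such as /* +shuffle,sortby(a,b) */ arrive in the analyzer as PlanHint objects, matched case-insensitively via is() and printed back via toSql().

    import java.util.List;
    import com.google.common.collect.Lists;
    import org.apache.impala.analysis.PlanHint;

    public class PlanHintSketch {
      public static void main(String[] args) {
        PlanHint shuffle = new PlanHint("shuffle");
        PlanHint sortBy = new PlanHint("sortby", Lists.newArrayList("a", "b"));
        for (PlanHint hint : Lists.newArrayList(shuffle, sortBy)) {
          if (hint.is("SHUFFLE")) {
            System.out.println("shuffle requested");
          } else if (hint.is("SORTBY")) {
            List<String> cols = hint.getArgs();  // ["a", "b"]
            System.out.println("sort columns: " + cols);
          }
        }
        System.out.println(sortBy.toSql());      // prints: sortby(a,b)
      }
    }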
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectList.java b/fe/src/main/java/org/apache/impala/analysis/SelectList.java
index 4c504bf..d7f12ff 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectList.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectList.java
@@ -22,13 +22,14 @@
import org.apache.impala.common.AnalysisException;
import org.apache.impala.rewrite.ExprRewriter;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
* Select list items plus optional distinct clause and optional plan hints.
*/
public class SelectList {
- private List<String> planHints_;
+  private List<PlanHint> planHints_ = Lists.newArrayList();
private boolean isDistinct_;
/////////////////////////////////////////
@@ -50,7 +51,7 @@
}
public SelectList(List<SelectListItem> items, boolean isDistinct,
- List<String> planHints) {
+ List<PlanHint> planHints) {
isDistinct_ = isDistinct;
items_ = items;
planHints_ = planHints;
@@ -60,8 +61,7 @@
* C'tor for cloning.
*/
public SelectList(SelectList other) {
- planHints_ =
- (other.planHints_ != null) ? Lists.newArrayList(other.planHints_) : null;
+ planHints_ = Lists.newArrayList(other.planHints_);
items_ = Lists.newArrayList();
for (SelectListItem item: other.items_) {
items_.add(item.clone());
@@ -70,16 +70,20 @@
}
public List<SelectListItem> getItems() { return items_; }
- public void setPlanHints(List<String> planHints) { planHints_ = planHints; }
- public List<String> getPlanHints() { return planHints_; }
+
+ public void setPlanHints(List<PlanHint> planHints) {
+ Preconditions.checkNotNull(planHints);
+ planHints_ = planHints;
+ }
+
+ public List<PlanHint> getPlanHints() { return planHints_; }
public boolean isDistinct() { return isDistinct_; }
public void setIsDistinct(boolean value) { isDistinct_ = value; }
- public boolean hasPlanHints() { return planHints_ != null; }
+ public boolean hasPlanHints() { return !planHints_.isEmpty(); }
public void analyzePlanHints(Analyzer analyzer) {
- if (planHints_ == null) return;
- for (String hint: planHints_) {
- if (!hint.equalsIgnoreCase("straight_join")) {
+ for (PlanHint hint: planHints_) {
+ if (!hint.is("straight_join")) {
analyzer.addWarning("PLAN hint not recognized: " + hint);
}
analyzer.setIsStraightJoin();
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableRef.java b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
index d6bbfd2..975b70b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
@@ -82,10 +82,10 @@
protected final Privilege priv_;
protected JoinOperator joinOp_;
- protected ArrayList<String> joinHints_;
+ protected List<PlanHint> joinHints_ = Lists.newArrayList();
protected List<String> usingColNames_;
- protected ArrayList<String> tableHints_;
+ protected List<PlanHint> tableHints_ = Lists.newArrayList();
protected TReplicaPreference replicaPreference_;
protected boolean randomReplica_;
@@ -156,13 +156,11 @@
hasExplicitAlias_ = other.hasExplicitAlias_;
priv_ = other.priv_;
joinOp_ = other.joinOp_;
- joinHints_ =
- (other.joinHints_ != null) ? Lists.newArrayList(other.joinHints_) : null;
+ joinHints_ = Lists.newArrayList(other.joinHints_);
onClause_ = (other.onClause_ != null) ? other.onClause_.clone() : null;
usingColNames_ =
(other.usingColNames_ != null) ? Lists.newArrayList(other.usingColNames_) : null;
- tableHints_ =
- (other.tableHints_ != null) ? Lists.newArrayList(other.tableHints_) : null;
+ tableHints_ = Lists.newArrayList(other.tableHints_);
replicaPreference_ = other.replicaPreference_;
randomReplica_ = other.randomReplica_;
distrMode_ = other.distrMode_;
@@ -262,8 +260,8 @@
return resolvedPath_.getRootTable();
}
public Privilege getPrivilege() { return priv_; }
- public ArrayList<String> getJoinHints() { return joinHints_; }
- public ArrayList<String> getTableHints() { return tableHints_; }
+ public List<PlanHint> getJoinHints() { return joinHints_; }
+ public List<PlanHint> getTableHints() { return tableHints_; }
public Expr getOnClause() { return onClause_; }
public List<String> getUsingClause() { return usingColNames_; }
public void setJoinOp(JoinOperator op) { this.joinOp_ = op; }
@@ -271,12 +269,23 @@
public void setUsingClause(List<String> colNames) { this.usingColNames_ = colNames; }
public TableRef getLeftTblRef() { return leftTblRef_; }
public void setLeftTblRef(TableRef leftTblRef) { this.leftTblRef_ = leftTblRef; }
- public void setJoinHints(ArrayList<String> hints) { this.joinHints_ = hints; }
- public void setTableHints(ArrayList<String> hints) { this.tableHints_ = hints; }
+
+ public void setJoinHints(List<PlanHint> hints) {
+ Preconditions.checkNotNull(hints);
+ joinHints_ = hints;
+ }
+
+ public void setTableHints(List<PlanHint> hints) {
+ Preconditions.checkNotNull(hints);
+ tableHints_ = hints;
+ }
+
public boolean isBroadcastJoin() { return distrMode_ == DistributionMode.BROADCAST; }
+
public boolean isPartitionedJoin() {
return distrMode_ == DistributionMode.PARTITIONED;
}
+
public DistributionMode getDistributionMode() { return distrMode_; }
public List<TupleId> getCorrelatedTupleIds() { return correlatedTupleIds_; }
public boolean isAnalyzed() { return isAnalyzed_; }
@@ -336,7 +345,7 @@
}
private void analyzeTableHints(Analyzer analyzer) {
- if (tableHints_ == null) return;
+ if (tableHints_.isEmpty()) return;
if (!(this instanceof BaseTableRef)) {
analyzer.addWarning("Table hints not supported for inline view and collections");
return;
@@ -347,17 +356,17 @@
!(getResolvedPath().destTable() instanceof HdfsTable)) {
analyzer.addWarning("Table hints only supported for Hdfs tables");
}
- for (String hint: tableHints_) {
- if (hint.equalsIgnoreCase("SCHEDULE_CACHE_LOCAL")) {
+ for (PlanHint hint: tableHints_) {
+ if (hint.is("SCHEDULE_CACHE_LOCAL")) {
analyzer.setHasPlanHints();
replicaPreference_ = TReplicaPreference.CACHE_LOCAL;
- } else if (hint.equalsIgnoreCase("SCHEDULE_DISK_LOCAL")) {
+ } else if (hint.is("SCHEDULE_DISK_LOCAL")) {
analyzer.setHasPlanHints();
replicaPreference_ = TReplicaPreference.DISK_LOCAL;
- } else if (hint.equalsIgnoreCase("SCHEDULE_REMOTE")) {
+ } else if (hint.is("SCHEDULE_REMOTE")) {
analyzer.setHasPlanHints();
replicaPreference_ = TReplicaPreference.REMOTE;
- } else if (hint.equalsIgnoreCase("SCHEDULE_RANDOM_REPLICA")) {
+ } else if (hint.is("SCHEDULE_RANDOM_REPLICA")) {
analyzer.setHasPlanHints();
randomReplica_ = true;
} else {
@@ -369,9 +378,9 @@
}
private void analyzeJoinHints(Analyzer analyzer) throws AnalysisException {
- if (joinHints_ == null) return;
- for (String hint: joinHints_) {
- if (hint.equalsIgnoreCase("BROADCAST")) {
+ if (joinHints_.isEmpty()) return;
+ for (PlanHint hint: joinHints_) {
+ if (hint.is("BROADCAST")) {
if (joinOp_ == JoinOperator.RIGHT_OUTER_JOIN
|| joinOp_ == JoinOperator.FULL_OUTER_JOIN
|| joinOp_ == JoinOperator.RIGHT_SEMI_JOIN
@@ -384,7 +393,7 @@
}
distrMode_ = DistributionMode.BROADCAST;
analyzer.setHasPlanHints();
- } else if (hint.equalsIgnoreCase("SHUFFLE")) {
+ } else if (hint.is("SHUFFLE")) {
if (joinOp_ == JoinOperator.CROSS_JOIN) {
throw new AnalysisException("CROSS JOIN does not support SHUFFLE.");
}
@@ -545,7 +554,7 @@
}
StringBuilder output = new StringBuilder(" " + joinOp_.toString() + " ");
- if(joinHints_ != null) output.append(ToSqlUtils.getPlanHintsSql(joinHints_) + " ");
+ if (!joinHints_.isEmpty()) output.append(ToSqlUtils.getPlanHintsSql(joinHints_) + " ");
output.append(tableRefToSql());
if (usingColNames_ != null) {
output.append(" USING (").append(Joiner.on(", ").join(usingColNames_)).append(")");
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index be4ab6f..35f7e79 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -389,8 +389,9 @@
* commented plan hint style such that hinted views created by Impala are readable by
* Hive (parsed as a comment by Hive).
*/
- public static String getPlanHintsSql(List<String> hints) {
- if (hints == null || hints.isEmpty()) return "";
+ public static String getPlanHintsSql(List<PlanHint> hints) {
+ Preconditions.checkNotNull(hints);
+ if (hints.isEmpty()) return "";
StringBuilder sb = new StringBuilder();
sb.append("\n-- +");
sb.append(Joiner.on(",").join(hints));
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index c7d2097..79e5072 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -535,7 +535,7 @@
for (FieldSchema fs: getMetaStoreTable().getPartitionKeys()) {
for (TPartitionKeyValue kv: partitionSpec) {
if (fs.getName().toLowerCase().equals(kv.getName().toLowerCase())) {
- targetValues.add(kv.getValue().toLowerCase());
+ targetValues.add(kv.getValue());
// Same key was specified twice
if (!keys.add(kv.getName().toLowerCase())) {
return null;
@@ -569,7 +569,7 @@
// backwards compatibility with Hive, and is clearly broken.
if (value.isEmpty()) value = getNullPartitionKeyValue();
}
- if (!targetValues.get(i).equals(value.toLowerCase())) {
+ if (!targetValues.get(i).equals(value)) {
matchFound = false;
break;
}
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 580f5e0..cdb620c 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -309,7 +309,7 @@
SlotRef ref = (SlotRef) predicate.getChild(0);
LiteralExpr literal = (LiteralExpr) predicate.getChild(1);
- // Cannot push prediates with null literal values (KUDU-1595).
+ // Cannot push predicates with null literal values (KUDU-1595).
if (literal instanceof NullLiteral) return false;
String colName = ref.getDesc().getColumn().getName();
@@ -379,8 +379,13 @@
List<Object> values = Lists.newArrayList();
for (int i = 1; i < predicate.getChildren().size(); ++i) {
if (!(predicate.getChild(i).isLiteral())) return false;
- Object value = getKuduInListValue((LiteralExpr) predicate.getChild(i));
- Preconditions.checkNotNull(value == null);
+ LiteralExpr literal = (LiteralExpr) predicate.getChild(i);
+
+ // Cannot push predicates with null literal values (KUDU-1595).
+ if (literal instanceof NullLiteral) return false;
+
+ Object value = getKuduInListValue(literal);
+ Preconditions.checkNotNull(value);
values.add(value);
}
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 3369686..297e9b2 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -144,7 +144,7 @@
rootFragment, insertStmt, ctx_.getRootAnalyzer(), fragments);
}
// Add optional sort node to the plan, based on clustered/noclustered plan hint.
- createClusteringSort(insertStmt, rootFragment, ctx_.getRootAnalyzer());
+ createPreInsertSort(insertStmt, rootFragment, ctx_.getRootAnalyzer());
// set up table sink for root fragment
rootFragment.setSink(insertStmt.createDataSink());
resultExprs = insertStmt.getResultExprs();
@@ -512,21 +512,26 @@
}
/**
- * Insert a sort node on top of the plan, depending on the clustered/noclustered plan
- * hint. This will sort the data produced by 'inputFragment' by the clustering columns
- * (key columns for Kudu tables), so that partitions can be written sequentially in the
- * table sink.
+ * Insert a sort node on top of the plan, depending on the clustered/noclustered/sortby
+ * plan hint. If clustering is enabled in insertStmt, then the ordering columns will
+ * start with the clustering columns (key columns for Kudu tables), so that partitions
+ * can be written sequentially in the table sink. Any additional non-clustering columns
+ * specified by the sortby hint will be appended to the ordering columns after any
+ * clustering columns. If neither clustering nor a sortby hint is specified, then no
+ * sort node will be added to the plan.
*/
- public void createClusteringSort(InsertStmt insertStmt, PlanFragment inputFragment,
+ public void createPreInsertSort(InsertStmt insertStmt, PlanFragment inputFragment,
Analyzer analyzer) throws ImpalaException {
- if (!insertStmt.hasClusteredHint()) return;
+ List<Expr> orderingExprs = Lists.newArrayList();
- List<Expr> orderingExprs;
- if (insertStmt.getTargetTable() instanceof KuduTable) {
- orderingExprs = Lists.newArrayList(insertStmt.getPrimaryKeyExprs());
- } else {
- orderingExprs = Lists.newArrayList(insertStmt.getPartitionKeyExprs());
+ if (insertStmt.hasClusteredHint()) {
+ if (insertStmt.getTargetTable() instanceof KuduTable) {
+ orderingExprs.addAll(insertStmt.getPrimaryKeyExprs());
+ } else {
+ orderingExprs.addAll(insertStmt.getPartitionKeyExprs());
+ }
}
+ orderingExprs.addAll(insertStmt.getSortByExprs());
// Ignore constants for the sake of clustering.
Expr.removeConstants(orderingExprs);
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index 982e9a2..6d1a773 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -301,7 +301,6 @@
%}
LineTerminator = \r|\n|\r\n
-NonTerminator = [^\r\n]
Whitespace = {LineTerminator} | [ \t\f]
// Order of rules to resolve ambiguity:
@@ -321,14 +320,30 @@
SingleQuoteStringLiteral = \'(\\.|[^\\\'])*\'
DoubleQuoteStringLiteral = \"(\\.|[^\\\"])*\"
+EolHintBegin = "--" " "* "+"
+CommentedHintBegin = "/*" " "* "+"
+CommentedHintEnd = "*/"
+
// Both types of plan hints must appear within a single line.
-TraditionalCommentedPlanHints = "/*" [ ]* "+" [^\r\n*]* "*/"
-// Must end with a line terminator.
-EndOfLineCommentedPlanHints = "--" [ ]* "+" {NonTerminator}* {LineTerminator}
+HintContent = " "* "+" [^\r\n]*
Comment = {TraditionalComment} | {EndOfLineComment}
-TraditionalComment = "/*" ~"*/"
-EndOfLineComment = "--" {NonTerminator}* {LineTerminator}?
+
+// Match anything that has a comment end (*/) in it.
+ContainsCommentEnd = [^]* "*/" [^]*
+// Match anything that has a line terminator in it.
+ContainsLineTerminator = [^]* {LineTerminator} [^]*
+
+// A traditional comment is anything that starts and ends like a comment and has neither a
+// plan hint inside nor a CommentEnd (*/).
+TraditionalComment = "/*" !({HintContent}|{ContainsCommentEnd}) "*/"
+// Similarly for an end-of-line comment.
+EndOfLineComment = "--" !({HintContent}|{ContainsLineTerminator}) {LineTerminator}?
+
+// This additional state is needed because newlines signal the end of an end-of-line hint
+// if one has been started earlier. Hence we need to discern between newlines within and
+// outside of end-of-line hints.
+%state EOLHINT
%%
// Put '...' before '.'
@@ -412,18 +427,22 @@
return newToken(SqlParserSymbols.STRING_LITERAL, yytext().substring(1, yytext().length()-1));
}
-{TraditionalCommentedPlanHints} {
- String text = yytext();
- // Remove everything before the first '+' as well as the trailing "*/"
- String hintStr = text.substring(text.indexOf('+') + 1, text.length() - 2);
- return newToken(SqlParserSymbols.COMMENTED_PLAN_HINTS, hintStr.trim());
+{CommentedHintBegin} {
+ return newToken(SqlParserSymbols.COMMENTED_PLAN_HINT_START, null);
}
-{EndOfLineCommentedPlanHints} {
- String text = yytext();
- // Remove everything before the first '+'
- String hintStr = text.substring(text.indexOf('+') + 1);
- return newToken(SqlParserSymbols.COMMENTED_PLAN_HINTS, hintStr.trim());
+{CommentedHintEnd} {
+ return newToken(SqlParserSymbols.COMMENTED_PLAN_HINT_END, null);
+}
+
+{EolHintBegin} {
+ yybegin(EOLHINT);
+ return newToken(SqlParserSymbols.COMMENTED_PLAN_HINT_START, null);
+}
+
+<EOLHINT> {LineTerminator} {
+ yybegin(YYINITIAL);
+ return newToken(SqlParserSymbols.COMMENTED_PLAN_HINT_END, null);
}
{Comment} { /* ignore */ }
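To make the scanner/parser handoff concrete: instead of one COMMENTED_PLAN_HINTS string that the parser previously split on commas, a hint comment now produces a delimited token stream that the plan_hint_list grammar consumes. Under these rules, /* +sortby(a,b) */ is expected to lex roughly as:

    COMMENTED_PLAN_HINT_START
    IDENT("sortby") LPAREN IDENT("a") COMMA IDENT("b") RPAREN
    COMMENTED_PLAN_HINT_END

For an end-of-line hint such as "--+ shuffle", {EolHintBegin} switches the scanner into the EOLHINT state so that the terminating newline, rather than "*/", emits COMMENTED_PLAN_HINT_END.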
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 935edc5..5a08f98 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -424,7 +424,7 @@
AnalyzesOk("alter table functional.stringpartitionkey PARTITION " +
"(string_col='partition1') set fileformat parquet");
AnalyzesOk("alter table functional.stringpartitionkey PARTITION " +
- "(string_col='PaRtiTion1') set location '/a/b/c'");
+ "(string_col='partition1') set location '/a/b/c'");
AnalyzesOk("alter table functional.alltypes PARTITION (year=2010, month=11) " +
"set tblproperties('a'='1')");
AnalyzesOk("alter table functional.alltypes PARTITION (year<=2010, month=11) " +
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 11f6842..9b641a0 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -1604,14 +1604,6 @@
String.format("select * from functional.alltypes a join %sbadhint%s " +
"functional.alltypes b using (int_col)", prefix, suffix),
"JOIN hint not recognized: badhint");
- // Hints must be comma separated. Legacy-style hint does not parse because
- // of space-separated identifiers.
- if (!prefix.contains("[")) {
- AnalyzesOk(String.format(
- "select * from functional.alltypes a join %sbroadcast broadcast%s " +
- "functional.alltypes b using (int_col)", prefix, suffix),
- "JOIN hint not recognized: broadcast broadcast");
- }
AnalysisError(
String.format("select * from functional.alltypes a cross join %sshuffle%s " +
"functional.alltypes b", prefix, suffix),
@@ -1744,7 +1736,8 @@
AnalysisError(String.format(
"insert into table functional_hbase.alltypes %sshuffle%s " +
"select * from functional_hbase.alltypes", prefix, suffix),
- "INSERT hints are only supported for inserting into Hdfs and Kudu tables.");
+ "INSERT hints are only supported for inserting into Hdfs and Kudu tables: " +
+ "functional_hbase.alltypes");
// Conflicting plan hints.
AnalysisError("insert into table functional.alltypessmall " +
"partition (year, month) /* +shuffle,noshuffle */ " +
@@ -1766,6 +1759,46 @@
"insert into table functional.alltypessmall partition (year, month) " +
"/* +clustered,noclustered */ select * from functional.alltypes", prefix,
suffix), "Conflicting INSERT hints: clustered and noclustered");
+
+ // Below are tests for hints that are not supported by the legacy syntax.
+ if (prefix.equals("[")) continue;
+
+ // Tests for sortby hint
+ AnalyzesOk(String.format("insert into functional.alltypessmall " +
+ "partition (year, month) %ssortby(int_col)%s select * from functional.alltypes",
+ prefix, suffix));
+ AnalyzesOk(String.format("insert into functional.alltypessmall " +
+ "partition (year, month) %sshuffle,clustered,sortby(int_col)%s select * from " +
+ "functional.alltypes", prefix, suffix));
+ AnalyzesOk(String.format("insert into functional.alltypessmall " +
+ "partition (year, month) %ssortby(int_col, bool_col)%s select * from " +
+ "functional.alltypes", prefix, suffix));
+ AnalyzesOk(String.format("insert into functional.alltypessmall " +
+ "partition (year, month) %sshuffle,clustered,sortby(int_col,bool_col)%s " +
+ "select * from functional.alltypes", prefix, suffix));
+ AnalyzesOk(String.format("insert into functional.alltypessmall " +
+ "partition (year, month) %sshuffle,sortby(int_col,bool_col),clustered%s " +
+ "select * from functional.alltypes", prefix, suffix));
+ AnalyzesOk(String.format("insert into functional.alltypessmall " +
+ "partition (year, month) %ssortby(int_col,bool_col),shuffle,clustered%s " +
+ "select * from functional.alltypes", prefix, suffix));
+ // Column in sortby hint must exist.
+ AnalysisError(String.format("insert into functional.alltypessmall " +
+ "partition (year, month) %ssortby(foo)%s select * from functional.alltypes",
+ prefix, suffix), "Could not find SORTBY hint column 'foo' in table.");
+ // Column in sortby hint must not be a Hdfs partition column.
+ AnalysisError(String.format("insert into functional.alltypessmall " +
+ "partition (year, month) %ssortby(year)%s select * from " +
+ "functional.alltypes", prefix, suffix),
+ "SORTBY hint column list must not contain Hdfs partition column: 'year'");
+ // Column in sortby hint must not be a Kudu primary key column.
+ AnalysisError(String.format("insert into functional_kudu.alltypessmall " +
+ "%ssortby(id)%s select * from functional_kudu.alltypes", prefix, suffix),
+ "SORTBY hint column list must not contain Kudu primary key column: 'id'");
+ // sortby() hint is not supported in UPSERT queries
+ AnalysisError(String.format("upsert into functional_kudu.alltypessmall " +
+ "%ssortby(id)%s select * from functional_kudu.alltypes", prefix, suffix),
+ "SORTBY hint is not supported in UPSERT statements.");
}
// Multiple non-conflicting hints and case insensitivity of hints.
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index ab2eaec..17a90b1 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -209,6 +209,13 @@
ParsesOk("/* select 1; */ select 1");
ParsesOk("/** select 1; */ select 1");
ParsesOk("/* select */ select 1 /* 1 */");
+ ParsesOk("select 1 /* sortby(() */");
+ // Empty column list in sortby hint
+ ParserError("select 1 /*+ sortby() */");
+ // Mismatching parentheses
+ ParserError("select 1 /*+ sortby(() */");
+ ParserError("select 1 /*+ sortby(a) \n");
+ ParserError("select 1 --+ sortby(a) */\n from t");
}
/**
@@ -230,6 +237,9 @@
ParserError("-- baz /*\nselect 1*/");
ParsesOk("select -- blah\n 1");
ParsesOk("select -- select 1\n 1");
+ ParsesOk("select 1 -- sortby(()");
+ // Mismatching parentheses
+ ParserError("select 1 -- +sortby(()\n");
}
/**
@@ -241,10 +251,10 @@
SelectStmt selectStmt = (SelectStmt) ParsesOk(stmt);
Preconditions.checkState(selectStmt.getTableRefs().size() > 1);
List<String> actualHints = Lists.newArrayList();
- assertEquals(null, selectStmt.getTableRefs().get(0).getJoinHints());
+ assertTrue(selectStmt.getTableRefs().get(0).getJoinHints().isEmpty());
for (int i = 1; i < selectStmt.getTableRefs().size(); ++i) {
- List<String> hints = selectStmt.getTableRefs().get(i).getJoinHints();
- if (hints != null) actualHints.addAll(hints);
+ List<PlanHint> hints = selectStmt.getTableRefs().get(i).getJoinHints();
+ for (PlanHint hint: hints) actualHints.add(hint.toString());
}
if (actualHints.isEmpty()) actualHints = Lists.newArrayList((String) null);
assertEquals(Lists.newArrayList(expectedHints), actualHints);
@@ -255,8 +265,8 @@
Preconditions.checkState(selectStmt.getTableRefs().size() > 0);
List<String> actualHints = Lists.newArrayList();
for (int i = 0; i < selectStmt.getTableRefs().size(); ++i) {
- List<String> hints = selectStmt.getTableRefs().get(i).getTableHints();
- if (hints != null) actualHints.addAll(hints);
+ List<PlanHint> hints = selectStmt.getTableRefs().get(i).getTableHints();
+ for (PlanHint hint: hints) actualHints.add(hint.toString());
}
if (actualHints.isEmpty()) actualHints = Lists.newArrayList((String) null);
assertEquals(Lists.newArrayList(expectedHints), actualHints);
@@ -267,10 +277,10 @@
Preconditions.checkState(selectStmt.getTableRefs().size() > 0);
List<String> actualHints = Lists.newArrayList();
for (int i = 0; i < selectStmt.getTableRefs().size(); ++i) {
- List<String> joinHints = selectStmt.getTableRefs().get(i).getJoinHints();
- if (joinHints != null) actualHints.addAll(joinHints);
- List<String> tableHints = selectStmt.getTableRefs().get(i).getTableHints();
- if (tableHints != null) actualHints.addAll(tableHints);
+ List<PlanHint> joinHints = selectStmt.getTableRefs().get(i).getJoinHints();
+ for (PlanHint hint: joinHints) actualHints.add(hint.toString());
+ List<PlanHint> tableHints = selectStmt.getTableRefs().get(i).getTableHints();
+ for (PlanHint hint: tableHints) actualHints.add(hint.toString());
}
if (actualHints.isEmpty()) actualHints = Lists.newArrayList((String) null);
assertEquals(Lists.newArrayList(expectedHints), actualHints);
@@ -282,8 +292,10 @@
*/
private void TestSelectListHints(String stmt, String... expectedHints) {
SelectStmt selectStmt = (SelectStmt) ParsesOk(stmt);
- List<String> actualHints = selectStmt.getSelectList().getPlanHints();
- if (actualHints == null) actualHints = Lists.newArrayList((String) null);
+ List<String> actualHints = Lists.newArrayList();
+ List<PlanHint> hints = selectStmt.getSelectList().getPlanHints();
+ for (PlanHint hint: hints) actualHints.add(hint.toString());
+ if (actualHints.isEmpty()) actualHints = Lists.newArrayList((String) null);
assertEquals(Lists.newArrayList(expectedHints), actualHints);
}
@@ -292,8 +304,10 @@
*/
private void TestInsertHints(String stmt, String... expectedHints) {
InsertStmt insertStmt = (InsertStmt) ParsesOk(stmt);
- List<String> actualHints = insertStmt.getPlanHints();
- if (actualHints == null) actualHints = Lists.newArrayList((String) null);
+ List<String> actualHints = Lists.newArrayList();
+ List<PlanHint> hints = insertStmt.getPlanHints();
+ for (PlanHint hint: hints) actualHints.add(hint.toString());
+ if (actualHints.isEmpty()) actualHints = Lists.newArrayList((String) null);
assertEquals(Lists.newArrayList(expectedHints), actualHints);
}
@@ -407,22 +421,26 @@
suffix), "schedule_cache_local", "schedule_random_replica", "broadcast",
"schedule_remote");
+ TestSelectListHints(String.format(
+ "select %sfoo,bar,baz%s * from functional.alltypes a", prefix, suffix),
+ "foo", "bar", "baz");
+
// Test select-list hints (e.g., straight_join). The legacy-style hint has no
// prefix and suffix.
- if (prefix.contains("[")) {
- prefix = "";
- suffix = "";
- }
- TestSelectListHints(String.format(
- "select %sstraight_join%s * from functional.alltypes a", prefix, suffix),
- "straight_join");
- // Only the new hint-style is recognized
- if (!prefix.equals("")) {
+ {
+ String localPrefix = prefix;
+ String localSuffix = suffix;
+ if (prefix.equals("[")) {
+ localPrefix = "";
+ localSuffix = "";
+ }
TestSelectListHints(String.format(
- "select %sfoo,bar,baz%s * from functional.alltypes a", prefix, suffix),
- "foo", "bar", "baz");
+ "select %sstraight_join%s * from functional.alltypes a", localPrefix,
+ localSuffix), "straight_join");
}
- if (prefix.isEmpty()) continue;
+
+ // Below are tests for hints that are not supported by the legacy syntax.
+ if (prefix.equals("[")) continue;
// Test mixing commented hints and comments.
for (String[] commentStyle: commentStyles) {
@@ -439,6 +457,22 @@
TestSelectListHints(query, "straight_join");
TestJoinHints(query, "shuffle");
}
+
+ // Tests for hints with arguments.
+ TestInsertHints(String.format(
+ "insert into t %ssortby(a)%s select * from t", prefix, suffix),
+ "sortby(a)");
+ TestInsertHints(String.format(
+ "insert into t %sclustered,shuffle,sortby(a)%s select * from t", prefix,
+ suffix), "clustered", "shuffle", "sortby(a)");
+ TestInsertHints(String.format(
+ "insert into t %ssortby(a,b)%s select * from t", prefix, suffix),
+ "sortby(a,b)");
+ TestInsertHints(String.format(
+ "insert into t %ssortby(a , b)%s select * from t", prefix, suffix),
+ "sortby(a,b)");
+ ParserError(String.format(
+ "insert into t %ssortby( a , , ,,, b )%s select * from t", prefix, suffix));
}
// No "+" at the beginning so the comment is not recognized as a hint.
TestJoinHints("select * from functional.alltypes a join /* comment */" +
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index b5cf446..9514cc6 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -463,11 +463,11 @@
// Insert hint.
testToSql(String.format(
"insert into functional.alltypes(int_col, bool_col) " +
- "partition(year, month) %snoshuffle%s " +
+ "partition(year, month) %snoshuffle,sortby(int_col)%s " +
"select int_col, bool_col, year, month from functional.alltypes",
prefix, suffix),
"INSERT INTO TABLE functional.alltypes(int_col, bool_col) " +
- "PARTITION (year, month) \n-- +noshuffle\n " +
+ "PARTITION (year, month) \n-- +noshuffle,sortby(int_col)\n " +
"SELECT int_col, bool_col, year, month FROM functional.alltypes");
// Table hint
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index c3b8648..9831d21 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -46,7 +46,7 @@
hdfs == 2.0.2
docopt == 0.6.2
execnet == 1.4.0
-impyla == 0.11.2
+impyla == 0.14.0
bitarray == 0.8.1
sasl == 0.1.3
six == 1.9.0
diff --git a/testdata/bin/check-hbase-nodes.py b/testdata/bin/check-hbase-nodes.py
index 999a657..ffe7a7c 100755
--- a/testdata/bin/check-hbase-nodes.py
+++ b/testdata/bin/check-hbase-nodes.py
@@ -30,7 +30,7 @@
from contextlib import closing
from kazoo.client import KazooClient
-from kazoo.exceptions import NoNodeError
+from kazoo.exceptions import NoNodeError, ConnectionLoss
from kazoo.handlers.threading import KazooTimeoutError
LOGGER = logging.getLogger('hbase_check')
@@ -43,6 +43,7 @@
ZK_HOSTS = '127.0.0.1:2181'
HBASE_NODES = ['/hbase/master', '/hbase/rs']
ADMIN_USER = 'admin'
+MAX_ZOOKEEPER_CONNECTION_RETRIES = 3
def parse_args():
@@ -128,12 +129,31 @@
timeout_seconds: Number of seconds to attempt to get node
Returns:
- 0 success, or else the number of unresponsive nodes
+ 0 on success, or else the number of errors
"""
- with closing(connect_to_zookeeper(zookeeper_hosts, timeout)) as zk_client:
- errors = sum([check_znode(node, zk_client, timeout) for node in nodes])
- zk_client.stop()
- return errors
+ connection_retries = 0
+
+ while True:
+ with closing(connect_to_zookeeper(zookeeper_hosts, timeout)) as zk_client:
+ try:
+ return sum([check_znode(node, zk_client, timeout) for node in nodes])
+ except ConnectionLoss as e:
+ connection_retries += 1
+ if connection_retries > MAX_ZOOKEEPER_CONNECTION_RETRIES:
+ LOGGER.error("Max connection retries exceeded: {0}".format(str(e)))
+ raise
+ else:
+ err_msg = ("Zookeeper connection loss: retrying connection "
+ "({0} of {1} attempts)")
+ LOGGER.warn(err_msg.format(connection_retries,
+ MAX_ZOOKEEPER_CONNECTION_RETRIES))
+ time.sleep(1)
+ except Exception as e:
+ LOGGER.error("Unexpected error checking HBase node: {0}".format(str(e)))
+ raise
+ finally:
+ LOGGER.info("Stopping Zookeeper client")
+ zk_client.stop()
def is_hdfs_running(host, admin_user):
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
index f201d0a..8e3adb9 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
@@ -705,3 +705,76 @@
00:SCAN HDFS [functional.alltypesnopart]
partitions=1/1 files=0 size=0B
====
+# IMPALA-4163: sortby hint in insert statement adds sort node.
+insert into table functional.alltypes partition(year, month)
+ /*+ sortby(int_col, bool_col),shuffle */
+ select * from functional.alltypes
+---- PLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+| partitions=24
+|
+01:SORT
+| order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+ partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+| partitions=24
+|
+02:SORT
+| order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|
+01:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
+|
+00:SCAN HDFS [functional.alltypes]
+ partitions=24/24 files=24 size=478.45KB
+====
+# IMPALA-4163: sortby hint in insert statement with noshuffle adds sort node.
+insert into table functional.alltypes partition(year, month)
+ /*+ sortby(int_col, bool_col),noshuffle */
+ select * from functional.alltypes
+---- PLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+| partitions=24
+|
+01:SORT
+| order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+ partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+| partitions=24
+|
+01:SORT
+| order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+ partitions=24/24 files=24 size=478.45KB
+====
+# IMPALA-4163: sortby hint in insert statement adds ordering columns to clustering sort.
+insert into table functional.alltypes partition(year, month)
+ /*+ sortby(int_col, bool_col),clustered */
+ select * from functional.alltypes
+---- PLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+| partitions=24
+|
+01:SORT
+| order by: year DESC NULLS LAST, month DESC NULLS LAST, int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+ partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+| partitions=24
+|
+02:SORT
+| order by: year DESC NULLS LAST, month DESC NULLS LAST, int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|
+01:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
+|
+00:SCAN HDFS [functional.alltypes]
+ partitions=24/24 files=24 size=478.45KB
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index 776882d..9f2270f 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -314,3 +314,15 @@
predicates: CAST(a.id AS STRING) > '123'
kudu predicates: a.id > 10
====
+# IMPALA-4662: Kudu analysis failure for NULL literal in IN list
+# NULL literal in values list results in applying predicate at scan node
+select id from functional_kudu.alltypestiny where
+id in (1, null) and string_col in (null) and bool_col in (null) and double_col in (null)
+and float_col in (null) and tinyint_col in (null) and smallint_col in (null) and
+bigint_col in (null)
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN KUDU [functional_kudu.alltypestiny]
+ predicates: id IN (1, NULL), bigint_col IN (NULL), bool_col IN (NULL), double_col IN (NULL), float_col IN (NULL), smallint_col IN (NULL), string_col IN (NULL), tinyint_col IN (NULL)
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/insert.test b/testdata/workloads/functional-query/queries/QueryTest/insert.test
index 5601b35..9dfaf34 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/insert.test
@@ -945,3 +945,29 @@
---- RESULTS
: 100
====
+---- QUERY
+# IMPALA-4163: insert into table sortby() plan hint
+insert into table alltypesinsert
+partition (year, month) /*+ clustered,shuffle,sortby(int_col, bool_col) */
+select * from alltypestiny;
+---- SETUP
+DROP PARTITIONS alltypesinsert
+RESET alltypesinsert
+---- RESULTS
+year=2009/month=1/: 2
+year=2009/month=2/: 2
+year=2009/month=3/: 2
+year=2009/month=4/: 2
+====
+---- QUERY
+# IMPALA-4163: insert into table sortby() plan hint
+insert into table alltypesnopart_insert
+/*+ clustered,shuffle,sortby(int_col, bool_col) */
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col,
+double_col, date_string_col, string_col, timestamp_col from alltypestiny;
+---- SETUP
+DROP PARTITIONS alltypesinsert
+RESET alltypesinsert
+---- RESULTS
+: 8
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/partition-ddl-predicates-all-fs.test b/testdata/workloads/functional-query/queries/QueryTest/partition-ddl-predicates-all-fs.test
index 36104c3..1aa5c51 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/partition-ddl-predicates-all-fs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/partition-ddl-predicates-all-fs.test
@@ -133,3 +133,23 @@
---- TYPES
STRING, STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
====
+---- QUERY
+# Tests case-sensitivity of string-typed partition columns.
+alter table p1 add partition (j=2,k="D");
+alter table p1 add partition (j=2,k="E");
+alter table p1 add partition (j=2,k="F");
+====
+---- QUERY
+show partitions p1
+---- RESULTS
+'NULL','g',-1,0,regex:.+,regex:.+,regex:.+,regex:.+,regex:.+,regex:.*/test-warehouse/.+/p1/j=__HIVE_DEFAULT_PARTITION__/k=g
+'2','D',-1,0,regex:.+,regex:.+,regex:.+,'TEXT',regex:.+,regex:.*/test-warehouse/.+/p1/j=2/k=D
+'2','E',-1,0,regex:.+,regex:.+,regex:.+,'TEXT',regex:.+,regex:.*/test-warehouse/.+/p1/j=2/k=E
+'2','F',-1,0,regex:.+,regex:.+,regex:.+,'TEXT',regex:.+,regex:.*/test-warehouse/.+/p1/j=2/k=F
+'2','d',-1,0,regex:.+,regex:.+,regex:.+,'PARQUET',regex:.+,regex:.*/test-warehouse/.+/p1/j=2/k=d
+'2','e',-1,0,regex:.+,regex:.+,regex:.+,'PARQUET',regex:.+,regex:.*/test-warehouse/.+/p1/j=2/k=e
+'2','f',-1,0,regex:.+,regex:.+,regex:.+,'PARQUET',regex:.+,regex:.*/test-warehouse/.+/p1/j=2/k=f
+'Total','',0,0,regex:.+,regex:.+,'','','',''
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index fb24f65..690c999 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -48,7 +48,8 @@
# 7) If a query errored, verify that memory was overcommitted during execution and the
# error is a mem limit exceeded error. There is no other reason a query should error
# and any such error will cause the stress test to stop.
-# 8) Verify the result set hash of successful queries.
+# 8) Verify the result set hash of successful queries if there are no DML queries in the
+# current run.
from __future__ import print_function
@@ -74,6 +75,7 @@
import tests.util.test_file_parser as test_file_parser
from tests.comparison.cluster import Timeout
+from tests.comparison.db_types import Int, TinyInt, SmallInt, BigInt
from tests.comparison.model_translator import SqlWriter
from tests.comparison.query_generator import QueryGenerator
from tests.comparison.query_profile import DefaultProfile
@@ -91,7 +93,7 @@
MEM_ESTIMATE_PATTERN = re.compile(r"Estimated.*Memory=(\d+.?\d*)(T|G|M|K)?B")
# The version of the file format containing the collected query runtime info.
-RUNTIME_INFO_FILE_VERSION = 2
+RUNTIME_INFO_FILE_VERSION = 3
def create_and_start_daemon_thread(fn, name):
@@ -112,7 +114,8 @@
thread_names = dict([(t.ident, t.name) for t in threading.enumerate()])
stacks = list()
for thread_id, stack in sys._current_frames().items():
- stacks.append("\n# Thread: %s(%d)"
+ stacks.append(
+ "\n# Thread: %s(%d)"
% (thread_names.get(thread_id, "No name"), thread_id))
for filename, lineno, name, line in traceback.extract_stack(stack):
stacks.append('File: "%s", line %d, in %s' % (filename, lineno, name))
@@ -128,8 +131,8 @@
def print_crash_info_if_exists(impala, start_time):
"""If any impalads are found not running, they will assumed to have crashed and an
- error message will be printed to stderr for each stopped impalad. Returns a value
- that evaluates to True if any impalads are stopped.
+ error message will be printed to stderr for each stopped impalad. Returns a value
+ that evaluates to True if any impalads are stopped.
"""
max_attempts = 5
for remaining_attempts in xrange(max_attempts - 1, -1, -1):
@@ -137,11 +140,12 @@
crashed_impalads = impala.find_crashed_impalads(start_time)
break
except Timeout as e:
- LOG.info("Timeout checking if impalads crashed: %s." % e +
- (" Will retry." if remaining_attempts else ""))
+ LOG.info(
+ "Timeout checking if impalads crashed: %s."
+ % e + (" Will retry." if remaining_attempts else ""))
else:
- LOG.error("Aborting after %s failed attempts to check if impalads crashed",
- max_attempts)
+ LOG.error(
+ "Aborting after %s failed attempts to check if impalads crashed", max_attempts)
raise e
for message in crashed_impalads.itervalues():
print(message, file=sys.stderr)
@@ -164,20 +168,20 @@
class MemBroker(object):
"""Provides memory usage coordination for clients running in different processes.
- The broker fulfills reservation requests by blocking as needed so total memory
- used by clients never exceeds the total available memory (including an
- 'overcommitable' amount).
+ The broker fulfills reservation requests by blocking as needed so total memory
+ used by clients never exceeds the total available memory (including an
+ 'overcommitable' amount).
- The lock built in to _available is also used to protect access to other members.
+ The lock built in to _available is also used to protect access to other members.
- The state stored in this class is actually an encapsulation of part of the state
- of the StressRunner class below. The state here is separated for clarity.
+ The state stored in this class is actually an encapsulation of part of the state
+ of the StressRunner class below. The state here is separated for clarity.
"""
def __init__(self, real_mem_mb, overcommitable_mem_mb):
"""'real_mem_mb' memory should be the amount of memory that each impalad is able
- to use. 'overcommitable_mem_mb' is the amount of memory that will be dispensed
- over the 'real' amount.
+ to use. 'overcommitable_mem_mb' is the amount of memory that will be dispensed
+ over the 'real' amount.
"""
self._total_mem_mb = real_mem_mb + overcommitable_mem_mb
self._available = Value("i", self._total_mem_mb)
@@ -210,16 +214,16 @@
@contextmanager
def reserve_mem_mb(self, mem_mb):
"""Blocks until the requested amount of memory is available and taken for the caller.
- This function should be used in a 'with' block. The taken memory will
- automatically be released when the 'with' context exits. A numeric id is returned
- so clients can compare against 'last_overcommitted_reservation_id' to see if
- memory was overcommitted since the reservation was obtained.
+ This function should be used in a 'with' block. The taken memory will
+ automatically be released when the 'with' context exits. A numeric id is returned
+ so clients can compare against 'last_overcommitted_reservation_id' to see if
+ memory was overcommitted since the reservation was obtained.
- with broker.reserve_mem_mb(100) as reservation_id:
- # Run query using 100 MB of memory
- if <query failed>:
- # Immediately check broker.was_overcommitted(reservation_id) to see if
- # memory was overcommitted.
+ with broker.reserve_mem_mb(100) as reservation_id:
+ # Run query using 100 MB of memory
+ if <query failed>:
+ # Immediately check broker.was_overcommitted(reservation_id) to see if
+ # memory was overcommitted.
"""
reservation_id = self._wait_until_reserved(mem_mb)
try:
@@ -232,8 +236,9 @@
with self._available.get_lock():
if req <= self._available.value:
self._available.value -= req
- LOG.debug("Reserved %s MB; %s MB available; %s MB overcommitted", req,
- self._available.value, self.overcommitted_mem_mb)
+ LOG.debug(
+ "Reserved %s MB; %s MB available; %s MB overcommitted",
+ req, self._available.value, self.overcommitted_mem_mb)
reservation_id = self._next_reservation_id.value
increment(self._next_reservation_id)
if self.overcommitted_mem_mb > 0:
@@ -244,23 +249,24 @@
def _release(self, req):
with self._available.get_lock():
self._available.value += req
- LOG.debug("Released %s MB; %s MB available; %s MB overcommitted", req,
- self._available.value, self.overcommitted_mem_mb)
+ LOG.debug(
+ "Released %s MB; %s MB available; %s MB overcommitted",
+ req, self._available.value, self.overcommitted_mem_mb)
def was_overcommitted(self, reservation_id):
"""Returns True if memory was overcommitted since the given reservation was made.
- For an accurate return value, this should be called just after the query ends
- or while the query is still running.
+ For an accurate return value, this should be called just after the query ends
+ or while the query is still running.
"""
return reservation_id <= self._last_overcommitted_reservation_id.value
class StressRunner(object):
"""This class contains functionality related to producing/consuming queries for the
- purpose of stress testing Impala.
+ purpose of stress testing Impala.
- Queries will be executed in separate processes since python threading is limited
- to the use of a single CPU.
+ Queries will be executed in separate processes since python threading is limited
+ to the use of a single CPU.
"""
# This is the point at which the work queue will block because it is full.
@@ -270,6 +276,8 @@
self.use_kerberos = False
self.common_query_options = {}
self._mem_broker = None
+ self._verify_results = True
+ self._select_probability = None
# Synchronized blocking work queue for producer/consumers.
self._query_queue = Queue(self.WORK_QUEUE_CAPACITY)
@@ -306,7 +314,8 @@
self._num_successive_errors = Value("i", 0)
self.result_hash_log_dir = gettempdir()
- self._status_headers = [" Done", "Running", "Mem Lmt Ex", "Time Out", "Cancel",
+ self._status_headers = [
+ " Done", "Running", "Mem Lmt Ex", "Time Out", "Cancel",
"Err", "Next Qry Mem Lmt", "Tot Qry Mem Lmt", "Tracked Mem", "RSS Mem"]
self._num_queries_to_run = None
@@ -315,25 +324,31 @@
self._query_consumer_thread = None
self._mem_polling_thread = None
- def run_queries(self, queries, impala, num_queries_to_run, mem_overcommit_pct,
- should_print_status):
+ def run_queries(
+ self, queries, impala, num_queries_to_run, mem_overcommit_pct, should_print_status,
+ verify_results, select_probability
+ ):
"""Runs queries randomly chosen from 'queries' and stops after 'num_queries_to_run'
- queries have completed.
+ queries have completed. 'select_probability' should be a float between 0 and 1; it
+ determines the likelihood of choosing a select query (as opposed to a DML query,
+ for example).
- Before a query is run, a mem limit will be chosen. 'spill_probability' determines
- the likelihood of choosing a mem limit that will cause spilling. To induce
- spilling, a value is randomly chosen below the min memory needed to avoid spilling
- but above the min memory needed with spilling. So the min/max query memory
- requirements must be determined before calling this method.
+ Before a query is run, a mem limit will be chosen. 'spill_probability' determines
+ the likelihood of choosing a mem limit that will cause spilling. To induce
+ spilling, a value is randomly chosen below the min memory needed to avoid spilling
+ but above the min memory needed with spilling. So the min/max query memory
+ requirements must be determined before calling this method.
- If 'mem_overcommit_pct' is zero, an exception will be raised if any queries
- fail for any reason other than cancellation (controlled by the 'cancel_probability'
- property), since each query should have enough memory to run successfully. If
- non-zero, failures due to insufficient memory will be ignored if memory was
- overcommitted at any time during execution.
+ If 'mem_overcommit_pct' is zero, an exception will be raised if any queries
+ fail for any reason other than cancellation (controlled by the 'cancel_probability'
+ property), since each query should have enough memory to run successfully. If
+ non-zero, failures due to insufficient memory will be ignored if memory was
+ overcommitted at any time during execution.
- If a query completes without error, the result will be verified. An error
- will be raised upon a result mismatch.
+ If a query completes without error, the result will be verified if 'verify_results'
+ is True. An error will be raised upon a result mismatch. 'verify_results' should be
+ False when the expected results are not known in advance, for example when
+ running DML queries.
"""
# TODO: The state from a previous run should be cleared out. This isn't really a
# problem now because the one caller (main()) never calls a second time.
@@ -346,9 +361,13 @@
# If there is a crash, start looking for errors starting from this time.
start_time = datetime.now()
- self._mem_broker = MemBroker(impala.min_impalad_mem_mb,
+ self._mem_broker = MemBroker(
+ impala.min_impalad_mem_mb,
int(impala.min_impalad_mem_mb * mem_overcommit_pct / 100))
+ self._verify_results = verify_results
+ self._select_probability = select_probability
+
# Print the status to show the state before starting.
if should_print_status:
self._print_status_header()
@@ -363,9 +382,11 @@
# Wait for everything to finish.
sleep_secs = 0.1
- while self._query_producer_thread.is_alive() \
- or self._query_consumer_thread.is_alive() \
- or self._query_runners:
+ while (
+ self._query_producer_thread.is_alive() or
+ self._query_consumer_thread.is_alive() or
+ self._query_runners
+ ):
if self._query_producer_thread.error or self._query_consumer_thread.error:
# This is bad enough to abort early. A failure here probably means there's a
# bug in this script. The mem poller could be checked for an error too. It is
@@ -382,9 +403,12 @@
sys.exit(runner.exitcode)
LOG.info("No crashes detected")
checked_for_crashes = True
- if self._num_successive_errors.value \
- >= self.num_successive_errors_needed_to_abort:
- print("Aborting due to %s successive errors encountered"
+ if (
+ self._num_successive_errors.value >=
+ self.num_successive_errors_needed_to_abort
+ ):
+ print(
+ "Aborting due to %s successive errors encountered"
% self._num_successive_errors.value, file=sys.stderr)
sys.exit(1)
del self._query_runners[idx]
@@ -392,9 +416,11 @@
if should_print_status:
last_report_secs += sleep_secs
if last_report_secs > 5:
- if not self._query_producer_thread.is_alive() \
- or not self._query_consumer_thread.is_alive() \
- or not self._query_runners:
+ if (
+ not self._query_producer_thread.is_alive() or
+ not self._query_consumer_thread.is_alive() or
+ not self._query_runners
+ ):
LOG.debug("Producer is alive: %s" % self._query_producer_thread.is_alive())
LOG.debug("Consumer is alive: %s" % self._query_consumer_thread.is_alive())
LOG.debug("Queue size: %s" % self._query_queue.qsize())
@@ -412,15 +438,32 @@
def _start_producing_queries(self, queries):
def enqueue_queries():
+ # Generate a dict(query type -> list of queries).
+ queries_by_type = {}
+ for query in queries:
+ if query.query_type not in queries_by_type:
+ queries_by_type[query.query_type] = []
+ queries_by_type[query.query_type].append(query)
try:
for _ in xrange(self._num_queries_to_run):
- self._query_queue.put(choice(queries))
+ # First randomly determine a query type, then choose a random query of that
+ # type.
+ if (
+ QueryType.SELECT in queries_by_type and
+ (len(queries_by_type.keys()) == 1 or random() < self._select_probability)
+ ):
+ result = choice(queries_by_type[QueryType.SELECT])
+ else:
+ query_type = choice([
+ key for key in queries_by_type if key != QueryType.SELECT])
+ result = choice(queries_by_type[query_type])
+ self._query_queue.put(result)
except Exception as e:
LOG.error("Error producing queries: %s", e)
current_thread().error = e
raise e
- self._query_producer_thread = create_and_start_daemon_thread(enqueue_queries,
- "Query Producer")
+ self._query_producer_thread = create_and_start_daemon_thread(
+ enqueue_queries, "Query Producer")
def _start_consuming_queries(self, impala):
def start_additional_runners_if_needed():
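The producer above first buckets queries by type and then biases selection toward SELECT statements. A self-contained sketch of that selection rule (type names, queries, and the probability are illustrative):

    import random

    SELECT = "SELECT"

    def choose_query(queries_by_type, select_probability):
        # SELECT always wins when it is the only type present; otherwise it
        # wins with probability 'select_probability' and the remaining types
        # are drawn uniformly.
        if SELECT in queries_by_type and (
                len(queries_by_type) == 1 or
                random.random() < select_probability):
            return random.choice(queries_by_type[SELECT])
        other_types = [t for t in queries_by_type if t != SELECT]
        return random.choice(queries_by_type[random.choice(other_types)])

    queries_by_type = {"SELECT": ["q1", "q2"], "INSERT": ["q3"]}
    print(choose_query(queries_by_type, select_probability=0.8))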
@@ -467,9 +510,11 @@
query_sumbission_is_locked = False
ready_to_unlock = None
- if not query_sumbission_is_locked \
- and self.leak_check_interval_mins \
- and time() > self._next_leak_check_unix_time.value:
+ if (
+ not query_sumbission_is_locked and
+ self.leak_check_interval_mins and
+ time() > self._next_leak_check_unix_time.value
+ ):
assert self._num_queries_running <= len(self._query_runners), \
"Each running query should belong to a runner"
LOG.debug("Stopping query submission")
@@ -505,8 +550,8 @@
if query_sumbission_is_locked:
LOG.debug("Resuming query submission")
self._submit_query_lock.release()
- self._mem_polling_thread = create_and_start_daemon_thread(poll_mem_usage,
- "Mem Usage Poller")
+ self._mem_polling_thread = create_and_start_daemon_thread(
+ poll_mem_usage, "Mem Usage Poller")
def _get_mem_usage_values(self, reset=False):
reported = None
@@ -534,7 +579,7 @@
def _start_single_runner(self, impalad):
"""Consumer function to take a query of the queue and run it. This is intended to
- run in a separate process so validating the result set can use a full CPU.
+ run in a separate process so validating the result set can use a full CPU.
"""
LOG.debug("New query runner started")
runner = QueryRunner()
@@ -564,7 +609,8 @@
mem_limit = query.required_mem_mb_without_spilling
solo_runtime = query.solo_runtime_secs_without_spilling
else:
- mem_limit = randrange(query.required_mem_mb_with_spilling,
+ mem_limit = randrange(
+ query.required_mem_mb_with_spilling,
query.required_mem_mb_without_spilling + 1)
solo_runtime = query.solo_runtime_secs_with_spilling
@@ -583,8 +629,8 @@
if should_cancel:
timeout = randrange(1, max(int(solo_runtime), 2))
else:
- timeout = solo_runtime * max(10, self._num_queries_started.value
- - self._num_queries_finished.value)
+ timeout = solo_runtime * max(
+ 10, self._num_queries_started.value - self._num_queries_finished.value)
report = runner.run_query(query, timeout, mem_limit)
LOG.debug("Got execution report for query")
if report.timed_out and should_cancel:
@@ -609,26 +655,34 @@
# The server may fail to respond to clients if the load is high. An error
# message with "connect()...Connection timed out" comes from the impalad so
# that will not be ignored.
- if ("Connection timed out" in error_msg and "connect()" not in error_msg) \
- or "ECONNRESET" in error_msg \
- or "couldn't get a client" in error_msg \
- or "timeout: timed out" in error_msg:
+ if (
+ ("Connection timed out" in error_msg and "connect()" not in error_msg) or
+ "ECONNRESET" in error_msg or
+ "couldn't get a client" in error_msg or
+ "timeout: timed out" in error_msg
+ ):
self._num_successive_errors.value = 0
continue
increment(self._num_successive_errors)
increment(self._num_other_errors)
raise Exception("Query failed: %s" % str(report.non_mem_limit_error))
- if report.mem_limit_exceeded \
- and not self._mem_broker.was_overcommitted(reservation_id):
+ if (
+ report.mem_limit_exceeded and
+ not self._mem_broker.was_overcommitted(reservation_id)
+ ):
increment(self._num_successive_errors)
- raise Exception("Unexpected mem limit exceeded; mem was not overcommitted\n"
+ raise Exception(
+ "Unexpected mem limit exceeded; mem was not overcommitted\n"
"Profile: %s" % report.profile)
- if not report.mem_limit_exceeded \
- and not report.timed_out \
- and report.result_hash != query.result_hash:
+ if (
+ not report.mem_limit_exceeded and
+ not report.timed_out and
+ (self._verify_results and report.result_hash != query.result_hash)
+ ):
increment(self._num_successive_errors)
increment(self._num_result_mismatches)
- raise Exception("Result hash mismatch; expected %s, got %s\nQuery: %s"
+ raise Exception(
+ "Result hash mismatch; expected %s, got %s\nQuery: %s"
% (query.result_hash, report.result_hash, query.sql))
self._num_successive_errors.value = 0
@@ -665,18 +719,34 @@
pass
+class QueryType(object):
+ COMPUTE_STATS, DELETE, INSERT, SELECT, UPDATE, UPSERT = range(6)
+
+
class Query(object):
"""Contains a SQL statement along with expected runtime information."""
def __init__(self):
self.name = None
self.sql = None
+ # In order to be able to make good estimates for DML queries in the binary search,
+ # we need to bring the table to a good initial state before excuting the sql. Running
+ # set_up_sql accomplishes this task.
+ self.set_up_sql = None
self.db_name = None
self.result_hash = None
self.required_mem_mb_with_spilling = None
self.required_mem_mb_without_spilling = None
self.solo_runtime_secs_with_spilling = None
self.solo_runtime_secs_without_spilling = None
+ # Query options to set before running the query.
+ self.options = {}
+ # Determines the order in which we will populate query runtime info. Queries with the
+ # lowest population_order property will be handled first.
+ self.population_order = 0
+ # Type of query. Can have the following values: SELECT, COMPUTE_STATS, INSERT, UPDATE,
+ # UPSERT, DELETE.
+ self.query_type = QueryType.SELECT
def __repr__(self):
return dedent("""
@@ -686,13 +756,17 @@
Solo Runtime: %(solo_runtime_secs_with_spilling)s
Solo Runtime no-spilling: %(solo_runtime_secs_without_spilling)s
DB: %(db_name)s
- SQL: %(sql)s>""".strip() % self.__dict__)
+ Options: %(options)s
+ Set up SQL: %(set_up_sql)s
+ SQL: %(sql)s
+ Population order: %(population_order)r>
+ """.strip() % self.__dict__)
class QueryRunner(object):
"""Encapsulates functionality to run a query and provide a runtime report."""
- SPILLED_PATTERNS= [re.compile("ExecOption:.*Spilled"), re.compile("SpilledRuns: [^0]")]
+ SPILLED_PATTERNS = [re.compile("ExecOption:.*Spilled"), re.compile("SpilledRuns: [^0]")]
BATCH_SIZE = 1024
def __init__(self):
@@ -711,8 +785,11 @@
self.impalad_conn.close()
self.impalad_conn = None
- def run_query(self, query, timeout_secs, mem_limit_mb):
- """Run a query and return an execution report."""
+ def run_query(self, query, timeout_secs, mem_limit_mb, run_set_up=False):
+ """Run a query and return an execution report. If 'run_set_up' is True, set up sql
+ will be executed before the main query. This should be the case during the binary
+ search phase of the stress test.
+ """
if not self.impalad_conn:
raise Exception("connect() must first be called")
@@ -721,23 +798,32 @@
try:
with self.impalad_conn.cursor() as cursor:
start_time = time()
+ if query.db_name:
+ LOG.debug("Using %s database", query.db_name)
+ cursor.execute("USE %s" % query.db_name)
+ if run_set_up and query.set_up_sql:
+ LOG.debug("Running set up query:\n%s", self.set_up_sql)
+ cursor.execute(query.set_up_sql)
for query_option, value in self.common_query_options.iteritems():
cursor.execute(
"SET {query_option}={value}".format(query_option=query_option, value=value))
+ for query_option, value in query.options.iteritems():
+ cursor.execute(
+ "SET {query_option}={value}".format(query_option=query_option, value=value))
cursor.execute("SET ABORT_ON_ERROR=1")
LOG.debug("Setting mem limit to %s MB", mem_limit_mb)
cursor.execute("SET MEM_LIMIT=%sM" % mem_limit_mb)
- if query.db_name:
- LOG.debug("Using %s database", query.db_name)
- cursor.execute("USE %s" % query.db_name)
- LOG.debug("Running query with %s MB mem limit at %s with timeout secs %s:\n%s",
+ LOG.debug(
+ "Running query with %s MB mem limit at %s with timeout secs %s:\n%s",
mem_limit_mb, self.impalad.host_name, timeout_secs, query.sql)
error = None
try:
- cursor.execute_async("/* Mem: %s MB. Coordinator: %s. */\n"
+ cursor.execute_async(
+ "/* Mem: %s MB. Coordinator: %s. */\n"
% (mem_limit_mb, self.impalad.host_name) + query.sql)
- LOG.debug("Query id is %s",
- op_handle_to_query_id(cursor._last_operation_handle))
+ LOG.debug(
+ "Query id is %s", op_handle_to_query_id(cursor._last_operation.handle if
+ cursor._last_operation else None))
sleep_secs = 0.1
secs_since_log = 0
while cursor.is_executing():
@@ -749,24 +835,32 @@
LOG.debug("Waiting for query to execute")
sleep(sleep_secs)
secs_since_log += sleep_secs
- try:
- report.result_hash = self._hash_result(cursor, timeout_unix_time, query)
- except QueryTimeout:
- self._cancel(cursor, report)
- return report
+ if query.query_type == QueryType.SELECT:
+ try:
+ report.result_hash = self._hash_result(cursor, timeout_unix_time, query)
+ except QueryTimeout:
+ self._cancel(cursor, report)
+ return report
+ else:
+ # If query is in error state, this will raise an exception
+ cursor._wait_to_finish()
except Exception as error:
- LOG.debug("Error running query with id %s: %s",
- op_handle_to_query_id(cursor._last_operation_handle), error)
+ LOG.debug(
+ "Error running query with id %s: %s",
+ op_handle_to_query_id(cursor._last_operation.handle if
+ cursor._last_operation else None), error)
self._check_for_mem_limit_exceeded(report, cursor, error)
if report.non_mem_limit_error or report.mem_limit_exceeded:
return report
report.runtime_secs = time() - start_time
- if self.check_if_mem_was_spilled:
+ if cursor.execution_failed() or self.check_if_mem_was_spilled:
# Producing a query profile can be somewhat expensive. A v-tune profile of
# impalad showed 10% of cpu time spent generating query profiles.
report.profile = cursor.get_profile()
- report.mem_was_spilled = any([pattern.search(report.profile) is not None
- for pattern in QueryRunner.SPILLED_PATTERNS])
+ report.mem_was_spilled = any([
+ pattern.search(report.profile) is not None
+ for pattern in QueryRunner.SPILLED_PATTERNS])
+ report.mem_limit_exceeded = "Memory limit exceeded" in report.profile
except Exception as error:
# A mem limit error would have been caught above, no need to check for that here.
report.non_mem_limit_error = error
@@ -776,7 +870,7 @@
report.timed_out = True
# Copy the operation handle in case another thread causes the handle to be reset.
- operation_handle = cursor._last_operation_handle
+ operation_handle = cursor._last_operation.handle if cursor._last_operation else None
if not operation_handle:
return
@@ -795,14 +889,15 @@
def _check_for_mem_limit_exceeded(self, report, cursor, caught_exception):
"""To be called after a query failure to check for signs of failed due to a
- mem limit. The report will be updated accordingly.
+ mem limit. The report will be updated accordingly.
"""
- if cursor._last_operation_handle:
+ if cursor._last_operation:
try:
report.profile = cursor.get_profile()
except Exception as e:
- LOG.debug("Error getting profile for query with id %s: %s",
- op_handle_to_query_id(cursor._last_operation_handle), e)
+ LOG.debug(
+ "Error getting profile for query with id %s: %s",
+ op_handle_to_query_id(cursor._last_operation.handle), e)
caught_msg = str(caught_exception).lower().strip()
# Exceeding a mem limit may result in the message "cancelled".
@@ -817,25 +912,32 @@
# exceeding the mem_limit could be something like:
# Metadata states that in group hdfs://<node>:8020<path> there are <X> rows,
# but only <Y> rows were read.
- if "metadata states that in group" in caught_msg \
- and "rows were read" in caught_msg:
+ if (
+ "metadata states that in group" in caught_msg and
+ "rows were read" in caught_msg
+ ):
report.mem_limit_exceeded = True
return
- LOG.debug("Non-mem limit error for query with id %s: %s",
- op_handle_to_query_id(cursor._last_operation_handle), caught_exception,
+ LOG.debug(
+ "Non-mem limit error for query with id %s: %s",
+ op_handle_to_query_id(
+ cursor._last_operation.handle if cursor._last_operation else None),
+ caught_exception,
exc_info=True)
report.non_mem_limit_error = caught_exception
def _hash_result(self, cursor, timeout_unix_time, query):
"""Returns a hash that is independent of row order. 'query' is only used for debug
- logging purposes (if the result is not as expected a log file will be left for
- investigations).
+ logging purposes (if the result is not as expected a log file will be left for
+ investigations).
"""
- query_id = op_handle_to_query_id(cursor._last_operation_handle)
+ query_id = op_handle_to_query_id(cursor._last_operation.handle if
+ cursor._last_operation else None)
# A value of 1 indicates that the hash thread should continue to work.
should_continue = Value("i", 1)
+
def hash_result_impl():
result_log = None
try:
@@ -848,12 +950,16 @@
result_log.write("\n")
current_thread().result = 1
while should_continue.value:
- LOG.debug("Fetching result for query with id %s",
- op_handle_to_query_id(cursor._last_operation_handle))
+ LOG.debug(
+ "Fetching result for query with id %s",
+ op_handle_to_query_id(
+ cursor._last_operation.handle if cursor._last_operation else None))
rows = cursor.fetchmany(self.BATCH_SIZE)
if not rows:
- LOG.debug("No more results for query with id %s",
- op_handle_to_query_id(cursor._last_operation_handle))
+ LOG.debug(
+ "No more results for query with id %s",
+ op_handle_to_query_id(
+ cursor._last_operation.handle if cursor._last_operation else None))
return
for row in rows:
for idx, val in enumerate(row):
@@ -882,12 +988,14 @@
finally:
if result_log is not None:
result_log.close()
- if current_thread().error is not None \
- and current_thread().result == query.result_hash:
+ if (
+ current_thread().error is not None and
+ current_thread().result == query.result_hash
+ ):
os.remove(result_log.name)
- hash_thread = create_and_start_daemon_thread(hash_result_impl,
- "Fetch Results %s" % query_id)
+ hash_thread = create_and_start_daemon_thread(
+ hash_result_impl, "Fetch Results %s" % query_id)
hash_thread.join(max(timeout_unix_time - time(), 0))
if hash_thread.is_alive():
should_continue.value = 0
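The docstring above promises a hash that is independent of row order. The reduction actually used is not visible in this hunk, but one way to get that property (purely an illustrative assumption) is to fold per-row digests with a commutative operation:

    import hashlib

    def order_independent_hash(rows):
        # Summing per-row digests modulo 2**64 means permuting the rows
        # cannot change the result.
        total = 0
        for row in rows:
            digest = hashlib.md5(repr(row).encode("utf-8")).hexdigest()
            total = (total + int(digest, 16)) % (2 ** 64)
        return total

    assert order_independent_hash([(1, "a"), (2, "b")]) == \
        order_independent_hash([(2, "b"), (1, "a")])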
@@ -900,11 +1008,12 @@
def load_tpc_queries(workload, load_in_kudu=False):
"""Returns a list of TPC queries. 'workload' should either be 'tpch' or 'tpcds'.
If 'load_in_kudu' is True, it loads only queries specified for the Kudu storage
- engine."""
+ engine.
+ """
LOG.info("Loading %s queries", workload)
queries = list()
- query_dir = os.path.join(os.path.dirname(__file__), "..", "..",
- "testdata", "workloads", workload, "queries")
+ query_dir = os.path.join(
+ os.path.dirname(__file__), "..", "..", "testdata", "workloads", workload, "queries")
engine = 'kudu-' if load_in_kudu else ''
file_name_pattern = re.compile(r"%s-%s(q\d+).test$" % (workload, engine))
for query_file in os.listdir(query_dir):
@@ -914,7 +1023,8 @@
file_path = os.path.join(query_dir, query_file)
file_queries = load_queries_from_test_file(file_path)
if len(file_queries) != 1:
- raise Exception("Expected exactly 1 query to be in file %s but got %s"
+ raise Exception(
+ "Expected exactly 1 query to be in file %s but got %s"
% (file_path, len(file_queries)))
query = file_queries[0]
query.name = match.group(1)
@@ -934,13 +1044,15 @@
return queries
-def load_random_queries_and_populate_runtime_info(query_generator, model_translator,
- tables, db_name, impala, use_kerberos, query_count, query_timeout_secs,
- result_hash_log_dir):
+def load_random_queries_and_populate_runtime_info(
+ query_generator, model_translator, tables, db_name, impala, use_kerberos, query_count,
+ query_timeout_secs, result_hash_log_dir
+):
"""Returns a list of random queries. Each query will also have its runtime info
- populated. The runtime info population also serves to validate the query.
+ populated. The runtime info population also serves to validate the query.
"""
LOG.info("Generating random queries")
+
def generate_candidates():
while True:
query_model = query_generator.create_query(tables)
@@ -949,20 +1061,26 @@
query.sql = sql
query.db_name = db_name
yield query
- return populate_runtime_info_for_random_queries(impala, use_kerberos,
- generate_candidates(), query_count, query_timeout_secs, result_hash_log_dir)
+ return populate_runtime_info_for_random_queries(
+ impala, use_kerberos, generate_candidates(), query_count, query_timeout_secs,
+ result_hash_log_dir)
-def populate_runtime_info_for_random_queries(impala, use_kerberos, candidate_queries,
- query_count, query_timeout_secs, result_hash_log_dir):
+def populate_runtime_info_for_random_queries(
+ impala, use_kerberos, candidate_queries,
+ query_count, query_timeout_secs, result_hash_log_dir
+):
"""Returns a list of random queries. Each query will also have its runtime info
- populated. The runtime info population also serves to validate the query.
+ populated. The runtime info population also serves to validate the query.
"""
start_time = datetime.now()
queries = list()
+ # TODO(IMPALA-4632): Consider running reset_databases() here if we want to extend DML
+ # functionality to random stress queries as well.
for query in candidate_queries:
try:
- populate_runtime_info(query, impala, use_kerberos, result_hash_log_dir,
+ populate_runtime_info(
+ query, impala, use_kerberos, result_hash_log_dir,
timeout_secs=query_timeout_secs)
queries.append(query)
except Exception as e:
@@ -970,27 +1088,30 @@
# query generator bugs).
if print_crash_info_if_exists(impala, start_time):
raise e
- LOG.warn("Error running query (the test will continue)\n%s\n%s", e, query.sql,
- exc_info=True)
+ LOG.warn(
+ "Error running query (the test will continue)\n%s\n%s",
+ e, query.sql, exc_info=True)
if len(queries) == query_count:
break
return queries
-def populate_runtime_info(query, impala, use_kerberos, result_hash_log_dir,
- timeout_secs=maxint, samples=1, max_conflicting_samples=0):
+def populate_runtime_info(
+ query, impala, use_kerberos, result_hash_log_dir,
+ timeout_secs=maxint, samples=1, max_conflicting_samples=0
+):
"""Runs the given query by itself repeatedly until the minimum memory is determined
- with and without spilling. Potentially all fields in the Query class (except
- 'sql') will be populated by this method. 'required_mem_mb_without_spilling' and
- the corresponding runtime field may still be None if the query could not be run
- without spilling.
+ with and without spilling. Potentially all fields in the Query class (except
+ 'sql') will be populated by this method. 'required_mem_mb_without_spilling' and
+ the corresponding runtime field may still be None if the query could not be run
+ without spilling.
- 'samples' and 'max_conflicting_samples' control the reliability of the collected
- information. The problem is that memory spilling or usage may differ (by a large
- amount) from run to run due to races during execution. The parameters provide a way
- to express "X out of Y runs must have resulted in the same outcome". Increasing the
- number of samples and decreasing the tolerance (max conflicts) increases confidence
- but also increases the time to collect the data.
+ 'samples' and 'max_conflicting_samples' control the reliability of the collected
+ information. The problem is that memory spilling or usage may differ (by a large
+ amount) from run to run due to races during execution. The parameters provide a way
+ to express "X out of Y runs must have resulted in the same outcome". Increasing the
+ number of samples and decreasing the tolerance (max conflicts) increases confidence
+ but also increases the time to collect the data.
"""
LOG.info("Collecting runtime info for query %s: \n%s", query.name, query.sql)
runner = QueryRunner()
@@ -1015,12 +1136,16 @@
def update_runtime_info():
required_mem = min(mem_limit, impala.min_impalad_mem_mb)
if report.mem_was_spilled:
- if query.required_mem_mb_with_spilling is None \
- or required_mem < query.required_mem_mb_with_spilling:
+ if (
+ query.required_mem_mb_with_spilling is None or
+ required_mem < query.required_mem_mb_with_spilling
+ ):
query.required_mem_mb_with_spilling = required_mem
query.solo_runtime_secs_with_spilling = report.runtime_secs
- elif query.required_mem_mb_without_spilling is None \
- or required_mem < query.required_mem_mb_without_spilling:
+ elif (
+ query.required_mem_mb_without_spilling is None or
+ required_mem < query.required_mem_mb_without_spilling
+ ):
query.required_mem_mb_without_spilling = required_mem
query.solo_runtime_secs_without_spilling = report.runtime_secs
@@ -1028,7 +1153,7 @@
reports_by_outcome = defaultdict(list)
leading_outcome = None
for remaining_samples in xrange(samples - 1, -1, -1):
- report = runner.run_query(query, timeout_secs, mem_limit)
+ report = runner.run_query(query, timeout_secs, mem_limit, run_set_up=True)
if report.timed_out:
raise QueryTimeout()
if report.non_mem_limit_error:
@@ -1038,8 +1163,9 @@
if query.result_hash is None:
query.result_hash = report.result_hash
elif query.result_hash != report.result_hash:
- raise Exception("Result hash mismatch; expected %s, got %s"
- % (query.result_hash, report.result_hash))
+ raise Exception(
+ "Result hash mismatch; expected %s, got %s" %
+ (query.result_hash, report.result_hash))
if report.mem_limit_exceeded:
outcome = "EXCEEDED"
@@ -1055,8 +1181,10 @@
leading_outcome = outcome
if len(reports_by_outcome[leading_outcome]) + max_conflicting_samples == samples:
break
- if len(reports_by_outcome[leading_outcome]) + remaining_samples \
- < samples - max_conflicting_samples:
+ if (
+ len(reports_by_outcome[leading_outcome]) + remaining_samples <
+ samples - max_conflicting_samples
+ ):
return
if desired_outcome \
and len(reports_by_outcome[desired_outcome]) + remaining_samples \
@@ -1076,8 +1204,8 @@
if report and report.mem_limit_exceeded:
limit_exceeded_mem = mem_limit
if mem_limit == impala.min_impalad_mem_mb:
- LOG.warn("Query could not be run even when using all available memory\n%s",
- query.sql)
+ LOG.warn(
+ "Query couldn't be run even when using all available memory\n%s", query.sql)
return
mem_limit = min(2 * mem_limit, impala.min_impalad_mem_mb)
continue
@@ -1097,8 +1225,8 @@
old_required_mem_mb_without_spilling = None
else:
mem_limit = (lower_bound + upper_bound) / 2
- should_break = mem_limit / float(upper_bound) > MEM_LIMIT_EQ_THRESHOLD_PC \
- or upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB
+ should_break = mem_limit / float(upper_bound) > MEM_LIMIT_EQ_THRESHOLD_PC or \
+ upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB
report = get_report(desired_outcome=("NOT_SPILLED" if spill_mem else None))
if not report:
lower_bound = mem_limit
@@ -1120,13 +1248,13 @@
break
lower_bound = upper_bound = impala.min_impalad_mem_mb
# This value may be updated during the search for the absolute minimum.
- LOG.info("Minimum memory to avoid spilling is %s MB"
- % query.required_mem_mb_without_spilling)
+ LOG.info(
+ "Minimum memory to avoid spilling: %s MB" % query.required_mem_mb_without_spilling)
LOG.info("Finding absolute minimum memory required")
lower_bound = limit_exceeded_mem
- upper_bound = min(spill_mem or maxint, non_spill_mem or maxint,
- impala.min_impalad_mem_mb)
+ upper_bound = min(
+ spill_mem or maxint, non_spill_mem or maxint, impala.min_impalad_mem_mb)
while True:
if old_required_mem_mb_with_spilling:
mem_limit = old_required_mem_mb_with_spilling
@@ -1147,13 +1275,16 @@
query.solo_runtime_secs_with_spilling = query.solo_runtime_secs_without_spilling
break
LOG.info("Minimum memory is %s MB" % query.required_mem_mb_with_spilling)
- if query.required_mem_mb_without_spilling is not None \
- and query.required_mem_mb_without_spilling is not None \
- and query.required_mem_mb_without_spilling < query.required_mem_mb_with_spilling:
+ if (
+ query.required_mem_mb_with_spilling is not None and
+ query.required_mem_mb_without_spilling is not None and
+ query.required_mem_mb_without_spilling < query.required_mem_mb_with_spilling
+ ):
# Query execution is not deterministic and sometimes a query will run without spilling
# at a lower mem limit than it did with spilling. In that case, just use the lower
# value.
- LOG.info("A lower memory limit to avoid spilling was found while searching for"
+ LOG.info(
+ "A lower memory limit to avoid spilling was found while searching for"
" the absolute minimum memory.")
query.required_mem_mb_with_spilling = query.required_mem_mb_without_spilling
query.solo_runtime_secs_with_spilling = query.solo_runtime_secs_without_spilling
@@ -1162,12 +1293,15 @@
def estimate_query_mem_mb_usage(query, query_runner):
"""Runs an explain plan then extracts and returns the estimated memory needed to run
- the query.
+ the query.
"""
with query_runner.impalad_conn.cursor() as cursor:
LOG.debug("Using %s database", query.db_name)
if query.db_name:
cursor.execute('USE ' + query.db_name)
+ if query.query_type == QueryType.COMPUTE_STATS:
+ # Running "explain" on compute stats is not supported by Impala.
+ return
LOG.debug("Explaining query\n%s", query.sql)
cursor.execute('EXPLAIN ' + query.sql)
first_val = cursor.fetchone()[0]
@@ -1186,13 +1320,16 @@
store = json.load(file)
_check_store_version(store)
if not store:
- store = {"host_names": list(), "db_names": dict(),
- "version": RUNTIME_INFO_FILE_VERSION}
+ store = {
+ "host_names": list(), "db_names": dict(), "version": RUNTIME_INFO_FILE_VERSION}
with open(path, "w+") as file:
store["host_names"] = sorted([i.host_name for i in impala.impalads])
queries = store["db_names"].get(query.db_name, dict())
- queries[query.sql] = query
+ query_by_options = queries.get(query.sql, dict())
+ query_by_options[str(sorted(query.options.items()))] = query
+ queries[query.sql] = query_by_options
store["db_names"][query.db_name] = queries
+
class JsonEncoder(json.JSONEncoder):
def default(self, obj):
data = dict(obj.__dict__)
@@ -1200,61 +1337,66 @@
if "sql" in data:
del data["sql"]
return data
- json.dump(store, file, cls=JsonEncoder, sort_keys=True, indent=2,
- separators=(',', ': '))
+ json.dump(
+ store, file, cls=JsonEncoder, sort_keys=True, indent=2, separators=(',', ': '))
def load_runtime_info(path, impala=None):
"""Reads the query runtime information at 'path' and returns a
- dict<db_name, dict<sql, Query>>. Returns an empty dict if the hosts in the 'impala'
- instance do not match the data in 'path'.
+ dict<db_name, dict<sql, Query>>. Returns an empty dict if the hosts in the 'impala'
+ instance do not match the data in 'path'.
"""
- queries_by_db_and_sql = defaultdict(dict)
+ queries_by_db_and_sql = defaultdict(lambda: defaultdict(dict))
if not os.path.exists(path):
return queries_by_db_and_sql
with open(path) as file:
store = json.load(file)
_check_store_version(store)
- if impala and \
- store.get("host_names") != sorted([i.host_name for i in impala.impalads]):
+ if (
+ impala and
+ store.get("host_names") != sorted([i.host_name for i in impala.impalads])
+ ):
return queries_by_db_and_sql
for db_name, queries_by_sql in store["db_names"].iteritems():
- for sql, json_query in queries_by_sql.iteritems():
- query = Query()
- query.__dict__.update(json_query)
- query.sql = sql
- queries_by_db_and_sql[db_name][sql] = query
+ for sql, queries_by_options in queries_by_sql.iteritems():
+ for options, json_query in queries_by_options.iteritems():
+ query = Query()
+ query.__dict__.update(json_query)
+ query.sql = sql
+ queries_by_db_and_sql[db_name][sql][options] = query
return queries_by_db_and_sql
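Runtime info is now keyed at three levels (db name, SQL text, then a canonical rendering of the query options), so the same statement run with different options, e.g. MT_DOP, keeps separate measurements. A sketch of the keying scheme (plain strings stand in for Query objects; the table name is illustrative):

    def options_key(options):
        # Sorting the items makes the key deterministic regardless of the
        # order in which options were set.
        return str(sorted(options.items()))

    store = {}
    db, sql = "tpch", "COMPUTE STATS lineitem"
    store.setdefault(db, {}).setdefault(sql, {})[
        options_key({"MT_DOP": "4"})] = "query-v1"
    # A different MT_DOP gets its own slot instead of overwriting the entry;
    # the RUNTIME_INFO_FILE_VERSION bump to 3 reflects this schema change.
    store[db][sql][options_key({"MT_DOP": "8"})] = "query-v2"
    assert len(store[db][sql]) == 2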
def _check_store_version(store):
"""Clears 'store' if the version is too old or raises an error if the version is too
- new.
+ new.
"""
if store["version"] < RUNTIME_INFO_FILE_VERSION:
LOG.warn("Runtime file info version is old and will be ignored")
store.clear()
elif store["version"] > RUNTIME_INFO_FILE_VERSION:
- raise Exception("Unexpected runtime file info version %s expected %s"
+ raise Exception(
+ "Unexpected runtime file info version %s expected %s"
% (store["version"], RUNTIME_INFO_FILE_VERSION))
def print_runtime_info_comparison(old_runtime_info, new_runtime_info):
# TODO: Provide a way to call this from the CLI. This was hard coded to run from main()
# when it was used.
- print(",".join(["Database", "Query",
- "Old Mem MB w/Spilling",
- "New Mem MB w/Spilling",
- "Diff %",
- "Old Runtime w/Spilling",
- "New Runtime w/Spilling",
- "Diff %",
- "Old Mem MB wout/Spilling",
- "New Mem MB wout/Spilling",
- "Diff %",
- "Old Runtime wout/Spilling",
- "New Runtime wout/Spilling",
- "Diff %"]))
+ print(",".join([
+ "Database", "Query",
+ "Old Mem MB w/Spilling",
+ "New Mem MB w/Spilling",
+ "Diff %",
+ "Old Runtime w/Spilling",
+ "New Runtime w/Spilling",
+ "Diff %",
+ "Old Mem MB wout/Spilling",
+ "New Mem MB wout/Spilling",
+ "Diff %",
+ "Old Runtime wout/Spilling",
+ "New Runtime wout/Spilling",
+ "Diff %"]))
for db_name, old_queries in old_runtime_info.iteritems():
new_queries = new_runtime_info.get(db_name)
if not new_queries:
@@ -1267,8 +1409,10 @@
sys.stdout.write(",")
sys.stdout.write(old_query["name"])
sys.stdout.write(",")
- for attr in ["required_mem_mb_with_spilling", "solo_runtime_secs_with_spilling",
- "required_mem_mb_without_spilling", "solo_runtime_secs_without_spilling"]:
+ for attr in [
+ "required_mem_mb_with_spilling", "solo_runtime_secs_with_spilling",
+ "required_mem_mb_without_spilling", "solo_runtime_secs_without_spilling"
+ ]:
old_value = old_query[attr]
sys.stdout.write(str(old_value))
sys.stdout.write(",")
@@ -1283,90 +1427,345 @@
print()
+def generate_DML_queries(cursor, dml_mod_values):
+ """Generate insert, upsert, update, delete DML statements.
+
+ For each table in the database that cursor is connected to, create 4 DML queries
+ (insert, upsert, update, delete) for each mod value in 'dml_mod_values'. This value
+ controls which rows will be affected. The generated queries assume that for each table
+ in the database, there exists a table with a '_original' suffix that is never modified.
+
+ This function has some limitations:
+ 1. Only generates DML statements against Kudu tables, and ignores non-Kudu tables.
+ 2. Requires that the type of the first column of the primary key is an integer type.
+ """
+ LOG.info("Generating DML queries")
+ tables = [cursor.describe_table(t) for t in cursor.list_table_names()
+ if not t.endswith("_original")]
+ result = []
+ for table in tables:
+ if not table.primary_keys:
+ # Skip non-Kudu tables. If a table has no primary keys, then it cannot be a Kudu
+ # table.
+ LOG.debug("Skipping table '{0}' because it has no primary keys.".format(table.name))
+ continue
+ if len(table.primary_keys) > 1:
+ # TODO(IMPALA-4665): Add support for tables with multiple primary keys.
+ LOG.debug("Skipping table '{0}' because it has more than "
+ "1 primary key column.".format(table.name))
+ continue
+ primary_key = table.primary_keys[0]
+ if primary_key.exact_type not in (Int, TinyInt, SmallInt, BigInt):
+ # We want to be able to apply the modulo operation on the primary key. If the
+ # first primary key column happens not to be an integer, we will skip
+ # generating queries for this table.
+ LOG.debug("Skipping table '{0}' because the first column '{1}' in the "
+ "primary key is not an integer.".format(table.name, primary_key.name))
+ continue
+ for mod_value in dml_mod_values:
+ # Insert
+ insert_query = Query()
+ # Populate runtime info for Insert and Upsert queries before Update and Delete
+ # queries because tables remain in original state after running the Insert and
+ # Upsert queries. During the binary search in runtime info population for the
+ # Insert query, we first delete some rows and then reinsert them, so the table
+ # remains in the original state. For the delete, the order is reversed, so the table
+ # is not in the original state after running the delete (or update) query. This
+ # is why population_order is smaller for Insert and Upsert queries.
+ insert_query.population_order = 1
+ insert_query.query_type = QueryType.INSERT
+ insert_query.name = "insert_{0}".format(table.name)
+ insert_query.db_name = cursor.db_name
+ insert_query.sql = (
+ "INSERT INTO TABLE {0} SELECT * FROM {0}_original "
+ "WHERE {1} % {2} = 0").format(table.name, primary_key.name, mod_value)
+ # Upsert
+ upsert_query = Query()
+ upsert_query.population_order = 1
+ upsert_query.query_type = QueryType.UPSERT
+ upsert_query.name = "upsert_{0}".format(table.name)
+ upsert_query.db_name = cursor.db_name
+ upsert_query.sql = (
+ "UPSERT INTO TABLE {0} SELECT * "
+ "FROM {0}_original WHERE {1} % {2} = 0").format(
+ table.name, primary_key.name, mod_value)
+ # Update
+ update_query = Query()
+ update_query.population_order = 2
+ update_query.query_type = QueryType.UPDATE
+ update_query.name = "update_{0}".format(table.name)
+ update_query.db_name = cursor.db_name
+ update_list = ', '.join(
+ 'a.{0} = b.{0}'.format(col.name)
+ for col in table.cols if not col.is_primary_key)
+ update_query.sql = (
+ "UPDATE a SET {update_list} FROM {table_name} a JOIN {table_name}_original b "
+ "ON a.{pk} = b.{pk} + 1 WHERE a.{pk} % {mod_value} = 0").format(
+ table_name=table.name, pk=primary_key.name, mod_value=mod_value,
+ update_list=update_list)
+ # Delete
+ delete_query = Query()
+ delete_query.population_order = 2
+ delete_query.query_type = QueryType.DELETE
+ delete_query.name = "delete_{0}".format(table.name)
+ delete_query.db_name = cursor.db_name
+ delete_query.sql = ("DELETE FROM {0} WHERE {1} % {2} = 0").format(
+ table.name, primary_key.name, mod_value)
+
+ if table.name + "_original" in set(cursor.list_table_names()):
+ insert_query.set_up_sql = "DELETE FROM {0} WHERE {1} % {2} = 0".format(
+ table.name, primary_key.name, mod_value)
+ upsert_query.set_up_sql = insert_query.set_up_sql
+ update_query.set_up_sql = (
+ "UPSERT INTO TABLE {0} SELECT * FROM {0}_original "
+ "WHERE {1} % {2} = 0").format(table.name, primary_key.name, mod_value)
+ delete_query.set_up_sql = update_query.set_up_sql
+
+ result.append(insert_query)
+ LOG.debug("Added insert query: {0}".format(insert_query))
+ result.append(update_query)
+ LOG.debug("Added update query: {0}".format(update_query))
+ result.append(upsert_query)
+ LOG.debug("Added upsert query: {0}".format(upsert_query))
+ result.append(delete_query)
+ LOG.debug("Added delete query: {0}".format(delete_query))
+ assert len(result) > 0, "No DML queries were added."
+ return result
+
+
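For a hypothetical Kudu table t(id INT PRIMARY KEY, val STRING) and mod_value = 3, the generator above emits statements shaped like the following sketch:

    insert_sql = ("INSERT INTO TABLE t SELECT * FROM t_original "
                  "WHERE id % 3 = 0")
    upsert_sql = ("UPSERT INTO TABLE t SELECT * FROM t_original "
                  "WHERE id % 3 = 0")
    update_sql = ("UPDATE a SET a.val = b.val FROM t a JOIN t_original b "
                  "ON a.id = b.id + 1 WHERE a.id % 3 = 0")
    delete_sql = "DELETE FROM t WHERE id % 3 = 0"

The INSERT and UPSERT read only rows that exist in the untouched t_original copy, which is why their set-up statements can cheaply restore the table between samples.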
+def generate_compute_stats_queries(cursor):
+ """For each table in the database that cursor is connected to, generate several compute
+ stats queries. Each query will have a different value for the MT_DOP query option.
+ """
+ LOG.info("Generating Compute Stats queries")
+ tables = [cursor.describe_table(t) for t in cursor.list_table_names()
+ if not t.endswith("_original")]
+ result = []
+ mt_dop_values = [str(2**k) for k in range(5)]
+ for table in tables:
+ for mt_dop_value in mt_dop_values:
+ compute_query = Query()
+ compute_query.population_order = 1
+ compute_query.query_type = QueryType.COMPUTE_STATS
+ compute_query.sql = "COMPUTE STATS {0}".format(table.name)
+ compute_query.options["MT_DOP"] = mt_dop_value
+ compute_query.db_name = cursor.db_name
+ compute_query.name = "compute_stats_{0}_mt_dop_{1}".format(
+ table.name, compute_query.options["MT_DOP"])
+ result.append(compute_query)
+ LOG.debug("Added compute stats query: {0}".format(compute_query))
+ return result
+
+
+def prepare_database(cursor):
+ """For each table in the database that cursor is connected to, create an identical copy
+ with '_original' suffix. This function is idempotent.
+
+ Note: At this time we only support Kudu tables with a simple hash partitioning based on
+ the primary key. (SHOW CREATE TABLE would not work otherwise.)
+ """
+ tables = {t: cursor.describe_table(t) for t in cursor.list_table_names()}
+ for table_name in tables:
+ if not table_name.endswith("_original") and table_name + "_original" not in tables:
+ LOG.debug("Creating original table: {0}".format(table_name))
+ cursor.execute("SHOW CREATE TABLE " + table_name)
+ create_sql = cursor.fetchone()[0]
+ search_pattern = r"CREATE TABLE (\w*)\.(.*) \("
+ replacement = "CREATE TABLE {tbl} (".format(tbl=table_name + "_original")
+ create_original_sql = re.sub(
+ search_pattern, replacement, create_sql, count=1)
+ LOG.debug("Create original SQL:\n{0}".format(create_original_sql))
+ cursor.execute(create_original_sql)
+ cursor.execute("INSERT INTO {0}_original SELECT * FROM {0}".format(table_name))
+ cursor.execute("COMPUTE STATS {0}".format(table_name + "_original"))
+
+
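prepare_database() derives each '_original' copy by rewriting the leading CREATE TABLE header in the SHOW CREATE TABLE output. A standalone check of that substitution (the CREATE TABLE text is illustrative):

    import re

    create_sql = "CREATE TABLE tpch_kudu.lineitem (id BIGINT, val STRING)"
    search_pattern = r"CREATE TABLE (\w*)\.(.*) \("
    replacement = "CREATE TABLE {tbl} (".format(tbl="lineitem_original")
    print(re.sub(search_pattern, replacement, create_sql, count=1))
    # CREATE TABLE lineitem_original (id BIGINT, val STRING)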
+def reset_databases(cursor):
+ """Reset the database to the initial state. This is done by overwriting tables which
+ don't have the _original suffix with data from tables with the _original suffix.
+
+ Note: At this time we only support Kudu tables with a simple hash partitioning based on
+ the primary key. (SHOW CREATE TABLE would not work otherwise.)
+ """
+ LOG.info("Resetting {0} database".format(cursor.db_name))
+ tables = {t: cursor.describe_table(t) for t in cursor.list_table_names()}
+ for table_name in tables:
+ if not table_name.endswith("_original"):
+ if table_name + "_original" in tables:
+ cursor.execute("SHOW CREATE TABLE " + table_name)
+ create_table_command = cursor.fetchone()[0]
+ cursor.execute("DROP TABLE {0}".format(table_name))
+ cursor.execute(create_table_command)
+ cursor.execute("INSERT INTO {0} SELECT * FROM {0}_original".format(table_name))
+ cursor.execute("COMPUTE STATS {0}".format(table_name))
+ else:
+ LOG.debug("Table '{0}' cannot be reset because '{0}_original' does not"
+ " exist in '{1}' database.".format(table_name, cursor.db_name))
+
+
+def populate_all_queries(queries, impala, args, runtime_info_path,
+ queries_with_runtime_info_by_db_sql_and_options):
+ """Populate runtime info for all queries, ordered by the population_order property."""
+ result = []
+ queries_by_order = {}
+ for query in queries:
+ if query.population_order not in queries_by_order:
+ queries_by_order[query.population_order] = []
+ queries_by_order[query.population_order].append(query)
+ for population_order in sorted(queries_by_order.keys()):
+ for query in queries_by_order[population_order]:
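+ # Reuse saved runtime info only when the db name, the SQL text, and the exact
+ # set of query options (serialized as a sorted list) all match.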
+ if (
+ query.sql in
+ queries_with_runtime_info_by_db_sql_and_options[query.db_name] and
+ str(sorted(query.options.items())) in
+ queries_with_runtime_info_by_db_sql_and_options[query.db_name][query.sql]
+ ):
+ LOG.debug("Reusing previous runtime data for query: " + query.sql)
+ result.append(queries_with_runtime_info_by_db_sql_and_options[
+ query.db_name][query.sql][str(sorted(query.options.items()))])
+ else:
+ populate_runtime_info(
+ query, impala, args.use_kerberos, args.result_hash_log_dir,
+ samples=args.samples, max_conflicting_samples=args.max_conflicting_samples)
+ save_runtime_info(runtime_info_path, query, impala)
+ result.append(query)
+ return result
+
+
def main():
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from random import shuffle
import tests.comparison.cli_options as cli_options
- parser = ArgumentParser(epilog=dedent("""
- Before running this script a CM cluster must be setup and any needed data
- such as TPC-H/DS must be loaded. The first time this script is run it will
- find memory limits and runtimes for each query and save the data to disk (since
- collecting the data is slow) at --runtime-info-path then run the stress test.
- Later runs will reuse the saved memory limits and timings. If the cluster changes
- significantly the memory limits should be re-measured (deleting the file at
- --runtime-info-path will cause re-measuring to happen).""").strip(),
- formatter_class=ArgumentDefaultsHelpFormatter)
+ parser = ArgumentParser(
+ epilog=dedent("""
+ Before running this script a CM cluster must be set up and any needed data
+ such as TPC-H/DS must be loaded. The first time this script is run it will
+ find memory limits and runtimes for each query and save the data to disk (since
+ collecting the data is slow) at --runtime-info-path, then run the stress test.
+ Later runs will reuse the saved memory limits and timings. If the cluster changes
+ significantly the memory limits should be re-measured (deleting the file at
+ --runtime-info-path will cause re-measuring to happen).""").strip(),
+ formatter_class=ArgumentDefaultsHelpFormatter)
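+ # Illustrative invocation (script path and argument values are hypothetical):
+ #   <stress_test.py> --tpch-kudu-db=tpch_kudu --generate-dml-queries \
+ #     --reset-databases-before-binary-search --max-queries=500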
cli_options.add_logging_options(parser)
cli_options.add_cluster_options(parser)
cli_options.add_kerberos_options(parser)
- parser.add_argument("--runtime-info-path",
+ parser.add_argument(
+ "--runtime-info-path",
default=os.path.join(gettempdir(), "{cm_host}_query_runtime_info.json"),
help="The path to store query runtime info at. '{cm_host}' will be replaced with"
" the actual host name from --cm-host.")
- parser.add_argument("--samples", default=1, type=int,
+ parser.add_argument(
+ "--samples", default=1, type=int,
help='Used when collecting "runtime info" - the number of samples to collect when'
' testing a particular mem limit value.')
- parser.add_argument("--max-conflicting-samples", default=0, type=int,
+ parser.add_argument(
+ "--max-conflicting-samples", default=0, type=int,
+ help='Used when collecting "runtime info" - the number of sample outcomes that'
+ ' can disagree when deciding to accept a particular mem limit. E.g., when trying to'
' determine the mem limit that avoids spilling with samples=5 and'
' max-conflicting-samples=1, then 4/5 queries must not spill at a particular mem'
' limit.')
- parser.add_argument("--result-hash-log-dir", default=gettempdir(),
+ parser.add_argument(
+ "--result-hash-log-dir", default=gettempdir(),
help="If query results do not match, a log file will be left in this dir. The log"
" file is also created during the first run when runtime info is collected for"
" each query.")
- parser.add_argument("--no-status", action="store_true",
- help="Do not print the status table.")
- parser.add_argument("--cancel-current-queries", action="store_true",
+ parser.add_argument(
+ "--no-status", action="store_true", help="Do not print the status table.")
+ parser.add_argument(
+ "--cancel-current-queries", action="store_true",
help="Cancel any queries running on the cluster before beginning.")
- parser.add_argument("--filter-query-mem-ratio", type=float, default=0.333,
+ parser.add_argument(
+ "--filter-query-mem-ratio", type=float, default=0.333,
help="Queries that require this ratio of total available memory will be filtered.")
- parser.add_argument("--startup-queries-per-second", type=float, default=2.0,
+ parser.add_argument(
+ "--startup-queries-per-second", type=float, default=2.0,
help="Adjust this depending on the cluster size and workload. This determines"
" the minimum amount of time between successive query submissions when"
" the workload is initially ramping up.")
- parser.add_argument("--fail-upon-successive-errors", type=int, default=1,
+ parser.add_argument(
+ "--fail-upon-successive-errors", type=int, default=1,
help="Continue running until N query errors are encountered in a row. Set"
" this to a high number to only stop when something catastrophic happens. A"
" value of 1 stops upon the first error.")
- parser.add_argument("--mem-limit-padding-pct", type=int, default=25,
+ parser.add_argument(
+ "--mem-limit-padding-pct", type=int, default=25,
help="Pad query mem limits found by solo execution with this percentage when"
" running concurrently. After padding queries will not be expected to fail"
" due to mem limit exceeded.")
- parser.add_argument("--timeout-multiplier", type=float, default=1.0,
+ parser.add_argument(
+ "--mem-limit-padding-abs", type=int, default=0,
+ help="Pad query mem limits found by solo execution with this value (in megabytes)"
+ " running concurrently. After padding queries will not be expected to fail"
+ " due to mem limit exceeded. This is useful if we want to be able to add the same"
+ " amount of memory to smaller queries as to the big ones.")
+ parser.add_argument(
+ "--timeout-multiplier", type=float, default=1.0,
help="Query timeouts will be multiplied by this value.")
parser.add_argument("--max-queries", type=int, default=100)
+ parser.add_argument(
+ "--reset-databases-before-binary-search", action="store_true",
+ help="If True, databases will be reset to their original state before the binary"
+ " search.")
+ parser.add_argument(
+ "--reset-databases-after-binary-search", action="store_true",
+ help="If True, databases will be reset to their original state after the binary"
+ " search and before starting the stress test. The primary intent of this option is"
+ " to undo the changes made to the databases by the binary search. This option can"
+ " also be used to reset the databases before running other (non stress) tests on"
+ " the same data.")
+ parser.add_argument(
+ "--generate-dml-queries", action="store_true",
+ help="If True, DML queries will be generated for Kudu databases.")
+ parser.add_argument(
+ "--dml-mod-values", nargs="+", type=int, default=[11],
+ help="List of mod values to use for the DML queries. There will be 4 DML (delete,"
+ " insert, update, upsert) queries generated per mod value per table. The smaller"
+ " the value, the more rows the DML query would touch (the query should touch about"
+ " 1/mod_value rows.)")
+ parser.add_argument(
+ "--generate-compute-stats-queries", action="store_true",
+ help="If True, Compute Stats queries will be generated.")
+ parser.add_argument(
+ "--select-probability", type=float, default=0.5,
+ help="Probability of choosing a select query (as opposed to a DML query).")
parser.add_argument("--tpcds-db", help="If provided, TPC-DS queries will be used.")
parser.add_argument("--tpch-db", help="If provided, TPC-H queries will be used.")
- parser.add_argument("--tpch-nested-db",
- help="If provided, nested TPC-H queries will be used.")
- parser.add_argument("--tpch-kudu-db",
- help="If provided, TPC-H queries for Kudu will be used.")
- parser.add_argument("--tpcds-kudu-db",
- help="If provided, TPC-DS queries for Kudu will be used.")
- parser.add_argument("--random-db",
- help="If provided, random queries will be used.")
- parser.add_argument("--random-query-count", type=int, default=50,
+ parser.add_argument(
+ "--tpch-nested-db", help="If provided, nested TPC-H queries will be used.")
+ parser.add_argument(
+ "--tpch-kudu-db", help="If provided, TPC-H queries for Kudu will be used.")
+ parser.add_argument(
+ "--tpcds-kudu-db", help="If provided, TPC-DS queries for Kudu will be used.")
+ parser.add_argument(
+ "--random-db", help="If provided, random queries will be used.")
+ parser.add_argument(
+ "--random-query-count", type=int, default=50,
help="The number of random queries to generate.")
- parser.add_argument("--random-query-timeout-seconds", type=int, default=(5 * 60),
+ parser.add_argument(
+ "--random-query-timeout-seconds", type=int, default=(5 * 60),
help="A random query that runs longer than this time when running alone will"
" be discarded.")
- parser.add_argument("--query-file-path", help="Use queries in the given file. The file"
+ parser.add_argument(
+ "--query-file-path", help="Use queries in the given file. The file"
" format must be the same as standard test case format. Queries are expected to "
" be randomly generated and will be validated before running in stress mode.")
- parser.add_argument("--query-file-db", help="The name of the database to use with the "
- "queries from --query-file-path.")
+ parser.add_argument(
+ "--query-file-db",
+ help="The name of the database to use with the queries from --query-file-path.")
parser.add_argument("--mem-overcommit-pct", type=float, default=0)
- parser.add_argument("--mem-spill-probability", type=float, default=0.33,
- dest="spill_probability",
+ parser.add_argument(
+ "--mem-spill-probability", type=float, default=0.33, dest="spill_probability",
help="The probability that a mem limit will be set low enough to induce spilling.")
- parser.add_argument("--mem-leak-check-interval-mins", type=int, default=None,
+ parser.add_argument(
+ "--mem-leak-check-interval-mins", type=int, default=None,
help="Periodically stop query execution and check that memory levels have reset.")
- parser.add_argument("--cancel-probability", type=float, default=0.1,
+ parser.add_argument(
+ "--cancel-probability", type=float, default=0.1,
help="The probability a query will be cancelled.")
- parser.add_argument("--nlj-filter", choices=("in", "out", None),
+ parser.add_argument(
+ "--nlj-filter", choices=("in", "out", None),
help="'in' means only nested-loop queries will be used, 'out' means no NLJ queries"
" will be used. The default is to not filter either way.")
parser.add_argument(
@@ -1377,14 +1776,18 @@
"DISABLE_CODEGEN=true RUNTIME_FILTER_MODE=1")
args = parser.parse_args()
- cli_options.configure_logging(args.log_level, debug_log_file=args.debug_log_file,
- log_thread_id=True, log_process_id=True)
+ cli_options.configure_logging(
+ args.log_level, debug_log_file=args.debug_log_file, log_thread_id=True,
+ log_process_id=True)
LOG.debug("CLI args: %s" % (args, ))
- if not args.tpcds_db and not args.tpch_db and not args.random_db \
- and not args.tpch_nested_db and not args.tpch_kudu_db \
- and not args.tpcds_kudu_db and not args.query_file_path:
- raise Exception("At least one of --tpcds-db, --tpch-db, --tpch-kudu-db,"
+ if (
+ not args.tpcds_db and not args.tpch_db and not args.random_db and
+ not args.tpch_nested_db and not args.tpch_kudu_db and
+ not args.tpcds_kudu_db and not args.query_file_path
+ ):
+ raise Exception(
+ "At least one of --tpcds-db, --tpch-db, --tpch-kudu-db,"
"--tpcds-kudu-db, --tpch-nested-db, --random-db, --query-file-path is required")
# The stress test sets these, so callers cannot override them.
@@ -1437,7 +1840,8 @@
runtime_info_path = args.runtime_info_path
if "{cm_host}" in runtime_info_path:
runtime_info_path = runtime_info_path.format(cm_host=args.cm_host)
- queries_with_runtime_info_by_db_and_sql = load_runtime_info(runtime_info_path, impala)
+ queries_with_runtime_info_by_db_sql_and_options = load_runtime_info(
+ runtime_info_path, impala)
# Start loading the test queries.
queries = list()
@@ -1449,36 +1853,57 @@
for query in tpcds_queries:
query.db_name = args.tpcds_db
queries.extend(tpcds_queries)
+ if args.generate_compute_stats_queries:
+ with impala.cursor(db_name=args.tpcds_db) as cursor:
+ queries.extend(generate_compute_stats_queries(cursor))
if args.tpch_db:
tpch_queries = load_tpc_queries("tpch")
for query in tpch_queries:
query.db_name = args.tpch_db
queries.extend(tpch_queries)
+ if args.generate_compute_stats_queries:
+ with impala.cursor(db_name=args.tpch_db) as cursor:
+ queries.extend(generate_compute_stats_queries(cursor))
if args.tpch_nested_db:
tpch_nested_queries = load_tpc_queries("tpch_nested")
for query in tpch_nested_queries:
query.db_name = args.tpch_nested_db
queries.extend(tpch_nested_queries)
+ if args.generate_compute_stats_queries:
+ with impala.cursor(db_name=args.tpch_nested_db) as cursor:
+ queries.extend(generate_compute_stats_queries(cursor))
if args.tpch_kudu_db:
tpch_kudu_queries = load_tpc_queries("tpch", load_in_kudu=True)
for query in tpch_kudu_queries:
query.db_name = args.tpch_kudu_db
queries.extend(tpch_kudu_queries)
+ if args.generate_compute_stats_queries:
+ with impala.cursor(db_name=args.tpch_kudu_db) as cursor:
+ queries.extend(generate_compute_stats_queries(cursor))
+ if args.generate_dml_queries:
+ with impala.cursor(db_name=args.tpch_kudu_db) as cursor:
+ prepare_database(cursor)
+ queries.extend(generate_DML_queries(cursor, args.dml_mod_values))
if args.tpcds_kudu_db:
tpcds_kudu_queries = load_tpc_queries("tpcds", load_in_kudu=True)
for query in tpcds_kudu_queries:
query.db_name = args.tpcds_kudu_db
queries.extend(tpcds_kudu_queries)
- for idx in xrange(len(queries) - 1, -1, -1):
- query = queries[idx]
- if query.sql in queries_with_runtime_info_by_db_and_sql[query.db_name]:
- query = queries_with_runtime_info_by_db_and_sql[query.db_name][query.sql]
- LOG.debug("Reusing previous runtime data for query: " + query.sql)
- queries[idx] = query
- else:
- populate_runtime_info(query, impala, args.use_kerberos, args.result_hash_log_dir,
- samples=args.samples, max_conflicting_samples=args.max_conflicting_samples)
- save_runtime_info(runtime_info_path, query, impala)
+ if args.generate_compute_stats_queries:
+ with impala.cursor(db_name=args.tpcds_kudu_db) as cursor:
+ queries.extend(generate_compute_stats_queries(cursor))
+ if args.generate_dml_queries:
+ with impala.cursor(db_name=args.tpcds_kudu_db) as cursor:
+ prepare_database(cursor)
+ queries.extend(generate_DML_queries(cursor, args.dml_mod_values))
+
+ if args.reset_databases_before_binary_search:
+ for database in set(query.db_name for query in queries):
+ with impala.cursor(db_name=database) as cursor:
+ reset_databases(cursor)
+
+ queries = populate_all_queries(
+ queries, impala, args, runtime_info_path,
+ queries_with_runtime_info_by_db_sql_and_options)
# A particular random query may either fail (due to a generator or Impala bug) or
# take a really long time to complete. So the queries need to be validated. Since the
@@ -1487,28 +1912,30 @@
query_generator = QueryGenerator(DefaultProfile())
with impala.cursor(db_name=args.random_db) as cursor:
tables = [cursor.describe_table(t) for t in cursor.list_table_names()]
- queries.extend(load_random_queries_and_populate_runtime_info(query_generator,
- SqlWriter.create(), tables, args.random_db, impala, args.use_kerberos,
- args.random_query_count, args.random_query_timeout_seconds,
+ queries.extend(load_random_queries_and_populate_runtime_info(
+ query_generator, SqlWriter.create(), tables, args.random_db, impala,
+ args.use_kerberos, args.random_query_count, args.random_query_timeout_seconds,
args.result_hash_log_dir))
if args.query_file_path:
- file_queries = load_queries_from_test_file(args.query_file_path,
- db_name=args.query_file_db)
+ file_queries = load_queries_from_test_file(
+ args.query_file_path, db_name=args.query_file_db)
shuffle(file_queries)
- queries.extend(populate_runtime_info_for_random_queries(impala, args.use_kerberos,
- file_queries, args.random_query_count, args.random_query_timeout_seconds,
- args.result_hash_log_dir))
+ queries.extend(populate_runtime_info_for_random_queries(
+ impala, args.use_kerberos, file_queries, args.random_query_count,
+ args.random_query_timeout_seconds, args.result_hash_log_dir))
# Apply tweaks to the query's runtime info as requested by CLI options.
for idx in xrange(len(queries) - 1, -1, -1):
query = queries[idx]
if query.required_mem_mb_with_spilling:
- query.required_mem_mb_with_spilling += int(query.required_mem_mb_with_spilling
- * args.mem_limit_padding_pct / 100.0)
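+ # Padded limit = limit + limit * mem_limit_padding_pct / 100 + mem_limit_padding_abs (MB).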
+ query.required_mem_mb_with_spilling += int(
+ query.required_mem_mb_with_spilling * args.mem_limit_padding_pct / 100.0) + \
+ args.mem_limit_padding_abs
if query.required_mem_mb_without_spilling:
- query.required_mem_mb_without_spilling += int(query.required_mem_mb_without_spilling
- * args.mem_limit_padding_pct / 100.0)
+ query.required_mem_mb_without_spilling += int(
+ query.required_mem_mb_without_spilling * args.mem_limit_padding_pct / 100.0) + \
+ args.mem_limit_padding_abs
if query.solo_runtime_secs_with_spilling:
query.solo_runtime_secs_with_spilling *= args.timeout_multiplier
if query.solo_runtime_secs_without_spilling:
@@ -1522,6 +1949,7 @@
LOG.debug("Filtered query due to mem ratio option: " + query.sql)
del queries[idx]
+ # Remove queries that have a nested loop join in the plan.
if args.nlj_filter:
with impala.cursor(db_name=args.random_db) as cursor:
for idx in xrange(len(queries) - 1, -1, -1):
@@ -1548,6 +1976,15 @@
raise Exception("All queries were filtered")
print("Using %s queries" % len(queries))
+ # After the binary search phase finishes, it may be a good idea to reset the database
+ # again to start the stress test from a clean state.
+ if args.reset_databases_after_binary_search:
+ for database in set(query.db_name for query in queries):
+ with impala.cursor(db_name=database) as cursor:
+ reset_databases(cursor)
+
+ LOG.info("Number of queries in the list: {0}".format(len(queries)))
+
stress_runner = StressRunner()
stress_runner.result_hash_log_dir = args.result_hash_log_dir
stress_runner.startup_queries_per_sec = args.startup_queries_per_second
@@ -1557,8 +1994,12 @@
stress_runner.spill_probability = args.spill_probability
stress_runner.leak_check_interval_mins = args.mem_leak_check_interval_mins
stress_runner.common_query_options = common_query_options
- stress_runner.run_queries(queries, impala, args.max_queries, args.mem_overcommit_pct,
- not args.no_status) # This is the value of 'should_print_status'.
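+ # Result verification is disabled when DML queries were generated, since DML
+ # mutates the table contents.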
+ stress_runner.run_queries(
+ queries, impala, args.max_queries, args.mem_overcommit_pct,
+ should_print_status=not args.no_status,
+ verify_results=not args.generate_dml_queries,
+ select_probability=args.select_probability)
+
if __name__ == "__main__":
main()
diff --git a/tests/util/parse_util.py b/tests/util/parse_util.py
index 207f4b5..ad40b68 100644
--- a/tests/util/parse_util.py
+++ b/tests/util/parse_util.py
@@ -52,7 +52,7 @@
mem = float(mem)
if mem <= 0:
return
- units = units.strip().upper()
+ units = units.strip().upper() if units else ""
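+ # 'units' may be None when no unit suffix was captured; fall back to an empty string.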
if units.endswith("B"):
units = units[:-1]
if not units: