Merge remote-tracking branch 'origin/master' into hadoop-next

Change-Id: Ib37056d1f273f6efb5b87c9b48512f8b14ab497b
diff --git a/be/src/gutil/walltime.cc b/be/src/gutil/walltime.cc
index b497500..04d7f4b 100644
--- a/be/src/gutil/walltime.cc
+++ b/be/src/gutil/walltime.cc
@@ -166,18 +166,6 @@
   return true;
 }
 
-WallTime WallTime_Now() {
-#if defined(__APPLE__)
-  mach_timespec_t ts;
-  walltime_internal::GetCurrentTime(&ts);
-  return ts.tv_sec + ts.tv_nsec / static_cast<double>(1e9);
-#else
-  timespec ts;
-  clock_gettime(CLOCK_REALTIME, &ts);
-  return ts.tv_sec + ts.tv_nsec / static_cast<double>(1e9);
-#endif  // defined(__APPLE__)
-}
-
 void StringAppendStrftime(string* dst,
                           const char* format,
                           time_t when,
diff --git a/be/src/gutil/walltime.h b/be/src/gutil/walltime.h
index 8d31627..2f04ebe 100644
--- a/be/src/gutil/walltime.h
+++ b/be/src/gutil/walltime.h
@@ -30,6 +30,12 @@
 #include "common/logging.h"
 #include "gutil/integral_types.h"
 
+#define NANOS_PER_SEC  1000000000LL  // nanoseconds per second
+#define NANOS_PER_MICRO      1000LL  // nanoseconds per microsecond
+#define MICROS_PER_SEC    1000000LL  // microseconds per second
+#define MICROS_PER_MILLI     1000LL  // microseconds per millisecond
+#define MILLIS_PER_SEC       1000LL  // milliseconds per second
+
 typedef double WallTime;
 
 // Append result to a supplied string.
@@ -52,9 +58,6 @@
                                     bool local,
                                     WallTime* result);
 
-// Return current time in seconds as a WallTime.
-WallTime WallTime_Now();
-
 typedef int64 MicrosecondsInt64;
 typedef int64 NanosecondsInt64;
 
@@ -76,7 +79,7 @@
 inline MicrosecondsInt64 GetCurrentTimeMicros() {
   mach_timespec_t ts;
   GetCurrentTime(&ts);
-  return ts.tv_sec * 1e6 + ts.tv_nsec / 1e3;
+  return ts.tv_sec * MICROS_PER_SEC + ts.tv_nsec / NANOS_PER_MICRO;  // integer math
 }
 
 inline int64_t GetMonoTimeNanos() {
@@ -91,7 +94,7 @@
 }
 
 inline MicrosecondsInt64 GetMonoTimeMicros() {
-  return GetMonoTimeNanos() / 1e3;
+  return GetMonoTimeNanos() / NANOS_PER_MICRO;  // integer division, truncates
 }
 
 inline MicrosecondsInt64 GetThreadCpuTimeMicros() {
@@ -117,7 +120,8 @@
     return 0;
   }
 
-  return thread_info_data.user_time.seconds * 1e6 + thread_info_data.user_time.microseconds;
+  return thread_info_data.user_time.seconds * MICROS_PER_SEC +
+      thread_info_data.user_time.microseconds;
 }
 
 #else
@@ -125,13 +129,13 @@
 inline MicrosecondsInt64 GetClockTimeMicros(clockid_t clock) {
   timespec ts;
   clock_gettime(clock, &ts);
-  return ts.tv_sec * 1e6 + ts.tv_nsec / 1e3;
+  return ts.tv_sec * MICROS_PER_SEC + ts.tv_nsec / NANOS_PER_MICRO;  // integer math
 }
 
 inline NanosecondsInt64 GetClockTimeNanos(clockid_t clock) {
   timespec ts;
   clock_gettime(clock, &ts);
-  return ts.tv_sec * 1e9 + ts.tv_nsec;
+  return ts.tv_sec * NANOS_PER_SEC + ts.tv_nsec;  // integer math; the old double product could round
 }
 
 #endif // defined(__APPLE__)
diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc
index e6a9084..afbb33d 100644
--- a/be/src/util/parquet-reader.cc
+++ b/be/src/util/parquet-reader.cc
@@ -86,22 +86,9 @@
 }
 
 string TypeMapping(Type::type t) {
-  switch (t) {
-    case Type::BOOLEAN:
-      return "BOOLEAN";
-    case Type::INT32:
-      return "INT32";
-    case Type::INT64:
-      return "INT64";
-    case Type::FLOAT:
-      return "FLOAT";
-    case Type::DOUBLE:
-      return "DOUBLE";
-    case Type::BYTE_ARRAY:
-      return "BYTE_ARRAY";
-    default:
-      return "UNKNOWN";
-  }
+  const auto name_it = _Type_VALUES_TO_NAMES.find(t);
+  if (name_it == _Type_VALUES_TO_NAMES.end()) return "UNKNOWN";  // no name for 't'
+  return name_it->second;
 }
 
 void AppendSchema(const vector<SchemaElement>& schema, int level,
@@ -148,8 +135,8 @@
 //     def levels - with our RLE scheme it is not possible to determine how many values
 //     were actually written if the final run is a literal run, only if the final run is
 //     a repeated run (see util/rle-encoding.h for more details).
-void CheckDataPage(const ColumnChunk& col, const PageHeader& header,
-    const uint8_t* page) {
+// Returns the number of rows specified by the header.
+int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_t* page) {
   const uint8_t* data = page;
   std::vector<uint8_t> decompressed_buffer;
   if (col.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED) {
@@ -191,6 +178,8 @@
       }
     }
   }
+
+  return header.data_page_header.num_values;
 }
 
 // Simple utility to read parquet files on local disk.  This utility validates the
@@ -248,12 +237,14 @@
   int total_page_header_size = 0;
   int total_compressed_data_size = 0;
   int total_uncompressed_data_size = 0;
-  vector<int> column_sizes;
+  vector<int> column_byte_sizes;
+  vector<int> column_num_rows;
 
   for (int i = 0; i < file_metadata.row_groups.size(); ++i) {
     cerr << "Reading row group " << i << endl;
     RowGroup& rg = file_metadata.row_groups[i];
-    column_sizes.resize(rg.columns.size());
+    column_byte_sizes.resize(rg.columns.size());
+    column_num_rows.resize(rg.columns.size());
 
     for (int c = 0; c < rg.columns.size(); ++c) {
       cerr << "  Reading column " << c << endl;
@@ -278,18 +269,23 @@
         }
 
         data += header_size;
-        if (header.__isset.data_page_header) CheckDataPage(col, header, data);
+        if (header.__isset.data_page_header) {
+          column_num_rows[c] += CheckDataPage(col, header, data);
+        }
 
         total_page_header_size += header_size;
-        column_sizes[c] += header.compressed_page_size;
+        column_byte_sizes[c] += header.compressed_page_size;
         total_compressed_data_size += header.compressed_page_size;
         total_uncompressed_data_size += header.uncompressed_page_size;
         data += header.compressed_page_size;
         ++pages_read;
       }
-      // Check that we ended exactly where we should have
+      // Check that we ended exactly where we should have.
       assert(data == col_end);
+      // Check that all cols have the same number of rows.
+      assert(column_num_rows[0] == column_num_rows[c]);
     }
+    num_rows += column_num_rows[0];
   }
   double compression_ratio =
       (double)total_uncompressed_data_size / total_compressed_data_size;
@@ -306,9 +302,9 @@
      << "(" << (total_compressed_data_size / (double)file_len) << ")" << endl;
   ss << "  Column uncompressed size: " << total_uncompressed_data_size
      << "(" << compression_ratio << ")" << endl;
-  for (int i = 0; i < column_sizes.size(); ++i) {
-    ss << "    " << "Col " << i << ": " << column_sizes[i]
-       << "(" << (column_sizes[i] / (double)file_len) << ")" << endl;
+  for (int i = 0; i < column_byte_sizes.size(); ++i) {
+    ss << "    " << "Col " << i << ": " << column_byte_sizes[i]
+       << "(" << (column_byte_sizes[i] / (double)file_len) << ")" << endl;
   }
   cerr << ss.str() << endl;
 
diff --git a/be/src/util/stopwatch.h b/be/src/util/stopwatch.h
index 0e73b6a..c1f85aa 100644
--- a/be/src/util/stopwatch.h
+++ b/be/src/util/stopwatch.h
@@ -170,7 +170,7 @@
     // Now() can be called frequently (IMPALA-2407).
     timespec ts;
     clock_gettime(OsInfo::fast_clock(), &ts);
-    return ts.tv_sec * 1e9 + ts.tv_nsec;
+    return ts.tv_sec * NANOS_PER_SEC + ts.tv_nsec;
 #endif
   }
 
diff --git a/be/src/util/time.h b/be/src/util/time.h
index 28a0f66..efe6a3b 100644
--- a/be/src/util/time.h
+++ b/be/src/util/time.h
@@ -39,11 +39,11 @@
 }
 
 inline int64_t MonotonicMillis() {
-  return GetMonoTimeMicros() / 1e3;
+  return GetMonoTimeMicros() / MICROS_PER_MILLI;  // integer division, truncates
 }
 
 inline int64_t MonotonicSeconds() {
-  return GetMonoTimeMicros() / 1e6;
+  return GetMonoTimeMicros() / MICROS_PER_SEC;  // integer division, truncates
 }
 
 
@@ -52,7 +52,7 @@
 /// a cluster. For more accurate timings on the local host use the monotonic functions
 /// above.
 inline int64_t UnixMillis() {
-  return GetCurrentTimeMicros() / 1e3;
+  return GetCurrentTimeMicros() / MICROS_PER_MILLI;  // integer division, truncates
 }
 
 /// Sleeps the current thread for at least duration_ms milliseconds.
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index 7cdd026..d971852 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -80,8 +80,8 @@
     "The location of the debug webserver's SSL certificate file, in .pem format. If "
     "empty, webserver SSL support is not enabled");
 DEFINE_string(webserver_private_key_file, "", "The full path to the private key used as a"
-    " counterpart to the public key contained in --ssl_server_certificate. If "
-    "--ssl_server_certificate is set, this option must be set as well.");
+    " counterpart to the public key contained in --webserver_certificate_file. If "
+    "--webserver_certificate_file is set, this option must be set as well.");
 DEFINE_string(webserver_private_key_password_cmd, "", "A Unix command whose output "
     "returns the password used to decrypt the Webserver's certificate private key file "
     "specified in --webserver_private_key_file. If the .PEM key file is not "
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index d11c1c3..8b3cc8c 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -141,7 +141,7 @@
       # Remove spaces, trim minor versions, and convert to lowercase.
       DISTRO_VERSION="$(tr -d ' \n' <<< "$DISTRO_VERSION" | cut -d. -f1 | tr "A-Z" "a-z")"
       case "$DISTRO_VERSION" in
-        centos6 | centos7 | debian7 | debian8 | sles12 | ubuntu* )
+        centos6 | centos7 | debian7 | debian8 | suselinux12 | ubuntu* )
             KUDU_IS_SUPPORTED=true;;
       esac
     fi
diff --git a/bin/make_impala.sh b/bin/make_impala.sh
index 97525c3..70d088f 100755
--- a/bin/make_impala.sh
+++ b/bin/make_impala.sh
@@ -170,7 +170,6 @@
   exit 0
 fi
 
-MAKE_ARGS=-j${IMPALA_BUILD_THREADS:-4}
 if [ $BUILD_FE_ONLY -eq 1 ]; then
   ${MAKE_CMD} ${MAKE_ARGS} fe
 elif [ $BUILD_EVERYTHING -eq 1 ]; then
diff --git a/buildall.sh b/buildall.sh
index fc66acb..291f19e 100755
--- a/buildall.sh
+++ b/buildall.sh
@@ -59,7 +59,8 @@
 BUILD_ASAN=0
 BUILD_FE_ONLY=0
 BUILD_TIDY=0
-MAKE_CMD=make
+# Export MAKE_CMD so it is visible in scripts that invoke make, e.g. copy-udfs-udas.sh
+export MAKE_CMD=make
 LZO_CMAKE_ARGS=
 
 # Defaults that can be picked up from the environment, but are overridable through the
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 012da42..33f0591 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -276,10 +276,10 @@
 terminal BITAND, BITOR, BITXOR, BITNOT;
 terminal EQUAL, NOT, NOTEQUAL, LESSTHAN, GREATERTHAN;
 terminal FACTORIAL; // Placeholder terminal for postfix factorial operator
+terminal COMMENTED_PLAN_HINT_START, COMMENTED_PLAN_HINT_END;
 terminal String IDENT;
 terminal String EMPTY_IDENT;
 terminal String NUMERIC_OVERFLOW;
-terminal String COMMENTED_PLAN_HINTS;
 terminal BigDecimal INTEGER_LITERAL;
 terminal BigDecimal DECIMAL_LITERAL;
 terminal String STRING_LITERAL;
@@ -367,7 +367,8 @@
 nonterminal Subquery subquery;
 nonterminal JoinOperator join_operator;
 nonterminal opt_inner, opt_outer;
-nonterminal ArrayList<String> opt_plan_hints;
+nonterminal PlanHint plan_hint;
+nonterminal List<PlanHint> opt_plan_hints, plan_hint_list;
 nonterminal TypeDef type_def;
 nonterminal Type type;
 nonterminal Expr sign_chain_expr;
@@ -2300,41 +2301,41 @@
     list.add(table);
     RESULT = list;
   :}
-  | table_ref_list:list KW_CROSS KW_JOIN opt_plan_hints:hints table_ref:table
+  | table_ref_list:list KW_CROSS KW_JOIN opt_plan_hints:join_hints table_ref:table
     opt_plan_hints:table_hints
   {:
     table.setJoinOp(JoinOperator.CROSS_JOIN);
     // We will throw an AnalysisException if there are join hints so that we can provide
     // a better error message than a parser exception.
-    table.setJoinHints(hints);
+    table.setJoinHints(join_hints);
     table.setTableHints(table_hints);
     list.add(table);
     RESULT = list;
   :}
-  | table_ref_list:list join_operator:op opt_plan_hints:hints table_ref:table
+  | table_ref_list:list join_operator:op opt_plan_hints:join_hints table_ref:table
     opt_plan_hints:table_hints
   {:
     table.setJoinOp((JoinOperator) op);
-    table.setJoinHints(hints);
+    table.setJoinHints(join_hints);
     table.setTableHints(table_hints);
     list.add(table);
     RESULT = list;
   :}
-  | table_ref_list:list join_operator:op opt_plan_hints:hints table_ref:table
+  | table_ref_list:list join_operator:op opt_plan_hints:join_hints table_ref:table
     opt_plan_hints:table_hints KW_ON expr:e
   {:
     table.setJoinOp((JoinOperator) op);
-    table.setJoinHints(hints);
+    table.setJoinHints(join_hints);
     table.setTableHints(table_hints);
     table.setOnClause(e);
     list.add(table);
     RESULT = list;
   :}
-  | table_ref_list:list join_operator:op opt_plan_hints:hints table_ref:table
+  | table_ref_list:list join_operator:op opt_plan_hints:join_hints table_ref:table
     opt_plan_hints:table_hints KW_USING LPAREN ident_list:colNames RPAREN
   {:
     table.setJoinOp((JoinOperator) op);
-    table.setJoinHints(hints);
+    table.setJoinHints(join_hints);
     table.setTableHints(table_hints);
     table.setUsingClause(colNames);
     list.add(table);
@@ -2381,30 +2382,42 @@
   ;
 
 opt_plan_hints ::=
-  COMMENTED_PLAN_HINTS:l
-  {:
-    ArrayList<String> hints = new ArrayList<String>();
-    String[] tokens = l.split(",");
-    for (String token: tokens) {
-      String trimmedToken = token.trim();
-      if (trimmedToken.length() > 0) hints.add(trimmedToken);
-    }
-    RESULT = hints;
-  :}
+  COMMENTED_PLAN_HINT_START plan_hint_list:hints COMMENTED_PLAN_HINT_END
+  {: RESULT = hints; :}
   /* legacy straight_join hint style */
   | KW_STRAIGHT_JOIN
-  {:
-    ArrayList<String> hints = new ArrayList<String>();
-    hints.add("straight_join");
-    RESULT = hints;
-  :}
+  {: RESULT = Lists.newArrayList(new PlanHint("straight_join")); :}
   /* legacy plan-hint style */
-  | LBRACKET ident_list:l RBRACKET
-  {: RESULT = l; :}
+  | LBRACKET plan_hint_list:hints RBRACKET
+  {: RESULT = hints; :}
+  | /* empty */
+  {: RESULT = Lists.newArrayList(); :}
+  ;
+
+plan_hint ::=
+  KW_STRAIGHT_JOIN
+  {: RESULT = new PlanHint("straight_join"); :}
+  | IDENT:name
+  {: RESULT = new PlanHint(name); :}
+  | IDENT:name LPAREN ident_list:args RPAREN
+  {: RESULT = new PlanHint(name, args); :}
   | /* empty */
   {: RESULT = null; :}
   ;
 
+plan_hint_list ::=
+  plan_hint:hint
+  {:
+    RESULT = Lists.newArrayList();
+    if (hint != null) RESULT.add(hint);  // An empty plan_hint is null; skip it.
+  :}
+  | plan_hint_list:hints COMMA plan_hint:hint
+  {:
+    if (hint != null) hints.add(hint);
+    RESULT = hints;
+  :}
+  ;
+
 ident_list ::=
   ident_or_default:ident
   {:
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 1dacf48..902d100 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -64,7 +64,7 @@
   private final List<PartitionKeyValue> partitionKeyValues_;
 
   // User-supplied hints to control hash partitioning before the table sink in the plan.
-  private final List<String> planHints_;
+  private List<PlanHint> planHints_ = Lists.newArrayList();
 
   // False if the original insert statement had a query statement, true if we need to
   // auto-generate one (for insert into tbl()) during analysis.
@@ -124,6 +124,14 @@
   // clustering step.
   private boolean hasClusteredHint_ = false;
 
+  // For every column of the target table that is referenced in the optional 'sortby()'
+  // hint, this list will contain the corresponding result expr from 'resultExprs_'.
+  // Before insertion, all rows will be sorted by these exprs. If the list is empty, no
+  // additional sorting by non-partitioning columns will be performed. For Hdfs tables,
+  // the 'sortby()' hint must not contain partition columns. For Kudu tables, it must not
+  // contain primary key columns.
+  private List<Expr> sortByExprs_ = Lists.newArrayList();
+
   // Output expressions that produce the final results to write to the target table. May
   // include casts. Set in prepareExpressions().
   // If this is an INSERT on a non-Kudu table, it will contain one Expr for all
@@ -153,19 +161,19 @@
 
   public static InsertStmt createInsert(WithClause withClause, TableName targetTable,
       boolean overwrite, List<PartitionKeyValue> partitionKeyValues,
-      List<String> planHints, QueryStmt queryStmt, List<String> columnPermutation) {
+      List<PlanHint> planHints, QueryStmt queryStmt, List<String> columnPermutation) {
     return new InsertStmt(withClause, targetTable, overwrite, partitionKeyValues,
         planHints, queryStmt, columnPermutation, false);
   }
 
   public static InsertStmt createUpsert(WithClause withClause, TableName targetTable,
-      List<String> planHints, QueryStmt queryStmt, List<String> columnPermutation) {
+      List<PlanHint> planHints, QueryStmt queryStmt, List<String> columnPermutation) {
     return new InsertStmt(withClause, targetTable, false, null, planHints, queryStmt,
         columnPermutation, true);
   }
 
   protected InsertStmt(WithClause withClause, TableName targetTable, boolean overwrite,
-      List<PartitionKeyValue> partitionKeyValues, List<String> planHints,
+      List<PartitionKeyValue> partitionKeyValues, List<PlanHint> planHints,
       QueryStmt queryStmt, List<String> columnPermutation, boolean isUpsert) {
     Preconditions.checkState(!isUpsert || (!overwrite && partitionKeyValues == null));
     withClause_ = withClause;
@@ -173,7 +181,7 @@
     originalTableName_ = targetTableName_;
     overwrite_ = overwrite;
     partitionKeyValues_ = partitionKeyValues;
-    planHints_ = planHints;
+    planHints_ = (planHints != null) ? planHints : new ArrayList<PlanHint>();
     queryStmt_ = queryStmt;
     needsGeneratedQueryStatement_ = (queryStmt == null);
     columnPermutation_ = columnPermutation;
@@ -210,6 +218,7 @@
     hasShuffleHint_ = false;
     hasNoShuffleHint_ = false;
     hasClusteredHint_ = false;
+    sortByExprs_.clear();
     resultExprs_.clear();
     mentionedColumns_.clear();
     primaryKeyExprs_.clear();
@@ -729,25 +738,27 @@
   }
 
   private void analyzePlanHints(Analyzer analyzer) throws AnalysisException {
-    if (planHints_ == null) return;
     if (!planHints_.isEmpty() && table_ instanceof HBaseTable) {
-      throw new AnalysisException("INSERT hints are only supported for inserting into " +
-          "Hdfs and Kudu tables.");
+      throw new AnalysisException(String.format("INSERT hints are only supported for " +
+          "inserting into Hdfs and Kudu tables: %s", getTargetTableName()));
     }
     boolean hasNoClusteredHint = false;
-    for (String hint: planHints_) {
-      if (hint.equalsIgnoreCase("SHUFFLE")) {
+    for (PlanHint hint: planHints_) {
+      if (hint.is("SHUFFLE")) {
         hasShuffleHint_ = true;
         analyzer.setHasPlanHints();
-      } else if (hint.equalsIgnoreCase("NOSHUFFLE")) {
+      } else if (hint.is("NOSHUFFLE")) {
         hasNoShuffleHint_ = true;
         analyzer.setHasPlanHints();
-      } else if (hint.equalsIgnoreCase("CLUSTERED")) {
+      } else if (hint.is("CLUSTERED")) {
         hasClusteredHint_ = true;
         analyzer.setHasPlanHints();
-      } else if (hint.equalsIgnoreCase("NOCLUSTERED")) {
+      } else if (hint.is("NOCLUSTERED")) {
         hasNoClusteredHint = true;
         analyzer.setHasPlanHints();
+      } else if (hint.is("SORTBY")) {
+        analyzeSortByHint(hint);
+        analyzer.setHasPlanHints();
       } else {
         analyzer.addWarning("INSERT hint not recognized: " + hint);
       }
@@ -761,6 +772,51 @@
     }
   }
 
+  /**
+   * Adds the result expr of every column named in the 'sortby()' hint to sortByExprs_.
+   * Throws an AnalysisException if the statement is an UPSERT, if the hint has no
+   * arguments, or if it names a Kudu primary key column, an Hdfs partition column, or
+   * a column that is not a non-clustering column of the target table.
+   */
+  private void analyzeSortByHint(PlanHint hint) throws AnalysisException {
+    // HBase tables don't support insert hints at all (must be enforced by the caller).
+    Preconditions.checkState(!(table_ instanceof HBaseTable));
+    if (isUpsert_) {
+      throw new AnalysisException("SORTBY hint is not supported in UPSERT statements.");
+    }
+    // A hint without arguments is a user error, so it must not trip a precondition.
+    List<String> columnNames = hint.getArgs();
+    if (columnNames.isEmpty()) {
+      throw new AnalysisException("SORTBY hint requires at least one column name.");
+    }
+    for (String columnName: columnNames) {
+      // Make sure it's not a Kudu primary key column or Hdfs partition column.
+      if (table_ instanceof KuduTable) {
+        if (((KuduTable) table_).isPrimaryKeyColumn(columnName)) {
+          throw new AnalysisException(String.format("SORTBY hint column list must not " +
+              "contain Kudu primary key column: '%s'", columnName));
+        }
+      } else {
+        for (Column tableColumn: table_.getClusteringColumns()) {
+          if (tableColumn.getName().equals(columnName)) {
+            throw new AnalysisException(String.format("SORTBY hint column list must " +
+                "not contain Hdfs partition column: '%s'", columnName));
+          }
+        }
+      }
+      // Find the matching non-clustering column (by name) and store the result expr at
+      // the same position in sortByExprs_.
+      List<Column> columns = table_.getNonClusteringColumns();
+      int i = 0;
+      while (i < columns.size() && !columns.get(i).getName().equals(columnName)) ++i;
+      if (i == columns.size()) {
+        throw new AnalysisException(String.format("Could not find SORTBY hint column " +
+            "'%s' in table.", columnName));
+      }
+      sortByExprs_.add(resultExprs_.get(i));
+    }
+  }
+
   @Override
   public ArrayList<Expr> getResultExprs() { return resultExprs_; }
 
@@ -772,7 +828,7 @@
 
   private String getOpName() { return isUpsert_ ? "UPSERT" : "INSERT"; }
 
-  public List<String> getPlanHints() { return planHints_; }
+  public List<PlanHint> getPlanHints() { return planHints_; }
   public TableName getTargetTableName() { return targetTableName_; }
   public Table getTargetTable() { return table_; }
   public void setTargetTable(Table table) { this.table_ = table; }
@@ -788,6 +844,7 @@
   public boolean hasNoShuffleHint() { return hasNoShuffleHint_; }
   public boolean hasClusteredHint() { return hasClusteredHint_; }
   public ArrayList<Expr> getPrimaryKeyExprs() { return primaryKeyExprs_; }
+  public List<Expr> getSortByExprs() { return sortByExprs_; }
 
   public List<String> getMentionedColumns() {
     List<String> result = Lists.newArrayList();
@@ -812,6 +869,7 @@
     resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
     partitionKeyExprs_ = Expr.substituteList(partitionKeyExprs_, smap, analyzer, true);
     primaryKeyExprs_ = Expr.substituteList(primaryKeyExprs_, smap, analyzer, true);
+    sortByExprs_ = Expr.substituteList(sortByExprs_, smap, analyzer, true);
   }
 
   @Override
@@ -840,8 +898,8 @@
       }
       strBuilder.append(" PARTITION (" + Joiner.on(", ").join(values) + ")");
     }
-    if (planHints_ != null) {
-      strBuilder.append(" " + ToSqlUtils.getPlanHintsSql(planHints_));
+    if (!planHints_.isEmpty()) {
+      strBuilder.append(" " + ToSqlUtils.getPlanHintsSql(getPlanHints()));
     }
     if (!needsGeneratedQueryStatement_) {
       strBuilder.append(" " + queryStmt_.toSql());
diff --git a/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java b/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
index 3ba2ad2..d5f0e70 100644
--- a/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
+++ b/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
@@ -137,12 +137,11 @@
     }
   }
 
-  // Transform <COL> = NULL into IsNull expr; <String COL> = '' into IsNull expr and
-  // <String COL> = 'String Value' into lower case.
+  // Transform <COL> = NULL into IsNull expr; <String COL> = '' into IsNull expr.
   // The reason is that COL = NULL is allowed for selecting the NULL
   // partition, but a COL = NULL predicate can never be true, so we
   // need to transform such predicates before feeding them into the
-  // partition pruner. Same logic goes to String transformation.
+  // partition pruner.
   private List<Expr> transformPartitionConjuncts(Analyzer analyzer, List<Expr> conjuncts)
       throws AnalysisException {
     List<Expr> transformedConjuncts = Lists.newArrayList();
@@ -162,9 +161,6 @@
           } else if (leftChild != null && stringChild != null) {
             if (stringChild.getStringValue().isEmpty()) {
               result = new IsNullPredicate(leftChild, false);
-            } else {
-              stringChild = new StringLiteral(stringChild.getStringValue().toLowerCase());
-              result.setChild(1, stringChild);
             }
           }
         }
diff --git a/fe/src/main/java/org/apache/impala/analysis/PlanHint.java b/fe/src/main/java/org/apache/impala/analysis/PlanHint.java
new file mode 100644
index 0000000..d16919f
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/PlanHint.java
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.impala.analysis;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * A query plan hint, which can occur in various places inside SQL query statements. A
+ * hint consists of a name and an optional list of arguments.
+ */
+public class PlanHint {
+  /// The plan hint name. Never null.
+  private final String name_;
+
+  /// List of arguments. Never null, empty if the hint has no arguments.
+  private final List<String> args_;
+
+  /// Creates a hint without arguments. 'name' must not be null.
+  public PlanHint(String name) { this(name, Lists.<String>newArrayList()); }
+
+  /// Creates a hint with arguments. Neither 'name' nor 'args' may be null.
+  public PlanHint(String name, List<String> args) {
+    name_ = Preconditions.checkNotNull(name);
+    args_ = Preconditions.checkNotNull(args);
+  }
+
+  /// Check whether the name of this hint equals a given string, ignoring case.
+  public boolean is(String s) { return name_.equalsIgnoreCase(s); }
+
+  /// Hints are equal if their names (case-sensitively) and arguments are equal.
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    PlanHint oh = (PlanHint) o;
+    return name_.equals(oh.name_) && args_.equals(oh.args_);
+  }
+
+  /// Must agree with equals() so that hints work in hash-based collections.
+  @Override
+  public int hashCode() { return 31 * name_.hashCode() + args_.hashCode(); }
+
+  /// Returns the name, followed by the parenthesized argument list if there is one.
+  @Override
+  public String toString() {
+    if (args_.isEmpty()) return name_;
+    return name_ + "(" + Joiner.on(",").join(args_) + ")";
+  }
+
+  /// Returns the arguments of this hint; empty if there are none.
+  public List<String> getArgs() { return args_; }
+  /// Returns the SQL text of this hint, which is the same as toString().
+  public String toSql() { return toString(); }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectList.java b/fe/src/main/java/org/apache/impala/analysis/SelectList.java
index 4c504bf..d7f12ff 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectList.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectList.java
@@ -22,13 +22,14 @@
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.rewrite.ExprRewriter;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
  * Select list items plus optional distinct clause and optional plan hints.
  */
 public class SelectList {
-  private List<String> planHints_;
+  private List<PlanHint> planHints_ = Lists.newArrayList();  // Optional plan hints.
   private boolean isDistinct_;
 
   /////////////////////////////////////////
@@ -50,7 +51,7 @@
   }
 
   public SelectList(List<SelectListItem> items, boolean isDistinct,
-      List<String> planHints) {
+      List<PlanHint> planHints) {
     isDistinct_ = isDistinct;
     items_ = items;
     planHints_ = planHints;
@@ -60,8 +61,7 @@
    * C'tor for cloning.
    */
   public SelectList(SelectList other) {
-    planHints_ =
-        (other.planHints_ != null) ? Lists.newArrayList(other.planHints_) : null;
+    planHints_ = Lists.newArrayList(other.planHints_);
     items_ = Lists.newArrayList();
     for (SelectListItem item: other.items_) {
       items_.add(item.clone());
@@ -70,16 +70,20 @@
   }
 
   public List<SelectListItem> getItems() { return items_; }
-  public void setPlanHints(List<String> planHints) { planHints_ = planHints; }
-  public List<String> getPlanHints() { return planHints_; }
+
+  // Replaces the plan hints of this select list. 'planHints' must not be null.
+  public void setPlanHints(List<PlanHint> planHints) {
+    planHints_ = Preconditions.checkNotNull(planHints);
+  }
+
+  public List<PlanHint> getPlanHints() { return planHints_; }
   public boolean isDistinct() { return isDistinct_; }
   public void setIsDistinct(boolean value) { isDistinct_ = value; }
-  public boolean hasPlanHints() { return planHints_ != null; }
+  public boolean hasPlanHints() { return !planHints_.isEmpty(); }
 
   public void analyzePlanHints(Analyzer analyzer) {
-    if (planHints_ == null) return;
-    for (String hint: planHints_) {
-      if (!hint.equalsIgnoreCase("straight_join")) {
+    for (PlanHint hint: planHints_) {
+      if (!hint.is("straight_join")) {
         analyzer.addWarning("PLAN hint not recognized: " + hint);
       }
       analyzer.setIsStraightJoin();
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableRef.java b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
index d6bbfd2..975b70b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
@@ -82,10 +82,10 @@
   protected final Privilege priv_;
 
   protected JoinOperator joinOp_;
-  protected ArrayList<String> joinHints_;
+  protected List<PlanHint> joinHints_ = Lists.newArrayList();
   protected List<String> usingColNames_;
 
-  protected ArrayList<String> tableHints_;
+  protected List<PlanHint> tableHints_ = Lists.newArrayList();
   protected TReplicaPreference replicaPreference_;
   protected boolean randomReplica_;
 
@@ -156,13 +156,11 @@
     hasExplicitAlias_ = other.hasExplicitAlias_;
     priv_ = other.priv_;
     joinOp_ = other.joinOp_;
-    joinHints_ =
-        (other.joinHints_ != null) ? Lists.newArrayList(other.joinHints_) : null;
+    joinHints_ = Lists.newArrayList(other.joinHints_);
     onClause_ = (other.onClause_ != null) ? other.onClause_.clone() : null;
     usingColNames_ =
         (other.usingColNames_ != null) ? Lists.newArrayList(other.usingColNames_) : null;
-    tableHints_ =
-        (other.tableHints_ != null) ? Lists.newArrayList(other.tableHints_) : null;
+    tableHints_ = Lists.newArrayList(other.tableHints_);
     replicaPreference_ = other.replicaPreference_;
     randomReplica_ = other.randomReplica_;
     distrMode_ = other.distrMode_;
@@ -262,8 +260,8 @@
     return resolvedPath_.getRootTable();
   }
   public Privilege getPrivilege() { return priv_; }
-  public ArrayList<String> getJoinHints() { return joinHints_; }
-  public ArrayList<String> getTableHints() { return tableHints_; }
+  public List<PlanHint> getJoinHints() { return joinHints_; }
+  public List<PlanHint> getTableHints() { return tableHints_; }
   public Expr getOnClause() { return onClause_; }
   public List<String> getUsingClause() { return usingColNames_; }
   public void setJoinOp(JoinOperator op) { this.joinOp_ = op; }
@@ -271,12 +269,23 @@
   public void setUsingClause(List<String> colNames) { this.usingColNames_ = colNames; }
   public TableRef getLeftTblRef() { return leftTblRef_; }
   public void setLeftTblRef(TableRef leftTblRef) { this.leftTblRef_ = leftTblRef; }
-  public void setJoinHints(ArrayList<String> hints) { this.joinHints_ = hints; }
-  public void setTableHints(ArrayList<String> hints) { this.tableHints_ = hints; }
+
+  public void setJoinHints(List<PlanHint> hints) {
+    // joinHints_ is kept non-null (empty when there are no hints), so reject null here.
+    joinHints_ = Preconditions.checkNotNull(hints);
+  }
+
+  public void setTableHints(List<PlanHint> hints) {
+    // tableHints_ is kept non-null (empty when there are no hints), so reject null here.
+    tableHints_ = Preconditions.checkNotNull(hints);
+  }
+
   public boolean isBroadcastJoin() { return distrMode_ == DistributionMode.BROADCAST; }
+
   public boolean isPartitionedJoin() {
     return distrMode_ == DistributionMode.PARTITIONED;
   }
+
   public DistributionMode getDistributionMode() { return distrMode_; }
   public List<TupleId> getCorrelatedTupleIds() { return correlatedTupleIds_; }
   public boolean isAnalyzed() { return isAnalyzed_; }
@@ -336,7 +345,7 @@
   }
 
   private void analyzeTableHints(Analyzer analyzer) {
-    if (tableHints_ == null) return;
+    if (tableHints_.isEmpty()) return;
     if (!(this instanceof BaseTableRef)) {
       analyzer.addWarning("Table hints not supported for inline view and collections");
       return;
@@ -347,17 +356,17 @@
         !(getResolvedPath().destTable() instanceof HdfsTable)) {
       analyzer.addWarning("Table hints only supported for Hdfs tables");
     }
-    for (String hint: tableHints_) {
-      if (hint.equalsIgnoreCase("SCHEDULE_CACHE_LOCAL")) {
+    for (PlanHint hint: tableHints_) {
+      if (hint.is("SCHEDULE_CACHE_LOCAL")) {
         analyzer.setHasPlanHints();
         replicaPreference_ = TReplicaPreference.CACHE_LOCAL;
-      } else if (hint.equalsIgnoreCase("SCHEDULE_DISK_LOCAL")) {
+      } else if (hint.is("SCHEDULE_DISK_LOCAL")) {
         analyzer.setHasPlanHints();
         replicaPreference_ = TReplicaPreference.DISK_LOCAL;
-      } else if (hint.equalsIgnoreCase("SCHEDULE_REMOTE")) {
+      } else if (hint.is("SCHEDULE_REMOTE")) {
         analyzer.setHasPlanHints();
         replicaPreference_ = TReplicaPreference.REMOTE;
-      } else if (hint.equalsIgnoreCase("SCHEDULE_RANDOM_REPLICA")) {
+      } else if (hint.is("SCHEDULE_RANDOM_REPLICA")) {
         analyzer.setHasPlanHints();
         randomReplica_ = true;
       } else {
@@ -369,9 +378,9 @@
   }
 
   private void analyzeJoinHints(Analyzer analyzer) throws AnalysisException {
-    if (joinHints_ == null) return;
-    for (String hint: joinHints_) {
-      if (hint.equalsIgnoreCase("BROADCAST")) {
+    if (joinHints_.isEmpty()) return;
+    for (PlanHint hint: joinHints_) {
+      if (hint.is("BROADCAST")) {
         if (joinOp_ == JoinOperator.RIGHT_OUTER_JOIN
             || joinOp_ == JoinOperator.FULL_OUTER_JOIN
             || joinOp_ == JoinOperator.RIGHT_SEMI_JOIN
@@ -384,7 +393,7 @@
         }
         distrMode_ = DistributionMode.BROADCAST;
         analyzer.setHasPlanHints();
-      } else if (hint.equalsIgnoreCase("SHUFFLE")) {
+      } else if (hint.is("SHUFFLE")) {
         if (joinOp_ == JoinOperator.CROSS_JOIN) {
           throw new AnalysisException("CROSS JOIN does not support SHUFFLE.");
         }
@@ -545,7 +554,7 @@
     }
 
     StringBuilder output = new StringBuilder(" " + joinOp_.toString() + " ");
-    if(joinHints_ != null) output.append(ToSqlUtils.getPlanHintsSql(joinHints_) + " ");
+    if(!joinHints_.isEmpty()) output.append(ToSqlUtils.getPlanHintsSql(joinHints_) + " ");
     output.append(tableRefToSql());
     if (usingColNames_ != null) {
       output.append(" USING (").append(Joiner.on(", ").join(usingColNames_)).append(")");
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index be4ab6f..35f7e79 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -389,8 +389,9 @@
    * commented plan hint style such that hinted views created by Impala are readable by
    * Hive (parsed as a comment by Hive).
    */
-  public static String getPlanHintsSql(List<String> hints) {
-    if (hints == null || hints.isEmpty()) return "";
+  public static String getPlanHintsSql(List<PlanHint> hints) {
+    Preconditions.checkNotNull(hints);
+    if (hints.isEmpty()) return "";
     StringBuilder sb = new StringBuilder();
     sb.append("\n-- +");
     sb.append(Joiner.on(",").join(hints));
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index c7d2097..79e5072 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -535,7 +535,7 @@
     for (FieldSchema fs: getMetaStoreTable().getPartitionKeys()) {
       for (TPartitionKeyValue kv: partitionSpec) {
         if (fs.getName().toLowerCase().equals(kv.getName().toLowerCase())) {
-          targetValues.add(kv.getValue().toLowerCase());
+          targetValues.add(kv.getValue());
           // Same key was specified twice
           if (!keys.add(kv.getName().toLowerCase())) {
             return null;
@@ -569,7 +569,7 @@
           // backwards compatibility with Hive, and is clearly broken.
           if (value.isEmpty()) value = getNullPartitionKeyValue();
         }
-        if (!targetValues.get(i).equals(value.toLowerCase())) {
+        if (!targetValues.get(i).equals(value)) {
           matchFound = false;
           break;
         }
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 580f5e0..cdb620c 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -309,7 +309,7 @@
     SlotRef ref = (SlotRef) predicate.getChild(0);
     LiteralExpr literal = (LiteralExpr) predicate.getChild(1);
 
-    // Cannot push prediates with null literal values (KUDU-1595).
+    // Cannot push predicates with null literal values (KUDU-1595).
     if (literal instanceof NullLiteral) return false;
 
     String colName = ref.getDesc().getColumn().getName();
@@ -379,8 +379,13 @@
     List<Object> values = Lists.newArrayList();
     for (int i = 1; i < predicate.getChildren().size(); ++i) {
       if (!(predicate.getChild(i).isLiteral())) return false;
-      Object value = getKuduInListValue((LiteralExpr) predicate.getChild(i));
-      Preconditions.checkNotNull(value == null);
+      LiteralExpr literal = (LiteralExpr) predicate.getChild(i);
+
+      // Cannot push predicates with null literal values (KUDU-1595).
+      if (literal instanceof NullLiteral) return false;
+
+      Object value = getKuduInListValue(literal);
+      Preconditions.checkNotNull(value);
       values.add(value);
     }
 
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 3369686..297e9b2 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -144,7 +144,7 @@
             rootFragment, insertStmt, ctx_.getRootAnalyzer(), fragments);
       }
       // Add optional sort node to the plan, based on clustered/noclustered plan hint.
-      createClusteringSort(insertStmt, rootFragment, ctx_.getRootAnalyzer());
+      createPreInsertSort(insertStmt, rootFragment, ctx_.getRootAnalyzer());
       // set up table sink for root fragment
       rootFragment.setSink(insertStmt.createDataSink());
       resultExprs = insertStmt.getResultExprs();
@@ -512,21 +512,26 @@
   }
 
   /**
-   * Insert a sort node on top of the plan, depending on the clustered/noclustered plan
-   * hint. This will sort the data produced by 'inputFragment' by the clustering columns
-   * (key columns for Kudu tables), so that partitions can be written sequentially in the
-   * table sink.
+   * Insert a sort node on top of the plan, depending on the clustered/noclustered/sortby
+   * plan hint. If clustering is enabled in insertStmt, then the ordering columns will
+   * start with the clustering columns (key columns for Kudu tables), so that partitions
+   * can be written sequentially in the table sink. Any additional non-clustering columns
+   * specified by the sortby hint will be added to the ordering columns after any
+   * clustering columns. If neither clustering nor a sortby hint is specified, then no
+   * sort node will be added to the plan.
    */
-  public void createClusteringSort(InsertStmt insertStmt, PlanFragment inputFragment,
+  public void createPreInsertSort(InsertStmt insertStmt, PlanFragment inputFragment,
        Analyzer analyzer) throws ImpalaException {
-    if (!insertStmt.hasClusteredHint()) return;
+    List<Expr> orderingExprs = Lists.newArrayList();
 
-    List<Expr> orderingExprs;
-    if (insertStmt.getTargetTable() instanceof KuduTable) {
-      orderingExprs = Lists.newArrayList(insertStmt.getPrimaryKeyExprs());
-    } else {
-      orderingExprs = Lists.newArrayList(insertStmt.getPartitionKeyExprs());
+    if (insertStmt.hasClusteredHint()) {
+      if (insertStmt.getTargetTable() instanceof KuduTable) {
+        orderingExprs.addAll(insertStmt.getPrimaryKeyExprs());
+      } else {
+        orderingExprs.addAll(insertStmt.getPartitionKeyExprs());
+      }
     }
+    orderingExprs.addAll(insertStmt.getSortByExprs());
     // Ignore constants for the sake of clustering.
     Expr.removeConstants(orderingExprs);
 
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index 982e9a2..6d1a773 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -301,7 +301,6 @@
 %}
 
 LineTerminator = \r|\n|\r\n
-NonTerminator = [^\r\n]
 Whitespace = {LineTerminator} | [ \t\f]
 
 // Order of rules to resolve ambiguity:
@@ -321,14 +320,30 @@
 SingleQuoteStringLiteral = \'(\\.|[^\\\'])*\'
 DoubleQuoteStringLiteral = \"(\\.|[^\\\"])*\"
 
+EolHintBegin = "--" " "* "+"
+CommentedHintBegin = "/*" " "* "+"
+CommentedHintEnd = "*/"
+
 // Both types of plan hints must appear within a single line.
-TraditionalCommentedPlanHints = "/*" [ ]* "+" [^\r\n*]* "*/"
-// Must end with a line terminator.
-EndOfLineCommentedPlanHints = "--" [ ]* "+" {NonTerminator}* {LineTerminator}
+HintContent = " "* "+" [^\r\n]*
 
 Comment = {TraditionalComment} | {EndOfLineComment}
-TraditionalComment = "/*" ~"*/"
-EndOfLineComment = "--" {NonTerminator}* {LineTerminator}?
+
+// Match anything that has a comment end (*/) in it.
+ContainsCommentEnd = [^]* "*/" [^]*
+// Match anything that has a line terminator in it.
+ContainsLineTerminator = [^]* {LineTerminator} [^]*
+
+// A traditional comment is anything that starts and ends like a comment and has neither a
+// plan hint inside nor a CommentEnd (*/).
+TraditionalComment = "/*" !({HintContent}|{ContainsCommentEnd}) "*/"
+// Similarly for an end-of-line comment.
+EndOfLineComment = "--" !({HintContent}|{ContainsLineTerminator}) {LineTerminator}?
+
+// This additional state is needed because newlines signal the end of an end-of-line hint
+// if one has been started earlier. Hence we need to discern between newlines within and
+// outside of end-of-line hints.
+%state EOLHINT
 
 %%
 // Put '...' before '.'
@@ -412,18 +427,22 @@
   return newToken(SqlParserSymbols.STRING_LITERAL, yytext().substring(1, yytext().length()-1));
 }
 
-{TraditionalCommentedPlanHints} {
-  String text = yytext();
-  // Remove everything before the first '+' as well as the trailing "*/"
-  String hintStr = text.substring(text.indexOf('+') + 1, text.length() - 2);
-  return newToken(SqlParserSymbols.COMMENTED_PLAN_HINTS, hintStr.trim());
+{CommentedHintBegin} {
+  return newToken(SqlParserSymbols.COMMENTED_PLAN_HINT_START, null);
 }
 
-{EndOfLineCommentedPlanHints} {
-  String text = yytext();
-  // Remove everything before the first '+'
-  String hintStr = text.substring(text.indexOf('+') + 1);
-  return newToken(SqlParserSymbols.COMMENTED_PLAN_HINTS, hintStr.trim());
+{CommentedHintEnd} {
+  return newToken(SqlParserSymbols.COMMENTED_PLAN_HINT_END, null);
+}
+
+{EolHintBegin} {
+  yybegin(EOLHINT);
+  return newToken(SqlParserSymbols.COMMENTED_PLAN_HINT_START, null);
+}
+
+<EOLHINT> {LineTerminator} {
+  yybegin(YYINITIAL);
+  return newToken(SqlParserSymbols.COMMENTED_PLAN_HINT_END, null);
 }
 
 {Comment} { /* ignore */ }
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 935edc5..5a08f98 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -424,7 +424,7 @@
     AnalyzesOk("alter table functional.stringpartitionkey PARTITION " +
                "(string_col='partition1') set fileformat parquet");
     AnalyzesOk("alter table functional.stringpartitionkey PARTITION " +
-               "(string_col='PaRtiTion1') set location '/a/b/c'");
+               "(string_col='partition1') set location '/a/b/c'");
     AnalyzesOk("alter table functional.alltypes PARTITION (year=2010, month=11) " +
                "set tblproperties('a'='1')");
     AnalyzesOk("alter table functional.alltypes PARTITION (year<=2010, month=11) " +
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 11f6842..9b641a0 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -1604,14 +1604,6 @@
           String.format("select * from functional.alltypes a join %sbadhint%s " +
               "functional.alltypes b using (int_col)", prefix, suffix),
           "JOIN hint not recognized: badhint");
-      // Hints must be comma separated. Legacy-style hint does not parse because
-      // of space-separated identifiers.
-      if (!prefix.contains("[")) {
-        AnalyzesOk(String.format(
-            "select * from functional.alltypes a join %sbroadcast broadcast%s " +
-                "functional.alltypes b using (int_col)", prefix, suffix),
-            "JOIN hint not recognized: broadcast broadcast");
-      }
       AnalysisError(
           String.format("select * from functional.alltypes a cross join %sshuffle%s " +
           "functional.alltypes b", prefix, suffix),
@@ -1744,7 +1736,8 @@
       AnalysisError(String.format(
           "insert into table functional_hbase.alltypes %sshuffle%s " +
           "select * from functional_hbase.alltypes", prefix, suffix),
-          "INSERT hints are only supported for inserting into Hdfs and Kudu tables.");
+          "INSERT hints are only supported for inserting into Hdfs and Kudu tables: " +
+          "functional_hbase.alltypes");
       // Conflicting plan hints.
       AnalysisError("insert into table functional.alltypessmall " +
           "partition (year, month) /* +shuffle,noshuffle */ " +
@@ -1766,6 +1759,46 @@
           "insert into table functional.alltypessmall partition (year, month) " +
           "/* +clustered,noclustered */ select * from functional.alltypes", prefix,
           suffix), "Conflicting INSERT hints: clustered and noclustered");
+
+      // Below are tests for hints that are not supported by the legacy syntax.
+      if (prefix.equals("[")) continue;
+
+      // Tests for sortby hint
+      AnalyzesOk(String.format("insert into functional.alltypessmall " +
+          "partition (year, month) %ssortby(int_col)%s select * from functional.alltypes",
+          prefix, suffix));
+      AnalyzesOk(String.format("insert into functional.alltypessmall " +
+          "partition (year, month) %sshuffle,clustered,sortby(int_col)%s select * from " +
+          "functional.alltypes", prefix, suffix));
+      AnalyzesOk(String.format("insert into functional.alltypessmall " +
+          "partition (year, month) %ssortby(int_col, bool_col)%s select * from " +
+          "functional.alltypes", prefix, suffix));
+      AnalyzesOk(String.format("insert into functional.alltypessmall " +
+          "partition (year, month) %sshuffle,clustered,sortby(int_col,bool_col)%s " +
+          "select * from functional.alltypes", prefix, suffix));
+      AnalyzesOk(String.format("insert into functional.alltypessmall " +
+          "partition (year, month) %sshuffle,sortby(int_col,bool_col),clustered%s " +
+          "select * from functional.alltypes", prefix, suffix));
+      AnalyzesOk(String.format("insert into functional.alltypessmall " +
+          "partition (year, month) %ssortby(int_col,bool_col),shuffle,clustered%s " +
+          "select * from functional.alltypes", prefix, suffix));
+      // Column in sortby hint must exist.
+      AnalysisError(String.format("insert into functional.alltypessmall " +
+          "partition (year, month) %ssortby(foo)%s select * from functional.alltypes",
+          prefix, suffix), "Could not find SORTBY hint column 'foo' in table.");
+      // Column in sortby hint must not be a Hdfs partition column.
+      AnalysisError(String.format("insert into functional.alltypessmall " +
+          "partition (year, month) %ssortby(year)%s select * from " +
+          "functional.alltypes", prefix, suffix),
+          "SORTBY hint column list must not contain Hdfs partition column: 'year'");
+      // Column in sortby hint must not be a Kudu primary key column.
+      AnalysisError(String.format("insert into functional_kudu.alltypessmall " +
+          "%ssortby(id)%s select * from functional_kudu.alltypes", prefix, suffix),
+          "SORTBY hint column list must not contain Kudu primary key column: 'id'");
+      // sortby() hint is not supported in UPSERT queries
+      AnalysisError(String.format("upsert into functional_kudu.alltypessmall " +
+          "%ssortby(id)%s select * from functional_kudu.alltypes", prefix, suffix),
+          "SORTBY hint is not supported in UPSERT statements.");
     }
 
     // Multiple non-conflicting hints and case insensitivity of hints.
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index ab2eaec..17a90b1 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -209,6 +209,13 @@
     ParsesOk("/* select 1; */ select 1");
     ParsesOk("/** select 1; */ select 1");
     ParsesOk("/* select */ select 1 /* 1 */");
+    ParsesOk("select 1 /* sortby(() */");
+    // Empty columns list in sortby hint
+    ParserError("select 1 /*+ sortby() */");
+    // Mismatching parentheses
+    ParserError("select 1 /*+ sortby(() */");
+    ParserError("select 1 /*+ sortby(a) \n");
+    ParserError("select 1 --+ sortby(a) */\n from t");
   }
 
   /**
@@ -230,6 +237,9 @@
     ParserError("-- baz /*\nselect 1*/");
     ParsesOk("select -- blah\n 1");
     ParsesOk("select -- select 1\n 1");
+    ParsesOk("select 1 -- sortby(()");
+    // Mismatching parentheses
+    ParserError("select 1 -- +sortby(()\n");
   }
 
   /**
@@ -241,10 +251,10 @@
     SelectStmt selectStmt = (SelectStmt) ParsesOk(stmt);
     Preconditions.checkState(selectStmt.getTableRefs().size() > 1);
     List<String> actualHints = Lists.newArrayList();
-    assertEquals(null, selectStmt.getTableRefs().get(0).getJoinHints());
+    assertTrue(selectStmt.getTableRefs().get(0).getJoinHints().isEmpty());
     for (int i = 1; i < selectStmt.getTableRefs().size(); ++i) {
-      List<String> hints = selectStmt.getTableRefs().get(i).getJoinHints();
-      if (hints != null) actualHints.addAll(hints);
+      List<PlanHint> hints = selectStmt.getTableRefs().get(i).getJoinHints();
+      for (PlanHint hint: hints) actualHints.add(hint.toString());
     }
     if (actualHints.isEmpty()) actualHints = Lists.newArrayList((String) null);
     assertEquals(Lists.newArrayList(expectedHints), actualHints);
@@ -255,8 +265,8 @@
     Preconditions.checkState(selectStmt.getTableRefs().size() > 0);
     List<String> actualHints = Lists.newArrayList();
     for (int i = 0; i < selectStmt.getTableRefs().size(); ++i) {
-      List<String> hints = selectStmt.getTableRefs().get(i).getTableHints();
-      if (hints != null) actualHints.addAll(hints);
+      List<PlanHint> hints = selectStmt.getTableRefs().get(i).getTableHints();
+      for (PlanHint hint: hints) actualHints.add(hint.toString());
     }
     if (actualHints.isEmpty()) actualHints = Lists.newArrayList((String) null);
     assertEquals(Lists.newArrayList(expectedHints), actualHints);
@@ -267,10 +277,10 @@
     Preconditions.checkState(selectStmt.getTableRefs().size() > 0);
     List<String> actualHints = Lists.newArrayList();
     for (int i = 0; i < selectStmt.getTableRefs().size(); ++i) {
-      List<String> joinHints = selectStmt.getTableRefs().get(i).getJoinHints();
-      if (joinHints != null) actualHints.addAll(joinHints);
-      List<String> tableHints = selectStmt.getTableRefs().get(i).getTableHints();
-      if (tableHints != null) actualHints.addAll(tableHints);
+      List<PlanHint> joinHints = selectStmt.getTableRefs().get(i).getJoinHints();
+      for (PlanHint hint: joinHints) actualHints.add(hint.toString());
+      List<PlanHint> tableHints = selectStmt.getTableRefs().get(i).getTableHints();
+      for (PlanHint hint: tableHints) actualHints.add(hint.toString());
     }
     if (actualHints.isEmpty()) actualHints = Lists.newArrayList((String) null);
     assertEquals(Lists.newArrayList(expectedHints), actualHints);
@@ -282,8 +292,10 @@
    */
   private void TestSelectListHints(String stmt, String... expectedHints) {
     SelectStmt selectStmt = (SelectStmt) ParsesOk(stmt);
-    List<String> actualHints = selectStmt.getSelectList().getPlanHints();
-    if (actualHints == null) actualHints = Lists.newArrayList((String) null);
+    List<String> actualHints = Lists.newArrayList();
+    List<PlanHint> hints = selectStmt.getSelectList().getPlanHints();
+    for (PlanHint hint: hints) actualHints.add(hint.toString());
+    if (actualHints.isEmpty()) actualHints = Lists.newArrayList((String) null);
     assertEquals(Lists.newArrayList(expectedHints), actualHints);
   }
 
@@ -292,8 +304,10 @@
    */
   private void TestInsertHints(String stmt, String... expectedHints) {
     InsertStmt insertStmt = (InsertStmt) ParsesOk(stmt);
-    List<String> actualHints = insertStmt.getPlanHints();
-    if (actualHints == null) actualHints = Lists.newArrayList((String) null);
+    List<String> actualHints = Lists.newArrayList();
+    List<PlanHint> hints = insertStmt.getPlanHints();
+    for (PlanHint hint: hints) actualHints.add(hint.toString());
+    if (actualHints.isEmpty()) actualHints = Lists.newArrayList((String) null);
     assertEquals(Lists.newArrayList(expectedHints), actualHints);
   }
 
@@ -407,22 +421,26 @@
           suffix), "schedule_cache_local", "schedule_random_replica", "broadcast",
           "schedule_remote");
 
+      TestSelectListHints(String.format(
+          "select %sfoo,bar,baz%s * from functional.alltypes a", prefix, suffix),
+          "foo", "bar", "baz");
+
       // Test select-list hints (e.g., straight_join). The legacy-style hint has no
       // prefix and suffix.
-      if (prefix.contains("[")) {
-        prefix = "";
-        suffix = "";
-      }
-      TestSelectListHints(String.format(
-          "select %sstraight_join%s * from functional.alltypes a", prefix, suffix),
-          "straight_join");
-      // Only the new hint-style is recognized
-      if (!prefix.equals("")) {
+      {
+        String localPrefix = prefix;
+        String localSuffix = suffix;
+        if (prefix.equals("[")) {
+          localPrefix = "";
+          localSuffix = "";
+        }
         TestSelectListHints(String.format(
-            "select %sfoo,bar,baz%s * from functional.alltypes a", prefix, suffix),
-            "foo", "bar", "baz");
+            "select %sstraight_join%s * from functional.alltypes a", localPrefix,
+            localSuffix), "straight_join");
       }
-      if (prefix.isEmpty()) continue;
+
+      // Below are tests for hints that are not supported by the legacy syntax.
+      if (prefix.equals("[")) continue;
 
       // Test mixing commented hints and comments.
       for (String[] commentStyle: commentStyles) {
@@ -439,6 +457,22 @@
         TestSelectListHints(query, "straight_join");
         TestJoinHints(query, "shuffle");
       }
+
+      // Tests for hints with arguments.
+      TestInsertHints(String.format(
+          "insert into t %ssortby(a)%s select * from t", prefix, suffix),
+          "sortby(a)");
+      TestInsertHints(String.format(
+          "insert into t %sclustered,shuffle,sortby(a)%s select * from t", prefix,
+          suffix), "clustered", "shuffle", "sortby(a)");
+      TestInsertHints(String.format(
+          "insert into t %ssortby(a,b)%s select * from t", prefix, suffix),
+          "sortby(a,b)");
+      TestInsertHints(String.format(
+          "insert into t %ssortby(a  , b)%s select * from t", prefix, suffix),
+          "sortby(a,b)");
+      ParserError(String.format(
+          "insert into t %ssortby(  a  ,  , ,,, b  )%s select * from t", prefix, suffix));
     }
     // No "+" at the beginning so the comment is not recognized as a hint.
     TestJoinHints("select * from functional.alltypes a join /* comment */" +
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index b5cf446..9514cc6 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -463,11 +463,11 @@
       // Insert hint.
       testToSql(String.format(
           "insert into functional.alltypes(int_col, bool_col) " +
-          "partition(year, month) %snoshuffle%s " +
+          "partition(year, month) %snoshuffle,sortby(int_col)%s " +
           "select int_col, bool_col, year, month from functional.alltypes",
           prefix, suffix),
           "INSERT INTO TABLE functional.alltypes(int_col, bool_col) " +
-              "PARTITION (year, month) \n-- +noshuffle\n " +
+              "PARTITION (year, month) \n-- +noshuffle,sortby(int_col)\n " +
           "SELECT int_col, bool_col, year, month FROM functional.alltypes");
 
       // Table hint
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index c3b8648..9831d21 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -46,7 +46,7 @@
 hdfs == 2.0.2
   docopt == 0.6.2
   execnet == 1.4.0
-impyla == 0.11.2
+impyla == 0.14.0
   bitarray == 0.8.1
   sasl == 0.1.3
   six == 1.9.0
diff --git a/testdata/bin/check-hbase-nodes.py b/testdata/bin/check-hbase-nodes.py
index 999a657..ffe7a7c 100755
--- a/testdata/bin/check-hbase-nodes.py
+++ b/testdata/bin/check-hbase-nodes.py
@@ -30,7 +30,7 @@
 
 from contextlib import closing
 from kazoo.client import KazooClient
-from kazoo.exceptions import NoNodeError
+from kazoo.exceptions import NoNodeError, ConnectionLoss
 from kazoo.handlers.threading import KazooTimeoutError
 
 LOGGER = logging.getLogger('hbase_check')
@@ -43,6 +43,7 @@
 ZK_HOSTS = '127.0.0.1:2181'
 HBASE_NODES = ['/hbase/master', '/hbase/rs']
 ADMIN_USER = 'admin'
+MAX_ZOOKEEPER_CONNECTION_RETRIES = 3
 
 
 def parse_args():
@@ -128,12 +129,31 @@
         timeout_seconds: Number of seconds to attempt to get node
 
     Returns:
-        0 success, or else the number of unresponsive nodes
+        0 success, or else the number of errors
     """
-    with closing(connect_to_zookeeper(zookeeper_hosts, timeout)) as zk_client:
-        errors = sum([check_znode(node, zk_client, timeout) for node in nodes])
-        zk_client.stop()
-    return errors
+    connection_retries = 0
+
+    while True:
+        with closing(connect_to_zookeeper(zookeeper_hosts, timeout)) as zk_client:
+            try:
+                return sum([check_znode(node, zk_client, timeout) for node in nodes])
+            except ConnectionLoss as e:
+                connection_retries += 1
+                if connection_retries > MAX_ZOOKEEPER_CONNECTION_RETRIES:
+                    LOGGER.error("Max connection retries exceeded: {0}".format(str(e)))
+                    raise
+                else:
+                    err_msg = ("Zookeeper connection loss: retrying connection "
+                               "({0} of {1} attempts)")
+                    LOGGER.warning(err_msg.format(connection_retries,
+                                                  MAX_ZOOKEEPER_CONNECTION_RETRIES))
+                    time.sleep(1)
+            except Exception as e:
+                LOGGER.error("Unexpected error checking HBase node: {0}".format(str(e)))
+                raise
+            finally:
+                LOGGER.info("Stopping Zookeeper client")
+                zk_client.stop()
 
 
 def is_hdfs_running(host, admin_user):
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
index f201d0a..8e3adb9 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
@@ -705,3 +705,76 @@
 00:SCAN HDFS [functional.alltypesnopart]
    partitions=1/1 files=0 size=0B
 ====
+# IMPALA-4163: sortby hint in insert statement adds sort node.
+insert into table functional.alltypes partition(year, month)
+       /*+ sortby(int_col, bool_col),shuffle */
+       select * from functional.alltypes
+---- PLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+01:SORT
+|  order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+02:SORT
+|  order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|
+01:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# IMPALA-4163: sortby hint in insert statement with noshuffle adds sort node.
+insert into table functional.alltypes partition(year, month)
+       /*+ sortby(int_col, bool_col),noshuffle */
+       select * from functional.alltypes
+---- PLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+01:SORT
+|  order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+01:SORT
+|  order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# IMPALA-4163: sortby hint in insert statement adds ordering columns to clustering sort.
+insert into table functional.alltypes partition(year, month)
+       /*+ sortby(int_col, bool_col),clustered */
+       select * from functional.alltypes
+---- PLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+01:SORT
+|  order by: year DESC NULLS LAST, month DESC NULLS LAST, int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+02:SORT
+|  order by: year DESC NULLS LAST, month DESC NULLS LAST, int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|
+01:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index 776882d..9f2270f 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -314,3 +314,15 @@
    predicates: CAST(a.id AS STRING) > '123'
    kudu predicates: a.id > 10
 ====
+# IMPALA-4662: Kudu analysis failure for NULL literal in IN list
+# NULL literal in values list results in applying predicate at scan node
+select id from functional_kudu.alltypestiny where
+id in (1, null) and string_col in (null) and bool_col in (null) and double_col in (null)
+and float_col in (null) and tinyint_col in (null) and smallint_col in (null) and
+bigint_col in (null)
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN KUDU [functional_kudu.alltypestiny]
+   predicates: id IN (1, NULL), bigint_col IN (NULL), bool_col IN (NULL), double_col IN (NULL), float_col IN (NULL), smallint_col IN (NULL), string_col IN (NULL), tinyint_col IN (NULL)
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/insert.test b/testdata/workloads/functional-query/queries/QueryTest/insert.test
index 5601b35..9dfaf34 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/insert.test
@@ -945,3 +945,29 @@
 ---- RESULTS
 : 100
 ====
+---- QUERY
+# IMPALA-4163: insert into table sortby() plan hint
+insert into table alltypesinsert
+partition (year, month) /*+ clustered,shuffle,sortby(int_col, bool_col) */
+select * from alltypestiny;
+---- SETUP
+DROP PARTITIONS alltypesinsert
+RESET alltypesinsert
+---- RESULTS
+year=2009/month=1/: 2
+year=2009/month=2/: 2
+year=2009/month=3/: 2
+year=2009/month=4/: 2
+====
+---- QUERY
+# IMPALA-4163: insert into table sortby() plan hint
+insert into table alltypesnopart_insert
+/*+ clustered,shuffle,sortby(int_col, bool_col) */
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col,
+double_col, date_string_col, string_col, timestamp_col from alltypestiny;
+---- SETUP
+DROP PARTITIONS alltypesinsert
+RESET alltypesinsert
+---- RESULTS
+: 8
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/partition-ddl-predicates-all-fs.test b/testdata/workloads/functional-query/queries/QueryTest/partition-ddl-predicates-all-fs.test
index 36104c3..1aa5c51 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/partition-ddl-predicates-all-fs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/partition-ddl-predicates-all-fs.test
@@ -133,3 +133,23 @@
 ---- TYPES
 STRING, STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
 ====
+---- QUERY
+# Tests case-sensitivity of string-typed partition columns.
+alter table p1 add partition (j=2,k="D");
+alter table p1 add partition (j=2,k="E");
+alter table p1 add partition (j=2,k="F");
+====
+---- QUERY
+show partitions p1
+---- RESULTS
+'NULL','g',-1,0,regex:.+,regex:.+,regex:.+,regex:.+,regex:.+,regex:.*/test-warehouse/.+/p1/j=__HIVE_DEFAULT_PARTITION__/k=g
+'2','D',-1,0,regex:.+,regex:.+,regex:.+,'TEXT',regex:.+,regex:.*/test-warehouse/.+/p1/j=2/k=D
+'2','E',-1,0,regex:.+,regex:.+,regex:.+,'TEXT',regex:.+,regex:.*/test-warehouse/.+/p1/j=2/k=E
+'2','F',-1,0,regex:.+,regex:.+,regex:.+,'TEXT',regex:.+,regex:.*/test-warehouse/.+/p1/j=2/k=F
+'2','d',-1,0,regex:.+,regex:.+,regex:.+,'PARQUET',regex:.+,regex:.*/test-warehouse/.+/p1/j=2/k=d
+'2','e',-1,0,regex:.+,regex:.+,regex:.+,'PARQUET',regex:.+,regex:.*/test-warehouse/.+/p1/j=2/k=e
+'2','f',-1,0,regex:.+,regex:.+,regex:.+,'PARQUET',regex:.+,regex:.*/test-warehouse/.+/p1/j=2/k=f
+'Total','',0,0,regex:.+,regex:.+,'','','',''
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index fb24f65..690c999 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -48,7 +48,8 @@
 #  7) If a query errored, verify that memory was overcommitted during execution and the
 #     error is a mem limit exceeded error. There is no other reason a query should error
 #     and any such error will cause the stress test to stop.
-#  8) Verify the result set hash of successful queries.
+#  8) Verify the result set hash of successful queries if there are no DML queries in the
+#     current run.
 
 from __future__ import print_function
 
@@ -74,6 +75,7 @@
 
 import tests.util.test_file_parser as test_file_parser
 from tests.comparison.cluster import Timeout
+from tests.comparison.db_types import Int, TinyInt, SmallInt, BigInt
 from tests.comparison.model_translator import SqlWriter
 from tests.comparison.query_generator import QueryGenerator
 from tests.comparison.query_profile import DefaultProfile
@@ -91,7 +93,7 @@
 MEM_ESTIMATE_PATTERN = re.compile(r"Estimated.*Memory=(\d+.?\d*)(T|G|M|K)?B")
 
 # The version of the file format containing the collected query runtime info.
-RUNTIME_INFO_FILE_VERSION = 2
+RUNTIME_INFO_FILE_VERSION = 3
 
 
 def create_and_start_daemon_thread(fn, name):
@@ -112,7 +114,8 @@
   thread_names = dict([(t.ident, t.name) for t in threading.enumerate()])
   stacks = list()
   for thread_id, stack in sys._current_frames().items():
-    stacks.append("\n# Thread: %s(%d)"
+    stacks.append(
+        "\n# Thread: %s(%d)"
         % (thread_names.get(thread_id, "No name"), thread_id))
     for filename, lineno, name, line in traceback.extract_stack(stack):
       stacks.append('File: "%s", line %d, in %s' % (filename, lineno, name))
@@ -128,8 +131,8 @@
 
 def print_crash_info_if_exists(impala, start_time):
   """If any impalads are found not running, they will assumed to have crashed and an
-     error message will be printed to stderr for each stopped impalad. Returns a value
-     that evaluates to True if any impalads are stopped.
+  error message will be printed to stderr for each stopped impalad. Returns a value
+  that evaluates to True if any impalads are stopped.
   """
   max_attempts = 5
   for remaining_attempts in xrange(max_attempts - 1, -1, -1):
@@ -137,11 +140,12 @@
       crashed_impalads = impala.find_crashed_impalads(start_time)
       break
     except Timeout as e:
-      LOG.info("Timeout checking if impalads crashed: %s." % e +
-          (" Will retry." if remaining_attempts else ""))
+      LOG.info(
+          "Timeout checking if impalads crashed: %s."
+          % e + (" Will retry." if remaining_attempts else ""))
   else:
-    LOG.error("Aborting after %s failed attempts to check if impalads crashed",
-        max_attempts)
+    LOG.error(
+        "Aborting after %s failed attempts to check if impalads crashed", max_attempts)
     raise e
   for message in crashed_impalads.itervalues():
     print(message, file=sys.stderr)
@@ -164,20 +168,20 @@
 
 class MemBroker(object):
   """Provides memory usage coordination for clients running in different processes.
-     The broker fulfills reservation requests by blocking as needed so total memory
-     used by clients never exceeds the total available memory (including an
-     'overcommitable' amount).
+  The broker fulfills reservation requests by blocking as needed so total memory
+  used by clients never exceeds the total available memory (including an
+  'overcommitable' amount).
 
-     The lock built in to _available is also used to protect access to other members.
+  The lock built in to _available is also used to protect access to other members.
 
-     The state stored in this class is actually an encapsulation of part of the state
-     of the StressRunner class below. The state here is separated for clarity.
+  The state stored in this class is actually an encapsulation of part of the state
+  of the StressRunner class below. The state here is separated for clarity.
   """
 
   def __init__(self, real_mem_mb, overcommitable_mem_mb):
     """'real_mem_mb' memory should be the amount of memory that each impalad is able
-       to use. 'overcommitable_mem_mb' is the amount of memory that will be dispensed
-       over the 'real' amount.
+    to use. 'overcommitable_mem_mb' is the amount of memory that will be dispensed
+    over the 'real' amount.
     """
     self._total_mem_mb = real_mem_mb + overcommitable_mem_mb
     self._available = Value("i", self._total_mem_mb)
@@ -210,16 +214,16 @@
   @contextmanager
   def reserve_mem_mb(self, mem_mb):
     """Blocks until the requested amount of memory is available and taken for the caller.
-       This function should be used in a 'with' block. The taken memory will
-       automatically be released when the 'with' context exits. A numeric id is returned
-       so clients can compare against 'last_overcommitted_reservation_id' to see if
-       memory was overcommitted since the reservation was obtained.
+    This function should be used in a 'with' block. The taken memory will
+    automatically be released when the 'with' context exits. A numeric id is returned
+    so clients can compare against 'last_overcommitted_reservation_id' to see if
+    memory was overcommitted since the reservation was obtained.
 
-       with broker.reserve_mem_mb(100) as reservation_id:
-         # Run query using 100 MB of memory
-         if <query failed>:
-           # Immediately check broker.was_overcommitted(reservation_id) to see if
-           # memory was overcommitted.
+    with broker.reserve_mem_mb(100) as reservation_id:
+      # Run query using 100 MB of memory
+      if <query failed>:
+        # Immediately check broker.was_overcommitted(reservation_id) to see if
+        # memory was overcommitted.
     """
     reservation_id = self._wait_until_reserved(mem_mb)
     try:
@@ -232,8 +236,9 @@
       with self._available.get_lock():
         if req <= self._available.value:
           self._available.value -= req
-          LOG.debug("Reserved %s MB; %s MB available; %s MB overcommitted", req,
-              self._available.value, self.overcommitted_mem_mb)
+          LOG.debug(
+              "Reserved %s MB; %s MB available; %s MB overcommitted",
+              req, self._available.value, self.overcommitted_mem_mb)
           reservation_id = self._next_reservation_id.value
           increment(self._next_reservation_id)
           if self.overcommitted_mem_mb > 0:
@@ -244,23 +249,24 @@
   def _release(self, req):
     with self._available.get_lock():
       self._available.value += req
-      LOG.debug("Released %s MB; %s MB available; %s MB overcommitted", req,
-          self._available.value, self.overcommitted_mem_mb)
+      LOG.debug(
+          "Released %s MB; %s MB available; %s MB overcommitted",
+          req, self._available.value, self.overcommitted_mem_mb)
 
   def was_overcommitted(self, reservation_id):
     """Returns True if memory was overcommitted since the given reservation was made.
-       For an accurate return value, this should be called just after the query ends
-       or while the query is still running.
+    For an accurate return value, this should be called just after the query ends
+    or while the query is still running.
     """
     return reservation_id <= self._last_overcommitted_reservation_id.value
 
 
 class StressRunner(object):
   """This class contains functionality related to producing/consuming queries for the
-     purpose of stress testing Impala.
+  purpose of stress testing Impala.
 
-     Queries will be executed in separate processes since python threading is limited
-     to the use of a single CPU.
+  Queries will be executed in separate processes since python threading is limited
+  to the use of a single CPU.
   """
 
   # This is the point at which the work queue will block because it is full.
@@ -270,6 +276,8 @@
     self.use_kerberos = False
     self.common_query_options = {}
     self._mem_broker = None
+    self._verify_results = True
+    self._select_probability = None
 
     # Synchronized blocking work queue for producer/consumers.
     self._query_queue = Queue(self.WORK_QUEUE_CAPACITY)
@@ -306,7 +314,8 @@
     self._num_successive_errors = Value("i", 0)
     self.result_hash_log_dir = gettempdir()
 
-    self._status_headers = [" Done", "Running", "Mem Lmt Ex", "Time Out", "Cancel",
+    self._status_headers = [
+        " Done", "Running", "Mem Lmt Ex", "Time Out", "Cancel",
         "Err", "Next Qry Mem Lmt", "Tot Qry Mem Lmt", "Tracked Mem", "RSS Mem"]
 
     self._num_queries_to_run = None
@@ -315,25 +324,31 @@
     self._query_consumer_thread = None
     self._mem_polling_thread = None
 
-  def run_queries(self, queries, impala, num_queries_to_run, mem_overcommit_pct,
-      should_print_status):
+  def run_queries(
+      self, queries, impala, num_queries_to_run, mem_overcommit_pct, should_print_status,
+      verify_results, select_probability
+  ):
     """Runs queries randomly chosen from 'queries' and stops after 'num_queries_to_run'
-       queries have completed.
+    queries have completed. 'select_probability' should be a float between 0 and 1; it
+    determines the likelihood of choosing a select query (as opposed to a DML query,
+    for example).
 
-       Before a query is run, a mem limit will be chosen. 'spill_probability' determines
-       the likelihood of choosing a mem limit that will cause spilling. To induce
-       spilling, a value is randomly chosen below the min memory needed to avoid spilling
-       but above the min memory needed with spilling. So the min/max query memory
-       requirements must be determined before calling this method.
+    Before a query is run, a mem limit will be chosen. 'spill_probability' determines
+    the likelihood of choosing a mem limit that will cause spilling. To induce
+    spilling, a value is randomly chosen below the min memory needed to avoid spilling
+    but above the min memory needed with spilling. So the min/max query memory
+    requirements must be determined before calling this method.
 
-       If 'mem_overcommit_pct' is zero, an exception will be raised if any queries
-       fail for any reason other than cancellation (controlled by the 'cancel_probability'
-       property), since each query should have enough memory to run successfully. If
-       non-zero, failures due to insufficient memory will be ignored if memory was
-       overcommitted at any time during execution.
+    If 'mem_overcommit_pct' is zero, an exception will be raised if any queries
+    fail for any reason other than cancellation (controlled by the 'cancel_probability'
+    property), since each query should have enough memory to run successfully. If
+    non-zero, failures due to insufficient memory will be ignored if memory was
+    overcommitted at any time during execution.
 
-       If a query completes without error, the result will be verified. An error
-       will be raised upon a result mismatch.
+    If a query completes without error, the result will be verified if 'verify_results'
+    is True. An error will be raised upon a result mismatch. 'verify_results' should be
+    False when the expected results are not known in advance, for example when running
+    DML queries.
     """
     # TODO: The state from a previous run should be cleared out. This isn't really a
     #       problem now because the one caller (main()) never calls a second time.
@@ -346,9 +361,13 @@
     # If there is a crash, start looking for errors starting from this time.
     start_time = datetime.now()
 
-    self._mem_broker = MemBroker(impala.min_impalad_mem_mb,
+    self._mem_broker = MemBroker(
+        impala.min_impalad_mem_mb,
         int(impala.min_impalad_mem_mb * mem_overcommit_pct / 100))
 
+    self._verify_results = verify_results
+    self._select_probability = select_probability
+
     # Print the status to show the state before starting.
     if should_print_status:
       self._print_status_header()
@@ -363,9 +382,11 @@
 
     # Wait for everything to finish.
     sleep_secs = 0.1
-    while self._query_producer_thread.is_alive() \
-        or self._query_consumer_thread.is_alive() \
-        or self._query_runners:
+    while (
+        self._query_producer_thread.is_alive() or
+        self._query_consumer_thread.is_alive() or
+        self._query_runners
+    ):
       if self._query_producer_thread.error or self._query_consumer_thread.error:
         # This is bad enough to abort early. A failure here probably means there's a
         # bug in this script. The mem poller could be checked for an error too. It is
@@ -382,9 +403,12 @@
                 sys.exit(runner.exitcode)
               LOG.info("No crashes detected")
               checked_for_crashes = True
-            if self._num_successive_errors.value \
-                >= self.num_successive_errors_needed_to_abort:
-              print("Aborting due to %s successive errors encountered"
+            if (
+                self._num_successive_errors.value >=
+                self.num_successive_errors_needed_to_abort
+            ):
+              print(
+                  "Aborting due to %s successive errors encountered"
                   % self._num_successive_errors.value, file=sys.stderr)
               sys.exit(1)
           del self._query_runners[idx]
@@ -392,9 +416,11 @@
       if should_print_status:
         last_report_secs += sleep_secs
         if last_report_secs > 5:
-          if not self._query_producer_thread.is_alive() \
-              or not self._query_consumer_thread.is_alive() \
-              or not self._query_runners:
+          if (
+              not self._query_producer_thread.is_alive() or
+              not self._query_consumer_thread.is_alive() or
+              not self._query_runners
+          ):
             LOG.debug("Producer is alive: %s" % self._query_producer_thread.is_alive())
             LOG.debug("Consumer is alive: %s" % self._query_consumer_thread.is_alive())
             LOG.debug("Queue size: %s" % self._query_queue.qsize())
@@ -412,15 +438,32 @@
 
   def _start_producing_queries(self, queries):
     def enqueue_queries():
+      # Generate a dict(query type -> list of queries).
+      queries_by_type = {}
+      for query in queries:
+        if query.query_type not in queries_by_type:
+          queries_by_type[query.query_type] = []
+        queries_by_type[query.query_type].append(query)
       try:
         for _ in xrange(self._num_queries_to_run):
-          self._query_queue.put(choice(queries))
+          # First randomly determine a query type, then choose a random query of that
+          # type.
+          if (
+              QueryType.SELECT in queries_by_type and
+              (len(queries_by_type.keys()) == 1 or random() < self._select_probability)
+          ):
+            result = choice(queries_by_type[QueryType.SELECT])
+          else:
+            query_type = choice([
+                key for key in queries_by_type if key != QueryType.SELECT])
+            result = choice(queries_by_type[query_type])
+          self._query_queue.put(result)
       except Exception as e:
         LOG.error("Error producing queries: %s", e)
         current_thread().error = e
         raise e
-    self._query_producer_thread = create_and_start_daemon_thread(enqueue_queries,
-        "Query Producer")
+    self._query_producer_thread = create_and_start_daemon_thread(
+        enqueue_queries, "Query Producer")
 
   def _start_consuming_queries(self, impala):
     def start_additional_runners_if_needed():
@@ -467,9 +510,11 @@
             query_sumbission_is_locked = False
             ready_to_unlock = None
 
-          if not query_sumbission_is_locked \
-              and self.leak_check_interval_mins \
-              and time() > self._next_leak_check_unix_time.value:
+          if (
+              not query_sumbission_is_locked and
+              self.leak_check_interval_mins and
+              time() > self._next_leak_check_unix_time.value
+          ):
             assert self._num_queries_running <= len(self._query_runners), \
                 "Each running query should belong to a runner"
             LOG.debug("Stopping query submission")
@@ -505,8 +550,8 @@
         if query_sumbission_is_locked:
           LOG.debug("Resuming query submission")
           self._submit_query_lock.release()
-    self._mem_polling_thread = create_and_start_daemon_thread(poll_mem_usage,
-        "Mem Usage Poller")
+    self._mem_polling_thread = create_and_start_daemon_thread(
+        poll_mem_usage, "Mem Usage Poller")
 
   def _get_mem_usage_values(self, reset=False):
     reported = None
@@ -534,7 +579,7 @@
 
   def _start_single_runner(self, impalad):
     """Consumer function to take a query of the queue and run it. This is intended to
-       run in a separate process so validating the result set can use a full CPU.
+    run in a separate process so validating the result set can use a full CPU.
     """
     LOG.debug("New query runner started")
     runner = QueryRunner()
@@ -564,7 +609,8 @@
         mem_limit = query.required_mem_mb_without_spilling
         solo_runtime = query.solo_runtime_secs_without_spilling
       else:
-        mem_limit = randrange(query.required_mem_mb_with_spilling,
+        mem_limit = randrange(
+            query.required_mem_mb_with_spilling,
             query.required_mem_mb_without_spilling + 1)
         solo_runtime = query.solo_runtime_secs_with_spilling
 
@@ -583,8 +629,8 @@
         if should_cancel:
           timeout = randrange(1, max(int(solo_runtime), 2))
         else:
-          timeout = solo_runtime * max(10, self._num_queries_started.value
-              - self._num_queries_finished.value)
+          timeout = solo_runtime * max(
+              10, self._num_queries_started.value - self._num_queries_finished.value)
         report = runner.run_query(query, timeout, mem_limit)
         LOG.debug("Got execution report for query")
         if report.timed_out and should_cancel:
@@ -609,26 +655,34 @@
           # The server may fail to respond to clients if the load is high. An error
           # message with "connect()...Connection timed out" comes from the impalad so
           # that will not be ignored.
-          if ("Connection timed out" in error_msg and "connect()" not in error_msg) \
-              or "ECONNRESET" in error_msg \
-              or "couldn't get a client" in error_msg \
-              or "timeout: timed out" in error_msg:
+          if (
+              ("Connection timed out" in error_msg and "connect()" not in error_msg) or
+              "ECONNRESET" in error_msg or
+              "couldn't get a client" in error_msg or
+              "timeout: timed out" in error_msg
+          ):
             self._num_successive_errors.value = 0
             continue
           increment(self._num_successive_errors)
           increment(self._num_other_errors)
           raise Exception("Query failed: %s" % str(report.non_mem_limit_error))
-        if report.mem_limit_exceeded \
-            and not self._mem_broker.was_overcommitted(reservation_id):
+        if (
+            report.mem_limit_exceeded and
+            not self._mem_broker.was_overcommitted(reservation_id)
+        ):
           increment(self._num_successive_errors)
-          raise Exception("Unexpected mem limit exceeded; mem was not overcommitted\n"
+          raise Exception(
+              "Unexpected mem limit exceeded; mem was not overcommitted\n"
               "Profile: %s" % report.profile)
-        if not report.mem_limit_exceeded \
-            and not report.timed_out \
-            and report.result_hash != query.result_hash:
+        if (
+            not report.mem_limit_exceeded and
+            not report.timed_out and
+            (self._verify_results and report.result_hash != query.result_hash)
+        ):
           increment(self._num_successive_errors)
           increment(self._num_result_mismatches)
-          raise Exception("Result hash mismatch; expected %s, got %s\nQuery: %s"
+          raise Exception(
+              "Result hash mismatch; expected %s, got %s\nQuery: %s"
               % (query.result_hash, report.result_hash, query.sql))
         self._num_successive_errors.value = 0
 
@@ -665,18 +719,34 @@
   pass
 
 
+class QueryType(object):
+  COMPUTE_STATS, DELETE, INSERT, SELECT, UPDATE, UPSERT = range(6)
+
+
 class Query(object):
   """Contains a SQL statement along with expected runtime information."""
 
   def __init__(self):
     self.name = None
     self.sql = None
+    # In order to be able to make good estimates for DML queries in the binary search,
+    # we need to bring the table to a good initial state before executing the sql. Running
+    # set_up_sql accomplishes this task.
+    self.set_up_sql = None
     self.db_name = None
     self.result_hash = None
     self.required_mem_mb_with_spilling = None
     self.required_mem_mb_without_spilling = None
     self.solo_runtime_secs_with_spilling = None
     self.solo_runtime_secs_without_spilling = None
+    # Query options to set before running the query.
+    self.options = {}
+    # Determines the order in which we will populate query runtime info. Queries with the
+    # lowest population_order property will be handled first.
+    self.population_order = 0
+    # Type of query. Can have the following values: SELECT, COMPUTE_STATS, INSERT, UPDATE,
+    # UPSERT, DELETE.
+    self.query_type = QueryType.SELECT
 
   def __repr__(self):
     return dedent("""
@@ -686,13 +756,17 @@
         Solo Runtime: %(solo_runtime_secs_with_spilling)s
         Solo Runtime no-spilling: %(solo_runtime_secs_without_spilling)s
         DB: %(db_name)s
-        SQL: %(sql)s>""".strip() % self.__dict__)
+        Options: %(options)s
+        Set up SQL: %(set_up_sql)s
+        SQL: %(sql)s
+        Population order: %(population_order)r>
+        """.strip() % self.__dict__)
 
 
 class QueryRunner(object):
   """Encapsulates functionality to run a query and provide a runtime report."""
 
-  SPILLED_PATTERNS= [re.compile("ExecOption:.*Spilled"), re.compile("SpilledRuns: [^0]")]
+  SPILLED_PATTERNS = [re.compile("ExecOption:.*Spilled"), re.compile("SpilledRuns: [^0]")]
   BATCH_SIZE = 1024
 
   def __init__(self):
@@ -711,8 +785,11 @@
       self.impalad_conn.close()
       self.impalad_conn = None
 
-  def run_query(self, query, timeout_secs, mem_limit_mb):
-    """Run a query and return an execution report."""
+  def run_query(self, query, timeout_secs, mem_limit_mb, run_set_up=False):
+    """Run a query and return an execution report. If 'run_set_up' is True, set up sql
+    will be executed before the main query. This should be the case during the binary
+    search phase of the stress test.
+    """
     if not self.impalad_conn:
       raise Exception("connect() must first be called")
 
@@ -721,23 +798,32 @@
     try:
       with self.impalad_conn.cursor() as cursor:
         start_time = time()
+        if query.db_name:
+          LOG.debug("Using %s database", query.db_name)
+          cursor.execute("USE %s" % query.db_name)
+        if run_set_up and query.set_up_sql:
+          LOG.debug("Running set up query:\n%s", query.set_up_sql)
+          cursor.execute(query.set_up_sql)
         for query_option, value in self.common_query_options.iteritems():
           cursor.execute(
               "SET {query_option}={value}".format(query_option=query_option, value=value))
+        for query_option, value in query.options.iteritems():
+          cursor.execute(
+              "SET {query_option}={value}".format(query_option=query_option, value=value))
         cursor.execute("SET ABORT_ON_ERROR=1")
         LOG.debug("Setting mem limit to %s MB", mem_limit_mb)
         cursor.execute("SET MEM_LIMIT=%sM" % mem_limit_mb)
-        if query.db_name:
-          LOG.debug("Using %s database", query.db_name)
-          cursor.execute("USE %s" % query.db_name)
-        LOG.debug("Running query with %s MB mem limit at %s with timeout secs %s:\n%s",
+        LOG.debug(
+            "Running query with %s MB mem limit at %s with timeout secs %s:\n%s",
             mem_limit_mb, self.impalad.host_name, timeout_secs, query.sql)
         error = None
         try:
-          cursor.execute_async("/* Mem: %s MB. Coordinator: %s. */\n"
+          cursor.execute_async(
+              "/* Mem: %s MB. Coordinator: %s. */\n"
               % (mem_limit_mb, self.impalad.host_name) + query.sql)
-          LOG.debug("Query id is %s",
-              op_handle_to_query_id(cursor._last_operation_handle))
+          LOG.debug(
+              "Query id is %s", op_handle_to_query_id(cursor._last_operation.handle if
+                                                      cursor._last_operation else None))
           sleep_secs = 0.1
           secs_since_log = 0
           while cursor.is_executing():
@@ -749,24 +835,32 @@
               LOG.debug("Waiting for query to execute")
             sleep(sleep_secs)
             secs_since_log += sleep_secs
-          try:
-            report.result_hash = self._hash_result(cursor, timeout_unix_time, query)
-          except QueryTimeout:
-            self._cancel(cursor, report)
-            return report
+          if query.query_type == QueryType.SELECT:
+            try:
+              report.result_hash = self._hash_result(cursor, timeout_unix_time, query)
+            except QueryTimeout:
+              self._cancel(cursor, report)
+              return report
+          else:
+            # If query is in error state, this will raise an exception
+            cursor._wait_to_finish()
         except Exception as error:
-          LOG.debug("Error running query with id %s: %s",
-              op_handle_to_query_id(cursor._last_operation_handle), error)
+          LOG.debug(
+              "Error running query with id %s: %s",
+              op_handle_to_query_id(cursor._last_operation.handle if
+                                    cursor._last_operation else None), error)
           self._check_for_mem_limit_exceeded(report, cursor, error)
         if report.non_mem_limit_error or report.mem_limit_exceeded:
           return report
         report.runtime_secs = time() - start_time
-        if self.check_if_mem_was_spilled:
+        if cursor.execution_failed() or self.check_if_mem_was_spilled:
           # Producing a query profile can be somewhat expensive. A v-tune profile of
           # impalad showed 10% of cpu time spent generating query profiles.
           report.profile = cursor.get_profile()
-          report.mem_was_spilled = any([pattern.search(report.profile) is not None
-              for pattern in  QueryRunner.SPILLED_PATTERNS])
+          report.mem_was_spilled = any([
+              pattern.search(report.profile) is not None
+              for pattern in QueryRunner.SPILLED_PATTERNS])
+          report.mem_limit_exceeded = "Memory limit exceeded" in report.profile
     except Exception as error:
       # A mem limit error would have been caught above, no need to check for that here.
       report.non_mem_limit_error = error
@@ -776,7 +870,7 @@
     report.timed_out = True
 
     # Copy the operation handle in case another thread causes the handle to be reset.
-    operation_handle = cursor._last_operation_handle
+    operation_handle = cursor._last_operation.handle if cursor._last_operation else None
     if not operation_handle:
       return
 
@@ -795,14 +889,15 @@
 
   def _check_for_mem_limit_exceeded(self, report, cursor, caught_exception):
     """To be called after a query failure to check for signs of failed due to a
-       mem limit. The report will be updated accordingly.
+    mem limit. The report will be updated accordingly.
     """
-    if cursor._last_operation_handle:
+    if cursor._last_operation:
       try:
         report.profile = cursor.get_profile()
       except Exception as e:
-        LOG.debug("Error getting profile for query with id %s: %s",
-            op_handle_to_query_id(cursor._last_operation_handle), e)
+        LOG.debug(
+            "Error getting profile for query with id %s: %s",
+            op_handle_to_query_id(cursor._last_operation.handle), e)
     caught_msg = str(caught_exception).lower().strip()
 
     # Exceeding a mem limit may result in the message "cancelled".
@@ -817,25 +912,32 @@
     # exceeding the mem_limit could be something like:
     #   Metadata states that in group hdfs://<node>:8020<path> there are <X> rows,
     #   but only <Y> rows were read.
-    if "metadata states that in group" in caught_msg \
-        and "rows were read" in caught_msg:
+    if (
+        "metadata states that in group" in caught_msg and
+        "rows were read" in caught_msg
+    ):
       report.mem_limit_exceeded = True
       return
 
-    LOG.debug("Non-mem limit error for query with id %s: %s",
-        op_handle_to_query_id(cursor._last_operation_handle), caught_exception,
+    LOG.debug(
+        "Non-mem limit error for query with id %s: %s",
+        op_handle_to_query_id(
+            cursor._last_operation.handle if cursor._last_operation else None),
+        caught_exception,
         exc_info=True)
     report.non_mem_limit_error = caught_exception
 
   def _hash_result(self, cursor, timeout_unix_time, query):
     """Returns a hash that is independent of row order. 'query' is only used for debug
-       logging purposes (if the result is not as expected a log file will be left for
-       investigations).
+    logging purposes (if the result is not as expected a log file will be left for
+    investigations).
     """
-    query_id = op_handle_to_query_id(cursor._last_operation_handle)
+    query_id = op_handle_to_query_id(cursor._last_operation.handle if
+                                     cursor._last_operation else None)
 
     # A value of 1 indicates that the hash thread should continue to work.
     should_continue = Value("i", 1)
+
     def hash_result_impl():
       result_log = None
       try:
@@ -848,12 +950,16 @@
         result_log.write("\n")
         current_thread().result = 1
         while should_continue.value:
-          LOG.debug("Fetching result for query with id %s",
-              op_handle_to_query_id(cursor._last_operation_handle))
+          LOG.debug(
+              "Fetching result for query with id %s",
+              op_handle_to_query_id(
+                  cursor._last_operation.handle if cursor._last_operation else None))
           rows = cursor.fetchmany(self.BATCH_SIZE)
           if not rows:
-            LOG.debug("No more results for query with id %s",
-                op_handle_to_query_id(cursor._last_operation_handle))
+            LOG.debug(
+                "No more results for query with id %s",
+                op_handle_to_query_id(
+                    cursor._last_operation.handle if cursor._last_operation else None))
             return
           for row in rows:
             for idx, val in enumerate(row):
@@ -882,12 +988,14 @@
       finally:
         if result_log is not None:
           result_log.close()
-          if current_thread().error is not None \
-              and current_thread().result == query.result_hash:
+          if (
+              current_thread().error is not None and
+              current_thread().result == query.result_hash
+          ):
             os.remove(result_log.name)
 
-    hash_thread = create_and_start_daemon_thread(hash_result_impl,
-        "Fetch Results %s" % query_id)
+    hash_thread = create_and_start_daemon_thread(
+        hash_result_impl, "Fetch Results %s" % query_id)
     hash_thread.join(max(timeout_unix_time - time(), 0))
     if hash_thread.is_alive():
       should_continue.value = 0
@@ -900,11 +1008,12 @@
 def load_tpc_queries(workload, load_in_kudu=False):
   """Returns a list of TPC queries. 'workload' should either be 'tpch' or 'tpcds'.
   If 'load_in_kudu' is True, it loads only queries specified for the Kudu storage
-  engine."""
+  engine.
+  """
   LOG.info("Loading %s queries", workload)
   queries = list()
-  query_dir = os.path.join(os.path.dirname(__file__), "..", "..",
-      "testdata", "workloads", workload, "queries")
+  query_dir = os.path.join(
+      os.path.dirname(__file__), "..", "..", "testdata", "workloads", workload, "queries")
   engine = 'kudu-' if load_in_kudu else ''
   file_name_pattern = re.compile(r"%s-%s(q\d+).test$" % (workload, engine))
   for query_file in os.listdir(query_dir):
@@ -914,7 +1023,8 @@
     file_path = os.path.join(query_dir, query_file)
     file_queries = load_queries_from_test_file(file_path)
     if len(file_queries) != 1:
-      raise Exception("Expected exactly 1 query to be in file %s but got %s"
+      raise Exception(
+          "Expected exactly 1 query to be in file %s but got %s"
           % (file_path, len(file_queries)))
     query = file_queries[0]
     query.name = match.group(1)
@@ -934,13 +1044,15 @@
   return queries
 
 
-def load_random_queries_and_populate_runtime_info(query_generator, model_translator,
-    tables, db_name, impala, use_kerberos, query_count, query_timeout_secs,
-    result_hash_log_dir):
+def load_random_queries_and_populate_runtime_info(
+    query_generator, model_translator, tables, db_name, impala, use_kerberos, query_count,
+    query_timeout_secs, result_hash_log_dir
+):
   """Returns a list of random queries. Each query will also have its runtime info
-     populated. The runtime info population also serves to validate the query.
+  populated. The runtime info population also serves to validate the query.
   """
   LOG.info("Generating random queries")
+
   def generate_candidates():
     while True:
       query_model = query_generator.create_query(tables)
@@ -949,20 +1061,26 @@
       query.sql = sql
       query.db_name = db_name
       yield query
-  return populate_runtime_info_for_random_queries(impala, use_kerberos,
-      generate_candidates(), query_count, query_timeout_secs, result_hash_log_dir)
+  return populate_runtime_info_for_random_queries(
+      impala, use_kerberos, generate_candidates(), query_count, query_timeout_secs,
+      result_hash_log_dir)
 
 
-def populate_runtime_info_for_random_queries(impala, use_kerberos, candidate_queries,
-    query_count, query_timeout_secs, result_hash_log_dir):
+def populate_runtime_info_for_random_queries(
+    impala, use_kerberos, candidate_queries,
+    query_count, query_timeout_secs, result_hash_log_dir
+):
   """Returns a list of random queries. Each query will also have its runtime info
-     populated. The runtime info population also serves to validate the query.
+  populated. The runtime info population also serves to validate the query.
   """
   start_time = datetime.now()
   queries = list()
+  # TODO(IMPALA-4632): Consider running reset_databases() here if we want to extend DML
+  #                    functionality to random stress queries as well.
   for query in candidate_queries:
     try:
-      populate_runtime_info(query, impala, use_kerberos, result_hash_log_dir,
+      populate_runtime_info(
+          query, impala, use_kerberos, result_hash_log_dir,
           timeout_secs=query_timeout_secs)
       queries.append(query)
     except Exception as e:
@@ -970,27 +1088,30 @@
       # query generator bugs).
       if print_crash_info_if_exists(impala, start_time):
         raise e
-      LOG.warn("Error running query (the test will continue)\n%s\n%s", e, query.sql,
-          exc_info=True)
+      LOG.warn(
+          "Error running query (the test will continue)\n%s\n%s",
+          e, query.sql, exc_info=True)
     if len(queries) == query_count:
       break
   return queries
 
 
-def populate_runtime_info(query, impala, use_kerberos, result_hash_log_dir,
-    timeout_secs=maxint, samples=1, max_conflicting_samples=0):
+def populate_runtime_info(
+    query, impala, use_kerberos, result_hash_log_dir,
+    timeout_secs=maxint, samples=1, max_conflicting_samples=0
+):
   """Runs the given query by itself repeatedly until the minimum memory is determined
-     with and without spilling. Potentially all fields in the Query class (except
-     'sql') will be populated by this method. 'required_mem_mb_without_spilling' and
-     the corresponding runtime field may still be None if the query could not be run
-     without spilling.
+  with and without spilling. Potentially all fields in the Query class (except
+  'sql') will be populated by this method. 'required_mem_mb_without_spilling' and
+  the corresponding runtime field may still be None if the query could not be run
+  without spilling.
 
-     'samples' and 'max_conflicting_samples' control the reliability of the collected
-     information. The problem is that memory spilling or usage may differ (by a large
-     amount) from run to run due to races during execution. The parameters provide a way
-     to express "X out of Y runs must have resulted in the same outcome". Increasing the
-     number of samples and decreasing the tolerance (max conflicts) increases confidence
-     but also increases the time to collect the data.
+  'samples' and 'max_conflicting_samples' control the reliability of the collected
+  information. The problem is that memory spilling or usage may differ (by a large
+  amount) from run to run due to races during execution. The parameters provide a way
+  to express "X out of Y runs must have resulted in the same outcome". Increasing the
+  number of samples and decreasing the tolerance (max conflicts) increases confidence
+  but also increases the time to collect the data.
   """
   LOG.info("Collecting runtime info for query %s: \n%s", query.name, query.sql)
   runner = QueryRunner()
@@ -1015,12 +1136,16 @@
   def update_runtime_info():
     required_mem = min(mem_limit, impala.min_impalad_mem_mb)
     if report.mem_was_spilled:
-      if query.required_mem_mb_with_spilling is None \
-          or required_mem < query.required_mem_mb_with_spilling:
+      if (
+          query.required_mem_mb_with_spilling is None or
+          required_mem < query.required_mem_mb_with_spilling
+      ):
         query.required_mem_mb_with_spilling = required_mem
         query.solo_runtime_secs_with_spilling = report.runtime_secs
-    elif query.required_mem_mb_without_spilling is None \
-        or required_mem < query.required_mem_mb_without_spilling:
+    elif (
+        query.required_mem_mb_without_spilling is None or
+        required_mem < query.required_mem_mb_without_spilling
+    ):
       query.required_mem_mb_without_spilling = required_mem
       query.solo_runtime_secs_without_spilling = report.runtime_secs
 
@@ -1028,7 +1153,7 @@
     reports_by_outcome = defaultdict(list)
     leading_outcome = None
     for remaining_samples in xrange(samples - 1, -1, -1):
-      report = runner.run_query(query, timeout_secs, mem_limit)
+      report = runner.run_query(query, timeout_secs, mem_limit, run_set_up=True)
       if report.timed_out:
         raise QueryTimeout()
       if report.non_mem_limit_error:
@@ -1038,8 +1163,9 @@
         if query.result_hash is None:
           query.result_hash = report.result_hash
         elif query.result_hash != report.result_hash:
-          raise Exception("Result hash mismatch; expected %s, got %s"
-              % (query.result_hash, report.result_hash))
+          raise Exception(
+              "Result hash mismatch; expected %s, got %s" %
+              (query.result_hash, report.result_hash))
 
       if report.mem_limit_exceeded:
         outcome = "EXCEEDED"
@@ -1055,8 +1181,10 @@
         leading_outcome = outcome
       if len(reports_by_outcome[leading_outcome]) + max_conflicting_samples == samples:
         break
-      if len(reports_by_outcome[leading_outcome]) + remaining_samples \
-          < samples - max_conflicting_samples:
+      if (
+          len(reports_by_outcome[leading_outcome]) + remaining_samples <
+          samples - max_conflicting_samples
+      ):
         return
       if desired_outcome \
           and len(reports_by_outcome[desired_outcome]) + remaining_samples \
@@ -1076,8 +1204,8 @@
         if report and report.mem_limit_exceeded:
           limit_exceeded_mem = mem_limit
         if mem_limit == impala.min_impalad_mem_mb:
-          LOG.warn("Query could not be run even when using all available memory\n%s",
-              query.sql)
+          LOG.warn(
+              "Query couldn't be run even when using all available memory\n%s", query.sql)
           return
         mem_limit = min(2 * mem_limit, impala.min_impalad_mem_mb)
         continue
@@ -1097,8 +1225,8 @@
       old_required_mem_mb_without_spilling = None
     else:
       mem_limit = (lower_bound + upper_bound) / 2
-    should_break = mem_limit / float(upper_bound) > MEM_LIMIT_EQ_THRESHOLD_PC \
-            or upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB
+    should_break = mem_limit / float(upper_bound) > MEM_LIMIT_EQ_THRESHOLD_PC or \
+        upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB
     report = get_report(desired_outcome=("NOT_SPILLED" if spill_mem else None))
     if not report:
       lower_bound = mem_limit
@@ -1120,13 +1248,13 @@
         break
       lower_bound = upper_bound = impala.min_impalad_mem_mb
   # This value may be updated during the search for the absolute minimum.
-  LOG.info("Minimum memory to avoid spilling is %s MB"
-      % query.required_mem_mb_without_spilling)
+  LOG.info(
+      "Minimum memory to avoid spilling: %s MB" % query.required_mem_mb_without_spilling)
 
   LOG.info("Finding absolute minimum memory required")
   lower_bound = limit_exceeded_mem
-  upper_bound = min(spill_mem or maxint, non_spill_mem or maxint,
-      impala.min_impalad_mem_mb)
+  upper_bound = min(
+      spill_mem or maxint, non_spill_mem or maxint, impala.min_impalad_mem_mb)
   while True:
     if old_required_mem_mb_with_spilling:
       mem_limit = old_required_mem_mb_with_spilling
@@ -1147,13 +1275,16 @@
         query.solo_runtime_secs_with_spilling = query.solo_runtime_secs_without_spilling
       break
   LOG.info("Minimum memory is %s MB" % query.required_mem_mb_with_spilling)
-  if query.required_mem_mb_without_spilling is not None \
-      and query.required_mem_mb_without_spilling is not None \
-      and query.required_mem_mb_without_spilling < query.required_mem_mb_with_spilling:
+  if (
+      query.required_mem_mb_without_spilling is not None and
+      query.required_mem_mb_without_spilling is not None and
+      query.required_mem_mb_without_spilling < query.required_mem_mb_with_spilling
+  ):
     # Query execution is not deterministic and sometimes a query will run without spilling
     # at a lower mem limit than it did with spilling. In that case, just use the lower
     # value.
-    LOG.info("A lower memory limit to avoid spilling was found while searching for"
+    LOG.info(
+        "A lower memory limit to avoid spilling was found while searching for"
         " the absolute minimum memory.")
     query.required_mem_mb_with_spilling = query.required_mem_mb_without_spilling
     query.solo_runtime_secs_with_spilling = query.solo_runtime_secs_without_spilling
@@ -1162,12 +1293,15 @@
 
 def estimate_query_mem_mb_usage(query, query_runner):
   """Runs an explain plan then extracts and returns the estimated memory needed to run
-     the query.
+  the query.
   """
   with query_runner.impalad_conn.cursor() as cursor:
     LOG.debug("Using %s database", query.db_name)
     if query.db_name:
       cursor.execute('USE ' + query.db_name)
+    if query.query_type == QueryType.COMPUTE_STATS:
+      # Running "explain" on compute stats is not supported by Impala.
+      return
     LOG.debug("Explaining query\n%s", query.sql)
     cursor.execute('EXPLAIN ' + query.sql)
     first_val = cursor.fetchone()[0]
@@ -1186,13 +1320,16 @@
       store = json.load(file)
     _check_store_version(store)
   if not store:
-    store = {"host_names": list(), "db_names": dict(),
-        "version": RUNTIME_INFO_FILE_VERSION}
+    store = {
+        "host_names": list(), "db_names": dict(), "version": RUNTIME_INFO_FILE_VERSION}
   with open(path, "w+") as file:
     store["host_names"] = sorted([i.host_name for i in impala.impalads])
     queries = store["db_names"].get(query.db_name, dict())
-    queries[query.sql] = query
+    query_by_options = queries.get(query.sql, dict())
+    query_by_options[str(sorted(query.options.items()))] = query
+    queries[query.sql] = query_by_options
     store["db_names"][query.db_name] = queries
+
     class JsonEncoder(json.JSONEncoder):
       def default(self, obj):
         data = dict(obj.__dict__)
@@ -1200,61 +1337,66 @@
         if "sql" in data:
           del data["sql"]
         return data
-    json.dump(store, file, cls=JsonEncoder, sort_keys=True, indent=2,
-        separators=(',', ': '))
+    json.dump(
+        store, file, cls=JsonEncoder, sort_keys=True, indent=2, separators=(',', ': '))
 
 
 def load_runtime_info(path, impala=None):
   """Reads the query runtime information at 'path' and returns a
-     dict<db_name, dict<sql, Query>>. Returns an empty dict if the hosts in the 'impala'
-     instance do not match the data in 'path'.
+  dict<db_name, dict<sql, dict<options, Query>>>. Returns an empty dict if the hosts in
+  the 'impala' instance do not match the data in 'path'.
   """
-  queries_by_db_and_sql = defaultdict(dict)
+  queries_by_db_and_sql = defaultdict(lambda: defaultdict(dict))
   if not os.path.exists(path):
     return queries_by_db_and_sql
   with open(path) as file:
     store = json.load(file)
     _check_store_version(store)
-    if impala and \
-        store.get("host_names") != sorted([i.host_name for i in impala.impalads]):
+    if (
+        impala and
+        store.get("host_names") != sorted([i.host_name for i in impala.impalads])
+    ):
       return queries_by_db_and_sql
     for db_name, queries_by_sql in store["db_names"].iteritems():
-      for sql, json_query in queries_by_sql.iteritems():
-        query = Query()
-        query.__dict__.update(json_query)
-        query.sql = sql
-        queries_by_db_and_sql[db_name][sql] = query
+      for sql, queries_by_options in queries_by_sql.iteritems():
+        for options, json_query in queries_by_options.iteritems():
+          query = Query()
+          query.__dict__.update(json_query)
+          query.sql = sql
+          queries_by_db_and_sql[db_name][sql][options] = query
   return queries_by_db_and_sql
 
 
 def _check_store_version(store):
   """Clears 'store' if the version is too old or raises an error if the version is too
-     new.
+  new.
   """
   if store["version"] < RUNTIME_INFO_FILE_VERSION:
     LOG.warn("Runtime file info version is old and will be ignored")
     store.clear()
   elif store["version"] > RUNTIME_INFO_FILE_VERSION:
-    raise Exception("Unexpected runtime file info version %s expected %s"
+    raise Exception(
+        "Unexpected runtime file info version %s expected %s"
         % (store["version"], RUNTIME_INFO_FILE_VERSION))
 
 
 def print_runtime_info_comparison(old_runtime_info, new_runtime_info):
   # TODO: Provide a way to call this from the CLI. This was hard coded to run from main()
   #       when it was used.
-  print(",".join(["Database", "Query",
-    "Old Mem MB w/Spilling",
-    "New Mem MB w/Spilling",
-    "Diff %",
-    "Old Runtime w/Spilling",
-    "New Runtime w/Spilling",
-    "Diff %",
-    "Old Mem MB wout/Spilling",
-    "New Mem MB wout/Spilling",
-    "Diff %",
-    "Old Runtime wout/Spilling",
-    "New Runtime wout/Spilling",
-    "Diff %"]))
+  print(",".join([
+      "Database", "Query",
+      "Old Mem MB w/Spilling",
+      "New Mem MB w/Spilling",
+      "Diff %",
+      "Old Runtime w/Spilling",
+      "New Runtime w/Spilling",
+      "Diff %",
+      "Old Mem MB wout/Spilling",
+      "New Mem MB wout/Spilling",
+      "Diff %",
+      "Old Runtime wout/Spilling",
+      "New Runtime wout/Spilling",
+      "Diff %"]))
   for db_name, old_queries in old_runtime_info.iteritems():
     new_queries = new_runtime_info.get(db_name)
     if not new_queries:
@@ -1267,8 +1409,10 @@
       sys.stdout.write(",")
       sys.stdout.write(old_query["name"])
       sys.stdout.write(",")
-      for attr in ["required_mem_mb_with_spilling", "solo_runtime_secs_with_spilling",
-          "required_mem_mb_without_spilling", "solo_runtime_secs_without_spilling"]:
+      for attr in [
+          "required_mem_mb_with_spilling", "solo_runtime_secs_with_spilling",
+          "required_mem_mb_without_spilling", "solo_runtime_secs_without_spilling"
+      ]:
         old_value = old_query[attr]
         sys.stdout.write(str(old_value))
         sys.stdout.write(",")
@@ -1283,90 +1427,345 @@
       print()
 
 
+def generate_DML_queries(cursor, dml_mod_values):
+  """Generate insert, upsert, update, delete DML statements.
+
+  For each table in the database that cursor is connected to, create 4 DML queries
+  (insert, upsert, update, delete) for each mod value in 'dml_mod_values'. This value
+  controls which rows will be affected. The generated queries assume that for each table
+  in the database, there exists a table with a '_original' suffix that is never modified.
+
+  This function has some limitations:
+  1. Only generates DML statements against Kudu tables, and ignores non-Kudu tables.
+  2. Requires that the type of the first column of the primary key is an integer type.
+  """
+  LOG.info("Generating DML queries")
+  tables = [cursor.describe_table(t) for t in cursor.list_table_names()
+            if not t.endswith("_original")]
+  result = []
+  for table in tables:
+    if not table.primary_keys:
+      # Skip non-Kudu tables. If a table has no primary keys, then it cannot be a Kudu
+      # table.
+      LOG.debug("Skipping table '{0}' because it has no primary keys.".format(table.name))
+      continue
+    if len(table.primary_keys) > 1:
+      # TODO(IMPALA-4665): Add support for tables with multiple primary keys.
+      LOG.debug("Skipping table '{0}' because it has more than "
+                "1 primary key column.".format(table.name))
+      continue
+    primary_key = table.primary_keys[0]
+    if primary_key.exact_type not in (Int, TinyInt, SmallInt, BigInt):
+      # We want to be able to apply the modulo operation on the primary key. If the
+      # first primary key column happens to not be an integer, we will skip
+      # generating queries for this table.
+      LOG.debug("Skipping table '{0}' because the first column '{1}' in the "
+                "primary key is not an integer.".format(table.name, primary_key.name))
+      continue
+    for mod_value in dml_mod_values:
+      # Insert
+      insert_query = Query()
+      # Populate runtime info for Insert and Upsert queries before Update and Delete
+      # queries because tables remain in original state after running the Insert and
+      # Upsert queries. During the binary search in runtime info population for the
+      # Insert query, we first delete some rows and then reinsert them, so the table
+      # remains in the original state. For the delete, the order is reversed, so the table
+      # is not in the original state after running the delete (or update) query. This
+      # is why population_order is smaller for Insert and Upsert queries.
+      insert_query.population_order = 1
+      insert_query.query_type = QueryType.INSERT
+      insert_query.name = "insert_{0}".format(table.name)
+      insert_query.db_name = cursor.db_name
+      insert_query.sql = (
+          "INSERT INTO TABLE {0} SELECT * FROM {0}_original "
+          "WHERE {1} % {2} = 0").format(table.name, primary_key.name, mod_value)
+      # Upsert
+      upsert_query = Query()
+      upsert_query.population_order = 1
+      upsert_query.query_type = QueryType.UPSERT
+      upsert_query.name = "upsert_{0}".format(table.name)
+      upsert_query.db_name = cursor.db_name
+      upsert_query.sql = (
+          "UPSERT INTO TABLE {0} SELECT * "
+          "FROM {0}_original WHERE {1} % {2} = 0").format(
+              table.name, primary_key.name, mod_value)
+      # Update
+      update_query = Query()
+      update_query.population_order = 2
+      update_query.query_type = QueryType.UPDATE
+      update_query.name = "update_{0}".format(table.name)
+      update_query.db_name = cursor.db_name
+      update_list = ', '.join(
+          'a.{0} = b.{0}'.format(col.name)
+          for col in table.cols if not col.is_primary_key)
+      update_query.sql = (
+          "UPDATE a SET {update_list} FROM {table_name} a JOIN {table_name}_original b "
+          "ON a.{pk} = b.{pk} + 1 WHERE a.{pk} % {mod_value} = 0").format(
+              table_name=table.name, pk=primary_key.name, mod_value=mod_value,
+              update_list=update_list)
+      # Delete
+      delete_query = Query()
+      delete_query.population_order = 2
+      delete_query.query_type = QueryType.DELETE
+      delete_query.name = "delete_{0}".format(table.name)
+      delete_query.db_name = cursor.db_name
+      delete_query.sql = ("DELETE FROM {0} WHERE {1} % {2} = 0").format(
+          table.name, primary_key.name, mod_value)
+
+      if table.name + "_original" in set(table.name for table in tables):
+        insert_query.set_up_sql = "DELETE FROM {0} WHERE {1} % {2} = 0".format(
+            table.name, primary_key.name, mod_value)
+        upsert_query.set_up_sql = insert_query.set_up_sql
+        update_query.set_up_sql = (
+            "UPSERT INTO TABLE {0} SELECT * FROM {0}_original "
+            "WHERE {1} % {2} = 0").format(table.name, primary_key.name, mod_value)
+        delete_query.set_up_sql = update_query.set_up_sql
+
+      result.append(insert_query)
+      LOG.debug("Added insert query: {0}".format(insert_query))
+      result.append(update_query)
+      LOG.debug("Added update query: {0}".format(update_query))
+      result.append(upsert_query)
+      LOG.debug("Added upsert query: {0}".format(upsert_query))
+      result.append(delete_query)
+      LOG.debug("Added delete query: {0}".format(delete_query))
+  assert len(result) > 0, "No DML queries were added."
+  return result
+
+
+def generate_compute_stats_queries(cursor):
+  """For each table in the database that cursor is connected to, generate several compute
+  stats queries. Each query will have a different value for the MT_DOP query option.
+  """
+  LOG.info("Generating Compute Stats queries")
+  tables = [cursor.describe_table(t) for t in cursor.list_table_names()
+            if not t.endswith("_original")]
+  result = []
+  mt_dop_values = [str(2**k) for k in range(5)]
+  for table in tables:
+    for mt_dop_value in mt_dop_values:
+      compute_query = Query()
+      compute_query.population_order = 1
+      compute_query.query_type = QueryType.COMPUTE_STATS
+      compute_query.sql = "COMPUTE STATS {0}".format(table.name)
+      compute_query.options["MT_DOP"] = mt_dop_value
+      compute_query.db_name = cursor.db_name
+      compute_query.name = "compute_stats_{0}_mt_dop_{1}".format(
+          table.name, compute_query.options["MT_DOP"])
+      result.append(compute_query)
+      LOG.debug("Added compute stats query: {0}".format(compute_query))
+  return result
+
+
+def prepare_database(cursor):
+  """For each table in the database that cursor is connected to, create an identical copy
+  with '_original' suffix. This function is idempotent.
+
+  Note: At this time we only support Kudu tables with a simple hash partitioning based on
+  the primary key. (SHOW CREATE TABLE would not work otherwise.)
+  """
+  tables = {t: cursor.describe_table(t) for t in cursor.list_table_names()}
+  for table_name in tables:
+    if not table_name.endswith("_original") and table_name + "_original" not in tables:
+      LOG.debug("Creating original table: {0}".format(table_name))
+      cursor.execute("SHOW CREATE TABLE " + table_name)
+      create_sql = cursor.fetchone()[0]
+      search_pattern = r"CREATE TABLE (\w*)\.(.*) \("
+      replacement = "CREATE TABLE {tbl} (".format(tbl=table_name + "_original")
+      create_original_sql = re.sub(
+          search_pattern, replacement, create_sql, count=1)
+      LOG.debug("Create original SQL:\n{0}".format(create_original_sql))
+      cursor.execute(create_original_sql)
+      cursor.execute("INSERT INTO {0}_original SELECT * FROM {0}".format(table_name))
+      cursor.execute("COMPUTE STATS {0}".format(table_name + "_original"))
+
+
+def reset_databases(cursor):
+  """Reset the database to the initial state. This is done by overwriting tables which
+  don't have the _original suffix with data from tables with the _original suffix.
+
+  Note: At this time we only support Kudu tables with a simple hash partitioning based on
+  the primary key. (SHOW CREATE TABLE would not work otherwise.)
+  """
+  LOG.info("Resetting {0} database".format(cursor.db_name))
+  tables = {t: cursor.describe_table(t) for t in cursor.list_table_names()}
+  for table_name in tables:
+    if not table_name.endswith("_original"):
+      if table_name + "_original" in tables:
+        cursor.execute("SHOW CREATE TABLE " + table_name)
+        create_table_command = cursor.fetchone()[0]
+        cursor.execute("DROP TABLE {0}".format(table_name))
+        cursor.execute(create_table_command)
+        cursor.execute("INSERT INTO {0} SELECT * FROM {0}_original".format(table_name))
+        cursor.execute("COMPUTE STATS {0}".format(table_name))
+      else:
+        LOG.debug("Table '{0}' cannot be reset because '{0}_original' does not"
+                  " exist in '{1}' database.".format(table_name, cursor.db_name))
+
+
+def populate_all_queries(queries, impala, args, runtime_info_path,
+                         queries_with_runtime_info_by_db_sql_and_options):
+  """Populate runtime info for all queries, ordered by the population_order property."""
+  result = []
+  queries_by_order = {}
+  for query in queries:
+    if query.population_order not in queries_by_order:
+      queries_by_order[query.population_order] = []
+    queries_by_order[query.population_order].append(query)
+  for population_order in sorted(queries_by_order.keys()):
+    for query in queries_by_order[population_order]:
+      if (
+          query.sql in
+          queries_with_runtime_info_by_db_sql_and_options[query.db_name] and
+          str(sorted(query.options.items())) in
+          queries_with_runtime_info_by_db_sql_and_options[query.db_name][query.sql]
+      ):
+        LOG.debug("Reusing previous runtime data for query: " + query.sql)
+        result.append(queries_with_runtime_info_by_db_sql_and_options[
+            query.db_name][query.sql][str(sorted(query.options.items()))])
+      else:
+        populate_runtime_info(
+            query, impala, args.use_kerberos, args.result_hash_log_dir,
+            samples=args.samples, max_conflicting_samples=args.max_conflicting_samples)
+        save_runtime_info(runtime_info_path, query, impala)
+        result.append(query)
+  return result
+
+
 def main():
   from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
   from random import shuffle
   import tests.comparison.cli_options as cli_options
 
-  parser = ArgumentParser(epilog=dedent("""
-         Before running this script a CM cluster must be setup and any needed data
-         such as TPC-H/DS must be loaded. The first time this script is run it will
-         find memory limits and runtimes for each query and save the data to disk (since
-         collecting the data is slow) at --runtime-info-path then run the stress test.
-         Later runs will reuse the saved memory limits and timings. If the cluster changes
-         significantly the memory limits should be re-measured (deleting the file at
-         --runtime-info-path will cause re-measuring to happen).""").strip(),
-         formatter_class=ArgumentDefaultsHelpFormatter)
+  parser = ArgumentParser(
+      epilog=dedent("""
+      Before running this script a CM cluster must be setup and any needed data
+      such as TPC-H/DS must be loaded. The first time this script is run it will
+      find memory limits and runtimes for each query and save the data to disk (since
+      collecting the data is slow) at --runtime-info-path then run the stress test.
+      Later runs will reuse the saved memory limits and timings. If the cluster changes
+      significantly the memory limits should be re-measured (deleting the file at
+      --runtime-info-path will cause re-measuring to happen).""").strip(),
+      formatter_class=ArgumentDefaultsHelpFormatter)
   cli_options.add_logging_options(parser)
   cli_options.add_cluster_options(parser)
   cli_options.add_kerberos_options(parser)
-  parser.add_argument("--runtime-info-path",
+  parser.add_argument(
+      "--runtime-info-path",
       default=os.path.join(gettempdir(), "{cm_host}_query_runtime_info.json"),
       help="The path to store query runtime info at. '{cm_host}' will be replaced with"
       " the actual host name from --cm-host.")
-  parser.add_argument("--samples", default=1, type=int,
+  parser.add_argument(
+      "--samples", default=1, type=int,
       help='Used when collecting "runtime info" - the number of samples to collect when'
       ' testing a particular mem limit value.')
-  parser.add_argument("--max-conflicting-samples", default=0, type=int,
+  parser.add_argument(
+      "--max-conflicting-samples", default=0, type=int,
       help='Used when collecting "runtime info" - the number of samples outcomes that'
       ' can disagree when deciding to accept a particular mem limit. Ex, when trying to'
       ' determine the mem limit that avoids spilling with samples=5 and'
       ' max-conflicting-samples=1, then 4/5 queries must not spill at a particular mem'
       ' limit.')
-  parser.add_argument("--result-hash-log-dir", default=gettempdir(),
+  parser.add_argument(
+      "--result-hash-log-dir", default=gettempdir(),
       help="If query results do not match, a log file will be left in this dir. The log"
       " file is also created during the first run when runtime info is collected for"
       " each query.")
-  parser.add_argument("--no-status", action="store_true",
-      help="Do not print the status table.")
-  parser.add_argument("--cancel-current-queries", action="store_true",
+  parser.add_argument(
+      "--no-status", action="store_true", help="Do not print the status table.")
+  parser.add_argument(
+      "--cancel-current-queries", action="store_true",
       help="Cancel any queries running on the cluster before beginning.")
-  parser.add_argument("--filter-query-mem-ratio", type=float, default=0.333,
+  parser.add_argument(
+      "--filter-query-mem-ratio", type=float, default=0.333,
       help="Queries that require this ratio of total available memory will be filtered.")
-  parser.add_argument("--startup-queries-per-second", type=float, default=2.0,
+  parser.add_argument(
+      "--startup-queries-per-second", type=float, default=2.0,
       help="Adjust this depending on the cluster size and workload. This determines"
       " the minimum amount of time between successive query submissions when"
       " the workload is initially ramping up.")
-  parser.add_argument("--fail-upon-successive-errors", type=int, default=1,
+  parser.add_argument(
+      "--fail-upon-successive-errors", type=int, default=1,
       help="Continue running until N query errors are encountered in a row. Set"
       " this to a high number to only stop when something catastrophic happens. A"
       " value of 1 stops upon the first error.")
-  parser.add_argument("--mem-limit-padding-pct", type=int, default=25,
+  parser.add_argument(
+      "--mem-limit-padding-pct", type=int, default=25,
       help="Pad query mem limits found by solo execution with this percentage when"
       " running concurrently. After padding queries will not be expected to fail"
       " due to mem limit exceeded.")
-  parser.add_argument("--timeout-multiplier", type=float, default=1.0,
+  parser.add_argument(
+      "--mem-limit-padding-abs", type=int, default=0,
+      help="Pad query mem limits found by solo execution with this value (in megabytes)"
+      " running concurrently. After padding queries will not be expected to fail"
+      " due to mem limit exceeded. This is useful if we want to be able to add the same"
+      " amount of memory to smaller queries as to the big ones.")
+  parser.add_argument(
+      "--timeout-multiplier", type=float, default=1.0,
       help="Query timeouts will be multiplied by this value.")
   parser.add_argument("--max-queries", type=int, default=100)
+  parser.add_argument(
+      "--reset-databases-before-binary-search", action="store_true",
+      help="If True, databases will be reset to their original state before the binary"
+      " search.")
+  parser.add_argument(
+      "--reset-databases-after-binary-search", action="store_true",
+      help="If True, databases will be reset to their original state after the binary"
+      " search and before starting the stress test. The primary intent of this option is"
+      " to undo the changes made to the databases by the binary search. This option can"
+      " also be used to reset the databases before running other (non stress) tests on"
+      " the same data.")
+  parser.add_argument(
+      "--generate-dml-queries", action="store_true",
+      help="If True, DML queries will be generated for Kudu databases.")
+  parser.add_argument(
+      "--dml-mod-values", nargs="+", type=int, default=[11],
+      help="List of mod values to use for the DML queries. There will be 4 DML (delete,"
+      " insert, update, upsert) queries generated per mod value per table. The smaller"
+      " the value, the more rows the DML query would touch (the query should touch about"
+      " 1/mod_value rows.)")
+  parser.add_argument(
+      "--generate-compute-stats-queries", action="store_true",
+      help="If True, Compute Stats queries will be generated.")
+  parser.add_argument(
+      "--select-probability", type=float, default=0.5,
+      help="Probability of choosing a select query (as opposed to a DML query).")
   parser.add_argument("--tpcds-db", help="If provided, TPC-DS queries will be used.")
   parser.add_argument("--tpch-db", help="If provided, TPC-H queries will be used.")
-  parser.add_argument("--tpch-nested-db",
-      help="If provided, nested TPC-H queries will be used.")
-  parser.add_argument("--tpch-kudu-db",
-      help="If provided, TPC-H queries for Kudu will be used.")
-  parser.add_argument("--tpcds-kudu-db",
-      help="If provided, TPC-DS queries for Kudu will be used.")
-  parser.add_argument("--random-db",
-      help="If provided, random queries will be used.")
-  parser.add_argument("--random-query-count", type=int, default=50,
+  parser.add_argument(
+      "--tpch-nested-db", help="If provided, nested TPC-H queries will be used.")
+  parser.add_argument(
+      "--tpch-kudu-db", help="If provided, TPC-H queries for Kudu will be used.")
+  parser.add_argument(
+      "--tpcds-kudu-db", help="If provided, TPC-DS queries for Kudu will be used.")
+  parser.add_argument(
+      "--random-db", help="If provided, random queries will be used.")
+  parser.add_argument(
+      "--random-query-count", type=int, default=50,
       help="The number of random queries to generate.")
-  parser.add_argument("--random-query-timeout-seconds", type=int, default=(5 * 60),
+  parser.add_argument(
+      "--random-query-timeout-seconds", type=int, default=(5 * 60),
       help="A random query that runs longer than this time when running alone will"
       " be discarded.")
-  parser.add_argument("--query-file-path", help="Use queries in the given file. The file"
+  parser.add_argument(
+      "--query-file-path", help="Use queries in the given file. The file"
       " format must be the same as standard test case format. Queries are expected to "
       " be randomly generated and will be validated before running in stress mode.")
-  parser.add_argument("--query-file-db", help="The name of the database to use with the "
-      "queries from --query-file-path.")
+  parser.add_argument(
+      "--query-file-db",
+      help="The name of the database to use with the queries from --query-file-path.")
   parser.add_argument("--mem-overcommit-pct", type=float, default=0)
-  parser.add_argument("--mem-spill-probability", type=float, default=0.33,
-      dest="spill_probability",
+  parser.add_argument(
+      "--mem-spill-probability", type=float, default=0.33, dest="spill_probability",
       help="The probability that a mem limit will be set low enough to induce spilling.")
-  parser.add_argument("--mem-leak-check-interval-mins", type=int, default=None,
+  parser.add_argument(
+      "--mem-leak-check-interval-mins", type=int, default=None,
       help="Periodically stop query execution and check that memory levels have reset.")
-  parser.add_argument("--cancel-probability", type=float, default=0.1,
+  parser.add_argument(
+      "--cancel-probability", type=float, default=0.1,
       help="The probability a query will be cancelled.")
-  parser.add_argument("--nlj-filter", choices=("in", "out", None),
+  parser.add_argument(
+      "--nlj-filter", choices=("in", "out", None),
       help="'in' means only nested-loop queries will be used, 'out' means no NLJ queries"
       " will be used. The default is to not filter either way.")
   parser.add_argument(
@@ -1377,14 +1776,18 @@
       "DISABLE_CODEGEN=true RUNTIME_FILTER_MODE=1")
   args = parser.parse_args()
 
-  cli_options.configure_logging(args.log_level, debug_log_file=args.debug_log_file,
-      log_thread_id=True, log_process_id=True)
+  cli_options.configure_logging(
+      args.log_level, debug_log_file=args.debug_log_file, log_thread_id=True,
+      log_process_id=True)
   LOG.debug("CLI args: %s" % (args, ))
 
-  if not args.tpcds_db and not args.tpch_db and not args.random_db \
-      and not args.tpch_nested_db and not args.tpch_kudu_db \
-      and not args.tpcds_kudu_db and not args.query_file_path:
-    raise Exception("At least one of --tpcds-db, --tpch-db, --tpch-kudu-db,"
+  if (
+      not args.tpcds_db and not args.tpch_db and not args.random_db and not
+      args.tpch_nested_db and not args.tpch_kudu_db and not
+      args.tpcds_kudu_db and not args.query_file_path
+  ):
+    raise Exception(
+        "At least one of --tpcds-db, --tpch-db, --tpch-kudu-db,"
         "--tpcds-kudu-db, --tpch-nested-db, --random-db, --query-file-path is required")
 
   # The stress test sets these, so callers cannot override them.
@@ -1437,7 +1840,8 @@
   runtime_info_path = args.runtime_info_path
   if "{cm_host}" in runtime_info_path:
     runtime_info_path = runtime_info_path.format(cm_host=args.cm_host)
-  queries_with_runtime_info_by_db_and_sql = load_runtime_info(runtime_info_path, impala)
+  queries_with_runtime_info_by_db_sql_and_options = load_runtime_info(
+      runtime_info_path, impala)
 
   # Start loading the test queries.
   queries = list()
@@ -1449,36 +1853,57 @@
     for query in tpcds_queries:
       query.db_name = args.tpcds_db
     queries.extend(tpcds_queries)
+    if args.generate_compute_stats_queries:
+      with impala.cursor(db_name=args.tpcds_db) as cursor:
+        queries.extend(generate_compute_stats_queries(cursor))
   if args.tpch_db:
     tpch_queries = load_tpc_queries("tpch")
     for query in tpch_queries:
       query.db_name = args.tpch_db
     queries.extend(tpch_queries)
+    if args.generate_compute_stats_queries:
+      with impala.cursor(db_name=args.tpch_db) as cursor:
+        queries.extend(generate_compute_stats_queries(cursor))
   if args.tpch_nested_db:
     tpch_nested_queries = load_tpc_queries("tpch_nested")
     for query in tpch_nested_queries:
       query.db_name = args.tpch_nested_db
     queries.extend(tpch_nested_queries)
+    if args.generate_compute_stats_queries:
+      with impala.cursor(db_name=args.tpch_nested_db) as cursor:
+        queries.extend(generate_compute_stats_queries(cursor))
   if args.tpch_kudu_db:
     tpch_kudu_queries = load_tpc_queries("tpch", load_in_kudu=True)
     for query in tpch_kudu_queries:
       query.db_name = args.tpch_kudu_db
     queries.extend(tpch_kudu_queries)
+    if args.generate_compute_stats_queries:
+      with impala.cursor(db_name=args.tpch_kudu_db) as cursor:
+        queries.extend(generate_compute_stats_queries(cursor))
+    if args.generate_dml_queries:
+      with impala.cursor(db_name=args.tpch_kudu_db) as cursor:
+        prepare_database(cursor)
+        queries.extend(generate_DML_queries(cursor, args.dml_mod_values))
   if args.tpcds_kudu_db:
     tpcds_kudu_queries = load_tpc_queries("tpcds", load_in_kudu=True)
     for query in tpcds_kudu_queries:
       query.db_name = args.tpcds_kudu_db
     queries.extend(tpcds_kudu_queries)
-  for idx in xrange(len(queries) - 1, -1, -1):
-    query = queries[idx]
-    if query.sql in queries_with_runtime_info_by_db_and_sql[query.db_name]:
-      query = queries_with_runtime_info_by_db_and_sql[query.db_name][query.sql]
-      LOG.debug("Reusing previous runtime data for query: " + query.sql)
-      queries[idx] = query
-    else:
-      populate_runtime_info(query, impala, args.use_kerberos, args.result_hash_log_dir,
-          samples=args.samples, max_conflicting_samples=args.max_conflicting_samples)
-      save_runtime_info(runtime_info_path, query, impala)
+    if args.generate_compute_stats_queries:
+      with impala.cursor(db_name=args.tpcds_kudu_db) as cursor:
+        queries.extend(generate_compute_stats_queries(cursor))
+    if args.generate_dml_queries:
+      with impala.cursor(db_name=args.tpcds_kudu_db) as cursor:
+        prepare_database(cursor)
+        queries.extend(generate_DML_queries(cursor, args.dml_mod_values))
+
+  if args.reset_databases_before_binary_search:
+    for database in set(query.db_name for query in queries):
+      with impala.cursor(db_name=database) as cursor:
+        reset_databases(cursor)
+
+  queries = populate_all_queries(queries, impala, args, runtime_info_path,
+                                 queries_with_runtime_info_by_db_sql_and_options)
 
   # A particular random query may either fail (due to a generator or Impala bug) or
   # take a really long time to complete. So the queries needs to be validated. Since the
@@ -1487,28 +1912,30 @@
     query_generator = QueryGenerator(DefaultProfile())
     with impala.cursor(db_name=args.random_db) as cursor:
       tables = [cursor.describe_table(t) for t in cursor.list_table_names()]
-    queries.extend(load_random_queries_and_populate_runtime_info(query_generator,
-        SqlWriter.create(), tables, args.random_db, impala, args.use_kerberos,
-        args.random_query_count, args.random_query_timeout_seconds,
+    queries.extend(load_random_queries_and_populate_runtime_info(
+        query_generator, SqlWriter.create(), tables, args.random_db, impala,
+        args.use_kerberos, args.random_query_count, args.random_query_timeout_seconds,
         args.result_hash_log_dir))
 
   if args.query_file_path:
-    file_queries = load_queries_from_test_file(args.query_file_path,
-        db_name=args.query_file_db)
+    file_queries = load_queries_from_test_file(
+        args.query_file_path, db_name=args.query_file_db)
     shuffle(file_queries)
-    queries.extend(populate_runtime_info_for_random_queries(impala, args.use_kerberos,
-        file_queries, args.random_query_count, args.random_query_timeout_seconds,
-        args.result_hash_log_dir))
+    queries.extend(populate_runtime_info_for_random_queries(
+        impala, args.use_kerberos, file_queries, args.random_query_count,
+        args.random_query_timeout_seconds, args.result_hash_log_dir))
 
   # Apply tweaks to the query's runtime info as requested by CLI options.
   for idx in xrange(len(queries) - 1, -1, -1):
     query = queries[idx]
     if query.required_mem_mb_with_spilling:
-      query.required_mem_mb_with_spilling += int(query.required_mem_mb_with_spilling
-          * args.mem_limit_padding_pct / 100.0)
+      query.required_mem_mb_with_spilling += int(
+          query.required_mem_mb_with_spilling * args.mem_limit_padding_pct / 100.0) + \
+          args.mem_limit_padding_abs
     if query.required_mem_mb_without_spilling:
-      query.required_mem_mb_without_spilling += int(query.required_mem_mb_without_spilling
-          * args.mem_limit_padding_pct / 100.0)
+      query.required_mem_mb_without_spilling += int(
+          query.required_mem_mb_without_spilling * args.mem_limit_padding_pct / 100.0) + \
+          args.mem_limit_padding_abs
     if query.solo_runtime_secs_with_spilling:
       query.solo_runtime_secs_with_spilling *= args.timeout_multiplier
     if query.solo_runtime_secs_without_spilling:
@@ -1522,6 +1949,7 @@
       LOG.debug("Filtered query due to mem ratio option: " + query.sql)
       del queries[idx]
 
+  # Keep only, or drop, queries with a nested loop join in the plan, per --nlj-filter.
   if args.nlj_filter:
     with impala.cursor(db_name=args.random_db) as cursor:
       for idx in xrange(len(queries) - 1, -1, -1):
@@ -1548,6 +1976,15 @@
     raise Exception("All queries were filtered")
   print("Using %s queries" % len(queries))
 
+  # After the binary search phase finishes, it may be a good idea to reset the database
+  # again to start the stress test from a clean state.
+  if args.reset_databases_after_binary_search:
+    for database in set(query.db_name for query in queries):
+      with impala.cursor(db_name=database) as cursor:
+        reset_databases(cursor)
+
+  LOG.info("Number of queries in the list: {0}".format(len(queries)))
+
   stress_runner = StressRunner()
   stress_runner.result_hash_log_dir = args.result_hash_log_dir
   stress_runner.startup_queries_per_sec = args.startup_queries_per_second
@@ -1557,8 +1994,12 @@
   stress_runner.spill_probability = args.spill_probability
   stress_runner.leak_check_interval_mins = args.mem_leak_check_interval_mins
   stress_runner.common_query_options = common_query_options
-  stress_runner.run_queries(queries, impala, args.max_queries, args.mem_overcommit_pct,
-      not args.no_status)   # This is the value of 'should_print_status'.
+  stress_runner.run_queries(
+      queries, impala, args.max_queries, args.mem_overcommit_pct,
+      should_print_status=not args.no_status,
+      verify_results=not args.generate_dml_queries,
+      select_probability=args.select_probability)
+
 
 if __name__ == "__main__":
   main()
diff --git a/tests/util/parse_util.py b/tests/util/parse_util.py
index 207f4b5..ad40b68 100644
--- a/tests/util/parse_util.py
+++ b/tests/util/parse_util.py
@@ -52,7 +52,7 @@
   mem = float(mem)
   if mem <= 0:
     return
-  units = units.strip().upper()
+  units = units.strip().upper() if units else ""
   if units.endswith("B"):
     units = units[:-1]
   if not units: