IMPALA-10183: Fix hitting DCHECK when cancelling a query with result spooling

BufferedPlanRootSink has a Promise, all_results_spooled_, that could be
accessed by different threads, e.g. the fragment execution thread and
cancellation threads. The main purpose of setting this Promise is to
unblock the coordinator if it's waiting for this. So we can simply
declare this Promise's mode to be MULTIPLE_PRODUCER to avoid hitting the
DCHECK in Promise.Set().

Tests:
 - Run TestResultSpoolingFailpoints::test_failpoints for more than 4000
   iterations

Change-Id: Iaba0ed729ef984f9c51347df02e9fb6f90bc71e0
Reviewed-on: http://gerrit.cloudera.org:8080/16489
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exec/buffered-plan-root-sink.cc b/be/src/exec/buffered-plan-root-sink.cc
index 19bcee2..daaa97e 100644
--- a/be/src/exec/buffered-plan-root-sink.cc
+++ b/be/src/exec/buffered-plan-root-sink.cc
@@ -78,9 +78,8 @@
     while (!state->is_cancelled() && batch_queue_->IsFull()) {
       SCOPED_TIMER(profile()->inactive_timer());
       SCOPED_TIMER(row_batches_send_wait_timer_);
-      if (!all_results_spooled_.IsSet()) {
-        discard_result(all_results_spooled_.Set(true));
-      }
+      // Set this to true means the batch queue is full.
+      discard_result(all_results_spooled_.Set(true));
       batch_queue_has_capacity_.Wait(l);
     }
     RETURN_IF_CANCELLED(state);
@@ -105,9 +104,7 @@
   DCHECK(!closed_);
   unique_lock<mutex> l(lock_);
   sender_state_ = SenderState::EOS;
-  if (!all_results_spooled_.IsSet()) {
-    discard_result(all_results_spooled_.Set(false));
-  }
+  discard_result(all_results_spooled_.Set(false));
   // If no batches are ever added, wake up the consumer thread so it can check the
   // SenderState and return appropriately.
   rows_available_.NotifyAll();
@@ -129,9 +126,7 @@
   if (sender_state_ == SenderState::ROWS_PENDING) {
     sender_state_ = SenderState::CLOSED_NOT_EOS;
   }
-  if (!all_results_spooled_.IsSet()) {
-    discard_result(all_results_spooled_.Set(false));
-  }
+  discard_result(all_results_spooled_.Set(false));
   if (current_batch_row_ != 0) {
     current_batch_->Reset();
   }
@@ -161,9 +156,7 @@
   rows_available_.NotifyAll();
   consumer_eos_.NotifyAll();
   batch_queue_has_capacity_.NotifyAll();
-  if (!all_results_spooled_.IsSet()) {
-    discard_result(all_results_spooled_.Set(false));
-  }
+  discard_result(all_results_spooled_.Set(false));
 }
 
 Status BufferedPlanRootSink::GetNext(RuntimeState* state, QueryResultSet* results,
diff --git a/be/src/exec/buffered-plan-root-sink.h b/be/src/exec/buffered-plan-root-sink.h
index d62939a..ae684ca 100644
--- a/be/src/exec/buffered-plan-root-sink.h
+++ b/be/src/exec/buffered-plan-root-sink.h
@@ -146,9 +146,12 @@
   /// 'GetNext'. If 'current_batch_' is nullptr, the value of 'current_batch_row_' is 0.
   int current_batch_row_ = 0;
 
-  /// Set when all results are spooled or we fail to do this due to batch_queue_ full or
-  /// any errors.
-  Promise<bool> all_results_spooled_;
+  /// Set when all results are spooled or we fail to do this due to batch_queue_ full,
+  /// cancellation or any errors. Set by either the fragment instance execution thread or
+  /// the cancellation thread. The boolean result is just used to decide whether to log
+  /// a warning. The result is true when batch_queue_ is full so we can't spool all
+  /// results. Coordinator will log a warning on this if it's waiting on this promise.
+  Promise<bool, PromiseMode::MULTIPLE_PRODUCER> all_results_spooled_;
 
   /// Returns true if the 'queue' (not the 'batch_queue_') is empty. 'queue' refers to
   /// the logical queue of RowBatches and thus includes any RowBatch that