IMPALA-9636: Don't run retried query on the blacklisted nodes

When a node is blacklisted, it only stays on the blacklist for a
certain period of time. With the current implementation, it is
therefore possible for a retried query to end up running on a node
that was blacklisted during the original attempt. To avoid hitting
the same failure again, the retried query should not schedule any
fragment instances on the blacklisted nodes that caused the original
query to fail.

When building the schedule for the retried query, this patch filters
out of the executor group the executors on nodes that were
blacklisted during the original attempt.

It also adds the new test cases
test_retry_exec_rpc_failure_before_admin_delay() and
test_retry_query_failure_all_executors_blacklisted(), in which the
retried queries are triggered by RPC failures and the blacklist
timeout is triggered by adding a delay before admission.
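
At a high level, the retried query's admission path now does roughly
the following (a simplified sketch of the admission-controller change
in the diff below; variable names are simplified and the real code
runs inside the loop over candidate executor groups):

  const ExecutorGroup* group_for_scheduling = executor_group;
  std::unique_ptr<ExecutorGroup> filtered_group;
  if (!request.blacklisted_executor_addresses.empty()) {
    filtered_group.reset(ExecutorGroup::GetFilteredExecutorGroup(
        executor_group, request.blacklisted_executor_addresses));
    // Every executor is blacklisted: this group cannot run the
    // retried query, so skip it. If no group is left, admission
    // fails with REASON_NO_EXECUTOR_GROUPS.
    if (filtered_group->NumExecutors() == 0) continue;
    group_for_scheduling = filtered_group.get();
  }
  // Scheduler::Schedule() only ever sees the filtered group.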

Testing:
 - Passed test_query_retries.py, including the new test cases.
 - Passed core tests.

Change-Id: I00bc1b5026efbd0670ffbe57bcebc457d34cb105
Reviewed-on: http://gerrit.cloudera.org:8080/16369
Reviewed-by: Sahil Takiar <stakiar@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 104a639..529ce2d 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -509,6 +509,9 @@
         const UniqueIdPB& backend_id = backend_state->exec_params().backend_id();
         ExecEnv::GetInstance()->cluster_membership_mgr()->BlacklistExecutor(backend_id,
             FromKuduStatus(backend_state->exec_rpc_status(), "Exec() rpc failed"));
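+        // Remember the address so a retry of this query is not scheduled on this node.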
+        parent_request_state_->AddBlacklistedExecutorAddress(
+            backend_state->exec_params().address());
       }
     }
     if (!failed_backend_states.empty()) {
@@ -1094,6 +1097,9 @@
 
         ExecEnv::GetInstance()->cluster_membership_mgr()->BlacklistExecutor(
             dest_node_exec_params.backend_id(), retryable_status);
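+        // Record the address so a retried query will not be scheduled on this node.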
+        parent_request_state_->AddBlacklistedExecutorAddress(
+            dest_node_exec_params.address());
 
         // Only blacklist one node per report.
         return retryable_status;
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 277d619..b4a4216 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -605,7 +605,7 @@
   /// fragment on that node failed due to a disk IO error.
   /// 'status' is the Status of the given BackendState. 'backend_state' is the
   /// BackendState that reported an error.
-  /// Returns the Status object used when blacklising a backend, or Status::OK if no
+  /// Returns the Status object used when blacklisting a backend, or Status::OK if no
   /// backends were blacklisted.
   Status UpdateBlacklistWithAuxErrorInfo(std::vector<AuxErrorInfoPB>* aux_error_info,
       const Status& status, BackendState* backend_state) WARN_UNUSED_RESULT;
diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc
index 47725a6..d178f43 100644
--- a/be/src/runtime/query-driver.cc
+++ b/be/src/runtime/query-driver.cc
@@ -236,6 +236,11 @@
     return;
   }
 
+  // Transfer the blacklisted_executor_addresses from the original query to the query to
+  // be retried.
+  retry_request_state->SetBlacklistedExecutorAddresses(
+      client_request_state_->GetBlacklistedExecutorAddresses());
+
   // Run the new query.
   status = retry_request_state->Exec();
   if (!status.ok()) {
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 3575d6b..4f58495 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -1546,6 +1546,22 @@
     DCHECK(executor_group->IsHealthy()
         || cluster_membership_mgr_->GetEmptyExecutorGroup() == executor_group)
         << executor_group->name();
+    // Create a temporary ExecutorGroup if we need to filter out the executors in the
+    // request's set of blacklisted executor addresses.
+    // Note: A coordinator-only query should not fail due to an RPC error, nor should
+    // it cause an executor to be blacklisted.
+    const ExecutorGroup* orig_executor_group = executor_group;
+    std::unique_ptr<ExecutorGroup> temp_executor_group;
+    if (!request.blacklisted_executor_addresses.empty()
+        && cluster_membership_mgr_->GetEmptyExecutorGroup() != executor_group) {
+      temp_executor_group.reset(ExecutorGroup::GetFilteredExecutorGroup(
+          executor_group, request.blacklisted_executor_addresses));
+      // If all executors are blacklisted, the retried query cannot be executed, so
+      // Scheduler::Schedule() can be skipped.
+      if (temp_executor_group->NumExecutors() == 0) continue;
+      executor_group = temp_executor_group.get();
+    }
+
     unique_ptr<ScheduleState> group_state =
         make_unique<ScheduleState>(request.query_id, request.request,
             request.query_options, request.summary_profile, request.query_events);
@@ -1556,9 +1572,15 @@
     RETURN_IF_ERROR(
         ExecEnv::GetInstance()->scheduler()->Schedule(group_config, group_state.get()));
     DCHECK(!group_state->executor_group().empty());
-    output_schedules->emplace_back(std::move(group_state), *executor_group);
+    output_schedules->emplace_back(std::move(group_state), *orig_executor_group);
   }
-  DCHECK(!output_schedules->empty());
+  if (output_schedules->empty()) {
+    // The retried query could not be scheduled because all executors are blacklisted.
+    // To stay consistent with the other blacklisting logic, set not_admitted_reason
+    // to REASON_NO_EXECUTOR_GROUPS.
+    queue_node->not_admitted_reason = REASON_NO_EXECUTOR_GROUPS;
+    LOG(WARNING) << queue_node->not_admitted_reason;
+  }
   return Status::OK();
 }
 
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 4de33fd..a9c3805 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -321,6 +321,7 @@
     const TQueryOptions& query_options;
     RuntimeProfile* summary_profile;
     RuntimeProfile::EventSequence* query_events;
+    std::unordered_set<NetworkAddressPB>& blacklisted_executor_addresses;
   };
 
   /// Submits the request for admission. May returns immediately if rejected, but
diff --git a/be/src/scheduling/executor-group.cc b/be/src/scheduling/executor-group.cc
index 2043b81..93dadc1 100644
--- a/be/src/scheduling/executor-group.cc
+++ b/be/src/scheduling/executor-group.cc
@@ -36,6 +36,19 @@
 ExecutorGroup::ExecutorGroup(const ExecutorGroupDescPB& desc)
   : ExecutorGroup(desc.name(), desc.min_size()) {}
 
+ExecutorGroup* ExecutorGroup::GetFilteredExecutorGroup(const ExecutorGroup* group,
+    const std::unordered_set<NetworkAddressPB>& blacklisted_executor_addresses) {
+  ExecutorGroup* filtered_group = new ExecutorGroup(*group);
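+  // Remove every executor whose address appears in the blacklisted set.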
+  for (const NetworkAddressPB& be_address : blacklisted_executor_addresses) {
+    const BackendDescriptorPB* be_desc = filtered_group->LookUpBackendDesc(be_address);
+    if (be_desc != nullptr) {
+      filtered_group->RemoveExecutor(BackendDescriptorPB(*be_desc));
+    }
+  }
+  return filtered_group;
+}
+
 const ExecutorGroup::Executors& ExecutorGroup::GetExecutorsForHost(
     const IpAddr& ip) const {
   ExecutorMap::const_iterator it = executor_map_.find(ip);
diff --git a/be/src/scheduling/executor-group.h b/be/src/scheduling/executor-group.h
index 9c47f14..a191f29 100644
--- a/be/src/scheduling/executor-group.h
+++ b/be/src/scheduling/executor-group.h
@@ -50,6 +50,11 @@
   explicit ExecutorGroup(const ExecutorGroupDescPB& desc);
   ExecutorGroup(const ExecutorGroup& other) = default;
 
+  /// Returns an ExecutorGroup containing the executors of the given ExecutorGroup,
+  /// excluding those whose addresses are in the given set of blacklisted addresses.
+  static ExecutorGroup* GetFilteredExecutorGroup(const ExecutorGroup* group,
+      const std::unordered_set<NetworkAddressPB>& blacklisted_executor_addresses);
+
   /// List of backends, in this case they're all executors.
   typedef std::vector<BackendDescriptorPB> Executors;
   typedef std::vector<IpAddr> IpAddrs;
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index c9b68d4..a55cb37 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -188,6 +188,20 @@
   frontend_profile_->Update(prof_tree);
 }
 
+void ClientRequestState::AddBlacklistedExecutorAddress(const NetworkAddressPB& addr) {
+  lock_guard<mutex> l(lock_);
+  if (!WasRetried()) blacklisted_executor_addresses_.emplace(addr);
+}
+
+void ClientRequestState::SetBlacklistedExecutorAddresses(
+    std::unordered_set<NetworkAddressPB>& executor_addresses) {
+  DCHECK(blacklisted_executor_addresses_.empty());
+  if (!executor_addresses.empty()) {
+    blacklisted_executor_addresses_.insert(
+        executor_addresses.begin(), executor_addresses.end());
+  }
+}
+
 Status ClientRequestState::Exec() {
   MarkActive();
 
@@ -534,7 +548,7 @@
   Status admit_status =
       ExecEnv::GetInstance()->admission_controller()->SubmitForAdmission(
           {query_id_pb, exec_request_->query_exec_request, exec_request_->query_options,
-              summary_profile_, query_events_},
+              summary_profile_, query_events_, blacklisted_executor_addresses_},
           &admit_outcome_, &schedule_);
   {
     lock_guard<mutex> l(lock_);
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 424cce2..c5a4004 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -386,6 +386,26 @@
   /// Returns true if results cacheing is enabled, false otherwise.
   bool IsResultCacheingEnabled() const { return result_cache_max_size_ >= 0; }
 
+  /// Adds an executor address to the set of blacklisted executor addresses. The
+  /// scheduler will not schedule any query fragment instance on the given executor.
+  /// Note: The given address is only added if the request has not been retried, i.e.
+  /// retry_state_ is neither RETRYING nor RETRIED.
+  void AddBlacklistedExecutorAddress(const NetworkAddressPB& addr);
+
+  /// Returns the blacklisted executor addresses for the query.
+  /// This function should only be called after the request has been retried; at that
+  /// point no more addresses are added to the set, so there is no data race.
+  std::unordered_set<NetworkAddressPB>& GetBlacklistedExecutorAddresses() {
+    return blacklisted_executor_addresses_;
+  }
+
+  /// Sets the set of blacklisted executor addresses. The scheduler will not schedule
+  /// any query fragment instance on the given executors.
+  /// This function is called before the retried request is executed, so it does not
+  /// need mutex protection.
+  void SetBlacklistedExecutorAddresses(
+      std::unordered_set<NetworkAddressPB>& executor_addresses);
+
  protected:
   /// Updates the end_time_us_ of this query if it isn't set. The end time is determined
   /// when this function is called for the first time, calling it multiple times does not
@@ -475,6 +495,10 @@
   /// the async-exec-thread only after Exec() has been successfully called on 'coord_'.
   AtomicBool coord_exec_called_;
 
+  /// Set of executor addresses that lets the caller tell the scheduler not to
+  /// schedule the query on these executors.
+  std::unordered_set<NetworkAddressPB> blacklisted_executor_addresses_;
+
   /// Runs statements that query or modify the catalog via the CatalogService.
   boost::scoped_ptr<CatalogOpExecutor> catalog_op_executor_;
 
diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index e7bd0d3..566e8ec 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -32,8 +32,15 @@
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.errors import Timeout
-from tests.common.skip import SkipIfEC
+from tests.common.skip import SkipIfEC, SkipIfBuildType
 
+# The KRPC port of the impalad on which RPC errors are simulated in these tests.
+FAILED_KRPC_PORT = 27001
+
+
+def _get_rpc_fail_action(port):
+  return "IMPALA_SERVICE_POOL:127.0.0.1:{port}:ExecQueryFInstances:FAIL" \
+      .format(port=port)
 
 # All tests in this class have SkipIfEC because all tests run a query and expect
 # the query to be retried when killing a random impalad. On EC this does not always work
@@ -269,6 +276,136 @@
     self.client.close_query(handle)
     self.__validate_web_ui_state()
 
+  @SkipIfBuildType.not_dev_build
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--debug_actions=" + _get_rpc_fail_action(FAILED_KRPC_PORT),
+      statestored_args="--statestore_heartbeat_frequency_ms=1000 \
+          --statestore_max_missed_heartbeats=2")
+  def test_retry_exec_rpc_failure_before_admin_delay(self):
+    """Test retried query triggered by RPC failures by simulating RPC errors at the port
+    of the 2nd node in the cluster. Simulate admission delay for query with debug_action
+    so that the 2nd node is removed from the blacklist when making schedule for retried
+    query. Verify that retried query is executed successfully, while the 2nd node is not
+    in the executor blacklist and it is not assigned to any fragment instance."""
+
+    impalad_service = self.cluster.get_first_impalad().service
+    rpc_not_accessible_impalad = self.cluster.impalads[1]
+    assert rpc_not_accessible_impalad.service.krpc_port == FAILED_KRPC_PORT
+
+    # The 2nd node is not reachable through KRPC, so it is added to the blacklist and
+    # the query is retried. Add a delay before admission so that the 2nd node is
+    # removed from the blacklist before the scheduler makes the schedule for the
+    # retried query.
+    query = "select count(*) from tpch_parquet.lineitem"
+    handle = self.execute_query_async(query,
+        query_options={'retry_failed_queries': 'true',
+                       'debug_action': 'CRS_BEFORE_ADMISSION:SLEEP@18000'})
+    self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 80)
+
+    # Validate that the query was retried.
+    self.__validate_runtime_profiles_from_service(impalad_service, handle)
+
+    # Assert that the query succeeded and returned the correct results.
+    results = self.client.fetch(query, handle)
+    assert results.success
+    assert len(results.data) == 1
+    assert "6001215" in results.data[0]
+
+    # The runtime profile of the retried query.
+    retried_runtime_profile = self.client.get_runtime_profile(handle)
+
+    # Assert that the 2nd node does not show up in the list of blacklisted executors
+    # from the runtime profile.
+    self.__assert_executors_not_blacklisted(rpc_not_accessible_impalad,
+        retried_runtime_profile)
+
+    # Assert that the 2nd node is not assigned any fragment instance for retried query
+    # execution.
+    self.__assert_executors_not_assigned_any_finstance(rpc_not_accessible_impalad,
+        retried_runtime_profile)
+
+    # Validate the live exec summary.
+    retried_query_id = self.__get_retried_query_id_from_summary(handle)
+    assert retried_query_id is not None
+
+    # Validate the state of the runtime profiles.
+    self.__validate_runtime_profiles(
+        retried_runtime_profile, handle.get_handle().id, retried_query_id)
+
+    # Validate the state of the client log.
+    self.__validate_client_log(handle, retried_query_id)
+
+    # Validate the state of the web ui. The query must be closed before validating the
+    # state since it asserts that no queries are in flight.
+    self.client.close_query(handle)
+    self.__validate_web_ui_state()
+
+  @SkipIfBuildType.not_dev_build
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--debug_actions=" + _get_rpc_fail_action(FAILED_KRPC_PORT),
+      statestored_args="--statestore_heartbeat_frequency_ms=1000 \
+          --statestore_max_missed_heartbeats=2",
+      cluster_size=2, num_exclusive_coordinators=1)
+  def test_retry_query_failure_all_executors_blacklisted(self):
+    """Test retried query triggered by RPC failures by simulating RPC errors at the port
+    of the 2nd node, which is the only executor in the cluster. Simulate admission delay
+    for query with debug_action so that the 2nd node is removed from the blacklist and
+    added back to executor group when making schedule for retried query. Verify that
+    retried query fails due to no executor available even the 2nd node is not in the
+    executor blacklist."""
+
+    rpc_not_accessible_impalad = self.cluster.impalads[1]
+    assert rpc_not_accessible_impalad.service.krpc_port == FAILED_KRPC_PORT
+
+    query = "select count(*) from tpch_parquet.lineitem"
+    handle = self.execute_query_async(query,
+        query_options={'retry_failed_queries': 'true',
+                       'debug_action': 'CRS_BEFORE_ADMISSION:SLEEP@18000'})
+    # Wait until the query fails.
+    self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 140)
+
+    # Validate the live exec summary.
+    retried_query_id = self.__get_retried_query_id_from_summary(handle)
+    assert retried_query_id is not None
+
+    # The runtime profile and client log of the retried query, need to be retrieved
+    # before fetching results, since the failed fetch attempt will close the
+    # query handle.
+    retried_runtime_profile = self.client.get_runtime_profile(handle)
+    self.__validate_client_log(handle, retried_query_id)
+
+    # Assert that the query failed since all executors are blacklisted and none is
+    # available to schedule the retried query on. To stay consistent with the other
+    # blacklisting logic, Impalad returns the error message "Admission for query
+    # exceeded timeout 60000ms in pool default-pool. Queued reason: Waiting for
+    # executors to start...".
+    try:
+      self.client.fetch(query, handle)
+      assert False
+    except ImpalaBeeswaxException, e:
+      assert "Admission for query exceeded timeout 60000ms in pool default-pool." \
+          in str(e)
+      assert "Queued reason: Waiting for executors to start. Only DDL queries and " \
+             "queries scheduled only on the coordinator (either NUM_NODES set to 1 " \
+             "or when small query optimization is triggered) can currently run" in str(e)
+      assert "Additional Details: Not Applicable" in str(e)
+
+    # Assert that the RPC-unreachable impalad does not show up in the list of
+    # blacklisted executors in the runtime profile.
+    self.__assert_executors_not_blacklisted(rpc_not_accessible_impalad,
+        retried_runtime_profile)
+
+    # Assert that the query id of the original query is in the runtime profile of the
+    # retried query.
+    self.__validate_original_id_in_profile(retried_runtime_profile,
+        handle.get_handle().id)
+
+    # Validate the state of the web ui. The query must be closed before validating the
+    # state since it asserts that no queries are in flight.
+    self.__validate_web_ui_state()
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       statestored_args="-statestore_heartbeat_frequency_ms=1000")
@@ -809,6 +946,18 @@
     assert "Blacklisted Executors: {0}:{1}".format(blacklisted_impalad.hostname,
         blacklisted_impalad.service.be_port) in profile, profile
 
+  def __assert_executors_not_blacklisted(self, impalad, profile):
+    """Validate that the given profile indicates that the given impalad was not
+    blacklisted during retried query execution"""
+    assert not ("Blacklisted Executors: {0}:{1}".format(impalad.hostname,
+        impalad.service.be_port) in profile), profile
+
+  def __assert_executors_not_assigned_any_finstance(self, impalad, profile):
+    """Validate that the given profile indicates that the given impalad was not
+    assigned any fragment instance for query execution"""
+    assert not ("host={0}:{1}".format(impalad.hostname,
+        impalad.service.be_port) in profile), profile
+
   def __validate_client_log(self, handle, retried_query_id, use_hs2_client=False):
     """Validate the GetLog result contains query retry information"""
     if use_hs2_client: