IMPALA-9137: Blacklist node if a DataStreamService RPC to the node fails

Introduces a new optional field in FragmentInstanceExecStatusPB:
AuxErrorInfoPB. AuxErrorInfoPB contains optional metadata associated
with a failed fragment instance. Currently, AuxErrorInfoPB contains a
single field, RPCErrorInfoPB, which is only set if the fragment failed
because an RPC to another impalad failed. The RPCErrorInfoPB contains
the destination node and the posix error code of the failed RPC.

Coordinator::UpdateBackendExecStatus(ReportExecStatusRequestPB, ...)
uses the information in RPCErrorInfoPB (if one is set) to blacklist
the target node. While RPCErrorInfoPB::dest_node can be set to the
address of the Coordinator, the Coordinator will never blacklist
itself. The Coordinator only blacklists the node if the RPC failed
with a specific error code (currently one of ENOTCONN, ESHUTDOWN, or
ECONNREFUSED).
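
Conceptually the blacklisting check reduces to the following (sketch
only; dest_is_coordinator stands in for the exec-params check, and the
real logic is in Coordinator::UpdateBlacklistWithAuxErrorInfo below):

  static const std::set<int32_t> blacklistable_rpc_error_codes = {
      ENOTCONN, ESHUTDOWN, ECONNREFUSED};
  bool should_blacklist = !dest_is_coordinator
      && blacklistable_rpc_error_codes.count(rpc_error_info.posix_error_code()) > 0;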

Testing:
* Ran core tests
* Added new test to test_blacklist.py

Change-Id: I733cca13847fde43c8ea2ae574d3ae04bd06419c
Reviewed-on: http://gerrit.cloudera.org:8080/14677
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 1051695..7bebdd0 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -17,6 +17,7 @@
 
 #include "runtime/coordinator.h"
 
+#include <cerrno>
 #include <unordered_set>
 
 #include <thrift/protocol/TDebugProtocol.h>
@@ -236,6 +237,15 @@
         schedule_, query_ctx(), backend_idx, filter_mode_, entry.second));
     backend_state->Init(fragment_stats_, host_profiles_, obj_pool());
     backend_states_[backend_idx++] = backend_state;
+    // was_inserted is true if the pair was successfully inserted into the map, false
+    // otherwise.
+    bool was_inserted = addr_to_backend_state_
+                            .emplace(backend_state->krpc_impalad_address(), backend_state)
+                            .second;
+    if (UNLIKELY(!was_inserted)) {
+      DCHECK(false) << "Network address " << backend_state->krpc_impalad_address()
+                    << " associated with multiple BackendStates";
+    }
   }
   backend_resource_state_ =
       obj_pool()->Add(new BackendResourceState(backend_states_, schedule_));
@@ -827,6 +837,11 @@
       // We may start receiving status reports before all exec rpcs are complete.
       // Can't apply state transition until no more exec rpcs will be sent.
       exec_rpcs_complete_barrier_.Wait();
+
+      // Iterate through all instance exec statuses, and use each fragment's AuxErrorInfo
+      // to possibly blacklist any "faulty" nodes.
+      UpdateBlacklistWithAuxErrorInfo(request);
+
       // Transition the status if we're not already in a terminal state. This won't block
       // because either this transitions to an ERROR state or the query is already in
       // a terminal state.
@@ -855,6 +870,64 @@
   return IsExecuting() ? Status::OK() : Status::CANCELLED;
 }
 
+void Coordinator::UpdateBlacklistWithAuxErrorInfo(
+    const ReportExecStatusRequestPB& request) {
+  // If the Backend failed due to an RPC failure, blacklist the destination node of
+  // the failed RPC. Only blacklist one node per ReportExecStatusRequestPB to avoid
+  // blacklisting nodes too aggressively. Currently, only the first fragment instance
+  // that reports a valid RPCErrorInfoPB triggers blacklisting.
+  for (auto instance_exec_status : request.instance_exec_status()) {
+    if (instance_exec_status.has_aux_error_info()
+        && instance_exec_status.aux_error_info().has_rpc_error_info()) {
+      RPCErrorInfoPB rpc_error_info =
+          instance_exec_status.aux_error_info().rpc_error_info();
+      DCHECK(rpc_error_info.has_dest_node());
+      DCHECK(rpc_error_info.has_posix_error_code());
+      const NetworkAddressPB& dest_node = rpc_error_info.dest_node();
+
+      auto dest_node_and_be_state =
+          addr_to_backend_state_.find(FromNetworkAddressPB(dest_node));
+
+      // If the target address of the RPC is not known to the Coordinator, it cannot
+      // be blacklisted.
+      if (dest_node_and_be_state == addr_to_backend_state_.end()) {
+        string err_msg = "Query failed due to a failed RPC to an unknown target address "
+            + NetworkAddressPBToString(dest_node);
+        DCHECK(false) << err_msg;
+        LOG(ERROR) << err_msg;
+        continue;
+      }
+
+      // The execution parameters of the destination node for the failed RPC.
+      const BackendExecParams* dest_node_exec_params =
+          dest_node_and_be_state->second->exec_params();
+
+      // The Coordinator for the query should never be blacklisted.
+      if (dest_node_exec_params->is_coord_backend) {
+        VLOG_QUERY << "Query failed due to a failed RPC to the Coordinator";
+        continue;
+      }
+
+      // A set of RPC-related posix error codes that should cause the target node
+      // of the failed RPC to be blacklisted.
+      static const set<int32_t> blacklistable_rpc_error_codes = {
+          ENOTCONN, // 107: Transport endpoint is not connected
+          ESHUTDOWN, // 108: Cannot send after transport endpoint shutdown
+          ECONNREFUSED  // 111: Connection refused
+      };
+
+      if (blacklistable_rpc_error_codes.find(rpc_error_info.posix_error_code())
+          != blacklistable_rpc_error_codes.end()) {
+        LOG(INFO) << "Blacklisting " << NetworkAddressPBToString(dest_node)
+                  << " because an RPC to it failed.";
+        ExecEnv::GetInstance()->cluster_membership_mgr()->BlacklistExecutor(
+            dest_node_exec_params->be_desc);
+        break;
+      }
+    }
+  }
+}
+
 int64_t Coordinator::GetMaxBackendStateLagMs(TNetworkAddress* address) {
   if (exec_rpcs_complete_barrier_.pending() > 0) {
     // Exec() hadn't completed for all the backends, so we can't rely on
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 0185f99..816018a 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -248,6 +248,13 @@
   /// are non-nullptr and owned by obj_pool(). Populated by Exec()/InitBackendStates().
   std::vector<BackendState*> backend_states_;
 
+  /// A map from the TNetworkAddress of a backend to the BackendState for the backend
+  /// at that address. All values are non-nullptr and owned by obj_pool(). The key is
+  /// the KRPC address (Coordinator::BackendState::krpc_impalad_address) of the
+  /// backend. This map is distinct from QuerySchedule::per_backend_exec_params(),
+  /// which is keyed by the Thrift address rather than the KRPC address.
+  boost::unordered_map<TNetworkAddress, BackendState*> addr_to_backend_state_;
+
   /// Protects the population of backend_states_ vector (not the BackendState objects).
   /// Used when accessing backend_states_ if it's not guaranteed that
   /// InitBackendStates() has completed.
@@ -541,6 +548,16 @@
   /// Checks the exec_state_ of the query and returns true if the query is executing.
   bool IsExecuting();
 
+  /// Helper function for UpdateBackendExecStatus that iterates through the
+  /// FragmentInstanceExecStatusPB for each fragment and uses AuxErrorInfoPB to check if
+  /// any nodes should be blacklisted. AuxErrorInfoPB contains additional error
+  /// information about why the fragment failed, beyond what is available in the
+  /// ReportExecStatusRequestPB::overall_status field. This method uses information in
+  /// AuxErrorInfoPB to classify specific nodes as "faulty" and then blacklists them. A
+  /// node might be considered "faulty" if, for example, an RPC to that node failed, or a
+  /// fragment on that node failed due to a disk IO error.
+  void UpdateBlacklistWithAuxErrorInfo(const ReportExecStatusRequestPB& request);
+
   /// BackendState and BackendResourceState are private to the Coordinator class, so mark
   /// all tests in CoordinatorBackendStateTest as friends.
   friend class CoordinatorBackendStateTest;
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index dde5b79..02bc9cf 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -290,6 +290,10 @@
     stateful_report->set_report_seq_no(report_seq_no_);
     runtime_state()->GetUnreportedErrors(stateful_report->mutable_error_log());
   }
+  // If the RuntimeState has aux error info, set the AuxErrorInfoPB field in the status.
+  if (runtime_state()->HasAuxErrorInfo()) {
+    runtime_state()->GetAuxErrorInfo(instance_status->mutable_aux_error_info());
+  }
 }
 
 void FragmentInstanceState::ReportSuccessful(
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 972925c..f1a9951 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -443,6 +443,10 @@
         MonoDelta::FromMilliseconds(FLAGS_rpc_retry_interval_ms));
     return;
   }
+  // If the RPC failed due to a network error, set the RPC error info in RuntimeState.
+  if (controller_status.IsNetworkError()) {
+    parent_->state_->SetRPCErrorInfo(address_, controller_status.posix_code());
+  }
   MarkDone(FromKuduStatus(controller_status, prepend));
 }
 
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 6d1ab84..a7ff96d 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -311,6 +311,25 @@
   released_resources_ = true;
 }
 
+void RuntimeState::SetRPCErrorInfo(TNetworkAddress dest_node, int16_t posix_error_code) {
+  boost::lock_guard<SpinLock> l(aux_error_info_lock_);
+  if (aux_error_info_ == nullptr) {
+    aux_error_info_.reset(new AuxErrorInfoPB());
+    RPCErrorInfoPB* rpc_error_info = aux_error_info_->mutable_rpc_error_info();
+    NetworkAddressPB* network_addr = rpc_error_info->mutable_dest_node();
+    network_addr->set_hostname(dest_node.hostname);
+    network_addr->set_port(dest_node.port);
+    rpc_error_info->set_posix_error_code(posix_error_code);
+  }
+}
+
+void RuntimeState::GetAuxErrorInfo(AuxErrorInfoPB* aux_error_info) {
+  boost::lock_guard<SpinLock> l(aux_error_info_lock_);
+  if (aux_error_info_ != nullptr) {
+    aux_error_info->CopyFrom(*aux_error_info_);
+  }
+}
+
 const std::string& RuntimeState::GetEffectiveUser() const {
   return impala::GetEffectiveUser(query_ctx().session);
 }
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 16c6df4..884fae6 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -302,6 +302,26 @@
   /// Release resources and prepare this object for destruction. Can only be called once.
   void ReleaseResources();
 
+  /// If the fragment instance associated with this RuntimeState failed due to an RPC
+  /// failure, use this method to set the network address of the RPC's target node and
+  /// the posix error code of the failed RPC. The target node address and posix error code
+  /// will be included in the AuxErrorInfoPB populated by GetAuxErrorInfo(). Only the
+  /// first call to this method has any effect; subsequent calls are no-ops.
+  void SetRPCErrorInfo(TNetworkAddress dest_node, int16_t posix_error_code);
+
+  /// Returns true if this RuntimeState has any auxiliary error information, false
+  /// otherwise. Currently, only SetRPCErrorInfo() sets aux error info.
+  bool HasAuxErrorInfo() {
+    boost::lock_guard<SpinLock> l(aux_error_info_lock_);
+    return aux_error_info_ != nullptr;
+  }
+
+  /// Populates 'aux_error_info' with all relevant aux error info from the fragment
+  /// instance associated with this RuntimeState. If no aux error info for this
+  /// RuntimeState has been set, this method does nothing. Currently, only
+  /// SetRPCErrorInfo() sets aux error info.
+  void GetAuxErrorInfo(AuxErrorInfoPB* aux_error_info);
+
   static const char* LLVM_CLASS_NAME;
 
  private:
@@ -414,6 +434,13 @@
   /// nodes that share this runtime state.
   boost::scoped_ptr<RuntimeFilterBank> filter_bank_;
 
+  /// Lock protecting aux_error_info_.
+  SpinLock aux_error_info_lock_;
+
+  /// Auxiliary error information, only set if the fragment instance failed (i.e.
+  /// query_status_ != Status::OK()). Owned by this RuntimeState.
+  std::unique_ptr<AuxErrorInfoPB> aux_error_info_;
+
   /// prohibit copies
   RuntimeState(const RuntimeState&);
 
diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc
index d29e3e9..d31076b 100644
--- a/be/src/util/network-util.cc
+++ b/be/src/util/network-util.cc
@@ -167,6 +167,19 @@
   return ss.str();
 }
 
+string NetworkAddressPBToString(const NetworkAddressPB& address) {
+  stringstream ss;
+  ss << address.hostname() << ":" << dec << address.port();
+  return ss.str();
+}
+
+TNetworkAddress FromNetworkAddressPB(const NetworkAddressPB& address) {
+  TNetworkAddress t_address;
+  t_address.__set_hostname(address.hostname());
+  t_address.__set_port(address.port());
+  return t_address;
+}
+
 /// Pick a random port in the range of ephemeral ports
 /// https://tools.ietf.org/html/rfc6335
 int FindUnusedEphemeralPort() {
diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h
index 947ea03..b6629af 100644
--- a/be/src/util/network-util.h
+++ b/be/src/util/network-util.h
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "common/status.h"
+#include "gen-cpp/common.pb.h"
 #include "gen-cpp/StatestoreService_types.h"
 #include "gen-cpp/Types_types.h"
 #include <vector>
@@ -64,6 +65,12 @@
 /// Utility method to print address as address:port
 std::string TNetworkAddressToString(const TNetworkAddress& address);
 
+/// Utility method to print a NetworkAddressPB as address:port.
+std::string NetworkAddressPBToString(const NetworkAddressPB& address);
+
+/// Utility method to convert a NetworkAddressPB to a TNetworkAddress.
+TNetworkAddress FromNetworkAddressPB(const NetworkAddressPB& address);
+
 /// Utility method to convert TNetworkAddress to Kudu sock addr.
 /// Note that 'address' has to contain a resolved IP address.
 Status TNetworkAddressToSockaddr(const TNetworkAddress& address,
diff --git a/common/protobuf/common.proto b/common/protobuf/common.proto
index 6c265a3..0bf2a97 100644
--- a/common/protobuf/common.proto
+++ b/common/protobuf/common.proto
@@ -21,6 +21,12 @@
 
 package impala;
 
+// Refer to Types.thrift for documentation.
+message NetworkAddressPB {
+  required string hostname = 1;
+  required int32 port = 2;
+}
+
 // Proto-serialized version of Impala's Status object.
 message StatusPB {
   optional int32 status_code = 1;
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index 50cad00..085be7a 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -124,6 +124,25 @@
   map<int32, ErrorLogEntryPB> error_log = 2;
 }
 
+// RPC error metadata that can be associated with an AuxErrorInfoPB object. Created if
+// an RPC to another node failed.
+message RPCErrorInfoPB {
+  // The address of the RPC's target node.
+  required NetworkAddressPB dest_node = 1;
+
+  // The posix error code of the failed RPC.
+  required int32 posix_error_code = 2;
+}
+
+// Error metadata that can be associated with a failed fragment instance. Used to store
+// extra info about errors encountered during fragment execution. This information is
+// used by the Coordinator to blacklist potentially unhealthy nodes.
+message AuxErrorInfoPB {
+  // Set if the fragment instance failed because an RPC to another node failed. Only set
+  // if the RPC failed due to a network error.
+  optional RPCErrorInfoPB rpc_error_info = 1;
+}
+
 message FragmentInstanceExecStatusPB {
   // Sequence number prevents out-of-order or duplicated updates from being applied.
   optional int64 report_seq_no = 1;
@@ -144,6 +163,10 @@
   // The non-idempotent parts of the report, and any prior reports that are not known to
   // have been received by the coordinator.
   repeated StatefulStatusPB stateful_report = 6;
+
+  // Metadata associated with a failed fragment instance. Only set for failed fragment
+  // instances.
+  optional AuxErrorInfoPB aux_error_info = 7;
 }
 
 message ReportExecStatusRequestPB {
diff --git a/tests/custom_cluster/test_blacklist.py b/tests/custom_cluster/test_blacklist.py
index 4b47039..b278b8b 100644
--- a/tests/custom_cluster/test_blacklist.py
+++ b/tests/custom_cluster/test_blacklist.py
@@ -20,6 +20,7 @@
 import pytest
 import re
 
+from beeswaxd.BeeswaxService import QueryState
 from tests.common.skip import SkipIfNotHdfsMinicluster
 from time import sleep
 
@@ -113,3 +114,39 @@
     assert re.search("Blacklisted Executors: (.*)", result.runtime_profile) is None, \
         result.runtime_profile
     assert re.search("NumBackends: 3", result.runtime_profile), result.runtime_profile
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1)
+  def test_kill_impalad_with_running_queries(self, cursor):
+    """Verifies that when an Impala executor is killed while running a query, that the
+    Coordinator blacklists the killed executor."""
+
+    # Run a query asynchronously. Normally, this query should take a few seconds to
+    # complete.
+    query = "select count(*) from tpch_parquet.lineitem t1, tpch_parquet.lineitem t2 \
+        where t1.l_orderkey = t2.l_orderkey"
+    handle = self.execute_query_async(query)
+
+    # Wait for the query to start running
+    self.wait_for_any_state(handle, [QueryState.RUNNING, QueryState.FINISHED], 10)
+
+    # Kill one of the Impala executors
+    killed_impalad = self.cluster.impalads[2]
+    killed_impalad.kill()
+
+    # Try to fetch results from the query. Fetch requests should fail because one of the
+    # impalads running the query was killed. When the query fails, the Coordinator should
+    # add the killed Impala executor to the blacklist (since an RPC to that node failed).
+    try:
+      self.client.fetch(query, handle)
+      assert False, "Query was expected to fail"
+    except Exception as e:
+      # The query should fail due to an RPC error.
+      assert "TransmitData() to " in str(e) or "EndDataStream() to " in str(e)
+
+    # Run another query which should succeed and verify the impalad was blacklisted.
+    result = self.execute_query("select count(*) from tpch.lineitem")
+    match = re.search("Blacklisted Executors: (.*)", result.runtime_profile)
+    assert match is not None and match.group(1) == "%s:%s" % \
+      (killed_impalad.hostname, killed_impalad.service.be_port), \
+      result.runtime_profile