IMPALA-9137: Blacklist node if a DataStreamService RPC to the node fails
Introduces a new optional field to FragmentInstanceExecStatusPB:
AuxErrorInfoPB. AuxErrorInfoPB contains optional metadata associated
with a failed fragment instance. Currently, AuxErrorInfoPB only contains
one field: RPCErrorInfoPB, which is only set if the fragment failed
because an RPC to another impalad failed. The RPCErrorInfoPB contains
the destination node and the POSIX error code of the failed RPC.
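In brief, the reporting path works as follows (a simplified sketch
assembled from the krpc-data-stream-sender.cc and
fragment-instance-state.cc changes below; locking and error handling
elided):

  // Sender side: on a network error, record the failed RPC's target
  // address and errno in the fragment instance's RuntimeState.
  if (controller_status.IsNetworkError()) {
    parent_->state_->SetRPCErrorInfo(address_, controller_status.posix_code());
  }

  // Report side: attach the recorded info to the next status report
  // sent to the Coordinator.
  if (runtime_state()->HasAuxErrorInfo()) {
    runtime_state()->GetAuxErrorInfo(instance_status->mutable_aux_error_info());
  }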
Coordinator::UpdateBackendExecStatus(ReportExecStatusRequestPB, ...)
uses the information in RPCErrorInfoPB (if one is set) to blacklist
the target node. While RPCErrorInfoPB::dest_node can be set to the address
of the Coordinator, the Coordinator will not blacklist itself. The
Coordinator only blacklists the node if the RPC failed with a specific
error code (currently ENOTCONN, ECONNREFUSED, or ESHUTDOWN).
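On the Coordinator side, the blacklisting decision reduces to roughly
the following (a condensed sketch of
Coordinator::UpdateBlacklistWithAuxErrorInfo() from the diff below;
backend lookup and logging elided):

  static const std::set<int32_t> blacklistable_rpc_error_codes = {
      ENOTCONN, ESHUTDOWN, ECONNREFUSED};
  if (!dest_node_exec_params->is_coord_backend
      && blacklistable_rpc_error_codes.count(
             rpc_error_info.posix_error_code()) > 0) {
    ExecEnv::GetInstance()->cluster_membership_mgr()->BlacklistExecutor(
        dest_node_exec_params->be_desc);
  }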
Testing:
* Ran core tests
* Added new test to test_blacklist.py
Change-Id: I733cca13847fde43c8ea2ae574d3ae04bd06419c
Reviewed-on: http://gerrit.cloudera.org:8080/14677
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 1051695..7bebdd0 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -17,6 +17,7 @@
#include "runtime/coordinator.h"
+#include <cerrno>
#include <unordered_set>
#include <thrift/protocol/TDebugProtocol.h>
@@ -236,6 +237,15 @@
schedule_, query_ctx(), backend_idx, filter_mode_, entry.second));
backend_state->Init(fragment_stats_, host_profiles_, obj_pool());
backend_states_[backend_idx++] = backend_state;
+ // was_inserted is true if the pair was successfully inserted into the map, false
+ // otherwise.
+ bool was_inserted = addr_to_backend_state_
+ .emplace(backend_state->krpc_impalad_address(), backend_state)
+ .second;
+ if (UNLIKELY(!was_inserted)) {
+ DCHECK(false) << "Network address " << backend_state->krpc_impalad_address()
+ << " associated with multiple BackendStates";
+ }
}
backend_resource_state_ =
obj_pool()->Add(new BackendResourceState(backend_states_, schedule_));
@@ -827,6 +837,11 @@
// We may start receiving status reports before all exec rpcs are complete.
// Can't apply state transition until no more exec rpcs will be sent.
exec_rpcs_complete_barrier_.Wait();
+
+ // Iterate through all instance exec statuses, and use each fragment's AuxErrorInfo
+ // to possibly blacklist any "faulty" nodes.
+ UpdateBlacklistWithAuxErrorInfo(request);
+
// Transition the status if we're not already in a terminal state. This won't block
// because either this transitions to an ERROR state or the query is already in
// a terminal state.
@@ -855,6 +870,64 @@
return IsExecuting() ? Status::OK() : Status::CANCELLED;
}
+void Coordinator::UpdateBlacklistWithAuxErrorInfo(
+ const ReportExecStatusRequestPB& request) {
+ // If the Backend failed due to an RPC failure, blacklist the destination node of
+ // the failed RPC. Only blacklist one node per ReportExecStatusRequestPB to avoid
+ // blacklisting nodes too aggressively. Currently, only the first instance status
+ // that contains a valid RPCErrorInfoPB object is used.
+ for (const auto& instance_exec_status : request.instance_exec_status()) {
+ if (instance_exec_status.has_aux_error_info()
+ && instance_exec_status.aux_error_info().has_rpc_error_info()) {
+ const RPCErrorInfoPB& rpc_error_info =
+ instance_exec_status.aux_error_info().rpc_error_info();
+ DCHECK(rpc_error_info.has_dest_node());
+ DCHECK(rpc_error_info.has_posix_error_code());
+ const NetworkAddressPB& dest_node = rpc_error_info.dest_node();
+
+ auto dest_node_and_be_state =
+ addr_to_backend_state_.find(FromNetworkAddressPB(dest_node));
+
+ // If the target address of the RPC is not known to the Coordinator, it cannot
+ // be blacklisted.
+ if (dest_node_and_be_state == addr_to_backend_state_.end()) {
+ string err_msg = "Query failed due to a failed RPC to an unknown target address "
+ + NetworkAddressPBToString(dest_node);
+ DCHECK(false) << err_msg;
+ LOG(ERROR) << err_msg;
+ continue;
+ }
+
+ // The execution parameters of the destination node for the failed RPC.
+ const BackendExecParams* dest_node_exec_params =
+ dest_node_and_be_state->second->exec_params();
+
+ // The Coordinator for the query should never be blacklisted.
+ if (dest_node_exec_params->is_coord_backend) {
+ VLOG_QUERY << "Query failed due to a failed RPC to the Coordinator";
+ continue;
+ }
+
+ // A set of RPC-related POSIX error codes that should cause the target node
+ // of the failed RPC to be blacklisted.
+ static const set<int32_t> blacklistable_rpc_error_codes = {
+ ENOTCONN, // 107: Transport endpoint is not connected
+ ESHUTDOWN, // 108: Cannot send after transport endpoint shutdown
+ ECONNREFUSED // 111: Connection refused
+ };
+
+ if (blacklistable_rpc_error_codes.find(rpc_error_info.posix_error_code())
+ != blacklistable_rpc_error_codes.end()) {
+ LOG(INFO) << "Blacklisting " << NetworkAddressPBToString(dest_node)
+ << " because a RPC to it failed.";
+ ExecEnv::GetInstance()->cluster_membership_mgr()->BlacklistExecutor(
+ dest_node_exec_params->be_desc);
+ break;
+ }
+ }
+ }
+}
+
int64_t Coordinator::GetMaxBackendStateLagMs(TNetworkAddress* address) {
if (exec_rpcs_complete_barrier_.pending() > 0) {
// Exec() hadn't completed for all the backends, so we can't rely on
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 0185f99..816018a 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -248,6 +248,13 @@
/// are non-nullptr and owned by obj_pool(). Populated by Exec()/InitBackendStates().
std::vector<BackendState*> backend_states_;
+ /// A map from the TNetworkAddress of a backend to the BackendState running at that
+ /// address. All values are non-nullptr and owned by obj_pool(). The address is the
+ /// KRPC address (Coordinator::BackendState::krpc_impalad_address) of the Backend.
+ /// This map is distinct from QuerySchedule::per_backend_exec_params(), which uses
+ /// the Thrift address as the key rather than the KRPC address.
+ boost::unordered_map<TNetworkAddress, BackendState*> addr_to_backend_state_;
+
/// Protects the population of backend_states_ vector (not the BackendState objects).
/// Used when accessing backend_states_ if it's not guaranteed that
/// InitBackendStates() has completed.
@@ -541,6 +548,16 @@
/// Checks the exec_state_ of the query and returns true if the query is executing.
bool IsExecuting();
+ /// Helper function for UpdateBackendExecStatus that iterates through the
+ /// FragmentInstanceExecStatusPB of each fragment instance and uses AuxErrorInfoPB to
+ /// check if any nodes should be blacklisted. AuxErrorInfoPB contains additional error
+ /// information about why the fragment instance failed, beyond what is available in
+ /// the ReportExecStatusRequestPB::overall_status field. This method uses information
+ /// in AuxErrorInfoPB to classify specific nodes as "faulty" and then blacklists them.
+ /// A node might be considered "faulty" if, for example, an RPC to that node failed,
+ /// or a fragment on that node failed due to a disk IO error.
+ void UpdateBlacklistWithAuxErrorInfo(const ReportExecStatusRequestPB& request);
+
/// BackendState and BackendResourceState are private to the Coordinator class, so mark
/// all tests in CoordinatorBackendStateTest as friends.
friend class CoordinatorBackendStateTest;
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index dde5b79..02bc9cf 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -290,6 +290,10 @@
stateful_report->set_report_seq_no(report_seq_no_);
runtime_state()->GetUnreportedErrors(stateful_report->mutable_error_log());
}
+ // If set in the RuntimeState, set the AuxErrorInfoPB field.
+ if (runtime_state()->HasAuxErrorInfo()) {
+ runtime_state()->GetAuxErrorInfo(instance_status->mutable_aux_error_info());
+ }
}
void FragmentInstanceState::ReportSuccessful(
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 972925c..f1a9951 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -443,6 +443,10 @@
MonoDelta::FromMilliseconds(FLAGS_rpc_retry_interval_ms));
return;
}
+ // If the RPC failed due to a network error, set the RPC error info in RuntimeState.
+ if (controller_status.IsNetworkError()) {
+ parent_->state_->SetRPCErrorInfo(address_, controller_status.posix_code());
+ }
MarkDone(FromKuduStatus(controller_status, prepend));
}
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 6d1ab84..a7ff96d 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -311,6 +311,25 @@
released_resources_ = true;
}
+void RuntimeState::SetRPCErrorInfo(const TNetworkAddress& dest_node, int16_t posix_error_code) {
+ boost::lock_guard<SpinLock> l(aux_error_info_lock_);
+ if (aux_error_info_ == nullptr) {
+ aux_error_info_.reset(new AuxErrorInfoPB());
+ RPCErrorInfoPB* rpc_error_info = aux_error_info_->mutable_rpc_error_info();
+ NetworkAddressPB* network_addr = rpc_error_info->mutable_dest_node();
+ network_addr->set_hostname(dest_node.hostname);
+ network_addr->set_port(dest_node.port);
+ rpc_error_info->set_posix_error_code(posix_error_code);
+ }
+}
+
+void RuntimeState::GetAuxErrorInfo(AuxErrorInfoPB* aux_error_info) {
+ boost::lock_guard<SpinLock> l(aux_error_info_lock_);
+ if (aux_error_info_ != nullptr) {
+ aux_error_info->CopyFrom(*aux_error_info_);
+ }
+}
+
const std::string& RuntimeState::GetEffectiveUser() const {
return impala::GetEffectiveUser(query_ctx().session);
}
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 16c6df4..884fae6 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -302,6 +302,26 @@
/// Release resources and prepare this object for destruction. Can only be called once.
void ReleaseResources();
+ /// If the fragment instance associated with this RuntimeState failed due to an RPC
+ /// failure, use this method to set the network address of the RPC's target node and
+ /// the POSIX error code of the failed RPC. The target node address and POSIX error
+ /// code will be included in the AuxErrorInfo returned by GetAuxErrorInfo. Only the
+ /// first call has an effect; subsequent calls are no-ops.
+ void SetRPCErrorInfo(const TNetworkAddress& dest_node, int16_t posix_error_code);
+
+ /// Returns true if this RuntimeState has any auxiliary error information, false
+ /// otherwise. Currently, only SetRPCErrorInfo() sets aux error info.
+ bool HasAuxErrorInfo() {
+ boost::lock_guard<SpinLock> l(aux_error_info_lock_);
+ return aux_error_info_ != nullptr;
+ }
+
+ /// Sets the given AuxErrorInfoPB with all relevant aux error info from the fragment
+ /// instance associated with this RuntimeState. If no aux error info for this
+ /// RuntimeState has been set, this method does nothing. Currently, only
+ /// SetRPCErrorInfo() sets aux error info.
+ void GetAuxErrorInfo(AuxErrorInfoPB* aux_error_info);
+
static const char* LLVM_CLASS_NAME;
private:
@@ -414,6 +434,13 @@
/// nodes that share this runtime state.
boost::scoped_ptr<RuntimeFilterBank> filter_bank_;
+ /// Lock protecting aux_error_info_.
+ SpinLock aux_error_info_lock_;
+
+ /// Auxiliary error information, only set if the fragment instance failed (i.e.
+ /// query_status_ != Status::OK()). Owned by this RuntimeState.
+ std::unique_ptr<AuxErrorInfoPB> aux_error_info_;
+
/// prohibit copies
RuntimeState(const RuntimeState&);
diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc
index d29e3e9..d31076b 100644
--- a/be/src/util/network-util.cc
+++ b/be/src/util/network-util.cc
@@ -167,6 +167,19 @@
return ss.str();
}
+string NetworkAddressPBToString(const NetworkAddressPB& address) {
+ stringstream ss;
+ ss << address.hostname() << ":" << dec << address.port();
+ return ss.str();
+}
+
+TNetworkAddress FromNetworkAddressPB(const NetworkAddressPB& address) {
+ TNetworkAddress t_address;
+ t_address.__set_hostname(address.hostname());
+ t_address.__set_port(address.port());
+ return t_address;
+}
+
/// Pick a random port in the range of ephemeral ports
/// https://tools.ietf.org/html/rfc6335
int FindUnusedEphemeralPort() {
diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h
index 947ea03..b6629af 100644
--- a/be/src/util/network-util.h
+++ b/be/src/util/network-util.h
@@ -16,6 +16,7 @@
// under the License.
#include "common/status.h"
+#include "gen-cpp/common.pb.h"
#include "gen-cpp/StatestoreService_types.h"
#include "gen-cpp/Types_types.h"
#include <vector>
@@ -64,6 +65,12 @@
/// Utility method to print address as address:port
std::string TNetworkAddressToString(const TNetworkAddress& address);
+/// Utility method to print a NetworkAddressPB as address:port.
+std::string NetworkAddressPBToString(const NetworkAddressPB& address);
+
+/// Utility method to convert a NetworkAddressPB to a TNetworkAddress.
+TNetworkAddress FromNetworkAddressPB(const NetworkAddressPB& address);
+
/// Utility method to convert TNetworkAddress to Kudu sock addr.
/// Note that 'address' has to contain a resolved IP address.
Status TNetworkAddressToSockaddr(const TNetworkAddress& address,
diff --git a/common/protobuf/common.proto b/common/protobuf/common.proto
index 6c265a3..0bf2a97 100644
--- a/common/protobuf/common.proto
+++ b/common/protobuf/common.proto
@@ -21,6 +21,12 @@
package impala;
+// Refer to Types.thrift for documentation.
+message NetworkAddressPB {
+ required string hostname = 1;
+ required int32 port = 2;
+}
+
// Proto-serialized version of Impala's Status object.
message StatusPB {
optional int32 status_code = 1;
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index 50cad00..085be7a 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -124,6 +124,25 @@
map<int32, ErrorLogEntryPB> error_log = 2;
}
+// RPC error metadata that can be associated with an AuxErrorInfoPB object. Created if
+// an RPC to another node failed.
+message RPCErrorInfoPB {
+ // The address of the RPC's target node.
+ required NetworkAddressPB dest_node = 1;
+
+ // The POSIX error code of the failed RPC.
+ required int32 posix_error_code = 2;
+}
+
+// Error metadata that can be associated with a failed fragment instance. Used to store
+// extra info about errors encountered during fragment execution. This information is
+// used by the Coordinator to blacklist potentially unhealthy nodes.
+message AuxErrorInfoPB {
+ // Set if the fragment instance failed because an RPC to another node failed. Only set
+ // if the RPC failed due to a network error.
+ optional RPCErrorInfoPB rpc_error_info = 1;
+}
+
message FragmentInstanceExecStatusPB {
// Sequence number prevents out-of-order or duplicated updates from being applied.
optional int64 report_seq_no = 1;
@@ -144,6 +163,10 @@
// The non-idempotent parts of the report, and any prior reports that are not known to
// have been received by the coordinator.
repeated StatefulStatusPB stateful_report = 6;
+
+ // Metadata associated with a failed fragment instance. Only set for failed fragment
+ // instances.
+ optional AuxErrorInfoPB aux_error_info = 7;
}
message ReportExecStatusRequestPB {
diff --git a/tests/custom_cluster/test_blacklist.py b/tests/custom_cluster/test_blacklist.py
index 4b47039..b278b8b 100644
--- a/tests/custom_cluster/test_blacklist.py
+++ b/tests/custom_cluster/test_blacklist.py
@@ -20,6 +20,7 @@
import pytest
import re
+from beeswaxd.BeeswaxService import QueryState
from tests.common.skip import SkipIfNotHdfsMinicluster
from time import sleep
@@ -113,3 +114,39 @@
assert re.search("Blacklisted Executors: (.*)", result.runtime_profile) is None, \
result.runtime_profile
assert re.search("NumBackends: 3", result.runtime_profile), result.runtime_profile
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1)
+ def test_kill_impalad_with_running_queries(self, cursor):
+ """Verifies that when an Impala executor is killed while running a query, that the
+ Coordinator blacklists the killed executor."""
+
+ # Run a query asynchronously. Normally, this query should take a few seconds to
+ # complete.
+ query = "select count(*) from tpch_parquet.lineitem t1, tpch_parquet.lineitem t2 \
+ where t1.l_orderkey = t2.l_orderkey"
+ handle = self.execute_query_async(query)
+
+ # Wait for the query to start running
+ self.wait_for_any_state(handle, [QueryState.RUNNING, QueryState.FINISHED], 10)
+
+ # Kill one of the Impala executors
+ killed_impalad = self.cluster.impalads[2]
+ killed_impalad.kill()
+
+ # Try to fetch results from the query. Fetch requests should fail because one of the
+ # impalads running the query was killed. When the query fails, the Coordinator should
+ # add the killed Impala executor to the blacklist (since an RPC to that node failed).
+ try:
+ self.client.fetch(query, handle)
+ assert False, "Query was expected to fail"
+ except Exception as e:
+ # The query should fail due to an RPC error.
+ assert "TransmitData() to " in str(e) or "EndDataStream() to " in str(e)
+
+ # Run another query which should succeed and verify the impalad was blacklisted.
+ result = self.execute_query("select count(*) from tpch.lineitem")
+ match = re.search("Blacklisted Executors: (.*)", result.runtime_profile)
+ assert match is not None and match.group(1) == "%s:%s" % \
+ (killed_impalad.hostname, killed_impalad.service.be_port), \
+ result.runtime_profile