IMPALA-9151: Maintain cluster size in ExecutorMembershipSnapshot

This change improves the cluster membership snapshot we maintain in the
frontend in cases where all executors have been shut down or none have
started yet.

Prior to this change, when configuring Impala with executor groups, the
planner might see an ExecutorMembershipSnapshot that has no executors
in it. This could happen if the first executor group had not started up
yet, or if all executor groups had been shut down. If this happened,
the planner would make sub-optimal decisions, e.g. pick a broadcast
join instead of a partitioned hash join.

With this change, if no executors have been registered so far, the
planner uses the expected number of executors, which can be set with
the -num_expected_executors flag and defaults to 20. After executors
come online, the planner uses the size of the largest healthy executor
group, and it holds on to that group's size even if the group shuts
down or becomes unhealthy. This allows the planner to work on the
assumption that a healthy executor group of the same size will
eventually come online to execute the query.
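
As a rough sketch of the resulting planner-facing behavior (names are
illustrative, not the actual API; the authoritative logic lives in
ExecutorMembershipSnapshot in the diff below), the frontend effectively
does:

    // Illustrative only: mirrors the fallback described above.
    final class PlannerClusterSize {
      static int numExecutorsForPlanning(
          int numRegisteredExecutors,  // size of the largest healthy group seen so far
          int numExpectedExecutors) {  // -num_expected_executors, default 20
        // No membership update containing executors has arrived yet:
        // fall back to the configured expectation.
        if (numRegisteredExecutors == 0) return numExpectedExecutors;
        // Otherwise use the retained size of the largest healthy group,
        // even if that group has since shut down or become unhealthy.
        return numRegisteredExecutors;
      }
    }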

Change-Id: Ib6b05326c82fb3ca625c015cfcdc38f891f5d4f9
Reviewed-on: http://gerrit.cloudera.org:8080/14756
Reviewed-by: Lars Volker <lv@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 22049b1..7d7fea4 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -165,16 +165,46 @@
 
 namespace {
 using namespace impala;
-// Helper method to hand off cluster membership update to the frontend instance.
+/// Helper method to forward cluster membership updates to the frontend.
+///
+/// The frontend uses cluster membership information to determine whether it expects the
+/// scheduler to assign local or remote reads. It also uses the number of executors to
+/// determine the join type (partitioned vs broadcast). For the default executor group, we
+/// assume that local reads are preferred and will include the hostnames and IP addresses
+/// in the update to the frontend. For non-default executor groups, we assume that we will
+/// read data remotely and will only send the number of executors in the largest healthy
+/// group.
 void SendClusterMembershipToFrontend(
     ClusterMembershipMgr::SnapshotPtr& snapshot, Frontend* frontend) {
   TUpdateExecutorMembershipRequest update_req;
-  for (const auto& it : snapshot->current_backends) {
-    const TBackendDescriptor& backend = it.second;
-    if (backend.is_executor) {
-      update_req.hostnames.insert(backend.address.hostname);
-      update_req.ip_addresses.insert(backend.ip_address);
-      update_req.num_executors++;
+  const ExecutorGroup* group = nullptr;
+  bool is_default_group = false;
+  auto default_it =
+      snapshot->executor_groups.find(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
+  if (default_it != snapshot->executor_groups.end()) {
+    is_default_group = true;
+    group = &(default_it->second);
+  } else {
+    int max_num_executors = 0;
+    // Find largest healthy group.
+    for (const auto& it : snapshot->executor_groups) {
+      if (!it.second.IsHealthy()) continue;
+      int num_executors = it.second.NumExecutors();
+      if (num_executors > max_num_executors) {
+        max_num_executors = num_executors;
+        group = &(it.second);
+      }
+    }
+  }
+  if (group) {
+    for (const auto& backend : group->GetAllExecutorDescriptors()) {
+      if (backend.is_executor) {
+        if (is_default_group) {
+          update_req.hostnames.insert(backend.address.hostname);
+          update_req.ip_addresses.insert(backend.ip_address);
+        }
+        update_req.num_executors++;
+      }
     }
   }
   Status status = frontend->UpdateExecutorMembership(update_req);
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 6d0a20a..c79d708 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -249,6 +249,11 @@
     "contains at least that number of executors for the group will it be considered "
     "healthy for admission. Currently only a single group may be specified.");
 
+DEFINE_int32(num_expected_executors, 20, "The number of executors that are expected to "
+    "be available for the execution of a single query. This value is used during "
+    "planning if no executors have started yet. Once a healthy executor group has "
+    "started, its size is used instead.");
+
 // TODO: can we automatically choose a startup grace period based on the max admission
 // control queue timeout + some margin for error?
 DEFINE_int64(shutdown_grace_period_s, 120, "Shutdown startup grace period in seconds. "
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 56f6a65..c5eb39e 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -82,6 +82,7 @@
 DECLARE_bool(unlock_zorder_sort);
 DECLARE_string(blacklisted_tables);
 DECLARE_bool(simplify_check_on_show_tables);
+DECLARE_int32(num_expected_executors);
 
 namespace impala {
 
@@ -167,6 +168,7 @@
   cfg.__set_unlock_zorder_sort(FLAGS_unlock_zorder_sort);
   cfg.__set_blacklisted_tables(FLAGS_blacklisted_tables);
   cfg.__set_simplify_check_on_show_tables(FLAGS_simplify_check_on_show_tables);
+  cfg.__set_num_expected_executors(FLAGS_num_expected_executors);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index af29d2c..7fc47d5 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -145,4 +145,6 @@
   60: required bool simplify_check_on_show_tables
 
   61: required bool mt_dop_auto_fallback
+
+  62: required i32 num_expected_executors
 }
diff --git a/fe/src/main/java/org/apache/impala/util/ExecutorMembershipSnapshot.java b/fe/src/main/java/org/apache/impala/util/ExecutorMembershipSnapshot.java
index 3482d6b..e852a14 100644
--- a/fe/src/main/java/org/apache/impala/util/ExecutorMembershipSnapshot.java
+++ b/fe/src/main/java/org/apache/impala/util/ExecutorMembershipSnapshot.java
@@ -20,6 +20,7 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TUpdateExecutorMembershipRequest;
 import com.google.common.collect.Sets;
@@ -35,30 +36,50 @@
   private static AtomicReference<ExecutorMembershipSnapshot> cluster_ =
       new AtomicReference<ExecutorMembershipSnapshot>(new ExecutorMembershipSnapshot());
 
-  // The set of hosts that are members of the cluster given by hostname.
+  // The set of hosts that are members of the cluster given by hostname. Can be empty when
+  // using executor groups, in which case the planner shall assume that all reads are
+  // remote.
   private final Set<String> hostnames_;
 
-  // The set of hosts that are members of the cluster given by IP address.
+  // The set of hosts that are members of the cluster given by IP address. Can be empty
+  // when using executor groups, in which case the planner shall assume that all reads are
+  // remote.
   private final Set<String> ipAddresses_;
 
-  // The number of executor nodes of the cluster.  Normally, this will be equal to
-  // hostnames_.size(), except in the test minicluster where there are multiple
-  // impalad's running on a single host.
+  // The number of executor nodes of the cluster.
+  //
+  // When not using executor groups, this value reflects the number of executors in the
+  // cluster. It will be equal to hostnames_.size(), except in the test minicluster where
+  // multiple impalads are running on a single host.
+  //
+  // When using executor groups, this value reflects the number of executors in the
+  // largest healthy group. If all groups become unhealthy, the backend will not send a
+  // membership update and this value will retain the last non-zero value. This allows the
+  // planner to work on the assumption that a healthy executor group of the same size will
+  // eventually come online to execute queries.
   private final int numExecutors_;
 
-  // Used only to construct the initial ExecutorMembershipSnapshot. Before we get the
-  // first snapshot, assume one node (the localhost) to mimic Scheduler.
+  // Used only to construct the initial ExecutorMembershipSnapshot.
   private ExecutorMembershipSnapshot() {
     hostnames_ = Sets.newHashSet();
     ipAddresses_ = Sets.newHashSet();
-    numExecutors_ = 1;
+    // We use 0 for the number of executors to indicate that no update from the
+    // ClusterMembershipManager has arrived yet and we will return the value of
+    // '-num_expected_executors' in numExecutors().
+    numExecutors_ = 0;
   }
 
   // Construct a new snapshot based on the TUpdateExecutorMembershipRequest.
   private ExecutorMembershipSnapshot(TUpdateExecutorMembershipRequest request) {
     hostnames_ = request.getHostnames();
     ipAddresses_ = request.getIp_addresses();
-    numExecutors_ = request.getNum_executors();
+    // If the updated snapshot does not contain any executors we fall back to the previous
+    // value. This can happen if no healthy executor groups are currently online.
+    if (request.getNum_executors() > 0) {
+      numExecutors_ = request.getNum_executors();
+    } else {
+      numExecutors_ = cluster_.get().numExecutors_;
+    }
   }
 
   // Determine whether a host, given either by IP address or hostname, is a member of
@@ -68,8 +89,15 @@
     return ipAddresses_.contains(host) || hostnames_.contains(host);
   }
 
-  // The number of nodes in this snapshot.
-  public int numExecutors() { return numExecutors_; }
+  // Return the number of executors that should be used for planning. If no executors have
+  // been registered so far, this method will return a configurable default to allow the
+  // planner to operate based on the expected number of executors.
+  public int numExecutors() {
+    if (numExecutors_ == 0) {
+      return BackendConfig.INSTANCE.getBackendCfg().num_expected_executors;
+    }
+    return numExecutors_;
+  }
 
   // Atomically update the singleton snapshot instance.  After the update completes,
   // all calls to getCluster() will return the new snapshot.
diff --git a/fe/src/test/java/org/apache/impala/planner/ClusterSizeTest.java b/fe/src/test/java/org/apache/impala/planner/ClusterSizeTest.java
new file mode 100644
index 0000000..5f6d3ba
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/planner/ClusterSizeTest.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import org.apache.impala.common.FrontendTestBase;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.testutil.TestUtils;
+import org.apache.impala.thrift.TQueryCtx;
+import org.apache.impala.thrift.TUpdateExecutorMembershipRequest;
+import org.apache.impala.util.ExecutorMembershipSnapshot;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import org.junit.Test;
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests to make sure that the planner handles changes to the cluster size correctly.
+ */
+public class ClusterSizeTest extends FrontendTestBase {
+
+  /**
+   * Retrieves the explain string for query statement 'stmt' from the frontend.
+   */
+  private String getExplainString(String stmt) {
+    String ret = "";
+    try {
+      TQueryCtx queryCtx = TestUtils.createQueryContext(
+          "functional", System.getProperty("user.name"));
+      queryCtx.client_request.setStmt(stmt);
+      ret = frontend_.getExplainString(queryCtx);
+    } catch (ImpalaException e) {
+      fail(e.getMessage());
+    }
+    return ret;
+  }
+
+  /**
+   * Sends an update to the ExecutorMembershipSnapshot containing the specified number of
+   * executors. The host list will only contain localhost.
+   */
+  private void setNumExecutors(int num) {
+    TUpdateExecutorMembershipRequest updateReq = new TUpdateExecutorMembershipRequest();
+    updateReq.setIp_addresses(Sets.newHashSet("127.0.0.1"));
+    updateReq.setHostnames(Sets.newHashSet("localhost"));
+    updateReq.setNum_executors(num);
+    ExecutorMembershipSnapshot.update(updateReq);
+  }
+
+  /**
+   * IMPALA-9151: Tests that the planner selects the correct join strategy based on the
+   * number of executors in the cluster.
+   */
+  @Test
+  public void testChangeClusterSize() {
+    final String query = "select * from alltypes a inner join alltypes b on a.id = b.id";
+    final String broadcast_exchange = ":EXCHANGE [BROADCAST]";
+    final String hash_exchange = ":EXCHANGE [HASH(b.id)]";
+
+    // Initially no executors are registered and the planner falls back to the value of
+    // -num_expected_executors, which defaults to 20.
+    assertTrue(getExplainString(query).contains(hash_exchange));
+
+    // Adding a single executor will make the planner switch to a broadcast join.
+    setNumExecutors(1);
+    assertTrue(getExplainString(query).contains(broadcast_exchange));
+
+    // Adding two or more executors will make the planner switch to a partitioned hash
+    // join.
+    for (int n = 2; n < 5; ++n) {
+      setNumExecutors(n);
+      assertTrue(getExplainString(query).contains(hash_exchange));
+    }
+
+    // If the backend reports a single executor, the planner should switch back to a
+    // broadcast join.
+    setNumExecutors(1);
+    assertTrue(getExplainString(query).contains(broadcast_exchange));
+  }
+}
diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py
index 0bf5164..613fcf9 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -298,3 +298,15 @@
                      "pool default-pool. Queued reason: Waiting for executors to " \
                      "start. Only DDL queries can currently run."
     assert expected_error in str(result)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1, num_exclusive_coordinators=1,
+                                    impalad_args="-num_expected_executors=10")
+  def test_num_expected_executors_flag(self):
+    """Verifies that the '-num_expected_executors' flag is effective."""
+    client = self.cluster.impalads[0].service.create_beeswax_client()
+    client.execute("set explain_level=2")
+    ret = client.execute("explain select * from functional.alltypes a inner join "
+                         "functional.alltypes b on a.id = b.id;")
+    num_hosts = "hosts=10 instances=10"
+    assert num_hosts in str(ret)
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 1bb070a..9b696bb 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -431,3 +431,77 @@
     self.coordinator.service.wait_for_metric_value(
       "admission-controller.executor-group.num-queries-executing.{0}".format(
         self._group_name(group_names[0])), 0, timeout=30)
+
+  @pytest.mark.execute_serially
+  def test_join_strategy_single_executor(self):
+    """Tests that the planner picks the correct join strategy based on the current cluster
+    size. This test uses an executor group with a minimum size of 1."""
+    TABLE = "functional.alltypes"
+    QUERY = "explain select * from %s a inner join %s b on a.id = b.id" % (TABLE, TABLE)
+
+    # Predicates to assert that a certain join type was picked.
+    def assert_broadcast_join():
+      ret = self.execute_query_expect_success(self.client, QUERY)
+      assert ":EXCHANGE [BROADCAST]" in str(ret)
+
+    def assert_hash_join():
+      ret = self.execute_query_expect_success(self.client, QUERY)
+      assert ":EXCHANGE [HASH(b.id)]" in str(ret)
+
+    # Without any executors we default to a hash join.
+    assert_hash_join()
+
+    # Add a healthy executor group of size 1 and observe that we switch to a broadcast join.
+    self._add_executor_group("group1", min_size=1, num_executors=1)
+    assert_broadcast_join()
+
+    # Add another executor to the same group and observe that with two executors we pick a
+    # partitioned hash join.
+    self._add_executor_group("group1", min_size=1, num_executors=1)
+    assert_hash_join()
+
+    # Kill an executor. The group remains healthy but its size decreases and we revert
+    # to a broadcast join.
+    self.cluster.impalads[-1].kill()
+    self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 2,
+                                                   timeout=20)
+    assert_broadcast_join()
+
+    # Kill a second executor. The group becomes unhealthy but we cache its last healthy
+    # size and will continue to pick a broadcast join.
+    self.cluster.impalads[-2].kill()
+    self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 1,
+                                                   timeout=20)
+    assert_broadcast_join()
+
+  @pytest.mark.execute_serially
+  def test_join_strategy_multiple_executors(self):
+    """Tests that the planner picks the correct join strategy based on the current cluster
+    size. This test uses an executor group which requires multiple executors to be
+    healthy."""
+    TABLE = "functional.alltypes"
+    QUERY = "explain select * from %s a inner join %s b on a.id = b.id" % (TABLE, TABLE)
+
+    # Predicate to assert that the planner decided on a hash join.
+    def assert_hash_join():
+      ret = self.execute_query_expect_success(self.client, QUERY)
+      assert ":EXCHANGE [HASH(b.id)]" in str(ret)
+
+    # Without any executors we default to a hash join.
+    assert_hash_join()
+
+    # Adding an unhealthy group will not affect the planner's decision.
+    self._add_executor_group("group1", min_size=2, num_executors=1)
+    assert_hash_join()
+
+    # Adding a second executor makes the group healthy (note that the resulting join
+    # strategy is the same though).
+    self._add_executor_group("group1", min_size=2, num_executors=1)
+    assert_hash_join()
+
+    # Kill an executor. The unhealthy group does not affect the planner's decision, even
+    # though only one executor is now online.
+    self.cluster.impalads[-1].kill()
+    self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 2,
+                                                   timeout=20)
+    assert_hash_join()