AMBARI-22628 - YARN Shuffle Service Can't Be Found On Client-Only Nodes After New Cluster Install (jonathanhurley)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
index 9d5e29e..ffe2ed6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
@@ -29,8 +29,6 @@
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.state.Cluster;
-import org.apache.ambari.server.state.Service;
-import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.UpgradeContext.UpgradeSummary;
 import org.apache.ambari.server.utils.StageUtils;
 import org.slf4j.Logger;
@@ -560,30 +558,21 @@
   }
 
   /**
-   * Used to set a map of {service -> { component -> version}}.  This is necessary when performing
-   * an upgrade to correct build paths of required binaries.
-   * @param cluster the cluster from which to build the map
+   * Used to set a map of {service -> { component -> version}}. This is
+   * necessary when performing an upgrade to correct build paths of required
+   * binaries. This method will only set the version information for a component
+   * if:
+   * <ul>
+   * <li>The component advertises a version</li>
+   * <li>The repository for the component has been resolved and the version can
+   * be trusted</li>
+   * </ul>
+   *
+   * @param cluster
+   *          the cluster from which to build the map
    */
   public void setComponentVersions(Cluster cluster) throws AmbariException {
-    Map<String, Map<String, String>> componentVersionMap = new HashMap<>();
-
-    for (Service service : cluster.getServices().values()) {
-      Map<String, String> componentMap = new HashMap<>();
-
-      boolean shouldSet = false;
-      for (ServiceComponent component : service.getServiceComponents().values()) {
-        if (component.isVersionAdvertised()) {
-          shouldSet = true;
-          componentMap.put(component.getName(), component.getDesiredVersion());
-        }
-      }
-
-      if (shouldSet) {
-        componentVersionMap.put(service.getName(), componentMap);
-      }
-    }
-
-    this.componentVersionMap = componentVersionMap;
+    componentVersionMap = cluster.getComponentVersionMap();
   }
 
   /**
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java
index a7c712b..5e42eb4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java
@@ -448,6 +448,8 @@
         jsonContent.put("clusterName", cluster.getClusterName());
         jsonContent.put("serviceName", serviceName);
         jsonContent.put("role", componentName);
+        jsonContent.put("componentVersionMap", cluster.getComponentVersionMap());
+
         jsonConfigurations = gson.toJson(jsonContent);
 
         File tmpDirectory = new File(TMP_PATH);
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
index 15efcd2..7962148 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
@@ -678,4 +678,13 @@
    */
   void addSuspendedUpgradeParameters(Map<String, String> commandParams,
       Map<String, String> roleParams);
+
+  /**
+   * Gets a mapping of service to component/version for every installed
+   * component in the cluster which advertises a version and for which the
+   * repository has been resolved.
+   *
+   * @return a mapping of service to component version, or an empty map.
+   */
+  Map<String, Map<String, String>> getComponentVersionMap();
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index ce328f9..4a0e3a2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -2808,4 +2808,35 @@
     // suspended goes in role params
     roleParams.put(KeyNames.UPGRADE_SUSPENDED, Boolean.TRUE.toString().toLowerCase());
   }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Map<String, Map<String, String>> getComponentVersionMap() {
+    Map<String, Map<String, String>> componentVersionMap = new HashMap<>();
+
+    for (Service service : getServices().values()) {
+      Map<String, String> componentMap = new HashMap<>();
+      for (ServiceComponent component : service.getServiceComponents().values()) {
+        // skip components which don't advertise a version
+        if (!component.isVersionAdvertised()) {
+          continue;
+        }
+
+        // if the repo isn't resolved, then we can't trust the version
+        if (!component.getDesiredRepositoryVersion().isResolved()) {
+          continue;
+        }
+
+        componentMap.put(component.getName(), component.getDesiredVersion());
+      }
+
+      if (!componentMap.isEmpty()) {
+        componentVersionMap.put(service.getName(), componentMap);
+      }
+    }
+
+    return componentVersionMap;
+  }
 }
diff --git a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
index 9814145..6b5559c 100644
--- a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
@@ -21,6 +21,7 @@
 import os
 
 from resource_management.core import sudo
+from resource_management.core.logger import Logger
 from resource_management.libraries.script.script import Script
 from resource_management.libraries.resources.hdfs_resource import HdfsResource
 from resource_management.libraries.functions import component_version
@@ -84,11 +85,45 @@
 # It cannot be used during the initial Cluser Install because the version is not yet known.
 version = default("/commandParams/version", None)
 
+def get_spark_version(service_name, component_name, yarn_version):
+  """
+  Attempts to calculate the correct version placeholder value for spark or spark2 based on
+  what is installed in the cluster. If Spark is not installed, then this value will need to be
+  that of YARN so it can still find the correct shuffle class.
+
+  On cluster installs, we have not yet calcualted any versions and all known values could be None.
+  This doesn't affect daemons, but it does affect client-only hosts where they will never receive
+  a start command after install. Therefore, this function will attempt to use stack-select as a
+  last resort to get a value value.
+  :param service_name:  the service name (SPARK, SPARK2, etc)
+  :param component_name:  the component name (SPARK_CLIENT, etc)
+  :param yarn_version:  the default version of Yarn to use if no spark is installed
+  :return:  a value for the version placeholder in shuffle classpath properties
+  """
+  # start off seeing if we need to populate a default value for YARN
+  if yarn_version is None:
+    yarn_version = component_version.get_component_repository_version(service_name = "YARN",
+      component_name = "YARN_CLIENT")
+
+  # now try to get the version of spark/spark2, defaulting to the version if YARN
+  shuffle_classpath_version = component_version.get_component_repository_version(service_name = service_name,
+    component_name = component_name, default_value = yarn_version)
+
+  # even with the default of using YARN's version, on an install this might be None since we haven't
+  # calculated the version of YARN yet - use stack_select as a last ditch effort
+  if shuffle_classpath_version is None:
+    try:
+      shuffle_classpath_version = stack_select.get_role_component_current_stack_version()
+    except:
+      Logger.exception("Unable to query for the correct shuffle classpath")
+
+  return shuffle_classpath_version
+
 # these are used to render the classpath for picking up Spark classes
 # in the event that spark is not installed, then we must default to the vesrion of YARN installed
 # since it will still load classes from its own spark version
-spark_version = component_version.get_component_repository_version(service_name =  "SPARK", component_name = "SPARK_CLIENT", default_value = version)
-spark2_version = component_version.get_component_repository_version(service_name = "SPARK2", component_name = "SPARK2_CLIENT", default_value = version)
+spark_version = get_spark_version("SPARK", "SPARK_CLIENT", version)
+spark2_version = get_spark_version("SPARK2", "SPARK2_CLIENT", version)
 
 stack_supports_ranger_kerberos = check_stack_feature(StackFeature.RANGER_KERBEROS_SUPPORT, version_for_stack_feature_checks)
 stack_supports_ranger_audit_db = check_stack_feature(StackFeature.RANGER_AUDIT_DB_SUPPORT, version_for_stack_feature_checks)
@@ -541,5 +576,3 @@
 
 # need this to capture cluster name from where ranger yarn plugin is enabled
 cluster_name = config['clusterName']
-
-# ranger yarn plugin end section
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
index 2efc7a7..bd27945 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
@@ -291,7 +291,12 @@
     Cluster cluster = clusters.getCluster(CLUSTER1);
 
     StackId stackId = cluster.getDesiredStackVersion();
+    
+    // set the repo version resolved state to verify that the version is not sent
     RepositoryVersionEntity repositoryVersion = ormTestHelper.getOrCreateRepositoryVersion(stackId, "0.1-0000");
+    repositoryVersion.setResolved(false);
+    ormTestHelper.repositoryVersionDAO.merge(repositoryVersion);
+
     Service service = cluster.getService("HDFS");
     service.setDesiredRepositoryVersion(repositoryVersion);
 
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java
index ec18a32..f2a6e91 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java
@@ -564,6 +564,52 @@
     Assert.assertTrue(command.getComponentVersionMap().containsKey("ZOOKEEPER"));
   }
 
+  /**
+   * Tests that if a component's repository is not resolved, then the repo
+   * version map does not get populated.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAvailableServicesMapIsEmptyWhenRepositoriesNotResolved() throws Exception {
+
+    // set all repos to resolve=false to verify that we don't get a
+    // component version map
+    RepositoryVersionDAO repositoryVersionDAO = injector.getInstance(RepositoryVersionDAO.class);
+    List<RepositoryVersionEntity> repoVersions = repositoryVersionDAO.findAll();
+    for (RepositoryVersionEntity repoVersion : repoVersions) {
+      repoVersion.setResolved(false);
+      repositoryVersionDAO.merge(repoVersion);
+    }
+
+    Map<String, String> requestProperties = new HashMap<String, String>() {
+      {
+        put(REQUEST_CONTEXT_PROPERTY, "Refresh YARN Capacity Scheduler");
+        put("command", "REFRESHQUEUES");
+      }
+    };
+
+    ExecuteActionRequest actionRequest = new ExecuteActionRequest("c1", "REFRESHQUEUES",
+        new HashMap<String, String>() {
+          {
+            put("forceRefreshConfigTags", "capacity-scheduler");
+          }
+        }, false);
+
+    actionRequest.getResourceFilters().add(new RequestResourceFilter("YARN", "RESOURCEMANAGER",
+        Collections.singletonList("c1-c6401")));
+
+    EasyMock.replay(hostRoleCommand, actionManager, configHelper);
+
+    ambariManagementController.createAction(actionRequest, requestProperties);
+    Request request = requestCapture.getValue();
+    Stage stage = request.getStages().iterator().next();
+    List<ExecutionCommandWrapper> commands = stage.getExecutionCommands("c1-c6401");
+    ExecutionCommand command = commands.get(0).getExecutionCommand();
+
+    Assert.assertTrue(MapUtils.isEmpty(command.getComponentVersionMap()));
+  }
+
   @Test
   public void testCommandRepository() throws Exception {
     Cluster cluster = clusters.getCluster("c1");
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
index a2406ea..1d92e7f 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
@@ -674,6 +674,9 @@
 
         repositoryVersion = repositoryVersionDAO.create(stackEntity, version,
             String.valueOf(System.currentTimeMillis()) + uniqueCounter.incrementAndGet(), operatingSystems);
+
+        repositoryVersion.setResolved(true);
+        repositoryVersion = repositoryVersionDAO.merge(repositoryVersion);
       } catch (Exception ex) {
         LOG.error("Caught exception", ex);