AMBARI-22628 - YARN Shuffle Service Can't Be Found On Client-Only Nodes After New Cluster Install (jonathanhurley)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
index 9d5e29e..ffe2ed6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
@@ -29,8 +29,6 @@
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.state.Cluster;
-import org.apache.ambari.server.state.Service;
-import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.UpgradeContext.UpgradeSummary;
import org.apache.ambari.server.utils.StageUtils;
import org.slf4j.Logger;
@@ -560,30 +558,21 @@
}
/**
- * Used to set a map of {service -> { component -> version}}. This is necessary when performing
- * an upgrade to correct build paths of required binaries.
- * @param cluster the cluster from which to build the map
+ * Used to set a map of {service -> { component -> version}}. This is
+ * necessary when performing an upgrade to correct build paths of required
+ * binaries. This method will only set the version information for a component
+ * if:
+ * <ul>
+ * <li>The component advertises a version</li>
+ * <li>The repository for the component has been resolved and the version can
+ * be trusted</li>
+ * </ul>
+ *
+ * @param cluster
+ * the cluster from which to build the map
*/
public void setComponentVersions(Cluster cluster) throws AmbariException {
- Map<String, Map<String, String>> componentVersionMap = new HashMap<>();
-
- for (Service service : cluster.getServices().values()) {
- Map<String, String> componentMap = new HashMap<>();
-
- boolean shouldSet = false;
- for (ServiceComponent component : service.getServiceComponents().values()) {
- if (component.isVersionAdvertised()) {
- shouldSet = true;
- componentMap.put(component.getName(), component.getDesiredVersion());
- }
- }
-
- if (shouldSet) {
- componentVersionMap.put(service.getName(), componentMap);
- }
- }
-
- this.componentVersionMap = componentVersionMap;
+ componentVersionMap = cluster.getComponentVersionMap();
}
/**
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java
index a7c712b..5e42eb4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java
@@ -448,6 +448,8 @@
jsonContent.put("clusterName", cluster.getClusterName());
jsonContent.put("serviceName", serviceName);
jsonContent.put("role", componentName);
+ jsonContent.put("componentVersionMap", cluster.getComponentVersionMap());
+
jsonConfigurations = gson.toJson(jsonContent);
File tmpDirectory = new File(TMP_PATH);
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
index 15efcd2..7962148 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
@@ -678,4 +678,13 @@
*/
void addSuspendedUpgradeParameters(Map<String, String> commandParams,
Map<String, String> roleParams);
+
+ /**
+ * Gets a mapping of service name to component name/desired version for every
+ * installed component in the cluster which advertises a version and for which
+ * the repository has been resolved. Services with no such component are omitted.
+ *
+ * @return a mapping of service to component version, or an empty map.
+ */
+ Map<String, Map<String, String>> getComponentVersionMap();
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index ce328f9..4a0e3a2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -2808,4 +2808,35 @@
// suspended goes in role params
roleParams.put(KeyNames.UPGRADE_SUSPENDED, Boolean.TRUE.toString().toLowerCase());
}
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Services left with no qualifying component are omitted from the result.
+   */
+  @Override
+  public Map<String, Map<String, String>> getComponentVersionMap() {
+    Map<String, Map<String, String>> versionsByService = new HashMap<>();
+
+    for (Service svc : getServices().values()) {
+      Map<String, String> versionsByComponent = new HashMap<>();
+      for (ServiceComponent sc : svc.getServiceComponents().values()) {
+        // only report a version when the component advertises one and its
+        // desired repository is resolved; otherwise it can't be trusted
+        boolean reportable = sc.isVersionAdvertised()
+            && sc.getDesiredRepositoryVersion().isResolved();
+        if (reportable) {
+          versionsByComponent.put(sc.getName(), sc.getDesiredVersion());
+        }
+      }
+
+      // a service which contributed nothing is skipped
+      if (versionsByComponent.isEmpty()) {
+        continue;
+      }
+      versionsByService.put(svc.getName(), versionsByComponent);
+    }
+
+    return versionsByService;
+  }
}
diff --git a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
index 9814145..6b5559c 100644
--- a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
@@ -21,6 +21,7 @@
import os
from resource_management.core import sudo
+from resource_management.core.logger import Logger
from resource_management.libraries.script.script import Script
from resource_management.libraries.resources.hdfs_resource import HdfsResource
from resource_management.libraries.functions import component_version
@@ -84,11 +85,45 @@
# It cannot be used during the initial Cluser Install because the version is not yet known.
version = default("/commandParams/version", None)
+def get_spark_version(service_name, component_name, yarn_version):
+  """
+  Attempts to calculate the correct version placeholder value for spark or spark2 based on
+  what is installed in the cluster. If Spark is not installed, then this value will need to be
+  that of YARN so it can still find the correct shuffle class.
+
+  On cluster installs, we have not yet calculated any versions and all known values could be None.
+  This doesn't affect daemons, but it does affect client-only hosts where they will never receive
+  a start command after install. Therefore, this function will attempt to use stack-select as a
+  last resort to get a valid value; if that also fails, the error is logged and None is returned.
+  :param service_name: the service name (SPARK, SPARK2, etc)
+  :param component_name: the component name (SPARK_CLIENT, etc)
+  :param yarn_version: the default version of Yarn to use if no spark is installed
+  :return: a value for the version placeholder in shuffle classpath properties, or None
+  """
+  # start off seeing if we need to populate a default value for YARN
+  if yarn_version is None:
+    yarn_version = component_version.get_component_repository_version(service_name = "YARN",
+      component_name = "YARN_CLIENT")
+
+  # now try to get the version of spark/spark2, defaulting to the version of YARN
+  shuffle_classpath_version = component_version.get_component_repository_version(service_name = service_name,
+    component_name = component_name, default_value = yarn_version)
+
+  # even with the default of using YARN's version, on an install this might be None since we haven't
+  # calculated the version of YARN yet - use stack_select as a last ditch effort
+  if shuffle_classpath_version is None:
+    try:
+      shuffle_classpath_version = stack_select.get_role_component_current_stack_version()
+    except Exception:
+      Logger.exception("Unable to query for the correct shuffle classpath")
+
+  return shuffle_classpath_version
+
# these are used to render the classpath for picking up Spark classes
# in the event that spark is not installed, then we must default to the vesrion of YARN installed
# since it will still load classes from its own spark version
-spark_version = component_version.get_component_repository_version(service_name = "SPARK", component_name = "SPARK_CLIENT", default_value = version)
-spark2_version = component_version.get_component_repository_version(service_name = "SPARK2", component_name = "SPARK2_CLIENT", default_value = version)
+spark_version = get_spark_version("SPARK", "SPARK_CLIENT", version)
+spark2_version = get_spark_version("SPARK2", "SPARK2_CLIENT", version)
stack_supports_ranger_kerberos = check_stack_feature(StackFeature.RANGER_KERBEROS_SUPPORT, version_for_stack_feature_checks)
stack_supports_ranger_audit_db = check_stack_feature(StackFeature.RANGER_AUDIT_DB_SUPPORT, version_for_stack_feature_checks)
@@ -541,5 +576,3 @@
# need this to capture cluster name from where ranger yarn plugin is enabled
cluster_name = config['clusterName']
-
-# ranger yarn plugin end section
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
index 2efc7a7..bd27945 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
@@ -291,7 +291,12 @@
Cluster cluster = clusters.getCluster(CLUSTER1);
StackId stackId = cluster.getDesiredStackVersion();
+
+ // set the repo version resolved state to verify that the version is not sent
RepositoryVersionEntity repositoryVersion = ormTestHelper.getOrCreateRepositoryVersion(stackId, "0.1-0000");
+ repositoryVersion.setResolved(false);
+ ormTestHelper.repositoryVersionDAO.merge(repositoryVersion);
+
Service service = cluster.getService("HDFS");
service.setDesiredRepositoryVersion(repositoryVersion);
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java
index ec18a32..f2a6e91 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java
@@ -564,6 +564,52 @@
Assert.assertTrue(command.getComponentVersionMap().containsKey("ZOOKEEPER"));
}
+ /**
+ * Tests that when no repository version is resolved, the component version
+ * map of a generated custom command (YARN REFRESHQUEUES) is null or empty.
+ *
+ * @throws Exception on any unexpected failure while building the command
+ */
+ @Test
+ public void testAvailableServicesMapIsEmptyWhenRepositoriesNotResolved() throws Exception {
+
+ // mark every known repository version as unresolved so that no component
+ // version can be trusted and therefore none should be reported
+ RepositoryVersionDAO repositoryVersionDAO = injector.getInstance(RepositoryVersionDAO.class);
+ List<RepositoryVersionEntity> repoVersions = repositoryVersionDAO.findAll();
+ for (RepositoryVersionEntity repoVersion : repoVersions) {
+ repoVersion.setResolved(false);
+ repositoryVersionDAO.merge(repoVersion);
+ }
+
+ Map<String, String> requestProperties = new HashMap<String, String>() {
+ {
+ put(REQUEST_CONTEXT_PROPERTY, "Refresh YARN Capacity Scheduler");
+ put("command", "REFRESHQUEUES");
+ }
+ };
+
+ ExecuteActionRequest actionRequest = new ExecuteActionRequest("c1", "REFRESHQUEUES",
+ new HashMap<String, String>() {
+ {
+ put("forceRefreshConfigTags", "capacity-scheduler");
+ }
+ }, false);
+
+ actionRequest.getResourceFilters().add(new RequestResourceFilter("YARN", "RESOURCEMANAGER",
+ Collections.singletonList("c1-c6401")));
+
+ EasyMock.replay(hostRoleCommand, actionManager, configHelper);
+
+ ambariManagementController.createAction(actionRequest, requestProperties);
+ Request request = requestCapture.getValue();
+ Stage stage = request.getStages().iterator().next();
+ List<ExecutionCommandWrapper> commands = stage.getExecutionCommands("c1-c6401");
+ ExecutionCommand command = commands.get(0).getExecutionCommand();
+
+ Assert.assertTrue(MapUtils.isEmpty(command.getComponentVersionMap()));
+ }
+
@Test
public void testCommandRepository() throws Exception {
Cluster cluster = clusters.getCluster("c1");
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
index a2406ea..1d92e7f 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
@@ -674,6 +674,9 @@
repositoryVersion = repositoryVersionDAO.create(stackEntity, version,
String.valueOf(System.currentTimeMillis()) + uniqueCounter.incrementAndGet(), operatingSystems);
+
+ repositoryVersion.setResolved(true);
+ repositoryVersion = repositoryVersionDAO.merge(repositoryVersion);
} catch (Exception ex) {
LOG.error("Caught exception", ex);