AMBARI-16072. Stack Advisor issue when adding service to Kerberized cluster (rlevas)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelperImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelperImpl.java
index dc4829d..51e782f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelperImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelperImpl.java
@@ -403,7 +403,7 @@
 
     setAuthToLocalRules(kerberosDescriptor, cluster, kerberosDetails.getDefaultRealm(), configurations, kerberosConfigurations);
 
-    return applyStackAdvisorUpdates(cluster, services, configurations, kerberosConfigurations, propertiesToIgnore, kerberosEnabled);
+    return applyStackAdvisorUpdates(cluster, cluster.getServices().keySet(), configurations, kerberosConfigurations, propertiesToIgnore, kerberosEnabled);
   }
 
   public Map<String, Map<String, String>> applyStackAdvisorUpdates(Cluster cluster, Set<String> services,
@@ -479,6 +479,7 @@
           .forStack(stackVersion.getStackName(), stackVersion.getStackVersion())
           .forServices(new ArrayList<String>(services))
           .forHosts(hostNames)
+          .withComponentHostsMap(cluster.getServiceComponentHostMap(null, services))
           .withConfigurations(requestConfigurations)
           .ofType(StackAdvisorRequest.StackAdvisorRequestType.CONFIGURATIONS)
           .build();
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
index 88a60c8..6c25f5f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
@@ -81,6 +81,19 @@
   List<ServiceComponentHost> getServiceComponentHosts(String hostname);
 
   /**
+   * Gets a map of components to hosts they are installed on.
+   * <p>
+   * This may may be filtered by host and/or service by optionally providing a set of hostname
+   * and/or service names to use as a filter.  <code>null</code> for either filter indicates no
+   * filter (or all), an empty set indicates a complete filter (or none).
+   *
+   * @param hostNames
+   * @param serviceNames
+   * @return a map of (filtered) components to hosts
+   */
+  Map<String, Set<String>> getServiceComponentHostMap(Set<String> hostNames, Set<String> serviceNames);
+
+  /**
    * Get all ServiceComponentHosts for a given service and optional component
    *
    * If the component name is <code>null</code>, all components for the requested service will be returned.
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index dbff426..7fd6393 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -875,6 +875,43 @@
   }
 
   @Override
+  public Map<String, Set<String>> getServiceComponentHostMap(Set<String> hostNames, Set<String> serviceNames) {
+    Map<String, Set<String>> componentHostMap = new HashMap<String, Set<String>>();
+
+    Collection<Host> hosts = getHosts();
+
+    if(hosts != null) {
+      for (Host host : hosts) {
+        String hostname = host.getHostName();
+
+        // If this host is not filtered out, continue processing
+        if ((hostNames == null) || hostNames.contains(hostname)) {
+          List<ServiceComponentHost> serviceComponentHosts = getServiceComponentHosts(hostname);
+
+          if (serviceComponentHosts != null) {
+            for (ServiceComponentHost sch : serviceComponentHosts) {
+              // If the service for this ServiceComponentHost is not filtered out, continue processing
+              if ((serviceNames == null) || serviceNames.contains(sch.getServiceName())) {
+                String component = sch.getServiceComponentName();
+                Set<String> componentHosts = componentHostMap.get(component);
+
+                if (componentHosts == null) {
+                  componentHosts = new HashSet<String>();
+                  componentHostMap.put(component, componentHosts);
+                }
+
+                componentHosts.add(hostname);
+              }
+            }
+          }
+        }
+      }
+    }
+
+    return componentHostMap;
+  }
+
+  @Override
   public List<ServiceComponentHost> getServiceComponentHosts(String serviceName, String componentName) {
     ArrayList<ServiceComponentHost> foundItems = new ArrayList<ServiceComponentHost>();
 
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/KerberosHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/KerberosHelperTest.java
index feb11a1..1d2c92b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/KerberosHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/KerberosHelperTest.java
@@ -2315,6 +2315,18 @@
           }
         });
 
+    Map<String, Service> services = new HashMap<String, Service>();
+    services.put("SERVICE1", service1);
+    services.put("SERVICE2", service2);
+    services.put("SERVICE3", service3);
+
+    Map<String, Set<String>> serviceComponentHostMap = new HashMap<String, Set<String>>();
+    serviceComponentHostMap.put("COMPONENT1A", Collections.singleton("hostA"));
+    serviceComponentHostMap.put("COMPONENT1B", new HashSet<String>(Arrays.asList("hostB", "hostC")));
+    serviceComponentHostMap.put("COMPONENT2A", Collections.singleton("hostA"));
+    serviceComponentHostMap.put("COMPONENT2B", new HashSet<String>(Arrays.asList("hostB", "hostC")));
+    serviceComponentHostMap.put("COMPONEN3A", Collections.singleton("hostA"));
+
     final Cluster cluster = createMock(Cluster.class);
     expect(cluster.getDesiredConfigByType("krb5-conf")).andReturn(krb5ConfConfig).atLeastOnce();
     expect(cluster.getDesiredConfigByType("kerberos-env")).andReturn(kerberosEnvConfig).atLeastOnce();
@@ -2322,13 +2334,8 @@
     expect(cluster.getCurrentStackVersion()).andReturn(new StackId("HDP", "2.2")).atLeastOnce();
     expect(cluster.getClusterName()).andReturn("c1").atLeastOnce();
     expect(cluster.getHosts()).andReturn(hosts).anyTimes();
-    expect(cluster.getServices()).andReturn(new HashMap<String, Service>() {
-      {
-        put("SERVICE1", service1);
-        put("SERVICE2", service2);
-        put("SERVICE3", service3);
-      }
-    }).anyTimes();
+    expect(cluster.getServices()).andReturn(services).anyTimes();
+    expect(cluster.getServiceComponentHostMap(null, services.keySet())).andReturn(serviceComponentHostMap).anyTimes();
     expect(cluster.isBluePrintDeployed()).andReturn(false).atLeastOnce();
 
     final Map<String, Map<String, String>> existingConfigurations = new HashMap<String, Map<String, String>>() {
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java
index bf3b376..4427e12 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java
@@ -830,6 +830,229 @@
   }
 
   @Test
+  public void testGetServiceComponentHostMap() throws Exception {
+    createDefaultCluster();
+
+    Service s = serviceFactory.createNew(c1, "HDFS");
+    c1.addService(s);
+    s.persist();
+
+    ServiceComponent scNN = serviceComponentFactory.createNew(s, "NAMENODE");
+    s.addServiceComponent(scNN);
+    scNN.persist();
+    ServiceComponentHost schNNH1 = serviceComponentHostFactory.createNew(scNN, "h1");
+    scNN.addServiceComponentHost(schNNH1);
+    schNNH1.persist();
+
+    ServiceComponent scDN = serviceComponentFactory.createNew(s, "DATANODE");
+    s.addServiceComponent(scDN);
+    scDN.persist();
+    ServiceComponentHost scDNH1 = serviceComponentHostFactory.createNew(scDN, "h1");
+    scDN.addServiceComponentHost(scDNH1);
+    scDNH1.persist();
+    ServiceComponentHost scDNH2 = serviceComponentHostFactory.createNew(scDN, "h2");
+    scDN.addServiceComponentHost(scDNH2);
+    scDNH2.persist();
+
+    Map<String, Set<String>> componentHostMap;
+
+    componentHostMap = c1.getServiceComponentHostMap(null, null);
+    Assert.assertEquals(2, componentHostMap.size());
+
+    Assert.assertEquals(1, componentHostMap.get("NAMENODE").size());
+    Assert.assertTrue(componentHostMap.get("NAMENODE").contains("h1"));
+
+    Assert.assertEquals(2, componentHostMap.get("DATANODE").size());
+    Assert.assertTrue(componentHostMap.get("DATANODE").contains("h1"));
+    Assert.assertTrue(componentHostMap.get("DATANODE").contains("h2"));
+  }
+
+  @Test
+  public void testGetServiceComponentHostMap_ForService() throws Exception {
+    createDefaultCluster();
+
+    Service sfHDFS = serviceFactory.createNew(c1, "HDFS");
+    c1.addService(sfHDFS);
+    sfHDFS.persist();
+
+    Service sfMR = serviceFactory.createNew(c1, "MAPREDUCE");
+    c1.addService(sfMR);
+    sfMR.persist();
+
+    ServiceComponent scNN = serviceComponentFactory.createNew(sfHDFS, "NAMENODE");
+    sfHDFS.addServiceComponent(scNN);
+    scNN.persist();
+    ServiceComponentHost schNNH1 = serviceComponentHostFactory.createNew(scNN, "h1");
+    scNN.addServiceComponentHost(schNNH1);
+    schNNH1.persist();
+
+    ServiceComponent scDN = serviceComponentFactory.createNew(sfHDFS, "DATANODE");
+    sfHDFS.addServiceComponent(scDN);
+    scDN.persist();
+    ServiceComponentHost scDNH1 = serviceComponentHostFactory.createNew(scDN, "h1");
+    scDN.addServiceComponentHost(scDNH1);
+    scDNH1.persist();
+    ServiceComponentHost scDNH2 = serviceComponentHostFactory.createNew(scDN, "h2");
+    scDN.addServiceComponentHost(scDNH2);
+    scDNH2.persist();
+
+    ServiceComponent scJT = serviceComponentFactory.createNew(sfMR, "JOBTRACKER");
+    sfMR.addServiceComponent(scJT);
+    scJT.persist();
+    ServiceComponentHost schJTH1 = serviceComponentHostFactory.createNew(scJT, "h1");
+    scJT.addServiceComponentHost(schJTH1);
+    schJTH1.persist();
+
+    Map<String, Set<String>> componentHostMap;
+
+    componentHostMap = c1.getServiceComponentHostMap(null, Collections.singleton("HDFS"));
+    Assert.assertEquals(2, componentHostMap.size());
+    Assert.assertEquals(1, componentHostMap.get("NAMENODE").size());
+    Assert.assertTrue(componentHostMap.get("NAMENODE").contains("h1"));
+    Assert.assertEquals(2, componentHostMap.get("DATANODE").size());
+    Assert.assertTrue(componentHostMap.get("DATANODE").contains("h1"));
+    Assert.assertTrue(componentHostMap.get("DATANODE").contains("h2"));
+
+    componentHostMap = c1.getServiceComponentHostMap(null, Collections.singleton("MAPREDUCE"));
+    Assert.assertEquals(1, componentHostMap.size());
+    Assert.assertEquals(1, componentHostMap.get("JOBTRACKER").size());
+    Assert.assertTrue(componentHostMap.get("JOBTRACKER").contains("h1"));
+
+    componentHostMap = c1.getServiceComponentHostMap(null, new HashSet<String>(Arrays.asList("HDFS", "MAPREDUCE")));
+    Assert.assertEquals(3, componentHostMap.size());
+    Assert.assertEquals(1, componentHostMap.get("NAMENODE").size());
+    Assert.assertTrue(componentHostMap.get("NAMENODE").contains("h1"));
+    Assert.assertEquals(2, componentHostMap.get("DATANODE").size());
+    Assert.assertTrue(componentHostMap.get("DATANODE").contains("h1"));
+    Assert.assertTrue(componentHostMap.get("DATANODE").contains("h2"));
+    Assert.assertEquals(1, componentHostMap.get("JOBTRACKER").size());
+    Assert.assertTrue(componentHostMap.get("JOBTRACKER").contains("h1"));
+
+    componentHostMap = c1.getServiceComponentHostMap(null, Collections.singleton("UNKNOWN"));
+    Assert.assertEquals(0, componentHostMap.size());
+  }
+
+  @Test
+  public void testGetServiceComponentHostMap_ForHost() throws Exception {
+    createDefaultCluster();
+
+    Service sfHDFS = serviceFactory.createNew(c1, "HDFS");
+    c1.addService(sfHDFS);
+    sfHDFS.persist();
+
+    Service sfMR = serviceFactory.createNew(c1, "MAPREDUCE");
+    c1.addService(sfMR);
+    sfMR.persist();
+
+    ServiceComponent scNN = serviceComponentFactory.createNew(sfHDFS, "NAMENODE");
+    sfHDFS.addServiceComponent(scNN);
+    scNN.persist();
+    ServiceComponentHost schNNH1 = serviceComponentHostFactory.createNew(scNN, "h1");
+    scNN.addServiceComponentHost(schNNH1);
+    schNNH1.persist();
+
+    ServiceComponent scDN = serviceComponentFactory.createNew(sfHDFS, "DATANODE");
+    sfHDFS.addServiceComponent(scDN);
+    scDN.persist();
+    ServiceComponentHost scDNH1 = serviceComponentHostFactory.createNew(scDN, "h1");
+    scDN.addServiceComponentHost(scDNH1);
+    scDNH1.persist();
+    ServiceComponentHost scDNH2 = serviceComponentHostFactory.createNew(scDN, "h2");
+    scDN.addServiceComponentHost(scDNH2);
+    scDNH2.persist();
+
+    ServiceComponent scJT = serviceComponentFactory.createNew(sfMR, "JOBTRACKER");
+    sfMR.addServiceComponent(scJT);
+    scJT.persist();
+    ServiceComponentHost schJTH1 = serviceComponentHostFactory.createNew(scJT, "h1");
+    scJT.addServiceComponentHost(schJTH1);
+    schJTH1.persist();
+
+    Map<String, Set<String>> componentHostMap;
+
+    componentHostMap = c1.getServiceComponentHostMap(Collections.singleton("h1"), null);
+    Assert.assertEquals(3, componentHostMap.size());
+    Assert.assertEquals(1, componentHostMap.get("NAMENODE").size());
+    Assert.assertTrue(componentHostMap.get("NAMENODE").contains("h1"));
+    Assert.assertEquals(1, componentHostMap.get("DATANODE").size());
+    Assert.assertTrue(componentHostMap.get("DATANODE").contains("h1"));
+    Assert.assertEquals(1, componentHostMap.get("JOBTRACKER").size());
+    Assert.assertTrue(componentHostMap.get("JOBTRACKER").contains("h1"));
+
+    componentHostMap = c1.getServiceComponentHostMap(Collections.singleton("h2"), null);
+    Assert.assertEquals(1, componentHostMap.size());
+    Assert.assertEquals(1, componentHostMap.get("DATANODE").size());
+    Assert.assertTrue(componentHostMap.get("DATANODE").contains("h2"));
+
+    componentHostMap = c1.getServiceComponentHostMap(new HashSet<String>(Arrays.asList("h1", "h2", "h3")), null);
+    Assert.assertEquals(3, componentHostMap.size());
+    Assert.assertEquals(1, componentHostMap.get("NAMENODE").size());
+    Assert.assertTrue(componentHostMap.get("NAMENODE").contains("h1"));
+    Assert.assertEquals(2, componentHostMap.get("DATANODE").size());
+    Assert.assertTrue(componentHostMap.get("DATANODE").contains("h1"));
+    Assert.assertTrue(componentHostMap.get("DATANODE").contains("h2"));
+    Assert.assertEquals(1, componentHostMap.get("JOBTRACKER").size());
+    Assert.assertTrue(componentHostMap.get("JOBTRACKER").contains("h1"));
+
+    componentHostMap = c1.getServiceComponentHostMap(Collections.singleton("unknown"), null);
+    Assert.assertEquals(0, componentHostMap.size());
+  }
+
+  @Test
+  public void testGetServiceComponentHostMap_ForHostAndService() throws Exception {
+    createDefaultCluster();
+
+    Service sfHDFS = serviceFactory.createNew(c1, "HDFS");
+    c1.addService(sfHDFS);
+    sfHDFS.persist();
+
+    Service sfMR = serviceFactory.createNew(c1, "MAPREDUCE");
+    c1.addService(sfMR);
+    sfMR.persist();
+
+    ServiceComponent scNN = serviceComponentFactory.createNew(sfHDFS, "NAMENODE");
+    sfHDFS.addServiceComponent(scNN);
+    scNN.persist();
+    ServiceComponentHost schNNH1 = serviceComponentHostFactory.createNew(scNN, "h1");
+    scNN.addServiceComponentHost(schNNH1);
+    schNNH1.persist();
+
+    ServiceComponent scDN = serviceComponentFactory.createNew(sfHDFS, "DATANODE");
+    sfHDFS.addServiceComponent(scDN);
+    scDN.persist();
+    ServiceComponentHost scDNH1 = serviceComponentHostFactory.createNew(scDN, "h1");
+    scDN.addServiceComponentHost(scDNH1);
+    scDNH1.persist();
+    ServiceComponentHost scDNH2 = serviceComponentHostFactory.createNew(scDN, "h2");
+    scDN.addServiceComponentHost(scDNH2);
+    scDNH2.persist();
+
+    ServiceComponent scJT = serviceComponentFactory.createNew(sfMR, "JOBTRACKER");
+    sfMR.addServiceComponent(scJT);
+    scJT.persist();
+    ServiceComponentHost schJTH1 = serviceComponentHostFactory.createNew(scJT, "h1");
+    scJT.addServiceComponentHost(schJTH1);
+    schJTH1.persist();
+
+    Map<String, Set<String>> componentHostMap;
+
+    componentHostMap = c1.getServiceComponentHostMap(Collections.singleton("h1"), Collections.singleton("HDFS"));
+    Assert.assertEquals(2, componentHostMap.size());
+    Assert.assertEquals(1, componentHostMap.get("DATANODE").size());
+    Assert.assertTrue(componentHostMap.get("DATANODE").contains("h1"));
+    Assert.assertEquals(1, componentHostMap.get("NAMENODE").size());
+    Assert.assertTrue(componentHostMap.get("NAMENODE").contains("h1"));
+
+    componentHostMap = c1.getServiceComponentHostMap(Collections.singleton("h2"), Collections.singleton("HDFS"));
+    Assert.assertEquals(1, componentHostMap.size());
+    Assert.assertEquals(1, componentHostMap.get("DATANODE").size());
+    Assert.assertTrue(componentHostMap.get("DATANODE").contains("h2"));
+
+    componentHostMap = c1.getServiceComponentHostMap(Collections.singleton("h3"), Collections.singleton("HDFS"));
+    Assert.assertEquals(0, componentHostMap.size());
+  }
+
+  @Test
   public void testGetAndSetConfigs() throws Exception {
     createDefaultCluster();