SLIDER-48 switched to live node tracking of app state

git-svn-id: https://svn.apache.org/repos/asf/incubator/slider/trunk@1594266 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index 2859f33..88dd704 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -26,6 +26,8 @@
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.api.ClusterDescriptionKeys;
+import org.apache.slider.api.ClusterNode;
 import org.apache.slider.api.OptionKeys;
 import org.apache.slider.api.StatusKeys;
 import org.apache.slider.common.SliderKeys;
@@ -70,6 +72,7 @@
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -94,7 +97,6 @@
   private static final String GLOBAL_CONFIG_TAG = "global";
   private AgentClientProvider clientProvider;
   private Map<String, ComponentInstanceState> componentStatuses = new HashMap<String, ComponentInstanceState>();
-  private Map<String, List<String>> roleHostMapping = new HashMap<String, List<String>>();
   private AtomicInteger taskId = new AtomicInteger(0);
   private Metainfo metainfo = null;
 
@@ -208,7 +210,6 @@
     }
 
     String label = getContainerLabel(container, role);
-    setRoleHostMapping(role, container.getNodeId().getHost());
     CommandLineBuilder operation = new CommandLineBuilder();
 
     operation.add(AgentKeys.PYTHON_EXE);
@@ -258,17 +259,10 @@
     getStateAccessor().getPublishedConfigurations().put(name, pubconf);
   }
 
-  protected void setRoleHostMapping(String role, String host) {
-    List<String> hosts = roleHostMapping.get(role);
-    if (hosts == null) {
-      hosts = new ArrayList<String>();
-    }
-    hosts.add(host);
-    roleHostMapping.put(role, hosts);
-  }
-
-  private List<String> getHostsForRole(String role) {
-    return roleHostMapping.get(role);
+  protected Map<String, Map<String, ClusterNode>> getRoleClusterNodeMapping() {
+    return (Map<String, Map<String, ClusterNode>>)
+        stateAccessor.getClusterStatus().status.get(
+        ClusterDescriptionKeys.KEY_CLUSTER_LIVE);
   }
 
   private String getContainerLabel(Container container, String role) {
@@ -680,13 +674,23 @@
   }
 
   protected void addRoleRelatedTokens(Map<String, String> tokens) {
-    for (Map.Entry<String, List<String>> entry : roleHostMapping.entrySet()) {
+    for (Map.Entry<String, Map<String, ClusterNode>> entry : getRoleClusterNodeMapping().entrySet()) {
       String tokenName = entry.getKey().toUpperCase(Locale.ENGLISH) + "_HOST";
-      String hosts = StringUtils.join(",", entry.getValue());
+      String hosts = StringUtils.join(",", getHostsList(entry.getValue().values(), true));
       tokens.put("${" + tokenName + "}", hosts);
     }
   }
 
+  private Iterable<String> getHostsList(Collection<ClusterNode> values,
+                                        boolean hostOnly) {
+    List<String> hosts = new ArrayList<>();
+    for (ClusterNode cn : values) {
+      hosts.add(hostOnly ? cn.host : cn.host + "/" + cn.name);
+    }
+
+    return hosts;
+  }
+
   private void addDefaultGlobalConfig(Map<String, String> config) {
     config.put("app_log_dir", "${AGENT_LOG_ROOT}/app/log");
     config.put("app_pid_dir", "${AGENT_WORK_ROOT}/app/run");
@@ -702,8 +706,10 @@
   }
 
   private void buildRoleHostDetails(Map<String, URL> details) {
-    for (Map.Entry<String, List<String>> entry : roleHostMapping.entrySet()) {
-      details.put(entry.getKey() + " Host(s): " + entry.getValue(),
+    for (Map.Entry<String, Map<String, ClusterNode>> entry :
+        getRoleClusterNodeMapping().entrySet()) {
+      details.put(entry.getKey() + " Host(s)/Container(s): " +
+                  getHostsList(entry.getValue().values(), false),
                   null);
     }
   }
diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
index e758b04..9195de9 100644
--- a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
+++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
@@ -22,11 +22,15 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.api.ClusterDescriptionKeys;
+import org.apache.slider.api.ClusterNode;
 import org.apache.slider.api.OptionKeys;
 import org.apache.slider.api.StatusKeys;
 import org.apache.slider.common.tools.SliderFileSystem;
@@ -43,6 +47,7 @@
 import org.apache.slider.server.appmaster.model.mock.MockContainerId;
 import org.apache.slider.server.appmaster.model.mock.MockFileSystem;
 import org.apache.slider.server.appmaster.model.mock.MockNodeId;
+import org.apache.slider.server.appmaster.state.AppState;
 import org.apache.slider.server.appmaster.state.StateAccessForProviders;
 import org.apache.slider.server.appmaster.web.rest.agent.ComponentStatus;
 import org.apache.slider.server.appmaster.web.rest.agent.HeartBeat;
@@ -178,14 +183,39 @@
   @Test
   public void testRoleHostMapping() throws Exception {
     AgentProviderService aps = new AgentProviderService();
-    aps.setRoleHostMapping("FIRST_ROLE", "FIRST_HOST");
-    aps.setRoleHostMapping("SECOND_ROLE", "SECOND_HOST");
-    aps.setRoleHostMapping("SECOND_ROLE", "THIRD_HOST");
+    StateAccessForProviders appState = new AppState(null) {
+      @Override
+      public ClusterDescription getClusterStatus() {
+        ClusterDescription cd = new ClusterDescription();
+        cd.status = new HashMap<String,Object>();
+        Map<String, Map<String,ClusterNode>> roleMap = new HashMap<>();
+        ClusterNode cn1 = new ClusterNode(new MyContainerId(1));
+        cn1.host = "FIRST_HOST";
+        Map<String, ClusterNode> map1 = new HashMap<>();
+        map1.put("FIRST_CONTAINER", cn1);
+        ClusterNode cn2 = new ClusterNode(new MyContainerId(2));
+        cn2.host = "SECOND_HOST";
+        Map<String, ClusterNode> map2 = new HashMap<>();
+        map2.put("SECOND_CONTAINER", cn2);
+        ClusterNode cn3 = new ClusterNode(new MyContainerId(3));
+        cn3.host = "THIRD_HOST";
+        map2.put("THIRD_CONTAINER", cn3);
+
+        roleMap.put("FIRST_ROLE", map1);
+        roleMap.put("SECOND_ROLE", map2);
+
+        cd.status.put(ClusterDescriptionKeys.KEY_CLUSTER_LIVE, roleMap);
+
+        return cd;
+      }
+    };
+
+    aps.setStateAccessor(appState);
     Map<String, String> tokens = new HashMap<String, String>();
     aps.addRoleRelatedTokens(tokens);
     TestCase.assertEquals(2, tokens.size());
     TestCase.assertEquals("FIRST_HOST", tokens.get("${FIRST_ROLE_HOST}"));
-    TestCase.assertEquals("SECOND_HOST,THIRD_HOST", tokens.get("${SECOND_ROLE_HOST}"));
+    TestCase.assertEquals("THIRD_HOST,SECOND_HOST", tokens.get("${SECOND_ROLE_HOST}"));
     aps.close();
   }
 
@@ -211,7 +241,10 @@
     doNothing().when(mockAps).publishComponentConfiguration(anyString(), anyString(), anyCollection());
     mockAps.processReturnedStatus(hb, componentStatus);
     assert componentStatus.getConfigReported() == true;
-    Mockito.verify(mockAps, Mockito.times(1)).publishComponentConfiguration(anyString(), anyString(), anyCollection());
+    Mockito.verify(mockAps, Mockito.times(1)).publishComponentConfiguration(
+        anyString(),
+        anyString(),
+        anyCollection());
   }
 
   @Test
@@ -302,4 +335,44 @@
     metainfo = new MetainfoParser().parse(metainfo_1);
     assert metainfo == null;
   }
+
+  private static class MyContainerId extends ContainerId {
+    int id;
+
+    private MyContainerId(int id) {
+      this.id = id;
+    }
+
+    @Override
+    public ApplicationAttemptId getApplicationAttemptId() {
+      return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    @Override
+    protected void setApplicationAttemptId(ApplicationAttemptId applicationAttemptId) {
+      //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    @Override
+    public int getId() {
+      return id;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    @Override
+    protected void setId(int i) {
+      //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    @Override
+    protected void build() {
+      //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    @Override
+    public String toString() {
+      return "MyContainerId{" +
+             "id=" + id +
+             '}';
+    }
+  }
 }