SLIDER 53. Add ability to export arbitrary templetized data
git-svn-id: https://svn.apache.org/repos/asf/incubator/slider/trunk@1594446 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/app-packages/hbase-v0_96/appConfig.json b/app-packages/hbase-v0_96/appConfig.json
index bd844b0..32d128f 100644
--- a/app-packages/hbase-v0_96/appConfig.json
+++ b/app-packages/hbase-v0_96/appConfig.json
@@ -46,7 +46,7 @@
"site.hbase-site.hbase.client.scanner.caching": "100",
"site.hbase-site.hbase.zookeeper.useMulti": "true",
"site.hbase-site.hbase.regionserver.info.port": "0",
- "site.hbase-site.hbase.master.info.port": "60010",
+ "site.hbase-site.hbase.master.info.port": "${HBASE_MASTER.ALLOCATED_PORT}",
"site.hbase-site.hbase.regionserver.port": "0",
"site.core-site.fs.defaultFS": "${NN_URI}",
"site.hdfs-site.dfs.namenode.https-address": "${NN_HOST}:50470",
diff --git a/app-packages/hbase-v0_96/metainfo.xml b/app-packages/hbase-v0_96/metainfo.xml
index ad440f1..6bca4ef 100644
--- a/app-packages/hbase-v0_96/metainfo.xml
+++ b/app-packages/hbase-v0_96/metainfo.xml
@@ -27,6 +27,21 @@
<version>0.96.0.2.1.1</version>
<type>YARN-APP</type>
<minHadoopVersion>2.1.0</minHadoopVersion>
+ <exportGroups>
+ <exportGroup>
+ <name>QuickLinks</name>
+ <exports>
+ <export>
+ <name>JMX_Endpoint</name>
+ <value>http://${HBASE_MASTER_HOST}:${site.hbase-site.hbase.master.info.port}/jmx</value>
+ </export>
+ <export>
+ <name>Master_Status</name>
+ <value>http://${HBASE_MASTER_HOST}:${site.hbase-site.hbase.master.info.port}/master-status</value>
+ </export>
+ </exports>
+ </exportGroup>
+ </exportGroups>
<components>
<component>
<name>HBASE_MASTER</name>
diff --git a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py
index 021a1a8..b4f6c66 100644
--- a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py
+++ b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py
@@ -23,6 +23,7 @@
import json
import pprint
import sys
+import socket
from AgentConfig import AgentConfig
from AgentException import AgentException
from PythonExecutor import PythonExecutor
@@ -217,7 +218,9 @@
"""
def finalize_command(self, command, store_config):
-
+ component = command['componentName']
+ allocated_port_format = "${{{0}.ALLOCATED_PORT}}"
+ port_allocation_req = allocated_port_format.format(component)
if 'configurations' in command:
for key in command['configurations']:
if len(command['configurations'][key]) > 0:
@@ -227,6 +230,10 @@
self.config.getWorkRootPath())
value = value.replace("${AGENT_LOG_ROOT}",
self.config.getLogPath())
+ if port_allocation_req in value:
+ port = self.allocate_port()
+ value = value.replace(port_allocation_req, str(port))
+ logger.info("Allocated port " + str(port) + " for " + port_allocation_req)
command['configurations'][key][k] = value
pass
pass
@@ -240,4 +247,22 @@
pass
+ def allocate_port(self):
+ MAX_ATTEMPT = 5
+ iter = 0
+ port = -1
+ while iter < MAX_ATTEMPT:
+ iter = iter + 1
+ try:
+ sock = socket.socket()
+ sock.bind(('', 0))
+ port = sock.getsockname()[1]
+ except Exception, err:
+ logger.info("Encountered error while trying to opening socket - " + str(err))
+ finally:
+ sock.close()
+ pass
+ logger.info("Allocated dynamic port: " + str(port))
+ return port
+
diff --git a/slider-agent/src/test/python/agent/TestActionQueue.py b/slider-agent/src/test/python/agent/TestActionQueue.py
index d52a6db..ed394fe 100644
--- a/slider-agent/src/test/python/agent/TestActionQueue.py
+++ b/slider-agent/src/test/python/agent/TestActionQueue.py
@@ -53,6 +53,7 @@
datanode_install_command = {
'commandType': 'EXECUTION_COMMAND',
'role': u'HBASE_MASTER',
+ "componentName": "HBASE_MASTER",
'roleCommand': u'INSTALL',
'commandId': '1-1',
'taskId': 3,
@@ -71,6 +72,7 @@
'roleCommand': u'INSTALL',
'commandId': '1-1',
'taskId': 7,
+ "componentName": "HBASE_MASTER",
'clusterName': u'cc',
'serviceName': u'HDFS',
}
diff --git a/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py b/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py
index 2abbd58..616bea6 100644
--- a/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py
+++ b/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py
@@ -53,6 +53,7 @@
hostname_mock.return_value = "test.hst"
command = {
'commandType': 'EXECUTION_COMMAND',
+ 'componentName': 'NAMENODE',
'role': u'DATANODE',
'roleCommand': u'INSTALL',
'commandId': '1-1',
@@ -180,6 +181,7 @@
isfile_mock.return_value = True
command = {
'role': 'REGION_SERVER',
+ 'componentName': 'REGION_SERVER',
'hostLevelParams': {
'stack_name': 'HDP',
'stack_version': '2.0.7',
@@ -289,7 +291,51 @@
status = orchestrator.requestComponentStatus(status_command)
self.assertEqual(CustomServiceOrchestrator.DEAD_STATUS, status['exitcode'])
- def test_finalize_command(self):
+ @patch.object(CustomServiceOrchestrator, "allocate_port")
+ def test_finalize_command(self, mock_allocate_port):
+ dummy_controller = MagicMock()
+ tempdir = tempfile.gettempdir()
+ tempWorkDir = tempdir + "W"
+ config = MagicMock()
+ config.get.return_value = "something"
+ config.getResolvedPath.return_value = tempdir
+ config.getWorkRootPath.return_value = tempWorkDir
+ config.getLogPath.return_value = tempdir
+ mock_allocate_port.return_value = "10023"
+
+ orchestrator = CustomServiceOrchestrator(config, dummy_controller)
+ command = {}
+ command['componentName'] = "HBASE_MASTER"
+ command['configurations'] = {}
+ command['configurations']['hbase-site'] = {}
+ command['configurations']['hbase-site']['a'] = 'b'
+ command['configurations']['hbase-site']['work_root'] = "${AGENT_WORK_ROOT}"
+ command['configurations']['hbase-site']['log_root'] = "${AGENT_LOG_ROOT}/log"
+ command['configurations']['hbase-site']['blog_root'] = "/b/${AGENT_LOG_ROOT}/log"
+ command['configurations']['oozie-site'] = {}
+ command['configurations']['oozie-site']['log_root'] = "${AGENT_LOG_ROOT}"
+ command['configurations']['oozie-site']['a_port'] = "${HBASE_MASTER.ALLOCATED_PORT}"
+
+ orchestrator.finalize_command(command, False)
+ self.assertEqual(command['configurations']['hbase-site']['work_root'], tempWorkDir)
+ self.assertEqual(command['configurations']['oozie-site']['log_root'], tempdir)
+ self.assertEqual(command['configurations']['oozie-site']['a_port'], "10023")
+ self.assertEqual(orchestrator.applied_configs, {})
+
+ command['configurations']['hbase-site']['work_root'] = "${AGENT_WORK_ROOT}"
+ command['configurations']['hbase-site']['log_root'] = "${AGENT_LOG_ROOT}/log"
+ command['configurations']['hbase-site']['blog_root'] = "/b/${AGENT_LOG_ROOT}/log"
+ command['configurations']['oozie-site']['log_root'] = "${AGENT_LOG_ROOT}"
+ command['configurations']['oozie-site']['b_port'] = "${HBASE_REGIONSERVER.ALLOCATED_PORT}"
+
+ orchestrator.finalize_command(command, True)
+ self.assertEqual(command['configurations']['hbase-site']['log_root'], tempdir + "/log")
+ self.assertEqual(command['configurations']['hbase-site']['blog_root'], "/b/" + tempdir + "/log")
+ self.assertEqual(command['configurations']['oozie-site']['b_port'], "${HBASE_REGIONSERVER.ALLOCATED_PORT}")
+ self.assertEqual(orchestrator.applied_configs, command['configurations'])
+
+
+ def test_port_allocation(self):
dummy_controller = MagicMock()
tempdir = tempfile.gettempdir()
tempWorkDir = tempdir + "W"
@@ -300,30 +346,9 @@
config.getLogPath.return_value = tempdir
orchestrator = CustomServiceOrchestrator(config, dummy_controller)
- command = {}
- command['configurations'] = {}
- command['configurations']['hbase-site'] = {}
- command['configurations']['hbase-site']['a'] = 'b'
- command['configurations']['hbase-site']['work_root'] = "${AGENT_WORK_ROOT}"
- command['configurations']['hbase-site']['log_root'] = "${AGENT_LOG_ROOT}/log"
- command['configurations']['hbase-site']['blog_root'] = "/b/${AGENT_LOG_ROOT}/log"
- command['configurations']['oozie-site'] = {}
- command['configurations']['oozie-site']['log_root'] = "${AGENT_LOG_ROOT}"
-
- orchestrator.finalize_command(command, False)
- self.assertEqual(command['configurations']['hbase-site']['work_root'], tempWorkDir)
- self.assertEqual(command['configurations']['oozie-site']['log_root'], tempdir)
- self.assertEqual(orchestrator.applied_configs, {})
-
- command['configurations']['hbase-site']['work_root'] = "${AGENT_WORK_ROOT}"
- command['configurations']['hbase-site']['log_root'] = "${AGENT_LOG_ROOT}/log"
- command['configurations']['hbase-site']['blog_root'] = "/b/${AGENT_LOG_ROOT}/log"
- command['configurations']['oozie-site']['log_root'] = "${AGENT_LOG_ROOT}"
-
- orchestrator.finalize_command(command, True)
- self.assertEqual(command['configurations']['hbase-site']['log_root'], tempdir + "/log")
- self.assertEqual(command['configurations']['hbase-site']['blog_root'], "/b/" + tempdir + "/log")
- self.assertEqual(orchestrator.applied_configs, command['configurations'])
+ port = orchestrator.allocate_port()
+ self.assertFalse(port == -1)
+ self.assertTrue(port > 0)
def tearDown(self):
# enable stdout
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index 88dd704..4dbd7a6 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -23,11 +23,10 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.slider.api.ClusterDescription;
-import org.apache.slider.api.ClusterDescriptionKeys;
-import org.apache.slider.api.ClusterNode;
import org.apache.slider.api.OptionKeys;
import org.apache.slider.api.StatusKeys;
import org.apache.slider.common.SliderKeys;
@@ -46,9 +45,12 @@
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.ProviderUtils;
import org.apache.slider.providers.agent.application.metadata.Component;
+import org.apache.slider.providers.agent.application.metadata.Export;
+import org.apache.slider.providers.agent.application.metadata.ExportGroup;
import org.apache.slider.providers.agent.application.metadata.Metainfo;
import org.apache.slider.providers.agent.application.metadata.MetainfoParser;
import org.apache.slider.providers.agent.application.metadata.Service;
+import org.apache.slider.server.appmaster.state.RoleInstance;
import org.apache.slider.server.appmaster.state.StateAccessForProviders;
import org.apache.slider.server.appmaster.web.rest.agent.AgentCommandType;
import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations;
@@ -72,7 +74,6 @@
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -259,10 +260,22 @@
getStateAccessor().getPublishedConfigurations().put(name, pubconf);
}
- protected Map<String, Map<String, ClusterNode>> getRoleClusterNodeMapping() {
- return (Map<String, Map<String, ClusterNode>>)
- stateAccessor.getClusterStatus().status.get(
- ClusterDescriptionKeys.KEY_CLUSTER_LIVE);
+ protected Map<String, Map<String, String>> getRoleClusterNodeMapping() {
+ StateAccessForProviders accessor = getStateAccessor();
+ assert accessor.isApplicationLive();
+ Map<ContainerId, RoleInstance> liveNodes = accessor.getLiveNodes();
+ Map<String, Map<String, String>> retVal = new HashMap<>();
+ for (ContainerId cid : liveNodes.keySet()) {
+ RoleInstance ri = liveNodes.get(cid);
+ Map<String, String> containerMap = retVal.get(ri.role);
+ if (containerMap == null) {
+ containerMap = new HashMap<>();
+ retVal.put(ri.role, containerMap);
+ }
+ containerMap.put(cid.toString(), ri.host);
+ }
+
+ return retVal;
}
private String getContainerLabel(Container container, String role) {
@@ -373,8 +386,12 @@
return response;
}
+ Boolean isMaster = isMaster(roleName);
ComponentInstanceState componentStatus = componentStatuses.get(label);
- processReturnedStatus(heartBeat, componentStatus);
+ // TODO: Currently only process configurations from Master
+ if (isMaster) {
+ processReturnedStatus(heartBeat, componentStatus);
+ }
List<CommandReport> reports = heartBeat.getReports();
if (reports != null && !reports.isEmpty()) {
@@ -407,7 +424,6 @@
}
}
// if there is no outstanding command then retrieve config
- Boolean isMaster = isMaster(roleName);
if (isMaster && componentStatus.getState() == State.STARTED
&& command == Command.NOP) {
if (!componentStatus.getConfigReported()) {
@@ -430,9 +446,50 @@
log.info("Status report: " + status.toString());
if (status.getConfigs() != null) {
for (String key : status.getConfigs().keySet()) {
- if (!key.equals(GLOBAL_CONFIG_TAG)) {
+ Map<String, String> configs = status.getConfigs().get(key);
+ publishComponentConfiguration(key, key, configs.entrySet());
+ }
+
+ Service service = getMetainfo().getServices().get(0);
+ List<ExportGroup> exportGroups = service.getExportGroups();
+ if (exportGroups != null && exportGroups.size() > 0) {
+
+ String configKeyFormat = "${site.%s.%s}";
+ String hostKeyFormat = "${%s_HOST}";
+
+ // publish export groups if any
+ Map<String, String> replaceTokens = new HashMap<>();
+ for (Map.Entry<String, Map<String, String>> entry : getRoleClusterNodeMapping().entrySet()) {
+ String hostName = getHostsList(entry.getValue(), true).iterator().next();
+ replaceTokens.put(String.format(hostKeyFormat, entry.getKey().toUpperCase(Locale.ENGLISH)), hostName);
+ }
+
+ for (String key : status.getConfigs().keySet()) {
Map<String, String> configs = status.getConfigs().get(key);
- publishComponentConfiguration(key, key, configs.entrySet());
+ for (String configKey : configs.keySet()) {
+ String lookupKey = String.format(configKeyFormat, key, configKey);
+ replaceTokens.put(lookupKey, configs.get(configKey));
+ }
+ }
+
+ for (ExportGroup exportGroup : exportGroups) {
+ List<Export> exports = exportGroup.getExports();
+ if (exports != null && exports.size() > 0) {
+ String exportGroupName = exportGroup.getName();
+ Map<String, String> map = new HashMap<>();
+ for (Export export : exports) {
+ String value = export.getValue();
+ // replace host names
+ for (String token : replaceTokens.keySet()) {
+ if (value.contains(token)) {
+ value = value.replace(token, replaceTokens.get(token));
+ }
+ }
+ map.put(export.getName(), value);
+ log.info("Preparing to publish. Key {} and Value {}", export.getName(), value);
+ }
+ publishComponentConfiguration(exportGroupName, exportGroupName, map.entrySet());
+ }
}
}
componentStatus.setConfigReported(true);
@@ -457,8 +514,7 @@
return scriptPath;
}
- protected Boolean isMaster(String roleName) throws SliderException {
- String scriptPath = null;
+ protected Boolean isMaster(String roleName) {
List<Service> services = getMetainfo().getServices();
if (services.size() != 1) {
log.error("Malformed app definition: Expect only one service in the metainfo.xml");
@@ -474,7 +530,7 @@
}
}
}
- throw new SliderException(String.format("Rolename %s not found in metainfo.xml", roleName));
+ return false;
}
private String getRoleName(String label) {
@@ -674,18 +730,17 @@
}
protected void addRoleRelatedTokens(Map<String, String> tokens) {
- for (Map.Entry<String, Map<String, ClusterNode>> entry : getRoleClusterNodeMapping().entrySet()) {
+ for (Map.Entry<String, Map<String, String>> entry : getRoleClusterNodeMapping().entrySet()) {
String tokenName = entry.getKey().toUpperCase(Locale.ENGLISH) + "_HOST";
- String hosts = StringUtils.join(",", getHostsList(entry.getValue().values(), true));
+ String hosts = StringUtils.join(",", getHostsList(entry.getValue(), true));
tokens.put("${" + tokenName + "}", hosts);
}
}
- private Iterable<String> getHostsList(Collection<ClusterNode> values,
- boolean hostOnly) {
+ private Iterable<String> getHostsList(Map<String, String> values, boolean hostOnly) {
List<String> hosts = new ArrayList<>();
- for (ClusterNode cn : values) {
- hosts.add(hostOnly ? cn.host : cn.host + "/" + cn.name);
+ for (Map.Entry<String, String> entry : values.entrySet()) {
+ hosts.add(hostOnly ? entry.getValue() : entry.getValue() + "/" + entry.getKey());
}
return hosts;
@@ -706,10 +761,10 @@
}
private void buildRoleHostDetails(Map<String, URL> details) {
- for (Map.Entry<String, Map<String, ClusterNode>> entry :
+ for (Map.Entry<String, Map<String, String>> entry :
getRoleClusterNodeMapping().entrySet()) {
details.put(entry.getKey() + " Host(s)/Container(s): " +
- getHostsList(entry.getValue().values(), false),
+ getHostsList(entry.getValue(), false),
null);
}
}
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Export.java b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Export.java
new file mode 100644
index 0000000..17326a3
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Export.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.providers.agent.application.metadata;
+
+/**
+ *
+ */
+public class Export {
+ String name;
+ String value;
+
+ public Export() {
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb =
+ new StringBuilder("{");
+ sb.append(",\n\"name\": ").append(name);
+ sb.append(",\n\"value\": ").append(value);
+ sb.append('}');
+ return sb.toString();
+ }
+}
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/ExportGroup.java b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/ExportGroup.java
new file mode 100644
index 0000000..d2e20a4
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/ExportGroup.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.providers.agent.application.metadata;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ */
+public class ExportGroup {
+ String name;
+ List<Export> exports;
+
+ public ExportGroup() {
+ exports = new ArrayList<Export>();
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public void addExport(Export export) {
+ exports.add(export);
+ }
+
+ public List<Export> getExports() {
+ return exports;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb =
+ new StringBuilder("{");
+ sb.append(",\n\"name\": ").append(name);
+ sb.append(",\n\"exports\" : {");
+ for (Export export : exports) {
+ sb.append("\n").append(export);
+ }
+ sb.append("\n},");
+ sb.append('}');
+ return sb.toString();
+ }
+}
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java
index 370366b..8e2dc23 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java
@@ -39,6 +39,14 @@
digester.addBeanPropertySetter("*/service/comment");
digester.addBeanPropertySetter("*/service/version");
+ digester.addObjectCreate("*/exportGroup", ExportGroup.class);
+ digester.addBeanPropertySetter("*/exportGroup/name");
+ digester.addObjectCreate("*/export", Export.class);
+ digester.addBeanPropertySetter("*/export/name");
+ digester.addBeanPropertySetter("*/export/value");
+ digester.addSetNext("*/export", "addExport");
+ digester.addSetNext("*/exportGroup", "addExportGroup");
+
digester.addObjectCreate("*/component", Component.class);
digester.addBeanPropertySetter("*/component/name");
digester.addBeanPropertySetter("*/component/category");
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Service.java b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Service.java
index 2067bee..dd201bd 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Service.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Service.java
@@ -27,10 +27,12 @@
String comment;
String version;
List<Component> components;
+ List<ExportGroup> exportGroups;
List<OSSpecific> osSpecifics;
ConfigurationDependencies configDependencies;
public Service() {
+ exportGroups = new ArrayList<ExportGroup>();
components = new ArrayList<Component>();
osSpecifics = new ArrayList<OSSpecific>();
}
@@ -75,6 +77,14 @@
return components;
}
+ public void addExportGroup(ExportGroup exportGroup) {
+ exportGroups.add(exportGroup);
+ }
+
+ public List<ExportGroup> getExportGroups() {
+ return exportGroups;
+ }
+
public void addOSSpecific(OSSpecific osSpecific) {
osSpecifics.add(osSpecific);
}
diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
index 9195de9..e017c02 100644
--- a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
+++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
@@ -28,9 +28,11 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.slider.api.ClusterDescription;
-import org.apache.slider.api.ClusterDescriptionKeys;
-import org.apache.slider.api.ClusterNode;
import org.apache.slider.api.OptionKeys;
import org.apache.slider.api.StatusKeys;
import org.apache.slider.common.tools.SliderFileSystem;
@@ -41,6 +43,8 @@
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.launch.ContainerLauncher;
import org.apache.slider.providers.agent.application.metadata.Component;
+import org.apache.slider.providers.agent.application.metadata.Export;
+import org.apache.slider.providers.agent.application.metadata.ExportGroup;
import org.apache.slider.providers.agent.application.metadata.Metainfo;
import org.apache.slider.providers.agent.application.metadata.MetainfoParser;
import org.apache.slider.providers.agent.application.metadata.Service;
@@ -48,6 +52,7 @@
import org.apache.slider.server.appmaster.model.mock.MockFileSystem;
import org.apache.slider.server.appmaster.model.mock.MockNodeId;
import org.apache.slider.server.appmaster.state.AppState;
+import org.apache.slider.server.appmaster.state.RoleInstance;
import org.apache.slider.server.appmaster.state.StateAccessForProviders;
import org.apache.slider.server.appmaster.web.rest.agent.ComponentStatus;
import org.apache.slider.server.appmaster.web.rest.agent.HeartBeat;
@@ -56,6 +61,7 @@
import org.apache.slider.server.appmaster.web.rest.agent.RegistrationResponse;
import org.apache.slider.server.appmaster.web.rest.agent.RegistrationStatus;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,19 +71,19 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyCollection;
import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
@@ -86,6 +92,68 @@
public class TestAgentProviderService {
protected static final Logger log =
LoggerFactory.getLogger(TestAgentProviderService.class);
+ private static final String metainfo_1_str = "<metainfo>\n"
+ + " <schemaVersion>2.0</schemaVersion>\n"
+ + " <services>\n"
+ + " <service>\n"
+ + " <name>HBASE</name>\n"
+ + " <comment>\n"
+ + " Apache HBase\n"
+ + " </comment>\n"
+ + " <version>0.96.0.2.1.1</version>\n"
+ + " <type>YARN-APP</type>\n"
+ + " <minHadoopVersion>2.1.0</minHadoopVersion>\n"
+ + " <exportGroups>\n"
+ + " <exportGroup>\n"
+ + " <name>QuickLinks</name>\n"
+ + " <exports>\n"
+ + " <export>\n"
+ + " <name>JMX_Endpoint</name>\n"
+ + " <value>http://${HBASE_MASTER_HOST}:${site.hbase-site.hbase.master.info.port}/jmx</value>\n"
+ + " </export>\n"
+ + " <export>\n"
+ + " <name>Master_Status</name>\n"
+ + " <value>http://${HBASE_MASTER_HOST}:${site.hbase-site.hbase.master.info.port}/master-status</value>\n"
+ + " </export>\n"
+ + " </exports>\n"
+ + " </exportGroup>\n"
+ + " </exportGroups>\n"
+ + " <components>\n"
+ + " <component>\n"
+ + " <name>HBASE_MASTER</name>\n"
+ + " <category>MASTER</category>\n"
+ + " <minInstanceCount>1</minInstanceCount>\n"
+ + " <maxInstanceCount>2</maxInstanceCount>\n"
+ + " <commandScript>\n"
+ + " <script>scripts/hbase_master.py</script>\n"
+ + " <scriptType>PYTHON</scriptType>\n"
+ + " <timeout>600</timeout>\n"
+ + " </commandScript>\n"
+ + " </component>\n"
+ + " <component>\n"
+ + " <name>HBASE_REGIONSERVER</name>\n"
+ + " <category>SLAVE</category>\n"
+ + " <minInstanceCount>1</minInstanceCount>\n"
+ + " <commandScript>\n"
+ + " <script>scripts/hbase_regionserver.py</script>\n"
+ + " <scriptType>PYTHON</scriptType>\n"
+ + " </commandScript>\n"
+ + " </component>\n"
+ + " </components>\n"
+ + " <osSpecifics>\n"
+ + " <osSpecific>\n"
+ + " <osType>any</osType>\n"
+ + " <packages>\n"
+ + " <package>\n"
+ + " <type>tarball</type>\n"
+ + " <name>files/hbase-0.96.1-hadoop2-bin.tar.gz</name>\n"
+ + " </package>\n"
+ + " </packages>\n"
+ + " </osSpecific>\n"
+ + " </osSpecifics>\n"
+ + " </service>\n"
+ + " </services>\n"
+ + "</metainfo>";
@Test
public void testRegistration() throws IOException {
@@ -185,118 +253,103 @@
AgentProviderService aps = new AgentProviderService();
StateAccessForProviders appState = new AppState(null) {
@Override
- public ClusterDescription getClusterStatus() {
- ClusterDescription cd = new ClusterDescription();
- cd.status = new HashMap<String,Object>();
- Map<String, Map<String,ClusterNode>> roleMap = new HashMap<>();
- ClusterNode cn1 = new ClusterNode(new MyContainerId(1));
- cn1.host = "FIRST_HOST";
- Map<String, ClusterNode> map1 = new HashMap<>();
- map1.put("FIRST_CONTAINER", cn1);
- ClusterNode cn2 = new ClusterNode(new MyContainerId(2));
- cn2.host = "SECOND_HOST";
- Map<String, ClusterNode> map2 = new HashMap<>();
- map2.put("SECOND_CONTAINER", cn2);
- ClusterNode cn3 = new ClusterNode(new MyContainerId(3));
- cn3.host = "THIRD_HOST";
- map2.put("THIRD_CONTAINER", cn3);
+ public Map<ContainerId, RoleInstance> getLiveNodes() {
+ Map<ContainerId, RoleInstance> retVal = new HashMap<>();
+ ContainerId cid = new MyContainerId(1);
+ Container container = new MyContainer();
+ container.setId(cid);
+ RoleInstance ri = new RoleInstance(container);
+ ri.host = "FIRST_HOST";
+ ri.role = "FIRST_ROLE";
+ retVal.put(cid, ri);
- roleMap.put("FIRST_ROLE", map1);
- roleMap.put("SECOND_ROLE", map2);
+ cid = new MyContainerId(2);
+ container = new MyContainer();
+ container.setId(cid);
+ ri = new RoleInstance(container);
+ ri.host = "SECOND_HOST";
+ ri.role = "SECOND_ROLE";
+ retVal.put(cid, ri);
- cd.status.put(ClusterDescriptionKeys.KEY_CLUSTER_LIVE, roleMap);
+ cid = new MyContainerId(3);
+ container = new MyContainer();
+ container.setId(cid);
+ ri = new RoleInstance(container);
+ ri.host = "THIRD_HOST";
+ ri.role = "SECOND_ROLE";
+ retVal.put(cid, ri);
- return cd;
+ return retVal;
+ }
+
+ @Override
+ public boolean isApplicationLive() {
+ return true;
}
};
aps.setStateAccessor(appState);
- Map<String, String> tokens = new HashMap<String, String>();
+ Map<String, String> tokens = new HashMap<>();
aps.addRoleRelatedTokens(tokens);
TestCase.assertEquals(2, tokens.size());
TestCase.assertEquals("FIRST_HOST", tokens.get("${FIRST_ROLE_HOST}"));
- TestCase.assertEquals("THIRD_HOST,SECOND_HOST", tokens.get("${SECOND_ROLE_HOST}"));
+ TestCase.assertEquals("SECOND_HOST,THIRD_HOST", tokens.get("${SECOND_ROLE_HOST}"));
aps.close();
}
@Test
public void testProcessConfig() throws Exception {
+ InputStream metainfo_1 = new ByteArrayInputStream(metainfo_1_str.getBytes());
+ Metainfo metainfo = new MetainfoParser().parse(metainfo_1);
+ assert metainfo.getServices().size() == 1;
AgentProviderService aps = new AgentProviderService();
HeartBeat hb = new HeartBeat();
ComponentStatus status = new ComponentStatus();
status.setClusterName("test");
status.setComponentName("HBASE_MASTER");
status.setRoleCommand("GET_CONFIG");
- Map<String, String> hbaseSite = new HashMap<String, String>();
- hbaseSite.put("a", "b");
+ Map<String, String> hbaseSite = new HashMap<>();
+ hbaseSite.put("hbase.master.info.port", "60012");
hbaseSite.put("c", "d");
- Map<String, Map<String, String>> configs = new HashMap<String,Map<String, String>>();
+ Map<String, Map<String, String>> configs = new HashMap<>();
configs.put("hbase-site", hbaseSite);
configs.put("global", hbaseSite);
status.setConfigs(configs);
- hb.setComponentStatus(new ArrayList<ComponentStatus>(Arrays.asList(status)));
+ hb.setComponentStatus(new ArrayList<>(Arrays.asList(status)));
+
+ Map<String, Map<String, String>> roleClusterNodeMap = new HashMap<>();
+ Map<String, String> container = new HashMap<>();
+ container.put("cid1", "HOST1");
+ roleClusterNodeMap.put("HBASE_MASTER", container);
ComponentInstanceState componentStatus = new ComponentInstanceState("HBASE_MASTER", "aid", "cid");
AgentProviderService mockAps = Mockito.spy(aps);
doNothing().when(mockAps).publishComponentConfiguration(anyString(), anyString(), anyCollection());
+ doReturn(metainfo).when(mockAps).getMetainfo();
+ doReturn(roleClusterNodeMap).when(mockAps).getRoleClusterNodeMapping();
+
mockAps.processReturnedStatus(hb, componentStatus);
assert componentStatus.getConfigReported() == true;
- Mockito.verify(mockAps, Mockito.times(1)).publishComponentConfiguration(
+ ArgumentCaptor<Collection> commandCaptor = ArgumentCaptor.
+ forClass(Collection.class);
+ Mockito.verify(mockAps, Mockito.times(3)).publishComponentConfiguration(
anyString(),
anyString(),
- anyCollection());
+ commandCaptor.capture());
+ assert commandCaptor.getAllValues().size() == 3;
+ for (Collection coll : commandCaptor.getAllValues()) {
+ Set<Map.Entry<String, String>> entrySet = (Set<Map.Entry<String, String>>) coll;
+ for (Map.Entry entry : entrySet) {
+ log.info("{}:{}", entry.getKey(), entry.getValue().toString());
+ if (entry.getKey().equals("JMX_Endpoint")) {
+ assert entry.getValue().toString().equals("http://HOST1:60012/jmx");
+ }
+ }
+ }
}
@Test
public void testMetainfoParsing() throws Exception {
- String metainfo_1_str = "<metainfo>\n"
- + " <schemaVersion>2.0</schemaVersion>\n"
- + " <services>\n"
- + " <service>\n"
- + " <name>HBASE</name>\n"
- + " <comment>\n"
- + " Apache HBase\n"
- + " </comment>\n"
- + " <version>0.96.0.2.1.1</version>\n"
- + " <type>YARN-APP</type>\n"
- + " <minHadoopVersion>2.1.0</minHadoopVersion>\n"
- + " <components>\n"
- + " <component>\n"
- + " <name>HBASE_MASTER</name>\n"
- + " <category>MASTER</category>\n"
- + " <minInstanceCount>1</minInstanceCount>\n"
- + " <maxInstanceCount>2</maxInstanceCount>\n"
- + " <commandScript>\n"
- + " <script>scripts/hbase_master.py</script>\n"
- + " <scriptType>PYTHON</scriptType>\n"
- + " <timeout>600</timeout>\n"
- + " </commandScript>\n"
- + " </component>\n"
- + " <component>\n"
- + " <name>HBASE_REGIONSERVER</name>\n"
- + " <category>SLAVE</category>\n"
- + " <minInstanceCount>1</minInstanceCount>\n"
- + " <commandScript>\n"
- + " <script>scripts/hbase_regionserver.py</script>\n"
- + " <scriptType>PYTHON</scriptType>\n"
- + " </commandScript>\n"
- + " </component>\n"
- + " </components>\n"
- + " <osSpecifics>\n"
- + " <osSpecific>\n"
- + " <osType>any</osType>\n"
- + " <packages>\n"
- + " <package>\n"
- + " <type>tarball</type>\n"
- + " <name>files/hbase-0.96.1-hadoop2-bin.tar.gz</name>\n"
- + " </package>\n"
- + " </packages>\n"
- + " </osSpecific>\n"
- + " </osSpecifics>\n"
- + " </service>\n"
- + " </services>\n"
- + "</metainfo>";
-
InputStream metainfo_1 = new ByteArrayInputStream(metainfo_1_str.getBytes());
Metainfo metainfo = new MetainfoParser().parse(metainfo_1);
assert metainfo.getServices().size() == 1;
@@ -305,16 +358,41 @@
assert service.getName().equals("HBASE");
assert service.getComponents().size() == 2;
List<Component> components = service.getComponents();
+ int found = 0;
for (Component component : components) {
if (component.getName().equals("HBASE_MASTER")) {
assert component.getCommandScript().getScript().equals("scripts/hbase_master.py");
assert component.getCategory().equals("MASTER");
+ found++;
}
if (component.getName().equals("HBASE_REGIONSERVER")) {
assert component.getCommandScript().getScript().equals("scripts/hbase_regionserver.py");
assert component.getCategory().equals("SLAVE");
+ found++;
}
}
+ assert found == 2;
+
+ assert service.getExportGroups().size() == 1;
+ List<ExportGroup> egs = service.getExportGroups();
+ ExportGroup eg = egs.get(0);
+ assert eg.getName().equals("QuickLinks");
+ assert eg.getExports().size() == 2;
+
+ found = 0;
+ for (Export export : eg.getExports()) {
+ if (export.getName().equals("JMX_Endpoint")) {
+ found++;
+ assert export.getValue().equals(
+ "http://${HBASE_MASTER_HOST}:${site.hbase-site.hbase.master.info.port}/jmx");
+ }
+ if (export.getName().equals("Master_Status")) {
+ found++;
+ assert export.getValue().equals(
+ "http://${HBASE_MASTER_HOST}:${site.hbase-site.hbase.master.info.port}/master-status");
+ }
+ }
+ assert found == 2;
AgentProviderService aps = new AgentProviderService();
AgentProviderService mockAps = Mockito.spy(aps);
@@ -336,6 +414,76 @@
assert metainfo == null;
}
+ private static class MyContainer extends Container {
+
+ ContainerId cid = null;
+
+ @Override
+ public ContainerId getId() {
+ return this.cid;
+ }
+
+ @Override
+ public void setId(ContainerId containerId) {
+ this.cid = containerId;
+ }
+
+ @Override
+ public NodeId getNodeId() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public void setNodeId(NodeId nodeId) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public String getNodeHttpAddress() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public void setNodeHttpAddress(String s) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public Resource getResource() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public void setResource(Resource resource) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public Priority getPriority() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public void setPriority(Priority priority) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public Token getContainerToken() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public void setContainerToken(Token token) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public int compareTo(Container o) {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+ }
+
private static class MyContainerId extends ContainerId {
int id;
@@ -369,6 +517,11 @@
}
@Override
+ public int hashCode() {
+ return this.id;
+ }
+
+ @Override
public String toString() {
return "MyContainerId{" +
"id=" + id +