blob: c8e2a21c3061874a05569d019cda78824304c35f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.myriad.scheduler;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.apache.mesos.Protos;
import org.apache.myriad.configuration.MyriadBadConfigurationException;
import org.apache.myriad.configuration.MyriadConfiguration;
import org.apache.myriad.configuration.MyriadContainerConfiguration;
import org.apache.myriad.configuration.MyriadDockerConfiguration;
import org.apache.myriad.configuration.NodeManagerConfiguration;
import org.apache.myriad.configuration.ServiceConfiguration;
import org.apache.myriad.executor.MyriadExecutorDefaults;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
/**
* utility class for working with tasks and node manager profiles
*/
public class TaskUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskUtils.class);
private static final String YARN_NODEMANAGER_RESOURCE_CPU_VCORES = "yarn.nodemanager.resource.cpu-vcores";
private static final String YARN_NODEMANAGER_RESOURCE_MEMORY_MB = "yarn.nodemanager.resource.memory-mb";
private static final String CONTAINER_PATH_KEY = "containerPath";
private static final String HOST_PATH_KEY = "hostPath";
private static final String RW_MODE = "mode";
private static final String PARAMETER_KEY_KEY = "key";
private static final String PARAMETER_VALUE_KEY = "value";
private MyriadConfiguration cfg;
Random random = new Random();
@Inject
public TaskUtils(MyriadConfiguration cfg) {
this.cfg = cfg;
}
public static String getRevisedConfig(Double cpu, Double memory) {
String revisedConfig = "";
try {
// todo:(kgs) replace with more abstract xml parser
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
factory.setNamespaceAware(true);
DocumentBuilder builder;
Document doc;
builder = factory.newDocumentBuilder();
InputStream resourceAsStream = TaskUtils.class.getClassLoader().getResourceAsStream("yarn-site.xml");
doc = builder.parse(new InputSource(resourceAsStream));
resourceAsStream.close();
XPathFactory xFactory = XPathFactory.newInstance();
XPath xpath = xFactory.newXPath();
XPathExpression cpuXpath = xpath.compile("//property/name");
Object cpuNodeObj = cpuXpath.evaluate(doc, XPathConstants.NODESET);
NodeList cpuNode = (NodeList) cpuNodeObj;
for (int i = 0; i < cpuNode.getLength(); i++) {
Node item = cpuNode.item(i);
if (YARN_NODEMANAGER_RESOURCE_CPU_VCORES.equals(item.getTextContent())) {
Node propertyNode = item.getParentNode();
NodeList childNodes = propertyNode.getChildNodes();
for (int j = 0; j < childNodes.getLength(); j++) {
Node item2 = childNodes.item(j);
if ("value".equals(item2.getNodeName())) {
item2.setTextContent(cpu.intValue() + "");
}
}
} else if (YARN_NODEMANAGER_RESOURCE_MEMORY_MB.equals(item.getTextContent())) {
Node propertyNode = item.getParentNode();
NodeList childNodes = propertyNode.getChildNodes();
for (int j = 0; j < childNodes.getLength(); j++) {
Node item2 = childNodes.item(j);
if ("value".equals(item2.getNodeName())) {
item2.setTextContent(memory.intValue() + "");
}
}
}
}
TransformerFactory tf = TransformerFactory.newInstance();
Transformer transformer = tf.newTransformer();
transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
StringWriter writer = new StringWriter();
transformer.transform(new DOMSource(doc), new StreamResult(writer));
revisedConfig = writer.getBuffer().toString().replaceAll("\n|\r", "");
} catch (TransformerConfigurationException e) {
e.printStackTrace();
} catch (TransformerException | SAXException | XPathExpressionException | ParserConfigurationException e) {
LOGGER.error("Error with xml operation", e);
} catch (IOException e) {
LOGGER.error("Error with xml operation", e);
}
return revisedConfig;
}
public double getAggregateMemory(NMProfile profile) {
double totalTaskMemory;
double executorMemory = getExecutorMemory();
double nmJvmMaxMemoryMB = getNodeManagerMemory();
double advertisableMemory = profile.getMemory();
totalTaskMemory = executorMemory + nmJvmMaxMemoryMB + advertisableMemory;
return totalTaskMemory;
}
public double getAggregateCpus(NMProfile profile) {
return getNodeManagerCpus() + MyriadExecutorDefaults.DEFAULT_CPUS + profile.getCpus();
}
public double getNodeManagerMemory() {
return cfg.getNodeManagerConfiguration().getJvmMaxMemoryMB();
}
public double getNodeManagerCpus() {
return cfg.getNodeManagerConfiguration().getCpus();
}
public double getExecutorCpus() {
return MyriadExecutorDefaults.DEFAULT_CPUS;
}
public double getExecutorMemory() {
return cfg.getMyriadExecutorConfiguration().getJvmMaxMemoryMB();
}
public double getTaskCpus(NMProfile profile) {
return getAggregateCpus(profile) - getExecutorCpus();
}
public double getTaskMemory(NMProfile profile) {
return getAggregateMemory(profile) - getExecutorMemory();
}
public double getAuxTaskCpus(NMProfile profile, String taskName) throws MyriadBadConfigurationException {
if (taskName.startsWith(NodeManagerConfiguration.NM_TASK_PREFIX)) {
return getAggregateCpus(profile);
}
Optional<ServiceConfiguration> auxConf = cfg.getServiceConfiguration(taskName);
if (!auxConf.isPresent()) {
throw new MyriadBadConfigurationException("Can not find profile for task name: " + taskName);
}
return auxConf.get().getCpus();
}
public double getAuxTaskMemory(NMProfile profile, String taskName) throws MyriadBadConfigurationException {
if (taskName.startsWith(NodeManagerConfiguration.NM_TASK_PREFIX)) {
return getAggregateMemory(profile);
}
Optional<ServiceConfiguration> auxConf = cfg.getServiceConfiguration(taskName);
if (!auxConf.isPresent()) {
throw new MyriadBadConfigurationException("Cannot find profile for task name: " + taskName);
}
return auxConf.get().getJvmMaxMemoryMB();
}
public TaskUtils() {
super();
}
public Iterable<Protos.Volume> getVolumes(Iterable<Map<String, String>> volume) {
return Iterables.transform(volume, new Function<Map<String, String>, Protos.Volume>() {
@Nullable
@Override
public Protos.Volume apply(Map<String, String> map) {
Preconditions.checkArgument(map.containsKey(HOST_PATH_KEY) && map.containsKey(CONTAINER_PATH_KEY));
Protos.Volume.Mode mode = Protos.Volume.Mode.RO;
if (map.containsKey(RW_MODE) && map.get(RW_MODE).toLowerCase().equals("rw")) {
mode = Protos.Volume.Mode.RW;
}
return Protos.Volume.newBuilder()
.setContainerPath(map.get(CONTAINER_PATH_KEY))
.setHostPath(map.get(HOST_PATH_KEY))
.setMode(mode)
.build();
}
});
}
public Iterable<Protos.Parameter> getParameters(Iterable<Map<String, String>> params) {
Preconditions.checkNotNull(params);
return Iterables.transform(params, new Function<Map<String, String>, Protos.Parameter>() {
@Override
public Protos.Parameter apply(Map<String, String> parameter) {
Preconditions.checkNotNull(parameter, "Null parameter");
Preconditions.checkState(parameter.containsKey(PARAMETER_KEY_KEY), "Missing key");
Preconditions.checkState(parameter.containsKey(PARAMETER_VALUE_KEY), "Missing value");
return Protos.Parameter.newBuilder()
.setKey(parameter.get(PARAMETER_KEY_KEY))
.setValue(PARAMETER_VALUE_KEY)
.build();
}
});
}
private Protos.ContainerInfo.DockerInfo getDockerInfo(MyriadDockerConfiguration dockerConfiguration) {
Preconditions.checkArgument(dockerConfiguration.getNetwork().equals("HOST"), "Currently only host networking supported");
Protos.ContainerInfo.DockerInfo.Builder dockerBuilder = Protos.ContainerInfo.DockerInfo.newBuilder()
.setImage(dockerConfiguration.getImage())
.setForcePullImage(dockerConfiguration.getForcePullImage())
.setNetwork(Protos.ContainerInfo.DockerInfo.Network.valueOf(dockerConfiguration.getNetwork()))
.setPrivileged(dockerConfiguration.getPrivledged())
.addAllParameters(getParameters(dockerConfiguration.getParameters()));
return dockerBuilder.build();
}
/**
* Builds a ContainerInfo Object
*
* @return ContainerInfo
*/
public Protos.ContainerInfo getContainerInfo() {
Preconditions.checkArgument(cfg.getContainerInfo().isPresent(), "ContainerConfiguration doesn't exist!");
MyriadContainerConfiguration containerConfiguration = cfg.getContainerInfo().get();
Protos.ContainerInfo.Builder containerBuilder = Protos.ContainerInfo.newBuilder()
.setType(Protos.ContainerInfo.Type.valueOf(containerConfiguration.getType()))
.addAllVolumes(getVolumes(containerConfiguration.getVolumes()));
if (containerConfiguration.getDockerInfo().isPresent()) {
MyriadDockerConfiguration dockerConfiguration = containerConfiguration.getDockerInfo().get();
containerBuilder.setDocker(getDockerInfo(dockerConfiguration));
}
return containerBuilder.build();
}
/**
* Helper function that returns all scalar resources of a given name in an offer up to a given value. Attempts to
* take resource from the prescribed role first and then from the default role. The variable used indicated any
* resources previously requested. Assumes enough resources are present.
*
* @param offer - An offer by Mesos, assumed to have enough resources.
* @param name - The name of the SCALAR resource, i.e. cpus or mem
* @param value - The amount of SCALAR resources needed.
* @param used - The amount of SCALAR resources already removed from this offer.
* @return An Iterable containing one or two scalar resources of a given name in an offer up to a given value.
*/
public Iterable<Protos.Resource> getScalarResource(Protos.Offer offer, String name, Double value, Double used) {
String role = cfg.getFrameworkRole();
List<Protos.Resource> resources = new ArrayList<Protos.Resource>();
double resourceDifference = 0; //used to determine the resource difference of value and the resources requested from role *
//Find role by name, must loop through resources
for (Protos.Resource r : offer.getResourcesList()) {
if (r.getName().equals(name) && r.hasRole() && r.getRole().equals(role) && r.hasScalar()) {
//Use Math.max in case used>resourceValue
resourceDifference = Math.max(r.getScalar().getValue() - used, 0.0);
if (resourceDifference > 0) {
resources.add(Protos.Resource.newBuilder().setName(name).setType(Protos.Value.Type.SCALAR)
.setScalar(Protos.Value.Scalar.newBuilder().setValue(Math.min(value, resourceDifference)).build())
.setRole(role).build());
}
break;
} else if (r.getName().equals(name) && r.hasRole() && r.getRole().equals(role)) {
//Should never get here, there must be a miss configured slave
LOGGER.warn("Resource with name: " + name + "expected type to be SCALAR check configuration on: " + offer.getHostname());
}
}
//Assume enough resources are present in default value, if not we shouldn't have gotten to this function.
if (value - resourceDifference > 0) {
resources.add(Protos.Resource.newBuilder().setName(name).setType(Protos.Value.Type.SCALAR)
.setScalar(Protos.Value.Scalar.newBuilder().setValue(value - resourceDifference).build())
.build()); //no role assumes default
}
return resources;
}
}