blob: 587f616920a15489bd91dfe6cac2f58132e8514a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hama.bsp.TaskWorkerManager.TaskWorker;
import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;
import org.apache.mesos.Protos.CommandInfo;
import org.apache.mesos.Protos.ExecutorID;
import org.apache.mesos.Protos.ExecutorInfo;
import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.Resource;
import org.apache.mesos.Protos.TaskID;
import org.apache.mesos.Protos.TaskInfo;
import org.apache.mesos.Protos.Value;
import org.apache.hadoop.conf.Configuration;
import com.google.protobuf.ByteString;
/**
 * Matches pending BSP tasks against Mesos resource offers. For every accepted
 * offer it launches one Mesos task whose executor is a Hama groom server sized
 * for the BSP tasks assigned to that offer.
 *
 * <p>Pending tasks are queued per preferred groom host by
 * {@link MesosTaskWorker}; the queue keyed by {@code "_any_"} holds tasks
 * without a usable host preference. Queue mutation by the worker and queue
 * clearing happen under the {@code tasksToRunByGroom} monitor.
 */
public class ResourceManager {
  public static final Log log = LogFactory.getLog(ResourceManager.class);

  /** CPUs requested for each BSP task slot. */
  private static final double CPUS_PER_SLOT = 1;
  /** Base disk requested for each groom server, before the configured overhead. */
  private static final double DISK_PER_GROOM = 10;
  /** Ports claimed for each groom server: the groom RPC port and the peer port. */
  private static final int PORTS_PER_GROOM = 2;
  /** Slot heap (MiB) assumed when bsp.child.java.opts carries no -Xmx option. */
  private static final long DEFAULT_SLOT_MEMORY_MB = 200;
  /**
   * Matches a whitespace-delimited -Xmx token with an optional k/m/g suffix
   * (either case). Group 1 is the amount, group 2 the (possibly empty) unit.
   */
  private static final Pattern XMX_PATTERN = Pattern
      .compile("(?:^|\\s)-Xmx([0-9]+)([kKmMgG]?)(?=\\s|$)");

  private final String anyGroomServer = "_any_";
  private Configuration conf;
  private static long launchedTasks = 0;
  private Set<JobInProgress> executingJobs = Collections.synchronizedSet(new HashSet<JobInProgress>());
  private Set<TaskInProgress> executingTasks = Collections.synchronizedSet(new HashSet<TaskInProgress>());
  private Map<String, java.util.Queue<TaskInProgress>> tasksToRunByGroom;
  // Written by MesosTaskWorker and read/removed by the offer callbacks, so it
  // is synchronized like the other shared sets.
  private Set<TaskInProgress> tasksToRun;
  private Set<TaskInProgress> recoveryTasks = Collections.synchronizedSet(new HashSet<TaskInProgress>());
  // Heap size (MiB) of a single BSP child JVM, parsed from bsp.child.java.opts.
  private long slotMemory;
  // Overhead requirements for the container groom server
  double groomCpus;
  double groomMem;
  double groomDisk;
  TaskDelegator taskDelegator;

  /**
   * Constructor for the mesos resource manager.
   *
   * <p>Note: this strips the -Xmx option from {@code bsp.child.java.opts} in
   * {@code conf}; the per-slot heap is re-added for each launched groom.
   *
   * @param conf The configuration options for hama
   * @param serverManager A reference to the groom server manager
   * @param driver The mesos driver. This is required to terminate tasks
   */
  public ResourceManager(Configuration conf,
      AtomicReference<GroomServerManager> serverManager, SchedulerDriver driver) {
    tasksToRunByGroom = new ConcurrentHashMap<String, java.util.Queue<TaskInProgress>>();
    tasksToRunByGroom.put(anyGroomServer, new ConcurrentLinkedQueue<TaskInProgress>());
    tasksToRun = Collections.synchronizedSet(new HashSet<TaskInProgress>());
    slotMemory = parseMemory(conf);
    taskDelegator = new TaskDelegator(serverManager, driver, recoveryTasks);
    serverManager.get().addGroomStatusListener(taskDelegator);
    this.conf = conf;
    // Mesos scalars are fractional; getFloat also accepts the integral values
    // that getInt used to require.
    groomCpus = conf.getFloat("hama.mesos.groom.cpu", 0);
    groomMem = conf.getFloat("hama.mesos.groom.mem", 200);
    groomDisk = conf.getFloat("hama.mesos.groom.disk", 0);
  }

  /**
   * Handle a resource offer by the mesos framework. If no task is waiting, every
   * offer is declined; otherwise each offer is matched against pending tasks.
   *
   * @param schedulerDriver The mesos scheduler driver
   * @param offers A list of offers from mesos
   */
  public void resourceOffers(SchedulerDriver schedulerDriver, List<Offer> offers) {
    if (tasksToRun.isEmpty()) {
      // there is no need to track executing tasks if everything is
      // started
      clearQueues();
      for (Offer offer : offers) {
        schedulerDriver.declineOffer(offer.getId());
      }
    } else {
      for (Offer offer : offers) {
        useOffer(schedulerDriver, offer);
      }
    }
  }

  /**
   * Empties every groom queue and forgets executing tasks, but only if no task
   * is still waiting to run.
   */
  private void clearQueues() {
    synchronized (tasksToRunByGroom) {
      // Re-check under the lock: MesosTaskWorker may have queued new tasks
      // after the caller's unlocked emptiness check, and they must not be lost.
      if (!tasksToRun.isEmpty()) {
        return;
      }
      for (java.util.Queue<TaskInProgress> queue : tasksToRunByGroom.values()) {
        queue.clear();
      }
      executingTasks.clear();
    }
  }

  /**
   * Fills as many slots of one offer as possible, preferring tasks queued for
   * the offer's host, then tasks without a host preference. Declines the offer
   * if it cannot host a groom or no task could be assigned.
   */
  private void useOffer(SchedulerDriver schedulerDriver, Offer offer) {
    log.debug("Received offer From: " + offer.getHostname());
    String host = offer.getHostname();
    ResourceOffer ro = new ResourceOffer(offer);
    int maxSlots = ro.getMaxSlots();
    if (maxSlots <= 0) {
      schedulerDriver.declineOffer(offer.getId());
      return;
    }
    java.util.Queue<TaskInProgress> tasks = new LinkedList<TaskInProgress>();
    while (tasks.size() < maxSlots) {
      TaskInProgress tip = null;
      java.util.Queue<TaskInProgress> hostQueue = tasksToRunByGroom.get(host);
      if (hostQueue != null) {
        tip = hostQueue.poll();
      }
      if (tip == null) {
        tip = tasksToRunByGroom.get(anyGroomServer).poll();
        if (tip == null) {
          if (tasks.isEmpty()) {
            schedulerDriver.declineOffer(offer.getId());
          }
          break;
        }
      }
      // A task with several preferred grooms sits in several queues; skip the
      // copies of a task that has already been launched.
      if (executingTasks.contains(tip)) {
        continue;
      }
      executingTasks.add(tip);
      tasksToRun.remove(tip);
      tasks.add(tip);
      log.debug("Found offer for: " + tip.getTaskId());
    }
    if (!tasks.isEmpty()) {
      launchTasks(schedulerDriver, tasks, ro);
    }
  }

  /**
   * Task worker that enqueues every task of a job under each of its preferred
   * grooms (or under "_any_" when there is no preference).
   */
  class MesosTaskWorker implements TaskWorker {
    private final JobInProgress jip;

    public MesosTaskWorker(JobInProgress jip) {
      this.jip = jip;
    }

    @Override
    public Boolean call() throws Exception {
      log.debug("Task Worker called: " + jip.tasks.length);
      for (TaskInProgress tip : jip.tasks) {
        if (jip.isRecoveryPending()) {
          recoveryTasks.add(tip);
        }
        String[] grooms = jip.getPreferredGrooms(tip, null, null);
        // An empty preference list would otherwise fail on grooms[0] below.
        if (grooms == null || grooms.length == 0) {
          grooms = new String[] { anyGroomServer };
        }
        log.info("Prefered Groom for tip " + tip.idWithinJob() + ": "
            + grooms[0]);
        synchronized (tasksToRunByGroom) {
          for (String groom : grooms) {
            if (!tasksToRunByGroom.containsKey(groom)) {
              tasksToRunByGroom.put(groom, new ConcurrentLinkedQueue<TaskInProgress>());
              log.info("Received request for groom: " + groom);
            }
            tasksToRunByGroom.get(groom).add(tip);
          }
          tasksToRun.add(tip);
        }
      }
      executingJobs.add(jip);
      return true;
    }
  }

  /**
   * Launches one groom server on the offer's slave that hosts {@code tips}.
   * Requests one CPU and one slot heap per tip plus the groom overhead, and
   * claims two ports (RPC and peer).
   *
   * <p>Precondition: {@code offer.getMaxSlots() >= tips.size()}, which
   * guarantees the offer covers every requested resource.
   *
   * @throws RuntimeException if hama.mesos.executor.uri is not configured
   */
  private void launchTasks(SchedulerDriver schedulerDriver,
      java.util.Queue<TaskInProgress> tips, ResourceOffer offer) {
    TaskID taskId = TaskID.newBuilder().setValue("Task_" + launchedTasks++)
        .build();
    List<Long> ports = claimPorts(offer.ports, PORTS_PER_GROOM);
    double taskCpus = CPUS_PER_SLOT * tips.size() + groomCpus;
    double taskMem = slotMemory * tips.size() + groomMem;
    double taskDisk = DISK_PER_GROOM + groomDisk;
    String uri = conf.get("hama.mesos.executor.uri");
    if (uri == null) {
      throw new RuntimeException(
          "Expecting configuration property 'hama.mesos.executor.uri'");
    }
    String directory = conf.get("hama.mesos.executor.directory");
    if (directory == null || directory.equals("")) {
      log.info("URI: " + uri + ", name: " + new File(uri).getName());
      // Default to the archive name without extensions, globbed so that the
      // versioned directory it unpacks into is matched by "cd".
      directory = new File(uri).getName().split("\\.")[0] + "*";
    }
    log.debug("Directory: " + directory);
    String command = conf.get("hama.mesos.executor.command");
    if (command == null || command.equals("")) {
      command = "env ; bash -x ./bin/hama org.apache.hama.bsp.MesosExecutor";
    }
    // Set up the environment for running the TaskTracker.
    Protos.Environment.Builder envBuilder = Protos.Environment.newBuilder();
    // Set java specific environment, appropriately.
    Map<String, String> env = System.getenv();
    if (env.containsKey("JAVA_HOME")) {
      envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
          .setName("JAVA_HOME").setValue(env.get("JAVA_HOME")));
    }
    envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
        .setName("HAMA_LOG_DIR").setValue("logs"));
    log.debug("JAVA_HOME: " + env.get("JAVA_HOME"));
    if (env.containsKey("JAVA_LIBRARY_PATH")) {
      envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
          .setName("JAVA_LIBRARY_PATH").setValue(env.get("JAVA_LIBRARY_PATH")));
    }
    log.debug("JAVA_LIBRARY_PATH: " + env.get("JAVA_LIBRARY_PATH"));
    CommandInfo commandInfo = CommandInfo.newBuilder()
        .setEnvironment(envBuilder)
        .setValue(String.format("cd %s && %s", directory, command))
        .addUris(CommandInfo.URI.newBuilder().setValue(uri)).build();
    log.debug("Offer: cpus: " + offer.cpus + " mem: " + offer.mem + " disk: "
        + offer.disk);
    log.debug("Cpu: " + taskCpus + " Mem: " + taskMem + " Disk: " + taskDisk
        + " port: " + ports.get(0));
    TaskInfo info = TaskInfo
        .newBuilder()
        .setName(taskId.getValue())
        .setTaskId(taskId)
        .setSlaveId(offer.offer.getSlaveId())
        .addResources(
            Resource.newBuilder().setName("cpus").setType(Value.Type.SCALAR)
                .setRole(offer.cpuRole)
                .setScalar(Value.Scalar.newBuilder().setValue(taskCpus)))
        .addResources(
            Resource.newBuilder().setName("mem").setType(Value.Type.SCALAR)
                .setRole(offer.memRole)
                .setScalar(Value.Scalar.newBuilder().setValue(taskMem)))
        .addResources(
            Resource.newBuilder().setName("disk").setType(Value.Type.SCALAR)
                .setRole(offer.diskRole)
                .setScalar(Value.Scalar.newBuilder().setValue(taskDisk)))
        .addResources(
            Resource
                .newBuilder()
                .setName("ports")
                .setType(Value.Type.RANGES)
                .setRole(offer.portRole)
                .setRanges(
                    Value.Ranges
                        .newBuilder()
                        .addRange(
                            Value.Range.newBuilder().setBegin(ports.get(0))
                                .setEnd(ports.get(0)))
                        .addRange(
                            Value.Range.newBuilder().setBegin(ports.get(1))
                                .setEnd(ports.get(1)))))
        .setExecutor(
            ExecutorInfo
                .newBuilder()
                .setExecutorId(
                    ExecutorID.newBuilder().setValue(
                        "executor_" + taskId.getValue()))
                .setName("Hama Groom Server").setSource(taskId.getValue())
                .setCommand(commandInfo))
        // Each BSP child JVM gets the per-slot heap; taskMem is the whole
        // container (all slots plus groom overhead) and must not be used here.
        .setData(
            ByteString.copyFrom(getConfigurationOverride(ports.get(0),
                ports.get(1), tips.size(), slotMemory))).build();
    log.debug("Accepting offer " + offer.offer.getId() + " cpus: " + taskCpus
        + " mem: " + taskMem);
    for (TaskInProgress tip : tips) {
      taskDelegator.addTask(tip, taskId, offer.offer.getHostname(), ports
          .get(0).intValue());
    }
    schedulerDriver.launchTasks(offer.offer.getId(), Arrays.asList(info));
  }

  /**
   * Picks up to {@code count} ports from the offered ranges, in order.
   * Mesos port ranges are inclusive of both ends.
   *
   * @return the claimed ports; fewer than {@code count} if not enough are offered
   */
  private List<Long> claimPorts(List<Value.Range> offeredPorts, int count) {
    List<Long> ports = new ArrayList<Long>(count);
    if (offeredPorts == null) {
      return ports;
    }
    for (Value.Range range : offeredPorts) {
      for (long port = range.getBegin(); port <= range.getEnd()
          && ports.size() < count; port++) {
        ports.add(port);
      }
      if (ports.size() == count) {
        break;
      }
    }
    return ports;
  }

  /**
   * Serializes a copy of the current configuration with groom-specific
   * overrides: RPC/peer ports, maximum task count and the child JVM heap.
   *
   * @param slotJVMHeap heap (MiB) for each BSP child JVM, appended as -Xmx
   */
  private byte[] getConfigurationOverride(Long groomRPCPort,
      Long groomPeerPort, Integer maxTasks, Long slotJVMHeap) {
    // Create a configuration from the current configuration and
    // override properties as appropriate for the Groom server.
    Configuration overrides = new Configuration(conf);
    overrides.set("bsp.groom.rpc.port", groomRPCPort.toString());
    overrides.set("bsp.peer.port", groomPeerPort.toString());
    overrides.set("bsp.tasks.maximum", maxTasks.toString());
    // parseMemory stripped -Xmx from these options; add the slot heap back as
    // a proper -Xmx token.
    String childOpts = conf.get("bsp.child.java.opts", "").trim();
    String heapOpt = "-Xmx" + slotJVMHeap + "m";
    overrides.set("bsp.child.java.opts",
        childOpts.isEmpty() ? heapOpt : childOpts + " " + heapOpt);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try {
      DataOutputStream out = new DataOutputStream(baos);
      overrides.write(out);
      out.flush();
    } catch (IOException e) {
      // Writing to an in-memory stream should not fail; surface it instead of
      // terminating the whole JVM.
      throw new IllegalStateException("Failed to serialize configuration.", e);
    }
    return baos.toByteArray();
  }

  /** The cpus, mem, disk and ports resources extracted from a Mesos offer. */
  private class ResourceOffer {
    Offer offer;
    double cpus;
    double mem;
    double disk;
    List<Value.Range> ports;
    String cpuRole;
    String memRole;
    String diskRole;
    String portRole;

    public ResourceOffer(Offer offer) {
      this.offer = offer;
      parseOffer(offer);
    }

    private void parseOffer(Offer offer) {
      // Pull out the cpus, memory, disk, and port ranges from the offer.
      for (Resource resource : offer.getResourcesList()) {
        if (resource.getName().equals("cpus")
            && resource.getType() == Value.Type.SCALAR) {
          cpus = resource.getScalar().getValue();
          cpuRole = resource.getRole();
        } else if (resource.getName().equals("mem")
            && resource.getType() == Value.Type.SCALAR) {
          mem = resource.getScalar().getValue();
          memRole = resource.getRole();
        } else if (resource.getName().equals("disk")
            && resource.getType() == Value.Type.SCALAR) {
          disk = resource.getScalar().getValue();
          diskRole = resource.getRole();
        } else if (resource.getName().equals("ports")
            && resource.getType() == Value.Type.RANGES) {
          portRole = resource.getRole();
          ports = resource.getRanges().getRangeList();
        }
      }
    }

    /** Counts the ports in the offered (inclusive) ranges. */
    private long availablePorts() {
      if (ports == null) {
        return 0;
      }
      long total = 0;
      for (Value.Range range : ports) {
        if (range.getEnd() >= range.getBegin()) {
          total += range.getEnd() - range.getBegin() + 1;
        }
      }
      return total;
    }

    /**
     * Returns how many BSP slots this offer can host after reserving the groom
     * server's own cpu/mem overhead. Returns 0 when the offer lacks the disk
     * or the two ports that every groom needs.
     */
    private int getMaxSlots() {
      if (disk < DISK_PER_GROOM + groomDisk
          || availablePorts() < PORTS_PER_GROOM) {
        return 0;
      }
      double cpuSlots = (cpus - groomCpus) / CPUS_PER_SLOT;
      double memSlots = (mem - groomMem) / slotMemory;
      // Never negative, so callers only have to handle the "no room" case.
      return Math.max(0, (int) Math.min(cpuSlots, memSlots));
    }
  }

  /**
   * Get the amount of memory requested per BSP child JVM, in MiB.
   *
   * <p>Reads the -Xmx option from {@code bsp.child.java.opts}. The k/m/g
   * suffixes are case-insensitive; a value without a suffix is in bytes. If
   * several -Xmx options are present, the last one wins, as in the JVM. All
   * -Xmx tokens are removed from the option string in {@code conf}, so the
   * per-slot heap can be added back later. The other options are kept.
   *
   * @param conf configuration holding bsp.child.java.opts; modified in place
   * @return the slot heap in MiB, or 200 if no -Xmx option is present
   */
  private static long parseMemory(Configuration conf) {
    String javaOpts = conf.get("bsp.child.java.opts", "-Xmx200m");
    Matcher memMatcher = XMX_PATTERN.matcher(javaOpts);
    boolean found = false;
    long value = DEFAULT_SLOT_MEMORY_MB;
    while (memMatcher.find()) {
      found = true;
      value = toMebibytes(Long.parseLong(memMatcher.group(1)),
          memMatcher.group(2));
    }
    if (!found) {
      // default to 200 MiB
      return DEFAULT_SLOT_MEMORY_MB;
    }
    // remove only the memory request from the child java opts so it may be
    // added later; the remaining options are preserved
    conf.set("bsp.child.java.opts", memMatcher.replaceAll("").trim());
    return value;
  }

  /** Converts a JVM memory amount with unit suffix (k, m, g or none = bytes) to MiB, rounding up. */
  private static long toMebibytes(long amount, String unit) {
    if (unit.isEmpty()) {
      return (amount + 1024L * 1024L - 1) / (1024L * 1024L);
    }
    switch (Character.toLowerCase(unit.charAt(0))) {
    case 'k':
      return (amount + 1023) / 1024;
    case 'g':
      return amount * 1024;
    default: // 'm'
      return amount;
    }
  }
}