/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.scheduler.mesos;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.proto.scheduler.Scheduler;
import org.apache.heron.scheduler.mesos.framework.BaseContainer;
import org.apache.heron.scheduler.mesos.framework.MesosFramework;
import org.apache.heron.scheduler.mesos.framework.TaskUtils;
import org.apache.heron.scheduler.utils.Runtime;
import org.apache.heron.scheduler.utils.SchedulerUtils;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.common.Key;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
import org.apache.heron.spi.scheduler.IScheduler;

import org.apache.mesos.MesosSchedulerDriver;
import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;

/**
 * Schedules a topology to a Mesos cluster.
 */
public class MesosScheduler implements IScheduler {
  private static final Logger LOG = Logger.getLogger(MesosScheduler.class.getName());

  private Config config;
  private Config runtime;
  private MesosFramework mesosFramework;
  private SchedulerDriver driver;
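
  /**
   * Initializes the scheduler: keeps the config and runtime, creates the
   * MesosFramework and the SchedulerDriver, then starts the driver and waits
   * for the framework to register with the Mesos Master.
   */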
  @Override
  public void initialize(Config mConfig, Config mRuntime) {
    this.config = mConfig;
    this.runtime = mRuntime;

    this.mesosFramework = getMesosFramework();
    String masterURI = MesosContext.getHeronMesosMasterUri(config);
    this.driver = getSchedulerDriver(masterURI, mesosFramework);

    startSchedulerDriver();
  }

  /**
   * Start the scheduler driver and wait for it to be registered.
   */
  protected void startSchedulerDriver() {
    // Start the driver in non-blocking mode, since we need to set the Heron state
    // after the scheduler driver is started; Heron will block the main thread eventually.
    driver.start();

    // Staging the Mesos Framework
    LOG.info("Waiting for the Mesos Framework to be registered");

    long timeout = MesosContext.getHeronMesosFrameworkStagingTimeoutMs(config);
    if (!mesosFramework.waitForRegistered(timeout, TimeUnit.MILLISECONDS)) {
      throw new RuntimeException("Failed to register with Mesos Master in time");
    }
  }

  /**
   * Waits for the driver to be stopped or aborted.
   *
   * @param timeout maximum time to wait
   * @param unit time unit of the timeout
   */
  protected void joinSchedulerDriver(long timeout, TimeUnit unit) {
    // ExecutorService used to monitor whether close() completes in time
    ExecutorService service = Executors.newFixedThreadPool(1);
    final CountDownLatch closeLatch = new CountDownLatch(1);
    Runnable driverJoin = new Runnable() {
      @Override
      public void run() {
        // join() blocks until the driver is stopped or aborted
        driver.join();
        closeLatch.countDown();
      }
    };
    service.submit(driverJoin);

    LOG.info("Waiting for the Mesos Driver to stop");
    try {
      if (!closeLatch.await(timeout, unit)) {
        LOG.severe("Mesos Driver failed to stop in time.");
      } else {
        LOG.info("Mesos Driver stopped.");
      }
    } catch (InterruptedException e) {
      LOG.log(Level.SEVERE, "Interrupted while waiting for the Mesos Driver to stop: ", e);
    }

    // Shutdown the ExecutorService
    service.shutdownNow();
  }
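
  /**
   * Stops the scheduler driver. The topology itself is left running, since its
   * lifecycle is independent from the scheduler process.
   */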
  @Override
  public void close() {
    // Stop the SchedulerDriver
    if (driver != null) {
      if (this.mesosFramework.isTerminated()) {
        // The framework has finished; tear it down on the Mesos Master
        driver.stop();
      } else {
        // Stop with failover, so the Mesos Master keeps the framework's tasks running
        driver.stop(true);
      }

      // Waits for the driver to be stopped or aborted,
      // i.e. waits for the message sent to Mesos Master
      long stopTimeoutInMs = MesosContext.getHeronMesosSchedulerDriverStopTimeoutMs(config);
      joinSchedulerDriver(stopTimeoutInMs, TimeUnit.MILLISECONDS);
    }

    // Will not kill the topology when close() is invoked,
    // since the lifecycle of the topology is independent from the scheduler
  }
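
  /**
   * Constructs a BaseContainer definition for every container in the packing plan
   * and asks the MesosFramework to create the corresponding job.
   */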
  @Override
  public boolean onSchedule(PackingPlan packing) {
    // Construct the jobDefinition
    Map<Integer, BaseContainer> jobDefinition = new HashMap<>();
    for (int containerIndex = 0;
         containerIndex < Runtime.numContainers(runtime);
         containerIndex++) {
      jobDefinition.put(containerIndex, getBaseContainer(containerIndex, packing));
    }

    return mesosFramework.createJob(jobDefinition);
  }
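
  /**
   * Returns a single link pointing at this framework's page on the Mesos Master UI.
   */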
  @Override
  public List<String> getJobLinks() {
    Protos.FrameworkID frameworkID = mesosFramework.getFrameworkId();
    // The FrameworkID should exist, otherwise onSchedule(..) would have returned false earlier,
    // so there is no need for a null check

    // The job link's format
    String jobLink = String.format("%s/#/frameworks/%s",
        MesosContext.getHeronMesosMasterUri(config), frameworkID.getValue());

    List<String> jobLinks = new ArrayList<>();
    jobLinks.add(jobLink);
    return jobLinks;
  }
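
  /**
   * Kills the whole job backing this topology on the Mesos cluster.
   */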
  @Override
  public boolean onKill(Scheduler.KillTopologyRequest request) {
    return mesosFramework.killJob();
  }
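
  /**
   * Restarts the container specified in the request.
   */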
  @Override
  public boolean onRestart(Scheduler.RestartTopologyRequest request) {
    int containerId = request.getContainerIndex();
    return mesosFramework.restartJob(containerId);
  }
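
  /**
   * Topology update is not supported by this scheduler; always returns false.
   */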
  @Override
  public boolean onUpdate(Scheduler.UpdateTopologyRequest request) {
    LOG.severe("Topology onUpdate not implemented by this scheduler.");
    return false;
  }
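
  /**
   * Creates the MesosFramework; split out so that subclasses (e.g. tests) can
   * supply their own instance.
   */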
  protected MesosFramework getMesosFramework() {
    return new MesosFramework(config, runtime);
  }
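
  /**
   * Creates the MesosSchedulerDriver for the given master URI, registering a
   * FrameworkInfo named after the topology with checkpointing enabled.
   */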
  protected SchedulerDriver getSchedulerDriver(String masterURI, MesosFramework framework) {
    Protos.FrameworkInfo.Builder frameworkBuilder = Protos.FrameworkInfo.newBuilder()
        .setUser("") // Have Mesos fill in the current user.
        .setName("heron_scheduler_" + Context.topologyName(config))
        .setCheckpoint(true);

    return new MesosSchedulerDriver(
        framework,
        frameworkBuilder.build(),
        masterURI);
  }

  /**
   * Get the BaseContainer info.
   *
   * @param containerIndex index of the container to start
   * @param packing the PackingPlan
   * @return the BaseContainer info
   */
  protected BaseContainer getBaseContainer(Integer containerIndex, PackingPlan packing) {
    BaseContainer container = new BaseContainer();
    container.name = TaskUtils.getTaskNameForContainerIndex(containerIndex);
    container.runAsUser = Context.role(config);
    container.description = String.format("Container %d for topology %s",
        containerIndex, Context.topologyName(config));

    // Fill in the resource requirements for this container
    fillResourcesRequirementForBaseContainer(container, containerIndex, packing);

    // Force running as shell
    container.shell = true;

    // Infinite retries
    container.retries = Integer.MAX_VALUE;

    // The dependencies for the container: the topology package and the heron core release
    container.dependencies = new ArrayList<>();
    String topologyPath =
        Runtime.schedulerProperties(runtime).getProperty(Key.TOPOLOGY_PACKAGE_URI.value());
    String heronCoreReleasePath = Context.corePackageUri(config);
    container.dependencies.add(topologyPath);
    container.dependencies.add(heronCoreReleasePath);

    return container;
  }

  /**
   * Fill in the resource requirements, i.e. CPU, memory and disk, for the given container.
   * This method modifies the BaseContainer passed in.
   * <p>
   * Notice: currently we make every container homogeneous,
   * requiring the maximum resources for every container.
   *
   * @param container the BaseContainer to fill values in
   * @param containerIndex the index of the container
   * @param packing the packing plan
   */
  protected void fillResourcesRequirementForBaseContainer(
      BaseContainer container, Integer containerIndex, PackingPlan packing) {
    // After the homogeneous clone, every container requires the same (maximum) resource,
    // so the first container is representative of them all
    PackingPlan updatedPackingPlan = packing.cloneWithHomogeneousScheduledResource();
    Resource maxResourceContainer =
        updatedPackingPlan.getContainers().iterator().next().getRequiredResource();

    container.cpu = maxResourceContainer.getCpu();
    // Convert them from bytes to MB
    container.diskInMB = maxResourceContainer.getDisk().asMegabytes();
    container.memInMB = maxResourceContainer.getRam().asMegabytes();
    container.ports = SchedulerUtils.ExecutorPort.getRequiredPorts().size();
  }
}