blob: b1f80c5221cb43e0b86faf2d71bc64f509164450 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.scheduler.nomad;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.google.common.base.Optional;
import com.hashicorp.nomad.apimodel.Job;
import com.hashicorp.nomad.apimodel.JobListStub;
import com.hashicorp.nomad.apimodel.NetworkResource;
import com.hashicorp.nomad.apimodel.Port;
import com.hashicorp.nomad.apimodel.Resources;
import com.hashicorp.nomad.apimodel.Service;
import com.hashicorp.nomad.apimodel.ServiceCheck;
import com.hashicorp.nomad.apimodel.Task;
import com.hashicorp.nomad.apimodel.TaskGroup;
import com.hashicorp.nomad.apimodel.Template;
import com.hashicorp.nomad.javasdk.EvaluationResponse;
import com.hashicorp.nomad.javasdk.NomadApiClient;
import com.hashicorp.nomad.javasdk.NomadApiConfiguration;
import com.hashicorp.nomad.javasdk.NomadException;
import com.hashicorp.nomad.javasdk.ServerQueryResponse;
import org.apache.heron.metricsmgr.MetricsSinksConfig;
import org.apache.heron.metricsmgr.sink.PrometheusSink;
import org.apache.heron.proto.scheduler.Scheduler;
import org.apache.heron.scheduler.UpdateTopologyManager;
import org.apache.heron.scheduler.utils.Runtime;
import org.apache.heron.scheduler.utils.SchedulerUtils;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.ConfigLoader;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
import org.apache.heron.spi.scheduler.IScheduler;
import static org.apache.heron.scheduler.nomad.NomadConstants.METRICS_PORT;
@SuppressWarnings("IllegalCatch")
public class NomadScheduler implements IScheduler {
// Logger shared by all instances, named after this class.
private static final Logger LOG = Logger.getLogger(NomadScheduler.class.getName());
// Cluster-mode configuration; built in initialize() from the supplied config merged with
// the configuration loaded through ConfigLoader.
private Config clusterConfig;
// The configuration exactly as it was handed to initialize().
private Config localConfig;
// The runtime configuration handed to initialize().
private Config runtimeConfig;
// Performs the work for onUpdate(); created in initialize().
private UpdateTopologyManager updateTopologyManager;
/**
 * Stores the configurations this scheduler works with and creates the update manager.
 *
 * @param config the scheduler configuration; kept as-is as the local config and also used
 *     as the base of the cluster-mode config
 * @param runtime the runtime configuration
 */
@Override
public void initialize(Config config, Config runtime) {
  this.localConfig = config;
  this.runtimeConfig = runtime;

  // an override file supplied for the API server takes precedence over the regular one
  String apiserverOverrides = Context.apiserverOverrideFile(config);
  String overrideFile =
      apiserverOverrides != null ? apiserverOverrides : Context.overrideFile(config);

  Config loaded = ConfigLoader.loadConfig(
      Context.heronHome(config), Context.heronConf(config), null, overrideFile);
  Config merged = Config.newBuilder()
      .putAll(config)
      .putAll(loaded)
      .build();
  this.clusterConfig = Config.toClusterMode(merged);

  this.updateTopologyManager =
      new UpdateTopologyManager(config, runtime, Optional.absent());
}
/**
 * Closes the scheduler. This implementation performs no cleanup.
 */
@Override
public void close() {
// don't need to do anything
}
/**
 * Submits one Nomad job per container of the given packing plan.
 *
 * @param packing the packing plan to deploy
 * @return {@code true} when every job was submitted, {@code false} when the plan is
 *     null/empty or a {@link RuntimeException} occurred while building or submitting jobs
 */
@Override
public boolean onSchedule(PackingPlan packing) {
  boolean nothingRequested = packing == null || packing.getContainers().isEmpty();
  if (nothingRequested) {
    LOG.severe("No container requested. Can't schedule");
    return false;
  }

  NomadApiClient nomadClient = getApiClient(NomadContext.getSchedulerURI(this.localConfig));
  boolean deployed;
  try {
    List<Job> containerJobs = getJobs(packing);
    Job[] jobArray = containerJobs.toArray(new Job[containerJobs.size()]);
    startJobs(nomadClient, jobArray);
    deployed = true;
  } catch (RuntimeException e) {
    LOG.log(Level.SEVERE, "Failed to deploy topology "
        + Runtime.topologyName(this.runtimeConfig) + " with error: " + e.getMessage(), e);
    deployed = false;
  } finally {
    // the client is only needed for this single call
    closeClient(nomadClient);
  }
  return deployed;
}
/**
 * Returns the link(s) under which the jobs can be inspected.
 *
 * @return a list holding the scheduler URI with the job-link suffix appended
 */
@Override
public List<String> getJobLinks() {
  List<String> links = new LinkedList<>();
  links.add(NomadContext.getSchedulerURI(this.localConfig) + NomadConstants.JOB_LINK);
  return links;
}
/**
 * Deregisters every Nomad job that belongs to the requested topology.
 *
 * @param request the kill request carrying the topology name
 * @return {@code true} on success, {@code false} when a {@link RuntimeException} occurred
 */
@Override
public boolean onKill(Scheduler.KillTopologyRequest request) {
  String topologyName = request.getTopologyName();
  LOG.fine("Killing Topology " + topologyName);

  NomadApiClient nomadClient = getApiClient(NomadContext.getSchedulerURI(this.localConfig));
  boolean killed;
  try {
    List<Job> topologyJobs = getTopologyJobs(nomadClient, topologyName);
    Job[] jobArray = topologyJobs.toArray(new Job[topologyJobs.size()]);
    killJobs(nomadClient, jobArray);
    killed = true;
  } catch (RuntimeException e) {
    LOG.log(Level.SEVERE, "Failed to kill topology " + topologyName
        + " with error: " + e.getMessage(), e);
    killed = false;
  } finally {
    closeClient(nomadClient);
  }
  return killed;
}
/**
 * Restarts either a whole topology or one of its containers.
 *
 * @param request the restart request; a container index of -1 means the whole topology
 * @return {@code true} on success, {@code false} when a {@link RuntimeException} occurred
 */
@Override
public boolean onRestart(Scheduler.RestartTopologyRequest request) {
  String topologyName = request.getTopologyName();
  int containerIndex = request.getContainerIndex();
  LOG.fine("Restarting Topology " + topologyName
      + " container " + containerIndex);

  NomadApiClient nomadClient = getApiClient(NomadContext.getSchedulerURI(this.localConfig));
  boolean restarted;
  try {
    Job[] targets;
    if (containerIndex == -1) {
      // every job of the topology
      List<Job> topologyJobs = getTopologyJobs(nomadClient, topologyName);
      targets = topologyJobs.toArray(new Job[topologyJobs.size()]);
    } else {
      // only the job of the requested container
      targets = new Job[] {
          getTopologyContainerJob(nomadClient, topologyName, containerIndex)
      };
    }
    restartJobs(nomadClient, targets);
    restarted = true;
  } catch (RuntimeException e) {
    LOG.log(Level.SEVERE, "Failed to restart topology "
        + topologyName + " with error: " + e.getMessage(), e);
    restarted = false;
  } finally {
    closeClient(nomadClient);
  }
  return restarted;
}
/**
 * Applies a topology update by delegating to the update manager.
 *
 * @param request the update request carrying the current and the proposed packing plan
 * @return {@code true} when the update succeeded, {@code false} when it failed or the
 *     calling thread was interrupted while waiting for it
 */
@Override
public boolean onUpdate(Scheduler.UpdateTopologyRequest request) {
  try {
    updateTopologyManager.updateTopology(
        request.getCurrentPackingPlan(), request.getProposedPackingPlan());
  } catch (InterruptedException e) {
    // restore the interrupt flag so callers further up the stack can still observe it
    Thread.currentThread().interrupt();
    LOG.log(Level.SEVERE, "Could not update topology for request: " + request, e);
    return false;
  } catch (ExecutionException e) {
    LOG.log(Level.SEVERE, "Could not update topology for request: " + request, e);
    return false;
  }
  return true;
}
/**
* HELPER FUNCTIONS
**/
/**
 * Creates a Nomad API client that talks to the given address.
 *
 * @param uri the address of the Nomad server
 * @return a new client; the caller is responsible for closing it
 */
static NomadApiClient getApiClient(String uri) {
  NomadApiConfiguration.Builder builder = new NomadApiConfiguration.Builder();
  builder.setAddress(uri);
  return new NomadApiClient(builder.build());
}
/**
 * Closes the given client; an {@link IOException} raised while closing is logged and
 * not propagated.
 *
 * @param apiClient the client to close
 */
static void closeClient(NomadApiClient apiClient) {
  try {
    apiClient.close();
  } catch (IOException closeFailure) {
    String problem = "Problem closing client: " + closeFailure.getMessage();
    LOG.log(Level.SEVERE, problem, closeFailure);
  }
}
/**
 * Builds one Nomad job per container index, from 0 up to the number of containers in the
 * runtime config. All jobs request the same (homogeneous) container resource.
 *
 * @param packing the packing plan to turn into jobs
 * @return the jobs, ordered by container index
 */
List<Job> getJobs(PackingPlan packing) {
  PackingPlan homogeneousPlan = getHomogeneousPackingPlan(packing);
  Resource perContainer = getHomogeneousContainerResource(homogeneousPlan);

  List<Job> jobs = new LinkedList<>();
  long containerCount = Runtime.numContainers(this.runtimeConfig);
  int index = 0;
  while (index < containerCount) {
    // the plan may hold no entry for an index; getJob receives the Optional as-is
    jobs.add(getJob(index, homogeneousPlan.getContainer(index), perContainer));
    index++;
  }
  return jobs;
}
/**
 * Builds the Nomad job for a single container.
 *
 * @param containerIndex the index of the container the job runs
 * @param containerPlan the container's plan, if the packing plan has one for this index
 * @param containerResource the resource each container requests
 * @return the job, identified by topology id and named after the topology name
 */
Job getJob(int containerIndex, Optional<PackingPlan.ContainerPlan> containerPlan,
           Resource containerResource) {
  // the id is derived from the topology id, the name from the topology name
  String jobId = getJobId(Runtime.topologyId(this.runtimeConfig), containerIndex);
  String jobName = getJobId(Runtime.topologyName(this.runtimeConfig), containerIndex);

  Job job = new Job();
  job.setId(jobId);
  job.setName(jobName);
  job.addTaskGroups(getTaskGroup(jobName, containerIndex, containerResource));
  job.setDatacenters(Arrays.asList(NomadConstants.NOMAD_DEFAULT_DATACENTER));
  job.setMeta(getMetaData(this.runtimeConfig, containerPlan));
  return job;
}
/**
 * Builds a task group of count one that holds the single task of a container.
 *
 * @param groupName the name used for both the group and its task
 * @param containerIndex the index of the container
 * @param containerResource the resource the container requests
 * @return the task group
 */
TaskGroup getTaskGroup(String groupName, int containerIndex, Resource containerResource) {
  Task containerTask = getTask(groupName, containerIndex, containerResource);

  TaskGroup group = new TaskGroup();
  group.setName(groupName);
  group.setCount(1);
  group.addTasks(containerTask);
  return group;
}
/**
 * Builds the Nomad task of a container: the driver specific spec, the resource requests
 * (memory, CPU, disk, dynamic ports) and, when a Prometheus port file is configured, the
 * metrics port plus an optional metrics service registration.
 *
 * @param taskName the name of the task
 * @param containerIndex the index of the container
 * @param containerResource the resource the container requests
 * @return the fully populated task
 * @throws IllegalArgumentException when the configured Nomad driver is not supported
 */
Task getTask(String taskName, int containerIndex, Resource containerResource) {
  String driver = NomadContext.getHeronNomadDriver(this.localConfig);
  Task task = new Task();

  // driver specific part of the spec; must run first because it replaces the task's env
  boolean rawExec = driver.equals(NomadConstants.NomadDriver.RAW_EXEC.getName());
  boolean docker = !rawExec && driver.equals(NomadConstants.NomadDriver.DOCKER.getName());
  if (rawExec) {
    getTaskSpecRawDriver(task, taskName, containerIndex);
  } else if (docker) {
    getTaskSpecDockerDriver(task, taskName, containerIndex);
  } else {
    throw new IllegalArgumentException("Invalid Nomad driver specified: " + driver);
  }

  // one dynamically allocated port per executor port; labels use '_' instead of '-'
  Port[] executorPorts = NomadConstants.EXECUTOR_PORTS.keySet().stream()
      .map(port -> new Port().setLabel(port.getName().replace("-", "_")))
      .toArray(Port[]::new);
  NetworkResource network = new NetworkResource();
  network.addDynamicPorts(executorPorts);

  Resources requested = new Resources();

  // memory, in megabytes
  requested.setMemoryMb(longToInt(containerResource.getRam().asMegabytes()));

  // CPU: requested cores scaled by the configured core-to-frequency mapping
  double cpuFrequency =
      NomadContext.getCoreFreqMapping(this.localConfig) * containerResource.getCpu();
  requested.setCpu(Integer.valueOf((int) Math.round(cpuFrequency)));

  // disk, in megabytes
  requested.setDiskMb(longToInt(containerResource.getDisk().asMegabytes()));

  // dynamic port for prometheus/websink metrics
  String metricsPortFile = getPrometheusMetricsFile(this.localConfig);
  if (metricsPortFile != null) {
    network.addDynamicPorts(new Port().setLabel(METRICS_PORT));
    task.addEnv(NomadConstants.METRICS_PORT_FILE, metricsPortFile);

    if (NomadContext.getHeronNomadMetricsServiceRegister(this.localConfig)) {
      String topologyName = Runtime.topologyName(this.runtimeConfig);

      // tags of the service: "<topology>-<container>" followed by the configured extras
      List<String> serviceTags = new LinkedList<>();
      serviceTags.add(String.format("%s-%s", topologyName, containerIndex));
      serviceTags.addAll(Arrays.asList(
          NomadContext.getHeronNomadMetricsServiceAdditionalTags(this.localConfig)));

      // interval and timeout are configured in seconds and passed on in nanoseconds
      long intervalNanos = TimeUnit.SECONDS.toNanos(
          NomadContext.getHeronNomadMetricsServiceCheckIntervalSec(this.localConfig));
      long timeoutNanos = TimeUnit.SECONDS.toNanos(
          NomadContext.getHeronNomadMetricsServiceCheckTimeoutSec(this.localConfig));
      ServiceCheck healthCheck = new ServiceCheck()
          .setType(NomadConstants.NOMAD_SERVICE_CHECK_TYPE)
          .setPortLabel(METRICS_PORT)
          .setInterval(intervalNanos)
          .setTimeout(timeoutNanos);

      // register the metrics service
      Service metricsService = new Service()
          .setName(getMetricsServiceName(topologyName, containerIndex))
          .setPortLabel(METRICS_PORT)
          .setTags(serviceTags)
          .addChecks(healthCheck);
      task.addServices(metricsService);
    }
  } else {
    LOG.severe("Failed to find port file for Prometheus metrics. "
        + "Please check metrics sinks configurations");
  }

  requested.addNetworks(network);
  task.setResources(requested);
  return task;
}
/**
 * Fills in the task spec for the Nomad docker driver.
 * In docker mode the shell command downloads the topology package, writes the metrics
 * port into the metrics port file and then starts the Heron executor.
 *
 * @param task the task to populate; its environment is replaced
 * @param taskName the name to give the task
 * @param containerIndex the index of the container the task runs
 * @return the same task instance
 */
Task getTaskSpecDockerDriver(Task task, String taskName, int containerIndex) {
  // complete heron executor command: binary followed by its arguments
  String[] executorArgs = SchedulerUtils.executorCommandArgs(
      this.clusterConfig, this.runtimeConfig, NomadConstants.EXECUTOR_PORTS,
      String.valueOf(containerIndex));
  String executorCmd =
      Context.executorBinary(this.clusterConfig) + " " + String.join(" ", executorArgs);

  // heron_downloader command for downloading the topology package
  String topologyDownloadCmd = getFetchCommand(this.clusterConfig,
      this.clusterConfig, this.runtimeConfig);

  // the three steps are chained so that a failing step stops the ones after it
  String shellScript = String.join(" && ",
      topologyDownloadCmd, getSetMetricsPortFileCmd(), executorCmd);
  String[] shellArgs = {"-c", shellScript};

  task.setName(taskName);
  task.setDriver(NomadConstants.NomadDriver.DOCKER.getName());
  task.addConfig(NomadConstants.NOMAD_IMAGE,
      NomadContext.getHeronExecutorDockerImage(this.localConfig));
  task.addConfig(NomadConstants.NOMAD_TASK_COMMAND, NomadConstants.SHELL_CMD);
  task.addConfig(NomadConstants.NETWORK_MODE,
      NomadContext.getHeronNomadNetworkMode(this.localConfig));
  task.addConfig(NomadConstants.NOMAD_TASK_COMMAND_ARGS, shellArgs);

  Map<String, String> environment = new HashMap<>();
  environment.put(NomadConstants.HOST, "${attr.unique.network.ip-address}");
  task.setEnv(environment);
  return task;
}
/**
 * Fills in the task spec for the Nomad raw_exec driver.
 * In raw exec mode the Heron start up script is shipped to the task as a template and run
 * through the shell; the script is parameterized through environment variables.
 *
 * @param task the task to populate; its environment is replaced
 * @param taskName the name to give the task
 * @param containerIndex the index of the container the task runs
 * @return the same task instance
 * @throws RuntimeException when the start up script cannot be read
 */
Task getTaskSpecRawDriver(Task task, String taskName, int containerIndex) {
  String executorBinary = Context.executorBinary(this.clusterConfig);
  // get arguments for heron executor command
  String[] executorArgs = SchedulerUtils.executorCommandArgs(
      this.clusterConfig, this.runtimeConfig, NomadConstants.EXECUTOR_PORTS,
      String.valueOf(containerIndex));
  // get complete heron executor command
  String executorCmd = executorBinary + " " + String.join(" ", executorArgs);
  // get heron_downloader command for downloading topology package
  String topologyDownloadCmd = getFetchCommand(this.localConfig,
      this.clusterConfig, this.runtimeConfig);
  // read nomad heron executor start up script from file
  String heronNomadScript = getHeronNomadScript(this.localConfig);

  task.setName(taskName);
  // use raw_exec driver
  task.setDriver(NomadConstants.NomadDriver.RAW_EXEC.getName());
  // call nomad heron start up script
  task.addConfig(NomadConstants.NOMAD_TASK_COMMAND, NomadConstants.SHELL_CMD);
  String[] args = {NomadConstants.NOMAD_HERON_SCRIPT_NAME};
  task.addConfig(NomadConstants.NOMAD_TASK_COMMAND_ARGS, args);

  Template template = new Template();
  template.setEmbeddedTmpl(heronNomadScript);
  template.setDestPath(NomadConstants.NOMAD_HERON_SCRIPT_NAME);
  task.addTemplates(template);

  // No port array is built here: this method never attached one to the task, the
  // previously computed array was dead code.

  // set environment variables used in the heron nomad start up script
  Map<String, String> envVars = new HashMap<>();
  envVars.put(NomadConstants.HERON_NOMAD_WORKING_DIR,
      NomadContext.workingDirectory(this.localConfig) + "/container-" + containerIndex);
  if (NomadContext.useCorePackageUri(this.localConfig)) {
    envVars.put(NomadConstants.HERON_USE_CORE_PACKAGE_URI, "true");
    envVars.put(NomadConstants.HERON_CORE_PACKAGE_URI,
        NomadContext.corePackageUri(this.localConfig));
  } else {
    envVars.put(NomadConstants.HERON_USE_CORE_PACKAGE_URI, "false");
    envVars.put(NomadConstants.HERON_CORE_PACKAGE_DIR,
        NomadContext.corePackageDirectory(this.localConfig));
  }
  envVars.put(NomadConstants.HERON_TOPOLOGY_DOWNLOAD_CMD, topologyDownloadCmd);
  envVars.put(NomadConstants.HERON_EXECUTOR_CMD, executorCmd);
  task.setEnv(envVars);
  return task;
}
/**
 * Fetches the job list from Nomad.
 *
 * @param apiClient the client used to query Nomad
 * @return the job stubs returned by the server
 * @throws RuntimeException wrapping the I/O or Nomad error when the query fails
 */
static List<JobListStub> getJobList(NomadApiClient apiClient) {
  try {
    ServerQueryResponse<List<JobListStub>> listing = apiClient.getJobsApi().list();
    return listing.getValue();
  } catch (IOException | NomadException e) {
    LOG.log(Level.SEVERE, "Error when attempting to fetch job list", e);
    throw new RuntimeException(e);
  }
}
/**
 * Collects the jobs whose metadata names the given topology.
 *
 * @param apiClient the client used to query Nomad
 * @param topologyName the topology whose jobs are wanted
 * @return the matching jobs, in the order the server listed them
 * @throws RuntimeException when the job list or the details of a job cannot be fetched
 */
static List<Job> getTopologyJobs(NomadApiClient apiClient, String topologyName) {
  List<JobListStub> listed = getJobList(apiClient);
  List<Job> matching = new LinkedList<>();
  for (JobListStub stub : listed) {
    // the listing only holds stubs; the metadata requires the full job
    Job details;
    try {
      details = apiClient.getJobsApi().info(stub.getId()).getValue();
    } catch (IOException | NomadException e) {
      String msg = "Failed to retrieve job info for job " + stub.getId()
          + " part of topology " + topologyName;
      LOG.log(Level.SEVERE, msg, e);
      throw new RuntimeException(msg, e);
    }

    Map<String, String> meta = details.getMeta();
    if (meta == null || !meta.containsKey(NomadConstants.NOMAD_TOPOLOGY_NAME)) {
      // carries no topology name, skip it
      continue;
    }
    if (meta.get(NomadConstants.NOMAD_TOPOLOGY_NAME).equals(topologyName)) {
      matching.add(details);
    }
  }
  return matching;
}
/**
 * Finds the job that runs the given container of a topology.
 *
 * <p>A job's metadata only carries a container index when a container plan was present
 * when the job was built, so jobs without that entry are skipped instead of failing the
 * lookup with a {@link NullPointerException}.
 *
 * @param apiClient the client used to query Nomad
 * @param topologyName the topology the container belongs to
 * @param containerIndex the index of the wanted container
 * @return the first job whose metadata carries the given container index
 * @throws RuntimeException when no job of the topology carries that container index, or
 *     when the jobs cannot be fetched
 */
static Job getTopologyContainerJob(NomadApiClient apiClient, String topologyName,
                                   int containerIndex) {
  String wantedIndex = String.valueOf(containerIndex);
  for (Job job : getTopologyJobs(apiClient, topologyName)) {
    Map<String, String> meta = job.getMeta();
    // comparing from the known non-null side tolerates a missing index entry
    if (meta != null
        && wantedIndex.equals(meta.get(NomadConstants.NOMAD_TOPOLOGY_CONTAINER_INDEX))) {
      return job;
    }
  }
  throw new RuntimeException("Container " + containerIndex
      + " does not exist for topology " + topologyName);
}
/**
 * Deregisters the given jobs one after the other, stopping at the first failure.
 *
 * @param apiClient the client used to talk to Nomad
 * @param jobs the jobs to deregister
 * @throws RuntimeException wrapping the I/O or Nomad error of the job that failed
 */
static void killJobs(NomadApiClient apiClient, Job... jobs) {
  for (Job doomed : jobs) {
    LOG.fine("Killing job " + doomed.getId());
    try {
      apiClient.getJobsApi().deregister(doomed.getId());
    } catch (IOException | NomadException e) {
      String failure = "Failed to kill job " + doomed.getId();
      LOG.log(Level.SEVERE, failure, e);
      throw new RuntimeException(failure, e);
    }
  }
}
/**
 * Restarts jobs by deregistering all of them and then registering them again.
 *
 * @param apiClient the client used to talk to Nomad
 * @param jobs the jobs to restart
 */
static void restartJobs(NomadApiClient apiClient, Job... jobs) {
  List<String> jobIds = Arrays.stream(jobs)
      .map(Job::getId)
      .collect(Collectors.toList());
  LOG.fine("Restarting jobs: " + jobIds);

  killJobs(apiClient, jobs);
  startJobs(apiClient, jobs);
}
/**
 * Registers the given jobs one after the other, stopping at the first failure.
 *
 * @param apiClient the client used to talk to Nomad
 * @param jobs the jobs to register
 * @throws RuntimeException wrapping the I/O or Nomad error of the job that failed
 */
static void startJobs(NomadApiClient apiClient, Job... jobs) {
  for (Job pending : jobs) {
    LOG.fine("Starting job " + pending.getId());
    LOG.fine("Job spec: " + pending.toString());

    EvaluationResponse evaluation;
    try {
      evaluation = apiClient.getJobsApi().register(pending);
    } catch (IOException | NomadException e) {
      String failure = "Failed to submit job "
          + pending.getId() + " with error: " + e.getMessage();
      // the spec is only logged at FINE level; the exception travels with the rethrow
      LOG.fine(failure + " with job spec: " + pending.toString());
      throw new RuntimeException(failure, e);
    }
    LOG.fine("response: " + evaluation);
  }
}
/**
 * Builds the metadata attached to a job: topology name, topology id and, when a container
 * plan is present, the id of that container.
 *
 * @param runtimeConfig the runtime config the topology name and id are read from
 * @param containerPlan the plan of the container, possibly absent
 * @return a new, mutable metadata map
 */
static Map<String, String> getMetaData(
    Config runtimeConfig,
    Optional<PackingPlan.ContainerPlan> containerPlan) {
  Map<String, String> meta = new HashMap<>();
  meta.put(NomadConstants.NOMAD_TOPOLOGY_NAME, Runtime.topologyName(runtimeConfig));
  meta.put(NomadConstants.NOMAD_TOPOLOGY_ID, Runtime.topologyId(runtimeConfig));

  if (!containerPlan.isPresent()) {
    // without a plan there is no container index to record
    return meta;
  }
  int containerId = containerPlan.get().getId();
  meta.put(NomadConstants.NOMAD_TOPOLOGY_CONTAINER_INDEX, String.valueOf(containerId));
  return meta;
}
/**
 * Builds the identifier {@code <topologyName>-<containerIndex>}.
 *
 * <p>Plain concatenation is used on purpose: {@code String.format("%d", ...)} localizes
 * digits according to the default locale, which would make the identifier depend on the
 * machine's locale settings.
 *
 * @param topologyName the topology name (or id) used as prefix
 * @param containerIndex the index of the container
 * @return the identifier, always written with ASCII digits
 */
static String getJobId(String topologyName, int containerIndex) {
  return topologyName + "-" + containerIndex;
}
/**
 * Reads the Heron Nomad start up script as UTF-8 text.
 *
 * @param config the config the script path is looked up in
 * @return the content of the script
 * @throws RuntimeException when the script cannot be read
 */
static String getHeronNomadScript(Config config) {
  String scriptPath = NomadContext.getHeronNomadPath(config);
  try {
    byte[] raw = Files.readAllBytes(Paths.get(scriptPath));
    return new String(raw, StandardCharsets.UTF_8);
  } catch (IOException e) {
    String msg = "Failed to read heron nomad script from "
        + scriptPath + " . Please check file path!";
    LOG.log(Level.SEVERE, msg, e);
    throw new RuntimeException(msg, e);
  }
}
/**
 * Builds the downloader command line used to retrieve the topology package.
 *
 * @param localConfig the config the Heron conf directory (-p) is read from
 * @param clusterConfig the config the downloader binary and the Heron home (-d) are read from
 * @param runtime the runtime config the topology package URI (-u) is read from
 * @return the complete command line
 */
static String getFetchCommand(Config localConfig, Config clusterConfig, Config runtime) {
  String downloader = Context.downloaderBinary(clusterConfig);
  String packageUri = Runtime.topologyPackageUri(runtime).toString();
  String confDir = Context.heronConf(localConfig);
  String heronHome = Context.heronHome(clusterConfig);
  return String.format("%s -u %s -f . -m local -p %s -d %s",
      downloader, packageUri, confDir, heronHome);
}
/**
 * Narrows a long to an int, refusing values that do not fit.
 *
 * @param val the value to narrow
 * @return the same value as an int
 * @throws IllegalArgumentException when the value lies outside the int range
 */
static int longToInt(long val) {
  int narrowed = (int) val;
  // the cast only round-trips for values inside the int range
  if (narrowed != val) {
    throw new IllegalArgumentException(val + " is too large and cannot be cast to an Integer");
  }
  return narrowed;
}
/**
 * Derives a packing plan with homogeneous scheduled resources from the given plan and
 * persists it for the topology.
 *
 * @param packingPlan the plan as requested
 * @return the homogeneous copy that was persisted
 */
PackingPlan getHomogeneousPackingPlan(PackingPlan packingPlan) {
  // Align resources to maximal requested resource
  PackingPlan homogeneous = packingPlan.cloneWithHomogeneousScheduledResource();

  String topologyName = Runtime.topologyName(this.runtimeConfig);
  SchedulerUtils.persistUpdatedPackingPlan(
      topologyName, homogeneous, Runtime.schedulerStateManagerAdaptor(this.runtimeConfig));
  return homogeneous;
}
/**
 * Returns the required resource of the first container the plan iterates over.
 *
 * @param homogeneousPackingPlan the plan to read from; must hold at least one container
 * @return the required resource of that container
 */
Resource getHomogeneousContainerResource(PackingPlan homogeneousPackingPlan) {
  PackingPlan.ContainerPlan anyContainer =
      homogeneousPackingPlan.getContainers().iterator().next();
  return anyContainer.getRequiredResource();
}
/**
 * Looks up the server port file of the Prometheus sink in the metrics sinks configuration.
 *
 * @param config the config the metrics sinks file and the override file are read from
 * @return the port file configured for the Prometheus sink, or {@code null} when the
 *     sinks configuration cannot be loaded or holds no Prometheus sink
 */
static String getPrometheusMetricsFile(Config config) {
  MetricsSinksConfig metricsSinksConfig;
  try {
    metricsSinksConfig = new MetricsSinksConfig(Context.metricsSinksFile(config),
        Context.overrideFile(config));
  } catch (IOException e) {
    // keep the "null means unavailable" contract, but do not lose the reason
    LOG.log(Level.WARNING, "Failed to load metrics sinks config from "
        + Context.metricsSinksFile(config), e);
    return null;
  }

  // when several sinks use the Prometheus class, the last one listed wins
  Map<String, Object> prometheusSinkConfig = null;
  for (String sinkId : metricsSinksConfig.getSinkIds()) {
    Map<String, Object> sinkConfig = metricsSinksConfig.getConfigForSink(sinkId);
    Object className = sinkConfig.get(MetricsSinksConfig.CONFIG_KEY_CLASSNAME);
    // String.equals already rejects null and non-String values
    if (PrometheusSink.class.getName().equals(className)) {
      prometheusSinkConfig = sinkConfig;
    }
  }
  if (prometheusSinkConfig == null) {
    return null;
  }
  return PrometheusSink.getServerPortFile(prometheusSinkConfig);
}
/**
 * Builds the name of the metrics service of a container.
 *
 * @param topologyName the name of the topology
 * @param containerIndex the index of the container
 * @return {@code metrics-heron-<topologyName>-<containerIndex>}
 */
static String getMetricsServiceName(String topologyName, int containerIndex) {
  StringBuilder name = new StringBuilder("metrics-heron-");
  name.append(topologyName).append('-').append(containerIndex);
  return name.toString();
}
/**
 * Builds the shell command that writes the value of the Nomad port variable of the
 * metrics port into the file named by the metrics port file variable.
 *
 * @return the shell command
 */
static String getSetMetricsPortFileCmd() {
  String portVariable = NomadConstants.METRICS_PORT;
  String fileVariable = NomadConstants.METRICS_PORT_FILE;
  return String.format("echo ${NOMAD_PORT_%s} > ${%s}", portVariable, fileVariable);
}
}