// Copyright 2017 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.scheduler.kubernetes;
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.primitives.Ints;
import com.twitter.heron.common.basics.FileUtils;
import com.twitter.heron.proto.scheduler.Scheduler;
import com.twitter.heron.scheduler.TopologyRuntimeManagementException;
import com.twitter.heron.scheduler.UpdateTopologyManager;
import com.twitter.heron.scheduler.utils.Runtime;
import com.twitter.heron.scheduler.utils.SchedulerUtils;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.common.Context;
import com.twitter.heron.spi.common.Key;
import com.twitter.heron.spi.packing.PackingPlan;
import com.twitter.heron.spi.packing.Resource;
import com.twitter.heron.spi.scheduler.IScalable;
import com.twitter.heron.spi.scheduler.IScheduler;
public class KubernetesScheduler implements IScheduler, IScalable {
private static final Logger LOG = Logger.getLogger(KubernetesScheduler.class.getName());
private Config configuration;
private Config runtimeConfiguration;
private KubernetesController controller;
private UpdateTopologyManager updateTopologyManager;
protected KubernetesController getController() {
return new KubernetesController(
KubernetesContext.getSchedulerURI(configuration),
KubernetesContext.getKubernetesNamespace(configuration),
Runtime.topologyName(runtimeConfiguration),
Context.verbose(configuration));
}
@Override
public void initialize(Config config, Config runtime) {
// validate the topology name before moving forward
if (!topologyNameIsValid(Runtime.topologyName(runtime))) {
throw new RuntimeException(getInvalidTopologyNameMessage(Runtime.topologyName(runtime)));
}
this.configuration = config;
this.runtimeConfiguration = runtime;
this.controller = getController();
this.updateTopologyManager =
new UpdateTopologyManager(configuration, runtimeConfiguration,
Optional.<IScalable>of(this));
}
@Override
public void close() {
// Nothing to do here
}
@Override
public boolean onSchedule(PackingPlan packing) {
if (packing == null || packing.getContainers().isEmpty()) {
LOG.severe("No container requested. Can't schedule");
return false;
}
LOG.info("Submitting topology to Kubernetes");
String[] topologyConf = getTopologyConf(packing);
return controller.submitTopology(topologyConf);
}
@Override
public List<String> getJobLinks() {
List<String> jobLinks = new LinkedList<>();
String kubernetesPodsLink = KubernetesContext.getSchedulerURI(configuration)
+ KubernetesConstants.JOB_LINK;
jobLinks.add(kubernetesPodsLink);
return jobLinks;
}
@Override
public boolean onKill(Scheduler.KillTopologyRequest request) {
return controller.killTopology();
}
@Override
public boolean onRestart(Scheduler.RestartTopologyRequest request) {
int appId = request.getContainerIndex();
return controller.restartApp(appId);
}
@Override
public boolean onUpdate(Scheduler.UpdateTopologyRequest request) {
try {
updateTopologyManager.updateTopology(
request.getCurrentPackingPlan(), request.getProposedPackingPlan());
} catch (ExecutionException | InterruptedException e) {
LOG.log(Level.SEVERE, "Could not update topology for request: " + request, e);
return false;
}
return true;
}
/**
* Build the pod specifications needed to deploy the Heron topology: one JSON string per
* container, each of which will be passed to the Kubernetes API.
*
* @param packing - PackingPlan of the topology
*/
protected String[] getTopologyConf(PackingPlan packing) {
configuration = Config.newBuilder()
.putAll(configuration)
.put(Key.TOPOLOGY_BINARY_FILE,
FileUtils.getBaseName(Context.topologyBinaryFile(configuration)))
.build();
ObjectMapper mapper = new ObjectMapper();
// Align every container's resources to the maximum requested resource
PackingPlan updatedPackingPlan = packing.cloneWithHomogeneousScheduledResource();
SchedulerUtils.persistUpdatedPackingPlan(Runtime.topologyName(runtimeConfiguration),
updatedPackingPlan, Runtime.schedulerStateManagerAdaptor(runtimeConfiguration));
Resource containerResource = updatedPackingPlan.getContainers()
.iterator().next().getScheduledResource().get();
// Create app conf list for each container
String[] deploymentConfs =
new String[Ints.checkedCast(Runtime.numContainers(runtimeConfiguration))];
for (int i = 0; i < Runtime.numContainers(runtimeConfiguration); i++) {
deploymentConfs[i] = buildKubernetesPodSpec(mapper, i, containerResource);
}
return deploymentConfs;
}
/**
* Build a specification object for a Kubernetes pod, housing the Docker image and the setup
* information needed to run a Heron container.
*
* @param mapper - ObjectMapper instance used to create new JSON nodes
* @param containerIndex - The index of the container
* @param containerResource - The Resource object for the new container
*/
protected String buildKubernetesPodSpec(ObjectMapper mapper,
Integer containerIndex,
Resource containerResource) {
ObjectNode instance = mapper.createObjectNode();
instance.put(KubernetesConstants.API_VERSION, KubernetesConstants.API_VERSION_1);
instance.put(KubernetesConstants.API_KIND, KubernetesConstants.API_POD);
instance.set(KubernetesConstants.API_METADATA, getMetadata(mapper, containerIndex));
instance.set(KubernetesConstants.API_SPEC, getContainerSpec(mapper,
containerIndex,
containerResource));
return instance.toString();
}
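// A minimal sketch of the pod spec JSON this method produces, assuming the
// KubernetesConstants keys resolve to the standard Kubernetes v1 Pod field names
// (the topology name is hypothetical):
//
// {
//   "apiVersion": "v1",
//   "kind": "Pod",
//   "metadata": { ...see getMetadata()... },
//   "spec": { ...see getContainerSpec()... }
// }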
/**
* Build a new container spec based on an existing spec. This is used when we want to copy an
* existing spec into a new one while replacing information such as the container index.
*
* @param podSpec - Existing pod specification JSON object
* @param mapper - ObjectMapper instance used to create new JSON nodes
* @param containerPlan - The ContainerPlan for the new container
* @param oldContainerIndex - The index of the existing container
*/
protected String rebuildKubernetesPodSpec(JsonNode podSpec,
ObjectMapper mapper,
PackingPlan.ContainerPlan containerPlan,
Integer oldContainerIndex) {
ObjectNode newContainer = mapper.createObjectNode();
newContainer.put(KubernetesConstants.API_VERSION,
podSpec.get(KubernetesConstants.API_VERSION).asText());
newContainer.put(KubernetesConstants.API_KIND,
podSpec.get(KubernetesConstants.API_KIND).asText());
newContainer.set(KubernetesConstants.API_METADATA, getMetadata(mapper, containerPlan.getId()));
newContainer.set(KubernetesConstants.API_SPEC, rebuildContainerSpec(
podSpec.get(KubernetesConstants.API_SPEC),
mapper,
containerPlan,
oldContainerIndex));
return newContainer.toString();
}
/**
* Build the metadata that we're going to attach to our Pod specification. This metadata
* will allow us to query the set of pods that belong to a certain topology so we can easily
* retrieve or delete all of them at once.
*
* @param mapper - ObjectMapper instance we can use to create other JSON nodes
* @param containerIndex - Index of the container
*/
protected ObjectNode getMetadata(ObjectMapper mapper, int containerIndex) {
ObjectNode metadataNode = mapper.createObjectNode();
metadataNode.put(KubernetesConstants.NAME,
Joiner.on("-").join(Runtime.topologyName(runtimeConfiguration), containerIndex));
ObjectNode labels = mapper.createObjectNode();
labels.put(KubernetesConstants.TOPOLOGY_LABEL, Runtime.topologyName(runtimeConfiguration));
metadataNode.set(KubernetesConstants.METADATA_LABELS, labels);
return metadataNode;
}
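// A minimal sketch of the metadata node, assuming TOPOLOGY_LABEL resolves to a
// label key such as "topology" (the topology name is hypothetical):
//
// {
//   "name": "exclamationTopology-0",
//   "labels": { "topology": "exclamationTopology" }
// }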
/**
* Rebuild an existing container spec, replacing the information needed for the new container
* (primarily the container index).
*
* @param existingSpec - Existing container spec
* @param mapper - ObjectMapper instance used to create new JSON nodes
* @param containerPlan - New container's ContainerPlan
* @param oldContainerIndex - The index of the old container
*/
protected ObjectNode rebuildContainerSpec(JsonNode existingSpec,
ObjectMapper mapper,
PackingPlan.ContainerPlan containerPlan,
int oldContainerIndex) {
ObjectNode containerSpec = mapper.createObjectNode();
ArrayNode containerList = mapper.createArrayNode();
for (JsonNode existingContainer : existingSpec.get(KubernetesConstants.CONTAINERS)) {
ObjectNode containerInfo = mapper.createObjectNode();
containerInfo.put(KubernetesConstants.NAME, Joiner.on("-").join("executor",
Integer.toString(containerPlan.getId())));
// set the host for this container
ArrayNode envList = mapper.createArrayNode();
ObjectNode envVar = mapper.createObjectNode();
envVar.put(KubernetesConstants.NAME, KubernetesConstants.HOST);
// build the JSON to attach the Pod IP as the "HOST" environment variable
ObjectNode fieldRef = mapper.createObjectNode();
ObjectNode fieldPath = mapper.createObjectNode();
fieldPath.put(KubernetesConstants.FIELD_PATH, KubernetesConstants.POD_IP);
fieldRef.set(KubernetesConstants.FIELD_REF, fieldPath);
envVar.set(KubernetesConstants.VALUE_FROM, fieldRef);
envList.add(envVar);
containerInfo.set(KubernetesConstants.ENV, envList);
// set the docker image to the same image as the existing container
containerInfo.put(KubernetesConstants.DOCKER_IMAGE,
existingContainer.get(KubernetesConstants.DOCKER_IMAGE).asText());
// Port info -- all the same
containerInfo.set(KubernetesConstants.PORTS, getPorts(mapper));
// In order for the container to run with the correct index, we copy the base configuration
// for the container with index 0 and replace the container index with the index of the new
// container being deployed.
// Example: " 0 exclamationTopology" becomes " <new_cntr_idx> exclamationTopology";
// the rest of the command stays the same.
ArrayNode commandsArray = mapper.createArrayNode();
for (JsonNode cmd : existingContainer.get(KubernetesConstants.COMMAND)) {
String oldPattern = " " + oldContainerIndex + " "
+ Runtime.topologyName(runtimeConfiguration);
commandsArray.add(cmd.asText().replaceAll(oldPattern,
" " + containerPlan.getId() + " "
+ Runtime.topologyName(runtimeConfiguration)));
}
containerInfo.set(KubernetesConstants.COMMAND, commandsArray);
// Requested resource info
ObjectNode requestedResourceInfo = mapper.createObjectNode();
requestedResourceInfo.put(KubernetesConstants.MEMORY,
containerPlan.getRequiredResource().getRam().asMegabytes());
requestedResourceInfo.put(KubernetesConstants.CPU,
containerPlan.getRequiredResource().getCpu());
// Wrap it up into a resources dictionary
ObjectNode resourceInfo = mapper.createObjectNode();
resourceInfo.set(KubernetesConstants.REQUESTS, requestedResourceInfo);
containerInfo.set(KubernetesConstants.RESOURCES, resourceInfo);
containerList.add(containerInfo);
}
containerSpec.set(KubernetesConstants.CONTAINERS, containerList);
return containerSpec;
}
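// Worked example of the rewrite above (topology name hypothetical): with
// oldContainerIndex = 0 and containerPlan.getId() = 3, a command element containing
// " 0 exclamationTopology" comes out containing " 3 exclamationTopology" instead,
// with the rest of the command preserved verbatim.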
/**
* Get the JSON-based specification for the Kubernetes pod.
*
* @param mapper - An ObjectMapper instance used to create JSON nodes
* @param containerIndex - Index of the container
* @param containerResource - The container's Resource object
*/
protected ObjectNode getContainerSpec(ObjectMapper mapper,
int containerIndex,
Resource containerResource) {
ObjectNode containerSpec = mapper.createObjectNode();
ArrayNode containerList = mapper.createArrayNode();
ObjectNode containerInfo = mapper.createObjectNode();
containerInfo.put(KubernetesConstants.NAME, Joiner.on("-").join("executor",
Integer.toString(containerIndex)));
// set the host for this container
ArrayNode envList = mapper.createArrayNode();
ObjectNode envVar = mapper.createObjectNode();
envVar.put(KubernetesConstants.NAME, KubernetesConstants.HOST);
// build the JSON to attach the Pod IP as the "HOST" environment variable
ObjectNode fieldRef = mapper.createObjectNode();
ObjectNode fieldPath = mapper.createObjectNode();
fieldPath.put(KubernetesConstants.FIELD_PATH, KubernetesConstants.POD_IP);
fieldRef.set(KubernetesConstants.FIELD_REF, fieldPath);
envVar.set(KubernetesConstants.VALUE_FROM, fieldRef);
envList.add(envVar);
containerInfo.set(KubernetesConstants.ENV, envList);
// Image information for this container
containerInfo.put(KubernetesConstants.DOCKER_IMAGE,
KubernetesContext.getExecutorDockerImage(configuration));
// Port information for this container
containerInfo.set(KubernetesConstants.PORTS, getPorts(mapper));
// Heron command for the container
String[] command = getExecutorCommand(containerIndex);
ArrayNode commandsArray = mapper.createArrayNode();
for (String cmd : command) {
commandsArray.add(cmd);
}
containerInfo.set(KubernetesConstants.COMMAND, commandsArray);
// Requested resource info
ObjectNode requestedResourceInfo = mapper.createObjectNode();
requestedResourceInfo.put(KubernetesConstants.MEMORY, containerResource.getRam().asMegabytes());
requestedResourceInfo.put(KubernetesConstants.CPU, containerResource.getCpu());
// Wrap it up into a resources dictionary
ObjectNode resourceInfo = mapper.createObjectNode();
resourceInfo.set(KubernetesConstants.REQUESTS, requestedResourceInfo);
containerInfo.set(KubernetesConstants.RESOURCES, resourceInfo);
containerList.add(containerInfo);
containerSpec.set(KubernetesConstants.CONTAINERS, containerList);
return containerSpec;
}
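// A minimal sketch of the spec node built above (values hypothetical). The "env"
// entry uses the Kubernetes downward API to expose the pod IP as HOST, assuming
// POD_IP resolves to the standard "status.podIP" field path:
//
// {
//   "containers": [ {
//     "name": "executor-0",
//     "env": [ { "name": "HOST",
//                "valueFrom": { "fieldRef": { "fieldPath": "status.podIP" } } } ],
//     "image": "<executor docker image>",
//     "ports": [ ...see getPorts()... ],
//     "command": [ "sh", "-c", "<fetch command> && <executor command>" ],
//     "resources": { "requests": { "memory": 1024, "cpu": 1.0 } }
//   } ]
// }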
/**
* Get the ports the container needs to expose so that other containers can access its services.
*
* @param mapper - ObjectMapper instance used to create JSON nodes
*/
protected ArrayNode getPorts(ObjectMapper mapper) {
ArrayNode ports = mapper.createArrayNode();
for (int i = 0; i < KubernetesConstants.PORT_NAMES.length; i++) {
ObjectNode port = mapper.createObjectNode();
port.put(KubernetesConstants.DOCKER_CONTAINER_PORT,
Integer.parseInt(KubernetesConstants.PORT_LIST[i], 10));
port.put(KubernetesConstants.PORT_NAME, KubernetesConstants.PORT_NAMES[i]);
ports.add(port);
}
return ports;
}
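// A minimal sketch of the resulting array, assuming DOCKER_CONTAINER_PORT and
// PORT_NAME resolve to the standard Kubernetes keys; the port names and numbers
// are hypothetical (the real values come from KubernetesConstants.PORT_NAMES
// and PORT_LIST):
//
// [ { "containerPort": 6001, "name": "master" },
//   { "containerPort": 6002, "name": "tmaster-controller" },
//   ... ]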
/**
* Get the command used to fetch the topology package.
*/
static String getFetchCommand(Config config, Config runtime) {
return String.format("%s %s .", Context.downloaderBinary(config),
Runtime.topologyPackageUri(runtime).toString());
}
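// An illustrative result, assuming a hypothetical downloader binary and package URI:
//   "heron-downloader http://repo/exclamationTopology.tar.gz ."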
/**
* Get the command used to start the container.
*
* @param containerIndex - Index of the container
*/
protected String[] getExecutorCommand(int containerIndex) {
String[] executorCommand =
SchedulerUtils.getExecutorCommand(configuration, runtimeConfiguration,
containerIndex, Arrays.asList(KubernetesConstants.PORT_LIST));
String[] command = {
"sh",
"-c",
getFetchCommand(configuration, runtimeConfiguration)
+ " && " + Joiner.on(" ").join(executorCommand)
};
return command;
}
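// An illustrative result (fetch command and executor arguments hypothetical):
//   { "sh", "-c", "heron-downloader <package-uri> . && <heron-executor arguments>" }
// Chaining with "&&" ensures the executor only starts if the package download succeeds.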
/**
* Add containers for a scale-up event from an update command.
*
* @param containersToAdd the set of containers that need to be added
*
* NOTE: Due to the mechanics of Kubernetes pod creation, containers must be created one at a
* time. If one container out of many fails to deploy, the topology will be left in a bad state.
*
* TODO (jrcrawfo) -- (https://github.com/twitter/heron/issues/1981)
*/
@Override
public void addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
// fetch the base pod spec so we can copy it and modify the parts that differ per container
String basePodName = Runtime.topologyName(runtimeConfiguration) + "-0";
JsonNode podConfig;
try {
podConfig = controller.getBasePod(basePodName);
} catch (IOException ioe) {
throw new TopologyRuntimeManagementException("Unable to retrieve base pod configuration from "
+ basePodName, ioe);
}
// iterate over the containers we need to add and rebuild the spec based on the new plan
ObjectMapper mapper = new ObjectMapper();
int totalNewContainerCount = containersToAdd.size();
int deployedContainerCount = 0;
for (PackingPlan.ContainerPlan containerPlan : containersToAdd) {
String newContainer = rebuildKubernetesPodSpec(podConfig, mapper, containerPlan, 0);
// deploy this new container
try {
controller.deployContainer(newContainer);
LOG.log(Level.INFO, "New container " + ++deployedContainerCount + "/"
+ totalNewContainerCount + " deployed");
} catch (IOException ioe) {
throw new TopologyRuntimeManagementException("Problem adding container with id "
+ containerPlan.getId() + ". Deployed " + deployedContainerCount + " out of "
+ totalNewContainerCount + " containers", ioe);
}
}
}
/**
* Remove containers for a scale-down event from an update command.
*
* @param containersToRemove the set of containers that need to be removed
*
* NOTE: Due to the mechanics of Kubernetes pod removal, containers must be removed one at a
* time. If one container out of many fails to be removed, the topology will be left in a bad
* state.
*
* TODO (jrcrawfo) -- (https://github.com/twitter/heron/issues/1981)
*/
@Override
public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) {
for (PackingPlan.ContainerPlan container : containersToRemove) {
String podName = Runtime.topologyName(runtimeConfiguration) + "-" + container.getId();
try {
controller.removeContainer(podName);
} catch (IOException ioe) {
throw new TopologyRuntimeManagementException("Problem removing container with id "
+ container.getId(), ioe);
}
}
}
static boolean topologyNameIsValid(String topologyName) {
final Matcher matcher = KubernetesConstants.VALID_POD_NAME_REGEX.matcher(topologyName);
return matcher.matches();
}
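// For example, "exclamation-topology" would be accepted, while "ExclamationTopology"
// (upper case) or "my_topology" (underscore) would be rejected, assuming
// VALID_POD_NAME_REGEX encodes the Kubernetes naming rule described below.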
private static String getInvalidTopologyNameMessage(String topologyName) {
return String.format("Invalid topology name: \"%s\": "
+ "topology names in kubernetes must consist of lower case alphanumeric "
+ "characters, '-' or '.', and must start and end with an alphanumeric "
+ "character.", topologyName);
}
}