blob: bb6a919f605e48f7d1ecc3ddd1b1fdd9c3b3b2e9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.scheduler.kubernetes;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import com.google.common.base.Optional;
import org.apache.heron.common.basics.FileUtils;
import org.apache.heron.proto.scheduler.Scheduler;
import org.apache.heron.scheduler.UpdateTopologyManager;
import org.apache.heron.scheduler.utils.Runtime;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.common.Key;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.scheduler.IScalable;
import org.apache.heron.spi.scheduler.IScheduler;
public class KubernetesScheduler implements IScheduler, IScalable {
private static final Logger LOG = Logger.getLogger(KubernetesScheduler.class.getName());
private Config configuration;
private Config runtimeConfiguration;
private KubernetesController controller;
private UpdateTopologyManager updateTopologyManager;
protected KubernetesController getController() {
return new AppsV1beta1Controller(configuration, runtimeConfiguration);
}
@Override
public void initialize(Config config, Config runtime) {
// validate the topology name before moving forward
if (!topologyNameIsValid(Runtime.topologyName(runtime))) {
throw new RuntimeException(getInvalidTopologyNameMessage(Runtime.topologyName(runtime)));
}
// validate that the image pull policy has been set correctly
if (!imagePullPolicyIsValid(KubernetesContext.getKubernetesImagePullPolicy(config))) {
throw new RuntimeException(
getInvalidImagePullPolicyMessage(KubernetesContext.getKubernetesImagePullPolicy(config))
);
}
final Config.Builder builder = Config.newBuilder()
.putAll(config);
if (config.containsKey(Key.TOPOLOGY_BINARY_FILE)) {
builder.put(Key.TOPOLOGY_BINARY_FILE,
FileUtils.getBaseName(Context.topologyBinaryFile(config)));
}
this.configuration = builder.build();
this.runtimeConfiguration = runtime;
this.controller = getController();
this.updateTopologyManager =
new UpdateTopologyManager(configuration, runtimeConfiguration,
Optional.<IScalable>of(this));
}
@Override
public void close() {
// Nothing to do here
}
@Override
public boolean onSchedule(PackingPlan packing) {
if (packing == null || packing.getContainers().isEmpty()) {
LOG.severe("No container requested. Can't schedule");
return false;
}
LOG.info("Submitting topology to Kubernetes");
return controller.submit(packing);
}
@Override
public List<String> getJobLinks() {
List<String> jobLinks = new LinkedList<>();
String kubernetesPodsLink = KubernetesContext.getSchedulerURI(configuration)
+ KubernetesConstants.JOB_LINK;
jobLinks.add(kubernetesPodsLink);
return jobLinks;
}
@Override
public boolean onKill(Scheduler.KillTopologyRequest request) {
return controller.killTopology();
}
@Override
public boolean onRestart(Scheduler.RestartTopologyRequest request) {
final int appId = request.getContainerIndex();
return controller.restart(appId);
}
@Override
public boolean onUpdate(Scheduler.UpdateTopologyRequest request) {
try {
updateTopologyManager.updateTopology(
request.getCurrentPackingPlan(), request.getProposedPackingPlan());
} catch (ExecutionException | InterruptedException e) {
LOG.log(Level.SEVERE, "Could not update topology for request: " + request, e);
return false;
}
return true;
}
/**
* Add containers for a scale-up event from an update command
*
* @param containersToAdd the list of containers that need to be added
*
* NOTE: Due to the mechanics of Kubernetes pod creation, each container must be created on
* a one-by-one basis. If one container out of many containers to be deployed failed, it will
* leave the topology in a bad state.
*
* TODO (jrcrawfo) -- (https://github.com/apache/incubator-heron/issues/1981)
*/
@Override
public Set<PackingPlan.ContainerPlan>
addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
controller.addContainers(containersToAdd);
return containersToAdd;
}
/**
* Remove containers for a scale-down event from an update command
*
* @param containersToRemove the list of containers that need to be removed
*
* NOTE: Due to the mechanics of Kubernetes pod removal, each container must be removed on
* a one-by-one basis. If one container out of many containers to be removed failed, it will
* leave the topology in a bad state.
*
* TODO (jrcrawfo) -- (https://github.com/apache/incubator-heron/issues/1981)
*/
@Override
public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) {
controller.removeContainers(containersToRemove);
}
static boolean topologyNameIsValid(String topologyName) {
final Matcher matcher = KubernetesConstants.VALID_POD_NAME_REGEX.matcher(topologyName);
return matcher.matches();
}
static boolean imagePullPolicyIsValid(String imagePullPolicy) {
if (imagePullPolicy == null || imagePullPolicy.isEmpty()) {
return true;
}
return KubernetesConstants.VALID_IMAGE_PULL_POLICIES.contains(imagePullPolicy);
}
private static String getInvalidTopologyNameMessage(String topologyName) {
return String.format("Invalid topology name: \"%s\": "
+ "topology names in kubernetes must consist of lower case alphanumeric "
+ "characters, '-' or '.', and must start and end with an alphanumeric "
+ "character.", topologyName);
}
private static String getInvalidImagePullPolicyMessage(String policy) {
return String.format("Invalid image pull policy: \"%s\": image pull polices must be one of "
+ " %s Defaults to Always if :latest tag is specified, or IfNotPresent otherwise.",
policy, KubernetesConstants.VALID_IMAGE_PULL_POLICIES.toString());
}
}