| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.heron.scheduler.kubernetes; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| |
| import org.apache.heron.api.utils.TopologyUtils; |
| import org.apache.heron.scheduler.TopologyRuntimeManagementException; |
| import org.apache.heron.scheduler.TopologySubmissionException; |
| import org.apache.heron.scheduler.utils.Runtime; |
| import org.apache.heron.scheduler.utils.SchedulerUtils; |
| import org.apache.heron.scheduler.utils.SchedulerUtils.ExecutorPort; |
| import org.apache.heron.spi.common.Config; |
| import org.apache.heron.spi.packing.PackingPlan; |
| import org.apache.heron.spi.packing.Resource; |
| |
| import io.kubernetes.client.custom.Quantity; |
| import io.kubernetes.client.custom.V1Patch; |
| import io.kubernetes.client.openapi.ApiClient; |
| import io.kubernetes.client.openapi.ApiException; |
| import io.kubernetes.client.openapi.Configuration; |
| import io.kubernetes.client.openapi.apis.AppsV1Api; |
| import io.kubernetes.client.openapi.apis.CoreV1Api; |
| import io.kubernetes.client.openapi.models.V1Container; |
| import io.kubernetes.client.openapi.models.V1ContainerPort; |
| import io.kubernetes.client.openapi.models.V1EnvVar; |
| import io.kubernetes.client.openapi.models.V1EnvVarSource; |
| import io.kubernetes.client.openapi.models.V1LabelSelector; |
| import io.kubernetes.client.openapi.models.V1ObjectFieldSelector; |
| import io.kubernetes.client.openapi.models.V1ObjectMeta; |
| import io.kubernetes.client.openapi.models.V1PodSpec; |
| import io.kubernetes.client.openapi.models.V1PodTemplateSpec; |
| import io.kubernetes.client.openapi.models.V1ResourceRequirements; |
| import io.kubernetes.client.openapi.models.V1Service; |
| import io.kubernetes.client.openapi.models.V1ServiceSpec; |
| import io.kubernetes.client.openapi.models.V1StatefulSet; |
| import io.kubernetes.client.openapi.models.V1StatefulSetSpec; |
| import io.kubernetes.client.openapi.models.V1Toleration; |
| import io.kubernetes.client.openapi.models.V1Volume; |
| import io.kubernetes.client.openapi.models.V1VolumeMount; |
| import io.kubernetes.client.util.PatchUtils; |
| |
| import okhttp3.Response; |
| |
| import static java.net.HttpURLConnection.HTTP_NOT_FOUND; |
| |
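| /** |
| * Kubernetes controller that drives the official Java client's V1 APIs. A |
| * topology is materialized as a StatefulSet of executor pods fronted by a |
| * headless Service. |
| */ |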
| public class V1Controller extends KubernetesController { |
| |
| private static final Logger LOG = |
| Logger.getLogger(V1Controller.class.getName()); |
| |
| private static final String ENV_SHARD_ID = "SHARD_ID"; |
| |
| private final AppsV1Api appsClient; |
| private final CoreV1Api coreClient; |
| |
| V1Controller(Config configuration, Config runtimeConfiguration) { |
| super(configuration, runtimeConfiguration); |
| try { |
| final ApiClient apiClient = io.kubernetes.client.util.Config.defaultClient(); |
| Configuration.setDefaultApiClient(apiClient); |
| appsClient = new AppsV1Api(apiClient); |
| coreClient = new CoreV1Api(apiClient); |
| } catch (IOException e) { |
| LOG.log(Level.SEVERE, "Failed to setup Kubernetes client" + e); |
| throw new RuntimeException(e); |
| } |
| } |
| |
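| /** |
| * Submits the topology by creating a headless Service for pod discovery and |
| * a StatefulSet sized to the packing plan. The topology name must be lower |
| * case because it is used as a Kubernetes resource name. |
| */ |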
| @Override |
| boolean submit(PackingPlan packingPlan) { |
| final String topologyName = getTopologyName(); |
| if (!topologyName.equals(topologyName.toLowerCase())) { |
| throw new TopologySubmissionException("K8S scheduler does not allow upper case topologies."); |
| } |
| |
| final Resource containerResource = getContainerResource(packingPlan); |
| |
| final V1Service topologyService = createTopologyService(); |
| try { |
| coreClient.createNamespacedService(getNamespace(), topologyService, null, null, null); |
| } catch (ApiException e) { |
| KubernetesUtils.logExceptionWithDetails(LOG, "Error creating topology service", e); |
| throw new TopologySubmissionException(e.getMessage()); |
| } |
| |
| // find the maximum number of instances in any container so we can open |
| // enough ports if remote debugging is enabled. |
| int numberOfInstances = 0; |
| for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) { |
| numberOfInstances = Math.max(numberOfInstances, containerPlan.getInstances().size()); |
| } |
| final V1StatefulSet statefulSet = createStatefulSet(containerResource, numberOfInstances); |
| |
| try { |
| appsClient.createNamespacedStatefulSet(getNamespace(), statefulSet, null, null, null); |
| } catch (ApiException e) { |
| KubernetesUtils.logExceptionWithDetails(LOG, "Error creating topology", e); |
| throw new TopologySubmissionException(e.getMessage()); |
| } |
| |
| return true; |
| } |
| |
| @Override |
| boolean killTopology() { |
| deleteStatefulSet(); |
| deleteService(); |
| return true; |
| } |
| |
| @Override |
| boolean restart(int shardId) { |
| final String message = "Restarting the whole topology is not supported yet. " |
| + "Please kill and resubmit the topology."; |
| LOG.log(Level.SEVERE, message); |
| return false; |
| } |
| |
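| /** |
| * Scales the topology up by patching the StatefulSet's replica count; new |
| * pods derive their shard ids from their pod ordinals. |
| */ |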
| @Override |
| public Set<PackingPlan.ContainerPlan> |
| addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) { |
| final V1StatefulSet statefulSet; |
| try { |
| statefulSet = getStatefulSet(); |
| } catch (ApiException ae) { |
| final String message = ae.getMessage() + "\ndetails: " + ae.getResponseBody(); |
| throw new TopologyRuntimeManagementException(message, ae); |
| } |
| final int currentContainerCount = statefulSet.getSpec().getReplicas(); |
| final int newContainerCount = currentContainerCount + containersToAdd.size(); |
| |
| try { |
| patchStatefulSetReplicas(newContainerCount); |
| } catch (ApiException ae) { |
| throw new TopologyRuntimeManagementException( |
| ae.getMessage() + "\ndetails: " + ae.getResponseBody(), ae); |
| } |
| |
| return containersToAdd; |
| } |
| |
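| /** Scales the topology down by patching the StatefulSet's replica count. */ |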
| @Override |
| public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) { |
| final V1StatefulSet statefulSet; |
| try { |
| statefulSet = getStatefulSet(); |
| } catch (ApiException ae) { |
| final String message = ae.getMessage() + "\ndetails: " + ae.getResponseBody(); |
| throw new TopologyRuntimeManagementException(message, ae); |
| } |
| final int currentContainerCount = statefulSet.getSpec().getReplicas(); |
| final int newContainerCount = currentContainerCount - containersToRemove.size(); |
| |
| try { |
| patchStatefulSetReplicas(newContainerCount); |
| } catch (ApiException e) { |
| throw new TopologyRuntimeManagementException( |
| e.getMessage() + "\ndetails: " + e.getResponseBody(), e); |
| } |
| } |
| |
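| /** |
| * Applies a JSON patch that replaces "/spec/replicas" on the StatefulSet. |
| * For example, a call with replicas=5 sends the body |
| * [{"op":"replace","path":"/spec/replicas","value":5}]. |
| */ |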
| private void patchStatefulSetReplicas(int replicas) throws ApiException { |
| final String body = |
| String.format(JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT, |
| replicas); |
| final V1Patch patch = new V1Patch(body); |
| |
| PatchUtils.patch(V1StatefulSet.class, |
| () -> |
| appsClient.patchNamespacedStatefulSetCall( |
| getTopologyName(), |
| getNamespace(), |
| patch, |
| null, |
| null, |
| null, |
| null, |
| null), |
| V1Patch.PATCH_FORMAT_JSON_PATCH, |
| appsClient.getApiClient()); |
| } |
| |
| private static final String JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT = |
| "[{\"op\":\"replace\",\"path\":\"/spec/replicas\",\"value\":%d}]"; |
| |
| V1StatefulSet getStatefulSet() throws ApiException { |
| return appsClient.readNamespacedStatefulSet(getTopologyName(), getNamespace(), |
| null, null, null); |
| } |
| |
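| /** |
| * Deletes the topology's headless Service, treating HTTP 404 as "already |
| * gone"; the raw call is used so the HTTP response can be inspected. |
| */ |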
| void deleteService() { |
| try (Response response = coreClient.deleteNamespacedServiceCall(getTopologyName(), |
| getNamespace(), null, null, 0, null, |
| KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute()) { |
| |
| if (!response.isSuccessful()) { |
| if (response.code() == HTTP_NOT_FOUND) { |
| LOG.log(Level.WARNING, "Deleting non-existent Kubernetes headless service for Topology: " |
| + getTopologyName()); |
| return; |
| } |
| LOG.log(Level.SEVERE, "Error when deleting the Service of the job [" |
| + getTopologyName() + "] in namespace [" + getNamespace() + "]"); |
| LOG.log(Level.SEVERE, "Error killing topology message:" + response.message()); |
| KubernetesUtils.logResponseBodyIfPresent(LOG, response); |
| |
| throw new TopologyRuntimeManagementException( |
| KubernetesUtils.errorMessageFromResponse(response)); |
| } |
| } catch (ApiException e) { |
| if (e.getCode() == HTTP_NOT_FOUND) { |
| LOG.log(Level.WARNING, "Tried to delete a non-existent Kubernetes service for Topology: " |
| + getTopologyName()); |
| return; |
| } |
| throw new TopologyRuntimeManagementException("Error deleting topology [" |
| + getTopologyName() + "] Kubernetes service", e); |
| } catch (IOException e) { |
| throw new TopologyRuntimeManagementException("Error deleting topology [" |
| + getTopologyName() + "] Kubernetes service", e); |
| } |
| LOG.log(Level.INFO, "Headless Service for the Job [" + getTopologyName() |
| + "] in namespace [" + getNamespace() + "] is deleted."); |
| } |
| |
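| /** Deletes the topology's StatefulSet; 404 handling mirrors deleteService. */ |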
| void deleteStatefulSet() { |
| try (Response response = appsClient.deleteNamespacedStatefulSetCall(getTopologyName(), |
| getNamespace(), null, null, 0, null, |
| KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute()) { |
| |
| if (!response.isSuccessful()) { |
| if (response.code() == HTTP_NOT_FOUND) { |
| LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSet for Topology: " |
| + getTopologyName()); |
| return; |
| } |
| LOG.log(Level.SEVERE, "Error when deleting the StatefulSet of the job [" |
| + getTopologyName() + "] in namespace [" + getNamespace() + "]"); |
| LOG.log(Level.SEVERE, "Error killing topology message: " + response.message()); |
| KubernetesUtils.logResponseBodyIfPresent(LOG, response); |
| |
| throw new TopologyRuntimeManagementException( |
| KubernetesUtils.errorMessageFromResponse(response)); |
| } |
| } catch (ApiException e) { |
| if (e.getCode() == HTTP_NOT_FOUND) { |
| LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSet for Topology: " |
| + getTopologyName()); |
| return; |
| } |
| throw new TopologyRuntimeManagementException("Error deleting topology [" |
| + getTopologyName() + "] Kubernetes StatefulSet", e); |
| } catch (IOException e) { |
| throw new TopologyRuntimeManagementException("Error deleting topology [" |
| + getTopologyName() + "] Kubernetes StatefulSet", e); |
| } |
| LOG.log(Level.INFO, "StatefulSet for the Job [" + getTopologyName() |
| + "] in namespace [" + getNamespace() + "] is deleted."); |
| } |
| |
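| /** |
| * Builds the container command, chaining the pieces with "&&" so the |
| * executor only starts if config download, topology fetch, and shard id |
| * resolution all succeed, roughly: |
| * sh -c "<conf cmd> && <fetch cmd> && SHARD_ID=<ordinal> && <executor cmd>" |
| */ |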
| protected List<String> getExecutorCommand(String containerId) { |
| final Map<ExecutorPort, String> ports = |
| KubernetesConstants.EXECUTOR_PORTS.entrySet() |
| .stream() |
| .collect(Collectors.toMap(Map.Entry::getKey, |
| e -> e.getValue().toString())); |
| |
| final Config configuration = getConfiguration(); |
| final Config runtimeConfiguration = getRuntimeConfiguration(); |
| final String[] executorCommand = |
| SchedulerUtils.getExecutorCommand(configuration, runtimeConfiguration, |
| containerId, ports); |
| return Arrays.asList( |
| "sh", |
| "-c", |
| KubernetesUtils.getConfCommand(configuration) |
| + " && " + KubernetesUtils.getFetchCommand(configuration, runtimeConfiguration) |
| + " && " + setShardIdEnvironmentVariableCommand() |
| + " && " + String.join(" ", executorCommand) |
| ); |
| } |
| |
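| /** |
| * Derives the shard id from the pod's StatefulSet ordinal: the shell |
| * expansion ${POD_NAME##*-} strips everything up to and including the last |
| * '-', so a pod named "my-topology-2" yields SHARD_ID=2. |
| */ |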
| private static String setShardIdEnvironmentVariableCommand() { |
| return String.format("%s=${POD_NAME##*-} && echo shardId=${%s}", ENV_SHARD_ID, ENV_SHARD_ID); |
| } |
| |
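| /** |
| * Creates the headless Service (clusterIP "None") backing the StatefulSet, |
| * giving each pod a stable DNS identity of the form |
| * <pod-name>.<topology-name>.<namespace>.svc.cluster.local. |
| */ |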
| private V1Service createTopologyService() { |
| final String topologyName = getTopologyName(); |
| final Config runtimeConfiguration = getRuntimeConfiguration(); |
| |
| final V1Service service = new V1Service(); |
| |
| // setup service metadata |
| final V1ObjectMeta objectMeta = new V1ObjectMeta(); |
| objectMeta.name(topologyName); |
| objectMeta.annotations(getServiceAnnotations()); |
| service.setMetadata(objectMeta); |
| |
| // create the headless service |
| final V1ServiceSpec serviceSpec = new V1ServiceSpec(); |
| serviceSpec.clusterIP("None"); |
| serviceSpec.setSelector(getMatchLabels(topologyName)); |
| |
| service.setSpec(serviceSpec); |
| |
| return service; |
| } |
| |
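| /** |
| * Creates the StatefulSet spec: one replica per container in the packing |
| * plan, launched in parallel, and selected via the "app" and "topology" |
| * match labels. |
| */ |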
| private V1StatefulSet createStatefulSet(Resource containerResource, int numberOfInstances) { |
| final String topologyName = getTopologyName(); |
| final Config runtimeConfiguration = getRuntimeConfiguration(); |
| |
| final V1StatefulSet statefulSet = new V1StatefulSet(); |
| |
| // setup stateful set metadata |
| final V1ObjectMeta objectMeta = new V1ObjectMeta(); |
| objectMeta.name(topologyName); |
| statefulSet.metadata(objectMeta); |
| |
| // create the stateful set spec |
| final V1StatefulSetSpec statefulSetSpec = new V1StatefulSetSpec(); |
| statefulSetSpec.serviceName(topologyName); |
| statefulSetSpec.setReplicas(Runtime.numContainers(runtimeConfiguration).intValue()); |
| |
| // Parallel pod management tells the StatefulSet controller to launch or terminate |
| // all Pods in parallel, and not to wait for Pods to become Running and Ready or completely |
| // terminated prior to launching or terminating another Pod. |
| statefulSetSpec.setPodManagementPolicy("Parallel"); |
| |
| // add selector match labels "app=heron" and "topology=topology-name" |
| // so we know which pods to manage |
| final V1LabelSelector selector = new V1LabelSelector(); |
| selector.matchLabels(getMatchLabels(topologyName)); |
| statefulSetSpec.selector(selector); |
| |
| // create a pod template |
| final V1PodTemplateSpec podTemplateSpec = new V1PodTemplateSpec(); |
| |
| // set up pod meta |
| final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getLabels(topologyName)); |
| Map<String, String> annotations = new HashMap<>(); |
| annotations.putAll(getPodAnnotations()); |
| annotations.putAll(getPrometheusAnnotations()); |
| templateMetaData.annotations(annotations); |
| podTemplateSpec.setMetadata(templateMetaData); |
| |
| final List<String> command = getExecutorCommand("$" + ENV_SHARD_ID); |
| podTemplateSpec.spec(getPodSpec(command, containerResource, numberOfInstances)); |
| |
| statefulSetSpec.setTemplate(podTemplateSpec); |
| |
| statefulSet.spec(statefulSetSpec); |
| |
| return statefulSet; |
| } |
| |
| private Map<String, String> getPodAnnotations() { |
| return KubernetesContext.getPodAnnotations(getConfiguration()); |
| } |
| |
| private Map<String, String> getServiceAnnotations() { |
| return KubernetesContext.getServiceAnnotations(getConfiguration()); |
| } |
| |
| private Map<String, String> getPrometheusAnnotations() { |
| final Map<String, String> annotations = new HashMap<>(); |
| annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_SCRAPE, "true"); |
| annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_PORT, |
| KubernetesConstants.PROMETHEUS_PORT); |
| |
| return annotations; |
| } |
| |
| private Map<String, String> getMatchLabels(String topologyName) { |
| final Map<String, String> labels = new HashMap<>(); |
| labels.put(KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE); |
| labels.put(KubernetesConstants.LABEL_TOPOLOGY, topologyName); |
| return labels; |
| } |
| |
| private Map<String, String> getLabels(String topologyName) { |
| // the pod labels are currently identical to the selector match labels |
| return getMatchLabels(topologyName); |
| } |
| |
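| /** |
| * Assembles the pod spec: zero termination grace period for fast deletion, |
| * taint-based eviction tolerations, a single executor container, and any |
| * configured volumes. |
| */ |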
| private V1PodSpec getPodSpec(List<String> executorCommand, Resource resource, |
| int numberOfInstances) { |
| final V1PodSpec podSpec = new V1PodSpec(); |
| |
| // set the termination period to 0 so pods can be deleted quickly |
| podSpec.setTerminationGracePeriodSeconds(0L); |
| |
| // set the pod tolerations so pods are rescheduled when nodes go down |
| // https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/#taint-based-evictions |
| podSpec.setTolerations(getTolerations()); |
| |
| podSpec.containers(Collections.singletonList( |
| getContainer(executorCommand, resource, numberOfInstances))); |
| |
| addVolumesIfPresent(podSpec); |
| |
| return podSpec; |
| } |
| |
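| /** |
| * Tolerates the node conditions in KubernetesConstants.TOLERATIONS for at |
| * most 10 seconds, after which Kubernetes evicts and reschedules the pod. |
| */ |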
| private List<V1Toleration> getTolerations() { |
| final List<V1Toleration> tolerations = new ArrayList<>(); |
| KubernetesConstants.TOLERATIONS.forEach(t -> { |
| final V1Toleration toleration = |
| new V1Toleration() |
| .key(t) |
| .operator("Exists") |
| .effect("NoExecute") |
| .tolerationSeconds(10L); |
| tolerations.add(toleration); |
| }); |
| |
| return tolerations; |
| } |
| |
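| /** |
| * Adds the volume configured via KubernetesContext to the pod spec, if any. |
| * The container-side mount is handled separately in mountVolumeIfPresent. |
| */ |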
| private void addVolumesIfPresent(V1PodSpec spec) { |
| final Config config = getConfiguration(); |
| if (KubernetesContext.hasVolume(config)) { |
| final V1Volume volume = Volumes.get().create(config); |
| if (volume != null) { |
| LOG.fine("Adding volume: " + volume.toString()); |
| spec.volumes(Collections.singletonList(volume)); |
| } |
| } |
| } |
| |
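| /** |
| * Builds the executor container: image, command, downward-API environment |
| * variables (pod IP and pod name), resource limits/requests, ports, and |
| * volume mounts. |
| */ |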
| private V1Container getContainer(List<String> executorCommand, Resource resource, |
| int numberOfInstances) { |
| final Config configuration = getConfiguration(); |
| final V1Container container = new V1Container().name("executor"); |
| |
| // set up the container images |
| container.setImage(KubernetesContext.getExecutorDockerImage(configuration)); |
| |
| // set up the container command |
| container.setCommand(executorCommand); |
| |
| if (KubernetesContext.hasImagePullPolicy(configuration)) { |
| container.setImagePullPolicy(KubernetesContext.getKubernetesImagePullPolicy(configuration)); |
| } |
| |
| // setup the environment variables for the container |
| final V1EnvVar envVarHost = new V1EnvVar(); |
| envVarHost.name(KubernetesConstants.ENV_HOST) |
| .valueFrom(new V1EnvVarSource() |
| .fieldRef(new V1ObjectFieldSelector() |
| .fieldPath(KubernetesConstants.POD_IP))); |
| |
| final V1EnvVar envVarPodName = new V1EnvVar(); |
| envVarPodName.name(KubernetesConstants.ENV_POD_NAME) |
| .valueFrom(new V1EnvVarSource() |
| .fieldRef(new V1ObjectFieldSelector() |
| .fieldPath(KubernetesConstants.POD_NAME))); |
| container.setEnv(Arrays.asList(envVarHost, envVarPodName)); |
| |
| // set container resources |
| final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements(); |
| // Set the Kubernetes container resource limit |
| final Map<String, Quantity> limits = new HashMap<>(); |
| limits.put(KubernetesConstants.MEMORY, |
| Quantity.fromString(KubernetesUtils.Megabytes( |
| resource.getRam()))); |
| limits.put(KubernetesConstants.CPU, |
| Quantity.fromString(Double.toString(roundDecimal( |
| resource.getCpu(), 3)))); |
| resourceRequirements.setLimits(limits); |
| KubernetesContext.KubernetesResourceRequestMode requestMode = |
| KubernetesContext.getKubernetesRequestMode(configuration); |
| // Set the Kubernetes container resource request |
| if (requestMode == KubernetesContext.KubernetesResourceRequestMode.EQUAL_TO_LIMIT) { |
| LOG.log(Level.CONFIG, "Setting K8s Request equal to Limit"); |
| resourceRequirements.setRequests(limits); |
| } else { |
| LOG.log(Level.CONFIG, "Not setting K8s request because config was NOT_SET"); |
| } |
| container.setResources(resourceRequirements); |
| |
| // set container ports |
| final boolean debuggingEnabled = |
| TopologyUtils.getTopologyRemoteDebuggingEnabled( |
| Runtime.topology(getRuntimeConfiguration())); |
| container.setPorts(getContainerPorts(debuggingEnabled, numberOfInstances)); |
| |
| // setup volume mounts |
| mountVolumeIfPresent(container); |
| |
| return container; |
| } |
| |
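| /** |
| * Opens the standard executor ports plus, when remote debugging is enabled, |
| * one extra port per instance: instance i gets container port |
| * JVM_REMOTE_DEBUGGER_PORT + i. |
| */ |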
| private List<V1ContainerPort> getContainerPorts(boolean remoteDebugEnabled, |
| int numberOfInstances) { |
| List<V1ContainerPort> ports = new ArrayList<>(); |
| KubernetesConstants.EXECUTOR_PORTS.forEach((p, v) -> { |
| final V1ContainerPort port = new V1ContainerPort(); |
| port.setName(p.getName()); |
| port.setContainerPort(v); |
| ports.add(port); |
| }); |
| |
| if (remoteDebugEnabled) { |
| IntStream.range(0, numberOfInstances).forEach(i -> { |
| final String portName = |
| KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + i; |
| final V1ContainerPort port = new V1ContainerPort(); |
| port.setName(portName); |
| port.setContainerPort(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i); |
| ports.add(port); |
| }); |
| } |
| |
| return ports; |
| } |
| |
| private void mountVolumeIfPresent(V1Container container) { |
| final Config config = getConfiguration(); |
| if (KubernetesContext.hasContainerVolume(config)) { |
| final V1VolumeMount mount = |
| new V1VolumeMount() |
| .name(KubernetesContext.getContainerVolumeName(config)) |
| .mountPath(KubernetesContext.getContainerVolumeMountPath(config)); |
| container.volumeMounts(Collections.singletonList(mount)); |
| } |
| } |
| |
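| /** |
| * Rounds a value to the given number of decimal places by scaling, e.g. |
| * roundDecimal(1.23456, 3) returns 1.235; used above to trim the CPU limit. |
| */ |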
| public static double roundDecimal(double value, int places) { |
| double scale = Math.pow(10, places); |
| return Math.round(value * scale) / scale; |
| } |
| } |