Added support for adding Kubernetes annotations to the topology pod and service (#3699)
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
index f938cd0..f6359b8 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
@@ -19,6 +19,11 @@
package org.apache.heron.scheduler.kubernetes;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
@@ -44,7 +49,7 @@
* provided in the Resource Limit. This mode effectively guarantees the
* cpu and memory will be reserved.
*/
- EQUAL_TO_LIMIT;
+ EQUAL_TO_LIMIT
}
/**
* This config item is used to determine how to configure the K8s Resource Request.
@@ -83,6 +88,11 @@
public static final String HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH =
"heron.kubernetes.container.volumeMount.path";
+ public static final String HERON_KUBERNETES_POD_ANNOTATION =
+ "heron.kubernetes.pod.annotation.";
+ public static final String HERON_KUBERNETES_SERVICE_ANNOTATION =
+ "heron.kubernetes.service.annotation.";
+
private KubernetesContext() {
}
@@ -152,6 +162,38 @@
return config.getStringValue(HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH);
}
+ static Set<String> getConfigKeys(Config config, String keyPrefix) {
+ Set<String> annotations = new HashSet<>();
+ for (String s : config.getKeySet()) {
+ if (s.startsWith(keyPrefix)) {
+ annotations.add(s);
+ }
+ }
+ return annotations;
+ }
+
+ public static Map<String, String> getPodAnnotations(Config config) {
+ final Map<String, String> annotations = new HashMap<>();
+ final Set<String> keys = getConfigKeys(config, HERON_KUBERNETES_POD_ANNOTATION);
+ for (String s : keys) {
+ String value = config.getStringValue(s);
+ annotations.put(s.replaceFirst(KubernetesContext.HERON_KUBERNETES_POD_ANNOTATION,
+ ""), value);
+ }
+ return annotations;
+ }
+
+ public static Map<String, String> getServiceAnnotations(Config config) {
+ final Map<String, String> annotations = new HashMap<>();
+ final Set<String> keys = getConfigKeys(config, HERON_KUBERNETES_SERVICE_ANNOTATION);
+ for (String s : keys) {
+ String value = config.getStringValue(s);
+ annotations.put(s.replaceFirst(KubernetesContext.HERON_KUBERNETES_SERVICE_ANNOTATION,
+ ""), value);
+ }
+ return annotations;
+ }
+
public static boolean hasContainerVolume(Config config) {
final String name = getContainerVolumeName(config);
final String path = getContainerVolumeMountPath(config);
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index 2056372..59399b6 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -104,7 +104,7 @@
final Resource containerResource = getContainerResource(packingPlan);
- final V1Service topologyService = createTopologyyService();
+ final V1Service topologyService = createTopologyService();
try {
final V1Service response =
coreClient.createNamespacedService(getNamespace(), topologyService, null,
@@ -163,7 +163,7 @@
final int newContainerCount = currentContainerCount + containersToAdd.size();
try {
- patchStatefulsetReplicas(newContainerCount);
+ patchStatefulSetReplicas(newContainerCount);
} catch (ApiException ae) {
throw new TopologyRuntimeManagementException(
ae.getMessage() + "\ndetails\n" + ae.getResponseBody());
@@ -185,14 +185,14 @@
final int newContainerCount = currentContainerCount - containersToRemove.size();
try {
- patchStatefulsetReplicas(newContainerCount);
+ patchStatefulSetReplicas(newContainerCount);
} catch (ApiException e) {
throw new TopologyRuntimeManagementException(
e.getMessage() + "\ndetails\n" + e.getResponseBody());
}
}
- private void patchStatefulsetReplicas(int replicas) throws ApiException {
+ private void patchStatefulSetReplicas(int replicas) throws ApiException {
final String body =
String.format(JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT,
replicas);
@@ -317,7 +317,7 @@
return String.format("%s=${POD_NAME##*-} && echo shardId=${%s}", ENV_SHARD_ID, ENV_SHARD_ID);
}
- private V1Service createTopologyyService() {
+ private V1Service createTopologyService() {
final String topologyName = getTopologyName();
final Config runtimeConfiguration = getRuntimeConfiguration();
@@ -326,6 +326,7 @@
// setup service metadata
final V1ObjectMeta objectMeta = new V1ObjectMeta();
objectMeta.name(topologyName);
+ objectMeta.annotations(getServiceAnnotations());
service.setMetadata(objectMeta);
// create the headless service
@@ -370,7 +371,10 @@
// set up pod meta
final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getLabels(topologyName));
- templateMetaData.annotations(getPrometheusAnnotations());
+ Map<String, String> annotations = new HashMap<>();
+ annotations.putAll(getPodAnnotations());
+ annotations.putAll(getPrometheusAnnotations());
+ templateMetaData.annotations(annotations);
podTemplateSpec.setMetadata(templateMetaData);
final List<String> command = getExecutorCommand("$" + ENV_SHARD_ID);
@@ -383,6 +387,18 @@
return statefulSet;
}
+ private Map<String, String> getPodAnnotations() {
+ Config config = getConfiguration();
+ final Map<String, String> annotations = KubernetesContext.getPodAnnotations(config);
+ return annotations;
+ }
+
+ private Map<String, String> getServiceAnnotations() {
+ Config config = getConfiguration();
+ final Map<String, String> annotations = KubernetesContext.getServiceAnnotations(config);
+ return annotations;
+ }
+
private Map<String, String> getPrometheusAnnotations() {
final Map<String, String> annotations = new HashMap<>();
annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_SCRAPE, "true");
@@ -529,7 +545,7 @@
if (remoteDebugEnabled) {
IntStream.range(0, numberOfInstances).forEach(i -> {
final String portName =
- KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + String.valueOf(i);
+ KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + i;
final V1ContainerPort port = new V1ContainerPort();
port.setName(portName);
port.setContainerPort(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i);