Add ability to add Kubernetes Secrets and SecretKeyRefs (#3702)
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
index 4074fbc..71a660a 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
@@ -59,44 +59,49 @@
public static final String KUBERNETES_RESOURCE_REQUEST_MODE =
"heron.kubernetes.resource.request.mode";
- public static final String HERON_KUBERNETES_VOLUME_NAME = "heron.kubernetes.volume.name";
- public static final String HERON_KUBERNETES_VOLUME_TYPE = "heron.kubernetes.volume.type";
+ public static final String KUBERNETES_VOLUME_NAME = "heron.kubernetes.volume.name";
+ public static final String KUBERNETES_VOLUME_TYPE = "heron.kubernetes.volume.type";
// HostPath volume keys
// https://kubernetes.io/docs/concepts/storage/volumes/#hostpath
- public static final String HERON_KUBERNETES_VOLUME_HOSTPATH_PATH =
+ public static final String KUBERNETES_VOLUME_HOSTPATH_PATH =
"heron.kubernetes.volume.hostPath.path";
// nfs volume keys
// https://kubernetes.io/docs/concepts/storage/volumes/#nfs
- public static final String HERON_KUBERNETES_VOLUME_NFS_PATH =
+ public static final String KUBERNETES_VOLUME_NFS_PATH =
"heron.kubernetes.volume.nfs.path";
- public static final String HERON_KUBERNETES_VOLUME_NFS_SERVER =
+ public static final String KUBERNETES_VOLUME_NFS_SERVER =
"heron.kubernetes.volume.nfs.server";
// awsElasticBlockStore volume keys
// https://kubernetes.io/docs/concepts/storage/volumes/#awselasticblockstore
- public static final String HERON_KUBERNETES_VOLUME_AWS_EBS_VOLUME_ID =
+ public static final String KUBERNETES_VOLUME_AWS_EBS_VOLUME_ID =
"heron.kubernetes.volume.awsElasticBlockStore.volumeID";
- public static final String HERON_KUBERNETES_VOLUME_AWS_EBS_FS_TYPE =
+ public static final String KUBERNETES_VOLUME_AWS_EBS_FS_TYPE =
"heron.kubernetes.volume.awsElasticBlockStore.fsType";
// container mount volume mount keys
- public static final String HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME =
+ public static final String KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME =
"heron.kubernetes.container.volumeMount.name";
- public static final String HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH =
+ public static final String KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH =
"heron.kubernetes.container.volumeMount.path";
- public static final String HERON_KUBERNETES_POD_ANNOTATION =
+ public static final String KUBERNETES_POD_ANNOTATION_PREFIX =
"heron.kubernetes.pod.annotation.";
- public static final String HERON_KUBERNETES_SERVICE_ANNOTATION =
+ public static final String KUBERNETES_SERVICE_ANNOTATION_PREFIX =
"heron.kubernetes.service.annotation.";
- public static final String HERON_KUBERNETES_POD_LABEL =
- "heron.kubernetes.pod.label.";
- public static final String HERON_KUBERNETES_SERVICE_LABEL =
- "heron.kubernetes.service.label.";
-
+ public static final String KUBERNETES_POD_LABEL_PREFIX =
+ "heron.kubernetes.pod.label.";
+ public static final String KUBERNETES_SERVICE_LABEL_PREFIX =
+ "heron.kubernetes.service.label.";
+ // heron.kubernetes.pod.secret.heron-secret=/etc/secrets
+ public static final String KUBERNETES_POD_SECRET_PREFIX =
+ "heron.kubernetes.pod.secret.";
+ // heron.kubernetes.pod.secretKeyRef.ENV_NAME=name:key
+ public static final String KUBERNETES_POD_SECRET_KEY_REF_PREFIX =
+ "heron.kubernetes.pod.secretKeyRef.";
private KubernetesContext() {
}
@@ -128,31 +133,31 @@
}
static String getVolumeType(Config config) {
- return config.getStringValue(HERON_KUBERNETES_VOLUME_TYPE);
+ return config.getStringValue(KUBERNETES_VOLUME_TYPE);
}
static String getVolumeName(Config config) {
- return config.getStringValue(HERON_KUBERNETES_VOLUME_NAME);
+ return config.getStringValue(KUBERNETES_VOLUME_NAME);
}
static String getHostPathVolumePath(Config config) {
- return config.getStringValue(HERON_KUBERNETES_VOLUME_HOSTPATH_PATH);
+ return config.getStringValue(KUBERNETES_VOLUME_HOSTPATH_PATH);
}
static String getNfsVolumePath(Config config) {
- return config.getStringValue(HERON_KUBERNETES_VOLUME_NFS_PATH);
+ return config.getStringValue(KUBERNETES_VOLUME_NFS_PATH);
}
static String getNfsServer(Config config) {
- return config.getStringValue(HERON_KUBERNETES_VOLUME_NFS_SERVER);
+ return config.getStringValue(KUBERNETES_VOLUME_NFS_SERVER);
}
static String getAwsEbsVolumeId(Config config) {
- return config.getStringValue(HERON_KUBERNETES_VOLUME_AWS_EBS_VOLUME_ID);
+ return config.getStringValue(KUBERNETES_VOLUME_AWS_EBS_VOLUME_ID);
}
static String getAwsEbsFsType(Config config) {
- return config.getStringValue(HERON_KUBERNETES_VOLUME_AWS_EBS_FS_TYPE);
+ return config.getStringValue(KUBERNETES_VOLUME_AWS_EBS_FS_TYPE);
}
static boolean hasVolume(Config config) {
@@ -160,27 +165,35 @@
}
static String getContainerVolumeName(Config config) {
- return config.getStringValue(HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME);
+ return config.getStringValue(KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME);
}
static String getContainerVolumeMountPath(Config config) {
- return config.getStringValue(HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH);
+ return config.getStringValue(KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH);
}
public static Map<String, String> getPodLabels(Config config) {
- return getConfigItemsByPrefix(config, HERON_KUBERNETES_POD_LABEL);
+ return getConfigItemsByPrefix(config, KUBERNETES_POD_LABEL_PREFIX);
}
public static Map<String, String> getServiceLabels(Config config) {
- return getConfigItemsByPrefix(config, HERON_KUBERNETES_SERVICE_LABEL);
+ return getConfigItemsByPrefix(config, KUBERNETES_SERVICE_LABEL_PREFIX);
}
public static Map<String, String> getPodAnnotations(Config config) {
- return getConfigItemsByPrefix(config, HERON_KUBERNETES_POD_ANNOTATION);
+ return getConfigItemsByPrefix(config, KUBERNETES_POD_ANNOTATION_PREFIX);
}
public static Map<String, String> getServiceAnnotations(Config config) {
- return getConfigItemsByPrefix(config, HERON_KUBERNETES_SERVICE_ANNOTATION);
+ return getConfigItemsByPrefix(config, KUBERNETES_SERVICE_ANNOTATION_PREFIX);
+ }
+
+ public static Map<String, String> getPodSecretsToMount(Config config) {
+ return getConfigItemsByPrefix(config, KUBERNETES_POD_SECRET_PREFIX);
+ }
+
+ public static Map<String, String> getPodSecretKeyRefs(Config config) {
+ return getConfigItemsByPrefix(config, KUBERNETES_POD_SECRET_KEY_REF_PREFIX);
}
static Set<String> getConfigKeys(Config config, String keyPrefix) {
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index 2bf815b..a236462 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -26,6 +26,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -59,6 +60,8 @@
import io.kubernetes.client.openapi.models.V1PodSpec;
import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
import io.kubernetes.client.openapi.models.V1ResourceRequirements;
+import io.kubernetes.client.openapi.models.V1SecretKeySelector;
+import io.kubernetes.client.openapi.models.V1SecretVolumeSourceBuilder;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServiceSpec;
import io.kubernetes.client.openapi.models.V1StatefulSet;
@@ -159,7 +162,8 @@
final String message = ae.getMessage() + "\ndetails:" + ae.getResponseBody();
throw new TopologyRuntimeManagementException(message, ae);
}
- final int currentContainerCount = statefulSet.getSpec().getReplicas();
+ final V1StatefulSetSpec v1StatefulSet = Objects.requireNonNull(statefulSet.getSpec());
+ final int currentContainerCount = Objects.requireNonNull(v1StatefulSet.getReplicas());
final int newContainerCount = currentContainerCount + containersToAdd.size();
try {
@@ -181,7 +185,9 @@
final String message = ae.getMessage() + "\ndetails:" + ae.getResponseBody();
throw new TopologyRuntimeManagementException(message, ae);
}
- final int currentContainerCount = statefulSet.getSpec().getReplicas();
+
+ final V1StatefulSetSpec v1StatefulSet = Objects.requireNonNull(statefulSet.getSpec());
+ final int currentContainerCount = Objects.requireNonNull(v1StatefulSet.getReplicas());
final int newContainerCount = currentContainerCount - containersToRemove.size();
try {
@@ -319,7 +325,6 @@
private V1Service createTopologyService() {
final String topologyName = getTopologyName();
- final Config runtimeConfiguration = getRuntimeConfiguration();
final V1Service service = new V1Service();
@@ -390,14 +395,12 @@
private Map<String, String> getPodAnnotations() {
Config config = getConfiguration();
- final Map<String, String> annotations = KubernetesContext.getPodAnnotations(config);
- return annotations;
+ return KubernetesContext.getPodAnnotations(config);
}
private Map<String, String> getServiceAnnotations() {
Config config = getConfiguration();
- final Map<String, String> annotations = KubernetesContext.getServiceAnnotations(config);
- return annotations;
+ return KubernetesContext.getServiceAnnotations(config);
}
private Map<String, String> getPrometheusAnnotations() {
@@ -444,6 +447,8 @@
addVolumesIfPresent(podSpec);
+ mountSecretsAsVolumes(podSpec);
+
return podSpec;
}
@@ -473,6 +478,25 @@
}
}
+ private void mountSecretsAsVolumes(V1PodSpec podSpec) {
+ final Config config = getConfiguration();
+ final Map<String, String> secrets = KubernetesContext.getPodSecretsToMount(config);
+ for (Map.Entry<String, String> secret : secrets.entrySet()) {
+ final V1VolumeMount mount = new V1VolumeMount()
+ .name(secret.getKey())
+ .mountPath(secret.getValue());
+ final V1Volume secretVolume = new V1Volume()
+ .name(secret.getKey())
+ .secret(new V1SecretVolumeSourceBuilder()
+ .withSecretName(secret.getKey())
+ .build());
+ podSpec.addVolumesItem(secretVolume);
+ for (V1Container container : podSpec.getContainers()) {
+ container.addVolumeMountsItem(mount);
+ }
+ }
+ }
+
private V1Container getContainer(List<String> executorCommand, Resource resource,
int numberOfInstances) {
final Config configuration = getConfiguration();
@@ -500,8 +524,10 @@
.valueFrom(new V1EnvVarSource()
.fieldRef(new V1ObjectFieldSelector()
.fieldPath(KubernetesConstants.POD_NAME)));
- container.setEnv(Arrays.asList(envVarHost, envVarPodName));
+ container.addEnvItem(envVarHost);
+ container.addEnvItem(envVarPodName);
+ setSecretKeyRefs(container);
// set container resources
final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements();
@@ -573,6 +599,29 @@
}
}
+ private void setSecretKeyRefs(V1Container container) {
+ final Config config = getConfiguration();
+ final Map<String, String> podSecretKeyRefs = KubernetesContext.getPodSecretKeyRefs(config);
+ for (Map.Entry<String, String> secret : podSecretKeyRefs.entrySet()) {
+ final String[] keyRefParts = secret.getValue().split(":");
+ if (keyRefParts.length != 2) {
+ LOG.log(Level.SEVERE,
+ "SecretKeyRef must be in the form name:key. <" + secret.getValue() + ">");
+ throw new TopologyRuntimeManagementException(
+ "SecretKeyRef must be in the form name:key. <" + secret.getValue() + ">");
+ }
+ String name = keyRefParts[0];
+ String key = keyRefParts[1];
+ final V1EnvVar envVar = new V1EnvVar()
+ .name(secret.getKey())
+ .valueFrom(new V1EnvVarSource()
+ .secretKeyRef(new V1SecretKeySelector()
+ .key(key)
+ .name(name)));
+ container.addEnvItem(envVar);
+ }
+ }
+
public static double roundDecimal(double value, int places) {
double scale = Math.pow(10, places);
return Math.round(value * scale) / scale;
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
index 95f82f4..401c97f 100644
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
@@ -39,8 +39,8 @@
public void testHostPathVolume() {
final String path = "/test/dir1";
final Config config = Config.newBuilder()
- .put(KubernetesContext.HERON_KUBERNETES_VOLUME_TYPE, "hostPath")
- .put(KubernetesContext.HERON_KUBERNETES_VOLUME_HOSTPATH_PATH, path)
+ .put(KubernetesContext.KUBERNETES_VOLUME_TYPE, "hostPath")
+ .put(KubernetesContext.KUBERNETES_VOLUME_HOSTPATH_PATH, path)
.build();
final V1Volume volume = Volumes.get().create(config);
@@ -54,9 +54,9 @@
final String path = "/test/dir1";
final String server = "10.10.10.10";
final Config config = Config.newBuilder()
- .put(KubernetesContext.HERON_KUBERNETES_VOLUME_TYPE, "nfs")
- .put(KubernetesContext.HERON_KUBERNETES_VOLUME_NFS_PATH, path)
- .put(KubernetesContext.HERON_KUBERNETES_VOLUME_NFS_SERVER, server)
+ .put(KubernetesContext.KUBERNETES_VOLUME_TYPE, "nfs")
+ .put(KubernetesContext.KUBERNETES_VOLUME_NFS_PATH, path)
+ .put(KubernetesContext.KUBERNETES_VOLUME_NFS_SERVER, server)
.build();
final V1Volume volume = Volumes.get().create(config);
@@ -71,9 +71,9 @@
final String volumeId = "aws-ebs-1";
final String fsType = "ext4";
final Config config = Config.newBuilder()
- .put(KubernetesContext.HERON_KUBERNETES_VOLUME_TYPE, "awsElasticBlockStore")
- .put(KubernetesContext.HERON_KUBERNETES_VOLUME_AWS_EBS_VOLUME_ID, volumeId)
- .put(KubernetesContext.HERON_KUBERNETES_VOLUME_AWS_EBS_FS_TYPE, fsType)
+ .put(KubernetesContext.KUBERNETES_VOLUME_TYPE, "awsElasticBlockStore")
+ .put(KubernetesContext.KUBERNETES_VOLUME_AWS_EBS_VOLUME_ID, volumeId)
+ .put(KubernetesContext.KUBERNETES_VOLUME_AWS_EBS_FS_TYPE, fsType)
.build();
final V1Volume volume = Volumes.get().create(config);