Add ability to add Kubernetes Secrets and SecretKeyRefs (#3702)

diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
index 4074fbc..71a660a 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
@@ -59,44 +59,49 @@
   public static final String KUBERNETES_RESOURCE_REQUEST_MODE =
           "heron.kubernetes.resource.request.mode";
 
-  public static final String HERON_KUBERNETES_VOLUME_NAME = "heron.kubernetes.volume.name";
-  public static final String HERON_KUBERNETES_VOLUME_TYPE = "heron.kubernetes.volume.type";
+  public static final String KUBERNETES_VOLUME_NAME = "heron.kubernetes.volume.name";
+  public static final String KUBERNETES_VOLUME_TYPE = "heron.kubernetes.volume.type";
 
 
   // HostPath volume keys
   // https://kubernetes.io/docs/concepts/storage/volumes/#hostpath
-  public static final String HERON_KUBERNETES_VOLUME_HOSTPATH_PATH =
+  public static final String KUBERNETES_VOLUME_HOSTPATH_PATH =
       "heron.kubernetes.volume.hostPath.path";
 
   // nfs volume keys
   // https://kubernetes.io/docs/concepts/storage/volumes/#nfs
-  public static final String HERON_KUBERNETES_VOLUME_NFS_PATH =
+  public static final String KUBERNETES_VOLUME_NFS_PATH =
       "heron.kubernetes.volume.nfs.path";
-  public static final String HERON_KUBERNETES_VOLUME_NFS_SERVER =
+  public static final String KUBERNETES_VOLUME_NFS_SERVER =
       "heron.kubernetes.volume.nfs.server";
 
   // awsElasticBlockStore volume keys
   // https://kubernetes.io/docs/concepts/storage/volumes/#awselasticblockstore
-  public static final String HERON_KUBERNETES_VOLUME_AWS_EBS_VOLUME_ID =
+  public static final String KUBERNETES_VOLUME_AWS_EBS_VOLUME_ID =
       "heron.kubernetes.volume.awsElasticBlockStore.volumeID";
-  public static final String HERON_KUBERNETES_VOLUME_AWS_EBS_FS_TYPE =
+  public static final String KUBERNETES_VOLUME_AWS_EBS_FS_TYPE =
       "heron.kubernetes.volume.awsElasticBlockStore.fsType";
 
   // container mount volume mount keys
-  public static final String HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME =
+  public static final String KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME =
       "heron.kubernetes.container.volumeMount.name";
-  public static final String HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH =
+  public static final String KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH =
       "heron.kubernetes.container.volumeMount.path";
 
-  public static final String HERON_KUBERNETES_POD_ANNOTATION =
+  public static final String KUBERNETES_POD_ANNOTATION_PREFIX =
       "heron.kubernetes.pod.annotation.";
-  public static final String HERON_KUBERNETES_SERVICE_ANNOTATION =
+  public static final String KUBERNETES_SERVICE_ANNOTATION_PREFIX =
       "heron.kubernetes.service.annotation.";
-  public static final String HERON_KUBERNETES_POD_LABEL =
-          "heron.kubernetes.pod.label.";
-  public static final String HERON_KUBERNETES_SERVICE_LABEL =
-          "heron.kubernetes.service.label.";
-
+  public static final String KUBERNETES_POD_LABEL_PREFIX =
+      "heron.kubernetes.pod.label.";
+  public static final String KUBERNETES_SERVICE_LABEL_PREFIX =
+      "heron.kubernetes.service.label.";
+  // heron.kubernetes.pod.secret.heron-secret=/etc/secrets
+  public static final String KUBERNETES_POD_SECRET_PREFIX =
+      "heron.kubernetes.pod.secret.";
+  // heron.kubernetes.pod.secretKeyRef.ENV_NAME=name:key
+  public static final String KUBERNETES_POD_SECRET_KEY_REF_PREFIX =
+      "heron.kubernetes.pod.secretKeyRef.";
 
   private KubernetesContext() {
   }
@@ -128,31 +133,31 @@
   }
 
   static String getVolumeType(Config config) {
-    return config.getStringValue(HERON_KUBERNETES_VOLUME_TYPE);
+    return config.getStringValue(KUBERNETES_VOLUME_TYPE);
   }
 
   static String getVolumeName(Config config) {
-    return config.getStringValue(HERON_KUBERNETES_VOLUME_NAME);
+    return config.getStringValue(KUBERNETES_VOLUME_NAME);
   }
 
   static String getHostPathVolumePath(Config config) {
-    return config.getStringValue(HERON_KUBERNETES_VOLUME_HOSTPATH_PATH);
+    return config.getStringValue(KUBERNETES_VOLUME_HOSTPATH_PATH);
   }
 
   static String getNfsVolumePath(Config config) {
-    return config.getStringValue(HERON_KUBERNETES_VOLUME_NFS_PATH);
+    return config.getStringValue(KUBERNETES_VOLUME_NFS_PATH);
   }
 
   static String getNfsServer(Config config) {
-    return config.getStringValue(HERON_KUBERNETES_VOLUME_NFS_SERVER);
+    return config.getStringValue(KUBERNETES_VOLUME_NFS_SERVER);
   }
 
   static String getAwsEbsVolumeId(Config config) {
-    return config.getStringValue(HERON_KUBERNETES_VOLUME_AWS_EBS_VOLUME_ID);
+    return config.getStringValue(KUBERNETES_VOLUME_AWS_EBS_VOLUME_ID);
   }
 
   static String getAwsEbsFsType(Config config) {
-    return config.getStringValue(HERON_KUBERNETES_VOLUME_AWS_EBS_FS_TYPE);
+    return config.getStringValue(KUBERNETES_VOLUME_AWS_EBS_FS_TYPE);
   }
 
   static boolean hasVolume(Config config) {
@@ -160,27 +165,35 @@
   }
 
   static String getContainerVolumeName(Config config) {
-    return config.getStringValue(HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME);
+    return config.getStringValue(KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME);
   }
 
   static String getContainerVolumeMountPath(Config config) {
-    return config.getStringValue(HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH);
+    return config.getStringValue(KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH);
   }
 
   public static Map<String, String> getPodLabels(Config config) {
-    return getConfigItemsByPrefix(config, HERON_KUBERNETES_POD_LABEL);
+    return getConfigItemsByPrefix(config, KUBERNETES_POD_LABEL_PREFIX);
   }
 
   public static Map<String, String> getServiceLabels(Config config) {
-    return getConfigItemsByPrefix(config, HERON_KUBERNETES_SERVICE_LABEL);
+    return getConfigItemsByPrefix(config, KUBERNETES_SERVICE_LABEL_PREFIX);
   }
 
   public static Map<String, String> getPodAnnotations(Config config) {
-    return getConfigItemsByPrefix(config, HERON_KUBERNETES_POD_ANNOTATION);
+    return getConfigItemsByPrefix(config, KUBERNETES_POD_ANNOTATION_PREFIX);
   }
 
   public static Map<String, String> getServiceAnnotations(Config config) {
-    return getConfigItemsByPrefix(config, HERON_KUBERNETES_SERVICE_ANNOTATION);
+    return getConfigItemsByPrefix(config, KUBERNETES_SERVICE_ANNOTATION_PREFIX);
+  }
+
+  public static Map<String, String> getPodSecretsToMount(Config config) {
+    return getConfigItemsByPrefix(config, KUBERNETES_POD_SECRET_PREFIX);
+  }
+
+  public static Map<String, String> getPodSecretKeyRefs(Config config) {
+    return getConfigItemsByPrefix(config, KUBERNETES_POD_SECRET_KEY_REF_PREFIX);
   }
 
   static Set<String> getConfigKeys(Config config, String keyPrefix) {
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index 2bf815b..a236462 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -26,6 +26,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -59,6 +60,8 @@
 import io.kubernetes.client.openapi.models.V1PodSpec;
 import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
 import io.kubernetes.client.openapi.models.V1ResourceRequirements;
+import io.kubernetes.client.openapi.models.V1SecretKeySelector;
+import io.kubernetes.client.openapi.models.V1SecretVolumeSourceBuilder;
 import io.kubernetes.client.openapi.models.V1Service;
 import io.kubernetes.client.openapi.models.V1ServiceSpec;
 import io.kubernetes.client.openapi.models.V1StatefulSet;
@@ -159,7 +162,8 @@
       final String message = ae.getMessage() + "\ndetails:" + ae.getResponseBody();
       throw new TopologyRuntimeManagementException(message, ae);
     }
-    final int currentContainerCount = statefulSet.getSpec().getReplicas();
+    final V1StatefulSetSpec v1StatefulSet = Objects.requireNonNull(statefulSet.getSpec());
+    final int currentContainerCount = Objects.requireNonNull(v1StatefulSet.getReplicas());
     final int newContainerCount = currentContainerCount + containersToAdd.size();
 
     try {
@@ -181,7 +185,9 @@
       final String message = ae.getMessage() + "\ndetails:" + ae.getResponseBody();
       throw new TopologyRuntimeManagementException(message, ae);
     }
-    final int currentContainerCount = statefulSet.getSpec().getReplicas();
+
+    final V1StatefulSetSpec v1StatefulSet = Objects.requireNonNull(statefulSet.getSpec());
+    final int currentContainerCount = Objects.requireNonNull(v1StatefulSet.getReplicas());
     final int newContainerCount = currentContainerCount - containersToRemove.size();
 
     try {
@@ -319,7 +325,6 @@
 
   private V1Service createTopologyService() {
     final String topologyName = getTopologyName();
-    final Config runtimeConfiguration = getRuntimeConfiguration();
 
     final V1Service service = new V1Service();
 
@@ -390,14 +395,12 @@
 
   private Map<String, String> getPodAnnotations() {
     Config config = getConfiguration();
-    final Map<String, String> annotations = KubernetesContext.getPodAnnotations(config);
-    return annotations;
+    return KubernetesContext.getPodAnnotations(config);
   }
 
   private Map<String, String> getServiceAnnotations() {
     Config config = getConfiguration();
-    final Map<String, String> annotations = KubernetesContext.getServiceAnnotations(config);
-    return annotations;
+    return KubernetesContext.getServiceAnnotations(config);
   }
 
   private Map<String, String> getPrometheusAnnotations() {
@@ -444,6 +447,8 @@
 
     addVolumesIfPresent(podSpec);
 
+    mountSecretsAsVolumes(podSpec);
+
     return podSpec;
   }
 
@@ -473,6 +478,25 @@
     }
   }
 
+  private void mountSecretsAsVolumes(V1PodSpec podSpec) {
+    final Config config = getConfiguration();
+    final Map<String, String> secrets = KubernetesContext.getPodSecretsToMount(config);
+    for (Map.Entry<String, String> secret : secrets.entrySet()) {
+      final V1VolumeMount mount = new V1VolumeMount()
+              .name(secret.getKey())
+              .mountPath(secret.getValue());
+      final V1Volume secretVolume = new V1Volume()
+              .name(secret.getKey())
+              .secret(new V1SecretVolumeSourceBuilder()
+                      .withSecretName(secret.getKey())
+                      .build());
+      podSpec.addVolumesItem(secretVolume);
+      for (V1Container container : podSpec.getContainers()) {
+        container.addVolumeMountsItem(mount);
+      }
+    }
+  }
+
   private V1Container getContainer(List<String> executorCommand, Resource resource,
       int numberOfInstances) {
     final Config configuration = getConfiguration();
@@ -500,8 +524,10 @@
         .valueFrom(new V1EnvVarSource()
             .fieldRef(new V1ObjectFieldSelector()
                 .fieldPath(KubernetesConstants.POD_NAME)));
-    container.setEnv(Arrays.asList(envVarHost, envVarPodName));
+    container.addEnvItem(envVarHost);
+    container.addEnvItem(envVarPodName);
 
+    setSecretKeyRefs(container);
 
     // set container resources
     final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements();
@@ -573,6 +599,29 @@
     }
   }
 
+  private void setSecretKeyRefs(V1Container container) {
+    final Config config = getConfiguration();
+    final Map<String, String> podSecretKeyRefs = KubernetesContext.getPodSecretKeyRefs(config);
+    for (Map.Entry<String, String> secret : podSecretKeyRefs.entrySet()) {
+      final String[] keyRefParts = secret.getValue().split(":");
+      if (keyRefParts.length != 2) {
+        LOG.log(Level.SEVERE,
+                "SecretKeyRef must be in the form name:key. <" + secret.getValue() + ">");
+        throw new TopologyRuntimeManagementException(
+                "SecretKeyRef must be in the form name:key. <" + secret.getValue() + ">");
+      }
+      String name = keyRefParts[0];
+      String key = keyRefParts[1];
+      final V1EnvVar envVar = new V1EnvVar()
+              .name(secret.getKey())
+                .valueFrom(new V1EnvVarSource()
+                  .secretKeyRef(new V1SecretKeySelector()
+                          .key(key)
+                          .name(name)));
+      container.addEnvItem(envVar);
+    }
+  }
+
   public static double roundDecimal(double value, int places) {
     double scale = Math.pow(10, places);
     return Math.round(value * scale) / scale;
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
index 95f82f4..401c97f 100644
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
@@ -39,8 +39,8 @@
   public void testHostPathVolume() {
     final String path = "/test/dir1";
     final Config config = Config.newBuilder()
-        .put(KubernetesContext.HERON_KUBERNETES_VOLUME_TYPE, "hostPath")
-        .put(KubernetesContext.HERON_KUBERNETES_VOLUME_HOSTPATH_PATH, path)
+        .put(KubernetesContext.KUBERNETES_VOLUME_TYPE, "hostPath")
+        .put(KubernetesContext.KUBERNETES_VOLUME_HOSTPATH_PATH, path)
         .build();
 
     final V1Volume volume = Volumes.get().create(config);
@@ -54,9 +54,9 @@
     final String path = "/test/dir1";
     final String server = "10.10.10.10";
     final Config config = Config.newBuilder()
-        .put(KubernetesContext.HERON_KUBERNETES_VOLUME_TYPE, "nfs")
-        .put(KubernetesContext.HERON_KUBERNETES_VOLUME_NFS_PATH, path)
-        .put(KubernetesContext.HERON_KUBERNETES_VOLUME_NFS_SERVER, server)
+        .put(KubernetesContext.KUBERNETES_VOLUME_TYPE, "nfs")
+        .put(KubernetesContext.KUBERNETES_VOLUME_NFS_PATH, path)
+        .put(KubernetesContext.KUBERNETES_VOLUME_NFS_SERVER, server)
         .build();
 
     final V1Volume volume = Volumes.get().create(config);
@@ -71,9 +71,9 @@
     final String volumeId = "aws-ebs-1";
     final String fsType = "ext4";
     final Config config = Config.newBuilder()
-        .put(KubernetesContext.HERON_KUBERNETES_VOLUME_TYPE, "awsElasticBlockStore")
-        .put(KubernetesContext.HERON_KUBERNETES_VOLUME_AWS_EBS_VOLUME_ID, volumeId)
-        .put(KubernetesContext.HERON_KUBERNETES_VOLUME_AWS_EBS_FS_TYPE, fsType)
+        .put(KubernetesContext.KUBERNETES_VOLUME_TYPE, "awsElasticBlockStore")
+        .put(KubernetesContext.KUBERNETES_VOLUME_AWS_EBS_VOLUME_ID, volumeId)
+        .put(KubernetesContext.KUBERNETES_VOLUME_AWS_EBS_FS_TYPE, fsType)
         .build();
 
     final V1Volume volume = Volumes.get().create(config);