Added support for adding Kubernetes annotations to the topology pod and service (#3699)

diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
index f938cd0..f6359b8 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
@@ -19,6 +19,11 @@
 
 package org.apache.heron.scheduler.kubernetes;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.common.Context;
 
@@ -44,7 +49,7 @@
      * provided in the Resource Limit. This mode effectively guarantees the
      * cpu and memory will be reserved.
      */
-    EQUAL_TO_LIMIT;
+    EQUAL_TO_LIMIT
   }
   /**
    * This config item is used to determine how to configure the K8s Resource Request.
@@ -83,6 +88,11 @@
   public static final String HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH =
       "heron.kubernetes.container.volumeMount.path";
 
+  public static final String HERON_KUBERNETES_POD_ANNOTATION =
+      "heron.kubernetes.pod.annotation.";
+  public static final String HERON_KUBERNETES_SERVICE_ANNOTATION =
+      "heron.kubernetes.service.annotation.";
+
   private KubernetesContext() {
   }
 
@@ -152,6 +162,38 @@
     return config.getStringValue(HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH);
   }
 
+  static Set<String> getConfigKeys(Config config, String keyPrefix) {
+    Set<String> annotations = new HashSet<>();
+    for (String s : config.getKeySet()) {
+      if (s.startsWith(keyPrefix)) {
+        annotations.add(s);
+      }
+    }
+    return annotations;
+  }
+
+  public static Map<String, String> getPodAnnotations(Config config) {
+    final Map<String, String> annotations = new HashMap<>();
+    final Set<String> keys = getConfigKeys(config, HERON_KUBERNETES_POD_ANNOTATION);
+    for (String s : keys) {
+      String value = config.getStringValue(s);
+      annotations.put(s.replaceFirst(KubernetesContext.HERON_KUBERNETES_POD_ANNOTATION,
+              ""), value);
+    }
+    return annotations;
+  }
+
+  public static Map<String, String> getServiceAnnotations(Config config) {
+    final Map<String, String> annotations = new HashMap<>();
+    final Set<String> keys = getConfigKeys(config, HERON_KUBERNETES_SERVICE_ANNOTATION);
+    for (String s : keys) {
+      String value = config.getStringValue(s);
+      annotations.put(s.replaceFirst(KubernetesContext.HERON_KUBERNETES_SERVICE_ANNOTATION,
+              ""), value);
+    }
+    return annotations;
+  }
+
   public static boolean hasContainerVolume(Config config) {
     final String name = getContainerVolumeName(config);
     final String path = getContainerVolumeMountPath(config);
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index 2056372..59399b6 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -104,7 +104,7 @@
 
     final Resource containerResource = getContainerResource(packingPlan);
 
-    final V1Service topologyService = createTopologyyService();
+    final V1Service topologyService = createTopologyService();
     try {
       final V1Service response =
           coreClient.createNamespacedService(getNamespace(), topologyService, null,
@@ -163,7 +163,7 @@
     final int newContainerCount = currentContainerCount + containersToAdd.size();
 
     try {
-      patchStatefulsetReplicas(newContainerCount);
+      patchStatefulSetReplicas(newContainerCount);
     } catch (ApiException ae) {
       throw new TopologyRuntimeManagementException(
           ae.getMessage() + "\ndetails\n" + ae.getResponseBody());
@@ -185,14 +185,14 @@
     final int newContainerCount = currentContainerCount - containersToRemove.size();
 
     try {
-      patchStatefulsetReplicas(newContainerCount);
+      patchStatefulSetReplicas(newContainerCount);
     } catch (ApiException e) {
       throw new TopologyRuntimeManagementException(
           e.getMessage() + "\ndetails\n" + e.getResponseBody());
     }
   }
 
-  private void patchStatefulsetReplicas(int replicas) throws ApiException {
+  private void patchStatefulSetReplicas(int replicas) throws ApiException {
     final String body =
             String.format(JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT,
                     replicas);
@@ -317,7 +317,7 @@
     return String.format("%s=${POD_NAME##*-} && echo shardId=${%s}", ENV_SHARD_ID, ENV_SHARD_ID);
   }
 
-  private V1Service createTopologyyService() {
+  private V1Service createTopologyService() {
     final String topologyName = getTopologyName();
     final Config runtimeConfiguration = getRuntimeConfiguration();
 
@@ -326,6 +326,7 @@
     // setup service metadata
     final V1ObjectMeta objectMeta = new V1ObjectMeta();
     objectMeta.name(topologyName);
+    objectMeta.annotations(getServiceAnnotations());
     service.setMetadata(objectMeta);
 
     // create the headless service
@@ -370,7 +371,10 @@
 
     // set up pod meta
     final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getLabels(topologyName));
-    templateMetaData.annotations(getPrometheusAnnotations());
+    Map<String, String> annotations = new HashMap<>();
+    annotations.putAll(getPodAnnotations());
+    annotations.putAll(getPrometheusAnnotations());
+    templateMetaData.annotations(annotations);
     podTemplateSpec.setMetadata(templateMetaData);
 
     final List<String> command = getExecutorCommand("$" + ENV_SHARD_ID);
@@ -383,6 +387,18 @@
     return statefulSet;
   }
 
+  private Map<String, String> getPodAnnotations() {
+    Config config = getConfiguration();
+    final Map<String, String> annotations = KubernetesContext.getPodAnnotations(config);
+    return annotations;
+  }
+
+  private Map<String, String> getServiceAnnotations() {
+    Config config = getConfiguration();
+    final Map<String, String> annotations = KubernetesContext.getServiceAnnotations(config);
+    return annotations;
+  }
+
   private Map<String, String> getPrometheusAnnotations() {
     final Map<String, String> annotations = new HashMap<>();
     annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_SCRAPE, "true");
@@ -529,7 +545,7 @@
     if (remoteDebugEnabled) {
       IntStream.range(0, numberOfInstances).forEach(i -> {
         final String portName =
-            KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + String.valueOf(i);
+            KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + i;
         final V1ContainerPort port = new V1ContainerPort();
         port.setName(portName);
         port.setContainerPort(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i);