Updated K8s Scheduler to set Resource Limits and Requests (#3664)

What was previously used for K8s request is now used as a container limit.
There is an optional setting to either set or not set the K8s request to the same value.
diff --git a/deploy/kubernetes/helm/templates/tools.yaml b/deploy/kubernetes/helm/templates/tools.yaml
index 92c1a10..0a6fe15 100644
--- a/deploy/kubernetes/helm/templates/tools.yaml
+++ b/deploy/kubernetes/helm/templates/tools.yaml
@@ -161,6 +161,7 @@
               -D heron.class.packing.algorithm=org.apache.heron.packing.binpacking.FirstFitDecreasingPacking
               -D heron.class.repacking.algorithm=org.apache.heron.packing.binpacking.FirstFitDecreasingPacking
               {{- end }}
+              -D heron.kubernetes.resource.request.mode={{ .Values.topologyResourceRequestMode }}
           envFrom:
             - configMapRef:
                 name: {{ .Release.Name }}-tools-config
diff --git a/deploy/kubernetes/helm/values.yaml.template b/deploy/kubernetes/helm/values.yaml.template
index 6ba169b..9484d98 100644
--- a/deploy/kubernetes/helm/values.yaml.template
+++ b/deploy/kubernetes/helm/values.yaml.template
@@ -42,6 +42,10 @@
   # set to `-` to set base-url to the default k8s proxy URL
   # set to `null` to remove the use of base_url
   url: "-"
+
+# Can be EQUAL_TO_LIMIT or NOT_SET
+topologyResourceRequestMode: EQUAL_TO_LIMIT
+
 # Topologies uploader
 uploader:
   class: dlog # s3
diff --git a/heron/config/src/yaml/conf/kubernetes/scheduler.yaml b/heron/config/src/yaml/conf/kubernetes/scheduler.yaml
index 0f66a09..cbfaf3c 100644
--- a/heron/config/src/yaml/conf/kubernetes/scheduler.yaml
+++ b/heron/config/src/yaml/conf/kubernetes/scheduler.yaml
@@ -32,3 +32,9 @@
 
 # docker repo for executor
 heron.executor.docker.image:                'heron/heron:latest'
+
+# logic to be used for calculating the Kubernetes resource request and limits
+# value can be "NOT_SET", "EQUAL_TO_LIMIT"
+# "NOT_SET" will not set a K8s request
+# "EQUAL_TO_LIMIT" will set the K8s request to the same values as the K8s limit
+heron.kubernetes.resource.request.mode:  EQUAL_TO_LIMIT
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
index 34b50a8..f938cd0 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
@@ -33,6 +33,26 @@
   public static final String HERON_KUBERNETES_SCHEDULER_IMAGE_PULL_POLICY =
       "heron.kubernetes.scheduler.imagePullPolicy";
 
+  public enum KubernetesResourceRequestMode {
+    /**
+     * The Kubernetes Request will not be set by the Heron Kubernetes Scheduler.
+     * The generated pods will use the Kubernetes default values.
+     */
+    NOT_SET,
+    /**
+     * The Kubernetes Pod Resource Request will be set to the same values as
+     * provided in the Resource Limit. This mode effectively guarantees the
+     * cpu and memory will be reserved.
+     */
+    EQUAL_TO_LIMIT;
+  }
+  /**
+   * This config item is used to determine how to configure the K8s Resource Request.
+   * The format of this flag is the string encoded values of the
+   * underlying KubernetesRequestMode value.
+   */
+  public static final String KUBERNETES_RESOURCE_REQUEST_MODE =
+          "heron.kubernetes.resource.request.mode";
 
   public static final String HERON_KUBERNETES_VOLUME_NAME = "heron.kubernetes.volume.name";
   public static final String HERON_KUBERNETES_VOLUME_TYPE = "heron.kubernetes.volume.type";
@@ -87,6 +107,11 @@
     return isNotEmpty(imagePullPolicy);
   }
 
+  public static KubernetesResourceRequestMode getKubernetesRequestMode(Config config) {
+    return KubernetesResourceRequestMode.valueOf(
+            config.getStringValue(KUBERNETES_RESOURCE_REQUEST_MODE));
+  }
+
   static String getVolumeType(Config config) {
     return config.getStringValue(HERON_KUBERNETES_VOLUME_TYPE);
   }
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesController.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesController.java
index 9f16693..69ef84b 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesController.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesController.java
@@ -76,4 +76,5 @@
   abstract boolean killTopology();
 
   abstract boolean restart(int shardId);
+
 }
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index 0fd62b0..2056372 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -483,12 +483,24 @@
 
     // set container resources
     final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements();
-    final Map<String, Quantity> requests = new HashMap<>();
-    requests.put(KubernetesConstants.MEMORY,
-        Quantity.fromString(KubernetesUtils.Megabytes(resource.getRam())));
-    requests.put(KubernetesConstants.CPU,
-         Quantity.fromString(Double.toString(roundDecimal(resource.getCpu(), 3))));
-    resourceRequirements.setRequests(requests);
+    // Set the Kubernetes container resource limit
+    final Map<String, Quantity> limits = new HashMap<>();
+    limits.put(KubernetesConstants.MEMORY,
+            Quantity.fromString(KubernetesUtils.Megabytes(
+                    resource.getRam())));
+    limits.put(KubernetesConstants.CPU,
+            Quantity.fromString(Double.toString(roundDecimal(
+                    resource.getCpu(), 3))));
+    resourceRequirements.setLimits(limits);
+    KubernetesContext.KubernetesResourceRequestMode requestMode =
+            KubernetesContext.getKubernetesRequestMode(configuration);
+    // Set the Kubernetes container resource request
+    if (requestMode == KubernetesContext.KubernetesResourceRequestMode.EQUAL_TO_LIMIT) {
+      LOG.log(Level.CONFIG, "Setting K8s Request equal to Limit");
+      resourceRequirements.setRequests(limits);
+    } else {
+      LOG.log(Level.CONFIG, "Not setting K8s request because config was NOT_SET");
+    }
     container.setResources(resourceRequirements);
 
     // set container ports