Updated to support Kubernetes 1.16 (#3426)

* Updated manifests to replace deprecated Kubernetes APIs

* Removed the old controller logic

* Formatting cleanup

* Attempted to use the latest Kubernetes Java client

* Updated to use the latest Kubernetes client

* Updated the Kubernetes resource limits for the tools deployment

* Fixed the missing StatefulSet selector in the Helm chart (a minimal selector example follows below)

* Updated to Kubernetes client 7.0.0

* Code cleanup

* Updated gson and added the gson-fire dependency

* Improved handling of Kubernetes resource deletion
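
For reference, the manifest changes in the diff below all follow the same pattern: the deprecated extensions/v1beta1 and apps/v1beta1 API groups are replaced with apps/v1, which makes spec.selector mandatory and requires its matchLabels to match the pod template labels. A minimal sketch of that pattern (the resource name and label values here are illustrative, not the exact ones used in every manifest):

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: example            # illustrative name
    spec:
      replicas: 1
      selector:
        matchLabels:           # required in apps/v1; must match the template labels below
          app: example
      template:
        metadata:
          labels:
            app: example
        spec:
          containers:
            - name: example
              image: heron/heron:latest
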
diff --git a/WORKSPACE b/WORKSPACE
index 5853ef5..0c26015 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -537,8 +537,8 @@
 # end Pulsar Client
 
 # Kubernetes java client
-kubernetes_client_version = "1.0.0-beta1"
-squareup_okhttp_version = "2.7.5"
+kubernetes_client_version = "7.0.0"
+squareup_okhttp_version = "3.14.5"
 
 maven_jar(
   name = "kubernetes_java_client",
@@ -557,25 +557,25 @@
 
 maven_jar(
   name = "squareup_okhttp",
-  artifact = "com.squareup.okhttp:okhttp:" + squareup_okhttp_version
+  artifact = "com.squareup.okhttp3:okhttp:" + squareup_okhttp_version
 )
 maven_jar(
   name = "squareup_okio",
-  artifact = "com.squareup.okio:okio:1.6.0"
+  artifact = "com.squareup.okio:okio:1.17.2"
 )
 maven_jar(
   name = "squareup_okhttp_logging_interceptor",
-  artifact = "com.squareup.okhttp:logging-interceptor:" + squareup_okhttp_version
-)
-
-maven_jar(
-  name = "squareup_okhttp_ws",
-  artifact = "com.squareup.okhttp:okhttp-ws:" + squareup_okhttp_version
+  artifact = "com.squareup.okhttp3:logging-interceptor:" + squareup_okhttp_version
 )
 
 maven_jar(
   name = "google_gson",
-  artifact = "com.google.code.gson:gson:2.6.2"
+  artifact = "com.google.code.gson:gson:2.8.0"
+)
+
+maven_jar(
+  name = "io_gsonfire",
+  artifact = "io.gsonfire:gson-fire:1.8.3"
 )
 
 maven_jar(
diff --git a/deploy/kubernetes/general/apiserver.yaml b/deploy/kubernetes/general/apiserver.yaml
index b11f7ac..470dff3 100644
--- a/deploy/kubernetes/general/apiserver.yaml
+++ b/deploy/kubernetes/general/apiserver.yaml
@@ -45,7 +45,7 @@
 
 ---
 
-apiVersion: extensions/v1beta1
+apiVersion: apps/v1
 kind: Deployment
 metadata:
   name: heron-apiserver
diff --git a/deploy/kubernetes/general/bookkeeper.statefulset.yaml b/deploy/kubernetes/general/bookkeeper.statefulset.yaml
index 89aaf2d..e5c9fe2 100644
--- a/deploy/kubernetes/general/bookkeeper.statefulset.yaml
+++ b/deploy/kubernetes/general/bookkeeper.statefulset.yaml
@@ -36,7 +36,7 @@
     BK_useHostNameAsBookieID: "true"
 ---
 
-apiVersion: apps/v1beta1
+apiVersion: apps/v1
 kind: StatefulSet
 metadata:
   name: bookie
@@ -44,6 +44,10 @@
     app: bookkeeper
     component: bookie
 spec:
+  selector:
+    matchLabels:
+      app: bookkeeper
+      component: bookie
   serviceName: "bookkeeper"
   replicas: 3
   template:
@@ -145,11 +149,15 @@
 ## Auto-Recovery makes sure to restore the replication factor when any bookie
 ## crashes and it's not recovering on its own.
 ##
-apiVersion: apps/v1beta1
+apiVersion: apps/v1
 kind: Deployment
 metadata:
   name: bookie-autorecovery
 spec:
+  selector:
+    matchLabels:
+      app: bookkeeper
+      component: bookkeeper-replication
   replicas: 2
   template:
     metadata:
diff --git a/deploy/kubernetes/general/bookkeeper.statefulset_empty.yaml b/deploy/kubernetes/general/bookkeeper.statefulset_empty.yaml
index 219e3c2..90a158f 100644
--- a/deploy/kubernetes/general/bookkeeper.statefulset_empty.yaml
+++ b/deploy/kubernetes/general/bookkeeper.statefulset_empty.yaml
@@ -36,7 +36,7 @@
     BK_useHostNameAsBookieID: "true"
 ---
 
-apiVersion: apps/v1beta1
+apiVersion: apps/v1
 kind: StatefulSet
 metadata:
   name: bookie
@@ -44,6 +44,10 @@
     app: bookkeeper
     component: bookie
 spec:
+  selector:
+    matchLabels:
+      app: bookkeeper
+      component: bookie
   serviceName: "bookkeeper"
   replicas: 3
   template:
@@ -129,11 +133,15 @@
 ## Auto-Recovery makes sure to restore the replication factor when any bookie
 ## crashes and it's not recovering on its own.
 ##
-apiVersion: apps/v1beta1
+apiVersion: apps/v1
 kind: Deployment
 metadata:
   name: bookie-autorecovery
 spec:
+  selector:
+    matchLabels:
+      app: bookkeeper
+      component: bookkeeper-replication
   replicas: 2
   template:
     metadata:
diff --git a/deploy/kubernetes/general/bookkeeper.yaml b/deploy/kubernetes/general/bookkeeper.yaml
index 4fd985b..2ba4891 100644
--- a/deploy/kubernetes/general/bookkeeper.yaml
+++ b/deploy/kubernetes/general/bookkeeper.yaml
@@ -38,7 +38,7 @@
 ## cannot be moved across different nodes.
 ## For this reason, we run BK as a daemon set, one for each node in the
 ## cluster, unless restricted by label selectors
-apiVersion: extensions/v1beta1
+apiVersion: apps/v1
 kind: DaemonSet
 metadata:
   name: bookie
@@ -46,6 +46,11 @@
     app: bookkeeper
     component: bookie
 spec:
+  selector:
+    matchLabels:
+      app: bookkeeper
+      component: bookie
+      cluster: bookkeeper
   template:
     metadata:
       labels:
@@ -130,11 +135,15 @@
 ## Auto-Recovery makes sure to restore the replication factor when any bookie
 ## crashes and it's not recovering on its own.
 ##
-apiVersion: apps/v1beta1
+apiVersion: apps/v1
 kind: Deployment
 metadata:
   name: bookie-autorecovery
 spec:
+  selector:
+    matchLabels:
+      app: bookkeeper
+      component: bookkeeper-replication
   replicas: 2
   template:
     metadata:
diff --git a/deploy/kubernetes/general/tools.yaml b/deploy/kubernetes/general/tools.yaml
index d4ab5ce..a6497f1 100644
--- a/deploy/kubernetes/general/tools.yaml
+++ b/deploy/kubernetes/general/tools.yaml
@@ -18,12 +18,15 @@
 ##
 ## Deployment Pod for tracker and ui
 ##
-apiVersion: extensions/v1beta1
+apiVersion: apps/v1
 kind: Deployment
 metadata:
   name: heron-tracker
   namespace: default
 spec:
+  selector:
+    matchLabels:
+      app: heron-tracker
   template:
     metadata:
       labels:
@@ -57,6 +60,13 @@
               --name=localzk
               --hostport=zookeeper:2181
               --rootpath="/heron"
+          resources:
+            requests:
+              cpu: "100m"
+              memory: "200M"
+            limits:
+              cpu: "400m"
+              memory: "512M"
         - name: heron-ui
           image: heron/heron:latest
           ports:
@@ -68,6 +78,13 @@
               heron-ui
               --port=8889
               --base_url=/api/v1/namespaces/default/services/heron-ui:8889/proxy
+          resources:
+            requests:
+              cpu: "100m"
+              memory: "200M"
+            limits:
+              cpu: "400m"
+              memory: "512M"
 ---
 
 ##
diff --git a/deploy/kubernetes/general/zookeeper.yaml b/deploy/kubernetes/general/zookeeper.yaml
index c644621..835bee5 100644
--- a/deploy/kubernetes/general/zookeeper.yaml
+++ b/deploy/kubernetes/general/zookeeper.yaml
@@ -33,7 +33,7 @@
 ---
 
 ## Define a StatefulSet for ZK servers
-apiVersion: apps/v1beta1
+apiVersion: apps/v1
 kind: StatefulSet
 metadata:
   name: zk
@@ -43,6 +43,10 @@
 spec:
   serviceName: zookeeper
   replicas: 1
+  selector:
+    matchLabels:
+      app: heron
+      component: zookeeper
   template:
     metadata:
       labels:
@@ -66,7 +70,7 @@
                 topologyKey: "kubernetes.io/hostname"
       containers:
         - name: zookeeper
-          image: heron/heron:0.16.2
+          image: heron/heron:latest
           command: ["sh", "-c"]
           args:
             - >
diff --git a/deploy/kubernetes/gke/gcs-apiserver.yaml b/deploy/kubernetes/gke/gcs-apiserver.yaml
index ee0b380..d268a0f 100644
--- a/deploy/kubernetes/gke/gcs-apiserver.yaml
+++ b/deploy/kubernetes/gke/gcs-apiserver.yaml
@@ -45,7 +45,7 @@
 
 ---
 
-apiVersion: extensions/v1beta1
+apiVersion: apps/v1
 kind: Deployment
 metadata:
   name: heron-apiserver
diff --git a/deploy/kubernetes/helm/templates/bookie.yaml b/deploy/kubernetes/helm/templates/bookie.yaml
index 7a76c56..ed01baf 100644
--- a/deploy/kubernetes/helm/templates/bookie.yaml
+++ b/deploy/kubernetes/helm/templates/bookie.yaml
@@ -57,10 +57,10 @@
 ## For this reason, we run BK as a daemon set, one for each node in the
 ## cluster, unless restricted by label selectors
 {{- if or (eq .Values.platform "gke") (eq .Values.platform "minikube") }}
-apiVersion: apps/v1beta1
+apiVersion: apps/v1
 kind: StatefulSet
 {{- else }}
-apiVersion: extensions/v1beta1
+apiVersion: apps/v1
 kind: DaemonSet
 {{- end }}
 
@@ -70,6 +70,11 @@
     app: {{ .Release.Name }}-bookkeeper
     component: {{ .Release.Name }}-bookie
 spec:
+  selector:
+    matchLabels:
+      app: {{ .Release.Name }}-bookkeeper
+      component: {{ .Release.Name }}-bookie
+      cluster: {{ .Release.Name }}-bookkeeper
 {{- if or (eq .Values.platform "gke") (eq .Values.platform "minikube") }}
   serviceName: {{ .Release.Name }}-bookie
   replicas: {{ $bookieReplicas }}
diff --git a/deploy/kubernetes/helm/templates/tools.yaml b/deploy/kubernetes/helm/templates/tools.yaml
index 14455a8..6068b59 100644
--- a/deploy/kubernetes/helm/templates/tools.yaml
+++ b/deploy/kubernetes/helm/templates/tools.yaml
@@ -28,7 +28,7 @@
   HERON_APISERVER_MEM_MAX: {{ $apiServerMemory | quote }}
 ---
 
-apiVersion: extensions/v1beta1
+apiVersion: apps/v1
 kind: Deployment
 metadata:
   name: {{ .Release.Name }}-tools
@@ -38,6 +38,10 @@
     release: {{ .Release.Name }}
     heritage: {{ .Release.Service }}
 spec:
+  selector:
+    matchLabels:
+      app: {{ .Release.Name }}-tools
+      release: {{ .Release.Name }}
   template:
     metadata:
       labels:
@@ -80,8 +84,8 @@
               cpu: "100m"
               memory: "200M"
             limits:
-              cpu: "200m"
-              memory: "300M"
+              cpu: "400m"
+              memory: "512M"
           ports:
             - containerPort: 8888
               name: api-port
@@ -101,8 +105,8 @@
               cpu: "100m"
               memory: "200M"
             limits:
-              cpu: "200m"
-              memory: "300M"
+              cpu: "400m"
+              memory: "512M"
           ports:
             - containerPort: 8889
               name: app-port
diff --git a/deploy/kubernetes/helm/templates/zookeeper.yaml b/deploy/kubernetes/helm/templates/zookeeper.yaml
index 4e4f5b2..725957f 100644
--- a/deploy/kubernetes/helm/templates/zookeeper.yaml
+++ b/deploy/kubernetes/helm/templates/zookeeper.yaml
@@ -39,16 +39,18 @@
   {{- end }}
 
 ---
-apiVersion: apps/v1beta1
+apiVersion: apps/v1
 kind: StatefulSet
 metadata:
   name: {{ .Release.Name }}-zookeeper
   labels:
     app: {{ .Release.Name }}-zookeeper
-    chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
     release: {{ .Release.Name | quote }}
-    heritage: {{ .Release.Service | quote }}
 spec:
+  selector:
+    matchLabels:
+      app: {{ .Release.Name }}-zookeeper
+      release: {{ .Release.Name | quote }}
   serviceName: {{ .Release.Name }}-zookeeper-headless
   replicas: {{ $zkReplicas }}
   template:
diff --git a/deploy/kubernetes/minikube/apiserver.yaml b/deploy/kubernetes/minikube/apiserver.yaml
index 2a89c1a..3c4a9b2 100644
--- a/deploy/kubernetes/minikube/apiserver.yaml
+++ b/deploy/kubernetes/minikube/apiserver.yaml
@@ -46,7 +46,7 @@
 
 ---
 
-apiVersion: extensions/v1beta1
+apiVersion: apps/v1
 kind: Deployment
 metadata:
   name: heron-apiserver
diff --git a/deploy/kubernetes/minikube/bookkeeper.yaml b/deploy/kubernetes/minikube/bookkeeper.yaml
index d5ec668..f5778c3 100644
--- a/deploy/kubernetes/minikube/bookkeeper.yaml
+++ b/deploy/kubernetes/minikube/bookkeeper.yaml
@@ -36,7 +36,7 @@
 ## cannot be moved across different nodes.
 ## For this reason, we run BK as a daemon set, one for each node in the
 ## cluster, unless restricted by label selectors
-apiVersion: extensions/v1beta1
+apiVersion: apps/v1
 kind: DaemonSet
 metadata:
   name: bookie
@@ -44,6 +44,11 @@
     app: bookkeeper
     component: bookie
 spec:
+  selector:
+    matchLabels:
+      app: bookkeeper
+      component: bookie
+      cluster: bookkeeper
   template:
     metadata:
       labels:
diff --git a/deploy/kubernetes/minikube/tools.yaml b/deploy/kubernetes/minikube/tools.yaml
index 535d9b6..1854407 100644
--- a/deploy/kubernetes/minikube/tools.yaml
+++ b/deploy/kubernetes/minikube/tools.yaml
@@ -18,12 +18,15 @@
 ##
 ## Deployment Pod for tracker and ui
 ##
-apiVersion: extensions/v1beta1
+apiVersion: apps/v1
 kind: Deployment
 metadata:
   name: heron-tracker
   namespace: default
 spec:
+  selector:
+    matchLabels:
+      app: heron-tracker
   template:
     metadata:
       labels:
@@ -43,6 +46,13 @@
               --name=kubernetes
               --hostport=zookeeper:2181
               --rootpath="/heron"
+          resources:
+            requests:
+              cpu: "100m"
+              memory: "200M"
+            limits:
+              cpu: "400m"
+              memory: "512M"
         - name: heron-ui
           image: heron/heron:latest
           ports:
@@ -54,6 +64,13 @@
               heron-ui
               --port=8889
               --base_url=/api/v1/namespaces/default/services/heron-ui:8889/proxy
+          resources:
+            requests:
+              cpu: "100m"
+              memory: "200M"
+            limits:
+              cpu: "400m"
+              memory: "512M"
 ---
 
 ##
diff --git a/deploy/kubernetes/minikube/zookeeper.yaml b/deploy/kubernetes/minikube/zookeeper.yaml
index f4e5f04..779db9d 100644
--- a/deploy/kubernetes/minikube/zookeeper.yaml
+++ b/deploy/kubernetes/minikube/zookeeper.yaml
@@ -20,7 +20,7 @@
 ##
 
 ## Define a StatefulSet for ZK servers
-apiVersion: apps/v1beta1
+apiVersion: apps/v1
 kind: StatefulSet
 metadata:
   name: zk
@@ -30,6 +30,10 @@
 spec:
   serviceName: zookeeper
   replicas: 1
+  selector:
+    matchLabels:
+      app: heron
+      component: zookeeper
   template:
     metadata:
       labels:
@@ -37,6 +41,20 @@
         component: zookeeper
 
     spec:
+      # Make sure multiple pods of ZK don't get scheduled on the
+      # same node, unless there are no other available nodes
+      affinity:
+        podAntiAffinity:
+          preferredDuringSchedulingIgnoredDuringExecution:
+            - weight: 1
+              podAffinityTerm:
+                labelSelector:
+                  matchExpressions:
+                    - key: "app"
+                      operator: In
+                      values:
+                        - zookeeper
+                topologyKey: "kubernetes.io/hostname"
       containers:
         - name: zookeeper
           image: heron/heron:latest
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1beta1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1Controller.java
similarity index 77%
rename from heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1beta1Controller.java
rename to heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1Controller.java
index 2aea58d..be02c0c 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1beta1Controller.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1Controller.java
@@ -32,11 +32,6 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.squareup.okhttp.Response;
-
 import org.apache.heron.api.utils.TopologyUtils;
 import org.apache.heron.scheduler.TopologyRuntimeManagementException;
 import org.apache.heron.scheduler.TopologySubmissionException;
@@ -47,39 +42,42 @@
 import org.apache.heron.spi.packing.PackingPlan;
 import org.apache.heron.spi.packing.Resource;
 
-import io.kubernetes.client.ApiClient;
-import io.kubernetes.client.ApiException;
-import io.kubernetes.client.apis.AppsV1beta1Api;
-import io.kubernetes.client.models.V1Container;
-import io.kubernetes.client.models.V1ContainerPort;
-import io.kubernetes.client.models.V1DeleteOptions;
-import io.kubernetes.client.models.V1EnvVar;
-import io.kubernetes.client.models.V1EnvVarSource;
-import io.kubernetes.client.models.V1LabelSelector;
-import io.kubernetes.client.models.V1ObjectFieldSelector;
-import io.kubernetes.client.models.V1ObjectMeta;
-import io.kubernetes.client.models.V1PodSpec;
-import io.kubernetes.client.models.V1PodTemplateSpec;
-import io.kubernetes.client.models.V1ResourceRequirements;
-import io.kubernetes.client.models.V1Toleration;
-import io.kubernetes.client.models.V1Volume;
-import io.kubernetes.client.models.V1VolumeMount;
-import io.kubernetes.client.models.V1beta1StatefulSet;
-import io.kubernetes.client.models.V1beta1StatefulSetSpec;
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.custom.V1Patch;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.apis.AppsV1Api;
+import io.kubernetes.client.openapi.models.V1Container;
+import io.kubernetes.client.openapi.models.V1ContainerPort;
+import io.kubernetes.client.openapi.models.V1EnvVar;
+import io.kubernetes.client.openapi.models.V1EnvVarSource;
+import io.kubernetes.client.openapi.models.V1LabelSelector;
+import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1ResourceRequirements;
+import io.kubernetes.client.openapi.models.V1StatefulSet;
+import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
+import io.kubernetes.client.openapi.models.V1Toleration;
+import io.kubernetes.client.openapi.models.V1Volume;
+import io.kubernetes.client.openapi.models.V1VolumeMount;
 
-public class AppsV1beta1Controller extends KubernetesController {
+import okhttp3.Response;
+
+public class AppsV1Controller extends KubernetesController {
 
   private static final Logger LOG =
-      Logger.getLogger(AppsV1beta1Controller.class.getName());
+      Logger.getLogger(AppsV1Controller.class.getName());
 
   private static final String ENV_SHARD_ID = "SHARD_ID";
 
-  private final AppsV1beta1Api client;
+  private final AppsV1Api client;
 
-  AppsV1beta1Controller(Config configuration, Config runtimeConfiguration) {
+  AppsV1Controller(Config configuration, Config runtimeConfiguration) {
     super(configuration, runtimeConfiguration);
     final ApiClient apiClient = new ApiClient().setBasePath(getKubernetesUri());
-    client = new AppsV1beta1Api(apiClient);
+    client = new AppsV1Api(apiClient);
   }
 
   @Override
@@ -97,20 +95,13 @@
     for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
       numberOfInstances = Math.max(numberOfInstances, containerPlan.getInstances().size());
     }
-    final V1beta1StatefulSet statefulSet = createStatefulSet(containerResource, numberOfInstances);
+    final V1StatefulSet statefulSet = createStatefulSet(containerResource, numberOfInstances);
 
     try {
-      final Response response =
-          client.createNamespacedStatefulSetCall(getNamespace(), statefulSet, null,
-              null, null).execute();
-      if (!response.isSuccessful()) {
-        LOG.log(Level.SEVERE, "Error creating topology message: " + response.message());
-        KubernetesUtils.logResponseBodyIfPresent(LOG, response);
-        // construct a message based on the k8s API server response
-        throw new TopologySubmissionException(
-            KubernetesUtils.errorMessageFromResponse(response));
-      }
-    } catch (IOException | ApiException e) {
+      final V1StatefulSet response =
+          client.createNamespacedStatefulSet(getNamespace(), statefulSet, null,
+              null, null);
+    } catch (ApiException e) {
       KubernetesUtils.logExceptionWithDetails(LOG, "Error creating topology", e);
       throw new TopologySubmissionException(e.getMessage());
     }
@@ -138,7 +129,7 @@
   @Override
   public Set<PackingPlan.ContainerPlan>
       addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
-    final V1beta1StatefulSet statefulSet;
+    final V1StatefulSet statefulSet;
     try {
       statefulSet = getStatefulSet();
     } catch (ApiException ae) {
@@ -148,7 +139,7 @@
     final int currentContainerCount = statefulSet.getSpec().getReplicas();
     final int newContainerCount = currentContainerCount + containersToAdd.size();
 
-    final V1beta1StatefulSetSpec newSpec = new V1beta1StatefulSetSpec();
+    final V1StatefulSetSpec newSpec = new V1StatefulSetSpec();
     newSpec.setReplicas(newContainerCount);
 
     try {
@@ -163,7 +154,7 @@
 
   @Override
   public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) {
-    final V1beta1StatefulSet statefulSet;
+    final V1StatefulSet statefulSet;
     try {
       statefulSet = getStatefulSet();
     } catch (ApiException ae) {
@@ -173,7 +164,7 @@
     final int currentContainerCount = statefulSet.getSpec().getReplicas();
     final int newContainerCount = currentContainerCount - containersToRemove.size();
 
-    final V1beta1StatefulSetSpec newSpec = new V1beta1StatefulSetSpec();
+    final V1StatefulSetSpec newSpec = new V1StatefulSetSpec();
     newSpec.setReplicas(newContainerCount);
 
     try {
@@ -184,37 +175,35 @@
     }
   }
 
-  private void doPatch(V1beta1StatefulSetSpec patchedSpec) throws ApiException {
+  private void doPatch(V1StatefulSetSpec patchedSpec) throws ApiException {
     final String body =
-        String.format(JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT,
-            patchedSpec.getReplicas().toString());
-    final ArrayList<JsonObject> arr = new ArrayList<>();
-    arr.add(((JsonElement) deserialize(body, JsonElement.class)).getAsJsonObject());
-    LOG.fine("Update body: " + arr);
-    client.patchNamespacedStatefulSet(getTopologyName(), getNamespace(), arr, null);
+            String.format(JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT,
+                    patchedSpec.getReplicas().toString());
+    final V1Patch patch = new V1Patch(body);
+    client.patchNamespacedStatefulSet(getTopologyName(),
+            getNamespace(), patch, null, null, null, null);
   }
 
   private static final String JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT =
       "{\"op\":\"replace\",\"path\":\"/spec/replicas\",\"value\":%s}";
 
-  private Object deserialize(String jsonStr, Class<?> targetClass) {
-    return (new Gson()).fromJson(jsonStr, targetClass);
-  }
-
-  V1beta1StatefulSet getStatefulSet() throws ApiException {
+  V1StatefulSet getStatefulSet() throws ApiException {
     return client.readNamespacedStatefulSet(getTopologyName(), getNamespace(), null, null, null);
   }
 
   boolean deleteStatefulSet() {
     try {
-      final V1DeleteOptions options = new V1DeleteOptions();
-      options.setGracePeriodSeconds(0L);
-      options.setPropagationPolicy(KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY);
       final Response response = client.deleteNamespacedStatefulSetCall(getTopologyName(),
-          getNamespace(), options, null, null, null, null, null, null)
-          .execute();
+          getNamespace(), null, null, 0, null,
+          KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute();
 
-      if (!response.isSuccessful()) {
+      if (response.isSuccessful()) {
+        LOG.log(Level.INFO, "StatefulSet for the Job [" + getTopologyName()
+            + "] in namespace [" + getNamespace() + "] is deleted.");
+        return true;
+      } else {
+        LOG.log(Level.SEVERE, "Error when deleting the StatefulSet of the job ["
+            + getTopologyName() + "]: in namespace [" + getNamespace() + "]");
         LOG.log(Level.SEVERE, "Error killing topology message: " + response.message());
         KubernetesUtils.logResponseBodyIfPresent(LOG, response);
 
@@ -225,18 +214,15 @@
       KubernetesUtils.logExceptionWithDetails(LOG, "Error deleting topology", e);
       return false;
     }
-
-    return true;
   }
 
   boolean isStatefulSet() {
     try {
-      final Response response =
-          client.readNamespacedStatefulSetCall(getTopologyName(), getNamespace(),
-              null, null, null, null, null)
-              .execute();
-      return response.isSuccessful();
-    } catch (IOException | ApiException e) {
+      final V1StatefulSet response =
+          client.readNamespacedStatefulSet(getTopologyName(), getNamespace(),
+              null, null, null);
+      return response.getKind().equals("StatefulSet");
+    } catch (ApiException e) {
       LOG.warning("isStatefulSet check " +  e.getMessage());
     }
     return false;
@@ -269,11 +255,11 @@
   }
 
 
-  private V1beta1StatefulSet createStatefulSet(Resource containerResource, int numberOfInstances) {
+  private V1StatefulSet createStatefulSet(Resource containerResource, int numberOfInstances) {
     final String topologyName = getTopologyName();
     final Config runtimeConfiguration = getRuntimeConfiguration();
 
-    final V1beta1StatefulSet statefulSet = new V1beta1StatefulSet();
+    final V1StatefulSet statefulSet = new V1StatefulSet();
 
     // setup stateful set metadata
     final V1ObjectMeta objectMeta = new V1ObjectMeta();
@@ -281,7 +267,7 @@
     statefulSet.metadata(objectMeta);
 
     // create the stateful set spec
-    final V1beta1StatefulSetSpec statefulSetSpec = new V1beta1StatefulSetSpec();
+    final V1StatefulSetSpec statefulSetSpec = new V1StatefulSetSpec();
     statefulSetSpec.serviceName(topologyName);
     statefulSetSpec.setReplicas(Runtime.numContainers(runtimeConfiguration).intValue());
 
@@ -414,10 +400,11 @@
 
     // set container resources
     final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements();
-    final Map<String, String> requests = new HashMap<>();
+    final Map<String, Quantity> requests = new HashMap<>();
     requests.put(KubernetesConstants.MEMORY,
-        KubernetesUtils.Megabytes(resource.getRam()));
-    requests.put(KubernetesConstants.CPU, Double.toString(resource.getCpu()));
+        Quantity.fromString(KubernetesUtils.Megabytes(resource.getRam())));
+    requests.put(KubernetesConstants.CPU,
+         Quantity.fromString(Double.toString(roundDecimal(resource.getCpu(), 3))));
     resourceRequirements.setRequests(requests);
     container.setResources(resourceRequirements);
 
@@ -468,4 +455,9 @@
       container.volumeMounts(Collections.singletonList(mount));
     }
   }
+
+  public static double roundDecimal(double value, int places) {
+    double scale = Math.pow(10, places);
+    return Math.round(value * scale) / scale;
+  }
 }
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesCompat.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesCompat.java
index 4197535..2c10ba9 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesCompat.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesCompat.java
@@ -23,13 +23,12 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import com.squareup.okhttp.Response;
-
 import org.apache.heron.scheduler.TopologyRuntimeManagementException;
 
-import io.kubernetes.client.ApiClient;
-import io.kubernetes.client.ApiException;
-import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import okhttp3.Response;
 
 public class KubernetesCompat {
 
@@ -42,13 +41,20 @@
     try {
       final String labelSelector = KubernetesConstants.LABEL_TOPOLOGY + "=" + topology;
       final Response response =
-          client.deleteCollectionNamespacedPodCall(namespace, null, null, null, null,
-          labelSelector, null, null, null, null, null, null)
-          .execute();
-      if (!response.isSuccessful()) {
+          client.deleteCollectionNamespacedPodCall(namespace, null, null, null, null, null,
+          null, labelSelector, null, null,
+          KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY,
+          null, null, null, null, null).execute();
+
+      if (response.isSuccessful()) {
+        LOG.log(Level.INFO, "Pods for the Job [" + topology
+            + "] in namespace [" + namespace + "] are deleted.");
+        return true;
+      } else {
+        LOG.log(Level.SEVERE, "Error when deleting the Pods of the job ["
+            + topology + "]: in namespace [" + namespace + "]");
         LOG.log(Level.SEVERE, "Error killing topology message: " + response.message());
         KubernetesUtils.logResponseBodyIfPresent(LOG, response);
-
         throw new TopologyRuntimeManagementException(
             KubernetesUtils.errorMessageFromResponse(response));
       }
@@ -59,7 +65,5 @@
       }
       return false;
     }
-
-    return true;
   }
 }
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesScheduler.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesScheduler.java
index bb6a919..71545be 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesScheduler.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesScheduler.java
@@ -50,7 +50,7 @@
   private UpdateTopologyManager updateTopologyManager;
 
   protected KubernetesController getController() {
-    return new AppsV1beta1Controller(configuration, runtimeConfiguration);
+    return new AppsV1Controller(configuration, runtimeConfiguration);
   }
 
   @Override
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
index edf310a..5f963fd 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
@@ -23,15 +23,15 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import com.squareup.okhttp.Response;
-
 import org.apache.heron.common.basics.ByteAmount;
 import org.apache.heron.common.basics.SysUtils;
 import org.apache.heron.scheduler.utils.Runtime;
 import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.common.Context;
 
-import io.kubernetes.client.ApiException;
+import io.kubernetes.client.openapi.ApiException;
+
+import okhttp3.Response;
 
 final class KubernetesUtils {
 
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java
index 32194af..90f15bd 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java
@@ -24,10 +24,10 @@
 
 import org.apache.heron.spi.common.Config;
 
-import io.kubernetes.client.models.V1AWSElasticBlockStoreVolumeSource;
-import io.kubernetes.client.models.V1HostPathVolumeSource;
-import io.kubernetes.client.models.V1NFSVolumeSource;
-import io.kubernetes.client.models.V1Volume;
+import io.kubernetes.client.openapi.models.V1AWSElasticBlockStoreVolumeSource;
+import io.kubernetes.client.openapi.models.V1HostPathVolumeSource;
+import io.kubernetes.client.openapi.models.V1NFSVolumeSource;
+import io.kubernetes.client.openapi.models.V1Volume;
 
 final class Volumes {
 
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
index 32ff5ec..95f82f4 100644
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
@@ -24,7 +24,7 @@
 
 import org.apache.heron.spi.common.Config;
 
-import io.kubernetes.client.models.V1Volume;
+import io.kubernetes.client.openapi.models.V1Volume;
 
 public class VolumesTests {
 
diff --git a/third_party/java/BUILD b/third_party/java/BUILD
index af66048..501830f 100644
--- a/third_party/java/BUILD
+++ b/third_party/java/BUILD
@@ -242,8 +242,8 @@
         "@squareup_okhttp//jar",
         "@squareup_okio//jar",
         "@squareup_okhttp_logging_interceptor//jar",
-        "@squareup_okhttp_ws//jar",
         "@google_gson//jar",
+        "@io_gsonfire//jar",
         "//third_party/java:joda_time",
         "@kubernetes_java_client_proto//jar",
         "@org_yaml_snakeyaml//jar",
@@ -258,8 +258,8 @@
         "@squareup_okhttp//jar",
         "@squareup_okio//jar",
         "@squareup_okhttp_logging_interceptor//jar",
-        "@squareup_okhttp_ws//jar",
         "@google_gson//jar",
+        "@io_gsonfire//jar",
         "//third_party/java:joda_time",
         "@kubernetes_java_client_proto//jar",
         "@org_yaml_snakeyaml//jar",