Improving Kubernetes scheduler logic (#3653)

* Added support for HTTP_NOT_FOUND response code
* Updated to use try-with-resources logic for Response cleanup
* More cleanup. Now throwing TopologyRuntimeManagementException in more places
diff --git a/WORKSPACE b/WORKSPACE
index 8e46253..f6c7a55 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -54,7 +54,7 @@
 
 jersey_version = "2.25.1"
 
-kubernetes_client_version = "8.0.0"
+kubernetes_client_version = "11.0.0"
 
 load("@rules_jvm_external//:defs.bzl", "maven_install")
 load("@rules_jvm_external//:specs.bzl", "maven")
@@ -263,6 +263,7 @@
 http_archive(
     name = "org_apache_zookeeper",
     build_file = "@//:third_party/zookeeper/BUILD",
+    sha256 = "bafc0abe7da696a2020ba11b8ce7d06f6e28e9bf1e5504de09be25b8b589777d",
     strip_prefix = "apache-zookeeper-3.5.8",
     urls = ["https://archive.apache.org/dist/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8.tar.gz"],
 )
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index f4efe2c..0fd62b0 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -70,6 +70,8 @@
 
 import okhttp3.Response;
 
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+
 public class V1Controller extends KubernetesController {
 
   private static final Logger LOG =
@@ -219,54 +221,74 @@
         null, null, null);
   }
 
-  boolean deleteService() {
-    try {
-      final Response response = coreClient.deleteNamespacedServiceCall(getTopologyName(),
+  void deleteService() {
+    try (Response response = coreClient.deleteNamespacedServiceCall(getTopologyName(),
           getNamespace(), null, null, 0, null,
-          KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute();
+          KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute()) {
 
-      if (response.isSuccessful()) {
-        LOG.log(Level.INFO, "Headless Service for the Job [" + getTopologyName()
-            + "] in namespace [" + getNamespace() + "] is deleted.");
-        return true;
-      } else {
+      if (!response.isSuccessful()) {
+        if (response.code() == HTTP_NOT_FOUND) {
+          LOG.log(Level.WARNING, "Deleting non-existent Kubernetes headless service for Topology: "
+                  + getTopologyName());
+          return;
+        }
         LOG.log(Level.SEVERE, "Error when deleting the Service of the job ["
-            + getTopologyName() + "] in namespace [" + getNamespace() + "]");
-        LOG.log(Level.SEVERE, "Error killing topoogy message:" + response.message());
+                + getTopologyName() + "] in namespace [" + getNamespace() + "]");
+        LOG.log(Level.SEVERE, "Error killing topology message:" + response.message());
         KubernetesUtils.logResponseBodyIfPresent(LOG, response);
 
         throw new TopologyRuntimeManagementException(
-            KubernetesUtils.errorMessageFromResponse(response));
+                KubernetesUtils.errorMessageFromResponse(response));
       }
-    } catch (IOException | ApiException e) {
-      KubernetesUtils.logExceptionWithDetails(LOG, "Error deleting topology service", e);
-      return false;
+    } catch (ApiException e) {
+      if (e.getCode() == HTTP_NOT_FOUND) {
+        LOG.log(Level.WARNING, "Tried to delete a non-existent Kubernetes service for Topology: "
+                + getTopologyName());
+        return;
+      }
+      throw new TopologyRuntimeManagementException("Error deleting topology ["
+              + getTopologyName() + "] Kubernetes service", e);
+    } catch (IOException e) {
+      throw new TopologyRuntimeManagementException("Error deleting topology ["
+              + getTopologyName() + "] Kubernetes service", e);
     }
+    LOG.log(Level.INFO, "Headless Service for the Job [" + getTopologyName()
+            + "] in namespace [" + getNamespace() + "] is deleted.");
   }
 
-  boolean deleteStatefulSet() {
-    try {
-      final Response response = appsClient.deleteNamespacedStatefulSetCall(getTopologyName(),
+  void deleteStatefulSet() {
+    try (Response response = appsClient.deleteNamespacedStatefulSetCall(getTopologyName(),
           getNamespace(), null, null, 0, null,
-          KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute();
+          KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute()) {
 
-      if (response.isSuccessful()) {
-        LOG.log(Level.INFO, "StatefulSet for the Job [" + getTopologyName()
-            + "] in namespace [" + getNamespace() + "] is deleted.");
-        return true;
-      } else {
+      if (!response.isSuccessful()) {
+        if (response.code() == HTTP_NOT_FOUND) {
+          LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSet for Topology: "
+                  + getTopologyName());
+          return;
+        }
         LOG.log(Level.SEVERE, "Error when deleting the StatefulSet of the job ["
-            + getTopologyName() + "] in namespace [" + getNamespace() + "]");
+                + getTopologyName() + "] in namespace [" + getNamespace() + "]");
         LOG.log(Level.SEVERE, "Error killing topology message: " + response.message());
         KubernetesUtils.logResponseBodyIfPresent(LOG, response);
 
         throw new TopologyRuntimeManagementException(
-            KubernetesUtils.errorMessageFromResponse(response));
+                KubernetesUtils.errorMessageFromResponse(response));
       }
-    } catch (IOException | ApiException e) {
-      KubernetesUtils.logExceptionWithDetails(LOG, "Error deleting topology", e);
-      return false;
+    } catch (ApiException e) {
+      if (e.getCode() == HTTP_NOT_FOUND) {
+        LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSet for Topology: "
+                + getTopologyName());
+        return;
+      }
+      throw new TopologyRuntimeManagementException("Error deleting topology ["
+              + getTopologyName() + "] Kubernetes StatefulSet", e);
+    } catch (IOException e) {
+      throw new TopologyRuntimeManagementException("Error deleting topology ["
+              + getTopologyName() + "] Kubernetes StatefulSet", e);
     }
+    LOG.log(Level.INFO, "StatefulSet for the Job [" + getTopologyName()
+            + "] in namespace [" + getNamespace() + "] is deleted.");
   }
 
   protected List<String> getExecutorCommand(String containerId) {