Improving Kubernetes scheduler logic (#3653)
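* Added handling for the HTTP_NOT_FOUND (404) response code: deleting a Service or StatefulSet that does not exist now logs a warning instead of failing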
* Updated to use try-with-resources logic for Response cleanup
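* More cleanup. Now throwing TopologyRuntimeManagementException in more places (e.g. deleteService() and deleteStatefulSet() now return void and throw it instead of returning false)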
diff --git a/WORKSPACE b/WORKSPACE
index 8e46253..f6c7a55 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -54,7 +54,7 @@
jersey_version = "2.25.1"
-kubernetes_client_version = "8.0.0"
+kubernetes_client_version = "11.0.0"
load("@rules_jvm_external//:defs.bzl", "maven_install")
load("@rules_jvm_external//:specs.bzl", "maven")
@@ -263,6 +263,7 @@
http_archive(
name = "org_apache_zookeeper",
build_file = "@//:third_party/zookeeper/BUILD",
+ sha256 = "bafc0abe7da696a2020ba11b8ce7d06f6e28e9bf1e5504de09be25b8b589777d",
strip_prefix = "apache-zookeeper-3.5.8",
urls = ["https://archive.apache.org/dist/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8.tar.gz"],
)
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index f4efe2c..0fd62b0 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -70,6 +70,8 @@
import okhttp3.Response;
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+
public class V1Controller extends KubernetesController {
private static final Logger LOG =
@@ -219,54 +221,74 @@
null, null, null);
}
- boolean deleteService() {
- try {
- final Response response = coreClient.deleteNamespacedServiceCall(getTopologyName(),
+ void deleteService() {
+ try (Response response = coreClient.deleteNamespacedServiceCall(getTopologyName(),
getNamespace(), null, null, 0, null,
- KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute();
+ KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute()) {
- if (response.isSuccessful()) {
- LOG.log(Level.INFO, "Headless Service for the Job [" + getTopologyName()
- + "] in namespace [" + getNamespace() + "] is deleted.");
- return true;
- } else {
+ if (!response.isSuccessful()) {
+ if (response.code() == HTTP_NOT_FOUND) {
+ LOG.log(Level.WARNING, "Deleting non-existent Kubernetes headless service for Topology: "
+ + getTopologyName());
+ return;
+ }
LOG.log(Level.SEVERE, "Error when deleting the Service of the job ["
- + getTopologyName() + "] in namespace [" + getNamespace() + "]");
- LOG.log(Level.SEVERE, "Error killing topoogy message:" + response.message());
+ + getTopologyName() + "] in namespace [" + getNamespace() + "]");
+ LOG.log(Level.SEVERE, "Error killing topology message:" + response.message());
KubernetesUtils.logResponseBodyIfPresent(LOG, response);
throw new TopologyRuntimeManagementException(
- KubernetesUtils.errorMessageFromResponse(response));
+ KubernetesUtils.errorMessageFromResponse(response));
}
- } catch (IOException | ApiException e) {
- KubernetesUtils.logExceptionWithDetails(LOG, "Error deleting topology service", e);
- return false;
+ } catch (ApiException e) {
+ if (e.getCode() == HTTP_NOT_FOUND) {
+ LOG.log(Level.WARNING, "Tried to delete a non-existent Kubernetes service for Topology: "
+ + getTopologyName());
+ return;
+ }
+ throw new TopologyRuntimeManagementException("Error deleting topology ["
+ + getTopologyName() + "] Kubernetes service", e);
+ } catch (IOException e) {
+ throw new TopologyRuntimeManagementException("Error deleting topology ["
+ + getTopologyName() + "] Kubernetes service", e);
}
+ LOG.log(Level.INFO, "Headless Service for the Job [" + getTopologyName()
+ + "] in namespace [" + getNamespace() + "] is deleted.");
}
- boolean deleteStatefulSet() {
- try {
- final Response response = appsClient.deleteNamespacedStatefulSetCall(getTopologyName(),
+ void deleteStatefulSet() {
+ try (Response response = appsClient.deleteNamespacedStatefulSetCall(getTopologyName(),
getNamespace(), null, null, 0, null,
- KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute();
+ KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute()) {
- if (response.isSuccessful()) {
- LOG.log(Level.INFO, "StatefulSet for the Job [" + getTopologyName()
- + "] in namespace [" + getNamespace() + "] is deleted.");
- return true;
- } else {
+ if (!response.isSuccessful()) {
+ if (response.code() == HTTP_NOT_FOUND) {
+ LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSet for Topology: "
+ + getTopologyName());
+ return;
+ }
LOG.log(Level.SEVERE, "Error when deleting the StatefulSet of the job ["
- + getTopologyName() + "] in namespace [" + getNamespace() + "]");
+ + getTopologyName() + "] in namespace [" + getNamespace() + "]");
LOG.log(Level.SEVERE, "Error killing topology message: " + response.message());
KubernetesUtils.logResponseBodyIfPresent(LOG, response);
throw new TopologyRuntimeManagementException(
- KubernetesUtils.errorMessageFromResponse(response));
+ KubernetesUtils.errorMessageFromResponse(response));
}
- } catch (IOException | ApiException e) {
- KubernetesUtils.logExceptionWithDetails(LOG, "Error deleting topology", e);
- return false;
+ } catch (ApiException e) {
+ if (e.getCode() == HTTP_NOT_FOUND) {
+ LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSet for Topology: "
+ + getTopologyName());
+ return;
+ }
+ throw new TopologyRuntimeManagementException("Error deleting topology ["
+ + getTopologyName() + "] Kubernetes StatefulSet", e);
+ } catch (IOException e) {
+ throw new TopologyRuntimeManagementException("Error deleting topology ["
+ + getTopologyName() + "] Kubernetes StatefulSet", e);
}
+ LOG.log(Level.INFO, "StatefulSet for the Job [" + getTopologyName()
+ + "] in namespace [" + getNamespace() + "] is deleted.");
}
protected List<String> getExecutorCommand(String containerId) {