Fixed Kubernetes scheduler to provide Java remote debug ports (#3704)
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
index e411818..be45918 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
@@ -68,7 +68,7 @@
public static final int CHECKPOINT_MGR_PORT = 6009;
// port number the start with when more than one port needed for remote debugging
public static final int JVM_REMOTE_DEBUGGER_PORT = 6010;
- public static final String JVM_REMOTE_DEBUGGER_PORT_NAME = "remote-debugger";
+ public static final String JVM_REMOTE_DEBUGGER_PORT_NAME = "rmt-debug";
public static final Map<ExecutorPort, Integer> EXECUTOR_PORTS = new HashMap<>();
static {
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index a236462..747ed3c 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -24,6 +24,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -109,8 +110,7 @@
final V1Service topologyService = createTopologyService();
try {
- final V1Service response =
- coreClient.createNamespacedService(getNamespace(), topologyService, null,
+ coreClient.createNamespacedService(getNamespace(), topologyService, null,
null, null);
} catch (ApiException e) {
KubernetesUtils.logExceptionWithDetails(LOG, "Error creating topology service", e);
@@ -126,8 +126,7 @@
final V1StatefulSet statefulSet = createStatefulSet(containerResource, numberOfInstances);
try {
- final V1StatefulSet response =
- appsClient.createNamespacedStatefulSet(getNamespace(), statefulSet, null,
+ appsClient.createNamespacedStatefulSet(getNamespace(), statefulSet, null,
null, null);
} catch (ApiException e) {
KubernetesUtils.logExceptionWithDetails(LOG, "Error creating topology", e);
@@ -297,15 +296,26 @@
+ "] in namespace [" + getNamespace() + "] is deleted.");
}
- protected List<String> getExecutorCommand(String containerId) {
+ protected List<String> getExecutorCommand(String containerId, int numOfInstances) {
+ final Config configuration = getConfiguration();
+ final Config runtimeConfiguration = getRuntimeConfiguration();
final Map<ExecutorPort, String> ports =
KubernetesConstants.EXECUTOR_PORTS.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey,
e -> e.getValue().toString()));
- final Config configuration = getConfiguration();
- final Config runtimeConfiguration = getRuntimeConfiguration();
+ if (TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtimeConfiguration))
+ && numOfInstances != 0) {
+ List<String> remoteDebuggingPorts = new LinkedList<>();
+ IntStream.range(0, numOfInstances).forEach(i -> {
+ int port = KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i;
+ remoteDebuggingPorts.add(String.valueOf(port));
+ });
+ ports.put(ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS,
+ String.join(",", remoteDebuggingPorts));
+ }
+
final String[] executorCommand =
SchedulerUtils.getExecutorCommand(configuration, runtimeConfiguration,
containerId, ports);
@@ -383,7 +393,7 @@
templateMetaData.annotations(annotations);
podTemplateSpec.setMetadata(templateMetaData);
- final List<String> command = getExecutorCommand("$" + ENV_SHARD_ID);
+ final List<String> command = getExecutorCommand("$" + ENV_SHARD_ID, numberOfInstances);
podTemplateSpec.spec(getPodSpec(command, containerResource, numberOfInstances));
statefulSetSpec.setTemplate(podTemplateSpec);
@@ -573,7 +583,6 @@
ports.add(port);
});
-
if (remoteDebugEnabled) {
IntStream.range(0, numberOfInstances).forEach(i -> {
final String portName =