[BEAM-8387] Remove sdk-worker-parallelism option from JobServerDriver
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
index 24edb28..b6e040c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
@@ -70,9 +70,6 @@
}
PortablePipelineOptions portableOptions = flinkOptions.as(PortablePipelineOptions.class);
- if (portableOptions.getSdkWorkerParallelism() == 0L) {
- portableOptions.setSdkWorkerParallelism(serverConfig.getSdkWorkerParallelism());
- }
PortablePipelineRunner pipelineRunner;
if (portableOptions.getOutputExecutablePath() == null
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
index a90f7e6..1e345d0 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
@@ -43,7 +43,6 @@
assertThat(config.getArtifactPort(), is(8098));
assertThat(config.getExpansionPort(), is(8097));
assertThat(config.getFlinkMasterUrl(), is("[auto]"));
- assertThat(config.getSdkWorkerParallelism(), is(1L));
assertThat(config.isCleanArtifactsPerJob(), is(true));
FlinkJobServerDriver flinkJobServerDriver = FlinkJobServerDriver.fromConfig(config);
assertThat(flinkJobServerDriver, is(not(nullValue())));
@@ -62,7 +61,6 @@
"--expansion-port",
"44",
"--flink-master-url=jobmanager",
- "--sdk-worker-parallelism=4",
"--clean-artifacts-per-job=false",
});
FlinkJobServerDriver.FlinkServerConfiguration config =
@@ -72,7 +70,6 @@
assertThat(config.getArtifactPort(), is(43));
assertThat(config.getExpansionPort(), is(44));
assertThat(config.getFlinkMasterUrl(), is("jobmanager"));
- assertThat(config.getSdkWorkerParallelism(), is(4L));
assertThat(config.isCleanArtifactsPerJob(), is(false));
}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
index f8977ff..0c5bf94 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
@@ -94,15 +94,6 @@
handler = ExplicitBooleanOptionHandler.class)
private boolean cleanArtifactsPerJob = true;
- @Option(
- name = "--sdk-worker-parallelism",
- usage =
- "Default parallelism for SDK worker processes. This option is only applied when the "
- + "pipeline option sdkWorkerParallelism is set to 0."
- + "Default is 1, If 0, worker parallelism will be dynamically decided by runner."
- + "See also: sdkWorkerParallelism Pipeline Option")
- private long sdkWorkerParallelism = 1L;
-
public String getHost() {
return host;
}
@@ -126,10 +117,6 @@
public boolean isCleanArtifactsPerJob() {
return cleanArtifactsPerJob;
}
-
- public long getSdkWorkerParallelism() {
- return this.sdkWorkerParallelism;
- }
}
protected static ServerFactory createJobServerFactory(ServerConfiguration configuration) {