Merge pull request #6688 from tvalentyn/cp6686
[BEAM-5744] Cherry-pick PR 6686 to the 2.8.0 release branch
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 34f2edb..93dc6f0 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -45,7 +46,7 @@
private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriver.class);
private final ListeningExecutorService executor;
- private final ServerConfiguration configuration;
+ @VisibleForTesting final ServerConfiguration configuration;
private final ServerFactory jobServerFactory;
private final ServerFactory artifactServerFactory;
private GrpcFnServer<InMemoryJobService> jobServer;
@@ -54,34 +55,34 @@
/** Configuration for the jobServer. */
public static class ServerConfiguration {
@Option(name = "--job-host", usage = "The job server host name")
- private String host = "";
+ String host = "localhost";
@Option(
name = "--job-port",
usage = "The job service port. 0 to use a dynamic port. (Default: 8099)"
)
- private int port = 8099;
+ int port = 8099;
@Option(
name = "--artifact-port",
usage = "The artifact service port. 0 to use a dynamic port. (Default: 8098)"
)
- private int artifactPort = 8098;
+ int artifactPort = 8098;
@Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
- private String artifactStagingPath =
+ String artifactStagingPath =
Paths.get(System.getProperty("java.io.tmpdir"), "beam-artifact-staging").toString();
@Option(
name = "--clean-artifacts-per-job",
usage = "When true, remove each job's staged artifacts when it completes"
)
- private Boolean cleanArtifactsPerJob = false;
+ boolean cleanArtifactsPerJob = false;
@Option(name = "--flink-master-url", usage = "Flink master url to submit job.")
- private String flinkMasterUrl = "[auto]";
+ String flinkMasterUrl = "[auto]";
- public String getFlinkMasterUrl() {
+ String getFlinkMasterUrl() {
return this.flinkMasterUrl;
}
@@ -89,9 +90,9 @@
name = "--sdk-worker-parallelism",
usage = "Default parallelism for SDK worker processes (see portable pipeline options)"
)
- private String sdkWorkerParallelism = PortablePipelineOptions.SDK_WORKER_PARALLELISM_PIPELINE;
+ String sdkWorkerParallelism = PortablePipelineOptions.SDK_WORKER_PARALLELISM_PIPELINE;
- public String getSdkWorkerParallelism() {
+ String getSdkWorkerParallelism() {
return this.sdkWorkerParallelism;
}
}
@@ -209,7 +210,7 @@
.build();
jobServiceGrpcFnServer = GrpcFnServer.create(service, descriptor, jobServerFactory);
}
- LOG.info("JobServer started on {}", jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl());
+ LOG.info("JobService started on {}", jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl());
return jobServiceGrpcFnServer;
}
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
new file mode 100644
index 0000000..fc44d8e
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static org.apache.beam.sdk.options.PortablePipelineOptions.SDK_WORKER_PARALLELISM_PIPELINE;
+import static org.apache.beam.sdk.options.PortablePipelineOptions.SDK_WORKER_PARALLELISM_STAGE;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.base.Charsets;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.ServerSocket;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests for {@link FlinkJobServerDriver}: config defaults, argument parsing and startup. */
+public class FlinkJobServerDriverTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriverTest.class);
+
+  @Test
+  public void testConfigurationDefaults() {
+    FlinkJobServerDriver.ServerConfiguration config =
+        new FlinkJobServerDriver.ServerConfiguration();
+    assertThat(config.host, is("localhost"));
+    assertThat(config.port, is(8099));
+    assertThat(config.artifactPort, is(8098));
+    assertThat(config.flinkMasterUrl, is("[auto]"));
+    assertThat(config.sdkWorkerParallelism, is(SDK_WORKER_PARALLELISM_PIPELINE));
+    assertThat(config.cleanArtifactsPerJob, is(false));
+    FlinkJobServerDriver flinkJobServerDriver = FlinkJobServerDriver.fromConfig(config);
+    assertThat(flinkJobServerDriver, is(not(nullValue())));
+  }
+
+  @Test
+  public void testConfigurationFromArgs() {
+    FlinkJobServerDriver driver =
+        FlinkJobServerDriver.fromParams(
+            new String[] {
+              "--job-host=test",
+              "--job-port",
+              "42",
+              "--artifact-port",
+              "43",
+              "--flink-master-url=jobmanager",
+              "--sdk-worker-parallelism=stage",
+              "--clean-artifacts-per-job",
+            });
+    assertThat(driver.configuration.host, is("test"));
+    assertThat(driver.configuration.port, is(42));
+    assertThat(driver.configuration.artifactPort, is(43));
+    assertThat(driver.configuration.flinkMasterUrl, is("jobmanager"));
+    assertThat(driver.configuration.sdkWorkerParallelism, is(SDK_WORKER_PARALLELISM_STAGE));
+    assertThat(driver.configuration.cleanArtifactsPerJob, is(true));
+  }
+
+  @Test
+  public void testConfigurationFromConfig() {
+    FlinkJobServerDriver.ServerConfiguration config =
+        new FlinkJobServerDriver.ServerConfiguration();
+    FlinkJobServerDriver driver = FlinkJobServerDriver.fromConfig(config);
+    assertThat(driver.configuration, is(config));
+  }
+
+  @Test(timeout = 30_000)
+  public void testJobServerDriver() throws Exception {
+    FlinkJobServerDriver driver = null;
+    Thread driverThread = null;
+    final PrintStream originalErr = System.err;
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintStream newErr = new PrintStream(baos, false, Charsets.UTF_8.name());
+    try {
+      // Capture stderr; the loop below polls it for both services' startup messages.
+      System.setErr(newErr);
+      int freePort = getFreePort();
+      int freePort2 = getFreePort();
+      driver =
+          FlinkJobServerDriver.fromParams(
+              new String[] {
+                "--job-port", String.valueOf(freePort),
+                "--artifact-port", String.valueOf(freePort2)
+              });
+      driverThread = new Thread(driver);
+      driverThread.start();
+      while (true) {
+        newErr.flush();
+        String output = baos.toString(Charsets.UTF_8.name());
+        if (output.contains("JobService started on localhost:" + freePort)
+            && output.contains("ArtifactStagingService started on localhost:" + freePort2)) {
+          break;
+        }
+        Thread.sleep(100);
+      }
+      assertThat(driverThread.isAlive(), is(true));
+    } finally {
+      // Reinstate the stream that System.err pointed to before this test ran.
+      System.setErr(originalErr);
+      if (driver != null) {
+        driver.stop();
+      }
+      if (driverThread != null) {
+        driverThread.interrupt();
+        driverThread.join();
+      }
+    }
+  }
+
+  private static int getFreePort() throws IOException {
+    try (ServerSocket socket = new ServerSocket(0)) {
+      return socket.getLocalPort();
+    }
+  }
+}