Merge pull request #6688 from tvalentyn/cp6686

[BEAM-5744] Cherrypick PR 6686 to the 2.8.0 release branch
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 34f2edb..93dc6f0 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -45,7 +46,7 @@
   private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriver.class);
 
   private final ListeningExecutorService executor;
-  private final ServerConfiguration configuration;
+  @VisibleForTesting final ServerConfiguration configuration;
   private final ServerFactory jobServerFactory;
   private final ServerFactory artifactServerFactory;
   private GrpcFnServer<InMemoryJobService> jobServer;
@@ -54,34 +55,34 @@
   /** Configuration for the jobServer. */
   public static class ServerConfiguration {
     @Option(name = "--job-host", usage = "The job server host name")
-    private String host = "";
+    String host = "localhost";
 
     @Option(
       name = "--job-port",
       usage = "The job service port. 0 to use a dynamic port. (Default: 8099)"
     )
-    private int port = 8099;
+    int port = 8099;
 
     @Option(
       name = "--artifact-port",
       usage = "The artifact service port. 0 to use a dynamic port. (Default: 8098)"
     )
-    private int artifactPort = 8098;
+    int artifactPort = 8098;
 
     @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
-    private String artifactStagingPath =
+    String artifactStagingPath =
         Paths.get(System.getProperty("java.io.tmpdir"), "beam-artifact-staging").toString();
 
     @Option(
       name = "--clean-artifacts-per-job",
       usage = "When true, remove each job's staged artifacts when it completes"
     )
-    private Boolean cleanArtifactsPerJob = false;
+    boolean cleanArtifactsPerJob = false;
 
     @Option(name = "--flink-master-url", usage = "Flink master url to submit job.")
-    private String flinkMasterUrl = "[auto]";
+    String flinkMasterUrl = "[auto]";
 
-    public String getFlinkMasterUrl() {
+    String getFlinkMasterUrl() {
       return this.flinkMasterUrl;
     }
 
@@ -89,9 +90,9 @@
       name = "--sdk-worker-parallelism",
       usage = "Default parallelism for SDK worker processes (see portable pipeline options)"
     )
-    private String sdkWorkerParallelism = PortablePipelineOptions.SDK_WORKER_PARALLELISM_PIPELINE;
+    String sdkWorkerParallelism = PortablePipelineOptions.SDK_WORKER_PARALLELISM_PIPELINE;
 
-    public String getSdkWorkerParallelism() {
+    String getSdkWorkerParallelism() {
       return this.sdkWorkerParallelism;
     }
   }
@@ -209,7 +210,7 @@
               .build();
       jobServiceGrpcFnServer = GrpcFnServer.create(service, descriptor, jobServerFactory);
     }
-    LOG.info("JobServer started on {}", jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl());
+    LOG.info("JobService started on {}", jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl());
     return jobServiceGrpcFnServer;
   }
 
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
new file mode 100644
index 0000000..fc44d8e
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static org.apache.beam.sdk.options.PortablePipelineOptions.SDK_WORKER_PARALLELISM_PIPELINE;
+import static org.apache.beam.sdk.options.PortablePipelineOptions.SDK_WORKER_PARALLELISM_STAGE;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.base.Charsets;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.ServerSocket;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests for {@link FlinkJobServerDriver}. */
+public class FlinkJobServerDriverTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriverTest.class);
+
+  @Test
+  public void testConfigurationDefaults() {
+    // A freshly constructed configuration must carry the defaults declared on its fields.
+    FlinkJobServerDriver.ServerConfiguration defaults =
+        new FlinkJobServerDriver.ServerConfiguration();
+    assertThat(defaults.host, is("localhost"));
+    assertThat(defaults.port, is(8099));
+    assertThat(defaults.artifactPort, is(8098));
+    assertThat(defaults.flinkMasterUrl, is("[auto]"));
+    assertThat(defaults.sdkWorkerParallelism, is(SDK_WORKER_PARALLELISM_PIPELINE));
+    assertThat(defaults.cleanArtifactsPerJob, is(false));
+    assertThat(FlinkJobServerDriver.fromConfig(defaults), is(not(nullValue())));
+  }
+
+  @Test
+  public void testConfigurationFromArgs() {
+    // Exercises all three argument spellings: "--flag=value", "--flag value" and a bare
+    // boolean flag.
+    String[] args = {
+      "--job-host=test",
+      "--job-port", "42",
+      "--artifact-port", "43",
+      "--flink-master-url=jobmanager",
+      "--sdk-worker-parallelism=stage",
+      "--clean-artifacts-per-job",
+    };
+    FlinkJobServerDriver.ServerConfiguration parsed =
+        FlinkJobServerDriver.fromParams(args).configuration;
+    assertThat(parsed.host, is("test"));
+    assertThat(parsed.port, is(42));
+    assertThat(parsed.artifactPort, is(43));
+    assertThat(parsed.flinkMasterUrl, is("jobmanager"));
+    assertThat(parsed.sdkWorkerParallelism, is(SDK_WORKER_PARALLELISM_STAGE));
+    assertThat(parsed.cleanArtifactsPerJob, is(true));
+  }
+
+  @Test
+  public void testConfigurationFromConfig() {
+    // The driver built by fromConfig must expose the configuration it was handed.
+    FlinkJobServerDriver.ServerConfiguration expected =
+        new FlinkJobServerDriver.ServerConfiguration();
+    assertThat(FlinkJobServerDriver.fromConfig(expected).configuration, is(expected));
+  }
+
+  @Test(timeout = 30_000)
+  public void testJobServerDriver() throws Exception {
+    FlinkJobServerDriver driver = null;
+    Thread driverThread = null;
+    // Capture the real System.err (not System.out) so the finally block restores it correctly.
+    final PrintStream oldErr = System.err;
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintStream newErr = new PrintStream(baos, false, Charsets.UTF_8.name());
+    try {
+      System.setErr(newErr);
+      int freePort = getFreePort();
+      int freePort2 = getFreePort();
+      driver =
+          FlinkJobServerDriver.fromParams(
+              new String[] {
+                "--job-port", String.valueOf(freePort),
+                "--artifact-port", String.valueOf(freePort2)
+              });
+      driverThread = new Thread(driver);
+      driverThread.start();
+      // Poll captured stderr until both services report their ports; the @Test timeout bounds it.
+      while (true) {
+        newErr.flush();
+        String output = baos.toString(Charsets.UTF_8.name());
+        if (output.contains("JobService started on localhost:" + freePort)
+            && output.contains("ArtifactStagingService started on localhost:" + freePort2)) {
+          break;
+        }
+        Thread.sleep(100);
+      }
+      assertThat(driverThread.isAlive(), is(true));
+    } finally {
+      System.setErr(oldErr);
+      if (driver != null) {
+        driver.stop();
+      }
+      if (driverThread != null) {
+        driverThread.interrupt();
+        driverThread.join();
+      }
+    }
+  }
+
+  private static int getFreePort() throws IOException {
+    try (ServerSocket ephemeral = new ServerSocket(0)) { // 0 = let the system pick a port
+      return ephemeral.getLocalPort(); // socket closes on exit, so the port is only a hint
+    }
+  }
+}