[BEAM-8625] Implement servlet for exposing sdk harness statuses in Da… (#10553)

* [BEAM-8625] Implement servlet for exposing sdk harness statuses in Dataflow runner
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
index a5fb660..5cc0db7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
@@ -21,7 +21,7 @@
 import com.google.api.services.dataflow.model.WorkItem;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.List;
+import java.util.Collection;
 import java.util.function.Function;
 import javax.annotation.Nullable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort;
@@ -46,6 +46,7 @@
 import org.apache.beam.runners.dataflow.worker.graph.RegisterNodeFunction;
 import org.apache.beam.runners.dataflow.worker.graph.ReplacePgbkWithPrecombineFunction;
 import org.apache.beam.runners.dataflow.worker.status.DebugCapture;
+import org.apache.beam.runners.dataflow.worker.status.DebugCapture.Capturable;
 import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
 import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
 import org.apache.beam.sdk.fn.IdGenerator;
@@ -265,7 +266,7 @@
   }
 
   private static DebugCapture.Manager initializeAndStartDebugCaptureManager(
-      DataflowWorkerHarnessOptions options, List<DebugCapture.Capturable> debugCapturePages) {
+      DataflowWorkerHarnessOptions options, Collection<Capturable> debugCapturePages) {
     DebugCapture.Manager result = new DebugCapture.Manager(options, debugCapturePages);
     result.start();
     return result;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java
index e636ba8..e3de7cb 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java
@@ -33,10 +33,12 @@
 import org.apache.beam.runners.dataflow.worker.fn.logging.BeamFnLoggingService;
 import org.apache.beam.runners.dataflow.worker.fn.stream.ServerStreamObserverFactory;
 import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
+import org.apache.beam.runners.dataflow.worker.status.SdkWorkerStatusServlet;
 import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
 import org.apache.beam.runners.fnexecution.ServerFactory;
 import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
+import org.apache.beam.runners.fnexecution.status.BeamWorkerStatusGrpcService;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Server;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -62,6 +64,7 @@
     // critical traffic protected from best effort traffic.
     ApiServiceDescriptor controlApiService = DataflowWorkerHarnessHelper.getControlDescriptor();
     ApiServiceDescriptor loggingApiService = DataflowWorkerHarnessHelper.getLoggingDescriptor();
+    ApiServiceDescriptor statusApiService = DataflowWorkerHarnessHelper.getStatusDescriptor();
 
     LOG.info(
         "{} started, using port {} for control, {} for logging.",
@@ -93,6 +96,7 @@
 
     Server servicesServer = null;
     Server loggingServer = null;
+    Server statusServer = null;
     try (BeamFnLoggingService beamFnLoggingService =
             new BeamFnLoggingService(
                 loggingApiService,
@@ -110,6 +114,11 @@
                 controlApiService,
                 streamObserverFactory::from,
                 GrpcContextHeaderAccessorProvider.getHeaderAccessor());
+        BeamWorkerStatusGrpcService beamWorkerStatusGrpcService =
+            statusApiService == null
+                ? null
+                : BeamWorkerStatusGrpcService.create(
+                    statusApiService, GrpcContextHeaderAccessorProvider.getHeaderAccessor());
         GrpcStateService beamFnStateService = GrpcStateService.create()) {
 
       servicesServer =
@@ -120,22 +129,41 @@
       loggingServer =
           serverFactory.create(ImmutableList.of(beamFnLoggingService), loggingApiService);
 
+      // gRPC server for obtaining SDK harness runtime status information.
+      if (beamWorkerStatusGrpcService != null) {
+        statusServer =
+            serverFactory.create(ImmutableList.of(beamWorkerStatusGrpcService), statusApiService);
+      }
+
       start(
           pipeline,
           pipelineOptions,
           beamFnControlService,
           beamFnDataService,
           controlApiService,
-          beamFnStateService);
+          beamFnStateService,
+          beamWorkerStatusGrpcService);
+
+      if (statusServer != null) {
+        statusServer.shutdown();
+      }
       servicesServer.shutdown();
       loggingServer.shutdown();
+
+      // wait 30 secs for outstanding requests to finish.
+      if (statusServer != null) {
+        statusServer.awaitTermination(30, TimeUnit.SECONDS);
+      }
+      servicesServer.awaitTermination(30, TimeUnit.SECONDS);
+      loggingServer.awaitTermination(30, TimeUnit.SECONDS);
     } finally {
-      if (servicesServer != null) {
-        servicesServer.awaitTermination(30, TimeUnit.SECONDS);
+      if (statusServer != null && !statusServer.isTerminated()) {
+        statusServer.shutdownNow();
+      }
+      if (servicesServer != null && !servicesServer.isTerminated()) {
         servicesServer.shutdownNow();
       }
-      if (loggingServer != null) {
-        loggingServer.awaitTermination(30, TimeUnit.SECONDS);
+      if (loggingServer != null && !loggingServer.isTerminated()) {
         loggingServer.shutdownNow();
       }
     }
@@ -148,7 +176,8 @@
       BeamFnControlService beamFnControlService,
       BeamFnDataGrpcService beamFnDataService,
       ApiServiceDescriptor stateApiServiceDescriptor,
-      GrpcStateService beamFnStateService)
+      GrpcStateService beamFnStateService,
+      BeamWorkerStatusGrpcService beamWorkerStatusGrpcService)
       throws Exception {
 
     SdkHarnessRegistry sdkHarnessRegistry =
@@ -161,6 +190,12 @@
       StreamingDataflowWorker worker =
           StreamingDataflowWorker.forStreamingFnWorkerHarness(
               Collections.emptyList(), client, pipelineOptions, pipeline, sdkHarnessRegistry);
+      // Add SDK status servlet and capture page only if Fn worker status server is started.
+      if (beamWorkerStatusGrpcService != null) {
+        SdkWorkerStatusServlet sdkWorkerStatusServlet =
+            new SdkWorkerStatusServlet(beamWorkerStatusGrpcService);
+        worker.addWorkerStatusPage(sdkWorkerStatusServlet);
+      }
       worker.startStatusPages();
       worker.start();
       ExecutorService executor = Executors.newSingleThreadExecutor();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 81faed0..f2b0b27 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -36,6 +36,7 @@
 import java.io.PrintWriter;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
@@ -818,7 +819,7 @@
           new TimerTask() {
             @Override
             public void run() {
-              List<Capturable> pages = statusPages.getDebugCapturePages();
+              Collection<Capturable> pages = statusPages.getDebugCapturePages();
               if (pages.isEmpty()) {
                 LOG.warn("No captured status pages.");
               }
@@ -871,6 +872,13 @@
     statusPages.start();
   }
 
+  public void addWorkerStatusPage(BaseStatusServlet page) {
+    statusPages.addServlet(page);
+    if (page instanceof Capturable) {
+      statusPages.addCapturePage((Capturable) page);
+    }
+  }
+
   public void stop() {
     try {
       if (globalConfigRefreshTimer != null) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/DebugCapture.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/DebugCapture.java
index f02ddda..c6f873f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/DebugCapture.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/DebugCapture.java
@@ -30,6 +30,7 @@
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -99,13 +100,13 @@
     private String project, job, host, region;
     private Dataflow client = null;
     private ScheduledExecutorService executor = null;
-    private List<Capturable> capturables;
+    private Collection<Capturable> capturables;
     private boolean enabled;
 
     private long lastCaptureUsec = 0;
     @VisibleForTesting Config captureConfig = new Config();
 
-    public Manager(DataflowWorkerHarnessOptions options, List<Capturable> capturables) {
+    public Manager(DataflowWorkerHarnessOptions options, Collection<Capturable> capturables) {
       try {
         client = options.getDataflowClient();
       } catch (Exception e) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/SdkWorkerStatusServlet.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/SdkWorkerStatusServlet.java
new file mode 100644
index 0000000..b304952
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/SdkWorkerStatusServlet.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.status;
+
+import com.google.common.base.Strings;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.beam.runners.dataflow.worker.status.DebugCapture.Capturable;
+import org.apache.beam.runners.fnexecution.status.BeamWorkerStatusGrpcService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Servlet dedicated to provide live status info retrieved from SDK Harness. Note this is different
+ * from {@link WorkerStatusPages} which incorporates all info for Dataflow runner including this
+ * SDKWorkerStatus page.
+ *
+ * <p>A request without an {@code id} parameter returns an HTML page with the status of every
+ * connected SDK harness; a request with {@code id} returns the plain-text status of that
+ * harness. Harness ids and status text are HTML-escaped before being embedded in the HTML page.
+ */
+public class SdkWorkerStatusServlet extends BaseStatusServlet implements Capturable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SdkWorkerStatusServlet.class);
+  private final transient BeamWorkerStatusGrpcService statusGrpcService;
+
+  /** Creates a servlet that queries {@code statusGrpcService} for SDK harness statuses. */
+  public SdkWorkerStatusServlet(BeamWorkerStatusGrpcService statusGrpcService) {
+    super("sdk_status");
+    this.statusGrpcService = statusGrpcService;
+  }
+
+  /**
+   * Serves the status of all SDK harnesses as HTML, or of a single harness as plain text when the
+   * {@code id} request parameter is present.
+   */
+  @Override
+  protected void doGet(HttpServletRequest request, HttpServletResponse response)
+      throws IOException, ServletException {
+    String id = request.getParameter("id");
+    boolean allWorkers = Strings.isNullOrEmpty(id);
+    // Set the status and content type before writing the body: header changes made after the
+    // response has been committed are ignored.
+    response.setStatus(HttpServletResponse.SC_OK);
+    response.setContentType(allWorkers ? "text/html;charset=utf-8" : "text/plain;charset=utf-8");
+    ServletOutputStream body = response.getOutputStream();
+    // Encode the body explicitly as UTF-8. ServletOutputStream#println(String) only accepts
+    // ISO-8859-1 characters and throws CharConversionException for anything else.
+    try (PrintWriter out =
+        new PrintWriter(new OutputStreamWriter(body, StandardCharsets.UTF_8))) {
+      if (allWorkers) {
+        // return all connected sdk statuses if no id provided.
+        captureData(out);
+      } else {
+        out.println(statusGrpcService.getSingleWorkerStatus(id, 10, TimeUnit.SECONDS));
+      }
+    }
+    response.flushBuffer();
+  }
+
+  @Override
+  public String pageName() {
+    return "/sdk_status";
+  }
+
+  /** Writes an HTML page with one section per connected SDK harness to {@code writer}. */
+  @Override
+  public void captureData(PrintWriter writer) {
+    Map<String, String> allStatuses = statusGrpcService.getAllWorkerStatuses(10, TimeUnit.SECONDS);
+
+    writer.println("<html>");
+    writer.println("<h1>SDK harness</h1>");
+    // add links to each sdk section for easier navigation.
+    for (String sdkId : allStatuses.keySet()) {
+      String safeId = escapeHtml(sdkId);
+      writer.print(String.format("<a href=\"#%s\">%s</a> ", safeId, safeId));
+    }
+    writer.println();
+
+    for (Map.Entry<String, String> entry : allStatuses.entrySet()) {
+      String safeId = escapeHtml(entry.getKey());
+      writer.println(String.format("<h2 id=\"%s\">%s</h2>", safeId, safeId));
+      writer.println("<a href=\"#top\">return to top</a>");
+      writer.println("<div style=\"white-space:pre-wrap\">");
+      // Status text is free-form and may contain characters such as '<' or '&'; escape it so
+      // the browser does not interpret parts of it as markup.
+      writer.println(escapeHtml(entry.getValue()));
+      writer.println("</div>");
+      writer.println("");
+    }
+    writer.println("</html>");
+  }
+
+  /** Escapes the characters that are significant in HTML text and quoted attribute values. */
+  private static String escapeHtml(String text) {
+    // '&' must be replaced first so the entities produced below are not escaped again.
+    return String.valueOf(text)
+        .replace("&", "&amp;")
+        .replace("<", "&lt;")
+        .replace(">", "&gt;")
+        .replace("\"", "&quot;")
+        .replace("'", "&#39;");
+  }
+}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java
index cd3b9de..764a607 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java
@@ -18,7 +18,8 @@
 package org.apache.beam.runners.dataflow.worker.status;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.function.BooleanSupplier;
 import javax.servlet.ServletException;
@@ -39,6 +40,7 @@
   private static final Logger LOG = LoggerFactory.getLogger(WorkerStatusPages.class);
 
   private final Server statusServer;
+  private final List<Capturable> capturePages;
   private final StatuszServlet statuszServlet = new StatuszServlet();
   private final ThreadzServlet threadzServlet = new ThreadzServlet();
   private final ServletHandler servletHandler = new ServletHandler();
@@ -46,6 +48,7 @@
   @VisibleForTesting
   WorkerStatusPages(Server server, MemoryMonitor memoryMonitor, BooleanSupplier healthyIndicator) {
     this.statusServer = server;
+    this.capturePages = new ArrayList<>();
     this.statusServer.setHandler(servletHandler);
 
     // Install the default servlets (threadz, healthz, heapz, statusz)
@@ -54,6 +57,9 @@
     addServlet(new HeapzServlet(memoryMonitor));
     addServlet(statuszServlet);
 
+    // Add default capture pages (threadz, statusz)
+    this.capturePages.add(threadzServlet);
+    this.capturePages.add(statuszServlet);
     // Add some status pages
     addStatusDataProvider("resources", "Resources", memoryMonitor);
   }
@@ -107,8 +113,12 @@
   }
 
   /** Returns the set of pages than should be captured by DebugCapture. */
-  public List<Capturable> getDebugCapturePages() {
-    return Arrays.asList(threadzServlet, statuszServlet);
+  public Collection<Capturable> getDebugCapturePages() {
+    return this.capturePages;
+  }
+
+  public void addCapturePage(Capturable page) {
+    this.capturePages.add(page);
   }
 
   /** Redirect all invalid pages to /statusz. */