[BEAM-8625] Implement servlet for exposing sdk harness statuses in Dataflow runner (#10553)
* [BEAM-8625] Implement servlet for exposing sdk harness statuses in Dataflow runner
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
index a5fb660..5cc0db7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
@@ -21,7 +21,7 @@
import com.google.api.services.dataflow.model.WorkItem;
import java.io.Closeable;
import java.io.IOException;
-import java.util.List;
+import java.util.Collection;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort;
@@ -46,6 +46,7 @@
import org.apache.beam.runners.dataflow.worker.graph.RegisterNodeFunction;
import org.apache.beam.runners.dataflow.worker.graph.ReplacePgbkWithPrecombineFunction;
import org.apache.beam.runners.dataflow.worker.status.DebugCapture;
+import org.apache.beam.runners.dataflow.worker.status.DebugCapture.Capturable;
import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.sdk.fn.IdGenerator;
@@ -265,7 +266,7 @@
}
private static DebugCapture.Manager initializeAndStartDebugCaptureManager(
- DataflowWorkerHarnessOptions options, List<DebugCapture.Capturable> debugCapturePages) {
+ DataflowWorkerHarnessOptions options, Collection<Capturable> debugCapturePages) {
DebugCapture.Manager result = new DebugCapture.Manager(options, debugCapturePages);
result.start();
return result;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java
index e636ba8..e3de7cb 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java
@@ -33,10 +33,12 @@
import org.apache.beam.runners.dataflow.worker.fn.logging.BeamFnLoggingService;
import org.apache.beam.runners.dataflow.worker.fn.stream.ServerStreamObserverFactory;
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
+import org.apache.beam.runners.dataflow.worker.status.SdkWorkerStatusServlet;
import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
+import org.apache.beam.runners.fnexecution.status.BeamWorkerStatusGrpcService;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Server;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -62,6 +64,7 @@
// critical traffic protected from best effort traffic.
ApiServiceDescriptor controlApiService = DataflowWorkerHarnessHelper.getControlDescriptor();
ApiServiceDescriptor loggingApiService = DataflowWorkerHarnessHelper.getLoggingDescriptor();
+ ApiServiceDescriptor statusApiService = DataflowWorkerHarnessHelper.getStatusDescriptor();
LOG.info(
"{} started, using port {} for control, {} for logging.",
@@ -93,6 +96,7 @@
Server servicesServer = null;
Server loggingServer = null;
+ Server statusServer = null;
try (BeamFnLoggingService beamFnLoggingService =
new BeamFnLoggingService(
loggingApiService,
@@ -110,6 +114,11 @@
controlApiService,
streamObserverFactory::from,
GrpcContextHeaderAccessorProvider.getHeaderAccessor());
+ BeamWorkerStatusGrpcService beamWorkerStatusGrpcService =
+ statusApiService == null
+ ? null
+ : BeamWorkerStatusGrpcService.create(
+ statusApiService, GrpcContextHeaderAccessorProvider.getHeaderAccessor());
GrpcStateService beamFnStateService = GrpcStateService.create()) {
servicesServer =
@@ -120,22 +129,41 @@
loggingServer =
serverFactory.create(ImmutableList.of(beamFnLoggingService), loggingApiService);
+ // gRPC server for obtaining SDK harness runtime status information.
+ if (beamWorkerStatusGrpcService != null) {
+ statusServer =
+ serverFactory.create(ImmutableList.of(beamWorkerStatusGrpcService), statusApiService);
+ }
+
start(
pipeline,
pipelineOptions,
beamFnControlService,
beamFnDataService,
controlApiService,
- beamFnStateService);
+ beamFnStateService,
+ beamWorkerStatusGrpcService);
+
+ if (statusServer != null) {
+ statusServer.shutdown();
+ }
servicesServer.shutdown();
loggingServer.shutdown();
+
+ // Wait up to 30 seconds for outstanding requests to finish before forcing shutdown.
+ if (statusServer != null) {
+ statusServer.awaitTermination(30, TimeUnit.SECONDS);
+ }
+ servicesServer.awaitTermination(30, TimeUnit.SECONDS);
+ loggingServer.awaitTermination(30, TimeUnit.SECONDS);
} finally {
- if (servicesServer != null) {
- servicesServer.awaitTermination(30, TimeUnit.SECONDS);
+ if (statusServer != null && !statusServer.isTerminated()) {
+ statusServer.shutdownNow();
+ }
+ if (servicesServer != null && !servicesServer.isTerminated()) {
servicesServer.shutdownNow();
}
- if (loggingServer != null) {
- loggingServer.awaitTermination(30, TimeUnit.SECONDS);
+ if (loggingServer != null && !loggingServer.isTerminated()) {
loggingServer.shutdownNow();
}
}
@@ -148,7 +176,8 @@
BeamFnControlService beamFnControlService,
BeamFnDataGrpcService beamFnDataService,
ApiServiceDescriptor stateApiServiceDescriptor,
- GrpcStateService beamFnStateService)
+ GrpcStateService beamFnStateService,
+ BeamWorkerStatusGrpcService beamWorkerStatusGrpcService)
throws Exception {
SdkHarnessRegistry sdkHarnessRegistry =
@@ -161,6 +190,12 @@
StreamingDataflowWorker worker =
StreamingDataflowWorker.forStreamingFnWorkerHarness(
Collections.emptyList(), client, pipelineOptions, pipeline, sdkHarnessRegistry);
+ // Add SDK status servlet and capture page only if Fn worker status server is started.
+ if (beamWorkerStatusGrpcService != null) {
+ SdkWorkerStatusServlet sdkWorkerStatusServlet =
+ new SdkWorkerStatusServlet(beamWorkerStatusGrpcService);
+ worker.addWorkerStatusPage(sdkWorkerStatusServlet);
+ }
worker.startStatusPages();
worker.start();
ExecutorService executor = Executors.newSingleThreadExecutor();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 81faed0..f2b0b27 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -36,6 +36,7 @@
import java.io.PrintWriter;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
@@ -818,7 +819,7 @@
new TimerTask() {
@Override
public void run() {
- List<Capturable> pages = statusPages.getDebugCapturePages();
+ Collection<Capturable> pages = statusPages.getDebugCapturePages();
if (pages.isEmpty()) {
LOG.warn("No captured status pages.");
}
@@ -871,6 +872,13 @@
statusPages.start();
}
+ public void addWorkerStatusPage(BaseStatusServlet page) {
+ statusPages.addServlet(page);
+ if (page instanceof Capturable) {
+ statusPages.addCapturePage((Capturable) page);
+ }
+ }
+
public void stop() {
try {
if (globalConfigRefreshTimer != null) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/DebugCapture.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/DebugCapture.java
index f02ddda..c6f873f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/DebugCapture.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/DebugCapture.java
@@ -30,6 +30,7 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -99,13 +100,13 @@
private String project, job, host, region;
private Dataflow client = null;
private ScheduledExecutorService executor = null;
- private List<Capturable> capturables;
+ private Collection<Capturable> capturables;
private boolean enabled;
private long lastCaptureUsec = 0;
@VisibleForTesting Config captureConfig = new Config();
- public Manager(DataflowWorkerHarnessOptions options, List<Capturable> capturables) {
+ public Manager(DataflowWorkerHarnessOptions options, Collection<Capturable> capturables) {
try {
client = options.getDataflowClient();
} catch (Exception e) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/SdkWorkerStatusServlet.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/SdkWorkerStatusServlet.java
new file mode 100644
index 0000000..b304952
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/SdkWorkerStatusServlet.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.status;
+
+import com.google.common.base.Strings;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.beam.runners.dataflow.worker.status.DebugCapture.Capturable;
+import org.apache.beam.runners.fnexecution.status.BeamWorkerStatusGrpcService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Servlet dedicated to providing live status info retrieved from the SDK harness. Note this is
+ * different from {@link WorkerStatusPages}, which incorporates all info for the Dataflow runner,
+ * including this SDK worker status page.
+ */
+public class SdkWorkerStatusServlet extends BaseStatusServlet implements Capturable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SdkWorkerStatusServlet.class);
+ private final transient BeamWorkerStatusGrpcService statusGrpcService;
+
+ public SdkWorkerStatusServlet(BeamWorkerStatusGrpcService statusGrpcService) {
+ super("sdk_status");
+ this.statusGrpcService = statusGrpcService;
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ String id = request.getParameter("id");
+ if (Strings.isNullOrEmpty(id)) {
+ // Return the statuses of all connected SDK harnesses if no id parameter is provided.
+ response.setContentType("text/html;charset=utf-8");
+ ServletOutputStream writer = response.getOutputStream();
+ try (PrintWriter out =
+ new PrintWriter(new OutputStreamWriter(writer, StandardCharsets.UTF_8))) {
+ captureData(out);
+ }
+ } else {
+ response.setContentType("text/plain;charset=utf-8");
+ ServletOutputStream writer = response.getOutputStream();
+ writer.println(statusGrpcService.getSingleWorkerStatus(id, 10, TimeUnit.SECONDS));
+ }
+ response.setStatus(HttpServletResponse.SC_OK);
+ response.flushBuffer();
+ }
+
+ @Override
+ public String pageName() {
+ return "/sdk_status";
+ }
+
+ @Override
+ public void captureData(PrintWriter writer) {
+ Map<String, String> allStatuses = statusGrpcService.getAllWorkerStatuses(10, TimeUnit.SECONDS);
+
+ writer.println("<html>");
+ writer.println("<h1>SDK harness</h1>");
+ // Add a link to each SDK harness section for easier navigation.
+ for (String sdkId : allStatuses.keySet()) {
+ writer.print(String.format("<a href=\"#%s\">%s</a> ", sdkId, sdkId));
+ }
+ writer.println();
+
+ for (Map.Entry<String, String> entry : allStatuses.entrySet()) {
+ writer.println(String.format("<h2 id=\"%s\">%s</h2>", entry.getKey(), entry.getKey()));
+ writer.println("<a href=\"#top\">return to top</a>");
+ writer.println("<div style=\"white-space:pre-wrap\">");
+ writer.println(entry.getValue());
+ writer.println("</div>");
+ writer.println("");
+ }
+ writer.println("</html>");
+ }
+}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java
index cd3b9de..764a607 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/WorkerStatusPages.java
@@ -18,7 +18,8 @@
package org.apache.beam.runners.dataflow.worker.status;
import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.function.BooleanSupplier;
import javax.servlet.ServletException;
@@ -39,6 +40,7 @@
private static final Logger LOG = LoggerFactory.getLogger(WorkerStatusPages.class);
private final Server statusServer;
+ private final List<Capturable> capturePages;
private final StatuszServlet statuszServlet = new StatuszServlet();
private final ThreadzServlet threadzServlet = new ThreadzServlet();
private final ServletHandler servletHandler = new ServletHandler();
@@ -46,6 +48,7 @@
@VisibleForTesting
WorkerStatusPages(Server server, MemoryMonitor memoryMonitor, BooleanSupplier healthyIndicator) {
this.statusServer = server;
+ this.capturePages = new ArrayList<>();
this.statusServer.setHandler(servletHandler);
// Install the default servlets (threadz, healthz, heapz, statusz)
@@ -54,6 +57,9 @@
addServlet(new HeapzServlet(memoryMonitor));
addServlet(statuszServlet);
+ // Add default capture pages (threadz, statusz)
+ this.capturePages.add(threadzServlet);
+ this.capturePages.add(statuszServlet);
// Add some status pages
addStatusDataProvider("resources", "Resources", memoryMonitor);
}
@@ -107,8 +113,12 @@
}
/** Returns the set of pages than should be captured by DebugCapture. */
- public List<Capturable> getDebugCapturePages() {
- return Arrays.asList(threadzServlet, statuszServlet);
+ public Collection<Capturable> getDebugCapturePages() {
+ return this.capturePages;
+ }
+
+ public void addCapturePage(Capturable page) {
+ this.capturePages.add(page);
}
/** Redirect all invalid pages to /statusz. */