[BEAM-8594] Remove unnecessary error check in Dataflow Runner
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFn.java
index 7e298e7..dcf15f4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFn.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFn.java
@@ -56,7 +56,6 @@
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.slf4j.Logger;
@@ -253,7 +252,7 @@
}
// Check to see if processing the request failed.
- throwIfFailure(processResponse);
+ MoreFutures.get(processResponse);
waitForInboundTermination.awaitCompletion();
WindowedValue<KV<byte[], TargetWindowT>> sideInputWindow = outputValue.poll();
@@ -300,22 +299,10 @@
processBundleDescriptor.toBuilder().setId(descriptorId).build())
.build())
.build());
- throwIfFailure(response);
+ // Wait for registration to finish; a failed registration surfaces as an exception here.
+ MoreFutures.get(response);
processBundleDescriptorId = descriptorId;
}
return processBundleDescriptorId;
}
-
- private static InstructionResponse throwIfFailure(
- CompletionStage<InstructionResponse> responseFuture)
- throws ExecutionException, InterruptedException {
- InstructionResponse response = MoreFutures.get(responseFuture);
- if (!Strings.isNullOrEmpty(response.getError())) {
- throw new IllegalStateException(
- String.format(
- "Client failed to process %s with error [%s].",
- response.getInstructionId(), response.getError()));
- }
- return response;
- }
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
index 0a3346c..bf42c4d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
@@ -370,12 +370,8 @@
* elements consumed from the upstream read operation.
*
* <p>May be called at any time, including before start() and after finish().
- *
- * @throws InterruptedException
- * @throws ExecutionException
*/
- public CompletionStage<BeamFnApi.ProcessBundleProgressResponse> getProcessBundleProgress()
- throws InterruptedException, ExecutionException {
+ public CompletionStage<BeamFnApi.ProcessBundleProgressResponse> getProcessBundleProgress() {
// processBundleId may be reset if this bundle finishes asynchronously.
String processBundleId = this.processBundleId;
@@ -393,13 +389,7 @@
return instructionRequestHandler
.handle(processBundleRequest)
- .thenApply(
- response -> {
- if (!response.getError().isEmpty()) {
- throw new IllegalStateException(response.getError());
- }
- return response.getProcessBundleProgress();
- });
+ .thenApply(InstructionResponse::getProcessBundleProgress);
}
/** Returns the final metrics returned by the SDK harness when it completes the bundle. */
@@ -636,53 +626,36 @@
return true;
}
- private static CompletionStage<BeamFnApi.InstructionResponse> throwIfFailure(
+ private static CompletionStage<BeamFnApi.ProcessBundleResponse> getProcessBundleResponse(
CompletionStage<InstructionResponse> responseFuture) {
return responseFuture.thenApply(
response -> {
- if (!response.getError().isEmpty()) {
- throw new IllegalStateException(
- String.format(
- "Client failed to process %s with error [%s].",
- response.getInstructionId(), response.getError()));
+ switch (response.getResponseCase()) {
+ case PROCESS_BUNDLE:
+ return response.getProcessBundle();
+ default:
+ throw new IllegalStateException(
+ String.format(
+ "SDK harness returned wrong kind of response to ProcessBundleRequest: %s",
+ TextFormat.printToString(response)));
}
- return response;
});
}
- private static CompletionStage<BeamFnApi.ProcessBundleResponse> getProcessBundleResponse(
- CompletionStage<InstructionResponse> responseFuture) {
- return throwIfFailure(responseFuture)
- .thenApply(
- response -> {
- switch (response.getResponseCase()) {
- case PROCESS_BUNDLE:
- return response.getProcessBundle();
- default:
- throw new IllegalStateException(
- String.format(
- "SDK harness returned wrong kind of response to ProcessBundleRequest: %s",
- TextFormat.printToString(response)));
- }
- });
- }
-
private static CompletionStage<BeamFnApi.RegisterResponse> getRegisterResponse(
- CompletionStage<InstructionResponse> responseFuture)
- throws ExecutionException, InterruptedException {
- return throwIfFailure(responseFuture)
- .thenApply(
- response -> {
- switch (response.getResponseCase()) {
- case REGISTER:
- return response.getRegister();
- default:
- throw new IllegalStateException(
- String.format(
- "SDK harness returned wrong kind of response to RegisterRequest: %s",
- TextFormat.printToString(response)));
- }
- });
+ CompletionStage<InstructionResponse> responseFuture) {
+ return responseFuture.thenApply(
+ response -> {
+ switch (response.getResponseCase()) {
+ case REGISTER:
+ return response.getRegister();
+ default:
+ throw new IllegalStateException(
+ String.format(
+ "SDK harness returned wrong kind of response to RegisterRequest: %s",
+ TextFormat.printToString(response)));
+ }
+ });
}
private static void cancelIfNotNull(CompletionStage<?> future) {
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/InstructionRequestHandler.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/InstructionRequestHandler.java
index b655732..8a9dc75 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/InstructionRequestHandler.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/InstructionRequestHandler.java
@@ -20,7 +20,10 @@
import java.util.concurrent.CompletionStage;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-/** Interface for any function that can handle a Fn API {@link BeamFnApi.InstructionRequest}. */
+/**
+ * Interface for any function that can handle a Fn API {@link BeamFnApi.InstructionRequest}. Any
+ * error responses will be converted to exceptionally completed futures.
+ */
public interface InstructionRequestHandler extends AutoCloseable {
CompletionStage<BeamFnApi.InstructionResponse> handle(BeamFnApi.InstructionRequest request);
}