[BEAM-10629] Added KnownBuilderInstances to ExternalTransformRegistrar (#12454)
Co-authored-by: Scott Lukas <slukas@google.com>
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java
index bbdd3a5..aa5288c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java
@@ -17,10 +17,14 @@
*/
package org.apache.beam.sdk.expansion;
+import java.lang.reflect.Constructor;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
/**
* A registrar which contains a mapping from URNs to available {@link ExternalTransformBuilder}s.
@@ -29,6 +33,42 @@
@Experimental(Kind.PORTABILITY)
public interface ExternalTransformRegistrar {
- /** A mapping from URN to an {@link ExternalTransformBuilder} class. */
- Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders();
+ /**
+ * A mapping from URN to an {@link ExternalTransformBuilder} class.
+ *
+ * @deprecated Prefer implementing 'knownBuilderInstances'. This method will be removed in a
+ * future version of Beam.
+ */
+ @Deprecated
+ default Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
+ return ImmutableMap.<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>>builder()
+ .build();
+ }
+
+ /** A mapping from URN to an {@link ExternalTransformBuilder} instance. */
+ default Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
+ ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformBuilder>builder();
+ Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders = knownBuilders();
+ for (Entry<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilder :
+ knownBuilders.entrySet()) {
+ Preconditions.checkState(
+ ExternalTransformBuilder.class.isAssignableFrom(knownBuilder.getValue()),
+ "Provided identifier %s is not an ExternalTransformBuilder.",
+ knownBuilder.getValue().getName());
+ try {
+ Constructor<? extends ExternalTransformBuilder> constructor =
+ knownBuilder.getValue().getDeclaredConstructor();
+
+ constructor.setAccessible(true);
+ builder.put(knownBuilder.getKey(), constructor.newInstance());
+
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Unable to instantiate ExternalTransformBuilder from constructor.");
+ }
+ }
+ return builder.build();
+ }
}
diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
index c20e8d2..09a42e3 100644
--- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
+++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
@@ -22,7 +22,6 @@
import com.google.auto.service.AutoService;
import java.io.IOException;
-import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.ArrayDeque;
import java.util.Collections;
@@ -107,17 +106,21 @@
ImmutableMap.builder();
for (ExternalTransformRegistrar registrar :
ServiceLoader.load(ExternalTransformRegistrar.class)) {
- for (Map.Entry<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> entry :
- registrar.knownBuilders().entrySet()) {
+ for (Map.Entry<String, ExternalTransformBuilder<?, ?, ?>> entry :
+ registrar.knownBuilderInstances().entrySet()) {
String urn = entry.getKey();
- Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass = entry.getValue();
+ ExternalTransformBuilder builderInstance = entry.getValue();
builder.put(
urn,
spec -> {
try {
ExternalTransforms.ExternalConfigurationPayload payload =
ExternalTransforms.ExternalConfigurationPayload.parseFrom(spec.getPayload());
- return translate(payload, builderClass);
+ return builderInstance.buildExternal(
+ payloadToConfig(
+ payload,
+ (Class<? extends ExternalTransformBuilder<?, ?, ?>>)
+ builderInstance.getClass()));
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to build transform %s from spec %s", urn, spec), e);
@@ -125,21 +128,17 @@
});
}
}
+
return builder.build();
}
- private static PTransform<?, ?> translate(
+ Object payloadToConfig(
ExternalTransforms.ExternalConfigurationPayload payload,
Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass)
throws Exception {
- Preconditions.checkState(
- ExternalTransformBuilder.class.isAssignableFrom(builderClass),
- "Provided identifier %s is not an ExternalTransformBuilder.",
- builderClass.getName());
-
Object configObject = initConfiguration(builderClass);
populateConfiguration(configObject, payload);
- return buildTransform(builderClass, configObject);
+ return configObject;
}
private static Object initConfiguration(
@@ -239,28 +238,6 @@
return coderBuilder.build();
}
-
- private static PTransform<?, ?> buildTransform(
- Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass, Object configObject)
- throws Exception {
- Constructor<? extends ExternalTransformBuilder<?, ?, ?>> constructor =
- builderClass.getDeclaredConstructor();
- constructor.setAccessible(true);
- ExternalTransformBuilder<?, ?, ?> externalTransformBuilder = constructor.newInstance();
- Method buildMethod = builderClass.getMethod("buildExternal", configObject.getClass());
- buildMethod.setAccessible(true);
-
- PTransform<?, ?> transform =
- (PTransform<?, ?>)
- checkArgumentNotNull(
- buildMethod.invoke(externalTransformBuilder, configObject),
- "Invoking %s.%s(%s) returned null, violating its type.",
- builderClass.getCanonicalName(),
- "buildExternal",
- configObject);
-
- return transform;
- }
}
/**