| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.managed; |
| |
| import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; |
| |
| import com.google.auto.service.AutoService; |
| import com.google.auto.value.AutoValue; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.ServiceLoader; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.io.FileSystems; |
| import org.apache.beam.sdk.io.fs.MatchResult; |
| import org.apache.beam.sdk.schemas.AutoValueSchema; |
| import org.apache.beam.sdk.schemas.NoSuchSchemaException; |
| import org.apache.beam.sdk.schemas.Schema; |
| import org.apache.beam.sdk.schemas.SchemaRegistry; |
| import org.apache.beam.sdk.schemas.annotations.DefaultSchema; |
| import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; |
| import org.apache.beam.sdk.schemas.transforms.SchemaTransform; |
| import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; |
| import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; |
| import org.apache.beam.sdk.schemas.utils.YamlUtils; |
| import org.apache.beam.sdk.values.PCollectionRowTuple; |
| import org.apache.beam.sdk.values.Row; |
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; |
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; |
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; |
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; |
| |
| @AutoService(SchemaTransformProvider.class) |
| public class ManagedSchemaTransformProvider |
| extends TypedSchemaTransformProvider<ManagedSchemaTransformProvider.ManagedConfig> { |
| |
| @Override |
| public String identifier() { |
| return "beam:transform:managed:v1"; |
| } |
| |
| private final Map<String, SchemaTransformProvider> schemaTransformProviders = new HashMap<>(); |
| |
| public ManagedSchemaTransformProvider() {} |
| |
| ManagedSchemaTransformProvider(@Nullable Collection<String> supportedIdentifiers) { |
| try { |
| for (SchemaTransformProvider schemaTransformProvider : |
| ServiceLoader.load(SchemaTransformProvider.class)) { |
| if (schemaTransformProviders.containsKey(schemaTransformProvider.identifier())) { |
| throw new IllegalArgumentException( |
| "Found multiple SchemaTransformProvider implementations with the same identifier " |
| + schemaTransformProvider.identifier()); |
| } |
| if (supportedIdentifiers == null |
| || supportedIdentifiers.contains(schemaTransformProvider.identifier())) { |
| schemaTransformProviders.put( |
| schemaTransformProvider.identifier(), schemaTransformProvider); |
| } |
| } |
| } catch (Exception e) { |
| throw new RuntimeException(e.getMessage()); |
| } |
| } |
| |
| @DefaultSchema(AutoValueSchema.class) |
| @AutoValue |
| @VisibleForTesting |
| abstract static class ManagedConfig { |
| public static Builder builder() { |
| return new AutoValue_ManagedSchemaTransformProvider_ManagedConfig.Builder(); |
| } |
| |
| @SchemaFieldDescription( |
| "Identifier of the underlying SchemaTransform to discover and instantiate.") |
| public abstract String getTransformIdentifier(); |
| |
| @SchemaFieldDescription( |
| "URL path to the YAML config file used to build the underlying SchemaTransform.") |
| public abstract @Nullable String getConfigUrl(); |
| |
| @SchemaFieldDescription("YAML string config used to build the underlying SchemaTransform.") |
| public abstract @Nullable String getConfig(); |
| |
| @AutoValue.Builder |
| public abstract static class Builder { |
| public abstract Builder setTransformIdentifier(String identifier); |
| |
| public abstract Builder setConfigUrl(@Nullable String configUrl); |
| |
| public abstract Builder setConfig(@Nullable String yamlConfig); |
| |
| public abstract ManagedConfig build(); |
| } |
| |
| protected void validate() { |
| boolean configExists = !Strings.isNullOrEmpty(getConfig()); |
| boolean configUrlExists = !Strings.isNullOrEmpty(getConfigUrl()); |
| List<Boolean> configs = Arrays.asList(configExists, configUrlExists); |
| checkArgument( |
| 1 == configs.stream().filter(Predicates.equalTo(true)).count(), |
| "Please specify a config or a config URL, but not both."); |
| } |
| |
| public @Nullable String resolveUnderlyingConfig() { |
| String yamlTransformConfig = getConfig(); |
| // If YAML string is empty, then attempt to read from YAML file |
| if (Strings.isNullOrEmpty(yamlTransformConfig)) { |
| try { |
| MatchResult.Metadata fileMetaData = |
| FileSystems.matchSingleFileSpec(Preconditions.checkNotNull(getConfigUrl())); |
| ByteBuffer buffer = ByteBuffer.allocate((int) fileMetaData.sizeBytes()); |
| FileSystems.open(fileMetaData.resourceId()).read(buffer); |
| yamlTransformConfig = new String(buffer.array(), StandardCharsets.UTF_8); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| return yamlTransformConfig; |
| } |
| } |
| |
| @Override |
| protected SchemaTransform from(ManagedConfig managedConfig) { |
| managedConfig.validate(); |
| SchemaTransformProvider schemaTransformProvider = |
| Preconditions.checkNotNull( |
| schemaTransformProviders.get(managedConfig.getTransformIdentifier()), |
| "Could not find transform with identifier %s, or it may not be supported", |
| managedConfig.getTransformIdentifier()); |
| |
| return new ManagedSchemaTransform(managedConfig, schemaTransformProvider); |
| } |
| |
| static class ManagedSchemaTransform extends SchemaTransform { |
| private final ManagedConfig managedConfig; |
| private final Row underlyingTransformConfig; |
| private final SchemaTransformProvider underlyingTransformProvider; |
| |
| ManagedSchemaTransform( |
| ManagedConfig managedConfig, SchemaTransformProvider underlyingTransformProvider) { |
| // parse config before expansion to check if it matches underlying transform's config schema |
| Schema transformConfigSchema = underlyingTransformProvider.configurationSchema(); |
| Row underlyingTransformConfig; |
| try { |
| underlyingTransformConfig = getRowConfig(managedConfig, transformConfigSchema); |
| } catch (Exception e) { |
| throw new IllegalArgumentException( |
| "Encountered an error when retrieving a Row configuration", e); |
| } |
| |
| this.managedConfig = managedConfig; |
| this.underlyingTransformConfig = underlyingTransformConfig; |
| this.underlyingTransformProvider = underlyingTransformProvider; |
| } |
| |
| @Override |
| public PCollectionRowTuple expand(PCollectionRowTuple input) { |
| return input.apply(underlyingTransformProvider.from(underlyingTransformConfig)); |
| } |
| |
| public ManagedConfig getManagedConfig() { |
| return this.managedConfig; |
| } |
| |
| Row getConfigurationRow() { |
| try { |
| // To stay consistent with our SchemaTransform configuration naming conventions, |
| // we sort lexicographically and convert field names to snake_case |
| return SchemaRegistry.createDefault() |
| .getToRowFunction(ManagedConfig.class) |
| .apply(managedConfig) |
| .sorted() |
| .toSnakeCase(); |
| } catch (NoSuchSchemaException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| @VisibleForTesting |
| static Row getRowConfig(ManagedConfig config, Schema transformSchema) { |
| // May return an empty row (perhaps the underlying transform doesn't have any required |
| // parameters) |
| return YamlUtils.toBeamRow(config.resolveUnderlyingConfig(), transformSchema, false); |
| } |
| |
| Map<String, SchemaTransformProvider> getAllProviders() { |
| return schemaTransformProviders; |
| } |
| |
| // TODO: set global snake_case naming convention and remove these special cases |
| @Override |
| public SchemaTransform from(Row rowConfig) { |
| return super.from(rowConfig.toCamelCase()); |
| } |
| |
| @Override |
| public Schema configurationSchema() { |
| return super.configurationSchema().toSnakeCase(); |
| } |
| } |