blob: e13741e86b4a159614eb1ba41f8d8e502fb6d585 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.managed;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.schemas.utils.YamlUtils;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
/**
 * A {@link SchemaTransformProvider} that wraps another {@link SchemaTransform}.
 *
 * <p>The wrapped transform is looked up by its identifier among the providers registered in this
 * instance, and is configured from a YAML document supplied either inline or through a file URL
 * (see {@link ManagedConfig}).
 */
@AutoService(SchemaTransformProvider.class)
public class ManagedSchemaTransformProvider
    extends TypedSchemaTransformProvider<ManagedSchemaTransformProvider.ManagedConfig> {

  /** Returns the identifier under which this provider is registered. */
  @Override
  public String identifier() {
    return "beam:transform:managed:v1";
  }

  /** Underlying providers that this instance may delegate to, keyed by their identifier. */
  private final Map<String, SchemaTransformProvider> schemaTransformProviders = new HashMap<>();

  /**
   * Creates a provider without registering any underlying transforms: the provider map stays
   * empty, so {@link #from(ManagedConfig)} fails for every identifier.
   *
   * <p>NOTE(review): presumably this constructor exists for {@link ServiceLoader} discovery of
   * this class itself - confirm whether it is expected to load the underlying providers.
   */
  public ManagedSchemaTransformProvider() {}

  /**
   * Creates a provider backed by the {@link SchemaTransformProvider}s found through {@link
   * ServiceLoader}.
   *
   * @param supportedIdentifiers identifiers of the providers to register; when {@code null},
   *     every discovered provider is registered
   * @throws RuntimeException if discovery fails, or if an identifier that is already registered
   *     is encountered a second time; the original exception is attached as the cause
   */
  ManagedSchemaTransformProvider(@Nullable Collection<String> supportedIdentifiers) {
    try {
      for (SchemaTransformProvider schemaTransformProvider :
          ServiceLoader.load(SchemaTransformProvider.class)) {
        String providerIdentifier = schemaTransformProvider.identifier();
        if (schemaTransformProviders.containsKey(providerIdentifier)) {
          throw new IllegalArgumentException(
              "Found multiple SchemaTransformProvider implementations with the same identifier "
                  + providerIdentifier);
        }
        if (supportedIdentifiers == null || supportedIdentifiers.contains(providerIdentifier)) {
          schemaTransformProviders.put(providerIdentifier, schemaTransformProvider);
        }
      }
    } catch (Exception e) {
      // Keep the original message, but chain the cause so the failing provider's stack trace
      // is not lost.
      throw new RuntimeException(e.getMessage(), e);
    }
  }

  /**
   * Configuration of the managed transform: which underlying transform to build, and where its
   * YAML configuration comes from.
   */
  @DefaultSchema(AutoValueSchema.class)
  @AutoValue
  @VisibleForTesting
  abstract static class ManagedConfig {
    /** Returns a new, empty builder. */
    public static Builder builder() {
      return new AutoValue_ManagedSchemaTransformProvider_ManagedConfig.Builder();
    }

    @SchemaFieldDescription(
        "Identifier of the underlying SchemaTransform to discover and instantiate.")
    public abstract String getTransformIdentifier();

    @SchemaFieldDescription(
        "URL path to the YAML config file used to build the underlying SchemaTransform.")
    public abstract @Nullable String getConfigUrl();

    @SchemaFieldDescription("YAML string config used to build the underlying SchemaTransform.")
    public abstract @Nullable String getConfig();

    /** Builder for {@link ManagedConfig}. */
    @AutoValue.Builder
    public abstract static class Builder {
      public abstract Builder setTransformIdentifier(String identifier);

      public abstract Builder setConfigUrl(@Nullable String configUrl);

      public abstract Builder setConfig(@Nullable String yamlConfig);

      public abstract ManagedConfig build();
    }

    /**
     * Checks that exactly one of the inline config and the config URL is set (non-null and
     * non-empty).
     *
     * @throws IllegalArgumentException if both or neither of them are set
     */
    protected void validate() {
      boolean configExists = !Strings.isNullOrEmpty(getConfig());
      boolean configUrlExists = !Strings.isNullOrEmpty(getConfigUrl());
      List<Boolean> configs = Arrays.asList(configExists, configUrlExists);
      checkArgument(
          1 == configs.stream().filter(Predicates.equalTo(true)).count(),
          "Please specify a config or a config URL, but not both.");
    }

    /**
     * Returns the YAML configuration of the underlying transform.
     *
     * <p>The inline config is returned when it is non-empty. Otherwise the file referenced by
     * the config URL is read in full and decoded as UTF-8.
     *
     * @throws NullPointerException if the inline config is empty and no config URL is set
     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be matched or
     *     read
     */
    public @Nullable String resolveUnderlyingConfig() {
      String yamlTransformConfig = getConfig();
      if (!Strings.isNullOrEmpty(yamlTransformConfig)) {
        return yamlTransformConfig;
      }
      // The YAML string is empty, so attempt to read the config from the YAML file instead.
      try {
        MatchResult.Metadata fileMetaData =
            FileSystems.matchSingleFileSpec(Preconditions.checkNotNull(getConfigUrl()));
        // toIntExact fails loudly instead of silently truncating a size above 2 GiB.
        ByteBuffer buffer = ByteBuffer.allocate(Math.toIntExact(fileMetaData.sizeBytes()));
        try (ReadableByteChannel channel = FileSystems.open(fileMetaData.resourceId())) {
          // A single read() is not guaranteed to fill the buffer; keep reading until the buffer
          // is full or the channel reports end-of-stream.
          while (buffer.hasRemaining()) {
            if (channel.read(buffer) < 0) {
              break;
            }
          }
        }
        // Decode only the bytes that were actually read, so a short file does not leave
        // trailing NUL characters in the YAML.
        return new String(buffer.array(), 0, buffer.position(), StandardCharsets.UTF_8);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * Builds the managed transform for the given configuration.
   *
   * @throws IllegalArgumentException if the configuration fails {@link ManagedConfig#validate()}
   *     or its YAML cannot be converted to the underlying transform's configuration Row
   * @throws NullPointerException if no registered provider matches the transform identifier
   */
  @Override
  protected SchemaTransform from(ManagedConfig managedConfig) {
    managedConfig.validate();
    String transformIdentifier = managedConfig.getTransformIdentifier();
    SchemaTransformProvider schemaTransformProvider =
        Preconditions.checkNotNull(
            schemaTransformProviders.get(transformIdentifier),
            "Could not find transform with identifier %s, or it may not be supported",
            transformIdentifier);
    return new ManagedSchemaTransform(managedConfig, schemaTransformProvider);
  }

  /** A {@link SchemaTransform} that expands into the transform built by an underlying provider. */
  static class ManagedSchemaTransform extends SchemaTransform {
    private final ManagedConfig managedConfig;
    private final Row underlyingTransformConfig;
    private final SchemaTransformProvider underlyingTransformProvider;

    /**
     * Resolves the YAML configuration into a Row up front, so that a mismatch with the
     * underlying transform's configuration schema surfaces at construction time.
     *
     * @throws IllegalArgumentException if the configuration Row cannot be produced
     */
    ManagedSchemaTransform(
        ManagedConfig managedConfig, SchemaTransformProvider underlyingTransformProvider) {
      // parse config before expansion to check if it matches underlying transform's config schema
      Schema transformConfigSchema = underlyingTransformProvider.configurationSchema();
      Row parsedConfig;
      try {
        parsedConfig = getRowConfig(managedConfig, transformConfigSchema);
      } catch (Exception e) {
        throw new IllegalArgumentException(
            "Encountered an error when retrieving a Row configuration", e);
      }

      this.managedConfig = managedConfig;
      this.underlyingTransformConfig = parsedConfig;
      this.underlyingTransformProvider = underlyingTransformProvider;
    }

    /** Applies the underlying transform, built from the resolved configuration Row. */
    @Override
    public PCollectionRowTuple expand(PCollectionRowTuple input) {
      SchemaTransform underlyingTransform =
          underlyingTransformProvider.from(underlyingTransformConfig);
      return input.apply(underlyingTransform);
    }

    /** Returns the managed configuration this transform was created with. */
    public ManagedConfig getManagedConfig() {
      return this.managedConfig;
    }

    /**
     * Returns the managed configuration as a Row with lexicographically sorted, snake_case
     * field names.
     *
     * @throws RuntimeException if no schema can be found for {@link ManagedConfig}
     */
    Row getConfigurationRow() {
      try {
        // To stay consistent with our SchemaTransform configuration naming conventions,
        // we sort lexicographically and convert field names to snake_case
        return SchemaRegistry.createDefault()
            .getToRowFunction(ManagedConfig.class)
            .apply(managedConfig)
            .sorted()
            .toSnakeCase();
      } catch (NoSuchSchemaException e) {
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * Converts the resolved YAML configuration into a Row that follows {@code transformSchema}.
   *
   * <p>May return an empty row (perhaps the underlying transform doesn't have any required
   * parameters).
   */
  @VisibleForTesting
  static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
    return YamlUtils.toBeamRow(config.resolveUnderlyingConfig(), transformSchema, false);
  }

  /** Returns the live (not copied) map of registered providers, keyed by identifier. */
  Map<String, SchemaTransformProvider> getAllProviders() {
    return schemaTransformProviders;
  }

  // TODO: set global snake_case naming convention and remove these special cases
  /** Converts the Row's field names to camelCase before delegating to the superclass. */
  @Override
  public SchemaTransform from(Row rowConfig) {
    return super.from(rowConfig.toCamelCase());
  }

  /** Returns the superclass configuration schema with field names converted to snake_case. */
  @Override
  public Schema configurationSchema() {
    return super.configurationSchema().toSnakeCase();
  }
}