Merge pull request #12149: [BEAM-9897] Add cross-language support to SnowflakeIO.Read
* [BEAM-9897] improve credentials mechanism
* [BEAM-9897] add xlang support for SnowflakeIO.read
* [BEAM-9897] fix: python lint
* [BEAM-9897] refactor: revert auth mechanism and add missing docs
* [BEAM-9897] feat: add custom expansion-service
* [BEAM-9897] fix: CI
diff --git a/CHANGES.md b/CHANGES.md
index 04f3094..06c08da 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -100,6 +100,7 @@
reading data by exporting to JSON files. This has small differences in behavior for Time and Date-related fields. See
Pydoc for more information.
* Add dispositions for SnowflakeIO.write ([BEAM-10343](https://issues.apache.org/jira/browse/BEAM-10343))
+* Add cross-language support to SnowflakeIO.Read([BEAM-9897](https://issues.apache.org/jira/browse/BEAM-9897)).
## New Features / Improvements
diff --git a/sdks/java/io/snowflake/build.gradle b/sdks/java/io/snowflake/build.gradle
index 32ad7af..0a179f5 100644
--- a/sdks/java/io/snowflake/build.gradle
+++ b/sdks/java/io/snowflake/build.gradle
@@ -22,6 +22,7 @@
automaticModuleName: 'org.apache.beam.sdk.io.snowflake')
provideIntegrationTestingDependencies()
enableJavaPerformanceTesting()
+
description = "Apache Beam :: SDKs :: Java :: IO :: Snowflake"
ext.summary = "IO to read and write on Snowflake."
dependencies {
diff --git a/sdks/java/io/snowflake/expansion-service/build.gradle b/sdks/java/io/snowflake/expansion-service/build.gradle
new file mode 100644
index 0000000..8a6ea6c
--- /dev/null
+++ b/sdks/java/io/snowflake/expansion-service/build.gradle
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'org.apache.beam.module'
+apply plugin: 'application'
+mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"
+
+applyJavaNature(enableChecker:false,
+ automaticModuleName: 'org.apache.beam.sdk.io.expansion.service',
+ exportJavadoc: false,
+ validateShadowJar: false,
+ shadowClosure: {},
+)
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Snowflake ::Expansion Service"
+ ext.summary = "Expansion service serving Snowflake IO"
+
+dependencies {
+ compile project(":sdks:java:expansion-service")
+ compile project(":sdks:java:io:snowflake")
+ runtime library.java.slf4j_jdk14
+}
+
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java
index 3876c2f..2b45dc1 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java
@@ -18,38 +18,52 @@
package org.apache.beam.sdk.io.snowflake.credentials;
import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions;
+import org.apache.beam.sdk.io.snowflake.crosslanguage.SnowflakeReadRegistrar;
/**
* Factory class for creating implementations of {@link SnowflakeCredentials} from {@link
* SnowflakePipelineOptions}.
*/
public class SnowflakeCredentialsFactory {
- public static SnowflakeCredentials of(SnowflakePipelineOptions options) {
- if (oauthOptionsAvailable(options)) {
- return new OAuthTokenSnowflakeCredentials(options.getOauthToken());
- } else if (usernamePasswordOptionsAvailable(options)) {
- return new UsernamePasswordSnowflakeCredentials(options.getUsername(), options.getPassword());
- } else if (keyPairOptionsAvailable(options)) {
+ public static SnowflakeCredentials of(SnowflakePipelineOptions o) {
+ if (oauthOptionsAvailable(o.getOauthToken())) {
+ return new OAuthTokenSnowflakeCredentials(o.getOauthToken());
+ } else if (usernamePasswordOptionsAvailable(o.getUsername(), o.getPassword())) {
+ return new UsernamePasswordSnowflakeCredentials(o.getUsername(), o.getPassword());
+ } else if (keyPairOptionsAvailable(
+ o.getUsername(), o.getPrivateKeyPath(), o.getPrivateKeyPassphrase())) {
return new KeyPairSnowflakeCredentials(
- options.getUsername(), options.getPrivateKeyPath(), options.getPrivateKeyPassphrase());
+ o.getUsername(), o.getPrivateKeyPath(), o.getPrivateKeyPassphrase());
}
throw new RuntimeException("Can't get credentials from Options");
}
- private static boolean oauthOptionsAvailable(SnowflakePipelineOptions options) {
- return options.getOauthToken() != null && !options.getOauthToken().isEmpty();
+ public static SnowflakeCredentials of(SnowflakeReadRegistrar.ReadConfiguration c) {
+ if (oauthOptionsAvailable(c.getOAuthToken())) {
+ return new OAuthTokenSnowflakeCredentials(c.getOAuthToken());
+ } else if (usernamePasswordOptionsAvailable(c.getUsername(), c.getPassword())) {
+ return new UsernamePasswordSnowflakeCredentials(c.getUsername(), c.getPassword());
+ } else if (keyPairOptionsAvailable(
+ c.getUsername(), c.getPrivateKeyPath(), c.getPrivateKeyPassphrase())) {
+ return new KeyPairSnowflakeCredentials(
+ c.getUsername(), c.getPrivateKeyPath(), c.getPrivateKeyPassphrase());
+ }
+ throw new RuntimeException("Can't get credentials from Options");
}
- private static boolean usernamePasswordOptionsAvailable(SnowflakePipelineOptions options) {
- return options.getUsername() != null
- && !options.getUsername().isEmpty()
- && !options.getPassword().isEmpty();
+ private static boolean oauthOptionsAvailable(String token) {
+ return token != null && !token.isEmpty();
}
- private static boolean keyPairOptionsAvailable(SnowflakePipelineOptions options) {
- return options.getUsername() != null
- && !options.getUsername().isEmpty()
- && !options.getPrivateKeyPath().isEmpty()
- && !options.getPrivateKeyPassphrase().isEmpty();
+ private static boolean usernamePasswordOptionsAvailable(String username, String password) {
+ return username != null && !username.isEmpty() && !password.isEmpty();
+ }
+
+ private static boolean keyPairOptionsAvailable(
+ String username, String privateKeyPath, String privateKeyPassphrase) {
+ return username != null
+ && !username.isEmpty()
+ && !privateKeyPath.isEmpty()
+ && !privateKeyPassphrase.isEmpty();
}
}
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/Configuration.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/Configuration.java
new file mode 100644
index 0000000..38162ae
--- /dev/null
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/Configuration.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.crosslanguage;
+
+/** Parameters abstract class to expose the transforms to an external SDK. */
+public abstract class Configuration {
+ private String serverName;
+ private String username;
+ private String password;
+ private String privateKeyPath;
+ private String privateKeyPassphrase;
+ private String oAuthToken;
+ private String database;
+ private String schema;
+ private String table;
+ private String query;
+ private String stagingBucketName;
+ private String storageIntegrationName;
+
+ public String getServerName() {
+ return serverName;
+ }
+
+ public void setServerName(String serverName) {
+ this.serverName = serverName;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getPrivateKeyPath() {
+ return privateKeyPath;
+ }
+
+ public void setPrivateKeyPath(String privateKeyPath) {
+ this.privateKeyPath = privateKeyPath;
+ }
+
+ public String getPrivateKeyPassphrase() {
+ return privateKeyPassphrase;
+ }
+
+ public void setPrivateKeyPassphrase(String privateKeyPassphrase) {
+ this.privateKeyPassphrase = privateKeyPassphrase;
+ }
+
+ public String getOAuthToken() {
+ return oAuthToken;
+ }
+
+ public void setOAuthToken(String oAuthToken) {
+ this.oAuthToken = oAuthToken;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public String getSchema() {
+ return schema;
+ }
+
+ public void setSchema(String schema) {
+ this.schema = schema;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
+
+ public String getQuery() {
+ return query;
+ }
+
+ public void setQuery(String query) {
+ this.query = query;
+ }
+
+ public String getStagingBucketName() {
+ return stagingBucketName;
+ }
+
+ public void setStagingBucketName(String stagingBucketName) {
+ this.stagingBucketName = stagingBucketName;
+ }
+
+ public String getStorageIntegrationName() {
+ return storageIntegrationName;
+ }
+
+ public void setStorageIntegrationName(String storageIntegrationName) {
+ this.storageIntegrationName = storageIntegrationName;
+ }
+}
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java
new file mode 100644
index 0000000..1e7be0f
--- /dev/null
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.crosslanguage;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.Map;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/** Exposes {@link SnowflakeIO.Read} as an external transform for cross-language usage. */
+@Experimental
+@AutoService(ExternalTransformRegistrar.class)
+public final class SnowflakeReadRegistrar implements ExternalTransformRegistrar {
+
+ public static final String URN = "beam:external:java:snowflake:read:v1";
+
+ @Override
+ public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
+ return ImmutableMap.of(URN, ReadBuilder.class);
+ }
+
+ /** Parameters class to expose the transform to an external SDK. */
+ public static class ReadConfiguration extends Configuration {}
+
+ public static class ReadBuilder
+ implements ExternalTransformBuilder<ReadConfiguration, PBegin, PCollection<byte[]>> {
+ public ReadBuilder() {}
+
+ @Override
+ public PTransform<PBegin, PCollection<byte[]>> buildExternal(ReadConfiguration c) {
+ SnowflakeCredentials credentials = SnowflakeCredentialsFactory.of(c);
+
+ SerializableFunction<Void, DataSource> dataSourceSerializableFunction =
+ SnowflakeIO.DataSourceProviderFromDataSourceConfiguration.of(
+ SnowflakeIO.DataSourceConfiguration.create(credentials)
+ .withServerName(c.getServerName())
+ .withDatabase(c.getDatabase())
+ .withSchema(c.getSchema()));
+
+ return SnowflakeIO.<byte[]>read()
+ .withStorageIntegrationName(c.getStorageIntegrationName())
+ .withStagingBucketName(c.getStagingBucketName())
+ .withDataSourceProviderFn(dataSourceSerializableFunction)
+ .withCsvMapper(CsvMapper.getCsvMapper())
+ .withCoder(ByteArrayCoder.of())
+ .fromTable(c.getTable())
+ .fromQuery(c.getQuery());
+ }
+ }
+
+ private static class CsvMapper implements Serializable {
+
+ public static SnowflakeIO.CsvMapper getCsvMapper() {
+ return (SnowflakeIO.CsvMapper<byte[]>)
+ parts -> {
+ String partsCSV = String.join(",", parts);
+
+ return partsCSV.getBytes(Charset.defaultCharset());
+ };
+ }
+ }
+}
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/package-info.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/package-info.java
new file mode 100644
index 0000000..7e24ee9
--- /dev/null
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Cross-language for SnowflakeIO. */
+@Experimental(Kind.PORTABILITY)
+@DefaultAnnotation(NonNull.class)
+package org.apache.beam.sdk.io.snowflake.crosslanguage;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
diff --git a/sdks/python/apache_beam/io/external/snowflake.py b/sdks/python/apache_beam/io/external/snowflake.py
new file mode 100644
index 0000000..e7ffa6a
--- /dev/null
+++ b/sdks/python/apache_beam/io/external/snowflake.py
@@ -0,0 +1,204 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Snowflake transforms tested against Flink portable runner.
+
+ **Setup**
+
+ Transforms provided in this module are cross-language transforms
+ implemented in the Beam Java SDK. During the pipeline construction, Python SDK
+ will connect to a Java expansion service to expand these transforms.
+ To facilitate this, a small amount of setup is needed before using these
+ transforms in a Beam Python pipeline.
+
+ There are several ways to setup cross-language Snowflake transforms.
+
+ * Option 1: use the default expansion service
+ * Option 2: specify a custom expansion service
+
+ See below for details regarding each of these options.
+
+ *Option 1: Use the default expansion service*
+
+ This is the recommended and easiest setup option for using Python Snowflake
+ transforms.This option requires following pre-requisites
+ before running the Beam pipeline.
+
+ * Install Java runtime in the computer from where the pipeline is constructed
+ and make sure that 'java' command is available.
+
+ In this option, Python SDK will either download (for released Beam version) or
+ build (when running from a Beam Git clone) a expansion service jar and use
+ that to expand transforms. Currently Snowflake transforms use the
+ 'beam-sdks-java-io-expansion-service' jar for this purpose.
+
+ *Option 2: specify a custom expansion service*
+
+ In this option, you startup your own expansion service and provide that as
+ a parameter when using the transforms provided in this module.
+
+ This option requires following pre-requisites before running the Beam
+ pipeline.
+
+ * Startup your own expansion service.
+ * Update your pipeline to provide the expansion service address when
+ initiating Snowflake transforms provided in this module.
+
+ Flink Users can use the built-in Expansion Service of the Flink Runner's
+ Job Server. If you start Flink's Job Server, the expansion service will be
+ started on port 8097. For a different address, please set the
+ expansion_service parameter.
+
+ **More information**
+
+ For more information regarding cross-language transforms see:
+ - https://beam.apache.org/roadmap/portability/
+
+ For more information specific to Flink runner see:
+ - https://beam.apache.org/documentation/runners/flink/
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import typing
+
+from past.builtins import unicode
+
+import apache_beam as beam
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+
+ReadFromSnowflakeSchema = typing.NamedTuple(
+ 'ReadFromSnowflakeSchema',
+ [
+ ('server_name', unicode),
+ ('schema', unicode),
+ ('database', unicode),
+ ('staging_bucket_name', unicode),
+ ('storage_integration_name', unicode),
+ ('username', typing.Optional[unicode]),
+ ('password', typing.Optional[unicode]),
+ ('private_key_path', typing.Optional[unicode]),
+ ('private_key_passphrase', typing.Optional[unicode]),
+ ('o_auth_token', typing.Optional[unicode]),
+ ('table', typing.Optional[unicode]),
+ ('query', typing.Optional[unicode]),
+ ])
+
+
+def default_io_expansion_service():
+ return BeamJarExpansionService(
+ 'sdks:java:io:snowflake:expansion-service:shadowJar')
+
+
+class ReadFromSnowflake(beam.PTransform):
+ """
+ An external PTransform which reads from Snowflake.
+ """
+
+ URN = 'beam:external:java:snowflake:read:v1'
+
+ def __init__(
+ self,
+ server_name,
+ schema,
+ database,
+ staging_bucket_name,
+ storage_integration_name,
+ csv_mapper,
+ username=None,
+ password=None,
+ private_key_path=None,
+ private_key_passphrase=None,
+ o_auth_token=None,
+ table=None,
+ query=None,
+ expansion_service=None):
+ """
+ Initializes a read operation from Snowflake.
+
+ Required parameters:
+
+ :param server_name: full Snowflake server name with the following format
+ account.region.gcp.snowflakecomputing.com.
+ :param schema: name of the Snowflake schema in the database to use.
+ :param database: name of the Snowflake database to use.
+ :param staging_bucket_name: name of the Google Cloud Storage bucket.::
+ Bucket will be used as a temporary location for storing CSV files.
+ Those temporary directories will be named
+ 'sf_copy_csv_DATE_TIME_RANDOMSUFFIX'
+ and they will be removed automatically once Read operation finishes.
+ :param storage_integration_name: is the name of storage integration
+ object created according to Snowflake documentation.
+ :param csv_mapper: specifies a function which must translate
+ user-defined object to array of strings.
+ SnowflakeIO uses a COPY INTO <location> statement to move data from
+ a Snowflake table to Google Cloud Storage as CSV files.These files
+ are then downloaded via FileIO and processed line by line.
+ Each line is split into an array of Strings using the OpenCSV
+ The csv_mapper function job is to give the user the possibility to
+ convert the array of Strings to a user-defined type,
+ ie. GenericRecord for Avro or Parquet files, or custom objects.
+ Example:
+ def csv_mapper(strings_array)
+ return User(strings_array[0], int(strings_array[1])))
+ :param table: specifies a Snowflake table name.
+ :param query: specifies a Snowflake custom SQL query.
+ :param expansion_service: specifies URL of expansion service.
+
+ Authentication parameters:
+
+ :param username: specifies username for
+ username/password authentication method.
+ :param password: specifies password for
+ username/password authentication method.
+ :param private_key_path: specifies a private key file for
+ key/ pair authentication method.
+ :param private_key_passphrase: specifies password for
+ key/ pair authentication method.
+ :param o_auth_token: specifies access token for
+ OAuth authentication method.
+ """
+ self.params = ReadFromSnowflakeSchema(
+ server_name=server_name,
+ schema=schema,
+ database=database,
+ staging_bucket_name=staging_bucket_name,
+ storage_integration_name=storage_integration_name,
+ username=username,
+ password=password,
+ private_key_path=private_key_path,
+ private_key_passphrase=private_key_passphrase,
+ o_auth_token=o_auth_token,
+ table=table,
+ query=query)
+ self.csv_mapper = csv_mapper
+ self.expansion_service = expansion_service or default_io_expansion_service()
+
+ def expand(self, pbegin):
+ return (
+ pbegin
+ | ExternalTransform(
+ self.URN,
+ NamedTupleBasedPayloadBuilder(self.params),
+ self.expansion_service,
+ )
+ | 'CSV to array mapper' >> beam.Map(lambda csv: csv.split(b','))
+ | 'CSV mapper' >> beam.Map(self.csv_mapper))
diff --git a/settings.gradle b/settings.gradle
index 0220857..638c216 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -120,6 +120,7 @@
include ":sdks:java:io:redis"
include ":sdks:java:io:solr"
include ":sdks:java:io:snowflake"
+include ":sdks:java:io:snowflake:expansion-service"
include ":sdks:java:io:splunk"
include ":sdks:java:io:thrift"
include ":sdks:java:io:tika"
diff --git a/website/www/site/content/en/roadmap/connectors-multi-sdk.md b/website/www/site/content/en/roadmap/connectors-multi-sdk.md
index 464ad83..3b13f59 100644
--- a/website/www/site/content/en/roadmap/connectors-multi-sdk.md
+++ b/website/www/site/content/en/roadmap/connectors-multi-sdk.md
@@ -80,6 +80,7 @@
* Java KafkaIO - completed - [BEAM-7029](https://issues.apache.org/jira/browse/BEAM-7029)
* Java KinesisIO - In progress - [BEAM-10137](https://issues.apache.org/jira/browse/BEAM-10137), [BEAM-10138](https://issues.apache.org/jira/browse/BEAM-10138)
* Java PubSubIO - In progress - [BEAM-7738](https://issues.apache.org/jira/browse/BEAM-7738)
+* Java SnowflakeIO - In progress - [BEAM-9897](https://issues.apache.org/jira/browse/BEAM-9897), [BEAM-9898](https://issues.apache.org/jira/browse/BEAM-9898)
* Java SpannerIO - In progress - [BEAM-10139](https://issues.apache.org/jira/browse/BEAM-10139), [BEAM-10140](https://issues.apache.org/jira/browse/BEAM-10140)
* Java SQL - completed - [BEAM-8603](https://issues.apache.org/jira/browse/BEAM-8603)