Merge pull request #12149: [BEAM-9897] Add cross-language support to SnowflakeIO.Read

* [BEAM-9897] improve credentials mechanism

* [BEAM-9897] add xlang support for SnowflakeIO.read

* [BEAM-9897] fix: python lint

* [BEAM-9897] refactor: revert auth mechanism and add missing docs

* [BEAM-9897] feat: add custom expansion-service

* [BEAM-9897] fix: CI
diff --git a/CHANGES.md b/CHANGES.md
index 04f3094..06c08da 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -100,6 +100,7 @@
   reading data by exporting to JSON files. This has small differences in behavior for Time and Date-related fields. See
   Pydoc for more information.
 * Add dispositions for SnowflakeIO.write ([BEAM-10343](https://issues.apache.org/jira/browse/BEAM-10343))
+* Add cross-language support to SnowflakeIO.Read ([BEAM-9897](https://issues.apache.org/jira/browse/BEAM-9897)).
 
 ## New Features / Improvements
 
diff --git a/sdks/java/io/snowflake/build.gradle b/sdks/java/io/snowflake/build.gradle
index 32ad7af..0a179f5 100644
--- a/sdks/java/io/snowflake/build.gradle
+++ b/sdks/java/io/snowflake/build.gradle
@@ -22,6 +22,7 @@
   automaticModuleName: 'org.apache.beam.sdk.io.snowflake')
 provideIntegrationTestingDependencies()
 enableJavaPerformanceTesting()
+
 description = "Apache Beam :: SDKs :: Java :: IO :: Snowflake"
 ext.summary = "IO to read and write on Snowflake."
 dependencies {
diff --git a/sdks/java/io/snowflake/expansion-service/build.gradle b/sdks/java/io/snowflake/expansion-service/build.gradle
new file mode 100644
index 0000000..8a6ea6c
--- /dev/null
+++ b/sdks/java/io/snowflake/expansion-service/build.gradle
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'org.apache.beam.module'
+apply plugin: 'application'
+mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"
+
+applyJavaNature(enableChecker: false,
+  automaticModuleName: 'org.apache.beam.sdk.io.expansion.service',
+  exportJavadoc: false,
+  validateShadowJar: false,
+  shadowClosure: {},
+)
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Snowflake ::Expansion Service"
+  ext.summary = "Expansion service serving Snowflake IO"
+
+dependencies {
+  compile project(":sdks:java:expansion-service")
+  compile project(":sdks:java:io:snowflake")
+  runtime library.java.slf4j_jdk14
+}
+
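The Python wrapper added later in this PR resolves this module's shadowJar as its default expansion service. A minimal sketch of that lookup (mirroring default_io_expansion_service in sdks/python/apache_beam/io/external/snowflake.py below):

    # Sketch: how the Python side locates this module's jar when no custom
    # expansion service is supplied (see default_io_expansion_service below).
    from apache_beam.transforms.external import BeamJarExpansionService

    service = BeamJarExpansionService(
        'sdks:java:io:snowflake:expansion-service:shadowJar')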
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java
index 3876c2f..2b45dc1 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java
@@ -18,38 +18,52 @@
 package org.apache.beam.sdk.io.snowflake.credentials;
 
 import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions;
+import org.apache.beam.sdk.io.snowflake.crosslanguage.SnowflakeReadRegistrar;
 
 /**
  * Factory class for creating implementations of {@link SnowflakeCredentials} from {@link
  * SnowflakePipelineOptions}.
  */
 public class SnowflakeCredentialsFactory {
-  public static SnowflakeCredentials of(SnowflakePipelineOptions options) {
-    if (oauthOptionsAvailable(options)) {
-      return new OAuthTokenSnowflakeCredentials(options.getOauthToken());
-    } else if (usernamePasswordOptionsAvailable(options)) {
-      return new UsernamePasswordSnowflakeCredentials(options.getUsername(), options.getPassword());
-    } else if (keyPairOptionsAvailable(options)) {
+  public static SnowflakeCredentials of(SnowflakePipelineOptions o) {
+    if (oauthOptionsAvailable(o.getOauthToken())) {
+      return new OAuthTokenSnowflakeCredentials(o.getOauthToken());
+    } else if (usernamePasswordOptionsAvailable(o.getUsername(), o.getPassword())) {
+      return new UsernamePasswordSnowflakeCredentials(o.getUsername(), o.getPassword());
+    } else if (keyPairOptionsAvailable(
+        o.getUsername(), o.getPrivateKeyPath(), o.getPrivateKeyPassphrase())) {
       return new KeyPairSnowflakeCredentials(
-          options.getUsername(), options.getPrivateKeyPath(), options.getPrivateKeyPassphrase());
+          o.getUsername(), o.getPrivateKeyPath(), o.getPrivateKeyPassphrase());
     }
     throw new RuntimeException("Can't get credentials from Options");
   }
 
-  private static boolean oauthOptionsAvailable(SnowflakePipelineOptions options) {
-    return options.getOauthToken() != null && !options.getOauthToken().isEmpty();
+  public static SnowflakeCredentials of(SnowflakeReadRegistrar.ReadConfiguration c) {
+    if (oauthOptionsAvailable(c.getOAuthToken())) {
+      return new OAuthTokenSnowflakeCredentials(c.getOAuthToken());
+    } else if (usernamePasswordOptionsAvailable(c.getUsername(), c.getPassword())) {
+      return new UsernamePasswordSnowflakeCredentials(c.getUsername(), c.getPassword());
+    } else if (keyPairOptionsAvailable(
+        c.getUsername(), c.getPrivateKeyPath(), c.getPrivateKeyPassphrase())) {
+      return new KeyPairSnowflakeCredentials(
+          c.getUsername(), c.getPrivateKeyPath(), c.getPrivateKeyPassphrase());
+    }
+    throw new RuntimeException("Can't get credentials from Options");
   }
 
-  private static boolean usernamePasswordOptionsAvailable(SnowflakePipelineOptions options) {
-    return options.getUsername() != null
-        && !options.getUsername().isEmpty()
-        && !options.getPassword().isEmpty();
+  private static boolean oauthOptionsAvailable(String token) {
+    return token != null && !token.isEmpty();
   }
 
-  private static boolean keyPairOptionsAvailable(SnowflakePipelineOptions options) {
-    return options.getUsername() != null
-        && !options.getUsername().isEmpty()
-        && !options.getPrivateKeyPath().isEmpty()
-        && !options.getPrivateKeyPassphrase().isEmpty();
+  private static boolean usernamePasswordOptionsAvailable(String username, String password) {
+    return username != null && !username.isEmpty() && password != null && !password.isEmpty();
+  }
+
+  private static boolean keyPairOptionsAvailable(
+      String username, String privateKeyPath, String privateKeyPassphrase) {
+    return username != null
+        && !username.isEmpty()
+        && privateKeyPath != null
+        && !privateKeyPath.isEmpty()
+        && privateKeyPassphrase != null
+        && !privateKeyPassphrase.isEmpty();
   }
 }
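The factory resolves credentials in a fixed order: OAuth token first, then username/password, then key pair. From the Python wrapper added below, that translates into three mutually exclusive parameter sets; a hedged sketch (all values are placeholders):

    # Three mutually exclusive authentication parameter sets, mirroring the
    # factory's precedence; exactly one set should be passed to
    # ReadFromSnowflake (defined later in this PR). Values are placeholders.
    oauth_params = {'o_auth_token': 'my-oauth-token'}
    user_password_params = {'username': 'user', 'password': 'secret'}
    key_pair_params = {
        'username': 'user',
        'private_key_path': '/path/to/rsa_key.p8',
        'private_key_passphrase': 'passphrase',
    }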
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/Configuration.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/Configuration.java
new file mode 100644
index 0000000..38162ae
--- /dev/null
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/Configuration.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.crosslanguage;
+
+/** Abstract class for parameters used to expose the transforms to an external SDK. */
+public abstract class Configuration {
+  private String serverName;
+  private String username;
+  private String password;
+  private String privateKeyPath;
+  private String privateKeyPassphrase;
+  private String oAuthToken;
+  private String database;
+  private String schema;
+  private String table;
+  private String query;
+  private String stagingBucketName;
+  private String storageIntegrationName;
+
+  public String getServerName() {
+    return serverName;
+  }
+
+  public void setServerName(String serverName) {
+    this.serverName = serverName;
+  }
+
+  public String getUsername() {
+    return username;
+  }
+
+  public void setUsername(String username) {
+    this.username = username;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public String getPrivateKeyPath() {
+    return privateKeyPath;
+  }
+
+  public void setPrivateKeyPath(String privateKeyPath) {
+    this.privateKeyPath = privateKeyPath;
+  }
+
+  public String getPrivateKeyPassphrase() {
+    return privateKeyPassphrase;
+  }
+
+  public void setPrivateKeyPassphrase(String privateKeyPassphrase) {
+    this.privateKeyPassphrase = privateKeyPassphrase;
+  }
+
+  public String getOAuthToken() {
+    return oAuthToken;
+  }
+
+  public void setOAuthToken(String oAuthToken) {
+    this.oAuthToken = oAuthToken;
+  }
+
+  public String getDatabase() {
+    return database;
+  }
+
+  public void setDatabase(String database) {
+    this.database = database;
+  }
+
+  public String getSchema() {
+    return schema;
+  }
+
+  public void setSchema(String schema) {
+    this.schema = schema;
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  public void setTable(String table) {
+    this.table = table;
+  }
+
+  public String getQuery() {
+    return query;
+  }
+
+  public void setQuery(String query) {
+    this.query = query;
+  }
+
+  public String getStagingBucketName() {
+    return stagingBucketName;
+  }
+
+  public void setStagingBucketName(String stagingBucketName) {
+    this.stagingBucketName = stagingBucketName;
+  }
+
+  public String getStorageIntegrationName() {
+    return storageIntegrationName;
+  }
+
+  public void setStorageIntegrationName(String storageIntegrationName) {
+    this.storageIntegrationName = storageIntegrationName;
+  }
+}
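Each snake_case field in the Python payload below (e.g. server_name, o_auth_token) is matched to the corresponding camelCase setter here (setServerName, setOAuthToken) when the expansion service populates this configuration. A sketch of that mapping, with pairs taken from this PR (the name conversion itself is performed by the Java expansion service, not by code in this change):

    # Sketch: how Python payload field names line up with the
    # Configuration setters defined above.
    field_to_setter = {
        'server_name': 'setServerName',
        'o_auth_token': 'setOAuthToken',
        'staging_bucket_name': 'setStagingBucketName',
        'storage_integration_name': 'setStorageIntegrationName',
    }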
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java
new file mode 100644
index 0000000..1e7be0f
--- /dev/null
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.crosslanguage;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.Map;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/** Exposes {@link SnowflakeIO.Read} as an external transform for cross-language usage. */
+@Experimental
+@AutoService(ExternalTransformRegistrar.class)
+public final class SnowflakeReadRegistrar implements ExternalTransformRegistrar {
+
+  public static final String URN = "beam:external:java:snowflake:read:v1";
+
+  @Override
+  public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
+    return ImmutableMap.of(URN, ReadBuilder.class);
+  }
+
+  /** Parameters class to expose the transform to an external SDK. */
+  public static class ReadConfiguration extends Configuration {}
+
+  public static class ReadBuilder
+      implements ExternalTransformBuilder<ReadConfiguration, PBegin, PCollection<byte[]>> {
+    public ReadBuilder() {}
+
+    @Override
+    public PTransform<PBegin, PCollection<byte[]>> buildExternal(ReadConfiguration c) {
+      SnowflakeCredentials credentials = SnowflakeCredentialsFactory.of(c);
+
+      SerializableFunction<Void, DataSource> dataSourceSerializableFunction =
+          SnowflakeIO.DataSourceProviderFromDataSourceConfiguration.of(
+              SnowflakeIO.DataSourceConfiguration.create(credentials)
+                  .withServerName(c.getServerName())
+                  .withDatabase(c.getDatabase())
+                  .withSchema(c.getSchema()));
+
+      return SnowflakeIO.<byte[]>read()
+          .withStorageIntegrationName(c.getStorageIntegrationName())
+          .withStagingBucketName(c.getStagingBucketName())
+          .withDataSourceProviderFn(dataSourceSerializableFunction)
+          .withCsvMapper(CsvMapper.getCsvMapper())
+          .withCoder(ByteArrayCoder.of())
+          .fromTable(c.getTable())
+          .fromQuery(c.getQuery());
+    }
+  }
+
+  private static class CsvMapper implements Serializable {
+
+    public static SnowflakeIO.CsvMapper getCsvMapper() {
+      return (SnowflakeIO.CsvMapper<byte[]>)
+          parts -> {
+            String partsCSV = String.join(",", parts);
+
+            return partsCSV.getBytes(Charset.defaultCharset());
+          };
+    }
+  }
+}
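The CsvMapper above joins each record's fields with commas and emits raw bytes; the Python wrapper below reverses this with a split on b',' before applying the user's csv_mapper. A minimal sketch of that byte-level contract (names and values here are illustrative, not part of the PR):

    # Sketch of the cross-language contract: Java emits b'field1,field2,...'
    # per record; the Python wrapper splits on b',' and applies csv_mapper.
    # Note: this simple comma join/split assumes fields contain no commas.
    record = b'alice,42'                # as produced by the Java CsvMapper
    parts = record.split(b',')          # as done by the Python wrapper
    user = (parts[0].decode('utf-8'), int(parts[1]))
    assert user == ('alice', 42)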
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/package-info.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/package-info.java
new file mode 100644
index 0000000..7e24ee9
--- /dev/null
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Cross-language support for SnowflakeIO. */
+@Experimental(Kind.PORTABILITY)
+@DefaultAnnotation(NonNull.class)
+package org.apache.beam.sdk.io.snowflake.crosslanguage;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
diff --git a/sdks/python/apache_beam/io/external/snowflake.py b/sdks/python/apache_beam/io/external/snowflake.py
new file mode 100644
index 0000000..e7ffa6a
--- /dev/null
+++ b/sdks/python/apache_beam/io/external/snowflake.py
@@ -0,0 +1,204 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Snowflake transforms tested against Flink portable runner.
+
+  **Setup**
+
+  Transforms provided in this module are cross-language transforms
+  implemented in the Beam Java SDK. During pipeline construction, the Python
+  SDK will connect to a Java expansion service to expand these transforms.
+  To facilitate this, a small amount of setup is needed before using these
+  transforms in a Beam Python pipeline.
+
+  There are several ways to set up cross-language Snowflake transforms.
+
+  * Option 1: use the default expansion service
+  * Option 2: specify a custom expansion service
+
+  See below for details regarding each of these options.
+
+  *Option 1: Use the default expansion service*
+
+  This is the recommended and easiest setup option for using Python Snowflake
+  transforms. This option requires the following prerequisites
+  before running the Beam pipeline.
+
+  * Install a Java runtime on the computer from which the pipeline is
+    constructed and make sure that the 'java' command is available.
+
+  In this option, the Python SDK will either download (for released Beam
+  versions) or build (when running from a Beam Git clone) an expansion service
+  jar and use that to expand transforms. Currently Snowflake transforms use the
+  'beam-sdks-java-io-snowflake-expansion-service' jar for this purpose.
+
+  *Option 2: specify a custom expansion service*
+
+  In this option, you start up your own expansion service and provide that as
+  a parameter when using the transforms provided in this module.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Start up your own expansion service.
+  * Update your pipeline to provide the expansion service address when
+    initiating Snowflake transforms provided in this module.
+
+  Flink users can use the built-in expansion service of the Flink Runner's
+  Job Server. If you start Flink's Job Server, the expansion service will be
+  started on port 8097. For a different address, please set the
+  expansion_service parameter.
+
+  **More information**
+
+  For more information regarding cross-language transforms see:
+  - https://beam.apache.org/roadmap/portability/
+
+  For more information specific to Flink runner see:
+  - https://beam.apache.org/documentation/runners/flink/
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import typing
+
+from past.builtins import unicode
+
+import apache_beam as beam
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+
+ReadFromSnowflakeSchema = typing.NamedTuple(
+    'ReadFromSnowflakeSchema',
+    [
+        ('server_name', unicode),
+        ('schema', unicode),
+        ('database', unicode),
+        ('staging_bucket_name', unicode),
+        ('storage_integration_name', unicode),
+        ('username', typing.Optional[unicode]),
+        ('password', typing.Optional[unicode]),
+        ('private_key_path', typing.Optional[unicode]),
+        ('private_key_passphrase', typing.Optional[unicode]),
+        ('o_auth_token', typing.Optional[unicode]),
+        ('table', typing.Optional[unicode]),
+        ('query', typing.Optional[unicode]),
+    ])
+
+
+def default_io_expansion_service():
+  return BeamJarExpansionService(
+      'sdks:java:io:snowflake:expansion-service:shadowJar')
+
+
+class ReadFromSnowflake(beam.PTransform):
+  """
+    An external PTransform which reads from Snowflake.
+  """
+
+  URN = 'beam:external:java:snowflake:read:v1'
+
+  def __init__(
+      self,
+      server_name,
+      schema,
+      database,
+      staging_bucket_name,
+      storage_integration_name,
+      csv_mapper,
+      username=None,
+      password=None,
+      private_key_path=None,
+      private_key_passphrase=None,
+      o_auth_token=None,
+      table=None,
+      query=None,
+      expansion_service=None):
+    """
+    Initializes a read operation from Snowflake.
+
+    Required parameters:
+
+    :param server_name: full Snowflake server name with the following format
+         account.region.gcp.snowflakecomputing.com.
+    :param schema: name of the Snowflake schema in the database to use.
+    :param database: name of the Snowflake database to use.
+    :param staging_bucket_name: name of the Google Cloud Storage bucket.
+        The bucket will be used as a temporary location for storing CSV files.
+        Those temporary directories will be named
+        'sf_copy_csv_DATE_TIME_RANDOMSUFFIX'
+        and they will be removed automatically once the read operation
+        finishes.
+    :param storage_integration_name: name of a Snowflake storage integration
+        object created according to the Snowflake documentation.
+    :param csv_mapper: specifies a function which must translate an
+        array of strings to a user-defined object.
+        SnowflakeIO uses a COPY INTO <location> statement to move data from
+        a Snowflake table to Google Cloud Storage as CSV files. These files
+        are then downloaded via FileIO and processed line by line.
+        Each line is split into an array of strings using the OpenCSV library.
+        The csv_mapper function's job is to give the user the possibility to
+        convert the array of strings to a user-defined type,
+        e.g. GenericRecord for Avro or Parquet files, or a custom object.
+        Example::
+            def csv_mapper(strings_array):
+                return User(strings_array[0], int(strings_array[1]))
+    :param table: specifies a Snowflake table name.
+    :param query: specifies a Snowflake custom SQL query.
+    :param expansion_service: specifies the address of the expansion service.
+
+    Authentication parameters:
+
+    :param username: specifies the username for the
+        username/password authentication method.
+    :param password: specifies the password for the
+        username/password authentication method.
+    :param private_key_path: specifies a private key file for
+        the key pair authentication method.
+    :param private_key_passphrase: specifies the passphrase for
+        the key pair authentication method.
+    :param o_auth_token: specifies the access token for the
+        OAuth authentication method.
+    """
+    self.params = ReadFromSnowflakeSchema(
+        server_name=server_name,
+        schema=schema,
+        database=database,
+        staging_bucket_name=staging_bucket_name,
+        storage_integration_name=storage_integration_name,
+        username=username,
+        password=password,
+        private_key_path=private_key_path,
+        private_key_passphrase=private_key_passphrase,
+        o_auth_token=o_auth_token,
+        table=table,
+        query=query)
+    self.csv_mapper = csv_mapper
+    self.expansion_service = expansion_service or default_io_expansion_service()
+
+  def expand(self, pbegin):
+    return (
+        pbegin
+        | ExternalTransform(
+            self.URN,
+            NamedTupleBasedPayloadBuilder(self.params),
+            self.expansion_service,
+        )
+        | 'CSV to array mapper' >> beam.Map(lambda csv: csv.split(b','))
+        | 'CSV mapper' >> beam.Map(self.csv_mapper))
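An end-to-end usage sketch of the new transform; the connection values, table name, and tuple-building csv_mapper below are placeholders, not part of this PR:

    # Minimal usage sketch (all connection values are placeholders).
    import apache_beam as beam
    from apache_beam.io.external.snowflake import ReadFromSnowflake
    from apache_beam.options.pipeline_options import PipelineOptions


    def csv_mapper(strings_array):
      # Fields arrive as bytes after the wrapper's split on b','; decode
      # and convert as needed. A real pipeline might build an Avro
      # GenericRecord or a custom object here instead.
      return (strings_array[0].decode('utf-8'), int(strings_array[1]))


    with beam.Pipeline(options=PipelineOptions()) as p:
      rows = (
          p
          | ReadFromSnowflake(
              server_name='account.region.gcp.snowflakecomputing.com',
              schema='PUBLIC',
              database='MY_DB',
              staging_bucket_name='my-staging-bucket',
              storage_integration_name='my_integration',
              csv_mapper=csv_mapper,
              username='user',
              password='secret',
              table='MY_TABLE'))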
diff --git a/settings.gradle b/settings.gradle
index 0220857..638c216 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -120,6 +120,7 @@
 include ":sdks:java:io:redis"
 include ":sdks:java:io:solr"
 include ":sdks:java:io:snowflake"
+include ":sdks:java:io:snowflake:expansion-service"
 include ":sdks:java:io:splunk"
 include ":sdks:java:io:thrift"
 include ":sdks:java:io:tika"
diff --git a/website/www/site/content/en/roadmap/connectors-multi-sdk.md b/website/www/site/content/en/roadmap/connectors-multi-sdk.md
index 464ad83..3b13f59 100644
--- a/website/www/site/content/en/roadmap/connectors-multi-sdk.md
+++ b/website/www/site/content/en/roadmap/connectors-multi-sdk.md
@@ -80,6 +80,7 @@
 * Java KafkaIO - completed - [BEAM-7029](https://issues.apache.org/jira/browse/BEAM-7029)
 * Java KinesisIO - In progress - [BEAM-10137](https://issues.apache.org/jira/browse/BEAM-10137), [BEAM-10138](https://issues.apache.org/jira/browse/BEAM-10138)
 * Java PubSubIO - In progress - [BEAM-7738](https://issues.apache.org/jira/browse/BEAM-7738)
+* Java SnowflakeIO - In progress - [BEAM-9897](https://issues.apache.org/jira/browse/BEAM-9897), [BEAM-9898](https://issues.apache.org/jira/browse/BEAM-9898)
 * Java SpannerIO - In progress - [BEAM-10139](https://issues.apache.org/jira/browse/BEAM-10139), [BEAM-10140](https://issues.apache.org/jira/browse/BEAM-10140)
 * Java SQL - completed - [BEAM-8603](https://issues.apache.org/jira/browse/BEAM-8603)