Revert global snake_case convention for SchemaTransforms (#31109)

* revert the global snake_case convention and keep it only as a special case in the Iceberg and Managed SchemaTransform providers

* remove docs and comments too

* cleanup

* revert the Python and YAML changes too

* fix test
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index e3d6056..b268333 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 1
+  "modification": 2
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
index cfd298a..d5c6c72 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
@@ -17,10 +17,8 @@
  */
 package org.apache.beam.sdk.schemas.transforms;
 
-import static org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import java.lang.reflect.ParameterizedType;
 import java.util.List;
@@ -28,10 +26,8 @@
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.SchemaProvider;
 import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.values.Row;
 
@@ -45,9 +41,6 @@
  * {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be used produce a {@link
  * SchemaTransform} using {@link #from(Row)}, as long as the Row fits the configuration Schema.
  *
- * <p>NOTE: The inferred field names in the configuration {@link Schema} and {@link Row} follow the
- * {@code snake_case} naming convention.
- *
  * <p><b>Internal only:</b> This interface is actively being worked on and it will likely change as
  * we provide implementations for more standard Beam transforms. We provide no backwards
  * compatibility guarantees and it should not be implemented outside of the Beam repository.
@@ -85,11 +78,10 @@
   }
 
   @Override
-  public final Schema configurationSchema() {
+  public Schema configurationSchema() {
     try {
       // Sort the fields by name to ensure a consistent schema is produced
-      // We also establish a `snake_case` convention for all SchemaTransform configurations
-      return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted().toSnakeCase();
+      return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted();
     } catch (NoSuchSchemaException e) {
       throw new RuntimeException(
           "Unable to find schema for "
@@ -98,12 +90,9 @@
     }
   }
 
-  /**
-   * Produces a {@link SchemaTransform} from a Row configuration. Row fields are expected to have
-   * `snake_case` naming convention.
-   */
+  /** Produces a {@link SchemaTransform} from a Row configuration. */
   @Override
-  public final SchemaTransform from(Row configuration) {
+  public SchemaTransform from(Row configuration) {
     return from(configFromRow(configuration));
   }
 
@@ -114,20 +103,9 @@
 
   private ConfigT configFromRow(Row configuration) {
     try {
-      SchemaRegistry registry = SchemaRegistry.createDefault();
-
-      // Configuration objects handled by the AutoValueSchema provider will expect Row fields with
-      // camelCase naming convention
-      SchemaProvider schemaProvider = registry.getSchemaProvider(configurationClass());
-      if (schemaProvider.getClass().equals(DefaultSchemaProvider.class)
-          && checkNotNull(
-                  ((DefaultSchemaProvider) schemaProvider)
-                      .getUnderlyingSchemaProvider(configurationClass()))
-              .getClass()
-              .equals(AutoValueSchema.class)) {
-        configuration = configuration.toCamelCase();
-      }
-      return registry.getFromRowFunction(configurationClass()).apply(configuration);
+      return SchemaRegistry.createDefault()
+          .getFromRowFunction(configurationClass())
+          .apply(configuration);
     } catch (NoSuchSchemaException e) {
       throw new RuntimeException(
           "Unable to find schema for " + identifier() + "SchemaTransformProvider's config");
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
index 2eef0e3..b1dc091 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
@@ -130,8 +130,8 @@
 
     Row inputConfig =
         Row.withSchema(provider.configurationSchema())
-            .withFieldValue("string_field", "field1")
-            .withFieldValue("integer_field", Integer.valueOf(13))
+            .withFieldValue("stringField", "field1")
+            .withFieldValue("integerField", Integer.valueOf(13))
             .build();
 
     Configuration outputConfig = ((FakeSchemaTransform) provider.from(inputConfig)).config;
@@ -150,8 +150,8 @@
     SchemaTransformProvider provider = new FakeTypedSchemaIOProvider();
     Row inputConfig =
         Row.withSchema(provider.configurationSchema())
-            .withFieldValue("string_field", "field1")
-            .withFieldValue("integer_field", Integer.valueOf(13))
+            .withFieldValue("stringField", "field1")
+            .withFieldValue("integerField", Integer.valueOf(13))
             .build();
 
     assertEquals(Arrays.asList("field1", "13"), provider.dependencies(inputConfig, null).get());
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
index fb32e18..bfe2fab 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
@@ -25,6 +25,7 @@
 import org.apache.beam.sdk.managed.ManagedTransformConstants;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -131,4 +132,15 @@
       return PCollectionRowTuple.of(OUTPUT_TAG, output);
     }
   }
+
+  // TODO: set global snake_case naming convention and remove these special cases
+  @Override
+  public SchemaTransform from(Row rowConfig) {
+    return super.from(rowConfig.toCamelCase());
+  }
+
+  @Override
+  public Schema configurationSchema() {
+    return super.configurationSchema().toSnakeCase();
+  }
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
index b490693..71183c6 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -176,4 +176,15 @@
       }
     }
   }
+
+  // TODO: set global snake_case naming convention and remove these special cases
+  @Override
+  public SchemaTransform from(Row rowConfig) {
+    return super.from(rowConfig.toCamelCase());
+  }
+
+  @Override
+  public Schema configurationSchema() {
+    return super.configurationSchema().toSnakeCase();
+  }
 }
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
index bf9895e..f6e231c 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
@@ -112,17 +112,17 @@
 
     assertEquals(
         Sets.newHashSet(
-            "bootstrap_servers",
+            "bootstrapServers",
             "topic",
             "schema",
-            "auto_offset_reset_config",
-            "consumer_config_updates",
+            "autoOffsetResetConfig",
+            "consumerConfigUpdates",
             "format",
-            "confluent_schema_registry_subject",
-            "confluent_schema_registry_url",
-            "error_handling",
-            "file_descriptor_path",
-            "message_name"),
+            "confluentSchemaRegistrySubject",
+            "confluentSchemaRegistryUrl",
+            "errorHandling",
+            "fileDescriptorPath",
+            "messageName"),
         kafkaProvider.configurationSchema().getFields().stream()
             .map(field -> field.getName())
             .collect(Collectors.toSet()));
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
index cb5088a..e13741e 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
@@ -198,7 +198,6 @@
     }
   }
 
-  /** */
   @VisibleForTesting
   static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
     // May return an empty row (perhaps the underlying transform doesn't have any required
@@ -209,4 +208,15 @@
   Map<String, SchemaTransformProvider> getAllProviders() {
     return schemaTransformProviders;
   }
+
+  // TODO: set global snake_case naming convention and remove these special cases
+  @Override
+  public SchemaTransform from(Row rowConfig) {
+    return super.from(rowConfig.toCamelCase());
+  }
+
+  @Override
+  public Schema configurationSchema() {
+    return super.configurationSchema().toSnakeCase();
+  }
 }
diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
index e9edf87..3a34654 100644
--- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
+++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
@@ -51,7 +51,7 @@
 
   @Test
   public void testGetConfigRowFromYamlString() {
-    String yamlString = "extra_string: abc\n" + "extra_integer: 123";
+    String yamlString = "extraString: abc\n" + "extraInteger: 123";
     ManagedConfig config =
         ManagedConfig.builder()
             .setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
@@ -60,8 +60,8 @@
 
     Row expectedRow =
         Row.withSchema(TestSchemaTransformProvider.SCHEMA)
-            .withFieldValue("extra_string", "abc")
-            .withFieldValue("extra_integer", 123)
+            .withFieldValue("extraString", "abc")
+            .withFieldValue("extraInteger", 123)
             .build();
 
     Row returnedRow =
@@ -84,8 +84,8 @@
     Schema configSchema = new TestSchemaTransformProvider().configurationSchema();
     Row expectedRow =
         Row.withSchema(configSchema)
-            .withFieldValue("extra_string", "abc")
-            .withFieldValue("extra_integer", 123)
+            .withFieldValue("extraString", "abc")
+            .withFieldValue("extraInteger", 123)
             .build();
     Row configRow =
         ManagedSchemaTransformProvider.getRowConfig(
@@ -96,7 +96,7 @@
 
   @Test
   public void testBuildWithYamlString() {
-    String yamlString = "extra_string: abc\n" + "extra_integer: 123";
+    String yamlString = "extraString: abc\n" + "extraInteger: 123";
 
     ManagedConfig config =
         ManagedConfig.builder()
diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
index b4b41de..7a41897 100644
--- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
+++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
@@ -84,7 +84,7 @@
 
   @Test
   public void testReCreateTransformFromRowWithConfig() {
-    String yamlString = "extra_string: abc\n" + "extra_integer: 123";
+    String yamlString = "extraString: abc\n" + "extraInteger: 123";
 
     ManagedConfig originalConfig =
         ManagedConfig.builder()
@@ -123,8 +123,8 @@
             .setRowSchema(inputSchema);
     Map<String, Object> underlyingConfig =
         ImmutableMap.<String, Object>builder()
-            .put("extra_string", "abc")
-            .put("extra_integer", 123)
+            .put("extraString", "abc")
+            .put("extraInteger", 123)
             .build();
     String yamlStringConfig = YamlUtils.yamlStringFromMap(underlyingConfig);
     Managed.ManagedTransform transform =
diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
index 7ed364d..2600854 100644
--- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
+++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
@@ -90,7 +90,7 @@
             .setIdentifier(TestSchemaTransformProvider.IDENTIFIER)
             .build()
             .withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER))
-            .withConfig(ImmutableMap.of("extra_string", "abc", "extra_integer", 123));
+            .withConfig(ImmutableMap.of("extraString", "abc", "extraInteger", 123));
 
     runTestProviderTest(writeOp);
   }
diff --git a/sdks/java/managed/src/test/resources/test_config.yaml b/sdks/java/managed/src/test/resources/test_config.yaml
index 7725c32..3967b60 100644
--- a/sdks/java/managed/src/test/resources/test_config.yaml
+++ b/sdks/java/managed/src/test/resources/test_config.yaml
@@ -17,5 +17,5 @@
 # under the License.
 #
 
-extra_string: "abc"
-extra_integer: 123
\ No newline at end of file
+extraString: "abc"
+extraInteger: 123
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index d89ce71..43bd170 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2574,13 +2574,13 @@
             expansion_service=self._expansion_service,
             rearrange_based_on_discovery=True,
             table=table,
-            create_disposition=self._create_disposition,
-            write_disposition=self._write_disposition,
-            triggering_frequency_seconds=self._triggering_frequency,
-            auto_sharding=self._with_auto_sharding,
-            num_streams=self._num_storage_api_streams,
-            use_at_least_once_semantics=self._use_at_least_once,
-            error_handling={
+            createDisposition=self._create_disposition,
+            writeDisposition=self._write_disposition,
+            triggeringFrequencySeconds=self._triggering_frequency,
+            autoSharding=self._with_auto_sharding,
+            numStreams=self._num_storage_api_streams,
+            useAtLeastOnceSemantics=self._use_at_least_once,
+            errorHandling={
                 'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS
             }))
 
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py
index 0f3944a..f8534f3 100644
--- a/sdks/python/apache_beam/io/gcp/bigtableio.py
+++ b/sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -225,9 +225,9 @@
           identifier=self.schematransform_config.identifier,
           expansion_service=self._expansion_service,
           rearrange_based_on_discovery=True,
-          table_id=self._table_id,
-          instance_id=self._instance_id,
-          project_id=self._project_id)
+          tableId=self._table_id,
+          instanceId=self._instance_id,
+          projectId=self._project_id)
 
       return (
           input
@@ -323,9 +323,9 @@
         identifier=self.schematransform_config.identifier,
         expansion_service=self._expansion_service,
         rearrange_based_on_discovery=True,
-        table_id=self._table_id,
-        instance_id=self._instance_id,
-        project_id=self._project_id)
+        tableId=self._table_id,
+        instanceId=self._instance_id,
+        projectId=self._project_id)
 
     return (
         input.pipeline
diff --git a/sdks/python/apache_beam/transforms/external_transform_provider.py b/sdks/python/apache_beam/transforms/external_transform_provider.py
index 67adda5..2799bd1 100644
--- a/sdks/python/apache_beam/transforms/external_transform_provider.py
+++ b/sdks/python/apache_beam/transforms/external_transform_provider.py
@@ -39,6 +39,32 @@
   return output
 
 
+def snake_case_to_lower_camel_case(string):
+  """Convert snake_case to lowerCamelCase (e.g. test_name -> testName)."""
+  if len(string) <= 1:
+    return string.lower()
+  upper = snake_case_to_upper_camel_case(string)
+  return upper[:1].lower() + upper[1:]  # slice: upper may be empty
+
+
+def camel_case_to_snake_case(string):
+  """Convert camelCase to snake_case"""
+  arr = []
+  word = []
+  for i, n in enumerate(string):
+    # If seeing an upper letter after a lower letter, we just witnessed a word
+    # If seeing an upper letter and the next letter is lower, we may have just
+    # witnessed an all caps word
+    if n.isupper() and ((i > 0 and string[i - 1].islower()) or
+                        (i + 1 < len(string) and string[i + 1].islower())):
+      arr.append(''.join(word))
+      word = [n.lower()]
+    else:
+      word.append(n.lower())
+  arr.append(''.join(word))
+  return '_'.join(arr).strip('_')
+
+
 # Information regarding a Wrapper parameter.
 ParamInfo = namedtuple('ParamInfo', ['type', 'description', 'original_name'])
 
@@ -50,7 +76,7 @@
   descriptions = schematransform.configuration_schema._field_descriptions
   fields_with_descriptions = {}
   for field in schema.fields:
-    fields_with_descriptions[field.name] = ParamInfo(
+    fields_with_descriptions[camel_case_to_snake_case(field.name)] = ParamInfo(
         typing_from_runner_api(field.type),
         descriptions[field.name],
         field.name)
@@ -79,11 +105,16 @@
         expansion_service or self.default_expansion_service
 
   def expand(self, input):
+    camel_case_kwargs = {
+        snake_case_to_lower_camel_case(k): v
+        for k, v in self._kwargs.items()
+    }
+
     external_schematransform = SchemaAwareExternalTransform(
         identifier=self.identifier,
         expansion_service=self._expansion_service,
         rearrange_based_on_discovery=True,
-        **self._kwargs)
+        **camel_case_kwargs)
 
     return input | external_schematransform
 
diff --git a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
index 95720ce..a53001c 100644
--- a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
+++ b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
@@ -37,7 +37,9 @@
 from apache_beam.transforms.external_transform_provider import STANDARD_URN_PATTERN
 from apache_beam.transforms.external_transform_provider import ExternalTransform
 from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
+from apache_beam.transforms.external_transform_provider import camel_case_to_snake_case
 from apache_beam.transforms.external_transform_provider import infer_name_from_identifier
+from apache_beam.transforms.external_transform_provider import snake_case_to_lower_camel_case
 from apache_beam.transforms.external_transform_provider import snake_case_to_upper_camel_case
 from apache_beam.transforms.xlang.io import GenerateSequence
 
@@ -52,6 +54,26 @@
     for case in test_cases:
       self.assertEqual(case[1], snake_case_to_upper_camel_case(case[0]))
 
+  def test_snake_case_to_lower_camel_case(self):
+    test_cases = [("", ""), ("test", "test"), ("test_name", "testName"),
+                  ("test_double_underscore", "testDoubleUnderscore"),
+                  ("TEST_CAPITALIZED", "testCapitalized"),
+                  ("_prepended_underscore", "prependedUnderscore"),
+                  ("appended_underscore_", "appendedUnderscore")]
+    for case in test_cases:
+      self.assertEqual(case[1], snake_case_to_lower_camel_case(case[0]))
+
+  def test_camel_case_to_snake_case(self):
+    test_cases = [("", ""), ("Test", "test"), ("TestName", "test_name"),
+                  ("TestDoubleUnderscore",
+                   "test_double_underscore"), ("MyToLoFo", "my_to_lo_fo"),
+                  ("BEGINNINGAllCaps",
+                   "beginning_all_caps"), ("AllCapsENDING", "all_caps_ending"),
+                  ("AllCapsMIDDLEWord", "all_caps_middle_word"),
+                  ("lowerCamelCase", "lower_camel_case")]
+    for case in test_cases:
+      self.assertEqual(case[1], camel_case_to_snake_case(case[0]))
+
   def test_infer_name_from_identifier(self):
     standard_test_cases = [
         ("beam:schematransform:org.apache.beam:transform:v1", "Transform"),
diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py
index 1e9c7c6..5f53302 100755
--- a/sdks/python/apache_beam/yaml/yaml_provider.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider.py
@@ -889,7 +889,7 @@
     return java_provider.create_transform(
         'WindowIntoStrategy',
         {
-            'serialized_windowing_strategy': windowing_strategy.to_runner_api(
+            'serializedWindowingStrategy': windowing_strategy.to_runner_api(
                 empty_context).SerializeToString()
         },
         None)
diff --git a/sdks/python/gen_xlang_wrappers.py b/sdks/python/gen_xlang_wrappers.py
index ea4f496..a75fc05 100644
--- a/sdks/python/gen_xlang_wrappers.py
+++ b/sdks/python/gen_xlang_wrappers.py
@@ -233,6 +233,24 @@
   return (tp, nullable)
 
 
+def camel_case_to_snake_case(string):
+  """Convert camelCase to snake_case"""
+  arr = []
+  word = []
+  for i, n in enumerate(string):
+    # If seeing an upper letter after a lower letter, we just witnessed a word
+    # If seeing an upper letter and the next letter is lower, we may have just
+    # witnessed an all caps word
+    if n.isupper() and ((i > 0 and string[i - 1].islower()) or
+                        (i + 1 < len(string) and string[i + 1].islower())):
+      arr.append(''.join(word))
+      word = [n.lower()]
+    else:
+      word.append(n.lower())
+  arr.append(''.join(word))
+  return '_'.join(arr).strip('_')
+
+
 def get_wrappers_from_transform_configs(config_file) -> Dict[str, List[str]]:
   """
   Generates code for external transform wrapper classes (subclasses of
@@ -269,8 +287,9 @@
 
       parameters = []
       for param, info in fields.items():
+        pythonic_name = camel_case_to_snake_case(param)
         param_details = {
-            "name": param,
+            "name": pythonic_name,
             "type": info['type'],
             "description": info['description'],
         }