Merge pull request #29130 [YAML] Guard javascript UDFs with experimental feature enablement.

diff --git a/.github/workflows/beam_IODatastoresCredentialsRotation.yml b/.github/workflows/beam_IODatastoresCredentialsRotation.yml
index 7a402e4..0ededda 100644
--- a/.github/workflows/beam_IODatastoresCredentialsRotation.yml
+++ b/.github/workflows/beam_IODatastoresCredentialsRotation.yml
@@ -77,4 +77,23 @@
       - name: Completing the rotation
         run: |
           gcloud container clusters update io-datastores --complete-credential-rotation --zone=us-central1-a --quiet
-# TODO: Send email to dev@beam.apache.org if something went wrong during credentials rotation
\ No newline at end of file
+      - name: Generate Date
+        run: |
+          date=$(date -u +"%Y-%m-%d")
+          echo "date=$date" >> $GITHUB_ENV
+      - name: Send email
+        uses: dawidd6/action-send-mail@v3
+        with:
+          server_address: smtp.gmail.com
+          server_port: 465
+          secure: true
+          username: ${{ secrets.ISSUE_REPORT_SENDER_EMAIL_ADDRESS }}
+          password: ${{ secrets.ISSUE_REPORT_SENDER_EMAIL_PASSWORD }}
+          subject: Credentials Rotation Failure on IO-Datastores cluster (${{ env.date }})
+          to: dev@beam.apache.org
+          from: gactions@beam.apache.org
+          body: |
+            Something went wrong during the automatic credentials rotation for the IO-Datastores cluster, performed on ${{ env.date }}. It may be necessary to check the state of the cluster certificates. For further details refer to the following links:
+             * Failing job: https://github.com/apache/beam/actions/workflows/beam_IODatastoresCredentialsRotation.yml
+             * Job configuration: https://github.com/apache/beam/blob/master/.github/workflows/beam_IODatastoresCredentialsRotation.yml
+             * Cluster URL: https://pantheon.corp.google.com/kubernetes/clusters/details/us-central1-a/io-datastores/details?mods=dataflow_dev&project=apache-beam-testing
\ No newline at end of file
diff --git a/.github/workflows/beam_MetricsCredentialsRotation.yml b/.github/workflows/beam_MetricsCredentialsRotation.yml
index 7b97270..eda3ec3 100644
--- a/.github/workflows/beam_MetricsCredentialsRotation.yml
+++ b/.github/workflows/beam_MetricsCredentialsRotation.yml
@@ -49,8 +49,9 @@
 jobs:
   beam_MetricsCredentialsRotation:
     if: |
-      github.event_name == 'workflow_dispatch' ||
-      github.event_name == 'schedule'
+      (github.event_name == 'workflow_dispatch' ||
+      github.event_name == 'schedule') &&
+      github.repository == 'apache/beam'
     runs-on: [self-hosted, ubuntu-20.04, main]
     timeout-minutes: 100
     name: ${{ matrix.job_name }}
@@ -77,4 +78,23 @@
       - name: Completing the rotation
         run: |
           gcloud container clusters update metrics --complete-credential-rotation --zone=us-central1-a --quiet
-# TODO: Send email to dev@beam.apache.org if something went wrong during credentials rotation
\ No newline at end of file
+      - name: Generate Date
+        run: |
+          date=$(date -u +"%Y-%m-%d")
+          echo "date=$date" >> $GITHUB_ENV
+      - name: Send email
+        uses: dawidd6/action-send-mail@v3
+        with:
+          server_address: smtp.gmail.com
+          server_port: 465
+          secure: true
+          username: ${{ secrets.ISSUE_REPORT_SENDER_EMAIL_ADDRESS }}
+          password: ${{ secrets.ISSUE_REPORT_SENDER_EMAIL_PASSWORD }}
+          subject: Credentials Rotation Failure on Metrics cluster (${{ env.date }})
+          to: dev@beam.apache.org
+          from: gactions@beam.apache.org
+          body: |
+            Something went wrong during the automatic credentials rotation for the Metrics cluster, performed on ${{ env.date }}. It may be necessary to check the state of the cluster certificates. For further details refer to the following links:
+             * Failing job: https://github.com/apache/beam/actions/workflows/beam_MetricsCredentialsRotation.yml
+             * Job configuration: https://github.com/apache/beam/blob/master/.github/workflows/beam_MetricsCredentialsRotation.yml
+             * Cluster URL: https://pantheon.corp.google.com/kubernetes/clusters/details/us-central1-a/metrics/details?mods=dataflow_dev&project=apache-beam-testing
\ No newline at end of file
diff --git a/.github/workflows/beam_Metrics_Report.yml b/.github/workflows/beam_Metrics_Report.yml
index 0c88171..9c4a540 100644
--- a/.github/workflows/beam_Metrics_Report.yml
+++ b/.github/workflows/beam_Metrics_Report.yml
@@ -60,9 +60,10 @@
         job_name: [beam_Metrics_Report]
         job_phrase: [Run Metrics Report]
     if: |
-      github.event_name == 'schedule' ||
+      (github.event_name == 'schedule' ||
       github.event_name == 'workflow_dispatch' ||
-      github.event.comment.body == 'Run Metrics Report'
+      github.event.comment.body == 'Run Metrics Report') &&
+      github.repository == 'apache/beam'
     steps:
       - uses: actions/checkout@v3
       - name: Setup repository
diff --git a/.github/workflows/beam_PostCommit_BeamMetrics_Publish.yml b/.github/workflows/beam_PostCommit_BeamMetrics_Publish.yml
index 9cff830..15ab008 100644
--- a/.github/workflows/beam_PostCommit_BeamMetrics_Publish.yml
+++ b/.github/workflows/beam_PostCommit_BeamMetrics_Publish.yml
@@ -53,10 +53,11 @@
 jobs:
   beam_PostCommit_BeamMetrics_Publish:
     if: |
-      github.event_name == 'push' ||
+      (github.event_name == 'push' ||
       github.event_name == 'workflow_dispatch' ||
       github.event_name == 'schedule' ||
-      github.event.comment.body == 'Run Beam Metrics Deployment'
+      github.event.comment.body == 'Run Beam Metrics Deployment') &&
+      github.repository == 'apache/beam'
     runs-on: [self-hosted, ubuntu-20.04, main]
     timeout-minutes: 100
     name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
diff --git a/.github/workflows/beam_Publish_Beam_SDK_Snapshots.yml b/.github/workflows/beam_Publish_Beam_SDK_Snapshots.yml
index 9e48325..95f993b 100644
--- a/.github/workflows/beam_Publish_Beam_SDK_Snapshots.yml
+++ b/.github/workflows/beam_Publish_Beam_SDK_Snapshots.yml
@@ -50,8 +50,9 @@
 jobs:
   beam_Publish_Beam_SDK_Snapshots:
     if: |
-      github.event_name == 'workflow_dispatch' ||
-      github.event_name == 'schedule'
+      (github.event_name == 'workflow_dispatch' ||
+      github.event_name == 'schedule') &&
+      github.repository == 'apache/beam'
     runs-on: [self-hosted, ubuntu-20.04, main]
     timeout-minutes: 100
     name: ${{ matrix.job_name }} (${{ matrix.container_task }})
diff --git a/.github/workflows/beam_Publish_Docker_Snapshots.yml b/.github/workflows/beam_Publish_Docker_Snapshots.yml
index 01b846e..1abc268 100644
--- a/.github/workflows/beam_Publish_Docker_Snapshots.yml
+++ b/.github/workflows/beam_Publish_Docker_Snapshots.yml
@@ -50,9 +50,10 @@
 jobs:
   beam_Publish_Docker_Snapshots:
     if: |
-      github.event_name == 'workflow_dispatch' ||
+      (github.event_name == 'workflow_dispatch' ||
       github.event_name == 'schedule' ||
-      github.event.comment.body == 'Publish Docker Snapshots'
+      github.event.comment.body == 'Publish Docker Snapshots') &&
+      github.repository == 'apache/beam'
     runs-on: [self-hosted, ubuntu-20.04, main]
     timeout-minutes: 100
     name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
diff --git a/.github/workflows/beam_Release_NightlySnapshot.yml b/.github/workflows/beam_Release_NightlySnapshot.yml
index bf22344..735d0e4 100644
--- a/.github/workflows/beam_Release_NightlySnapshot.yml
+++ b/.github/workflows/beam_Release_NightlySnapshot.yml
@@ -50,8 +50,9 @@
         job_name: [beam_Release_NightlySnapshot]
         job_phrase: [Release Nightly Snapshot]
     if: |
-      github.event_name == 'workflow_dispatch' ||
-      github.event_name == 'schedule'
+      (github.event_name == 'workflow_dispatch' ||
+      github.event_name == 'schedule') &&
+      github.repository == 'apache/beam'
     steps:
       - uses: actions/checkout@v4
       - name: Setup repository
diff --git a/.github/workflows/beam_Release_Python_NightlySnapshot.yml b/.github/workflows/beam_Release_Python_NightlySnapshot.yml
index a9f4ac5..fe13235 100644
--- a/.github/workflows/beam_Release_Python_NightlySnapshot.yml
+++ b/.github/workflows/beam_Release_Python_NightlySnapshot.yml
@@ -49,8 +49,9 @@
       matrix:
         job_name: [beam_Release_Python_NightlySnapshot]
         job_phrase: [Release Nightly Snapshot Python]
-    if: github.event_name == 'workflow_dispatch' ||
-        github.event_name == 'schedule'
+    if: (github.event_name == 'workflow_dispatch' ||
+        github.event_name == 'schedule') &&
+        github.repository == 'apache/beam'
 
     steps:
       - uses: actions/checkout@v4
diff --git a/.github/workflows/build_runner_image.yml b/.github/workflows/build_runner_image.yml
index 6071d93..c0f4cac 100644
--- a/.github/workflows/build_runner_image.yml
+++ b/.github/workflows/build_runner_image.yml
@@ -30,6 +30,7 @@
   docker_repo: apache-beam-testing/beam-github-actions/beam-arc-runner
 jobs:
   build-and-version-runner:
+    if: github.repository == 'apache/beam'
     env:
         working-directory: .github/gh-actions-self-hosted-runners/arc/images/
     runs-on: [self-hosted, ubuntu-20.04]
diff --git a/CHANGES.md b/CHANGES.md
index 6d67ccd..0eee0a3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -86,6 +86,7 @@
 
 * Fixed "Desired bundle size 0 bytes must be greater than 0" in Java SDK's BigtableIO.BigtableSource when you have more cores than bytes to read (Java) [#28793](https://github.com/apache/beam/issues/28793).
 * `watch_file_pattern` arg of the [RunInference](https://github.com/apache/beam/blob/104c10b3ee536a9a3ea52b4dbf62d86b669da5d9/sdks/python/apache_beam/ml/inference/base.py#L997) arg had no effect prior to 2.52.0. To use the behavior of arg `watch_file_pattern` prior to 2.52.0, follow the documentation at https://beam.apache.org/documentation/ml/side-input-updates/ and use `WatchFilePattern` PTransform as a SideInput. ([#28948](https://github.com/apache/beam/pulls/28948))
+* `MLTransform` no longer outputs artifacts such as min, max, and quantiles. Instead, a feature will be added to `MLTransform` to output these artifacts in a human-readable format ([#29017](https://github.com/apache/beam/issues/29017)). For now, to use artifacts such as the min and max produced by an earlier `MLTransform`, pass `read_artifact_location` to `MLTransform`, which reads artifacts that were produced by an earlier `MLTransform` ([#29016](https://github.com/apache/beam/pull/29016/))
 
 ## Security Fixes
 * Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
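For context on the changelog entry above, here is a minimal sketch of the `read_artifact_location` workflow it points to (the artifact path and input values are illustrative, not taken from this PR; requires `tensorflow_transform`):

```python
import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ScaleTo01

artifact_location = '/tmp/ml_artifacts'  # hypothetical path

# Write mode: compute min/max over this dataset and store them as a
# tensorflow graph under artifact_location (after this PR, no min/max
# columns are attached to the output rows).
with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([{'x': [1.0, 2.0, 3.0]}, {'x': [4.0, 5.0, 6.0]}])
      | MLTransform(write_artifact_location=artifact_location).with_transform(
          ScaleTo01(columns=['x'])))

# Read mode: scale new data using the previously stored min/max instead
# of recomputing them on the new dataset.
with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([{'x': [2.0, 3.0]}])
      | MLTransform(read_artifact_location=artifact_location)
      | beam.Map(print))
```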
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index e4f3468..9e4d28a 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -304,6 +304,7 @@
       commandLine "docker", "rmi", "--force", "${dockerJavaImageName}"
     }
     exec {
+      ignoreExitValue true
       commandLine "gcloud", "--quiet", "container", "images", "untag", "${dockerJavaImageName}"
     }
     exec {
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py
index 1d2197e..0db1071 100644
--- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py
@@ -31,7 +31,7 @@
   import tensorflow_transform as tft  # pylint: disable=unused-import
   from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_scale_to_0_1
   from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary
-  from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary_with_non_columnar_data
+  from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary_with_scalar
 except ImportError:
   raise unittest.SkipTest('tensorflow_transform is not installed.')
 
@@ -46,8 +46,8 @@
 
 def check_mltransform_scale_to_0_1():
   expected = '''[START mltransform_scale_to_0_1]
-Row(x=array([0.       , 0.5714286, 0.2857143], dtype=float32), x_max=array([8.], dtype=float32), x_min=array([1.], dtype=float32))
-Row(x=array([0.42857143, 0.14285715, 1.        ], dtype=float32), x_max=array([8.], dtype=float32), x_min=array([1.], dtype=float32))
+Row(x=array([0.       , 0.5714286, 0.2857143], dtype=float32))
+Row(x=array([0.42857143, 0.14285715, 1.        ], dtype=float32))
   [END mltransform_scale_to_0_1] '''.splitlines()[1:-1]
   return expected
 
@@ -80,7 +80,7 @@
     self.assertEqual(predicted, expected)
 
   def test_mltransform_compute_and_apply_vocab_scalar(self, mock_stdout):
-    mltransform_compute_and_apply_vocabulary_with_non_columnar_data()
+    mltransform_compute_and_apply_vocabulary_with_scalar()
     predicted = mock_stdout.getvalue().splitlines()
     expected = check_mltransform_compute_and_apply_vocabulary_with_scalar()
     self.assertEqual(predicted, expected)
diff --git a/sdks/python/apache_beam/ml/transforms/base.py b/sdks/python/apache_beam/ml/transforms/base.py
index a0bc4a9..b3a30bb 100644
--- a/sdks/python/apache_beam/ml/transforms/base.py
+++ b/sdks/python/apache_beam/ml/transforms/base.py
@@ -67,16 +67,6 @@
       inputs: input data.
     """
 
-  @abc.abstractmethod
-  def get_artifacts(
-      self, data: OperationInputT,
-      output_column_prefix: str) -> Optional[Dict[str, OperationOutputT]]:
-    """
-    If the operation generates any artifacts, they can be returned from this
-    method.
-    """
-    pass
-
   def __call__(self, data: OperationInputT,
                output_column_name: str) -> Dict[str, OperationOutputT]:
     """
@@ -84,9 +74,6 @@
     This method will invoke the apply() method of the class.
     """
     transformed_data = self.apply_transform(data, output_column_name)
-    artifacts = self.get_artifacts(data, output_column_name)
-    if artifacts:
-      transformed_data = {**transformed_data, **artifacts}
     return transformed_data
 
   def get_counter(self):
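With `get_artifacts` gone, the operation contract reduces to `apply_transform`, and `__call__` simply returns its result. A minimal sketch of a custom operation under the new contract (the class is a hypothetical example, mirroring the `_FakeOperation` used in the handler tests below):

```python
from apache_beam.ml.transforms.tft import TFTOperation

class MultiplyByTen(TFTOperation):  # hypothetical example operation
  def apply_transform(self, inputs, output_column_name, **kwargs):
    # Return only the transformed column; there is no get_artifacts
    # hook left to implement (or to accidentally leave abstract).
    return {output_column_name: inputs * 10}
```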
diff --git a/sdks/python/apache_beam/ml/transforms/handlers_test.py b/sdks/python/apache_beam/ml/transforms/handlers_test.py
index 3342ec7..327c8c7 100644
--- a/sdks/python/apache_beam/ml/transforms/handlers_test.py
+++ b/sdks/python/apache_beam/ml/transforms/handlers_test.py
@@ -58,14 +58,6 @@
     return {output_column_name: inputs * 10}
 
 
-class _FakeOperationWithArtifacts(TFTOperation):
-  def apply_transform(self, inputs, output_column_name, **kwargs):
-    return {output_column_name: inputs}
-
-  def get_artifacts(self, data, col_name):
-    return {'artifact': tf.convert_to_tensor([1])}
-
-
 class IntType(NamedTuple):
   x: int
 
@@ -106,16 +98,6 @@
     actual_result = process_handler.process_data_fn(inputs)
     self.assertDictEqual(actual_result, expected_result)
 
-  def test_preprocessing_fn_with_artifacts(self):
-    process_handler = handlers.TFTProcessHandler(
-        transforms=[_FakeOperationWithArtifacts(columns=['x'])],
-        artifact_location=self.artifact_location)
-    inputs = {'x': [1, 2, 3]}
-    preprocessing_fn = process_handler.process_data_fn
-    actual_result = preprocessing_fn(inputs)
-    expected_result = {'x': [1, 2, 3], 'artifact': tf.convert_to_tensor([1])}
-    self.assertDictEqual(actual_result, expected_result)
-
   def test_input_type_from_schema_named_tuple_pcoll(self):
     data = [{'x': 1}]
     with beam.Pipeline() as p:
diff --git a/sdks/python/apache_beam/ml/transforms/tft.py b/sdks/python/apache_beam/ml/transforms/tft.py
index 1d49264..c7b8ff0 100644
--- a/sdks/python/apache_beam/ml/transforms/tft.py
+++ b/sdks/python/apache_beam/ml/transforms/tft.py
@@ -45,9 +45,7 @@
 import tensorflow as tf
 import tensorflow_transform as tft
 from apache_beam.ml.transforms.base import BaseOperation
-from tensorflow_transform import analyzers
 from tensorflow_transform import common_types
-from tensorflow_transform import tf_utils
 
 __all__ = [
     'ComputeAndApplyVocabulary',
@@ -77,6 +75,8 @@
   return wrapper
 
 
+# TODO: https://github.com/apache/beam/pull/29016
+# Add support for outputting artifacts to a text file in human-readable form.
 class TFTOperation(BaseOperation[common_types.TensorType,
                                  common_types.TensorType]):
   def __init__(self, columns: List[str]) -> None:
@@ -95,13 +95,6 @@
           "Columns are not specified. Please specify the column for the "
           " op %s" % self.__class__.__name__)
 
-  def get_artifacts(self, data: common_types.TensorType,
-                    col_name: str) -> Dict[str, common_types.TensorType]:
-    """
-    Returns the artifacts generated by the operation.
-    """
-    return {}
-
   @tf.function
   def _split_string_with_delimiter(self, data, delimiter):
     """
@@ -240,15 +233,6 @@
     }
     return output_dict
 
-  def get_artifacts(self, data: common_types.TensorType,
-                    col_name: str) -> Dict[str, common_types.TensorType]:
-    mean_var = tft.analyzers._mean_and_var(data)
-    shape = [tf.shape(data)[0], 1]
-    return {
-        col_name + '_mean': tf.broadcast_to(mean_var[0], shape),
-        col_name + '_var': tf.broadcast_to(mean_var[1], shape),
-    }
-
 
 @register_input_dtype(float)
 class ScaleTo01(TFTOperation):
@@ -280,14 +264,6 @@
     self.elementwise = elementwise
     self.name = name
 
-  def get_artifacts(self, data: common_types.TensorType,
-                    col_name: str) -> Dict[str, common_types.TensorType]:
-    shape = [tf.shape(data)[0], 1]
-    return {
-        col_name + '_min': tf.broadcast_to(tft.min(data), shape),
-        col_name + '_max': tf.broadcast_to(tft.max(data), shape)
-    }
-
   def apply_transform(
       self, data: common_types.TensorType,
       output_column_name: str) -> Dict[str, common_types.TensorType]:
@@ -368,34 +344,6 @@
     self.elementwise = elementwise
     self.name = name
 
-  def get_artifacts(self, data: common_types.TensorType,
-                    col_name: str) -> Dict[str, common_types.TensorType]:
-    num_buckets = self.num_buckets
-    epsilon = self.epsilon
-    elementwise = self.elementwise
-
-    if num_buckets < 1:
-      raise ValueError('Invalid num_buckets %d' % num_buckets)
-
-    if isinstance(data, (tf.SparseTensor, tf.RaggedTensor)) and elementwise:
-      raise ValueError(
-          'bucketize requires `x` to be dense if `elementwise=True`')
-
-    x_values = tf_utils.get_values(data)
-
-    if epsilon is None:
-      # See explanation in args documentation for epsilon.
-      epsilon = min(1.0 / num_buckets, 0.01)
-
-    quantiles = analyzers.quantiles(
-        x_values, num_buckets, epsilon, reduce_instance_dims=not elementwise)
-    shape = [
-        tf.shape(data)[0], num_buckets - 1 if num_buckets > 1 else num_buckets
-    ]
-    # These quantiles are used as the bucket boundaries in the later stages.
-    # Should we change the prefix _quantiles to _bucket_boundaries?
-    return {col_name + '_quantiles': tf.broadcast_to(quantiles, shape)}
-
   def apply_transform(
       self, data: common_types.TensorType,
       output_column_name: str) -> Dict[str, common_types.TensorType]:
@@ -572,6 +520,7 @@
       ngram_range: Tuple[int, int] = (1, 1),
       ngrams_separator: Optional[str] = None,
       compute_word_count: bool = False,
+      key_vocab_filename: str = 'key_vocab_mapping',
       name: Optional[str] = None,
   ):
     """
@@ -592,9 +541,9 @@
         n-gram sizes.
       seperator: A string that will be inserted between each ngram.
       compute_word_count: A boolean that specifies whether to compute
-        the unique word count and add it as an artifact to the output.
-        Note that the count will be computed over the entire dataset so
-        it will be the same value for all inputs.
+        the unique word count over the entire dataset. Defaults to False.
+      key_vocab_filename: The file name for the key vocabulary file to
+        which the word counts are written when compute_word_count is True.
       name: A name for the operation (optional).
 
     Note that original order of the input may not be preserved.
@@ -605,33 +554,26 @@
     self.ngrams_separator = ngrams_separator
     self.name = name
     self.split_string_by_delimiter = split_string_by_delimiter
+    self.key_vocab_filename = key_vocab_filename
     if compute_word_count:
       self.compute_word_count_fn = count_unqiue_words
     else:
-      self.compute_word_count_fn = lambda *args, **kwargs: {}
+      self.compute_word_count_fn = lambda *args, **kwargs: None
 
     if ngram_range != (1, 1) and not ngrams_separator:
       raise ValueError(
           'ngrams_separator must be specified when ngram_range is not (1, 1)')
 
-  def get_artifacts(self, data: tf.SparseTensor,
-                    col_name: str) -> Dict[str, tf.Tensor]:
-    return self.compute_word_count_fn(data, col_name)
-
   def apply_transform(self, data: tf.SparseTensor, output_col_name: str):
     if self.split_string_by_delimiter:
       data = self._split_string_with_delimiter(
           data, self.split_string_by_delimiter)
     output = tft.bag_of_words(
         data, self.ngram_range, self.ngrams_separator, self.name)
+    # Word counts are written to the file named by key_vocab_filename.
+    self.compute_word_count_fn(data, self.key_vocab_filename)
     return {output_col_name: output}
 
 
-def count_unqiue_words(data: tf.SparseTensor,
-                       output_col_name: str) -> Dict[str, tf.Tensor]:
-  keys, count = tft.count_per_key(data)
-  shape = [tf.shape(data)[0], tf.shape(keys)[0]]
-  return {
-      output_col_name + '_unique_elements': tf.broadcast_to(keys, shape),
-      output_col_name + '_counts': tf.broadcast_to(count, shape)
-  }
+def count_unqiue_words(data: tf.SparseTensor, output_vocab_name: str) -> None:
+  tft.count_per_key(data, key_vocabulary_filename=output_vocab_name)
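Net effect of the `BagOfWords` change: word counts are no longer broadcast into every output row; `tft.count_per_key` writes them once to a vocabulary asset file. A sketch of where that file ends up, following the updated test below (the artifact location is illustrative):

```python
import os

import apache_beam as beam
from apache_beam.ml.transforms import base, tft

artifact_location = '/tmp/bow_artifacts'  # hypothetical path

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([{'x': ['pie', 'pie', 'yum']}])
      | base.MLTransform(
          write_artifact_location=artifact_location,
          transforms=[
              tft.BagOfWords(
                  columns=['x'],
                  compute_word_count=True,
                  key_vocab_filename='my_vocab')
          ]))

# Each line of the asset file is "<count> <token>", e.g. "2 pie".
vocab_path = os.path.join(artifact_location, 'transform_fn/assets', 'my_vocab')
with open(vocab_path) as f:
  print(f.read())
```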
diff --git a/sdks/python/apache_beam/ml/transforms/tft_test.py b/sdks/python/apache_beam/ml/transforms/tft_test.py
index 41f59c8..38ded6a 100644
--- a/sdks/python/apache_beam/ml/transforms/tft_test.py
+++ b/sdks/python/apache_beam/ml/transforms/tft_test.py
@@ -17,6 +17,7 @@
 
 # pytype: skip-file
 
+import os
 import shutil
 import tempfile
 import unittest
@@ -38,31 +39,6 @@
 if not tft:
   raise unittest.SkipTest('tensorflow_transform is not installed.')
 
-z_score_expected = {'x_mean': 3.5, 'x_var': 2.9166666666666665}
-
-
-def assert_z_score_artifacts(element):
-  element = element.as_dict()
-  assert 'x_mean' in element
-  assert 'x_var' in element
-  assert element['x_mean'] == z_score_expected['x_mean']
-  assert element['x_var'] == z_score_expected['x_var']
-
-
-def assert_ScaleTo01_artifacts(element):
-  element = element.as_dict()
-  assert 'x_min' in element
-  assert 'x_max' in element
-  assert element['x_min'] == 1
-  assert element['x_max'] == 6
-
-
-def assert_bucketize_artifacts(element):
-  element = element.as_dict()
-  assert 'x_quantiles' in element
-  assert np.array_equal(
-      element['x_quantiles'], np.array([3, 5], dtype=np.float32))
-
 
 class ScaleZScoreTest(unittest.TestCase):
   def setUp(self) -> None:
@@ -100,7 +76,18 @@
           | "MLTransform" >> base.MLTransform(
               write_artifact_location=self.artifact_location).with_transform(
                   tft.ScaleToZScore(columns=['x'])))
-      _ = (result | beam.Map(assert_z_score_artifacts))
+      expected_data = [
+          np.array([-1.46385], dtype=np.float32),
+          np.array([-0.87831], dtype=np.float32),
+          np.array([-0.29277], dtype=np.float32),
+          np.array([0.29277], dtype=np.float32),
+          np.array([0.87831], dtype=np.float32),
+          np.array([1.46385], dtype=np.float32),
+      ]
+
+      actual_data = (result | beam.Map(lambda x: x.x))
+      assert_that(
+          actual_data, equal_to(expected_data, equals_fn=np.array_equal))
 
   def test_z_score_list_data(self):
     list_data = [{'x': [1, 2, 3]}, {'x': [4, 5, 6]}]
@@ -111,7 +98,14 @@
           | "listMLTransform" >> base.MLTransform(
               write_artifact_location=self.artifact_location).with_transform(
                   tft.ScaleToZScore(columns=['x'])))
-      _ = (list_result | beam.Map(assert_z_score_artifacts))
+
+      expected_data = [
+          np.array([-1.46385, -0.87831, -0.29277], dtype=np.float32),
+          np.array([0.29277, 0.87831, 1.46385], dtype=np.float32)
+      ]
+      actual_data = (list_result | beam.Map(lambda x: x.x))
+      assert_that(
+          actual_data, equal_to(expected_data, equals_fn=np.array_equal))
 
 
 class ScaleTo01Test(unittest.TestCase):
@@ -130,7 +124,6 @@
           | "MLTransform" >> base.MLTransform(
               write_artifact_location=self.artifact_location).with_transform(
                   tft.ScaleTo01(columns=['x'])))
-      _ = (list_result | beam.Map(assert_ScaleTo01_artifacts))
 
       expected_output = [
           np.array([0, 0.2, 0.4], dtype=np.float32),
@@ -150,7 +143,6 @@
               write_artifact_location=self.artifact_location).with_transform(
                   tft.ScaleTo01(columns=['x'])))
 
-      _ = (result | beam.Map(assert_ScaleTo01_artifacts))
       expected_output = (
           np.array([0], dtype=np.float32),
           np.array([0.2], dtype=np.float32),
@@ -179,7 +171,6 @@
           | "MLTransform" >> base.MLTransform(
               write_artifact_location=self.artifact_location).with_transform(
                   tft.Bucketize(columns=['x'], num_buckets=3)))
-      _ = (result | beam.Map(assert_bucketize_artifacts))
 
       transformed_data = (result | beam.Map(lambda x: x.x))
       expected_data = [
@@ -202,8 +193,6 @@
           | "MLTransform" >> base.MLTransform(
               write_artifact_location=self.artifact_location).with_transform(
                   tft.Bucketize(columns=['x'], num_buckets=3)))
-      _ = (list_result | beam.Map(assert_bucketize_artifacts))
-
       transformed_data = (
           list_result
           | "TransformedColumnX" >> beam.Map(lambda ele: ele.x))
@@ -214,36 +203,6 @@
       assert_that(
           transformed_data, equal_to(expected_data, equals_fn=np.array_equal))
 
-  @parameterized.expand([
-      (range(1, 10), [4, 7]),
-      (range(9, 0, -1), [4, 7]),
-      (range(19, 0, -1), [10]),
-      (range(1, 100), [25, 50, 75]),
-      # similar to the above but with odd number of elements
-      (range(1, 100, 2), [25, 51, 75]),
-      (range(99, 0, -1), range(10, 100, 10))
-  ])
-  def test_bucketize_boundaries(self, test_input, expected_boundaries):
-    # boundaries are outputted as artifacts for the Bucketize transform.
-    data = [{'x': [i]} for i in test_input]
-    num_buckets = len(expected_boundaries) + 1
-    with beam.Pipeline() as p:
-      result = (
-          p
-          | "Create" >> beam.Create(data)
-          | "MLTransform" >> base.MLTransform(
-              write_artifact_location=self.artifact_location).with_transform(
-                  tft.Bucketize(columns=['x'], num_buckets=num_buckets)))
-      actual_boundaries = (
-          result
-          | beam.Map(lambda x: x.as_dict())
-          | beam.Map(lambda x: x['x_quantiles']))
-
-      def assert_boundaries(actual_boundaries):
-        assert np.array_equal(actual_boundaries, expected_boundaries)
-
-      _ = (actual_boundaries | beam.Map(assert_boundaries))
-
 
 class ApplyBucketsTest(unittest.TestCase):
   def setUp(self) -> None:
@@ -731,10 +690,6 @@
       assert_that(result, equal_to(expected_data, equals_fn=np.array_equal))
 
   def test_count_per_key_on_list(self):
-    def map_element_to_count(elements, counts):
-      d = {elements[i]: counts[i] for i in range(len(elements))}
-      return d
-
     data = [{
         'x': ['I', 'like', 'pie', 'pie', 'pie'],
     }, {
@@ -743,25 +698,28 @@
         'x': ['Banana', 'Banana', 'Apple', 'Apple', 'Apple', 'Apple']
     }]
     with beam.Pipeline() as p:
-      result = (
+      _ = (
           p
           | "Create" >> beam.Create(data)
           | "MLTransform" >> base.MLTransform(
               write_artifact_location=self.artifact_location,
               transforms=[
-                  tft.BagOfWords(columns=['x'], compute_word_count=True)
+                  tft.BagOfWords(
+                      columns=['x'],
+                      compute_word_count=True,
+                      key_vocab_filename='my_vocab')
               ]))
 
-      # the unique elements and counts are artifacts and will be
-      # stored in the result and same for all the elements in the
-      # PCollection.
-      result = result | beam.Map(
-          lambda x: map_element_to_count(x.x_unique_elements, x.x_counts))
+    def validate_count_per_key(key_vocab_filename):
+      key_vocab_location = os.path.join(
+          self.artifact_location, 'transform_fn/assets', key_vocab_filename)
+      with open(key_vocab_location, 'r') as f:
+        key_vocab_list = [line.strip() for line in f]
+      return key_vocab_list
 
-      expected_data = [{
-          b'Apple': 4, b'Banana': 2, b'I': 1, b'like': 1, b'pie': 4, b'yum': 2
-      }] * 3  # since there are 3 elements in input.
-      assert_that(result, equal_to(expected_data))
+    expected_data = ['2 yum', '4 Apple', '1 like', '1 I', '4 pie', '2 Banana']
+    actual_data = validate_count_per_key('my_vocab')
+    self.assertEqual(expected_data, actual_data)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 1e797d9..e4cf09c 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -162,7 +162,7 @@
   holdup==1.8.0
 extras =
   gcp
-allowlist_externals = 
+allowlist_externals =
   bash
   echo
   sleep
@@ -194,7 +194,7 @@
 extras =
   azure
 passenv = REQUESTS_CA_BUNDLE
-allowlist_externals = 
+allowlist_externals =
   wget
   az
   bash
@@ -311,11 +311,12 @@
   # Run all DataFrame API unit tests
   bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/dataframe'
 
-[testenv:py{38,39}-tft-113]
+[testenv:py{38,39}-tft-{113,114}]
 deps =
   113: tensorflow_transform>=1.13.0,<1.14.0
+  114: tensorflow_transform>=1.14.0,<1.15.0
 commands =
-  bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/ml/transforms'
+  bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/ml/transforms apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py'
 
 [testenv:py{38,39,310,311}-pytorch-{19,110,111,112,113}]
 deps =
diff --git a/website/www/site/content/en/documentation/ml/preprocess-data.md b/website/www/site/content/en/documentation/ml/preprocess-data.md
index cb79aff..2b291b9 100644
--- a/website/www/site/content/en/documentation/ml/preprocess-data.md
+++ b/website/www/site/content/en/documentation/ml/preprocess-data.md
@@ -105,7 +105,7 @@
 When you use the `write_artifact_location` parameter, the `MLTransform` class runs the
 specified transformations on the dataset and then creates artifacts from these
 transformations. The artifacts are stored in the location that you specify in
-the `write_artifact_location` parameter or in the `MLTransform` output.
+the `write_artifact_location` parameter.
 
 Write mode is useful when you want to store the results of your transformations
 for future use. For example, if you apply the same transformations on a
@@ -120,8 +120,7 @@
     The `ComputeAndApplyVocabulary`
     transform outputs the indices of the vocabulary to the vocabulary file.
 -   The `ScaleToZScore` transform calculates the mean and variance over the entire dataset
-    and then normalizes the entire dataset using the mean and variance. The
-    mean and variance are outputted by the `MLTransform` operation.
+    and then normalizes the entire dataset using the mean and variance.
     When you use the `write_artifact_location` parameter, these
     values are stored as a `tensorflow` graph in the location specified by
     the `write_artifact_location` parameter value. You can reuse the values in read mode
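To make the documentation change above concrete: after this PR, `ScaleToZScore` output rows carry only the scaled column, and the mean/variance exist solely in the saved `tensorflow` graph. A sketch under the same assumptions as the updated tests in this diff (artifact path illustrative; z-score values taken from the test expectations):

```python
import apache_beam as beam
from apache_beam.ml.transforms import base, tft

with beam.Pipeline() as p:
  result = (
      p
      | beam.Create([{'x': i} for i in range(1, 7)])
      | base.MLTransform(
          write_artifact_location='/tmp/zscore_artifacts'  # hypothetical
      ).with_transform(tft.ScaleToZScore(columns=['x'])))
  # Rows now look like Row(x=array([-1.46385], dtype=float32)), ...,
  # Row(x=array([1.46385], dtype=float32)); there are no x_mean or
  # x_var fields in the output anymore.
  _ = result | beam.Map(print)
```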