Merge pull request #29130 [YAML] Guard javascript UDFs with experimental feature enablement.
diff --git a/.github/workflows/beam_IODatastoresCredentialsRotation.yml b/.github/workflows/beam_IODatastoresCredentialsRotation.yml
index 7a402e4..0ededda 100644
--- a/.github/workflows/beam_IODatastoresCredentialsRotation.yml
+++ b/.github/workflows/beam_IODatastoresCredentialsRotation.yml
@@ -77,4 +77,20 @@
- name: Completing the rotation
run: |
gcloud container clusters update io-datastores --complete-credential-rotation --zone=us-central1-a --quiet
-# TODO: Send email to dev@beam.apache.org if something went wrong during credentials rotation
\ No newline at end of file
+ - name: Generate Date
+ run: |
+ date=$(date -u +"%Y-%m-%d")
+ echo "date=$date" >> $GITHUB_ENV
+ - name: Send email
+ uses: dawidd6/action-send-mail@v3
+ with:
+ server_address: smtp.gmail.com
+ server_port: 465
+ secure: true
+ username: ${{ secrets.ISSUE_REPORT_SENDER_EMAIL_ADDRESS }}
+ password: ${{ secrets.ISSUE_REPORT_SENDER_EMAIL_PASSWORD }}
+ subject: Credentials Rotation Failure on IO-Datastores cluster (${{ env.date }})
+ to: dev@beam.apache.org
+ from: gactions@beam.apache.org
+ body: |
+ Something went wrong during the automatic credentials rotation for IO-Datastores Cluster, performed at ${{ env.date }}. It may be necessary to check the state of the cluster certificates. For further details refer to the following links:\n * Failing job: https://github.com/apache/beam/actions/workflows/beam_IODatastoresCredentialsRotation.yml \n * Job configuration: https://github.com/apache/beam/blob/master/.github/workflows/beam_IODatastoresCredentialsRotation.yml \n * Cluster URL: https://pantheon.corp.google.com/kubernetes/clusters/details/us-central1-a/io-datastores/details?mods=dataflow_dev&project=apache-beam-testing
\ No newline at end of file
diff --git a/.github/workflows/beam_MetricsCredentialsRotation.yml b/.github/workflows/beam_MetricsCredentialsRotation.yml
index 7b97270..eda3ec3 100644
--- a/.github/workflows/beam_MetricsCredentialsRotation.yml
+++ b/.github/workflows/beam_MetricsCredentialsRotation.yml
@@ -49,8 +49,9 @@
jobs:
beam_MetricsCredentialsRotation:
if: |
- github.event_name == 'workflow_dispatch' ||
- github.event_name == 'schedule'
+ (github.event_name == 'workflow_dispatch' ||
+ github.event_name == 'schedule') &&
+ github.repository == 'apache/beam'
runs-on: [self-hosted, ubuntu-20.04, main]
timeout-minutes: 100
name: ${{ matrix.job_name }}
@@ -77,4 +78,20 @@
- name: Completing the rotation
run: |
gcloud container clusters update metrics --complete-credential-rotation --zone=us-central1-a --quiet
-# TODO: Send email to dev@beam.apache.org if something went wrong during credentials rotation
\ No newline at end of file
+ - name: Generate Date
+ run: |
+ date=$(date -u +"%Y-%m-%d")
+ echo "date=$date" >> $GITHUB_ENV
+ - name: Send email
+ uses: dawidd6/action-send-mail@v3
+ with:
+ server_address: smtp.gmail.com
+ server_port: 465
+ secure: true
+ username: ${{ secrets.ISSUE_REPORT_SENDER_EMAIL_ADDRESS }}
+ password: ${{ secrets.ISSUE_REPORT_SENDER_EMAIL_PASSWORD }}
+ subject: Credentials Rotation Failure on Metrics cluster (${{ env.date }})
+ to: dev@beam.apache.org
+ from: gactions@beam.apache.org
+ body: |
+ Something went wrong during the automatic credentials rotation for Metrics Cluster, performed at ${{ env.date }}. It may be necessary to check the state of the cluster certificates. For further details refer to the following links:\n * Failing job: https://github.com/apache/beam/actions/workflows/beam_MetricsCredentialsRotation.yml \n * Job configuration: https://github.com/apache/beam/blob/master/.github/workflows/beam_MetricsCredentialsRotation.yml \n * Cluster URL: https://pantheon.corp.google.com/kubernetes/clusters/details/us-central1-a/metrics/details?mods=dataflow_dev&project=apache-beam-testing
\ No newline at end of file
diff --git a/.github/workflows/beam_Metrics_Report.yml b/.github/workflows/beam_Metrics_Report.yml
index 0c88171..9c4a540 100644
--- a/.github/workflows/beam_Metrics_Report.yml
+++ b/.github/workflows/beam_Metrics_Report.yml
@@ -60,9 +60,10 @@
job_name: [beam_Metrics_Report]
job_phrase: [Run Metrics Report]
if: |
- github.event_name == 'schedule' ||
+ (github.event_name == 'schedule' ||
github.event_name == 'workflow_dispatch' ||
- github.event.comment.body == 'Run Metrics Report'
+ github.event.comment.body == 'Run Metrics Report') &&
+ github.repository == 'apache/beam'
steps:
- uses: actions/checkout@v3
- name: Setup repository
diff --git a/.github/workflows/beam_PostCommit_BeamMetrics_Publish.yml b/.github/workflows/beam_PostCommit_BeamMetrics_Publish.yml
index 9cff830..15ab008 100644
--- a/.github/workflows/beam_PostCommit_BeamMetrics_Publish.yml
+++ b/.github/workflows/beam_PostCommit_BeamMetrics_Publish.yml
@@ -53,10 +53,11 @@
jobs:
beam_PostCommit_BeamMetrics_Publish:
if: |
- github.event_name == 'push' ||
+ (github.event_name == 'push' ||
github.event_name == 'workflow_dispatch' ||
github.event_name == 'schedule' ||
- github.event.comment.body == 'Run Beam Metrics Deployment'
+ github.event.comment.body == 'Run Beam Metrics Deployment') &&
+ github.repository == 'apache/beam'
runs-on: [self-hosted, ubuntu-20.04, main]
timeout-minutes: 100
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
diff --git a/.github/workflows/beam_Publish_Beam_SDK_Snapshots.yml b/.github/workflows/beam_Publish_Beam_SDK_Snapshots.yml
index 9e48325..95f993b 100644
--- a/.github/workflows/beam_Publish_Beam_SDK_Snapshots.yml
+++ b/.github/workflows/beam_Publish_Beam_SDK_Snapshots.yml
@@ -50,8 +50,9 @@
jobs:
beam_Publish_Beam_SDK_Snapshots:
if: |
- github.event_name == 'workflow_dispatch' ||
- github.event_name == 'schedule'
+ (github.event_name == 'workflow_dispatch' ||
+ github.event_name == 'schedule') &&
+ github.repository == 'apache/beam'
runs-on: [self-hosted, ubuntu-20.04, main]
timeout-minutes: 100
name: ${{ matrix.job_name }} (${{ matrix.container_task }})
diff --git a/.github/workflows/beam_Publish_Docker_Snapshots.yml b/.github/workflows/beam_Publish_Docker_Snapshots.yml
index 01b846e..1abc268 100644
--- a/.github/workflows/beam_Publish_Docker_Snapshots.yml
+++ b/.github/workflows/beam_Publish_Docker_Snapshots.yml
@@ -50,9 +50,10 @@
jobs:
beam_Publish_Docker_Snapshots:
if: |
- github.event_name == 'workflow_dispatch' ||
+ (github.event_name == 'workflow_dispatch' ||
github.event_name == 'schedule' ||
- github.event.comment.body == 'Publish Docker Snapshots'
+ github.event.comment.body == 'Publish Docker Snapshots') &&
+ github.repository == 'apache/beam'
runs-on: [self-hosted, ubuntu-20.04, main]
timeout-minutes: 100
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
diff --git a/.github/workflows/beam_Release_NightlySnapshot.yml b/.github/workflows/beam_Release_NightlySnapshot.yml
index bf22344..735d0e4 100644
--- a/.github/workflows/beam_Release_NightlySnapshot.yml
+++ b/.github/workflows/beam_Release_NightlySnapshot.yml
@@ -50,8 +50,9 @@
job_name: [beam_Release_NightlySnapshot]
job_phrase: [Release Nightly Snapshot]
if: |
- github.event_name == 'workflow_dispatch' ||
- github.event_name == 'schedule'
+ (github.event_name == 'workflow_dispatch' ||
+ github.event_name == 'schedule') &&
+ github.repository == 'apache/beam'
steps:
- uses: actions/checkout@v4
- name: Setup repository
diff --git a/.github/workflows/beam_Release_Python_NightlySnapshot.yml b/.github/workflows/beam_Release_Python_NightlySnapshot.yml
index a9f4ac5..fe13235 100644
--- a/.github/workflows/beam_Release_Python_NightlySnapshot.yml
+++ b/.github/workflows/beam_Release_Python_NightlySnapshot.yml
@@ -49,8 +49,9 @@
matrix:
job_name: [beam_Release_Python_NightlySnapshot]
job_phrase: [Release Nightly Snapshot Python]
- if: github.event_name == 'workflow_dispatch' ||
- github.event_name == 'schedule'
+ if: (github.event_name == 'workflow_dispatch' ||
+ github.event_name == 'schedule') &&
+ github.repository == 'apache/beam'
steps:
- uses: actions/checkout@v4
diff --git a/.github/workflows/build_runner_image.yml b/.github/workflows/build_runner_image.yml
index 6071d93..c0f4cac 100644
--- a/.github/workflows/build_runner_image.yml
+++ b/.github/workflows/build_runner_image.yml
@@ -30,6 +30,7 @@
docker_repo: apache-beam-testing/beam-github-actions/beam-arc-runner
jobs:
build-and-version-runner:
+ if: github.repository == 'apache/beam'
env:
working-directory: .github/gh-actions-self-hosted-runners/arc/images/
runs-on: [self-hosted, ubuntu-20.04]
diff --git a/CHANGES.md b/CHANGES.md
index 6d67ccd..0eee0a3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -86,6 +86,7 @@
* Fixed "Desired bundle size 0 bytes must be greater than 0" in Java SDK's BigtableIO.BigtableSource when you have more cores than bytes to read (Java) [#28793](https://github.com/apache/beam/issues/28793).
* `watch_file_pattern` arg of the [RunInference](https://github.com/apache/beam/blob/104c10b3ee536a9a3ea52b4dbf62d86b669da5d9/sdks/python/apache_beam/ml/inference/base.py#L997) arg had no effect prior to 2.52.0. To use the behavior of arg `watch_file_pattern` prior to 2.52.0, follow the documentation at https://beam.apache.org/documentation/ml/side-input-updates/ and use `WatchFilePattern` PTransform as a SideInput. ([#28948](https://github.com/apache/beam/pulls/28948))
+* `MLTransform` no longer outputs artifacts such as min, max, and quantiles. Instead, `MLTransform` will add a feature to output these artifacts in a human-readable format - [#29017](https://github.com/apache/beam/issues/29017). For now, to use artifacts such as min and max that were produced by an earlier `MLTransform`, use `read_artifact_location` of `MLTransform`, which reads artifacts produced earlier by a different `MLTransform` ([#29016](https://github.com/apache/beam/pull/29016/))
## Security Fixes
* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
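To make the CHANGES.md note above concrete, here is a minimal sketch (not part of the diff) of writing artifacts with one `MLTransform` and reusing them later via `read_artifact_location`. The artifact path and input data are hypothetical, and `ScaleTo01` is used only as a representative transform.

```python
# Sketch only: reusing artifacts produced by an earlier MLTransform.
# Path and data are hypothetical; MLTransform and ScaleTo01 are the
# apache_beam.ml.transforms APIs used elsewhere in this diff.
import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ScaleTo01

artifact_location = '/tmp/mltransform_artifacts'  # hypothetical path

# Write mode: compute and store artifacts (e.g. min/max for ScaleTo01).
with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([{'x': [1.0, 2.0, 3.0]}, {'x': [4.0, 5.0, 6.0]}])
        | MLTransform(write_artifact_location=artifact_location).with_transform(
            ScaleTo01(columns=['x'])))

# Read mode: apply the previously computed artifacts to new data.
with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([{'x': [2.0, 4.0]}])
        | MLTransform(read_artifact_location=artifact_location))
```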
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index e4f3468..9e4d28a 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -304,6 +304,7 @@
commandLine "docker", "rmi", "--force", "${dockerJavaImageName}"
}
exec {
+ ignoreExitValue true
commandLine "gcloud", "--quiet", "container", "images", "untag", "${dockerJavaImageName}"
}
exec {
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py
index 1d2197e..0db1071 100644
--- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py
@@ -31,7 +31,7 @@
import tensorflow_transform as tft # pylint: disable=unused-import
from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_scale_to_0_1
from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary
- from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary_with_non_columnar_data
+ from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary_with_scalar
except ImportError:
raise unittest.SkipTest('tensorflow_transform is not installed.')
@@ -46,8 +46,8 @@
def check_mltransform_scale_to_0_1():
expected = '''[START mltransform_scale_to_0_1]
-Row(x=array([0. , 0.5714286, 0.2857143], dtype=float32), x_max=array([8.], dtype=float32), x_min=array([1.], dtype=float32))
-Row(x=array([0.42857143, 0.14285715, 1. ], dtype=float32), x_max=array([8.], dtype=float32), x_min=array([1.], dtype=float32))
+Row(x=array([0. , 0.5714286, 0.2857143], dtype=float32))
+Row(x=array([0.42857143, 0.14285715, 1. ], dtype=float32))
[END mltransform_scale_to_0_1] '''.splitlines()[1:-1]
return expected
@@ -80,7 +80,7 @@
self.assertEqual(predicted, expected)
def test_mltransform_compute_and_apply_vocab_scalar(self, mock_stdout):
- mltransform_compute_and_apply_vocabulary_with_non_columnar_data()
+ mltransform_compute_and_apply_vocabulary_with_scalar()
predicted = mock_stdout.getvalue().splitlines()
expected = check_mltransform_compute_and_apply_vocabulary_with_scalar()
self.assertEqual(predicted, expected)
diff --git a/sdks/python/apache_beam/ml/transforms/base.py b/sdks/python/apache_beam/ml/transforms/base.py
index a0bc4a9..b3a30bb 100644
--- a/sdks/python/apache_beam/ml/transforms/base.py
+++ b/sdks/python/apache_beam/ml/transforms/base.py
@@ -67,16 +67,6 @@
inputs: input data.
"""
- @abc.abstractmethod
- def get_artifacts(
- self, data: OperationInputT,
- output_column_prefix: str) -> Optional[Dict[str, OperationOutputT]]:
- """
- If the operation generates any artifacts, they can be returned from this
- method.
- """
- pass
-
def __call__(self, data: OperationInputT,
output_column_name: str) -> Dict[str, OperationOutputT]:
"""
@@ -84,9 +74,6 @@
This method will invoke the apply() method of the class.
"""
transformed_data = self.apply_transform(data, output_column_name)
- artifacts = self.get_artifacts(data, output_column_name)
- if artifacts:
- transformed_data = {**transformed_data, **artifacts}
return transformed_data
def get_counter(self):
diff --git a/sdks/python/apache_beam/ml/transforms/handlers_test.py b/sdks/python/apache_beam/ml/transforms/handlers_test.py
index 3342ec7..327c8c7 100644
--- a/sdks/python/apache_beam/ml/transforms/handlers_test.py
+++ b/sdks/python/apache_beam/ml/transforms/handlers_test.py
@@ -58,14 +58,6 @@
return {output_column_name: inputs * 10}
-class _FakeOperationWithArtifacts(TFTOperation):
- def apply_transform(self, inputs, output_column_name, **kwargs):
- return {output_column_name: inputs}
-
- def get_artifacts(self, data, col_name):
- return {'artifact': tf.convert_to_tensor([1])}
-
-
class IntType(NamedTuple):
x: int
@@ -106,16 +98,6 @@
actual_result = process_handler.process_data_fn(inputs)
self.assertDictEqual(actual_result, expected_result)
- def test_preprocessing_fn_with_artifacts(self):
- process_handler = handlers.TFTProcessHandler(
- transforms=[_FakeOperationWithArtifacts(columns=['x'])],
- artifact_location=self.artifact_location)
- inputs = {'x': [1, 2, 3]}
- preprocessing_fn = process_handler.process_data_fn
- actual_result = preprocessing_fn(inputs)
- expected_result = {'x': [1, 2, 3], 'artifact': tf.convert_to_tensor([1])}
- self.assertDictEqual(actual_result, expected_result)
-
def test_input_type_from_schema_named_tuple_pcoll(self):
data = [{'x': 1}]
with beam.Pipeline() as p:
diff --git a/sdks/python/apache_beam/ml/transforms/tft.py b/sdks/python/apache_beam/ml/transforms/tft.py
index 1d49264..c7b8ff0 100644
--- a/sdks/python/apache_beam/ml/transforms/tft.py
+++ b/sdks/python/apache_beam/ml/transforms/tft.py
@@ -45,9 +45,7 @@
import tensorflow as tf
import tensorflow_transform as tft
from apache_beam.ml.transforms.base import BaseOperation
-from tensorflow_transform import analyzers
from tensorflow_transform import common_types
-from tensorflow_transform import tf_utils
__all__ = [
'ComputeAndApplyVocabulary',
@@ -77,6 +75,8 @@
return wrapper
+# TODO: https://github.com/apache/beam/pull/29016
+# Add support for outputting artifacts to a text file in human-readable form.
class TFTOperation(BaseOperation[common_types.TensorType,
common_types.TensorType]):
def __init__(self, columns: List[str]) -> None:
@@ -95,13 +95,6 @@
"Columns are not specified. Please specify the column for the "
" op %s" % self.__class__.__name__)
- def get_artifacts(self, data: common_types.TensorType,
- col_name: str) -> Dict[str, common_types.TensorType]:
- """
- Returns the artifacts generated by the operation.
- """
- return {}
-
@tf.function
def _split_string_with_delimiter(self, data, delimiter):
"""
@@ -240,15 +233,6 @@
}
return output_dict
- def get_artifacts(self, data: common_types.TensorType,
- col_name: str) -> Dict[str, common_types.TensorType]:
- mean_var = tft.analyzers._mean_and_var(data)
- shape = [tf.shape(data)[0], 1]
- return {
- col_name + '_mean': tf.broadcast_to(mean_var[0], shape),
- col_name + '_var': tf.broadcast_to(mean_var[1], shape),
- }
-
@register_input_dtype(float)
class ScaleTo01(TFTOperation):
@@ -280,14 +264,6 @@
self.elementwise = elementwise
self.name = name
- def get_artifacts(self, data: common_types.TensorType,
- col_name: str) -> Dict[str, common_types.TensorType]:
- shape = [tf.shape(data)[0], 1]
- return {
- col_name + '_min': tf.broadcast_to(tft.min(data), shape),
- col_name + '_max': tf.broadcast_to(tft.max(data), shape)
- }
-
def apply_transform(
self, data: common_types.TensorType,
output_column_name: str) -> Dict[str, common_types.TensorType]:
@@ -368,34 +344,6 @@
self.elementwise = elementwise
self.name = name
- def get_artifacts(self, data: common_types.TensorType,
- col_name: str) -> Dict[str, common_types.TensorType]:
- num_buckets = self.num_buckets
- epsilon = self.epsilon
- elementwise = self.elementwise
-
- if num_buckets < 1:
- raise ValueError('Invalid num_buckets %d' % num_buckets)
-
- if isinstance(data, (tf.SparseTensor, tf.RaggedTensor)) and elementwise:
- raise ValueError(
- 'bucketize requires `x` to be dense if `elementwise=True`')
-
- x_values = tf_utils.get_values(data)
-
- if epsilon is None:
- # See explanation in args documentation for epsilon.
- epsilon = min(1.0 / num_buckets, 0.01)
-
- quantiles = analyzers.quantiles(
- x_values, num_buckets, epsilon, reduce_instance_dims=not elementwise)
- shape = [
- tf.shape(data)[0], num_buckets - 1 if num_buckets > 1 else num_buckets
- ]
- # These quantiles are used as the bucket boundaries in the later stages.
- # Should we change the prefix _quantiles to _bucket_boundaries?
- return {col_name + '_quantiles': tf.broadcast_to(quantiles, shape)}
-
def apply_transform(
self, data: common_types.TensorType,
output_column_name: str) -> Dict[str, common_types.TensorType]:
@@ -572,6 +520,7 @@
ngram_range: Tuple[int, int] = (1, 1),
ngrams_separator: Optional[str] = None,
compute_word_count: bool = False,
+ key_vocab_filename: str = 'key_vocab_mapping',
name: Optional[str] = None,
):
"""
@@ -592,9 +541,9 @@
n-gram sizes.
separator: A string that will be inserted between each ngram.
compute_word_count: A boolean that specifies whether to compute
- the unique word count and add it as an artifact to the output.
- Note that the count will be computed over the entire dataset so
- it will be the same value for all inputs.
+ the unique word count over the entire dataset. Defaults to False.
+ key_vocab_filename: The file name for the key vocabulary file when
+ compute_word_count is True.
name: A name for the operation (optional).
Note that original order of the input may not be preserved.
@@ -605,33 +554,26 @@
self.ngrams_separator = ngrams_separator
self.name = name
self.split_string_by_delimiter = split_string_by_delimiter
+ self.key_vocab_filename = key_vocab_filename
if compute_word_count:
self.compute_word_count_fn = count_unqiue_words
else:
- self.compute_word_count_fn = lambda *args, **kwargs: {}
+ self.compute_word_count_fn = lambda *args, **kwargs: None
if ngram_range != (1, 1) and not ngrams_separator:
raise ValueError(
'ngrams_separator must be specified when ngram_range is not (1, 1)')
- def get_artifacts(self, data: tf.SparseTensor,
- col_name: str) -> Dict[str, tf.Tensor]:
- return self.compute_word_count_fn(data, col_name)
-
def apply_transform(self, data: tf.SparseTensor, output_col_name: str):
if self.split_string_by_delimiter:
data = self._split_string_with_delimiter(
data, self.split_string_by_delimiter)
output = tft.bag_of_words(
data, self.ngram_range, self.ngrams_separator, self.name)
+ # word counts are written to the key_vocab_filename
+ self.compute_word_count_fn(data, self.key_vocab_filename)
return {output_col_name: output}
-def count_unqiue_words(data: tf.SparseTensor,
- output_col_name: str) -> Dict[str, tf.Tensor]:
- keys, count = tft.count_per_key(data)
- shape = [tf.shape(data)[0], tf.shape(keys)[0]]
- return {
- output_col_name + '_unique_elements': tf.broadcast_to(keys, shape),
- output_col_name + '_counts': tf.broadcast_to(count, shape)
- }
+def count_unqiue_words(data: tf.SparseTensor, output_vocab_name: str) -> None:
+ tft.count_per_key(data, key_vocabulary_filename=output_vocab_name)
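Following the `BagOfWords` docstring change above, a minimal usage sketch (assumed data and artifact path, not part of the diff): with `compute_word_count=True`, per-key counts are written to the vocabulary file named by `key_vocab_filename` instead of being returned as extra output columns.

```python
# Sketch only (hypothetical path and data). Word counts end up in a vocabulary
# asset under the artifact location, as the updated test below also checks.
import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import BagOfWords

artifact_location = '/tmp/bag_of_words_artifacts'  # hypothetical path

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([{'x': ['I', 'like', 'pie']}, {'x': ['yum', 'pie']}])
        | MLTransform(
            write_artifact_location=artifact_location,
            transforms=[
                BagOfWords(
                    columns=['x'],
                    compute_word_count=True,
                    key_vocab_filename='my_vocab')
            ]))
# After the pipeline runs, the counts can be read from
# <artifact_location>/transform_fn/assets/my_vocab, one "<count> <key>" per line.
```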
diff --git a/sdks/python/apache_beam/ml/transforms/tft_test.py b/sdks/python/apache_beam/ml/transforms/tft_test.py
index 41f59c8..38ded6a 100644
--- a/sdks/python/apache_beam/ml/transforms/tft_test.py
+++ b/sdks/python/apache_beam/ml/transforms/tft_test.py
@@ -17,6 +17,7 @@
# pytype: skip-file
+import os
import shutil
import tempfile
import unittest
@@ -38,31 +39,6 @@
if not tft:
raise unittest.SkipTest('tensorflow_transform is not installed.')
-z_score_expected = {'x_mean': 3.5, 'x_var': 2.9166666666666665}
-
-
-def assert_z_score_artifacts(element):
- element = element.as_dict()
- assert 'x_mean' in element
- assert 'x_var' in element
- assert element['x_mean'] == z_score_expected['x_mean']
- assert element['x_var'] == z_score_expected['x_var']
-
-
-def assert_ScaleTo01_artifacts(element):
- element = element.as_dict()
- assert 'x_min' in element
- assert 'x_max' in element
- assert element['x_min'] == 1
- assert element['x_max'] == 6
-
-
-def assert_bucketize_artifacts(element):
- element = element.as_dict()
- assert 'x_quantiles' in element
- assert np.array_equal(
- element['x_quantiles'], np.array([3, 5], dtype=np.float32))
-
class ScaleZScoreTest(unittest.TestCase):
def setUp(self) -> None:
@@ -100,7 +76,18 @@
| "MLTransform" >> base.MLTransform(
write_artifact_location=self.artifact_location).with_transform(
tft.ScaleToZScore(columns=['x'])))
- _ = (result | beam.Map(assert_z_score_artifacts))
+ expected_data = [
+ np.array([-1.46385], dtype=np.float32),
+ np.array([-0.87831], dtype=np.float32),
+ np.array([-0.29277], dtype=np.float32),
+ np.array([0.29277], dtype=np.float32),
+ np.array([0.87831], dtype=np.float32),
+ np.array([1.46385], dtype=np.float32),
+ ]
+
+ actual_data = (result | beam.Map(lambda x: x.x))
+ assert_that(
+ actual_data, equal_to(expected_data, equals_fn=np.array_equal))
def test_z_score_list_data(self):
list_data = [{'x': [1, 2, 3]}, {'x': [4, 5, 6]}]
@@ -111,7 +98,14 @@
| "listMLTransform" >> base.MLTransform(
write_artifact_location=self.artifact_location).with_transform(
tft.ScaleToZScore(columns=['x'])))
- _ = (list_result | beam.Map(assert_z_score_artifacts))
+
+ expected_data = [
+ np.array([-1.46385, -0.87831, -0.29277], dtype=np.float32),
+ np.array([0.29277, 0.87831, 1.46385], dtype=np.float32)
+ ]
+ actual_data = (list_result | beam.Map(lambda x: x.x))
+ assert_that(
+ actual_data, equal_to(expected_data, equals_fn=np.array_equal))
class ScaleTo01Test(unittest.TestCase):
@@ -130,7 +124,6 @@
| "MLTransform" >> base.MLTransform(
write_artifact_location=self.artifact_location).with_transform(
tft.ScaleTo01(columns=['x'])))
- _ = (list_result | beam.Map(assert_ScaleTo01_artifacts))
expected_output = [
np.array([0, 0.2, 0.4], dtype=np.float32),
@@ -150,7 +143,6 @@
write_artifact_location=self.artifact_location).with_transform(
tft.ScaleTo01(columns=['x'])))
- _ = (result | beam.Map(assert_ScaleTo01_artifacts))
expected_output = (
np.array([0], dtype=np.float32),
np.array([0.2], dtype=np.float32),
@@ -179,7 +171,6 @@
| "MLTransform" >> base.MLTransform(
write_artifact_location=self.artifact_location).with_transform(
tft.Bucketize(columns=['x'], num_buckets=3)))
- _ = (result | beam.Map(assert_bucketize_artifacts))
transformed_data = (result | beam.Map(lambda x: x.x))
expected_data = [
@@ -202,8 +193,6 @@
| "MLTransform" >> base.MLTransform(
write_artifact_location=self.artifact_location).with_transform(
tft.Bucketize(columns=['x'], num_buckets=3)))
- _ = (list_result | beam.Map(assert_bucketize_artifacts))
-
transformed_data = (
list_result
| "TransformedColumnX" >> beam.Map(lambda ele: ele.x))
@@ -214,36 +203,6 @@
assert_that(
transformed_data, equal_to(expected_data, equals_fn=np.array_equal))
- @parameterized.expand([
- (range(1, 10), [4, 7]),
- (range(9, 0, -1), [4, 7]),
- (range(19, 0, -1), [10]),
- (range(1, 100), [25, 50, 75]),
- # similar to the above but with odd number of elements
- (range(1, 100, 2), [25, 51, 75]),
- (range(99, 0, -1), range(10, 100, 10))
- ])
- def test_bucketize_boundaries(self, test_input, expected_boundaries):
- # boundaries are outputted as artifacts for the Bucketize transform.
- data = [{'x': [i]} for i in test_input]
- num_buckets = len(expected_boundaries) + 1
- with beam.Pipeline() as p:
- result = (
- p
- | "Create" >> beam.Create(data)
- | "MLTransform" >> base.MLTransform(
- write_artifact_location=self.artifact_location).with_transform(
- tft.Bucketize(columns=['x'], num_buckets=num_buckets)))
- actual_boundaries = (
- result
- | beam.Map(lambda x: x.as_dict())
- | beam.Map(lambda x: x['x_quantiles']))
-
- def assert_boundaries(actual_boundaries):
- assert np.array_equal(actual_boundaries, expected_boundaries)
-
- _ = (actual_boundaries | beam.Map(assert_boundaries))
-
class ApplyBucketsTest(unittest.TestCase):
def setUp(self) -> None:
@@ -731,10 +690,6 @@
assert_that(result, equal_to(expected_data, equals_fn=np.array_equal))
def test_count_per_key_on_list(self):
- def map_element_to_count(elements, counts):
- d = {elements[i]: counts[i] for i in range(len(elements))}
- return d
-
data = [{
'x': ['I', 'like', 'pie', 'pie', 'pie'],
}, {
@@ -743,25 +698,28 @@
'x': ['Banana', 'Banana', 'Apple', 'Apple', 'Apple', 'Apple']
}]
with beam.Pipeline() as p:
- result = (
+ _ = (
p
| "Create" >> beam.Create(data)
| "MLTransform" >> base.MLTransform(
write_artifact_location=self.artifact_location,
transforms=[
- tft.BagOfWords(columns=['x'], compute_word_count=True)
+ tft.BagOfWords(
+ columns=['x'],
+ compute_word_count=True,
+ key_vocab_filename='my_vocab')
]))
- # the unique elements and counts are artifacts and will be
- # stored in the result and same for all the elements in the
- # PCollection.
- result = result | beam.Map(
- lambda x: map_element_to_count(x.x_unique_elements, x.x_counts))
+ def validate_count_per_key(key_vocab_filename):
+ key_vocab_location = os.path.join(
+ self.artifact_location, 'transform_fn/assets', key_vocab_filename)
+ with open(key_vocab_location, 'r') as f:
+ key_vocab_list = [line.strip() for line in f]
+ return key_vocab_list
- expected_data = [{
- b'Apple': 4, b'Banana': 2, b'I': 1, b'like': 1, b'pie': 4, b'yum': 2
- }] * 3 # since there are 3 elements in input.
- assert_that(result, equal_to(expected_data))
+ expected_data = ['2 yum', '4 Apple', '1 like', '1 I', '4 pie', '2 Banana']
+ actual_data = validate_count_per_key('my_vocab')
+ self.assertEqual(expected_data, actual_data)
if __name__ == '__main__':
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 1e797d9..e4cf09c 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -162,7 +162,7 @@
holdup==1.8.0
extras =
gcp
-allowlist_externals =
+allowlist_externals =
bash
echo
sleep
@@ -194,7 +194,7 @@
extras =
azure
passenv = REQUESTS_CA_BUNDLE
-allowlist_externals =
+allowlist_externals =
wget
az
bash
@@ -311,11 +311,12 @@
# Run all DataFrame API unit tests
bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/dataframe'
-[testenv:py{38,39}-tft-113]
+[testenv:py{38,39}-tft-{113,114}]
deps =
113: tensorflow_transform>=1.13.0,<1.14.0
+ 114: tensorflow_transform>=1.14.0,<1.15.0
commands =
- bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/ml/transforms'
+ bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/ml/transforms apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py'
[testenv:py{38,39,310,311}-pytorch-{19,110,111,112,113}]
deps =
diff --git a/website/www/site/content/en/documentation/ml/preprocess-data.md b/website/www/site/content/en/documentation/ml/preprocess-data.md
index cb79aff..2b291b9 100644
--- a/website/www/site/content/en/documentation/ml/preprocess-data.md
+++ b/website/www/site/content/en/documentation/ml/preprocess-data.md
@@ -105,7 +105,7 @@
When you use the `write_artifact_location` parameter, the `MLTransform` class runs the
specified transformations on the dataset and then creates artifacts from these
transformations. The artifacts are stored in the location that you specify in
-the `write_artifact_location` parameter or in the `MLTransform` output.
+the `write_artifact_location` parameter.
Write mode is useful when you want to store the results of your transformations
for future use. For example, if you apply the same transformations on a
@@ -120,8 +120,7 @@
The `ComputeAndApplyVocabulary`
transform outputs the indices of the vocabulary to the vocabulary file.
- The `ScaleToZScore` transform calculates the mean and variance over the entire dataset
- and then normalizes the entire dataset using the mean and variance. The
- mean and variance are outputted by the `MLTransform` operation.
+ and then normalizes the entire dataset using the mean and variance.
When you use the `write_artifact_location` parameter, these
values are stored as a `tensorflow` graph in the location specified by
the `write_artifact_location` parameter value. You can reuse the values in read mode
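As a complement to the documentation change above, a minimal sketch (hypothetical path and data) of write mode with `ComputeAndApplyVocabulary`: the vocabulary file itself is stored under `write_artifact_location`, and each output element carries the vocabulary indices for its tokens.

```python
# Sketch only (hypothetical path and data). ComputeAndApplyVocabulary replaces
# each token with its vocabulary index; the vocabulary file is written under
# write_artifact_location rather than emitted with every row.
import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([{'x': ['I', 'like', 'pie']}, {'x': ['yum', 'pie']}])
        | MLTransform(
            write_artifact_location='/tmp/vocab_artifacts'  # hypothetical
        ).with_transform(ComputeAndApplyVocabulary(columns=['x']))
        | beam.Map(print))  # each Row holds the vocabulary indices for 'x'
```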