Prompt: What is MLTransform in Apache Beam and how can I use it to process my data?

Response: MLTransform is a class in Apache Beam that consolidates various transforms for preprocessing and transforming data in machine learning pipelines. It offers a range of capabilities:

  • Consistent preprocessing: ensuring uniform preprocessing steps across training and inference stages, thereby maintaining consistent results.
  • Text embedding generation: generating embeddings on text data using Large Language Models (LLMs), facilitating tasks such as semantic analysis.
  • Full dataset pass: conducting a comprehensive pass on the dataset, beneficial for transformations requiring analysis of the entire dataset.

The feature that sets MLTransform apart is its capability to perform a full pass on the dataset, enabling the execution of common preprocessing steps, such as:

  • Normalization: normalizing input values using min-max scaling for better convergence during model training.
  • Bucketization: converting floating-point values to integers using bucketization based on the distribution of input data.
  • Vocabulary generation: converting strings to integers by generating a vocabulary over the entire dataset, essential for categorical features.
  • TF-IDF calculation: calculating TF-IDF (Term Frequency-Inverse Document Frequency) weights for text data, useful for feature representation in natural language processing tasks.
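These full-pass operations share a common shape: first compute statistics over the entire dataset, then apply them to each element. The following sketch illustrates this shape for min-max scaling and vocabulary generation in plain Python; it is a conceptual illustration only, not the Beam/TFT implementation, which computes these statistics in a distributed fashion:

```python
def scale_to_01(values):
    """Min-max scaling: needs the min and max of the whole dataset first."""
    lo, hi = min(values), max(values)
    return [(v - lo) / (hi - lo) for v in values]

def compute_and_apply_vocabulary(tokens):
    """Map each distinct string to an integer index over the full dataset."""
    vocab = {tok: idx for idx, tok in enumerate(sorted(set(tokens)))}
    return [vocab[tok] for tok in tokens]

print(scale_to_01([2.0, 4.0, 6.0]))                   # [0.0, 0.5, 1.0]
print(compute_and_apply_vocabulary(['b', 'a', 'b']))  # [1, 0, 1]
```

Neither step can be applied element by element in isolation, which is why MLTransform's full-dataset pass matters.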

MLTransform also provides transforms for generating text embeddings and for general-purpose data processing across diverse machine learning tasks. It includes:

  • Text embedding transforms: enabling the generation of embeddings for pushing data into vector databases or inference purposes. These include SentenceTransformerEmbeddings (leveraging models from the Hugging Face Sentence Transformers) and VertexAITextEmbeddings (utilizing models from the Vertex AI text-embeddings API).
  • Data processing transforms: a range of transforms, such as ApplyBuckets, Bucketize, ComputeAndApplyVocabulary, and more, sourced from the TensorFlow Transforms (TFT) library.

Apache Beam ensures consistency in preprocessing steps for both training and test data by implementing a workflow for writing and reading artifacts within the MLTransform class. These artifacts are additional data elements generated by data transformations, such as the minimum and maximum values from a ScaleTo01 transformation.

The MLTransform class enables users to specify whether it creates or retrieves artifacts using the write_artifact_location and read_artifact_location parameters. When using the write_artifact_location parameter, MLTransform runs the specified transformations on the dataset, creates artifacts from these transformations, and stores them in the specified location for future use. When using the read_artifact_location parameter, MLTransform retrieves artifacts from the specified location and incorporates them into the transform.

This artifact workflow ensures that test data undergoes the same preprocessing steps as training data. For instance, after training a machine learning model on data preprocessed by MLTransform with the write_artifact_location parameter, you can run inference on new test data using the read_artifact_location parameter to fetch the transformation artifacts created during preprocessing.
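The idea behind this workflow can be illustrated outside of Beam with a few lines of plain Python: the statistics computed on the training data are persisted and then reused verbatim on the test data, rather than being recomputed. This is a conceptual sketch only; MLTransform handles the artifact storage format and plumbing for you:

```python
import json
import os
import tempfile

def fit_scale_to_01(train_values, artifact_location):
    """'Training' pass: compute min/max and persist them as artifacts."""
    artifacts = {'min': min(train_values), 'max': max(train_values)}
    with open(os.path.join(artifact_location, 'scale.json'), 'w') as f:
        json.dump(artifacts, f)
    span = artifacts['max'] - artifacts['min']
    return [(v - artifacts['min']) / span for v in train_values]

def apply_scale_to_01(test_values, artifact_location):
    """'Inference' pass: read the stored artifacts instead of recomputing."""
    with open(os.path.join(artifact_location, 'scale.json')) as f:
        artifacts = json.load(f)
    span = artifacts['max'] - artifacts['min']
    return [(v - artifacts['min']) / span for v in test_values]

artifact_location = tempfile.mkdtemp()
print(fit_scale_to_01([0.0, 5.0, 10.0], artifact_location))  # [0.0, 0.5, 1.0]
print(apply_scale_to_01([2.5], artifact_location))           # [0.25]
```

Note that the test value 2.5 is scaled with the training min and max, not with statistics from the test data itself; that reuse is exactly what keeps training and inference consistent.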

To preprocess data using MLTransform, follow these steps:

  • Import necessary modules. Import the required modules from Apache Beam, including MLTransform and any specific transforms you intend to use.
  • Prepare your data. Organize your data in a suitable format, such as a list of dictionaries, where each dictionary represents a data record.
  • Set up artifact location. Create a temporary directory or specify a location where artifacts generated during preprocessing will be stored.
  • Instantiate transform objects. Create instances of the transforms you want to apply to your data, specifying any required parameters such as columns to transform.
  • Build your pipeline. Construct your Apache Beam pipeline, including the necessary steps to create your data collection, apply the MLTransform with specified transformations, and any additional processing or output steps.

Here is the code snippet utilizing MLTransform that you can modify and integrate into your Apache Beam pipeline:

  import apache_beam as beam
  from apache_beam.ml.transforms.base import MLTransform
  from apache_beam.ml.transforms.tft import <TRANSFORM_NAME>
  import tempfile

  data = [
      {
          <DATA>
      },
  ]

  artifact_location = tempfile.mkdtemp()
  <TRANSFORM_FUNCTION_NAME> = <TRANSFORM_NAME>(columns=['x'])

  with beam.Pipeline() as p:
    transformed_data = (
        p
        | beam.Create(data)
        | MLTransform(write_artifact_location=artifact_location).with_transform(
            <TRANSFORM_FUNCTION_NAME>)
        | beam.Map(print))

In this example, MLTransform writes artifacts to the specified location using write_artifact_location. In the provided code snippet, replace the following values:

  • TRANSFORM_NAME denotes the transform to be used.
  • DATA denotes input data.
  • TRANSFORM_FUNCTION_NAME denotes the name that you assign to your transform function in your code.

The following example illustrates using MLTransform to standardize data to zero mean and unit variance using the ScaleToZScore transformation (to rescale values into the range 0 to 1 instead, use ScaleTo01):

  from apache_beam.ml.transforms.tft import ScaleToZScore

  scale_to_z_score_transform = ScaleToZScore(columns=['x', 'y'])
  with beam.Pipeline() as p:
    (p | beam.Create(data)
       | MLTransform(write_artifact_location=artifact_location).with_transform(
           scale_to_z_score_transform))
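To make the behavior concrete: z-score standardization maps each value to its distance from the dataset mean, measured in standard deviations. The underlying arithmetic, shown here in plain Python as an illustration rather than the Beam implementation, is:

```python
import math

def scale_to_z_score(values):
    """z = (x - mean) / stddev, using statistics from the full dataset."""
    mean = sum(values) / len(values)
    variance = sum((v - mean) ** 2 for v in values) / len(values)
    std = math.sqrt(variance)
    return [(v - mean) / std for v in values]

print(scale_to_z_score([1.0, 2.0, 3.0]))  # approximately [-1.22, 0.0, 1.22]
```

Like min-max scaling, this requires a full pass over the dataset to obtain the mean and standard deviation, which is why it is implemented as an MLTransform data processing transform.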

You can pass data processing transforms to MLTransform using either the with_transform method or a list. The with_transform method is useful when you want to apply a single transform to your dataset. On the other hand, providing a list of transforms is beneficial when you need to apply multiple transforms sequentially:

  MLTransform(transforms=transforms, write_artifact_location=write_artifact_location)