Several types of Apache Beam data processing are applicable to AI/ML projects, and they can be grouped into two main topics: data exploration, and ML data pipelines that use both data preprocessing and validation. This section first examines data exploration and then the data pipelines. Data postprocessing is not covered, because it is similar to preprocessing; the two differ only in the order and the type of pipeline.
Pandas is a popular tool for performing data exploration. It is a data analysis and manipulation library for Python built around the DataFrame, a data structure that holds two-dimensional tabular data with labeled rows and columns. The Apache Beam Python SDK provides a DataFrame API for working with Pandas-like DataFrame objects.
The Beam DataFrame API gives you access to this familiar programming interface within an Apache Beam pipeline. You can use it for data exploration and then reuse the same code in your data preprocessing pipeline. Using the DataFrame API, you can build complex data processing pipelines by invoking standard Pandas commands.
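For example, the conversion helpers in apache_beam.dataframe.convert let you move between PCollections and DataFrames inside a single pipeline. The following minimal sketch (the schema and column names are illustrative, not taken from the examples in this section) applies a standard Pandas filtering expression to a schema-aware PCollection:

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe, to_pcollection

with beam.Pipeline() as pipeline:
  # Schema-aware PCollection; beam.Row provides the column names.
  rows = pipeline | beam.Create([
      beam.Row(age=25, height=176),
      beam.Row(age=61, height=192)])

  df = to_dataframe(rows)          # Deferred, Pandas-like DataFrame
  adults = df[df.age > 30]         # Standard Pandas boolean filtering
  result = to_pcollection(adults)  # Back to a PCollection of rows

  result | beam.Map(print)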
You can use the DataFrame API in combination with the Beam interactive runner in a JupyterLab notebook. Use the notebook to iteratively develop pipelines and display the results of your individual pipeline steps.
The following is an example of data exploration with Apache Beam in a notebook:
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

p = beam.Pipeline(InteractiveRunner())
beam_df = p | beam.dataframe.io.read_csv(input_path)

# Investigate columns and data types
beam_df.dtypes

# Generate descriptive statistics
ib.collect(beam_df.describe())

# Investigate missing values
ib.collect(beam_df.isnull())
For a full end-to-end example that implements data exploration and data preprocessing with Apache Beam and the DataFrame API for your AI/ML project, see the Beam DataFrame API tutorial for AI/ML.
A typical data preprocessing pipeline consists of the following steps:

1. Reading and writing data: Read and write the data from your file system, database, or messaging queue.
2. Data cleaning: Filter and clean your data before using it to train your ML model.
3. Data transformations: Transform your data into the format expected for training your model.
4. Data enrichment: Enrich your data with external data sources to make it more meaningful.
5. Data validation and metrics: Validate the data and track metrics, such as the number of processed rows.

You can use an Apache Beam pipeline to implement all of these steps. The following example shows a pipeline that demonstrates each of them:
import apache_beam as beam
from apache_beam.metrics import Metrics
with beam.Pipeline() as pipeline:
  # Create data
  input_data = (
      pipeline
      | beam.Create([
          {'age': 25, 'height': 176, 'weight': 60, 'city': 'London'},
          {'age': 61, 'height': 192, 'weight': 95, 'city': 'Brussels'},
          {'age': 48, 'height': 163, 'weight': None, 'city': 'Berlin'}]))

  # Clean data: drop rows with missing weight values
  def filter_missing_data(row):
    return row['weight'] is not None

  cleaned_data = input_data | beam.Filter(filter_missing_data)

  # Transform data: scale the numeric values
  def scale_min_max_data(row):
    row['age'] = row['age'] / 100
    row['height'] = (row['height'] - 150) / 50
    row['weight'] = (row['weight'] - 50) / 50
    yield row

  transformed_data = cleaned_data | beam.FlatMap(scale_min_max_data)

  # Enrich data: look up coordinates for each city in a side input.
  # Each line of coordinates.csv is assumed to have the format
  # city,latitude,longitude; parse it into a (city, (lat, lon)) pair
  # so that AsDict can turn the side input into a lookup dictionary.
  def parse_coordinates(line):
    city, lat, lon = line.split(',')
    return city, (float(lat), float(lon))

  side_input = (
      pipeline
      | beam.io.ReadFromText('coordinates.csv')
      | beam.Map(parse_coordinates))

  def coordinates_lookup(row, coordinates):
    row['coordinates'] = coordinates.get(row['city'], (0, 0))
    del row['city']
    yield row

  enriched_data = (
      transformed_data
      | beam.FlatMap(coordinates_lookup, coordinates=beam.pvalue.AsDict(side_input)))

  # Metrics: count the rows that reach this step
  counter = Metrics.counter('main', 'counter')

  def count_data(row):
    counter.inc()
    yield row

  output_data = enriched_data | beam.FlatMap(count_data)

  # Write data
  output_data | beam.io.WriteToText('output.csv')
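Because the example uses the with statement, the pipeline runs when the block exits. To read back the counter from the metrics step, you can run the pipeline explicitly and query its result. The following is a minimal sketch, assuming the same transforms are attached to the explicit pipeline object:

import apache_beam as beam
from apache_beam.metrics.metric import MetricsFilter

pipeline = beam.Pipeline()
# ... attach the same transforms as in the example above ...

result = pipeline.run()
result.wait_until_finish()

# Query the counter declared as Metrics.counter('main', 'counter').
metrics_filter = MetricsFilter().with_namespace('main').with_name('counter')
for counter_result in result.metrics().query(metrics_filter)['counters']:
  print(counter_result.key.metric.name, counter_result.committed)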