Revert "Refactor code"
This reverts commit 73e46997494c0aff30179626d81d004e3d27633d.
diff --git a/sdks/python/apache_beam/testing/benchmarks/mltransform/criteo.py b/sdks/python/apache_beam/testing/benchmarks/mltransform/criteo.py
index 00383a1..7f7c477 100644
--- a/sdks/python/apache_beam/testing/benchmarks/mltransform/criteo.py
+++ b/sdks/python/apache_beam/testing/benchmarks/mltransform/criteo.py
@@ -16,16 +16,6 @@
#
# pylint: skip-file
-"""
-This example demonstrates the use of MLTransform to preprocess text data using
-ComputeAndApplyVocabulary.
-
-This examples follows https://github.com/tensorflow/models/blob/master/official/recommendation/ranking/preprocessing/criteo_preprocess.py
-but the instead of tensorflow-transform, it uses Apache Beam's MLTransform.
-MLTransform abstracts the user away from providing tensorflow-transform's
-schema and making it easier for users to use it with Apache Beam.
-"""
-
import logging
import argparse
import numpy as np
@@ -44,7 +34,6 @@
NUMERIC_FEATURE_KEYS = ["int_feature_%d" % x for x in range(1, 14)]
CATEGORICAL_FEATURE_KEYS = ["categorical_feature_%d" % x for x in range(14, 40)]
LABEL_KEY = "clicked"
-MAX_VOCAB_SIZE = 5000000
class FillMissing(beam.DoFn):
@@ -71,18 +60,11 @@
yield (csv_delimiter).join(out_list)
-class HexToIntModRange(beam.DoFn):
- """For categorical features, takes decimal value and mods with max value."""
- def process(self, element):
- elem_list = element.split(csv_delimiter)
- out_list = []
- for i, val in enumerate(elem_list):
- if i > NUM_NUMERIC_FEATURES:
- new_val = int(val, 16) % MAX_VOCAB_SIZE
- else:
- new_val = val
- out_list.append(str(new_val))
- yield str.encode((csv_delimiter).join(out_list))
+def convert_str_to_int(element):
+ for key, value in element.items():
+ if key in NUMERIC_FEATURE_KEYS:
+ element[key] = float(value)
+ return element
def parse_known_args(argv):
@@ -111,8 +93,11 @@
| "FillMissing" >> beam.ParDo(FillMissing())
# For numerical features, set negatives to zero. Then take log(x+1).
| "NegsToZeroLog" >> beam.ParDo(NegsToZeroLog())
- # For categorical features, mod the values with vocab size.
- | "HexToIntModRange" >> beam.ParDo(HexToIntModRange()))
+ | beam.Map(lambda x: str(x).split(csv_delimiter))
+ # Creates 50 GB data.
+ | beam.Map(lambda x: {ordered_columns[i]: x[i]
+ for i in range(len(x))})
+ | beam.Map(convert_str_to_int))
transformed_lines = (
processed_lines
@@ -124,7 +109,6 @@
).with_transform(
Bucketize(columns=NUMERIC_FEATURE_KEYS, num_buckets=_NUM_BUCKETS)))
- # TODO: Write to CSV.
transformed_lines | beam.Map(logging.info)