blob: b12849afc8163cee0384d3b1c0d536ca8fbeb1b2 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pytype: skip-file
import os
import shutil
import tempfile
import unittest
import numpy as np
from parameterized import parameterized
import apache_beam as beam
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
# pylint: disable=wrong-import-order, wrong-import-position
try:
  # These imports fail with ImportError when the optional dependencies of
  # the MLTransform TFT wrappers are not available.
  from apache_beam.ml.transforms import base
  from apache_beam.ml.transforms import tft
except ImportError:
  # Sentinel checked right below so the whole module can be skipped.
  tft = None # type: ignore[assignment]

# Skip every test in this module when the import above did not succeed.
if not tft:
  raise unittest.SkipTest('tensorflow_transform is not installed.')
class ScaleZScoreTest(unittest.TestCase):
  """Checks ``tft.ScaleToZScore`` when it is applied through ``MLTransform``."""
  def setUp(self) -> None:
    # Every test writes its MLTransform artifacts into a fresh directory.
    self.artifact_location = tempfile.mkdtemp()

  def tearDown(self):
    # Drop the artifacts written by the test that just ran.
    shutil.rmtree(self.artifact_location)

  def test_z_score(self):
    """Scalar inputs 1..6 map to the expected single-element arrays."""
    rows = [{'x': value} for value in range(1, 7)]
    z_scores = (-1.46385, -0.87831, -0.29277, 0.29277, 0.87831, 1.46385)
    expected_data = [np.array([z], dtype=np.float32) for z in z_scores]

    with beam.Pipeline() as p:
      scaler = base.MLTransform(
          write_artifact_location=self.artifact_location).with_transform(
              tft.ScaleToZScore(columns=['x']))
      result = p | "Create" >> beam.Create(rows) | "MLTransform" >> scaler
      actual_data = result | beam.Map(lambda row: row.x)
      assert_that(
          actual_data, equal_to(expected_data, equals_fn=np.array_equal))

  def test_z_score_list_data(self):
    """List-valued inputs are scaled using all six values of the column."""
    rows = [{'x': [1, 2, 3]}, {'x': [4, 5, 6]}]
    expected_data = [
        np.array(scores, dtype=np.float32) for scores in (
            [-1.46385, -0.87831, -0.29277],
            [0.29277, 0.87831, 1.46385],
        )
    ]

    with beam.Pipeline() as p:
      scaler = base.MLTransform(
          write_artifact_location=self.artifact_location).with_transform(
              tft.ScaleToZScore(columns=['x']))
      list_result = (
          p | "listCreate" >> beam.Create(rows) | "listMLTransform" >> scaler)
      actual_data = list_result | beam.Map(lambda row: row.x)
      assert_that(
          actual_data, equal_to(expected_data, equals_fn=np.array_equal))
class ScaleTo01Test(unittest.TestCase):
  """Checks ``tft.ScaleTo01`` when it is applied through ``MLTransform``."""
  def setUp(self) -> None:
    # Fresh artifact directory per test.
    self.artifact_location = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.artifact_location)

  def test_ScaleTo01_list(self):
    """List-valued rows covering 1..6 are mapped onto 0, 0.2, ..., 1."""
    rows = [{'x': [1, 2, 3]}, {'x': [4, 5, 6]}]
    expected_output = [
        np.array(scaled, dtype=np.float32)
        for scaled in ([0, 0.2, 0.4], [0.6, 0.8, 1])
    ]

    with beam.Pipeline() as p:
      scaler = base.MLTransform(
          write_artifact_location=self.artifact_location).with_transform(
              tft.ScaleTo01(columns=['x']))
      list_result = (
          p | "listCreate" >> beam.Create(rows) | "MLTransform" >> scaler)
      actual_output = list_result | beam.Map(lambda row: row.x)
      assert_that(
          actual_output, equal_to(expected_output, equals_fn=np.array_equal))

  def test_ScaleTo01(self):
    """Scalar rows 1..6 are mapped onto 0, 0.2, ..., 1."""
    rows = [{'x': value} for value in range(1, 7)]
    expected_output = tuple(
        np.array([scaled], dtype=np.float32)
        for scaled in (0, 0.2, 0.4, 0.6, 0.8, 1))

    with beam.Pipeline() as p:
      scaler = base.MLTransform(
          write_artifact_location=self.artifact_location).with_transform(
              tft.ScaleTo01(columns=['x']))
      result = p | "Create" >> beam.Create(rows) | "MLTransform" >> scaler
      actual_output = result | beam.Map(lambda row: row.x)
      assert_that(
          actual_output, equal_to(expected_output, equals_fn=np.array_equal))
class ScaleToGaussianTest(unittest.TestCase):
  """Checks ``tft.ScaleToGaussian`` when it is applied through ``MLTransform``.

  Every test in this class builds ``tft.ScaleToGaussian``; the scalar-input
  tests use the same six values and the same expected numbers as their
  list-input counterparts.
  """
  def setUp(self) -> None:
    # Each test writes its MLTransform artifacts into a fresh directory.
    self.artifact_location = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.artifact_location)

  def test_gaussian_list_uniform_distribution(self):
    """List-valued rows holding the evenly spaced values 1..6."""
    list_data = [{'x': [1, 2, 3]}, {'x': [4, 5, 6]}]
    with beam.Pipeline() as p:
      list_result = (
          p
          | "listCreate" >> beam.Create(list_data)
          | "listMLTransform" >> base.MLTransform(
              write_artifact_location=self.artifact_location).with_transform(
                  tft.ScaleToGaussian(columns=['x'])))
      expected_data = [
          np.array([-1.46385, -0.87831, -0.29277], dtype=np.float32),
          np.array([0.29277, 0.87831, 1.46385], dtype=np.float32)
      ]
      actual_data = (list_result | beam.Map(lambda x: x.x))
      assert_that(
          actual_data, equal_to(expected_data, equals_fn=np.array_equal))

  def test_gaussian_list_skewed_distribution(self):
    """List-valued rows holding the doubling values 1, 2, 4, ..., 32."""
    list_data = [{'x': [1, 2, 4]}, {'x': [8, 16, 32]}]
    with beam.Pipeline() as p:
      list_result = (
          p
          | "listCreate" >> beam.Create(list_data)
          | "listMLTransform" >> base.MLTransform(
              write_artifact_location=self.artifact_location).with_transform(
                  tft.ScaleToGaussian(columns=['x'])))
      expected_data = [
          np.array([-0.87733847, -0.78498703, -0.6002842], dtype=np.float32),
          np.array([-0.23087855, 0.5079328, 1.9855555], dtype=np.float32)
      ]
      actual_data = (list_result | beam.Map(lambda x: x.x))
      # Tolerant comparison: the expected values are long float literals.
      assert_that(actual_data, equal_to(expected_data, equals_fn=np.allclose))

  def test_gaussian_uniform(self):
    """Scalar rows holding the evenly spaced values 1..6."""
    data = [{'x': value} for value in (1, 2, 3, 4, 5, 6)]
    with beam.Pipeline() as p:
      result = (
          p
          | "Create" >> beam.Create(data)
          | "MLTransform" >> base.MLTransform(
              write_artifact_location=self.artifact_location).with_transform(
                  # Previously built ScaleToZScore, leaving ScaleToGaussian
                  # untested for scalar inputs.
                  tft.ScaleToGaussian(columns=['x'])))
      expected_data = [
          np.array([-1.46385], dtype=np.float32),
          np.array([-0.87831], dtype=np.float32),
          np.array([-0.29277], dtype=np.float32),
          np.array([0.29277], dtype=np.float32),
          np.array([0.87831], dtype=np.float32),
          np.array([1.46385], dtype=np.float32),
      ]
      actual_data = (result | beam.Map(lambda x: x.x))
      assert_that(
          actual_data, equal_to(expected_data, equals_fn=np.array_equal))

  def test_gaussian_skewed(self):
    """Scalar rows holding the doubling values 1, 2, 4, ..., 32."""
    data = [{'x': value} for value in (1, 2, 4, 8, 16, 32)]
    with beam.Pipeline() as p:
      result = (
          p
          | "Create" >> beam.Create(data)
          | "MLTransform" >> base.MLTransform(
              write_artifact_location=self.artifact_location).with_transform(
                  # Previously built ScaleToZScore, leaving ScaleToGaussian
                  # untested for scalar inputs.
                  tft.ScaleToGaussian(columns=['x'])))
      expected_data = [
          np.array([-0.87733847], dtype=np.float32),
          np.array([-0.78498703], dtype=np.float32),
          np.array([-0.6002842], dtype=np.float32),
          np.array([-0.23087855], dtype=np.float32),
          np.array([0.5079328], dtype=np.float32),
          np.array([1.9855555], dtype=np.float32),
      ]
      actual_data = (result | beam.Map(lambda x: x.x))
      # Tolerant comparison: the expected values are long float literals.
      assert_that(actual_data, equal_to(expected_data, equals_fn=np.allclose))
class BucketizeTest(unittest.TestCase):
  """Checks ``tft.Bucketize`` with three buckets through ``MLTransform``."""
  def setUp(self) -> None:
    # Fresh artifact directory per test.
    self.artifact_location = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.artifact_location)

  def test_bucketize(self):
    """Scalar rows 1..6 land pairwise in buckets 0, 1 and 2."""
    rows = [{'x': value} for value in range(1, 7)]
    expected_data = [np.array([bucket]) for bucket in (0, 0, 1, 1, 2, 2)]

    with beam.Pipeline() as p:
      bucketizer = base.MLTransform(
          write_artifact_location=self.artifact_location).with_transform(
              tft.Bucketize(columns=['x'], num_buckets=3))
      result = p | "Create" >> beam.Create(rows) | "MLTransform" >> bucketizer
      transformed_data = result | beam.Map(lambda row: row.x)
      assert_that(
          transformed_data, equal_to(expected_data, equals_fn=np.array_equal))

  def test_bucketize_list(self):
    """List-valued rows are bucketized element by element."""
    rows = [{'x': [1, 2, 3]}, {'x': [4, 5, 6]}]
    expected_data = [
        np.array(buckets, dtype=np.int64)
        for buckets in ([0, 0, 1], [1, 2, 2])
    ]

    with beam.Pipeline() as p:
      bucketizer = base.MLTransform(
          write_artifact_location=self.artifact_location).with_transform(
              tft.Bucketize(columns=['x'], num_buckets=3))
      list_result = (
          p | "Create" >> beam.Create(rows) | "MLTransform" >> bucketizer)
      transformed_data = (
          list_result
          | "TransformedColumnX" >> beam.Map(lambda row: row.x))
      assert_that(
          transformed_data, equal_to(expected_data, equals_fn=np.array_equal))
class ApplyBucketsTest(unittest.TestCase):
  """Checks ``tft.ApplyBuckets`` with explicit bucket boundaries."""
  def setUp(self) -> None:
    # Fresh artifact directory per test.
    self.artifact_location = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.artifact_location)

  @parameterized.expand([
      (range(1, 100), [25, 50, 75]),
      (range(1, 100, 2), [25, 51, 75]),
  ])
  def test_apply_buckets(self, test_inputs, bucket_boundaries):
    """Each input is assigned the index of the bucket it falls into."""
    rows = [{'x': [value]} for value in test_inputs]

    # Walk the inputs in ascending order and move to the next bucket as soon
    # as the current boundary is reached (at most one step per input).
    expected_output = []
    bucket_index = 0
    boundary_count = len(bucket_boundaries)
    for value in sorted(test_inputs):
      if (bucket_index < boundary_count and
          value >= bucket_boundaries[bucket_index]):
        bucket_index += 1
      expected_output.append(np.array([bucket_index]))

    with beam.Pipeline() as p:
      bucket_transform = base.MLTransform(
          write_artifact_location=self.artifact_location).with_transform(
              tft.ApplyBuckets(
                  columns=['x'], bucket_boundaries=bucket_boundaries))
      result = (
          p | "Create" >> beam.Create(rows) | "MLTransform" >> bucket_transform)
      actual_output = result | beam.Map(lambda row: row.x)
      assert_that(
          actual_output, equal_to(expected_output, equals_fn=np.array_equal))
class ApplyBucketsWithInterpolationTest(unittest.TestCase):
  """Checks ``tft.ApplyBucketsWithInterpolation`` through ``MLTransform``."""
  def setUp(self) -> None:
    # Fresh artifact directory per test.
    self.artifact_location = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.artifact_location)

  @parameterized.expand([
      ([-1, 9, 10, 11], [10], [0., 0., 1., 1.]),
      ([15, 20, 25], [10, 20], [.5, 1, 1]),
  ])
  def test_apply_buckets(self, test_inputs, bucket_boundaries, expected_values):
    """Each input maps to the value listed for it in ``expected_values``."""
    rows = [{'x': [value]} for value in test_inputs]
    expected_output = [np.array(expected) for expected in expected_values]

    with beam.Pipeline() as p:
      interpolation = base.MLTransform(
          write_artifact_location=self.artifact_location).with_transform(
              tft.ApplyBucketsWithInterpolation(
                  columns=['x'], bucket_boundaries=bucket_boundaries))
      result = (
          p | "Create" >> beam.Create(rows) | "MLTransform" >> interpolation)
      actual_output = result | beam.Map(lambda row: row.x)
      assert_that(
          actual_output, equal_to(expected_output, equals_fn=np.allclose))
class ComputeAndApplyVocabTest(unittest.TestCase):
  """Checks ``tft.ComputeAndApplyVocabulary`` through ``MLTransform``."""
  def setUp(self) -> None:
    # Fresh artifact directory per test.
    self.artifact_location = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.artifact_location)

  def _assert_vocab_indices(self, rows, expected, **vocab_kwargs):
    """Runs the transform on column ``x`` and compares the emitted arrays.

    Args:
      rows: input dictionaries fed to ``beam.Create``.
      expected: arrays the ``x`` column is expected to contain.
      **vocab_kwargs: extra arguments for ``ComputeAndApplyVocabulary``.
    """
    with beam.Pipeline() as p:
      vocab_transform = base.MLTransform(
          write_artifact_location=self.artifact_location).with_transform(
              tft.ComputeAndApplyVocabulary(columns=['x'], **vocab_kwargs))
      transformed = (
          p | "Create" >> beam.Create(rows) | "MLTransform" >> vocab_transform)
      indices = transformed | beam.Map(lambda row: row.x)
      assert_that(indices, equal_to(expected, equals_fn=np.array_equal))

  def _assert_two_column_vocab(self, **vocab_kwargs):
    """Applies the transform to columns ``x`` and ``y`` and checks artifacts.

    Besides the per-column indices, the artifact directory is expected to
    hold one entry next to the attribute file, containing two files under
    ``transform_fn/assets``.
    """
    rows = [{
        'x': ['I', 'like', 'pie'], 'y': ['Apach', 'Beam', 'is', 'awesome']
    },
            {
                'x': ['yum', 'yum', 'pie'],
                'y': ['Beam', 'is', 'a', 'unified', 'model']
            }]
    expected_data_x = [np.array([3, 2, 1]), np.array([0, 0, 1])]
    expected_data_y = [np.array([6, 1, 0, 4]), np.array([1, 0, 5, 2, 3])]

    with beam.Pipeline() as p:
      vocab_transform = base.MLTransform(
          write_artifact_location=self.artifact_location).with_transform(
              tft.ComputeAndApplyVocabulary(columns=['x', 'y'], **vocab_kwargs))
      result = (
          p | "Create" >> beam.Create(rows) | "MLTransform" >> vocab_transform)
      actual_data_x = result | beam.Map(lambda row: row.x)
      actual_data_y = result | beam.Map(lambda row: row.y)
      assert_that(
          actual_data_x,
          equal_to(expected_data_x, equals_fn=np.array_equal),
          label='x')
      assert_that(
          actual_data_y,
          equal_to(expected_data_y, equals_fn=np.array_equal),
          label='y')

    # The artifacts are only present once the pipeline above has finished.
    entries = os.listdir(self.artifact_location)
    entries.remove(base._ATTRIBUTE_FILE_NAME)
    assert len(entries) == 1
    assets_dir = os.path.join(
        self.artifact_location, entries[0], 'transform_fn', 'assets')
    tft_vocab_assets = os.listdir(assets_dir)
    assert len(tft_vocab_assets) == 2

  def test_compute_and_apply_vocabulary_inputs(self):
    """One zero-padded string per row maps to descending indices."""
    num_elements = 100
    num_instances = num_elements + 1
    # Front-padded to facilitate lexicographic sorting.
    input_data = [{'x': '%.10i' % i} for i in range(num_instances)]
    # Due to reverse lexicographic sorting.
    highest_index = len(input_data) - 1
    expected_data = [{
        'x': highest_index - i
    } for i in range(len(input_data))]

    with beam.Pipeline() as p:
      vocab_transform = base.MLTransform(
          write_artifact_location=self.artifact_location).with_transform(
              tft.ComputeAndApplyVocabulary(columns=['x']))
      actual_data = (
          p
          | "Create" >> beam.Create(input_data)
          | "MLTransform" >> vocab_transform)
      actual_data |= beam.Map(lambda row: row.as_dict())
      assert_that(actual_data, equal_to(expected_data))

  def test_compute_and_apply_vocabulary(self):
    """Three zero-padded strings per row map to descending index triples."""
    num_elements = 100
    num_instances = num_elements + 1
    # Front-padded to facilitate lexicographic sorting.
    input_data = [{
        'x': ['%.10i' % i, '%.10i' % (i + 1), '%.10i' % (i + 2)]
    } for i in range(0, num_instances, 3)]
    # Every row carries three strings, so the expected indices count down
    # from 3 * len(input_data) - 1 in steps of three rows' worth.
    total = len(input_data) * 3
    highest_index = total - 1
    expected_data = [
        np.array([
            highest_index - i,
            highest_index - i - 1,
            highest_index - i - 2,
        ],
                 dtype=np.int64) for i in range(0, total, 3)
    ]
    self._assert_vocab_indices(input_data, expected_data)

  def test_with_basic_example_list(self):
    """Token lists are replaced by the indices listed in the expectation."""
    rows = [{'x': ['I', 'like', 'pie']}, {'x': ['yum', 'yum', 'pie']}]
    self._assert_vocab_indices(
        rows, [np.array([3, 2, 1]), np.array([0, 0, 1])])

  def test_string_split_with_single_delimiter(self):
    """Strings are split on a single space before the vocabulary lookup."""
    rows = [{'x': 'I like pie'}, {'x': 'yum yum pie'}]
    self._assert_vocab_indices(
        rows, [np.array([3, 2, 1]), np.array([0, 0, 1])],
        split_string_by_delimiter=' ')

  def test_string_split_with_multiple_delimiters(self):
    """Both ``' '`` and ``';'`` are accepted as token separators."""
    rows = [{'x': 'I like pie'}, {'x': 'yum;yum;pie'}, {'x': 'yum yum pie'}]
    expected_result = [
        np.array([3, 2, 1]), np.array([0, 0, 1]), np.array([0, 0, 1])
    ]
    self._assert_vocab_indices(
        rows, expected_result, split_string_by_delimiter=' ;')

  def test_multiple_columns_with_default_vocab_name(self):
    """Two columns without ``vocab_filename`` still yield two asset files."""
    self._assert_two_column_vocab()

  def test_multiple_columns_with_vocab_name(self):
    """Two columns sharing ``vocab_filename`` still yield two asset files."""
    self._assert_two_column_vocab(vocab_filename='my_vocab')
class TFIDIFTest(unittest.TestCase):
  """Checks ``tft.TFIDF`` chained after ``tft.ComputeAndApplyVocabulary``."""
  def setUp(self) -> None:
    # Fresh artifact directory per test.
    self.artifact_location = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.artifact_location)

  def test_tfidf_compute_vocab_size_during_runtime(self):
    """``TFIDF`` is built without a vocabulary size and run on two rows."""
    raw_data = [
        dict(x=["I", "like", "pie", "pie", "pie"]),
        dict(x=["yum", "yum", "pie"])
    ]

    def equals_fn(a, b):
      """Returns True when every column of ``a`` matches the one in ``b``.

      The previous implementation looked the second value up in ``a`` again,
      so each row was compared with itself and the check always passed.
      """
      for key, value in a.items():
        if key not in b:
          return False
        value_b = b[key]
        # Rows differ in length, so reject mismatching shapes up front
        # instead of letting allclose broadcast or raise.
        if np.shape(value) != np.shape(value_b):
          return False
        # Tolerant comparison because the weights are float32 values.
        if not np.allclose(value, value_b):
          return False
      return True

    with beam.Pipeline() as p:
      transforms = [
          tft.ComputeAndApplyVocabulary(columns=['x']),
          tft.TFIDF(columns=['x'])
      ]
      actual_output = (
          p
          | "Create" >> beam.Create(raw_data)
          | "MLTransform" >> base.MLTransform(
              write_artifact_location=self.artifact_location,
              transforms=transforms))
      actual_output |= beam.Map(lambda x: x.as_dict())

      expected_output = ([{
          'x': np.array([3, 2, 0, 0, 0]),
          'x_tfidf_weight': np.array([0.6, 0.28109303, 0.28109303],
                                     dtype=np.float32),
          'x_vocab_index': np.array([0, 2, 3], dtype=np.int64)
      },
                          {
                              'x': np.array([1, 1, 0]),
                              'x_tfidf_weight': np.array(
                                  [0.33333334, 0.9369768], dtype=np.float32),
                              'x_vocab_index': np.array([0, 1], dtype=np.int32)
                          }])
      assert_that(actual_output, equal_to(expected_output, equals_fn=equals_fn))
class ScaleToMinMaxTest(unittest.TestCase):
  """Checks ``tft.ScaleByMinMax`` through ``MLTransform``."""
  def setUp(self) -> None:
    # Fresh artifact directory per test.
    self.artifact_location = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.artifact_location)

  def test_scale_to_min_max(self):
    """Values 4, 1, 5, 2 are rescaled into the range [-1, 1]."""
    rows = [{'x': value} for value in (4, 1, 5, 2)]
    expected_data = [{
        'x': np.array([scaled], dtype=np.float32)
    } for scaled in (0.5, -1.0, 1.0, -0.5)]

    with beam.Pipeline() as p:
      min_max_scaler = tft.ScaleByMinMax(
          columns=['x'],
          min_value=-1,
          max_value=1,
      )
      ml_transform = base.MLTransform(
          write_artifact_location=self.artifact_location).with_transform(
              min_max_scaler)
      result = p | "Create" >> beam.Create(rows) | "MLTransform" >> ml_transform
      result = result | beam.Map(lambda row: row.as_dict())
      assert_that(result, equal_to(expected_data))

  def test_fail_max_value_less_than_min(self):
    """Constructing the transform with ``max_value < min_value`` raises."""
    with self.assertRaises(ValueError):
      tft.ScaleByMinMax(columns=['x'], min_value=10, max_value=0)
class NGramsTest(unittest.TestCase):
  """Checks ``tft.NGrams`` through ``MLTransform``."""
  def setUp(self) -> None:
    # Fresh artifact directory per test.
    self.artifact_location = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.artifact_location)

  @staticmethod
  def _expected_one_to_three_grams():
    """Expected 1- to 3-grams for 'I like pie' and 'yum yum pie'."""
    return [
        np.array(
            [b'I', b'I like', b'I like pie', b'like', b'like pie', b'pie'],
            dtype=object),
        np.array(
            [b'yum', b'yum yum', b'yum yum pie', b'yum', b'yum pie', b'pie'],
            dtype=object)
    ]

  def _assert_ngrams(self, rows, expected_data, **ngram_kwargs):
    """Runs ``NGrams`` on column ``x`` and compares the emitted arrays.

    Args:
      rows: input dictionaries fed to ``beam.Create``.
      expected_data: arrays the ``x`` column is expected to contain.
      **ngram_kwargs: extra arguments for ``tft.NGrams``.
    """
    with beam.Pipeline() as p:
      ngrams = tft.NGrams(columns=['x'], **ngram_kwargs)
      transformed = (
          p
          | "Create" >> beam.Create(rows)
          | "MLTransform" >> base.MLTransform(
              write_artifact_location=self.artifact_location,
              transforms=[ngrams]))
      result = transformed | beam.Map(lambda row: row.x)
      assert_that(result, equal_to(expected_data, equals_fn=np.array_equal))

  def test_ngrams_on_list_separated_words_default_args(self):
    """With default arguments the tokens come back unchanged, as bytes."""
    rows = [{'x': ['I', 'like', 'pie']}, {'x': ['yum', 'yum', 'pie']}]
    expected_data = [
        np.array([b'I', b'like', b'pie'], dtype=object),
        np.array([b'yum', b'yum', b'pie'], dtype=object)
    ]
    self._assert_ngrams(rows, expected_data)

  def test_ngrams_on_list_separated_words(self):
    """Token lists produce space-joined 1- to 3-grams."""
    rows = [{'x': ['I', 'like', 'pie']}, {'x': ['yum', 'yum', 'pie']}]
    self._assert_ngrams(
        rows,
        self._expected_one_to_three_grams(),
        ngram_range=(1, 3),
        ngrams_separator=' ')

  def test_with_string_split_delimiter(self):
    """Plain strings are split on a space before n-grams are built."""
    rows = [{'x': 'I like pie'}, {'x': 'yum yum pie'}]
    self._assert_ngrams(
        rows,
        self._expected_one_to_three_grams(),
        split_string_by_delimiter=' ',
        ngram_range=(1, 3),
        ngrams_separator=' ')

  def test_with_multiple_string_delimiters(self):
    """Both ``' '`` and ``'?'`` are accepted as token separators."""
    rows = [{'x': 'I?like?pie'}, {'x': 'yum yum pie'}]
    self._assert_ngrams(
        rows,
        self._expected_one_to_three_grams(),
        split_string_by_delimiter=' ?',
        ngram_range=(1, 3),
        ngrams_separator=' ')
class BagOfWordsTest(unittest.TestCase):
  """Checks ``tft.BagOfWords`` through ``MLTransform``."""
  def setUp(self) -> None:
    # Fresh artifact directory per test.
    self.artifact_location = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.artifact_location)

  @staticmethod
  def _expected_custom_ngram_bags():
    """Expected bags for 'I like pie I like pie' and 'yum yum pie'."""
    first_bag = [
        b'I',
        b'I like',
        b'I like pie',
        b'like',
        b'like pie',
        b'like pie I',
        b'pie',
        b'pie I',
        b'pie I like'
    ]
    second_bag = [b'yum', b'yum yum', b'yum yum pie', b'yum pie', b'pie']
    return [first_bag, second_bag]

  def _assert_bag_of_words(self, rows, expected_data, **bag_kwargs):
    """Runs ``BagOfWords`` on column ``x`` and compares the emitted values.

    Args:
      rows: input dictionaries fed to ``beam.Create``.
      expected_data: sequences the ``x`` column is expected to contain.
      **bag_kwargs: extra arguments for ``tft.BagOfWords``.
    """
    with beam.Pipeline() as p:
      bag_of_words = tft.BagOfWords(columns=['x'], **bag_kwargs)
      transformed = (
          p
          | "Create" >> beam.Create(rows)
          | "MLTransform" >> base.MLTransform(
              write_artifact_location=self.artifact_location,
              transforms=[bag_of_words]))
      result = transformed | beam.Map(lambda row: row.x)
      assert_that(result, equal_to(expected_data, equals_fn=np.array_equal))

  def test_bag_of_words_on_list_seperated_words_default_ngrams(self):
    """With default arguments repeated tokens are emitted only once."""
    rows = [{
        'x': ['I', 'like', 'pie', 'pie', 'pie']
    }, {
        'x': ['yum', 'yum', 'pie']
    }]
    expected_data = [
        np.array([b'I', b'like', b'pie'], dtype=object),
        np.array([b'yum', b'pie'], dtype=object)
    ]
    self._assert_bag_of_words(rows, expected_data)

  def test_bag_of_words_on_list_seperated_words_custom_ngrams(self):
    """Token lists produce de-duplicated, space-joined 1- to 3-grams."""
    rows = [{
        'x': ['I', 'like', 'pie', 'I', 'like', 'pie']
    }, {
        'x': ['yum', 'yum', 'pie']
    }]
    self._assert_bag_of_words(
        rows,
        self._expected_custom_ngram_bags(),
        ngram_range=(1, 3),
        ngrams_separator=' ')

  def test_bag_of_words_on_numpy_data(self):
    """Object arrays are accepted as input just like token lists."""
    rows = [{
        'x': np.array(['I', 'like', 'pie', 'I', 'like', 'pie'], dtype=object)
    }, {
        'x': np.array(['yum', 'yum', 'pie'], dtype=object)
    }]
    self._assert_bag_of_words(
        rows,
        self._expected_custom_ngram_bags(),
        ngram_range=(1, 3),
        ngrams_separator=' ')

  def test_bag_of_words_on_by_splitting_input_text(self):
    """Plain strings are split on a space before the bag is built."""
    rows = [{'x': 'I like pie I like pie'}, {'x': 'yum yum pie'}]
    self._assert_bag_of_words(
        rows,
        self._expected_custom_ngram_bags(),
        split_string_by_delimiter=' ',
        ngram_range=(1, 3),
        ngrams_separator=' ')

  def test_count_per_key_on_list(self):
    """``compute_word_count=True`` writes a '<count> <word>' vocab file."""
    rows = [{
        'x': ['I', 'like', 'pie', 'pie', 'pie']
    }, {
        'x': ['yum', 'yum', 'pie']
    }, {
        'x': ['Banana', 'Banana', 'Apple', 'Apple', 'Apple', 'Apple']
    }]
    with beam.Pipeline() as p:
      counting_bag = tft.BagOfWords(
          columns=['x'], compute_word_count=True, key_vocab_filename='my_vocab')
      _ = (
          p
          | "Create" >> beam.Create(rows)
          | "MLTransform" >> base.MLTransform(
              write_artifact_location=self.artifact_location,
              transforms=[counting_bag]))

    def read_key_vocab(key_vocab_filename):
      # The file is read only after the pipeline above has finished.
      entries = os.listdir(self.artifact_location)
      entries.remove(base._ATTRIBUTE_FILE_NAME)
      vocab_path = os.path.join(
          self.artifact_location,
          entries[0],
          'transform_fn/assets',
          key_vocab_filename + '_x')
      with open(vocab_path, 'r') as vocab_file:
        return [line.strip() for line in vocab_file]

    expected_data = ['2 yum', '4 Apple', '1 like', '1 I', '4 pie', '2 Banana']
    actual_data = read_key_vocab('my_vocab')
    self.assertEqual(expected_data, actual_data)
class HashStringsTest(unittest.TestCase):
  """Checks ``tft.HashStrings`` through ``MLTransform``."""
  def setUp(self) -> None:
    # Fresh artifact directory per test.
    self.artifact_location = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.artifact_location)

  def _assert_hashed(self, rows, expected_values, **hash_kwargs):
    """Runs ``HashStrings`` on column ``x`` and compares the emitted arrays.

    Args:
      rows: input dictionaries fed to ``beam.Create``.
      expected_values: arrays the ``x`` column is expected to contain.
      **hash_kwargs: arguments for ``tft.HashStrings`` besides ``columns``.
    """
    with beam.Pipeline() as p:
      hasher = base.MLTransform(
          write_artifact_location=self.artifact_location).with_transform(
              tft.HashStrings(columns=['x'], **hash_kwargs))
      list_result = (
          p | "listCreate" >> beam.Create(rows) | "listMLTransform" >> hasher)
      result = list_result | beam.Map(lambda row: row.x)
      assert_that(result, equal_to(expected_values, equals_fn=np.array_equal))

  def test_single_bucket(self):
    """With one bucket a single string hashes to 0."""
    self._assert_hashed([{
        'x': 'this is a test string'
    }], [np.array([0])],
                        hash_buckets=1)

  def test_multi_bucket_one_string(self):
    """With two buckets the test string hashes to 1."""
    self._assert_hashed([{
        'x': 'this is a test string'
    }], [np.array([1])],
                        hash_buckets=2)

  def test_one_bucket_multi_string(self):
    """With one bucket every string of the list hashes to 0."""
    self._assert_hashed([{
        'x': ['these', 'are', 'test', 'strings']
    }], [np.array([0, 0, 0, 0])],
                        hash_buckets=1)

  def test_two_bucket_multi_string(self):
    """Two buckets with an explicit key give the listed assignment."""
    self._assert_hashed([{
        'x': ['these', 'are', 'test', 'strings']
    }], [np.array([1, 0, 1, 0])],
                        hash_buckets=2,
                        key=[123, 456])

  def test_multi_buckets_multi_string(self):
    """Eleven buckets with an explicit key give the listed assignment."""
    # This is a recreation of one of the TFT test cases from
    # https://github.com/tensorflow/transform/blob/master/tensorflow_transform/mappers_test.py
    self._assert_hashed([{
        'x': ['Cake', 'Pie', 'Sundae']
    }], [np.array([6, 5, 6])],
                        hash_buckets=11,
                        key=[123, 456])
class DeduplicateTensorPerRowTest(unittest.TestCase):
  """Checks ``tft.DeduplicateTensorPerRow`` through ``MLTransform``."""
  def setUp(self) -> None:
    # Fresh artifact directory per test.
    self.artifact_location = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.artifact_location)

  def _assert_deduplicated(self, rows, expected_output):
    """Runs the transform on column ``x`` and compares the emitted arrays.

    Args:
      rows: input dictionaries fed to ``beam.Create``.
      expected_output: arrays the ``x`` column is expected to contain.
    """
    with beam.Pipeline() as p:
      deduplicator = base.MLTransform(
          write_artifact_location=self.artifact_location).with_transform(
              tft.DeduplicateTensorPerRow(columns=['x']))
      list_result = (
          p
          | "listCreate" >> beam.Create(rows)
          | "listMLTransform" >> deduplicator)
      result = list_result | beam.Map(lambda row: row.x)
      assert_that(result, equal_to(expected_output, equals_fn=np.array_equal))

  def test_deduplicate(self):
    """Repeated values within a row are emitted once."""
    rows = [{'x': [b'a', b'b', b'a', b'b']}, {'x': [b'b', b'c', b'b', b'c']}]
    self._assert_deduplicated(
        rows, [np.array([b'a', b'b']), np.array([b'b', b'c'])])

  def test_deduplicate_no_op(self):
    """Rows without repeated values come back unchanged."""
    rows = [{'x': [b'a', b'b']}, {'x': [b'c', b'd']}]
    self._assert_deduplicated(
        rows, [np.array([b'a', b'b']), np.array([b'c', b'd'])])

  def test_deduplicate_different_output_sizes(self):
    """Rows may shrink to different lengths after deduplication."""
    rows = [{'x': [b'a', b'b', b'a', b'b']}, {'x': [b'c', b'a', b'd', b'd']}]
    self._assert_deduplicated(
        rows, [np.array([b'a', b'b']), np.array([b'c', b'a', b'd'])])
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()