#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pytype: skip-file
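"""Unit tests for the scikit-learn RunInference model handlers."""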
import io
import math
import os
import pickle
import platform
import shutil
import sys
import tempfile
import unittest
from typing import Any
from typing import Dict
from typing import Optional
from typing import Sequence
import joblib
import numpy
import pandas
from sklearn import linear_model
from sklearn import svm
from sklearn.base import BaseEstimator
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import StandardScaler
import apache_beam as beam
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.sklearn_inference import ModelFileType
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerPandas
from apache_beam.ml.inference.sklearn_inference import _default_numpy_inference_fn
from apache_beam.ml.inference.sklearn_inference import _default_pandas_inference_fn
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
def _compare_prediction_result(a, b):
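  """Returns True if two PredictionResults hold equal examples and inferences.

  Dict-valued inferences are compared value-by-value in iteration order;
  matching keys are assumed.
  """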
example_equal = numpy.array_equal(a.example, b.example)
if isinstance(a.inference, dict):
return all(
x == y for x, y in zip(a.inference.values(),
b.inference.values())) and example_equal
return a.inference == b.inference and example_equal
def _compare_dataframe_predictions(a_in, b_in):
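  """Returns True if two (optionally keyed) DataFrame predictions match.

  Inferences are floored before comparison to tolerate small
  floating-point differences.
  """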
keys_equal = True
if isinstance(a_in, tuple) and not isinstance(a_in, PredictionResult):
a_key, a = a_in
b_key, b = b_in
keys_equal = a_key == b_key
else:
a = a_in
b = b_in
example_equal = pandas.DataFrame.equals(a.example, b.example)
if isinstance(a.inference, dict):
return all(
math.floor(a) == math.floor(b) for a,
b in zip(a.inference.values(), b.inference.values())) and example_equal
inference_equal = math.floor(a.inference) == math.floor(b.inference)
return inference_equal and example_equal and keys_equal
class FakeModel:
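  """Fake model that counts predict() calls and sums each example row."""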
def __init__(self):
self.total_predict_calls = 0
def predict(self, input_vector: numpy.ndarray):
self.total_predict_calls += 1
return numpy.sum(input_vector, axis=1)
class FakeNumpyModelDictOut:
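  """Fake model that returns the same row sums under two output keys."""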
def __init__(self):
self.total_predict_calls = 0
def predict(self, input_vector: numpy.ndarray):
self.total_predict_calls += 1
out = numpy.sum(input_vector, axis=1)
return {"out1": out, "out2": out}
class FakePandasModelDictOut:
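  """Fake model that returns the 'number_2' column under two output keys."""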
def __init__(self):
self.total_predict_calls = 0
def predict(self, df: pandas.DataFrame):
self.total_predict_calls += 1
out = df.loc[:, 'number_2']
return {"out1": out, "out2": out}
def build_model():
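  """Trains a small SVC classifier on a two-example toy dataset."""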
x = [[0, 0], [1, 1]]
y = [0, 1]
model = svm.SVC()
model.fit(x, y)
return model
def pandas_dataframe():
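  """Returns a five-row DataFrame with categorical and numerical columns."""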
csv_string = (
'category_1,number_1,category_2,number_2,label,number_3\n'
'red,4,frog,5,6,7\n'
'blue,3,horse,8,9,10\n'
'red,0,cow,1,2,3\n'
'blue,4,frog,1,1,1\n'
'red,1,horse,4,2,3')
csv_string_io = io.StringIO(csv_string)
return pandas.read_csv(csv_string_io)
def build_pandas_pipeline():
"""Builds a common type of pandas pipeline with preprocessing."""
categorical_columns = ['category_1', 'category_2']
numerical_columns = ['number_1', 'number_2', 'number_3']
categorical_transformer = OneHotEncoder(handle_unknown='ignore')
numerical_transformer = StandardScaler()
preprocessor = ColumnTransformer(
transformers=[
("numerical", numerical_transformer, numerical_columns),
("categorical", categorical_transformer, categorical_columns),
])
pipeline = Pipeline(
steps=[("preprocessor",
preprocessor), ("classifier", linear_model.SGDRegressor())])
data = pandas_dataframe()
labels = data['label']
pipeline.fit(data, labels)
return pipeline
def convert_inference_to_floor(prediction_result):
return math.floor(prediction_result.inference)
def alternate_numpy_inference_fn(
model: BaseEstimator,
batch: Sequence[numpy.ndarray],
inference_args: Optional[Dict[str, Any]] = None) -> Any:
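  """Stub inference fn that ignores its inputs and returns a constant [0]."""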
return [0]
def alternate_pandas_inference_fn(
model: BaseEstimator,
batch: Sequence[pandas.DataFrame],
inference_args: Optional[Dict[str, Any]] = None) -> Any:
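  """Runs the model on the concatenated batch and subtracts 1 from each
  prediction, so tests can tell this fn apart from the default."""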
# vectorize data for better performance
vectorized_batch = pandas.concat(batch, axis=0)
predictions = model.predict(vectorized_batch)
splits = [
vectorized_batch.iloc[[i]] for i in range(vectorized_batch.shape[0])
]
predictions = predictions - 1
return predictions, splits
class SkLearnRunInferenceTest(unittest.TestCase):
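  """Tests for SklearnModelHandlerNumpy and SklearnModelHandlerPandas."""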
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmpdir)
def test_predict_output(self):
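    """Verifies each example is paired with the model's row-sum output."""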
fake_model = FakeModel()
inference_runner = SklearnModelHandlerNumpy(model_uri='unused')
batched_examples = [
numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9])
]
expected_predictions = [
PredictionResult(numpy.array([1, 2, 3]), 6),
PredictionResult(numpy.array([4, 5, 6]), 15),
PredictionResult(numpy.array([7, 8, 9]), 24)
]
inferences = inference_runner.run_inference(batched_examples, fake_model)
for actual, expected in zip(inferences, expected_predictions):
self.assertTrue(_compare_prediction_result(actual, expected))
def test_custom_inference_fn(self):
fake_model = FakeModel()
inference_runner = SklearnModelHandlerNumpy(
model_uri='unused', inference_fn=alternate_numpy_inference_fn)
batched_examples = [
numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9])
]
expected_predictions = [
PredictionResult(numpy.array([1, 2, 3]), 0),
PredictionResult(numpy.array([4, 5, 6]), 0),
PredictionResult(numpy.array([7, 8, 9]), 0)
]
inferences = inference_runner.run_inference(batched_examples, fake_model)
for actual, expected in zip(inferences, expected_predictions):
self.assertTrue(_compare_prediction_result(actual, expected))
def test_predict_output_dict(self):
fake_model = FakeNumpyModelDictOut()
inference_runner = SklearnModelHandlerNumpy(model_uri='unused')
batched_examples = [
numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9])
]
expected_predictions = [
PredictionResult(numpy.array([1, 2, 3]), {
"out1": 6, "out2": 6
}),
PredictionResult(numpy.array([4, 5, 6]), {
"out1": 15, "out2": 15
}),
PredictionResult(numpy.array([7, 8, 9]), {
"out1": 24, "out2": 24
})
]
inferences = inference_runner.run_inference(batched_examples, fake_model)
for actual, expected in zip(inferences, expected_predictions):
self.assertTrue(_compare_prediction_result(actual, expected))
def test_data_vectorized(self):
fake_model = FakeModel()
inference_runner = SklearnModelHandlerNumpy(model_uri='unused')
batched_examples = [
numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9])
]
    # Even though there are 3 examples, the handler should vectorize the
    # batch so that the model's predict() is called only once.
inference_runner.run_inference(batched_examples, fake_model)
self.assertEqual(1, fake_model.total_predict_calls)
def test_num_bytes_numpy(self):
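    """Verifies get_num_bytes sums the size of every array in the batch."""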
inference_runner = SklearnModelHandlerNumpy(model_uri='unused')
batched_examples_int = [
numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9])
]
self.assertEqual(
sys.getsizeof(batched_examples_int[0]) * 3,
inference_runner.get_num_bytes(batched_examples_int))
batched_examples_float = [
numpy.array([1.0, 2.0, 3.0]),
numpy.array([4.1, 5.2, 6.3]),
numpy.array([7.7, 8.8, 9.9])
]
self.assertEqual(
sys.getsizeof(batched_examples_float[0]) * 3,
inference_runner.get_num_bytes(batched_examples_float))
def test_pipeline_pickled(self):
temp_file_name = self.tmpdir + os.sep + 'pickled_file'
with open(temp_file_name, 'wb') as file:
pickle.dump(build_model(), file)
with TestPipeline() as pipeline:
examples = [numpy.array([0, 0]), numpy.array([1, 1])]
pcoll = pipeline | 'start' >> beam.Create(examples)
actual = pcoll | RunInference(
SklearnModelHandlerNumpy(model_uri=temp_file_name))
expected = [
PredictionResult(numpy.array([0, 0]), 0),
PredictionResult(numpy.array([1, 1]), 1)
]
assert_that(
actual, equal_to(expected, equals_fn=_compare_prediction_result))
def test_pipeline_pickled_custom_batching(self):
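    """Verifies min_batch_size/max_batch_size control the batches the
    inference fn receives."""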
temp_file_name = self.tmpdir + os.sep + 'pickled_file'
with open(temp_file_name, 'wb') as file:
pickle.dump(build_model(), file)
def batch_validator_numpy_inference_fn(
model: BaseEstimator,
batch: Sequence[numpy.ndarray],
inference_args: Optional[Dict[str, Any]] = None) -> Any:
if len(batch) != 2:
raise Exception(
f'Expected batch of size 2, received batch of size {len(batch)}')
return _default_numpy_inference_fn(model, batch, inference_args)
with TestPipeline() as pipeline:
examples = [numpy.array([0, 0]), numpy.array([1, 1])]
pcoll = pipeline | 'start' >> beam.Create(examples)
actual = pcoll | RunInference(
SklearnModelHandlerNumpy(
model_uri=temp_file_name,
inference_fn=batch_validator_numpy_inference_fn,
min_batch_size=2,
max_batch_size=2))
expected = [
PredictionResult(numpy.array([0, 0]), 0),
PredictionResult(numpy.array([1, 1]), 1)
]
assert_that(
actual, equal_to(expected, equals_fn=_compare_prediction_result))
def test_pipeline_pickled_large_model(self):
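    """Verifies large_model=True loads the model via multi_process_shared."""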
temp_file_name = self.tmpdir + os.sep + 'pickled_file'
with open(temp_file_name, 'wb') as file:
pickle.dump(build_model(), file)
def large_model_validator_numpy_inference_fn(
model: BaseEstimator,
batch: Sequence[numpy.ndarray],
inference_args: Optional[Dict[str, Any]] = None) -> Any:
multi_process_shared_loaded = "multi_process_shared" in str(type(model))
if not multi_process_shared_loaded:
        raise Exception(
            f'Loaded model of type {type(model)}, was '
            'expecting multi_process_shared_model')
return _default_numpy_inference_fn(model, batch, inference_args)
with TestPipeline() as pipeline:
examples = [numpy.array([0, 0]), numpy.array([1, 1])]
pcoll = pipeline | 'start' >> beam.Create(examples)
actual = pcoll | RunInference(
SklearnModelHandlerNumpy(
model_uri=temp_file_name,
inference_fn=large_model_validator_numpy_inference_fn,
large_model=True))
expected = [
PredictionResult(numpy.array([0, 0]), 0),
PredictionResult(numpy.array([1, 1]), 1)
]
assert_that(
actual, equal_to(expected, equals_fn=_compare_prediction_result))
def test_pipeline_joblib(self):
temp_file_name = self.tmpdir + os.sep + 'joblib_file'
with open(temp_file_name, 'wb') as file:
joblib.dump(build_model(), file)
with TestPipeline() as pipeline:
examples = [numpy.array([0, 0]), numpy.array([1, 1])]
pcoll = pipeline | 'start' >> beam.Create(examples)
actual = pcoll | RunInference(
SklearnModelHandlerNumpy(
model_uri=temp_file_name, model_file_type=ModelFileType.JOBLIB))
expected = [
PredictionResult(numpy.array([0, 0]), 0),
PredictionResult(numpy.array([1, 1]), 1)
]
assert_that(
actual, equal_to(expected, equals_fn=_compare_prediction_result))
def test_bad_file_raises(self):
with self.assertRaises(RuntimeError):
with TestPipeline() as pipeline:
examples = [numpy.array([0, 0])]
pcoll = pipeline | 'start' >> beam.Create(examples)
_ = pcoll | RunInference(
SklearnModelHandlerNumpy(model_uri='/var/bad_file_name'))
pipeline.run()
def test_bad_input_type_raises(self):
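    """Verifies load_model rejects an unsupported model_file_type."""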
with self.assertRaisesRegex(AssertionError,
'Unsupported serialization type'):
with tempfile.NamedTemporaryFile(delete=False) as file:
model_handler = SklearnModelHandlerNumpy(
model_uri=file.name, model_file_type=None)
model_handler.load_model()
def test_env_vars_set_correctly_numpy(self):
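    """Verifies env_vars passed to the handler are set while running."""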
temp_file_name = self.tmpdir + os.sep + 'pickled_file'
with open(temp_file_name, 'wb') as file:
pickle.dump(build_model(), file)
handler_with_vars = SklearnModelHandlerNumpy(
env_vars={'FOO': 'bar'}, model_uri=temp_file_name)
os.environ.pop('FOO', None)
    self.assertNotIn('FOO', os.environ)
examples = [numpy.array([0, 0]), numpy.array([1, 1])]
with TestPipeline() as pipeline:
_ = (
pipeline
| 'start' >> beam.Create(examples)
| RunInference(handler_with_vars))
pipeline.run()
      self.assertIn('FOO', os.environ)
      self.assertEqual(os.environ['FOO'], 'bar')
def test_pipeline_pandas(self):
temp_file_name = self.tmpdir + os.sep + 'pickled_file'
with open(temp_file_name, 'wb') as file:
pickle.dump(build_pandas_pipeline(), file)
with TestPipeline() as pipeline:
dataframe = pandas_dataframe()
splits = [dataframe.loc[[i]] for i in dataframe.index]
pcoll = pipeline | 'start' >> beam.Create(splits)
actual = pcoll | RunInference(
SklearnModelHandlerPandas(model_uri=temp_file_name))
expected = [
PredictionResult(splits[0], 5),
PredictionResult(splits[1], 8),
PredictionResult(splits[2], 1),
PredictionResult(splits[3], 1),
PredictionResult(splits[4], 2),
]
assert_that(
actual, equal_to(expected, equals_fn=_compare_dataframe_predictions))
def test_pipeline_pandas_env_vars_set_correctly(self):
temp_file_name = self.tmpdir + os.sep + 'pickled_file'
with open(temp_file_name, 'wb') as file:
pickle.dump(build_pandas_pipeline(), file)
handler_with_vars = SklearnModelHandlerPandas(
env_vars={'FOO': 'bar'}, model_uri=temp_file_name)
os.environ.pop('FOO', None)
    self.assertNotIn('FOO', os.environ)
with TestPipeline() as pipeline:
dataframe = pandas_dataframe()
splits = [dataframe.loc[[i]] for i in dataframe.index]
_ = (
pipeline
| 'start' >> beam.Create(splits)
| RunInference(handler_with_vars))
pipeline.run()
      self.assertIn('FOO', os.environ)
      self.assertEqual(os.environ['FOO'], 'bar')
def test_pipeline_pandas_custom_batching(self):
temp_file_name = self.tmpdir + os.sep + 'pickled_file'
with open(temp_file_name, 'wb') as file:
pickle.dump(build_pandas_pipeline(), file)
def batch_validator_pandas_inference_fn(
model: BaseEstimator,
        batch: Sequence[pandas.DataFrame],
inference_args: Optional[Dict[str, Any]] = None) -> Any:
if len(batch) != 5:
raise Exception(
f'Expected batch of size 5, received batch of size {len(batch)}')
return _default_pandas_inference_fn(model, batch, inference_args)
with TestPipeline() as pipeline:
dataframe = pandas_dataframe()
splits = [dataframe.loc[[i]] for i in dataframe.index]
pcoll = pipeline | 'start' >> beam.Create(splits)
actual = pcoll | RunInference(
SklearnModelHandlerPandas(
model_uri=temp_file_name,
inference_fn=batch_validator_pandas_inference_fn,
min_batch_size=5,
max_batch_size=5))
expected = [
PredictionResult(splits[0], 5),
PredictionResult(splits[1], 8),
PredictionResult(splits[2], 1),
PredictionResult(splits[3], 1),
PredictionResult(splits[4], 2),
]
assert_that(
actual, equal_to(expected, equals_fn=_compare_dataframe_predictions))
def test_pipeline_pandas_large_model(self):
temp_file_name = self.tmpdir + os.sep + 'pickled_file'
with open(temp_file_name, 'wb') as file:
pickle.dump(build_pandas_pipeline(), file)
def large_model_validator_pandas_inference_fn(
model: BaseEstimator,
        batch: Sequence[pandas.DataFrame],
inference_args: Optional[Dict[str, Any]] = None) -> Any:
multi_process_shared_loaded = "multi_process_shared" in str(type(model))
if not multi_process_shared_loaded:
        raise Exception(
            f'Loaded model of type {type(model)}, was '
            'expecting multi_process_shared_model')
return _default_pandas_inference_fn(model, batch, inference_args)
with TestPipeline() as pipeline:
dataframe = pandas_dataframe()
splits = [dataframe.loc[[i]] for i in dataframe.index]
pcoll = pipeline | 'start' >> beam.Create(splits)
actual = pcoll | RunInference(
SklearnModelHandlerPandas(
model_uri=temp_file_name,
inference_fn=large_model_validator_pandas_inference_fn,
large_model=True))
expected = [
PredictionResult(splits[0], 5),
PredictionResult(splits[1], 8),
PredictionResult(splits[2], 1),
PredictionResult(splits[3], 1),
PredictionResult(splits[4], 2),
]
assert_that(
actual, equal_to(expected, equals_fn=_compare_dataframe_predictions))
def test_pipeline_pandas_custom_inference(self):
temp_file_name = self.tmpdir + os.sep + 'pickled_file'
with open(temp_file_name, 'wb') as file:
pickle.dump(build_pandas_pipeline(), file)
with TestPipeline() as pipeline:
dataframe = pandas_dataframe()
splits = [dataframe.loc[[i]] for i in dataframe.index]
pcoll = pipeline | 'start' >> beam.Create(splits)
actual = pcoll | RunInference(
SklearnModelHandlerPandas(
model_uri=temp_file_name,
inference_fn=alternate_pandas_inference_fn))
expected = [
PredictionResult(splits[0], 4),
PredictionResult(splits[1], 7),
PredictionResult(splits[2], 0),
PredictionResult(splits[3], 0),
PredictionResult(splits[4], 1),
]
assert_that(
actual, equal_to(expected, equals_fn=_compare_dataframe_predictions))
def test_pipeline_pandas_dict_out(self):
temp_file_name = self.tmpdir + os.sep + 'pickled_file'
with open(temp_file_name, 'wb') as file:
pickle.dump(FakePandasModelDictOut(), file)
with TestPipeline() as pipeline:
dataframe = pandas_dataframe()
splits = [dataframe.loc[[i]] for i in dataframe.index]
pcoll = pipeline | 'start' >> beam.Create(splits)
actual = pcoll | RunInference(
SklearnModelHandlerPandas(model_uri=temp_file_name))
expected = [
PredictionResult(splits[0], {
'out1': 5, 'out2': 5
}),
PredictionResult(splits[1], {
'out1': 8, 'out2': 8
}),
PredictionResult(splits[2], {
'out1': 1, 'out2': 1
}),
PredictionResult(splits[3], {
'out1': 1, 'out2': 1
}),
PredictionResult(splits[4], {
'out1': 4, 'out2': 4
}),
]
assert_that(
actual, equal_to(expected, equals_fn=_compare_dataframe_predictions))
@unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
def test_pipeline_pandas_joblib(self):
temp_file_name = self.tmpdir + os.sep + 'pickled_file'
with open(temp_file_name, 'wb') as file:
joblib.dump(build_pandas_pipeline(), file)
with TestPipeline() as pipeline:
dataframe = pandas_dataframe()
splits = [dataframe.loc[[i]] for i in dataframe.index]
pcoll = pipeline | 'start' >> beam.Create(splits)
actual = pcoll | RunInference(
SklearnModelHandlerPandas(
model_uri=temp_file_name, model_file_type=ModelFileType.JOBLIB))
expected = [
PredictionResult(splits[0], 5),
PredictionResult(splits[1], 8),
PredictionResult(splits[2], 1),
PredictionResult(splits[3], 1),
PredictionResult(splits[4], 2),
]
assert_that(
actual, equal_to(expected, equals_fn=_compare_dataframe_predictions))
def test_pipeline_pandas_with_keys(self):
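    """Verifies KeyedModelHandler carries keys through to the results."""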
temp_file_name = self.tmpdir + os.sep + 'pickled_file'
with open(temp_file_name, 'wb') as file:
pickle.dump(build_pandas_pipeline(), file)
with TestPipeline() as pipeline:
data_frame = pandas_dataframe()
keys = [str(i) for i in range(5)]
splits = [data_frame.loc[[i]] for i in data_frame.index]
keyed_rows = [(key, value) for key, value in zip(keys, splits)]
pcoll = pipeline | 'start' >> beam.Create(keyed_rows)
actual = pcoll | RunInference(
KeyedModelHandler(
SklearnModelHandlerPandas(model_uri=temp_file_name)))
expected = [
('0', PredictionResult(splits[0], 5)),
('1', PredictionResult(splits[1], 8)),
('2', PredictionResult(splits[2], 1)),
('3', PredictionResult(splits[3], 1)),
('4', PredictionResult(splits[4], 2)),
]
assert_that(
actual, equal_to(expected, equals_fn=_compare_dataframe_predictions))
def test_infer_too_many_rows_in_dataframe(self):
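    """Verifies multi-row DataFrames are rejected by the pandas handler."""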
with self.assertRaisesRegex(
ValueError, r'Only dataframes with single rows are supported'):
data_frame_too_many_rows = pandas_dataframe()
fake_model = FakeModel()
inference_runner = SklearnModelHandlerPandas(model_uri='unused')
inference_runner.run_inference([data_frame_too_many_rows], fake_model)
if __name__ == '__main__':
unittest.main()