#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import shutil
import sys
import tempfile
import unittest
import zipfile
from typing import Any
try:
  import datatable
  import numpy
  import pandas
  import pytest
  import scipy
  import xgboost

  import apache_beam as beam
  from apache_beam.ml.inference import RunInference
  from apache_beam.ml.inference.base import KeyedModelHandler
  from apache_beam.ml.inference.base import PredictionResult
  from apache_beam.ml.inference.xgboost_inference import XGBoostModelHandlerDatatable
  from apache_beam.ml.inference.xgboost_inference import XGBoostModelHandlerNumpy
  from apache_beam.ml.inference.xgboost_inference import XGBoostModelHandlerPandas
  from apache_beam.ml.inference.xgboost_inference import XGBoostModelHandlerSciPy
  from apache_beam.testing.test_pipeline import TestPipeline
  from apache_beam.testing.util import assert_that
  from apache_beam.testing.util import equal_to
except ImportError:
  raise unittest.SkipTest('XGBoost dependencies are not installed')
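

# A PredictionResult pairs an example with its inference. The examples used in
# these tests are numpy arrays, pandas/datatable frames or SciPy sparse
# matrices, none of which support a plain == comparison, so the helpers below
# compare results field by field.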
def _compare_prediction_result(a: PredictionResult, b: PredictionResult):
  if isinstance(a.example, scipy.sparse.csr_matrix) and isinstance(
      b.example, scipy.sparse.csr_matrix):
    example_equal = numpy.array_equal(a.example.todense(), b.example.todense())
  else:
    example_equal = numpy.array_equal(a.example, b.example)  # type: ignore[arg-type]
  if isinstance(a.inference, dict):
    return all(
        x == y for x, y in zip(a.inference.values(),
                               b.inference.values())) and example_equal
  return a.inference == b.inference and example_equal


def _compare_keyed_prediction_result(
    a: tuple[Any, PredictionResult], b: tuple[Any, PredictionResult]):
  a_key, a_val = a
  b_key, b_val = b
  keys_equal = a_key == b_key
  return _compare_prediction_result(a_val, b_val) and keys_equal
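

# These tests never run real XGBoost inference: an autouse pytest fixture swaps
# XGBClassifier.predict for predict_fn below, which simply sums every value in
# the input batch and counts how many times it was called, so the expected
# predictions are small deterministic numbers.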
def predict_fn(self, data):
  self.inference_calls += 1
  if isinstance(data, pandas.DataFrame):
    data = data.to_numpy()
  if isinstance(data, datatable.Frame):
    data = data.to_numpy()
  if isinstance(data, scipy.sparse.csr_matrix):
    data = data.toarray()
  return sum(sum(array) for array in data)


@pytest.fixture(autouse=True)
def predict_patched(monkeypatch):
  monkeypatch.setattr(xgboost.XGBClassifier, 'predict', predict_fn)


def build_monkeypatched_xgboost_classifier() -> xgboost.XGBClassifier:
  model = xgboost.XGBClassifier()
  model.inference_calls = 0
  model.fit([[0, 0], [0, 1], [1, 0], [1, 1]], [0, 1, 0, 1])
  return model
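

# The test cases below cover every handler flavor (NumPy, pandas, datatable and
# SciPy): byte-size accounting via get_num_bytes, end-to-end RunInference
# pipelines with and without keys, env_vars propagation, and error handling for
# missing or malformed model files.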
@pytest.mark.uses_xgboost
class XGBoostRunInferenceTest(unittest.TestCase):
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def test_predict_output(self):
    model = build_monkeypatched_xgboost_classifier()
    inference_runner = XGBoostModelHandlerNumpy(xgboost.XGBClassifier, 'unused')
    batched_examples = [
        numpy.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]),
        numpy.array([[1, 1, 1], [1, 1, 1], [1, 1, 1]])
    ]
    expected_predictions = [
        PredictionResult(numpy.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]), 45),
        PredictionResult(numpy.array([[1, 1, 1], [1, 1, 1], [1, 1, 1]]), 9),
    ]
    inferences = inference_runner.run_inference(batched_examples, model)
    for actual, expected in zip(inferences, expected_predictions):
      self.assertTrue(_compare_prediction_result(actual, expected))

  def test_single_inference_call(self):
    model = build_monkeypatched_xgboost_classifier()
    inference_runner = XGBoostModelHandlerNumpy(xgboost.XGBClassifier, 'unused')
    self.assertEqual(model.inference_calls, 0)
    batched_examples = [numpy.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])]
    _ = inference_runner.run_inference(batched_examples, model)
    self.assertEqual(model.inference_calls, 1)

  def test_multiple_inference_calls(self):
    model = build_monkeypatched_xgboost_classifier()
    inference_runner = XGBoostModelHandlerNumpy(xgboost.XGBClassifier, 'unused')
    self.assertEqual(model.inference_calls, 0)
    batched_examples = [
        numpy.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]),
        numpy.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]),
        numpy.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]),
        numpy.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]),
        numpy.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
    ]
    _ = inference_runner.run_inference(batched_examples, model)
    self.assertEqual(model.inference_calls, 5)

  def test_num_bytes_numpy(self):
    inference_runner = XGBoostModelHandlerNumpy(
        model_class=xgboost.XGBClassifier, model_state='unused')
    batched_examples_int = [
        numpy.array([[1, 1], [2, 2]]),
        numpy.array([[2, 4], [6, 8]]),
    ]
    self.assertEqual(
        sys.getsizeof(batched_examples_int[0]) +
        sys.getsizeof(batched_examples_int[1]),
        inference_runner.get_num_bytes(batched_examples_int))
    batched_examples_float = [
        numpy.array([[1.0, 1.0], [2.0, 2.0]]),
        numpy.array([[2.0, 4.0], [6.0, 8.0]]),
    ]
    self.assertEqual(
        sys.getsizeof(batched_examples_float[0]) +
        sys.getsizeof(batched_examples_float[1]),
        inference_runner.get_num_bytes(batched_examples_float))

  def test_num_bytes_pandas(self):
    inference_runner = XGBoostModelHandlerPandas(
        model_class=xgboost.XGBClassifier, model_state='unused')
    batched_examples_int = [
        pandas.DataFrame([[1, 1], [2, 2]]),
        pandas.DataFrame([[2, 4], [6, 8]]),
    ]
    self.assertEqual(
        batched_examples_int[0].memory_usage(deep=True).sum() +
        batched_examples_int[1].memory_usage(deep=True).sum(),
        inference_runner.get_num_bytes(batched_examples_int))
    batched_examples_float = [
        pandas.DataFrame([[1.0, 1.0], [2.0, 2.0]]),
        pandas.DataFrame([[2.0, 4.0], [6.0, 8.0]]),
    ]
    self.assertEqual(
        batched_examples_float[0].memory_usage(deep=True).sum() +
        batched_examples_float[1].memory_usage(deep=True).sum(),
        inference_runner.get_num_bytes(batched_examples_float))

  def test_num_bytes_datatable(self):
    inference_runner = XGBoostModelHandlerDatatable(
        model_class=xgboost.XGBClassifier, model_state='unused')
    batched_examples_int = [
        datatable.Frame([[1, 1], [2, 2]]),
        datatable.Frame([[2, 4], [6, 8]]),
    ]
    self.assertEqual(
        sys.getsizeof(batched_examples_int[0]) +
        sys.getsizeof(batched_examples_int[1]),
        inference_runner.get_num_bytes(batched_examples_int))
    batched_examples_float = [
        datatable.Frame([[1.0, 1.0], [2.0, 2.0]]),
        datatable.Frame([[2.0, 4.0], [6.0, 8.0]]),
    ]
    self.assertEqual(
        sys.getsizeof(batched_examples_float[0]) +
        sys.getsizeof(batched_examples_float[1]),
        inference_runner.get_num_bytes(batched_examples_float))

  def test_num_bytes_scipy(self):
    inference_runner = XGBoostModelHandlerSciPy(
        model_class=xgboost.XGBClassifier, model_state='unused')
    batched_examples_int = [
        scipy.sparse.csr_matrix([[1, 1], [2, 2]]),
        scipy.sparse.csr_matrix([[2, 4], [6, 8]]),
    ]
    self.assertEqual(
        sys.getsizeof(batched_examples_int[0]) +
        sys.getsizeof(batched_examples_int[1]),
        inference_runner.get_num_bytes(batched_examples_int))
    batched_examples_float = [
        scipy.sparse.csr_matrix([[1.0, 1.0], [2.0, 2.0]]),
        scipy.sparse.csr_matrix([[2.0, 4.0], [6.0, 8.0]]),
    ]
    self.assertEqual(
        sys.getsizeof(batched_examples_float[0]) +
        sys.getsizeof(batched_examples_float[1]),
        inference_runner.get_num_bytes(batched_examples_float))
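
  # The pipeline tests below train the monkeypatched classifier, save it to a
  # temporary JSON file, and let the model handler load it back inside
  # RunInference.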

  def test_pipeline_numpy(self):
    model = build_monkeypatched_xgboost_classifier()
    model_state = self.tmpdir + os.sep + 'model.json'
    model.save_model(model_state)
    with TestPipeline() as pipeline:
      examples = [
          numpy.array([[1, 1], [2, 2]]),
          numpy.array([[2, 4], [6, 8]]),
      ]
      pcoll = pipeline | 'start' >> beam.Create(examples)
      actual = pcoll | RunInference(
          XGBoostModelHandlerNumpy(
              model_class=xgboost.XGBClassifier, model_state=model_state))
      expected = [
          PredictionResult(numpy.array([[1, 1], [2, 2]]), 6),
          PredictionResult(numpy.array([[2, 4], [6, 8]]), 20)
      ]
      assert_that(
          actual, equal_to(expected, equals_fn=_compare_prediction_result))

  def test_pipeline_numpy_sets_env_vars_correctly(self):
    model = build_monkeypatched_xgboost_classifier()
    model_state = self.tmpdir + os.sep + 'model.json'
    model.save_model(model_state)
    os.environ.pop('FOO', None)
    self.assertFalse('FOO' in os.environ)
    with TestPipeline() as pipeline:
      examples = [
          numpy.array([[1, 1], [2, 2]]),
          numpy.array([[2, 4], [6, 8]]),
      ]
      handler_with_vars = XGBoostModelHandlerNumpy(
          env_vars={'FOO': 'bar'},
          model_class=xgboost.XGBClassifier,
          model_state=model_state)
      _ = (
          pipeline
          | 'start' >> beam.Create(examples)
          | RunInference(handler_with_vars))
      pipeline.run()
      self.assertTrue('FOO' in os.environ)
      self.assertTrue((os.environ['FOO']) == 'bar')

  def test_pipeline_pandas(self):
    model = build_monkeypatched_xgboost_classifier()
    model_state = self.tmpdir + os.sep + 'model.json'
    model.save_model(model_state)
    with TestPipeline() as pipeline:
      examples = [
          pandas.DataFrame([[1, 1], [2, 2]]),
          pandas.DataFrame([[2, 4], [6, 8]]),
      ]
      pcoll = pipeline | 'start' >> beam.Create(examples)
      actual = pcoll | RunInference(
          XGBoostModelHandlerPandas(
              model_class=xgboost.XGBClassifier, model_state=model_state))
      expected = [
          PredictionResult(pandas.DataFrame([[1, 1], [2, 2]]), 6),
          PredictionResult(pandas.DataFrame([[2, 4], [6, 8]]), 20)
      ]
      assert_that(
          actual, equal_to(expected, equals_fn=_compare_prediction_result))
      pipeline.run()

  def test_pipeline_pandas_sets_env_vars_correctly(self):
    model = build_monkeypatched_xgboost_classifier()
    model_state = self.tmpdir + os.sep + 'model.json'
    model.save_model(model_state)
    os.environ.pop('FOO', None)
    self.assertFalse('FOO' in os.environ)
    with TestPipeline() as pipeline:
      examples = [
          pandas.DataFrame([[1, 1], [2, 2]]),
          pandas.DataFrame([[2, 4], [6, 8]]),
      ]
      handler_with_vars = XGBoostModelHandlerPandas(
          env_vars={'FOO': 'bar'},
          model_class=xgboost.XGBClassifier,
          model_state=model_state)
      _ = (
          pipeline
          | 'start' >> beam.Create(examples)
          | RunInference(handler_with_vars))
      pipeline.run()
      self.assertTrue('FOO' in os.environ)
      self.assertTrue((os.environ['FOO']) == 'bar')

  def test_pipeline_datatable(self):
    model = build_monkeypatched_xgboost_classifier()
    model_state = self.tmpdir + os.sep + 'model.json'
    model.save_model(model_state)
    with TestPipeline() as pipeline:
      examples = [
          datatable.Frame([[1, 1], [2, 2]]),
          datatable.Frame([[2, 4], [6, 8]]),
      ]
      pcoll = pipeline | 'start' >> beam.Create(examples)
      actual = pcoll | RunInference(
          XGBoostModelHandlerDatatable(
              model_class=xgboost.XGBClassifier, model_state=model_state))
      expected = [
          PredictionResult(datatable.Frame([[1, 1], [2, 2]]), 6),
          PredictionResult(datatable.Frame([[2, 4], [6, 8]]), 20)
      ]
      assert_that(
          actual, equal_to(expected, equals_fn=_compare_prediction_result))

  def test_pipeline_datatable_sets_env_vars_correctly(self):
    model = build_monkeypatched_xgboost_classifier()
    model_state = self.tmpdir + os.sep + 'model.json'
    model.save_model(model_state)
    os.environ.pop('FOO', None)
    self.assertFalse('FOO' in os.environ)
    with TestPipeline() as pipeline:
      examples = [
          datatable.Frame([[1, 1], [2, 2]]),
          datatable.Frame([[2, 4], [6, 8]]),
      ]
      handler_with_vars = XGBoostModelHandlerDatatable(
          env_vars={'FOO': 'bar'},
          model_class=xgboost.XGBClassifier,
          model_state=model_state)
      _ = (
          pipeline
          | 'start' >> beam.Create(examples)
          | RunInference(handler_with_vars))
      pipeline.run()
      self.assertTrue('FOO' in os.environ)
      self.assertTrue((os.environ['FOO']) == 'bar')

  def test_pipeline_scipy(self):
    model = build_monkeypatched_xgboost_classifier()
    model_state = self.tmpdir + os.sep + 'model.json'
    model.save_model(model_state)
    with TestPipeline() as pipeline:
      examples = [
          scipy.sparse.csr_matrix(numpy.array([[1, 1], [2, 2]])),
          scipy.sparse.csr_matrix(numpy.array([[2, 4], [6, 8]])),
      ]
      pcoll = pipeline | 'start' >> beam.Create(examples)
      actual = pcoll | RunInference(
          XGBoostModelHandlerSciPy(
              model_class=xgboost.XGBClassifier, model_state=model_state))
      expected = [
          PredictionResult(
              scipy.sparse.csr_matrix(numpy.array([[1, 1], [2, 2]])), 6),
          PredictionResult(
              scipy.sparse.csr_matrix(numpy.array([[2, 4], [6, 8]])), 20)
      ]
      assert_that(
          actual, equal_to(expected, equals_fn=_compare_prediction_result))

  def test_pipeline_scipy_sets_env_vars_correctly(self):
    model = build_monkeypatched_xgboost_classifier()
    model_state = self.tmpdir + os.sep + 'model.json'
    model.save_model(model_state)
    os.environ.pop('FOO', None)
    self.assertFalse('FOO' in os.environ)
    with TestPipeline() as pipeline:
      examples = [
          scipy.sparse.csr_matrix(numpy.array([[1, 1], [2, 2]])),
          scipy.sparse.csr_matrix(numpy.array([[2, 4], [6, 8]])),
      ]
      handler_with_vars = XGBoostModelHandlerSciPy(
          env_vars={'FOO': 'bar'},
          model_class=xgboost.XGBClassifier,
          model_state=model_state)
      _ = (
          pipeline
          | 'start' >> beam.Create(examples)
          | RunInference(handler_with_vars))
      pipeline.run()
      self.assertTrue('FOO' in os.environ)
      self.assertTrue((os.environ['FOO']) == 'bar')
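
  # Error handling: pointing a handler at a non-existent model file should fail
  # the pipeline with a RuntimeError, and loading a file that is not a valid
  # XGBoost model (here a zip archive) should raise xgboost.core.XGBoostError.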

  def test_bad_model_file_raises(self):
    model_state = self.tmpdir + os.sep + 'bad_file_name.json'
    with self.assertRaises(RuntimeError):
      with TestPipeline() as pipeline:
        examples = [
            datatable.Frame([[1, 1], [2, 2]]),
            datatable.Frame([[2, 4], [6, 8]]),
        ]
        pcoll = pipeline | 'start' >> beam.Create(examples)
        _ = pcoll | RunInference(
            XGBoostModelHandlerNumpy(xgboost.XGBClassifier, model_state))
        pipeline.run()

  def test_bad_input_type_raises(self):
    model = build_monkeypatched_xgboost_classifier()
    model_state = self.tmpdir + os.sep + 'model.json'
    model.save_model(model_state)
    archived_model_state = self.tmpdir + os.sep + 'model.zip'
    zip_file = zipfile.ZipFile(archived_model_state, "w", zipfile.ZIP_DEFLATED)
    zip_file.write(model_state)
    zip_file.close()
    with self.assertRaises(xgboost.core.XGBoostError):
      model_handler = XGBoostModelHandlerNumpy(
          xgboost.XGBClassifier, model_state=archived_model_state)
      model_handler.load_model()

  def test_pipeline_scipy_with_keys(self):
    model = build_monkeypatched_xgboost_classifier()
    model_state = self.tmpdir + os.sep + 'model.json'
    model.save_model(model_state)
    with TestPipeline() as pipeline:
      examples = [
          ('0', scipy.sparse.csr_matrix([[1, 1], [2, 2]])),
          ('1', scipy.sparse.csr_matrix([[2, 4], [6, 8]])),
      ]
      pcoll = pipeline | 'start' >> beam.Create(examples)
      actual = pcoll | RunInference(
          KeyedModelHandler(
              XGBoostModelHandlerSciPy(
                  model_class=xgboost.XGBClassifier, model_state=model_state)))
      expected = [
          ('0', PredictionResult(scipy.sparse.csr_matrix([[1, 1], [2, 2]]), 6)),
          (
              '1',
              PredictionResult(scipy.sparse.csr_matrix([[2, 4], [6, 8]]), 20))
      ]
      assert_that(
          actual,
          equal_to(expected, equals_fn=_compare_keyed_prediction_result))

  def test_pipeline_numpy_with_keys(self):
    model = build_monkeypatched_xgboost_classifier()
    model_state = self.tmpdir + os.sep + 'model.json'
    model.save_model(model_state)
    with TestPipeline() as pipeline:
      examples = [
          ('0', numpy.array([[1, 1], [2, 2]])),
          ('1', numpy.array([[2, 4], [6, 8]])),
      ]
      pcoll = pipeline | 'start' >> beam.Create(examples)
      actual = pcoll | RunInference(
          KeyedModelHandler(
              XGBoostModelHandlerNumpy(
                  model_class=xgboost.XGBClassifier, model_state=model_state)))
      expected = [('0', PredictionResult(numpy.array([[1, 1], [2, 2]]), 6)),
                  ('1', PredictionResult(numpy.array([[2, 4], [6, 8]]), 20))]
      assert_that(
          actual,
          equal_to(expected, equals_fn=_compare_keyed_prediction_result))

  def test_pipeline_pandas_with_keys(self):
    model = build_monkeypatched_xgboost_classifier()
    model_state = self.tmpdir + os.sep + 'model.json'
    model.save_model(model_state)
    with TestPipeline() as pipeline:
      examples = [
          ('0', pandas.DataFrame([[1, 1], [2, 2]])),
          ('1', pandas.DataFrame([[2, 4], [6, 8]])),
      ]
      pcoll = pipeline | 'start' >> beam.Create(examples)
      actual = pcoll | RunInference(
          KeyedModelHandler(
              XGBoostModelHandlerPandas(
                  model_class=xgboost.XGBClassifier, model_state=model_state)))
      expected = [
          ('0', PredictionResult(pandas.DataFrame([[1, 1], [2, 2]]), 6)),
          ('1', PredictionResult(pandas.DataFrame([[2, 4], [6, 8]]), 20))
      ]
      assert_that(
          actual,
          equal_to(expected, equals_fn=_compare_keyed_prediction_result))

  def test_pipeline_datatable_with_keys(self):
    model = build_monkeypatched_xgboost_classifier()
    model_state = self.tmpdir + os.sep + 'model.json'
    model.save_model(model_state)
    with TestPipeline() as pipeline:
      examples = [
          ('0', datatable.Frame([[1, 1], [2, 2]])),
          ('1', datatable.Frame([[2, 4], [6, 8]])),
      ]
      pcoll = pipeline | 'start' >> beam.Create(examples)
      actual = pcoll | RunInference(
          KeyedModelHandler(
              XGBoostModelHandlerDatatable(
                  model_class=xgboost.XGBClassifier, model_state=model_state)))
      expected = [
          ('0', PredictionResult(datatable.Frame([[1, 1], [2, 2]]), 6)),
          ('1', PredictionResult(datatable.Frame([[2, 4], [6, 8]]), 20))
      ]
      assert_that(
          actual,
          equal_to(expected, equals_fn=_compare_keyed_prediction_result))


if __name__ == '__main__':
  unittest.main()