| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """Tests for schemas.""" |
| |
| # pytype: skip-file |
| |
| import typing |
| import unittest |
| |
| import numpy as np |
| import pandas as pd |
| from parameterized import parameterized |
| |
| import apache_beam as beam |
| from apache_beam.coders import RowCoder |
| from apache_beam.coders.typecoders import registry as coders_registry |
| from apache_beam.dataframe import schemas |
| from apache_beam.dataframe import transforms |
| from apache_beam.testing.test_pipeline import TestPipeline |
| from apache_beam.testing.util import assert_that |
| from apache_beam.testing.util import equal_to |
| from apache_beam.typehints import row_type |
| from apache_beam.typehints import typehints |
| from apache_beam.typehints.native_type_compatibility import match_is_named_tuple |
| |
# Schema with only non-nullable fields, used by the BatchRowsAsDataFrame tests.
class Simple(typing.NamedTuple):
  name: str
  id: int
  height: float


coders_registry.register_coder(Simple, RowCoder)


# Schema with a nullable field, used by the DataframeTransform tests.
class Animal(typing.NamedTuple):
  animal: str
  max_speed: typing.Optional[float]


coders_registry.register_coder(Animal, RowCoder)
| |
| |
def matches_df(expected):
  """Returns an assert_that matcher checking a DataFrame PCollection.

  The PCollection's chunks are concatenated into a single DataFrame which is
  compared to ``expected`` ignoring both row order and the original index.
  """
  def check_df_pcoll_equal(actual):
    # actual is an iterable of DataFrame chunks; stitch them back together.
    combined = pd.concat(actual)
    ordering = list(combined.columns)
    normalized = combined.sort_values(by=ordering).reset_index(drop=True)
    reference = expected.sort_values(by=list(expected.columns)).reset_index(
        drop=True)
    pd.testing.assert_frame_equal(normalized, reference)

  return check_df_pcoll_equal
| |
| |
# Test data for all supported types that can be easily tested.
# Excludes bytes because it's difficult to create a series and dataframe bytes
# dtype. For example:
#   pd.Series([b'abc'], dtype=bytes).dtype != 'S'
#   pd.Series([b'abc'], dtype=bytes).astype(bytes).dtype == 'S'
# Each entry is a tuple of (test data, pandas_type, column_name, beam_type).
COLUMNS: typing.List[typing.Tuple[typing.List[typing.Any],
                                  typing.Any,
                                  str,
                                  typing.Any]] = [
    ([375, 24, 0, 10, 16],
     np.int32,
     'i32',
     np.int32),
    ([375, 24, 0, 10, 16],
     np.int64,
     'i64',
     np.int64),
    # Nullable integers need pandas extension dtypes (Int32/Int64) so that
    # None round-trips, hence the Optional beam type.
    ([375, 24, None, 10, 16],
     pd.Int32Dtype(),
     'i32_nullable',
     typing.Optional[np.int32]),
    ([375, 24, None, 10, 16],
     pd.Int64Dtype(),
     'i64_nullable',
     typing.Optional[np.int64]),
    # Floats represent missing values as NaN, so the beam type is Optional
    # even though the pandas dtype is a plain float dtype.
    ([375., 24., None, 10., 16.],
     np.float64,
     'f64',
     typing.Optional[np.float64]),
    ([375., 24., None, 10., 16.],
     np.float32,
     'f32',
     typing.Optional[np.float32]),
    ([True, False, True, True, False],
     bool,
     'bool',
     bool),
    # object dtype holds arbitrary Python objects and maps to typing.Any.
    (['Falcon', 'Ostrich', None, 3.14, 0],
     object,
     'any',
     typing.Any),
    ([True, False, True, None, False],
     pd.BooleanDtype(),
     'bool_nullable',
     typing.Optional[bool]),
    ([
        'Falcon',
        'Ostrich',
        None,
        'Aardvark',
        'Elephant'
    ],
     pd.StringDtype(),
     'strdtype',
     typing.Optional[str]),
]

# DataFrame with one column per COLUMNS entry. The Series is constructed with
# the dtype and then astype'd again to force the exact dtype (see the bytes
# note above for why a single construction is not always sufficient).
NICE_TYPES_DF = pd.DataFrame(columns=[name for _, _, name, _ in COLUMNS])
for arr, dtype, name, _ in COLUMNS:
  NICE_TYPES_DF[name] = pd.Series(arr, dtype=dtype, name=name).astype(dtype)

# Zero-row slice: carries the schema (columns/dtypes) but no data.
NICE_TYPES_PROXY = NICE_TYPES_DF[:0]

# Per-column cases: (input Series, expected rows, expected beam element type).
SERIES_TESTS = [(pd.Series(arr, dtype=dtype, name=name), arr, beam_type)
                for (arr, dtype, name, beam_type) in COLUMNS]

_TEST_ARRAYS: typing.List[typing.List[typing.Any]] = [
    arr for (arr, _, _, _) in COLUMNS
]
# Row-wise view of the column-wise test data: one tuple per DataFrame row.
DF_RESULT = list(zip(*_TEST_ARRAYS))
# NamedTuple schema whose fields mirror NICE_TYPES_DF's columns.
BEAM_SCHEMA = typing.NamedTuple(  # type: ignore
    'BEAM_SCHEMA', [(name, beam_type) for _, _, name, beam_type in COLUMNS])
# Same DataFrame with progressively more leading columns moved to the index,
# to exercise include_indexes handling.
INDEX_DF_TESTS = [(
    NICE_TYPES_DF.set_index([name for _, _, name, _ in COLUMNS[:i]]),
    DF_RESULT,
    BEAM_SCHEMA) for i in range(1, len(COLUMNS) + 1)]

NOINDEX_DF_TESTS = [(NICE_TYPES_DF, DF_RESULT, BEAM_SCHEMA)]
| |
| # Get major, minor, bugfix version |
| PD_VERSION = tuple(map(int, pd.__version__.split('.')[0:3])) |
| |
| |
| def test_name_func(testcase_func, param_num, params): |
| df_or_series, _, _ = params.args |
| if isinstance(df_or_series, pd.Series): |
| return f"{testcase_func.__name__}_Series[{df_or_series.dtype}]" |
| elif isinstance(df_or_series, pd.DataFrame): |
| return ( |
| f"{testcase_func.__name__}_DataFrame" |
| f"[{','.join(str(dtype) for dtype in df_or_series.dtypes)}]") |
| else: |
| raise ValueError( |
| f"Encountered unsupported param in {testcase_func.__name__}. " |
| "Expected Series or DataFrame, got:\n" + str(df_or_series)) |
| |
| |
class SchemasTest(unittest.TestCase):
  """Tests for converting between schema'd PCollections and DataFrames."""
  def test_simple_df(self):
    """BatchRowsAsDataFrame batches NamedTuple elements into a DataFrame."""
    expected = pd.DataFrame({
        'name': list(str(i) for i in range(5)),
        'id': list(range(5)),
        'height': list(float(i) for i in range(5))
    },
                            columns=['name', 'id', 'height'])

    # str fields are converted to pandas' nullable StringDtype.
    expected.name = expected.name.astype(pd.StringDtype())

    with TestPipeline() as p:
      res = (
          p
          | beam.Create(
              [Simple(name=str(i), id=i, height=float(i)) for i in range(5)])
          | schemas.BatchRowsAsDataFrame(min_batch_size=10, max_batch_size=10))
      assert_that(res, matches_df(expected))

  def test_simple_df_with_beam_row(self):
    """Same as test_simple_df, but with beam.Row elements from beam.Select."""
    expected = pd.DataFrame({
        'name': list(str(i) for i in range(5)),
        'id': list(range(5)),
        'height': list(float(i) for i in range(5))
    },
                            columns=['name', 'id', 'height'])
    expected.name = expected.name.astype(pd.StringDtype())

    with TestPipeline() as p:
      res = (
          p
          | beam.Create([(str(i), i, float(i)) for i in range(5)])
          | beam.Select(
              name=lambda r: str(r[0]),
              id=lambda r: int(r[1]),
              height=lambda r: float(r[2]))
          | schemas.BatchRowsAsDataFrame(min_batch_size=10, max_batch_size=10))
      assert_that(res, matches_df(expected))

  def test_generate_proxy(self):
    """generate_proxy maps a NamedTuple schema to an empty typed DataFrame."""
    expected = pd.DataFrame({
        'animal': pd.Series(dtype=pd.StringDtype()),
        'max_speed': pd.Series(dtype=np.float64)
    })

    pd.testing.assert_frame_equal(schemas.generate_proxy(Animal), expected)

  def test_generate_proxy_beam_typehint(self):
    """A non-NamedTuple beam typehint yields an empty typed Series proxy."""
    expected = pd.Series(dtype=pd.Int32Dtype())

    actual = schemas.generate_proxy(typehints.Optional[np.int32])

    pd.testing.assert_series_equal(actual, expected)

  def test_nice_types_proxy_roundtrip(self):
    """DataFrame -> element type -> proxy round-trips for all nice dtypes."""
    roundtripped = schemas.generate_proxy(
        schemas.element_type_from_dataframe(NICE_TYPES_PROXY))
    self.assertTrue(roundtripped.equals(NICE_TYPES_PROXY))

  @unittest.skipIf(
      PD_VERSION == (1, 2, 1),
      "Can't roundtrip bytes in pandas 1.2.1"
      "https://github.com/pandas-dev/pandas/issues/39474")
  def test_bytes_proxy_roundtrip(self):
    """bytes columns round-trip to a proxy with a 'S' (bytes) dtype kind."""
    proxy = pd.DataFrame({'bytes': []})
    proxy.bytes = proxy.bytes.astype(bytes)

    roundtripped = schemas.generate_proxy(
        schemas.element_type_from_dataframe(proxy))

    self.assertEqual(roundtripped.bytes.dtype.kind, 'S')

  def test_batch_with_df_transform(self):
    """Batched rows can feed DataframeTransform, with and without indexes."""
    with TestPipeline() as p:
      res = (
          p
          | beam.Create([
              Animal('Falcon', 380.0),
              Animal('Falcon', 370.0),
              Animal('Parrot', 24.0),
              Animal('Parrot', 26.0)
          ])
          | schemas.BatchRowsAsDataFrame()
          | transforms.DataframeTransform(
              lambda df: df.groupby('animal').mean(),
              # TODO: Generate proxy in this case as well
              proxy=schemas.generate_proxy(Animal),
              include_indexes=True))
      assert_that(res, equal_to([('Falcon', 375.), ('Parrot', 25.)]))

    # Do the same thing, but use reset_index() to make sure 'animal' is included
    with TestPipeline() as p:
      with beam.dataframe.allow_non_parallel_operations():
        res = (
            p
            | beam.Create([
                Animal('Falcon', 380.0),
                Animal('Falcon', 370.0),
                Animal('Parrot', 24.0),
                Animal('Parrot', 26.0)
            ])
            | schemas.BatchRowsAsDataFrame()
            | transforms.DataframeTransform(
                lambda df: df.groupby('animal').mean().reset_index(),
                # TODO: Generate proxy in this case as well
                proxy=schemas.generate_proxy(Animal)))
        assert_that(res, equal_to([('Falcon', 375.), ('Parrot', 25.)]))

  def assert_typehints_equal(self, left, right):
    """Asserts two typehints are equal, comparing NamedTuples by annotations."""
    def maybe_drop_rowtypeconstraint(typehint):
      # RowTypeConstraint wraps a user NamedTuple; compare the wrapped type.
      if isinstance(typehint, row_type.RowTypeConstraint):
        return typehint.user_type
      else:
        return typehint

    left = maybe_drop_rowtypeconstraint(typehints.normalize(left))
    right = maybe_drop_rowtypeconstraint(typehints.normalize(right))

    if match_is_named_tuple(left):
      # Distinct NamedTuple classes with the same fields should compare equal.
      self.assertTrue(match_is_named_tuple(right))
      self.assertEqual(left.__annotations__, right.__annotations__)
    else:
      self.assertEqual(left, right)

  @parameterized.expand(
      SERIES_TESTS + NOINDEX_DF_TESTS, name_func=test_name_func)
  def test_unbatch_no_index(self, df_or_series, rows, beam_type):
    """UnbatchPandas explodes batches into elements with the right typehint."""
    proxy = df_or_series[:0]

    with TestPipeline() as p:
      res = (
          p | beam.Create([df_or_series[::2], df_or_series[1::2]])
          | schemas.UnbatchPandas(proxy))

      # Verify that the unbatched PCollection has the expected typehint
      # TODO(https://github.com/apache/beam/issues/19923): typehints should
      # support NamedTuple so we can use typehints.is_consistent_with here
      # instead
      self.assert_typehints_equal(res.element_type, beam_type)

      assert_that(res, equal_to(rows))

  @parameterized.expand(SERIES_TESTS + INDEX_DF_TESTS, name_func=test_name_func)
  def test_unbatch_with_index(self, df_or_series, rows, _):
    """include_indexes=True restores index levels as output fields."""
    proxy = df_or_series[:0]

    if (PD_VERSION < (1, 2) and
        set(['i32_nullable', 'i64_nullable']).intersection(proxy.index.names)):
      self.skipTest(
          "pandas<1.2 incorrectly changes Int64Dtype to int64 when "
          "moved to index.")

    with TestPipeline() as p:
      res = (
          p | beam.Create([df_or_series[::2], df_or_series[1::2]])
          | schemas.UnbatchPandas(proxy, include_indexes=True))

      assert_that(res, equal_to(rows))

  @parameterized.expand(SERIES_TESTS, name_func=test_name_func)
  def test_unbatch_series_with_index_warns(
      self, series, unused_rows, unused_type):
    """include_indexes=True on a Series proxy is a no-op that warns."""
    proxy = series[:0]

    with TestPipeline() as p:
      input_pc = p | beam.Create([series[::2], series[1::2]])
      with self.assertWarns(UserWarning):
        _ = input_pc | schemas.UnbatchPandas(proxy, include_indexes=True)

  def test_unbatch_include_index_unnamed_index_raises(self):
    """An unnamed index cannot become an output field; expect ValueError."""
    df = pd.DataFrame({'foo': [1, 2, 3, 4]})
    proxy = df[:0]

    with TestPipeline() as p:
      pc = p | beam.Create([df[::2], df[1::2]])

      with self.assertRaisesRegex(ValueError, 'unnamed'):
        _ = pc | schemas.UnbatchPandas(proxy, include_indexes=True)

  def test_unbatch_include_index_nonunique_index_raises(self):
    """Duplicate index level names ('bar', 'bar') must raise ValueError."""
    df = pd.DataFrame({'foo': [1, 2, 3, 4]})
    df.index = pd.MultiIndex.from_arrays([[1, 2, 3, 4], [4, 3, 2, 1]],
                                         names=['bar', 'bar'])
    proxy = df[:0]

    with TestPipeline() as p:
      pc = p | beam.Create([df[::2], df[1::2]])

      with self.assertRaisesRegex(ValueError, 'bar'):
        _ = pc | schemas.UnbatchPandas(proxy, include_indexes=True)

  def test_unbatch_include_index_column_conflict_raises(self):
    """An index name colliding with a column name must raise ValueError."""
    df = pd.DataFrame({'foo': [1, 2, 3, 4]})
    df.index = pd.Index([4, 3, 2, 1], name='foo')
    proxy = df[:0]

    with TestPipeline() as p:
      pc = p | beam.Create([df[::2], df[1::2]])

      with self.assertRaisesRegex(ValueError, 'foo'):
        _ = pc | schemas.UnbatchPandas(proxy, include_indexes=True)

  def test_unbatch_datetime(self):
    """Timezone-aware datetime values survive unbatching."""
    s = pd.Series(
        pd.date_range(
            '1/1/2000', periods=100, freq='m', tz='America/Los_Angeles'))
    proxy = s[:0]

    with TestPipeline() as p:
      res = (
          p | beam.Create([s[::2], s[1::2]])
          | schemas.UnbatchPandas(proxy, include_indexes=True))

      assert_that(res, equal_to(list(s)))
| |
| |
# Standard unittest entry point so this module can be run directly.
if __name__ == '__main__':
  unittest.main()