#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import typing
import unittest

import pandas as pd
from past.builtins import unicode

import apache_beam as beam
from apache_beam import coders
from apache_beam.dataframe import convert
from apache_beam.dataframe import expressions
from apache_beam.dataframe import frame_base
from apache_beam.dataframe import transforms
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
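

# Compares an actual result (a pandas object or a scalar) against the
# expected value. Indexes are sorted before comparison because Beam runners
# make no ordering guarantees across partitions.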
def check_correct(expected, actual):
  if actual is None:
    raise AssertionError('Empty frame but expected: \n\n%s' % (expected))
  if isinstance(expected, pd.core.generic.NDFrame):
    expected = expected.sort_index()
    actual = actual.sort_index()

    if isinstance(expected, pd.Series):
      pd.testing.assert_series_equal(expected, actual)
    elif isinstance(expected, pd.DataFrame):
      pd.testing.assert_frame_equal(expected, actual)
    else:
      raise ValueError(
          f'Expected value is a {type(expected)}, '
          'not a Series or DataFrame.')
  else:
    if actual != expected:
      raise AssertionError('Scalars not equal: %s != %s' % (actual, expected))
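

# Reassembles the list of partitions a runner emits for one logical frame;
# returns None when no partitions were produced.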
def concat(parts):
  if len(parts) > 1:
    return pd.concat(parts)
  elif len(parts) == 1:
    return parts[0]
  else:
    return None


def df_equal_to(expected):
  return lambda actual: check_correct(expected, concat(actual))
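

# NamedTuple types registered with RowCoder so they can be used as
# schema-aware PCollection elements in the batching tests below.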
AnimalSpeed = typing.NamedTuple(
    'AnimalSpeed', [('Animal', unicode), ('Speed', int)])
coders.registry.register_coder(AnimalSpeed, coders.RowCoder)

Nested = typing.NamedTuple(
    'Nested', [('id', int), ('animal_speed', AnimalSpeed)])
coders.registry.register_coder(Nested, coders.RowCoder)
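

# run_scenario computes the expected result directly with pandas, then
# verifies both deferred-expression evaluation and a real pipeline run
# against it.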
class TransformTest(unittest.TestCase):
  def run_scenario(self, input, func):
    expected = func(input)

    # First check direct evaluation of the deferred expression: build the
    # expression against a placeholder, then bind the real input in a Session.
    empty = input.iloc[0:0]
    input_placeholder = expressions.PlaceholderExpression(empty)
    input_deferred = frame_base.DeferredFrame.wrap(input_placeholder)
    actual_deferred = func(input_deferred)._expr.evaluate_at(
        expressions.Session({input_placeholder: input}))
    check_correct(expected, actual_deferred)

    # Then run the same function in a real pipeline, with the input split
    # across two partitions.
    with beam.Pipeline() as p:
      input_pcoll = p | beam.Create([input.iloc[::2], input.iloc[1::2]])
      input_df = convert.to_dataframe(input_pcoll, proxy=empty)
      output_df = func(input_df)

      # The output's proxy (a zero-row frame) should match the shape of the
      # expected result.
      output_proxy = output_df._expr.proxy()
      if isinstance(output_proxy, pd.core.generic.NDFrame):
        self.assertTrue(
            output_proxy.iloc[:0].equals(expected.iloc[:0]),
            (
                'Output proxy is incorrect:\n'
                f'Expected:\n{expected.iloc[:0]}\n\n'
                f'Actual:\n{output_proxy.iloc[:0]}'))
      else:
        self.assertEqual(type(output_proxy), type(expected))

      output_pcoll = convert.to_pcollection(output_df, yield_elements='pandas')

      assert_that(
          output_pcoll, lambda actual: check_correct(expected, concat(actual)))

  def test_identity(self):
    df = pd.DataFrame({
        'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
        'Speed': [380., 370., 24., 26.]
    })
    self.run_scenario(df, lambda x: x)

  def test_groupby_sum_mean(self):
    df = pd.DataFrame({
        'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
        'Speed': [380., 370., 24., 26.]
    })
    self.run_scenario(df, lambda df: df.groupby('Animal').sum())
    # mean() is evaluated as a non-parallel operation here, so it must be
    # explicitly allowed.
    with expressions.allow_non_parallel_operations():
      self.run_scenario(df, lambda df: df.groupby('Animal').mean())
    self.run_scenario(
        df, lambda df: df.loc[df.Speed > 25].groupby('Animal').sum())
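
  # Exercises groupby(...).apply with a projected column, a column subset,
  # level-based grouping, and a callable grouping key.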
  def test_groupby_apply(self):
    df = pd.DataFrame({
        'group': ['a' if i % 5 == 0 or i % 3 == 0 else 'b' for i in range(100)],
        'foo': [None if i % 11 == 0 else i for i in range(100)],
        'bar': [None if i % 7 == 0 else 99 - i for i in range(100)],
        'baz': [None if i % 13 == 0 else i * 2 for i in range(100)],
    })

    def median_sum_fn(x):
      return (x.foo + x.bar).median()

    describe = lambda df: df.describe()

    self.run_scenario(df, lambda df: df.groupby('group').foo.apply(describe))
    self.run_scenario(
        df, lambda df: df.groupby('group')[['foo', 'bar']].apply(describe))
    self.run_scenario(df, lambda df: df.groupby('group').apply(median_sum_fn))
    self.run_scenario(
        df,
        lambda df: df.set_index('group').foo.groupby(level=0).apply(describe))
    self.run_scenario(df, lambda df: df.groupby(level=0).apply(median_sum_fn))
    self.run_scenario(
        df, lambda df: df.groupby(lambda x: x % 3).apply(describe))

  def test_filter(self):
    df = pd.DataFrame({
        'Animal': ['Aardvark', 'Ant', 'Elephant', 'Zebra'],
        'Speed': [5, 2, 35, 40]
    })
    self.run_scenario(df, lambda df: df.filter(items=['Animal']))
    self.run_scenario(df, lambda df: df.filter(regex='Anim.*'))
    self.run_scenario(
        df, lambda df: df.set_index('Animal').filter(regex='F.*', axis='index'))

  def test_aggregate(self):
    with expressions.allow_non_parallel_operations():
      a = pd.DataFrame({'col': [1, 2, 3]})
      self.run_scenario(a, lambda a: a.agg(sum))
      self.run_scenario(a, lambda a: a.agg(['mean', 'min', 'max']))

  def test_scalar(self):
    with expressions.allow_non_parallel_operations():
      a = pd.Series([1, 2, 6])
      self.run_scenario(a, lambda a: a.agg(sum))
      self.run_scenario(a, lambda a: a / a.agg(sum))

      # Tests a scalar being used as an input to a downstream stage.
      df = pd.DataFrame({'key': ['a', 'a', 'b'], 'val': [1, 2, 6]})
      self.run_scenario(
          df, lambda df: df.groupby('key').sum().val / df.val.agg(sum))

  def test_getitem_projection(self):
    df = pd.DataFrame({
        'Animal': ['Aardvark', 'Ant', 'Elephant', 'Zebra'],
        'Speed': [5, 2, 35, 40],
        'Size': ['Small', 'Extra Small', 'Large', 'Medium']
    })
    self.run_scenario(df, lambda df: df[['Speed', 'Size']])

  def test_offset_elementwise(self):
    s = pd.Series(range(10)).astype(float)
    df = pd.DataFrame({'value': s, 'square': s * s, 'cube': s * s * s})
    # Only those values that are both squares and cubes will intersect.
    self.run_scenario(
        df,
        lambda df: df.set_index('square').value + df.set_index('cube').value)
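
  # With a schema-aware (NamedTuple) element type, DataframeTransform can
  # derive the proxy frame itself, so no explicit proxy argument is needed.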
  def test_batching_named_tuple_input(self):
    with beam.Pipeline() as p:
      result = (
          p | beam.Create([
              AnimalSpeed('Aardvark', 5),
              AnimalSpeed('Ant', 2),
              AnimalSpeed('Elephant', 35),
              AnimalSpeed('Zebra', 40)
          ]).with_output_types(AnimalSpeed)
          | transforms.DataframeTransform(lambda df: df.filter(regex='Anim.*')))

      # Only the Animal column survives the filter, so each output row is a
      # 1-tuple.
      assert_that(
          result,
          equal_to([('Aardvark', ), ('Ant', ), ('Elephant', ), ('Zebra', )]))

  def test_batching_beam_row_input(self):
    with beam.Pipeline() as p:
      result = (
          p
          | beam.Create([(u'Falcon', 380.), (u'Falcon', 370.), (u'Parrot', 24.),
                         (u'Parrot', 26.)])
          | beam.Map(lambda tpl: beam.Row(Animal=tpl[0], Speed=tpl[1]))
          | transforms.DataframeTransform(
              lambda df: df.groupby('Animal').mean(), include_indexes=True))

      assert_that(result, equal_to([('Falcon', 375.), ('Parrot', 25.)]))

  def test_batching_beam_row_to_dataframe(self):
    with beam.Pipeline() as p:
      df = convert.to_dataframe(
          p
          | beam.Create([(u'Falcon', 380.), (u'Falcon', 370.), (u'Parrot', 24.),
                         (u'Parrot', 26.)])
          | beam.Map(lambda tpl: beam.Row(Animal=tpl[0], Speed=tpl[1])))

      result = convert.to_pcollection(
          df.groupby('Animal').mean(), include_indexes=True)

      assert_that(result, equal_to([('Falcon', 375.), ('Parrot', 25.)]))
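
  # A nested row field should be passed through as a single tuple-valued
  # column rather than being flattened into separate columns.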
  def test_batching_passthrough_nested_schema(self):
    with beam.Pipeline() as p:
      nested_schema_pc = (
          p | beam.Create([Nested(1, AnimalSpeed('Aardvark', 5))
                           ]).with_output_types(Nested))
      result = nested_schema_pc | transforms.DataframeTransform(  # pylint: disable=expression-not-assigned
          lambda df: df.filter(items=['animal_speed']))

      assert_that(result, equal_to([(('Aardvark', 5), )]))

  def test_batching_passthrough_nested_array(self):
    Array = typing.NamedTuple(
        'Array', [('id', int), ('business_numbers', typing.Sequence[int])])
    coders.registry.register_coder(Array, coders.RowCoder)

    with beam.Pipeline() as p:
      array_schema_pc = (p | beam.Create([Array(1, [7, 8, 9])]))
      result = array_schema_pc | transforms.DataframeTransform(  # pylint: disable=expression-not-assigned
          lambda df: df.filter(items=['business_numbers']))

      assert_that(result, equal_to([([7, 8, 9], )]))

  def test_unbatching_series(self):
    with beam.Pipeline() as p:
      result = (
          p
          | beam.Create([(u'Falcon', 380.), (u'Falcon', 370.), (u'Parrot', 24.),
                         (u'Parrot', 26.)])
          | beam.Map(lambda tpl: beam.Row(Animal=tpl[0], Speed=tpl[1]))
          | transforms.DataframeTransform(lambda df: df.Animal))

      assert_that(result, equal_to(['Falcon', 'Falcon', 'Parrot', 'Parrot']))
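
  # DataframeTransform also accepts a tuple or dict of PCollections (with a
  # matching tuple or dict of proxies) and can return a tuple or dict of
  # outputs, as exercised below.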
  def test_input_output_polymorphism(self):
    one_series = pd.Series([1])
    two_series = pd.Series([2])
    three_series = pd.Series([3])
    proxy = one_series[:0]

    def equal_to_series(expected):
      def check(actual):
        actual = pd.concat(actual)
        if not expected.equals(actual):
          raise AssertionError(
              'Series not equal: \n%s\n%s\n' % (expected, actual))

      return check

    with beam.Pipeline() as p:
      one = p | 'One' >> beam.Create([one_series])
      two = p | 'Two' >> beam.Create([two_series])

      assert_that(
          one | 'PcollInPcollOut' >> transforms.DataframeTransform(
              lambda x: 3 * x, proxy=proxy, yield_elements='pandas'),
          equal_to_series(three_series),
          label='CheckPcollInPcollOut')

      assert_that(
          (one, two)
          | 'TupleIn' >> transforms.DataframeTransform(
              lambda x, y: (x + y), (proxy, proxy), yield_elements='pandas'),
          equal_to_series(three_series),
          label='CheckTupleIn')

      assert_that(
          dict(x=one, y=two)
          | 'DictIn' >> transforms.DataframeTransform(
              lambda x, y: (x + y),
              proxy=dict(x=proxy, y=proxy),
              yield_elements='pandas'),
          equal_to_series(three_series),
          label='CheckDictIn')

      double, triple = one | 'TupleOut' >> transforms.DataframeTransform(
          lambda x: (2 * x, 3 * x), proxy, yield_elements='pandas')
      assert_that(double, equal_to_series(two_series), 'CheckTupleOut0')
      assert_that(triple, equal_to_series(three_series), 'CheckTupleOut1')

      res = one | 'DictOut' >> transforms.DataframeTransform(
          lambda x: {'res': 3 * x}, proxy, yield_elements='pandas')
      assert_that(res['res'], equal_to_series(three_series), 'CheckDictOut')

  def test_cat(self):
    # Verify that cat works with a List[Series], since this is missing from
    # the doctests.
    df = pd.DataFrame({
        'one': ['A', 'B', 'C'],
        'two': ['BB', 'CC', 'A'],
        'three': ['CCC', 'AA', 'B'],
    })
    self.run_scenario(df, lambda df: df.two.str.cat([df.three], join='outer'))
    self.run_scenario(
        df, lambda df: df.one.str.cat([df.two, df.three], join='outer'))

  def test_repeat(self):
    # Verify that repeat works with a Series argument, since this is missing
    # from the doctests.
    df = pd.DataFrame({
        'strings': ['A', 'B', 'C', 'D', 'E'],
        'repeats': [3, 1, 4, 5, 2],
    })
    self.run_scenario(df, lambda df: df.strings.str.repeat(df.repeats))

  def test_rename(self):
    df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
    self.run_scenario(
        df, lambda df: df.rename(columns={'B': 'C'}, index={0: 2, 2: 0}))

    with expressions.allow_non_parallel_operations():
      self.run_scenario(
          df,
          lambda df: df.rename(
              columns={'B': 'C'}, index={0: 2, 2: 0}, errors='raise'))
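

# Tests for lower-level pieces used inside DataframeTransform, such as the
# _ReBatch DoFn.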
class TransformPartsTest(unittest.TestCase):
  def test_rebatch(self):
    with beam.Pipeline() as p:
      sA = pd.Series(range(1000))
      sB = sA * sA
      pcA = p | 'CreatePCollA' >> beam.Create([('k0', sA[::3]),
                                               ('k1', sA[1::3]),
                                               ('k2', sA[2::3])])
      pcB = p | 'CreatePCollB' >> beam.Create([('k0', sB[::3]),
                                               ('k1', sB[1::3]),
                                               ('k2', sB[2::3])])
      input = {'A': pcA, 'B': pcB} | beam.CoGroupByKey()
      output = input | beam.ParDo(
          transforms._ReBatch(target_size=sA.memory_usage()))

      # There should be exactly two elements, as the target size will be
      # hit when 2/3 of pcA and 2/3 of pcB is seen, but not before.
      assert_that(output | beam.combiners.Count.Globally(), equal_to([2]))

      # Sanity check that we got all the right values.
      assert_that(
          output | beam.Map(lambda x: x['A'].sum())
          | 'SumA' >> beam.CombineGlobally(sum),
          equal_to([sA.sum()]),
          label='CheckValuesA')
      assert_that(
          output | beam.Map(lambda x: x['B'].sum())
          | 'SumB' >> beam.CombineGlobally(sum),
          equal_to([sB.sum()]),
          label='CheckValuesB')


if __name__ == '__main__':
  unittest.main()