#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import pandas as pd
from parameterized import parameterized
import apache_beam as beam
from apache_beam.dataframe import expressions
from apache_beam.dataframe import frame_base
from apache_beam.dataframe import frames
# Get major, minor version
PD_VERSION = tuple(map(int, pd.__version__.split('.')[0:2]))
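# e.g. pd.__version__ == '1.3.4' -> PD_VERSION == (1, 3)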
GROUPBY_DF = pd.DataFrame({
'group': ['a' if i % 5 == 0 or i % 3 == 0 else 'b' for i in range(100)],
'foo': [None if i % 11 == 0 else i for i in range(100)],
'bar': [None if i % 7 == 0 else 99 - i for i in range(100)],
'baz': [None if i % 13 == 0 else i * 2 for i in range(100)],
'bool': [i % 17 == 0 for i in range(100)],
'str': [str(i) for i in range(100)],
})
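# Note: the numeric columns above contain periodic None values (every 11th,
# 7th, and 13th row, respectively) so that aggregations also exercise NA
# handling, and 'group' splits the rows unevenly between 'a' and 'b'.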
def _get_deferred_args(*args):
return [
frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(arg, arg[0:0])) for arg in args
]
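# Note: arg[0:0] above is an empty slice of each argument, used as the
# expression's proxy: a zero-row pandas object that carries dtype/column
# metadata without any data.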
class _AbstractFrameTest(unittest.TestCase):
"""Test sub-class with utilities for verifying DataFrame operations."""
def _run_error_test(
self, func, *args, construction_time=True, distributed=True):
"""Verify that func(*args) raises the same exception in pandas and in Beam.
Note that by default this only checks for exceptions that the Beam DataFrame
API raises during expression generation (i.e. construction time).
Exceptions raised while the pipeline is executing are less helpful, but
are sometimes unavoidable (e.g. data validation exceptions), to check for
these exceptions use construction_time=False."""
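    # Example (mirroring test_set_index below):
    #   self._run_error_test(lambda df: df.set_index('bad'), df)
    # checks that referencing a non-existent column raises the same error in
    # pandas and in Beam.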
deferred_args = _get_deferred_args(*args)
# Get expected error
try:
expected = func(*args)
except Exception as e:
expected_error = e
else:
raise AssertionError(
"Expected an error, but executing with pandas successfully "
f"returned:\n{expected}")
# Get actual error
if construction_time:
try:
_ = func(*deferred_args)._expr
except Exception as e:
actual = e
else:
raise AssertionError(
f"Expected an error:\n{expected_error}\nbut Beam successfully "
f"generated an expression.")
else: # not construction_time
# Check for an error raised during pipeline execution
expr = func(*deferred_args)._expr
session_type = (
expressions.PartitioningSession
if distributed else expressions.Session)
try:
result = session_type({}).evaluate(expr)
except Exception as e:
actual = e
else:
raise AssertionError(
f"Expected an error:\n{expected_error}\nbut Beam successfully "
f"Computed the result:\n{result}.")
# Verify
if (not isinstance(actual, type(expected_error)) or
str(expected_error) not in str(actual)):
raise AssertionError(
f'Expected {expected_error!r} to be raised, but got {actual!r}'
) from actual
def _run_inplace_test(self, func, arg, **kwargs):
"""Verify an inplace operation performed by func.
Checks that func performs the same inplace operation on arg, in pandas and
in Beam."""
def wrapper(df):
df = df.copy()
func(df)
return df
self._run_test(wrapper, arg, **kwargs)
def _run_test(
self, func, *args, distributed=True, nonparallel=False, check_proxy=True):
"""Verify that func(*args) produces the same result in pandas and in Beam.
Args:
distributed (bool): Whether or not to use PartitioningSession to
simulate parallel execution.
nonparallel (bool): Whether or not this function contains a
non-parallelizable operation. If True, the expression will be
generated twice, once outside of an allow_non_parallel_operations
block (to verify NonParallelOperation is raised), and again inside
of an allow_non_parallel_operations block to actually generate an
expression to verify.
check_proxy (bool): Whether or not to check that the proxy of the
generated expression matches the actual result, defaults to True.
This option should NOT be set to False in tests added for new
operations if at all possible. Instead make sure the new operation
produces the correct proxy. This flag only exists as an escape hatch
until existing failures can be addressed (BEAM-12379)."""
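    # Example (mirroring test_series_arithmetic below):
    #   self._run_test(lambda a, b: a - 2 * b, a, b)
    # evaluates the lambda once with the pandas Series and once with their
    # deferred counterparts, then compares the results.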
# Compute expected value
expected = func(*args)
# Compute actual value
deferred_args = _get_deferred_args(*args)
if nonparallel:
      # First run outside an allow_non_parallel_operations block to confirm
      # this raises NonParallelOperation as expected
with self.assertRaises(expressions.NonParallelOperation) as raised:
func(*deferred_args)
if raised.exception.msg.startswith(
"Encountered non-parallelizable form of"):
raise AssertionError(
"Default NonParallelOperation raised, please specify a reason in "
"the Singleton() partitioning requirement for this operation."
) from raised.exception
# Re-run in an allow non parallel block to get an expression to verify
with beam.dataframe.allow_non_parallel_operations():
expr = func(*deferred_args)._expr
else:
expr = func(*deferred_args)._expr
# Compute the result of the generated expression
session_type = (
expressions.PartitioningSession if distributed else expressions.Session)
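    # PartitioningSession simulates Beam's distributed execution by
    # evaluating the expression with inputs split across partitions, rather
    # than as single in-memory objects.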
actual = session_type({}).evaluate(expr)
# Verify
if isinstance(expected, pd.core.generic.NDFrame):
if distributed:
if expected.index.is_unique:
expected = expected.sort_index()
actual = actual.sort_index()
else:
expected = expected.sort_values(list(expected.columns))
actual = actual.sort_values(list(actual.columns))
if isinstance(expected, pd.Series):
pd.testing.assert_series_equal(expected, actual)
elif isinstance(expected, pd.DataFrame):
pd.testing.assert_frame_equal(expected, actual)
else:
raise ValueError(
f"Expected value is a {type(expected)},"
"not a Series or DataFrame.")
else:
# Expectation is not a pandas object
if isinstance(expected, float):
if np.isnan(expected):
cmp = np.isnan
else:
cmp = lambda x: np.isclose(expected, x)
else:
cmp = expected.__eq__
self.assertTrue(
cmp(actual), 'Expected:\n\n%r\n\nActual:\n\n%r' % (expected, actual))
if check_proxy:
# Verify that the actual result agrees with the proxy
proxy = expr.proxy()
if type(actual) in (np.float32, np.float64):
self.assertTrue(type(actual) == type(proxy) or np.isnan(proxy))
else:
self.assertEqual(type(actual), type(proxy))
if isinstance(expected, pd.core.generic.NDFrame):
if isinstance(expected, pd.Series):
self.assertEqual(actual.dtype, proxy.dtype)
self.assertEqual(actual.name, proxy.name)
elif isinstance(expected, pd.DataFrame):
pd.testing.assert_series_equal(actual.dtypes, proxy.dtypes)
else:
raise ValueError(
f"Expected value is a {type(expected)},"
"not a Series or DataFrame.")
self.assertEqual(actual.index.names, proxy.index.names)
for i in range(actual.index.nlevels):
self.assertEqual(
actual.index.get_level_values(i).dtype,
proxy.index.get_level_values(i).dtype)
class DeferredFrameTest(_AbstractFrameTest):
"""Miscellaneous tessts for DataFrame operations."""
def test_series_arithmetic(self):
a = pd.Series([1, 2, 3])
b = pd.Series([100, 200, 300])
self._run_test(lambda a, b: a - 2 * b, a, b)
self._run_test(lambda a, b: a.subtract(2).multiply(b).divide(a), a, b)
def test_dataframe_arithmetic(self):
df = pd.DataFrame({'a': [1, 2, 3], 'b': [100, 200, 300]})
df2 = pd.DataFrame({'a': [3000, 1000, 2000], 'b': [7, 11, 13]})
self._run_test(lambda df, df2: df - 2 * df2, df, df2)
self._run_test(
lambda df, df2: df.subtract(2).multiply(df2).divide(df), df, df2)
@unittest.skipIf(PD_VERSION < (1, 3), "dropna=False is new in pandas 1.3")
def test_value_counts_dropna_false(self):
df = pd.DataFrame({
'first_name': ['John', 'Anne', 'John', 'Beth'],
'middle_name': ['Smith', pd.NA, pd.NA, 'Louise']
})
    # TODO(BEAM-12495): Remove this assertRaises when the underlying bug in
    # https://github.com/pandas-dev/pandas/issues/36470 is fixed.
with self.assertRaises(NotImplementedError):
self._run_test(lambda df: df.value_counts(dropna=False), df)
def test_get_column(self):
df = pd.DataFrame({
'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
'Speed': [380., 370., 24., 26.]
})
self._run_test(lambda df: df['Animal'], df)
self._run_test(lambda df: df.Speed, df)
self._run_test(lambda df: df.get('Animal'), df)
self._run_test(lambda df: df.get('FOO', df.Animal), df)
def test_series_xs(self):
    # pandas doctests only verify DataFrame.xs; here we verify Series.xs as
    # well.
d = {
'num_legs': [4, 4, 2, 2],
'num_wings': [0, 0, 2, 2],
'class': ['mammal', 'mammal', 'mammal', 'bird'],
'animal': ['cat', 'dog', 'bat', 'penguin'],
'locomotion': ['walks', 'walks', 'flies', 'walks']
}
df = pd.DataFrame(data=d)
df = df.set_index(['class', 'animal', 'locomotion'])
self._run_test(lambda df: df.num_legs.xs('mammal'), df)
self._run_test(lambda df: df.num_legs.xs(('mammal', 'dog')), df)
self._run_test(lambda df: df.num_legs.xs('cat', level=1), df)
self._run_test(
lambda df: df.num_legs.xs(('bird', 'walks'), level=[0, 'locomotion']),
df)
def test_set_column(self):
def new_column(df):
df['NewCol'] = df['Speed']
df = pd.DataFrame({
'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
'Speed': [380., 370., 24., 26.]
})
self._run_inplace_test(new_column, df)
def test_set_column_from_index(self):
def new_column(df):
df['NewCol'] = df.index
df = pd.DataFrame({
'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
'Speed': [380., 370., 24., 26.]
})
self._run_inplace_test(new_column, df)
def test_tz_localize_ambiguous_series(self):
# This replicates a tz_localize doctest:
# s.tz_localize('CET', ambiguous=np.array([True, True, False]))
# But using a DeferredSeries instead of a np array
s = pd.Series(
range(3),
index=pd.DatetimeIndex([
'2018-10-28 01:20:00', '2018-10-28 02:36:00', '2018-10-28 03:46:00'
]))
ambiguous = pd.Series([True, True, False], index=s.index)
self._run_test(
lambda s,
ambiguous: s.tz_localize('CET', ambiguous=ambiguous),
s,
ambiguous)
def test_tz_convert(self):
    # Verifies Series.tz_convert, converting a Series with a tz-aware
    # DatetimeIndex from Europe/Berlin to America/Los_Angeles.
s = pd.Series(
range(3),
index=pd.DatetimeIndex([
'2018-10-27 01:20:00', '2018-10-27 02:36:00', '2018-10-27 03:46:00'
],
tz='Europe/Berlin'))
self._run_test(lambda s: s.tz_convert('America/Los_Angeles'), s)
def test_sort_index_columns(self):
df = pd.DataFrame({
'c': range(10),
'a': range(10),
'b': range(10),
np.nan: range(10),
})
self._run_test(lambda df: df.sort_index(axis=1), df)
self._run_test(lambda df: df.sort_index(axis=1, ascending=False), df)
self._run_test(lambda df: df.sort_index(axis=1, na_position='first'), df)
def test_where_callable_args(self):
df = pd.DataFrame(
np.arange(10, dtype=np.int64).reshape(-1, 2), columns=['A', 'B'])
self._run_test(
lambda df: df.where(lambda df: df % 2 == 0, lambda df: df * 10), df)
def test_where_concrete_args(self):
df = pd.DataFrame(
np.arange(10, dtype=np.int64).reshape(-1, 2), columns=['A', 'B'])
self._run_test(
lambda df: df.where(
df % 2 == 0, pd.Series({
'A': 123, 'B': 456
}), axis=1),
df)
def test_combine_dataframe(self):
df = pd.DataFrame({'A': [0, 0], 'B': [4, 4]})
df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
take_smaller = lambda s1, s2: s1 if s1.sum() < s2.sum() else s2
self._run_test(
lambda df,
df2: df.combine(df2, take_smaller),
df,
df2,
nonparallel=True)
def test_combine_dataframe_fill(self):
df1 = pd.DataFrame({'A': [0, 0], 'B': [None, 4]})
df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
take_smaller = lambda s1, s2: s1 if s1.sum() < s2.sum() else s2
self._run_test(
lambda df1,
df2: df1.combine(df2, take_smaller, fill_value=-5),
df1,
df2,
nonparallel=True)
def test_combine_Series(self):
s1 = pd.Series({'falcon': 330.0, 'eagle': 160.0})
s2 = pd.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
self._run_test(
lambda s1,
s2: s1.combine(s2, max),
s1,
s2,
nonparallel=True,
check_proxy=False)
def test_combine_first_dataframe(self):
df1 = pd.DataFrame({'A': [None, 0], 'B': [None, 4]})
df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
self._run_test(lambda df1, df2: df1.combine_first(df2), df1, df2)
def test_combine_first_series(self):
s1 = pd.Series([1, np.nan])
s2 = pd.Series([3, 4])
self._run_test(lambda s1, s2: s1.combine_first(s2), s1, s2)
def test_add_prefix(self):
df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [3, 4, 5, 6]})
s = pd.Series([1, 2, 3, 4])
self._run_test(lambda df: df.add_prefix('col_'), df)
self._run_test(lambda s: s.add_prefix('col_'), s)
def test_add_suffix(self):
df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [3, 4, 5, 6]})
s = pd.Series([1, 2, 3, 4])
self._run_test(lambda df: df.add_suffix('_col'), df)
    self._run_test(lambda s: s.add_suffix('_col'), s)
def test_set_index(self):
df = pd.DataFrame({
        # [19, 18, ..., 1, 0]
        'index1': reversed(range(20)),
        # [15, 16, ..., 19, 0, 1, ..., 13, 14]
        'index2': np.roll(range(20), 5),
        # ['', 'a', 'bb', 'ccc', ...]
        'values': [chr(ord('a') + i) * i for i in range(20)],
})
self._run_test(lambda df: df.set_index(['index1', 'index2']), df)
self._run_test(lambda df: df.set_index(['index1', 'index2'], drop=True), df)
self._run_test(lambda df: df.set_index('values'), df)
self._run_error_test(lambda df: df.set_index('bad'), df)
self._run_error_test(
lambda df: df.set_index(['index2', 'bad', 'really_bad']), df)
def test_series_drop_ignore_errors(self):
midx = pd.MultiIndex(
levels=[['lama', 'cow', 'falcon'], ['speed', 'weight', 'length']],
codes=[[0, 0, 0, 1, 1, 1, 2, 2, 2], [0, 1, 2, 0, 1, 2, 0, 1, 2]])
s = pd.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3], index=midx)
# drop() requires singleton partitioning unless errors are ignored
# Add some additional tests here to make sure the implementation works in
# non-singleton partitioning.
self._run_test(lambda s: s.drop('lama', level=0, errors='ignore'), s)
self._run_test(lambda s: s.drop(('cow', 'speed'), errors='ignore'), s)
self._run_test(lambda s: s.drop('falcon', level=0, errors='ignore'), s)
def test_dataframe_drop_ignore_errors(self):
midx = pd.MultiIndex(
levels=[['lama', 'cow', 'falcon'], ['speed', 'weight', 'length']],
codes=[[0, 0, 0, 1, 1, 1, 2, 2, 2], [0, 1, 2, 0, 1, 2, 0, 1, 2]])
df = pd.DataFrame(
index=midx,
columns=['big', 'small'],
data=[[45, 30], [200, 100], [1.5, 1], [30, 20], [250, 150], [1.5, 0.8],
[320, 250], [1, 0.8], [0.3, 0.2]])
# drop() requires singleton partitioning unless errors are ignored
# Add some additional tests here to make sure the implementation works in
# non-singleton partitioning.
self._run_test(
lambda df: df.drop(index='lama', level=0, errors='ignore'), df)
self._run_test(
lambda df: df.drop(index=('cow', 'speed'), errors='ignore'), df)
self._run_test(
lambda df: df.drop(index='falcon', level=0, errors='ignore'), df)
self._run_test(
lambda df: df.drop(index='cow', columns='small', errors='ignore'), df)
def test_merge(self):
# This is from the pandas doctests, but fails due to re-indexing being
# order-sensitive.
df1 = pd.DataFrame({
'lkey': ['foo', 'bar', 'baz', 'foo'], 'value': [1, 2, 3, 5]
})
df2 = pd.DataFrame({
'rkey': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
})
self._run_test(
lambda df1,
df2: df1.merge(df2, left_on='lkey', right_on='rkey').rename(
index=lambda x: '*'),
df1,
df2,
nonparallel=True,
check_proxy=False)
self._run_test(
lambda df1,
df2: df1.merge(
df2, left_on='lkey', right_on='rkey', suffixes=('_left', '_right')).
rename(index=lambda x: '*'),
df1,
df2,
nonparallel=True,
check_proxy=False)
def test_merge_left_join(self):
# This is from the pandas doctests, but fails due to re-indexing being
# order-sensitive.
df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2]})
df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4]})
self._run_test(
lambda df1,
df2: df1.merge(df2, how='left', on='a').rename(index=lambda x: '*'),
df1,
df2,
nonparallel=True,
check_proxy=False)
def test_merge_on_index(self):
# This is from the pandas doctests, but fails due to re-indexing being
# order-sensitive.
df1 = pd.DataFrame({
'lkey': ['foo', 'bar', 'baz', 'foo'], 'value': [1, 2, 3, 5]
}).set_index('lkey')
df2 = pd.DataFrame({
'rkey': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
}).set_index('rkey')
self._run_test(
lambda df1,
df2: df1.merge(df2, left_index=True, right_index=True),
df1,
df2,
check_proxy=False)
def test_merge_same_key(self):
df1 = pd.DataFrame({
'key': ['foo', 'bar', 'baz', 'foo'], 'value': [1, 2, 3, 5]
})
df2 = pd.DataFrame({
'key': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
})
self._run_test(
lambda df1,
df2: df1.merge(df2, on='key').rename(index=lambda x: '*'),
df1,
df2,
nonparallel=True,
check_proxy=False)
self._run_test(
lambda df1,
df2: df1.merge(df2, on='key', suffixes=('_left', '_right')).rename(
index=lambda x: '*'),
df1,
df2,
nonparallel=True,
check_proxy=False)
def test_merge_same_key_doctest(self):
df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2]})
df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4]})
self._run_test(
lambda df1,
df2: df1.merge(df2, how='left', on='a').rename(index=lambda x: '*'),
df1,
df2,
nonparallel=True,
check_proxy=False)
# Test without specifying 'on'
self._run_test(
lambda df1,
df2: df1.merge(df2, how='left').rename(index=lambda x: '*'),
df1,
df2,
nonparallel=True,
check_proxy=False)
def test_merge_same_key_suffix_collision(self):
df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2], 'a_lsuffix': [5, 6]})
df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4], 'a_rsuffix': [7, 8]})
self._run_test(
lambda df1,
df2: df1.merge(
df2, how='left', on='a', suffixes=('_lsuffix', '_rsuffix')).rename(
index=lambda x: '*'),
df1,
df2,
nonparallel=True,
check_proxy=False)
# Test without specifying 'on'
self._run_test(
lambda df1,
df2: df1.merge(df2, how='left', suffixes=('_lsuffix', '_rsuffix')).
rename(index=lambda x: '*'),
df1,
df2,
nonparallel=True,
check_proxy=False)
def test_value_counts_with_nans(self):
# similar to doctests that verify value_counts, but include nan values to
# make sure we handle them correctly.
df = pd.DataFrame({
'num_legs': [2, 4, 4, 6, np.nan, np.nan],
'num_wings': [2, 0, 0, 0, np.nan, 2]
},
index=['falcon', 'dog', 'cat', 'ant', 'car', 'plane'])
self._run_test(lambda df: df.value_counts(), df)
self._run_test(lambda df: df.value_counts(normalize=True), df)
if PD_VERSION >= (1, 3):
# dropna=False is new in pandas 1.3
      # TODO(BEAM-12495): Remove this assertRaises when the underlying bug
      # in https://github.com/pandas-dev/pandas/issues/36470 is fixed.
with self.assertRaises(NotImplementedError):
self._run_test(lambda df: df.value_counts(dropna=False), df)
# Test the defaults.
self._run_test(lambda df: df.num_wings.value_counts(), df)
self._run_test(lambda df: df.num_wings.value_counts(normalize=True), df)
self._run_test(lambda df: df.num_wings.value_counts(dropna=False), df)
# Test the combination interactions.
for normalize in (True, False):
for dropna in (True, False):
self._run_test(
lambda df,
dropna=dropna,
normalize=normalize: df.num_wings.value_counts(
dropna=dropna, normalize=normalize),
df)
def test_value_counts_does_not_support_sort(self):
df = pd.DataFrame({
'num_legs': [2, 4, 4, 6, np.nan, np.nan],
'num_wings': [2, 0, 0, 0, np.nan, 2]
},
index=['falcon', 'dog', 'cat', 'ant', 'car', 'plane'])
with self.assertRaisesRegex(frame_base.WontImplementError,
r"value_counts\(sort\=True\)"):
self._run_test(lambda df: df.value_counts(sort=True), df)
with self.assertRaisesRegex(frame_base.WontImplementError,
r"value_counts\(sort\=True\)"):
self._run_test(lambda df: df.num_wings.value_counts(sort=True), df)
def test_series_getitem(self):
s = pd.Series([x**2 for x in range(10)])
self._run_test(lambda s: s[...], s)
self._run_test(lambda s: s[:], s)
self._run_test(lambda s: s[s < 10], s)
self._run_test(lambda s: s[lambda s: s < 10], s)
s.index = s.index.map(float)
self._run_test(lambda s: s[1.5:6], s)
@parameterized.expand([
(pd.Series(range(10)), ), # unique
(pd.Series(list(range(100)) + [0]), ), # non-unique int
(pd.Series(list(range(100)) + [0]) / 100, ), # non-unique flt
(pd.Series(['a', 'b', 'c', 'd']), ), # unique str
(pd.Series(['a', 'b', 'a', 'c', 'd']), ), # non-unique str
])
def test_series_is_unique(self, series):
self._run_test(lambda s: s.is_unique, series)
def test_dataframe_getitem(self):
df = pd.DataFrame({'A': [x**2 for x in range(6)], 'B': list('abcdef')})
self._run_test(lambda df: df['A'], df)
self._run_test(lambda df: df[['A', 'B']], df)
self._run_test(lambda df: df[:], df)
self._run_test(lambda df: df[df.A < 10], df)
df.index = df.index.map(float)
self._run_test(lambda df: df[1.5:4], df)
def test_loc(self):
dates = pd.date_range('1/1/2000', periods=8)
    # TODO(BEAM-11757): We do not preserve the freq attribute on a
    # DatetimeIndex.
dates.freq = None
df = pd.DataFrame(
np.arange(32).reshape((8, 4)),
index=dates,
columns=['A', 'B', 'C', 'D'])
self._run_test(lambda df: df.loc[:], df)
self._run_test(lambda df: df.loc[:, 'A'], df)
self._run_test(lambda df: df.loc[:dates[3]], df)
self._run_test(lambda df: df.loc[df.A > 10], df)
self._run_test(lambda df: df.loc[lambda df: df.A > 10], df)
self._run_test(lambda df: df.C.loc[df.A > 10], df)
self._run_test(lambda df, s: df.loc[s.loc[1:3]], df, pd.Series(dates))
def test_append_sort(self):
# yapf: disable
df1 = pd.DataFrame({'int': [1, 2, 3], 'str': ['a', 'b', 'c']},
columns=['int', 'str'],
index=[1, 3, 5])
df2 = pd.DataFrame({'int': [4, 5, 6], 'str': ['d', 'e', 'f']},
columns=['str', 'int'],
index=[2, 4, 6])
# yapf: enable
self._run_test(lambda df1, df2: df1.append(df2, sort=True), df1, df2)
self._run_test(lambda df1, df2: df1.append(df2, sort=False), df1, df2)
self._run_test(lambda df1, df2: df2.append(df1, sort=True), df1, df2)
self._run_test(lambda df1, df2: df2.append(df1, sort=False), df1, df2)
def test_smallest_largest(self):
df = pd.DataFrame({'A': [1, 1, 2, 2], 'B': [2, 3, 5, 7]})
self._run_test(lambda df: df.nlargest(1, 'A', keep='all'), df)
self._run_test(lambda df: df.nsmallest(3, 'A', keep='all'), df)
self._run_test(lambda df: df.nlargest(3, ['A', 'B'], keep='all'), df)
def test_series_cov_corr(self):
for s in [pd.Series([1, 2, 3]),
pd.Series(range(100)),
pd.Series([x**3 for x in range(-50, 50)])]:
self._run_test(lambda s: s.std(), s)
self._run_test(lambda s: s.var(), s)
self._run_test(lambda s: s.corr(s), s)
self._run_test(lambda s: s.corr(s + 1), s)
self._run_test(lambda s: s.corr(s * s), s)
self._run_test(lambda s: s.cov(s * s), s)
def test_dataframe_cov_corr(self):
df = pd.DataFrame(np.random.randn(20, 3), columns=['a', 'b', 'c'])
df.loc[df.index[:5], 'a'] = np.nan
df.loc[df.index[5:10], 'b'] = np.nan
self._run_test(lambda df: df.corr(), df)
self._run_test(lambda df: df.cov(), df)
self._run_test(lambda df: df.corr(min_periods=12), df)
self._run_test(lambda df: df.cov(min_periods=12), df)
self._run_test(lambda df: df.corrwith(df.a), df)
self._run_test(lambda df: df[['a', 'b']].corrwith(df[['b', 'c']]), df)
df2 = pd.DataFrame(np.random.randn(20, 3), columns=['a', 'b', 'c'])
self._run_test(
lambda df, df2: df.corrwith(df2, axis=1), df, df2, check_proxy=False)
def test_corrwith_bad_axis(self):
df = pd.DataFrame({'a': range(3), 'b': range(3, 6), 'c': range(6, 9)})
self._run_error_test(lambda df: df.corrwith(df.a, axis=2), df)
self._run_error_test(lambda df: df.corrwith(df, axis=5), df)
@unittest.skipIf(PD_VERSION < (1, 2), "na_action added in pandas 1.2.0")
def test_applymap_na_action(self):
    # Replicates a doctest for na_action, which is incompatible with the
    # doctest framework.
df = pd.DataFrame([[pd.NA, 2.12], [3.356, 4.567]])
self._run_test(
lambda df: df.applymap(lambda x: len(str(x)), na_action='ignore'),
df,
# TODO: generate proxy using naive type inference on fn
check_proxy=False)
def test_dataframe_eval_query(self):
df = pd.DataFrame(np.random.randn(20, 3), columns=['a', 'b', 'c'])
self._run_test(lambda df: df.eval('foo = a + b - c'), df)
self._run_test(lambda df: df.query('a > b + c'), df)
self._run_inplace_test(lambda df: df.eval('foo = a + b - c'), df)
# Verify that attempting to access locals raises a useful error
deferred_df = frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(df, df[0:0]))
self.assertRaises(
NotImplementedError, lambda: deferred_df.eval('foo = a + @b - c'))
self.assertRaises(
NotImplementedError, lambda: deferred_df.query('a > @b + c'))
def test_index_name_assignment(self):
df = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2]})
df = df.set_index(['a', 'b'], drop=False)
def change_index_names(df):
df.index.names = ['A', None]
self._run_inplace_test(change_index_names, df)
def test_quantile(self):
df = pd.DataFrame(
np.array([[1, 1], [2, 10], [3, 100], [4, 100]]), columns=['a', 'b'])
self._run_test(
lambda df: df.quantile(0.1, axis='columns'), df, check_proxy=False)
self._run_test(
lambda df: df.quantile(0.1, axis='columns'), df, check_proxy=False)
with self.assertRaisesRegex(frame_base.WontImplementError,
r"df\.quantile\(q=0\.1, axis='columns'\)"):
self._run_test(lambda df: df.quantile([0.1, 0.5], axis='columns'), df)
def test_dataframe_melt(self):
df = pd.DataFrame({
'A': {
0: 'a', 1: 'b', 2: 'c'
},
'B': {
0: 1, 1: 3, 2: 5
},
'C': {
0: 2, 1: 4, 2: 6
}
})
self._run_test(
lambda df: df.melt(id_vars=['A'], value_vars=['B'], ignore_index=False),
df)
self._run_test(
lambda df: df.melt(
id_vars=['A'], value_vars=['B', 'C'], ignore_index=False),
df)
self._run_test(
lambda df: df.melt(
id_vars=['A'],
value_vars=['B'],
var_name='myVarname',
value_name='myValname',
ignore_index=False),
df)
self._run_test(
lambda df: df.melt(
id_vars=['A'], value_vars=['B', 'C'], ignore_index=False),
df)
df.columns = [list('ABC'), list('DEF')]
self._run_test(
lambda df: df.melt(
col_level=0, id_vars=['A'], value_vars=['B'], ignore_index=False),
df)
self._run_test(
lambda df: df.melt(
id_vars=[('A', 'D')], value_vars=[('B', 'E')], ignore_index=False),
df)
def test_fillna_columns(self):
df = pd.DataFrame(
[[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1], [np.nan, np.nan, np.nan, 5],
[np.nan, 3, np.nan, 4], [3, np.nan, np.nan, 4]],
columns=list('ABCD'))
self._run_test(lambda df: df.fillna(method='ffill', axis='columns'), df)
self._run_test(
lambda df: df.fillna(method='ffill', axis='columns', limit=1), df)
self._run_test(
lambda df: df.fillna(method='bfill', axis='columns', limit=1), df)
# Intended behavior is unclear here. See
# https://github.com/pandas-dev/pandas/issues/40989
# self._run_test(lambda df: df.fillna(axis='columns', value=100,
# limit=2), df)
def test_dataframe_fillna_dataframe_as_value(self):
df = pd.DataFrame([[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1],
[np.nan, np.nan, np.nan, 5], [np.nan, 3, np.nan, 4]],
columns=list("ABCD"))
df2 = pd.DataFrame(np.zeros((4, 4)), columns=list("ABCE"))
self._run_test(lambda df, df2: df.fillna(df2), df, df2)
def test_dataframe_fillna_series_as_value(self):
df = pd.DataFrame([[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1],
[np.nan, np.nan, np.nan, 5], [np.nan, 3, np.nan, 4]],
columns=list("ABCD"))
s = pd.Series(range(4), index=list("ABCE"))
self._run_test(lambda df, s: df.fillna(s), df, s)
def test_series_fillna_series_as_value(self):
df = pd.DataFrame([[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1],
[np.nan, np.nan, np.nan, 5], [np.nan, 3, np.nan, 4]],
columns=list("ABCD"))
df2 = pd.DataFrame(np.zeros((4, 4)), columns=list("ABCE"))
self._run_test(lambda df, df2: df.A.fillna(df2.A), df, df2)
def test_append_verify_integrity(self):
df1 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10))
df2 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(9, 19))
self._run_error_test(
lambda s1,
s2: s1.append(s2, verify_integrity=True),
df1['A'],
df2['A'],
construction_time=False)
self._run_error_test(
lambda df1,
df2: df1.append(df2, verify_integrity=True),
df1,
df2,
construction_time=False)
def test_categorical_groupby(self):
df = pd.DataFrame({'A': np.arange(6), 'B': list('aabbca')})
df['B'] = df['B'].astype(pd.CategoricalDtype(list('cab')))
df = df.set_index('B')
# TODO(BEAM-11190): These aggregations can be done in index partitions, but
# it will require a little more complex logic
self._run_test(lambda df: df.groupby(level=0).sum(), df, nonparallel=True)
self._run_test(lambda df: df.groupby(level=0).mean(), df, nonparallel=True)
def test_dataframe_sum_nonnumeric_raises(self):
# Attempting a numeric aggregation with the str column present should
# raise, and suggest the numeric_only argument
with self.assertRaisesRegex(frame_base.WontImplementError, 'numeric_only'):
self._run_test(lambda df: df.sum(), GROUPBY_DF)
# numeric_only=True should work
self._run_test(lambda df: df.sum(numeric_only=True), GROUPBY_DF)
# projecting only numeric columns should too
self._run_test(lambda df: df[['foo', 'bar']].sum(), GROUPBY_DF)
def test_insert(self):
df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
self._run_inplace_test(lambda df: df.insert(1, 'C', df.A * 2), df)
self._run_inplace_test(
lambda df: df.insert(0, 'foo', pd.Series([8], index=[1])),
df,
check_proxy=False)
self._run_inplace_test(lambda df: df.insert(2, 'bar', value='q'), df)
def test_insert_does_not_support_list_value(self):
df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
with self.assertRaisesRegex(frame_base.WontImplementError,
r"insert\(value=list\)"):
self._run_inplace_test(lambda df: df.insert(1, 'C', [7, 8, 9]), df)
def test_drop_duplicates(self):
df = pd.DataFrame({
'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],
'style': ['cup', 'cup', 'cup', 'pack', 'pack'],
'rating': [4, 4, 3.5, 15, 5]
})
self._run_test(lambda df: df.drop_duplicates(keep=False), df)
self._run_test(
lambda df: df.drop_duplicates(subset=['brand'], keep=False), df)
self._run_test(
lambda df: df.drop_duplicates(subset=['brand', 'style'], keep=False),
df)
@parameterized.expand([
(
lambda base: base.from_dict({
'col_1': [3, 2, 1, 0], 'col_2': ['a', 'b', 'c', 'd']
}), ),
(
lambda base: base.from_dict({
'row_1': [3, 2, 1, 0], 'row_2': ['a', 'b', 'c', 'd']
},
orient='index'), ),
(
lambda base: base.from_records(
np.array([(3, 'a'), (2, 'b'), (1, 'c'), (0, 'd')],
dtype=[('col_1', 'i4'), ('col_2', 'U1')])), ),
])
def test_create_methods(self, func):
expected = func(pd.DataFrame)
deferred_df = func(frames.DeferredDataFrame)
actual = expressions.Session({}).evaluate(deferred_df._expr)
pd.testing.assert_frame_equal(actual, expected)
def test_replace(self):
# verify a replace() doctest case that doesn't quite work in Beam as it uses
# the default method='pad'
df = pd.DataFrame({'A': ['bat', 'foo', 'bait'], 'B': ['abc', 'bar', 'xyz']})
self._run_test(
lambda df: df.replace(
regex={
r'^ba.$': 'new', 'foo': 'xyz'
}, method=None),
df)
def test_sample_columns(self):
df = pd.DataFrame({
'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],
'style': ['cup', 'cup', 'cup', 'pack', 'pack'],
'rating': [4, 4, 3.5, 15, 5]
})
self._run_test(lambda df: df.sample(axis=1, n=2, random_state=1), df)
self._run_error_test(lambda df: df.sample(axis=1, n=10, random_state=2), df)
self._run_test(
lambda df: df.sample(axis=1, n=10, random_state=3, replace=True), df)
def test_cat(self):
    # Replicate the doctests from CategoricalAccessor.
    # These tests don't translate into pandas_doctests_test.py because it
    # tries to use astype("category") in Beam, which yields a non-deferred
    # column type.
s = pd.Series(list("abbccc")).astype("category")
self._run_test(lambda s: s.cat.rename_categories(list("cba")), s)
self._run_test(lambda s: s.cat.reorder_categories(list("cba")), s)
self._run_test(lambda s: s.cat.add_categories(["d", "e"]), s)
self._run_test(lambda s: s.cat.remove_categories(["a", "c"]), s)
self._run_test(lambda s: s.cat.set_categories(list("abcde")), s)
self._run_test(lambda s: s.cat.as_ordered(), s)
self._run_test(lambda s: s.cat.as_unordered(), s)
self._run_test(lambda s: s.cat.codes, s)
@parameterized.expand(frames.ELEMENTWISE_DATETIME_PROPERTIES)
def test_dt_property(self, prop_name):
# Generate a series with a lot of unique timestamps
s = pd.Series(
pd.date_range('1/1/2000', periods=100, freq='m') +
pd.timedelta_range(start='0 days', end='70 days', periods=100))
self._run_test(lambda s: getattr(s.dt, prop_name), s)
@parameterized.expand([
('month_name', {}),
('day_name', {}),
('normalize', {}),
(
'strftime',
{
'date_format': '%B %d, %Y, %r'
},
),
('tz_convert', {
'tz': 'Europe/Berlin'
}),
])
def test_dt_method(self, op, kwargs):
# Generate a series with a lot of unique timestamps
s = pd.Series(
pd.date_range(
'1/1/2000', periods=100, freq='m', tz='America/Los_Angeles') +
pd.timedelta_range(start='0 days', end='70 days', periods=100))
self._run_test(lambda s: getattr(s.dt, op)(**kwargs), s)
def test_dt_tz_localize_ambiguous_series(self):
# This replicates a dt.tz_localize doctest:
# s.tz_localize('CET', ambiguous=np.array([True, True, False]))
# But using a DeferredSeries instead of a np array
s = pd.to_datetime(
pd.Series([
'2018-10-28 01:20:00', '2018-10-28 02:36:00', '2018-10-28 03:46:00'
]))
ambiguous = pd.Series([True, True, False], index=s.index)
self._run_test(
lambda s,
ambiguous: s.dt.tz_localize('CET', ambiguous=ambiguous),
s,
ambiguous)
def test_dt_tz_localize_nonexistent(self):
# This replicates dt.tz_localize doctests that exercise `nonexistent`.
# However they specify ambiguous='NaT' because the default,
# ambiguous='infer', is not supported.
s = pd.to_datetime(
pd.Series(['2015-03-29 02:30:00', '2015-03-29 03:30:00']))
self._run_test(
lambda s: s.dt.tz_localize(
'Europe/Warsaw', ambiguous='NaT', nonexistent='shift_forward'),
s)
self._run_test(
lambda s: s.dt.tz_localize(
'Europe/Warsaw', ambiguous='NaT', nonexistent='shift_backward'),
s)
self._run_test(
lambda s: s.dt.tz_localize(
'Europe/Warsaw', ambiguous='NaT', nonexistent=pd.Timedelta('1H')),
s)
# pandas doesn't support kurtosis on GroupBys:
# https://github.com/pandas-dev/pandas/issues/40139
ALL_GROUPING_AGGREGATIONS = sorted(
set(frames.ALL_AGGREGATIONS) - set(('kurt', 'kurtosis')))
class GroupByTest(_AbstractFrameTest):
"""Tests for DataFrame/Series GroupBy operations."""
@parameterized.expand(ALL_GROUPING_AGGREGATIONS)
def test_groupby_agg(self, agg_type):
if agg_type == 'describe' and PD_VERSION < (1, 2):
self.skipTest(
"BEAM-12366: proxy generation of DataFrameGroupBy.describe "
"fails in pandas < 1.2")
self._run_test(
lambda df: df.groupby('group').agg(agg_type),
GROUPBY_DF,
check_proxy=False)
@parameterized.expand(ALL_GROUPING_AGGREGATIONS)
def test_groupby_with_filter(self, agg_type):
if agg_type == 'describe' and PD_VERSION < (1, 2):
self.skipTest(
"BEAM-12366: proxy generation of DataFrameGroupBy.describe "
"fails in pandas < 1.2")
self._run_test(
lambda df: getattr(df[df.foo > 30].groupby('group'), agg_type)(),
GROUPBY_DF,
check_proxy=False)
@parameterized.expand(ALL_GROUPING_AGGREGATIONS)
def test_groupby(self, agg_type):
if agg_type == 'describe' and PD_VERSION < (1, 2):
self.skipTest(
"BEAM-12366: proxy generation of DataFrameGroupBy.describe "
"fails in pandas < 1.2")
self._run_test(
lambda df: getattr(df.groupby('group'), agg_type)(),
GROUPBY_DF,
check_proxy=False)
@parameterized.expand(ALL_GROUPING_AGGREGATIONS)
def test_groupby_series(self, agg_type):
if agg_type == 'describe' and PD_VERSION < (1, 2):
self.skipTest(
"BEAM-12366: proxy generation of DataFrameGroupBy.describe "
"fails in pandas < 1.2")
self._run_test(
lambda df: getattr(df[df.foo > 40].groupby(df.group), agg_type)(),
GROUPBY_DF,
check_proxy=False)
def test_groupby_user_guide(self):
# Example from https://pandas.pydata.org/docs/user_guide/groupby.html
arrays = [['bar', 'bar', 'baz', 'baz', 'foo', 'foo', 'qux', 'qux'],
['one', 'two', 'one', 'two', 'one', 'two', 'one', 'two']]
index = pd.MultiIndex.from_arrays(arrays, names=['first', 'second'])
df = pd.DataFrame({
'A': [1, 1, 1, 1, 2, 2, 3, 3], 'B': np.arange(8)
},
index=index)
self._run_test(lambda df: df.groupby(['second', 'A']).sum(), df)
@parameterized.expand(ALL_GROUPING_AGGREGATIONS)
def test_groupby_project_series(self, agg_type):
df = GROUPBY_DF
if agg_type == 'describe':
self.skipTest(
"BEAM-12366: proxy generation of SeriesGroupBy.describe "
"fails")
if agg_type in ('corr', 'cov'):
self.skipTest(
"BEAM-12367: SeriesGroupBy.{corr, cov} do not raise the "
"expected error.")
self._run_test(lambda df: getattr(df.groupby('group').foo, agg_type)(), df)
self._run_test(lambda df: getattr(df.groupby('group').bar, agg_type)(), df)
self._run_test(
lambda df: getattr(df.groupby('group')['foo'], agg_type)(), df)
self._run_test(
lambda df: getattr(df.groupby('group')['bar'], agg_type)(), df)
@parameterized.expand(ALL_GROUPING_AGGREGATIONS)
def test_groupby_project_dataframe(self, agg_type):
if agg_type == 'describe' and PD_VERSION < (1, 2):
self.skipTest(
"BEAM-12366: proxy generation of DataFrameGroupBy.describe "
"fails in pandas < 1.2")
self._run_test(
lambda df: getattr(df.groupby('group')[['bar', 'baz']], agg_type)(),
GROUPBY_DF,
check_proxy=False)
def test_groupby_errors_bad_projection(self):
df = GROUPBY_DF
# non-existent projection column
self._run_error_test(
lambda df: df.groupby('group')[['bar', 'baz']].bar.median(), df)
self._run_error_test(lambda df: df.groupby('group')[['bad']].median(), df)
self._run_error_test(lambda df: df.groupby('group').bad.median(), df)
self._run_error_test(
lambda df: df.groupby('group')[['bar', 'baz']].bar.sum(), df)
self._run_error_test(lambda df: df.groupby('group')[['bat']].sum(), df)
self._run_error_test(lambda df: df.groupby('group').bat.sum(), df)
def test_groupby_errors_non_existent_label(self):
df = GROUPBY_DF
# non-existent grouping label
self._run_error_test(
lambda df: df.groupby(['really_bad', 'foo', 'bad']).foo.sum(), df)
self._run_error_test(lambda df: df.groupby('bad').foo.sum(), df)
def test_groupby_callable(self):
df = GROUPBY_DF
self._run_test(lambda df: df.groupby(lambda x: x % 2).foo.sum(), df)
self._run_test(lambda df: df.groupby(lambda x: x % 5).median(), df)
def test_groupby_apply(self):
df = GROUPBY_DF
def median_sum_fn(x):
return (x.foo + x.bar).median()
# Note this is the same as DataFrameGroupBy.describe. Using it here is
# just a convenient way to test apply() with a user fn that returns a Series
describe = lambda df: df.describe()
self._run_test(lambda df: df.groupby('group').foo.apply(describe), df)
self._run_test(
lambda df: df.groupby('group')[['foo', 'bar']].apply(describe), df)
self._run_test(lambda df: df.groupby('group').apply(median_sum_fn), df)
self._run_test(
lambda df: df.set_index('group').foo.groupby(level=0).apply(describe),
df)
self._run_test(lambda df: df.groupby(level=0).apply(median_sum_fn), df)
self._run_test(lambda df: df.groupby(lambda x: x % 3).apply(describe), df)
self._run_test(
lambda df: df.bar.groupby(lambda x: x % 3).apply(describe), df)
self._run_test(
lambda df: df.set_index(['str', 'group', 'bool']).groupby(
level='group').apply(median_sum_fn),
df)
def test_groupby_apply_preserves_column_order(self):
df = GROUPBY_DF
self._run_test(
lambda df: df[['foo', 'group', 'bar']].groupby('group').apply(
lambda x: x),
df)
def test_groupby_transform(self):
df = pd.DataFrame({
"Date": [
"2015-05-08",
"2015-05-07",
"2015-05-06",
"2015-05-05",
"2015-05-08",
"2015-05-07",
"2015-05-06",
"2015-05-05"
],
"Data": [5, 8, 6, 1, 50, 100, 60, 120],
})
self._run_test(lambda df: df.groupby('Date')['Data'].transform(np.sum), df)
self._run_test(
lambda df: df.groupby('Date')['Data'].transform(
lambda x: (x - x.mean()) / x.std()),
df)
def test_groupby_apply_modified_index(self):
df = GROUPBY_DF
# If apply fn modifies the index then the output will include the grouped
# index
self._run_test(
lambda df: df.groupby('group').apply(
lambda x: x[x.foo > x.foo.median()]),
df)
@unittest.skip('BEAM-11710')
def test_groupby_aggregate_grouped_column(self):
df = pd.DataFrame({
'group': ['a' if i % 5 == 0 or i % 3 == 0 else 'b' for i in range(100)],
'foo': [None if i % 11 == 0 else i for i in range(100)],
'bar': [None if i % 7 == 0 else 99 - i for i in range(100)],
'baz': [None if i % 13 == 0 else i * 2 for i in range(100)],
})
self._run_test(lambda df: df.groupby('group').group.count(), df)
self._run_test(lambda df: df.groupby('group')[['group', 'bar']].count(), df)
self._run_test(
lambda df: df.groupby('group')[['group', 'bar']].apply(
lambda x: x.describe()),
df)
@parameterized.expand((x, ) for x in [
0,
[1],
3,
[0, 3],
[2, 1],
['foo', 0],
[1, 'str'],
[3, 0, 2, 1],
])
def test_groupby_level_agg(self, level):
df = GROUPBY_DF.set_index(['group', 'foo', 'bar', 'str'], drop=False)
self._run_test(lambda df: df.groupby(level=level).bar.max(), df)
self._run_test(
lambda df: df.groupby(level=level).sum(numeric_only=True), df)
self._run_test(
lambda df: df.groupby(level=level).apply(
lambda x: (x.foo + x.bar).median()),
df)
@unittest.skipIf(PD_VERSION < (1, 1), "drop_na added in pandas 1.1.0")
def test_groupby_count_na(self):
# Verify we can do a groupby.count() that doesn't drop NaN values
self._run_test(
lambda df: df.groupby('foo', dropna=True).bar.count(), GROUPBY_DF)
self._run_test(
lambda df: df.groupby('foo', dropna=False).bar.count(), GROUPBY_DF)
def test_groupby_sum_min_count(self):
df = pd.DataFrame({
'good': [1, 2, 3, np.nan],
'bad': [np.nan, np.nan, np.nan, 4],
'group': ['a', 'b', 'a', 'b']
})
self._run_test(lambda df: df.groupby('group').sum(min_count=2), df)
def test_groupby_dtypes(self):
self._run_test(
lambda df: df.groupby('group').dtypes, GROUPBY_DF, check_proxy=False)
self._run_test(
lambda df: df.groupby(level=0).dtypes, GROUPBY_DF, check_proxy=False)
@parameterized.expand(ALL_GROUPING_AGGREGATIONS)
def test_dataframe_groupby_series(self, agg_type):
if agg_type == 'describe' and PD_VERSION < (1, 2):
self.skipTest(
"BEAM-12366: proxy generation of DataFrameGroupBy.describe "
"fails in pandas < 1.2")
self._run_test(
lambda df: df[df.foo > 40].groupby(df.group).agg(agg_type),
GROUPBY_DF,
check_proxy=False)
self._run_test(
lambda df: df[df.foo > 40].groupby(df.foo % 3).agg(agg_type),
GROUPBY_DF,
check_proxy=False)
@parameterized.expand(ALL_GROUPING_AGGREGATIONS)
def test_series_groupby_series(self, agg_type):
if agg_type == 'describe':
self.skipTest(
"BEAM-12366: proxy generation of SeriesGroupBy.describe "
"fails")
if agg_type in ('corr', 'cov'):
self.skipTest(
"BEAM-12367: SeriesGroupBy.{corr, cov} do not raise the "
"expected error.")
self._run_test(
lambda df: df[df.foo < 40].bar.groupby(df.group).agg(agg_type),
GROUPBY_DF)
self._run_test(
lambda df: df[df.foo < 40].bar.groupby(df.foo % 3).agg(agg_type),
GROUPBY_DF)
def test_groupby_series_apply(self):
df = GROUPBY_DF
def median_sum_fn(x):
return (x.foo + x.bar).median()
# Note this is the same as DataFrameGroupBy.describe. Using it here is
# just a convenient way to test apply() with a user fn that returns a Series
describe = lambda df: df.describe()
self._run_test(lambda df: df.groupby(df.group).foo.apply(describe), df)
self._run_test(
lambda df: df.groupby(df.group)[['foo', 'bar']].apply(describe), df)
self._run_test(lambda df: df.groupby(df.group).apply(median_sum_fn), df)
def test_groupby_multiindex_keep_nans(self):
# Due to https://github.com/pandas-dev/pandas/issues/36470
# groupby(dropna=False) doesn't work with multiple columns
with self.assertRaisesRegex(NotImplementedError, "BEAM-12495"):
self._run_test(
lambda df: df.groupby(['foo', 'bar'], dropna=False).sum(), GROUPBY_DF)
class AggregationTest(_AbstractFrameTest):
"""Tests for global aggregation methods on DataFrame/Series."""
  # corr, cov on Series require another argument
@parameterized.expand(
sorted(set(frames.ALL_AGGREGATIONS) - set(['corr', 'cov'])))
def test_series_agg(self, agg_method):
s = pd.Series(list(range(16)))
nonparallel = agg_method in (
'quantile',
'mean',
'describe',
'median',
'sem',
'mad',
'skew',
'kurtosis',
'kurt')
# TODO(BEAM-12379): max and min produce the wrong proxy
check_proxy = agg_method not in ('max', 'min')
self._run_test(
lambda s: s.agg(agg_method),
s,
nonparallel=nonparallel,
check_proxy=check_proxy)
  # corr, cov on Series require another argument
# Series.size is a property
@parameterized.expand(
sorted(set(frames.ALL_AGGREGATIONS) - set(['corr', 'cov', 'size'])))
def test_series_agg_method(self, agg_method):
s = pd.Series(list(range(16)))
nonparallel = agg_method in (
'quantile',
'mean',
'describe',
'median',
'sem',
'mad',
'skew',
'kurtosis',
'kurt')
# TODO(BEAM-12379): max and min produce the wrong proxy
check_proxy = agg_method not in ('max', 'min')
self._run_test(
lambda s: getattr(s, agg_method)(),
s,
nonparallel=nonparallel,
check_proxy=check_proxy)
@parameterized.expand(frames.ALL_AGGREGATIONS)
def test_dataframe_agg(self, agg_method):
df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]})
nonparallel = agg_method in (
'quantile',
'mean',
'describe',
'median',
'sem',
'mad',
'skew',
'kurtosis',
'kurt')
# TODO(BEAM-12379): max and min produce the wrong proxy
check_proxy = agg_method not in ('max', 'min')
self._run_test(
lambda df: df.agg(agg_method),
df,
nonparallel=nonparallel,
check_proxy=check_proxy)
# DataFrame.size is a property
@parameterized.expand(sorted(set(frames.ALL_AGGREGATIONS) - set(['size'])))
def test_dataframe_agg_method(self, agg_method):
df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]})
nonparallel = agg_method in (
'quantile',
'mean',
'describe',
'median',
'sem',
'mad',
'skew',
'kurtosis',
'kurt')
# TODO(BEAM-12379): max and min produce the wrong proxy
check_proxy = agg_method not in ('max', 'min')
self._run_test(
lambda df: getattr(df, agg_method)(),
df,
nonparallel=nonparallel,
check_proxy=check_proxy)
def test_series_agg_modes(self):
s = pd.Series(list(range(16)))
self._run_test(lambda s: s.agg('sum'), s)
self._run_test(lambda s: s.agg(['sum']), s)
self._run_test(lambda s: s.agg(['sum', 'mean']), s, nonparallel=True)
self._run_test(lambda s: s.agg(['mean']), s, nonparallel=True)
self._run_test(lambda s: s.agg('mean'), s, nonparallel=True)
def test_dataframe_agg_modes(self):
df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]})
self._run_test(lambda df: df.agg('sum'), df)
self._run_test(lambda df: df.agg(['sum', 'mean']), df, nonparallel=True)
self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'sum'}), df)
self._run_test(
lambda df: df.agg({
'A': 'sum', 'B': 'mean'
}), df, nonparallel=True)
self._run_test(
lambda df: df.agg({'A': ['sum', 'mean']}), df, nonparallel=True)
self._run_test(
lambda df: df.agg({
'A': ['sum', 'mean'], 'B': 'min'
}),
df,
nonparallel=True)
def test_series_agg_level(self):
self._run_test(
lambda df: df.set_index(['group', 'foo']).bar.count(level=0),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo']).bar.max(level=0), GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo']).bar.median(level=0),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['foo', 'group']).bar.count(level=1),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo']).bar.max(level=1), GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo']).bar.max(level='foo'),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo']).bar.median(level=1),
GROUPBY_DF)
def test_dataframe_agg_level(self):
self._run_test(
lambda df: df.set_index(['group', 'foo']).count(level=0), GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo']).max(
level=0, numeric_only=False),
GROUPBY_DF,
check_proxy=False)
    # The pandas implementation doesn't respect the numeric_only argument
    # here (https://github.com/pandas-dev/pandas/issues/40788); it always
    # acts as if numeric_only=True. Our implementation respects it, so we
    # need to make it explicit.
self._run_test(
lambda df: df.set_index(['group', 'foo']).sum(
level=0, numeric_only=True),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo'])[['bar']].count(level=1),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo']).count(level=1), GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo']).max(
level=1, numeric_only=False),
GROUPBY_DF,
check_proxy=False)
# sum with str columns is order-sensitive
self._run_test(
lambda df: df.set_index(['group', 'foo']).sum(
level=1, numeric_only=True),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo']).median(
level=0, numeric_only=True),
GROUPBY_DF)
self._run_test(
lambda df: df.drop('str', axis=1).set_index(['foo', 'group']).median(
level=1, numeric_only=True),
GROUPBY_DF)
def test_series_agg_multifunc_level(self):
# level= is ignored for multiple agg fns
self._run_test(
lambda df: df.set_index(['group', 'foo']).bar.agg(['min', 'max'],
level=0),
GROUPBY_DF)
def test_dataframe_agg_multifunc_level(self):
# level= is ignored for multiple agg fns
self._run_test(
lambda df: df.set_index(['group', 'foo']).agg(['min', 'max'], level=0),
GROUPBY_DF,
check_proxy=False)
@parameterized.expand([(True, ), (False, )])
@unittest.skipIf(
PD_VERSION < (1, 2),
"pandas 1.1.0 produces different dtypes for these examples")
def test_dataframe_agg_numeric_only(self, numeric_only):
# Note other aggregation functions can fail on this input with
# numeric_only={False,None}. These are the only ones that actually work for
# the string inputs.
self._run_test(
lambda df: df.max(numeric_only=numeric_only),
GROUPBY_DF,
check_proxy=False)
self._run_test(
lambda df: df.min(numeric_only=numeric_only),
GROUPBY_DF,
check_proxy=False)
@unittest.skip(
"pandas implementation doesn't respect numeric_only= with "
"level= (https://github.com/pandas-dev/pandas/issues/40788)")
def test_dataframe_agg_level_numeric_only(self):
self._run_test(
lambda df: df.set_index('foo').sum(level=0, numeric_only=True),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index('foo').max(level=0, numeric_only=True),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index('foo').mean(level=0, numeric_only=True),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index('foo').median(level=0, numeric_only=True),
GROUPBY_DF)
def test_dataframe_agg_bool_only(self):
df = pd.DataFrame({
'all': [True for i in range(10)],
'any': [i % 3 == 0 for i in range(10)],
'int': range(10)
})
self._run_test(lambda df: df.all(), df)
self._run_test(lambda df: df.any(), df)
self._run_test(lambda df: df.all(bool_only=True), df)
self._run_test(lambda df: df.any(bool_only=True), df)
@unittest.skip(
"pandas doesn't implement bool_only= with level= "
"(https://github.com/pandas-dev/pandas/blob/"
"v1.2.3/pandas/core/generic.py#L10573)")
def test_dataframe_agg_level_bool_only(self):
df = pd.DataFrame({
'all': [True for i in range(10)],
'any': [i % 3 == 0 for i in range(10)],
'int': range(10)
})
self._run_test(lambda df: df.set_index('int', drop=False).all(level=0), df)
self._run_test(lambda df: df.set_index('int', drop=False).any(level=0), df)
self._run_test(
lambda df: df.set_index('int', drop=False).all(level=0, bool_only=True),
df)
self._run_test(
lambda df: df.set_index('int', drop=False).any(level=0, bool_only=True),
df)
def test_series_agg_np_size(self):
self._run_test(
lambda df: df.set_index(['group', 'foo']).agg(np.size),
GROUPBY_DF,
check_proxy=False)
def test_df_agg_invalid_kwarg_raises(self):
self._run_error_test(lambda df: df.agg('mean', bool_only=True), GROUPBY_DF)
self._run_error_test(
lambda df: df.agg('any', numeric_only=True), GROUPBY_DF)
self._run_error_test(
lambda df: df.agg('median', min_count=3, numeric_only=True), GROUPBY_DF)
def test_series_agg_method_invalid_kwarg_raises(self):
self._run_error_test(lambda df: df.foo.median(min_count=3), GROUPBY_DF)
self._run_error_test(
lambda df: df.foo.agg('median', min_count=3), GROUPBY_DF)
@unittest.skipIf(
PD_VERSION < (1, 3),
(
"DataFrame.agg raises a different exception from the "
"aggregation methods. Fixed in "
"https://github.com/pandas-dev/pandas/pull/40543."))
def test_df_agg_method_invalid_kwarg_raises(self):
self._run_error_test(lambda df: df.mean(bool_only=True), GROUPBY_DF)
self._run_error_test(lambda df: df.any(numeric_only=True), GROUPBY_DF)
self._run_error_test(
lambda df: df.median(min_count=3, numeric_only=True), GROUPBY_DF)
def test_agg_min_count(self):
df = pd.DataFrame({
'good': [1, 2, 3, np.nan],
'bad': [np.nan, np.nan, np.nan, 4],
},
index=['a', 'b', 'a', 'b'])
self._run_test(lambda df: df.sum(level=0, min_count=2), df)
self._run_test(lambda df: df.sum(min_count=3), df, nonparallel=True)
self._run_test(lambda df: df.sum(min_count=1), df, nonparallel=True)
self._run_test(lambda df: df.good.sum(min_count=2), df, nonparallel=True)
self._run_test(lambda df: df.bad.sum(min_count=2), df, nonparallel=True)
def test_series_agg_std(self):
s = pd.Series(range(10))
self._run_test(lambda s: s.agg('std'), s)
self._run_test(lambda s: s.agg('var'), s)
self._run_test(lambda s: s.agg(['std', 'sum']), s)
self._run_test(lambda s: s.agg(['var']), s)
def test_std_all_na(self):
s = pd.Series([np.nan] * 10)
self._run_test(lambda s: s.agg('std'), s)
self._run_test(lambda s: s.std(), s)
def test_std_mostly_na_with_ddof(self):
df = pd.DataFrame({
'one': [i if i % 8 == 0 else np.nan for i in range(8)],
'two': [i if i % 4 == 0 else np.nan for i in range(8)],
'three': [i if i % 2 == 0 else np.nan for i in range(8)],
},
index=pd.MultiIndex.from_arrays(
[list(range(8)), list(reversed(range(8)))],
names=['forward', None]))
self._run_test(lambda df: df.std(), df) # ddof=1
self._run_test(lambda df: df.std(ddof=0), df)
self._run_test(lambda df: df.std(ddof=2), df)
self._run_test(lambda df: df.std(ddof=3), df)
self._run_test(lambda df: df.std(ddof=4), df)
def test_dataframe_std(self):
self._run_test(lambda df: df.std(numeric_only=True), GROUPBY_DF)
self._run_test(lambda df: df.var(numeric_only=True), GROUPBY_DF)
def test_dataframe_mode(self):
self._run_test(
lambda df: df.mode(), GROUPBY_DF, nonparallel=True, check_proxy=False)
self._run_test(
lambda df: df.mode(numeric_only=True),
GROUPBY_DF,
nonparallel=True,
check_proxy=False)
self._run_test(
lambda df: df.mode(dropna=True, numeric_only=True),
GROUPBY_DF,
nonparallel=True,
check_proxy=False)
def test_series_mode(self):
self._run_test(lambda df: df.foo.mode(), GROUPBY_DF, nonparallel=True)
self._run_test(
lambda df: df.baz.mode(dropna=True), GROUPBY_DF, nonparallel=True)
class BeamSpecificTest(unittest.TestCase):
"""Tests for functionality that's specific to the Beam DataFrame API.
These features don't exist in pandas so we must verify them independently."""
def assert_frame_data_equivalent(self, actual, expected):
"""Verify that actual is the same as expected, ignoring the index and order
of the data."""
def sort_and_drop_index(df):
if isinstance(df, pd.Series):
df = df.sort_values()
elif isinstance(df, pd.DataFrame):
df = df.sort_values(by=list(df.columns))
return df.reset_index(drop=True)
actual = sort_and_drop_index(actual)
expected = sort_and_drop_index(expected)
if isinstance(expected, pd.Series):
pd.testing.assert_series_equal(actual, expected)
elif isinstance(expected, pd.DataFrame):
pd.testing.assert_frame_equal(actual, expected)
def _evaluate(self, func, *args, distributed=True):
deferred_args = [
frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(arg, arg[0:0])) for arg in args
]
session_type = (
expressions.PartitioningSession if distributed else expressions.Session)
return session_type({}).evaluate(func(*deferred_args)._expr)
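  # Unlike _AbstractFrameTest._run_test, _evaluate returns the Beam result
  # directly (there is no pandas result to compare against), so each test
  # below makes its own assertions about the output.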
def test_drop_duplicates_keep_any(self):
df = pd.DataFrame({
'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],
'style': ['cup', 'cup', 'cup', 'pack', 'pack'],
'rating': [4, 4, 3.5, 15, 5]
})
result = self._evaluate(lambda df: df.drop_duplicates(keep='any'), df)
# Verify that the result is the same as conventional drop_duplicates
self.assert_frame_data_equivalent(result, df.drop_duplicates())
def test_drop_duplicates_keep_any_subset(self):
df = pd.DataFrame({
'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],
'style': ['cup', 'cup', 'cup', 'pack', 'pack'],
'rating': [4, 4, 3.5, 15, 5]
})
result = self._evaluate(
lambda df: df.drop_duplicates(keep='any', subset=['brand']), df)
    self.assertTrue(result.brand.is_unique)
self.assert_frame_data_equivalent(
result.brand, df.drop_duplicates(subset=['brand']).brand)
def test_series_drop_duplicates_keep_any(self):
df = pd.DataFrame({
'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],
'style': ['cup', 'cup', 'cup', 'pack', 'pack'],
'rating': [4, 4, 3.5, 15, 5]
})
result = self._evaluate(lambda df: df.brand.drop_duplicates(keep='any'), df)
self.assert_frame_data_equivalent(result, df.brand.drop_duplicates())
def test_duplicated_keep_any(self):
df = pd.DataFrame({
'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],
'style': ['cup', 'cup', 'cup', 'pack', 'pack'],
'rating': [4, 4, 3.5, 15, 5]
})
result = self._evaluate(lambda df: df.duplicated(keep='any'), df)
# Verify that the result is the same as conventional duplicated
self.assert_frame_data_equivalent(result, df.duplicated())
def test_nsmallest_any(self):
df = pd.DataFrame({
'population': [
59000000,
65000000,
434000,
434000,
434000,
337000,
337000,
11300,
11300
],
'GDP': [1937894, 2583560, 12011, 4520, 12128, 17036, 182, 38, 311],
'alpha-2': ["IT", "FR", "MT", "MV", "BN", "IS", "NR", "TV", "AI"]
},
index=[
"Italy",
"France",
"Malta",
"Maldives",
"Brunei",
"Iceland",
"Nauru",
"Tuvalu",
"Anguilla"
])
result = self._evaluate(
lambda df: df.population.nsmallest(3, keep='any'), df)
# keep='any' should produce the same result as keep='first',
# but not necessarily with the same index
self.assert_frame_data_equivalent(result, df.population.nsmallest(3))
def test_nlargest_any(self):
df = pd.DataFrame({
'population': [
59000000,
65000000,
434000,
434000,
434000,
337000,
337000,
11300,
11300
],
'GDP': [1937894, 2583560, 12011, 4520, 12128, 17036, 182, 38, 311],
'alpha-2': ["IT", "FR", "MT", "MV", "BN", "IS", "NR", "TV", "AI"]
},
index=[
"Italy",
"France",
"Malta",
"Maldives",
"Brunei",
"Iceland",
"Nauru",
"Tuvalu",
"Anguilla"
])
result = self._evaluate(
lambda df: df.population.nlargest(3, keep='any'), df)
# keep='any' should produce the same result as keep='first',
# but not necessarily with the same index
self.assert_frame_data_equivalent(result, df.population.nlargest(3))
def test_sample(self):
df = pd.DataFrame({
'population': [
59000000,
65000000,
434000,
434000,
434000,
337000,
337000,
11300,
11300
],
'GDP': [1937894, 2583560, 12011, 4520, 12128, 17036, 182, 38, 311],
'alpha-2': ["IT", "FR", "MT", "MV", "BN", "IS", "NR", "TV", "AI"]
},
index=[
"Italy",
"France",
"Malta",
"Maldives",
"Brunei",
"Iceland",
"Nauru",
"Tuvalu",
"Anguilla"
])
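    # Sampling is random, so there is no pandas result to compare against;
    # only the size (and, for the Series case, the name) can be verified.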
result = self._evaluate(lambda df: df.sample(n=3), df)
self.assertEqual(len(result), 3)
series_result = self._evaluate(lambda df: df.GDP.sample(n=3), df)
self.assertEqual(len(series_result), 3)
self.assertEqual(series_result.name, "GDP")
def test_sample_with_weights(self):
df = pd.DataFrame({
'population': [
59000000,
65000000,
434000,
434000,
434000,
337000,
337000,
11300,
11300
],
'GDP': [1937894, 2583560, 12011, 4520, 12128, 17036, 182, 38, 311],
'alpha-2': ["IT", "FR", "MT", "MV", "BN", "IS", "NR", "TV", "AI"]
},
index=[
"Italy",
"France",
"Malta",
"Maldives",
"Brunei",
"Iceland",
"Nauru",
"Tuvalu",
"Anguilla"
])
weights = pd.Series([0, 0, 0, 0, 0, 0, 0, 1, 1], index=df.index)
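    # Only Tuvalu and Anguilla have nonzero weight, so a weighted sample of
    # size 2 must consist of exactly those two rows.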
result = self._evaluate(
lambda df, weights: df.sample(n=2, weights=weights), df, weights)
self.assertEqual(len(result), 2)
self.assertEqual(set(result.index), set(["Tuvalu", "Anguilla"]))
series_result = self._evaluate(
lambda df, weights: df.GDP.sample(n=2, weights=weights), df, weights)
self.assertEqual(len(series_result), 2)
self.assertEqual(series_result.name, "GDP")
self.assertEqual(set(series_result.index), set(["Tuvalu", "Anguilla"]))
def test_sample_with_missing_weights(self):
df = pd.DataFrame({
'population': [
59000000,
65000000,
434000,
434000,
434000,
337000,
337000,
11300,
11300
],
'GDP': [1937894, 2583560, 12011, 4520, 12128, 17036, 182, 38, 311],
'alpha-2': ["IT", "FR", "MT", "MV", "BN", "IS", "NR", "TV", "AI"]
},
index=[
"Italy",
"France",
"Malta",
"Maldives",
"Brunei",
"Iceland",
"Nauru",
"Tuvalu",
"Anguilla"
])
# Missing weights are treated as 0
weights = pd.Series([.1, .01, np.nan, 0],
index=["Nauru", "Iceland", "Anguilla", "Italy"])
result = self._evaluate(
lambda df, weights: df.sample(n=2, weights=weights), df, weights)
self.assertEqual(len(result), 2)
self.assertEqual(set(result.index), set(["Nauru", "Iceland"]))
series_result = self._evaluate(
lambda df, weights: df.GDP.sample(n=2, weights=weights), df, weights)
self.assertEqual(len(series_result), 2)
self.assertEqual(series_result.name, "GDP")
self.assertEqual(set(series_result.index), set(["Nauru", "Iceland"]))
def test_sample_with_weights_distribution(self):
target_prob = 0.25
num_samples = 100
num_targets = 200
num_other_elements = 10000
target_weight = target_prob / num_targets
other_weight = (1 - target_prob) / num_other_elements
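    # With these constants, target_weight = 0.25 / 200 = 1.25e-3 and
    # other_weight = 0.75 / 10000 = 7.5e-5, a ratio of roughly 17x, so the
    # sanity check below passes comfortably.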
self.assertTrue(target_weight > other_weight * 10, "weights too close")
    result = self._evaluate(
        lambda s, weights: s.sample(n=num_samples, weights=weights).sum(),
        # The first elements are 1, the rest are all 0. This means that when
        # we sum all the sampled elements (above), the result should be the
        # number of times the first elements (aka targets) were sampled.
        pd.Series([1] * num_targets + [0] * num_other_elements),
        pd.Series([target_weight] * num_targets +
                  [other_weight] * num_other_elements))
# With the above constants, the probability of violating this invariant
# (as computed using the Bernoulli distribution) is about 0.0012%.
expected = num_samples * target_prob
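    # expected = 100 * 0.25 = 25, so the assertion below accepts any result
    # in the open interval (25 / 3, 50), i.e. roughly (8.3, 50).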
self.assertTrue(expected / 3 < result < expected * 2, (expected, result))
class AllowNonParallelTest(unittest.TestCase):
def _use_non_parallel_operation(self):
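    # replace() with a limit depends on the order of the data (only the first
    # `limit` matches are replaced), which cannot be honored in parallel, so
    # the Beam DataFrame API raises NonParallelOperation by default.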
_ = frame_base.DeferredFrame.wrap(
expressions.PlaceholderExpression(pd.Series([1, 2, 3]))).replace(
'a', 'b', limit=1)
def test_disallow_non_parallel(self):
with self.assertRaises(expressions.NonParallelOperation):
self._use_non_parallel_operation()
def test_allow_non_parallel_in_context(self):
with beam.dataframe.allow_non_parallel_operations():
self._use_non_parallel_operation()
def test_allow_non_parallel_nesting(self):
# disallowed
with beam.dataframe.allow_non_parallel_operations():
# allowed
self._use_non_parallel_operation()
with beam.dataframe.allow_non_parallel_operations(False):
# disallowed again
with self.assertRaises(expressions.NonParallelOperation):
self._use_non_parallel_operation()
# allowed
self._use_non_parallel_operation()
# disallowed
with self.assertRaises(expressions.NonParallelOperation):
self._use_non_parallel_operation()
class ConstructionTimeTest(unittest.TestCase):
"""Tests for operations that can be executed eagerly."""
DF = pd.DataFrame({
'str_col': ['foo', 'bar'] * 3,
'int_col': [1, 2] * 3,
'flt_col': [1.1, 2.2] * 3,
'cat_col': pd.Series(list('aabbca'), dtype="category"),
'datetime_col': pd.Series(
pd.date_range(
'1/1/2000', periods=6, freq='m', tz='America/Los_Angeles'))
})
DEFERRED_DF = frame_base.DeferredFrame.wrap(
expressions.PlaceholderExpression(DF.iloc[:0]))
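  # DEFERRED_DF wraps an empty proxy (DF.iloc[:0]), so any value these tests
  # recover must come from schema/metadata available at construction time,
  # not from the actual data.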
def _run_test(self, fn):
expected = fn(self.DF)
actual = fn(self.DEFERRED_DF)
if isinstance(expected, pd.Index):
pd.testing.assert_index_equal(expected, actual)
elif isinstance(expected, pd.Series):
pd.testing.assert_series_equal(expected, actual)
elif isinstance(expected, pd.DataFrame):
pd.testing.assert_frame_equal(expected, actual)
else:
self.assertEqual(expected, actual)
@parameterized.expand(DF.columns)
def test_series_name(self, col_name):
self._run_test(lambda df: df[col_name].name)
@parameterized.expand(DF.columns)
def test_series_dtype(self, col_name):
self._run_test(lambda df: df[col_name].dtype)
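    # Series.dtypes is an alias for Series.dtype, so both should be available.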
self._run_test(lambda df: df[col_name].dtypes)
def test_dataframe_columns(self):
self._run_test(lambda df: list(df.columns))
def test_dataframe_dtypes(self):
self._run_test(lambda df: list(df.dtypes))
def test_categories(self):
self._run_test(lambda df: df.cat_col.cat.categories)
def test_categorical_ordered(self):
self._run_test(lambda df: df.cat_col.cat.ordered)
def test_groupby_ndim(self):
self._run_test(lambda df: df.groupby('int_col').ndim)
def test_groupby_project_ndim(self):
self._run_test(lambda df: df.groupby('int_col').flt_col.ndim)
self._run_test(
lambda df: df.groupby('int_col')[['flt_col', 'str_col']].ndim)
def test_get_column_default_None(self):
    # .get just returns the default (None) at construction time if the column
    # doesn't exist
self._run_test(lambda df: df.get('FOO'))
def test_datetime_tz(self):
self._run_test(lambda df: df.datetime_col.dt.tz)
class DocstringTest(unittest.TestCase):
@parameterized.expand([
(frames.DeferredDataFrame, pd.DataFrame),
(frames.DeferredSeries, pd.Series),
#(frames._DeferredIndex, pd.Index),
(frames._DeferredStringMethods, pd.core.strings.StringMethods),
(
frames._DeferredCategoricalMethods,
pd.core.arrays.categorical.CategoricalAccessor),
(frames.DeferredGroupBy, pd.core.groupby.generic.DataFrameGroupBy),
(frames._DeferredGroupByCols, pd.core.groupby.generic.DataFrameGroupBy),
(
frames._DeferredDatetimeMethods,
pd.core.indexes.accessors.DatetimeProperties),
])
def test_docs_defined(self, beam_type, pd_type):
beam_attrs = set(dir(beam_type))
pd_attrs = set(dir(pd_type))
docstring_required = sorted([
attr for attr in beam_attrs.intersection(pd_attrs)
if getattr(pd_type, attr).__doc__ and not attr.startswith('_')
])
docstring_missing = [
attr for attr in docstring_required
if not getattr(beam_type, attr).__doc__
]
self.assertTrue(
len(docstring_missing) == 0,
f'{beam_type.__name__} is missing a docstring for '
f'{len(docstring_missing)}/{len(docstring_required)} '
f'({len(docstring_missing)/len(docstring_required):%}) '
f'operations:\n{docstring_missing}')
class ReprTest(unittest.TestCase):
def test_basic_dataframe(self):
df = frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(GROUPBY_DF))
self.assertEqual(
repr(df),
(
"DeferredDataFrame(columns=['group', 'foo', 'bar', 'baz', 'bool', "
"'str'], index=<unnamed>)"))
def test_dataframe_with_named_index(self):
df = frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(GROUPBY_DF.set_index('group')))
self.assertEqual(
repr(df),
(
"DeferredDataFrame(columns=['foo', 'bar', 'baz', 'bool', 'str'], "
"index='group')"))
def test_dataframe_with_partial_named_index(self):
df = frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(
GROUPBY_DF.set_index([GROUPBY_DF.index, 'group'])))
self.assertEqual(
repr(df),
(
"DeferredDataFrame(columns=['foo', 'bar', 'baz', 'bool', 'str'], "
"indexes=[<unnamed>, 'group'])"))
def test_dataframe_with_named_multi_index(self):
df = frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(GROUPBY_DF.set_index(['str', 'group'])))
self.assertEqual(
repr(df),
(
"DeferredDataFrame(columns=['foo', 'bar', 'baz', 'bool'], "
"indexes=['str', 'group'])"))
def test_dataframe_with_multiple_column_levels(self):
df = pd.DataFrame({
'foofoofoo': ['one', 'one', 'one', 'two', 'two', 'two'],
'barbar': ['A', 'B', 'C', 'A', 'B', 'C'],
'bazzy': [1, 2, 3, 4, 5, 6],
'zoop': ['x', 'y', 'z', 'q', 'w', 't']
})
df = df.pivot(index='foofoofoo', columns='barbar')
df = frame_base.DeferredFrame.wrap(expressions.ConstantExpression(df))
self.assertEqual(
repr(df),
(
"DeferredDataFrame(columns=[('bazzy', 'A'), ('bazzy', 'B'), "
"('bazzy', 'C'), ('zoop', 'A'), ('zoop', 'B'), ('zoop', 'C')], "
"index='foofoofoo')"))
def test_dataframe_with_multiple_column_and_multiple_index_levels(self):
df = pd.DataFrame({
'foofoofoo': ['one', 'one', 'one', 'two', 'two', 'two'],
'barbar': ['A', 'B', 'C', 'A', 'B', 'C'],
'bazzy': [1, 2, 3, 4, 5, 6],
'zoop': ['x', 'y', 'z', 'q', 'w', 't']
})
df = df.pivot(index='foofoofoo', columns='barbar')
df.index = [['a', 'b'], df.index]
# pandas repr displays this:
# bazzy zoop
# barbar A B C A B C
# foofoofoo
# a one 1 2 3 x y z
# b two 4 5 6 q w t
df = frame_base.DeferredFrame.wrap(expressions.ConstantExpression(df))
self.assertEqual(
repr(df),
(
"DeferredDataFrame(columns=[('bazzy', 'A'), ('bazzy', 'B'), "
"('bazzy', 'C'), ('zoop', 'A'), ('zoop', 'B'), ('zoop', 'C')], "
"indexes=[<unnamed>, 'foofoofoo'])"))
def test_basic_series(self):
df = frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(GROUPBY_DF['bool']))
self.assertEqual(
repr(df), "DeferredSeries(name='bool', dtype=bool, index=<unnamed>)")
def test_series_with_named_index(self):
df = frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(GROUPBY_DF.set_index('group')['str']))
self.assertEqual(
repr(df), "DeferredSeries(name='str', dtype=object, index='group')")
def test_series_with_partial_named_index(self):
df = frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(
GROUPBY_DF.set_index([GROUPBY_DF.index, 'group'])['bar']))
self.assertEqual(
repr(df),
(
"DeferredSeries(name='bar', dtype=float64, "
"indexes=[<unnamed>, 'group'])"))
def test_series_with_named_multi_index(self):
df = frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(
GROUPBY_DF.set_index(['str', 'group'])['baz']))
self.assertEqual(
repr(df),
"DeferredSeries(name='baz', dtype=float64, indexes=['str', 'group'])")
if __name__ == '__main__':
unittest.main()