[BEAM-11777][BEAM-11978] Add support for all kwargs in DataFrame, Series aggregation methods (#14438)
* agg(level=) implementation that defers to GroupBy(level=).agg
* min_count
* lint
* Disable numeric_only test for pandas <1.2
* fix skip
* yapf
* remove min_count raises test
* Require numeric_only= to be specified for numeric aggregations on DataFrames with non-numeric columns
* lint
* cleanups
* yapf
* _run_test correctly verifies nan outputs
* Use new _run_test infra
* Add additional testing for invalid kwargs
* yapf
* Track different singleton reasons in agg
* Remove TODOs
* yapf
* Address PR comments
* Switch to getattr in _(un)liftable_agg expressions
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 25355fe..3a60c28 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -37,6 +37,7 @@
import itertools
import math
import re
+import warnings
from typing import List
from typing import Optional
@@ -948,41 +949,87 @@
to_string = frame_base.wont_implement_method(
pd.Series, 'to_string', reason="non-deferred-result")
- def aggregate(self, func, axis=0, *args, **kwargs):
+ @frame_base.args_to_kwargs(pd.Series)
+ @frame_base.populate_defaults(pd.Series)
+ def aggregate(self, func, axis, *args, **kwargs):
+ if kwargs.get('skipna', False):
+ # Eagerly generate a proxy to make sure skipna is a valid argument
+ # for this aggregation method
+ _ = self._expr.proxy().aggregate(func, axis, *args, **kwargs)
+ kwargs.pop('skipna')
+ return self.dropna().aggregate(func, axis, *args, **kwargs)
+
if isinstance(func, list) and len(func) > 1:
- # Aggregate each column separately, then stick them all together.
+ # level arg is ignored for multiple aggregations
+ _ = kwargs.pop('level', None)
+
+ # Aggregate with each method separately, then stick them all together.
rows = [self.agg([f], *args, **kwargs) for f in func]
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'join_aggregate',
lambda *rows: pd.concat(rows), [row._expr for row in rows]))
else:
- # We're only handling a single column.
+ # We're only handling a single column. It could be 'func' or ['func'],
+ # which produce different results. 'func' produces a scalar, ['func']
+ # produces a single element Series.
base_func = func[0] if isinstance(func, list) else func
- if _is_associative(base_func) and not args and not kwargs:
+
+ if (_is_numeric(base_func) and
+ not pd.core.dtypes.common.is_numeric_dtype(self.dtype)):
+ warnings.warn(
+ f"Performing a numeric aggregation, {base_func!r}, on "
+ f"Series {self._expr.proxy().name!r} with non-numeric type "
+ f"{self.dtype!r}. This can result in runtime errors or surprising "
+ "results.")
+
+ if 'level' in kwargs:
+ # Defer to groupby.agg for level= mode
+ return self.groupby(
+ level=kwargs.pop('level'), axis=axis).agg(func, *args, **kwargs)
+
+ singleton_reason = None
+ if 'min_count' in kwargs:
+ # Eagerly generate a proxy to make sure min_count is a valid argument
+ # for this aggregation method
+ _ = self._expr.proxy().agg(func, axis, *args, **kwargs)
+
+ singleton_reason = (
+ "Aggregation with min_count= requires collecting all data on a "
+ "single node.")
+
+ agg_kwargs = kwargs.copy()
+ if ((_is_associative(base_func) or _is_liftable_with_sum(base_func)) and
+ singleton_reason is None):
intermediate = expressions.ComputedExpression(
'pre_aggregate',
- lambda s: s.agg([base_func], *args, **kwargs), [self._expr],
+ # Coerce to a Series, if the result is scalar we still want a Series
+ # so we can combine and do the final aggregation next.
+ lambda s: pd.Series(s.agg(func, *args, **kwargs)),
+ [self._expr],
requires_partition_by=partitionings.Arbitrary(),
preserves_partition_by=partitionings.Singleton())
allow_nonparallel_final = True
+ if _is_associative(base_func):
+ agg_func = func
+ else:
+ agg_func = ['sum'] if isinstance(func, list) else 'sum'
else:
intermediate = self._expr
allow_nonparallel_final = None # i.e. don't change the value
+ agg_func = func
+ singleton_reason = (
+ f"Aggregation function {func!r} cannot currently be "
+ "parallelized, it requires collecting all data for "
+ "this Series on a single node.")
with expressions.allow_non_parallel_operations(allow_nonparallel_final):
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'aggregate',
- lambda s: s.agg(func, *args, **kwargs),
- [intermediate],
- preserves_partition_by=partitionings.Arbitrary(),
- # TODO(BEAM-11839): This reason should be more specific. It's
- # actually incorrect for the args/kwargs case above.
+ lambda s: s.agg(agg_func, *args, **agg_kwargs), [intermediate],
+ preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Singleton(
- reason=(
- f"Aggregation function {func!r} cannot currently be "
- "parallelized, it requires collecting all data for "
- "this Series on a single node."))))
+ reason=singleton_reason)))
agg = aggregate
@@ -994,6 +1041,7 @@
all = frame_base._agg_method('all')
any = frame_base._agg_method('any')
+ # TODO(BEAM-12074): Document that Series.count(level=) will drop NaN's
count = frame_base._agg_method('count')
min = frame_base._agg_method('min')
max = frame_base._agg_method('max')
@@ -1428,7 +1476,45 @@
preserves_partition_by=preserves,
requires_partition_by=partitionings.Arbitrary()))
+ @frame_base.args_to_kwargs(pd.DataFrame)
+ @frame_base.populate_defaults(pd.DataFrame)
def aggregate(self, func, axis=0, *args, **kwargs):
+ if 'numeric_only' in kwargs and kwargs['numeric_only']:
+ # Eagerly generate a proxy to make sure numeric_only is a valid argument
+ # for this aggregation method
+ _ = self._expr.proxy().agg(func, axis, *args, **kwargs)
+
+ projected = self[[name for name, dtype in self.dtypes.items()
+ if pd.core.dtypes.common.is_numeric_dtype(dtype)]]
+ kwargs.pop('numeric_only')
+ return projected.agg(func, axis, *args, **kwargs)
+
+ if 'bool_only' in kwargs and kwargs['bool_only']:
+ # Eagerly generate a proxy to make sure bool_only is a valid argument
+ # for this aggregation method
+ _ = self._expr.proxy().agg(func, axis, *args, **kwargs)
+
+ projected = self[[name for name, dtype in self.dtypes.items()
+ if pd.core.dtypes.common.is_bool_dtype(dtype)]]
+ kwargs.pop('bool_only')
+ return projected.agg(func, axis, *args, **kwargs)
+
+ nonnumeric_columns = [name for (name, dtype) in self.dtypes.items()
+ if not pd.core.dtypes.common.is_numeric_dtype(dtype)]
+ if _is_numeric(func) and len(nonnumeric_columns):
+ if 'numeric_only' in kwargs and kwargs['numeric_only'] is False:
+ # User has opted in to execution with non-numeric columns, they
+ # will accept runtime errors
+ pass
+ else:
+ raise frame_base.WontImplementError(
+ f"Numeric aggregation ({func!r}) on a DataFrame containing "
+ f"non-numeric columns ({*nonnumeric_columns,!r} is not supported, "
+ "unless `numeric_only=` is specified.\n"
+ "Use `numeric_only=True` to only aggregate over numeric columns.\n"
+ "Use `numeric_only=False` to aggregate over all columns. Note this "
+ "is not recommended, as it could result in execution time errors.")
+
if axis is None:
# Aggregate across all elements by first aggregating across columns,
# then across rows.
@@ -1442,14 +1528,13 @@
lambda df: df.agg(func, axis=1, *args, **kwargs),
[self._expr],
requires_partition_by=partitionings.Arbitrary()))
- elif len(self._expr.proxy().columns) == 0 or args or kwargs:
- # For these corner cases, just colocate everything.
+ elif len(self._expr.proxy().columns) == 0:
+ # For this corner case, just colocate everything.
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'aggregate',
lambda df: df.agg(func, *args, **kwargs),
[self._expr],
- # TODO(BEAM-11839): Provide a reason for this Singleton
requires_partition_by=partitionings.Singleton()))
else:
# In the general case, compute the aggregation of each column separately,
@@ -1460,15 +1545,19 @@
else:
col_names = list(func.keys())
aggregated_cols = []
+ has_lists = any(isinstance(f, list) for f in func.values())
for col in col_names:
funcs = func[col]
- if not isinstance(funcs, list):
+ if has_lists and not isinstance(funcs, list):
+ # If any of the columns do multiple aggregations, they all must use
+ # "list" style output
funcs = [funcs]
aggregated_cols.append(self[col].agg(funcs, *args, **kwargs))
# The final shape is different depending on whether any of the columns
# were aggregated by a list of aggregators.
with expressions.allow_non_parallel_operations():
- if any(isinstance(funcs, list) for funcs in func.values()):
+ if (any(isinstance(funcs, list) for funcs in func.values()) or
+ 'level' in kwargs):
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'join_aggregate',
@@ -1481,7 +1570,7 @@
expressions.ComputedExpression(
'join_aggregate',
lambda *cols: pd.Series(
- {col: value[0] for col, value in zip(col_names, cols)}),
+ {col: value for col, value in zip(col_names, cols)}),
[col._expr for col in aggregated_cols],
requires_partition_by=partitionings.Singleton(),
proxy=self._expr.proxy().agg(func, *args, **kwargs)))
@@ -2321,18 +2410,23 @@
self._grouping_indexes,
projection=name)
- def agg(self, fn):
- if not callable(fn):
- # TODO: Add support for strings in (UN)LIFTABLE_AGGREGATIONS. Test by
- # running doctests for pandas.core.groupby.generic
- raise NotImplementedError('GroupBy.agg currently only supports callable '
- 'arguments')
- return DeferredDataFrame(
- expressions.ComputedExpression(
- 'agg',
- lambda gb: gb.agg(fn), [self._expr],
- requires_partition_by=partitionings.Index(),
- preserves_partition_by=partitionings.Singleton()))
+ def agg(self, fn, *args, **kwargs):
+ if _is_associative(fn):
+ return _liftable_agg(fn)(self, *args, **kwargs)
+ elif _is_liftable_with_sum(fn):
+ return _liftable_agg(fn, postagg_meth='sum')(self, *args, **kwargs)
+ elif _is_unliftable(fn):
+ return _unliftable_agg(fn)(self, *args, **kwargs)
+ elif callable(fn):
+ return DeferredDataFrame(
+ expressions.ComputedExpression(
+ 'agg',
+ lambda gb: gb.agg(fn, *args, **kwargs), [self._expr],
+ requires_partition_by=partitionings.Index(),
+ preserves_partition_by=partitionings.Singleton()))
+ else:
+ raise NotImplementedError(f"GroupBy.agg(func={fn!r})")
+
def apply(self, fn, *args, **kwargs):
if self._grouping_columns and not self._projection:
@@ -2440,16 +2534,19 @@
def _liftable_agg(meth, postagg_meth=None):
- name, agg_func = frame_base.name_and_func(meth)
+ agg_name, _ = frame_base.name_and_func(meth)
if postagg_meth is None:
- post_agg_name, post_agg_func = name, agg_func
+ post_agg_name = agg_name
else:
- post_agg_name, post_agg_func = frame_base.name_and_func(postagg_meth)
+ post_agg_name, _ = frame_base.name_and_func(postagg_meth)
def wrapper(self, *args, **kwargs):
assert isinstance(self, DeferredGroupBy)
+ if 'min_count' in kwargs:
+ return _unliftable_agg(meth)(self, *args, **kwargs)
+
to_group = self._ungrouped.proxy().index
is_categorical_grouping = any(to_group.get_level_values(i).is_categorical()
for i in self._grouping_indexes)
@@ -2461,20 +2558,24 @@
project = _maybe_project_func(self._projection)
pre_agg = expressions.ComputedExpression(
- 'pre_combine_' + name,
- lambda df: agg_func(project(
- df.groupby(level=list(range(df.index.nlevels)),
- **preagg_groupby_kwargs),
- ), **kwargs),
+ 'pre_combine_' + agg_name,
+ lambda df: getattr(
+ project(
+ df.groupby(level=list(range(df.index.nlevels)),
+ **preagg_groupby_kwargs)
+ ),
+ agg_name)(**kwargs),
[self._ungrouped],
requires_partition_by=partitionings.Arbitrary(),
preserves_partition_by=partitionings.Arbitrary())
+
post_agg = expressions.ComputedExpression(
'post_combine_' + post_agg_name,
- lambda df: post_agg_func(
- df.groupby(level=list(range(df.index.nlevels)), **groupby_kwargs),
- **kwargs),
+ lambda df: getattr(
+ df.groupby(level=list(range(df.index.nlevels)),
+ **groupby_kwargs),
+ post_agg_name)(**kwargs),
[pre_agg],
requires_partition_by=(partitionings.Singleton(reason=(
"Aggregations grouped by a categorical column are not currently "
@@ -2489,7 +2590,7 @@
def _unliftable_agg(meth):
- name, agg_func = frame_base.name_and_func(meth)
+ agg_name, _ = frame_base.name_and_func(meth)
def wrapper(self, *args, **kwargs):
assert isinstance(self, DeferredGroupBy)
@@ -2501,11 +2602,11 @@
groupby_kwargs = self._kwargs
project = _maybe_project_func(self._projection)
post_agg = expressions.ComputedExpression(
- name,
- lambda df: agg_func(project(
+ agg_name,
+ lambda df: getattr(project(
df.groupby(level=list(range(df.index.nlevels)),
**groupby_kwargs),
- ), **kwargs),
+ ), agg_name)(**kwargs),
[self._ungrouped],
requires_partition_by=(partitionings.Singleton(reason=(
"Aggregations grouped by a categorical column are not currently "
@@ -2529,13 +2630,27 @@
for meth in UNLIFTABLE_AGGREGATIONS:
setattr(DeferredGroupBy, meth, _unliftable_agg(meth))
-
-def _is_associative(agg_func):
- return agg_func in LIFTABLE_AGGREGATIONS or (
- getattr(agg_func, '__name__', None) in LIFTABLE_AGGREGATIONS
+def _check_str_or_np_builtin(agg_func, func_list):
+ return agg_func in func_list or (
+ getattr(agg_func, '__name__', None) in func_list
and agg_func.__module__ in ('numpy', 'builtins'))
+def _is_associative(agg_func):
+ return _check_str_or_np_builtin(agg_func, LIFTABLE_AGGREGATIONS)
+
+def _is_liftable_with_sum(agg_func):
+ return _check_str_or_np_builtin(agg_func, LIFTABLE_WITH_SUM_AGGREGATIONS)
+
+def _is_unliftable(agg_func):
+ return _check_str_or_np_builtin(agg_func, UNLIFTABLE_AGGREGATIONS)
+
+NUMERIC_AGGREGATIONS = ['max', 'min', 'prod', 'sum', 'mean', 'median', 'std',
+ 'var']
+
+def _is_numeric(agg_func):
+ return _check_str_or_np_builtin(agg_func, NUMERIC_AGGREGATIONS)
+
@populate_not_implemented(DataFrameGroupBy)
class _DeferredGroupByCols(frame_base.DeferredFrame):
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 1cf1dfb..55e3892 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -160,7 +160,10 @@
else:
# Expectation is not a pandas object
if isinstance(expected, float):
- cmp = lambda x: np.isclose(expected, x)
+ if np.isnan(expected):
+ cmp = np.isnan
+ else:
+ cmp = lambda x: np.isclose(expected, x)
else:
cmp = expected.__eq__
self.assertTrue(
@@ -845,6 +848,204 @@
df2,
construction_time=False)
+ def test_series_agg_level(self):
+ self._run_test(
+ lambda df: df.set_index(['group', 'foo']).bar.count(level=0),
+ GROUPBY_DF)
+ self._run_test(
+ lambda df: df.set_index(['group', 'foo']).bar.max(level=0), GROUPBY_DF)
+
+ self._run_test(
+ lambda df: df.set_index(['group', 'foo']).bar.median(level=0),
+ GROUPBY_DF)
+
+ self._run_test(
+ lambda df: df.set_index(['foo', 'group']).bar.count(level=1),
+ GROUPBY_DF)
+ self._run_test(
+ lambda df: df.set_index(['group', 'foo']).bar.max(level=1), GROUPBY_DF)
+ self._run_test(
+ lambda df: df.set_index(['group', 'foo']).bar.max(level='foo'),
+ GROUPBY_DF)
+ self._run_test(
+ lambda df: df.set_index(['group', 'foo']).bar.median(level=1),
+ GROUPBY_DF)
+
+ def test_dataframe_agg_level(self):
+ self._run_test(
+ lambda df: df.set_index(['group', 'foo']).count(level=0), GROUPBY_DF)
+ self._run_test(
+ lambda df: df.set_index(['group', 'foo']).max(
+ level=0, numeric_only=False),
+ GROUPBY_DF)
+ # pandas implementation doesn't respect numeric_only argument here
+ # (https://github.com/pandas-dev/pandas/issues/40788), it
+ # always acts as if numeric_only=True. Our implementation respects it so we
+ # need to make it explicit.
+ self._run_test(
+ lambda df: df.set_index(['group', 'foo']).sum(
+ level=0, numeric_only=True),
+ GROUPBY_DF)
+
+ self._run_test(
+ lambda df: df.set_index(['group', 'foo'])[['bar']].count(level=1),
+ GROUPBY_DF)
+ self._run_test(
+ lambda df: df.set_index(['group', 'foo']).count(level=1), GROUPBY_DF)
+ self._run_test(
+ lambda df: df.set_index(['group', 'foo']).max(
+ level=1, numeric_only=False),
+ GROUPBY_DF)
+ # sum with str columns is order-sensitive
+ self._run_test(
+ lambda df: df.set_index(['group', 'foo']).sum(
+ level=1, numeric_only=True),
+ GROUPBY_DF)
+
+ self._run_test(
+ lambda df: df.set_index(['group', 'foo']).median(
+ level=0, numeric_only=True),
+ GROUPBY_DF)
+ self._run_test(
+ lambda df: df.drop('str', axis=1).set_index(['foo', 'group']).median(
+ level=1, numeric_only=True),
+ GROUPBY_DF)
+
+ def test_series_agg_multifunc_level(self):
+ # level= is ignored for multiple agg fns
+ self._run_test(
+ lambda df: df.set_index(['group', 'foo']).bar.agg(['min', 'max'],
+ level=0),
+ GROUPBY_DF)
+
+ def test_dataframe_agg_multifunc_level(self):
+ # level= is ignored for multiple agg fns
+ self._run_test(
+ lambda df: df.set_index(['group', 'foo']).agg(['min', 'max'], level=0),
+ GROUPBY_DF)
+
+ @parameterized.expand([(True, ), (False, )])
+ @unittest.skipIf(
+ PD_VERSION < (1, 2),
+ "pandas 1.1.0 produces different dtypes for these examples")
+ def test_dataframe_agg_numeric_only(self, numeric_only):
+ # Note other aggregation functions can fail on this input with
+ # numeric_only={False,None}. These are the only ones that actually work for
+ # the string inputs.
+ self._run_test(lambda df: df.max(numeric_only=numeric_only), GROUPBY_DF)
+ self._run_test(lambda df: df.min(numeric_only=numeric_only), GROUPBY_DF)
+
+ @unittest.skip(
+ "pandas implementation doesn't respect numeric_only= with "
+ "level= (https://github.com/pandas-dev/pandas/issues/40788)")
+ def test_dataframe_agg_level_numeric_only(self):
+ self._run_test(
+ lambda df: df.set_index('foo').sum(level=0, numeric_only=True),
+ GROUPBY_DF)
+ self._run_test(
+ lambda df: df.set_index('foo').max(level=0, numeric_only=True),
+ GROUPBY_DF)
+ self._run_test(
+ lambda df: df.set_index('foo').mean(level=0, numeric_only=True),
+ GROUPBY_DF)
+ self._run_test(
+ lambda df: df.set_index('foo').median(level=0, numeric_only=True),
+ GROUPBY_DF)
+
+ def test_dataframe_agg_bool_only(self):
+ df = pd.DataFrame({
+ 'all': [True for i in range(10)],
+ 'any': [i % 3 == 0 for i in range(10)],
+ 'int': range(10)
+ })
+
+ self._run_test(lambda df: df.all(), df)
+ self._run_test(lambda df: df.any(), df)
+ self._run_test(lambda df: df.all(bool_only=True), df)
+ self._run_test(lambda df: df.any(bool_only=True), df)
+
+ @unittest.skip(
+ "pandas doesn't implement bool_only= with level= "
+ "(https://github.com/pandas-dev/pandas/blob/"
+ "v1.2.3/pandas/core/generic.py#L10573)")
+ def test_dataframe_agg_level_bool_only(self):
+ df = pd.DataFrame({
+ 'all': [True for i in range(10)],
+ 'any': [i % 3 == 0 for i in range(10)],
+ 'int': range(10)
+ })
+
+ self._run_test(lambda df: df.set_index('int', drop=False).all(level=0), df)
+ self._run_test(lambda df: df.set_index('int', drop=False).any(level=0), df)
+ self._run_test(
+ lambda df: df.set_index('int', drop=False).all(level=0, bool_only=True),
+ df)
+ self._run_test(
+ lambda df: df.set_index('int', drop=False).any(level=0, bool_only=True),
+ df)
+
+ def test_series_agg_np_size(self):
+ self._run_test(
+ lambda df: df.set_index(['group', 'foo']).agg(np.size), GROUPBY_DF)
+
+ def test_df_agg_invalid_kwarg_raises(self):
+ self._run_error_test(lambda df: df.agg('mean', bool_only=True), GROUPBY_DF)
+ self._run_error_test(
+ lambda df: df.agg('any', numeric_only=True), GROUPBY_DF)
+ self._run_error_test(
+ lambda df: df.agg('median', min_count=3, numeric_only=True), GROUPBY_DF)
+
+ def test_series_agg_method_invalid_kwarg_raises(self):
+ self._run_error_test(lambda df: df.foo.median(min_count=3), GROUPBY_DF)
+ self._run_error_test(
+ lambda df: df.foo.agg('median', min_count=3), GROUPBY_DF)
+
+ @unittest.skipIf(
+ PD_VERSION < (1, 3),
+ (
+ "DataFrame.agg raises a different exception from the "
+ "aggregation methods. Fixed in "
+ "https://github.com/pandas-dev/pandas/pull/40543."))
+ def test_df_agg_method_invalid_kwarg_raises(self):
+ self._run_error_test(lambda df: df.mean(bool_only=True), GROUPBY_DF)
+ self._run_error_test(lambda df: df.any(numeric_only=True), GROUPBY_DF)
+ self._run_error_test(
+ lambda df: df.median(min_count=3, numeric_only=True), GROUPBY_DF)
+
+ def test_agg_min_count(self):
+ df = pd.DataFrame({
+ 'good': [1, 2, 3, np.nan],
+ 'bad': [np.nan, np.nan, np.nan, 4],
+ },
+ index=['a', 'b', 'a', 'b'])
+
+ self._run_test(lambda df: df.sum(level=0, min_count=2), df)
+
+ self._run_test(lambda df: df.sum(min_count=3), df, nonparallel=True)
+ self._run_test(lambda df: df.sum(min_count=1), df, nonparallel=True)
+ self._run_test(lambda df: df.good.sum(min_count=2), df, nonparallel=True)
+ self._run_test(lambda df: df.bad.sum(min_count=2), df, nonparallel=True)
+
+ def test_groupby_sum_min_count(self):
+ df = pd.DataFrame({
+ 'good': [1, 2, 3, np.nan],
+ 'bad': [np.nan, np.nan, np.nan, 4],
+ 'group': ['a', 'b', 'a', 'b']
+ })
+
+ self._run_test(lambda df: df.groupby('group').sum(min_count=2), df)
+
+ def test_dataframe_sum_nonnumeric_raises(self):
+ # Attempting a numeric aggregation with the str column present should
+ # raise, and suggest the numeric_only argument
+ with self.assertRaisesRegex(frame_base.WontImplementError, 'numeric_only'):
+ self._run_test(lambda df: df.sum(), GROUPBY_DF)
+
+ # numeric_only=True should work
+ self._run_test(lambda df: df.sum(numeric_only=True), GROUPBY_DF)
+ # projecting only numeric columns should too
+ self._run_test(lambda df: df[['foo', 'bar']].sum(), GROUPBY_DF)
+
class AllowNonParallelTest(unittest.TestCase):
def _use_non_parallel_operation(self):