[BEAM-11777][BEAM-11978] Add support for all kwargs in DataFrame, Series aggregation methods (#14438)

* agg(level=) implementation that defers to GroupBy(level=).agg

* min_count

* lint

* Disable numeric_only test for pandas <1.2

* fix skip

* yapf

* remove min_count raises test

* Reuire numeric_only= to be specified for numeric aggregations on DataFrames with non-numeric columns

* lint

* cleanups

* yapf

* _run_test correctly verifies nan outputs

* Use new _run_test infra

* Add additional testing for invalid kwargs

* yapf

* Track different singleton reasons in agg

* Remove TODOs

* yapf

* Address PR comments

* Switch to getattr in _(un)liftable_agg expressions
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 25355fe..3a60c28 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -37,6 +37,7 @@
 import itertools
 import math
 import re
+import warnings
 from typing import List
 from typing import Optional
 
@@ -948,41 +949,87 @@
   to_string = frame_base.wont_implement_method(
       pd.Series, 'to_string', reason="non-deferred-result")
 
-  def aggregate(self, func, axis=0, *args, **kwargs):
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def aggregate(self, func, axis, *args, **kwargs):
+    if kwargs.get('skipna', False):
+      # Eagerly generate a proxy to make sure skipna is a valid argument
+      # for this aggregation method
+      _ = self._expr.proxy().aggregate(func, axis, *args, **kwargs)
+      kwargs.pop('skipna')
+      return self.dropna().aggregate(func, axis, *args, **kwargs)
+
     if isinstance(func, list) and len(func) > 1:
-      # Aggregate each column separately, then stick them all together.
+      # level arg is ignored for multiple aggregations
+      _ = kwargs.pop('level', None)
+
+      # Aggregate with each method separately, then stick them all together.
       rows = [self.agg([f], *args, **kwargs) for f in func]
       return frame_base.DeferredFrame.wrap(
           expressions.ComputedExpression(
               'join_aggregate',
               lambda *rows: pd.concat(rows), [row._expr for row in rows]))
     else:
-      # We're only handling a single column.
+      # We're only handling a single column. It could be 'func' or ['func'],
+      # which produce different results. 'func' produces a scalar, ['func']
+      # produces a single element Series.
       base_func = func[0] if isinstance(func, list) else func
-      if _is_associative(base_func) and not args and not kwargs:
+
+      if (_is_numeric(base_func) and
+          not pd.core.dtypes.common.is_numeric_dtype(self.dtype)):
+        warnings.warn(
+            f"Performing a numeric aggregation, {base_func!r}, on "
+            f"Series {self._expr.proxy().name!r} with non-numeric type "
+            f"{self.dtype!r}. This can result in runtime errors or surprising "
+            "results.")
+
+      if 'level' in kwargs:
+        # Defer to groupby.agg for level= mode
+        return self.groupby(
+            level=kwargs.pop('level'), axis=axis).agg(func, *args, **kwargs)
+
+      singleton_reason = None
+      if 'min_count' in kwargs:
+        # Eagerly generate a proxy to make sure min_count is a valid argument
+        # for this aggregation method
+        _ = self._expr.proxy().agg(func, axis, *args, **kwargs)
+
+        singleton_reason = (
+            "Aggregation with min_count= requires collecting all data on a "
+            "single node.")
+
+      agg_kwargs = kwargs.copy()
+      if ((_is_associative(base_func) or _is_liftable_with_sum(base_func)) and
+          singleton_reason is None):
         intermediate = expressions.ComputedExpression(
             'pre_aggregate',
-            lambda s: s.agg([base_func], *args, **kwargs), [self._expr],
+            # Coerce to a Series, if the result is scalar we still want a Series
+            # so we can combine and do the final aggregation next.
+            lambda s: pd.Series(s.agg(func, *args, **kwargs)),
+            [self._expr],
             requires_partition_by=partitionings.Arbitrary(),
             preserves_partition_by=partitionings.Singleton())
         allow_nonparallel_final = True
+        if _is_associative(base_func):
+          agg_func = func
+        else:
+          agg_func = ['sum'] if isinstance(func, list) else 'sum'
       else:
         intermediate = self._expr
         allow_nonparallel_final = None  # i.e. don't change the value
+        agg_func = func
+        singleton_reason = (
+            f"Aggregation function {func!r} cannot currently be "
+            "parallelized, it requires collecting all data for "
+            "this Series on a single node.")
       with expressions.allow_non_parallel_operations(allow_nonparallel_final):
         return frame_base.DeferredFrame.wrap(
             expressions.ComputedExpression(
                 'aggregate',
-                lambda s: s.agg(func, *args, **kwargs),
-                [intermediate],
-                preserves_partition_by=partitionings.Arbitrary(),
-                # TODO(BEAM-11839): This reason should be more specific. It's
-                # actually incorrect for the args/kwargs case above.
+                lambda s: s.agg(agg_func, *args, **agg_kwargs), [intermediate],
+                preserves_partition_by=partitionings.Singleton(),
                 requires_partition_by=partitionings.Singleton(
-                    reason=(
-                        f"Aggregation function {func!r} cannot currently be "
-                        "parallelized, it requires collecting all data for "
-                        "this Series on a single node."))))
+                    reason=singleton_reason)))
 
   agg = aggregate
 
@@ -994,6 +1041,7 @@
 
   all = frame_base._agg_method('all')
   any = frame_base._agg_method('any')
+  # TODO(BEAM-12074): Document that Series.count(level=) will drop NaN's
   count = frame_base._agg_method('count')
   min = frame_base._agg_method('min')
   max = frame_base._agg_method('max')
@@ -1428,7 +1476,45 @@
             preserves_partition_by=preserves,
             requires_partition_by=partitionings.Arbitrary()))
 
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
   def aggregate(self, func, axis=0, *args, **kwargs):
+    if 'numeric_only' in kwargs and kwargs['numeric_only']:
+      # Eagerly generate a proxy to make sure numeric_only is a valid argument
+      # for this aggregation method
+      _ = self._expr.proxy().agg(func, axis, *args, **kwargs)
+
+      projected = self[[name for name, dtype in self.dtypes.items()
+                        if pd.core.dtypes.common.is_numeric_dtype(dtype)]]
+      kwargs.pop('numeric_only')
+      return projected.agg(func, axis, *args, **kwargs)
+
+    if 'bool_only' in kwargs and kwargs['bool_only']:
+      # Eagerly generate a proxy to make sure bool_only is a valid argument
+      # for this aggregation method
+      _ = self._expr.proxy().agg(func, axis, *args, **kwargs)
+
+      projected = self[[name for name, dtype in self.dtypes.items()
+                        if pd.core.dtypes.common.is_bool_dtype(dtype)]]
+      kwargs.pop('bool_only')
+      return projected.agg(func, axis, *args, **kwargs)
+
+    nonnumeric_columns = [name for (name, dtype) in self.dtypes.items()
+                          if not pd.core.dtypes.common.is_numeric_dtype(dtype)]
+    if _is_numeric(func) and len(nonnumeric_columns):
+      if 'numeric_only' in kwargs and kwargs['numeric_only'] is False:
+        # User has opted in to execution with non-numeric columns, they
+        # will accept runtime errors
+        pass
+      else:
+        raise frame_base.WontImplementError(
+            f"Numeric aggregation ({func!r}) on a DataFrame containing "
+            f"non-numeric columns ({*nonnumeric_columns,!r} is not supported, "
+            "unless `numeric_only=` is specified.\n"
+            "Use `numeric_only=True` to only aggregate over numeric columns.\n"
+            "Use `numeric_only=False` to aggregate over all columns. Note this "
+            "is not recommended, as it could result in execution time errors.")
+
     if axis is None:
       # Aggregate across all elements by first aggregating across columns,
       # then across rows.
@@ -1442,14 +1528,13 @@
               lambda df: df.agg(func, axis=1, *args, **kwargs),
               [self._expr],
               requires_partition_by=partitionings.Arbitrary()))
-    elif len(self._expr.proxy().columns) == 0 or args or kwargs:
-      # For these corner cases, just colocate everything.
+    elif len(self._expr.proxy().columns) == 0:
+      # For this corner case, just colocate everything.
       return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'aggregate',
             lambda df: df.agg(func, *args, **kwargs),
             [self._expr],
-            # TODO(BEAM-11839): Provide a reason for this Singleton
             requires_partition_by=partitionings.Singleton()))
     else:
       # In the general case, compute the aggregation of each column separately,
@@ -1460,15 +1545,19 @@
       else:
         col_names = list(func.keys())
       aggregated_cols = []
+      has_lists = any(isinstance(f, list) for f in func.values())
       for col in col_names:
         funcs = func[col]
-        if not isinstance(funcs, list):
+        if has_lists and not isinstance(funcs, list):
+          # If any of the columns do multiple aggregations, they all must use
+          # "list" style output
           funcs = [funcs]
         aggregated_cols.append(self[col].agg(funcs, *args, **kwargs))
       # The final shape is different depending on whether any of the columns
       # were aggregated by a list of aggregators.
       with expressions.allow_non_parallel_operations():
-        if any(isinstance(funcs, list) for funcs in func.values()):
+        if (any(isinstance(funcs, list) for funcs in func.values()) or
+            'level' in kwargs):
           return frame_base.DeferredFrame.wrap(
               expressions.ComputedExpression(
                   'join_aggregate',
@@ -1481,7 +1570,7 @@
             expressions.ComputedExpression(
                 'join_aggregate',
                   lambda *cols: pd.Series(
-                      {col: value[0] for col, value in zip(col_names, cols)}),
+                      {col: value for col, value in zip(col_names, cols)}),
                 [col._expr for col in aggregated_cols],
                 requires_partition_by=partitionings.Singleton(),
                 proxy=self._expr.proxy().agg(func, *args, **kwargs)))
@@ -2321,18 +2410,23 @@
         self._grouping_indexes,
         projection=name)
 
-  def agg(self, fn):
-    if not callable(fn):
-      # TODO: Add support for strings in (UN)LIFTABLE_AGGREGATIONS. Test by
-      # running doctests for pandas.core.groupby.generic
-      raise NotImplementedError('GroupBy.agg currently only supports callable '
-                                'arguments')
-    return DeferredDataFrame(
-        expressions.ComputedExpression(
-            'agg',
-            lambda gb: gb.agg(fn), [self._expr],
-            requires_partition_by=partitionings.Index(),
-            preserves_partition_by=partitionings.Singleton()))
+  def agg(self, fn, *args, **kwargs):
+    if _is_associative(fn):
+      return _liftable_agg(fn)(self, *args, **kwargs)
+    elif _is_liftable_with_sum(fn):
+      return _liftable_agg(fn, postagg_meth='sum')(self, *args, **kwargs)
+    elif _is_unliftable(fn):
+      return _unliftable_agg(fn)(self, *args, **kwargs)
+    elif callable(fn):
+      return DeferredDataFrame(
+          expressions.ComputedExpression(
+              'agg',
+              lambda gb: gb.agg(fn, *args, **kwargs), [self._expr],
+              requires_partition_by=partitionings.Index(),
+              preserves_partition_by=partitionings.Singleton()))
+    else:
+      raise NotImplementedError(f"GroupBy.agg(func={fn!r})")
+
 
   def apply(self, fn, *args, **kwargs):
     if self._grouping_columns and not self._projection:
@@ -2440,16 +2534,19 @@
 
 
 def _liftable_agg(meth, postagg_meth=None):
-  name, agg_func = frame_base.name_and_func(meth)
+  agg_name, _ = frame_base.name_and_func(meth)
 
   if postagg_meth is None:
-    post_agg_name, post_agg_func = name, agg_func
+    post_agg_name = agg_name
   else:
-    post_agg_name, post_agg_func = frame_base.name_and_func(postagg_meth)
+    post_agg_name, _ = frame_base.name_and_func(postagg_meth)
 
   def wrapper(self, *args, **kwargs):
     assert isinstance(self, DeferredGroupBy)
 
+    if 'min_count' in kwargs:
+      return _unliftable_agg(meth)(self, *args, **kwargs)
+
     to_group = self._ungrouped.proxy().index
     is_categorical_grouping = any(to_group.get_level_values(i).is_categorical()
                                   for i in self._grouping_indexes)
@@ -2461,20 +2558,24 @@
 
     project = _maybe_project_func(self._projection)
     pre_agg = expressions.ComputedExpression(
-        'pre_combine_' + name,
-        lambda df: agg_func(project(
-            df.groupby(level=list(range(df.index.nlevels)),
-                   **preagg_groupby_kwargs),
-        ), **kwargs),
+        'pre_combine_' + agg_name,
+        lambda df: getattr(
+            project(
+                df.groupby(level=list(range(df.index.nlevels)),
+                           **preagg_groupby_kwargs)
+            ),
+            agg_name)(**kwargs),
         [self._ungrouped],
         requires_partition_by=partitionings.Arbitrary(),
         preserves_partition_by=partitionings.Arbitrary())
 
+
     post_agg = expressions.ComputedExpression(
         'post_combine_' + post_agg_name,
-        lambda df: post_agg_func(
-            df.groupby(level=list(range(df.index.nlevels)), **groupby_kwargs),
-            **kwargs),
+        lambda df: getattr(
+            df.groupby(level=list(range(df.index.nlevels)),
+                       **groupby_kwargs),
+            post_agg_name)(**kwargs),
         [pre_agg],
         requires_partition_by=(partitionings.Singleton(reason=(
             "Aggregations grouped by a categorical column are not currently "
@@ -2489,7 +2590,7 @@
 
 
 def _unliftable_agg(meth):
-  name, agg_func = frame_base.name_and_func(meth)
+  agg_name, _ = frame_base.name_and_func(meth)
 
   def wrapper(self, *args, **kwargs):
     assert isinstance(self, DeferredGroupBy)
@@ -2501,11 +2602,11 @@
     groupby_kwargs = self._kwargs
     project = _maybe_project_func(self._projection)
     post_agg = expressions.ComputedExpression(
-        name,
-        lambda df: agg_func(project(
+        agg_name,
+        lambda df: getattr(project(
             df.groupby(level=list(range(df.index.nlevels)),
                        **groupby_kwargs),
-            ), **kwargs),
+        ), agg_name)(**kwargs),
         [self._ungrouped],
         requires_partition_by=(partitionings.Singleton(reason=(
             "Aggregations grouped by a categorical column are not currently "
@@ -2529,13 +2630,27 @@
 for meth in UNLIFTABLE_AGGREGATIONS:
   setattr(DeferredGroupBy, meth, _unliftable_agg(meth))
 
-
-def _is_associative(agg_func):
-  return agg_func in LIFTABLE_AGGREGATIONS or (
-      getattr(agg_func, '__name__', None) in LIFTABLE_AGGREGATIONS
+def _check_str_or_np_builtin(agg_func, func_list):
+  return agg_func in func_list or (
+      getattr(agg_func, '__name__', None) in func_list
       and agg_func.__module__ in ('numpy', 'builtins'))
 
 
+def _is_associative(agg_func):
+  return _check_str_or_np_builtin(agg_func, LIFTABLE_AGGREGATIONS)
+
+def _is_liftable_with_sum(agg_func):
+  return _check_str_or_np_builtin(agg_func, LIFTABLE_WITH_SUM_AGGREGATIONS)
+
+def _is_unliftable(agg_func):
+  return _check_str_or_np_builtin(agg_func, UNLIFTABLE_AGGREGATIONS)
+
+NUMERIC_AGGREGATIONS = ['max', 'min', 'prod', 'sum', 'mean', 'median', 'std',
+                        'var']
+
+def _is_numeric(agg_func):
+  return _check_str_or_np_builtin(agg_func, NUMERIC_AGGREGATIONS)
+
 
 @populate_not_implemented(DataFrameGroupBy)
 class _DeferredGroupByCols(frame_base.DeferredFrame):
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 1cf1dfb..55e3892 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -160,7 +160,10 @@
     else:
       # Expectation is not a pandas object
       if isinstance(expected, float):
-        cmp = lambda x: np.isclose(expected, x)
+        if np.isnan(expected):
+          cmp = np.isnan
+        else:
+          cmp = lambda x: np.isclose(expected, x)
       else:
         cmp = expected.__eq__
       self.assertTrue(
@@ -845,6 +848,204 @@
         df2,
         construction_time=False)
 
+  def test_series_agg_level(self):
+    self._run_test(
+        lambda df: df.set_index(['group', 'foo']).bar.count(level=0),
+        GROUPBY_DF)
+    self._run_test(
+        lambda df: df.set_index(['group', 'foo']).bar.max(level=0), GROUPBY_DF)
+
+    self._run_test(
+        lambda df: df.set_index(['group', 'foo']).bar.median(level=0),
+        GROUPBY_DF)
+
+    self._run_test(
+        lambda df: df.set_index(['foo', 'group']).bar.count(level=1),
+        GROUPBY_DF)
+    self._run_test(
+        lambda df: df.set_index(['group', 'foo']).bar.max(level=1), GROUPBY_DF)
+    self._run_test(
+        lambda df: df.set_index(['group', 'foo']).bar.max(level='foo'),
+        GROUPBY_DF)
+    self._run_test(
+        lambda df: df.set_index(['group', 'foo']).bar.median(level=1),
+        GROUPBY_DF)
+
+  def test_dataframe_agg_level(self):
+    self._run_test(
+        lambda df: df.set_index(['group', 'foo']).count(level=0), GROUPBY_DF)
+    self._run_test(
+        lambda df: df.set_index(['group', 'foo']).max(
+            level=0, numeric_only=False),
+        GROUPBY_DF)
+    # pandas implementation doesn't respect numeric_only argument here
+    # (https://github.com/pandas-dev/pandas/issues/40788), it
+    # always acts as if numeric_only=True. Our implmentation respects it so we
+    # need to make it explicit.
+    self._run_test(
+        lambda df: df.set_index(['group', 'foo']).sum(
+            level=0, numeric_only=True),
+        GROUPBY_DF)
+
+    self._run_test(
+        lambda df: df.set_index(['group', 'foo'])[['bar']].count(level=1),
+        GROUPBY_DF)
+    self._run_test(
+        lambda df: df.set_index(['group', 'foo']).count(level=1), GROUPBY_DF)
+    self._run_test(
+        lambda df: df.set_index(['group', 'foo']).max(
+            level=1, numeric_only=False),
+        GROUPBY_DF)
+    # sum with str columns is order-sensitive
+    self._run_test(
+        lambda df: df.set_index(['group', 'foo']).sum(
+            level=1, numeric_only=True),
+        GROUPBY_DF)
+
+    self._run_test(
+        lambda df: df.set_index(['group', 'foo']).median(
+            level=0, numeric_only=True),
+        GROUPBY_DF)
+    self._run_test(
+        lambda df: df.drop('str', axis=1).set_index(['foo', 'group']).median(
+            level=1, numeric_only=True),
+        GROUPBY_DF)
+
+  def test_series_agg_multifunc_level(self):
+    # level= is ignored for multiple agg fns
+    self._run_test(
+        lambda df: df.set_index(['group', 'foo']).bar.agg(['min', 'max'],
+                                                          level=0),
+        GROUPBY_DF)
+
+  def test_dataframe_agg_multifunc_level(self):
+    # level= is ignored for multiple agg fns
+    self._run_test(
+        lambda df: df.set_index(['group', 'foo']).agg(['min', 'max'], level=0),
+        GROUPBY_DF)
+
+  @parameterized.expand([(True, ), (False, )])
+  @unittest.skipIf(
+      PD_VERSION < (1, 2),
+      "pandas 1.1.0 produces different dtypes for these examples")
+  def test_dataframe_agg_numeric_only(self, numeric_only):
+    # Note other aggregation functions can fail on this input with
+    # numeric_only={False,None}. These are the only ones that actually work for
+    # the string inputs.
+    self._run_test(lambda df: df.max(numeric_only=numeric_only), GROUPBY_DF)
+    self._run_test(lambda df: df.min(numeric_only=numeric_only), GROUPBY_DF)
+
+  @unittest.skip(
+      "pandas implementation doesn't respect numeric_only= with "
+      "level= (https://github.com/pandas-dev/pandas/issues/40788)")
+  def test_dataframe_agg_level_numeric_only(self):
+    self._run_test(
+        lambda df: df.set_index('foo').sum(level=0, numeric_only=True),
+        GROUPBY_DF)
+    self._run_test(
+        lambda df: df.set_index('foo').max(level=0, numeric_only=True),
+        GROUPBY_DF)
+    self._run_test(
+        lambda df: df.set_index('foo').mean(level=0, numeric_only=True),
+        GROUPBY_DF)
+    self._run_test(
+        lambda df: df.set_index('foo').median(level=0, numeric_only=True),
+        GROUPBY_DF)
+
+  def test_dataframe_agg_bool_only(self):
+    df = pd.DataFrame({
+        'all': [True for i in range(10)],
+        'any': [i % 3 == 0 for i in range(10)],
+        'int': range(10)
+    })
+
+    self._run_test(lambda df: df.all(), df)
+    self._run_test(lambda df: df.any(), df)
+    self._run_test(lambda df: df.all(bool_only=True), df)
+    self._run_test(lambda df: df.any(bool_only=True), df)
+
+  @unittest.skip(
+      "pandas doesn't implement bool_only= with level= "
+      "(https://github.com/pandas-dev/pandas/blob/"
+      "v1.2.3/pandas/core/generic.py#L10573)")
+  def test_dataframe_agg_level_bool_only(self):
+    df = pd.DataFrame({
+        'all': [True for i in range(10)],
+        'any': [i % 3 == 0 for i in range(10)],
+        'int': range(10)
+    })
+
+    self._run_test(lambda df: df.set_index('int', drop=False).all(level=0), df)
+    self._run_test(lambda df: df.set_index('int', drop=False).any(level=0), df)
+    self._run_test(
+        lambda df: df.set_index('int', drop=False).all(level=0, bool_only=True),
+        df)
+    self._run_test(
+        lambda df: df.set_index('int', drop=False).any(level=0, bool_only=True),
+        df)
+
+  def test_series_agg_np_size(self):
+    self._run_test(
+        lambda df: df.set_index(['group', 'foo']).agg(np.size), GROUPBY_DF)
+
+  def test_df_agg_invalid_kwarg_raises(self):
+    self._run_error_test(lambda df: df.agg('mean', bool_only=True), GROUPBY_DF)
+    self._run_error_test(
+        lambda df: df.agg('any', numeric_only=True), GROUPBY_DF)
+    self._run_error_test(
+        lambda df: df.agg('median', min_count=3, numeric_only=True), GROUPBY_DF)
+
+  def test_series_agg_method_invalid_kwarg_raises(self):
+    self._run_error_test(lambda df: df.foo.median(min_count=3), GROUPBY_DF)
+    self._run_error_test(
+        lambda df: df.foo.agg('median', min_count=3), GROUPBY_DF)
+
+  @unittest.skipIf(
+      PD_VERSION < (1, 3),
+      (
+          "DataFrame.agg raises a different exception from the "
+          "aggregation methods. Fixed in "
+          "https://github.com/pandas-dev/pandas/pull/40543."))
+  def test_df_agg_method_invalid_kwarg_raises(self):
+    self._run_error_test(lambda df: df.mean(bool_only=True), GROUPBY_DF)
+    self._run_error_test(lambda df: df.any(numeric_only=True), GROUPBY_DF)
+    self._run_error_test(
+        lambda df: df.median(min_count=3, numeric_only=True), GROUPBY_DF)
+
+  def test_agg_min_count(self):
+    df = pd.DataFrame({
+        'good': [1, 2, 3, np.nan],
+        'bad': [np.nan, np.nan, np.nan, 4],
+    },
+                      index=['a', 'b', 'a', 'b'])
+
+    self._run_test(lambda df: df.sum(level=0, min_count=2), df)
+
+    self._run_test(lambda df: df.sum(min_count=3), df, nonparallel=True)
+    self._run_test(lambda df: df.sum(min_count=1), df, nonparallel=True)
+    self._run_test(lambda df: df.good.sum(min_count=2), df, nonparallel=True)
+    self._run_test(lambda df: df.bad.sum(min_count=2), df, nonparallel=True)
+
+  def test_groupby_sum_min_count(self):
+    df = pd.DataFrame({
+        'good': [1, 2, 3, np.nan],
+        'bad': [np.nan, np.nan, np.nan, 4],
+        'group': ['a', 'b', 'a', 'b']
+    })
+
+    self._run_test(lambda df: df.groupby('group').sum(min_count=2), df)
+
+  def test_dataframe_sum_nonnumeric_raises(self):
+    # Attempting a numeric aggregation with the str column present should
+    # raise, and suggest the numeric_only argument
+    with self.assertRaisesRegex(frame_base.WontImplementError, 'numeric_only'):
+      self._run_test(lambda df: df.sum(), GROUPBY_DF)
+
+    # numeric_only=True should work
+    self._run_test(lambda df: df.sum(numeric_only=True), GROUPBY_DF)
+    # projecting only numeric columns should too
+    self._run_test(lambda df: df[['foo', 'bar']].sum(), GROUPBY_DF)
+
 
 class AllowNonParallelTest(unittest.TestCase):
   def _use_non_parallel_operation(self):