[BEAM-11839] Add reason for most Singleton() requirements (#14569)

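Singleton() now accepts an optional `reason` string explaining why an
operation requires collecting all data on a single node, and
NonParallelOperation surfaces that reason (plus a pointer to
https://s.apache.org/dataframe-non-parallel-operations) instead of a
generic message when such an operation is used outside of an
allow_non_parallel_operations block.

A minimal sketch of the new error flow, built on the expressions and
partitionings modules touched here (the pandas input is illustrative;
'unique' mirrors the Series.unique case below):

    import pandas as pd

    from apache_beam.dataframe import expressions, partitionings

    series = expressions.ConstantExpression(pd.Series([1, 2, 2, 3]))
    singleton = partitionings.Singleton(
        reason="unique() cannot currently be parallelized.")

    try:
      # Requiring Singleton partitioning outside of an
      # allow_non_parallel_operations block raises immediately.
      expressions.ComputedExpression(
          'unique',
          lambda s: pd.Series(s.unique()), [series],
          requires_partition_by=singleton)
    except expressions.NonParallelOperation as e:
      print(e.msg)  # Begins with the reason given above.

    # Opting in suppresses the error:
    with expressions.allow_non_parallel_operations():
      expressions.ComputedExpression(
          'unique',
          lambda s: pd.Series(s.unique()), [series],
          requires_partition_by=singleton)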
diff --git a/sdks/python/apache_beam/dataframe/expressions.py b/sdks/python/apache_beam/dataframe/expressions.py
index 80ee149..54a15c5 100644
--- a/sdks/python/apache_beam/dataframe/expressions.py
+++ b/sdks/python/apache_beam/dataframe/expressions.py
@@ -348,10 +348,16 @@
       preserves_partition_by: The level of partitioning preserved.
     """
     if (not _get_allow_non_parallel() and
-        requires_partition_by == partitionings.Singleton()):
+        isinstance(requires_partition_by, partitionings.Singleton)):
+      reason = requires_partition_by.reason or (
+          f"Encountered non-parallelizable form of {name!r}.")
+
       raise NonParallelOperation(
-          "Using non-parallel form of %s "
-          "outside of allow_non_parallel_operations block." % name)
+          f"{reason}\n"
+          "Consider using an allow_non_parallel_operations block if you're "
+          "sure you want to do this. See "
+          "https://s.apache.org/dataframe-non-parallel-operations for more "
+          "information.")
     args = tuple(args)
     if proxy is None:
       proxy = func(*(arg.proxy() for arg in args))
@@ -406,4 +412,6 @@
 
 
 class NonParallelOperation(Exception):
-  pass
+  def __init__(self, msg):
+    super(NonParallelOperation, self).__init__(msg)
+    self.msg = msg
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 1f892f4..34fb037 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -109,7 +109,11 @@
     if index is not None and errors == 'raise':
       # In order to raise an error about missing index values, we'll
       # need to collect the entire dataframe.
-      requires = partitionings.Singleton()
+      requires = partitionings.Singleton(
+          reason=(
+              "drop(errors='raise', axis='index') is not currently "
+              "parallelizable. This requires collecting all data on a single "
+              f"node in order to detect if one of {index!r} is missing."))
     else:
       requires = partitionings.Arbitrary()
 
@@ -142,24 +146,26 @@
   def fillna(self, value, method, axis, limit, **kwargs):
     # Default value is None, but is overridden with 'index'.
     axis = axis or 'index'
-    if method is not None and axis in (0, 'index'):
-      raise frame_base.WontImplementError(
-          f"fillna(method={method!r}) is not supported because it is "
-          "order-sensitive. Only fillna(method=None) is supported.",
-          reason="order-sensitive")
+
+    if axis in (0, 'index'):
+      if method is not None:
+        raise frame_base.WontImplementError(
+            f"fillna(method={method!r}, axis={axis!r}) is not supported "
+            "because it is order-sensitive. Only fillna(method=None) is "
+            f"supported with axis={axis!r}.",
+            reason="order-sensitive")
+      if limit is not None:
+        raise frame_base.WontImplementError(
+            f"fillna(limit={method!r}, axis={axis!r}) is not supported because "
+            "it is order-sensitive. Only fillna(limit=None) is supported with "
+            f"axis={axis!r}.",
+            reason="order-sensitive")
+
     if isinstance(value, frame_base.DeferredBase):
       value_expr = value._expr
     else:
       value_expr = expressions.ConstantExpression(value)
 
-    if limit is not None and method is None:
-      # If method is not None (and axis is 'columns'), we can do limit in
-      # a distributed way. Otherwise the limit is global, so it requires
-      # Singleton partitioning.
-      requires = partitionings.Singleton()
-    else:
-      requires = partitionings.Arbitrary()
-
     return frame_base.DeferredFrame.wrap(
         # yapf: disable
         expressions.ComputedExpression(
@@ -169,7 +175,7 @@
                 value, method=method, axis=axis, limit=limit, **kwargs),
             [self._expr, value_expr],
             preserves_partition_by=partitionings.Arbitrary(),
-            requires_partition_by=requires))
+            requires_partition_by=partitionings.Arbitrary()))
 
   @frame_base.args_to_kwargs(pd.DataFrame)
   @frame_base.populate_defaults(pd.DataFrame)
@@ -523,7 +529,11 @@
     if errors == "ignore":
       # We need all data in order to ignore errors and propagate the original
       # data.
-      requires = partitionings.Singleton()
+      requires = partitionings.Singleton(
+          reason=(
+              f"where(errors={errors!r}) is currently not parallelizable, "
+              "because all data must be collected on one node to determine if "
+              "the original data should be propagated instead."))
 
     actual_args['errors'] = errors
 
@@ -669,9 +679,11 @@
 
     if verify_integrity:
       # verifying output has a unique index requires global index.
-      # TODO(BEAM-11839): Attach an explanation to the Singleton partitioning
-      # requirement, and include it in raised errors.
-      requires = partitionings.Singleton()
+      requires = partitionings.Singleton(
+          reason=(
+              "append(verify_integrity=True) cannot currently be "
+              "parallelized. It requires collecting all data on a single node "
+              "to check the full index."))
     else:
       requires = partitionings.Arbitrary()
 
@@ -838,6 +850,10 @@
       return x._corr_aligned(y, min_periods)
 
     else:
+      reason = (
+          f"Encountered corr(method={method!r}) which cannot be "
+          "parallelized. Only corr(method='pearson') is currently "
+          "parallelizable.")
       # The rank-based correlations are not obviously parallelizable, though
       # perhaps an approximation could be done with a knowledge of quantiles
       # and custom partitioning.
@@ -847,9 +863,7 @@
               lambda df,
               other: df.corr(other, method=method, min_periods=min_periods),
               [self._expr, other._expr],
-              # TODO(BEAM-11839): Attach an explanation to the Singleton
-              # partitioning requirement, and include it in raised errors.
-              requires_partition_by=partitionings.Singleton()))
+              requires_partition_by=partitionings.Singleton(reason=reason)))
 
   def _corr_aligned(self, other, min_periods):
     std_x = self.std()
@@ -958,9 +972,16 @@
         return frame_base.DeferredFrame.wrap(
             expressions.ComputedExpression(
                 'aggregate',
-                lambda s: s.agg(func, *args, **kwargs), [intermediate],
+                lambda s: s.agg(func, *args, **kwargs),
+                [intermediate],
                 preserves_partition_by=partitionings.Arbitrary(),
-                requires_partition_by=partitionings.Singleton()))
+                # TODO(BEAM-11839): This reason should be more specific. It's
+                # actually incorrect for the args/kwargs case above.
+                requires_partition_by=partitionings.Singleton(
+                    reason=(
+                        f"Aggregation function {func!r} cannot currently be "
+                        "parallelized, it requires collecting all data for "
+                        "this Series on a single node."))))
 
   agg = aggregate
 
@@ -1119,7 +1140,10 @@
     if limit is None:
       requires_partition_by = partitionings.Arbitrary()
     else:
-      requires_partition_by = partitionings.Singleton()
+      requires_partition_by = partitionings.Singleton(
+          reason=(
+              f"replace(limit={limit!r}) cannot currently be parallelized, it "
+              "requires collecting all data on a single node."))
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'replace',
@@ -1154,7 +1178,8 @@
             'unique',
             lambda df: pd.Series(df.unique()), [self._expr],
             preserves_partition_by=partitionings.Singleton(),
-            requires_partition_by=partitionings.Singleton()))
+            requires_partition_by=partitionings.Singleton(
+                reason="unique() cannot currently be parallelized.")))
 
   def update(self, other):
     self._expr = expressions.ComputedExpression(
@@ -1289,7 +1314,9 @@
 
     if level is not None:
       # Could probably get by partitioning on the used levels.
-      requires_partition_by = partitionings.Singleton()
+      requires_partition_by = partitionings.Singleton(reason=(
+          f"align(level={level}) is not currently parallelizable. Only "
+          "align(level=None) can be parallelized."))
     elif axis in ('columns', 1):
       requires_partition_by = partitionings.Arbitrary()
     else:
@@ -1391,8 +1418,6 @@
             preserves_partition_by=preserves,
             requires_partition_by=partitionings.Arbitrary()))
 
-
-
   def aggregate(self, func, axis=0, *args, **kwargs):
     if axis is None:
       # Aggregate across all elements by first aggregating across columns,
@@ -1414,6 +1439,7 @@
             'aggregate',
             lambda df: df.agg(func, *args, **kwargs),
             [self._expr],
+            # TODO(BEAM-11839): Provide a reason for this Singleton requirement.
             requires_partition_by=partitionings.Singleton()))
     else:
       # In the general case, compute the aggregation of each column separately,
@@ -1499,12 +1525,15 @@
                 proxy=proxy))
 
     else:
+      reason = (f"Encountered corr(method={method!r}) which cannot be "
+                "parallelized. Only corr(method='pearson') is currently "
+                "parallelizable.")
       return frame_base.DeferredFrame.wrap(
           expressions.ComputedExpression(
               'corr',
               lambda df: df.corr(method=method, min_periods=min_periods),
               [self._expr],
-              requires_partition_by=partitionings.Singleton()))
+              requires_partition_by=partitionings.Singleton(reason=reason)))
 
   @frame_base.args_to_kwargs(pd.DataFrame)
   @frame_base.populate_defaults(pd.DataFrame)
@@ -1653,8 +1682,12 @@
             'mode',
             lambda df: df.mode(*args, **kwargs),
             [self._expr],
-            #TODO(robertwb): Approximate?
-            requires_partition_by=partitionings.Singleton(),
+            # TODO(BEAM-12181): Can we add an approximate implementation?
+            requires_partition_by=partitionings.Singleton(reason=(
+                "mode(axis='index') cannot currently be parallelized. See "
+                "BEAM-12181 tracking the possble addition of an approximate, "
+                "parallelizable implementation of mode."
+            )),
             preserves_partition_by=partitionings.Singleton()))
 
   @frame_base.args_to_kwargs(pd.DataFrame)
@@ -1662,8 +1695,12 @@
   @frame_base.maybe_inplace
   def dropna(self, axis, **kwargs):
     # TODO(robertwb): This is a common pattern. Generalize?
-    if axis == 1 or axis == 'columns':
-      requires_partition_by = partitionings.Singleton()
+    if axis in (1, 'columns'):
+      requires_partition_by = partitionings.Singleton(reason=(
+          "dropna(axis=1) cannot currently be parallelized. It requires "
+          "checking all values in each column for NaN values, to determine "
+          "if that column should be dropped."
+      ))
     else:
       requires_partition_by = partitionings.Arbitrary()
     return frame_base.DeferredFrame.wrap(
@@ -1913,8 +1950,11 @@
       requires_partition_by = partitionings.Arbitrary()
       preserves_partition_by = partitionings.Index()
     else:
-      # TODO: This could be implemented in a distributed fashion
-      requires_partition_by = partitionings.Singleton()
+      # TODO(BEAM-9547): This could be implemented in a distributed fashion,
+      # perhaps by deferring to a distributed drop_duplicates
+      requires_partition_by = partitionings.Singleton(reason=(
+         "nunique(axis='index') is not currently parallelizable."
+      ))
       preserves_partition_by = partitionings.Singleton()
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
@@ -1941,22 +1981,31 @@
   @frame_base.args_to_kwargs(pd.DataFrame)
   @frame_base.populate_defaults(pd.DataFrame)
   def quantile(self, q, axis, **kwargs):
-    if axis in (1, 'columns') and isinstance(q, list):
-      raise frame_base.WontImplementError(
-          "quantile(axis=columns) with multiple q values is not supported "
-          "because it transposes the input DataFrame. Note computing "
-          "an individual quantile across columns (e.g. "
-          f"df.quantile(q={q[0]!r}, axis={axis!r}) is supported.",
-          reason="non-deferred-columns")
+    if axis in (1, 'columns'):
+      if isinstance(q, list):
+        raise frame_base.WontImplementError(
+            "quantile(axis=columns) with multiple q values is not supported "
+            "because it transposes the input DataFrame. Note computing "
+            "an individual quantile across columns (e.g. "
+            f"df.quantile(q={q[0]!r}, axis={axis!r}) is supported.",
+            reason="non-deferred-columns")
+      else:
+        requires = partitionings.Arbitrary()
+    else:  # axis='index'
+      # TODO(BEAM-12167): Provide an option for approximate distributed
+      # quantiles
+      requires = partitionings.Singleton(reason=(
+          "Computing quantiles across index cannot currently be parallelized. "
+          "See BEAM-12167 tracking the possible addition of an approximate, "
+          "parallelizable implementation of quantile."
+      ))
 
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'quantile',
             lambda df: df.quantile(q=q, axis=axis, **kwargs),
             [self._expr],
-            # TODO(BEAM-12167): Provide an option for approximate distributed
-            # quantiles
-            requires_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires,
             preserves_partition_by=partitionings.Singleton()))
 
   @frame_base.args_to_kwargs(pd.DataFrame)
@@ -1978,8 +2027,15 @@
       preserves_partition_by = partitionings.Index()
 
     if kwargs.get('errors', None) == 'raise' and rename_index:
-      # Renaming index with checking requires global index.
-      requires_partition_by = partitionings.Singleton()
+      # TODO: We could do this in parallel by creating a ConstantExpression
+      # with a series created from the mapper dict. Then Index() partitioning
+      # would co-locate the necessary index values and we could raise
+      # individually within each partition. Execution time errors are
+      # discouraged anyway so probably not worth the effort.
+      requires_partition_by = partitionings.Singleton(reason=(
+          "rename(errors='raise', axis='index') requires collecting all "
+          "data on a single node in order to detect missing index values."
+      ))
     else:
       requires_partition_by = partitionings.Arbitrary()
 
@@ -2014,7 +2070,9 @@
     if limit is None:
       requires_partition_by = partitionings.Arbitrary()
     else:
-      requires_partition_by = partitionings.Singleton()
+      requires_partition_by = partitionings.Singleton(reason=(
+         f"replace(limit={limit!r}) cannot currently be parallelized, it "
+         "requires collecting all data on a single node."))
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'replace',
@@ -2032,8 +2090,11 @@
     if level is not None and not isinstance(level, (tuple, list)):
       level = [level]
     if level is None or len(level) == self._expr.proxy().index.nlevels:
-      # TODO: Could do distributed re-index with offsets.
-      requires_partition_by = partitionings.Singleton()
+      # TODO(BEAM-12182): Could do distributed re-index with offsets.
+      requires_partition_by = partitionings.Singleton(reason=(
+          "reset_index(level={level!r}) drops the entire index and creates a "
+          "new one, so it cannot currently be parallelized (BEAM-12182)."
+      ))
     else:
       requires_partition_by = partitionings.Arbitrary()
     return frame_base.DeferredFrame.wrap(
@@ -2073,10 +2134,12 @@
   def shift(self, axis, **kwargs):
     if 'freq' in kwargs:
       raise frame_base.WontImplementError('data-dependent')
-    if axis == 1 or axis == 'columns':
+    if axis in (1, 'columns'):
       requires_partition_by = partitionings.Arbitrary()
     else:
-      requires_partition_by = partitionings.Singleton()
+      raise frame_base.WontImplementError(
+          "shift(axis='index') is sensitive to the order of the data.",
+          reason="order-sensitive")
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'shift',
@@ -2388,7 +2451,10 @@
             df.groupby(level=list(range(df.index.nlevels)), **groupby_kwargs),
             **kwargs),
         [pre_agg],
-        requires_partition_by=(partitionings.Singleton()
+        requires_partition_by=(partitionings.Singleton(reason=(
+            "Aggregations grouped by a categorical column are not currently "
+            "parallelizable (BEAM-11190)."
+        ))
                                if is_categorical_grouping
                                else partitionings.Index()),
         preserves_partition_by=partitionings.Arbitrary())
@@ -2416,7 +2482,10 @@
                        **groupby_kwargs),
             ), **kwargs),
         [self._ungrouped],
-        requires_partition_by=(partitionings.Singleton()
+        requires_partition_by=(partitionings.Singleton(reason=(
+            "Aggregations grouped by a categorical column are not currently "
+            "parallelizable (BEAM-11190)."
+        ))
                                if is_categorical_grouping
                                else partitionings.Index()),
         preserves_partition_by=partitionings.Arbitrary())
@@ -2633,7 +2702,10 @@
   def cat(self, others, join, **kwargs):
     if others is None:
       # Concatenate series into a single String
-      requires = partitionings.Singleton()
+      requires = partitionings.Singleton(reason=(
+          "cat(others=None) concatenates all data in a Series into a single "
+          "string, so it requires collecting all data on a single node."
+      ))
       func = lambda df: df.str.cat(join=join, **kwargs)
       args = [self._expr]
 
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index b692f08..3315548 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -99,8 +99,15 @@
     deferred_args = _get_deferred_args(*args)
     if nonparallel:
       # First run outside a nonparallel block to confirm this raises as expected
-      with self.assertRaises(expressions.NonParallelOperation):
-        _ = func(*deferred_args)
+      with self.assertRaises(expressions.NonParallelOperation) as raised:
+        func(*deferred_args)
+
+      if raised.exception.msg.startswith(
+          "Encountered non-parallelizable form of"):
+        raise AssertionError(
+            "Default NonParallelOperation raised, please specify a reason in "
+            "the Singleton() partitioning requirement for this operation."
+        ) from raised.exception
 
       # Re-run in an allow non parallel block to get an expression to verify
       with beam.dataframe.allow_non_parallel_operations():
@@ -722,13 +729,14 @@
             lambda x: (x.foo + x.bar).median()),
         df)
 
-  def test_quantile_axis_columns(self):
+  def test_quantile(self):
     df = pd.DataFrame(
         np.array([[1, 1], [2, 10], [3, 100], [4, 100]]), columns=['a', 'b'])
 
-    with beam.dataframe.allow_non_parallel_operations():
-      self._run_test(lambda df: df.quantile(0.1, axis='columns'), df)
+    self._run_test(lambda df: df.quantile(0.1), df, nonparallel=True)
+    self._run_test(lambda df: df.quantile([0.1, 0.9]), df, nonparallel=True)
 
+    self._run_test(lambda df: df.quantile(0.1, axis='columns'), df)
     with self.assertRaisesRegex(frame_base.WontImplementError,
                                 r"df\.quantile\(q=0\.1, axis='columns'\)"):
       self._run_test(lambda df: df.quantile([0.1, 0.5], axis='columns'), df)
@@ -784,6 +792,23 @@
             id_vars=[('A', 'D')], value_vars=[('B', 'E')], ignore_index=False),
         df)
 
+  def test_fillna_columns(self):
+    df = pd.DataFrame(
+        [[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1], [np.nan, np.nan, np.nan, 5],
+         [np.nan, 3, np.nan, 4], [3, np.nan, np.nan, 4]],
+        columns=list('ABCD'))
+
+    self._run_test(lambda df: df.fillna(method='ffill', axis='columns'), df)
+    self._run_test(
+        lambda df: df.fillna(method='ffill', axis='columns', limit=1), df)
+    self._run_test(
+        lambda df: df.fillna(method='bfill', axis='columns', limit=1), df)
+
+    # Intended behavior is unclear here. See
+    # https://github.com/pandas-dev/pandas/issues/40989
+    # self._run_test(lambda df: df.fillna(axis='columns', value=100,
+    #                                     limit=2), df)
+
 
 class AllowNonParallelTest(unittest.TestCase):
   def _use_non_parallel_operation(self):
diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
index f720112..43b9e93 100644
--- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
+++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
@@ -40,7 +40,12 @@
             'pandas.core.generic.NDFrame.first': ['*'],
             'pandas.core.generic.NDFrame.head': ['*'],
             'pandas.core.generic.NDFrame.last': ['*'],
-            'pandas.core.generic.NDFrame.shift': ['*'],
+            'pandas.core.generic.NDFrame.shift': [
+                "df.shift(periods=3)",
+                'df.shift(periods=3, fill_value=0)',
+                'df.shift(periods=3, freq="D")',
+                'df.shift(periods=3, freq="infer")',
+            ],
             'pandas.core.generic.NDFrame.tail': ['*'],
             'pandas.core.generic.NDFrame.take': ['*'],
             'pandas.core.generic.NDFrame.values': ['*'],
@@ -189,8 +194,10 @@
             'pandas.core.frame.DataFrame.transpose': ['*'],
             'pandas.core.frame.DataFrame.shape': ['*'],
             'pandas.core.frame.DataFrame.shift': [
+                "df.shift(periods=3)",
+                'df.shift(periods=3, fill_value=0)',
                 'df.shift(periods=3, freq="D")',
-                'df.shift(periods=3, freq="infer")'
+                'df.shift(periods=3, freq="infer")',
             ],
             'pandas.core.frame.DataFrame.unstack': ['*'],
             'pandas.core.frame.DataFrame.memory_usage': ['*'],
diff --git a/sdks/python/apache_beam/dataframe/partitionings.py b/sdks/python/apache_beam/dataframe/partitionings.py
index ef58023..afb71ba 100644
--- a/sdks/python/apache_beam/dataframe/partitionings.py
+++ b/sdks/python/apache_beam/dataframe/partitionings.py
@@ -151,6 +151,13 @@
 class Singleton(Partitioning):
   """A partitioning of all the data into a single partition.
   """
+  def __init__(self, reason=None):
+    self._reason = reason
+
+  @property
+  def reason(self):
+    return self._reason
+
   def __eq__(self, other):
     return type(self) == type(other)