[BEAM-11839] Add reason for most Singleton() requirements (#14569)
[BEAM-11839] Add reason for most Singleton() requirements
diff --git a/sdks/python/apache_beam/dataframe/expressions.py b/sdks/python/apache_beam/dataframe/expressions.py
index 80ee149..54a15c5 100644
--- a/sdks/python/apache_beam/dataframe/expressions.py
+++ b/sdks/python/apache_beam/dataframe/expressions.py
@@ -348,10 +348,16 @@
preserves_partition_by: The level of partitioning preserved.
"""
if (not _get_allow_non_parallel() and
- requires_partition_by == partitionings.Singleton()):
+ isinstance(requires_partition_by, partitionings.Singleton)):
+ reason = requires_partition_by.reason or (
+ f"Encountered non-parallelizable form of {name!r}.")
+
raise NonParallelOperation(
- "Using non-parallel form of %s "
- "outside of allow_non_parallel_operations block." % name)
+ f"{reason}\n"
+ "Consider using an allow_non_parallel_operations block if you're "
+ "sure you want to do this. See "
+ "https://s.apache.org/dataframe-non-parallel-operations for more "
+ "information.")
args = tuple(args)
if proxy is None:
proxy = func(*(arg.proxy() for arg in args))
@@ -406,4 +412,6 @@
class NonParallelOperation(Exception):
- pass
+ def __init__(self, msg):
+    super(NonParallelOperation, self).__init__(msg)
+ self.msg = msg
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 1f892f4..34fb037 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -109,7 +109,11 @@
if index is not None and errors == 'raise':
# In order to raise an error about missing index values, we'll
# need to collect the entire dataframe.
- requires = partitionings.Singleton()
+ requires = partitionings.Singleton(
+ reason=(
+ "drop(errors='raise', axis='index') is not currently "
+ "parallelizable. This requires collecting all data on a single "
+ f"node in order to detect if one of {index!r} is missing."))
else:
requires = partitionings.Arbitrary()
@@ -142,24 +146,26 @@
def fillna(self, value, method, axis, limit, **kwargs):
# Default value is None, but is overriden with index.
axis = axis or 'index'
- if method is not None and axis in (0, 'index'):
- raise frame_base.WontImplementError(
- f"fillna(method={method!r}) is not supported because it is "
- "order-sensitive. Only fillna(method=None) is supported.",
- reason="order-sensitive")
+
+ if axis in (0, 'index'):
+ if method is not None:
+ raise frame_base.WontImplementError(
+ f"fillna(method={method!r}, axis={axis!r}) is not supported "
+ "because it is order-sensitive. Only fillna(method=None) is "
+ f"supported with axis={axis!r}.",
+ reason="order-sensitive")
+ if limit is not None:
+ raise frame_base.WontImplementError(
+          f"fillna(limit={limit!r}, axis={axis!r}) is not supported because "
+ "it is order-sensitive. Only fillna(limit=None) is supported with "
+ f"axis={axis!r}.",
+ reason="order-sensitive")
+
if isinstance(value, frame_base.DeferredBase):
value_expr = value._expr
else:
value_expr = expressions.ConstantExpression(value)
- if limit is not None and method is None:
- # If method is not None (and axis is 'columns'), we can do limit in
- # a distributed way. Otherwise the limit is global, so it requires
- # Singleton partitioning.
- requires = partitionings.Singleton()
- else:
- requires = partitionings.Arbitrary()
-
return frame_base.DeferredFrame.wrap(
# yapf: disable
expressions.ComputedExpression(
@@ -169,7 +175,7 @@
value, method=method, axis=axis, limit=limit, **kwargs),
[self._expr, value_expr],
preserves_partition_by=partitionings.Arbitrary(),
- requires_partition_by=requires))
+ requires_partition_by=partitionings.Arbitrary()))
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@@ -523,7 +529,11 @@
if errors == "ignore":
# We need all data in order to ignore errors and propagate the original
# data.
- requires = partitionings.Singleton()
+ requires = partitionings.Singleton(
+ reason=(
+ f"where(errors={errors!r}) is currently not parallelizable, "
+ "because all data must be collected on one node to determine if "
+ "the original data should be propagated instead."))
actual_args['errors'] = errors
@@ -669,9 +679,11 @@
if verify_integrity:
# verifying output has a unique index requires global index.
- # TODO(BEAM-11839): Attach an explanation to the Singleton partitioning
- # requirement, and include it in raised errors.
- requires = partitionings.Singleton()
+ requires = partitionings.Singleton(
+ reason=(
+ "append(verify_integrity=True) cannot currently be "
+ "parallelized. It requires collecting all data on a single node "
+ "to check the full index."))
else:
requires = partitionings.Arbitrary()
@@ -838,6 +850,10 @@
return x._corr_aligned(y, min_periods)
else:
+ reason = (
+ f"Encountered corr(method={method!r}) which cannot be "
+ "parallelized. Only corr(method='pearson') is currently "
+ "parallelizable.")
# The rank-based correlations are not obviously parallelizable, though
# perhaps an approximation could be done with a knowledge of quantiles
# and custom partitioning.
@@ -847,9 +863,7 @@
lambda df,
other: df.corr(other, method=method, min_periods=min_periods),
[self._expr, other._expr],
- # TODO(BEAM-11839): Attach an explanation to the Singleton
- # partitioning requirement, and include it in raised errors.
- requires_partition_by=partitionings.Singleton()))
+ requires_partition_by=partitionings.Singleton(reason=reason)))
def _corr_aligned(self, other, min_periods):
std_x = self.std()
@@ -958,9 +972,16 @@
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'aggregate',
- lambda s: s.agg(func, *args, **kwargs), [intermediate],
+ lambda s: s.agg(func, *args, **kwargs),
+ [intermediate],
preserves_partition_by=partitionings.Arbitrary(),
- requires_partition_by=partitionings.Singleton()))
+ # TODO(BEAM-11839): This reason should be more specific. It's
+ # actually incorrect for the args/kwargs case above.
+ requires_partition_by=partitionings.Singleton(
+ reason=(
+ f"Aggregation function {func!r} cannot currently be "
+ "parallelized, it requires collecting all data for "
+ "this Series on a single node."))))
agg = aggregate
@@ -1119,7 +1140,10 @@
if limit is None:
requires_partition_by = partitionings.Arbitrary()
else:
- requires_partition_by = partitionings.Singleton()
+ requires_partition_by = partitionings.Singleton(
+ reason=(
+ f"replace(limit={limit!r}) cannot currently be parallelized, it "
+ "requires collecting all data on a single node."))
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'replace',
@@ -1154,7 +1178,8 @@
'unique',
lambda df: pd.Series(df.unique()), [self._expr],
preserves_partition_by=partitionings.Singleton(),
- requires_partition_by=partitionings.Singleton()))
+ requires_partition_by=partitionings.Singleton(
+ reason="unique() cannot currently be parallelized.")))
def update(self, other):
self._expr = expressions.ComputedExpression(
@@ -1289,7 +1314,9 @@
if level is not None:
# Could probably get by partitioning on the used levels.
- requires_partition_by = partitionings.Singleton()
+ requires_partition_by = partitionings.Singleton(reason=(
+ f"align(level={level}) is not currently parallelizable. Only "
+ "align(level=None) can be parallelized."))
elif axis in ('columns', 1):
requires_partition_by = partitionings.Arbitrary()
else:
@@ -1391,8 +1418,6 @@
preserves_partition_by=preserves,
requires_partition_by=partitionings.Arbitrary()))
-
-
def aggregate(self, func, axis=0, *args, **kwargs):
if axis is None:
# Aggregate across all elements by first aggregating across columns,
@@ -1414,6 +1439,7 @@
'aggregate',
lambda df: df.agg(func, *args, **kwargs),
[self._expr],
+ # TODO(BEAM-11839): Provide a reason for this Singleton
requires_partition_by=partitionings.Singleton()))
else:
# In the general case, compute the aggregation of each column separately,
@@ -1499,12 +1525,15 @@
proxy=proxy))
else:
+ reason = (f"Encountered corr(method={method!r}) which cannot be "
+ "parallelized. Only corr(method='pearson') is currently "
+ "parallelizable.")
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'corr',
lambda df: df.corr(method=method, min_periods=min_periods),
[self._expr],
- requires_partition_by=partitionings.Singleton()))
+ requires_partition_by=partitionings.Singleton(reason=reason)))
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@@ -1653,8 +1682,12 @@
'mode',
lambda df: df.mode(*args, **kwargs),
[self._expr],
- #TODO(robertwb): Approximate?
- requires_partition_by=partitionings.Singleton(),
+ #TODO(BEAM-12181): Can we add an approximate implementation?
+ requires_partition_by=partitionings.Singleton(reason=(
+ "mode(axis='index') cannot currently be parallelized. See "
          "BEAM-12181 tracking the possible addition of an approximate, "
+ "parallelizable implementation of mode."
+ )),
preserves_partition_by=partitionings.Singleton()))
@frame_base.args_to_kwargs(pd.DataFrame)
@@ -1662,8 +1695,12 @@
@frame_base.maybe_inplace
def dropna(self, axis, **kwargs):
# TODO(robertwb): This is a common pattern. Generalize?
- if axis == 1 or axis == 'columns':
- requires_partition_by = partitionings.Singleton()
+ if axis in (1, 'columns'):
+ requires_partition_by = partitionings.Singleton(reason=(
+ "dropna(axis=1) cannot currently be parallelized. It requires "
+ "checking all values in each column for NaN values, to determine "
+ "if that column should be dropped."
+ ))
else:
requires_partition_by = partitionings.Arbitrary()
return frame_base.DeferredFrame.wrap(
@@ -1913,8 +1950,11 @@
requires_partition_by = partitionings.Arbitrary()
preserves_partition_by = partitionings.Index()
else:
- # TODO: This could be implemented in a distributed fashion
- requires_partition_by = partitionings.Singleton()
+ # TODO(BEAM-9547): This could be implemented in a distributed fashion,
+ # perhaps by deferring to a distributed drop_duplicates
+ requires_partition_by = partitionings.Singleton(reason=(
+ "nunique(axis='index') is not currently parallelizable."
+ ))
preserves_partition_by = partitionings.Singleton()
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
@@ -1941,22 +1981,31 @@
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def quantile(self, q, axis, **kwargs):
- if axis in (1, 'columns') and isinstance(q, list):
- raise frame_base.WontImplementError(
- "quantile(axis=columns) with multiple q values is not supported "
- "because it transposes the input DataFrame. Note computing "
- "an individual quantile across columns (e.g. "
- f"df.quantile(q={q[0]!r}, axis={axis!r}) is supported.",
- reason="non-deferred-columns")
+ if axis in (1, 'columns'):
+ if isinstance(q, list):
+ raise frame_base.WontImplementError(
+ "quantile(axis=columns) with multiple q values is not supported "
+ "because it transposes the input DataFrame. Note computing "
+ "an individual quantile across columns (e.g. "
+ f"df.quantile(q={q[0]!r}, axis={axis!r}) is supported.",
+ reason="non-deferred-columns")
+ else:
+ requires = partitionings.Arbitrary()
+ else: # axis='index'
+ # TODO(BEAM-12167): Provide an option for approximate distributed
+ # quantiles
+ requires = partitionings.Singleton(reason=(
+ "Computing quantiles across index cannot currently be parallelized. "
+ "See BEAM-12167 tracking the possible addition of an approximate, "
+ "parallelizable implementation of quantile."
+ ))
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'quantile',
lambda df: df.quantile(q=q, axis=axis, **kwargs),
[self._expr],
- # TODO(BEAM-12167): Provide an option for approximate distributed
- # quantiles
- requires_partition_by=partitionings.Singleton(),
+ requires_partition_by=requires,
preserves_partition_by=partitionings.Singleton()))
@frame_base.args_to_kwargs(pd.DataFrame)
@@ -1978,8 +2027,15 @@
preserves_partition_by = partitionings.Index()
if kwargs.get('errors', None) == 'raise' and rename_index:
- # Renaming index with checking requires global index.
- requires_partition_by = partitionings.Singleton()
+ # TODO: We could do this in parallel by creating a ConstantExpression
+ # with a series created from the mapper dict. Then Index() partitioning
+ # would co-locate the necessary index values and we could raise
+ # individually within each partition. Execution time errors are
+ # discouraged anyway so probably not worth the effort.
+ requires_partition_by = partitionings.Singleton(reason=(
+ "rename(errors='raise', axis='index') requires collecting all "
+ "data on a single node in order to detect missing index values."
+ ))
else:
requires_partition_by = partitionings.Arbitrary()
@@ -2014,7 +2070,9 @@
if limit is None:
requires_partition_by = partitionings.Arbitrary()
else:
- requires_partition_by = partitionings.Singleton()
+ requires_partition_by = partitionings.Singleton(reason=(
+ f"replace(limit={limit!r}) cannot currently be parallelized, it "
+ "requires collecting all data on a single node."))
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'replace',
@@ -2032,8 +2090,11 @@
if level is not None and not isinstance(level, (tuple, list)):
level = [level]
if level is None or len(level) == self._expr.proxy().index.nlevels:
- # TODO: Could do distributed re-index with offsets.
- requires_partition_by = partitionings.Singleton()
+ # TODO(BEAM-12182): Could do distributed re-index with offsets.
+ requires_partition_by = partitionings.Singleton(reason=(
          f"reset_index(level={level!r}) drops the entire index and creates a "
+ "new one, so it cannot currently be parallelized (BEAM-12182)."
+ ))
else:
requires_partition_by = partitionings.Arbitrary()
return frame_base.DeferredFrame.wrap(
@@ -2073,10 +2134,12 @@
def shift(self, axis, **kwargs):
if 'freq' in kwargs:
raise frame_base.WontImplementError('data-dependent')
- if axis == 1 or axis == 'columns':
+ if axis in (1, 'columns'):
requires_partition_by = partitionings.Arbitrary()
else:
- requires_partition_by = partitionings.Singleton()
+ raise frame_base.WontImplementError(
+ "shift(axis='index') is sensitive to the order of the data.",
+ reason="order-sensitive")
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'shift',
@@ -2388,7 +2451,10 @@
df.groupby(level=list(range(df.index.nlevels)), **groupby_kwargs),
**kwargs),
[pre_agg],
- requires_partition_by=(partitionings.Singleton()
+ requires_partition_by=(partitionings.Singleton(reason=(
+ "Aggregations grouped by a categorical column are not currently "
+ "parallelizable (BEAM-11190)."
+ ))
if is_categorical_grouping
else partitionings.Index()),
preserves_partition_by=partitionings.Arbitrary())
@@ -2416,7 +2482,10 @@
**groupby_kwargs),
), **kwargs),
[self._ungrouped],
- requires_partition_by=(partitionings.Singleton()
+ requires_partition_by=(partitionings.Singleton(reason=(
+ "Aggregations grouped by a categorical column are not currently "
+ "parallelizable (BEAM-11190)."
+ ))
if is_categorical_grouping
else partitionings.Index()),
preserves_partition_by=partitionings.Arbitrary())
@@ -2633,7 +2702,10 @@
def cat(self, others, join, **kwargs):
if others is None:
# Concatenate series into a single String
- requires = partitionings.Singleton()
+ requires = partitionings.Singleton(reason=(
+ "cat(others=None) concatenates all data in a Series into a single "
+ "string, so it requires collecting all data on a single node."
+ ))
func = lambda df: df.str.cat(join=join, **kwargs)
args = [self._expr]
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index b692f08..3315548 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -99,8 +99,15 @@
deferred_args = _get_deferred_args(*args)
if nonparallel:
# First run outside a nonparallel block to confirm this raises as expected
- with self.assertRaises(expressions.NonParallelOperation):
- _ = func(*deferred_args)
+ with self.assertRaises(expressions.NonParallelOperation) as raised:
+ func(*deferred_args)
+
+ if raised.exception.msg.startswith(
+ "Encountered non-parallelizable form of"):
+ raise AssertionError(
+ "Default NonParallelOperation raised, please specify a reason in "
+ "the Singleton() partitioning requirement for this operation."
+ ) from raised.exception
# Re-run in an allow non parallel block to get an expression to verify
with beam.dataframe.allow_non_parallel_operations():
@@ -722,13 +729,14 @@
lambda x: (x.foo + x.bar).median()),
df)
- def test_quantile_axis_columns(self):
+ def test_quantile(self):
df = pd.DataFrame(
np.array([[1, 1], [2, 10], [3, 100], [4, 100]]), columns=['a', 'b'])
- with beam.dataframe.allow_non_parallel_operations():
- self._run_test(lambda df: df.quantile(0.1, axis='columns'), df)
+ self._run_test(lambda df: df.quantile(0.1), df, nonparallel=True)
+ self._run_test(lambda df: df.quantile([0.1, 0.9]), df, nonparallel=True)
+ self._run_test(lambda df: df.quantile(0.1, axis='columns'), df)
with self.assertRaisesRegex(frame_base.WontImplementError,
r"df\.quantile\(q=0\.1, axis='columns'\)"):
self._run_test(lambda df: df.quantile([0.1, 0.5], axis='columns'), df)
@@ -784,6 +792,23 @@
id_vars=[('A', 'D')], value_vars=[('B', 'E')], ignore_index=False),
df)
+ def test_fillna_columns(self):
+ df = pd.DataFrame(
+ [[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1], [np.nan, np.nan, np.nan, 5],
+ [np.nan, 3, np.nan, 4], [3, np.nan, np.nan, 4]],
+ columns=list('ABCD'))
+
+ self._run_test(lambda df: df.fillna(method='ffill', axis='columns'), df)
+ self._run_test(
+ lambda df: df.fillna(method='ffill', axis='columns', limit=1), df)
+ self._run_test(
+ lambda df: df.fillna(method='bfill', axis='columns', limit=1), df)
+
+ # Intended behavior is unclear here. See
+ # https://github.com/pandas-dev/pandas/issues/40989
+ # self._run_test(lambda df: df.fillna(axis='columns', value=100,
+ # limit=2), df)
+
class AllowNonParallelTest(unittest.TestCase):
def _use_non_parallel_operation(self):
diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
index f720112..43b9e93 100644
--- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
+++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
@@ -40,7 +40,12 @@
'pandas.core.generic.NDFrame.first': ['*'],
'pandas.core.generic.NDFrame.head': ['*'],
'pandas.core.generic.NDFrame.last': ['*'],
- 'pandas.core.generic.NDFrame.shift': ['*'],
+ 'pandas.core.generic.NDFrame.shift': [
+ "df.shift(periods=3)",
+ 'df.shift(periods=3, fill_value=0)',
+ 'df.shift(periods=3, freq="D")',
+ 'df.shift(periods=3, freq="infer")',
+ ],
'pandas.core.generic.NDFrame.tail': ['*'],
'pandas.core.generic.NDFrame.take': ['*'],
'pandas.core.generic.NDFrame.values': ['*'],
@@ -189,8 +194,10 @@
'pandas.core.frame.DataFrame.transpose': ['*'],
'pandas.core.frame.DataFrame.shape': ['*'],
'pandas.core.frame.DataFrame.shift': [
+ "df.shift(periods=3)",
+ 'df.shift(periods=3, fill_value=0)',
'df.shift(periods=3, freq="D")',
- 'df.shift(periods=3, freq="infer")'
+ 'df.shift(periods=3, freq="infer")',
],
'pandas.core.frame.DataFrame.unstack': ['*'],
'pandas.core.frame.DataFrame.memory_usage': ['*'],
diff --git a/sdks/python/apache_beam/dataframe/partitionings.py b/sdks/python/apache_beam/dataframe/partitionings.py
index ef58023..afb71ba 100644
--- a/sdks/python/apache_beam/dataframe/partitionings.py
+++ b/sdks/python/apache_beam/dataframe/partitionings.py
@@ -151,6 +151,13 @@
class Singleton(Partitioning):
"""A partitioning of all the data into a single partition.
"""
+ def __init__(self, reason=None):
+ self._reason = reason
+
+ @property
+ def reason(self):
+ return self._reason
+
def __eq__(self, other):
return type(self) == type(other)