Merge pull request #14709 from pabloem/revert-pr13617
Revert "Merge pull request #13617 from [BEAM-11289] [Python] Integrat…
diff --git a/.test-infra/jenkins/CommonJobProperties.groovy b/.test-infra/jenkins/CommonJobProperties.groovy
index 40f143a..851fc0b 100644
--- a/.test-infra/jenkins/CommonJobProperties.groovy
+++ b/.test-infra/jenkins/CommonJobProperties.groovy
@@ -49,7 +49,7 @@
// Discard old builds. Build records are only kept up to this number of days.
context.logRotator {
- daysToKeep(14)
+ daysToKeep(30)
}
// Source code management.
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 34fb037..25355fe 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -678,12 +678,8 @@
reason="order-sensitive")
if verify_integrity:
- # verifying output has a unique index requires global index.
- requires = partitionings.Singleton(
- reason=(
- "append(verify_integrity=True) cannot currently be "
- "parallelized. It requires collecting all data on a single node "
- "to check the full index."))
+ # We can verify the index is unique within index-partitioned data.
+ requires = partitionings.Index()
else:
requires = partitionings.Arbitrary()
@@ -762,7 +758,12 @@
right = other._expr
right_is_series = False
else:
- raise frame_base.WontImplementError('non-deferred result')
+ raise frame_base.WontImplementError(
+ "other must be a DeferredDataFrame or DeferredSeries instance. "
+ "Passing a concrete list or numpy array is not supported. Those "
+ "types have no index and must be joined based on the order of the "
+ "data.",
+ reason="order-sensitive")
dots = expressions.ComputedExpression(
'dot',
@@ -1267,7 +1268,8 @@
elif _is_integer_slice(key):
# This depends on the contents of the index.
raise frame_base.WontImplementError(
- 'Use iloc or loc with integer slices.')
+ "Integer slices are not supported as they are ambiguous. Please "
+ "use iloc or loc with integer slices.")
else:
return self.loc[key]
@@ -1303,7 +1305,10 @@
@frame_base.populate_defaults(pd.DataFrame)
def align(self, other, join, axis, copy, level, method, **kwargs):
if not copy:
- raise frame_base.WontImplementError('align(copy=False)')
+ raise frame_base.WontImplementError(
+ "align(copy=False) is not supported because it might be an inplace "
+ "operation depending on the data. Please prefer the default "
+ "align(copy=True).")
if method is not None:
raise frame_base.WontImplementError(
f"align(method={method!r}) is not supported because it is "
@@ -1341,16 +1346,21 @@
"append(ignore_index=True) is order sensitive because it requires "
"generating a new index based on the order of the data.",
reason="order-sensitive")
+
if verify_integrity:
- raise frame_base.WontImplementError(
- "append(verify_integrity=True) produces an execution time error")
+ # We can verify the index is unique within index-partitioned data.
+ requires = partitionings.Index()
+ else:
+ requires = partitionings.Arbitrary()
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'append',
- lambda s, other: s.append(other, sort=sort, **kwargs),
+ lambda s, other: s.append(other, sort=sort,
+ verify_integrity=verify_integrity,
+ **kwargs),
[self._expr, other._expr],
- requires_partition_by=partitionings.Arbitrary(),
+ requires_partition_by=requires,
preserves_partition_by=partitionings.Arbitrary()
)
)
@@ -2131,22 +2141,37 @@
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
- def shift(self, axis, **kwargs):
- if 'freq' in kwargs:
- raise frame_base.WontImplementError('data-dependent')
+ def shift(self, axis, freq, **kwargs):
if axis in (1, 'columns'):
- requires_partition_by = partitionings.Arbitrary()
+ preserves = partitionings.Arbitrary()
+ proxy = None
else:
- raise frame_base.WontImplementError(
- "shift(axis='index') is sensitive to the order of the data.",
- reason="order-sensitive")
+ if freq is None or 'fill_value' in kwargs:
+ fill_value = kwargs.get('fill_value', 'NOT SET')
+ raise frame_base.WontImplementError(
+ f"shift(axis={axis!r}) is only supported with freq defined, and "
+ f"fill_value undefined (got freq={freq!r},"
+ f"fill_value={fill_value!r}). Other configurations are sensitive "
+ "to the order of the data because they require populating shifted "
+ "rows with `fill_value`.",
+ reason="order-sensitive")
+ # proxy generation fails in pandas <1.2
+ # Seems due to https://github.com/pandas-dev/pandas/issues/14811,
+ # bug with shift on empty indexes.
+ # Fortunately the proxy should be identical to the input.
+ proxy = self._expr.proxy().copy()
+
+ # index is modified, so no partitioning is preserved.
+ preserves = partitionings.Singleton()
+
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'shift',
- lambda df: df.shift(axis=axis, **kwargs),
+ lambda df: df.shift(axis=axis, freq=freq, **kwargs),
[self._expr],
- preserves_partition_by=partitionings.Singleton(),
- requires_partition_by=requires_partition_by))
+ proxy=proxy,
+ preserves_partition_by=preserves,
+ requires_partition_by=partitionings.Arbitrary()))
shape = property(frame_base.wont_implement_method(
pd.DataFrame, 'shape', reason="non-deferred-result"))
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 3315548..1cf1dfb 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -45,12 +45,15 @@
class DeferredFrameTest(unittest.TestCase):
- def _run_error_test(self, func, *args):
+ def _run_error_test(
+ self, func, *args, construction_time=True, distributed=True):
"""Verify that func(*args) raises the same exception in pandas and in Beam.
- Note that for Beam this only checks for exceptions that are raised during
- expression generation (i.e. construction time). Execution time exceptions
- are not helpful."""
+ Note that by default this only checks for exceptions that the Beam DataFrame
+ API raises during expression generation (i.e. construction time).
+ Exceptions raised while the pipeline is executing are less helpful, but
+ are sometimes unavoidable (e.g. data validation exceptions). To check for
+ these exceptions, use construction_time=False."""
deferred_args = _get_deferred_args(*args)
# Get expected error
@@ -64,14 +67,29 @@
f"returned:\n{expected}")
# Get actual error
- try:
- _ = func(*deferred_args)._expr
- except Exception as e:
- actual = e
- else:
- raise AssertionError(
- "Expected an error:\n{expected_error}\nbut Beam successfully "
- "generated an expression.")
+ if construction_time:
+ try:
+ _ = func(*deferred_args)._expr
+ except Exception as e:
+ actual = e
+ else:
+ raise AssertionError(
+ f"Expected an error:\n{expected_error}\nbut Beam successfully "
+ f"generated an expression.")
+ else: # not construction_time
+ # Check for an error raised during pipeline execution
+ expr = func(*deferred_args)._expr
+ session_type = (
+ expressions.PartitioningSession
+ if distributed else expressions.Session)
+ try:
+ result = session_type({}).evaluate(expr)
+ except Exception as e:
+ actual = e
+ else:
+ raise AssertionError(
+ f"Expected an error:\n{expected_error}\nbut Beam successfully "
+ f"Computed the result:\n{result}.")
# Verify
if (not isinstance(actual, type(expected_error)) or
@@ -750,6 +768,7 @@
lambda df: df.groupby('foo', dropna=False).bar.count(), GROUPBY_DF)
def test_dataframe_melt(self):
+
df = pd.DataFrame({
'A': {
0: 'a', 1: 'b', 2: 'c'
@@ -809,6 +828,23 @@
# self._run_test(lambda df: df.fillna(axis='columns', value=100,
# limit=2), df)
+ def test_append_verify_integrity(self):
+ df1 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10))
+ df2 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(9, 19))
+
+ self._run_error_test(
+ lambda s1,
+ s2: s1.append(s2, verify_integrity=True),
+ df1['A'],
+ df2['A'],
+ construction_time=False)
+ self._run_error_test(
+ lambda df1,
+ df2: df1.append(df2, verify_integrity=True),
+ df1,
+ df2,
+ construction_time=False)
+
class AllowNonParallelTest(unittest.TestCase):
def _use_non_parallel_operation(self):
diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
index 43b9e93..fcf18fa 100644
--- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
+++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
@@ -41,10 +41,8 @@
'pandas.core.generic.NDFrame.head': ['*'],
'pandas.core.generic.NDFrame.last': ['*'],
'pandas.core.generic.NDFrame.shift': [
- "df.shift(periods=3)",
+ 'df.shift(periods=3)',
'df.shift(periods=3, fill_value=0)',
- 'df.shift(periods=3, freq="D")',
- 'df.shift(periods=3, freq="infer")',
],
'pandas.core.generic.NDFrame.tail': ['*'],
'pandas.core.generic.NDFrame.take': ['*'],
@@ -194,10 +192,8 @@
'pandas.core.frame.DataFrame.transpose': ['*'],
'pandas.core.frame.DataFrame.shape': ['*'],
'pandas.core.frame.DataFrame.shift': [
- "df.shift(periods=3)",
+ 'df.shift(periods=3)',
'df.shift(periods=3, fill_value=0)',
- 'df.shift(periods=3, freq="D")',
- 'df.shift(periods=3, freq="infer")',
],
'pandas.core.frame.DataFrame.unstack': ['*'],
'pandas.core.frame.DataFrame.memory_usage': ['*'],
@@ -402,7 +398,10 @@
],
'pandas.core.series.Series.pop': ['*'],
'pandas.core.series.Series.searchsorted': ['*'],
- 'pandas.core.series.Series.shift': ['*'],
+ 'pandas.core.series.Series.shift': [
+ 'df.shift(periods=3)',
+ 'df.shift(periods=3, fill_value=0)',
+ ],
'pandas.core.series.Series.take': ['*'],
'pandas.core.series.Series.to_dict': ['*'],
'pandas.core.series.Series.unique': ['*'],