Merge pull request #14709 from pabloem/revert-pr13617

Revert "Merge pull request #13617 from [BEAM-11289] [Python] Integrat…
diff --git a/.test-infra/jenkins/CommonJobProperties.groovy b/.test-infra/jenkins/CommonJobProperties.groovy
index 40f143a..851fc0b 100644
--- a/.test-infra/jenkins/CommonJobProperties.groovy
+++ b/.test-infra/jenkins/CommonJobProperties.groovy
@@ -49,7 +49,7 @@
 
     // Discard old builds. Build records are only kept up to this number of days.
     context.logRotator {
-      daysToKeep(14)
+      daysToKeep(30)
     }
 
     // Source code management.
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 34fb037..25355fe 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -678,12 +678,8 @@
           reason="order-sensitive")
 
     if verify_integrity:
-      # verifying output has a unique index requires global index.
-      requires = partitionings.Singleton(
-          reason=(
-              "append(verify_integrity=True) cannot currently be "
-              "parallelized. It requires collecting all data on a single node "
-              "to check the full index."))
+      # We can verify the index is non-unique within index partitioned data.
+      requires = partitionings.Index()
     else:
       requires = partitionings.Arbitrary()
 
@@ -762,7 +758,12 @@
       right = other._expr
       right_is_series = False
     else:
-      raise frame_base.WontImplementError('non-deferred result')
+      raise frame_base.WontImplementError(
+          "other must be a DeferredDataFrame or DeferredSeries instance. "
+          "Passing a concrete list or numpy array is not supported. Those "
+          "types have no index and must be joined based on the order of the "
+          "data.",
+          reason="order-sensitive")
 
     dots = expressions.ComputedExpression(
         'dot',
@@ -1267,7 +1268,8 @@
       elif _is_integer_slice(key):
         # This depends on the contents of the index.
         raise frame_base.WontImplementError(
-            'Use iloc or loc with integer slices.')
+            "Integer slices are not supported as they are ambiguous. Please "
+            "use iloc or loc with integer slices.")
       else:
         return self.loc[key]
 
@@ -1303,7 +1305,10 @@
   @frame_base.populate_defaults(pd.DataFrame)
   def align(self, other, join, axis, copy, level, method, **kwargs):
     if not copy:
-      raise frame_base.WontImplementError('align(copy=False)')
+      raise frame_base.WontImplementError(
+          "align(copy=False) is not supported because it might be an inplace "
+          "operation depending on the data. Please prefer the default "
+          "align(copy=True).")
     if method is not None:
       raise frame_base.WontImplementError(
           f"align(method={method!r}) is not supported because it is "
@@ -1341,16 +1346,21 @@
           "append(ignore_index=True) is order sensitive because it requires "
           "generating a new index based on the order of the data.",
           reason="order-sensitive")
+
     if verify_integrity:
-      raise frame_base.WontImplementError(
-          "append(verify_integrity=True) produces an execution time error")
+      # We can verify the index is non-unique within index partitioned data.
+      requires = partitionings.Index()
+    else:
+      requires = partitionings.Arbitrary()
 
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'append',
-            lambda s, other: s.append(other, sort=sort, **kwargs),
+            lambda s, other: s.append(other, sort=sort,
+                                      verify_integrity=verify_integrity,
+                                      **kwargs),
             [self._expr, other._expr],
-            requires_partition_by=partitionings.Arbitrary(),
+            requires_partition_by=requires,
             preserves_partition_by=partitionings.Arbitrary()
         )
     )
@@ -2131,22 +2141,37 @@
 
   @frame_base.args_to_kwargs(pd.DataFrame)
   @frame_base.populate_defaults(pd.DataFrame)
-  def shift(self, axis, **kwargs):
-    if 'freq' in kwargs:
-      raise frame_base.WontImplementError('data-dependent')
+  def shift(self, axis, freq, **kwargs):
     if axis in (1, 'columns'):
-      requires_partition_by = partitionings.Arbitrary()
+      preserves = partitionings.Arbitrary()
+      proxy = None
     else:
-      raise frame_base.WontImplementError(
-          "shift(axis='index') is sensitive to the order of the data.",
-          reason="order-sensitive")
+      if freq is None or 'fill_value' in kwargs:
+        fill_value = kwargs.get('fill_value', 'NOT SET')
+        raise frame_base.WontImplementError(
+            f"shift(axis={axis!r}) is only supported with freq defined, and "
+            f"fill_value undefined (got freq={freq!r},"
+            f"fill_value={fill_value!r}). Other configurations are sensitive "
+            "to the order of the data because they require populating shifted "
+            "rows with `fill_value`.",
+            reason="order-sensitive")
+      # proxy generation fails in pandas <1.2
+      # Seems due to https://github.com/pandas-dev/pandas/issues/14811,
+      # bug with shift on empty indexes.
+      # Fortunately the proxy should be identical to the input.
+      proxy = self._expr.proxy().copy()
+
+      # index is modified, so no partitioning is preserved.
+      preserves = partitionings.Singleton()
+
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'shift',
-            lambda df: df.shift(axis=axis, **kwargs),
+            lambda df: df.shift(axis=axis, freq=freq, **kwargs),
             [self._expr],
-            preserves_partition_by=partitionings.Singleton(),
-            requires_partition_by=requires_partition_by))
+            proxy=proxy,
+            preserves_partition_by=preserves,
+            requires_partition_by=partitionings.Arbitrary()))
 
   shape = property(frame_base.wont_implement_method(
       pd.DataFrame, 'shape', reason="non-deferred-result"))
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 3315548..1cf1dfb 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -45,12 +45,15 @@
 
 
 class DeferredFrameTest(unittest.TestCase):
-  def _run_error_test(self, func, *args):
+  def _run_error_test(
+      self, func, *args, construction_time=True, distributed=True):
     """Verify that func(*args) raises the same exception in pandas and in Beam.
 
-    Note that for Beam this only checks for exceptions that are raised during
-    expression generation (i.e. construction time). Execution time exceptions
-    are not helpful."""
+    Note that by default this only checks for exceptions that the Beam DataFrame
+    API raises during expression generation (i.e. construction time).
+    Exceptions raised while the pipeline is executing are less helpful, but
+    are sometimes unavoidable (e.g. data validation exceptions), to check for
+    these exceptions use construction_time=False."""
     deferred_args = _get_deferred_args(*args)
 
     # Get expected error
@@ -64,14 +67,29 @@
           f"returned:\n{expected}")
 
     # Get actual error
-    try:
-      _ = func(*deferred_args)._expr
-    except Exception as e:
-      actual = e
-    else:
-      raise AssertionError(
-          "Expected an error:\n{expected_error}\nbut Beam successfully "
-          "generated an expression.")
+    if construction_time:
+      try:
+        _ = func(*deferred_args)._expr
+      except Exception as e:
+        actual = e
+      else:
+        raise AssertionError(
+            f"Expected an error:\n{expected_error}\nbut Beam successfully "
+            f"generated an expression.")
+    else:  # not construction_time
+      # Check for an error raised during pipeline execution
+      expr = func(*deferred_args)._expr
+      session_type = (
+          expressions.PartitioningSession
+          if distributed else expressions.Session)
+      try:
+        result = session_type({}).evaluate(expr)
+      except Exception as e:
+        actual = e
+      else:
+        raise AssertionError(
+            f"Expected an error:\n{expected_error}\nbut Beam successfully "
+            f"Computed the result:\n{result}.")
 
     # Verify
     if (not isinstance(actual, type(expected_error)) or
@@ -750,6 +768,7 @@
         lambda df: df.groupby('foo', dropna=False).bar.count(), GROUPBY_DF)
 
   def test_dataframe_melt(self):
+
     df = pd.DataFrame({
         'A': {
             0: 'a', 1: 'b', 2: 'c'
@@ -809,6 +828,23 @@
     # self._run_test(lambda df: df.fillna(axis='columns', value=100,
     #                                     limit=2), df)
 
+  def test_append_verify_integrity(self):
+    df1 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10))
+    df2 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(9, 19))
+
+    self._run_error_test(
+        lambda s1,
+        s2: s1.append(s2, verify_integrity=True),
+        df1['A'],
+        df2['A'],
+        construction_time=False)
+    self._run_error_test(
+        lambda df1,
+        df2: df1.append(df2, verify_integrity=True),
+        df1,
+        df2,
+        construction_time=False)
+
 
 class AllowNonParallelTest(unittest.TestCase):
   def _use_non_parallel_operation(self):
diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
index 43b9e93..fcf18fa 100644
--- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
+++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
@@ -41,10 +41,8 @@
             'pandas.core.generic.NDFrame.head': ['*'],
             'pandas.core.generic.NDFrame.last': ['*'],
             'pandas.core.generic.NDFrame.shift': [
-                "df.shift(periods=3)",
+                'df.shift(periods=3)',
                 'df.shift(periods=3, fill_value=0)',
-                'df.shift(periods=3, freq="D")',
-                'df.shift(periods=3, freq="infer")',
             ],
             'pandas.core.generic.NDFrame.tail': ['*'],
             'pandas.core.generic.NDFrame.take': ['*'],
@@ -194,10 +192,8 @@
             'pandas.core.frame.DataFrame.transpose': ['*'],
             'pandas.core.frame.DataFrame.shape': ['*'],
             'pandas.core.frame.DataFrame.shift': [
-                "df.shift(periods=3)",
+                'df.shift(periods=3)',
                 'df.shift(periods=3, fill_value=0)',
-                'df.shift(periods=3, freq="D")',
-                'df.shift(periods=3, freq="infer")',
             ],
             'pandas.core.frame.DataFrame.unstack': ['*'],
             'pandas.core.frame.DataFrame.memory_usage': ['*'],
@@ -402,7 +398,10 @@
             ],
             'pandas.core.series.Series.pop': ['*'],
             'pandas.core.series.Series.searchsorted': ['*'],
-            'pandas.core.series.Series.shift': ['*'],
+            'pandas.core.series.Series.shift': [
+                'df.shift(periods=3)',
+                'df.shift(periods=3, fill_value=0)',
+            ],
             'pandas.core.series.Series.take': ['*'],
             'pandas.core.series.Series.to_dict': ['*'],
             'pandas.core.series.Series.unique': ['*'],