[BEAM-9547] Add DataFrame.insert implementation (#14663)

[BEAM-9547] Add DataFrame.insert implementation
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 3a60c28..42df75b 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -1478,6 +1478,47 @@
 
   @frame_base.args_to_kwargs(pd.DataFrame)
   @frame_base.populate_defaults(pd.DataFrame)
+  def insert(self, value, **kwargs):
+    if isinstance(value, list):
+      raise frame_base.WontImplementMethod(
+          "insert(value=list) is not supported because it joins the input "
+          "list to the deferred DataFrame based on the order of the data.",
+          reason="order-sensitive")
+
+    if isinstance(value, pd.core.generic.NDFrame):
+      value = frame_base.DeferredFrame.wrap(
+          expressions.ConstantExpression(value))
+
+    if isinstance(value, frame_base.DeferredFrame):
+      def func_zip(df, value):
+        df = df.copy()
+        df.insert(value=value, **kwargs)
+        return df
+
+      inserted = frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'insert',
+              func_zip,
+              [self._expr, value._expr],
+              requires_partition_by=partitionings.Index(),
+              preserves_partition_by=partitionings.Arbitrary()))
+    else:
+      def func_elementwise(df):
+        df = df.copy()
+        df.insert(value=value, **kwargs)
+        return df
+      inserted = frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'insert',
+              func_elementwise,
+              [self._expr],
+              requires_partition_by=partitionings.Arbitrary(),
+              preserves_partition_by=partitionings.Arbitrary()))
+
+    self._expr = inserted._expr
+
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
   def aggregate(self, func, axis=0, *args, **kwargs):
     if 'numeric_only' in kwargs and kwargs['numeric_only']:
       # Eagerly generate a proxy to make sure numeric_only is a valid argument
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 55e3892..fcf5963 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -98,6 +98,18 @@
           f'Expected {expected_error!r} to be raised, but got {actual!r}'
       ) from actual
 
+  def _run_inplace_test(self, func, arg, **kwargs):
+    """Verify an inplace operation performed by func.
+
+    Checks that func performs the same inplace operation on arg, in pandas and
+    in Beam."""
+    def wrapper(df):
+      df = df.copy()
+      func(df)
+      return df
+
+    self._run_test(wrapper, arg, **kwargs)
+
   def _run_test(self, func, *args, distributed=True, nonparallel=False):
     """Verify that func(*args) produces the same result in pandas and in Beam.
 
@@ -185,13 +197,12 @@
   def test_set_column(self):
     def new_column(df):
       df['NewCol'] = df['Speed']
-      return df
 
     df = pd.DataFrame({
         'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
         'Speed': [380., 370., 24., 26.]
     })
-    self._run_test(new_column, df)
+    self._run_inplace_test(new_column, df)
 
   def test_str_split(self):
     s = pd.Series([
@@ -212,13 +223,12 @@
   def test_set_column_from_index(self):
     def new_column(df):
       df['NewCol'] = df.index
-      return df
 
     df = pd.DataFrame({
         'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
         'Speed': [380., 370., 24., 26.]
     })
-    self._run_test(new_column, df)
+    self._run_inplace_test(new_column, df)
 
   def test_tz_localize_ambiguous_series(self):
     # This replicates a tz_localize doctest:
@@ -706,11 +716,7 @@
     self._run_test(lambda df: df.eval('foo = a + b - c'), df)
     self._run_test(lambda df: df.query('a > b + c'), df)
 
-    def eval_inplace(df):
-      df.eval('foo = a + b - c', inplace=True)
-      return df.foo
-
-    self._run_test(eval_inplace, df)
+    self._run_inplace_test(lambda df: df.eval('foo = a + b - c'), df)
 
     # Verify that attempting to access locals raises a useful error
     deferred_df = frame_base.DeferredFrame.wrap(
@@ -726,9 +732,8 @@
 
     def change_index_names(df):
       df.index.names = ['A', None]
-      return df
 
-    self._run_test(change_index_names, df)
+    self._run_inplace_test(change_index_names, df)
 
   @parameterized.expand((x, ) for x in [
       0,
@@ -1046,6 +1051,14 @@
     # projecting only numeric columns should too
     self._run_test(lambda df: df[['foo', 'bar']].sum(), GROUPBY_DF)
 
+  def test_insert(self):
+    df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
+
+    self._run_inplace_test(lambda df: df.insert(1, 'C', df.A * 2), df)
+    self._run_inplace_test(
+        lambda df: df.insert(0, 'foo', pd.Series([8], index=[1])), df)
+    self._run_inplace_test(lambda df: df.insert(2, 'bar', value='q'), df)
+
 
 class AllowNonParallelTest(unittest.TestCase):
   def _use_non_parallel_operation(self):