Add DataFrame.insert implementation
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 3a60c28..42df75b 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -1478,6 +1478,47 @@
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
+ def insert(self, value, **kwargs):
+ if isinstance(value, list):
+ raise frame_base.WontImplementMethod(
+ "insert(value=list) is not supported because it joins the input "
+ "list to the deferred DataFrame based on the order of the data.",
+ reason="order-sensitive")
+
+ if isinstance(value, pd.core.generic.NDFrame):
+ value = frame_base.DeferredFrame.wrap(
+ expressions.ConstantExpression(value))
+
+ if isinstance(value, frame_base.DeferredFrame):
+ def func_zip(df, value):
+ df = df.copy()
+ df.insert(value=value, **kwargs)
+ return df
+
+ inserted = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'insert',
+ func_zip,
+ [self._expr, value._expr],
+ requires_partition_by=partitionings.Index(),
+ preserves_partition_by=partitionings.Arbitrary()))
+ else:
+ def func_elementwise(df):
+ df = df.copy()
+ df.insert(value=value, **kwargs)
+ return df
+ inserted = frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'insert',
+ func_elementwise,
+ [self._expr],
+ requires_partition_by=partitionings.Arbitrary(),
+ preserves_partition_by=partitionings.Arbitrary()))
+
+ self._expr = inserted._expr
+
+ @frame_base.args_to_kwargs(pd.DataFrame)
+ @frame_base.populate_defaults(pd.DataFrame)
def aggregate(self, func, axis=0, *args, **kwargs):
if 'numeric_only' in kwargs and kwargs['numeric_only']:
# Eagerly generate a proxy to make sure numeric_only is a valid argument
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 541d5c2..fcf5963 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -1051,6 +1051,14 @@
# projecting only numeric columns should too
self._run_test(lambda df: df[['foo', 'bar']].sum(), GROUPBY_DF)
+ def test_insert(self):
+ df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
+
+ self._run_inplace_test(lambda df: df.insert(1, 'C', df.A * 2), df)
+ self._run_inplace_test(
+ lambda df: df.insert(0, 'foo', pd.Series([8], index=[1])), df)
+ self._run_inplace_test(lambda df: df.insert(2, 'bar', value='q'), df)
+
class AllowNonParallelTest(unittest.TestCase):
def _use_non_parallel_operation(self):