[BEAM-9547] Add DataFrame.insert implementation (#14663)
[BEAM-9547] Add DataFrame.insert implementation
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 3a60c28..42df75b 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -1478,6 +1478,47 @@
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
+  def insert(self, value, **kwargs):
+    """Insert a new column into this deferred DataFrame, in place.
+
+    ``value`` cannot be a ``list`` because aligning it with this deferred
+    DataFrame is order-sensitive.
+
+    Args:
+      value: contents of the new column. A pandas object is wrapped in a
+        constant expression first. A deferred frame is then combined with
+        this one under index partitioning. Any other value is captured and
+        handed to ``pd.DataFrame.insert`` under arbitrary partitioning.
+      **kwargs: all remaining arguments, forwarded verbatim to
+        ``pd.DataFrame.insert``.
+
+    Raises:
+      frame_base.WontImplementError: if ``value`` is a ``list``.
+    """
+    if isinstance(value, list):
+      # WontImplementError is the exception class that takes ``reason``.
+      raise frame_base.WontImplementError(
+          "insert(value=list) is not supported because it joins the input "
+          "list to the deferred DataFrame based on the order of the data.",
+          reason="order-sensitive")
+
+    if isinstance(value, pd.core.generic.NDFrame):
+      # Promote concrete pandas data so it takes the deferred path below.
+      value = frame_base.DeferredFrame.wrap(
+          expressions.ConstantExpression(value))
+
+    if isinstance(value, frame_base.DeferredFrame):
+      def func_zip(df, value):
+        # pandas' insert mutates its receiver, so work on a copy.
+        df = df.copy()
+        df.insert(value=value, **kwargs)
+        return df
+
+      inserted = frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'insert',
+              func_zip,
+              [self._expr, value._expr],
+              requires_partition_by=partitionings.Index(),
+              preserves_partition_by=partitionings.Arbitrary()))
+    else:
+      def func_elementwise(df):
+        # pandas' insert mutates its receiver, so work on a copy.
+        df = df.copy()
+        df.insert(value=value, **kwargs)
+        return df
+
+      inserted = frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'insert',
+              func_elementwise,
+              [self._expr],
+              requires_partition_by=partitionings.Arbitrary(),
+              preserves_partition_by=partitionings.Arbitrary()))
+
+    # Emulate pandas' in-place semantics: rebind this frame's expression
+    # and return None.
+    self._expr = inserted._expr
+
+ @frame_base.args_to_kwargs(pd.DataFrame)
+ @frame_base.populate_defaults(pd.DataFrame)
def aggregate(self, func, axis=0, *args, **kwargs):
if 'numeric_only' in kwargs and kwargs['numeric_only']:
# Eagerly generate a proxy to make sure numeric_only is a valid argument
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 55e3892..fcf5963 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -98,6 +98,18 @@
f'Expected {expected_error!r} to be raised, but got {actual!r}'
) from actual
+  def _run_inplace_test(self, func, arg, **kwargs):
+    """Check that func mutates arg the same way in pandas and in Beam.
+
+    func is expected to modify its argument in place; its return value is
+    ignored. It is applied to a copy of the input, and that modified copy
+    is what gets compared through _run_test. Any kwargs are forwarded to
+    _run_test unchanged.
+    """
+    def mutate_copy(frame):
+      duplicate = frame.copy()
+      func(duplicate)
+      return duplicate
+
+    self._run_test(mutate_copy, arg, **kwargs)
+
def _run_test(self, func, *args, distributed=True, nonparallel=False):
"""Verify that func(*args) produces the same result in pandas and in Beam.
@@ -185,13 +197,12 @@
def test_set_column(self):
def new_column(df):
df['NewCol'] = df['Speed']
- return df
df = pd.DataFrame({
'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
'Speed': [380., 370., 24., 26.]
})
- self._run_test(new_column, df)
+ self._run_inplace_test(new_column, df)
def test_str_split(self):
s = pd.Series([
@@ -212,13 +223,12 @@
def test_set_column_from_index(self):
def new_column(df):
df['NewCol'] = df.index
- return df
df = pd.DataFrame({
'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
'Speed': [380., 370., 24., 26.]
})
- self._run_test(new_column, df)
+ self._run_inplace_test(new_column, df)
def test_tz_localize_ambiguous_series(self):
# This replicates a tz_localize doctest:
@@ -706,11 +716,7 @@
self._run_test(lambda df: df.eval('foo = a + b - c'), df)
self._run_test(lambda df: df.query('a > b + c'), df)
- def eval_inplace(df):
- df.eval('foo = a + b - c', inplace=True)
- return df.foo
-
- self._run_test(eval_inplace, df)
+    # inplace=True is required: without it eval() returns a new frame, the
+    # copy handed over by _run_inplace_test is never modified, and the
+    # comparison checks nothing.
+    self._run_inplace_test(
+        lambda df: df.eval('foo = a + b - c', inplace=True), df)
# Verify that attempting to access locals raises a useful error
deferred_df = frame_base.DeferredFrame.wrap(
@@ -726,9 +732,8 @@
def change_index_names(df):
df.index.names = ['A', None]
- return df
- self._run_test(change_index_names, df)
+ self._run_inplace_test(change_index_names, df)
@parameterized.expand((x, ) for x in [
0,
@@ -1046,6 +1051,14 @@
# projecting only numeric columns should too
self._run_test(lambda df: df[['foo', 'bar']].sum(), GROUPBY_DF)
+  def test_insert(self):
+    """Exercise insert() with a derived column, a Series, and a scalar."""
+    df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
+
+    def insert_derived_column(frame):
+      # Value computed from the frame itself.
+      frame.insert(1, 'C', frame.A * 2)
+
+    def insert_partial_series(frame):
+      # Concrete pandas Series whose index covers only one row.
+      frame.insert(0, 'foo', pd.Series([8], index=[1]))
+
+    def insert_scalar(frame):
+      # Plain scalar passed by keyword.
+      frame.insert(2, 'bar', value='q')
+
+    inserters = (insert_derived_column, insert_partial_series, insert_scalar)
+    for inserter in inserters:
+      self._run_inplace_test(inserter, df)
+
class AllowNonParallelTest(unittest.TestCase):
def _use_non_parallel_operation(self):