Merge pull request #13126 More flexible dataframe conversions.
diff --git a/sdks/python/apache_beam/dataframe/convert.py b/sdks/python/apache_beam/dataframe/convert.py
index 24ba96b..2ac7e13 100644
--- a/sdks/python/apache_beam/dataframe/convert.py
+++ b/sdks/python/apache_beam/dataframe/convert.py
@@ -31,13 +31,15 @@
if TYPE_CHECKING:
# pylint: disable=ungrouped-imports
+ from typing import Optional
import pandas
# TODO: Or should this be called as_dataframe?
def to_dataframe(
pcoll, # type: pvalue.PCollection
- proxy=None, # type: pandas.core.generic.NDFrame
+ proxy=None, # type: Optional[pandas.core.generic.NDFrame]
+ label=None, # type: Optional[str]
):
# type: (...) -> frame_base.DeferredFrame
@@ -61,8 +63,12 @@
"the input PCollection, or provide a proxy.")
# If no proxy is given, assume this is an element-wise schema-aware
# PCollection that needs to be batched.
+    if label is None:
+      # Attempt to come up with a reasonable, stable label by retrieving
+      # the name of this variable in the calling context.
+      label = 'BatchElements(%s)' % _var_name(pcoll, 2)
proxy = schemas.generate_proxy(pcoll.element_type)
- pcoll = pcoll | 'BatchElements' >> schemas.BatchRowsAsDataFrame()
+ pcoll = pcoll | label >> schemas.BatchRowsAsDataFrame(proxy=proxy)
return frame_base.DeferredFrame.wrap(
expressions.PlaceholderExpression(proxy, pcoll))
@@ -112,23 +118,7 @@
if label is None:
# Attempt to come up with a reasonable, stable label by retrieving the name
# of these variables in the calling context.
- current_frame = inspect.currentframe()
- if current_frame is None:
- label = 'ToDataframe(...)'
-
- else:
- previous_frame = current_frame.f_back
-
- def name(obj):
- for key, value in previous_frame.f_locals.items():
- if obj is value:
- return key
- for key, value in previous_frame.f_globals.items():
- if obj is value:
- return key
- return '...'
-
- label = 'ToDataframe(%s)' % ', '.join(name(e) for e in dataframes)
+ label = 'ToPCollection(%s)' % ', '.join(_var_name(e, 3) for e in dataframes)
def extract_input(placeholder):
if not isinstance(placeholder._reference, pvalue.PCollection):
@@ -145,10 +135,16 @@
dataframes))) # type: Dict[Any, pvalue.PCollection]
if yield_elements == "schemas":
+
+ def maybe_unbatch(pc, value):
+ if isinstance(value, frame_base._DeferredScalar):
+ return pc
+ else:
+ return pc | "Unbatch '%s'" % value._expr._id >> schemas.UnbatchPandas(
+ value._expr.proxy(), include_indexes=include_indexes)
+
results = {
- key: pc
- | "Unbatch '%s'" % dataframes[key]._expr._id >> schemas.UnbatchPandas(
- dataframes[key]._expr.proxy(), include_indexes=include_indexes)
+ key: maybe_unbatch(pc, dataframes[key])
for (key, pc) in results.items()
}
@@ -156,3 +152,18 @@
return results[0]
else:
return tuple(value for key, value in sorted(results.items()))
+
+
+def _var_name(obj, level):
+ frame = inspect.currentframe()
+ for _ in range(level):
+ if frame is None:
+ return '...'
+ frame = frame.f_back
+ for key, value in frame.f_locals.items():
+ if obj is value:
+ return key
+ for key, value in frame.f_globals.items():
+ if obj is value:
+ return key
+ return '...'
diff --git a/sdks/python/apache_beam/dataframe/convert_test.py b/sdks/python/apache_beam/dataframe/convert_test.py
index e1f663d..bb1032e 100644
--- a/sdks/python/apache_beam/dataframe/convert_test.py
+++ b/sdks/python/apache_beam/dataframe/convert_test.py
@@ -66,11 +66,11 @@
a = pd.Series([1, 2, 3])
b = pd.Series([100, 200, 300])
- pc_a = p | 'A' >> beam.Create([a])
- pc_b = p | 'B' >> beam.Create([b])
+ pc_a = p | 'A' >> beam.Create(a)
+ pc_b = p | 'B' >> beam.Create(b)
- df_a = convert.to_dataframe(pc_a, proxy=a[:0])
- df_b = convert.to_dataframe(pc_b, proxy=b[:0])
+ df_a = convert.to_dataframe(pc_a)
+ df_b = convert.to_dataframe(pc_b)
df_2a = 2 * df_a
df_3a = 3 * df_a
@@ -85,6 +85,13 @@
assert_that(pc_3a, equal_to(list(3 * a)), label='Check3a')
assert_that(pc_ab, equal_to(list(a * b)), label='Checkab')
+ def test_convert_scalar(self):
+ with beam.Pipeline() as p:
+ pc = p | 'A' >> beam.Create([1, 2, 3])
+ s = convert.to_dataframe(pc)
+ pc_sum = convert.to_pcollection(s.sum())
+ assert_that(pc_sum, equal_to([6]))
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdks/python/apache_beam/dataframe/schemas.py b/sdks/python/apache_beam/dataframe/schemas.py
index 26d1649..f9ff55b 100644
--- a/sdks/python/apache_beam/dataframe/schemas.py
+++ b/sdks/python/apache_beam/dataframe/schemas.py
@@ -146,15 +146,23 @@
Batching parameters are inherited from
:class:`~apache_beam.transforms.util.BatchElements`.
"""
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args, proxy=None, **kwargs):
self._batch_elements_transform = BatchElements(*args, **kwargs)
+ self._proxy = proxy
def expand(self, pcoll):
- columns = [
- name for name, _ in named_fields_from_element_type(pcoll.element_type)
- ]
- return pcoll | self._batch_elements_transform | beam.Map(
- lambda batch: pd.DataFrame.from_records(batch, columns=columns))
+ proxy = generate_proxy(
+ pcoll.element_type) if self._proxy is None else self._proxy
+ if isinstance(proxy, pd.DataFrame):
+ columns = proxy.columns
+ construct = lambda batch: pd.DataFrame.from_records(
+ batch, columns=columns)
+ elif isinstance(proxy, pd.Series):
+ dtype = proxy.dtype
+ construct = lambda batch: pd.Series(batch, dtype=dtype)
+ else:
+ raise NotImplementedError("Unknown proxy type: %s" % proxy)
+ return pcoll | self._batch_elements_transform | beam.Map(construct)
def generate_proxy(element_type):
@@ -163,18 +171,21 @@
"""Generate a proxy pandas object for the given PCollection element_type.
Currently only supports generating a DataFrame proxy from a schema-aware
- PCollection.
+ PCollection or a Series proxy from a primitively typed PCollection.
"""
- fields = named_fields_from_element_type(element_type)
- proxy = pd.DataFrame(columns=[name for name, _ in fields])
+ if element_type != Any and element_type in BEAM_TO_PANDAS:
+ return pd.Series(dtype=BEAM_TO_PANDAS[element_type])
- for name, typehint in fields:
- # Default to np.object. This is lossy, we won't be able to recover the type
- # at the output.
- dtype = BEAM_TO_PANDAS.get(typehint, np.object)
- proxy[name] = proxy[name].astype(dtype)
+ else:
+ fields = named_fields_from_element_type(element_type)
+ proxy = pd.DataFrame(columns=[name for name, _ in fields])
+ for name, typehint in fields:
+      # Default to np.object. This is lossy; we won't be able to recover
+      # the type at the output.
+ dtype = BEAM_TO_PANDAS.get(typehint, np.object)
+ proxy[name] = proxy[name].astype(dtype)
- return proxy
+ return proxy
def element_type_from_dataframe(proxy, include_indexes=False):