Merge pull request #13126: More flexible dataframe conversions.

diff --git a/sdks/python/apache_beam/dataframe/convert.py b/sdks/python/apache_beam/dataframe/convert.py
index 24ba96b..2ac7e13 100644
--- a/sdks/python/apache_beam/dataframe/convert.py
+++ b/sdks/python/apache_beam/dataframe/convert.py
@@ -31,13 +31,15 @@
 
 if TYPE_CHECKING:
   # pylint: disable=ungrouped-imports
+  from typing import Optional
   import pandas
 
 
 # TODO: Or should this be called as_dataframe?
 def to_dataframe(
     pcoll,  # type: pvalue.PCollection
-    proxy=None,  # type: pandas.core.generic.NDFrame
+    proxy=None,  # type: Optional[pandas.core.generic.NDFrame]
+    label=None,  # type: Optional[str]
 ):
   # type: (...) -> frame_base.DeferredFrame
 
@@ -61,8 +63,12 @@
           "the input PCollection, or provide a proxy.")
     # If no proxy is given, assume this is an element-wise schema-aware
     # PCollection that needs to be batched.
+    if label is None:
+      # Attempt to come up with a reasonable, stable label by retrieving
+      # the name of these variables in the calling context.
+      label = 'BatchElements(%s)' % _var_name(pcoll, 2)
     proxy = schemas.generate_proxy(pcoll.element_type)
-    pcoll = pcoll | 'BatchElements' >> schemas.BatchRowsAsDataFrame()
+    pcoll = pcoll | label >> schemas.BatchRowsAsDataFrame(proxy=proxy)
   return frame_base.DeferredFrame.wrap(
       expressions.PlaceholderExpression(proxy, pcoll))
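
Editor's note: with this change a schema-aware PCollection can be converted
without an explicit proxy. A minimal usage sketch (pipeline contents
hypothetical, not part of this patch):

  import apache_beam as beam
  from apache_beam.dataframe import convert

  with beam.Pipeline() as p:
    rows = p | beam.Create([beam.Row(x=1, y=2.0), beam.Row(x=3, y=4.0)])
    # No proxy needed: one is generated from the element schema, and the
    # batching step gets a stable label such as 'BatchElements(rows)'.
    df = convert.to_dataframe(rows)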
 
@@ -112,23 +118,7 @@
   if label is None:
     # Attempt to come up with a reasonable, stable label by retrieving the name
     # of these variables in the calling context.
-    current_frame = inspect.currentframe()
-    if current_frame is None:
-      label = 'ToDataframe(...)'
-
-    else:
-      previous_frame = current_frame.f_back
-
-      def name(obj):
-        for key, value in previous_frame.f_locals.items():
-          if obj is value:
-            return key
-        for key, value in previous_frame.f_globals.items():
-          if obj is value:
-            return key
-        return '...'
-
-      label = 'ToDataframe(%s)' % ', '.join(name(e) for e in dataframes)
+    label = 'ToPCollection(%s)' % ', '.join(_var_name(e, 3) for e in dataframes)
 
   def extract_input(placeholder):
     if not isinstance(placeholder._reference, pvalue.PCollection):
@@ -145,10 +135,16 @@
                      dataframes)))  # type: Dict[Any, pvalue.PCollection]
 
   if yield_elements == "schemas":
+
+    def maybe_unbatch(pc, value):
+      if isinstance(value, frame_base._DeferredScalar):
+        return pc
+      else:
+        return pc | "Unbatch '%s'" % value._expr._id >> schemas.UnbatchPandas(
+            value._expr.proxy(), include_indexes=include_indexes)
+
     results = {
-        key: pc
-        | "Unbatch '%s'" % dataframes[key]._expr._id >> schemas.UnbatchPandas(
-            dataframes[key]._expr.proxy(), include_indexes=include_indexes)
+        key: maybe_unbatch(pc, dataframes[key])
         for (key, pc) in results.items()
     }
 
@@ -156,3 +152,18 @@
     return results[0]
   else:
     return tuple(value for key, value in sorted(results.items()))
+
+
+def _var_name(obj, level):
+  frame = inspect.currentframe()
+  for _ in range(level):
+    if frame is None:
+      return '...'
+    frame = frame.f_back
+  for key, value in frame.f_locals.items():
+    if obj is value:
+      return key
+  for key, value in frame.f_globals.items():
+    if obj is value:
+      return key
+  return '...'
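
Editor's note: _var_name walks `level` frames up the call stack, then scans
that frame's locals and globals for a variable bound to the same object (by
identity). A standalone sketch of the technique, independent of this patch
(the helper name here is hypothetical):

  import inspect

  def name_in_caller(obj):
    # Roughly _var_name(obj, 1): look one frame up, i.e. in the caller of
    # this function, for a binding to `obj`.
    frame = inspect.currentframe()
    if frame is None or frame.f_back is None:
      return '...'
    for key, value in frame.f_back.f_locals.items():
      if obj is value:
        return key
    return '...'

  my_pcoll = object()
  print(name_in_caller(my_pcoll))  # prints 'my_pcoll'

Because the search uses identity, any alias of the object qualifies; the
'...' fallback covers objects not bound to a name in that frame.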
diff --git a/sdks/python/apache_beam/dataframe/convert_test.py b/sdks/python/apache_beam/dataframe/convert_test.py
index e1f663d..bb1032e 100644
--- a/sdks/python/apache_beam/dataframe/convert_test.py
+++ b/sdks/python/apache_beam/dataframe/convert_test.py
@@ -66,11 +66,11 @@
       a = pd.Series([1, 2, 3])
       b = pd.Series([100, 200, 300])
 
-      pc_a = p | 'A' >> beam.Create([a])
-      pc_b = p | 'B' >> beam.Create([b])
+      pc_a = p | 'A' >> beam.Create(a)
+      pc_b = p | 'B' >> beam.Create(b)
 
-      df_a = convert.to_dataframe(pc_a, proxy=a[:0])
-      df_b = convert.to_dataframe(pc_b, proxy=b[:0])
+      df_a = convert.to_dataframe(pc_a)
+      df_b = convert.to_dataframe(pc_b)
 
       df_2a = 2 * df_a
       df_3a = 3 * df_a
@@ -85,6 +85,13 @@
       assert_that(pc_3a, equal_to(list(3 * a)), label='Check3a')
       assert_that(pc_ab, equal_to(list(a * b)), label='Checkab')
 
+  def test_convert_scalar(self):
+    with beam.Pipeline() as p:
+      pc = p | 'A' >> beam.Create([1, 2, 3])
+      s = convert.to_dataframe(pc)
+      pc_sum = convert.to_pcollection(s.sum())
+      assert_that(pc_sum, equal_to([6]))
+
 
 if __name__ == '__main__':
   unittest.main()
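
Editor's note: the test inputs changed shape. beam.Create(a) iterates the
Series and emits three plain int elements (so no proxy is needed), while the
old beam.Create([a]) emitted the whole Series as a single element, which is
why an explicit proxy was required before. A quick illustration, independent
of Beam:

  import pandas as pd

  a = pd.Series([1, 2, 3])
  assert list(a) == [1, 2, 3]   # Create(a): one element per value
  assert [a][0] is a            # Create([a]): one Series-valued element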
diff --git a/sdks/python/apache_beam/dataframe/schemas.py b/sdks/python/apache_beam/dataframe/schemas.py
index 26d1649..f9ff55b 100644
--- a/sdks/python/apache_beam/dataframe/schemas.py
+++ b/sdks/python/apache_beam/dataframe/schemas.py
@@ -146,15 +146,23 @@
   Batching parameters are inherited from
   :class:`~apache_beam.transforms.util.BatchElements`.
   """
-  def __init__(self, *args, **kwargs):
+  def __init__(self, *args, proxy=None, **kwargs):
     self._batch_elements_transform = BatchElements(*args, **kwargs)
+    self._proxy = proxy
 
   def expand(self, pcoll):
-    columns = [
-        name for name, _ in named_fields_from_element_type(pcoll.element_type)
-    ]
-    return pcoll | self._batch_elements_transform | beam.Map(
-        lambda batch: pd.DataFrame.from_records(batch, columns=columns))
+    proxy = generate_proxy(
+        pcoll.element_type) if self._proxy is None else self._proxy
+    if isinstance(proxy, pd.DataFrame):
+      columns = proxy.columns
+      construct = lambda batch: pd.DataFrame.from_records(
+          batch, columns=columns)
+    elif isinstance(proxy, pd.Series):
+      dtype = proxy.dtype
+      construct = lambda batch: pd.Series(batch, dtype=dtype)
+    else:
+      raise NotImplementedError("Unknown proxy type: %s" % proxy)
+    return pcoll | self._batch_elements_transform | beam.Map(construct)
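
Editor's note: the two construct paths above reduce to ordinary pandas
constructors (sample values hypothetical):

  import pandas as pd

  # DataFrame proxy: a batch of row tuples keeps the proxy's columns.
  pd.DataFrame.from_records([(1, 'a'), (2, 'b')], columns=['x', 'y'])

  # Series proxy: a batch of primitives keeps the proxy's dtype.
  pd.Series([1, 2, 3], dtype='int64')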
 
 
 def generate_proxy(element_type):
@@ -163,18 +171,21 @@
   """Generate a proxy pandas object for the given PCollection element_type.
 
   Currently only supports generating a DataFrame proxy from a schema-aware
-  PCollection.
+  PCollection or a Series proxy from a primitively typed PCollection.
   """
-  fields = named_fields_from_element_type(element_type)
-  proxy = pd.DataFrame(columns=[name for name, _ in fields])
+  if element_type != Any and element_type in BEAM_TO_PANDAS:
+    return pd.Series(dtype=BEAM_TO_PANDAS[element_type])
 
-  for name, typehint in fields:
-    # Default to np.object. This is lossy, we won't be able to recover the type
-    # at the output.
-    dtype = BEAM_TO_PANDAS.get(typehint, np.object)
-    proxy[name] = proxy[name].astype(dtype)
+  else:
+    fields = named_fields_from_element_type(element_type)
+    proxy = pd.DataFrame(columns=[name for name, _ in fields])
+    for name, typehint in fields:
+      # Default to np.object. This is lossy; we won't be able to recover
+      # the type at the output.
+      dtype = BEAM_TO_PANDAS.get(typehint, np.object)
+      proxy[name] = proxy[name].astype(dtype)
 
-  return proxy
+    return proxy
 
 
 def element_type_from_dataframe(proxy, include_indexes=False):
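
Editor's note: with the branch added above, generate_proxy covers both
shapes. A sketch of the expected proxies, assuming int is among the types
BEAM_TO_PANDAS maps (the exact dtype is version-dependent):

  import typing
  from apache_beam.dataframe import schemas

  # Primitive element type -> empty Series with the mapped dtype.
  series_proxy = schemas.generate_proxy(int)    # e.g. pd.Series(dtype='int64')

  # Schema'd element type -> empty DataFrame with typed columns, as before.
  class Animal(typing.NamedTuple):
    name: str
    legs: int

  frame_proxy = schemas.generate_proxy(Animal)  # columns ['name', 'legs']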