blob: f540f23088ce1df71ea06da14e90f8879586582c [file]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import enum
import logging
from typing import Optional
import apache_beam as beam
from apache_beam.coders import BooleanCoder
from apache_beam.coders import PickleCoder
from apache_beam.coders import TimestampCoder
from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec
from apache_beam.transforms.userstate import OrderedListStateSpec
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
from apache_beam.transforms.userstate import TimerSpec
from apache_beam.transforms.userstate import on_timer
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import TimestampedValue
from apache_beam.typehints.typehints import TupleConstraint
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.timestamp import Duration
from apache_beam.utils.timestamp import DurationTypes # pylint: disable=unused-import
from apache_beam.utils.timestamp import Timestamp
from apache_beam.utils.timestamp import TimestampTypes # pylint: disable=unused-import
_LOGGER = logging.getLogger("ordered_window_elements")
"""An example putting elements into window in time order on a streaming setting.
The PTransform is a turn-key transform that can handle different input window
settings and element types.
Not only does it buffer elements, it can also prepend a window with
the last seen element if the window is empty or there is a gap between
the beginning of the window and the timestamp of its first element.
"""
class OrderedWindowElementsDoFn(beam.DoFn):
"""A Stateful DoFn that buffers and emits elements in time-ordered windows.
This DoFn uses Beam's stateful processing capabilities to buffer elements
and emit them in order within sliding windows. It handles out-of-order data,
late data, and can fill starting gaps in windows by leveraging states and
timers.
Attributes:
BUFFER_STATE: A `StateSpec` for storing incoming elements (timestamp, value)
in a time-ordered buffer.
WINDOW_TIMER: A `TimerSpec` set to the watermark time domain, used to
trigger the emission of windowed elements.
TIMER_STATE: A `ReadModifyWriteStateSpec` (BooleanCoder) to track whether
the window timer has been initialized and set for the current key.
BUFFER_MIN_TS_STATE: A `ReadModifyWriteStateSpec` (TimestampCoder) to
keep track of the minimum timestamp currently present in the
`buffer_state` for efficient clearing.
BUFFER_MAX_TS_STATE: A `ReadModifyWriteStateSpec` (TimestampCoder) to
keep track of the maximum timestamp currently present in the
`buffer_state` for deciding whether to set further timers or not.
ESTIMATED_WM_STATE: A `ReadModifyWriteStateSpec` (TimestampCoder) to
store the highest observed timestamp for a key, used as an estimated
watermark to detect and filter excessively late data.
"""
BUFFER_STATE = OrderedListStateSpec('buffer', PickleCoder())
WINDOW_TIMER = TimerSpec('window_timer', TimeDomain.WATERMARK)
TIMER_STATE = ReadModifyWriteStateSpec('timer_state', BooleanCoder())
# LAST_VALUE = ReadModifyWriteStateSpec('last_value', PickleCoder())
BUFFER_MIN_TS_STATE = ReadModifyWriteStateSpec(
'buffer_min_ts', TimestampCoder())
BUFFER_MAX_TS_STATE = ReadModifyWriteStateSpec(
'buffer_max_ts', TimestampCoder())
ESTIMATED_WM_STATE = ReadModifyWriteStateSpec(
'estimated_wm', TimestampCoder())
def __init__(
self,
duration: DurationTypes,
slide_interval: DurationTypes,
offset: DurationTypes,
allowed_lateness: DurationTypes):
"""Initializes the OrderedWindowElementsFn.
Args:
duration: The duration of each window.
slide_interval: The interval at which windows slide.
offset: The offset of the window boundaries. Windows start at `offset`
past each `duration` interval.
allowed_lateness: The duration for which late data is still processed
after the window's end.
"""
self.duration = duration
self.slide_interval = slide_interval
self.offset = offset
self.allowed_lateness = allowed_lateness
def start_bundle(self):
_LOGGER.info("start bundle")
def finish_bundle(self):
_LOGGER.info("finish bundle")
def _state_add(self, buffer_state, timestamp, value):
"""Add a timestamped-value into the buffer state."""
buffer_state.add((timestamp, value))
def _state_read_range(self, buffer_state, range_lo, range_hi):
"""Retrieves a specified range of elements from the buffer state."""
return list(buffer_state.read_range(range_lo, range_hi))
def _state_clear_range(self, buffer_state, range_lo, range_hi):
"""Clears a specified range of elements from the buffer state."""
# TODO: Dataflow runner v2 gets stuck when MIN_TIMESTAMP is used
# as the lower bound for clear_range. Investigate this further.
buffer_state.clear_range(range_lo, range_hi)
def process(
self,
element,
timestamp=beam.DoFn.TimestampParam,
buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
window_timer=beam.DoFn.TimerParam(WINDOW_TIMER),
timer_state=beam.DoFn.StateParam(TIMER_STATE),
#last_value_state=beam.DoFn.StateParam(LAST_VALUE),
buffer_min_ts_state=beam.DoFn.StateParam(BUFFER_MIN_TS_STATE),
buffer_max_ts_state=beam.DoFn.StateParam(BUFFER_MAX_TS_STATE),
estimated_wm_state=beam.DoFn.StateParam(ESTIMATED_WM_STATE),
):
"""Processes incoming elements, buffering them and setting timers.
This method receives elements, updates the estimated watermark, buffers
the element in `buffer_state`, and sets an initial window timer if
one hasn't been set yet for the current key. It also handles the
`fill_start_if_missing` logic for the `last_value_state`.
Args:
element: A `(key, value)` tuple representing the input element.
timestamp: The event-time timestamp of the element.
buffer_state: The `State` instance for buffering elements.
window_timer: The `Timer` instance for scheduling window firings.
timer_state: The `ReadModifyWriteState` instance to check/set if the
timer has been initialized.
buffer_min_ts_state: The `ReadModifyWriteState` instance for the
minimum timestamp in the buffer.
buffer_max_ts_state: The `ReadModifyWriteState` instance for the
maximum timestamp in the buffer.
estimated_wm_state: The `ReadModifyWriteState` instance for the
estimated watermark.
Returns:
An empty list, as elements are emitted by the `on_timer` method, not
directly by `process`.
"""
_, value = element
_LOGGER.info(
"[process] received element %s at timestamp %s", element, timestamp)
estimated_wm = estimated_wm_state.read()
if not estimated_wm or estimated_wm < timestamp:
estimated_wm = timestamp
estimated_wm_state.write(estimated_wm)
else:
# If the element is too late for the current watermark, drop it.
if estimated_wm > timestamp + self.allowed_lateness:
_LOGGER.info(
"[process] data %s at %s is too late for watermark %s; dropping.",
element,
timestamp,
estimated_wm)
return []
buffer_min_ts = buffer_min_ts_state.read()
if not buffer_min_ts or timestamp < buffer_min_ts:
buffer_min_ts_state.write(timestamp)
buffer_max_ts = buffer_max_ts_state.read()
if not buffer_max_ts or timestamp > buffer_max_ts:
buffer_max_ts_state.write(timestamp)
self._state_add(buffer_state, timestamp, value)
timer_started = timer_state.read()
if not timer_started:
offset_duration = Duration.of(self.offset)
slide_duration = Duration.of(self.slide_interval)
duration_duration = Duration.of(self.duration)
# Align the timestamp with the windowing scheme.
aligned_micros = (timestamp - offset_duration).micros
# Calculate the start of the last window that could contain this timestamp
last_window_start_aligned_micros = (
(aligned_micros // slide_duration.micros) * slide_duration.micros)
last_window_start = Timestamp(
micros=last_window_start_aligned_micros) + offset_duration
n = (duration_duration.micros - 1) // slide_duration.micros
# Calculate the start of the first sliding window.
first_slide_start_ts = last_window_start - Duration(
micros=n * slide_duration.micros)
# Set the initial timer to fire at the end of the first window plus
# allowed lateness.
first_window_end_ts = first_slide_start_ts + self.duration
_LOGGER.info(
"[process] setting initial timer to %s",
first_window_end_ts + self.allowed_lateness)
# if (self.stop_timestamp is None or
# first_window_end_ts + self.allowed_lateness < self.stop_timestamp):
window_timer.set(first_window_end_ts + self.allowed_lateness)
timer_state.write(True)
# if self.fill_start_if_missing:
# last_value = last_value_state.read()
# if not last_value:
# last_value_state.write((MIN_TIMESTAMP, self.default_start_value))
return []
def _get_windowed_values_from_state(
self, buffer_state, window_start_ts, window_end_ts):
"""Retrieves values for a window from the state, handling missing data.
This helper method reads elements within a given window range from the
buffer state. If `fill_start_if_missing` is enabled, it prepends
the `last_value` if the window is initially empty or if there's a gap
between the window start and the first element. It also updates the
`last_value_state` with the last relevant element for the next window.
Args:
buffer_state: The state instance containing buffered elements.
window_start_ts: The start timestamp of the window.
window_end_ts: The end timestamp of the window.
last_value_state: The `ReadModifyWriteState` instance storing the last
emitted value.
Returns:
A list of `(timestamp, value)` tuples for the current window, potentially
including a prepended last value if `fill_start_if_missing` is true.
"""
windowed_values = self._state_read_range(
buffer_state, window_start_ts, window_end_ts)
_LOGGER.info(
"[on_timer] windowed data in buffer (%d): %s",
len(windowed_values),
windowed_values)
# if self.fill_start_if_missing:
# if not windowed_values:
# # If the window is empty, use the last value.
# last_value = last_value_state.read()
# value_to_insert = (window_start_ts, last_value[1])
# windowed_values.append(value_to_insert)
# else:
# first_timestamp = windowed_values[0][0]
# last_value = last_value_state.read()
# if first_timestamp > window_start_ts and last_value:
# # Prepend the last value if there's a gap between the first element
# # in the window and the start of the window.
# value_to_insert = (window_start_ts, last_value[1])
# windowed_values = [value_to_insert] + windowed_values
# # Find the last element before the beginning of the next window to update
# # last_value_state.
# i = 0
# for v in windowed_values:
# if v[0] >= window_start_ts + self.slide_interval:
# break
# i += 1
# if i > 0:
# last_value = windowed_values[i - 1]
# last_value_state.write(last_value)
return [v[1] for v in windowed_values]
@on_timer(WINDOW_TIMER)
def on_timer(
self,
key=beam.DoFn.KeyParam,
fire_ts=beam.DoFn.TimestampParam,
buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
window_timer=beam.DoFn.TimerParam(WINDOW_TIMER),
timer_state=beam.DoFn.StateParam(TIMER_STATE),
buffer_min_ts_state=beam.DoFn.StateParam(BUFFER_MIN_TS_STATE),
buffer_max_ts_state=beam.DoFn.StateParam(BUFFER_MAX_TS_STATE),
):
"""Handles timer firings to emit windowed elements.
When the `WINDOW_TIMER` fires, this method extracts elements for the
current window from the `buffer_state`, handles late-firing windows
(if `allowed_lateness` > 0), and emits them as a windowed `PCollection`.
It also clears processed elements from the buffer and sets the next timer.
Args:
key: The key for which the timer fired.
fire_ts: The event-time timestamp at which the timer fired.
buffer_state: The `State` instance containing buffered
elements.
window_timer: The `Timer` instance for scheduling subsequent timers.
buffer_min_ts_state: The `ReadModifyWriteState` instance for the
minimum timestamp in the buffer.
buffer_max_ts_state: The `ReadModifyWriteState` instance for the
maximum timestamp in the buffer.
Yields:
`TimestampedValue`: A tuple `((key, window_start_ts, window_end_ts),
list_of_values)` where `list_of_values` are the elements windowed and
ordered, timestamped at `window_end_ts - 1`.
"""
_LOGGER.info("[on_timer] timer fired at %s", fire_ts)
window_end_ts = fire_ts - self.allowed_lateness
window_start_ts = window_end_ts - self.duration
buffer_min_ts = buffer_min_ts_state.read()
if not buffer_min_ts or buffer_min_ts > window_start_ts:
buffer_min_ts = window_start_ts
buffer_max_ts = buffer_max_ts_state.read()
if self.allowed_lateness > 0:
# Emit late windows that occurred prior to the current window.
late_start_ts = window_start_ts
while late_start_ts > buffer_min_ts:
late_start_ts -= self.slide_interval
while late_start_ts < window_start_ts:
late_end_ts = late_start_ts + self.duration
_LOGGER.info(
"[on_timer] emitting late window: start=%s, end=%s",
late_start_ts,
late_end_ts)
windowed_values = self._get_windowed_values_from_state(
buffer_state, late_start_ts, late_end_ts)
yield TimestampedValue(
(key, ((late_start_ts, late_end_ts), windowed_values)),
late_end_ts - 1)
late_start_ts += self.slide_interval
# Read and emit elements for the on-time window.
_LOGGER.info(
"[on_timer] emitting on-time window: start=%s, end=%s",
window_start_ts,
window_end_ts)
windowed_values = self._get_windowed_values_from_state(
buffer_state, window_start_ts, window_end_ts)
yield TimestampedValue(
(key, ((window_start_ts, window_end_ts), windowed_values)),
window_end_ts - 1)
# Post-emit actions for the current window:
# - Compute the next window's start and end timestamps.
# - Clean up states for expired windows.
# - Set a new timer for the next window.
next_window_end_ts = fire_ts - self.allowed_lateness + self.slide_interval
next_window_start_ts = window_start_ts + self.slide_interval
_LOGGER.info(
"[on_timer] clearing timestamp range [%s, %s]",
buffer_min_ts,
next_window_start_ts)
self._state_clear_range(buffer_state, buffer_min_ts, next_window_start_ts)
buffer_min_ts_state.write(next_window_start_ts)
if buffer_max_ts and buffer_max_ts < next_window_start_ts:
# buffer is empty, don't set the timer.
buffer_max_ts_state.clear()
buffer_min_ts_state.clear()
timer_state.write(False)
else:
_LOGGER.info(
"[on_timer] setting follow-up timer to %s",
next_window_end_ts + self.allowed_lateness)
window_timer.set(next_window_end_ts + self.allowed_lateness)
# class OrderedWindowElementsDoFnWithBag(OrderedWindowElementsDoFn):
# """The implementation of stateful Dofn with BagState as buffer state"""
# BUFFER_STATE = BagStateSpec('buffer', PickleCoder())
# WINDOW_TIMER = TimerSpec('window_timer', TimeDomain.WATERMARK)
# def _state_add(self, buffer_state, timestamp, value):
# """Add a timestamped-value into the buffer state."""
# buffer_state.add((timestamp, value))
# def _state_read_range(self, buffer_state, range_lo, range_hi):
# """Retrieves a specified range of elements from the buffer state."""
# all_elements = list(buffer_state.read())
# filtered_elements = [(ts, val) for ts, val in all_elements
# if range_lo <= ts < range_hi]
# filtered_elements.sort(key=lambda x: x[0])
# return filtered_elements
# def _state_clear_range(self, buffer_state, range_lo, range_hi):
# """Clears a specified range of elements from the buffer state."""
# remaining_elements = self._state_read_range(
# buffer_state, range_hi, MAX_TIMESTAMP)
# buffer_state.clear()
# for e in remaining_elements:
# buffer_state.add(e)
# def process(
# self,
# element,
# timestamp=beam.DoFn.TimestampParam,
# buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
# window_timer=beam.DoFn.TimerParam(WINDOW_TIMER),
# timer_state=beam.DoFn.StateParam(OrderedWindowElementsDoFn.TIMER_STATE),
# last_value_state=beam.DoFn.StateParam(
# OrderedWindowElementsDoFn.LAST_VALUE),
# buffer_min_ts_state=beam.DoFn.StateParam(
# OrderedWindowElementsDoFn.BUFFER_MIN_TS_STATE),
# estimated_wm_state=beam.DoFn.StateParam(
# OrderedWindowElementsDoFn.ESTIMATED_WM_STATE),
# ):
# yield from super().process(
# element,
# timestamp,
# buffer_state,
# window_timer,
# timer_state,
# last_value_state,
# buffer_min_ts_state,
# estimated_wm_state)
# @on_timer(WINDOW_TIMER)
# def on_timer(
# self,
# key=beam.DoFn.KeyParam,
# fire_ts=beam.DoFn.TimestampParam,
# buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
# window_timer=beam.DoFn.TimerParam(WINDOW_TIMER),
# last_value_state=beam.DoFn.StateParam(
# OrderedWindowElementsDoFn.LAST_VALUE),
# buffer_min_ts_state=beam.DoFn.StateParam(
# OrderedWindowElementsDoFn.BUFFER_MIN_TS_STATE),
# ):
# yield from super().on_timer(
# key=key,
# fire_ts=fire_ts,
# buffer_state=buffer_state,
# window_timer=window_timer,
# last_value_state=last_value_state,
# buffer_min_ts_state=buffer_min_ts_state)
# class OrderedWindowElementsDoFnWithValue(OrderedWindowElementsDoFn):
# """The implementation of stateful Dofn with ValueState as buffer state"""
# BUFFER_STATE = ReadModifyWriteStateSpec('buffer', PickleCoder())
# WINDOW_TIMER = TimerSpec('window_timer', TimeDomain.WATERMARK)
# def _state_add(self, buffer_state, timestamp, value):
# """Add a timestamped-value into the buffer state."""
# buffer = buffer_state.read() or []
# buffer.append((timestamp, value))
# buffer_state.write(buffer)
# def _state_read_range(self, buffer_state, range_lo, range_hi):
# """Retrieves a specified range of elements from the buffer state."""
# all_elements = buffer_state.read()
# filtered_elements = [(ts, val) for ts, val in all_elements
# if range_lo <= ts < range_hi]
# filtered_elements.sort(key=lambda x: x[0])
# return filtered_elements
# def _state_clear_range(self, buffer_state, range_lo, range_hi):
# """Clears a specified range of elements from the buffer state."""
# remaining_elements = self._state_read_range(
# buffer_state, range_hi, MAX_TIMESTAMP)
# buffer_state.write(remaining_elements)
# def process(
# self,
# element,
# timestamp=beam.DoFn.TimestampParam,
# buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
# window_timer=beam.DoFn.TimerParam(WINDOW_TIMER),
# timer_state=beam.DoFn.StateParam(OrderedWindowElementsDoFn.TIMER_STATE),
# last_value_state=beam.DoFn.StateParam(
# OrderedWindowElementsDoFn.LAST_VALUE),
# buffer_min_ts_state=beam.DoFn.StateParam(
# OrderedWindowElementsDoFn.BUFFER_MIN_TS_STATE),
# estimated_wm_state=beam.DoFn.StateParam(
# OrderedWindowElementsDoFn.ESTIMATED_WM_STATE),
# ):
# yield from super().process(
# element,
# timestamp,
# buffer_state,
# window_timer,
# timer_state,
# last_value_state,
# buffer_min_ts_state,
# estimated_wm_state)
# @on_timer(WINDOW_TIMER)
# def on_timer(
# self,
# key=beam.DoFn.KeyParam,
# fire_ts=beam.DoFn.TimestampParam,
# buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
# window_timer=beam.DoFn.TimerParam(WINDOW_TIMER),
# last_value_state=beam.DoFn.StateParam(
# OrderedWindowElementsDoFn.LAST_VALUE),
# buffer_min_ts_state=beam.DoFn.StateParam(
# OrderedWindowElementsDoFn.BUFFER_MIN_TS_STATE),
# ):
# yield from super().on_timer(
# key=key,
# fire_ts=fire_ts,
# buffer_state=buffer_state,
# window_timer=window_timer,
# last_value_state=last_value_state,
# buffer_min_ts_state=buffer_min_ts_state)
# class BufferStateType(enum.Enum):
# ORDERED_LIST = 0
# BAG = 1
# VALUE = 2
# class OrderedWindowElements(PTransform):
# """A PTransform that batches elements into ordered, sliding windows.
# This transform processes elements with timestamps, buffering them and
# emitting them in fixed or sliding windows. It supports late data handling
# and can fill missing initial values in a window.
# """
# def __init__(
# self,
# duration: DurationTypes,
# slide_interval: Optional[DurationTypes] = None,
# offset: DurationTypes = 0,
# allowed_lateness: DurationTypes = 0,
# default_start_value=None,
# fill_start_if_missing: bool = False,
# stop_timestamp: Optional[TimestampTypes] = None,
# buffer_state_type: BufferStateType = BufferStateType.ORDERED_LIST,
# ):
# """Initializes the OrderedWindowElements transform.
# Args:
# duration: The duration of each window.
# slide_interval: The interval at which windows slide. Defaults to
# `duration` if not provided (i.e., fixed windows).
# offset: The offset of the window boundaries.
# allowed_lateness: The maximum amount of time an element can be late and
# still be processed.
# default_start_value: The default value to use if `fill_start_if_missing`
# is true and a window is empty at its start.
# fill_start_if_missing: If true, the transform will attempt to fill the
# beginning of a window with the last known value if no elements are
# present at the window's start.
# stop_timestamp: An optional timestamp to stop processing and firing
# timers.
# buffer_state_type: An optional enum to control what backend state to use
# to store buffered elements. By default, it is using ordered list state.
# """
# self.duration = duration
# self.slide_interval = duration if slide_interval is None else slide_interval
# self.offset = offset
# self.allowed_lateness = allowed_lateness
# self.default_start_value = default_start_value
# self.fill_start_if_missing = fill_start_if_missing
# self.stop_timestamp = stop_timestamp
# self.buffer_state_type = buffer_state_type
# def expand(self, input):
# """Applies the OrderedWindowElements transform to the input PCollection.
# The input PCollection is first ensured to be in `GlobalWindows`. If it's
# unkeyed, a default key is added. The `OrderedWindowElementsFn` is then
# applied. If the input was originally unkeyed, the default key is removed.
# Args:
# input: The input `PCollection`. Can be keyed (e.g.,
# `PCollection[Tuple[K, V]]`) or unkeyed (e.g., `PCollection[V]`).
# Returns:
# A `PCollection` of `((key, window_start, window_end), list_of_values)`
# (if input was keyed) or `list_of_values` (if input was unkeyed), where
# `list_of_values` are the elements windowed and ordered.
# """
# windowing = input.windowing
# if not isinstance(windowing.windowfn, GlobalWindows):
# _LOGGER.warning(
# 'Input PCollection is not in GlobalWindows. Overwriting windowing '
# 'function with GlobalWindows.')
# input = input | "ToGlobalWindows" >> beam.WindowInto(GlobalWindows())
# if isinstance(input.element_type, TupleConstraint):
# keyed_input = input
# else:
# # Add a default key (0) if the input PCollection is unkeyed.
# keyed_input = input | beam.WithKeys(0)
# if self.buffer_state_type == BufferStateType.ORDERED_LIST:
# dofn = OrderedWindowElementsDoFn
# elif self.buffer_state_type == BufferStateType.BAG:
# dofn = OrderedWindowElementsDoFnWithBag
# elif self.buffer_state_type == BufferStateType.VALUE:
# dofn = OrderedWindowElementsDoFnWithValue
# else:
# raise ValueError("Unknown buffer_state_type: " + self.buffer_state_type)
# keyed_output = (
# keyed_input | 'Ordered Sliding Window' >> beam.ParDo(
# dofn(
# self.duration,
# self.slide_interval,
# self.offset,
# self.allowed_lateness,
# self.default_start_value,
# self.fill_start_if_missing,
# self.stop_timestamp)))
# if isinstance(input.element_type, TupleConstraint):
# ret = keyed_output
# else:
# # Remove the default key if the input PCollection was originally unkeyed.
# ret = keyed_output | beam.Values()
# return ret