| # | 
 | # Licensed to the Apache Software Foundation (ASF) under one or more | 
 | # contributor license agreements.  See the NOTICE file distributed with | 
 | # this work for additional information regarding copyright ownership. | 
 | # The ASF licenses this file to You under the Apache License, Version 2.0 | 
 | # (the "License"); you may not use this file except in compliance with | 
 | # the License.  You may obtain a copy of the License at | 
 | # | 
 | #    http://www.apache.org/licenses/LICENSE-2.0 | 
 | # | 
 | # Unless required by applicable law or agreed to in writing, software | 
 | # distributed under the License is distributed on an "AS IS" BASIS, | 
 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 | # See the License for the specific language governing permissions and | 
 | # limitations under the License. | 
 | # | 
 |  | 
 | """Simple utility PTransforms. | 
 | """ | 
 |  | 
 | # pytype: skip-file | 
 |  | 
 | import collections | 
 | import contextlib | 
 | import random | 
 | import re | 
 | import threading | 
 | import time | 
 | import uuid | 
 | from typing import TYPE_CHECKING | 
 | from typing import Any | 
 | from typing import Iterable | 
 | from typing import List | 
 | from typing import Tuple | 
 | from typing import TypeVar | 
 | from typing import Union | 
 |  | 
 | from apache_beam import coders | 
 | from apache_beam import typehints | 
 | from apache_beam.metrics import Metrics | 
 | from apache_beam.portability import common_urns | 
 | from apache_beam.portability.api import beam_runner_api_pb2 | 
 | from apache_beam.pvalue import AsSideInput | 
 | from apache_beam.transforms import window | 
 | from apache_beam.transforms.combiners import CountCombineFn | 
 | from apache_beam.transforms.core import CombinePerKey | 
 | from apache_beam.transforms.core import DoFn | 
 | from apache_beam.transforms.core import FlatMap | 
 | from apache_beam.transforms.core import Flatten | 
 | from apache_beam.transforms.core import GroupByKey | 
 | from apache_beam.transforms.core import Map | 
 | from apache_beam.transforms.core import MapTuple | 
 | from apache_beam.transforms.core import ParDo | 
 | from apache_beam.transforms.core import Windowing | 
 | from apache_beam.transforms.ptransform import PTransform | 
 | from apache_beam.transforms.ptransform import ptransform_fn | 
 | from apache_beam.transforms.timeutil import TimeDomain | 
 | from apache_beam.transforms.trigger import AccumulationMode | 
 | from apache_beam.transforms.trigger import Always | 
 | from apache_beam.transforms.userstate import BagStateSpec | 
 | from apache_beam.transforms.userstate import CombiningValueStateSpec | 
 | from apache_beam.transforms.userstate import TimerSpec | 
 | from apache_beam.transforms.userstate import on_timer | 
 | from apache_beam.transforms.window import NonMergingWindowFn | 
 | from apache_beam.transforms.window import TimestampCombiner | 
 | from apache_beam.transforms.window import TimestampedValue | 
 | from apache_beam.typehints.decorators import get_signature | 
 | from apache_beam.typehints.sharded_key_type import ShardedKeyType | 
 | from apache_beam.utils import windowed_value | 
 | from apache_beam.utils.annotations import deprecated | 
 | from apache_beam.utils.annotations import experimental | 
 | from apache_beam.utils.sharded_key import ShardedKey | 
 |  | 
 | if TYPE_CHECKING: | 
 |   from apache_beam import pvalue | 
 |   from apache_beam.runners.pipeline_context import PipelineContext | 
 |  | 
 | __all__ = [ | 
 |     'BatchElements', | 
 |     'CoGroupByKey', | 
 |     'Distinct', | 
 |     'Keys', | 
 |     'KvSwap', | 
 |     'Regex', | 
 |     'Reify', | 
 |     'RemoveDuplicates', | 
 |     'Reshuffle', | 
 |     'ToString', | 
 |     'Values', | 
 |     'WithKeys', | 
 |     'GroupIntoBatches' | 
 | ] | 
 |  | 
 | K = TypeVar('K') | 
 | V = TypeVar('V') | 
 | T = TypeVar('T') | 
 |  | 
 |  | 
 | class CoGroupByKey(PTransform): | 
 |   """Groups results across several PCollections by key. | 
 |  | 
 |   Given an input dict of serializable keys (called "tags") to 0 or more | 
 |   PCollections of (key, value) tuples, it creates a single output PCollection | 
 |   of (key, value) tuples whose keys are the unique input keys from all inputs, | 
 |   and whose values are dicts mapping each tag to an iterable of whatever values | 
 |   were under the key in the corresponding PCollection, in this manner:: | 
 |  | 
 |       ('some key', {'tag1': ['value 1 under "some key" in pcoll1', | 
 |                              'value 2 under "some key" in pcoll1', | 
 |                              ...], | 
 |                     'tag2': ... , | 
 |                     ... }) | 
 |  | 
 |   For example, given:: | 
 |  | 
 |       {'tag1': pc1, 'tag2': pc2, 333: pc3} | 
 |  | 
 |   where:: | 
 |  | 
 |       pc1 = [(k1, v1)] | 
 |       pc2 = [] | 
 |       pc3 = [(k1, v31), (k1, v32), (k2, v33)] | 
 |  | 
 |   The output PCollection would be:: | 
 |  | 
 |       [(k1, {'tag1': [v1], 'tag2': [], 333: [v31, v32]}), | 
 |        (k2, {'tag1': [], 'tag2': [], 333: [v33]})] | 
 |  | 
 |   CoGroupByKey also works for tuples, lists, or other flat iterables of | 
 |   PCollections, in which case the values of the resulting PCollections | 
 |   will be tuples whose nth value is the list of values from the nth | 
 |   PCollection---conceptually, the "tags" are the indices into the input. | 
 |   Thus, for this input:: | 
 |  | 
 |      (pc1, pc2, pc3) | 
 |  | 
 |   the output would be:: | 
 |  | 
      [(k1, ([v1], [], [v31, v32])),
 |        (k2, ([], [], [v33]))] | 
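
  A minimal usage sketch, assuming the conventional ``import apache_beam as
  beam`` alias (``pc1`` and ``pc2`` are illustrative PCollections of
  (key, value) pairs)::

      joined = {'tag1': pc1, 'tag2': pc2} | beam.CoGroupByKey()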
 |  | 
  Args:
    pipeline: (optional) the pipeline that "owns" this PTransform. Ordinarily
      CoGroupByKey can obtain this information from one of the input
      PCollections, but if there are none (or if there's a chance there may
      be none), this argument is the only way to provide pipeline information
      and should be considered mandatory.
 |   """ | 
 |   def __init__(self, *, pipeline=None): | 
 |     self.pipeline = pipeline | 
 |  | 
 |   def _extract_input_pvalues(self, pvalueish): | 
 |     try: | 
 |       # If this works, it's a dict. | 
 |       return pvalueish, tuple(pvalueish.values()) | 
 |     except AttributeError: | 
      # Cast iterables to a tuple so we can do re-iteration.
 |       pcolls = tuple(pvalueish) | 
 |       return pcolls, pcolls | 
 |  | 
 |   def expand(self, pcolls): | 
 |     if isinstance(pcolls, dict): | 
 |       if all(isinstance(tag, str) and len(tag) < 10 for tag in pcolls.keys()): | 
 |         # Small, string tags. Pass them as data. | 
 |         pcolls_dict = pcolls | 
 |         restore_tags = None | 
 |       else: | 
 |         # Pass the tags in the restore_tags closure. | 
 |         tags = list(pcolls.keys()) | 
 |         pcolls_dict = {str(ix): pcolls[tag] for (ix, tag) in enumerate(tags)} | 
 |         restore_tags = lambda vs: { | 
 |             tag: vs[str(ix)] | 
 |             for (ix, tag) in enumerate(tags) | 
 |         } | 
 |     else: | 
 |       # Tags are tuple indices. | 
 |       num_tags = len(pcolls) | 
 |       pcolls_dict = {str(ix): pcolls[ix] for ix in range(num_tags)} | 
 |       restore_tags = lambda vs: tuple(vs[str(ix)] for ix in range(num_tags)) | 
 |  | 
 |     result = ( | 
 |         pcolls_dict | 'CoGroupByKeyImpl' >> _CoGBKImpl(pipeline=self.pipeline)) | 
 |     if restore_tags: | 
 |       return result | 'RestoreTags' >> MapTuple( | 
 |           lambda k, vs: (k, restore_tags(vs))) | 
 |     else: | 
 |       return result | 
 |  | 
 |  | 
 | class _CoGBKImpl(PTransform): | 
 |   def __init__(self, *, pipeline=None): | 
 |     self.pipeline = pipeline | 
 |  | 
 |   def expand(self, pcolls): | 
 |     # Check input PCollections for PCollection-ness, and that they all belong | 
 |     # to the same pipeline. | 
 |     for pcoll in pcolls.values(): | 
 |       self._check_pcollection(pcoll) | 
 |       if self.pipeline: | 
 |         assert pcoll.pipeline == self.pipeline | 
 |  | 
 |     tags = list(pcolls.keys()) | 
 |  | 
 |     def add_tag(tag): | 
 |       return lambda k, v: (k, (tag, v)) | 
 |  | 
 |     def collect_values(key, tagged_values): | 
 |       grouped_values = {tag: [] for tag in tags} | 
 |       for tag, value in tagged_values: | 
 |         grouped_values[tag].append(value) | 
 |       return key, grouped_values | 
 |  | 
 |     return ([ | 
 |         pcoll | 
 |         | 'Tag[%s]' % tag >> MapTuple(add_tag(tag)) | 
 |         for (tag, pcoll) in pcolls.items() | 
 |     ] | 
 |             | Flatten(pipeline=self.pipeline) | 
 |             | GroupByKey() | 
 |             | MapTuple(collect_values)) | 
 |  | 
 |  | 
 | @ptransform_fn | 
 | @typehints.with_input_types(Tuple[K, V]) | 
 | @typehints.with_output_types(K) | 
 | def Keys(pcoll, label='Keys'):  # pylint: disable=invalid-name | 
 |   """Produces a PCollection of first elements of 2-tuples in a PCollection.""" | 
 |   return pcoll | label >> MapTuple(lambda k, _: k) | 
 |  | 
 |  | 
 | @ptransform_fn | 
 | @typehints.with_input_types(Tuple[K, V]) | 
 | @typehints.with_output_types(V) | 
 | def Values(pcoll, label='Values'):  # pylint: disable=invalid-name | 
 |   """Produces a PCollection of second elements of 2-tuples in a PCollection.""" | 
 |   return pcoll | label >> MapTuple(lambda _, v: v) | 
 |  | 
 |  | 
 | @ptransform_fn | 
 | @typehints.with_input_types(Tuple[K, V]) | 
 | @typehints.with_output_types(Tuple[V, K]) | 
 | def KvSwap(pcoll, label='KvSwap'):  # pylint: disable=invalid-name | 
 |   """Produces a PCollection reversing 2-tuples in a PCollection.""" | 
 |   return pcoll | label >> MapTuple(lambda k, v: (v, k)) | 
 |  | 
 |  | 
 | @ptransform_fn | 
 | @typehints.with_input_types(T) | 
 | @typehints.with_output_types(T) | 
 | def Distinct(pcoll):  # pylint: disable=invalid-name | 
 |   """Produces a PCollection containing distinct elements of a PCollection.""" | 
 |   return ( | 
 |       pcoll | 
 |       | 'ToPairs' >> Map(lambda v: (v, None)) | 
 |       | 'Group' >> CombinePerKey(lambda vs: None) | 
 |       | 'Distinct' >> Keys()) | 
 |  | 
 |  | 
 | @deprecated(since='2.12', current='Distinct') | 
 | @ptransform_fn | 
 | @typehints.with_input_types(T) | 
 | @typehints.with_output_types(T) | 
 | def RemoveDuplicates(pcoll): | 
 |   """Produces a PCollection containing distinct elements of a PCollection.""" | 
 |   return pcoll | 'RemoveDuplicates' >> Distinct() | 
 |  | 
 |  | 
 | class _BatchSizeEstimator(object): | 
 |   """Estimates the best size for batches given historical timing. | 
 |   """ | 
 |  | 
 |   _MAX_DATA_POINTS = 100 | 
 |   _MAX_GROWTH_FACTOR = 2 | 
 |  | 
 |   def __init__( | 
 |       self, | 
 |       min_batch_size=1, | 
 |       max_batch_size=10000, | 
 |       target_batch_overhead=.05, | 
 |       target_batch_duration_secs=1, | 
 |       variance=0.25, | 
 |       clock=time.time, | 
 |       ignore_first_n_seen_per_batch_size=0): | 
 |     if min_batch_size > max_batch_size: | 
 |       raise ValueError( | 
 |           "Minimum (%s) must not be greater than maximum (%s)" % | 
 |           (min_batch_size, max_batch_size)) | 
 |     if target_batch_overhead and not 0 < target_batch_overhead <= 1: | 
 |       raise ValueError( | 
 |           "target_batch_overhead (%s) must be between 0 and 1" % | 
 |           (target_batch_overhead)) | 
 |     if target_batch_duration_secs and target_batch_duration_secs <= 0: | 
 |       raise ValueError( | 
 |           "target_batch_duration_secs (%s) must be positive" % | 
 |           (target_batch_duration_secs)) | 
 |     if not (target_batch_overhead or target_batch_duration_secs): | 
 |       raise ValueError( | 
 |           "At least one of target_batch_overhead or " | 
 |           "target_batch_duration_secs must be positive.") | 
 |     if ignore_first_n_seen_per_batch_size < 0: | 
 |       raise ValueError( | 
          'ignore_first_n_seen_per_batch_size (%s) must be '
          'non-negative' % (ignore_first_n_seen_per_batch_size))
 |     self._min_batch_size = min_batch_size | 
 |     self._max_batch_size = max_batch_size | 
 |     self._target_batch_overhead = target_batch_overhead | 
 |     self._target_batch_duration_secs = target_batch_duration_secs | 
 |     self._variance = variance | 
 |     self._clock = clock | 
 |     self._data = [] | 
 |     self._ignore_next_timing = False | 
 |     self._ignore_first_n_seen_per_batch_size = ( | 
 |         ignore_first_n_seen_per_batch_size) | 
 |     self._batch_size_num_seen = {} | 
 |     self._replay_last_batch_size = None | 
 |  | 
 |     self._size_distribution = Metrics.distribution( | 
 |         'BatchElements', 'batch_size') | 
 |     self._time_distribution = Metrics.distribution( | 
 |         'BatchElements', 'msec_per_batch') | 
 |     # Beam distributions only accept integer values, so we use this to | 
 |     # accumulate under-reported values until they add up to whole milliseconds. | 
 |     # (Milliseconds are chosen because that's conventionally used elsewhere in | 
 |     # profiling-style counters.) | 
 |     self._remainder_msecs = 0 | 
 |  | 
 |   def ignore_next_timing(self): | 
 |     """Call to indicate the next timing should be ignored. | 
 |  | 
 |     For example, the first emit of a ParDo operation is known to be anomalous | 
 |     due to setup that may occur. | 
 |     """ | 
 |     self._ignore_next_timing = True | 
 |  | 
 |   @contextlib.contextmanager | 
 |   def record_time(self, batch_size): | 
 |     start = self._clock() | 
 |     yield | 
 |     elapsed = self._clock() - start | 
 |     elapsed_msec = 1e3 * elapsed + self._remainder_msecs | 
 |     self._size_distribution.update(batch_size) | 
 |     self._time_distribution.update(int(elapsed_msec)) | 
 |     self._remainder_msecs = elapsed_msec - int(elapsed_msec) | 
 |     # If we ignore the next timing, replay the batch size to get accurate | 
 |     # timing. | 
 |     if self._ignore_next_timing: | 
 |       self._ignore_next_timing = False | 
 |       self._replay_last_batch_size = batch_size | 
 |     else: | 
 |       self._data.append((batch_size, elapsed)) | 
 |       if len(self._data) >= self._MAX_DATA_POINTS: | 
 |         self._thin_data() | 
 |  | 
 |   def _thin_data(self): | 
    # Make sure we don't change the parity of len(self._data),
    # as it's used below to alternate the jitter.
 |     self._data.pop(random.randrange(len(self._data) // 4)) | 
 |     self._data.pop(random.randrange(len(self._data) // 2)) | 
 |  | 
 |   @staticmethod | 
 |   def linear_regression_no_numpy(xs, ys): | 
 |     # Least squares fit for y = a + bx over all points. | 
 |     n = float(len(xs)) | 
 |     xbar = sum(xs) / n | 
 |     ybar = sum(ys) / n | 
 |     if xbar == 0: | 
 |       return ybar, 0 | 
 |     if all(xs[0] == x for x in xs): | 
      # All values in xs are the same, so fit a line through the origin.
 |       return 0, ybar / xbar | 
 |     b = ( | 
 |         sum([(x - xbar) * (y - ybar) | 
 |              for x, y in zip(xs, ys)]) / sum([(x - xbar)**2 for x in xs])) | 
 |     a = ybar - b * xbar | 
 |     return a, b | 
 |  | 
 |   @staticmethod | 
 |   def linear_regression_numpy(xs, ys): | 
 |     # pylint: disable=wrong-import-order, wrong-import-position | 
 |     import numpy as np | 
 |     from numpy import sum | 
 |     n = len(xs) | 
 |     if all(xs[0] == x for x in xs): | 
      # If all values of xs are the same, fall back to
      # linear_regression_no_numpy.
 |       return _BatchSizeEstimator.linear_regression_no_numpy(xs, ys) | 
 |     xs = np.asarray(xs, dtype=float) | 
 |     ys = np.asarray(ys, dtype=float) | 
 |  | 
 |     # First do a simple least squares fit for y = a + bx over all points. | 
 |     b, a = np.polyfit(xs, ys, 1) | 
 |  | 
 |     if n < 10: | 
 |       return a, b | 
 |     else: | 
 |       # Refine this by throwing out outliers, according to Cook's distance. | 
 |       # https://en.wikipedia.org/wiki/Cook%27s_distance | 
 |       sum_x = sum(xs) | 
 |       sum_x2 = sum(xs**2) | 
 |       errs = a + b * xs - ys | 
 |       s2 = sum(errs**2) / (n - 2) | 
 |       if s2 == 0: | 
 |         # It's an exact fit! | 
 |         return a, b | 
 |       h = (sum_x2 - 2 * sum_x * xs + n * xs**2) / (n * sum_x2 - sum_x**2) | 
 |       cook_ds = 0.5 / s2 * errs**2 * (h / (1 - h)**2) | 
 |  | 
 |       # Re-compute the regression, excluding those points with Cook's distance | 
 |       # greater than 0.5, and weighting by the inverse of x to give a more | 
 |       # stable y-intercept (as small batches have relatively more information | 
 |       # about the fixed overhead). | 
 |       weight = (cook_ds <= 0.5) / xs | 
 |       b, a = np.polyfit(xs, ys, 1, w=weight) | 
 |       return a, b | 
 |  | 
 |   try: | 
 |     # pylint: disable=wrong-import-order, wrong-import-position | 
 |     import numpy as np | 
 |     linear_regression = linear_regression_numpy | 
 |   except ImportError: | 
 |     linear_regression = linear_regression_no_numpy | 
 |  | 
 |   def _calculate_next_batch_size(self): | 
 |     if self._min_batch_size == self._max_batch_size: | 
 |       return self._min_batch_size | 
 |     elif len(self._data) < 1: | 
 |       return self._min_batch_size | 
 |     elif len(self._data) < 2: | 
 |       # Force some variety so we have distinct batch sizes on which to do | 
 |       # linear regression below. | 
 |       return int( | 
 |           max( | 
 |               min( | 
 |                   self._max_batch_size, | 
 |                   self._min_batch_size * self._MAX_GROWTH_FACTOR), | 
 |               self._min_batch_size + 1)) | 
 |  | 
    # There tends to be a lot of noise in the top quantile, which also
    # has outsized influence in the regression.  If we have enough data,
    # simply declare the top 20% to be outliers.
 |     trimmed_data = sorted(self._data)[:max(20, len(self._data) * 4 // 5)] | 
 |  | 
 |     # Linear regression for y = a + bx, where x is batch size and y is time. | 
 |     xs, ys = zip(*trimmed_data) | 
 |     a, b = self.linear_regression(xs, ys) | 
 |  | 
 |     # Avoid nonsensical or division-by-zero errors below due to noise. | 
 |     a = max(a, 1e-10) | 
 |     b = max(b, 1e-20) | 
 |  | 
 |     last_batch_size = self._data[-1][0] | 
 |     cap = min(last_batch_size * self._MAX_GROWTH_FACTOR, self._max_batch_size) | 
 |  | 
 |     target = self._max_batch_size | 
 |  | 
 |     if self._target_batch_duration_secs: | 
 |       # Solution to a + b*x = self._target_batch_duration_secs. | 
 |       target = min(target, (self._target_batch_duration_secs - a) / b) | 
 |  | 
 |     if self._target_batch_overhead: | 
 |       # Solution to a / (a + b*x) = self._target_batch_overhead. | 
 |       target = min(target, (a / b) * (1 / self._target_batch_overhead - 1)) | 
 |  | 
 |     # Avoid getting stuck at a single batch size (especially the minimal | 
 |     # batch size) which would not allow us to extrapolate to other batch | 
 |     # sizes. | 
 |     # Jitter alternates between 0 and 1. | 
 |     jitter = len(self._data) % 2 | 
 |     # Smear our samples across a range centered at the target. | 
 |     if len(self._data) > 10: | 
 |       target += int(target * self._variance * 2 * (random.random() - .5)) | 
 |  | 
 |     return int(max(self._min_batch_size + jitter, min(target, cap))) | 
 |  | 
 |   def next_batch_size(self): | 
 |     # Check if we should replay a previous batch size due to it not being | 
 |     # recorded. | 
 |     if self._replay_last_batch_size: | 
 |       result = self._replay_last_batch_size | 
 |       self._replay_last_batch_size = None | 
 |     else: | 
 |       result = self._calculate_next_batch_size() | 
 |  | 
 |     seen_count = self._batch_size_num_seen.get(result, 0) + 1 | 
 |     if seen_count <= self._ignore_first_n_seen_per_batch_size: | 
 |       self.ignore_next_timing() | 
 |     self._batch_size_num_seen[result] = seen_count | 
 |     return result | 
 |  | 
 |  | 
 | class _GlobalWindowsBatchingDoFn(DoFn): | 
 |   def __init__(self, batch_size_estimator): | 
 |     self._batch_size_estimator = batch_size_estimator | 
 |  | 
 |   def start_bundle(self): | 
 |     self._batch = [] | 
 |     self._batch_size = self._batch_size_estimator.next_batch_size() | 
 |     # The first emit often involves non-trivial setup. | 
 |     self._batch_size_estimator.ignore_next_timing() | 
 |  | 
 |   def process(self, element): | 
 |     self._batch.append(element) | 
 |     if len(self._batch) >= self._batch_size: | 
 |       with self._batch_size_estimator.record_time(self._batch_size): | 
 |         yield self._batch | 
 |       self._batch = [] | 
 |       self._batch_size = self._batch_size_estimator.next_batch_size() | 
 |  | 
 |   def finish_bundle(self): | 
 |     if self._batch: | 
 |       with self._batch_size_estimator.record_time(self._batch_size): | 
 |         yield window.GlobalWindows.windowed_value(self._batch) | 
 |       self._batch = None | 
 |       self._batch_size = self._batch_size_estimator.next_batch_size() | 
 |  | 
 |  | 
 | class _WindowAwareBatchingDoFn(DoFn): | 
 |  | 
 |   _MAX_LIVE_WINDOWS = 10 | 
 |  | 
 |   def __init__(self, batch_size_estimator): | 
 |     self._batch_size_estimator = batch_size_estimator | 
 |  | 
 |   def start_bundle(self): | 
 |     self._batches = collections.defaultdict(list) | 
 |     self._batch_size = self._batch_size_estimator.next_batch_size() | 
 |     # The first emit often involves non-trivial setup. | 
 |     self._batch_size_estimator.ignore_next_timing() | 
 |  | 
 |   def process(self, element, window=DoFn.WindowParam): | 
 |     self._batches[window].append(element) | 
 |     if len(self._batches[window]) >= self._batch_size: | 
 |       with self._batch_size_estimator.record_time(self._batch_size): | 
 |         yield windowed_value.WindowedValue( | 
 |             self._batches[window], window.max_timestamp(), (window, )) | 
 |       del self._batches[window] | 
 |       self._batch_size = self._batch_size_estimator.next_batch_size() | 
 |     elif len(self._batches) > self._MAX_LIVE_WINDOWS: | 
 |       window, _ = sorted( | 
 |           self._batches.items(), | 
 |           key=lambda window_batch: len(window_batch[1]), | 
 |           reverse=True)[0] | 
 |       with self._batch_size_estimator.record_time(self._batch_size): | 
 |         yield windowed_value.WindowedValue( | 
 |             self._batches[window], window.max_timestamp(), (window, )) | 
 |       del self._batches[window] | 
 |       self._batch_size = self._batch_size_estimator.next_batch_size() | 
 |  | 
 |   def finish_bundle(self): | 
 |     for window, batch in self._batches.items(): | 
 |       if batch: | 
 |         with self._batch_size_estimator.record_time(self._batch_size): | 
 |           yield windowed_value.WindowedValue( | 
 |               batch, window.max_timestamp(), (window, )) | 
 |     self._batches = None | 
 |     self._batch_size = self._batch_size_estimator.next_batch_size() | 
 |  | 
 |  | 
 | @typehints.with_input_types(T) | 
 | @typehints.with_output_types(List[T]) | 
 | class BatchElements(PTransform): | 
 |   """A Transform that batches elements for amortized processing. | 
 |  | 
 |   This transform is designed to precede operations whose processing cost | 
 |   is of the form | 
 |  | 
 |       time = fixed_cost + num_elements * per_element_cost | 
 |  | 
 |   where the per element cost is (often significantly) smaller than the fixed | 
 |   cost and could be amortized over multiple elements.  It consumes a PCollection | 
 |   of element type T and produces a PCollection of element type List[T]. | 
 |  | 
  This transform attempts to find the best batch size between the minimum
 |   and maximum parameters by profiling the time taken by (fused) downstream | 
 |   operations. For a fixed batch size, set the min and max to be equal. | 
 |  | 
  Elements are batched per-window and batches are emitted in the window
  corresponding to their contents.
 |  | 
 |   Args: | 
 |     min_batch_size: (optional) the smallest number of elements per batch | 
 |     max_batch_size: (optional) the largest number of elements per batch | 
 |     target_batch_overhead: (optional) a target for fixed_cost / time, | 
 |         as used in the formula above | 
 |     target_batch_duration_secs: (optional) a target for total time per bundle, | 
 |         in seconds | 
 |     variance: (optional) the permitted (relative) amount of deviation from the | 
 |         (estimated) ideal batch size used to produce a wider base for | 
 |         linear interpolation | 
 |     clock: (optional) an alternative to time.time for measuring the cost of | 
        downstream operations (mostly for testing)
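
  A minimal usage sketch, assuming the conventional ``import apache_beam as
  beam`` alias (element values are illustrative)::

      with beam.Pipeline() as p:
        batch_sizes = (
            p
            | beam.Create(range(100))
            | beam.BatchElements(min_batch_size=10, max_batch_size=50)
            | beam.Map(len))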
 |   """ | 
 |   def __init__( | 
 |       self, | 
 |       min_batch_size=1, | 
 |       max_batch_size=10000, | 
 |       target_batch_overhead=.05, | 
 |       target_batch_duration_secs=1, | 
 |       variance=0.25, | 
 |       clock=time.time): | 
 |     self._batch_size_estimator = _BatchSizeEstimator( | 
 |         min_batch_size=min_batch_size, | 
 |         max_batch_size=max_batch_size, | 
 |         target_batch_overhead=target_batch_overhead, | 
 |         target_batch_duration_secs=target_batch_duration_secs, | 
 |         variance=variance, | 
 |         clock=clock) | 
 |  | 
 |   def expand(self, pcoll): | 
 |     if getattr(pcoll.pipeline.runner, 'is_streaming', False): | 
 |       raise NotImplementedError("Requires stateful processing (BEAM-2687)") | 
 |     elif pcoll.windowing.is_default(): | 
 |       # This is the same logic as _GlobalWindowsBatchingDoFn, but optimized | 
 |       # for that simpler case. | 
 |       return pcoll | ParDo( | 
 |           _GlobalWindowsBatchingDoFn(self._batch_size_estimator)) | 
 |     else: | 
 |       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator)) | 
 |  | 
 |  | 
 | class _IdentityWindowFn(NonMergingWindowFn): | 
 |   """Windowing function that preserves existing windows. | 
 |  | 
 |   To be used internally with the Reshuffle transform. | 
 |   Will raise an exception when used after DoFns that return TimestampedValue | 
 |   elements. | 
 |   """ | 
 |   def __init__(self, window_coder): | 
 |     """Create a new WindowFn with compatible coder. | 
 |     To be applied to PCollections with windows that are compatible with the | 
 |     given coder. | 
 |  | 
 |     Arguments: | 
 |       window_coder: coders.Coder object to be used on windows. | 
 |     """ | 
 |     super().__init__() | 
 |     if window_coder is None: | 
 |       raise ValueError('window_coder should not be None') | 
 |     self._window_coder = window_coder | 
 |  | 
 |   def assign(self, assign_context): | 
 |     if assign_context.window is None: | 
 |       raise ValueError( | 
 |           'assign_context.window should not be None. ' | 
 |           'This might be due to a DoFn returning a TimestampedValue.') | 
 |     return [assign_context.window] | 
 |  | 
 |   def get_window_coder(self): | 
 |     return self._window_coder | 
 |  | 
 |  | 
 | @typehints.with_input_types(Tuple[K, V]) | 
 | @typehints.with_output_types(Tuple[K, V]) | 
 | class ReshufflePerKey(PTransform): | 
 |   """PTransform that returns a PCollection equivalent to its input, | 
 |   but operationally provides some of the side effects of a GroupByKey, | 
 |   in particular checkpointing, and preventing fusion of the surrounding | 
 |   transforms. | 
 |  | 
 |   ReshufflePerKey is experimental. No backwards compatibility guarantees. | 
 |   """ | 
 |   def expand(self, pcoll): | 
 |     windowing_saved = pcoll.windowing | 
 |     if windowing_saved.is_default(): | 
 |       # In this (common) case we can use a trivial trigger driver | 
 |       # and avoid the (expensive) window param. | 
 |       globally_windowed = window.GlobalWindows.windowed_value(None) | 
 |       MIN_TIMESTAMP = window.MIN_TIMESTAMP | 
 |  | 
 |       def reify_timestamps(element, timestamp=DoFn.TimestampParam): | 
 |         key, value = element | 
 |         if timestamp == MIN_TIMESTAMP: | 
 |           timestamp = None | 
 |         return key, (value, timestamp) | 
 |  | 
 |       def restore_timestamps(element): | 
 |         key, values = element | 
 |         return [ | 
 |             globally_windowed.with_value((key, value)) if timestamp is None else | 
 |             window.GlobalWindows.windowed_value((key, value), timestamp) | 
 |             for (value, timestamp) in values | 
 |         ] | 
 |     else: | 
 |  | 
 |       # typing: All conditional function variants must have identical signatures | 
 |       def reify_timestamps(  # type: ignore[misc] | 
 |           element, timestamp=DoFn.TimestampParam, window=DoFn.WindowParam): | 
 |         key, value = element | 
 |         # Transport the window as part of the value and restore it later. | 
 |         return key, windowed_value.WindowedValue(value, timestamp, [window]) | 
 |  | 
 |       def restore_timestamps(element): | 
 |         key, windowed_values = element | 
 |         return [wv.with_value((key, wv.value)) for wv in windowed_values] | 
 |  | 
 |     ungrouped = pcoll | Map(reify_timestamps).with_output_types(Any) | 
 |  | 
    # TODO(BEAM-8104): Using the global window, one of the standard windows,
    # to mitigate the Dataflow Java Runner Harness limitation of accepting
    # only standard coders.
 |     ungrouped._windowing = Windowing( | 
 |         window.GlobalWindows(), | 
 |         triggerfn=Always(), | 
 |         accumulation_mode=AccumulationMode.DISCARDING, | 
 |         timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST) | 
 |     result = ( | 
 |         ungrouped | 
 |         | GroupByKey() | 
 |         | FlatMap(restore_timestamps).with_output_types(Any)) | 
 |     result._windowing = windowing_saved | 
 |     return result | 
 |  | 
 |  | 
 | @typehints.with_input_types(T) | 
 | @typehints.with_output_types(T) | 
 | class Reshuffle(PTransform): | 
 |   """PTransform that returns a PCollection equivalent to its input, | 
 |   but operationally provides some of the side effects of a GroupByKey, | 
 |   in particular checkpointing, and preventing fusion of the surrounding | 
 |   transforms. | 
 |  | 
 |   Reshuffle adds a temporary random key to each element, performs a | 
 |   ReshufflePerKey, and finally removes the temporary key. | 
 |  | 
 |   Reshuffle is experimental. No backwards compatibility guarantees. | 
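
  A minimal usage sketch, assuming the conventional ``import apache_beam as
  beam`` alias (``expanded`` is an illustrative PCollection produced by a
  fan-out step whose fusion with downstream transforms should be broken)::

      redistributed = expanded | beam.Reshuffle()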
 |   """ | 
 |   def expand(self, pcoll): | 
 |     # type: (pvalue.PValue) -> pvalue.PCollection | 
 |     return ( | 
 |         pcoll | 
 |         | 'AddRandomKeys' >> Map(lambda t: (random.getrandbits(32), t)). | 
 |         with_input_types(T).with_output_types(Tuple[int, T]) | 
 |         | ReshufflePerKey() | 
 |         | 'RemoveRandomKeys' >> Map(lambda t: t[1]).with_input_types( | 
 |             Tuple[int, T]).with_output_types(T)) | 
 |  | 
 |   def to_runner_api_parameter(self, unused_context): | 
 |     # type: (PipelineContext) -> Tuple[str, None] | 
 |     return common_urns.composites.RESHUFFLE.urn, None | 
 |  | 
 |   @staticmethod | 
 |   @PTransform.register_urn(common_urns.composites.RESHUFFLE.urn, None) | 
 |   def from_runner_api_parameter( | 
 |       unused_ptransform, unused_parameter, unused_context): | 
 |     return Reshuffle() | 
 |  | 
 |  | 
 | def fn_takes_side_inputs(fn): | 
 |   try: | 
 |     signature = get_signature(fn) | 
 |   except TypeError: | 
 |     # We can't tell; maybe it does. | 
 |     return True | 
 |  | 
 |   return ( | 
 |       len(signature.parameters) > 1 or any( | 
 |           p.kind == p.VAR_POSITIONAL or p.kind == p.VAR_KEYWORD | 
 |           for p in signature.parameters.values())) | 
 |  | 
 |  | 
 | @ptransform_fn | 
 | def WithKeys(pcoll, k, *args, **kwargs): | 
 |   """PTransform that takes a PCollection, and either a constant key or a | 
 |   callable, and returns a PCollection of (K, V), where each of the values in | 
 |   the input PCollection has been paired with either the constant key or a key | 
 |   computed from the value.  The callable may optionally accept positional or | 
 |   keyword arguments, which should be passed to WithKeys directly.  These may | 
 |   be either SideInputs or static (non-PCollection) values, such as ints. | 
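
  A minimal sketch of both forms, assuming the conventional
  ``import apache_beam as beam`` alias (``words`` is an illustrative
  PCollection of strings)::

      keyed_by_constant = words | beam.WithKeys('fixed_key')
      keyed_by_length = words | beam.WithKeys(len)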
 |   """ | 
 |   if callable(k): | 
 |     if fn_takes_side_inputs(k): | 
 |       if all(isinstance(arg, AsSideInput) | 
 |              for arg in args) and all(isinstance(kwarg, AsSideInput) | 
 |                                       for kwarg in kwargs.values()): | 
 |         return pcoll | Map( | 
 |             lambda v, | 
 |             *args, | 
 |             **kwargs: (k(v, *args, **kwargs), v), | 
 |             *args, | 
 |             **kwargs) | 
 |       return pcoll | Map(lambda v: (k(v, *args, **kwargs), v)) | 
 |     return pcoll | Map(lambda v: (k(v), v)) | 
 |   return pcoll | Map(lambda v: (k, v)) | 
 |  | 
 |  | 
 | @experimental() | 
 | @typehints.with_input_types(Tuple[K, V]) | 
 | @typehints.with_output_types(Tuple[K, Iterable[V]]) | 
 | class GroupIntoBatches(PTransform): | 
 |   """PTransform that batches the input into desired batch size. Elements are | 
 |   buffered until they are equal to batch size provided in the argument at which | 
 |   point they are output to the output Pcollection. | 
 |  | 
 |   Windows are preserved (batches will contain elements from the same window) | 
 |  | 
 |   GroupIntoBatches is experimental. Its use case will depend on the runner if | 
 |   it has support of States and Timers. | 
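
  A minimal usage sketch, assuming the conventional ``import apache_beam as
  beam`` alias (``keyed_events`` is an illustrative PCollection of
  (key, value) pairs)::

      batched = keyed_events | beam.GroupIntoBatches(batch_size=10)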
 |   """ | 
 |   def __init__( | 
 |       self, batch_size, max_buffering_duration_secs=None, clock=time.time): | 
 |     """Create a new GroupIntoBatches. | 
 |  | 
 |     Arguments: | 
 |       batch_size: (required) How many elements should be in a batch | 
      max_buffering_duration_secs: (optional) How long, at most, an incomplete
        batch of elements is allowed to be buffered in state. The duration
        must be a positive number of seconds, given as an int or float.
        Setting this parameter to zero effectively means no buffering limit.
 |       clock: (optional) an alternative to time.time (mostly for testing) | 
 |     """ | 
 |     self.params = _GroupIntoBatchesParams( | 
 |         batch_size, max_buffering_duration_secs) | 
 |     self.clock = clock | 
 |  | 
 |   def expand(self, pcoll): | 
 |     input_coder = coders.registry.get_coder(pcoll) | 
 |     return pcoll | ParDo( | 
 |         _pardo_group_into_batches( | 
 |             input_coder, | 
 |             self.params.batch_size, | 
 |             self.params.max_buffering_duration_secs, | 
 |             self.clock)) | 
 |  | 
 |   def to_runner_api_parameter( | 
 |       self, | 
 |       unused_context  # type: PipelineContext | 
 |   ):  # type: (...) -> Tuple[str, beam_runner_api_pb2.GroupIntoBatchesPayload] | 
 |     return ( | 
 |         common_urns.group_into_batches_components.GROUP_INTO_BATCHES.urn, | 
 |         self.params.get_payload()) | 
 |  | 
 |   @staticmethod | 
 |   @PTransform.register_urn( | 
 |       common_urns.group_into_batches_components.GROUP_INTO_BATCHES.urn, | 
 |       beam_runner_api_pb2.GroupIntoBatchesPayload) | 
 |   def from_runner_api_parameter(unused_ptransform, proto, unused_context): | 
 |     return GroupIntoBatches(*_GroupIntoBatchesParams.parse_payload(proto)) | 
 |  | 
 |   @typehints.with_input_types(Tuple[K, V]) | 
 |   @typehints.with_output_types( | 
 |       typehints.Tuple[ | 
 |           ShardedKeyType[typehints.TypeVariable(K)],  # type: ignore[misc] | 
 |           typehints.Iterable[typehints.TypeVariable(V)]]) | 
 |   class WithShardedKey(PTransform): | 
 |     """A GroupIntoBatches transform that outputs batched elements associated | 
 |     with sharded input keys. | 
 |  | 
    By default, keys are sharded such that input elements with the same key
    are spread across all available threads executing the transform. Runners
    may override the default sharding to do better load balancing at
    execution time.
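
    A minimal usage sketch, assuming the conventional ``import apache_beam as
    beam`` alias (``keyed_events`` is an illustrative PCollection of
    (key, value) pairs)::

        batched = keyed_events | beam.GroupIntoBatches.WithShardedKey(
            batch_size=10)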
 |     """ | 
 |     def __init__( | 
 |         self, batch_size, max_buffering_duration_secs=None, clock=time.time): | 
 |       """Create a new GroupIntoBatches with sharded output. | 
 |       See ``GroupIntoBatches`` transform for a description of input parameters. | 
 |       """ | 
 |       self.params = _GroupIntoBatchesParams( | 
 |           batch_size, max_buffering_duration_secs) | 
 |       self.clock = clock | 
 |  | 
 |     _shard_id_prefix = uuid.uuid4().bytes | 
 |  | 
 |     def expand(self, pcoll): | 
 |       key_type, value_type = pcoll.element_type.tuple_types | 
 |       sharded_pcoll = pcoll | Map( | 
 |           lambda key_value: ( | 
 |               ShardedKey( | 
 |                   key_value[0], | 
 |                   # Use [uuid, thread id] as the shard id. | 
 |                   GroupIntoBatches.WithShardedKey._shard_id_prefix + bytes( | 
 |                       threading.get_ident().to_bytes(8, 'big'))), | 
 |               key_value[1])).with_output_types( | 
 |                   typehints.Tuple[ | 
 |                       ShardedKeyType[key_type],  # type: ignore[misc] | 
 |                       value_type]) | 
 |       return ( | 
 |           sharded_pcoll | 
 |           | GroupIntoBatches( | 
 |               self.params.batch_size, | 
 |               self.params.max_buffering_duration_secs, | 
 |               self.clock)) | 
 |  | 
 |     def to_runner_api_parameter( | 
 |         self, | 
 |         unused_context  # type: PipelineContext | 
 |     ):  # type: (...) -> Tuple[str, beam_runner_api_pb2.GroupIntoBatchesPayload] | 
 |       return ( | 
 |           common_urns.composites.GROUP_INTO_BATCHES_WITH_SHARDED_KEY.urn, | 
 |           self.params.get_payload()) | 
 |  | 
 |     @staticmethod | 
 |     @PTransform.register_urn( | 
 |         common_urns.composites.GROUP_INTO_BATCHES_WITH_SHARDED_KEY.urn, | 
 |         beam_runner_api_pb2.GroupIntoBatchesPayload) | 
 |     def from_runner_api_parameter(unused_ptransform, proto, unused_context): | 
 |       return GroupIntoBatches.WithShardedKey( | 
 |           *_GroupIntoBatchesParams.parse_payload(proto)) | 
 |  | 
 |  | 
 | class _GroupIntoBatchesParams: | 
 |   """This class represents the parameters for | 
 |   :class:`apache_beam.utils.GroupIntoBatches` transform, used to define how | 
 |   elements should be batched. | 
 |   """ | 
 |   def __init__(self, batch_size, max_buffering_duration_secs): | 
 |     self.batch_size = batch_size | 
 |     self.max_buffering_duration_secs = ( | 
 |         0 | 
 |         if max_buffering_duration_secs is None else max_buffering_duration_secs) | 
 |     self._validate() | 
 |  | 
 |   def __eq__(self, other): | 
 |     if other is None or not isinstance(other, _GroupIntoBatchesParams): | 
 |       return False | 
 |     return ( | 
 |         self.batch_size == other.batch_size and | 
 |         self.max_buffering_duration_secs == other.max_buffering_duration_secs) | 
 |  | 
 |   def _validate(self): | 
 |     assert self.batch_size is not None and self.batch_size > 0, ( | 
 |         'batch_size must be a positive value') | 
 |     assert ( | 
 |         self.max_buffering_duration_secs is not None and | 
 |         self.max_buffering_duration_secs >= 0), ( | 
 |             'max_buffering_duration must be a non-negative value') | 
 |  | 
 |   def get_payload(self): | 
 |     return beam_runner_api_pb2.GroupIntoBatchesPayload( | 
 |         batch_size=self.batch_size, | 
 |         max_buffering_duration_millis=int( | 
 |             self.max_buffering_duration_secs * 1000)) | 
 |  | 
 |   @staticmethod | 
 |   def parse_payload( | 
 |       proto  # type: beam_runner_api_pb2.GroupIntoBatchesPayload | 
 |   ): | 
 |     return proto.batch_size, proto.max_buffering_duration_millis / 1000 | 
 |  | 
 |  | 
 | def _pardo_group_into_batches( | 
 |     input_coder, batch_size, max_buffering_duration_secs, clock=time.time): | 
 |   ELEMENT_STATE = BagStateSpec('values', input_coder) | 
 |   COUNT_STATE = CombiningValueStateSpec('count', input_coder, CountCombineFn()) | 
 |   WINDOW_TIMER = TimerSpec('window_end', TimeDomain.WATERMARK) | 
 |   BUFFERING_TIMER = TimerSpec('buffering_end', TimeDomain.REAL_TIME) | 
 |  | 
 |   class _GroupIntoBatchesDoFn(DoFn): | 
 |     def process( | 
 |         self, | 
 |         element, | 
 |         window=DoFn.WindowParam, | 
 |         element_state=DoFn.StateParam(ELEMENT_STATE), | 
 |         count_state=DoFn.StateParam(COUNT_STATE), | 
 |         window_timer=DoFn.TimerParam(WINDOW_TIMER), | 
 |         buffering_timer=DoFn.TimerParam(BUFFERING_TIMER)): | 
 |       # Allowed lateness not supported in Python SDK | 
 |       # https://beam.apache.org/documentation/programming-guide/#watermarks-and-late-data | 
 |       window_timer.set(window.end) | 
 |       element_state.add(element) | 
 |       count_state.add(1) | 
 |       count = count_state.read() | 
 |       if count == 1 and max_buffering_duration_secs > 0: | 
        # This is the first element in the batch. Start counting buffering
        # time if a limit was set.
 |         # pylint: disable=deprecated-method | 
 |         buffering_timer.set(clock() + max_buffering_duration_secs) | 
 |       if count >= batch_size: | 
 |         return self.flush_batch(element_state, count_state, buffering_timer) | 
 |  | 
 |     @on_timer(WINDOW_TIMER) | 
 |     def on_window_timer( | 
 |         self, | 
 |         element_state=DoFn.StateParam(ELEMENT_STATE), | 
 |         count_state=DoFn.StateParam(COUNT_STATE), | 
 |         buffering_timer=DoFn.TimerParam(BUFFERING_TIMER)): | 
 |       return self.flush_batch(element_state, count_state, buffering_timer) | 
 |  | 
 |     @on_timer(BUFFERING_TIMER) | 
 |     def on_buffering_timer( | 
 |         self, | 
 |         element_state=DoFn.StateParam(ELEMENT_STATE), | 
 |         count_state=DoFn.StateParam(COUNT_STATE), | 
 |         buffering_timer=DoFn.TimerParam(BUFFERING_TIMER)): | 
 |       return self.flush_batch(element_state, count_state, buffering_timer) | 
 |  | 
 |     def flush_batch(self, element_state, count_state, buffering_timer): | 
 |       batch = [element for element in element_state.read()] | 
 |       if not batch: | 
 |         return | 
 |       key, _ = batch[0] | 
 |       batch_values = [v for (k, v) in batch] | 
 |       element_state.clear() | 
 |       count_state.clear() | 
 |       buffering_timer.clear() | 
 |       yield key, batch_values | 
 |  | 
 |   return _GroupIntoBatchesDoFn() | 
 |  | 
 |  | 
 | class ToString(object): | 
 |   """ | 
  PTransforms for converting the elements, KV pairs, or iterables of a
  PCollection to strings.
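
  A minimal sketch of the variants, assuming the conventional
  ``import apache_beam as beam`` alias (``numbers`` and ``rows`` are
  illustrative PCollections)::

      as_strings = numbers | beam.ToString.Element()
      joined_rows = rows | beam.ToString.Iterables(delimiter='|')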
 |   """ | 
 |  | 
 |   # pylint: disable=invalid-name | 
 |   @staticmethod | 
 |   def Element(): | 
 |     """ | 
 |     Transforms each element of the PCollection to a string. | 
 |     """ | 
 |     return 'ElementToString' >> Map(str) | 
 |  | 
 |   @staticmethod | 
 |   def Iterables(delimiter=None): | 
 |     """ | 
    Transforms each iterable element of the input PCollection into a
    delimiter-joined string. There is no trailing delimiter.
 |     """ | 
 |     if delimiter is None: | 
 |       delimiter = ',' | 
 |     return ( | 
 |         'IterablesToString' >> | 
 |         Map(lambda xs: delimiter.join(str(x) for x in xs)).with_input_types( | 
 |             Iterable[Any]).with_output_types(str)) | 
 |  | 
 |   # An alias for Iterables. | 
 |   Kvs = Iterables | 
 |  | 
 |  | 
 | class Reify(object): | 
 |   """PTransforms for converting between explicit and implicit form of various | 
 |   Beam values.""" | 
 |   @typehints.with_input_types(T) | 
 |   @typehints.with_output_types(T) | 
 |   class Timestamp(PTransform): | 
 |     """PTransform to wrap a value in a TimestampedValue with it's | 
 |     associated timestamp.""" | 
 |     @staticmethod | 
 |     def add_timestamp_info(element, timestamp=DoFn.TimestampParam): | 
 |       yield TimestampedValue(element, timestamp) | 
 |  | 
 |     def expand(self, pcoll): | 
 |       return pcoll | ParDo(self.add_timestamp_info) | 
 |  | 
 |   @typehints.with_input_types(T) | 
 |   @typehints.with_output_types(T) | 
 |   class Window(PTransform): | 
 |     """PTransform to convert an element in a PCollection into a tuple of | 
 |     (element, timestamp, window), wrapped in a TimestampedValue with it's | 
 |     associated timestamp.""" | 
 |     @staticmethod | 
 |     def add_window_info( | 
 |         element, timestamp=DoFn.TimestampParam, window=DoFn.WindowParam): | 
 |       yield TimestampedValue((element, timestamp, window), timestamp) | 
 |  | 
 |     def expand(self, pcoll): | 
 |       return pcoll | ParDo(self.add_window_info) | 
 |  | 
 |   @typehints.with_input_types(Tuple[K, V]) | 
 |   @typehints.with_output_types(Tuple[K, V]) | 
 |   class TimestampInValue(PTransform): | 
 |     """PTransform to wrap the Value in a KV pair in a TimestampedValue with | 
 |     the element's associated timestamp.""" | 
 |     @staticmethod | 
 |     def add_timestamp_info(element, timestamp=DoFn.TimestampParam): | 
 |       key, value = element | 
 |       yield (key, TimestampedValue(value, timestamp)) | 
 |  | 
 |     def expand(self, pcoll): | 
 |       return pcoll | ParDo(self.add_timestamp_info) | 
 |  | 
 |   @typehints.with_input_types(Tuple[K, V]) | 
 |   @typehints.with_output_types(Tuple[K, V]) | 
 |   class WindowInValue(PTransform): | 
 |     """PTransform to convert the Value in a KV pair into a tuple of | 
 |     (value, timestamp, window), with the whole element being wrapped inside a | 
 |     TimestampedValue.""" | 
 |     @staticmethod | 
 |     def add_window_info( | 
 |         element, timestamp=DoFn.TimestampParam, window=DoFn.WindowParam): | 
 |       key, value = element | 
 |       yield TimestampedValue((key, (value, timestamp, window)), timestamp) | 
 |  | 
 |     def expand(self, pcoll): | 
 |       return pcoll | ParDo(self.add_window_info) | 
 |  | 
 |  | 
 | class Regex(object): | 
 |   """ | 
  PTransforms that use regular expressions to process the elements of a
  PCollection.
 |   """ | 
 |  | 
 |   ALL = "__regex_all_groups" | 
 |  | 
 |   @staticmethod | 
 |   def _regex_compile(regex): | 
 |     """Return re.compile if the regex has a string value""" | 
 |     if isinstance(regex, str): | 
 |       regex = re.compile(regex) | 
 |     return regex | 
 |  | 
 |   @staticmethod | 
 |   @typehints.with_input_types(str) | 
 |   @typehints.with_output_types(str) | 
 |   @ptransform_fn | 
 |   def matches(pcoll, regex, group=0): | 
 |     """ | 
    Returns the match (group 0 by default) if zero or more characters at the
    beginning of the string match the regular expression. To match the entire
    string, add a "$" at the end of the regex.

    The group can be an integer or a string (a named group).
 |  | 
 |     Args: | 
 |       regex: the regular expression string or (re.compile) pattern. | 
      group: (optional) name or number of the group; it can be an integer or a
        string. Defaults to 0, meaning the entire matched string is returned.
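
    A minimal sketch, assuming the conventional ``import apache_beam as beam``
    alias (the pattern and ``lines`` are illustrative)::

        words = lines | beam.Regex.matches(r'[A-Za-z]+')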
 |     """ | 
 |     regex = Regex._regex_compile(regex) | 
 |  | 
 |     def _process(element): | 
 |       m = regex.match(element) | 
 |       if m: | 
 |         yield m.group(group) | 
 |  | 
 |     return pcoll | FlatMap(_process) | 
 |  | 
 |   @staticmethod | 
 |   @typehints.with_input_types(str) | 
 |   @typehints.with_output_types(List[str]) | 
 |   @ptransform_fn | 
 |   def all_matches(pcoll, regex): | 
 |     """ | 
    Returns all matches (groups) if zero or more characters at the beginning
    of the string match the regular expression.
 |  | 
 |     Args: | 
 |       regex: the regular expression string or (re.compile) pattern. | 
 |     """ | 
 |     regex = Regex._regex_compile(regex) | 
 |  | 
 |     def _process(element): | 
 |       m = regex.match(element) | 
 |       if m: | 
 |         yield [m.group(ix) for ix in range(m.lastindex + 1)] | 
 |  | 
 |     return pcoll | FlatMap(_process) | 
 |  | 
 |   @staticmethod | 
 |   @typehints.with_input_types(str) | 
 |   @typehints.with_output_types(Tuple[str, str]) | 
 |   @ptransform_fn | 
 |   def matches_kv(pcoll, regex, keyGroup, valueGroup=0): | 
 |     """ | 
    Returns the KV pairs if the string matches the regular expression, deriving
    the key and value from the specified groups of the regular expression.
 |  | 
 |     Args: | 
 |       regex: the regular expression string or (re.compile) pattern. | 
 |       keyGroup: The Regex group to use as the key. Can be int or str. | 
      valueGroup: (optional) Regex group to use as the value. Can be int or
        str. The default value "0" returns the entire matched string.
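
    A minimal sketch, assuming the conventional ``import apache_beam as beam``
    alias (the pattern and ``lines`` are illustrative)::

        pairs = lines | beam.Regex.matches_kv(
            '(?P<key>[a-z]+)=(?P<value>[0-9]+)', 'key', 'value')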
 |     """ | 
 |     regex = Regex._regex_compile(regex) | 
 |  | 
 |     def _process(element): | 
 |       match = regex.match(element) | 
 |       if match: | 
 |         yield (match.group(keyGroup), match.group(valueGroup)) | 
 |  | 
 |     return pcoll | FlatMap(_process) | 
 |  | 
 |   @staticmethod | 
 |   @typehints.with_input_types(str) | 
 |   @typehints.with_output_types(str) | 
 |   @ptransform_fn | 
 |   def find(pcoll, regex, group=0): | 
 |     """ | 
    Returns the match if a portion of the line matches the regex. Returns the
    specified group (group 0, the entire match, by default). The group can be
    an integer or a string.
 |  | 
 |     Args: | 
 |       regex: the regular expression string or (re.compile) pattern. | 
      group: (optional) name or number of the group; it can be an integer or a
        string.
 |     """ | 
 |     regex = Regex._regex_compile(regex) | 
 |  | 
 |     def _process(element): | 
 |       r = regex.search(element) | 
 |       if r: | 
 |         yield r.group(group) | 
 |  | 
 |     return pcoll | FlatMap(_process) | 
 |  | 
 |   @staticmethod | 
 |   @typehints.with_input_types(str) | 
 |   @typehints.with_output_types(Union[List[str], List[Tuple[str, str]]]) | 
 |   @ptransform_fn | 
 |   def find_all(pcoll, regex, group=0, outputEmpty=True): | 
 |     """ | 
    Returns the matches if a portion of the line matches the regex. By default,
    a list of group 0 matches is returned, including empty items. To get all
    groups, pass the `Regex.ALL` flag in the `group` parameter, which returns
    all the groups in tuple format.
 |  | 
 |     Args: | 
 |       regex: the regular expression string or (re.compile) pattern. | 
      group: (optional) name or number of the group; it can be an integer or a
        string.
      outputEmpty: (optional) Whether to output empty matches. True to output
        empty matches and False otherwise.
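
    A minimal sketch, assuming the conventional ``import apache_beam as beam``
    alias (the pattern and ``lines`` are illustrative)::

        number_lists = lines | beam.Regex.find_all('[0-9]+')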
 |     """ | 
 |     regex = Regex._regex_compile(regex) | 
 |  | 
 |     def _process(element): | 
 |       matches = regex.finditer(element) | 
 |       if group == Regex.ALL: | 
 |         yield [(m.group(), m.groups()[0]) for m in matches | 
 |                if outputEmpty or m.groups()[0]] | 
 |       else: | 
 |         yield [m.group(group) for m in matches if outputEmpty or m.group(group)] | 
 |  | 
 |     return pcoll | FlatMap(_process) | 
 |  | 
 |   @staticmethod | 
 |   @typehints.with_input_types(str) | 
 |   @typehints.with_output_types(Tuple[str, str]) | 
 |   @ptransform_fn | 
 |   def find_kv(pcoll, regex, keyGroup, valueGroup=0): | 
 |     """ | 
 |     Returns the matches if a portion of the line matches the Regex. Returns the | 
 |     specified groups as the key and value pair. | 
 |  | 
 |     Args: | 
 |       regex: the regular expression string or (re.compile) pattern. | 
 |       keyGroup: The Regex group to use as the key. Can be int or str. | 
      valueGroup: (optional) Regex group to use as the value. Can be int or
        str. The default value "0" returns the entire matched string.
 |     """ | 
 |     regex = Regex._regex_compile(regex) | 
 |  | 
 |     def _process(element): | 
 |       matches = regex.finditer(element) | 
 |       if matches: | 
 |         for match in matches: | 
 |           yield (match.group(keyGroup), match.group(valueGroup)) | 
 |  | 
 |     return pcoll | FlatMap(_process) | 
 |  | 
 |   @staticmethod | 
 |   @typehints.with_input_types(str) | 
 |   @typehints.with_output_types(str) | 
 |   @ptransform_fn | 
 |   def replace_all(pcoll, regex, replacement): | 
 |     """ | 
    Returns the line with every match of the regex replaced by the replacement
    string.
 |  | 
 |     Args: | 
 |       regex: the regular expression string or (re.compile) pattern. | 
 |       replacement: the string to be substituted for each match. | 
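
    A minimal sketch, assuming the conventional ``import apache_beam as beam``
    alias (the pattern, replacement and ``lines`` are illustrative)::

        redacted = lines | beam.Regex.replace_all('[0-9]', '#')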
 |     """ | 
 |     regex = Regex._regex_compile(regex) | 
 |     return pcoll | Map(lambda elem: regex.sub(replacement, elem)) | 
 |  | 
 |   @staticmethod | 
 |   @typehints.with_input_types(str) | 
 |   @typehints.with_output_types(str) | 
 |   @ptransform_fn | 
 |   def replace_first(pcoll, regex, replacement): | 
 |     """ | 
    Returns the line with the first match of the regex replaced by the
    replacement string.
 |  | 
 |     Args: | 
 |       regex: the regular expression string or (re.compile) pattern. | 
 |       replacement: the string to be substituted for each match. | 
 |     """ | 
 |     regex = Regex._regex_compile(regex) | 
 |     return pcoll | Map(lambda elem: regex.sub(replacement, elem, 1)) | 
 |  | 
 |   @staticmethod | 
 |   @typehints.with_input_types(str) | 
 |   @typehints.with_output_types(List[str]) | 
 |   @ptransform_fn | 
 |   def split(pcoll, regex, outputEmpty=False): | 
 |     """ | 
    Returns the list of strings produced by splitting the element on the
    regular expression. Empty items are not output by default.
 |  | 
 |     Args: | 
 |       regex: the regular expression string or (re.compile) pattern. | 
      outputEmpty: (optional) Whether to output empty items. True to output
          empty items and False otherwise.
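
    A minimal sketch, assuming the conventional ``import apache_beam as beam``
    alias (the pattern and ``lines`` are illustrative)::

        fields = lines | beam.Regex.split(' *, *')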
 |     """ | 
 |     regex = Regex._regex_compile(regex) | 
 |     outputEmpty = bool(outputEmpty) | 
 |  | 
 |     def _process(element): | 
 |       r = regex.split(element) | 
 |       if r and not outputEmpty: | 
 |         r = list(filter(None, r)) | 
 |       yield r | 
 |  | 
 |     return pcoll | FlatMap(_process) |