|  | # | 
|  | # Licensed to the Apache Software Foundation (ASF) under one or more | 
|  | # contributor license agreements.  See the NOTICE file distributed with | 
|  | # this work for additional information regarding copyright ownership. | 
|  | # The ASF licenses this file to You under the Apache License, Version 2.0 | 
|  | # (the "License"); you may not use this file except in compliance with | 
|  | # the License.  You may obtain a copy of the License at | 
|  | # | 
|  | #    http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, software | 
|  | # distributed under the License is distributed on an "AS IS" BASIS, | 
|  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | # See the License for the specific language governing permissions and | 
|  | # limitations under the License. | 
|  | # | 
|  |  | 
|  | # cython: profile=True | 
|  |  | 
|  | """Worker operations executor. | 
|  |  | 
|  | For internal use only; no backwards-compatibility guarantees. | 
|  | """ | 
|  |  | 
|  | import sys | 
|  | import traceback | 
|  |  | 
|  | from apache_beam.internal import util | 
|  | from apache_beam.metrics.execution import ScopedMetricsContainer | 
|  | from apache_beam.pvalue import TaggedOutput | 
|  | from apache_beam.transforms import core | 
|  | from apache_beam.transforms.window import TimestampedValue | 
|  | from apache_beam.transforms.window import WindowFn | 
|  | from apache_beam.transforms.window import GlobalWindow | 
|  | from apache_beam.utils.windowed_value import WindowedValue | 
|  |  | 
|  |  | 
class LoggingContext(object):
    """Base logging context whose hooks do nothing.

    For internal use only; no backwards-compatibility guarantees.
    """

    def enter(self):
        """Enter the context; the base implementation is a no-op."""
        return None

    def exit(self):
        """Leave the context; the base implementation is a no-op."""
        return None
|  |  | 
|  |  | 
class Receiver(object):
    """Interface for an object that consumes a WindowedValue.

    For internal use only; no backwards-compatibility guarantees.

    This class can be efficiently used to pass values between the
    sdk and worker harnesses.
    """

    def receive(self, windowed_value):
        """Consume ``windowed_value``; subclasses must override this.

        Raises:
          NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()
|  |  | 
|  |  | 
class DoFnMethodWrapper(object):
    """Captures one method of a DoFn object together with its arguments.

    For internal use only; no backwards-compatibility guarantees.
    """

    def __init__(self, do_fn, method_name):
        """Initializes a ``DoFnMethodWrapper``.

        Args:
          do_fn: A DoFn object that contains the method. It must provide
            ``get_function_arguments(method_name)`` returning a 4-tuple whose
            first item is the argument names and whose last item is the
            default values.
          method_name: name of the method as a string.
        """
        arg_names, _, _, default_values = do_fn.get_function_arguments(
            method_name)
        bound_method = getattr(do_fn, method_name)

        self.method_value = bound_method
        self.args = arg_names
        # A falsy defaults value (e.g. None) is normalized to an empty list so
        # that users of this wrapper can always iterate over it.
        self.defaults = default_values or []
|  |  | 
|  |  | 
class DoFnSignature(object):
    """Represents the signature of a given ``DoFn`` object.

    Signature of a ``DoFn`` provides a view of the properties of a given
    ``DoFn``. Among other things, this will give an extensible way for
    (1) accessing the structure of the ``DoFn`` including methods and method
    parameters (2) identifying features that a given ``DoFn`` supports, for
    example, whether a given ``DoFn`` is a Splittable ``DoFn``
    (https://s.apache.org/splittable-do-fn) (3) validating a ``DoFn`` based on
    the feature set offered by it.
    """

    def __init__(self, do_fn):
        """Wraps the ``process``/``start_bundle``/``finish_bundle`` methods.

        Args:
          do_fn: the ``core.DoFn`` instance to describe.

        Raises:
          AssertionError: if ``do_fn`` is not a ``core.DoFn`` or if validation
            of its method defaults fails.
        """
        assert isinstance(do_fn, core.DoFn)
        self.do_fn = do_fn

        # One attribute per method defined by Beam DoFn features.
        self.process_method = DoFnMethodWrapper(do_fn, 'process')
        self.start_bundle_method = DoFnMethodWrapper(do_fn, 'start_bundle')
        self.finish_bundle_method = DoFnMethodWrapper(do_fn, 'finish_bundle')
        self._validate()

    def _validate(self):
        """Runs every validation: process first, then both bundle methods."""
        self._validate_process()
        for bundle_wrapper in (self.start_bundle_method,
                               self.finish_bundle_method):
            self._validate_bundle_method(bundle_wrapper)

    def _validate_process(self):
        """Validate that none of the DoFnParameters are repeated in process."""
        process_defaults = self.process_method.defaults
        for dofn_param in core.DoFn.DoFnParams:
            assert process_defaults.count(dofn_param) <= 1

    def _validate_bundle_method(self, method_wrapper):
        """Validate that none of the DoFnParameters are used in the method."""
        for dofn_param in core.DoFn.DoFnParams:
            assert dofn_param not in method_wrapper.defaults
|  |  | 
|  |  | 
class DoFnInvoker(object):
    """An abstraction that can be used to execute DoFn methods.

    A DoFnInvoker describes a particular way for invoking methods of a DoFn
    represented by a given DoFnSignature.
    """

    def __init__(self, output_processor, signature):
        """Stores the collaborators shared by all invokers.

        Args:
          output_processor: object whose ``process_outputs``,
            ``start_bundle_outputs`` and ``finish_bundle_outputs`` methods
            receive the values returned by the DoFn methods.
          signature: a DoFnSignature for the DoFn being invoked.
        """
        self.output_processor = output_processor
        self.signature = signature

    @staticmethod
    def create_invoker(
            output_processor,
            signature, context, side_inputs, input_args, input_kwargs):
        """Creates a new DoFnInvoker based on given arguments.

        A ``SimpleInvoker`` is returned when there are no side inputs, no
        extra args/kwargs and the process method declares no defaults;
        otherwise a ``PerWindowInvoker`` is returned.

        Args:
          output_processor: receiver of the values produced by the DoFn.
          signature: a DoFnSignature for the DoFn being invoked.
          context: Context to be used when invoking the DoFn (deprecated).
          side_inputs: side inputs to be used when invoking the process method.
          input_args: arguments to be used when invoking the process method.
          input_kwargs: kwargs to be used when invoking the process method.
        """
        default_arg_values = signature.process_method.defaults
        needs_per_window_invoker = (
            side_inputs or input_args or input_kwargs or default_arg_values)
        if needs_per_window_invoker:
            return PerWindowInvoker(
                output_processor,
                signature, context, side_inputs, input_args, input_kwargs)
        return SimpleInvoker(output_processor, signature)

    def invoke_process(self, windowed_value):
        """Invokes the DoFn.process() function.

        Args:
          windowed_value: a WindowedValue object that gives the element for
            which process() method should be invoked along with the window
            the element belongs to.

        Raises:
          NotImplementedError: always; concrete invokers override this.
        """
        raise NotImplementedError()

    def invoke_start_bundle(self):
        """Invokes DoFn.start_bundle() and hands its result to the processor."""
        handle_outputs = self.output_processor.start_bundle_outputs
        handle_outputs(self.signature.start_bundle_method.method_value())

    def invoke_finish_bundle(self):
        """Invokes DoFn.finish_bundle() and hands its result to the processor."""
        handle_outputs = self.output_processor.finish_bundle_outputs
        handle_outputs(self.signature.finish_bundle_method.method_value())
|  |  | 
|  |  | 
class SimpleInvoker(DoFnInvoker):
    """An invoker that processes elements ignoring windowing information."""

    def __init__(self, output_processor, signature):
        """Caches the process callable taken from ``signature``.

        Args:
          output_processor: receiver of the values returned by process().
          signature: a DoFnSignature for the DoFn being invoked.
        """
        super(SimpleInvoker, self).__init__(output_processor, signature)
        self.process_method = signature.process_method.method_value

    def invoke_process(self, windowed_value):
        """Calls process() with the bare element value and forwards outputs.

        Args:
          windowed_value: the input element; only its ``value`` is passed to
            the process method, while the whole object is handed to the output
            processor alongside the results.
        """
        handle_outputs = self.output_processor.process_outputs
        handle_outputs(windowed_value, self.process_method(windowed_value.value))
|  |  | 
|  |  | 
class PerWindowInvoker(DoFnInvoker):
    """An invoker that processes elements considering windowing information."""

    def __init__(self, output_processor, signature, context,
                 side_inputs, input_args, input_kwargs):
        """Pre-computes the argument list used for every process() call.

        Everything that does not change between elements is resolved here;
        positions that must be filled per element (element, window, timestamp)
        are recorded as placeholders in ``self.placeholders``.

        Args:
          output_processor: receiver of the values returned by process().
          signature: a DoFnSignature for the DoFn being invoked.
          context: context whose ``set_element`` is called for each element
            (deprecated).
          side_inputs: iterable-with-len of side input maps; each is indexed
            by window and answers ``is_globally_windowed()``.
          input_args: extra positional arguments for process(); any sequence
            or None.
          input_kwargs: extra keyword arguments for process(); a dict or None.

        Raises:
          ValueError: if a parameter defaulted to ``DoFn.SideInputParam`` has
            neither a positional nor a keyword value.
        """
        super(PerWindowInvoker, self).__init__(output_processor, signature)
        self.side_inputs = side_inputs
        self.context = context
        self.process_method = signature.process_method.method_value
        default_arg_values = signature.process_method.defaults
        self.has_windowed_inputs = (
            not all(si.is_globally_windowed() for si in side_inputs) or
            (core.DoFn.WindowParam in default_arg_values))

        # Try to prepare all the arguments that can just be filled in
        # without any additional work in the process function.
        # Also cache all the placeholders needed in the process function.

        # Fill in sideInputs if they are globally windowed
        global_window = GlobalWindow()

        # Normalize to a list: the code below concatenates to a list and
        # appends to slices of input_args, which would fail for a tuple.
        input_args = list(input_args) if input_args else []
        input_kwargs = input_kwargs if input_kwargs else {}

        if not self.has_windowed_inputs:
            # NOTE(review): insert_values_in_args is not visible here;
            # presumably it substitutes the given side input values into
            # args/kwargs and returns the new pair -- confirm in util.
            input_args, input_kwargs = util.insert_values_in_args(
                input_args, input_kwargs,
                [si[global_window] for si in side_inputs])

        arguments = signature.process_method.args
        defaults = signature.process_method.defaults

        # 1 when is_process_bounded() is true, else 0; subtracted below so the
        # extra leading entry in ``arguments`` is not counted as an input arg
        # (presumably that entry is ``self`` -- confirm against core.DoFn).
        self_in_args = int(signature.do_fn.is_process_bounded())

        class ArgPlaceholder(object):
            """Marks an argument position that is filled in per element."""

            def __init__(self, placeholder):
                self.placeholder = placeholder

        # Create placeholder for element parameter of DoFn.process() method.
        if core.DoFn.ElementParam not in default_arg_values:
            # The element is the first positional argument of process().
            args_to_pick = (
                len(arguments) - len(default_arg_values) - 1 - self_in_args)
            args_with_placeholders = (
                [ArgPlaceholder(core.DoFn.ElementParam)] +
                input_args[:args_to_pick])
        else:
            args_to_pick = len(arguments) - len(defaults) - self_in_args
            args_with_placeholders = input_args[:args_to_pick]

        # Fill the OtherPlaceholders for context, window or timestamp.
        # With empty defaults the slice below is the whole list, but zip()
        # then yields nothing, so the loop body never runs.
        remaining_args_iter = iter(input_args[args_to_pick:])
        for arg_name, default in zip(arguments[-len(defaults):], defaults):
            if (default == core.DoFn.ElementParam or
                    default == core.DoFn.WindowParam or
                    default == core.DoFn.TimestampParam):
                args_with_placeholders.append(ArgPlaceholder(default))
                continue
            # Any other defaulted parameter consumes the next positional
            # value if one is left; otherwise it must come from a kwarg (side
            # inputs) or simply keeps its declared default.
            try:
                args_with_placeholders.append(next(remaining_args_iter))
            except StopIteration:
                if (default == core.DoFn.SideInputParam and
                        arg_name not in input_kwargs):
                    raise ValueError(
                        "Value for sideinput %s not provided" % arg_name)
        args_with_placeholders.extend(list(remaining_args_iter))

        # Stash the list of placeholder positions for performance
        self.placeholders = [
            (index, arg.placeholder)
            for index, arg in enumerate(args_with_placeholders)
            if isinstance(arg, ArgPlaceholder)]

        self.args_for_process = args_with_placeholders
        self.kwargs_for_process = input_kwargs

    def invoke_process(self, windowed_value):
        """Invokes process() once, or once per window when windows matter.

        Args:
          windowed_value: the input element with its timestamp and windows.
        """
        self.context.set_element(windowed_value)
        # Call for the process function for each window if has windowed side
        # inputs or if the process accesses the window parameter. We can just
        # call it once otherwise as none of the arguments are changing
        if self.has_windowed_inputs and len(windowed_value.windows) != 1:
            for w in windowed_value.windows:
                self._invoke_per_window(
                    WindowedValue(
                        windowed_value.value, windowed_value.timestamp, (w,)))
        else:
            self._invoke_per_window(windowed_value)

    def _invoke_per_window(self, windowed_value):
        """Fills in the placeholders and calls process() for one window.

        Args:
          windowed_value: the input element; when windowed inputs are in use
            it must carry exactly one window.
        """
        if self.has_windowed_inputs:
            window, = windowed_value.windows
            args_for_process, kwargs_for_process = util.insert_values_in_args(
                self.args_for_process, self.kwargs_for_process,
                [si[window] for si in self.side_inputs])
        else:
            # The cached list itself is reused; the placeholder slots are
            # overwritten on every call below.
            args_for_process, kwargs_for_process = (
                self.args_for_process, self.kwargs_for_process)
        # TODO(sourabhbajaj): Investigate why we can't use `is` instead of ==
        for i, p in self.placeholders:
            if p == core.DoFn.ElementParam:
                args_for_process[i] = windowed_value.value
            elif p == core.DoFn.WindowParam:
                # A WindowParam placeholder implies has_windowed_inputs, so
                # ``window`` was bound above.
                args_for_process[i] = window
            elif p == core.DoFn.TimestampParam:
                args_for_process[i] = windowed_value.timestamp

        if kwargs_for_process:
            self.output_processor.process_outputs(
                windowed_value,
                self.process_method(*args_for_process, **kwargs_for_process))
        else:
            self.output_processor.process_outputs(
                windowed_value, self.process_method(*args_for_process))
|  |  | 
|  |  | 
|  | class DoFnRunner(Receiver): | 
|  | """For internal use only; no backwards-compatibility guarantees. | 
|  |  | 
|  | A helper class for executing ParDo operations. | 
|  | """ | 
|  |  | 
|  | def __init__(self, | 
|  | fn, | 
|  | args, | 
|  | kwargs, | 
|  | side_inputs, | 
|  | windowing, | 
|  | context=None, | 
|  | tagged_receivers=None, | 
|  | logger=None, | 
|  | step_name=None, | 
|  | # Preferred alternative to logger | 
|  | # TODO(robertwb): Remove once all runners are updated. | 
|  | logging_context=None, | 
|  | # Preferred alternative to context | 
|  | # TODO(robertwb): Remove once all runners are updated. | 
|  | state=None, | 
|  | scoped_metrics_container=None): | 
|  | """Initializes a DoFnRunner. | 
|  |  | 
|  | Args: | 
|  | fn: user DoFn to invoke | 
|  | args: positional side input arguments (static and placeholder), if any | 
|  | kwargs: keyword side input arguments (static and placeholder), if any | 
|  | side_inputs: list of sideinput.SideInputMaps for deferred side inputs | 
|  | windowing: windowing properties of the output PCollection(s) | 
|  | context: a DoFnContext to use (deprecated) | 
|  | tagged_receivers: a dict of tag name to Receiver objects | 
|  | logger: a logging module (deprecated) | 
|  | step_name: the name of this step | 
|  | logging_context: a LoggingContext object | 
|  | state: handle for accessing DoFn state | 
|  | scoped_metrics_container: Context switcher for metrics container | 
|  | """ | 
|  | self.scoped_metrics_container = (scoped_metrics_container | 
|  | or ScopedMetricsContainer()) | 
|  | self.step_name = step_name | 
|  |  | 
|  | # Need to support multiple iterations. | 
|  | side_inputs = list(side_inputs) | 
|  |  | 
|  | if logging_context: | 
|  | self.logging_context = logging_context | 
|  | else: | 
|  | self.logging_context = get_logging_context(logger, step_name=step_name) | 
|  |  | 
|  | # TODO(sourabh): Deprecate the use of context | 
|  | if state: | 
|  | assert context is None | 
|  | context = DoFnContext(step_name, state=state) | 
|  | else: | 
|  | assert context is not None | 
|  | context = context | 
|  |  | 
|  | self.context = context | 
|  |  | 
|  | do_fn_signature = DoFnSignature(fn) | 
|  |  | 
|  | # Optimize for the common case. | 
|  | main_receivers = as_receiver(tagged_receivers[None]) | 
|  | output_processor = _OutputProcessor( | 
|  | windowing.windowfn, main_receivers, tagged_receivers) | 
|  |  | 
|  | self.do_fn_invoker = DoFnInvoker.create_invoker( | 
|  | output_processor, do_fn_signature, context, side_inputs, args, kwargs) | 
|  |  | 
|  | def receive(self, windowed_value): | 
|  | self.process(windowed_value) | 
|  |  | 
|  | def process(self, windowed_value): | 
|  | try: | 
|  | self.logging_context.enter() | 
|  | self.scoped_metrics_container.enter() | 
|  | self.do_fn_invoker.invoke_process(windowed_value) | 
|  | except BaseException as exn: | 
|  | self._reraise_augmented(exn) | 
|  | finally: | 
|  | self.scoped_metrics_container.exit() | 
|  | self.logging_context.exit() | 
|  |  | 
|  | def _invoke_bundle_method(self, bundle_method): | 
|  | try: | 
|  | self.logging_context.enter() | 
|  | self.scoped_metrics_container.enter() | 
|  | self.context.set_element(None) | 
|  | bundle_method() | 
|  | except BaseException as exn: | 
|  | self._reraise_augmented(exn) | 
|  | finally: | 
|  | self.scoped_metrics_container.exit() | 
|  | self.logging_context.exit() | 
|  |  | 
|  | def start(self): | 
|  | self._invoke_bundle_method(self.do_fn_invoker.invoke_start_bundle) | 
|  |  | 
|  | def finish(self): | 
|  | self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle) | 
|  |  | 
|  | def _reraise_augmented(self, exn): | 
|  | if getattr(exn, '_tagged_with_step', False) or not self.step_name: | 
|  | raise | 
|  | step_annotation = " [while running '%s']" % self.step_name | 
|  | # To emulate exception chaining (not available in Python 2). | 
|  | original_traceback = sys.exc_info()[2] | 
|  | try: | 
|  | # Attempt to construct the same kind of exception | 
|  | # with an augmented message. | 
|  | new_exn = type(exn)(exn.args[0] + step_annotation, *exn.args[1:]) | 
|  | new_exn._tagged_with_step = True  # Could raise attribute error. | 
|  | except:  # pylint: disable=bare-except | 
|  | # If anything goes wrong, construct a RuntimeError whose message | 
|  | # records the original exception's type and message. | 
|  | new_exn = RuntimeError( | 
|  | traceback.format_exception_only(type(exn), exn)[-1].strip() | 
|  | + step_annotation) | 
|  | new_exn._tagged_with_step = True | 
|  | raise new_exn, None, original_traceback | 
|  |  | 
|  |  | 
|  | class _OutputProcessor(object): | 
|  | """Processes output produced by DoFn method invocations.""" | 
|  |  | 
|  | def __init__(self, window_fn, main_receivers, tagged_receivers): | 
|  | """Initializes ``_OutputProcessor``. | 
|  |  | 
|  | Args: | 
|  | window_fn: a windowing function (WindowFn). | 
|  | main_receivers: a dict of tag name to Receiver objects. | 
|  | tagged_receivers: main receiver object. | 
|  | """ | 
|  | self.window_fn = window_fn | 
|  | self.main_receivers = main_receivers | 
|  | self.tagged_receivers = tagged_receivers | 
|  |  | 
|  | def process_outputs(self, windowed_input_element, results): | 
|  | """Dispatch the result of process computation to the appropriate receivers. | 
|  |  | 
|  | A value wrapped in a TaggedOutput object will be unwrapped and | 
|  | then dispatched to the appropriate indexed output. | 
|  | """ | 
|  | if results is None: | 
|  | return | 
|  |  | 
|  | for result in results: | 
|  | tag = None | 
|  | if isinstance(result, TaggedOutput): | 
|  | tag = result.tag | 
|  | if not isinstance(tag, basestring): | 
|  | raise TypeError('In %s, tag %s is not a string' % (self, tag)) | 
|  | result = result.value | 
|  | if isinstance(result, WindowedValue): | 
|  | windowed_value = result | 
|  | if (windowed_input_element is not None | 
|  | and len(windowed_input_element.windows) != 1): | 
|  | windowed_value.windows *= len(windowed_input_element.windows) | 
|  | elif isinstance(result, TimestampedValue): | 
|  | assign_context = WindowFn.AssignContext(result.timestamp, result.value) | 
|  | windowed_value = WindowedValue( | 
|  | result.value, result.timestamp, | 
|  | self.window_fn.assign(assign_context)) | 
|  | if len(windowed_input_element.windows) != 1: | 
|  | windowed_value.windows *= len(windowed_input_element.windows) | 
|  | else: | 
|  | windowed_value = windowed_input_element.with_value(result) | 
|  | if tag is None: | 
|  | self.main_receivers.receive(windowed_value) | 
|  | else: | 
|  | self.tagged_receivers[tag].output(windowed_value) | 
|  |  | 
|  | def start_bundle_outputs(self, results): | 
|  | """Validate that start_bundle does not output any elements""" | 
|  | if results is None: | 
|  | return | 
|  | raise RuntimeError( | 
|  | 'Start Bundle should not output any elements but got %s' % results) | 
|  |  | 
|  | def finish_bundle_outputs(self, results): | 
|  | """Dispatch the result of finish_bundle to the appropriate receivers. | 
|  |  | 
|  | A value wrapped in a TaggedOutput object will be unwrapped and | 
|  | then dispatched to the appropriate indexed output. | 
|  | """ | 
|  | if results is None: | 
|  | return | 
|  |  | 
|  | for result in results: | 
|  | tag = None | 
|  | if isinstance(result, TaggedOutput): | 
|  | tag = result.tag | 
|  | if not isinstance(tag, basestring): | 
|  | raise TypeError('In %s, tag %s is not a string' % (self, tag)) | 
|  | result = result.value | 
|  |  | 
|  | if isinstance(result, WindowedValue): | 
|  | windowed_value = result | 
|  | else: | 
|  | raise RuntimeError('Finish Bundle should only output WindowedValue ' +\ | 
|  | 'type but got %s' % type(result)) | 
|  |  | 
|  | if tag is None: | 
|  | self.main_receivers.receive(windowed_value) | 
|  | else: | 
|  | self.tagged_receivers[tag].output(windowed_value) | 
|  |  | 
|  |  | 
class _NoContext(WindowFn.AssignContext):
    """An uninspectable WindowFn.AssignContext."""

    # Sentinel meaning "no timestamp was supplied"; distinct from None.
    NO_VALUE = object()

    def __init__(self, value, timestamp=NO_VALUE):
        """Stores ``value`` and, optionally, a timestamp."""
        self.value = value
        self._timestamp = timestamp

    @property
    def timestamp(self):
        """The stored timestamp.

        Raises:
          ValueError: if no timestamp was given at construction.
        """
        if self._timestamp is self.NO_VALUE:
            raise ValueError('No timestamp in this context.')
        return self._timestamp

    @property
    def existing_windows(self):
        """Never available in this context; always raises ValueError."""
        raise ValueError('No existing_windows in this context.')
|  |  | 
|  |  | 
class DoFnState(object):
    """Keeps track of state that DoFns want, currently, user counters.

    For internal use only; no backwards-compatibility guarantees.
    """

    def __init__(self, counter_factory):
        """Starts with an empty step name and the given counter factory."""
        self._counter_factory = counter_factory
        self.step_name = ''

    def counter_for(self, aggregator):
        """Returns the factory's counter for this aggregator and step name."""
        factory = self._counter_factory
        return factory.get_aggregator_counter(self.step_name, aggregator)
|  |  | 
|  |  | 
|  | # TODO(robertwb): Replace core.DoFnContext with this. | 
class DoFnContext(object):
    """Gives access to the element currently being processed.

    For internal use only; no backwards-compatibility guarantees.
    """

    def __init__(self, label, element=None, state=None):
        """Initializes the context.

        Args:
          label: label stored on the context.
          element: optional initial windowed value.
          state: optional handle stored on the context.
        """
        self.label = label
        self.state = state
        # Always define the attribute so that the properties below report the
        # intended "not accessible" error before any element has been set,
        # instead of a generic missing-attribute error.
        self.windowed_value = None
        if element is not None:
            self.set_element(element)

    def set_element(self, windowed_value):
        """Sets the current windowed value; None clears it."""
        self.windowed_value = windowed_value

    @property
    def element(self):
        """Value of the current element.

        Raises:
          AttributeError: if no element is currently set.
        """
        if self.windowed_value is None:
            raise AttributeError('element not accessible in this context')
        return self.windowed_value.value

    @property
    def timestamp(self):
        """Timestamp of the current element.

        Raises:
          AttributeError: if no element is currently set.
        """
        if self.windowed_value is None:
            raise AttributeError('timestamp not accessible in this context')
        return self.windowed_value.timestamp

    @property
    def windows(self):
        """Windows of the current element.

        Raises:
          AttributeError: if no element is currently set.
        """
        if self.windowed_value is None:
            raise AttributeError('windows not accessible in this context')
        return self.windowed_value.windows
|  |  | 
|  |  | 
|  | # TODO(robertwb): Remove all these adapters once service is updated out. | 
class _LoggingContextAdapter(LoggingContext):
    """Exposes an object with enter()/exit() methods as a LoggingContext."""

    def __init__(self, underlying):
        """Wraps ``underlying``, to which enter() and exit() are delegated."""
        self.underlying = underlying

    def enter(self):
        """Delegates to the wrapped object's enter()."""
        wrapped = self.underlying
        wrapped.enter()

    def exit(self):
        """Delegates to the wrapped object's exit()."""
        wrapped = self.underlying
        wrapped.exit()
|  |  | 
|  |  | 
def get_logging_context(maybe_logger, **kwargs):
    """Returns a LoggingContext for the given (possibly absent) logger.

    Args:
      maybe_logger: object providing ``PerThreadLoggingContext(**kwargs)``, or
        a falsy value when no logger is available.
      **kwargs: forwarded to ``PerThreadLoggingContext``.

    Returns:
      A no-op LoggingContext when there is no logger; otherwise the logger's
      context, wrapped in an adapter if it is not already a LoggingContext.
    """
    if not maybe_logger:
        return LoggingContext()
    candidate = maybe_logger.PerThreadLoggingContext(**kwargs)
    if not isinstance(candidate, LoggingContext):
        return _LoggingContextAdapter(candidate)
    return candidate
|  |  | 
|  |  | 
class _ReceiverAdapter(Receiver):
    """Exposes an object with an output() method as a Receiver."""

    def __init__(self, underlying):
        """Wraps ``underlying``, whose output() gets each received value."""
        self.underlying = underlying

    def receive(self, windowed_value):
        """Forwards ``windowed_value`` to the wrapped object's output()."""
        wrapped = self.underlying
        wrapped.output(windowed_value)
|  |  | 
|  |  | 
def as_receiver(maybe_receiver):
    """Returns ``maybe_receiver`` as a Receiver, adapting it if necessary.

    For internal use only; no backwards-compatibility guarantees.
    """
    if not isinstance(maybe_receiver, Receiver):
        return _ReceiverAdapter(maybe_receiver)
    return maybe_receiver