blob: 66e9de058f1a1ea32a33040c6cb327cf31e8f461 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import functools
import inspect
from typing import NamedTuple
import apache_beam as beam
from apache_beam.typehints.row_type import RowTypeConstraint
from apache_beam.yaml.yaml_utils import SafeLineLoader
class ErrorHandlingConfig(NamedTuple):
  """This option specifies whether and where to output error rows.

  Args:
    output (str): Name to use for the output error collection
  """
  # The only declared field. This class is used in this module as the
  # annotation of the `error_handling` parameter advertised by
  # maybe_with_exception_handling_transform_fn; exception_handling_args
  # renames an 'output' key to 'dead_letter_tag'.
  output: str
  # TODO: Other parameters are valid here too, but not common to Java.
def exception_handling_args(error_handling_spec):
  """Translate an error-handling spec into exception-handling keyword args.

  Args:
    error_handling_spec: A mapping of error-handling options, or a falsy
      value (``None``, empty mapping) when no error handling was requested.

  Returns:
    ``None`` when the spec is falsy; otherwise a new dict with the same
    entries as the spec, except that the ``'output'`` key is renamed to
    ``'dead_letter_tag'``.
  """
  if not error_handling_spec:
    return None

  # error_handling_spec may have come from a yaml file and have metadata.
  plain_spec = SafeLineLoader.strip_metadata(error_handling_spec)

  translated = {}
  for key, value in plain_spec.items():
    target_key = 'dead_letter_tag' if key == 'output' else key
    translated[target_key] = value
  return translated
def map_errors_to_standard_format(input_type):
  """Build a transform that turns error records into standard error rows.

  Each input record is indexed as ``record[0]`` (stored as ``element``),
  ``record[1][1]`` (stringified into ``msg``) and ``record[1][2]`` (joined
  into ``stack``).
  NOTE(review): this presumably matches an
  ``(element, (exc_type, exc_value, traceback_lines))`` pair -- confirm
  against the producer of the error collection.

  Args:
    input_type: Type used for the ``element`` field of the output row schema.

  Returns:
    A ``beam.Map`` transform whose declared output type is a row with fields
    ``element`` (``input_type``), ``msg`` (``str``) and ``stack`` (``str``).
  """
  # TODO(https://github.com/apache/beam/issues/24755): Switch to MapTuple.
  to_error_row = beam.Map(
      lambda record: beam.Row(
          element=record[0],
          msg=str(record[1][1]),
          stack=''.join(record[1][2])))
  error_row_type = RowTypeConstraint.from_fields([
      ("element", input_type),
      ("msg", str),
      ("stack", str),
  ])
  return to_error_row.with_output_types(error_row_type)
def maybe_with_exception_handling(inner_expand):
  """Decorate an ``expand(self, pcoll)`` method with error-output support.

  The returned method wraps the incoming ``pcoll`` in
  ``beam.core._MaybePValueWithErrors`` using ``self._exception_handling_args``,
  calls ``inner_expand`` with the wrapped value, and then calls
  ``as_result`` on what it returns, passing a transform that maps errors to
  the standard ``element``/``msg``/``stack`` row format.

  Args:
    inner_expand: The original ``expand(self, pcoll)`` method. Its return
      value must provide ``as_result``.

  Returns:
    A replacement ``expand(self, pcoll)`` method. The instance it is bound to
    must define ``_exception_handling_args``.
  """
  # Preserve the wrapped method's name/docstring (and expose __wrapped__),
  # consistent with maybe_with_exception_handling_transform_fn.
  @functools.wraps(inner_expand)
  def expand(self, pcoll):
    wrapped_pcoll = beam.core._MaybePValueWithErrors(
        pcoll, self._exception_handling_args)
    return inner_expand(self, wrapped_pcoll).as_result(
        map_errors_to_standard_format(pcoll.element_type))

  return expand
def maybe_with_exception_handling_transform_fn(transform_fn):
  """Decorate a function-style transform with an ``error_handling`` option.

  The returned function takes the same keyword arguments as ``transform_fn``
  plus ``error_handling``. It wraps ``pcoll`` in
  ``beam.core._MaybePValueWithErrors`` (configured from ``error_handling``
  via ``exception_handling_args``), calls ``transform_fn`` with the wrapped
  value, and calls ``as_result`` on what it returns with the standard
  error-row mapping.

  The wrapper's ``__signature__`` is set to ``transform_fn``'s signature with
  a keyword-only ``error_handling`` parameter (default ``None``, annotated
  ``ErrorHandlingConfig``) added, placed before a trailing ``**kwargs`` if
  there is one.

  NOTE(review): the runtime wrapper itself is ``(pcoll, error_handling=None,
  **kwargs)``, so a second positional argument binds to ``error_handling``
  even though the advertised signature marks it keyword-only.

  Args:
    transform_fn: Callable taking a pcoll-like first argument plus keyword
      arguments; its return value must provide ``as_result``. It must declare
      at least one parameter.

  Returns:
    The wrapping function, carrying ``transform_fn``'s metadata and the
    extended signature.
  """
  @functools.wraps(transform_fn)
  def expand(pcoll, error_handling=None, **kwargs):
    guarded_input = beam.core._MaybePValueWithErrors(
        pcoll, exception_handling_args(error_handling))
    inner_result = transform_fn(guarded_input, **kwargs)
    return inner_result.as_result(
        map_errors_to_standard_format(pcoll.element_type))

  signature = inspect.signature(transform_fn)
  parameters = list(signature.parameters.values())
  extra_parameter = inspect.Parameter(
      name='error_handling',
      kind=inspect.Parameter.KEYWORD_ONLY,
      default=None,
      annotation=ErrorHandlingConfig)

  # A keyword-only parameter must precede **kwargs, so slot it in just before
  # a trailing VAR_KEYWORD parameter; otherwise it goes at the very end.
  ends_with_var_keyword = (
      parameters[-1].kind == inspect.Parameter.VAR_KEYWORD)
  insert_at = len(parameters) - 1 if ends_with_var_keyword else len(parameters)
  parameters.insert(insert_at, extra_parameter)

  expand.__signature__ = signature.replace(parameters=parameters)
  return expand