blob: 03b6cee417d438b841ef67bba2790179bcd3834b [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Type coders registration.
This module contains functionality to define and use coders for custom classes.
Let's say we have a class Xyz and we are processing a PCollection with elements
of type Xyz. If we do not register a coder for Xyz, a default pickle-based
fallback coder will be used. This can be undesirable for two reasons. First, we
may want a faster coder or a more space efficient one. Second, the pickle-based
coder is not deterministic in the sense that objects like dictionaries or sets
are not guaranteed to be encoded in the same way every time (elements are not
really ordered).
Two (sometimes three) steps are needed to define and use a custom coder:
- define the coder class
- associate the coder with the class (a.k.a. coder registration)
- typehint DoFns or transforms with the new class or composite types using
the class.
A coder class is defined by subclassing from CoderBase and defining the
encode_to_bytes and decode_from_bytes methods. The framework uses duck-typing
for coders so it is not strictly required to subclass from CoderBase as long as
the encode/decode methods are defined.
A coder class is registered with a register_coder() call::

    from apache_beam import coders
    ...
    coders.registry.register_coder(Xyz, XyzCoder)

Additionally, DoFns and PTransforms may need type hints. This is not always
necessary since there is functionality to infer the return types of DoFns by
analyzing the code. For instance, for the function below the return type of
'Xyz' will be inferred::

    def MakeXyzs(v):
        return Xyz(v)

If Xyz is inferred then its coder will be used whenever the framework needs to
serialize data (e.g., writing to the shuffler subsystem responsible for group by
key operations). If a typehint is needed it can be specified by decorating the
DoFns or using with_input_types/with_output_types methods on PTransforms. For
example, the above function can be decorated::

    @with_output_types(Xyz)
    def MakeXyzs(v):
        return complex_operation_returning_Xyz(v)

See apache_beam.typehints.decorators module for more details.
"""
# pytype: skip-file
from typing import Any
from typing import Dict
from typing import Iterable
from typing import List
from typing import Type
from apache_beam.coders import coders
from apache_beam.typehints import typehints
# The module-level ``registry`` instance is the only name exported publicly.
__all__ = ['registry']
class CoderRegistry(object):
    """A coder registry for typehint/coder associations.

    Type hints are mapped to coder classes. Lookups that find no registered
    coder are served by a fallback coder (by default, the first of ProtoCoder
    and FastPrimitivesCoder that accepts the hint).
    """

    def __init__(self, fallback_coder=None):
        # Associations keyed by plain type, or by TypeConstraint class for
        # composite hints.
        self._coders = {}  # type: Dict[Any, Type[coders.Coder]]
        # Types registered through the public register_coder() API, in
        # first-registration order.
        self.custom_types = []  # type: List[Any]
        self.register_standard_coders(fallback_coder)

    def register_standard_coders(self, fallback_coder):
        """Register coders for all basic and composite types.

        Args:
          fallback_coder: coder-like object used when no registered coder
            matches a hint. If falsy, ProtoCoder and then FastPrimitivesCoder
            are tried in that order.
        """
        builtin_pairs = (
            # Coders without subclasses.
            (int, coders.VarIntCoder),
            (float, coders.FloatCoder),
            (bytes, coders.BytesCoder),
            (bool, coders.BooleanCoder),
            (str, coders.StrUtf8Coder),
            # Composite hints are looked up by their constraint class.
            (typehints.TupleConstraint, coders.TupleCoder),
            (typehints.DictConstraint, coders.MapCoder),
        )
        for hint, coder_class in builtin_pairs:
            self._register_coder_internal(hint, coder_class)
        # Default fallback coders applied in that order until the first
        # matching coder is found.
        default_fallback_coders = [coders.ProtoCoder, coders.FastPrimitivesCoder]
        if fallback_coder:
            self._fallback_coder = fallback_coder
        else:
            self._fallback_coder = FirstOf(default_fallback_coders)

    def register_fallback_coder(self, fallback_coder):
        """Put ``fallback_coder`` in front of the current fallback chain.

        The previously installed fallback is still tried if ``fallback_coder``
        cannot provide a coder for a hint.
        """
        previous_fallback = self._fallback_coder
        self._fallback_coder = FirstOf([fallback_coder, previous_fallback])

    def _register_coder_internal(self, typehint_type, typehint_coder_class):
        # type: (Any, Type[coders.Coder]) -> None
        # Record the association without marking the type as custom.
        self._coders[typehint_type] = typehint_coder_class

    def register_coder(self, typehint_type, typehint_coder_class):
        # type: (Any, Type[coders.Coder]) -> None
        """Register ``typehint_coder_class`` as the coder for ``typehint_type``.

        The type is also recorded in ``custom_types`` (once).

        Raises:
          TypeError: if ``typehint_coder_class`` is not a class object.
        """
        if not isinstance(typehint_coder_class, type):
            # Wrap in a 1-tuple: a tuple argument would otherwise be unpacked
            # by %-formatting and replace this message with a formatting error.
            raise TypeError(
                'Coder registration requires a coder class object. '
                'Received %r instead.' % (typehint_coder_class,))
        if typehint_type not in self.custom_types:
            self.custom_types.append(typehint_type)
        self._register_coder_internal(typehint_type, typehint_coder_class)

    def get_coder(self, typehint):
        # type: (Any) -> coders.Coder
        """Return a coder instance for ``typehint``.

        A registered coder class is used when one exists. For TypeConstraint
        instances the lookup key is the constraint's class. Otherwise Iterable
        and List constraints get IterableCoder / ListCoder, and every other
        hint goes to the fallback coder.

        Raises:
          RuntimeError: if no coder is registered for the hint and the
            registry has no fallback coder.
        """
        is_constraint = isinstance(typehint, typehints.TypeConstraint)
        lookup_key = typehint.__class__ if is_constraint else typehint
        coder = self._coders.get(lookup_key, None)
        if coder is not None:
            return coder.from_type_hint(typehint, self)

        # No coder is registered for this hint, e.g. a user-defined class with
        # no coder specified.
        if not hasattr(self, '_fallback_coder'):
            raise RuntimeError(
                'Coder registry has no fallback coder. This can happen if the '
                'fast_coders module could not be imported.')
        if isinstance(typehint, typehints.IterableTypeConstraint):
            return coders.IterableCoder.from_type_hint(typehint, self)
        if isinstance(typehint, typehints.ListConstraint):
            return coders.ListCoder.from_type_hint(typehint, self)

        # Every remaining hint uses the fallback coder. The cases are spelled
        # out only to record why each one is expected to end up there.
        if typehint is None:
            # In some old code, None is used for Any.
            # TODO(robertwb): Clean this up.
            pass
        elif typehint is object or typehint == typehints.Any:
            # We explicitly want the fallback coder.
            pass
        elif isinstance(typehint, typehints.TypeVariable):
            # TODO(robertwb): Clean this up when type inference is fully enabled.
            pass
        else:
            # TODO(robertwb): Re-enable a "using fallback coder for typehint"
            # warning when it's actionable.
            pass
        return self._fallback_coder.from_type_hint(typehint, self)

    def get_custom_type_coder_tuples(self, types):
        """Returns type/coder tuples for all custom types passed in.

        Only types registered through register_coder() are included, in the
        order they appear in ``types``.
        """
        return [(t, self._coders[t]) for t in types if t in self.custom_types]

    def verify_deterministic(self, key_coder, op_name, silent=True):
        """Return a deterministic coder for the keys of ``op_name``.

        ``key_coder`` is returned unchanged if it reports itself deterministic.
        Otherwise the result of ``key_coder.as_deterministic_coder`` is
        returned, called with an explanatory error message. ``silent`` is not
        used by this implementation.
        """
        if key_coder.is_deterministic():
            return key_coder
        error_msg = (
            'The key coder "%s" for %s '
            'is not deterministic. This may result in incorrect '
            'pipeline output. This can be fixed by adding a type '
            'hint to the operation preceding the GroupByKey step, '
            'and for custom key classes, by writing a '
            'deterministic custom Coder. Please see the '
            'documentation for more details.' % (key_coder, op_name))
        return key_coder.as_deterministic_coder(op_name, error_msg)
class FirstOf(object):
    """For internal use only; no backwards-compatibility guarantees.

    A class used to get the first matching coder from a list of coders.

    It duck-types as a coder class: from_type_hint() tries each candidate in
    order and returns the first coder that can be built for the hint.
    """

    def __init__(self, coders):
        # type: (Iterable[Type[coders.Coder]]) -> None
        # Materialize the candidates. Any iterable is accepted, and a one-shot
        # iterator (e.g. a generator) would otherwise be exhausted by the first
        # from_type_hint() call, so every later call would fail spuriously.
        # Copying also keeps later mutation of the caller's list from leaking in.
        self._coders = list(coders)

    def __repr__(self):
        # Readable form for error messages when FirstOf instances are nested.
        return 'FirstOf(%r)' % (self._coders,)

    def from_type_hint(self, typehint, registry):
        """Return a coder for ``typehint`` from the first candidate that can build one.

        Args:
          typehint: the type hint to build a coder for.
          registry: passed through unchanged to each candidate's
            from_type_hint().

        Raises:
          ValueError: if no candidate can provide a coder. The message lists
            each candidate's failure.
        """
        messages = []
        for coder in self._coders:
            try:
                return coder.from_type_hint(typehint, registry)
            except Exception as e:  # pylint: disable=broad-except
                # Any failure just means "try the next candidate"; keep the
                # reason for the final error message.
                messages.append(
                    '%s could not provide a Coder for type %s: %s' %
                    (coder, typehint, e))
        raise ValueError(
            'Cannot provide coder for %s: %s' % (typehint, ';'.join(messages)))
# Default module-level registry, created with the standard coders and the
# default fallback chain. Register custom coders on it with
# registry.register_coder(SomeType, SomeCoder).
registry = CoderRegistry()