| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """PTransform overrides for the DataflowRunner.""" |
| |
| # pytype: skip-file |
| |
| from apache_beam.pipeline import PTransformOverride |
| |
| |
class NativeReadPTransformOverride(PTransformOverride):
  """A ``PTransformOverride`` for ``Read`` transforms over native sources.

  The DataflowRunner expects a ``Read`` that uses a native source to act as a
  primitive. This override therefore swaps such a ``Read`` for a ``Read``
  subclass whose ``expand`` applies no sub-transforms. A source is treated as
  native here when it exposes a ``format`` attribute.
  """
  def matches(self, applied_ptransform):
    """Return whether ``applied_ptransform`` should be replaced.

    Args:
      applied_ptransform: the applied transform under consideration; only its
        ``transform`` attribute is inspected.

    Returns:
      True when the transform is a ``Read`` that was not produced by this
      override (no truthy ``override`` attribute) and whose source has a
      ``format`` attribute; False otherwise.
    """
    # Imported lazily: importing at module level would create a circular
    # dependency.
    # pylint: disable=wrong-import-order, wrong-import-position
    from apache_beam.io import Read

    transform = applied_ptransform.transform
    if not isinstance(transform, Read):
      return False
    # The replacement built by get_replacement_transform sets
    # ``override = True``; such transforms are skipped so they are not
    # replaced a second time.
    if getattr(transform, 'override', False):
      return False
    # A ``format`` attribute on the source is the marker this override uses
    # to recognise a native source.
    return hasattr(transform.source, 'format')

  def get_replacement_transform(self, ptransform):
    """Build the primitive stand-in for the matched ``Read``.

    Args:
      ptransform: the matched ``Read`` transform; its ``source`` is reused.

    Returns:
      An instance of a ``Read`` subclass over the same source. The instance
      is marked with ``override = True``, and its output type is the type
      hint of the source's coder.
    """
    # Imported lazily to avoid circular dependencies.
    # pylint: disable=wrong-import-order, wrong-import-position
    from apache_beam import pvalue
    from apache_beam.io import iobase

    # Deliberately a subclass of iobase.Read, so that the existing windowing,
    # typing, and display-data behaviour of Read is inherited.
    class Read(iobase.Read):
      override = True

      def expand(self, pbegin):
        # The output PCollection is created directly from ``pbegin`` without
        # applying any sub-transforms, so this transform acts as a primitive.
        return pvalue.PCollection.from_(pbegin)

    source = ptransform.source
    primitive_read = Read(source)
    # Use the source coder's type hint as the output type. Without it, the
    # typing information would not reach the DataflowRunner, which could then
    # choose the wrong coder for this transform.
    element_type = source.coder.to_type_hint()
    return primitive_read.with_output_types(element_type)