|  | # | 
|  | # Licensed to the Apache Software Foundation (ASF) under one or more | 
|  | # contributor license agreements.  See the NOTICE file distributed with | 
|  | # this work for additional information regarding copyright ownership. | 
|  | # The ASF licenses this file to You under the Apache License, Version 2.0 | 
|  | # (the "License"); you may not use this file except in compliance with | 
|  | # the License.  You may obtain a copy of the License at | 
|  | # | 
|  | #    http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, software | 
|  | # distributed under the License is distributed on an "AS IS" BASIS, | 
|  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | # See the License for the specific language governing permissions and | 
|  | # limitations under the License. | 
|  | # | 
|  |  | 
|  | """Package for SqlTransform and related classes.""" | 
|  |  | 
|  | # pytype: skip-file | 
|  |  | 
|  | import typing | 
|  |  | 
|  | from apache_beam.transforms.external import BeamJarExpansionService | 
|  | from apache_beam.transforms.external import ExternalTransform | 
|  | from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder | 
|  |  | 
# Names exported by a star-import of this module; SqlTransform is the only one.
__all__ = ['SqlTransform']
|  |  | 
# Payload carried by SqlTransform: the SQL query text plus an optional dialect.
# It is a NamedTuple because SqlTransform hands an instance of it to
# NamedTupleBasedPayloadBuilder.
class SqlTransformSchema(typing.NamedTuple):
    query: str
    dialect: typing.Optional[str]
|  |  | 
|  |  | 
class SqlTransform(ExternalTransform):
    """Translates a SQL query into PTransforms.

    Every input PCollection needs a schema. At present a schema can be
    attached to a PCollection in one of two ways:

    1) Register a `typing.NamedTuple` type so that it uses RowCoder, then
       declare it as the output type. For example::

          Purchase = typing.NamedTuple('Purchase',
                                       [('item_name', str), ('price', float)])
          coders.registry.register_coder(Purchase, coders.RowCoder)
          with Pipeline() as p:
            purchases = (p | beam.io...
                           | beam.Map(..).with_output_types(Purchase))

    2) Emit `beam.Row` instances. Be aware that this option fails when Beam
       cannot infer the data type of any of the fields. For example::

          with Pipeline() as p:
            purchases = (p | beam.io...
                           | beam.Map(lambda x: beam.Row(item_name=str(..),
                                                         price=float(..))))

    The output of SqlTransform is likewise a PCollection with a schema, and
    the columns produced by the query can be read as attributes. For
    example::

      purchases | SqlTransform(\"\"\"
                     SELECT item_name, COUNT(*) AS `count`
                     FROM PCOLLECTION GROUP BY item_name\"\"\")
                | beam.Map(lambda row: "We've sold %d %ss!" % (row.count,
                                                               row.item_name))

    Further examples are available in
    `apache_beam.examples.wordcount_xlang_sql`,
    `apache_beam.examples.sql_taxi`, and `apache_beam.transforms.sql_test`.

    For more background on Beam SQL in general, see the `Java transform
    <https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/sql/SqlTransform.html>`_,
    and the `documentation
    <https://beam.apache.org/documentation/dsls/sql/overview/>`_.
    """
    URN = 'beam:external:java:sql:v1'

    def __init__(self, query, dialect=None, expansion_service=None):
        """Builds a SqlTransform that is expanded to Java's SqlTransform.

        See the class docstring for usage details.

        :param query: The SQL query.
        :param dialect: (optional) The dialect, e.g. use 'zetasql' for
          ZetaSQL.
        :param expansion_service: (optional) The URL of the expansion service
          to use. When omitted (or otherwise falsy), a BeamJarExpansionService
          for the ``:sdks:java:extensions:sql:expansion-service:shadowJar``
          target is used instead.
        """
        # Fall back to the bundled SQL expansion service jar when the caller
        # did not supply an expansion service.
        if not expansion_service:
            expansion_service = BeamJarExpansionService(
                ':sdks:java:extensions:sql:expansion-service:shadowJar')

        # The query and dialect travel to the expansion service as a
        # NamedTuple-based payload.
        payload_builder = NamedTupleBasedPayloadBuilder(
            SqlTransformSchema(query=query, dialect=dialect))

        super().__init__(
            self.URN, payload_builder, expansion_service=expansion_service)