| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """PTransforms for supporting Jdbc in Python pipelines. |
| |
These transforms are currently supported by the Beam portable Flink, Spark,
and Dataflow v2 runners.
| |
| **Setup** |
| |
Transforms provided in this module are cross-language transforms
implemented in the Beam Java SDK. During pipeline construction, the Python
SDK will connect to a Java expansion service to expand these transforms.
To facilitate this, a small amount of setup is needed before using these
transforms in a Beam Python pipeline.
| |
There are several ways to set up cross-language Jdbc transforms.
| |
| * Option 1: use the default expansion service |
| * Option 2: specify a custom expansion service |
| |
| See below for details regarding each of these options. |
| |
| *Option 1: Use the default expansion service* |
| |
| This is the recommended and easiest setup option for using Python Jdbc |
| transforms. This option is only available for Beam 2.24.0 and later. |
| |
This option requires the following prerequisites before running the Beam
pipeline.
| |
* Install a Java runtime on the computer from which the pipeline is
  constructed and make sure that the 'java' command is available.
| |
In this option, the Python SDK will either download (for released Beam
versions) or build (when running from a Beam Git clone) an expansion service
jar and use that to expand transforms. Currently, Jdbc transforms use the
'beam-sdks-java-extensions-schemaio-expansion-service' jar for this purpose.
| |
| The transforms in this file support an extra `classpath` argument, where one |
| can specify any extra JARs to be included in the classpath for the expansion |
| service. Users will need to specify this option to include the JDBC driver |
for the database they're trying to use. **By default, a Postgres JDBC
| driver** is included (i.e. the Java package |
| `"org.postgresql:postgresql:42.2.16"`). |
| |
| *Option 2: specify a custom expansion service* |
| |
In this option, you start up your own expansion service and provide its
address as a parameter when using the transforms provided in this module.
| |
This option requires the following prerequisites before running the Beam
pipeline.
| |
* Start up your own expansion service.
| * Update your pipeline to provide the expansion service address when |
| initiating Jdbc transforms provided in this module. |
| |
Flink users can use the built-in Expansion Service of the Flink Runner's
| Job Server. If you start Flink's Job Server, the expansion service will be |
| started on port 8097. For a different address, please set the |
| expansion_service parameter. |
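
For example, a sketch of pointing a transform at an already-running
expansion service (the address and connection values below are
placeholders)::

  ReadFromJdbc(
      table_name='my_table',
      driver_class_name='org.postgresql.Driver',
      jdbc_url='jdbc:postgresql://localhost:5432/example',
      username='postgres',
      password='postgres',
      expansion_service='localhost:8097',
  )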
| |
| **More information** |
| |
| For more information regarding cross-language transforms see: |
| - https://beam.apache.org/roadmap/portability/ |
| |
| For more information specific to Flink runner see: |
| - https://beam.apache.org/documentation/runners/flink/ |
| """ |
| |
| # pytype: skip-file |
| |
| import contextlib |
| import typing |
| |
| import numpy as np |
| |
| from apache_beam.coders import RowCoder |
| from apache_beam.transforms.external import BeamJarExpansionService |
| from apache_beam.transforms.external import ExternalTransform |
| from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder |
| from apache_beam.typehints.schemas import JdbcDateType # pylint: disable=unused-import |
| from apache_beam.typehints.schemas import JdbcTimeType # pylint: disable=unused-import |
| from apache_beam.typehints.schemas import LogicalType |
| from apache_beam.typehints.schemas import MillisInstant |
| from apache_beam.typehints.schemas import typing_to_runner_api |
| |
| __all__ = [ |
| 'WriteToJdbc', |
| 'ReadFromJdbc', |
| ] |
| |
| |
| def default_io_expansion_service(classpath=None): |
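  """Returns a default Java expansion service for expanding the JDBC
  cross-language transforms, optionally with extra jars on its classpath."""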
| return BeamJarExpansionService( |
| ':sdks:java:extensions:schemaio-expansion-service:shadowJar', |
| classpath=classpath) |
| |
| |
| JdbcConfigSchema = typing.NamedTuple( |
| 'JdbcConfigSchema', |
| [('location', str), ('config', bytes), |
| ('dataSchema', typing.Optional[bytes])], |
| ) |
| |
| Config = typing.NamedTuple( |
| 'Config', |
| [('driver_class_name', str), ('jdbc_url', str), ('username', str), |
| ('password', str), ('connection_properties', typing.Optional[str]), |
| ('connection_init_sqls', typing.Optional[typing.List[str]]), |
| ('read_query', typing.Optional[str]), |
| ('write_statement', typing.Optional[str]), |
| ('fetch_size', typing.Optional[np.int16]), |
| ('disable_autocommit', typing.Optional[bool]), |
| ('output_parallelization', typing.Optional[bool]), |
| ('autosharding', typing.Optional[bool]), |
| ('partition_column', typing.Optional[str]), |
| ('partitions', typing.Optional[np.int16]), |
| ('max_connections', typing.Optional[np.int16]), |
| ('driver_jars', typing.Optional[str]), |
| ('write_batch_size', typing.Optional[np.int64])], |
| ) |
| |
| DEFAULT_JDBC_CLASSPATH = ['org.postgresql:postgresql:42.2.16'] |
| |
| |
| class WriteToJdbc(ExternalTransform): |
| """A PTransform which writes Rows to the specified database via JDBC. |
| |
  This transform receives Rows defined as a NamedTuple type registered in
  the coders registry, e.g.::
| |
    ExampleRow = typing.NamedTuple('ExampleRow',
                                   [('id', int), ('name', str)])
    coders.registry.register_coder(ExampleRow, coders.RowCoder)

    with TestPipeline() as p:
      _ = (
          p
          | beam.Create([ExampleRow(1, 'abc')])
              .with_output_types(ExampleRow)
          | 'Write to jdbc' >> WriteToJdbc(
              table_name='jdbc_external_test_write',
              driver_class_name='org.postgresql.Driver',
              jdbc_url='jdbc:postgresql://localhost:5432/example',
              username='postgres',
              password='postgres',
          ))
| |
  table_name is a required parameter, and by default, the write_statement is
  generated from it.

  The generated write_statement can be overridden via the ``statement``
  parameter.
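
  For example, a sketch of overriding the generated statement (the SQL
  below is illustrative)::

    WriteToJdbc(
        table_name='jdbc_external_test_write',
        statement='INSERT INTO jdbc_external_test_write VALUES(?, ?)',
        driver_class_name='org.postgresql.Driver',
        jdbc_url='jdbc:postgresql://localhost:5432/example',
        username='postgres',
        password='postgres',
    )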
| |
| |
| Experimental; no backwards compatibility guarantees. |
| """ |
| |
| URN = 'beam:transform:org.apache.beam:schemaio_jdbc_write:v1' |
| |
| def __init__( |
| self, |
| table_name, |
| driver_class_name, |
| jdbc_url, |
| username, |
| password, |
| statement=None, |
| connection_properties=None, |
| connection_init_sqls=None, |
| autosharding=False, |
| max_connections=None, |
| driver_jars=None, |
| expansion_service=None, |
| classpath=None, |
| write_batch_size=None, |
| ): |
| """ |
| Initializes a write operation to Jdbc. |
| |
    :param table_name: the name of the table to write to
    :param driver_class_name: name of the jdbc driver class
| :param jdbc_url: full jdbc url to the database. |
| :param username: database username |
| :param password: database password |
| :param statement: sql statement to be executed |
| :param connection_properties: properties of the jdbc connection |
| passed as string with format |
| [propertyName=property;]* |
    :param connection_init_sqls: required only for MySQL and MariaDB.
                                 passed as a list of strings
| :param autosharding: enable automatic re-sharding of bundles to scale the |
| number of shards with the number of workers. |
| :param max_connections: sets the maximum total number of connections. |
| use a negative value for no limit. |
| :param driver_jars: comma separated paths for JDBC drivers. if not |
| specified, the default classloader is used to load the |
| driver jars. |
| :param expansion_service: The address (host:port) of the ExpansionService. |
| :param classpath: A list of JARs or Java packages to include in the |
| classpath for the expansion service. This option is |
| usually needed for `jdbc` to include extra JDBC driver |
| packages. |
| The packages can be in these three formats: (1) A local |
| file, (2) A URL, (3) A gradle-style identifier of a Maven |
| package (e.g. "org.postgresql:postgresql:42.3.1"). |
                      By default, this argument includes a PostgreSQL JDBC
| driver. |
    :param write_batch_size: sets the maximum number of SQL statements
                             per batch.
                             defaults to JdbcIO.DEFAULT_BATCH_SIZE
| """ |
| classpath = classpath or DEFAULT_JDBC_CLASSPATH |
| super().__init__( |
| self.URN, |
| NamedTupleBasedPayloadBuilder( |
| JdbcConfigSchema( |
| location=table_name, |
| config=RowCoder( |
| typing_to_runner_api(Config).row_type.schema).encode( |
| Config( |
| driver_class_name=driver_class_name, |
| jdbc_url=jdbc_url, |
| username=username, |
| password=password, |
| connection_properties=connection_properties, |
| connection_init_sqls=connection_init_sqls, |
| write_statement=statement, |
| write_batch_size=write_batch_size, |
| read_query=None, |
| fetch_size=None, |
| disable_autocommit=None, |
| output_parallelization=None, |
| autosharding=autosharding, |
| max_connections=max_connections, |
| driver_jars=driver_jars, |
| partitions=None, |
| partition_column=None)), |
| dataSchema=None), |
| ), |
| expansion_service or default_io_expansion_service(classpath), |
| ) |
| |
| |
| @contextlib.contextmanager |
| def enforce_millis_instant_for_timestamp(): |
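  """Temporarily registers the MillisInstant logical type so that timestamp
  fields are mapped to millisecond-precision instants during schema
  conversion, restoring the previous logical type registry on exit."""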
| old_registry = LogicalType._known_logical_types |
| LogicalType._known_logical_types = old_registry.copy() |
| try: |
| LogicalType.register_logical_type(MillisInstant) |
| yield |
| finally: |
| LogicalType._known_logical_types = old_registry |
| |
| |
| class ReadFromJdbc(ExternalTransform): |
| """A PTransform which reads Rows from the specified database via JDBC. |
| |
  This transform delivers Rows defined as a NamedTuple type registered in
  the coders registry, e.g.::
| |
    ExampleRow = typing.NamedTuple('ExampleRow',
                                   [('id', int), ('name', str)])
    coders.registry.register_coder(ExampleRow, coders.RowCoder)

    with TestPipeline() as p:
      result = (
          p
          | 'Read from jdbc' >> ReadFromJdbc(
              table_name='jdbc_external_test_read',
              driver_class_name='org.postgresql.Driver',
              jdbc_url='jdbc:postgresql://localhost:5432/example',
              username='postgres',
              password='postgres',
          ))
| |
  table_name is a required parameter, and by default, the read_query is
  generated from it.

  The generated read_query can be overridden via the ``query`` parameter.
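
  For example, a sketch of overriding the generated query (the SQL below
  is illustrative)::

    ReadFromJdbc(
        table_name='jdbc_external_test_read',
        query='SELECT id, name FROM jdbc_external_test_read WHERE id > 0',
        driver_class_name='org.postgresql.Driver',
        jdbc_url='jdbc:postgresql://localhost:5432/example',
        username='postgres',
        password='postgres',
    )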
| |
| Experimental; no backwards compatibility guarantees. |
| """ |
| |
| URN = 'beam:transform:org.apache.beam:schemaio_jdbc_read:v1' |
| |
| def __init__( |
| self, |
| table_name, |
| driver_class_name, |
| jdbc_url, |
| username, |
| password, |
| query=None, |
| disable_autocommit=None, |
| output_parallelization=None, |
| fetch_size=None, |
| partition_column=None, |
| partitions=None, |
| connection_properties=None, |
| connection_init_sqls=None, |
| max_connections=None, |
| driver_jars=None, |
| expansion_service=None, |
| classpath=None, |
| schema=None): |
| """ |
| Initializes a read operation from Jdbc. |
| |
    :param table_name: the name of the table to read from
    :param driver_class_name: name of the jdbc driver class
| :param jdbc_url: full jdbc url to the database. |
| :param username: database username |
| :param password: database password |
| :param query: sql query to be executed |
| :param disable_autocommit: disable autocommit on read |
    :param output_parallelization: whether to parallelize the output
| :param fetch_size: how many rows to fetch |
| :param partition_column: enable partitioned reads by splitting on this |
| column |
| :param partitions: override the default number of splits when using |
| partition_column |
| :param connection_properties: properties of the jdbc connection |
| passed as string with format |
| [propertyName=property;]* |
    :param connection_init_sqls: required only for MySQL and MariaDB.
                                 passed as a list of strings
| :param max_connections: sets the maximum total number of connections. |
| use a negative value for no limit. |
| :param driver_jars: comma separated paths for JDBC drivers. if not |
| specified, the default classloader is used to load the |
| driver jars. |
| :param expansion_service: The address (host:port) of the ExpansionService. |
| :param classpath: A list of JARs or Java packages to include in the |
| classpath for the expansion service. This option is |
| usually needed for `jdbc` to include extra JDBC driver |
| packages. |
| The packages can be in these three formats: (1) A local |
| file, (2) A URL, (3) A gradle-style identifier of a Maven |
| package (e.g. "org.postgresql:postgresql:42.3.1"). |
                      By default, this argument includes a PostgreSQL JDBC
| driver. |
| :param schema: Optional custom schema for the returned rows. If provided, |
| this should be a NamedTuple type that defines the structure |
| of the output PCollection elements. This bypasses automatic |
| schema inference during pipeline construction. |
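
    A sketch of supplying an explicit output schema, assuming ``ExampleRow``
    is a NamedTuple registered in the coders registry as in the class-level
    example::

      ReadFromJdbc(
          table_name='jdbc_external_test_read',
          schema=ExampleRow,
          driver_class_name='org.postgresql.Driver',
          jdbc_url='jdbc:postgresql://localhost:5432/example',
          username='postgres',
          password='postgres',
      )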
| """ |
| classpath = classpath or DEFAULT_JDBC_CLASSPATH |
| |
| dataSchema = None |
| if schema is not None: |
| with enforce_millis_instant_for_timestamp(): |
| # Convert Python schema to Beam Schema proto |
| schema_proto = typing_to_runner_api(schema).row_type.schema |
| # Serialize the proto to bytes for transmission |
| dataSchema = schema_proto.SerializeToString() |
| |
| super().__init__( |
| self.URN, |
| NamedTupleBasedPayloadBuilder( |
| JdbcConfigSchema( |
| location=table_name, |
| config=RowCoder( |
| typing_to_runner_api(Config).row_type.schema).encode( |
| Config( |
| driver_class_name=driver_class_name, |
| jdbc_url=jdbc_url, |
| username=username, |
| password=password, |
| connection_properties=connection_properties, |
| connection_init_sqls=connection_init_sqls, |
| write_statement=None, |
| write_batch_size=None, |
| read_query=query, |
| fetch_size=fetch_size, |
| disable_autocommit=disable_autocommit, |
| output_parallelization=output_parallelization, |
| autosharding=None, |
| max_connections=max_connections, |
| driver_jars=driver_jars, |
| partition_column=partition_column, |
| partitions=partitions)), |
| dataSchema=dataSchema), |
| ), |
| expansion_service or default_io_expansion_service(classpath), |
| ) |