blob: d71faa6d8477b3fce5bfc5be08fb8e2d2f9889b3 [file] [log] [blame]
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pytype: skip-file
# pylint: disable=line-too-long
def enrichment_with_bigtable():
# [START enrichment_with_bigtable]
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
project_id = 'apache-beam-testing'
instance_id = 'beam-test'
table_id = 'bigtable-enrichment-test'
row_key = 'product_id'
data = [
beam.Row(sale_id=1, customer_id=1, product_id=1, quantity=1),
beam.Row(sale_id=3, customer_id=3, product_id=2, quantity=3),
beam.Row(sale_id=5, customer_id=5, product_id=4, quantity=2)
]
bigtable_handler = BigTableEnrichmentHandler(
project_id=project_id,
instance_id=instance_id,
table_id=table_id,
row_key=row_key)
with beam.Pipeline() as p:
_ = (
p
| "Create" >> beam.Create(data)
| "Enrich W/ BigTable" >> Enrichment(bigtable_handler)
| "Print" >> beam.Map(print))
# [END enrichment_with_bigtable]
def enrichment_with_vertex_ai():
# [START enrichment_with_vertex_ai]
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store \
import VertexAIFeatureStoreEnrichmentHandler
project_id = 'apache-beam-testing'
location = 'us-central1'
api_endpoint = f"{location}-aiplatform.googleapis.com"
data = [
beam.Row(user_id='2963', product_id=14235, sale_price=15.0),
beam.Row(user_id='21422', product_id=11203, sale_price=12.0),
beam.Row(user_id='20592', product_id=8579, sale_price=9.0),
]
vertex_ai_handler = VertexAIFeatureStoreEnrichmentHandler(
project=project_id,
location=location,
api_endpoint=api_endpoint,
feature_store_name="vertexai_enrichment_example",
feature_view_name="users",
row_key="user_id",
)
with beam.Pipeline() as p:
_ = (
p
| "Create" >> beam.Create(data)
| "Enrich W/ Vertex AI" >> Enrichment(vertex_ai_handler)
| "Print" >> beam.Map(print))
# [END enrichment_with_vertex_ai]
def enrichment_with_vertex_ai_legacy():
# [START enrichment_with_vertex_ai_legacy]
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store \
import VertexAIFeatureStoreLegacyEnrichmentHandler
project_id = 'apache-beam-testing'
location = 'us-central1'
api_endpoint = f"{location}-aiplatform.googleapis.com"
data = [
beam.Row(entity_id="movie_01", title='The Shawshank Redemption'),
beam.Row(entity_id="movie_02", title="The Shining"),
beam.Row(entity_id="movie_04", title='The Dark Knight'),
]
vertex_ai_handler = VertexAIFeatureStoreLegacyEnrichmentHandler(
project=project_id,
location=location,
api_endpoint=api_endpoint,
entity_type_id='movies',
feature_store_id="movie_prediction_unique",
feature_ids=["title", "genres"],
row_key="entity_id",
)
with beam.Pipeline() as p:
_ = (
p
| "Create" >> beam.Create(data)
| "Enrich W/ Vertex AI" >> Enrichment(vertex_ai_handler)
| "Print" >> beam.Map(print))
# [END enrichment_with_vertex_ai_legacy]
def enrichment_with_google_cloudsql_pg():
# [START enrichment_with_google_cloudsql_pg]
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.cloudsql import (
CloudSQLEnrichmentHandler,
DatabaseTypeAdapter,
TableFieldsQueryConfig,
CloudSQLConnectionConfig)
import os
database_adapter = DatabaseTypeAdapter.POSTGRESQL
database_uri = os.environ.get("GOOGLE_CLOUD_SQL_DB_URI")
database_user = os.environ.get("GOOGLE_CLOUD_SQL_DB_USER")
database_password = os.environ.get("GOOGLE_CLOUD_SQL_DB_PASSWORD")
database_id = os.environ.get("GOOGLE_CLOUD_SQL_DB_ID")
table_id = os.environ.get("GOOGLE_CLOUD_SQL_DB_TABLE_ID")
where_clause_template = "product_id = :pid"
where_clause_fields = ["product_id"]
data = [
beam.Row(product_id=1, name='A'),
beam.Row(product_id=2, name='B'),
beam.Row(product_id=3, name='C'),
]
connection_config = CloudSQLConnectionConfig(
db_adapter=database_adapter,
instance_connection_uri=database_uri,
user=database_user,
password=database_password,
db_id=database_id)
query_config = TableFieldsQueryConfig(
table_id=table_id,
where_clause_template=where_clause_template,
where_clause_fields=where_clause_fields)
cloudsql_handler = CloudSQLEnrichmentHandler(
connection_config=connection_config,
table_id=table_id,
query_config=query_config)
with beam.Pipeline() as p:
_ = (
p
| "Create" >> beam.Create(data)
|
"Enrich W/ Google CloudSQL PostgreSQL" >> Enrichment(cloudsql_handler)
| "Print" >> beam.Map(print))
# [END enrichment_with_google_cloudsql_pg]
def enrichment_with_external_pg():
# [START enrichment_with_external_pg]
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.cloudsql import (
CloudSQLEnrichmentHandler,
DatabaseTypeAdapter,
TableFieldsQueryConfig,
ExternalSQLDBConnectionConfig)
import os
database_adapter = DatabaseTypeAdapter.POSTGRESQL
database_host = os.environ.get("EXTERNAL_SQL_DB_HOST")
database_port = int(os.environ.get("EXTERNAL_SQL_DB_PORT"))
database_user = os.environ.get("EXTERNAL_SQL_DB_USER")
database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD")
database_id = os.environ.get("EXTERNAL_SQL_DB_ID")
table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID")
where_clause_template = "product_id = :pid"
where_clause_fields = ["product_id"]
data = [
beam.Row(product_id=1, name='A'),
beam.Row(product_id=2, name='B'),
beam.Row(product_id=3, name='C'),
]
connection_config = ExternalSQLDBConnectionConfig(
db_adapter=database_adapter,
host=database_host,
port=database_port,
user=database_user,
password=database_password,
db_id=database_id)
query_config = TableFieldsQueryConfig(
table_id=table_id,
where_clause_template=where_clause_template,
where_clause_fields=where_clause_fields)
cloudsql_handler = CloudSQLEnrichmentHandler(
connection_config=connection_config,
table_id=table_id,
query_config=query_config)
with beam.Pipeline() as p:
_ = (
p
| "Create" >> beam.Create(data)
| "Enrich W/ Unmanaged PostgreSQL" >> Enrichment(cloudsql_handler)
| "Print" >> beam.Map(print))
# [END enrichment_with_external_pg]
def enrichment_with_external_mysql():
# [START enrichment_with_external_mysql]
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.cloudsql import (
CloudSQLEnrichmentHandler,
DatabaseTypeAdapter,
TableFieldsQueryConfig,
ExternalSQLDBConnectionConfig)
import os
database_adapter = DatabaseTypeAdapter.MYSQL
database_host = os.environ.get("EXTERNAL_SQL_DB_HOST")
database_port = int(os.environ.get("EXTERNAL_SQL_DB_PORT"))
database_user = os.environ.get("EXTERNAL_SQL_DB_USER")
database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD")
database_id = os.environ.get("EXTERNAL_SQL_DB_ID")
table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID")
where_clause_template = "product_id = :pid"
where_clause_fields = ["product_id"]
data = [
beam.Row(product_id=1, name='A'),
beam.Row(product_id=2, name='B'),
beam.Row(product_id=3, name='C'),
]
connection_config = ExternalSQLDBConnectionConfig(
db_adapter=database_adapter,
host=database_host,
port=database_port,
user=database_user,
password=database_password,
db_id=database_id)
query_config = TableFieldsQueryConfig(
table_id=table_id,
where_clause_template=where_clause_template,
where_clause_fields=where_clause_fields)
cloudsql_handler = CloudSQLEnrichmentHandler(
connection_config=connection_config,
table_id=table_id,
query_config=query_config)
with beam.Pipeline() as p:
_ = (
p
| "Create" >> beam.Create(data)
| "Enrich W/ Unmanaged MySQL" >> Enrichment(cloudsql_handler)
| "Print" >> beam.Map(print))
# [END enrichment_with_external_mysql]
def enrichment_with_external_sqlserver():
# [START enrichment_with_external_sqlserver]
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.cloudsql import (
CloudSQLEnrichmentHandler,
DatabaseTypeAdapter,
TableFieldsQueryConfig,
ExternalSQLDBConnectionConfig)
import os
database_adapter = DatabaseTypeAdapter.SQLSERVER
database_host = os.environ.get("EXTERNAL_SQL_DB_HOST")
database_port = int(os.environ.get("EXTERNAL_SQL_DB_PORT"))
database_user = os.environ.get("EXTERNAL_SQL_DB_USER")
database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD")
database_id = os.environ.get("EXTERNAL_SQL_DB_ID")
table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID")
where_clause_template = "product_id = :pid"
where_clause_fields = ["product_id"]
data = [
beam.Row(product_id=1, name='A'),
beam.Row(product_id=2, name='B'),
beam.Row(product_id=3, name='C'),
]
connection_config = ExternalSQLDBConnectionConfig(
db_adapter=database_adapter,
host=database_host,
port=database_port,
user=database_user,
password=database_password,
db_id=database_id)
query_config = TableFieldsQueryConfig(
table_id=table_id,
where_clause_template=where_clause_template,
where_clause_fields=where_clause_fields)
cloudsql_handler = CloudSQLEnrichmentHandler(
connection_config=connection_config,
table_id=table_id,
query_config=query_config)
with beam.Pipeline() as p:
_ = (
p
| "Create" >> beam.Create(data)
| "Enrich W/ Unmanaged SQL Server" >> Enrichment(cloudsql_handler)
| "Print" >> beam.Map(print))
# [END enrichment_with_external_sqlserver]