blob: a0e46c1b4d8879ff7f0bc42b8668b99a870b9c62 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Google Pub/Sub Lite sources and sinks.
This API is currently under development and is subject to change.
"""
# pytype: skip-file
import typing
from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external import ExternalTransform
from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
_ReadSchema = typing.NamedTuple(
'_ReadSchema', [('subscription_path', str), ('deduplicate', bool)])
def _default_io_expansion_service():
return BeamJarExpansionService(
'sdks:java:io:google-cloud-platform:expansion-service:shadowJar')
class _ReadExternal(ExternalTransform):
"""
An external PTransform which reads from Pub/Sub Lite and returns a
SequencedMessage as serialized bytes.
This transform is not part of the public API.
Experimental; no backwards-compatibility guarantees.
"""
def __init__(
self,
subscription_path,
deduplicate=None,
expansion_service=None,
):
"""
Initializes a read operation from Pub/Sub Lite, returning the serialized
bytes of SequencedMessage protos.
Args:
subscription_path: A Pub/Sub Lite Subscription path.
deduplicate: Whether to deduplicate messages based on the value of
the 'x-goog-pubsublite-dataflow-uuid' attribute.
"""
if deduplicate is None:
deduplicate = False
if expansion_service is None:
expansion_service = _default_io_expansion_service()
super().__init__(
'beam:transform:org.apache.beam:pubsublite_read:v1',
NamedTupleBasedPayloadBuilder(
_ReadSchema(
subscription_path=subscription_path, deduplicate=deduplicate)),
expansion_service)
_WriteSchema = typing.NamedTuple(
'_WriteSchema', [('topic_path', str), ('add_uuids', bool)])
class _WriteExternal(ExternalTransform):
"""
An external PTransform which writes serialized PubSubMessage protos to
Pub/Sub Lite.
This transform is not part of the public API.
Experimental; no backwards-compatibility guarantees.
"""
def __init__(
self,
topic_path,
add_uuids=None,
expansion_service=None,
):
"""
Initializes a write operation to Pub/Sub Lite, writing the serialized bytes
of PubSubMessage protos.
Args:
topic_path: A Pub/Sub Lite Topic path.
add_uuids: Whether to add uuids to the 'x-goog-pubsublite-dataflow-uuid'
uuid attribute.
"""
if add_uuids is None:
add_uuids = False
if expansion_service is None:
expansion_service = _default_io_expansion_service()
super().__init__(
'beam:transform:org.apache.beam:pubsublite_write:v1',
NamedTupleBasedPayloadBuilder(
_WriteSchema(
topic_path=topic_path,
add_uuids=add_uuids,
)),
expansion_service)