blob: f207939753c8eb328e929facab5346cd55a20bb9 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import abc
from enum import Enum
from typing import Dict, List, Union
from pyflink.common import typeinfo, Duration, ConfigOption, ExecutionConfig
from pyflink.common.serialization import DeserializationSchema, Encoder, SerializationSchema
from pyflink.common.typeinfo import RowTypeInfo, TypeInformation
from pyflink.datastream.functions import SourceFunction, SinkFunction, JavaFunctionWrapper
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import to_jarray
from py4j.java_gateway import JavaObject
__all__ = [
'FileEnumeratorProvider',
'FileSink',
'FileSource',
'FileSourceBuilder',
'FileSplitAssignerProvider',
'FlinkKafkaConsumer',
'FlinkKafkaProducer',
'JdbcSink',
'JdbcConnectionOptions',
'JdbcExecutionOptions',
'NumberSequenceSource',
'OutputFileConfig',
'PulsarDeserializationSchema',
'PulsarSource',
'PulsarSourceBuilder',
'RMQConnectionConfig',
'RMQSource',
'RMQSink',
'RollingPolicy',
'Sink',
'Source',
'StartCursor',
'StopCursor',
'StreamFormat',
'StreamingFileSink',
'SubscriptionType']
class FlinkKafkaConsumerBase(SourceFunction, abc.ABC):
"""
Base class of all Flink Kafka Consumer data sources. This implements the common behavior across
all kafka versions.
The Kafka version specific behavior is defined mainly in the specific subclasses.
"""
def __init__(self, j_flink_kafka_consumer):
super(FlinkKafkaConsumerBase, self).__init__(source_func=j_flink_kafka_consumer)
def set_commit_offsets_on_checkpoints(self,
commit_on_checkpoints: bool) -> 'FlinkKafkaConsumerBase':
"""
Specifies whether or not the consumer should commit offsets back to kafka on checkpoints.
This setting will only have effect if checkpointing is enabled for the job. If checkpointing
isn't enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+)
property settings will be used.
"""
self._j_function = self._j_function \
.setCommitOffsetsOnCheckpoints(commit_on_checkpoints)
return self
def set_start_from_earliest(self) -> 'FlinkKafkaConsumerBase':
"""
Specifies the consumer to start reading from the earliest offset for all partitions. This
lets the consumer ignore any committed group offsets in Zookeeper/ Kafka brokers.
This method does not affect where partitions are read from when the consumer is restored
from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
savepoint, only the offsets in the restored state will be used.
"""
self._j_function = self._j_function.setStartFromEarliest()
return self
def set_start_from_latest(self) -> 'FlinkKafkaConsumerBase':
"""
Specifies the consuer to start reading from the latest offset for all partitions. This lets
the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
This method does not affect where partitions are read from when the consumer is restored
from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
savepoint, only the offsets in the restored state will be used.
"""
self._j_function = self._j_function.setStartFromLatest()
return self
def set_start_from_timestamp(self, startup_offsets_timestamp: int) -> 'FlinkKafkaConsumerBase':
"""
Specifies the consumer to start reading partitions from a specified timestamp. The specified
timestamp must be before the current timestamp. This lets the consumer ignore any committed
group offsets in Zookeeper / Kafka brokers.
The consumer will look up the earliest offset whose timestamp is greater than or equal to
the specific timestamp from Kafka. If there's no such offset, the consumer will use the
latest offset to read data from Kafka.
This method does not affect where partitions are read from when the consumer is restored
from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
savepoint, only the offsets in the restored state will be used.
:param startup_offsets_timestamp: timestamp for the startup offsets, as milliseconds for
epoch.
"""
self._j_function = self._j_function.setStartFromTimestamp(
startup_offsets_timestamp)
return self
def set_start_from_group_offsets(self) -> 'FlinkKafkaConsumerBase':
"""
Specifies the consumer to start reading from any committed group offsets found in Zookeeper/
Kafka brokers. The 'group.id' property must be set in the configuration properties. If no
offset can be found for a partition, the behaviour in 'auto.offset.reset' set in the
configuration properties will be used for the partition.
This method does not affect where partitions are read from when the consumer is restored
from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
savepoint, only the offsets in the restored state will be used.
"""
self._j_function = self._j_function.setStartFromGroupOffsets()
return self
def disable_filter_restored_partitions_with_subscribed_topics(self) -> 'FlinkKafkaConsumerBase':
"""
By default, when restoring from a checkpoint / savepoint, the consumer always ignores
restored partitions that are no longer associated with the current specified topics or topic
pattern to subscribe to.
This method does not affect where partitions are read from when the consumer is restored
from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
savepoint, only the offsets in the restored state will be used.
"""
self._j_function = self._j_function \
.disableFilterRestoredPartitionsWithSubscribedTopics()
return self
def get_produced_type(self) -> TypeInformation:
return typeinfo._from_java_type(self._j_function.getProducedType())
class FlinkKafkaConsumer(FlinkKafkaConsumerBase):
"""
The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
Apache Kafka. The consumer can run in multiple parallel instances, each of which will
pull data from one or more Kafka partitions.
The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
during a failure, and that the computation processes elements 'exactly once. (These guarantees
naturally assume that Kafka itself does not lose any data.)
Please note that Flink snapshots the offsets internally as part of its distributed checkpoints.
The offsets committed to Kafka / Zookeeper are only to bring the outside view of progress in
sync with Flink's view of the progress. That way, monitoring and other jobs can get a view of
how far the Flink Kafka consumer has consumed a topic.
Please refer to Kafka's documentation for the available configuration properties:
http://kafka.apache.org/documentation.html#newconsumerconfigs
"""
def __init__(self, topics: Union[str, List[str]], deserialization_schema: DeserializationSchema,
properties: Dict):
"""
Creates a new Kafka streaming source consumer for Kafka 0.10.x.
This constructor allows passing multiple topics to the consumer.
:param topics: The Kafka topics to read from.
:param deserialization_schema: The de-/serializer used to convert between Kafka's byte
messages and Flink's objects.
:param properties: The properties that are used to configure both the fetcher and the offset
handler.
"""
JFlinkKafkaConsumer = get_gateway().jvm \
.org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema,
JFlinkKafkaConsumer)
super(FlinkKafkaConsumer, self).__init__(j_flink_kafka_consumer=j_flink_kafka_consumer)
class FlinkKafkaProducerBase(SinkFunction, abc.ABC):
"""
Flink Sink to produce data into a Kafka topic.
Please note that this producer provides at-least-once reliability guarantees when checkpoints
are enabled and set_flush_on_checkpoint(True) is set. Otherwise, the producer doesn;t provid any
reliability guarantees.
"""
def __init__(self, j_flink_kafka_producer):
super(FlinkKafkaProducerBase, self).__init__(sink_func=j_flink_kafka_producer)
def set_log_failures_only(self, log_failures_only: bool) -> 'FlinkKafkaProducerBase':
"""
Defines whether the producer should fail on errors, or only log them. If this is set to
true, then exceptions will be only logged, if set to false, exceptions will be eventually
thrown and cause the streaming program to fail (and enter recovery).
:param log_failures_only: The flag to indicate logging-only on exceptions.
"""
self._j_function.setLogFailuresOnly(log_failures_only)
return self
def set_flush_on_checkpoint(self, flush_on_checkpoint: bool) -> 'FlinkKafkaProducerBase':
"""
If set to true, the Flink producer will wait for all outstanding messages in the Kafka
buffers to be acknowledged by the Kafka producer on a checkpoint.
This way, the producer can guarantee that messages in the Kafka buffers are part of the
checkpoint.
:param flush_on_checkpoint: Flag indicating the flush mode (true = flush on checkpoint)
"""
self._j_function.setFlushOnCheckpoint(flush_on_checkpoint)
return self
def set_write_timestamp_to_kafka(self,
write_timestamp_to_kafka: bool) -> 'FlinkKafkaProducerBase':
"""
If set to true, Flink will write the (event time) timestamp attached to each record into
Kafka. Timestamps must be positive for Kafka to accept them.
:param write_timestamp_to_kafka: Flag indicating if Flink's internal timestamps are written
to Kafka.
"""
self._j_function.setWriteTimestampToKafka(write_timestamp_to_kafka)
return self
class Semantic(Enum):
"""
Semantics that can be chosen.
:data: `EXACTLY_ONCE`:
The Flink producer will write all messages in a Kafka transaction that will be committed to
the Kafka on a checkpoint. In this mode FlinkKafkaProducer sets up a pool of
FlinkKafkaProducer. Between each checkpoint there is created new Kafka transaction, which is
being committed on FlinkKafkaProducer.notifyCheckpointComplete(long). If checkpoint
complete notifications are running late, FlinkKafkaProducer can run out of
FlinkKafkaProducers in the pool. In that case any subsequent FlinkKafkaProducer.snapshot-
State() requests will fail and the FlinkKafkaProducer will keep using the
FlinkKafkaProducer from previous checkpoint. To decrease chances of failing checkpoints
there are four options:
1. decrease number of max concurrent checkpoints
2. make checkpoints mre reliable (so that they complete faster)
3. increase delay between checkpoints
4. increase size of FlinkKafkaProducers pool
:data: `AT_LEAST_ONCE`:
The Flink producer will wait for all outstanding messages in the Kafka buffers to be
acknowledged by the Kafka producer on a checkpoint.
:data: `NONE`:
Means that nothing will be guaranteed. Messages can be lost and/or duplicated in case of
failure.
"""
EXACTLY_ONCE = 0,
AT_LEAST_ONCE = 1,
NONE = 2
def _to_j_semantic(self):
JSemantic = get_gateway().jvm \
.org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
return getattr(JSemantic, self.name)
class FlinkKafkaProducer(FlinkKafkaProducerBase):
"""
Flink Sink to produce data into a Kafka topic. By
default producer will use AT_LEAST_ONCE semantic. Before using EXACTLY_ONCE please refer to
Flink's Kafka connector documentation.
"""
def __init__(self, topic: str, serialization_schema: SerializationSchema,
producer_config: Dict, kafka_producer_pool_size: int = 5,
semantic=Semantic.AT_LEAST_ONCE):
"""
Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the topic.
Using this constructor, the default FlinkFixedPartitioner will be used as the partitioner.
This default partitioner maps each sink subtask to a single Kafka partition (i.e. all
records received by a sink subtask will end up in the same Kafka partition).
:param topic: ID of the Kafka topic.
:param serialization_schema: User defined key-less serialization schema.
:param producer_config: Properties with the producer configuration.
"""
gateway = get_gateway()
j_properties = gateway.jvm.java.util.Properties()
for key, value in producer_config.items():
j_properties.setProperty(key, value)
JFlinkKafkaProducer = gateway.jvm \
.org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
j_flink_kafka_producer = JFlinkKafkaProducer(
topic, serialization_schema._j_serialization_schema, j_properties, None,
semantic._to_j_semantic(), kafka_producer_pool_size)
super(FlinkKafkaProducer, self).__init__(j_flink_kafka_producer=j_flink_kafka_producer)
def ignore_failures_after_transaction_timeout(self) -> 'FlinkKafkaProducer':
"""
Disables the propagation of exceptions thrown when committing presumably timed out Kafka
transactions during recovery of the job. If a Kafka transaction is timed out, a commit will
never be successful. Hence, use this feature to avoid recovery loops of the Job. Exceptions
will still be logged to inform the user that data loss might have occurred.
Note that we use the System.currentTimeMillis() to track the age of a transaction. Moreover,
only exceptions thrown during the recovery are caught, i.e., the producer will attempt at
least one commit of the transaction before giving up.
:return: This FlinkKafkaProducer.
"""
self._j_function.ignoreFailuresAfterTransactionTimeout()
return self
def _get_kafka_consumer(topics, properties, deserialization_schema, j_consumer_clz):
if not isinstance(topics, list):
topics = [topics]
gateway = get_gateway()
j_properties = gateway.jvm.java.util.Properties()
for key, value in properties.items():
j_properties.setProperty(key, value)
j_flink_kafka_consumer = j_consumer_clz(topics,
deserialization_schema._j_deserialization_schema,
j_properties)
return j_flink_kafka_consumer
class JdbcSink(SinkFunction):
def __init__(self, j_jdbc_sink):
super(JdbcSink, self).__init__(sink_func=j_jdbc_sink)
@staticmethod
def sink(sql: str, type_info: RowTypeInfo, jdbc_connection_options: 'JdbcConnectionOptions',
jdbc_execution_options: 'JdbcExecutionOptions' = None):
"""
Create a JDBC sink.
:param sql: arbitrary DML query (e.g. insert, update, upsert)
:param type_info: A RowTypeInfo for query field types.
:param jdbc_execution_options: parameters of execution, such as batch size and maximum
retries.
:param jdbc_connection_options: parameters of connection, such as JDBC URL.
:return: A JdbcSink.
"""
sql_types = []
gateway = get_gateway()
JJdbcTypeUtil = gateway.jvm.org.apache.flink.connector.jdbc.utils.JdbcTypeUtil
for field_type in type_info.get_field_types():
sql_types.append(JJdbcTypeUtil
.typeInformationToSqlType(field_type.get_java_type_info()))
j_sql_type = to_jarray(gateway.jvm.int, sql_types)
output_format_clz = gateway.jvm.Class\
.forName('org.apache.flink.connector.jdbc.internal.JdbcOutputFormat', False,
get_gateway().jvm.Thread.currentThread().getContextClassLoader())
j_int_array_type = to_jarray(gateway.jvm.int, []).getClass()
j_builder_method = output_format_clz.getDeclaredMethod('createRowJdbcStatementBuilder',
to_jarray(gateway.jvm.Class,
[j_int_array_type]))
j_builder_method.setAccessible(True)
j_statement_builder = j_builder_method.invoke(None, to_jarray(gateway.jvm.Object,
[j_sql_type]))
jdbc_execution_options = jdbc_execution_options if jdbc_execution_options is not None \
else JdbcExecutionOptions.defaults()
j_jdbc_sink = gateway.jvm.org.apache.flink.connector.jdbc.JdbcSink\
.sink(sql, j_statement_builder, jdbc_execution_options._j_jdbc_execution_options,
jdbc_connection_options._j_jdbc_connection_options)
return JdbcSink(j_jdbc_sink=j_jdbc_sink)
class JdbcConnectionOptions(object):
"""
JDBC connection options.
"""
def __init__(self, j_jdbc_connection_options):
self._j_jdbc_connection_options = j_jdbc_connection_options
def get_db_url(self) -> str:
return self._j_jdbc_connection_options.getDbURL()
def get_driver_name(self) -> str:
return self._j_jdbc_connection_options.getDriverName()
def get_user_name(self) -> str:
return self._j_jdbc_connection_options.getUsername()
def get_password(self) -> str:
return self._j_jdbc_connection_options.getPassword()
class JdbcConnectionOptionsBuilder(object):
"""
Builder for JdbcConnectionOptions.
"""
def __init__(self):
self._j_options_builder = get_gateway().jvm.org.apache.flink.connector\
.jdbc.JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
def with_url(self, url: str) -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder':
self._j_options_builder.withUrl(url)
return self
def with_driver_name(self, driver_name: str) \
-> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder':
self._j_options_builder.withDriverName(driver_name)
return self
def with_user_name(self, user_name: str) \
-> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder':
self._j_options_builder.withUsername(user_name)
return self
def with_password(self, password: str) \
-> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder':
self._j_options_builder.withPassword(password)
return self
def build(self) -> 'JdbcConnectionOptions':
return JdbcConnectionOptions(j_jdbc_connection_options=self._j_options_builder.build())
class JdbcExecutionOptions(object):
"""
JDBC sink batch options.
"""
def __init__(self, j_jdbc_execution_options):
self._j_jdbc_execution_options = j_jdbc_execution_options
def get_batch_interval_ms(self) -> int:
return self._j_jdbc_execution_options.getBatchIntervalMs()
def get_batch_size(self) -> int:
return self._j_jdbc_execution_options.getBatchSize()
def get_max_retries(self) -> int:
return self._j_jdbc_execution_options.getMaxRetries()
@staticmethod
def defaults() -> 'JdbcExecutionOptions':
return JdbcExecutionOptions(
j_jdbc_execution_options=get_gateway().jvm
.org.apache.flink.connector.jdbc.JdbcExecutionOptions.defaults())
@staticmethod
def builder() -> 'Builder':
return JdbcExecutionOptions.Builder()
class Builder(object):
"""
Builder for JdbcExecutionOptions.
"""
def __init__(self):
self._j_builder = get_gateway().jvm\
.org.apache.flink.connector.jdbc.JdbcExecutionOptions.builder()
def with_batch_size(self, size: int) -> 'JdbcExecutionOptions.Builder':
self._j_builder.withBatchSize(size)
return self
def with_batch_interval_ms(self, interval_ms: int) -> 'JdbcExecutionOptions.Builder':
self._j_builder.withBatchIntervalMs(interval_ms)
return self
def with_max_retries(self, max_retries: int) -> 'JdbcExecutionOptions.Builder':
self._j_builder.withMaxRetries(max_retries)
return self
def build(self) -> 'JdbcExecutionOptions':
return JdbcExecutionOptions(j_jdbc_execution_options=self._j_builder.build())
class RollingPolicy(object):
"""
The policy based on which a Bucket in the FileSink rolls its currently
open part file and opens a new one.
"""
def __init__(self, j_rolling_policy):
self._j_rolling_policy = j_rolling_policy
@staticmethod
def default_rolling_policy(
part_size: int = 1024 * 1024 * 128,
rollover_interval: int = 60 * 1000,
inactivity_interval: int = 60 * 1000) -> 'RollingPolicy':
"""
Returns the default implementation of the RollingPolicy.
This policy rolls a part file if:
- there is no open part file,
- the current file has reached the maximum bucket size (by default 128MB),
- the current file is older than the roll over interval (by default 60 sec), or
- the current file has not been written to for more than the allowed inactivityTime (by
default 60 sec).
:param part_size: The maximum part file size before rolling.
:param rollover_interval: The maximum time duration a part file can stay open before
rolling.
:param inactivity_interval: The time duration of allowed inactivity after which a part file
will have to roll.
"""
JDefaultRollingPolicy = get_gateway().jvm.org.apache.flink.streaming.api.functions.\
sink.filesystem.rollingpolicies.DefaultRollingPolicy
j_rolling_policy = JDefaultRollingPolicy.builder()\
.withMaxPartSize(part_size) \
.withRolloverInterval(rollover_interval) \
.withInactivityInterval(inactivity_interval) \
.build()
return RollingPolicy(j_rolling_policy)
@staticmethod
def on_checkpoint_rolling_policy() -> 'RollingPolicy':
"""
Returns a RollingPolicy which rolls (ONLY) on every checkpoint.
"""
JOnCheckpointRollingPolicy = get_gateway().jvm.org.apache.flink.streaming.api.functions. \
sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy
return RollingPolicy(JOnCheckpointRollingPolicy.build())
class BucketAssigner(object):
"""
A BucketAssigner is used with a file sink to determine the bucket each incoming element should
be put into.
The StreamingFileSink can be writing to many buckets at a time, and it is responsible
for managing a set of active buckets. Whenever a new element arrives it will ask the
BucketAssigner for the bucket the element should fall in. The BucketAssigner can, for
example, determine buckets based on system time.
"""
def __init__(self, j_bucket_assigner):
self._j_bucket_assigner = j_bucket_assigner
@staticmethod
def base_path_bucket_assigner() -> 'BucketAssigner':
"""
Creates a BucketAssigner that does not perform any bucketing of files. All files are
written to the base path.
"""
return BucketAssigner(get_gateway().jvm.org.apache.flink.streaming.api.functions.sink.
filesystem.bucketassigners.BasePathBucketAssigner())
@staticmethod
def date_time_bucket_assigner(format_str: str = "yyyy-MM-dd--HH", timezone_id: str = None):
"""
Creates a BucketAssigner that assigns to buckets based on current system time.
It will create directories of the following form: /{basePath}/{dateTimePath}/}.
The basePath is the path that was specified as a base path when creating the new bucket.
The dateTimePath is determined based on the current system time and the user provided format
string.
The Java DateTimeFormatter is used to derive a date string from the current system time and
the date format string. The default format string is "yyyy-MM-dd--HH" so the rolling files
will have a granularity of hours.
:param format_str: The format string used to determine the bucket id.
:param timezone_id: The timezone id, either an abbreviation such as "PST", a full name
such as "America/Los_Angeles", or a custom timezone_id such as
"GMT-08:00". Th e default time zone will b used if it's None.
"""
if timezone_id is not None and isinstance(timezone_id, str):
j_timezone = get_gateway().jvm.java.time.ZoneId.of(timezone_id)
else:
j_timezone = get_gateway().jvm.java.time.ZoneId.systemDefault()
return BucketAssigner(
get_gateway().jvm.org.apache.flink.streaming.api.functions.sink.
filesystem.bucketassigners.DateTimeBucketAssigner(format_str, j_timezone))
class StreamingFileSink(SinkFunction):
"""
Sink that emits its input elements to `FileSystem` files within buckets. This is
integrated with the checkpointing mechanism to provide exactly once semantics.
When creating the sink a `basePath` must be specified. The base directory contains
one directory for every bucket. The bucket directories themselves contain several part files,
with at least one for each parallel subtask of the sink which is writing data to that bucket.
These part files contain the actual output data.
"""
def __init__(self, j_obj):
super(StreamingFileSink, self).__init__(j_obj)
class DefaultRowFormatBuilder(object):
"""
Builder for the vanilla `StreamingFileSink` using a row format.
"""
def __init__(self, j_default_row_format_builder):
self.j_default_row_format_builder = j_default_row_format_builder
def with_bucket_check_interval(
self, interval: int) -> 'StreamingFileSink.DefaultRowFormatBuilder':
self.j_default_row_format_builder.withBucketCheckInterval(interval)
return self
def with_bucket_assigner(
self,
bucket_assigner: BucketAssigner) -> 'StreamingFileSink.DefaultRowFormatBuilder':
self.j_default_row_format_builder.withBucketAssigner(bucket_assigner._j_bucket_assigner)
return self
def with_rolling_policy(
self,
policy: RollingPolicy) -> 'StreamingFileSink.DefaultRowFormatBuilder':
self.j_default_row_format_builder.withRollingPolicy(policy._j_rolling_policy)
return self
def with_output_file_config(
self,
output_file_config: 'OutputFileConfig') \
-> 'StreamingFileSink.DefaultRowFormatBuilder':
self.j_default_row_format_builder.withOutputFileConfig(
output_file_config._j_output_file_config)
return self
def build(self) -> 'StreamingFileSink':
j_stream_file_sink = self.j_default_row_format_builder.build()
return StreamingFileSink(j_stream_file_sink)
@staticmethod
def for_row_format(base_path: str, encoder: Encoder) -> 'DefaultRowFormatBuilder':
j_path = get_gateway().jvm.org.apache.flink.core.fs.Path(base_path)
j_default_row_format_builder = get_gateway().jvm.org.apache.flink.streaming.api.\
functions.sink.filesystem.StreamingFileSink.forRowFormat(j_path, encoder._j_encoder)
return StreamingFileSink.DefaultRowFormatBuilder(j_default_row_format_builder)
class OutputFileConfig(object):
"""
Part file name configuration.
This allow to define a prefix and a suffix to the part file name.
"""
@staticmethod
def builder():
return OutputFileConfig.OutputFileConfigBuilder()
def __init__(self, part_prefix: str, part_suffix: str):
self._j_output_file_config = get_gateway().jvm.org.apache.flink.streaming.api.\
functions.sink.filesystem.OutputFileConfig(part_prefix, part_suffix)
def get_part_prefix(self) -> str:
"""
The prefix for the part name.
"""
return self._j_output_file_config.getPartPrefix()
def get_part_suffix(self) -> str:
"""
The suffix for the part name.
"""
return self._j_output_file_config.getPartSuffix()
class OutputFileConfigBuilder(object):
"""
A builder to create the part file configuration.
"""
def __init__(self):
self.part_prefix = "part"
self.part_suffix = ""
def with_part_prefix(self, prefix) -> 'OutputFileConfig.OutputFileConfigBuilder':
self.part_prefix = prefix
return self
def with_part_suffix(self, suffix) -> 'OutputFileConfig.OutputFileConfigBuilder':
self.part_suffix = suffix
return self
def build(self) -> 'OutputFileConfig':
return OutputFileConfig(self.part_prefix, self.part_suffix)
class Source(JavaFunctionWrapper):
"""
Base class for all unified data source in Flink.
"""
def __init__(self, source: Union[str, JavaObject]):
"""
Constructor of Source.
:param source: The java Source object.
"""
super(Source, self).__init__(source)
class Sink(JavaFunctionWrapper):
"""
Base class for all unified data sink in Flink.
"""
def __init__(self, sink: Union[str, JavaObject]):
"""
Constructor of Sink.
:param sink: The java Sink object.
"""
super(Sink, self).__init__(sink)
class StreamFormat(object):
"""
A reader format that reads individual records from a stream.
Compared to the :class:`~pyflink.datastream.connectors.FileSource.BulkFormat`, the stream
format handles a few things out-of-the-box, like deciding how to batch records or dealing
with compression.
Internally in the file source, the readers pass batches of records from the reading threads
(that perform the typically blocking I/O operations) to the async mailbox threads that do
the streaming and batch data processing. Passing records in batches
(rather than one-at-a-time) much reduces the thread-to-thread handover overhead.
This batching is by default based on I/O fetch size for the StreamFormat, meaning the
set of records derived from one I/O buffer will be handed over as one. See config option
`source.file.stream.io-fetch-size` to configure that fetch size.
"""
def __init__(self, j_stream_format):
self._j_stream_format = j_stream_format
@staticmethod
def text_line_format(charset_name: str = "UTF-8") -> 'StreamFormat':
"""
Creates a reader format that text lines from a file.
The reader uses Java's built-in java.io.InputStreamReader to decode the byte stream
using various supported charset encodings.
This format does not support optimized recovery from checkpoints. On recovery, it will
re-read and discard the number of lined that were processed before the last checkpoint.
That is due to the fact that the offsets of lines in the file cannot be tracked through
the charset decoders with their internal buffering of stream input and charset decoder
state.
:param charset_name: The charset to decode the byte stream.
"""
j_stream_format = get_gateway().jvm.org.apache.flink.connector.file.src.reader. \
TextLineInputFormat(charset_name)
return StreamFormat(j_stream_format)
class FileEnumeratorProvider(object):
"""
Factory for FileEnumerator which task is to discover all files to be read and to split them
into a set of file source splits. This includes possibly, path traversals, file filtering
(by name or other patterns) and deciding whether to split files into multiple splits, and
how to split them.
"""
def __init__(self, j_file_enumerator_provider):
self._j_file_enumerator_provider = j_file_enumerator_provider
@staticmethod
def default_splittable_file_enumerator() -> 'FileEnumeratorProvider':
"""
The default file enumerator used for splittable formats. The enumerator recursively
enumerates files, split files that consist of multiple distributed storage blocks into
multiple splits, and filters hidden files (files starting with '.' or '_'). Files with
suffixes of common compression formats (for example '.gzip', '.bz2', '.xy', '.zip', ...)
will not be split.
"""
JFileSource = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource
return FileEnumeratorProvider(JFileSource.DEFAULT_SPLITTABLE_FILE_ENUMERATOR)
@staticmethod
def default_non_splittable_file_enumerator() -> 'FileEnumeratorProvider':
"""
The default file enumerator used for non-splittable formats. The enumerator recursively
enumerates files, creates one split for the file, and filters hidden files
(files starting with '.' or '_').
"""
JFileSource = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource
return FileEnumeratorProvider(JFileSource.DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR)
class FileSplitAssignerProvider(object):
"""
Factory for FileSplitAssigner which is responsible for deciding what split should be
processed next by which node. It determines split processing order and locality.
"""
def __init__(self, j_file_split_assigner):
self._j_file_split_assigner = j_file_split_assigner
@staticmethod
def locality_aware_split_assigner() -> 'FileSplitAssignerProvider':
"""
A FileSplitAssigner that assigns to each host preferably splits that are local, before
assigning splits that are not local.
"""
JFileSource = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource
return FileSplitAssignerProvider(JFileSource.DEFAULT_SPLIT_ASSIGNER)
class FileSourceBuilder(object):
"""
The builder for the :class:`~pyflink.datastream.connectors.FileSource`, to configure the
various behaviors.
Start building the source via one of the following methods:
- :func:`~pyflink.datastream.connectors.FileSource.for_record_stream_format`
"""
def __init__(self, j_file_source_builder):
self._j_file_source_builder = j_file_source_builder
def monitor_continuously(
self,
discovery_interval: Duration) -> 'FileSourceBuilder':
"""
Sets this source to streaming ("continuous monitoring") mode.
This makes the source a "continuous streaming" source that keeps running, monitoring
for new files, and reads these files when they appear and are discovered by the
monitoring.
The interval in which the source checks for new files is the discovery_interval. Shorter
intervals mean that files are discovered more quickly, but also imply more frequent
listing or directory traversal of the file system / object store.
"""
self._j_file_source_builder.monitorContinuously(discovery_interval._j_duration)
return self
def process_static_file_set(self) -> 'FileSourceBuilder':
"""
Sets this source to bounded (batch) mode.
In this mode, the source processes the files that are under the given paths when the
application is started. Once all files are processed, the source will finish.
This setting is also the default behavior. This method is mainly here to "switch back"
to bounded (batch) mode, or to make it explicit in the source construction.
"""
self._j_file_source_builder.processStaticFileSet()
return self
def set_file_enumerator(
self,
file_enumerator: 'FileEnumeratorProvider') -> 'FileSourceBuilder':
"""
Configures the FileEnumerator for the source. The File Enumerator is responsible
for selecting from the input path the set of files that should be processed (and which
to filter out). Furthermore, the File Enumerator may split the files further into
sub-regions, to enable parallelization beyond the number of files.
"""
self._j_file_source_builder.setFileEnumerator(
file_enumerator._j_file_enumerator_provider)
return self
def set_split_assigner(
self,
split_assigner: 'FileSplitAssignerProvider') -> 'FileSourceBuilder':
"""
Configures the FileSplitAssigner for the source. The File Split Assigner
determines which parallel reader instance gets which {@link FileSourceSplit}, and in
which order these splits are assigned.
"""
self._j_file_source_builder.setSplitAssigner(split_assigner._j_file_split_assigner)
return self
def build(self) -> 'FileSource':
"""
Creates the file source with the settings applied to this builder.
"""
return FileSource(self._j_file_source_builder.build())
class FileSource(Source):
"""
A unified data source that reads files - both in batch and in streaming mode.
This source supports all (distributed) file systems and object stores that can be accessed via
the Flink's FileSystem class.
Start building a file source via one of the following calls:
- :func:`~pyflink.datastream.connectors.FileSource.for_record_stream_format`
This creates a :class:`~pyflink.datastream.connectors.FileSource.FileSourceBuilder` on which
you can configure all the properties of the file source.
<h2>Batch and Streaming</h2>
This source supports both bounded/batch and continuous/streaming data inputs. For the
bounded/batch case, the file source processes all files under the given path(s). In the
continuous/streaming case, the source periodically checks the paths for new files and will start
reading those.
When you start creating a file source (via the
:class:`~pyflink.datastream.connectors.FileSource.FileSourceBuilder` created
through one of the above-mentioned methods) the source is by default in bounded/batch mode. Call
:func:`~pyflink.datastream.connectors.FileSource.FileSourceBuilder.monitor_continuously` to put
the source into continuous streaming mode.
<h2>Format Types</h2>
The reading of each file happens through file readers defined by <i>file formats</i>. These
define the parsing logic for the contents of the file. There are multiple classes that the
source supports. Their interfaces trade of simplicity of implementation and
flexibility/efficiency.
- A :class:`~pyflink.datastream.connectors.FileSource.StreamFormat` reads the contents of
a file from a file stream. It is the simplest format to implement, and provides many
features out-of-the-box (like checkpointing logic) but is limited in the optimizations it
can apply (such as object reuse, batching, etc.).
<h2>Discovering / Enumerating Files</h2>
The way that the source lists the files to be processes is defined by the
:class:`~pyflink.datastream.connectors.FileSource.FileEnumeratorProvider`. The
FileEnumeratorProvider is responsible to select the relevant files (for example filter out
hidden files) and to optionally splits files into multiple regions (= file source splits) that
can be read in parallel).
"""
def __init__(self, j_file_source):
super(FileSource, self).__init__(source=j_file_source)
@staticmethod
def for_record_stream_format(stream_format: StreamFormat, *paths: str) -> FileSourceBuilder:
"""
Builds a new FileSource using a
:class:`~pyflink.datastream.connectors.FileSource.StreamFormat` to read record-by-record
from a file stream.
When possible, stream-based formats are generally easier (preferable) to file-based
formats, because they support better default behavior around I/O batching or progress
tracking (checkpoints).
Stream formats also automatically de-compress files based on the file extension. This
supports files ending in ".deflate" (Deflate), ".xz" (XZ), ".bz2" (BZip2), ".gz", ".gzip"
(GZip).
"""
JPath = get_gateway().jvm.org.apache.flink.core.fs.Path
JFileSource = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource
j_paths = to_jarray(JPath, [JPath(p) for p in paths])
return FileSourceBuilder(
JFileSource.forRecordStreamFormat(stream_format._j_stream_format, j_paths))
class NumberSequenceSource(Source):
"""
A data source that produces a sequence of numbers (longs). This source is useful for testing and
for cases that just need a stream of N events of any kind.
The source splits the sequence into as many parallel sub-sequences as there are parallel
source readers. Each sub-sequence will be produced in order. Consequently, if the parallelism is
limited to one, this will produce one sequence in order.
This source is always bounded. For very long sequences (for example over the entire domain of
long integer values), user may want to consider executing the application in a streaming manner,
because, despite the fact that the produced stream is bounded, the end bound is pretty far away.
"""
def __init__(self, start, end):
"""
Creates a new NumberSequenceSource that produces parallel sequences covering the
range start to end (both boundaries are inclusive).
"""
JNumberSequenceSource = get_gateway().jvm.org.apache.flink.api.connector.source.lib.\
NumberSequenceSource
j_seq_source = JNumberSequenceSource(start, end)
super(NumberSequenceSource, self).__init__(source=j_seq_source)
class FileSink(Sink):
"""
A unified sink that emits its input elements to FileSystem files within buckets. This
sink achieves exactly-once semantics for both BATCH and STREAMING.
When creating the sink a basePath must be specified. The base directory contains one
directory for every bucket. The bucket directories themselves contain several part files, with
at least one for each parallel subtask of the sink which is writing data to that bucket.
These part files contain the actual output data.
The sink uses a BucketAssigner to determine in which bucket directory each element
should be written to inside the base directory. The BucketAssigner can, for example, roll
on every checkpoint or use time or a property of the element to determine the bucket directory.
The default BucketAssigner is a DateTimeBucketAssigner which will create one new
bucket every hour. You can specify a custom BucketAssigner using the
:func:`~pyflink.datastream.connectors.FileSink.RowFormatBuilder.with_bucket_assigner`,
after calling :class:`~pyflink.datastream.connectors.FileSink.for_row_format`.
The names of the part files could be defined using OutputFileConfig. This
configuration contains a part prefix and a part suffix that will be used with a random uid
assigned to each subtask of the sink and a rolling counter to determine the file names. For
example with a prefix "prefix" and a suffix ".ext", a file named {@code
"prefix-81fc4980-a6af-41c8-9937-9939408a734b-17.ext"} contains the data from subtask with uid
{@code 81fc4980-a6af-41c8-9937-9939408a734b} of the sink and is the {@code 17th} part-file
created by that subtask.
Part files roll based on the user-specified RollingPolicy. By default, a DefaultRollingPolicy
is used for row-encoded sink output; a OnCheckpointRollingPolicy is
used for bulk-encoded sink output.
In some scenarios, the open buckets are required to change based on time. In these cases, the
user can specify a bucket_check_interval (by default 1m) and the sink will check
periodically and roll the part file if the specified rolling policy says so.
Part files can be in one of three states: in-progress, pending or finished. The reason for this
is how the sink works to provide exactly-once semantics and fault-tolerance. The part file that
is currently being written to is in-progress. Once a part file is closed for writing it becomes
pending. When a checkpoint is successful (for STREAMING) or at the end of the job (for BATCH)
the currently pending files will be moved to finished.
For STREAMING in order to guarantee exactly-once semantics in case of a failure, the
sink should roll back to the state it had when that last successful checkpoint occurred. To this
end, when restoring, the restored files in pending state are transferred into the finished state
while any in-progress files are rolled back, so that they do not contain data that arrived after
the checkpoint from which we restore.
"""
def __init__(self, j_file_sink):
super(FileSink, self).__init__(sink=j_file_sink)
class RowFormatBuilder(object):
"""
Builder for the vanilla FileSink using a row format.
"""
def __init__(self, j_row_format_builder):
self._j_row_format_builder = j_row_format_builder
def with_bucket_check_interval(self, interval: int):
"""
:param interval: The check interval in milliseconds.
"""
self._j_row_format_builder.withBucketCheckInterval(interval)
return self
def with_bucket_assigner(self, bucket_assigner: BucketAssigner):
self._j_row_format_builder.withBucketAssigner(bucket_assigner._j_bucket_assigner)
return self
def with_rolling_policy(self, rolling_policy: RollingPolicy):
self._j_row_format_builder.withRollingPolicy(rolling_policy._j_rolling_policy)
return self
def with_output_file_config(self, output_file_config: OutputFileConfig):
self._j_row_format_builder.withOutputFileConfig(
output_file_config._j_output_file_config)
return self
def build(self):
return FileSink(self._j_row_format_builder.build())
@staticmethod
def for_row_format(base_path: str, encoder: Encoder) -> 'FileSink.RowFormatBuilder':
JPath = get_gateway().jvm.org.apache.flink.core.fs.Path
JFileSink = get_gateway().jvm.org.apache.flink.connector.file.sink.FileSink
return FileSink.RowFormatBuilder(
JFileSink.forRowFormat(JPath(base_path), encoder._j_encoder))
class PulsarDeserializationSchema(object):
"""
A schema bridge for deserializing the pulsar's Message into a flink managed instance. We
support both the pulsar's self managed schema and flink managed schema.
"""
def __init__(self, _j_pulsar_deserialization_schema):
self._j_pulsar_deserialization_schema = _j_pulsar_deserialization_schema
@staticmethod
def flink_schema(deserialization_schema: DeserializationSchema) \
-> 'PulsarDeserializationSchema':
"""
Create a PulsarDeserializationSchema by using the flink's DeserializationSchema. It would
consume the pulsar message as byte array and decode the message by using flink's logic.
"""
JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
_j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkSchema(
deserialization_schema._j_deserialization_schema)
return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
@staticmethod
def flink_type_info(type_information: TypeInformation,
execution_config: ExecutionConfig = None) -> 'PulsarDeserializationSchema':
"""
Create a PulsarDeserializationSchema by using the given TypeInformation. This method is
only used for treating message that was written into pulsar by TypeInformation.
"""
JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
JExecutionConfig = get_gateway().jvm.org.apache.flink.api.common.ExecutionConfig
_j_execution_config = execution_config._j_execution_config \
if execution_config is not None else JExecutionConfig()
_j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkTypeInfo(
type_information.get_java_type_info(), _j_execution_config)
return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
class SubscriptionType(Enum):
"""
Types of subscription supported by Pulsar.
:data: `Exclusive`:
There can be only 1 consumer on the same topic with the same subscription name.
:data: `Shared`:
Multiple consumer will be able to use the same subscription name and the messages will be
dispatched according to a round-robin rotation between the connected consumers. In this mode,
the consumption order is not guaranteed.
:data: `Failover`:
Multiple consumer will be able to use the same subscription name but only 1 consumer will
receive the messages. If that consumer disconnects, one of the other connected consumers will
start receiving messages. In failover mode, the consumption ordering is guaranteed. In case of
partitioned topics, the ordering is guaranteed on a per-partition basis. The partitions
assignments will be split across the available consumers. On each partition, at most one
consumer will be active at a given point in time.
:data: `Key_Shared`:
Multiple consumer will be able to use the same subscription and all messages with the same key
will be dispatched to only one consumer. Use ordering_key to overwrite the message key for
message ordering.
"""
Exclusive = 0,
Shared = 1,
Failover = 2,
Key_Shared = 3
def _to_j_subscription_type(self):
JSubscriptionType = get_gateway().jvm.org.apache.pulsar.client.api.SubscriptionType
return getattr(JSubscriptionType, self.name)
class StartCursor(object):
"""
A factory class for users to specify the start position of a pulsar subscription.
Since it would be serialized into split.
The implementation for this interface should be well considered.
I don't recommend adding extra internal state for this implementation.
This class would be used only for SubscriptionType.Exclusive and SubscriptionType.Failover.
"""
def __init__(self, _j_start_cursor):
self._j_start_cursor = _j_start_cursor
@staticmethod
def default_start_cursor() -> 'StartCursor':
return StartCursor.earliest()
@staticmethod
def earliest() -> 'StartCursor':
JStartCursor = get_gateway().jvm \
.org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
return StartCursor(JStartCursor.earliest())
@staticmethod
def latest() -> 'StartCursor':
JStartCursor = get_gateway().jvm \
.org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
return StartCursor(JStartCursor.latest())
@staticmethod
def from_message_time(timestamp: int) -> 'StartCursor':
JStartCursor = get_gateway().jvm \
.org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
return StartCursor(JStartCursor.fromMessageTime(timestamp))
class StopCursor(object):
"""
A factory class for users to specify the stop position of a pulsar subscription. Since it would
be serialized into split. The implementation for this interface should be well considered. I
don't recommend adding extra internal state for this implementation.
"""
def __init__(self, _j_stop_cursor):
self._j_stop_cursor = _j_stop_cursor
@staticmethod
def default_stop_cursor() -> 'StopCursor':
return StopCursor.never()
@staticmethod
def never() -> 'StopCursor':
JStopCursor = get_gateway().jvm \
.org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
return StopCursor(JStopCursor.never())
@staticmethod
def latest() -> 'StopCursor':
JStopCursor = get_gateway().jvm \
.org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
return StopCursor(JStopCursor.latest())
@staticmethod
def at_event_time(timestamp: int) -> 'StopCursor':
JStopCursor = get_gateway().jvm \
.org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
return StopCursor(JStopCursor.atEventTime(timestamp))
class PulsarSource(Source):
"""
The Source implementation of Pulsar. Please use a PulsarSourceBuilder to construct a
PulsarSource. The following example shows how to create a PulsarSource emitting records of
String type.
Example:
::
>>> source = PulsarSource() \\
... .builder() \\
... .set_topics(TOPIC1, TOPIC2) \\
... .set_service_url(get_service_url()) \\
... .set_admin_url(get_admin_url()) \\
... .set_subscription_name("test") \\
... .set_deserialization_schema(
... PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \\
... .set_bounded_stop_cursor(StopCursor.default_stop_cursor()) \\
... .build()
See PulsarSourceBuilder for more details.
"""
def __init__(self, j_pulsar_source):
super(PulsarSource, self).__init__(source=j_pulsar_source)
@staticmethod
def builder() -> 'PulsarSourceBuilder':
"""
Get a PulsarSourceBuilder to builder a PulsarSource.
"""
return PulsarSourceBuilder()
class PulsarSourceBuilder(object):
"""
The builder class for PulsarSource to make it easier for the users to construct a PulsarSource.
The following example shows the minimum setup to create a PulsarSource that reads the String
values from a Pulsar topic.
Example:
::
>>> source = PulsarSource() \\
... .builder() \\
... .set_service_url(PULSAR_BROKER_URL) \\
... .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
... .set_subscription_name("flink-source-1") \\
... .set_topics([TOPIC1, TOPIC2]) \\
... .set_deserialization_schema(
... PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \\
... .build()
The service url, admin url, subscription name, topics to consume, and the record deserializer
are required fields that must be set.
To specify the starting position of PulsarSource, one can call set_start_cursor(StartCursor).
By default the PulsarSource runs in an Boundedness.CONTINUOUS_UNBOUNDED mode and never stop
until the Flink job is canceled or fails. To let the PulsarSource run in
Boundedness.CONTINUOUS_UNBOUNDED but stops at some given offsets, one can call
set_unbounded_stop_cursor(StopCursor).
For example the following PulsarSource stops after it consumes up to a event time when the
Flink started.
Example:
::
>>> source = PulsarSource() \\
... .builder() \\
... .set_service_url(PULSAR_BROKER_URL) \\
... .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
... .set_subscription_name("flink-source-1") \\
... .set_topics([TOPIC1, TOPIC2]) \\
... .set_deserialization_schema(
... PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \\
... .set_bounded_stop_cursor(StopCursor.at_event_time(int(time.time() * 1000)))
... .build()
"""
def __init__(self):
JPulsarSource = \
get_gateway().jvm.org.apache.flink.connector.pulsar.source.PulsarSource
self._j_pulsar_source_builder = JPulsarSource.builder()
def set_admin_url(self, admin_url: str) -> 'PulsarSourceBuilder':
"""
Sets the admin endpoint for the PulsarAdmin of the PulsarSource.
"""
self._j_pulsar_source_builder.setAdminUrl(admin_url)
return self
def set_service_url(self, service_url: str) -> 'PulsarSourceBuilder':
"""
Sets the server's link for the PulsarConsumer of the PulsarSource.
"""
self._j_pulsar_source_builder.setServiceUrl(service_url)
return self
def set_subscription_name(self, subscription_name: str) -> 'PulsarSourceBuilder':
"""
Sets the name for this pulsar subscription.
"""
self._j_pulsar_source_builder.setSubscriptionName(subscription_name)
return self
def set_subscription_type(self, subscription_type: SubscriptionType) -> 'PulsarSourceBuilder':
"""
SubscriptionType is the consuming behavior for pulsar, we would generator different split
by the given subscription type. Please take some time to consider which subscription type
matches your application best. Default is SubscriptionType.Shared.
"""
self._j_pulsar_source_builder.setSubscriptionType(
subscription_type._to_j_subscription_type())
return self
def set_topics(self, topics: Union[str, List[str]]) -> 'PulsarSourceBuilder':
"""
Set a pulsar topic list for flink source. Some topic may not exist currently, consuming this
non-existed topic wouldn't throw any exception. But the best solution is just consuming by
using a topic regex. You can set topics once either with set_topics or set_topic_pattern in
this builder.
"""
if not isinstance(topics, list):
topics = [topics]
self._j_pulsar_source_builder.setTopics(topics)
return self
def set_topics_pattern(self, topics_pattern: str) -> 'PulsarSourceBuilder':
"""
Set a topic pattern to consume from the java regex str. You can set topics once either with
set_topics or set_topic_pattern in this builder.
"""
self._j_pulsar_source_builder.setTopicPattern(topics_pattern)
return self
def set_topic_pattern(self, topic_pattern: str) -> 'PulsarSourceBuilder':
"""
Set a topic pattern to consume from the java regex str. You can set topics once either with
set_topics or set_topic_pattern in this builder.
"""
self._j_pulsar_source_builder.setTopicPattern(topic_pattern)
return self
def set_start_cursor(self, start_cursor: StartCursor) -> 'PulsarSourceBuilder':
"""
Specify from which offsets the PulsarSource should start consume from by providing an
StartCursor.
"""
self._j_pulsar_source_builder.setStartCursor(start_cursor._j_start_cursor)
return self
def set_unbounded_stop_cursor(self, stop_cursor: StopCursor) -> 'PulsarSourceBuilder':
"""
By default the PulsarSource is set to run in Boundedness.CONTINUOUS_UNBOUNDED manner and
thus never stops until the Flink job fails or is canceled. To let the PulsarSource run as a
streaming source but still stops at some point, one can set an StopCursor to specify the
stopping offsets for each partition. When all the partitions have reached their stopping
offsets, the PulsarSource will then exit.
This method is different from set_bounded_stop_cursor(StopCursor) that after setting the
stopping offsets with this method, PulsarSource.getBoundedness() will still return
Boundedness.CONTINUOUS_UNBOUNDED even though it will stop at the stopping offsets specified
by the stopping offsets StopCursor.
"""
self._j_pulsar_source_builder.setUnboundedStopCursor(stop_cursor._j_stop_cursor)
return self
def set_bounded_stop_cursor(self, stop_cursor: StopCursor) -> 'PulsarSourceBuilder':
"""
By default the PulsarSource is set to run in Boundedness.CONTINUOUS_UNBOUNDED manner and
thus never stops until the Flink job fails or is canceled. To let the PulsarSource run in
Boundedness.BOUNDED manner and stops at some point, one can set an StopCursor to specify
the stopping offsets for each partition. When all the partitions have reached their stopping
offsets, the PulsarSource will then exit.
This method is different from set_unbounded_stop_cursor(StopCursor) that after setting the
stopping offsets with this method, PulsarSource.getBoundedness() will return
Boundedness.BOUNDED instead of Boundedness.CONTINUOUS_UNBOUNDED.
"""
self._j_pulsar_source_builder.setBoundedStopCursor(stop_cursor._j_stop_cursor)
return self
def set_deserialization_schema(self,
pulsar_deserialization_schema: PulsarDeserializationSchema) \
-> 'PulsarSourceBuilder':
"""
DeserializationSchema is required for getting the Schema for deserialize message from
pulsar and getting the TypeInformation for message serialization in flink.
We have defined a set of implementations, using PulsarDeserializationSchema#flink_type_info
or PulsarDeserializationSchema#flink_schema for creating the desired schema.
"""
self._j_pulsar_source_builder.setDeserializationSchema(
pulsar_deserialization_schema._j_pulsar_deserialization_schema)
return self
def set_config(self, key: ConfigOption, value) -> 'PulsarSourceBuilder':
"""
Set arbitrary properties for the PulsarSource and PulsarConsumer. The valid keys can be
found in PulsarSourceOptions and PulsarOptions.
Make sure the option could be set only once or with same value.
"""
self._j_pulsar_source_builder.setConfig(key._j_config_option, value)
return self
def set_config_with_dict(self, config: Dict) -> 'PulsarSourceBuilder':
"""
Set arbitrary properties for the PulsarSource and PulsarConsumer. The valid keys can be
found in PulsarSourceOptions and PulsarOptions.
"""
JConfiguration = get_gateway().jvm.org.apache.flink.configuration.Configuration
self._j_pulsar_source_builder.setConfig(JConfiguration.fromMap(config))
return self
def build(self) -> 'PulsarSource':
"""
Build the PulsarSource.
"""
return PulsarSource(self._j_pulsar_source_builder.build())
class RMQConnectionConfig(object):
"""
Connection Configuration for RMQ.
"""
def __init__(self, j_rmq_connection_config):
self._j_rmq_connection_config = j_rmq_connection_config
def get_host(self) -> str:
return self._j_rmq_connection_config.getHost()
def get_port(self) -> int:
return self._j_rmq_connection_config.getPort()
def get_virtual_host(self) -> str:
return self._j_rmq_connection_config.getVirtualHost()
def get_user_name(self) -> str:
return self._j_rmq_connection_config.getUsername()
def get_password(self) -> str:
return self._j_rmq_connection_config.getPassword()
def get_uri(self) -> str:
return self._j_rmq_connection_config.getUri()
def get_network_recovery_interval(self) -> int:
return self._j_rmq_connection_config.getNetworkRecoveryInterval()
def is_automatic_recovery(self) -> bool:
return self._j_rmq_connection_config.isAutomaticRecovery()
def is_topology_recovery(self) -> bool:
return self._j_rmq_connection_config.isTopologyRecovery()
def get_connection_timeout(self) -> int:
return self._j_rmq_connection_config.getConnectionTimeout()
def get_requested_channel_max(self) -> int:
return self._j_rmq_connection_config.getRequestedChannelMax()
def get_requested_frame_max(self) -> int:
return self._j_rmq_connection_config.getRequestedFrameMax()
def get_requested_heartbeat(self) -> int:
return self._j_rmq_connection_config.getRequestedHeartbeat()
class Builder(object):
"""
Builder for RMQConnectionConfig.
"""
def __init__(self):
self._j_options_builder = get_gateway().jvm.org.apache.flink.streaming.connectors\
.rabbitmq.common.RMQConnectionConfig.Builder()
def set_port(self, port: int) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setPort(port)
return self
def set_host(self, host: str) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setHost(host)
return self
def set_virtual_host(self, vhost: str) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setVirtualHost(vhost)
return self
def set_user_name(self, user_name: str) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setUserName(user_name)
return self
def set_password(self, password: str) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setPassword(password)
return self
def set_uri(self, uri: str) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setUri(uri)
return self
def set_topology_recovery_enabled(
self, topology_recovery_enabled: bool) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setTopologyRecoveryEnabled(topology_recovery_enabled)
return self
def set_requested_heartbeat(
self, requested_heartbeat: int) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setRequestedHeartbeat(requested_heartbeat)
return self
def set_requested_frame_max(
self, requested_frame_max: int) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setRequestedFrameMax(requested_frame_max)
return self
def set_requested_channel_max(
self, requested_channel_max: int) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setRequestedChannelMax(requested_channel_max)
return self
def set_network_recovery_interval(
self, network_recovery_interval: int) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setNetworkRecoveryInterval(network_recovery_interval)
return self
def set_connection_timeout(self, connection_timeout: int) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setConnectionTimeout(connection_timeout)
return self
def set_automatic_recovery(self, automatic_recovery: bool) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setAutomaticRecovery(automatic_recovery)
return self
def set_prefetch_count(self, prefetch_count: int) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setPrefetchCount(prefetch_count)
return self
def build(self) -> 'RMQConnectionConfig':
return RMQConnectionConfig(self._j_options_builder.build())
class RMQSource(SourceFunction):
def __init__(self,
connection_config: 'RMQConnectionConfig',
queue_name: str,
use_correlation_id: bool,
deserialization_schema: DeserializationSchema
):
"""
Creates a new RabbitMQ source.
For exactly-once, you must set the correlation ids of messages at the producer.
The correlation id must be unique. Otherwise the behavior of the source is undefined.
If in doubt, set use_correlation_id to False.
When correlation ids are not used, this source has at-least-once processing semantics
when checkpointing is enabled.
:param connection_config: The RabbiMQ connection configuration.
:param queue_name: The queue to receive messages from.
:param use_correlation_id: Whether the messages received are supplied with a unique id
to deduplicate messages (in case of failed acknowledgments).
Only used when checkpointing is enabled.
:param deserialization_schema: A deserializer used to convert between RabbitMQ's
messages and Flink's objects.
"""
JRMQSource = get_gateway().jvm.org.apache.flink.streaming.connectors.rabbitmq.RMQSource
j_rmq_source = JRMQSource(
connection_config._j_rmq_connection_config,
queue_name,
use_correlation_id,
deserialization_schema._j_deserialization_schema
)
super(RMQSource, self).__init__(source_func=j_rmq_source)
class RMQSink(SinkFunction):
def __init__(self, connection_config: 'RMQConnectionConfig',
queue_name: str, serialization_schema: SerializationSchema):
"""
Creates a new RabbitMQ sink.
:param connection_config: The RabbiMQ connection configuration.
:param queue_name: The queue to publish messages to.
:param serialization_schema: A serializer used to convert Flink objects to bytes.
"""
JRMQSink = get_gateway().jvm.org.apache.flink.streaming.connectors.rabbitmq.RMQSink
j_rmq_sink = JRMQSink(
connection_config._j_rmq_connection_config,
queue_name,
serialization_schema._j_serialization_schema,
)
super(RMQSink, self).__init__(sink_func=j_rmq_sink)