| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
| import abc |
| from enum import Enum |
| from typing import Dict, List, Union |
| |
| from pyflink.common import typeinfo |
| from pyflink.common.serialization import DeserializationSchema, Encoder, SerializationSchema |
| from pyflink.common.typeinfo import RowTypeInfo, WrapperTypeInfo, TypeInformation |
| from pyflink.datastream.functions import SourceFunction, SinkFunction |
| from pyflink.java_gateway import get_gateway |
| from pyflink.util.utils import load_java_class, to_jarray |
| |
| from py4j.java_gateway import java_import |
| |
| |
| __all__ = [ |
| 'FlinkKafkaConsumer', |
| 'FlinkKafkaProducer', |
| 'JdbcSink', |
| 'JdbcConnectionOptions', |
| 'JdbcExecutionOptions', |
| 'RollingPolicy', |
| 'DefaultRollingPolicy', |
| 'StreamingFileSink', |
| 'OutputFileConfig'] |
| |
| |
| class FlinkKafkaConsumerBase(SourceFunction, abc.ABC): |
| """ |
    Base class of all Flink Kafka Consumer data sources. This implements the common behavior
    across all Kafka versions.
| |
| The Kafka version specific behavior is defined mainly in the specific subclasses. |
| """ |
| |
| def __init__(self, j_flink_kafka_consumer): |
| super(FlinkKafkaConsumerBase, self).__init__(source_func=j_flink_kafka_consumer) |
| |
| def set_commit_offsets_on_checkpoints(self, |
| commit_on_checkpoints: bool) -> 'FlinkKafkaConsumerBase': |
| """ |
        Specifies whether the consumer should commit offsets back to Kafka on checkpoints.
        This setting will only have effect if checkpointing is enabled for the job. If
        checkpointing isn't enabled, only the "auto.commit.enable" (for 0.8) /
        "enable.auto.commit" (for 0.9+) property settings will be used.
| """ |
| self._j_function = self._j_function \ |
| .setCommitOffsetsOnCheckpoints(commit_on_checkpoints) |
| return self |
| |
| def set_start_from_earliest(self) -> 'FlinkKafkaConsumerBase': |
| """ |
        Specifies that the consumer should start reading from the earliest offset for all
        partitions. This lets the consumer ignore any committed group offsets in
        Zookeeper / Kafka brokers.
| |
| This method does not affect where partitions are read from when the consumer is restored |
| from a checkpoint or savepoint. When the consumer is restored from a checkpoint or |
| savepoint, only the offsets in the restored state will be used. |
| """ |
| self._j_function = self._j_function.setStartFromEarliest() |
| return self |
| |
| def set_start_from_latest(self) -> 'FlinkKafkaConsumerBase': |
| """ |
        Specifies that the consumer should start reading from the latest offset for all
        partitions. This lets the consumer ignore any committed group offsets in
        Zookeeper / Kafka brokers.
| |
| This method does not affect where partitions are read from when the consumer is restored |
| from a checkpoint or savepoint. When the consumer is restored from a checkpoint or |
| savepoint, only the offsets in the restored state will be used. |
| """ |
| self._j_function = self._j_function.setStartFromLatest() |
| return self |
| |
| def set_start_from_timestamp(self, startup_offsets_timestamp: int) -> 'FlinkKafkaConsumerBase': |
| """ |
        Specifies that the consumer should start reading partitions from the given timestamp.
        The timestamp must be before the current timestamp. This lets the consumer ignore any
        committed group offsets in Zookeeper / Kafka brokers.

        The consumer will look up the earliest offset whose timestamp is greater than or equal
        to the given timestamp in Kafka. If there's no such offset, the consumer will use the
        latest offset to read data from Kafka.
| |
| This method does not affect where partitions are read from when the consumer is restored |
| from a checkpoint or savepoint. When the consumer is restored from a checkpoint or |
| savepoint, only the offsets in the restored state will be used. |
| |
        :param startup_offsets_timestamp: timestamp for the startup offsets, as milliseconds
                                          since epoch.
| """ |
| self._j_function = self._j_function.setStartFromTimestamp( |
| startup_offsets_timestamp) |
| return self |
| |
| def set_start_from_group_offsets(self) -> 'FlinkKafkaConsumerBase': |
| """ |
        Specifies that the consumer should start reading from any committed group offsets found
        in Zookeeper / Kafka brokers. The 'group.id' property must be set in the configuration
        properties. If no offset can be found for a partition, the behaviour set in
        'auto.offset.reset' in the configuration properties will be used for the partition.
| |
| This method does not affect where partitions are read from when the consumer is restored |
| from a checkpoint or savepoint. When the consumer is restored from a checkpoint or |
| savepoint, only the offsets in the restored state will be used. |
| """ |
| self._j_function = self._j_function.setStartFromGroupOffsets() |
| return self |
| |
| def disable_filter_restored_partitions_with_subscribed_topics(self) -> 'FlinkKafkaConsumerBase': |
| """ |
        By default, when restoring from a checkpoint / savepoint, the consumer always ignores
        restored partitions that are no longer associated with the current specified topics or
        topic pattern to subscribe to.

        This method configures the consumer to not filter the restored partitions, therefore
        always attempting to consume whatever partitions were present in the previous execution,
        regardless of the topics specified to subscribe to in the current execution.
| """ |
| self._j_function = self._j_function \ |
| .disableFilterRestoredPartitionsWithSubscribedTopics() |
| return self |
| |
| def get_produced_type(self) -> TypeInformation: |
| return typeinfo._from_java_type(self._j_function.getProducedType()) |
| |
| |
| class FlinkKafkaConsumer(FlinkKafkaConsumerBase): |
| """ |
| The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from |
| Apache Kafka 0.10.x. The consumer can run in multiple parallel instances, each of which will |
| pull data from one or more Kafka partitions. |
| |
    The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
    during a failure, and that the computation processes elements exactly once. (These guarantees
    naturally assume that Kafka itself does not lose any data.)
| |
| Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. |
| The offsets committed to Kafka / Zookeeper are only to bring the outside view of progress in |
| sync with Flink's view of the progress. That way, monitoring and other jobs can get a view of |
| how far the Flink Kafka consumer has consumed a topic. |
| |
| Please refer to Kafka's documentation for the available configuration properties: |
| http://kafka.apache.org/documentation.html#newconsumerconfigs |
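
    Example (a minimal sketch; the topic name, broker address and group id below are
    placeholders, and the Kafka connector jar must be on the classpath):
    ::

        >>> from pyflink.common.serialization import SimpleStringSchema
        >>> consumer = FlinkKafkaConsumer(
        ...     topics='test-topic',
        ...     deserialization_schema=SimpleStringSchema(),
        ...     properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test-group'})
        >>> consumer = consumer.set_start_from_earliest()
        >>> ds = env.add_source(consumer)  # env: an existing StreamExecutionEnvironment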
| """ |
| |
| def __init__(self, topics: Union[str, List[str]], deserialization_schema: DeserializationSchema, |
| properties: Dict): |
| """ |
| Creates a new Kafka streaming source consumer for Kafka 0.10.x. |
| |
| This constructor allows passing multiple topics to the consumer. |
| |
| :param topics: The Kafka topics to read from. |
| :param deserialization_schema: The de-/serializer used to convert between Kafka's byte |
| messages and Flink's objects. |
| :param properties: The properties that are used to configure both the fetcher and the offset |
| handler. |
| """ |
| |
| JFlinkKafkaConsumer = get_gateway().jvm \ |
| .org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer |
| j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema, |
| JFlinkKafkaConsumer) |
| super(FlinkKafkaConsumer, self).__init__(j_flink_kafka_consumer=j_flink_kafka_consumer) |
| |
| |
| class FlinkKafkaProducerBase(SinkFunction, abc.ABC): |
| """ |
| Flink Sink to produce data into a Kafka topic. |
| |
    Please note that this producer provides at-least-once reliability guarantees when checkpoints
    are enabled and set_flush_on_checkpoint(True) is set. Otherwise, the producer doesn't provide
    any reliability guarantees.
| """ |
| |
| def __init__(self, j_flink_kafka_producer): |
| super(FlinkKafkaProducerBase, self).__init__(sink_func=j_flink_kafka_producer) |
| |
| def set_log_failures_only(self, log_failures_only: bool) -> 'FlinkKafkaProducerBase': |
| """ |
        Defines whether the producer should fail on errors or only log them. If this is set to
        true, exceptions will only be logged; if set to false, exceptions will eventually be
        thrown, causing the streaming program to fail (and enter recovery).
| |
| :param log_failures_only: The flag to indicate logging-only on exceptions. |
| """ |
| self._j_function.setLogFailuresOnly(log_failures_only) |
| return self |
| |
| def set_flush_on_checkpoint(self, flush_on_checkpoint: bool) -> 'FlinkKafkaProducerBase': |
| """ |
| If set to true, the Flink producer will wait for all outstanding messages in the Kafka |
| buffers to be acknowledged by the Kafka producer on a checkpoint. |
| |
| This way, the producer can guarantee that messages in the Kafka buffers are part of the |
| checkpoint. |
| |
| :param flush_on_checkpoint: Flag indicating the flush mode (true = flush on checkpoint) |
| """ |
| self._j_function.setFlushOnCheckpoint(flush_on_checkpoint) |
| return self |
| |
| def set_write_timestamp_to_kafka(self, |
| write_timestamp_to_kafka: bool) -> 'FlinkKafkaProducerBase': |
| """ |
| If set to true, Flink will write the (event time) timestamp attached to each record into |
| Kafka. Timestamps must be positive for Kafka to accept them. |
| |
| :param write_timestamp_to_kafka: Flag indicating if Flink's internal timestamps are written |
| to Kafka. |
| """ |
| self._j_function.setWriteTimestampToKafka(write_timestamp_to_kafka) |
| return self |
| |
| |
| class Semantic(Enum): |
| """ |
| Semantics that can be chosen. |
    :data: `EXACTLY_ONCE`:
    The Flink producer will write all messages in a Kafka transaction that will be committed to
    Kafka on a checkpoint. In this mode FlinkKafkaProducer sets up a pool of FlinkKafkaProducers.
    Between each checkpoint a new Kafka transaction is created, which is committed on
    FlinkKafkaProducer.notifyCheckpointComplete(long). If checkpoint complete notifications are
    running late, FlinkKafkaProducer can run out of FlinkKafkaProducers in the pool. In that case
    any subsequent FlinkKafkaProducer.snapshotState() requests will fail and the
    FlinkKafkaProducer will keep using the FlinkKafkaProducer from the previous checkpoint. To
    decrease the chances of failing checkpoints there are four options:
        1. decrease the number of max concurrent checkpoints
        2. make checkpoints more reliable (so that they complete faster)
        3. increase the delay between checkpoints
        4. increase the size of the FlinkKafkaProducers pool
| |
| :data: `AT_LEAST_ONCE`: |
| The Flink producer will wait for all outstanding messages in the Kafka buffers to be |
| acknowledged by the Kafka producer on a checkpoint. |
| |
| :data: `NONE`: |
| Means that nothing will be guaranteed. Messages can be lost and/or duplicated in case of |
| failure. |
| """ |
| |
    EXACTLY_ONCE = 0
    AT_LEAST_ONCE = 1
    NONE = 2
| |
| def _to_j_semantic(self): |
| JSemantic = get_gateway().jvm \ |
| .org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic |
| return getattr(JSemantic, self.name) |
| |
| |
| class FlinkKafkaProducer(FlinkKafkaProducerBase): |
| """ |
    Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.11.x.
    By default, the producer will use the AT_LEAST_ONCE semantic. Before using EXACTLY_ONCE,
    please refer to Flink's Kafka connector documentation.
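
    Example (a minimal sketch; the topic name and broker address are placeholders):
    ::

        >>> from pyflink.common.serialization import SimpleStringSchema
        >>> producer = FlinkKafkaProducer(
        ...     topic='output-topic',
        ...     serialization_schema=SimpleStringSchema(),
        ...     producer_config={'bootstrap.servers': 'localhost:9092'},
        ...     semantic=Semantic.AT_LEAST_ONCE)  # the default; EXACTLY_ONCE needs extra config
        >>> ds.add_sink(producer)  # ds: an existing DataStream of strings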
| """ |
| |
| def __init__(self, topic: str, serialization_schema: SerializationSchema, |
| producer_config: Dict, kafka_producer_pool_size: int = 5, |
| semantic=Semantic.AT_LEAST_ONCE): |
| """ |
| Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the topic. |
| |
| Using this constructor, the default FlinkFixedPartitioner will be used as the partitioner. |
| This default partitioner maps each sink subtask to a single Kafka partition (i.e. all |
| records received by a sink subtask will end up in the same Kafka partition). |
| |
        :param topic: ID of the Kafka topic.
        :param serialization_schema: User defined key-less serialization schema.
        :param producer_config: Properties with the producer configuration.
        :param kafka_producer_pool_size: Overwrites the default size (5) of the internal
                                         FlinkKafkaProducers pool used for the EXACTLY_ONCE
                                         semantic.
        :param semantic: Defines the semantic that will be used by this producer (defaults to
                         AT_LEAST_ONCE).
| """ |
| gateway = get_gateway() |
| j_properties = gateway.jvm.java.util.Properties() |
| for key, value in producer_config.items(): |
| j_properties.setProperty(key, value) |
| |
| JFlinkKafkaProducer = gateway.jvm \ |
| .org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer |
| |
| j_flink_kafka_producer = JFlinkKafkaProducer( |
| topic, serialization_schema._j_serialization_schema, j_properties, None, |
| semantic._to_j_semantic(), kafka_producer_pool_size) |
| super(FlinkKafkaProducer, self).__init__(j_flink_kafka_producer=j_flink_kafka_producer) |
| |
| def ignore_failures_after_transaction_timeout(self) -> 'FlinkKafkaProducer': |
| """ |
        Disables the propagation of exceptions thrown when committing presumably timed out Kafka
        transactions during recovery of the job. If a Kafka transaction is timed out, a commit
        will never be successful. Hence, use this feature to avoid recovery loops of the job.
        Exceptions will still be logged to inform the user that data loss might have occurred.

        Note that System.currentTimeMillis() is used to track the age of a transaction. Moreover,
        only exceptions thrown during recovery are caught, i.e., the producer will attempt at
        least one commit of the transaction before giving up.
| |
| :return: This FlinkKafkaProducer. |
| """ |
| self._j_function.ignoreFailuresAfterTransactionTimeout() |
| return self |
| |
| |
| def _get_kafka_consumer(topics, properties, deserialization_schema, j_consumer_clz): |
| if not isinstance(topics, list): |
| topics = [topics] |
| gateway = get_gateway() |
| j_properties = gateway.jvm.java.util.Properties() |
| for key, value in properties.items(): |
| j_properties.setProperty(key, value) |
| |
| j_flink_kafka_consumer = j_consumer_clz(topics, |
| deserialization_schema._j_deserialization_schema, |
| j_properties) |
| return j_flink_kafka_consumer |
| |
| |
| class JdbcSink(SinkFunction): |
| |
| def __init__(self, j_jdbc_sink): |
| super(JdbcSink, self).__init__(sink_func=j_jdbc_sink) |
| |
| @staticmethod |
| def sink(sql: str, type_info: RowTypeInfo, jdbc_connection_options: 'JdbcConnectionOptions', |
| jdbc_execution_options: 'JdbcExecutionOptions' = None): |
| """ |
| Create a JDBC sink. |
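
        Example (a minimal sketch; the table, URL and driver below are placeholders, and the
        matching JDBC driver jar must be on the classpath):
        ::

            >>> from pyflink.common.typeinfo import Types
            >>> type_info = Types.ROW([Types.INT(), Types.STRING()])
            >>> options = (JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            ...            .with_url('jdbc:derby:memory:test')
            ...            .with_driver_name('org.apache.derby.jdbc.EmbeddedDriver')
            ...            .build())
            >>> sink = JdbcSink.sink(
            ...     'INSERT INTO my_table (id, name) VALUES (?, ?)', type_info, options)
            >>> ds.add_sink(sink)  # ds: an existing DataStream of Rows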
| |
        :param sql: arbitrary DML query (e.g. insert, update, upsert)
        :param type_info: A RowTypeInfo for query field types.
        :param jdbc_connection_options: parameters of connection, such as JDBC URL.
        :param jdbc_execution_options: parameters of execution, such as batch size and maximum
                                       retries.
| :return: A JdbcSink. |
| """ |
| sql_types = [] |
| gateway = get_gateway() |
| JJdbcTypeUtil = gateway.jvm.org.apache.flink.connector.jdbc.utils.JdbcTypeUtil |
| for field_type in type_info.get_field_types(): |
| if isinstance(field_type, WrapperTypeInfo): |
| sql_types.append(JJdbcTypeUtil |
| .typeInformationToSqlType(field_type.get_java_type_info())) |
| else: |
| raise ValueError('field_type must be WrapperTypeInfo') |
| j_sql_type = to_jarray(gateway.jvm.int, sql_types) |
        output_format_clz = gateway.jvm.Class\
            .forName('org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat', False,
                     gateway.jvm.Thread.currentThread().getContextClassLoader())
| j_int_array_type = to_jarray(gateway.jvm.int, []).getClass() |
| j_builder_method = output_format_clz.getDeclaredMethod('createRowJdbcStatementBuilder', |
| to_jarray(gateway.jvm.Class, |
| [j_int_array_type])) |
| j_builder_method.setAccessible(True) |
| j_statement_builder = j_builder_method.invoke(None, to_jarray(gateway.jvm.Object, |
| [j_sql_type])) |
| |
| jdbc_execution_options = jdbc_execution_options if jdbc_execution_options is not None \ |
| else JdbcExecutionOptions.defaults() |
| j_jdbc_sink = gateway.jvm.org.apache.flink.connector.jdbc.JdbcSink\ |
| .sink(sql, j_statement_builder, jdbc_execution_options._j_jdbc_execution_options, |
| jdbc_connection_options._j_jdbc_connection_options) |
| return JdbcSink(j_jdbc_sink=j_jdbc_sink) |
| |
| |
| class JdbcConnectionOptions(object): |
| """ |
| JDBC connection options. |
| """ |
| def __init__(self, j_jdbc_connection_options): |
| self._j_jdbc_connection_options = j_jdbc_connection_options |
| |
| def get_db_url(self) -> str: |
| return self._j_jdbc_connection_options.getDbURL() |
| |
| def get_driver_name(self) -> str: |
| return self._j_jdbc_connection_options.getDriverName() |
| |
| def get_user_name(self) -> str: |
| return self._j_jdbc_connection_options.getUsername() |
| |
| def get_password(self) -> str: |
| return self._j_jdbc_connection_options.getPassword() |
| |
| class JdbcConnectionOptionsBuilder(object): |
| """ |
| Builder for JdbcConnectionOptions. |
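
        Example (illustrative placeholder values):
        ::

            >>> options = (JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            ...            .with_url('jdbc:derby:memory:test')      # placeholder URL
            ...            .with_driver_name('org.apache.derby.jdbc.EmbeddedDriver')
            ...            .with_user_name('flink')                 # placeholder credentials
            ...            .with_password('secret')
            ...            .build())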
| """ |
| def __init__(self): |
| self._j_options_builder = get_gateway().jvm.org.apache.flink.connector\ |
| .jdbc.JdbcConnectionOptions.JdbcConnectionOptionsBuilder() |
| |
| def with_url(self, url: str) -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder': |
| self._j_options_builder.withUrl(url) |
| return self |
| |
| def with_driver_name(self, driver_name: str) \ |
| -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder': |
| self._j_options_builder.withDriverName(driver_name) |
| return self |
| |
| def with_user_name(self, user_name: str) \ |
| -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder': |
| self._j_options_builder.withUsername(user_name) |
| return self |
| |
| def with_password(self, password: str) \ |
| -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder': |
| self._j_options_builder.withPassword(password) |
| return self |
| |
| def build(self) -> 'JdbcConnectionOptions': |
| return JdbcConnectionOptions(j_jdbc_connection_options=self._j_options_builder.build()) |
| |
| |
| class JdbcExecutionOptions(object): |
| """ |
| JDBC sink batch options. |
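
    Example (illustrative values):
    ::

        >>> options = (JdbcExecutionOptions.builder()
        ...            .with_batch_size(200)
        ...            .with_batch_interval_ms(1000)
        ...            .with_max_retries(5)
        ...            .build())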
| """ |
| def __init__(self, j_jdbc_execution_options): |
| self._j_jdbc_execution_options = j_jdbc_execution_options |
| |
| def get_batch_interval_ms(self) -> int: |
| return self._j_jdbc_execution_options.getBatchIntervalMs() |
| |
| def get_batch_size(self) -> int: |
| return self._j_jdbc_execution_options.getBatchSize() |
| |
| def get_max_retries(self) -> int: |
| return self._j_jdbc_execution_options.getMaxRetries() |
| |
| @staticmethod |
| def defaults() -> 'JdbcExecutionOptions': |
| return JdbcExecutionOptions( |
| j_jdbc_execution_options=get_gateway().jvm |
| .org.apache.flink.connector.jdbc.JdbcExecutionOptions.defaults()) |
| |
| @staticmethod |
| def builder() -> 'Builder': |
| return JdbcExecutionOptions.Builder() |
| |
| class Builder(object): |
| """ |
| Builder for JdbcExecutionOptions. |
| """ |
| def __init__(self): |
| self._j_builder = get_gateway().jvm\ |
| .org.apache.flink.connector.jdbc.JdbcExecutionOptions.builder() |
| |
| def with_batch_size(self, size: int) -> 'JdbcExecutionOptions.Builder': |
| self._j_builder.withBatchSize(size) |
| return self |
| |
| def with_batch_interval_ms(self, interval_ms: int) -> 'JdbcExecutionOptions.Builder': |
| self._j_builder.withBatchIntervalMs(interval_ms) |
| return self |
| |
| def with_max_retries(self, max_retries: int) -> 'JdbcExecutionOptions.Builder': |
| self._j_builder.withMaxRetries(max_retries) |
| return self |
| |
| def build(self) -> 'JdbcExecutionOptions': |
| return JdbcExecutionOptions(j_jdbc_execution_options=self._j_builder.build()) |
| |
| |
| class RollingPolicy(object): |
| """ |
| The policy based on which a `Bucket` in the `StreamingFileSink` |
| rolls its currently open part file and opens a new one. |
| """ |
| |
| def __init__(self, j_policy): |
| self.j_policy = j_policy |
| |
| |
| class DefaultRollingPolicy(RollingPolicy): |
| """ |
| The default implementation of the `RollingPolicy`. |
| """ |
| |
| def __init__(self, j_policy): |
| super(DefaultRollingPolicy, self).__init__(j_policy) |
| |
| @staticmethod |
| def builder() -> 'DefaultRollingPolicy.PolicyBuilder': |
| """ |
| Creates a new `PolicyBuilder` that is used to configure and build |
| an instance of `DefaultRollingPolicy`. |
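
        Example (illustrative thresholds):
        ::

            >>> policy = (DefaultRollingPolicy.builder()
            ...           .with_max_part_size(1024 * 1024 * 64)
            ...           .with_rollover_interval(10 * 60 * 1000)
            ...           .with_inactivity_interval(5 * 60 * 1000)
            ...           .build())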
| """ |
| return DefaultRollingPolicy.PolicyBuilder() |
| |
| class PolicyBuilder(object): |
| """ |
| A helper class that holds the configuration properties for the `DefaultRollingPolicy`. |
| The `PolicyBuilder.build()` method must be called to instantiate the policy. |
| """ |
| |
| def __init__(self): |
| self.part_size = 1024 * 1024 * 128 |
| self.rollover_interval = 60 * 1000 |
| self.inactivity_interval = 60 * 1000 |
| |
| def with_max_part_size(self, size: int) -> 'DefaultRollingPolicy.PolicyBuilder': |
| """ |
| Sets the part size above which a part file will have to roll. |
| |
| :param size: the allowed part size. |
| """ |
| assert size > 0 |
| self.part_size = size |
| return self |
| |
| def with_inactivity_interval(self, interval: int) -> 'DefaultRollingPolicy.PolicyBuilder': |
| """ |
| Sets the interval of allowed inactivity after which a part file will have to roll. |
| |
| :param interval: the allowed inactivity interval. |
| """ |
| assert interval > 0 |
| self.inactivity_interval = interval |
| return self |
| |
        def with_rollover_interval(self, interval: int) -> 'DefaultRollingPolicy.PolicyBuilder':
| """ |
| Sets the max time a part file can stay open before having to roll. |
| |
| :param interval: the desired rollover interval. |
| """ |
| self.rollover_interval = interval |
| return self |
| |
| def build(self) -> 'DefaultRollingPolicy': |
| """ |
| Creates the actual policy. |
| """ |
| j_builder = get_gateway().jvm.org.apache.flink.streaming.api.\ |
| functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.create() |
| j_builder = j_builder.withMaxPartSize(self.part_size) |
| j_builder = j_builder.withInactivityInterval(self.inactivity_interval) |
| j_builder = j_builder.withRolloverInterval(self.rollover_interval) |
| return DefaultRollingPolicy(j_builder.build()) |
| |
| |
| class StreamingFileSink(SinkFunction): |
| """ |
| Sink that emits its input elements to `FileSystem` files within buckets. This is |
    integrated with the checkpointing mechanism to provide exactly-once semantics.

    When creating the sink, a `basePath` must be specified. The base directory contains
| one directory for every bucket. The bucket directories themselves contain several part files, |
| with at least one for each parallel subtask of the sink which is writing data to that bucket. |
| These part files contain the actual output data. |
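
    Example (a minimal sketch; the base path is a placeholder):
    ::

        >>> from pyflink.common.serialization import Encoder
        >>> sink = (StreamingFileSink
        ...         .for_row_format('/tmp/output', Encoder.simple_string_encoder())
        ...         .with_rolling_policy(DefaultRollingPolicy.builder().build())
        ...         .build())
        >>> ds.add_sink(sink)  # ds: an existing DataStream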
| """ |
| |
| def __init__(self, j_obj): |
| super(StreamingFileSink, self).__init__(j_obj) |
| |
| class DefaultRowFormatBuilder(object): |
| """ |
| Builder for the vanilla `StreamingFileSink` using a row format. |
| """ |
| |
| def __init__(self, j_default_row_format_builder): |
| self.j_default_row_format_builder = j_default_row_format_builder |
| |
| def with_bucket_check_interval( |
| self, interval: int) -> 'StreamingFileSink.DefaultRowFormatBuilder': |
| self.j_default_row_format_builder.withBucketCheckInterval(interval) |
| return self |
| |
| def with_bucket_assigner( |
| self, |
| assigner_class_name: str) -> 'StreamingFileSink.DefaultRowFormatBuilder': |
| gateway = get_gateway() |
| java_import(gateway.jvm, assigner_class_name) |
| j_record_class = load_java_class(assigner_class_name) |
| self.j_default_row_format_builder.withBucketAssigner(j_record_class) |
| return self |
| |
| def with_rolling_policy( |
| self, |
| policy: RollingPolicy) -> 'StreamingFileSink.DefaultRowFormatBuilder': |
| self.j_default_row_format_builder.withRollingPolicy(policy.j_policy) |
| return self |
| |
| def with_output_file_config( |
| self, |
| output_file_config: 'OutputFileConfig') \ |
| -> 'StreamingFileSink.DefaultRowFormatBuilder': |
| self.j_default_row_format_builder.withOutputFileConfig(output_file_config.j_obj) |
| return self |
| |
| def build(self) -> 'StreamingFileSink': |
| j_stream_file_sink = self.j_default_row_format_builder.build() |
| return StreamingFileSink(j_stream_file_sink) |
| |
| @staticmethod |
    def for_row_format(base_path: str,
                       encoder: Encoder) -> 'StreamingFileSink.DefaultRowFormatBuilder':
| j_path = get_gateway().jvm.org.apache.flink.core.fs.Path(base_path) |
| j_default_row_format_builder = get_gateway().jvm.org.apache.flink.streaming.api.\ |
| functions.sink.filesystem.StreamingFileSink.forRowFormat(j_path, encoder.j_encoder) |
| return StreamingFileSink.DefaultRowFormatBuilder(j_default_row_format_builder) |
| |
| |
| class OutputFileConfig(object): |
| """ |
| Part file name configuration. |
    This allows defining a prefix and a suffix for the part file name.
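
    Example:
    ::

        >>> config = (OutputFileConfig.builder()
        ...           .with_part_prefix('prefix')
        ...           .with_part_suffix('.ext')
        ...           .build())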
| """ |
| |
| @staticmethod |
| def builder(): |
| return OutputFileConfig.OutputFileConfigBuilder() |
| |
| def __init__(self, part_prefix: str, part_suffix: str): |
| self.j_obj = get_gateway().jvm.org.apache.flink.streaming.api.\ |
| functions.sink.filesystem.OutputFileConfig(part_prefix, part_suffix) |
| |
| def get_part_prefix(self) -> str: |
| """ |
| The prefix for the part name. |
| """ |
| return self.j_obj.getPartPrefix() |
| |
| def get_part_suffix(self) -> str: |
| """ |
| The suffix for the part name. |
| """ |
| return self.j_obj.getPartSuffix() |
| |
| class OutputFileConfigBuilder(object): |
| """ |
| A builder to create the part file configuration. |
| """ |
| |
| def __init__(self): |
| self.part_prefix = "part" |
| self.part_suffix = "" |
| |
| def with_part_prefix(self, prefix) -> 'OutputFileConfig.OutputFileConfigBuilder': |
| self.part_prefix = prefix |
| return self |
| |
| def with_part_suffix(self, suffix) -> 'OutputFileConfig.OutputFileConfigBuilder': |
| self.part_suffix = suffix |
| return self |
| |
| def build(self) -> 'OutputFileConfig': |
| return OutputFileConfig(self.part_prefix, self.part_suffix) |