blob: a955adf37517b967455c6b99d0d14ad160507f50 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import abc
from typing import Any
from pyflink.common.time import Duration
from pyflink.java_gateway import get_gateway
class WatermarkStrategy(object):
"""
The WatermarkStrategy defines how to generate Watermarks in the stream sources. The
WatermarkStrategy is a builder/factory for the WatermarkGenerator that generates the watermarks
and the TimestampAssigner which assigns the internal timestamp of a record.
The convenience methods, for example forBoundedOutOfOrderness(Duration), create a
WatermarkStrategy for common built in strategies.
"""
def __init__(self, j_watermark_strategy):
self._j_watermark_strategy = j_watermark_strategy
self._timestamp_assigner = None
def with_timestamp_assigner(self, timestamp_assigner: 'TimestampAssigner') -> \
'WatermarkStrategy':
"""
Creates a new WatermarkStrategy that wraps this strategy but instead uses the given a
TimestampAssigner by implementing TimestampAssigner interface.
::
>>> watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()
>>> .with_timestamp_assigner(MyTimestampAssigner())
:param timestamp_assigner: The given TimestampAssigner.
:return: A WaterMarkStrategy that wraps a TimestampAssigner.
"""
self._timestamp_assigner = timestamp_assigner
return self
def with_idleness(self, idle_timeout: Duration) -> 'WatermarkStrategy':
"""
Creates a new enriched WatermarkStrategy that also does idleness detection in the created
WatermarkGenerator.
:param idle_timeout: The idle timeout.
:return: A new WatermarkStrategy with idle detection configured.
"""
self._j_watermark_strategy = self._j_watermark_strategy\
.withIdleness(idle_timeout._j_duration)
return self
@staticmethod
def for_monotonous_timestamps():
"""
Creates a watermark strategy for situations with monotonously ascending timestamps.
The watermarks are generated periodically and tightly follow the latest timestamp in the
data. The delay introduced by this strategy is mainly the periodic interval in which the
watermarks are generated.
"""
JWaterMarkStrategy = get_gateway().jvm\
.org.apache.flink.api.common.eventtime.WatermarkStrategy
return WatermarkStrategy(JWaterMarkStrategy.forMonotonousTimestamps())
@staticmethod
def for_bounded_out_of_orderness(max_out_of_orderness: Duration):
"""
Creates a watermark strategy for situations where records are out of order, but you can
place an upper bound on how far the events are out of order. An out-of-order bound B means
that once the an event with timestamp T was encountered, no events older than (T - B) will
follow any more.
"""
JWaterMarkStrategy = get_gateway().jvm \
.org.apache.flink.api.common.eventtime.WatermarkStrategy
return WatermarkStrategy(
JWaterMarkStrategy.forBoundedOutOfOrderness(max_out_of_orderness._j_duration))
class TimestampAssigner(abc.ABC):
"""
A TimestampAssigner assigns event time timestamps to elements. These timestamps are used by all
functions that operate on event time, for example event time windows.
Timestamps can be an arbitrary int value, but all built-in implementations represent it as the
milliseconds since the Epoch (midnight, January 1, 1970 UTC), the same way as time.time() does
it.
"""
@abc.abstractmethod
def extract_timestamp(self, value: Any, record_timestamp: int) -> int:
"""
Assigns a timestamp to an element, in milliseconds since the Epoch. This is independent of
any particular time zone or calendar.
The method is passed the previously assigned timestamp of the element.
That previous timestamp may have been assigned from a previous assigner. If the element did
not carry a timestamp before, this value is the minimum value of int type.
:param value: The element that the timestamp will be assigned to.
:param record_timestamp: The current internal timestamp of the element, or a negative value,
if no timestamp has been assigned yet.
:return: The new timestamp.
"""
pass