blob: 08fca45c31c8afd8a790cdee443d70b36847e7f3 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Core windowing data structures.
This module is experimental. No backwards-compatibility guarantees.
"""
# This module is carefully crafted to have optimal performance when
# compiled while still being valid Python. Care needs to be taken when
# editing this file as WindowedValues are created for every element for
# every step in a Beam pipeline.
# cython: profile=True
# cython: language_level=3
# pytype: skip-file
from typing import TYPE_CHECKING
from typing import Any
from typing import List
from typing import Optional
from typing import Tuple
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.timestamp import Timestamp
from apache_beam.utils.timestamp import TimestampTypes # pylint: disable=unused-import
if TYPE_CHECKING:
from apache_beam.transforms.window import BoundedWindow
class PaneInfoTiming(object):
    """The timing of a PaneInfo.

    The timings are plain integer class constants; ``to_string`` and
    ``from_string`` convert between a constant and its name.
    """
    EARLY = 0
    ON_TIME = 1
    LATE = 2
    UNKNOWN = 3

    @classmethod
    def to_string(cls, value):
        """Return the name of the timing constant ``value``.

        Raises:
          KeyError: if ``value`` is not one of the timing constants.
        """
        names_by_value = dict((
            (cls.EARLY, 'EARLY'),
            (cls.ON_TIME, 'ON_TIME'),
            (cls.LATE, 'LATE'),
            (cls.UNKNOWN, 'UNKNOWN'),
        ))
        return names_by_value[value]

    @classmethod
    def from_string(cls, value):
        """Return the timing constant whose name is ``value``.

        Raises:
          KeyError: if ``value`` is not the name of a timing constant.
        """
        values_by_name = dict(
            EARLY=cls.EARLY,
            ON_TIME=cls.ON_TIME,
            LATE=cls.LATE,
            UNKNOWN=cls.UNKNOWN)
        return values_by_name[value]
class PaneInfo(object):
    """Describes the trigger firing information for a given WindowedValue.

    "Panes" represent individual firings on a single window. ``PaneInfo``s are
    passed downstream after trigger firings. They contain information about
    whether it's an early/on time/late firing, if it's the last or first firing
    from a window, and the index of the firing.
    """
    def __init__(self, is_first, is_last, timing, index, nonspeculative_index):
        """Store the pane fields and precompute the encoded byte.

        Args:
          is_first: whether this is the first firing.
          is_last: whether this is the last firing.
          timing: one of the ``PaneInfoTiming`` integer constants.
          index: index of the firing.
          nonspeculative_index: non-speculative index of the firing.
        """
        self._is_first = is_first
        self._is_last = is_last
        self._timing = timing
        self._index = index
        self._nonspeculative_index = nonspeculative_index
        self._encoded_byte = self._get_encoded_byte()

    def _get_encoded_byte(self):
        """Pack is_first (bit 0), is_last (bit 1) and timing (bits 2 and up)."""
        byte = 0
        if self._is_first:
            byte |= 1
        if self._is_last:
            byte |= 2
        byte |= self._timing << 2
        return byte

    @staticmethod
    def from_encoded_byte(encoded_byte):
        """Return the cached PaneInfo stored under ``encoded_byte``.

        Raises:
          ValueError: if ``encoded_byte`` is not a valid index into the cache
            of well-known PaneInfo objects.
        """
        # The cache is a list indexed by encoded byte, so validate the index
        # range. A membership test (``encoded_byte in _BYTE_TO_PANE_INFO``)
        # would compare the integer against PaneInfo objects instead.
        if not 0 <= encoded_byte < len(_BYTE_TO_PANE_INFO):
            raise ValueError(
                'Invalid PaneInfo encoded byte: %r' % (encoded_byte, ))
        return _BYTE_TO_PANE_INFO[encoded_byte]

    # Because common PaneInfo objects are cached, it is important that the
    # value is immutable. We therefore explicitly enforce this here with
    # read-only properties.
    @property
    def is_first(self):
        return self._is_first

    @property
    def is_last(self):
        return self._is_last

    @property
    def timing(self):
        return self._timing

    @property
    def index(self):
        # type: () -> int
        return self._index

    @property
    def nonspeculative_index(self):
        # type: () -> int
        return self._nonspeculative_index

    @property
    def encoded_byte(self):
        # type: () -> int
        return self._encoded_byte

    def __repr__(self):
        return (
            'PaneInfo(first: %r, last: %r, timing: %s, index: %d, '
            'nonspeculative_index: %d)') % (
                self.is_first,
                self.is_last,
                PaneInfoTiming.to_string(self.timing),
                self.index,
                self.nonspeculative_index)

    def __eq__(self, other):
        """Compare field by field; defer to ``other`` if it is not a PaneInfo."""
        if self is other:
            return True
        if not isinstance(other, PaneInfo):
            # Returning NotImplemented (rather than touching attributes that
            # ``other`` may not have) lets comparisons such as
            # ``pane == None`` evaluate to False instead of raising.
            return NotImplemented
        return (
            self.is_first == other.is_first and
            self.is_last == other.is_last and self.timing == other.timing and
            self.index == other.index and
            self.nonspeculative_index == other.nonspeculative_index)

    def __hash__(self):
        # Hash over the same fields that __eq__ compares.
        return hash((
            self.is_first,
            self.is_last,
            self.timing,
            self.index,
            self.nonspeculative_index))

    def __reduce__(self):
        # Pickle as a constructor call with the five defining fields; the
        # encoded byte is recomputed by __init__.
        return PaneInfo, (
            self._is_first,
            self._is_last,
            self._timing,
            self._index,
            self._nonspeculative_index)
def _construct_well_known_pane_infos():
    # type: () -> List[PaneInfo]
    """Build the table of well-known PaneInfo objects, indexed by encoded byte.

    One PaneInfo is created for every combination of timing, is_first and
    is_last. First panes get index 0 and all others -1; EARLY panes get a
    nonspeculative_index of -1 and all other timings 0.

    Returns:
      A list in which position ``b`` holds the PaneInfo whose ``encoded_byte``
      is ``b``.
    """
    well_known = []
    all_timings = (
        PaneInfoTiming.EARLY,
        PaneInfoTiming.ON_TIME,
        PaneInfoTiming.LATE,
        PaneInfoTiming.UNKNOWN)
    for timing in all_timings:
        if timing == PaneInfoTiming.EARLY:
            nonspeculative_index = -1
        else:
            nonspeculative_index = 0
        for is_first in (True, False):
            index = 0 if is_first else -1
            for is_last in (True, False):
                well_known.append(
                    PaneInfo(
                        is_first, is_last, timing, index,
                        nonspeculative_index))
    table_size = max(pane.encoded_byte for pane in well_known) + 1
    table = [None] * table_size  # type: List[PaneInfo] # type: ignore[list-item]
    for pane in well_known:
        table[pane.encoded_byte] = pane
    return table
# Cache of well-known PaneInfo objects, indexed by their encoded byte.
_BYTE_TO_PANE_INFO = _construct_well_known_pane_infos()
# Default PaneInfo descriptor for when a value is not the output of triggering.
# 0xF is the encoded byte with both the is_first and is_last bits set and
# timing PaneInfoTiming.UNKNOWN (3 << 2).
PANE_INFO_UNKNOWN = _BYTE_TO_PANE_INFO[0xF]
class WindowedValue(object):
    """A windowed value having a value, a timestamp and set of windows.

    Attributes:
      value: The underlying value of a windowed value.
      timestamp: Timestamp associated with the value as seconds since Unix
        epoch.
      windows: A set (iterable) of window objects for the value. The window
        objects are descendants of the BoundedWindow class.
      pane_info: A PaneInfo descriptor describing the triggering information
        for the pane that contained this value. Defaults to
        PANE_INFO_UNKNOWN when not supplied.
    """
    def __init__(
        self,
        value,
        timestamp,  # type: TimestampTypes
        windows,  # type: Tuple[BoundedWindow, ...]
        pane_info=PANE_INFO_UNKNOWN  # type: PaneInfo
    ):
        # type: (...) -> None
        """Store the fields, normalizing ``timestamp`` to microseconds.

        An ``int`` timestamp is taken as seconds and only its microsecond
        value is stored. Any other timestamp is kept as a ``Timestamp``
        object (converted with ``Timestamp.of`` if needed) alongside its
        microsecond value.
        """
        # For performance reasons, only timestamp_micros is stored by default
        # (as a C int). The Timestamp object is created on demand in the
        # ``timestamp`` property.
        self.value = value
        if not isinstance(timestamp, int):
            if isinstance(timestamp, Timestamp):
                as_timestamp = timestamp
            else:
                as_timestamp = Timestamp.of(timestamp)
            self.timestamp_object = as_timestamp
            self.timestamp_micros = as_timestamp.micros
        else:
            self.timestamp_micros = timestamp * 1000000
            if TYPE_CHECKING:
                self.timestamp_object = None  # type: Optional[Timestamp]
        self.windows = windows
        self.pane_info = pane_info

    @property
    def timestamp(self):
        # type: () -> Timestamp
        """The timestamp as a ``Timestamp``, built from micros on first use."""
        cached = self.timestamp_object
        if cached is None:
            cached = self.timestamp_object = Timestamp(
                0, self.timestamp_micros)
        return cached

    def __repr__(self):
        current = self.timestamp
        if current == MIN_TIMESTAMP:
            shown_timestamp = 'MIN_TIMESTAMP'
        elif current == MAX_TIMESTAMP:
            shown_timestamp = 'MAX_TIMESTAMP'
        else:
            shown_timestamp = float(current)
        return '(%s, %s, %s, %s)' % (
            repr(self.value), shown_timestamp, self.windows, self.pane_info)

    def __eq__(self, other):
        # Only instances of exactly the same type can be equal.
        if not (type(self) == type(other)):
            return False
        return (
            self.timestamp_micros == other.timestamp_micros and
            self.value == other.value and self.windows == other.windows and
            self.pane_info == other.pane_info)

    def __hash__(self):
        # Each component is masked before being weighted and summed.
        return (
            (hash(self.value) & 0xFFFFFFFFFFFFFFF) +
            3 * (self.timestamp_micros & 0xFFFFFFFFFFFFFF) +
            7 * (hash(self.windows) & 0xFFFFFFFFFFFFF) +
            11 * (hash(self.pane_info) & 0xFFFFFFFFFFFFF))

    def with_value(self, new_value):
        # type: (Any) -> WindowedValue
        """Creates a new WindowedValue with the same timestamps and windows as this.

        This is the fastest way to create a new WindowedValue.
        """
        return create(
            new_value, self.timestamp_micros, self.windows, self.pane_info)

    def __reduce__(self):
        # Pickle as a constructor call; the timestamp is passed as a
        # Timestamp object rather than as raw microseconds.
        constructor_args = (
            self.value, self.timestamp, self.windows, self.pane_info)
        return WindowedValue, constructor_args
# TODO(robertwb): Move this to a static method.
def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
    """Build a WindowedValue from a timestamp already given in microseconds.

    The instance is allocated with ``__new__`` and its fields are assigned
    directly, so ``WindowedValue.__init__`` (and its timestamp conversion)
    is not run.

    Args:
      value: the underlying value.
      timestamp_micros: the timestamp in microseconds.
      windows: the windows of the value.
      pane_info: the PaneInfo of the value; defaults to PANE_INFO_UNKNOWN.

    Returns:
      The new WindowedValue.
    """
    wv = WindowedValue.__new__(WindowedValue)
    wv.pane_info = pane_info
    wv.windows = windows
    wv.timestamp_micros = timestamp_micros
    wv.value = value
    return wv
try:
    # Class-level default: instances that never assign ``timestamp_object``
    # themselves (``create`` does not, nor does ``__init__`` for int
    # timestamps) still read None from the ``timestamp`` property.
    WindowedValue.timestamp_object = None
except TypeError:
    # When we're compiled, we can't dynamically add attributes to
    # the cdef class, but in this case it's OK as it's already present
    # on each instance.
    pass
class _IntervalWindowBase(object):
    """Optimized form of IntervalWindow storing only microseconds for endpoints.

    The ``Timestamp`` objects for the endpoints are optional caches: when an
    endpoint is given as None only the cache is cleared, and the matching
    ``_start_micros`` / ``_end_micros`` value must be set by other code.
    """
    def __init__(self, start, end):
        # type: (TimestampTypes, TimestampTypes) -> None
        if start is None:
            # Micros must be populated elsewhere.
            self._start_object = None  # type: Optional[Timestamp]
        else:
            start_timestamp = Timestamp.of(start)
            self._start_object = start_timestamp
            try:
                self._start_micros = start_timestamp.micros
            except OverflowError:
                # Clamp to the nearest representable bound. NOTE(review):
                # presumably only raised when compiled and the value does not
                # fit the C integer field -- confirm.
                if start_timestamp.micros < 0:
                    self._start_micros = MIN_TIMESTAMP.micros
                else:
                    self._start_micros = MAX_TIMESTAMP.micros

        if end is None:
            # Micros must be populated elsewhere.
            self._end_object = None  # type: Optional[Timestamp]
        else:
            end_timestamp = Timestamp.of(end)
            self._end_object = end_timestamp
            try:
                self._end_micros = end_timestamp.micros
            except OverflowError:
                # Same clamping as for the start endpoint.
                if end_timestamp.micros < 0:
                    self._end_micros = MIN_TIMESTAMP.micros
                else:
                    self._end_micros = MAX_TIMESTAMP.micros

    @property
    def start(self):
        # type: () -> Timestamp
        """The start endpoint as a ``Timestamp``, built from micros on demand."""
        cached = self._start_object
        if cached is None:
            cached = self._start_object = Timestamp(0, self._start_micros)
        return cached

    @property
    def end(self):
        # type: () -> Timestamp
        """The end endpoint as a ``Timestamp``, built from micros on demand."""
        cached = self._end_object
        if cached is None:
            cached = self._end_object = Timestamp(0, self._end_micros)
        return cached

    def __hash__(self):
        endpoints = (self._start_micros, self._end_micros)
        return hash(endpoints)

    def __eq__(self, other):
        # Only instances of exactly the same type can be equal.
        if not (type(self) == type(other)):
            return False
        return (
            self._start_micros == other._start_micros and
            self._end_micros == other._end_micros)

    def __repr__(self):
        lower = float(self.start)
        upper = float(self.end)
        return '[%s, %s)' % (lower, upper)