blob: a25aba034355371f4c43885bc4a395456de9708c [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Module to condition how Interactive Beam stops capturing data.
For internal use only; no backwards-compatibility guarantees.
"""
from __future__ import absolute_import
import threading
from apache_beam.runners.interactive import interactive_environment as ie
class Limiter:
"""Limits an aspect of the caching layer."""
def is_triggered(self):
# type: () -> bool
"""Returns True if the limiter has triggered, and caching should stop."""
raise NotImplementedError
class SizeLimiter(Limiter):
"""Limits the cache size to a specified byte limit."""
def __init__(
self,
size_limit # type: int
):
self._size_limit = size_limit
def is_triggered(self):
total_capture_size = 0
ie.current_env().track_user_pipelines()
for user_pipeline in ie.current_env().tracked_user_pipelines:
cache_manager = ie.current_env().get_cache_manager(user_pipeline)
if hasattr(cache_manager, 'capture_size'):
total_capture_size += cache_manager.capture_size
return total_capture_size >= self._size_limit
class DurationLimiter(Limiter):
"""Limits the duration of the capture."""
def __init__(
self,
duration_limit # type: datetime.timedelta
):
self._duration_limit = duration_limit
self._timer = threading.Timer(duration_limit.total_seconds(), self._trigger)
self._timer.daemon = True
self._triggered = False
self._timer.start()
def _trigger(self):
self._triggered = True
def is_triggered(self):
return self._triggered