blob: bd86cbab09224c5dacf6af9cf8cde02b8cdc1b6c [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import functools
import os
import subprocess
from typing import Optional
def retry_check(max_tries=5, retry_interval=1):
def retry_check_func(func):
@functools.wraps(func)
def retry_wrapper(*args, **kwargs):
for _ in range(max_tries):
if func(*args, **kwargs):
return True
time.sleep(retry_interval)
return False
return retry_wrapper
return retry_check_func
def decode_escaped_str(str):
special = {"n": "\n", "v": "\v", "t": "\t", "f": "\f", "r": "\r", "a": "\a", "\\": "\\"}
escaped = False
result = ""
for ch in str:
if escaped:
if ch in special:
result += special[ch]
else:
result += "\\" + ch
escaped = False
elif ch == "\\":
escaped = True
else:
result += ch
if escaped:
result += "\\"
return result
def is_temporary_output_file(filepath):
return filepath.split(os.path.sep)[-1][0] == '.'
def get_minifi_pid() -> int:
return int(subprocess.run(["pidof", "-s", "minifi"], capture_output=True).stdout)
def get_peak_memory_usage(pid: int) -> Optional[int]:
with open("/proc/" + str(pid) + "/status") as stat_file:
for line in stat_file:
if "VmHWM" in line:
peak_resident_set_size = [int(s) for s in line.split() if s.isdigit()].pop()
return peak_resident_set_size * 1024
return None
def get_memory_usage(pid: int) -> Optional[int]:
with open("/proc/" + str(pid) + "/status") as stat_file:
for line in stat_file:
if "VmRSS" in line:
resident_set_size = [int(s) for s in line.split() if s.isdigit()].pop()
return resident_set_size * 1024
return None
def wait_for(action, timeout_seconds, check_period=1, *args, **kwargs):
start_time = time.perf_counter()
while True:
result = action(*args, **kwargs)
if result:
return result
time.sleep(check_period)
if timeout_seconds < (time.perf_counter() - start_time):
break
return False