# blob: a4c4982ecb38f5287373b5bebbd9a180f8a7c269 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import contextlib
import io
import os
import random
import socket
import subprocess
import sys
import threading
import uuid
import numpy as np
def guid():
    """
    Return a newly generated random identifier: the 32-character
    hexadecimal form of a version-4 UUID.
    """
    fresh = uuid.uuid4()
    return fresh.hex
# Names of the SKIP categories
SKIP_ARROW = 'arrow'
SKIP_FLIGHT = 'flight'

# Taken from the ARROW_ROOT environment variable when it is set; otherwise
# derived from this file's absolute path by dropping its last five
# "/"-separated components.
ARROW_ROOT_DEFAULT = os.getenv(
    'ARROW_ROOT',
    os.path.abspath(__file__).rsplit("/", maxsplit=5)[0],
)
class _Printer:
    """
    Provider of a print() function whose output stream can be swapped
    independently in each thread.
    """

    def __init__(self):
        # Per-thread attributes: `stdout` (current target stream) and
        # `corked` (whether cork() is active); both are created lazily
        # by _get_stdout().
        self._tls = threading.local()

    def _get_stdout(self):
        # Return the calling thread's stream, initializing the thread's
        # state from the current sys.stdout on first use.
        state = self._tls
        if not hasattr(state, 'stdout'):
            state.stdout = sys.stdout
            state.corked = False
        return state.stdout

    def print(self, *args, **kwargs):
        """
        Like the builtin print(), but the output goes to the calling
        thread's stream.
        """
        target = self._get_stdout()
        print(*args, file=target, **kwargs)

    @property
    def stdout(self):
        """
        The calling thread's output stream; `cork()` temporarily replaces
        it with an in-memory buffer.
        """
        return self._get_stdout()

    @contextlib.contextmanager
    def cork(self):
        """
        Buffer everything the calling thread prints inside the `with`
        block and emit it in one piece on exit, so that progress output
        from several threads does not get interleaved.  Not reentrant.
        """
        state = self._tls
        saved_stream = self._get_stdout()
        assert not state.corked, "reentrant call"
        buffered = io.StringIO()
        state.stdout = buffered
        state.corked = True
        try:
            yield
        finally:
            # Restore the previous stream first, then replay what was
            # buffered onto it.
            state.stdout = saved_stream
            state.corked = False
            saved_stream.write(buffered.getvalue())
            saved_stream.flush()


# Module-wide printer instance and its print() shortcut
printer = _Printer()
log = printer.print
# Alphabet random_utf8() draws from: ASCII letters and digits plus a few
# non-ASCII characters.
_RAND_CHARS = np.array(list("abcdefghijklmnop123456Ârrôwµ£°€矢"), dtype="U")


def random_utf8(nchars):
    """
    Build a single random string of `nchars` characters, each drawn from
    _RAND_CHARS with numpy's global random generator.
    """
    picks = np.random.choice(_RAND_CHARS, size=nchars)
    return ''.join(picks)
def random_bytes(nbytes):
    """
    Build a single random bytes object of length `nbytes` (empty when
    `nbytes` is not positive).
    """
    # NOTE getrandbits(0) fails, so the empty case is handled up front
    if not nbytes > 0:
        return b""
    bits = random.getrandbits(nbytes * 8)
    return bits.to_bytes(nbytes, byteorder='little')
def tobytes(o):
    """
    Encode `o` as UTF-8 if it is a str; return any other object unchanged.
    """
    return o.encode('utf8') if isinstance(o, str) else o
def frombytes(o):
    """
    Decode `o` from UTF-8 if it is a bytes object; return any other
    object unchanged.
    """
    return o.decode('utf8') if isinstance(o, bytes) else o
def run_cmd(cmd):
    """
    Run a command and return its output (stdout and stderr combined)
    as a string.

    `cmd` is either a string, which is split on single spaces (no shell
    quoting is interpreted), or a sequence of arguments that is handed
    to subprocess unchanged.

    Raises RuntimeError if the command exits with a non-zero status; the
    message contains the command line and everything the process wrote.
    """
    if isinstance(cmd, str):
        cmd = cmd.split(' ')

    try:
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        # this avoids hiding the stdout / stderr of failed processes
        captured = e.output
        if isinstance(captured, bytes):
            # Decode leniently: a failing process may emit non-UTF-8
            # bytes, and a decoding error here would mask the failure.
            captured = captured.decode('utf8', errors='replace')
        sio = io.StringIO()
        # str() each argument so that non-str arguments (e.g. path
        # objects) cannot make the report itself fail with a TypeError.
        print('Command failed:', " ".join(map(str, cmd)), file=sio)
        print('With output:', file=sio)
        print('--------------', file=sio)
        print(captured, file=sio)
        print('--------------', file=sio)
        raise RuntimeError(sio.getvalue()) from e

    return frombytes(output)
# Adapted from CPython
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Return a port number that should be free for binding.

    A throwaway socket of the given family and type (AF_INET and
    SOCK_STREAM by default) is bound to the wildcard address with port 0,
    which makes the OS assign an unused ephemeral port.  The socket is
    then closed and the port number it was given is returned.
    """
    probe = socket.socket(family, socktype)
    try:
        probe.bind(('', 0))
        port = probe.getsockname()[1]
    finally:
        probe.close()
    return port