blob: 60d6404ade348f4b17601a46c441bd70b7ad1562 [file] [log] [blame]
"""license: Apache License 2.0, see LICENSE for more details."""
"""Kazoo handler helpers"""
HAS_FNCTL = True
try:
import fcntl
except ImportError: # pragma: nocover
HAS_FNCTL = False
import functools
import os
def _set_fd_cloexec(fd):
    """Turn on the close-on-exec descriptor flag for *fd*.

    The descriptor flags currently set on *fd* are read first and OR-ed
    with ``FD_CLOEXEC``, so any other descriptor flag is kept as it was.

    :param fd: Anything ``fcntl.fcntl`` accepts as its first argument
               (callers in this module pass both raw descriptors and
               socket objects).
    """
    current = fcntl.fcntl(fd, fcntl.F_GETFD)
    updated = current | fcntl.FD_CLOEXEC
    fcntl.fcntl(fd, fcntl.F_SETFD, updated)
def _set_default_tcpsock_options(module, sock):
    """Apply this module's default options to a TCP socket.

    ``TCP_NODELAY`` is switched on and, when ``fcntl`` could be imported
    (``HAS_FNCTL``), the socket is also marked close-on-exec.

    :param module: Socket-like module supplying the ``IPPROTO_TCP`` and
                   ``TCP_NODELAY`` constants.
    :param sock: The socket to configure.
    :returns: *sock* itself, for convenience.
    """
    level, option = module.IPPROTO_TCP, module.TCP_NODELAY
    sock.setsockopt(level, option, 1)
    if not HAS_FNCTL:
        # No fcntl on this platform: nothing more we can do.
        return sock
    _set_fd_cloexec(sock)
    return sock
def create_pipe():
    """Create an OS pipe and return its ``(read_fd, write_fd)`` pair.

    When ``fcntl`` is available (``HAS_FNCTL``) both ends are put into
    non-blocking mode and then marked close-on-exec.  Without ``fcntl``
    the descriptors are returned exactly as ``os.pipe()`` produced them.
    """
    read_fd, write_fd = os.pipe()
    if HAS_FNCTL:
        both_ends = (read_fd, write_fd)
        # First make both ends non-blocking ...
        for end in both_ends:
            fcntl.fcntl(end, fcntl.F_SETFL, os.O_NONBLOCK)
        # ... then flag both as close-on-exec.
        for end in both_ends:
            _set_fd_cloexec(end)
    return read_fd, write_fd
def create_tcp_socket(module):
    """Create an IPv4 TCP socket with this module's default options applied.

    When *module* exposes ``SOCK_CLOEXEC`` the close-on-exec flag is
    requested atomically at creation time.  The new socket is then passed
    through ``_set_default_tcpsock_options``.

    :param module: Socket-like module providing ``socket``, ``AF_INET``
                   and ``SOCK_STREAM`` (and optionally ``SOCK_CLOEXEC``).
    :returns: The newly created socket.
    :raises: Whatever ``module.socket`` or the option setup raises.  If the
             option setup fails, the socket is closed before the error
             propagates.
    """
    type_ = module.SOCK_STREAM
    if hasattr(module, 'SOCK_CLOEXEC'):  # pragma: nocover
        # if available, set cloexec flag during socket creation
        type_ |= module.SOCK_CLOEXEC
    sock = module.socket(module.AF_INET, type_)
    try:
        _set_default_tcpsock_options(module, sock)
    except BaseException:
        # Do not leak the descriptor when configuring the socket fails.
        sock.close()
        raise
    return sock
def create_tcp_connection(module, address, timeout=None):
    """Open a TCP connection to *address* and apply the default options.

    :param module: Socket-like module providing ``create_connection`` and
                   the ``_GLOBAL_DEFAULT_TIMEOUT`` sentinel.
    :param address: Passed unchanged to ``module.create_connection``.
    :param timeout: Timeout handed to ``module.create_connection``.
                    ``None`` (the default) is replaced by
                    ``module._GLOBAL_DEFAULT_TIMEOUT``.
    :returns: The connected socket.
    :raises: Whatever ``module.create_connection`` or the option setup
             raises.  If the option setup fails, the connected socket is
             closed before the error propagates.
    """
    if timeout is None:
        # thanks to create_connection() developers for
        # this ugliness: "use the default timeout" has to be requested
        # through the module's private sentinel rather than None.
        timeout = module._GLOBAL_DEFAULT_TIMEOUT

    sock = module.create_connection(address, timeout)
    try:
        _set_default_tcpsock_options(module, sock)
    except BaseException:
        # Do not leak the open connection when configuring it fails
        # (e.g. setsockopt() on a connection that was already reset).
        sock.close()
        raise
    return sock
def capture_exceptions(async_result):
    """Build a decorator that routes a function's exceptions to an
    async_result.

    The decorated function returns the wrapped function's result on
    success.  If the wrapped function raises an :class:`Exception`, the
    exception is handed to ``async_result.set_exception`` and ``None`` is
    returned instead of letting it propagate.

    :param async_result: An async result implementing :class:`IAsyncResult`

    """
    def decorator(function):
        @functools.wraps(function)
        def guarded(*args, **kwargs):
            try:
                outcome = function(*args, **kwargs)
            except Exception as error:
                async_result.set_exception(error)
                return None
            return outcome
        return guarded
    return decorator
def wrap(async_result):
    """Return a new decorated function that propagates the return value or
    exception of wrapped function to an async_result.  NOTE: Only propagates a
    non-None return value.

    The decorated function keeps the wrapped function's metadata
    (``__name__``, ``__doc__``, ...) and still returns the wrapped
    function's value to its direct caller.

    :param async_result: An async result implementing :class:`IAsyncResult`

    """
    def capture(function):
        # capture_exceptions() forwards any Exception raised below to
        # async_result.set_exception(); functools.wraps() keeps the
        # original function's name and docstring on the result.
        @capture_exceptions(async_result)
        @functools.wraps(function)
        def captured_function(*args, **kwargs):
            value = function(*args, **kwargs)
            if value is not None:
                async_result.set(value)
            return value
        return captured_function
    return capture