blob: 748de97c36395495543c6a8e891f56073c5cc02c [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import contextlib
import os
import pyarrow as pa
import shutil
import subprocess
import sys
import tempfile
import time
from pyarrow._plasma import (ObjectID, ObjectNotAvailable, # noqa
PlasmaBuffer, PlasmaClient, connect)
# The Plasma TensorFlow Operator needs to be compiled on the end user's
# machine since the TensorFlow ABI is not stable between versions.
# Nothing is compiled or loaded when this module is imported: call
# load_plasma_tensorflow_op to load an operator library that already exists
# at TF_PLASMA_OP_PATH, or build_plasma_tensorflow_op to compile it first.
# Path of the compiled operator library inside the pyarrow package directory.
TF_PLASMA_OP_PATH = os.path.join(pa.__path__[0], "tensorflow", "plasma_op.so")
# Result of tf.load_op_library for the Plasma operator; remains None until
# load_plasma_tensorflow_op or build_plasma_tensorflow_op has assigned it.
tf_plasma_op = None
def load_plasma_tensorflow_op():
    """Load the compiled Plasma TensorFlow operator library.

    TensorFlow is imported inside the function, so it is only needed when
    this function is actually called. The library located at
    TF_PLASMA_OP_PATH is loaded with ``load_op_library`` and the result is
    stored in the module-level ``tf_plasma_op``.
    """
    global tf_plasma_op
    import tensorflow
    op_library = tensorflow.load_op_library(TF_PLASMA_OP_PATH)
    tf_plasma_op = op_library
def build_plasma_tensorflow_op():
    """Compile the Plasma TensorFlow operator with g++ and load it.

    If TensorFlow cannot be imported the function returns without doing
    anything. Otherwise ``plasma_op.cc`` from the ``tensorflow`` directory
    next to this file is compiled into ``plasma_op.so`` in the same
    directory, using the compile and link flags reported by TensorFlow and
    the include/library directories reported by pyarrow. Afterwards the
    library at TF_PLASMA_OP_PATH is loaded and stored in the module-level
    ``tf_plasma_op``.

    Raises:
        subprocess.CalledProcessError: If the compiler command exits with
            a non-zero status.
    """
    global tf_plasma_op
    try:
        import tensorflow as tf
        print("TensorFlow version: " + tf.__version__)
    except ImportError:
        # Without TensorFlow there is nothing to build.
        return

    print("Compiling Plasma TensorFlow Op...")
    op_dir = os.path.join(
        os.path.dirname(os.path.realpath(__file__)), "tensorflow")
    source_file = os.path.join(op_dir, "plasma_op.cc")
    shared_lib = os.path.join(op_dir, "plasma_op.so")

    compile_flags = tf.sysconfig.get_compile_flags()
    if sys.platform == 'darwin':
        # On macOS, let undefined symbols be resolved when the library
        # is loaded.
        compile_flags = ["-undefined", "dynamic_lookup"] + compile_flags

    cmd = [
        "g++", "-std=c++11", "-g", "-shared", source_file,
        "-o", shared_lib, "-DNDEBUG", "-I" + pa.get_include(),
    ]
    cmd.extend("-L" + lib_dir for lib_dir in pa.get_library_dirs())
    cmd.extend(["-lplasma", "-larrow_python", "-larrow", "-fPIC"])
    cmd.extend(compile_flags)
    cmd.extend(tf.sysconfig.get_link_flags())
    cmd.append("-O2")
    if tf.test.is_built_with_cuda():
        cmd.append("-DGOOGLE_CUDA")

    print("Running command " + str(cmd))
    subprocess.check_call(cmd)
    tf_plasma_op = tf.load_op_library(TF_PLASMA_OP_PATH)
@contextlib.contextmanager
def start_plasma_store(plasma_store_memory,
                       use_valgrind=False, use_profiler=False,
                       plasma_directory=None, use_hugepages=False,
                       external_store=None):
    """Start a plasma store process.

    This is a context manager: the store is started on entry and, if it is
    still running, killed on exit. The temporary directory that holds the
    store socket is removed on exit as well, even when starting the store
    failed.

    Args:
        plasma_store_memory (int): Capacity of the plasma store in bytes.
        use_valgrind (bool): True if the plasma store should be started
            inside of valgrind. If this is True, use_profiler must be
            False.
        use_profiler (bool): True if the plasma store should be started
            inside a profiler. If this is True, use_valgrind must be
            False.
        plasma_directory (str): Directory where plasma memory mapped files
            will be stored.
        use_hugepages (bool): True if the plasma store should use huge
            pages.
        external_store (str): External store to use for evicted objects.

    Yields:
        A tuple of the name of the plasma store socket and the
        ``subprocess.Popen`` object of the plasma store process.

    Raises:
        Exception: If both use_valgrind and use_profiler are True.
        RuntimeError: If the store process has already exited once the
            start-up delay has elapsed.
    """
    if use_valgrind and use_profiler:
        raise Exception("Cannot use valgrind and profiler at the same time.")

    tmpdir = tempfile.mkdtemp(prefix='test_plasma-')
    # Bound before the try block so that the cleanup code can tell whether
    # the process was ever started; Popen raises when the executable cannot
    # be launched, and the original error must not be masked.
    proc = None
    try:
        plasma_store_name = os.path.join(tmpdir, 'plasma.sock')
        plasma_store_executable = os.path.join(
            pa.__path__[0], "plasma_store_server")
        command = [plasma_store_executable,
                   "-s", plasma_store_name,
                   "-m", str(plasma_store_memory)]
        if plasma_directory:
            command += ["-d", plasma_directory]
        if use_hugepages:
            command += ["-h"]
        if external_store is not None:
            command += ["-e", external_store]

        if use_valgrind:
            command = ["valgrind",
                       "--track-origins=yes",
                       "--leak-check=full",
                       "--show-leak-kinds=all",
                       "--leak-check-heuristics=stdstring",
                       "--error-exitcode=1"] + command
            startup_delay = 1.0
        elif use_profiler:
            command = ["valgrind", "--tool=callgrind"] + command
            startup_delay = 1.0
        else:
            startup_delay = 0.1

        # stdout/stderr are left at their defaults, i.e. inherited from
        # this process.
        proc = subprocess.Popen(command)
        # Give the store time to start; under valgrind the delay is longer.
        time.sleep(startup_delay)
        rc = proc.poll()
        if rc is not None:
            raise RuntimeError("plasma_store exited unexpectedly with "
                               "code %d" % (rc,))
        yield plasma_store_name, proc
    finally:
        try:
            if proc is not None and proc.poll() is None:
                proc.kill()
                # Reap the killed child so it does not linger as a zombie.
                proc.wait()
        finally:
            # Always remove the socket directory, even if stopping the
            # process failed.
            shutil.rmtree(tmpdir)