// blob: 319d46d11f0d6af9c808feaa72e71b240d690196
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef SAFE_CALL_INTO_R_INCLUDED
#define SAFE_CALL_INTO_R_INCLUDED
#include "./arrow_types.h"
#include <arrow/io/interfaces.h>
#include <arrow/util/cancel.h>
#include <arrow/util/future.h>
#include <arrow/util/thread_pool.h>
#include <csignal>
#include <functional>
#include <thread>
// Reports whether RunWithCapturedR() can currently be used.
//
// Unwind protection was added in R 3.5 and some calls here use it
// and crash R in older versions (ARROW-16201). Implementation provided
// in safe-call-into-r-impl.cpp so that we can skip some tests
// when this feature is not provided. This also checks that there
// is not already an event loop registered (via MainRThread::Executor()),
// because only one of these can exist at any given time.
bool CanRunWithCapturedR();
// The MainRThread class keeps track of the thread on which it is safe
// to call the R API to facilitate its safe use (or erroring
// if it is not safe). The MainRThread singleton can be accessed from
// any thread using MainRThread::GetInstance(); the preferred way to call
// the R API where it may not be safe to do so is to use
// SafeCallIntoR<cpp_type>([&]() { ... }).
class MainRThread {
 public:
  // Access the MainRThread singleton.
  static MainRThread& GetInstance();

  // Must be invoked from the R thread (e.g., on package load): remembers
  // the calling thread's id and clears any previously saved error.
  void Initialize() {
    thread_id_ = std::this_thread::get_id();
    initialized_ = true;
    ResetError();
  }

  // Whether Initialize() has been called without a later Deinitialize().
  bool IsInitialized() { return initialized_; }

  // Marks the instance as uninitialized and tears down the signal stop
  // source if one was enabled.
  void Deinitialize() {
    initialized_ = false;
    DisableSignalStopSource();
  }

  // True only when initialized and the caller is running on the thread
  // that was recorded by Initialize().
  bool IsMainThread() {
    if (!initialized_) {
      return false;
    }
    return std::this_thread::get_id() == thread_id_;
  }

  // A token backed by the signal stop source when it is enabled; otherwise
  // a token that can never be stopped.
  arrow::StopToken GetStopToken() {
    return SignalStopSourceEnabled() ? stop_source_->token()
                                     : arrow::StopToken::Unstoppable();
  }

  // Whether a signal stop source is currently held by this object.
  bool SignalStopSourceEnabled() { return stop_source_ != nullptr; }

  // Attempt to install the signal stop source. If another library linking
  // to the same libarrow shared object has already done so, the attempt
  // fails; that is acceptable (we only lose the ability to cancel), so the
  // failure is reported as a warning.
  void EnableSignalStopSource() {
    if (SignalStopSourceEnabled()) {
      return;
    }

    auto stop_source_result = arrow::SetSignalStopSource();
    if (!stop_source_result.ok()) {
      cpp11::warning("Failed to enable user cancellation: %s",
                     stop_source_result.status().message().c_str());
      return;
    }
    stop_source_ = stop_source_result.ValueUnsafe();
  }

  // Release the signal stop source if (and only if) we hold one.
  void DisableSignalStopSource() {
    if (!SignalStopSourceEnabled()) {
      return;
    }
    arrow::ResetSignalStopSource();
    stop_source_ = nullptr;
  }

  // An IOContext that uses gc_memory_pool() and the singleton's stop token.
  arrow::io::IOContext CancellableIOContext() {
    return arrow::io::IOContext(gc_memory_pool(),
                                MainRThread::GetInstance().GetStopToken());
  }

  // A SafeCallIntoR call can execute when we are on the main R thread or
  // when an Executor has been registered.
  bool CanExecuteSafeCallIntoR() { return IsMainThread() || executor_ != nullptr; }

  // Mutable access to the Executor running on the main R thread
  // (nullptr when there is none).
  arrow::internal::Executor*& Executor() { return executor_; }

  // Stash an error (possibly carrying an error token generated from a
  // cpp11::unwind_exception) so it can be handled after cleanup code
  // (e.g., cancelling futures or waiting for them to finish) has run.
  void SetError(arrow::Status status) { status_ = status; }

  // Forget any saved error.
  void ResetError() { status_ = arrow::Status::OK(); }

  // Whether an error is currently saved.
  bool HasError() { return !status_.ok(); }

  // Called when a RunWithCapturedR is about to return to the R interpreter:
  // resets the stop source (if enabled), clears the saved error, and hands
  // back whatever status was saved (OK if there was none).
  arrow::Status ReraiseErrorIfExists() {
    if (SignalStopSourceEnabled()) {
      stop_source_->Reset();
    }

    arrow::Status saved_status = status_;
    ResetError();
    return saved_status;
  }

 private:
  bool initialized_;
  std::thread::id thread_id_;
  arrow::Status status_;
  arrow::internal::Executor* executor_;
  arrow::StopSource* stop_source_;

  // Private: instances are obtained through GetInstance().
  MainRThread() : initialized_(false), executor_(nullptr), stop_source_(nullptr) {}
};
// This object is used to ensure that signal handlers are registered when
// RunWithCapturedR launches its background thread to call Arrow and are
// cleaned up however this exits. Note that the lifecycle of the StopSource,
// which is registered at package load, is not necessarily tied to the
// lifecycle of the signal handlers. The general approach is to register
// the signal handlers only when we are evaluating code outside the R thread
// (when we are evaluating code *on* the R thread, R's signal handlers are
// sufficient and will signal an interrupt condition that will propagate
// via a cpp11::unwind_exception).
class WithSignalHandlerContext {
 public:
  // Registers Arrow's cancelling SIGINT handler, but only when the
  // MainRThread signal stop source is enabled.
  WithSignalHandlerContext() : signal_handler_registered_(false) {
    if (MainRThread::GetInstance().SignalStopSourceEnabled()) {
      arrow::Status result = arrow::RegisterCancellingSignalHandler({SIGINT});

      // If this result was not OK we don't get cancellation for the
      // lifecycle of this object; however, we can still carry on. This
      // can occur when forking the R process (e.g., using parallel::mclapply()).
      if (result.ok()) {
        signal_handler_registered_ = true;
      } else {
        result.Warn();
      }
    }
  }

  // Unregisters the handler only if this object successfully registered it.
  ~WithSignalHandlerContext() {
    if (signal_handler_registered_) {
      arrow::UnregisterCancellingSignalHandler();
    }
  }

  // This guard owns a process-wide handler registration. A copy would carry
  // the same flag and unregister a second time on destruction, so copying
  // (and, implicitly, moving) is disabled.
  WithSignalHandlerContext(const WithSignalHandlerContext&) = delete;
  WithSignalHandlerContext& operator=(const WithSignalHandlerContext&) = delete;

 private:
  // True when the constructor registered the handler and the destructor
  // therefore has to undo it.
  bool signal_handler_registered_;
};
// This is an object whose scope ensures we do not register signal handlers when
// evaluating R code when that evaluation happens via SafeCallIntoR.
class WithoutSignalHandlerContext {
 public:
  // Unregisters Arrow's cancelling signal handler for the lifetime of this
  // object, but only when the MainRThread signal stop source is enabled.
  WithoutSignalHandlerContext() : signal_handler_unregistered_(false) {
    if (MainRThread::GetInstance().SignalStopSourceEnabled()) {
      arrow::UnregisterCancellingSignalHandler();
      signal_handler_unregistered_ = true;
    }
  }

  // Re-registers the SIGINT handler if the constructor removed it.
  ~WithoutSignalHandlerContext() {
    if (signal_handler_unregistered_) {
      arrow::Status result = arrow::RegisterCancellingSignalHandler({SIGINT});

      // This is unlikely because the signal handlers were previously registered;
      // however, it's better to warn here instead of error because it doesn't
      // affect what the user tried to do (it probably just means we didn't
      // anticipate a use case).
      if (!result.ok()) {
        result.Warn();
      }
    }
  }

  // This guard toggles process-wide handler state. A copy would carry the
  // same flag and re-register a second time on destruction, so copying
  // (and, implicitly, moving) is disabled.
  WithoutSignalHandlerContext(const WithoutSignalHandlerContext&) = delete;
  WithoutSignalHandlerContext& operator=(const WithoutSignalHandlerContext&) = delete;

 private:
  // True when the constructor unregistered the handler and the destructor
  // therefore has to restore it.
  bool signal_handler_unregistered_;
};
// Call into R and return a C++ object. Note that you can't return
// a SEXP (use cpp11::as_cpp<T> to convert it to a C++ type inside
// `fun`).
template <typename T>
arrow::Future<T> SafeCallIntoRAsync(std::function<arrow::Result<T>(void)> fun,
                                    std::string reason = "unspecified") {
  MainRThread& r_thread = MainRThread::GetInstance();

  // Already on the main R thread: run the task right away. A
  // cpp11::unwind_exception is allowed to escape because it will be
  // caught at the top level.
  if (r_thread.IsMainThread()) {
    return fun();
  }

  // Off the main thread with no Executor available: nothing can run `fun`.
  if (!r_thread.CanExecuteSafeCallIntoR()) {
    return arrow::Status::NotImplemented(
        "Call to R (", reason, ") from a non-R thread from an unsupported context");
  }

  // Off the main thread with an Executor: hand the task to it so that it
  // runs on the main R thread. A cpp11::unwind_exception cannot be thrown
  // from there, so it is propagated back to RunWithCapturedR through the
  // MainRThread instance instead.
  auto run_on_r_thread = [fun, reason]() -> arrow::Result<T> {
    // Some R code scheduled earlier has already errored: skip execution
    // and let that original error surface.
    if (MainRThread::GetInstance().HasError()) {
      return arrow::Status::Cancelled("Previous R code execution error (", reason, ")");
    }

    try {
      WithoutSignalHandlerContext context;
      return fun();
    } catch (cpp11::unwind_exception& unwind) {
      // Record the error on the MainRThread so that later SafeCallIntoR
      // calls know not to execute R code.
      MainRThread::GetInstance().SetError(
          arrow::StatusUnwindProtect(unwind.token, reason));

      // The Status returned here is unlikely to surface because
      // RunWithCapturedR preferentially returns the MainRThread error.
      return arrow::Status::Invalid("R code execution error (", reason, ")");
    }
  };

  return DeferNotOk(r_thread.Executor()->Submit(std::move(run_on_r_thread)));
}
// Synchronous counterpart of SafeCallIntoRAsync(): forwards `fun` and
// `reason` to it and returns the result of the resulting future.
template <typename T>
arrow::Result<T> SafeCallIntoR(std::function<T(void)> fun,
                               std::string reason = "unspecified") {
  const arrow::Future<T> pending = SafeCallIntoRAsync<T>(std::move(fun), reason);
  return pending.result();
}
// Variant of SafeCallIntoR() for callbacks that return nothing: only the
// Status of the call is reported.
static inline arrow::Status SafeCallIntoRVoid(std::function<void(void)> fun,
                                              std::string reason = "unspecified") {
  // SafeCallIntoRAsync needs a value-returning callable, so wrap `fun` in
  // an adapter that yields a placeholder `true` once it has run.
  auto call_then_flag = [&fun]() {
    fun();
    return true;
  };

  arrow::Future<bool> completion = SafeCallIntoRAsync<bool>(call_then_flag, reason);
  return completion.status();
}
// Performs an Arrow call (e.g., run an exec plan) in such a way that background threads
// can use SafeCallIntoR(). This version is useful for Arrow calls that already
// return a Future<>.
// Runs `make_arrow_call` inside a SerialExecutor whose Executor is published
// through MainRThread::Executor() for the duration of the call, so that
// SafeCallIntoR() from other threads has somewhere to submit work.
//
// Returns NotImplemented when CanRunWithCapturedR() is false, AlreadyExists
// when an Executor is already registered, the saved MainRThread error when
// one was recorded during the run, and otherwise the result of the call.
template <typename T>
arrow::Result<T> RunWithCapturedR(std::function<arrow::Future<T>()> make_arrow_call) {
  if (!CanRunWithCapturedR()) {
    return arrow::Status::NotImplemented("RunWithCapturedR() without UnwindProtect");
  }

  if (MainRThread::GetInstance().Executor() != nullptr) {
    return arrow::Status::AlreadyExists("Attempt to use more than one R Executor()");
  }

  MainRThread::GetInstance().ResetError();

  // Clears the registered Executor on every exit path. Without this, an
  // exception escaping RunInSerialExecutor() (e.g., a cpp11::unwind_exception)
  // would leave Executor() pointing at an executor that no longer exists and
  // make every later RunWithCapturedR() fail with AlreadyExists.
  struct ExecutorResetGuard {
    ~ExecutorResetGuard() { MainRThread::GetInstance().Executor() = nullptr; }
  };

  WithSignalHandlerContext context;
  ExecutorResetGuard executor_reset;

  arrow::Result<T> result = arrow::internal::SerialExecutor::RunInSerialExecutor<T>(
      [make_arrow_call](arrow::internal::Executor* executor) {
        MainRThread::GetInstance().Executor() = executor;
        return make_arrow_call();
      });

  // Normal completion: unregister the Executor before inspecting the saved
  // error (the guard above repeats this harmlessly on scope exit).
  MainRThread::GetInstance().Executor() = nullptr;

  // A StatusUnwindProtect error, if it was thrown, lives in the MainRThread and
  // should be returned if possible.
  ARROW_RETURN_NOT_OK(MainRThread::GetInstance().ReraiseErrorIfExists());

  return result;
}
// Performs an Arrow call (e.g., run an exec plan) in such a way that background threads
// can use SafeCallIntoR(). This version is useful for Arrow calls that do not already
// return a Future<>. If it is not possible to use RunWithCapturedR() (i.e.,
// CanRunWithCapturedR() returns false), this will run make_arrow_call on the main
// R thread (which will cause background threads that try to SafeCallIntoR() to
// error).
template <typename T>
arrow::Result<T> RunWithCapturedRIfPossible(
    std::function<arrow::Result<T>()> make_arrow_call) {
  // No captured-R support available: just invoke the call directly.
  if (!CanRunWithCapturedR()) {
    return make_arrow_call();
  }

  // The choice of the default io_context is arbitrary: any construct that
  // launches a background thread would do.
  const auto& background_context = arrow::io::default_io_context();
  return RunWithCapturedR<T>([&]() {
    return DeferNotOk(
        background_context.executor()->Submit(std::move(make_arrow_call)));
  });
}
// Like RunWithCapturedRIfPossible<>() but for arrow calls that don't return
// a Result.
static inline arrow::Status RunWithCapturedRIfPossibleVoid(
    std::function<arrow::Status()> make_arrow_call) {
  // RunWithCapturedRIfPossible<>() needs a Result-returning callable, so
  // adapt the Status-returning call: errors pass through and success is
  // represented by a placeholder `true`.
  auto status_to_result = [&make_arrow_call]() -> arrow::Result<bool> {
    ARROW_RETURN_NOT_OK(make_arrow_call());
    return true;
  };

  auto outcome = RunWithCapturedRIfPossible<bool>(status_to_result);
  return outcome.status();
}
#endif