| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "./arrow_types.h" |
| |
| #include <arrow/array/builder_base.h> |
| #include <arrow/array/builder_binary.h> |
| #include <arrow/array/builder_decimal.h> |
| #include <arrow/array/builder_dict.h> |
| #include <arrow/array/builder_nested.h> |
| #include <arrow/array/builder_primitive.h> |
| #include <arrow/array/concatenate.h> |
| #include <arrow/table.h> |
| #include <arrow/type_traits.h> |
| #include <arrow/util/bitmap_writer.h> |
| #include <arrow/util/checked_cast.h> |
| #include <arrow/util/converter.h> |
| #include <arrow/util/logging.h> |
| |
| #include "./r_task_group.h" |
| |
| namespace arrow { |
| |
| using internal::checked_cast; |
| using internal::checked_pointer_cast; |
| |
| using internal::Converter; |
| using internal::DictionaryConverter; |
| using internal::ListConverter; |
| using internal::PrimitiveConverter; |
| using internal::StructConverter; |
| |
| using internal::MakeChunker; |
| using internal::MakeConverter; |
| |
| namespace r { |
| |
| struct RConversionOptions { |
| RConversionOptions() = default; |
| |
| std::shared_ptr<arrow::DataType> type; |
| bool strict; |
| int64_t size; |
| }; |
| |
// Logical kind of an R object, as reported by GetVectorType().
// The enumerator order (and therefore the numeric values) must not change.
enum RVectorType {
  // atomic vectors
  BOOLEAN,  // logical
  UINT8,    // raw
  INT32,    // integer without a special class
  FLOAT64,  // double without a special class
  INT64,    // double carrying the "integer64" class
  COMPLEX,  // complex
  STRING,   // character

  // classed vectors
  DATAFRAME,  // list with class "data.frame"
  DATE_INT,   // integer with class "Date"
  DATE_DBL,   // double with class "Date"
  TIME,       // double with class "hms"
  DURATION,   // double with class "difftime"
  POSIXCT,    // double with class "POSIXct"
  POSIXLT,    // list with class "POSIXlt"
  BINARY,     // list with class "arrow_binary"
  LIST,       // any other list
  FACTOR,     // integer with class "factor"

  // anything not covered above
  OTHER
};
| |
| // this flattens out a logical type of what an R object is |
| // because TYPEOF() is not detailed enough |
| // we can't use arrow types though as there is no 1-1 mapping |
| RVectorType GetVectorType(SEXP x) { |
| switch (TYPEOF(x)) { |
| case LGLSXP: |
| return BOOLEAN; |
| case RAWSXP: |
| return UINT8; |
| case INTSXP: |
| if (Rf_inherits(x, "factor")) { |
| return FACTOR; |
| } else if (Rf_inherits(x, "Date")) { |
| return DATE_INT; |
| } |
| return INT32; |
| case STRSXP: |
| return STRING; |
| case CPLXSXP: |
| return COMPLEX; |
| case REALSXP: { |
| if (Rf_inherits(x, "Date")) { |
| return DATE_DBL; |
| } else if (Rf_inherits(x, "integer64")) { |
| return INT64; |
| } else if (Rf_inherits(x, "POSIXct")) { |
| return POSIXCT; |
| } else if (Rf_inherits(x, "hms")) { |
| return TIME; |
| } else if (Rf_inherits(x, "difftime")) { |
| return DURATION; |
| } else { |
| return FLOAT64; |
| } |
| } |
| case VECSXP: { |
| if (Rf_inherits(x, "data.frame")) { |
| return DATAFRAME; |
| } |
| |
| if (Rf_inherits(x, "POSIXlt")) { |
| return POSIXLT; |
| } |
| |
| if (Rf_inherits(x, "arrow_binary")) { |
| return BINARY; |
| } |
| |
| return LIST; |
| } |
| default: |
| break; |
| } |
| return OTHER; |
| } |
| |
| template <typename T> |
| bool is_NA(T value); |
| |
| template <> |
| bool is_NA<int>(int value) { |
| return value == NA_INTEGER; |
| } |
| |
| template <> |
| bool is_NA<double>(double value) { |
| return ISNA(value); |
| } |
| |
| template <> |
| bool is_NA<uint8_t>(uint8_t value) { |
| return false; |
| } |
| |
| template <> |
| bool is_NA<cpp11::r_bool>(cpp11::r_bool value) { |
| return value == NA_LOGICAL; |
| } |
| |
| template <> |
| bool is_NA<cpp11::r_string>(cpp11::r_string value) { |
| return value == NA_STRING; |
| } |
| |
| template <> |
| bool is_NA<SEXP>(SEXP value) { |
| return Rf_isNull(value); |
| } |
| |
| template <> |
| bool is_NA<int64_t>(int64_t value) { |
| return value == NA_INT64; |
| } |
| |
| template <typename T> |
| class RVectorIterator { |
| public: |
| using value_type = T; |
| RVectorIterator(SEXP x, int64_t start) |
| : ptr_x_(reinterpret_cast<const T*>(DATAPTR_RO(x)) + start) {} |
| |
| RVectorIterator& operator++() { |
| ++ptr_x_; |
| return *this; |
| } |
| |
| const T operator*() const { return *ptr_x_; } |
| |
| private: |
| const T* ptr_x_; |
| }; |
| |
| template <typename T> |
| class RVectorIterator_ALTREP { |
| public: |
| using value_type = T; |
| using data_type = |
| typename std::conditional<std::is_same<T, int64_t>::value, double, T>::type; |
| using r_vector_type = cpp11::r_vector<data_type>; |
| using r_vector_iterator = typename r_vector_type::const_iterator; |
| |
| RVectorIterator_ALTREP(SEXP x, int64_t start) |
| : vector_(x), it_(vector_.begin() + start) {} |
| |
| RVectorIterator_ALTREP& operator++() { |
| ++it_; |
| return *this; |
| } |
| |
| const T operator*() const { return GetValue(*it_); } |
| |
| static T GetValue(data_type x) { return x; } |
| |
| private: |
| r_vector_type vector_; |
| r_vector_iterator it_; |
| }; |
| |
| template <> |
| int64_t RVectorIterator_ALTREP<int64_t>::GetValue(double x) { |
| int64_t value; |
| memcpy(&value, &x, sizeof(int64_t)); |
| return value; |
| } |
| |
| template <typename Iterator, typename AppendNull, typename AppendValue> |
| Status VisitVector(Iterator it, int64_t n, AppendNull&& append_null, |
| AppendValue&& append_value) { |
| for (R_xlen_t i = 0; i < n; i++, ++it) { |
| auto value = *it; |
| |
| if (is_NA<typename Iterator::value_type>(value)) { |
| RETURN_NOT_OK(append_null()); |
| } else { |
| RETURN_NOT_OK(append_value(value)); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| class RConverter : public Converter<SEXP, RConversionOptions> { |
| public: |
| virtual Status Append(SEXP) { return Status::NotImplemented("Append"); } |
| |
| virtual Status Extend(SEXP values, int64_t size, int64_t offset = 0) { |
| return Status::NotImplemented("Extend"); |
| } |
| |
| // by default, just delay the ->Extend(), i.e. not run in parallel |
| // implementations might redefine so that ->Extend() is run in parallel |
| virtual void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) { |
| auto task = [this, values, size]() { return this->Extend(values, size); }; |
| tasks.Append(false, task); |
| } |
| |
| virtual Status ExtendMasked(SEXP values, SEXP mask, int64_t size, int64_t offset = 0) { |
| return Status::NotImplemented("ExtendMasked"); |
| } |
| |
| virtual Result<std::shared_ptr<ChunkedArray>> ToChunkedArray() { |
| ARROW_ASSIGN_OR_RAISE(auto array, this->ToArray()) |
| return std::make_shared<ChunkedArray>(array); |
| } |
| }; |
| |
| // A Converter that calls as_arrow_array(x, type = options.type) |
| class AsArrowArrayConverter : public RConverter { |
| public: |
| // This is not run in parallel by default, so it's safe to call into R here; |
| // however, it is not safe to throw an exception here because the caller |
| // might be waiting for other conversions to finish in the background. To |
| // avoid this, use StatusUnwindProtect() to communicate the error back to |
| // ValueOrStop() (which reconstructs the exception and re-throws it). |
| Status Extend(SEXP values, int64_t size, int64_t offset = 0) { |
| try { |
| cpp11::sexp as_array_result = cpp11::package("arrow")["as_arrow_array"]( |
| values, cpp11::named_arg("type") = cpp11::as_sexp(options().type), |
| cpp11::named_arg("from_vec_to_array") = cpp11::as_sexp<bool>(true)); |
| |
| // Check that the R method returned an Array |
| if (!Rf_inherits(as_array_result, "Array")) { |
| return Status::Invalid("as_arrow_array() did not return object of type Array"); |
| } |
| |
| auto array = cpp11::as_cpp<std::shared_ptr<arrow::Array>>(as_array_result); |
| |
| // We need the type to be equal because the schema has already been finalized |
| if (!array->type()->Equals(options().type)) { |
| return Status::Invalid( |
| "as_arrow_array() returned an Array with an incorrect type"); |
| } |
| |
| arrays_.push_back(std::move(array)); |
| return Status::OK(); |
| } catch (cpp11::unwind_exception& e) { |
| return StatusUnwindProtect(e.token, "calling as_arrow_array()"); |
| } |
| } |
| |
| // This is sometimes run in parallel so we can't call into R |
| Result<std::shared_ptr<ChunkedArray>> ToChunkedArray() { |
| return std::make_shared<ChunkedArray>(std::move(arrays_)); |
| } |
| |
| private: |
| cpp11::writable::list objects_; |
| std::vector<std::shared_ptr<Array>> arrays_; |
| }; |
| |
| template <typename T, typename Enable = void> |
| class RPrimitiveConverter; |
| |
| template <typename T> |
| Result<T> CIntFromRScalarImpl(int64_t value) { |
| if (value < std::numeric_limits<T>::min() || value > std::numeric_limits<T>::max()) { |
| return Status::Invalid("value outside of range"); |
| } |
| return static_cast<T>(value); |
| } |
| |
| template <> |
| Result<uint64_t> CIntFromRScalarImpl<uint64_t>(int64_t value) { |
| if (value < 0) { |
| return Status::Invalid("value outside of range"); |
| } |
| return static_cast<uint64_t>(value); |
| } |
| |
| // utility to convert R single values from (int, raw, double and int64) vectors |
| // to arrow integers and floating point |
| struct RConvert { |
| // ---- convert to an arrow integer |
| template <typename Type, typename From> |
| static enable_if_integer<Type, Result<typename Type::c_type>> Convert(Type*, |
| From from) { |
| return CIntFromRScalarImpl<typename Type::c_type>(static_cast<int64_t>(from)); |
| } |
| |
| // ---- convert R integer types to double |
| template <typename Type, typename From> |
| static enable_if_t<std::is_same<Type, const DoubleType>::value && |
| !std::is_same<From, double>::value, |
| Result<typename Type::c_type>> |
| Convert(Type*, From from) { |
| constexpr int64_t kDoubleMax = 1LL << 53; |
| constexpr int64_t kDoubleMin = -(1LL << 53); |
| |
| if (from < kDoubleMin || from > kDoubleMax) { |
| return Status::Invalid("Integer value ", from, " is outside of the range exactly", |
| " representable by a IEEE 754 double precision value"); |
| } |
| return static_cast<double>(from); |
| } |
| |
| // ---- convert double to double |
| template <typename Type, typename From> |
| static enable_if_t<std::is_same<Type, const DoubleType>::value && |
| std::is_same<From, double>::value, |
| Result<typename Type::c_type>> |
| Convert(Type*, From from) { |
| return from; |
| } |
| |
| // ---- convert R integer types to float |
| template <typename Type, typename From> |
| static enable_if_t<std::is_same<Type, const FloatType>::value && |
| !std::is_same<From, double>::value, |
| Result<typename Type::c_type>> |
| Convert(Type*, From from) { |
| constexpr int64_t kFloatMax = 1LL << 24; |
| constexpr int64_t kFloatMin = -(1LL << 24); |
| |
| if (from < kFloatMin || from > kFloatMax) { |
| return Status::Invalid("Integer value ", from, " is outside of the range exactly", |
| " representable by a IEEE 754 single precision value"); |
| } |
| return static_cast<float>(from); |
| } |
| |
| // ---- convert double to float |
| template <typename Type, typename From> |
| static enable_if_t<std::is_same<Type, const FloatType>::value && |
| std::is_same<From, double>::value, |
| Result<typename Type::c_type>> |
| Convert(Type*, From from) { |
| return static_cast<float>(from); |
| } |
| |
| // ---- convert to half float: not implemented |
| template <typename Type, typename From> |
| static enable_if_t<std::is_same<Type, const HalfFloatType>::value, |
| Result<typename Type::c_type>> |
| Convert(Type*, From from) { |
| return Status::Invalid("Cannot convert to Half Float"); |
| } |
| }; |
| |
| template <typename T> |
| class RPrimitiveConverter<T, enable_if_null<T>> |
| : public PrimitiveConverter<T, RConverter> { |
| public: |
| Status Extend(SEXP, int64_t size, int64_t offset = 0) override { |
| return this->primitive_builder_->AppendNulls(size - offset); |
| } |
| }; |
| |
| // TODO: extend this to BooleanType, but this needs some work in RConvert |
| template <typename T> |
| class RPrimitiveConverter< |
| T, enable_if_t<is_integer_type<T>::value || is_floating_type<T>::value>> |
| : public PrimitiveConverter<T, RConverter> { |
| public: |
| Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { |
| auto rtype = GetVectorType(x); |
| switch (rtype) { |
| case UINT8: |
| return ExtendDispatch<unsigned char>(x, size, offset); |
| case INT32: |
| return ExtendDispatch<int>(x, size, offset); |
| case FLOAT64: |
| return ExtendDispatch<double>(x, size, offset); |
| case INT64: |
| return ExtendDispatch<int64_t>(x, size, offset); |
| |
| default: |
| break; |
| } |
| // TODO: mention T in the error |
| return Status::Invalid("cannot convert"); |
| } |
| |
| void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { |
| auto task = [this, values, size]() { return this->Extend(values, size); }; |
| tasks.Append(!ALTREP(values), std::move(task)); |
| } |
| |
| private: |
| template <typename r_value_type> |
| Status ExtendDispatch(SEXP x, int64_t size, int64_t offset) { |
| if (ALTREP(x)) { |
| // `x` is an ALTREP R vector storing `r_value_type` |
| // and that type matches exactly the type of the array this is building |
| return Extend_impl(RVectorIterator_ALTREP<r_value_type>(x, offset), size); |
| } else { |
| // `x` is not an ALTREP vector so we have direct access to a range of values |
| return Extend_impl(RVectorIterator<r_value_type>(x, offset), size); |
| } |
| } |
| |
| template <typename Iterator> |
| Status Extend_impl(Iterator it, int64_t size) { |
| using r_value_type = typename Iterator::value_type; |
| RETURN_NOT_OK(this->primitive_builder_->Reserve(size)); |
| |
| auto append_null = [this]() { |
| this->primitive_builder_->UnsafeAppendNull(); |
| return Status::OK(); |
| }; |
| |
| if (std::is_same<typename T::c_type, r_value_type>::value) { |
| auto append_value = [this](r_value_type value) { |
| this->primitive_builder_->UnsafeAppend(static_cast<typename T::c_type>(value)); |
| return Status::OK(); |
| }; |
| return VisitVector(it, size, append_null, append_value); |
| } else { |
| auto append_value = [this](r_value_type value) { |
| ARROW_ASSIGN_OR_RAISE(auto converted, |
| RConvert::Convert(this->primitive_type_, value)); |
| this->primitive_builder_->UnsafeAppend(converted); |
| return Status::OK(); |
| }; |
| return VisitVector(it, size, append_null, append_value); |
| } |
| } |
| }; |
| |
| template <typename T> |
| class RPrimitiveConverter<T, enable_if_t<is_boolean_type<T>::value>> |
| : public PrimitiveConverter<T, RConverter> { |
| public: |
| Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { |
| auto rtype = GetVectorType(x); |
| if (rtype != BOOLEAN) { |
| return Status::Invalid("Expecting a logical vector"); |
| } |
| |
| if (ALTREP(x)) { |
| return Extend_impl(RVectorIterator_ALTREP<cpp11::r_bool>(x, offset), size); |
| } else { |
| return Extend_impl(RVectorIterator<cpp11::r_bool>(x, offset), size); |
| } |
| } |
| |
| void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { |
| auto task = [this, values, size]() { return this->Extend(values, size); }; |
| tasks.Append(!ALTREP(values), std::move(task)); |
| } |
| |
| private: |
| template <typename Iterator> |
| Status Extend_impl(Iterator it, int64_t size) { |
| RETURN_NOT_OK(this->Reserve(size)); |
| |
| auto append_null = [this]() { |
| this->primitive_builder_->UnsafeAppendNull(); |
| return Status::OK(); |
| }; |
| auto append_value = [this](cpp11::r_bool value) { |
| this->primitive_builder_->UnsafeAppend(value == 1); |
| return Status::OK(); |
| }; |
| return VisitVector(it, size, append_null, append_value); |
| } |
| }; |
| |
| template <typename T> |
| class RPrimitiveConverter<T, enable_if_t<is_date_type<T>::value>> |
| : public PrimitiveConverter<T, RConverter> { |
| public: |
| Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { |
| switch (GetVectorType(x)) { |
| case DATE_INT: |
| return AppendRange_Date_dispatch<int>(x, size, offset); |
| |
| case DATE_DBL: |
| return AppendRange_Date_dispatch<double>(x, size, offset); |
| |
| case POSIXCT: |
| return AppendRange_Posixct_dispatch(x, size, offset); |
| |
| default: |
| break; |
| } |
| |
| return Status::Invalid("cannot convert to date type "); |
| } |
| |
| void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { |
| auto task = [this, values, size]() { return this->Extend(values, size); }; |
| tasks.Append(!ALTREP(values), std::move(task)); |
| } |
| |
| private: |
| template <typename r_value_type> |
| Status AppendRange_Date_dispatch(SEXP x, int64_t size, int64_t offset) { |
| if (ALTREP(x)) { |
| return AppendRange_Date(RVectorIterator_ALTREP<r_value_type>(x, offset), |
| size - offset); |
| } else { |
| return AppendRange_Date(RVectorIterator<r_value_type>(x, offset), size - offset); |
| } |
| } |
| |
| template <typename Iterator> |
| Status AppendRange_Date(Iterator it, int64_t size) { |
| using r_value_type = typename Iterator::value_type; |
| RETURN_NOT_OK(this->Reserve(size)); |
| |
| auto append_null = [this]() { |
| this->primitive_builder_->UnsafeAppendNull(); |
| return Status::OK(); |
| }; |
| auto append_value = [this](r_value_type value) { |
| this->primitive_builder_->UnsafeAppend(FromRDate(this->primitive_type_, value)); |
| return Status::OK(); |
| }; |
| return VisitVector(it, size, append_null, append_value); |
| } |
| |
| Status AppendRange_Posixct_dispatch(SEXP x, int64_t size, int64_t offset) { |
| if (ALTREP(x)) { |
| return AppendRange_Posixct(RVectorIterator_ALTREP<double>(x, offset), |
| size - offset); |
| } else { |
| return AppendRange_Posixct(RVectorIterator<double>(x, offset), size - offset); |
| } |
| } |
| |
| template <typename Iterator> |
| Status AppendRange_Posixct(Iterator it, int64_t size) { |
| using r_value_type = typename Iterator::value_type; |
| RETURN_NOT_OK(this->Reserve(size)); |
| |
| auto append_null = [this]() { |
| this->primitive_builder_->UnsafeAppendNull(); |
| return Status::OK(); |
| }; |
| auto append_value = [this](r_value_type value) { |
| this->primitive_builder_->UnsafeAppend(FromPosixct(this->primitive_type_, value)); |
| return Status::OK(); |
| }; |
| return VisitVector(it, size, append_null, append_value); |
| } |
| |
| static int FromRDate(const Date32Type*, double from) { return static_cast<int>(from); } |
| |
| static int64_t FromRDate(const Date64Type*, double from) { |
| constexpr int64_t kMilliSecondsPerDay = 86400000; |
| return static_cast<int64_t>(from * kMilliSecondsPerDay); |
| } |
| |
| static int FromPosixct(const Date32Type*, double from) { |
| constexpr int64_t kSecondsPerDay = 86400; |
| return static_cast<int>(from / kSecondsPerDay); |
| } |
| |
| static int64_t FromPosixct(const Date64Type*, double from) { |
| return static_cast<int64_t>(from * 1000); |
| } |
| }; |
| |
| int64_t get_TimeUnit_multiplier(TimeUnit::type unit) { |
| switch (unit) { |
| case TimeUnit::SECOND: |
| return 1; |
| case TimeUnit::MILLI: |
| return 1000; |
| case TimeUnit::MICRO: |
| return 1000000; |
| case TimeUnit::NANO: |
| return 1000000000; |
| default: |
| return 0; |
| } |
| } |
| |
| Result<int> get_difftime_unit_multiplier(SEXP x) { |
| std::string unit(CHAR(STRING_ELT(Rf_getAttrib(x, symbols::units), 0))); |
| if (unit == "secs") { |
| return 1; |
| } else if (unit == "mins") { |
| return 60; |
| } else if (unit == "hours") { |
| return 3600; |
| } else if (unit == "days") { |
| return 86400; |
| } else if (unit == "weeks") { |
| return 604800; |
| } else { |
| return Status::Invalid("unknown difftime unit"); |
| } |
| } |
| |
| template <typename T> |
| class RPrimitiveConverter<T, enable_if_t<is_time_type<T>::value>> |
| : public PrimitiveConverter<T, RConverter> { |
| public: |
| Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { |
| RETURN_NOT_OK(this->Reserve(size - offset)); |
| auto rtype = GetVectorType(x); |
| if (rtype != TIME) { |
| return Status::Invalid("Invalid conversion to time"); |
| } |
| |
| // multiplier to get the number of seconds from the value stored in the R vector |
| ARROW_ASSIGN_OR_RAISE(int difftime_multiplier, get_difftime_unit_multiplier(x)); |
| |
| // then multiply the seconds by this to match the time unit |
| auto multiplier = |
| get_TimeUnit_multiplier(this->primitive_type_->unit()) * difftime_multiplier; |
| |
| auto append_null = [this]() { |
| this->primitive_builder_->UnsafeAppendNull(); |
| return Status::OK(); |
| }; |
| auto append_value = [this, multiplier](double value) { |
| auto converted = static_cast<typename T::c_type>(value * multiplier); |
| this->primitive_builder_->UnsafeAppend(converted); |
| return Status::OK(); |
| }; |
| |
| if (ALTREP(x)) { |
| return VisitVector(RVectorIterator_ALTREP<double>(x, offset), size, append_null, |
| append_value); |
| } else { |
| return VisitVector(RVectorIterator<double>(x, offset), size, append_null, |
| append_value); |
| } |
| } |
| |
| void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { |
| auto task = [this, values, size]() { return this->Extend(values, size); }; |
| tasks.Append(!ALTREP(values), std::move(task)); |
| } |
| }; |
| |
| template <typename T> |
| class RPrimitiveConverter<T, enable_if_t<is_timestamp_type<T>::value>> |
| : public PrimitiveConverter<T, RConverter> { |
| public: |
| Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { |
| RETURN_NOT_OK(this->Reserve(size - offset)); |
| |
| RVectorType rtype = GetVectorType(x); |
| if (rtype != POSIXCT) { |
| return Status::Invalid("Invalid conversion to timestamp"); |
| } |
| |
| int64_t multiplier = get_TimeUnit_multiplier(this->primitive_type_->unit()); |
| |
| auto append_value = [this, multiplier](double value) { |
| auto converted = static_cast<typename T::c_type>(value * multiplier); |
| this->primitive_builder_->UnsafeAppend(converted); |
| return Status::OK(); |
| }; |
| auto append_null = [this]() { |
| this->primitive_builder_->UnsafeAppendNull(); |
| return Status::OK(); |
| }; |
| |
| if (ALTREP(x)) { |
| return VisitVector(RVectorIterator_ALTREP<double>(x, offset), size, append_null, |
| append_value); |
| } else { |
| return VisitVector(RVectorIterator<double>(x, offset), size, append_null, |
| append_value); |
| } |
| } |
| |
| void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { |
| auto task = [this, values, size]() { return this->Extend(values, size); }; |
| tasks.Append(!ALTREP(values), std::move(task)); |
| } |
| }; |
| |
| template <typename T> |
| class RPrimitiveConverter<T, enable_if_t<is_decimal_type<T>::value>> |
| : public PrimitiveConverter<T, RConverter> { |
| using ValueType = typename arrow::TypeTraits<T>::CType; |
| |
| public: |
| Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { |
| RETURN_NOT_OK(this->Reserve(size - offset)); |
| int32_t precision = this->primitive_type_->precision(); |
| int32_t scale = this->primitive_type_->scale(); |
| |
| auto append_value = [this, precision, scale](double value) { |
| ARROW_ASSIGN_OR_RAISE(ValueType converted, |
| ValueType::FromReal(value, precision, scale)); |
| this->primitive_builder_->UnsafeAppend(converted); |
| return Status::OK(); |
| }; |
| |
| auto append_null = [this]() { |
| this->primitive_builder_->UnsafeAppendNull(); |
| return Status::OK(); |
| }; |
| |
| switch (TYPEOF(x)) { |
| case REALSXP: |
| if (ALTREP(x)) { |
| return VisitVector(RVectorIterator_ALTREP<double>(x, offset), size, append_null, |
| append_value); |
| } else { |
| return VisitVector(RVectorIterator<double>(x, offset), size, append_null, |
| append_value); |
| } |
| break; |
| case INTSXP: |
| if (ALTREP(x)) { |
| return VisitVector(RVectorIterator_ALTREP<int>(x, offset), size, append_null, |
| append_value); |
| } else { |
| return VisitVector(RVectorIterator<int>(x, offset), size, append_null, |
| append_value); |
| } |
| break; |
| default: |
| return Status::NotImplemented("Conversion to decimal from non-integer/double"); |
| } |
| } |
| }; |
| |
| Status check_binary(SEXP x, int64_t size) { |
| RVectorType rtype = GetVectorType(x); |
| switch (rtype) { |
| case BINARY: |
| break; |
| case LIST: { |
| // check this is a list of raw vectors |
| const SEXP* p_x = VECTOR_PTR_RO(x); |
| for (R_xlen_t i = 0; i < size; i++, ++p_x) { |
| if (TYPEOF(*p_x) != RAWSXP && (*p_x != R_NilValue)) { |
| return Status::Invalid("invalid R type to convert to binary"); |
| } |
| } |
| break; |
| } |
| default: |
| return Status::Invalid("invalid R type to convert to binary"); |
| } |
| return Status::OK(); |
| } |
| |
| template <typename T> |
| class RPrimitiveConverter<T, enable_if_binary<T>> |
| : public PrimitiveConverter<T, RConverter> { |
| public: |
| using OffsetType = typename T::offset_type; |
| |
| Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { |
| RETURN_NOT_OK(this->Reserve(size - offset)); |
| RETURN_NOT_OK(check_binary(x, size)); |
| |
| auto append_null = [this]() { |
| this->primitive_builder_->UnsafeAppendNull(); |
| return Status::OK(); |
| }; |
| |
| auto append_value = [this](SEXP raw) { |
| R_xlen_t n = XLENGTH(raw); |
| ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(n)); |
| this->primitive_builder_->UnsafeAppend(RAW_RO(raw), static_cast<OffsetType>(n)); |
| return Status::OK(); |
| }; |
| return VisitVector(RVectorIterator<SEXP>(x, offset), size, append_null, append_value); |
| } |
| |
| void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { |
| auto task = [this, values, size]() { return this->Extend(values, size); }; |
| tasks.Append(!ALTREP(values), std::move(task)); |
| } |
| }; |
| |
| template <typename T> |
| class RPrimitiveConverter<T, enable_if_t<std::is_same<T, FixedSizeBinaryType>::value>> |
| : public PrimitiveConverter<T, RConverter> { |
| public: |
| Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { |
| RETURN_NOT_OK(this->Reserve(size - offset)); |
| RETURN_NOT_OK(check_binary(x, size)); |
| |
| auto append_null = [this]() { |
| this->primitive_builder_->UnsafeAppendNull(); |
| return Status::OK(); |
| }; |
| |
| auto append_value = [this](SEXP raw) { |
| R_xlen_t n = XLENGTH(raw); |
| |
| if (n != this->primitive_builder_->byte_width()) { |
| return Status::Invalid("invalid size"); |
| } |
| ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(n)); |
| this->primitive_builder_->UnsafeAppend(RAW_RO(raw)); |
| return Status::OK(); |
| }; |
| return VisitVector(RVectorIterator<SEXP>(x, offset), size, append_null, append_value); |
| } |
| |
| void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { |
| auto task = [this, values, size]() { return this->Extend(values, size); }; |
| tasks.Append(!ALTREP(values), std::move(task)); |
| } |
| }; |
| |
| template <typename T> |
| class RPrimitiveConverter<T, enable_if_string_like<T>> |
| : public PrimitiveConverter<T, RConverter> { |
| public: |
| using OffsetType = typename T::offset_type; |
| |
| Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { |
| RVectorType rtype = GetVectorType(x); |
| if (rtype != STRING) { |
| return Status::Invalid("Expecting a character vector"); |
| } |
| return UnsafeAppendUtf8Strings(arrow::r::utf8_strings(x), size, offset); |
| } |
| |
| void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { |
| auto task = [this, values, size]() { return this->Extend(values, size); }; |
| // TODO: refine this., e.g. extract setup from Extend() |
| tasks.Append(false, std::move(task)); |
| } |
| |
| private: |
| Status UnsafeAppendUtf8Strings(const cpp11::strings& s, int64_t size, int64_t offset) { |
| RETURN_NOT_OK(this->primitive_builder_->Reserve(s.size())); |
| const SEXP* p_strings = reinterpret_cast<const SEXP*>(DATAPTR_RO(s)); |
| |
| // we know all the R strings are utf8 already, so we can get |
| // a definite size and then use UnsafeAppend*() |
| int64_t total_length = 0; |
| for (R_xlen_t i = offset; i < size; i++, ++p_strings) { |
| SEXP si = *p_strings; |
| total_length += si == NA_STRING ? 0 : LENGTH(si); |
| } |
| RETURN_NOT_OK(this->primitive_builder_->ReserveData(total_length)); |
| |
| // append |
| p_strings = reinterpret_cast<const SEXP*>(DATAPTR_RO(s)); |
| for (R_xlen_t i = offset; i < size; i++, ++p_strings) { |
| SEXP si = *p_strings; |
| if (si == NA_STRING) { |
| this->primitive_builder_->UnsafeAppendNull(); |
| } else { |
| this->primitive_builder_->UnsafeAppend(CHAR(si), LENGTH(si)); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| }; |
| |
| template <typename T> |
| class RPrimitiveConverter<T, enable_if_t<is_duration_type<T>::value>> |
| : public PrimitiveConverter<T, RConverter> { |
| public: |
| Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { |
| auto rtype = GetVectorType(x); |
| |
| // only handle <difftime> R objects |
| if (rtype == DURATION) { |
| RETURN_NOT_OK(this->Reserve(size - offset)); |
| |
| ARROW_ASSIGN_OR_RAISE(int difftime_multiplier, get_difftime_unit_multiplier(x)); |
| |
| int64_t multiplier = |
| get_TimeUnit_multiplier(this->primitive_type_->unit()) * difftime_multiplier; |
| |
| auto append_value = [this, multiplier](double value) { |
| auto converted = static_cast<typename T::c_type>(value * multiplier); |
| this->primitive_builder_->UnsafeAppend(converted); |
| return Status::OK(); |
| }; |
| auto append_null = [this]() { |
| this->primitive_builder_->UnsafeAppendNull(); |
| return Status::OK(); |
| }; |
| |
| if (ALTREP(x)) { |
| return VisitVector(RVectorIterator_ALTREP<double>(x, offset), size, append_null, |
| append_value); |
| } else { |
| return VisitVector(RVectorIterator<double>(x, offset), size, append_null, |
| append_value); |
| } |
| |
| return Status::OK(); |
| } |
| |
| return Status::NotImplemented("Extend"); |
| } |
| }; |
| |
| template <typename T> |
| class RListConverter; |
| |
| template <typename U, typename Enable = void> |
| class RDictionaryConverter; |
| |
| template <typename U> |
| class RDictionaryConverter<U, enable_if_has_c_type<U>> |
| : public DictionaryConverter<U, RConverter> { |
| public: |
| Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { |
| return Status::NotImplemented("Extend"); |
| } |
| }; |
| |
| template <typename ValueType> |
| class RDictionaryConverter<ValueType, enable_if_has_string_view<ValueType>> |
| : public DictionaryConverter<ValueType, RConverter> { |
| public: |
| using BuilderType = DictionaryBuilder<ValueType>; |
| |
| Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { |
| RETURN_NOT_OK(ExtendSetup(x, size, offset)); |
| return ExtendImpl(x, size, offset, GetCharLevels(x)); |
| } |
| |
| void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { |
| // the setup runs synchronously first |
| Status setup = ExtendSetup(values, size, /*offset=*/0); |
| |
| if (!setup.ok()) { |
| // if that fails, propagate the error |
| tasks.Append(false, [setup]() { return setup; }); |
| } else { |
| auto char_levels = GetCharLevels(values); |
| |
| tasks.Append(true, [this, values, size, char_levels]() { |
| return this->ExtendImpl(values, size, /*offset=*/0, char_levels); |
| }); |
| } |
| } |
| |
| Result<std::shared_ptr<ChunkedArray>> ToChunkedArray() override { |
| ARROW_ASSIGN_OR_RAISE(auto result, this->builder_->Finish()); |
| |
| auto result_type = checked_cast<DictionaryType*>(result->type().get()); |
| if (this->dict_type_->ordered() && !result_type->ordered()) { |
| // TODO: we should not have to do that, there is probably something wrong |
| // in the DictionaryBuilder code |
| result->data()->type = |
| arrow::dictionary(result_type->index_type(), result_type->value_type(), true); |
| } |
| |
| return std::make_shared<ChunkedArray>( |
| std::make_shared<DictionaryArray>(result->data())); |
| } |
| |
| private: |
| std::vector<const char*> GetCharLevels(SEXP x) { |
| SEXP levels = Rf_getAttrib(x, R_LevelsSymbol); |
| R_xlen_t n_levels = XLENGTH(levels); |
| std::vector<const char*> char_levels(XLENGTH(levels)); |
| const SEXP* p_levels = reinterpret_cast<const SEXP*>(DATAPTR_RO(levels)); |
| for (R_xlen_t i = 0; i < n_levels; i++, ++p_levels) { |
| char_levels[i] = CHAR(*p_levels); |
| } |
| |
| return char_levels; |
| } |
| |
| Status ExtendSetup(SEXP x, int64_t size, int64_t offset) { |
| RVectorType rtype = GetVectorType(x); |
| if (rtype != FACTOR) { |
| return Status::Invalid("invalid R type to convert to dictionary"); |
| } |
| |
| // first we need to handle the levels |
| SEXP levels = Rf_getAttrib(x, R_LevelsSymbol); |
| auto memo_chunked_chunked_array = |
| arrow::r::vec_to_arrow_ChunkedArray(levels, utf8(), false); |
| for (const auto& chunk : memo_chunked_chunked_array->chunks()) { |
| RETURN_NOT_OK(this->value_builder_->InsertMemoValues(*chunk)); |
| } |
| |
| // then we can proceed |
| return this->Reserve(size - offset); |
| } |
| |
| Status ExtendImpl(SEXP values, int64_t size, int64_t offset, |
| const std::vector<const char*>& char_levels) { |
| auto append_null = [this]() { return this->value_builder_->AppendNull(); }; |
| auto append_value = [this, &char_levels](int value) { |
| return this->value_builder_->Append(char_levels[value - 1]); |
| }; |
| |
| return VisitVector(RVectorIterator<int>(values, offset), size, append_null, |
| append_value); |
| } |
| }; |
| |
| template <typename T, typename Enable = void> |
| struct RConverterTrait; |
| |
| template <typename T> |
| struct RConverterTrait< |
| T, enable_if_t<!is_nested_type<T>::value && !is_interval_type<T>::value && |
| !is_extension_type<T>::value && !is_binary_view_like_type<T>::value>> { |
| using type = RPrimitiveConverter<T>; |
| }; |
| |
| template <typename T> |
| struct RConverterTrait<T, enable_if_binary_view_like<T>> { |
| // not implemented |
| }; |
| |
| template <typename T> |
| struct RConverterTrait<T, enable_if_list_like<T>> { |
| using type = RListConverter<T>; |
| }; |
| |
| template <typename T> |
| class RListConverter : public ListConverter<T, RConverter, RConverterTrait> { |
| public: |
| Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { |
| RETURN_NOT_OK(this->Reserve(size)); |
| |
| RVectorType rtype = GetVectorType(x); |
| if (rtype != LIST) { |
| return Status::Invalid("Cannot convert to list type"); |
| } |
| |
| auto append_null = [this]() { return this->list_builder_->AppendNull(); }; |
| |
| auto append_value = [this](SEXP value) { |
| // TODO: if we decide that this can be run concurrently |
| // we'll have to do vec_size() upfront |
| R_xlen_t n = arrow::r::vec_size(value); |
| |
| RETURN_NOT_OK(this->list_builder_->ValidateOverflow(n)); |
| RETURN_NOT_OK(this->list_builder_->Append()); |
| return this->value_converter_.get()->Extend(value, n); |
| }; |
| |
| return VisitVector(RVectorIterator<SEXP>(x, offset), size, append_null, append_value); |
| } |
| |
| void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { |
| // NOTE: because Extend::[]append_value() calls Extend() on the |
| // value converter, which might require a setup step, it feels |
| // complicated to run this task concurrently. |
| // |
| // TODO: perhaps allow running concurrently in some cases, e.g. list(int32(!altrep)) |
| tasks.Append(false, [this, values, size]() { return this->Extend(values, size); }); |
| } |
| }; |
| |
| class RStructConverter; |
| |
| template <> |
| struct RConverterTrait<StructType> { |
| using type = RStructConverter; |
| }; |
| |
| class RStructConverter : public StructConverter<RConverter, RConverterTrait> { |
| public: |
| Status Extend(SEXP x, int64_t size, int64_t offset = 0) override { |
| RETURN_NOT_OK(ExtendSetup(x, size, offset)); |
| |
| auto fields = this->struct_type_->fields(); |
| R_xlen_t n_columns = XLENGTH(x); |
| for (R_xlen_t i = offset; i < n_columns; i++) { |
| auto status = children_[i]->Extend(VECTOR_ELT(x, i), size); |
| if (!status.ok()) { |
| return Status::Invalid("Problem with column ", (i + 1), " (", fields[i]->name(), |
| "): ", status.ToString()); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { |
| // the setup runs synchronously first |
| Status setup = ExtendSetup(values, size, /*offset=*/0); |
| |
| if (!setup.ok()) { |
| // if that fails, propagate the error |
| tasks.Append(false, [setup]() { return setup; }); |
| } else { |
| // otherwise deal with each column, maybe concurrently |
| auto fields = this->struct_type_->fields(); |
| R_xlen_t n_columns = XLENGTH(values); |
| |
| for (R_xlen_t i = 0; i < n_columns; i++) { |
| children_[i]->DelayedExtend(VECTOR_ELT(values, i), size, tasks); |
| } |
| } |
| } |
| |
| protected: |
| Status Init(MemoryPool* pool) override { |
| return StructConverter<RConverter, RConverterTrait>::Init(pool); |
| } |
| |
| Status ExtendSetup(SEXP x, int64_t size, int64_t offset) { |
| // check that x is compatible |
| R_xlen_t n_columns = XLENGTH(x); |
| |
| if (!Rf_inherits(x, "data.frame") && !Rf_inherits(x, "POSIXlt")) { |
| return Status::Invalid("Can only convert data frames to Struct type"); |
| } |
| |
| auto fields = this->struct_type_->fields(); |
| if (n_columns != static_cast<R_xlen_t>(fields.size())) { |
| return Status::RError("Number of fields in struct (", fields.size(), |
| ") incompatible with number of columns in the data frame (", |
| n_columns, ")"); |
| } |
| |
| cpp11::strings x_names = Rf_getAttrib(x, R_NamesSymbol); |
| |
| RETURN_NOT_OK(cpp11::unwind_protect([&] { |
| for (int i = 0; i < n_columns; i++) { |
| const char* name_i = arrow::r::unsafe::utf8_string(x_names[i]); |
| auto field_name = fields[i]->name(); |
| if (field_name != name_i) { |
| return Status::RError( |
| "Field name in position ", i, " (", field_name, |
| ") does not match the name of the column of the data frame (", name_i, ")"); |
| } |
| } |
| |
| return Status::OK(); |
| })); |
| |
| for (R_xlen_t i = 0; i < n_columns; i++) { |
| SEXP x_i = VECTOR_ELT(x, i); |
| if (arrow::r::vec_size(x_i) < size) { |
| return Status::RError("Degenerated data frame"); |
| } |
| } |
| |
| RETURN_NOT_OK(this->Reserve(size - offset)); |
| |
| for (R_xlen_t i = 0; i < size; i++) { |
| RETURN_NOT_OK(struct_builder_->Append()); |
| } |
| |
| return Status::OK(); |
| } |
| }; |
| |
| template <> |
| struct RConverterTrait<DictionaryType> { |
| template <typename T> |
| using dictionary_type = RDictionaryConverter<T>; |
| }; |
| |
| // ---- short circuit the Converter api entirely when we can do zero-copy |
| |
| // in some situations we can just use the memory of the R object in an RBuffer |
| // instead of going through ArrayBuilder, etc ... |
| bool can_reuse_memory(SEXP x, const std::shared_ptr<arrow::DataType>& type) { |
| // TODO: this probably should be disabled when x is an ALTREP object |
| // because MakeSimpleArray below will force materialization |
| switch (type->id()) { |
| case Type::INT32: |
| return TYPEOF(x) == INTSXP && !OBJECT(x); |
| case Type::DOUBLE: |
| return TYPEOF(x) == REALSXP && !OBJECT(x); |
| case Type::INT8: |
| return TYPEOF(x) == RAWSXP && !OBJECT(x); |
| case Type::INT64: |
| return TYPEOF(x) == REALSXP && Rf_inherits(x, "integer64"); |
| default: |
| break; |
| } |
| return false; |
| } |
| |
| // this is only used on some special cases when the arrow Array can just use the memory of |
| // the R object, via an RBuffer, hence be zero copy |
| template <int RTYPE, typename RVector, typename Type> |
| std::shared_ptr<Array> MakeSimpleArray(SEXP x) { |
| using value_type = typename arrow::TypeTraits<Type>::ArrayType::value_type; |
| RVector vec(x); |
| auto n = vec.size(); |
| auto p_vec_start = reinterpret_cast<const value_type*>(DATAPTR_RO(vec)); |
| auto p_vec_end = p_vec_start + n; |
| std::vector<std::shared_ptr<Buffer>> buffers{nullptr, |
| std::make_shared<RBuffer<RVector>>(vec)}; |
| |
| int null_count = 0; |
| |
| auto first_na = std::find_if(p_vec_start, p_vec_end, is_NA<value_type>); |
| if (first_na < p_vec_end) { |
| auto null_bitmap = |
| ValueOrStop(AllocateBuffer(bit_util::BytesForBits(n), gc_memory_pool())); |
| internal::FirstTimeBitmapWriter bitmap_writer(null_bitmap->mutable_data(), 0, n); |
| |
| // first loop to clear all the bits before the first NA |
| auto j = std::distance(p_vec_start, first_na); |
| int i = 0; |
| for (; i < j; i++, bitmap_writer.Next()) { |
| bitmap_writer.Set(); |
| } |
| |
| auto p_vec = first_na; |
| // then finish |
| for (; i < n; i++, bitmap_writer.Next(), ++p_vec) { |
| if (is_NA<value_type>(*p_vec)) { |
| bitmap_writer.Clear(); |
| null_count++; |
| } else { |
| bitmap_writer.Set(); |
| } |
| } |
| |
| bitmap_writer.Finish(); |
| buffers[0] = std::move(null_bitmap); |
| } |
| |
| auto data = ArrayData::Make(std::make_shared<Type>(), LENGTH(x), std::move(buffers), |
| null_count, 0 /*offset*/); |
| |
| // return the right Array class |
| return std::make_shared<typename TypeTraits<Type>::ArrayType>(data); |
| } |
| |
| std::shared_ptr<arrow::Array> vec_to_arrow__reuse_memory(SEXP x) { |
| auto type = TYPEOF(x); |
| |
| if (type == INTSXP) { |
| return MakeSimpleArray<INTSXP, cpp11::integers, Int32Type>(x); |
| } else if (type == REALSXP && Rf_inherits(x, "integer64")) { |
| return MakeSimpleArray<REALSXP, cpp11::doubles, Int64Type>(x); |
| } else if (type == REALSXP) { |
| return MakeSimpleArray<REALSXP, cpp11::doubles, DoubleType>(x); |
| } else if (type == RAWSXP) { |
| return MakeSimpleArray<RAWSXP, cpp11::raws, UInt8Type>(x); |
| } |
| |
| cpp11::stop("Unreachable: you might need to fix can_reuse_memory()"); |
| } |
| |
| std::shared_ptr<arrow::ChunkedArray> vec_to_arrow_ChunkedArray( |
| SEXP x, const std::shared_ptr<arrow::DataType>& type, bool type_inferred) { |
| // short circuit if `x` is already a chunked array |
| if (Rf_inherits(x, "ChunkedArray")) { |
| return cpp11::as_cpp<std::shared_ptr<arrow::ChunkedArray>>(x); |
| } |
| |
| // short circuit if `x` is an Array |
| if (Rf_inherits(x, "Array")) { |
| return std::make_shared<arrow::ChunkedArray>( |
| cpp11::as_cpp<std::shared_ptr<arrow::Array>>(x)); |
| } |
| |
| RConversionOptions options; |
| options.strict = !type_inferred; |
| options.type = type; |
| options.size = arrow::r::vec_size(x); |
| |
| // If we can handle this in C++ we do so; otherwise we use the |
| // AsArrowArrayConverter, which calls as_arrow_array(). |
| std::unique_ptr<RConverter> converter; |
| if (can_convert_native(x) && type->id() != Type::EXTENSION) { |
| // short circuit if `x` is an altrep vector that shells a chunked Array |
| auto maybe = altrep::vec_to_arrow_altrep_bypass(x); |
| if (maybe.get() && maybe->type()->Equals(type)) { |
| return maybe; |
| } |
| |
| // maybe short circuit when zero-copy is possible |
| if (can_reuse_memory(x, type)) { |
| return std::make_shared<arrow::ChunkedArray>(vec_to_arrow__reuse_memory(x)); |
| } |
| |
| // Otherwise go through the converter API. |
| converter = ValueOrStop(MakeConverter<RConverter, RConverterTrait>( |
| options.type, options, gc_memory_pool())); |
| } else { |
| converter = std::unique_ptr<RConverter>(new AsArrowArrayConverter()); |
| StopIfNotOk(converter->Construct(type, options, gc_memory_pool())); |
| } |
| |
| StopIfNotOk(converter->Extend(x, options.size)); |
| return ValueOrStop(converter->ToChunkedArray()); |
| } |
| |
| std::shared_ptr<arrow::Array> vec_to_arrow_Array( |
| SEXP x, const std::shared_ptr<arrow::DataType>& type, bool type_inferred) { |
| auto chunked_array = vec_to_arrow_ChunkedArray(x, type, type_inferred); |
| if (chunked_array->num_chunks() == 1) { |
| return chunked_array->chunk(0); |
| } |
| |
| return ValueOrStop(arrow::Concatenate(chunked_array->chunks())); |
| } |
| |
| // TODO: most of this is very similar to MakeSimpleArray, just adapted to |
| // leverage concurrency. Maybe some refactoring needed. |
| template <typename RVector, typename Type> |
| bool vector_from_r_memory_impl(SEXP x, const std::shared_ptr<DataType>& type, |
| std::vector<std::shared_ptr<arrow::ChunkedArray>>& columns, |
| int j, RTasks& tasks) { |
| RVector vec(x); |
| using value_type = typename arrow::TypeTraits<Type>::ArrayType::value_type; |
| auto buffer = std::make_shared<RBuffer<RVector>>(vec); |
| |
| tasks.Append(true, [buffer, x, &columns, j]() { |
| std::vector<std::shared_ptr<Buffer>> buffers{nullptr, buffer}; |
| |
| auto n = XLENGTH(x); |
| auto p_x_start = reinterpret_cast<const value_type*>(DATAPTR_RO(x)); |
| auto p_x_end = p_x_start + n; |
| |
| int null_count = 0; |
| auto first_na = std::find_if(p_x_start, p_x_end, is_NA<value_type>); |
| |
| if (first_na < p_x_end) { |
| auto null_bitmap = |
| ValueOrStop(AllocateBuffer(bit_util::BytesForBits(n), gc_memory_pool())); |
| internal::FirstTimeBitmapWriter bitmap_writer(null_bitmap->mutable_data(), 0, n); |
| |
| // first loop to clear all the bits before the first NA |
| auto k = std::distance(p_x_start, first_na); |
| int i = 0; |
| for (; i < k; i++, bitmap_writer.Next()) { |
| bitmap_writer.Set(); |
| } |
| |
| auto p_vec = first_na; |
| // then finish |
| for (; i < n; i++, bitmap_writer.Next(), ++p_vec) { |
| if (is_NA<value_type>(*p_vec)) { |
| bitmap_writer.Clear(); |
| null_count++; |
| } else { |
| bitmap_writer.Set(); |
| } |
| } |
| |
| bitmap_writer.Finish(); |
| buffers[0] = std::move(null_bitmap); |
| } |
| |
| auto data = ArrayData::Make(std::make_shared<Type>(), n, std::move(buffers), |
| null_count, 0 /*offset*/); |
| auto array = std::make_shared<typename TypeTraits<Type>::ArrayType>(data); |
| columns[j] = std::make_shared<arrow::ChunkedArray>(array); |
| |
| return Status::OK(); |
| }); |
| |
| return true; |
| } |
| |
| bool vector_from_r_memory(SEXP x, const std::shared_ptr<DataType>& type, |
| std::vector<std::shared_ptr<arrow::ChunkedArray>>& columns, |
| int j, RTasks& tasks) { |
| if (ALTREP(x)) return false; |
| |
| switch (type->id()) { |
| case Type::INT32: |
| return TYPEOF(x) == INTSXP && !OBJECT(x) && |
| vector_from_r_memory_impl<cpp11::integers, Int32Type>(x, type, columns, j, |
| tasks); |
| |
| case Type::DOUBLE: |
| return TYPEOF(x) == REALSXP && !OBJECT(x) && |
| vector_from_r_memory_impl<cpp11::doubles, DoubleType>(x, type, columns, j, |
| tasks); |
| |
| case Type::UINT8: |
| return TYPEOF(x) == RAWSXP && !OBJECT(x) && |
| vector_from_r_memory_impl<cpp11::raws, UInt8Type>(x, type, columns, j, |
| tasks); |
| |
| case Type::INT64: |
| return TYPEOF(x) == REALSXP && Rf_inherits(x, "integer64") && |
| vector_from_r_memory_impl<cpp11::doubles, Int64Type>(x, type, columns, j, |
| tasks); |
| default: |
| break; |
| } |
| |
| return false; |
| } |
| |
| } // namespace r |
| } // namespace arrow |
| |
| arrow::Status check_consistent_column_length( |
| const std::vector<std::shared_ptr<arrow::ChunkedArray>>& columns) { |
| if (columns.size()) { |
| int64_t num_rows = columns[0]->length(); |
| |
| for (const auto& column : columns) { |
| if (column->length() != num_rows) { |
| return arrow::Status::Invalid("All columns must have the same length"); |
| } |
| } |
| } |
| |
| return arrow::Status::OK(); |
| } |
| |
| // [[arrow::export]] |
| std::shared_ptr<arrow::Table> Table__from_dots(SEXP lst, SEXP schema_sxp, |
| bool use_threads) { |
| bool infer_schema = !Rf_inherits(schema_sxp, "Schema"); |
| |
| int num_fields; |
| StopIfNotOk(arrow::r::count_fields(lst, &num_fields)); |
| |
| // schema + metadata |
| std::shared_ptr<arrow::Schema> schema; |
| StopIfNotOk(arrow::r::InferSchemaFromDots(lst, schema_sxp, num_fields, schema)); |
| StopIfNotOk(arrow::r::AddMetadataFromDots(lst, num_fields, schema)); |
| |
| if (!infer_schema && schema->num_fields() != num_fields) { |
| cpp11::stop("incompatible. schema has %d fields, and %d columns are supplied", |
| schema->num_fields(), num_fields); |
| } |
| |
| // table |
| std::vector<std::shared_ptr<arrow::ChunkedArray>> columns(num_fields); |
| |
| if (!infer_schema) { |
| auto check_name = [&](int j, SEXP, cpp11::r_string name) { |
| std::string cpp_name(name); |
| if (schema->field(j)->name() != cpp_name) { |
| cpp11::stop("field at index %d has name '%s' != '%s'", j + 1, |
| schema->field(j)->name().c_str(), cpp_name.c_str()); |
| } |
| }; |
| arrow::r::TraverseDots(lst, num_fields, check_name); |
| } |
| |
| // must be careful to avoid R stop() until the tasks |
| // are finished, i.e. after tasks.Finish() |
| arrow::r::RTasks tasks(use_threads); |
| |
| arrow::Status status = arrow::Status::OK(); |
| |
| auto flatten_lst = arrow::r::FlattenDots(lst, num_fields); |
| std::vector<std::unique_ptr<arrow::r::RConverter>> converters(num_fields); |
| |
| // init converters |
| for (int j = 0; j < num_fields && status.ok(); j++) { |
| SEXP x = flatten_lst[j]; |
| |
| if (Rf_inherits(x, "ChunkedArray")) { |
| columns[j] = cpp11::as_cpp<std::shared_ptr<arrow::ChunkedArray>>(x); |
| } else if (Rf_inherits(x, "Array")) { |
| columns[j] = std::make_shared<arrow::ChunkedArray>( |
| cpp11::as_cpp<std::shared_ptr<arrow::Array>>(x)); |
| } else if (arrow::r::altrep::is_unmaterialized_arrow_altrep(x)) { |
| columns[j] = arrow::r::altrep::vec_to_arrow_altrep_bypass(x); |
| } else { |
| arrow::r::RConversionOptions options; |
| options.strict = !infer_schema; |
| options.type = schema->field(j)->type(); |
| options.size = arrow::r::vec_size(x); |
| |
| // If we can handle this in C++ we do so; otherwise we use the |
| // AsArrowArrayConverter, which calls as_arrow_array(). |
| std::unique_ptr<arrow::r::RConverter> converter; |
| if (arrow::r::can_convert_native(x) && |
| options.type->id() != arrow::Type::EXTENSION) { |
| // first try to add a task to do a zero copy in parallel |
| if (arrow::r::vector_from_r_memory(x, options.type, columns, j, tasks)) { |
| continue; |
| } |
| |
| // otherwise go through the Converter API |
| auto converter_result = |
| arrow::MakeConverter<arrow::r::RConverter, arrow::r::RConverterTrait>( |
| options.type, options, gc_memory_pool()); |
| if (converter_result.ok()) { |
| converter = std::move(converter_result.ValueUnsafe()); |
| } else { |
| status = converter_result.status(); |
| break; |
| } |
| } else { |
| converter = |
| std::unique_ptr<arrow::r::RConverter>(new arrow::r::AsArrowArrayConverter()); |
| status = converter->Construct(options.type, options, gc_memory_pool()); |
| if (!status.ok()) { |
| break; |
| } |
| } |
| |
| converters[j] = std::move(converter); |
| } |
| } |
| |
| // if the previous loop didn't break early, spawn |
| // tasks to Extend, maybe in parallel |
| if (status.ok()) { |
| for (int j = 0; j < num_fields; j++) { |
| auto& converter = converters[j]; |
| if (converter != nullptr) { |
| converter->DelayedExtend(flatten_lst[j], converter->options().size, tasks); |
| } |
| } |
| } |
| |
| // in any case, this needs to wait until all tasks are finished |
| status &= tasks.Finish(); |
| |
| // nothing is running in parallel here, so we have an opportunity to stop |
| StopIfNotOk(status); |
| |
| // then finally convert to chunked arrays in parallel |
| tasks.Reset(); |
| |
| for (int j = 0; j < num_fields; j++) { |
| tasks.Append(true, [&columns, j, &converters]() { |
| auto& converter = converters[j]; |
| if (converter != nullptr) { |
| ARROW_ASSIGN_OR_RAISE(columns[j], converter->ToChunkedArray()); |
| } |
| return arrow::Status::OK(); |
| }); |
| } |
| status &= tasks.Finish(); |
| StopIfNotOk(status); |
| |
| status &= check_consistent_column_length(columns); |
| StopIfNotOk(status); |
| |
| return arrow::Table::Make(schema, columns); |
| } |
| |
| // [[arrow::export]] |
| SEXP vec_to_Array(SEXP x, SEXP s_type) { |
| if (Rf_inherits(x, "Array")) return x; |
| |
| bool type_inferred = Rf_isNull(s_type); |
| std::shared_ptr<arrow::DataType> type; |
| |
| if (type_inferred) { |
| type = arrow::r::InferArrowType(x); |
| } else { |
| type = cpp11::as_cpp<std::shared_ptr<arrow::DataType>>(s_type); |
| } |
| |
| return cpp11::to_r6(arrow::r::vec_to_arrow_Array(x, type, type_inferred)); |
| } |
| |
| // [[arrow::export]] |
| std::shared_ptr<arrow::Array> DictionaryArray__FromArrays( |
| const std::shared_ptr<arrow::DataType>& type, |
| const std::shared_ptr<arrow::Array>& indices, |
| const std::shared_ptr<arrow::Array>& dict) { |
| return ValueOrStop(arrow::DictionaryArray::FromArrays(type, indices, dict)); |
| } |