| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "./arrow_types.h" |
| #if defined(ARROW_R_WITH_ARROW) |
| |
| #include <arrow/array.h> |
| #include <arrow/builder.h> |
| #include <arrow/datum.h> |
| #include <arrow/table.h> |
| #include <arrow/util/bitmap_reader.h> |
| #include <arrow/util/bitmap_writer.h> |
| #include <arrow/util/int_util.h> |
| #include <arrow/util/parallel.h> |
| #include <arrow/util/task_group.h> |
| |
| namespace arrow { |
| |
| using internal::checked_cast; |
| using internal::IntegersCanFit; |
| |
| namespace r { |
| |
| using Rcpp::default_value; |
| using Rcpp::IntegerVector; |
| using Rcpp::LogicalVector_; |
| using Rcpp::no_init; |
| using Rcpp::Shield; |
| using Rcpp::StringVector_; |
| |
| class Converter { |
| public: |
| explicit Converter(ArrayVector arrays) : arrays_(std::move(arrays)) {} |
| |
| virtual ~Converter() {} |
| |
| // Allocate a vector of the right R type for this converter |
| virtual SEXP Allocate(R_xlen_t n) const = 0; |
| |
| // data[ start:(start + n) ] = NA |
| virtual Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const = 0; |
| |
| // ingest the values from the array into data[ start : (start + n)] |
| // |
| // chunk_index indicates which of the chunk is being ingested into data. This is |
| // ignored by most implementations and currently only used with Dictionary |
| // arrays. |
| virtual Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, |
| size_t chunk_index) const = 0; |
| |
| // ingest one array |
| Status IngestOne(SEXP data, const std::shared_ptr<arrow::Array>& array, R_xlen_t start, |
| R_xlen_t n, size_t chunk_index) const { |
| if (array->null_count() == n) { |
| return Ingest_all_nulls(data, start, n); |
| } else { |
| return Ingest_some_nulls(data, array, start, n, chunk_index); |
| } |
| } |
| |
| // can this run in parallel ? |
| virtual bool Parallel() const { return true; } |
| |
| // Ingest all the arrays serially |
| Status IngestSerial(SEXP data) { |
| R_xlen_t k = 0, i = 0; |
| for (const auto& array : arrays_) { |
| auto n_chunk = array->length(); |
| RETURN_NOT_OK(IngestOne(data, array, k, n_chunk, i)); |
| k += n_chunk; |
| i++; |
| } |
| return Status::OK(); |
| } |
| |
| // ingest the arrays in parallel |
| // |
| // for each array, add a task to the task group |
| // |
| // The task group is Finish() iun the caller |
| void IngestParallel(SEXP data, const std::shared_ptr<arrow::internal::TaskGroup>& tg) { |
| R_xlen_t k = 0, i = 0; |
| for (const auto& array : arrays_) { |
| auto n_chunk = array->length(); |
| tg->Append([=] { return IngestOne(data, array, k, n_chunk, i); }); |
| k += n_chunk; |
| i++; |
| } |
| } |
| |
| // Converter factory |
| static std::shared_ptr<Converter> Make(const std::shared_ptr<DataType>& type, |
| ArrayVector arrays); |
| |
| protected: |
| ArrayVector arrays_; |
| }; |
| |
| // data[start:(start+n)] = NA |
| template <int RTYPE> |
| Status AllNull_Ingest(SEXP data, R_xlen_t start, R_xlen_t n) { |
| auto p_data = Rcpp::internal::r_vector_start<RTYPE>(data) + start; |
| std::fill_n(p_data, n, Rcpp::default_value<RTYPE>()); |
| return Status::OK(); |
| } |
| |
| // ingest the data from `array` into a slice of `data` |
| // |
| // each element goes through `lambda` when some conversion is needed |
| template <int RTYPE, typename array_value_type, typename Lambda> |
| Status SomeNull_Ingest(SEXP data, R_xlen_t start, R_xlen_t n, |
| const array_value_type* p_values, |
| const std::shared_ptr<arrow::Array>& array, Lambda lambda) { |
| if (!p_values) { |
| return Status::Invalid("Invalid data buffer"); |
| } |
| auto p_data = Rcpp::internal::r_vector_start<RTYPE>(data) + start; |
| |
| if (array->null_count()) { |
| arrow::internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), |
| array->offset(), n); |
| for (R_xlen_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_data, ++p_values) { |
| *p_data = bitmap_reader.IsSet() ? lambda(*p_values) : default_value<RTYPE>(); |
| } |
| } else { |
| std::transform(p_values, p_values + n, p_data, lambda); |
| } |
| |
| return Status::OK(); |
| } |
| |
| template <typename Lambda> |
| Status IngestSome(const std::shared_ptr<arrow::Array>& array, R_xlen_t n, Lambda lambda) { |
| if (array->null_count()) { |
| internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), array->offset(), |
| n); |
| |
| for (R_xlen_t i = 0; i < n; i++, bitmap_reader.Next()) { |
| if (bitmap_reader.IsSet()) RETURN_NOT_OK(lambda(i)); |
| } |
| |
| } else { |
| for (R_xlen_t i = 0; i < n; i++) { |
| RETURN_NOT_OK(lambda(i)); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| // Allocate + Ingest |
| SEXP ArrayVector__as_vector(R_xlen_t n, const std::shared_ptr<DataType>& type, |
| const ArrayVector& arrays) { |
| auto converter = Converter::Make(type, arrays); |
| Shield<SEXP> data(converter->Allocate(n)); |
| StopIfNotOk(converter->IngestSerial(data)); |
| return data; |
| } |
| |
| template <int RTYPE> |
| class Converter_SimpleArray : public Converter { |
| using Vector = Rcpp::Vector<RTYPE, Rcpp::NoProtectStorage>; |
| using value_type = typename Vector::stored_type; |
| |
| public: |
| explicit Converter_SimpleArray(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { return Vector(no_init(n)); } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| return AllNull_Ingest<RTYPE>(data, start, n); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_values = array->data()->GetValues<value_type>(1); |
| auto echo = [](value_type value) { return value; }; |
| return SomeNull_Ingest<RTYPE, value_type>(data, start, n, p_values, array, echo); |
| } |
| }; |
| |
| class Converter_Date32 : public Converter_SimpleArray<REALSXP> { |
| public: |
| explicit Converter_Date32(const ArrayVector& arrays) |
| : Converter_SimpleArray<REALSXP>(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| Rcpp::NumericVector data(no_init(n)); |
| data.attr("class") = "Date"; |
| return data; |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto convert = [](int days) { return static_cast<double>(days); }; |
| return SomeNull_Ingest<REALSXP, int>(data, start, n, array->data()->GetValues<int>(1), |
| array, convert); |
| } |
| }; |
| |
| template <typename StringArrayType> |
| struct Converter_String : public Converter { |
| public: |
| explicit Converter_String(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { return StringVector_(no_init(n)); } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| return AllNull_Ingest<STRSXP>(data, start, n); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_offset = array->data()->GetValues<int32_t>(1); |
| if (!p_offset) { |
| return Status::Invalid("Invalid offset buffer"); |
| } |
| auto p_strings = array->data()->GetValues<char>(2, *p_offset); |
| if (!p_strings) { |
| // There is an offset buffer, but the data buffer is null |
| // There is at least one value in the array and not all the values are null |
| // That means all values are either empty strings or nulls so there is nothing to do |
| |
| if (array->null_count()) { |
| arrow::internal::BitmapReader null_reader(array->null_bitmap_data(), |
| array->offset(), n); |
| for (int i = 0; i < n; i++, null_reader.Next()) { |
| if (null_reader.IsNotSet()) { |
| SET_STRING_ELT(data, start + i, NA_STRING); |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
| StringArrayType* string_array = static_cast<StringArrayType*>(array.get()); |
| if (array->null_count()) { |
| // need to watch for nulls |
| arrow::internal::BitmapReader null_reader(array->null_bitmap_data(), |
| array->offset(), n); |
| for (int i = 0; i < n; i++, null_reader.Next()) { |
| if (null_reader.IsSet()) { |
| SET_STRING_ELT(data, start + i, r_string(string_array->GetString(i))); |
| } else { |
| SET_STRING_ELT(data, start + i, NA_STRING); |
| } |
| } |
| |
| } else { |
| for (int i = 0; i < n; i++) { |
| SET_STRING_ELT(data, start + i, r_string(string_array->GetString(i))); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| bool Parallel() const { return false; } |
| |
| inline SEXP r_string(const arrow::util::string_view& view) const { |
| return Rf_mkCharLenCE(view.data(), view.size(), CE_UTF8); |
| } |
| }; |
| |
| class Converter_Boolean : public Converter { |
| public: |
| explicit Converter_Boolean(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { return LogicalVector_(no_init(n)); } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| return AllNull_Ingest<LGLSXP>(data, start, n); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_data = Rcpp::internal::r_vector_start<LGLSXP>(data) + start; |
| auto p_bools = array->data()->GetValues<uint8_t>(1, 0); |
| if (!p_bools) { |
| return Status::Invalid("Invalid data buffer"); |
| } |
| |
| arrow::internal::BitmapReader data_reader(p_bools, array->offset(), n); |
| if (array->null_count()) { |
| arrow::internal::BitmapReader null_reader(array->null_bitmap()->data(), |
| array->offset(), n); |
| |
| for (R_xlen_t i = 0; i < n; i++, data_reader.Next(), null_reader.Next(), ++p_data) { |
| *p_data = null_reader.IsSet() ? data_reader.IsSet() : NA_LOGICAL; |
| } |
| } else { |
| for (R_xlen_t i = 0; i < n; i++, data_reader.Next(), ++p_data) { |
| *p_data = data_reader.IsSet(); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| }; |
| |
| template <typename ArrayType> |
| class Converter_Binary : public Converter { |
| public: |
| using offset_type = typename ArrayType::offset_type; |
| explicit Converter_Binary(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| SEXP res = PROTECT(Rf_allocVector(VECSXP, n)); |
| if (std::is_same<ArrayType, BinaryArray>::value) { |
| Rf_classgets(res, data::classes_arrow_binary); |
| } else { |
| Rf_classgets(res, data::classes_arrow_large_binary); |
| } |
| UNPROTECT(1); |
| return res; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| const ArrayType* binary_array = checked_cast<const ArrayType*>(array.get()); |
| |
| auto ingest_one = [&](R_xlen_t i) { |
| offset_type ni; |
| auto value = binary_array->GetValue(i, &ni); |
| if (ni > R_XLEN_T_MAX) { |
| return Status::RError("Array too big to be represented as a raw vector"); |
| } |
| SEXP raw = PROTECT(Rf_allocVector(RAWSXP, ni)); |
| std::copy(value, value + ni, RAW(raw)); |
| |
| SET_VECTOR_ELT(data, i, raw); |
| UNPROTECT(1); |
| |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one); |
| } |
| }; |
| |
| class Converter_FixedSizeBinary : public Converter { |
| public: |
| explicit Converter_FixedSizeBinary(const ArrayVector& arrays, int byte_width) |
| : Converter(arrays), byte_width_(byte_width) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| SEXP res = PROTECT(Rf_allocVector(VECSXP, n)); |
| Rf_classgets(res, data::classes_arrow_fixed_size_binary); |
| Rf_setAttrib(res, symbols::byte_width, Rf_ScalarInteger(byte_width_)); |
| UNPROTECT(1); |
| return res; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| const FixedSizeBinaryArray* binary_array = |
| checked_cast<const FixedSizeBinaryArray*>(array.get()); |
| |
| int byte_width = binary_array->byte_width(); |
| auto ingest_one = [&, byte_width](R_xlen_t i) { |
| auto value = binary_array->GetValue(i); |
| SEXP raw = PROTECT(Rf_allocVector(RAWSXP, byte_width)); |
| std::copy(value, value + byte_width, RAW(raw)); |
| |
| SET_VECTOR_ELT(data, i, raw); |
| UNPROTECT(1); |
| |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one); |
| } |
| |
| private: |
| int byte_width_; |
| }; |
| |
| class Converter_Dictionary : public Converter { |
| private: |
| bool need_unification_; |
| std::unique_ptr<arrow::DictionaryUnifier> unifier_; |
| std::vector<std::shared_ptr<Buffer>> arrays_transpose_; |
| std::shared_ptr<DataType> out_type_; |
| std::shared_ptr<Array> dictionary_; |
| |
| public: |
| explicit Converter_Dictionary(const ArrayVector& arrays) |
| : Converter(arrays), need_unification_(NeedUnification()) { |
| if (need_unification_) { |
| const auto& arr_first = checked_cast<const DictionaryArray&>(*arrays[0]); |
| const auto& arr_type = checked_cast<const DictionaryType&>(*arr_first.type()); |
| unifier_ = ValueOrStop(DictionaryUnifier::Make(arr_type.value_type())); |
| |
| size_t n_arrays = arrays.size(); |
| arrays_transpose_.resize(n_arrays); |
| |
| for (size_t i = 0; i < n_arrays; i++) { |
| const auto& dict_i = |
| *checked_cast<const DictionaryArray&>(*arrays[i]).dictionary(); |
| StopIfNotOk(unifier_->Unify(dict_i, &arrays_transpose_[i])); |
| } |
| |
| StopIfNotOk(unifier_->GetResult(&out_type_, &dictionary_)); |
| } else { |
| const auto& dict_array = checked_cast<const DictionaryArray&>(*arrays_[0]); |
| |
| auto indices = dict_array.indices(); |
| switch (indices->type_id()) { |
| case Type::UINT8: |
| case Type::INT8: |
| case Type::UINT16: |
| case Type::INT16: |
| case Type::INT32: |
| // TODO: also add int64, uint32, uint64 downcasts, if possible |
| break; |
| default: |
| Rcpp::stop("Cannot convert Dictionary Array of type `%s` to R", |
| dict_array.type()->ToString()); |
| } |
| |
| dictionary_ = dict_array.dictionary(); |
| } |
| } |
| |
| SEXP Allocate(R_xlen_t n) const { |
| IntegerVector data(no_init(n)); |
| data.attr("levels") = GetLevels(); |
| if (GetOrdered()) { |
| Rf_classgets(data, arrow::r::data::classes_ordered); |
| } else { |
| Rf_classgets(data, arrow::r::data::classes_factor); |
| } |
| return data; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| return AllNull_Ingest<INTSXP>(data, start, n); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| const DictionaryArray& dict_array = |
| checked_cast<const DictionaryArray&>(*array.get()); |
| auto indices = dict_array.indices(); |
| switch (indices->type_id()) { |
| case Type::UINT8: |
| return Ingest_some_nulls_Impl<arrow::UInt8Type>(data, array, start, n, |
| chunk_index); |
| case Type::INT8: |
| return Ingest_some_nulls_Impl<arrow::Int8Type>(data, array, start, n, |
| chunk_index); |
| case Type::UINT16: |
| return Ingest_some_nulls_Impl<arrow::UInt16Type>(data, array, start, n, |
| chunk_index); |
| case Type::INT16: |
| return Ingest_some_nulls_Impl<arrow::Int16Type>(data, array, start, n, |
| chunk_index); |
| case Type::INT32: |
| return Ingest_some_nulls_Impl<arrow::Int32Type>(data, array, start, n, |
| chunk_index); |
| default: |
| break; |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| template <typename Type> |
| Status Ingest_some_nulls_Impl(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| using index_type = typename arrow::TypeTraits<Type>::ArrayType::value_type; |
| auto indices = checked_cast<const DictionaryArray&>(*array).indices(); |
| auto raw_indices = indices->data()->GetValues<index_type>(1); |
| |
| // convert the 0-based indices from the arrow Array |
| // to 1-based indices used in R factors |
| if (need_unification_) { |
| // transpose the indices before converting |
| auto transposed = |
| reinterpret_cast<const int32_t*>(arrays_transpose_[chunk_index]->data()); |
| auto transpose_convert = [=](index_type i) { return transposed[i] + 1; }; |
| |
| return SomeNull_Ingest<INTSXP>(data, start, n, raw_indices, indices, |
| transpose_convert); |
| } else { |
| auto convert = [](index_type i) { return static_cast<int>(i) + 1; }; |
| |
| return SomeNull_Ingest<INTSXP>(data, start, n, raw_indices, indices, convert); |
| } |
| } |
| |
| bool NeedUnification() { |
| int n = arrays_.size(); |
| if (n < 2) { |
| return false; |
| } |
| const auto& arr_first = checked_cast<const DictionaryArray&>(*arrays_[0]); |
| for (int i = 1; i < n; i++) { |
| const auto& arr = checked_cast<const DictionaryArray&>(*arrays_[i]); |
| if (!(arr_first.dictionary()->Equals(arr.dictionary()))) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| bool GetOrdered() const { |
| return checked_cast<const DictionaryArray&>(*arrays_[0]).dict_type()->ordered(); |
| } |
| |
| SEXP GetLevels() const { |
| // R factor levels must be type "character" so coerce `dict` to STRSXP |
| // TODO (npr): this coercion should be optional, "dictionariesAsFactors" ;) |
| // Alternative: preserve the logical type of the dictionary values |
| // (e.g. if dict is timestamp, return a POSIXt R vector, not factor) |
| if (dictionary_->type_id() != Type::STRING) { |
| Rcpp::warning( |
| "Coercing dictionary values from type %s to R character factor levels", |
| dictionary_->type()->ToString()); |
| } |
| SEXP vec = PROTECT(ArrayVector__as_vector(dictionary_->length(), dictionary_->type(), |
| {dictionary_})); |
| SEXP strings_vec = PROTECT(Rf_coerceVector(vec, STRSXP)); |
| UNPROTECT(2); |
| return strings_vec; |
| } |
| }; |
| |
| class Converter_Struct : public Converter { |
| public: |
| explicit Converter_Struct(const ArrayVector& arrays) : Converter(arrays), converters() { |
| auto first_array = checked_cast<const arrow::StructArray*>(this->arrays_[0].get()); |
| int nf = first_array->num_fields(); |
| for (int i = 0; i < nf; i++) { |
| converters.push_back( |
| Converter::Make(first_array->field(i)->type(), {first_array->field(i)})); |
| } |
| } |
| |
| SEXP Allocate(R_xlen_t n) const { |
| // allocate a data frame column to host each array |
| auto first_array = checked_cast<const arrow::StructArray*>(this->arrays_[0].get()); |
| auto type = first_array->struct_type(); |
| int nf = first_array->num_fields(); |
| Rcpp::List out(nf); |
| Rcpp::CharacterVector colnames(nf); |
| for (int i = 0; i < nf; i++) { |
| out[i] = converters[i]->Allocate(n); |
| colnames[i] = Rcpp::String(type->field(i)->name(), CE_UTF8); |
| } |
| |
| IntegerVector rn(2); |
| rn[0] = NA_INTEGER; |
| rn[1] = -n; |
| Rf_setAttrib(out, symbols::row_names, rn); |
| Rf_setAttrib(out, R_NamesSymbol, colnames); |
| Rf_setAttrib(out, R_ClassSymbol, |
| Rcpp::CharacterVector::create("tbl_df", "tbl", "data.frame")); |
| return out; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| int nf = converters.size(); |
| for (int i = 0; i < nf; i++) { |
| StopIfNotOk(converters[i]->Ingest_all_nulls(VECTOR_ELT(data, i), start, n)); |
| } |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto struct_array = checked_cast<const arrow::StructArray*>(array.get()); |
| int nf = converters.size(); |
| // Flatten() deals with merging of nulls |
| auto arrays = ValueOrStop(struct_array->Flatten(default_memory_pool())); |
| for (int i = 0; i < nf; i++) { |
| StopIfNotOk(converters[i]->Ingest_some_nulls(VECTOR_ELT(data, i), arrays[i], start, |
| n, chunk_index)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| private: |
| std::vector<std::shared_ptr<Converter>> converters; |
| }; |
| |
// Convert a number of milliseconds to (possibly fractional) seconds.
double ms_to_seconds(int64_t ms) {
  const double milliseconds = static_cast<double>(ms);
  return milliseconds / 1000;
}
| |
| class Converter_Date64 : public Converter { |
| public: |
| explicit Converter_Date64(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| Rcpp::NumericVector data(no_init(n)); |
| Rf_classgets(data, arrow::r::data::classes_POSIXct); |
| return data; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| return AllNull_Ingest<REALSXP>(data, start, n); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto convert = [](int64_t ms) { return static_cast<double>(ms / 1000); }; |
| return SomeNull_Ingest<REALSXP, int64_t>( |
| data, start, n, array->data()->GetValues<int64_t>(1), array, convert); |
| } |
| }; |
| |
| template <int RTYPE, typename Type> |
| class Converter_Promotion : public Converter { |
| using r_stored_type = typename Rcpp::Vector<RTYPE>::stored_type; |
| using value_type = typename TypeTraits<Type>::ArrayType::value_type; |
| |
| public: |
| explicit Converter_Promotion(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| return Rcpp::Vector<RTYPE, Rcpp::NoProtectStorage>(no_init(n)); |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| return AllNull_Ingest<RTYPE>(data, start, n); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto convert = [](value_type value) { return static_cast<r_stored_type>(value); }; |
| return SomeNull_Ingest<RTYPE, value_type>( |
| data, start, n, array->data()->GetValues<value_type>(1), array, convert); |
| } |
| |
| private: |
| static r_stored_type value_convert(value_type value) { |
| return static_cast<r_stored_type>(value); |
| } |
| }; |
| |
| template <typename value_type, typename unit_type = TimeType> |
| class Converter_Time : public Converter { |
| public: |
| explicit Converter_Time(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| Rcpp::NumericVector data(no_init(n)); |
| data.attr("class") = Rcpp::CharacterVector::create("hms", "difftime"); |
| // hms difftime is always stored as "seconds" |
| data.attr("units") = Rcpp::CharacterVector::create("secs"); |
| return data; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| return AllNull_Ingest<REALSXP>(data, start, n); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| int multiplier = TimeUnit_multiplier(array); |
| auto convert = [=](value_type value) { |
| return static_cast<double>(value) / multiplier; |
| }; |
| return SomeNull_Ingest<REALSXP, value_type>( |
| data, start, n, array->data()->GetValues<value_type>(1), array, convert); |
| } |
| |
| private: |
| int TimeUnit_multiplier(const std::shared_ptr<Array>& array) const { |
| // hms difftime is always "seconds", so multiply based on the Array's TimeUnit |
| switch (static_cast<unit_type*>(array->type().get())->unit()) { |
| case TimeUnit::SECOND: |
| return 1; |
| case TimeUnit::MILLI: |
| return 1000; |
| case TimeUnit::MICRO: |
| return 1000000; |
| case TimeUnit::NANO: |
| return 1000000000; |
| default: |
| return 0; |
| } |
| } |
| }; |
| |
| template <typename value_type> |
| class Converter_Timestamp : public Converter_Time<value_type, TimestampType> { |
| public: |
| explicit Converter_Timestamp(const ArrayVector& arrays) |
| : Converter_Time<value_type, TimestampType>(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| Rcpp::NumericVector data(no_init(n)); |
| Rf_classgets(data, arrow::r::data::classes_POSIXct); |
| auto array = checked_cast<const TimestampArray*>(this->arrays_[0].get()); |
| auto array_type = checked_cast<const TimestampType*>(array->type().get()); |
| std::string tzone = array_type->timezone(); |
| if (tzone.size() > 0) { |
| data.attr("tzone") = tzone; |
| } |
| return data; |
| } |
| }; |
| |
| class Converter_Decimal : public Converter { |
| public: |
| explicit Converter_Decimal(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { return Rcpp::NumericVector_(no_init(n)); } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| return AllNull_Ingest<REALSXP>(data, start, n); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_data = Rcpp::internal::r_vector_start<REALSXP>(data) + start; |
| const auto& decimals_arr = checked_cast<const arrow::Decimal128Array&>(*array); |
| |
| if (array->null_count()) { |
| internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), array->offset(), |
| n); |
| |
| for (R_xlen_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_data) { |
| *p_data = bitmap_reader.IsSet() ? std::stod(decimals_arr.FormatValue(i).c_str()) |
| : NA_REAL; |
| } |
| } else { |
| for (R_xlen_t i = 0; i < n; i++, ++p_data) { |
| *p_data = std::stod(decimals_arr.FormatValue(i).c_str()); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| }; |
| |
| template <typename ListArrayType> |
| class Converter_List : public Converter { |
| private: |
| std::shared_ptr<arrow::DataType> value_type_; |
| |
| public: |
| explicit Converter_List(const ArrayVector& arrays, |
| const std::shared_ptr<arrow::DataType>& value_type) |
| : Converter(arrays), value_type_(value_type) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| Rcpp::List res(no_init(n)); |
| if (std::is_same<ListArrayType, ListArray>::value) { |
| Rf_setAttrib(res, R_ClassSymbol, arrow::r::data::classes_arrow_list); |
| } else { |
| Rf_setAttrib(res, R_ClassSymbol, arrow::r::data::classes_arrow_large_list); |
| } |
| |
| // Build an empty array to match value_type |
| std::unique_ptr<arrow::ArrayBuilder> builder; |
| StopIfNotOk(arrow::MakeBuilder(arrow::default_memory_pool(), value_type_, &builder)); |
| |
| std::shared_ptr<arrow::Array> array; |
| StopIfNotOk(builder->Finish(&array)); |
| |
| // convert to an R object to store as the list' ptype |
| SEXP ptype = Array__as_vector(array); |
| Rf_setAttrib(res, arrow::r::symbols::ptype, ptype); |
| |
| return res; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| // nothing to do, list contain NULL by default |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto list_array = checked_cast<const ListArrayType*>(array.get()); |
| auto values_array = list_array->values(); |
| |
| auto ingest_one = [&](R_xlen_t i) { |
| auto slice = list_array->value_slice(i); |
| SET_VECTOR_ELT(data, i + start, Array__as_vector(slice)); |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one); |
| } |
| |
| bool Parallel() const { return false; } |
| }; |
| |
| class Converter_FixedSizeList : public Converter { |
| private: |
| std::shared_ptr<arrow::DataType> value_type_; |
| int list_size_; |
| |
| public: |
| explicit Converter_FixedSizeList(const ArrayVector& arrays, |
| const std::shared_ptr<arrow::DataType>& value_type, |
| int list_size) |
| : Converter(arrays), value_type_(value_type), list_size_(list_size) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| Rcpp::List res(no_init(n)); |
| Rf_classgets(res, arrow::r::data::classes_arrow_fixed_size_list); |
| Rf_setAttrib(res, arrow::r::symbols::list_size, Rf_ScalarInteger(list_size_)); |
| |
| // Build an empty array to match value_type |
| std::unique_ptr<arrow::ArrayBuilder> builder; |
| StopIfNotOk(arrow::MakeBuilder(arrow::default_memory_pool(), value_type_, &builder)); |
| |
| std::shared_ptr<arrow::Array> array; |
| StopIfNotOk(builder->Finish(&array)); |
| |
| // convert to an R object to store as the list' ptype |
| SEXP ptype = Array__as_vector(array); |
| Rf_setAttrib(res, arrow::r::symbols::ptype, ptype); |
| |
| return res; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| // nothing to do, list contain NULL by default |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| const auto& fixed_size_list_array = checked_cast<const FixedSizeListArray&>(*array); |
| auto values_array = fixed_size_list_array.values(); |
| |
| auto ingest_one = [&](R_xlen_t i) { |
| auto slice = fixed_size_list_array.value_slice(i); |
| SET_VECTOR_ELT(data, i + start, Array__as_vector(slice)); |
| return Status::OK(); |
| }; |
| return IngestSome(array, n, ingest_one); |
| } |
| |
| bool Parallel() const { return false; } |
| }; |
| |
| class Converter_Int64 : public Converter { |
| public: |
| explicit Converter_Int64(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| Rcpp::NumericVector data(no_init(n)); |
| data.attr("class") = "integer64"; |
| return data; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| auto p_data = reinterpret_cast<int64_t*>(REAL(data)) + start; |
| std::fill_n(p_data, n, NA_INT64); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_values = array->data()->GetValues<int64_t>(1); |
| if (!p_values) { |
| return Status::Invalid("Invalid data buffer"); |
| } |
| |
| auto p_data = reinterpret_cast<int64_t*>(REAL(data)) + start; |
| |
| if (array->null_count()) { |
| internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), array->offset(), |
| n); |
| for (R_xlen_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_data) { |
| *p_data = bitmap_reader.IsSet() ? p_values[i] : NA_INT64; |
| } |
| } else { |
| std::copy_n(p_values, n, p_data); |
| } |
| |
| return Status::OK(); |
| } |
| }; |
| |
| class Converter_Null : public Converter { |
| public: |
| explicit Converter_Null(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| Rcpp::LogicalVector data(n, NA_LOGICAL); |
| data.attr("class") = "vctrs_unspecified"; |
| return data; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| return Status::OK(); |
| } |
| }; |
| |
| bool ArraysCanFitInteger(ArrayVector arrays) { |
| bool out = false; |
| auto i32 = arrow::int32(); |
| for (const auto& array : arrays) { |
| if (!out) { |
| out = arrow::IntegersCanFit(arrow::Datum(array), *i32).ok(); |
| } |
| } |
| return out; |
| } |
| |
| std::shared_ptr<Converter> Converter::Make(const std::shared_ptr<DataType>& type, |
| ArrayVector arrays) { |
| if (arrays.empty()) { |
| // slight hack for the 0-row case since the converters expect at least one |
| // chunk to process. |
| arrays.push_back(ValueOrStop(arrow::MakeArrayOfNull(type, 0))); |
| } |
| |
| switch (type->id()) { |
| // direct support |
| case Type::INT32: |
| return std::make_shared<arrow::r::Converter_SimpleArray<INTSXP>>(std::move(arrays)); |
| |
| case Type::DOUBLE: |
| return std::make_shared<arrow::r::Converter_SimpleArray<REALSXP>>( |
| std::move(arrays)); |
| |
| // need to handle 1-bit case |
| case Type::BOOL: |
| return std::make_shared<arrow::r::Converter_Boolean>(std::move(arrays)); |
| |
| case Type::BINARY: |
| return std::make_shared<arrow::r::Converter_Binary<arrow::BinaryArray>>( |
| std::move(arrays)); |
| |
| case Type::LARGE_BINARY: |
| return std::make_shared<arrow::r::Converter_Binary<arrow::LargeBinaryArray>>( |
| std::move(arrays)); |
| |
| case Type::FIXED_SIZE_BINARY: |
| return std::make_shared<arrow::r::Converter_FixedSizeBinary>( |
| std::move(arrays), |
| checked_cast<const FixedSizeBinaryType&>(*type).byte_width()); |
| |
| // handle memory dense strings |
| case Type::STRING: |
| return std::make_shared<arrow::r::Converter_String<arrow::StringArray>>( |
| std::move(arrays)); |
| |
| case Type::LARGE_STRING: |
| return std::make_shared<arrow::r::Converter_String<arrow::LargeStringArray>>( |
| std::move(arrays)); |
| |
| case Type::DICTIONARY: |
| return std::make_shared<arrow::r::Converter_Dictionary>(std::move(arrays)); |
| |
| case Type::DATE32: |
| return std::make_shared<arrow::r::Converter_Date32>(std::move(arrays)); |
| |
| case Type::DATE64: |
| return std::make_shared<arrow::r::Converter_Date64>(std::move(arrays)); |
| |
| // promotions to integer vector |
| case Type::INT8: |
| return std::make_shared<arrow::r::Converter_Promotion<INTSXP, arrow::Int8Type>>( |
| std::move(arrays)); |
| |
| case Type::UINT8: |
| return std::make_shared<arrow::r::Converter_Promotion<INTSXP, arrow::UInt8Type>>( |
| std::move(arrays)); |
| |
| case Type::INT16: |
| return std::make_shared<arrow::r::Converter_Promotion<INTSXP, arrow::Int16Type>>( |
| std::move(arrays)); |
| |
| case Type::UINT16: |
| return std::make_shared<arrow::r::Converter_Promotion<INTSXP, arrow::UInt16Type>>( |
| std::move(arrays)); |
| |
| // promotions to numeric vector, if they don't fit into int32 |
| case Type::UINT32: |
| if (ArraysCanFitInteger(arrays)) { |
| return std::make_shared<arrow::r::Converter_Promotion<INTSXP, arrow::UInt32Type>>( |
| std::move(arrays)); |
| } else { |
| return std::make_shared< |
| arrow::r::Converter_Promotion<REALSXP, arrow::UInt32Type>>(std::move(arrays)); |
| } |
| |
| case Type::UINT64: |
| if (ArraysCanFitInteger(arrays)) { |
| return std::make_shared<arrow::r::Converter_Promotion<INTSXP, arrow::UInt64Type>>( |
| std::move(arrays)); |
| } else { |
| return std::make_shared< |
| arrow::r::Converter_Promotion<REALSXP, arrow::UInt64Type>>(std::move(arrays)); |
| } |
| |
| case Type::HALF_FLOAT: |
| return std::make_shared< |
| arrow::r::Converter_Promotion<REALSXP, arrow::HalfFloatType>>( |
| std::move(arrays)); |
| |
| case Type::FLOAT: |
| return std::make_shared<arrow::r::Converter_Promotion<REALSXP, arrow::FloatType>>( |
| std::move(arrays)); |
| |
| // time32 and time64 |
| case Type::TIME32: |
| return std::make_shared<arrow::r::Converter_Time<int32_t>>(std::move(arrays)); |
| |
| case Type::TIME64: |
| return std::make_shared<arrow::r::Converter_Time<int64_t>>(std::move(arrays)); |
| |
| case Type::TIMESTAMP: |
| return std::make_shared<arrow::r::Converter_Timestamp<int64_t>>(std::move(arrays)); |
| |
| case Type::INT64: |
| // Prefer integer if it fits |
| if (ArraysCanFitInteger(arrays)) { |
| return std::make_shared<arrow::r::Converter_Promotion<INTSXP, arrow::Int64Type>>( |
| std::move(arrays)); |
| } else { |
| return std::make_shared<arrow::r::Converter_Int64>(std::move(arrays)); |
| } |
| |
| case Type::DECIMAL: |
| return std::make_shared<arrow::r::Converter_Decimal>(std::move(arrays)); |
| |
| // nested |
| case Type::STRUCT: |
| return std::make_shared<arrow::r::Converter_Struct>(std::move(arrays)); |
| |
| case Type::LIST: |
| return std::make_shared<arrow::r::Converter_List<arrow::ListArray>>( |
| std::move(arrays), |
| checked_cast<const arrow::ListType*>(type.get())->value_type()); |
| |
| case Type::LARGE_LIST: |
| return std::make_shared<arrow::r::Converter_List<arrow::LargeListArray>>( |
| std::move(arrays), |
| checked_cast<const arrow::LargeListType*>(type.get())->value_type()); |
| |
| case Type::FIXED_SIZE_LIST: |
| return std::make_shared<arrow::r::Converter_FixedSizeList>( |
| std::move(arrays), |
| checked_cast<const arrow::FixedSizeListType&>(*type).value_type(), |
| checked_cast<const arrow::FixedSizeListType&>(*type).list_size()); |
| |
| case Type::NA: |
| return std::make_shared<arrow::r::Converter_Null>(std::move(arrays)); |
| |
| default: |
| break; |
| } |
| |
| Rcpp::stop(tfm::format("cannot handle Array of type %s", type->name())); |
| return nullptr; |
| } |
| |
| Rcpp::List to_dataframe_serial( |
| int64_t nr, int64_t nc, const Rcpp::CharacterVector& names, |
| const std::vector<std::shared_ptr<Converter>>& converters) { |
| Rcpp::List tbl(nc); |
| |
| for (int i = 0; i < nc; i++) { |
| SEXP column = tbl[i] = converters[i]->Allocate(nr); |
| StopIfNotOk(converters[i]->IngestSerial(column)); |
| } |
| tbl.attr("names") = names; |
| tbl.attr("class") = Rcpp::CharacterVector::create("tbl_df", "tbl", "data.frame"); |
| tbl.attr("row.names") = Rcpp::IntegerVector::create(NA_INTEGER, -nr); |
| return tbl; |
| } |
| |
| Rcpp::List to_dataframe_parallel( |
| int64_t nr, int64_t nc, const Rcpp::CharacterVector& names, |
| const std::vector<std::shared_ptr<Converter>>& converters) { |
| Rcpp::List tbl(nc); |
| |
| // task group to ingest data in parallel |
| auto tg = arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool()); |
| |
| // allocate and start ingesting immediately the columns that |
| // can be ingested in parallel, i.e. when ingestion no longer |
| // need to happen on the main thread |
| for (int i = 0; i < nc; i++) { |
| // allocate data for column i |
| SEXP column = tbl[i] = converters[i]->Allocate(nr); |
| |
| // add a task to ingest data of that column if that can be done in parallel |
| if (converters[i]->Parallel()) { |
| converters[i]->IngestParallel(column, tg); |
| } |
| } |
| |
| arrow::Status status = arrow::Status::OK(); |
| |
| // ingest the columns that cannot be dealt with in parallel |
| for (int i = 0; i < nc; i++) { |
| if (!converters[i]->Parallel()) { |
| status &= converters[i]->IngestSerial(tbl[i]); |
| } |
| } |
| |
| // wait for the ingestion to be finished |
| status &= tg->Finish(); |
| |
| StopIfNotOk(status); |
| |
| tbl.attr("names") = names; |
| tbl.attr("class") = Rcpp::CharacterVector::create("tbl_df", "tbl", "data.frame"); |
| tbl.attr("row.names") = IntegerVector::create(NA_INTEGER, -nr); |
| |
| return tbl; |
| } |
| |
| } // namespace r |
| } // namespace arrow |
| |
| // [[arrow::export]] |
| SEXP Array__as_vector(const std::shared_ptr<arrow::Array>& array) { |
| return arrow::r::ArrayVector__as_vector(array->length(), array->type(), {array}); |
| } |
| |
| // [[arrow::export]] |
| SEXP ChunkedArray__as_vector(const std::shared_ptr<arrow::ChunkedArray>& chunked_array) { |
| return arrow::r::ArrayVector__as_vector(chunked_array->length(), chunked_array->type(), |
| chunked_array->chunks()); |
| } |
| |
| // [[arrow::export]] |
| Rcpp::List RecordBatch__to_dataframe(const std::shared_ptr<arrow::RecordBatch>& batch, |
| bool use_threads) { |
| int64_t nc = batch->num_columns(); |
| int64_t nr = batch->num_rows(); |
| Rcpp::CharacterVector names(nc); |
| std::vector<arrow::ArrayVector> arrays(nc); |
| std::vector<std::shared_ptr<arrow::r::Converter>> converters(nc); |
| |
| for (int64_t i = 0; i < nc; i++) { |
| names[i] = Rcpp::String(batch->column_name(i), CE_UTF8); |
| arrays[i] = {batch->column(i)}; |
| converters[i] = arrow::r::Converter::Make(batch->column(i)->type(), arrays[i]); |
| } |
| |
| if (use_threads) { |
| return arrow::r::to_dataframe_parallel(nr, nc, names, converters); |
| } else { |
| return arrow::r::to_dataframe_serial(nr, nc, names, converters); |
| } |
| } |
| |
| // [[arrow::export]] |
| Rcpp::List Table__to_dataframe(const std::shared_ptr<arrow::Table>& table, |
| bool use_threads) { |
| int64_t nc = table->num_columns(); |
| int64_t nr = table->num_rows(); |
| Rcpp::CharacterVector names(nc); |
| std::vector<std::shared_ptr<arrow::r::Converter>> converters(nc); |
| |
| for (int64_t i = 0; i < nc; i++) { |
| converters[i] = |
| arrow::r::Converter::Make(table->column(i)->type(), table->column(i)->chunks()); |
| names[i] = Rcpp::String(table->field(i)->name(), CE_UTF8); |
| } |
| |
| if (use_threads) { |
| return arrow::r::to_dataframe_parallel(nr, nc, names, converters); |
| } else { |
| return arrow::r::to_dataframe_serial(nr, nc, names, converters); |
| } |
| } |
| |
| #endif |