| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "./arrow_types.h" |
| #if defined(ARROW_R_WITH_ARROW) |
| |
#include <arrow/array.h>
#include <arrow/builder.h>
#include <arrow/datum.h>
#include <arrow/result.h>
#include <arrow/table.h>
#include <arrow/util/bitmap_reader.h>
#include <arrow/util/bitmap_writer.h>
#include <arrow/util/int_util.h>
#include <arrow/util/parallel.h>
#include <arrow/util/task_group.h>

#include <algorithm>
#include <type_traits>
#include <utility>
| |
| namespace arrow { |
| |
| using internal::checked_cast; |
| using internal::IntegersCanFit; |
| |
| namespace r { |
| |
| class Converter { |
| public: |
| explicit Converter(ArrayVector arrays) : arrays_(std::move(arrays)) {} |
| |
| virtual ~Converter() {} |
| |
| // Allocate a vector of the right R type for this converter |
| virtual SEXP Allocate(R_xlen_t n) const = 0; |
| |
| // data[ start:(start + n) ] = NA |
| virtual Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const = 0; |
| |
| // ingest the values from the array into data[ start : (start + n)] |
| // |
| // chunk_index indicates which of the chunk is being ingested into data. This is |
| // ignored by most implementations and currently only used with Dictionary |
| // arrays. |
| virtual Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, |
| size_t chunk_index) const = 0; |
| |
| // ingest one array |
| Status IngestOne(SEXP data, const std::shared_ptr<arrow::Array>& array, R_xlen_t start, |
| R_xlen_t n, size_t chunk_index) const { |
| if (array->null_count() == n) { |
| return Ingest_all_nulls(data, start, n); |
| } else { |
| return Ingest_some_nulls(data, array, start, n, chunk_index); |
| } |
| } |
| |
| // can this run in parallel ? |
| virtual bool Parallel() const { return true; } |
| |
| // Ingest all the arrays serially |
| Status IngestSerial(SEXP data) { |
| R_xlen_t k = 0, i = 0; |
| for (const auto& array : arrays_) { |
| auto n_chunk = array->length(); |
| RETURN_NOT_OK(IngestOne(data, array, k, n_chunk, i)); |
| k += n_chunk; |
| i++; |
| } |
| return Status::OK(); |
| } |
| |
| // ingest the arrays in parallel |
| // |
| // for each array, add a task to the task group |
| // |
| // The task group is Finish() in the caller |
| // The converter itself is passed as `self` so that if one of the parallel ops |
| // hits `stop()`, we don't bail before `tg` is destroyed, which would cause a crash |
| void IngestParallel(SEXP data, const std::shared_ptr<arrow::internal::TaskGroup>& tg, |
| std::shared_ptr<Converter> self) { |
| R_xlen_t k = 0, i = 0; |
| for (const auto& array : arrays_) { |
| auto n_chunk = array->length(); |
| tg->Append([=] { return self->IngestOne(data, array, k, n_chunk, i); }); |
| k += n_chunk; |
| i++; |
| } |
| } |
| |
| // Converter factory |
| static std::shared_ptr<Converter> Make(const std::shared_ptr<DataType>& type, |
| ArrayVector arrays); |
| |
| protected: |
| ArrayVector arrays_; |
| }; |
| |
| template <typename SetNonNull, typename SetNull> |
| Status IngestSome(const std::shared_ptr<arrow::Array>& array, R_xlen_t n, |
| SetNonNull&& set_non_null, SetNull&& set_null) { |
| if (array->null_count()) { |
| internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), array->offset(), |
| n); |
| |
| for (R_xlen_t i = 0; i < n; i++, bitmap_reader.Next()) { |
| if (bitmap_reader.IsSet()) { |
| RETURN_NOT_OK(set_non_null(i)); |
| } else { |
| RETURN_NOT_OK(set_null(i)); |
| } |
| } |
| |
| } else { |
| for (R_xlen_t i = 0; i < n; i++) { |
| RETURN_NOT_OK(set_non_null(i)); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| template <typename SetNonNull> |
| Status IngestSome(const std::shared_ptr<arrow::Array>& array, R_xlen_t n, |
| SetNonNull&& set_non_null) { |
| auto nothing = [](R_xlen_t i) { return Status::OK(); }; |
| return IngestSome(array, n, std::forward<SetNonNull>(set_non_null), nothing); |
| } |
| |
| // Allocate + Ingest |
| SEXP ArrayVector__as_vector(R_xlen_t n, const std::shared_ptr<DataType>& type, |
| const ArrayVector& arrays) { |
| auto converter = Converter::Make(type, arrays); |
| SEXP data = PROTECT(converter->Allocate(n)); |
| StopIfNotOk(converter->IngestSerial(data)); |
| UNPROTECT(1); |
| return data; |
| } |
| |
| template <typename Type> |
| class Converter_Int : public Converter { |
| using value_type = typename TypeTraits<Type>::ArrayType::value_type; |
| |
| public: |
| explicit Converter_Int(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(INTSXP, n); } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| std::fill_n(INTEGER(data) + start, n, NA_INTEGER); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_values = array->data()->GetValues<value_type>(1); |
| if (!p_values) { |
| return Status::Invalid("Invalid data buffer"); |
| } |
| auto p_data = INTEGER(data) + start; |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = static_cast<int>(p_values[i]); |
| return Status::OK(); |
| }; |
| auto null_one = [&](R_xlen_t i) { |
| p_data[i] = NA_INTEGER; |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one, null_one); |
| } |
| }; |
| |
| template <typename Type> |
| class Converter_Double : public Converter { |
| using value_type = typename TypeTraits<Type>::ArrayType::value_type; |
| |
| public: |
| explicit Converter_Double(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(REALSXP, n); } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| std::fill_n(REAL(data) + start, n, NA_REAL); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_values = array->data()->GetValues<value_type>(1); |
| if (!p_values) { |
| return Status::Invalid("Invalid data buffer"); |
| } |
| auto p_data = REAL(data) + start; |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = static_cast<value_type>(p_values[i]); |
| return Status::OK(); |
| }; |
| auto null_one = [&](R_xlen_t i) { |
| p_data[i] = NA_REAL; |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one, null_one); |
| } |
| }; |
| |
| class Converter_Date32 : public Converter { |
| public: |
| explicit Converter_Date32(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| SEXP data = PROTECT(Rf_allocVector(REALSXP, n)); |
| Rf_classgets(data, Rf_mkString("Date")); |
| UNPROTECT(1); |
| return data; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| std::fill_n(REAL(data) + start, n, NA_REAL); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_values = array->data()->GetValues<int>(1); |
| if (!p_values) { |
| return Status::Invalid("Invalid data buffer"); |
| } |
| auto p_data = REAL(data) + start; |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = static_cast<double>(p_values[i]); |
| return Status::OK(); |
| }; |
| auto null_one = [&](R_xlen_t i) { |
| p_data[i] = NA_REAL; |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one, null_one); |
| } |
| }; |
| |
| template <typename StringArrayType> |
| struct Converter_String : public Converter { |
| public: |
| explicit Converter_String(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(STRSXP, n); } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| for (R_xlen_t i = 0; i < n; i++) { |
| SET_STRING_ELT(data, i + start, NA_STRING); |
| } |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_offset = array->data()->GetValues<int32_t>(1); |
| if (!p_offset) { |
| return Status::Invalid("Invalid offset buffer"); |
| } |
| auto p_strings = array->data()->GetValues<char>(2, *p_offset); |
| if (!p_strings) { |
| // There is an offset buffer, but the data buffer is null |
| // There is at least one value in the array and not all the values are null |
| // That means all values are either empty strings or nulls so there is nothing to do |
| |
| if (array->null_count()) { |
| arrow::internal::BitmapReader null_reader(array->null_bitmap_data(), |
| array->offset(), n); |
| for (int i = 0; i < n; i++, null_reader.Next()) { |
| if (null_reader.IsNotSet()) { |
| SET_STRING_ELT(data, start + i, NA_STRING); |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
| StringArrayType* string_array = static_cast<StringArrayType*>(array.get()); |
| |
| const bool all_valid = array->null_count() == 0; |
| const bool strip_out_nuls = GetBoolOption("arrow.skip_nul", false); |
| |
| bool nul_was_stripped = false; |
| |
| if (all_valid) { |
| // no need to watch for missing strings |
| cpp11::unwind_protect([&] { |
| if (strip_out_nuls) { |
| for (int i = 0; i < n; i++) { |
| SET_STRING_ELT(data, start + i, |
| r_string_from_view_strip_nul(string_array->GetView(i), |
| &nul_was_stripped)); |
| } |
| return; |
| } |
| |
| for (int i = 0; i < n; i++) { |
| SET_STRING_ELT(data, start + i, r_string_from_view(string_array->GetView(i))); |
| } |
| }); |
| } else { |
| cpp11::unwind_protect([&] { |
| arrow::internal::BitmapReader validity_reader(array->null_bitmap_data(), |
| array->offset(), n); |
| |
| if (strip_out_nuls) { |
| for (int i = 0; i < n; i++, validity_reader.Next()) { |
| if (validity_reader.IsSet()) { |
| SET_STRING_ELT(data, start + i, |
| r_string_from_view_strip_nul(string_array->GetView(i), |
| &nul_was_stripped)); |
| } else { |
| SET_STRING_ELT(data, start + i, NA_STRING); |
| } |
| } |
| return; |
| } |
| |
| for (int i = 0; i < n; i++, validity_reader.Next()) { |
| if (validity_reader.IsSet()) { |
| SET_STRING_ELT(data, start + i, r_string_from_view(string_array->GetView(i))); |
| } else { |
| SET_STRING_ELT(data, start + i, NA_STRING); |
| } |
| } |
| }); |
| } |
| |
| if (nul_was_stripped) { |
| cpp11::warning("Stripping '\\0' (nul) from character vector"); |
| } |
| |
| return Status::OK(); |
| } |
| |
| bool Parallel() const { return false; } |
| |
| private: |
| static SEXP r_string_from_view(arrow::util::string_view view) { |
| return Rf_mkCharLenCE(view.data(), view.size(), CE_UTF8); |
| } |
| |
| static SEXP r_string_from_view_strip_nul(arrow::util::string_view view, |
| bool* nul_was_stripped) { |
| const char* old_string = view.data(); |
| |
| std::string stripped_string; |
| size_t stripped_len = 0, nul_count = 0; |
| |
| for (size_t i = 0; i < view.size(); i++) { |
| if (old_string[i] == '\0') { |
| ++nul_count; |
| |
| if (nul_count == 1) { |
| // first nul spotted: allocate stripped string storage |
| stripped_string = view.to_string(); |
| stripped_len = i; |
| } |
| |
| // don't copy old_string[i] (which is \0) into stripped_string |
| continue; |
| } |
| |
| if (nul_count > 0) { |
| stripped_string[stripped_len++] = old_string[i]; |
| } |
| } |
| |
| if (nul_count > 0) { |
| *nul_was_stripped = true; |
| stripped_string.resize(stripped_len); |
| return r_string_from_view(stripped_string); |
| } |
| |
| return r_string_from_view(view); |
| } |
| }; |
| |
| class Converter_Boolean : public Converter { |
| public: |
| explicit Converter_Boolean(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(LGLSXP, n); } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| std::fill_n(LOGICAL(data) + start, n, NA_LOGICAL); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_data = LOGICAL(data) + start; |
| auto p_bools = array->data()->GetValues<uint8_t>(1, 0); |
| if (!p_bools) { |
| return Status::Invalid("Invalid data buffer"); |
| } |
| |
| arrow::internal::BitmapReader data_reader(p_bools, array->offset(), n); |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = data_reader.IsSet(); |
| data_reader.Next(); |
| return Status::OK(); |
| }; |
| |
| auto null_one = [&](R_xlen_t i) { |
| data_reader.Next(); |
| p_data[i] = NA_LOGICAL; |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one, null_one); |
| } |
| }; |
| |
| template <typename ArrayType> |
| class Converter_Binary : public Converter { |
| public: |
| using offset_type = typename ArrayType::offset_type; |
| explicit Converter_Binary(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| SEXP res = PROTECT(Rf_allocVector(VECSXP, n)); |
| if (std::is_same<ArrayType, BinaryArray>::value) { |
| Rf_classgets(res, data::classes_arrow_binary); |
| } else { |
| Rf_classgets(res, data::classes_arrow_large_binary); |
| } |
| UNPROTECT(1); |
| return res; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| const ArrayType* binary_array = checked_cast<const ArrayType*>(array.get()); |
| |
| auto ingest_one = [&](R_xlen_t i) { |
| offset_type ni; |
| auto value = binary_array->GetValue(i, &ni); |
| if (ni > R_XLEN_T_MAX) { |
| return Status::RError("Array too big to be represented as a raw vector"); |
| } |
| SEXP raw = PROTECT(Rf_allocVector(RAWSXP, ni)); |
| std::copy(value, value + ni, RAW(raw)); |
| |
| SET_VECTOR_ELT(data, i + start, raw); |
| UNPROTECT(1); |
| |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one); |
| } |
| |
| virtual bool Parallel() const { return false; } |
| }; |
| |
| class Converter_FixedSizeBinary : public Converter { |
| public: |
| explicit Converter_FixedSizeBinary(const ArrayVector& arrays, int byte_width) |
| : Converter(arrays), byte_width_(byte_width) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| SEXP res = PROTECT(Rf_allocVector(VECSXP, n)); |
| Rf_classgets(res, data::classes_arrow_fixed_size_binary); |
| Rf_setAttrib(res, symbols::byte_width, Rf_ScalarInteger(byte_width_)); |
| UNPROTECT(1); |
| return res; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| const FixedSizeBinaryArray* binary_array = |
| checked_cast<const FixedSizeBinaryArray*>(array.get()); |
| |
| int byte_width = binary_array->byte_width(); |
| auto ingest_one = [&, byte_width](R_xlen_t i) { |
| auto value = binary_array->GetValue(i); |
| SEXP raw = PROTECT(Rf_allocVector(RAWSXP, byte_width)); |
| std::copy(value, value + byte_width, RAW(raw)); |
| |
| SET_VECTOR_ELT(data, i + start, raw); |
| UNPROTECT(1); |
| |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one); |
| } |
| |
| virtual bool Parallel() const { return false; } |
| |
| private: |
| int byte_width_; |
| }; |
| |
| class Converter_Dictionary : public Converter { |
| private: |
| bool need_unification_; |
| std::unique_ptr<arrow::DictionaryUnifier> unifier_; |
| std::vector<std::shared_ptr<Buffer>> arrays_transpose_; |
| std::shared_ptr<DataType> out_type_; |
| std::shared_ptr<Array> dictionary_; |
| |
| public: |
| explicit Converter_Dictionary(const ArrayVector& arrays) |
| : Converter(arrays), need_unification_(NeedUnification()) { |
| if (need_unification_) { |
| const auto& arr_first = checked_cast<const DictionaryArray&>(*arrays[0]); |
| const auto& arr_type = checked_cast<const DictionaryType&>(*arr_first.type()); |
| unifier_ = ValueOrStop(DictionaryUnifier::Make(arr_type.value_type())); |
| |
| size_t n_arrays = arrays.size(); |
| arrays_transpose_.resize(n_arrays); |
| |
| for (size_t i = 0; i < n_arrays; i++) { |
| const auto& dict_i = |
| *checked_cast<const DictionaryArray&>(*arrays[i]).dictionary(); |
| StopIfNotOk(unifier_->Unify(dict_i, &arrays_transpose_[i])); |
| } |
| |
| StopIfNotOk(unifier_->GetResult(&out_type_, &dictionary_)); |
| } else { |
| const auto& dict_array = checked_cast<const DictionaryArray&>(*arrays_[0]); |
| |
| auto indices = dict_array.indices(); |
| switch (indices->type_id()) { |
| case Type::UINT8: |
| case Type::INT8: |
| case Type::UINT16: |
| case Type::INT16: |
| case Type::INT32: |
| // TODO: also add int64, uint32, uint64 downcasts, if possible |
| break; |
| default: |
| cpp11::stop("Cannot convert Dictionary Array of type `%s` to R", |
| dict_array.type()->ToString().c_str()); |
| } |
| |
| dictionary_ = dict_array.dictionary(); |
| } |
| } |
| |
| SEXP Allocate(R_xlen_t n) const { |
| cpp11::writable::integers data(n); |
| data.attr("levels") = GetLevels(); |
| if (GetOrdered()) { |
| Rf_classgets(data, arrow::r::data::classes_ordered); |
| } else { |
| Rf_classgets(data, arrow::r::data::classes_factor); |
| } |
| return data; |
| } |
| |
| virtual bool Parallel() const { return false; } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| std::fill_n(INTEGER(data) + start, n, NA_INTEGER); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| const DictionaryArray& dict_array = |
| checked_cast<const DictionaryArray&>(*array.get()); |
| auto indices = dict_array.indices(); |
| switch (indices->type_id()) { |
| case Type::UINT8: |
| return Ingest_some_nulls_Impl<arrow::UInt8Type>(data, array, start, n, |
| chunk_index); |
| case Type::INT8: |
| return Ingest_some_nulls_Impl<arrow::Int8Type>(data, array, start, n, |
| chunk_index); |
| case Type::UINT16: |
| return Ingest_some_nulls_Impl<arrow::UInt16Type>(data, array, start, n, |
| chunk_index); |
| case Type::INT16: |
| return Ingest_some_nulls_Impl<arrow::Int16Type>(data, array, start, n, |
| chunk_index); |
| case Type::INT32: |
| return Ingest_some_nulls_Impl<arrow::Int32Type>(data, array, start, n, |
| chunk_index); |
| default: |
| break; |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| template <typename Type> |
| Status Ingest_some_nulls_Impl(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| using index_type = typename arrow::TypeTraits<Type>::ArrayType::value_type; |
| auto indices = checked_cast<const DictionaryArray&>(*array).indices(); |
| auto raw_indices = indices->data()->GetValues<index_type>(1); |
| |
| auto p_data = INTEGER(data) + start; |
| auto null_one = [&](R_xlen_t i) { |
| p_data[i] = NA_INTEGER; |
| return Status::OK(); |
| }; |
| |
| // convert the 0-based indices from the arrow Array |
| // to 1-based indices used in R factors |
| if (need_unification_) { |
| // transpose the indices before converting |
| auto transposed = |
| reinterpret_cast<const int32_t*>(arrays_transpose_[chunk_index]->data()); |
| |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = transposed[raw_indices[i]] + 1; |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one, null_one); |
| } else { |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = static_cast<int>(raw_indices[i]) + 1; |
| return Status::OK(); |
| }; |
| return IngestSome(array, n, ingest_one, null_one); |
| } |
| } |
| |
| bool NeedUnification() { |
| int n = arrays_.size(); |
| if (n < 2) { |
| return false; |
| } |
| const auto& arr_first = checked_cast<const DictionaryArray&>(*arrays_[0]); |
| for (int i = 1; i < n; i++) { |
| const auto& arr = checked_cast<const DictionaryArray&>(*arrays_[i]); |
| if (!(arr_first.dictionary()->Equals(arr.dictionary()))) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| bool GetOrdered() const { |
| return checked_cast<const DictionaryArray&>(*arrays_[0]).dict_type()->ordered(); |
| } |
| |
| SEXP GetLevels() const { |
| // R factor levels must be type "character" so coerce `dict` to STRSXP |
| // TODO (npr): this coercion should be optional, "dictionariesAsFactors" ;) |
| // Alternative: preserve the logical type of the dictionary values |
| // (e.g. if dict is timestamp, return a POSIXt R vector, not factor) |
| if (dictionary_->type_id() != Type::STRING) { |
| cpp11::warning("Coercing dictionary values to R character factor levels"); |
| } |
| |
| SEXP vec = PROTECT(ArrayVector__as_vector(dictionary_->length(), dictionary_->type(), |
| {dictionary_})); |
| SEXP strings_vec = PROTECT(Rf_coerceVector(vec, STRSXP)); |
| UNPROTECT(2); |
| return strings_vec; |
| } |
| }; |
| |
| class Converter_Struct : public Converter { |
| public: |
| explicit Converter_Struct(const ArrayVector& arrays) : Converter(arrays), converters() { |
| auto first_array = checked_cast<const arrow::StructArray*>(this->arrays_[0].get()); |
| int nf = first_array->num_fields(); |
| for (int i = 0; i < nf; i++) { |
| converters.push_back( |
| Converter::Make(first_array->field(i)->type(), {first_array->field(i)})); |
| } |
| } |
| |
| SEXP Allocate(R_xlen_t n) const { |
| // allocate a data frame column to host each array |
| auto first_array = checked_cast<const arrow::StructArray*>(this->arrays_[0].get()); |
| auto type = first_array->struct_type(); |
| auto out = |
| arrow::r::to_r_list(converters, [n](const std::shared_ptr<Converter>& converter) { |
| return converter->Allocate(n); |
| }); |
| auto colnames = arrow::r::to_r_strings( |
| type->fields(), |
| [](const std::shared_ptr<Field>& field) { return field->name(); }); |
| out.attr(symbols::row_names) = arrow::r::short_row_names(n); |
| out.attr(R_NamesSymbol) = colnames; |
| out.attr(R_ClassSymbol) = arrow::r::data::classes_tbl_df; |
| |
| return out; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| int nf = converters.size(); |
| for (int i = 0; i < nf; i++) { |
| StopIfNotOk(converters[i]->Ingest_all_nulls(VECTOR_ELT(data, i), start, n)); |
| } |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto struct_array = checked_cast<const arrow::StructArray*>(array.get()); |
| int nf = converters.size(); |
| // Flatten() deals with merging of nulls |
| auto arrays = ValueOrStop(struct_array->Flatten(gc_memory_pool())); |
| for (int i = 0; i < nf; i++) { |
| StopIfNotOk(converters[i]->Ingest_some_nulls(VECTOR_ELT(data, i), arrays[i], start, |
| n, chunk_index)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| virtual bool Parallel() const { |
| // this can only run in parallel if all the |
| // inner converters can |
| for (const auto& converter : converters) { |
| if (!converter->Parallel()) return false; |
| } |
| return true; |
| } |
| |
| private: |
| std::vector<std::shared_ptr<Converter>> converters; |
| }; |
| |
// Convert a count of milliseconds to (possibly fractional) seconds.
double ms_to_seconds(int64_t ms) {
  constexpr double kMillisPerSecond = 1000.0;
  return ms / kMillisPerSecond;
}
| |
| class Converter_Date64 : public Converter { |
| public: |
| explicit Converter_Date64(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| cpp11::writable::doubles data(n); |
| Rf_classgets(data, arrow::r::data::classes_POSIXct); |
| return data; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| std::fill_n(REAL(data) + start, n, NA_REAL); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_data = REAL(data) + start; |
| auto p_values = array->data()->GetValues<int64_t>(1); |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = static_cast<double>(p_values[i] / 1000); |
| return Status::OK(); |
| }; |
| auto null_one = [&](R_xlen_t i) { |
| p_data[i] = NA_REAL; |
| return Status::OK(); |
| }; |
| return IngestSome(array, n, ingest_one, null_one); |
| } |
| }; |
| |
| template <typename value_type, typename unit_type = TimeType> |
| class Converter_Time : public Converter { |
| public: |
| explicit Converter_Time(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| cpp11::writable::doubles data(n); |
| data.attr("class") = cpp11::writable::strings({"hms", "difftime"}); |
| |
| // hms difftime is always stored as "seconds" |
| data.attr("units") = cpp11::writable::strings({"secs"}); |
| return data; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| std::fill_n(REAL(data) + start, n, NA_REAL); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| int multiplier = TimeUnit_multiplier(array); |
| |
| auto p_data = REAL(data) + start; |
| auto p_values = array->data()->GetValues<value_type>(1); |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = static_cast<double>(p_values[i]) / multiplier; |
| return Status::OK(); |
| }; |
| auto null_one = [&](R_xlen_t i) { |
| p_data[i] = NA_REAL; |
| return Status::OK(); |
| }; |
| return IngestSome(array, n, ingest_one, null_one); |
| } |
| |
| private: |
| int TimeUnit_multiplier(const std::shared_ptr<Array>& array) const { |
| // hms difftime is always "seconds", so multiply based on the Array's TimeUnit |
| switch (static_cast<unit_type*>(array->type().get())->unit()) { |
| case TimeUnit::SECOND: |
| return 1; |
| case TimeUnit::MILLI: |
| return 1000; |
| case TimeUnit::MICRO: |
| return 1000000; |
| case TimeUnit::NANO: |
| return 1000000000; |
| default: |
| return 0; |
| } |
| } |
| }; |
| |
| template <typename value_type> |
| class Converter_Timestamp : public Converter_Time<value_type, TimestampType> { |
| public: |
| explicit Converter_Timestamp(const ArrayVector& arrays) |
| : Converter_Time<value_type, TimestampType>(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| cpp11::writable::doubles data(n); |
| Rf_classgets(data, arrow::r::data::classes_POSIXct); |
| auto array = checked_cast<const TimestampArray*>(this->arrays_[0].get()); |
| auto array_type = checked_cast<const TimestampType*>(array->type().get()); |
| std::string tzone = array_type->timezone(); |
| if (tzone.size() > 0) { |
| data.attr("tzone") = tzone; |
| } |
| return data; |
| } |
| }; |
| |
| class Converter_Decimal : public Converter { |
| public: |
| explicit Converter_Decimal(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(REALSXP, n); } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| std::fill_n(REAL(data) + start, n, NA_REAL); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_data = REAL(data) + start; |
| const auto& decimals_arr = checked_cast<const arrow::Decimal128Array&>(*array); |
| |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = std::stod(decimals_arr.FormatValue(i).c_str()); |
| return Status::OK(); |
| }; |
| auto null_one = [&](R_xlen_t i) { |
| p_data[i] = NA_REAL; |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one, null_one); |
| } |
| }; |
| |
| template <typename ListArrayType> |
| class Converter_List : public Converter { |
| private: |
| std::shared_ptr<arrow::DataType> value_type_; |
| |
| public: |
| explicit Converter_List(const ArrayVector& arrays, |
| const std::shared_ptr<arrow::DataType>& value_type) |
| : Converter(arrays), value_type_(value_type) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| cpp11::writable::list res(n); |
| res.attr(R_ClassSymbol) = std::is_same<ListArrayType, ListArray>::value |
| ? arrow::r::data::classes_arrow_list |
| : arrow::r::data::classes_arrow_large_list; |
| |
| // Build an empty array to match value_type |
| std::unique_ptr<arrow::ArrayBuilder> builder; |
| StopIfNotOk(arrow::MakeBuilder(gc_memory_pool(), value_type_, &builder)); |
| |
| std::shared_ptr<arrow::Array> array; |
| StopIfNotOk(builder->Finish(&array)); |
| |
| // convert to an R object to store as the list' ptype |
| res.attr(arrow::r::symbols::ptype) = Array__as_vector(array); |
| |
| return res; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| // nothing to do, list contain NULL by default |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto list_array = checked_cast<const ListArrayType*>(array.get()); |
| auto values_array = list_array->values(); |
| |
| auto ingest_one = [&](R_xlen_t i) { |
| auto slice = list_array->value_slice(i); |
| SET_VECTOR_ELT(data, i + start, Array__as_vector(slice)); |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one); |
| } |
| |
| bool Parallel() const { return false; } |
| }; |
| |
| class Converter_FixedSizeList : public Converter { |
| private: |
| std::shared_ptr<arrow::DataType> value_type_; |
| int list_size_; |
| |
| public: |
| explicit Converter_FixedSizeList(const ArrayVector& arrays, |
| const std::shared_ptr<arrow::DataType>& value_type, |
| int list_size) |
| : Converter(arrays), value_type_(value_type), list_size_(list_size) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| cpp11::writable::list res(n); |
| Rf_classgets(res, arrow::r::data::classes_arrow_fixed_size_list); |
| res.attr(arrow::r::symbols::list_size) = Rf_ScalarInteger(list_size_); |
| |
| // Build an empty array to match value_type |
| std::unique_ptr<arrow::ArrayBuilder> builder; |
| StopIfNotOk(arrow::MakeBuilder(gc_memory_pool(), value_type_, &builder)); |
| |
| std::shared_ptr<arrow::Array> array; |
| StopIfNotOk(builder->Finish(&array)); |
| |
| // convert to an R object to store as the list' ptype |
| res.attr(arrow::r::symbols::ptype) = Array__as_vector(array); |
| |
| return res; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| // nothing to do, list contain NULL by default |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| const auto& fixed_size_list_array = checked_cast<const FixedSizeListArray&>(*array); |
| auto values_array = fixed_size_list_array.values(); |
| |
| auto ingest_one = [&](R_xlen_t i) { |
| auto slice = fixed_size_list_array.value_slice(i); |
| SET_VECTOR_ELT(data, i + start, Array__as_vector(slice)); |
| return Status::OK(); |
| }; |
| return IngestSome(array, n, ingest_one); |
| } |
| |
| bool Parallel() const { return false; } |
| }; |
| |
| class Converter_Int64 : public Converter { |
| public: |
| explicit Converter_Int64(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| cpp11::writable::doubles data(n); |
| data.attr("class") = "integer64"; |
| return data; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| auto p_data = reinterpret_cast<int64_t*>(REAL(data)) + start; |
| std::fill_n(p_data, n, NA_INT64); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_values = array->data()->GetValues<int64_t>(1); |
| if (!p_values) { |
| return Status::Invalid("Invalid data buffer"); |
| } |
| |
| auto p_data = reinterpret_cast<int64_t*>(REAL(data)) + start; |
| |
| if (array->null_count()) { |
| internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), array->offset(), |
| n); |
| for (R_xlen_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_data) { |
| *p_data = bitmap_reader.IsSet() ? p_values[i] : NA_INT64; |
| } |
| } else { |
| std::copy_n(p_values, n, p_data); |
| } |
| |
| return Status::OK(); |
| } |
| }; |
| |
| class Converter_Null : public Converter { |
| public: |
| explicit Converter_Null(const ArrayVector& arrays) : Converter(arrays) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| SEXP data = PROTECT(Rf_allocVector(LGLSXP, n)); |
| std::fill_n(LOGICAL(data), n, NA_LOGICAL); |
| Rf_classgets(data, Rf_mkString("vctrs_unspecified")); |
| UNPROTECT(1); |
| return data; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| return Status::OK(); |
| } |
| }; |
| |
| bool ArraysCanFitInteger(ArrayVector arrays) { |
| bool all_can_fit = true; |
| auto i32 = arrow::int32(); |
| for (const auto& array : arrays) { |
| if (all_can_fit) { |
| all_can_fit = arrow::IntegersCanFit(arrow::Datum(array), *i32).ok(); |
| } |
| } |
| return all_can_fit; |
| } |
| |
| bool GetBoolOption(const std::string& name, bool default_) { |
| SEXP getOption = Rf_install("getOption"); |
| cpp11::sexp call = Rf_lang2(getOption, Rf_mkString(name.c_str())); |
| cpp11::sexp res = Rf_eval(call, R_BaseEnv); |
| if (TYPEOF(res) == LGLSXP) { |
| return LOGICAL(res)[0] == TRUE; |
| } else { |
| return default_; |
| } |
| } |
| |
| std::shared_ptr<Converter> Converter::Make(const std::shared_ptr<DataType>& type, |
| ArrayVector arrays) { |
| if (arrays.empty()) { |
| // slight hack for the 0-row case since the converters expect at least one |
| // chunk to process. |
| arrays.push_back(ValueOrStop(arrow::MakeArrayOfNull(type, 0))); |
| } |
| |
| switch (type->id()) { |
| // direct support |
| case Type::INT32: |
| return std::make_shared<arrow::r::Converter_Int<arrow::Int32Type>>( |
| std::move(arrays)); |
| |
| case Type::DOUBLE: |
| return std::make_shared<arrow::r::Converter_Double<arrow::DoubleType>>( |
| std::move(arrays)); |
| |
| // need to handle 1-bit case |
| case Type::BOOL: |
| return std::make_shared<arrow::r::Converter_Boolean>(std::move(arrays)); |
| |
| case Type::BINARY: |
| return std::make_shared<arrow::r::Converter_Binary<arrow::BinaryArray>>( |
| std::move(arrays)); |
| |
| case Type::LARGE_BINARY: |
| return std::make_shared<arrow::r::Converter_Binary<arrow::LargeBinaryArray>>( |
| std::move(arrays)); |
| |
| case Type::FIXED_SIZE_BINARY: |
| return std::make_shared<arrow::r::Converter_FixedSizeBinary>( |
| std::move(arrays), |
| checked_cast<const FixedSizeBinaryType&>(*type).byte_width()); |
| |
| // handle memory dense strings |
| case Type::STRING: |
| return std::make_shared<arrow::r::Converter_String<arrow::StringArray>>( |
| std::move(arrays)); |
| |
| case Type::LARGE_STRING: |
| return std::make_shared<arrow::r::Converter_String<arrow::LargeStringArray>>( |
| std::move(arrays)); |
| |
| case Type::DICTIONARY: |
| return std::make_shared<arrow::r::Converter_Dictionary>(std::move(arrays)); |
| |
| case Type::DATE32: |
| return std::make_shared<arrow::r::Converter_Date32>(std::move(arrays)); |
| |
| case Type::DATE64: |
| return std::make_shared<arrow::r::Converter_Date64>(std::move(arrays)); |
| |
| // promotions to integer vector |
| case Type::INT8: |
| return std::make_shared<arrow::r::Converter_Int<arrow::Int8Type>>( |
| std::move(arrays)); |
| |
| case Type::UINT8: |
| return std::make_shared<arrow::r::Converter_Int<arrow::UInt8Type>>( |
| std::move(arrays)); |
| |
| case Type::INT16: |
| return std::make_shared<arrow::r::Converter_Int<arrow::Int16Type>>( |
| std::move(arrays)); |
| |
| case Type::UINT16: |
| return std::make_shared<arrow::r::Converter_Int<arrow::UInt16Type>>( |
| std::move(arrays)); |
| |
| // promotions to numeric vector, if they don't fit into int32 |
| case Type::UINT32: |
| if (ArraysCanFitInteger(arrays)) { |
| return std::make_shared<arrow::r::Converter_Int<arrow::UInt32Type>>( |
| std::move(arrays)); |
| } else { |
| return std::make_shared<arrow::r::Converter_Double<arrow::UInt32Type>>( |
| std::move(arrays)); |
| } |
| |
| case Type::UINT64: |
| if (ArraysCanFitInteger(arrays)) { |
| return std::make_shared<arrow::r::Converter_Int<arrow::UInt64Type>>( |
| std::move(arrays)); |
| } else { |
| return std::make_shared<arrow::r::Converter_Double<arrow::UInt64Type>>( |
| std::move(arrays)); |
| } |
| |
| case Type::HALF_FLOAT: |
| return std::make_shared<arrow::r::Converter_Double<arrow::HalfFloatType>>( |
| std::move(arrays)); |
| |
| case Type::FLOAT: |
| return std::make_shared<arrow::r::Converter_Double<arrow::FloatType>>( |
| std::move(arrays)); |
| |
| // time32 and time64 |
| case Type::TIME32: |
| return std::make_shared<arrow::r::Converter_Time<int32_t>>(std::move(arrays)); |
| |
| case Type::TIME64: |
| return std::make_shared<arrow::r::Converter_Time<int64_t>>(std::move(arrays)); |
| |
| case Type::TIMESTAMP: |
| return std::make_shared<arrow::r::Converter_Timestamp<int64_t>>(std::move(arrays)); |
| |
| case Type::INT64: |
| // Prefer integer if it fits, unless option arrow.int64_downcast is `false` |
| if (GetBoolOption("arrow.int64_downcast", true) && ArraysCanFitInteger(arrays)) { |
| return std::make_shared<arrow::r::Converter_Int<arrow::Int64Type>>( |
| std::move(arrays)); |
| } else { |
| return std::make_shared<arrow::r::Converter_Int64>(std::move(arrays)); |
| } |
| |
| case Type::DECIMAL: |
| return std::make_shared<arrow::r::Converter_Decimal>(std::move(arrays)); |
| |
| // nested |
| case Type::STRUCT: |
| return std::make_shared<arrow::r::Converter_Struct>(std::move(arrays)); |
| |
| case Type::LIST: |
| return std::make_shared<arrow::r::Converter_List<arrow::ListArray>>( |
| std::move(arrays), |
| checked_cast<const arrow::ListType*>(type.get())->value_type()); |
| |
| case Type::LARGE_LIST: |
| return std::make_shared<arrow::r::Converter_List<arrow::LargeListArray>>( |
| std::move(arrays), |
| checked_cast<const arrow::LargeListType*>(type.get())->value_type()); |
| |
| case Type::FIXED_SIZE_LIST: |
| return std::make_shared<arrow::r::Converter_FixedSizeList>( |
| std::move(arrays), |
| checked_cast<const arrow::FixedSizeListType&>(*type).value_type(), |
| checked_cast<const arrow::FixedSizeListType&>(*type).list_size()); |
| |
| case Type::NA: |
| return std::make_shared<arrow::r::Converter_Null>(std::move(arrays)); |
| |
| default: |
| break; |
| } |
| |
| cpp11::stop("cannot handle Array of type ", type->name().c_str()); |
| } |
| |
| cpp11::writable::list to_dataframe_serial( |
| int64_t nr, int64_t nc, const cpp11::writable::strings& names, |
| const std::vector<std::shared_ptr<Converter>>& converters) { |
| cpp11::writable::list tbl(nc); |
| for (int i = 0; i < nc; i++) { |
| SEXP column = tbl[i] = converters[i]->Allocate(nr); |
| StopIfNotOk(converters[i]->IngestSerial(column)); |
| } |
| tbl.attr(R_NamesSymbol) = names; |
| tbl.attr(R_ClassSymbol) = arrow::r::data::classes_tbl_df; |
| tbl.attr(R_RowNamesSymbol) = arrow::r::short_row_names(nr); |
| return tbl; |
| } |
| |
| cpp11::writable::list to_dataframe_parallel( |
| int64_t nr, int64_t nc, const cpp11::writable::strings& names, |
| const std::vector<std::shared_ptr<Converter>>& converters) { |
| cpp11::writable::list tbl(nc); |
| |
| // task group to ingest data in parallel |
| auto tg = arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool()); |
| |
| // allocate and start ingesting immediately the columns that |
| // can be ingested in parallel, i.e. when ingestion no longer |
| // need to happen on the main thread |
| for (int i = 0; i < nc; i++) { |
| // allocate data for column i |
| SEXP column = tbl[i] = converters[i]->Allocate(nr); |
| |
| // add a task to ingest data of that column if that can be done in parallel |
| if (converters[i]->Parallel()) { |
| converters[i]->IngestParallel(column, tg, converters[i]); |
| } |
| } |
| |
| arrow::Status status = arrow::Status::OK(); |
| |
| // ingest the columns that cannot be dealt with in parallel |
| for (int i = 0; i < nc; i++) { |
| if (!converters[i]->Parallel()) { |
| status &= converters[i]->IngestSerial(tbl[i]); |
| } |
| } |
| |
| // wait for the ingestion to be finished |
| status &= tg->Finish(); |
| |
| StopIfNotOk(status); |
| |
| tbl.attr(R_NamesSymbol) = names; |
| tbl.attr(R_ClassSymbol) = arrow::r::data::classes_tbl_df; |
| tbl.attr(R_RowNamesSymbol) = arrow::r::short_row_names(nr); |
| |
| return tbl; |
| } |
| |
| } // namespace r |
| } // namespace arrow |
| |
| // [[arrow::export]] |
| SEXP Array__as_vector(const std::shared_ptr<arrow::Array>& array) { |
| return arrow::r::ArrayVector__as_vector(array->length(), array->type(), {array}); |
| } |
| |
| // [[arrow::export]] |
| SEXP ChunkedArray__as_vector(const std::shared_ptr<arrow::ChunkedArray>& chunked_array) { |
| return arrow::r::ArrayVector__as_vector(chunked_array->length(), chunked_array->type(), |
| chunked_array->chunks()); |
| } |
| |
| // [[arrow::export]] |
| cpp11::writable::list RecordBatch__to_dataframe( |
| const std::shared_ptr<arrow::RecordBatch>& batch, bool use_threads) { |
| int64_t nc = batch->num_columns(); |
| int64_t nr = batch->num_rows(); |
| cpp11::writable::strings names(nc); |
| std::vector<arrow::ArrayVector> arrays(nc); |
| std::vector<std::shared_ptr<arrow::r::Converter>> converters(nc); |
| |
| for (R_xlen_t i = 0; i < nc; i++) { |
| names[i] = batch->column_name(i); |
| arrays[i] = {batch->column(i)}; |
| converters[i] = arrow::r::Converter::Make(batch->column(i)->type(), arrays[i]); |
| } |
| |
| if (use_threads) { |
| return arrow::r::to_dataframe_parallel(nr, nc, names, converters); |
| } else { |
| return arrow::r::to_dataframe_serial(nr, nc, names, converters); |
| } |
| } |
| |
| // [[arrow::export]] |
| cpp11::writable::list Table__to_dataframe(const std::shared_ptr<arrow::Table>& table, |
| bool use_threads) { |
| int64_t nc = table->num_columns(); |
| int64_t nr = table->num_rows(); |
| cpp11::writable::strings names(nc); |
| std::vector<std::shared_ptr<arrow::r::Converter>> converters(nc); |
| |
| for (R_xlen_t i = 0; i < nc; i++) { |
| converters[i] = |
| arrow::r::Converter::Make(table->column(i)->type(), table->column(i)->chunks()); |
| names[i] = table->field(i)->name(); |
| } |
| |
| if (use_threads) { |
| return arrow::r::to_dataframe_parallel(nr, nc, names, converters); |
| } else { |
| return arrow::r::to_dataframe_serial(nr, nc, names, converters); |
| } |
| } |
| |
| #endif |