// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "./arrow_types.h" |
| |
| #include <arrow/array.h> |
| #include <arrow/builder.h> |
| #include <arrow/datum.h> |
| #include <arrow/table.h> |
| #include <arrow/util/bitmap_reader.h> |
| #include <arrow/util/bitmap_writer.h> |
| #include <arrow/util/int_util.h> |
| |
| #include <type_traits> |
| |
| #include "./extension.h" |
| #include "./r_task_group.h" |
| |
| namespace arrow { |
| |
| using internal::checked_cast; |
| using internal::IntegersCanFit; |
| |
| namespace r { |
| |
| class Converter { |
| public: |
| explicit Converter(const std::shared_ptr<ChunkedArray>& chunked_array) |
| : chunked_array_(std::move(chunked_array)) {} |
| |
| virtual ~Converter() {} |
| |
| // Allocate a vector of the right R type for this converter |
| virtual SEXP Allocate(R_xlen_t n) const = 0; |
| |
| // data[ start:(start + n) ] = NA |
| virtual Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const = 0; |
| |
| // ingest the values from the array into data[ start : (start + n)] |
| // |
| // chunk_index indicates which of the chunk is being ingested into data. This is |
| // ignored by most implementations and currently only used with Dictionary |
| // arrays. |
| virtual Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, |
| size_t chunk_index) const = 0; |
| |
| // can this run in parallel ? |
| virtual bool Parallel() const { return true; } |
| |
| // converter is passed as self to outlive the scope of Converter::Convert() |
| SEXP ScheduleConvertTasks(RTasks& tasks, std::shared_ptr<Converter> self) { |
| // try altrep first |
| SEXP alt = altrep::MakeAltrepVector(chunked_array_); |
| if (!Rf_isNull(alt)) { |
| return alt; |
| } |
| |
| // otherwise use the Converter api: |
| |
| // allocating the R vector upfront |
| SEXP out = PROTECT(Allocate(chunked_array_->length())); |
| |
| // for each array, fill the relevant slice of `out`, potentially in parallel |
| R_xlen_t k = 0, i = 0; |
| for (const auto& array : chunked_array_->chunks()) { |
| auto n_chunk = array->length(); |
| |
| tasks.Append(Parallel(), [=] { |
| if (array->null_count() == n_chunk) { |
| return self->Ingest_all_nulls(out, k, n_chunk); |
| } else { |
| return self->Ingest_some_nulls(out, array, k, n_chunk, i); |
| } |
| }); |
| |
| k += n_chunk; |
| i++; |
| } |
| |
| UNPROTECT(1); |
| return out; |
| } |
| |
| // Converter factory |
| static std::shared_ptr<Converter> Make( |
| const std::shared_ptr<ChunkedArray>& chunked_array); |
| |
| static SEXP LazyConvert(const std::shared_ptr<ChunkedArray>& chunked_array, |
| RTasks& tasks) { |
| auto converter = Make(chunked_array); |
| return converter->ScheduleConvertTasks(tasks, converter); |
| } |
| |
| static SEXP Convert(const std::shared_ptr<ChunkedArray>& chunked_array, |
| bool use_threads) { |
| RTasks tasks(use_threads); |
| SEXP out = PROTECT(Converter::LazyConvert(chunked_array, tasks)); |
| StopIfNotOk(tasks.Finish()); |
| |
| UNPROTECT(1); |
| return out; |
| } |
| |
| static SEXP Convert(const std::shared_ptr<Array>& array) { |
| return Convert(std::make_shared<ChunkedArray>(array), false); |
| } |
| |
| SEXP MaybeAltrep() { return altrep::MakeAltrepVector(chunked_array_); } |
| |
| protected: |
| std::shared_ptr<ChunkedArray> chunked_array_; |
| }; |
| |
| template <typename SetNonNull, typename SetNull> |
| Status IngestSome(const std::shared_ptr<arrow::Array>& array, R_xlen_t n, |
| SetNonNull&& set_non_null, SetNull&& set_null) { |
| if (array->null_count()) { |
| internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), array->offset(), |
| n); |
| |
| for (R_xlen_t i = 0; i < n; i++, bitmap_reader.Next()) { |
| if (bitmap_reader.IsSet()) { |
| RETURN_NOT_OK(set_non_null(i)); |
| } else { |
| RETURN_NOT_OK(set_null(i)); |
| } |
| } |
| |
| } else { |
| for (R_xlen_t i = 0; i < n; i++) { |
| RETURN_NOT_OK(set_non_null(i)); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| template <typename SetNonNull> |
| Status IngestSome(const std::shared_ptr<arrow::Array>& array, R_xlen_t n, |
| SetNonNull&& set_non_null) { |
| auto nothing = [](R_xlen_t i) { return Status::OK(); }; |
| return IngestSome(array, n, std::forward<SetNonNull>(set_non_null), nothing); |
| } |
| |
| std::shared_ptr<Array> CreateEmptyArray(const std::shared_ptr<DataType>& array_type) { |
| std::unique_ptr<arrow::ArrayBuilder> builder; |
| StopIfNotOk(arrow::MakeBuilder(gc_memory_pool(), array_type, &builder)); |
| |
| std::shared_ptr<arrow::Array> array; |
| StopIfNotOk(builder->Finish(&array)); |
| return array; |
| } |
| |
| template <typename Type> |
| class Converter_Int : public Converter { |
| using value_type = typename TypeTraits<Type>::ArrayType::value_type; |
| |
| public: |
| explicit Converter_Int(const std::shared_ptr<ChunkedArray>& chunked_array) |
| : Converter(chunked_array) {} |
| |
| SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(INTSXP, n); } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| std::fill_n(INTEGER(data) + start, n, NA_INTEGER); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_values = array->data()->GetValues<value_type>(1); |
| if (!p_values) { |
| return Status::Invalid("Invalid data buffer"); |
| } |
| auto p_data = INTEGER(data) + start; |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = static_cast<int>(p_values[i]); |
| return Status::OK(); |
| }; |
| auto null_one = [&](R_xlen_t i) { |
| p_data[i] = NA_INTEGER; |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one, null_one); |
| } |
| }; |
| |
| template <typename Type> |
| class Converter_Double : public Converter { |
| using value_type = typename TypeTraits<Type>::ArrayType::value_type; |
| |
| public: |
| explicit Converter_Double(const std::shared_ptr<ChunkedArray>& chunked_array) |
| : Converter(chunked_array) {} |
| |
| SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(REALSXP, n); } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| std::fill_n(REAL(data) + start, n, NA_REAL); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_values = array->data()->GetValues<value_type>(1); |
| if (!p_values) { |
| return Status::Invalid("Invalid data buffer"); |
| } |
| auto p_data = REAL(data) + start; |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = static_cast<value_type>(p_values[i]); |
| return Status::OK(); |
| }; |
| auto null_one = [&](R_xlen_t i) { |
| p_data[i] = NA_REAL; |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one, null_one); |
| } |
| }; |
| |
| class Converter_Date32 : public Converter { |
| public: |
| explicit Converter_Date32(const std::shared_ptr<ChunkedArray>& chunked_array) |
| : Converter(chunked_array) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| SEXP data = PROTECT(Rf_allocVector(REALSXP, n)); |
| Rf_classgets(data, Rf_mkString("Date")); |
| UNPROTECT(1); |
| return data; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| std::fill_n(REAL(data) + start, n, NA_REAL); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_values = array->data()->GetValues<int>(1); |
| if (!p_values) { |
| return Status::Invalid("Invalid data buffer"); |
| } |
| auto p_data = REAL(data) + start; |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = static_cast<double>(p_values[i]); |
| return Status::OK(); |
| }; |
| auto null_one = [&](R_xlen_t i) { |
| p_data[i] = NA_REAL; |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one, null_one); |
| } |
| }; |
| |
| template <typename StringArrayType> |
| struct Converter_String : public Converter { |
| public: |
| explicit Converter_String(const std::shared_ptr<ChunkedArray>& chunked_array) |
| : Converter(chunked_array) {} |
| |
| SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(STRSXP, n); } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| for (R_xlen_t i = 0; i < n; i++) { |
| SET_STRING_ELT(data, i + start, NA_STRING); |
| } |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_offset = array->data()->GetValues<int32_t>(1); |
| if (!p_offset) { |
| return Status::Invalid("Invalid offset buffer"); |
| } |
| auto p_strings = array->data()->GetValues<char>(2, *p_offset); |
| if (!p_strings) { |
| // There is an offset buffer, but the data buffer is null |
| // There is at least one value in the array and not all the values are null |
| // That means all values are either empty strings or nulls so there is nothing to do |
| |
| if (array->null_count()) { |
| arrow::internal::BitmapReader null_reader(array->null_bitmap_data(), |
| array->offset(), n); |
| for (int i = 0; i < n; i++, null_reader.Next()) { |
| if (null_reader.IsNotSet()) { |
| SET_STRING_ELT(data, start + i, NA_STRING); |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
| StringArrayType* string_array = static_cast<StringArrayType*>(array.get()); |
| |
| const bool all_valid = array->null_count() == 0; |
| const bool strip_out_nuls = GetBoolOption("arrow.skip_nul", false); |
| |
| bool nul_was_stripped = false; |
| |
| if (all_valid) { |
| // no need to watch for missing strings |
| cpp11::unwind_protect([&] { |
| if (strip_out_nuls) { |
| for (int i = 0; i < n; i++) { |
| SET_STRING_ELT(data, start + i, |
| r_string_from_view_strip_nul(string_array->GetView(i), |
| &nul_was_stripped)); |
| } |
| return; |
| } |
| |
| for (int i = 0; i < n; i++) { |
| SET_STRING_ELT(data, start + i, r_string_from_view(string_array->GetView(i))); |
| } |
| }); |
| } else { |
| cpp11::unwind_protect([&] { |
| arrow::internal::BitmapReader validity_reader(array->null_bitmap_data(), |
| array->offset(), n); |
| |
| if (strip_out_nuls) { |
| for (int i = 0; i < n; i++, validity_reader.Next()) { |
| if (validity_reader.IsSet()) { |
| SET_STRING_ELT(data, start + i, |
| r_string_from_view_strip_nul(string_array->GetView(i), |
| &nul_was_stripped)); |
| } else { |
| SET_STRING_ELT(data, start + i, NA_STRING); |
| } |
| } |
| return; |
| } |
| |
| for (int i = 0; i < n; i++, validity_reader.Next()) { |
| if (validity_reader.IsSet()) { |
| SET_STRING_ELT(data, start + i, r_string_from_view(string_array->GetView(i))); |
| } else { |
| SET_STRING_ELT(data, start + i, NA_STRING); |
| } |
| } |
| }); |
| } |
| |
| if (nul_was_stripped) { |
| cpp11::safe[Rf_warning]("Stripping '\\0' (nul) from character vector"); |
| } |
| |
| return Status::OK(); |
| } |
| |
| bool Parallel() const { return false; } |
| |
| private: |
| static SEXP r_string_from_view(std::string_view view) { |
| return Rf_mkCharLenCE(view.data(), static_cast<int>(view.size()), CE_UTF8); |
| } |
| |
| static SEXP r_string_from_view_strip_nul(std::string_view view, |
| bool* nul_was_stripped) { |
| const char* old_string = view.data(); |
| |
| std::string stripped_string; |
| size_t stripped_len = 0, nul_count = 0; |
| |
| for (size_t i = 0; i < view.size(); i++) { |
| if (old_string[i] == '\0') { |
| ++nul_count; |
| |
| if (nul_count == 1) { |
| // first nul spotted: allocate stripped string storage |
| stripped_string = std::string(view); |
| stripped_len = i; |
| } |
| |
| // don't copy old_string[i] (which is \0) into stripped_string |
| continue; |
| } |
| |
| if (nul_count > 0) { |
| stripped_string[stripped_len++] = old_string[i]; |
| } |
| } |
| |
| if (nul_count > 0) { |
| *nul_was_stripped = true; |
| stripped_string.resize(stripped_len); |
| return r_string_from_view(stripped_string); |
| } |
| |
| return r_string_from_view(view); |
| } |
| }; |
| |
| class Converter_Boolean : public Converter { |
| public: |
| explicit Converter_Boolean(const std::shared_ptr<ChunkedArray>& chunked_array) |
| : Converter(chunked_array) {} |
| |
| SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(LGLSXP, n); } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| std::fill_n(LOGICAL(data) + start, n, NA_LOGICAL); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_data = LOGICAL(data) + start; |
| auto p_bools = array->data()->GetValues<uint8_t>(1, 0); |
| if (!p_bools) { |
| return Status::Invalid("Invalid data buffer"); |
| } |
| |
| arrow::internal::BitmapReader data_reader(p_bools, array->offset(), n); |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = data_reader.IsSet(); |
| data_reader.Next(); |
| return Status::OK(); |
| }; |
| |
| auto null_one = [&](R_xlen_t i) { |
| data_reader.Next(); |
| p_data[i] = NA_LOGICAL; |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one, null_one); |
| } |
| }; |
| |
| template <typename ArrayType> |
| class Converter_Binary : public Converter { |
| public: |
| using offset_type = typename ArrayType::offset_type; |
| explicit Converter_Binary(const std::shared_ptr<ChunkedArray>& chunked_array) |
| : Converter(chunked_array) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| SEXP res = PROTECT(Rf_allocVector(VECSXP, n)); |
| if (std::is_same<ArrayType, BinaryArray>::value) { |
| Rf_classgets(res, data::classes_arrow_binary); |
| } else { |
| Rf_classgets(res, data::classes_arrow_large_binary); |
| } |
| UNPROTECT(1); |
| return res; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| const ArrayType* binary_array = checked_cast<const ArrayType*>(array.get()); |
| |
| auto ingest_one = [&](R_xlen_t i) { |
| offset_type ni; |
| auto value = binary_array->GetValue(i, &ni); |
| if (ni > R_XLEN_T_MAX) { |
| return Status::RError("Array too big to be represented as a raw vector"); |
| } |
| SEXP raw = PROTECT(Rf_allocVector(RAWSXP, ni)); |
| std::copy(value, value + ni, RAW(raw)); |
| |
| SET_VECTOR_ELT(data, i + start, raw); |
| UNPROTECT(1); |
| |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one); |
| } |
| |
| virtual bool Parallel() const { return false; } |
| }; |
| |
| class Converter_FixedSizeBinary : public Converter { |
| public: |
| explicit Converter_FixedSizeBinary(const std::shared_ptr<ChunkedArray>& chunked_array, |
| int byte_width) |
| : Converter(chunked_array), byte_width_(byte_width) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| SEXP res = PROTECT(Rf_allocVector(VECSXP, n)); |
| Rf_classgets(res, data::classes_arrow_fixed_size_binary); |
| Rf_setAttrib(res, symbols::byte_width, Rf_ScalarInteger(byte_width_)); |
| UNPROTECT(1); |
| return res; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| const FixedSizeBinaryArray* binary_array = |
| checked_cast<const FixedSizeBinaryArray*>(array.get()); |
| |
| int byte_width = binary_array->byte_width(); |
| auto ingest_one = [&, byte_width](R_xlen_t i) { |
| auto value = binary_array->GetValue(i); |
| SEXP raw = PROTECT(Rf_allocVector(RAWSXP, byte_width)); |
| std::copy(value, value + byte_width, RAW(raw)); |
| |
| SET_VECTOR_ELT(data, i + start, raw); |
| UNPROTECT(1); |
| |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one); |
| } |
| |
| virtual bool Parallel() const { return false; } |
| |
| private: |
| int byte_width_; |
| }; |
| |
| bool DictionaryChunkArrayNeedUnification( |
| const std::shared_ptr<ChunkedArray>& chunked_array) { |
| int n = chunked_array->num_chunks(); |
| if (n < 2) { |
| return false; |
| } |
| const auto& arr_first = |
| internal::checked_cast<const DictionaryArray&>(*chunked_array->chunk(0)); |
| for (int i = 1; i < n; i++) { |
| const auto& arr = |
| internal::checked_cast<const DictionaryArray&>(*chunked_array->chunk(i)); |
| if (!(arr_first.dictionary()->Equals(arr.dictionary()))) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| class Converter_Dictionary : public Converter { |
| private: |
| bool need_unification_; |
| std::unique_ptr<arrow::DictionaryUnifier> unifier_; |
| std::vector<std::shared_ptr<Buffer>> arrays_transpose_; |
| std::shared_ptr<DataType> out_type_; |
| std::shared_ptr<Array> dictionary_; |
| |
| public: |
| explicit Converter_Dictionary(const std::shared_ptr<ChunkedArray>& chunked_array) |
| : Converter(chunked_array), |
| need_unification_(DictionaryChunkArrayNeedUnification(chunked_array)) { |
| if (need_unification_) { |
| const auto& arr_type = checked_cast<const DictionaryType&>(*chunked_array->type()); |
| unifier_ = ValueOrStop(DictionaryUnifier::Make(arr_type.value_type())); |
| |
| int n_arrays = chunked_array->num_chunks(); |
| arrays_transpose_.resize(n_arrays); |
| |
| for (int i = 0; i < n_arrays; i++) { |
| const auto& dict_i = |
| *checked_cast<const DictionaryArray&>(*chunked_array->chunk(i)).dictionary(); |
| StopIfNotOk(unifier_->Unify(dict_i, &arrays_transpose_[i])); |
| } |
| |
| StopIfNotOk(unifier_->GetResult(&out_type_, &dictionary_)); |
| } else { |
| const auto& dict_type = checked_cast<const DictionaryType&>(*chunked_array->type()); |
| |
| const auto& indices_type = *dict_type.index_type(); |
| switch (indices_type.id()) { |
| case Type::UINT8: |
| case Type::INT8: |
| case Type::UINT16: |
| case Type::INT16: |
| case Type::INT32: |
| // TODO: also add int64, uint32, uint64 downcasts, if possible |
| break; |
| default: |
| cpp11::stop("Cannot convert Dictionary Array of type `%s` to R", |
| dict_type.ToString().c_str()); |
| } |
| |
| if (chunked_array->num_chunks() > 0) { |
| // DictionaryChunkArrayNeedUnification() returned false so we can safely assume |
| // the dictionary of the first chunk applies everywhere |
| const auto& dict_array = |
| checked_cast<const DictionaryArray&>(*chunked_array->chunk(0)); |
| dictionary_ = dict_array.dictionary(); |
| } else { |
| dictionary_ = CreateEmptyArray(dict_type.value_type()); |
| } |
| } |
| } |
| |
| SEXP Allocate(R_xlen_t n) const { |
| cpp11::writable::integers data(n); |
| data.attr("levels") = GetLevels(); |
| if (GetOrdered()) { |
| Rf_classgets(data, arrow::r::data::classes_ordered); |
| } else { |
| Rf_classgets(data, arrow::r::data::classes_factor); |
| } |
| return data; |
| } |
| |
| virtual bool Parallel() const { return false; } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| std::fill_n(INTEGER(data) + start, n, NA_INTEGER); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| const DictionaryArray& dict_array = |
| checked_cast<const DictionaryArray&>(*array.get()); |
| auto indices = dict_array.indices(); |
| switch (indices->type_id()) { |
| case Type::UINT8: |
| return Ingest_some_nulls_Impl<arrow::UInt8Type>(data, array, start, n, |
| chunk_index); |
| case Type::INT8: |
| return Ingest_some_nulls_Impl<arrow::Int8Type>(data, array, start, n, |
| chunk_index); |
| case Type::UINT16: |
| return Ingest_some_nulls_Impl<arrow::UInt16Type>(data, array, start, n, |
| chunk_index); |
| case Type::INT16: |
| return Ingest_some_nulls_Impl<arrow::Int16Type>(data, array, start, n, |
| chunk_index); |
| case Type::INT32: |
| return Ingest_some_nulls_Impl<arrow::Int32Type>(data, array, start, n, |
| chunk_index); |
| default: |
| break; |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| template <typename Type> |
| Status Ingest_some_nulls_Impl(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| using index_type = typename arrow::TypeTraits<Type>::ArrayType::value_type; |
| auto indices = checked_cast<const DictionaryArray&>(*array).indices(); |
| auto raw_indices = indices->data()->GetValues<index_type>(1); |
| |
| auto p_data = INTEGER(data) + start; |
| auto null_one = [&](R_xlen_t i) { |
| p_data[i] = NA_INTEGER; |
| return Status::OK(); |
| }; |
| |
| // convert the 0-based indices from the arrow Array |
| // to 1-based indices used in R factors |
| if (need_unification_) { |
| // transpose the indices before converting |
| auto transposed = |
| reinterpret_cast<const int32_t*>(arrays_transpose_[chunk_index]->data()); |
| |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = transposed[raw_indices[i]] + 1; |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one, null_one); |
| } else { |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = static_cast<int>(raw_indices[i]) + 1; |
| return Status::OK(); |
| }; |
| return IngestSome(array, n, ingest_one, null_one); |
| } |
| } |
| |
| bool GetOrdered() const { |
| return checked_cast<const DictionaryType&>(*chunked_array_->type()).ordered(); |
| } |
| |
| SEXP GetLevels() const { |
| // R factor levels must be type "character" so coerce `dict` to STRSXP |
| // TODO (npr): this coercion should be optional, "dictionariesAsFactors" ;) |
| // Alternative: preserve the logical type of the dictionary values |
| // (e.g. if dict is timestamp, return a POSIXt R vector, not factor) |
| if (dictionary_->type_id() != Type::STRING) { |
| cpp11::safe[Rf_warning]("Coercing dictionary values to R character factor levels"); |
| } |
| |
| SEXP vec = PROTECT(Converter::Convert(dictionary_)); |
| SEXP strings_vec = PROTECT(Rf_coerceVector(vec, STRSXP)); |
| UNPROTECT(2); |
| return strings_vec; |
| } |
| }; |
| |
| class Converter_Struct : public Converter { |
| public: |
| explicit Converter_Struct(const std::shared_ptr<ChunkedArray>& chunked_array) |
| : Converter(chunked_array), converters() { |
| const auto& struct_type = |
| checked_cast<const arrow::StructType&>(*chunked_array->type()); |
| |
| int nf = struct_type.num_fields(); |
| |
| std::shared_ptr<arrow::Table> array_as_table = |
| ValueOrStop(arrow::Table::FromChunkedStructArray(chunked_array)); |
| for (int i = 0; i < nf; i++) { |
| converters.push_back(Converter::Make(array_as_table->column(i))); |
| } |
| } |
| |
| SEXP Allocate(R_xlen_t n) const { |
| // allocate a data frame column to host each array |
| // If possible, a column is dealt with directly with altrep |
| auto type = |
| checked_cast<const arrow::StructType*>(this->chunked_array_->type().get()); |
| auto out = |
| arrow::r::to_r_list(converters, [n](const std::shared_ptr<Converter>& converter) { |
| SEXP out = converter->MaybeAltrep(); |
| if (Rf_isNull(out)) { |
| out = converter->Allocate(n); |
| } |
| return out; |
| }); |
| auto colnames = arrow::r::to_r_strings( |
| type->fields(), |
| [](const std::shared_ptr<Field>& field) { return field->name(); }); |
| out.attr(symbols::row_names) = arrow::r::short_row_names(static_cast<int>(n)); |
| out.attr(R_NamesSymbol) = colnames; |
| out.attr(R_ClassSymbol) = arrow::r::data::classes_tbl_df; |
| |
| return out; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| int nf = static_cast<int>(converters.size()); |
| for (int i = 0; i < nf; i++) { |
| SEXP data_i = VECTOR_ELT(data, i); |
| |
| // only ingest if the column is not altrep |
| if (!altrep::is_unmaterialized_arrow_altrep(data_i)) { |
| StopIfNotOk(converters[i]->Ingest_all_nulls(data_i, start, n)); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto struct_array = checked_cast<const arrow::StructArray*>(array.get()); |
| int nf = static_cast<int>(converters.size()); |
| // Flatten() deals with merging of nulls |
| auto arrays = ValueOrStop(struct_array->Flatten(gc_memory_pool())); |
| for (int i = 0; i < nf; i++) { |
| SEXP data_i = VECTOR_ELT(data, i); |
| |
| // only ingest if the column is not altrep |
| if (!altrep::is_unmaterialized_arrow_altrep(data_i)) { |
| StopIfNotOk(converters[i]->Ingest_some_nulls(VECTOR_ELT(data, i), arrays[i], |
| start, n, chunk_index)); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| virtual bool Parallel() const { |
| // this can only run in parallel if all the |
| // inner converters can |
| for (const auto& converter : converters) { |
| if (!converter->Parallel()) return false; |
| } |
| return true; |
| } |
| |
| private: |
| std::vector<std::shared_ptr<Converter>> converters; |
| }; |
| |
// Convert a number of milliseconds into (possibly fractional) seconds.
double ms_to_seconds(int64_t ms) {
  constexpr double kMillisPerSecond = 1000;
  return static_cast<double>(ms) / kMillisPerSecond;
}
| |
| class Converter_Date64 : public Converter { |
| public: |
| explicit Converter_Date64(const std::shared_ptr<ChunkedArray>& chunked_array) |
| : Converter(chunked_array) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| cpp11::writable::doubles data(n); |
| Rf_classgets(data, arrow::r::data::classes_POSIXct); |
| return data; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| std::fill_n(REAL(data) + start, n, NA_REAL); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_data = REAL(data) + start; |
| auto p_values = array->data()->GetValues<int64_t>(1); |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = static_cast<double>(p_values[i] / 1000); |
| return Status::OK(); |
| }; |
| auto null_one = [&](R_xlen_t i) { |
| p_data[i] = NA_REAL; |
| return Status::OK(); |
| }; |
| return IngestSome(array, n, ingest_one, null_one); |
| } |
| }; |
| |
| template <typename value_type, typename unit_type = TimeType> |
| class Converter_Time : public Converter { |
| public: |
| explicit Converter_Time(const std::shared_ptr<ChunkedArray>& chunked_array) |
| : Converter(chunked_array) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| cpp11::writable::doubles data(n); |
| data.attr("class") = cpp11::writable::strings({"hms", "difftime"}); |
| |
| // hms difftime is always stored as "seconds" |
| data.attr("units") = cpp11::writable::strings({"secs"}); |
| return data; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| std::fill_n(REAL(data) + start, n, NA_REAL); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| int multiplier = TimeUnit_multiplier(array); |
| |
| auto p_data = REAL(data) + start; |
| auto p_values = array->data()->GetValues<value_type>(1); |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = static_cast<double>(p_values[i]) / multiplier; |
| return Status::OK(); |
| }; |
| auto null_one = [&](R_xlen_t i) { |
| p_data[i] = NA_REAL; |
| return Status::OK(); |
| }; |
| return IngestSome(array, n, ingest_one, null_one); |
| } |
| |
| private: |
| int TimeUnit_multiplier(const std::shared_ptr<Array>& array) const { |
| // hms difftime is always "seconds", so multiply based on the Array's TimeUnit |
| switch (static_cast<unit_type*>(array->type().get())->unit()) { |
| case TimeUnit::SECOND: |
| return 1; |
| case TimeUnit::MILLI: |
| return 1000; |
| case TimeUnit::MICRO: |
| return 1000000; |
| case TimeUnit::NANO: |
| return 1000000000; |
| default: |
| return 0; |
| } |
| } |
| }; |
| |
| template <typename value_type, typename unit_type = DurationType> |
| class Converter_Duration : public Converter { |
| public: |
| explicit Converter_Duration(const std::shared_ptr<ChunkedArray>& chunked_array) |
| : Converter(chunked_array) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| cpp11::writable::doubles data(n); |
| data.attr("class") = "difftime"; |
| |
| // difftime is always stored as "seconds" |
| data.attr("units") = cpp11::writable::strings({"secs"}); |
| return data; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| std::fill_n(REAL(data) + start, n, NA_REAL); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| int multiplier = TimeUnit_multiplier(array); |
| |
| auto p_data = REAL(data) + start; |
| auto p_values = array->data()->GetValues<value_type>(1); |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = static_cast<double>(p_values[i]) / multiplier; |
| return Status::OK(); |
| }; |
| auto null_one = [&](R_xlen_t i) { |
| p_data[i] = NA_REAL; |
| return Status::OK(); |
| }; |
| return IngestSome(array, n, ingest_one, null_one); |
| } |
| |
| private: |
| int TimeUnit_multiplier(const std::shared_ptr<Array>& array) const { |
| // difftime is always "seconds", so multiply based on the Array's TimeUnit |
| switch (static_cast<unit_type*>(array->type().get())->unit()) { |
| case TimeUnit::SECOND: |
| return 1; |
| case TimeUnit::MILLI: |
| return 1000; |
| case TimeUnit::MICRO: |
| return 1000000; |
| case TimeUnit::NANO: |
| return 1000000000; |
| default: |
| return 0; |
| } |
| } |
| }; |
| |
| template <typename value_type> |
| class Converter_Timestamp : public Converter_Time<value_type, TimestampType> { |
| public: |
| explicit Converter_Timestamp(const std::shared_ptr<ChunkedArray>& chunked_array) |
| : Converter_Time<value_type, TimestampType>(chunked_array) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| cpp11::writable::doubles data(n); |
| Rf_classgets(data, arrow::r::data::classes_POSIXct); |
| auto array_type = |
| checked_cast<const TimestampType*>(this->chunked_array_->type().get()); |
| std::string tzone = array_type->timezone(); |
| if (tzone.size() > 0) { |
| data.attr("tzone") = tzone; |
| } |
| return data; |
| } |
| }; |
| |
| template <typename Type> |
| class Converter_Decimal : public Converter { |
| public: |
| explicit Converter_Decimal(const std::shared_ptr<ChunkedArray>& chunked_array) |
| : Converter(chunked_array) {} |
| |
| SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(REALSXP, n); } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| std::fill_n(REAL(data) + start, n, NA_REAL); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| using DecimalArray = typename TypeTraits<Type>::ArrayType; |
| auto p_data = REAL(data) + start; |
| const auto& decimals_arr = checked_cast<const DecimalArray&>(*array); |
| |
| auto ingest_one = [&](R_xlen_t i) { |
| p_data[i] = std::stod(decimals_arr.FormatValue(i).c_str()); |
| return Status::OK(); |
| }; |
| auto null_one = [&](R_xlen_t i) { |
| p_data[i] = NA_REAL; |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one, null_one); |
| } |
| }; |
| |
| template <typename ListArrayType> |
| class Converter_List : public Converter { |
| private: |
| std::shared_ptr<arrow::DataType> value_type_; |
| |
| public: |
| explicit Converter_List(const std::shared_ptr<ChunkedArray>& chunked_array, |
| const std::shared_ptr<arrow::DataType>& value_type) |
| : Converter(chunked_array), value_type_(value_type) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| cpp11::writable::list res(n); |
| |
| if (std::is_same<ListArrayType, MapArray>::value) { |
| res.attr(R_ClassSymbol) = arrow::r::data::classes_arrow_list; |
| } else if (std::is_same<ListArrayType, ListArray>::value) { |
| res.attr(R_ClassSymbol) = arrow::r::data::classes_arrow_list; |
| } else { |
| res.attr(R_ClassSymbol) = arrow::r::data::classes_arrow_large_list; |
| } |
| |
| std::shared_ptr<arrow::Array> array = CreateEmptyArray(value_type_); |
| |
| // convert to an R object to store as the list' ptype |
| res.attr(arrow::r::symbols::ptype) = Converter::Convert(array); |
| |
| return res; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| // nothing to do, list contain NULL by default |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto list_array = checked_cast<const ListArrayType*>(array.get()); |
| auto values_array = list_array->values(); |
| |
| auto ingest_one = [&](R_xlen_t i) { |
| auto slice = list_array->value_slice(i); |
| SET_VECTOR_ELT(data, i + start, Converter::Convert(slice)); |
| return Status::OK(); |
| }; |
| |
| return IngestSome(array, n, ingest_one); |
| } |
| |
| bool Parallel() const { return false; } |
| }; |
| |
| class Converter_FixedSizeList : public Converter { |
| private: |
| std::shared_ptr<arrow::DataType> value_type_; |
| int list_size_; |
| |
| public: |
| explicit Converter_FixedSizeList(const std::shared_ptr<ChunkedArray>& chunked_array, |
| const std::shared_ptr<arrow::DataType>& value_type, |
| int list_size) |
| : Converter(chunked_array), value_type_(value_type), list_size_(list_size) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| cpp11::writable::list res(n); |
| Rf_classgets(res, arrow::r::data::classes_arrow_fixed_size_list); |
| res.attr(arrow::r::symbols::list_size) = Rf_ScalarInteger(list_size_); |
| |
| std::shared_ptr<arrow::Array> array = CreateEmptyArray(value_type_); |
| |
| // convert to an R object to store as the list' ptype |
| res.attr(arrow::r::symbols::ptype) = Converter::Convert(array); |
| |
| return res; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| // nothing to do, list contain NULL by default |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| const auto& fixed_size_list_array = checked_cast<const FixedSizeListArray&>(*array); |
| auto values_array = fixed_size_list_array.values(); |
| |
| auto ingest_one = [&](R_xlen_t i) { |
| auto slice = fixed_size_list_array.value_slice(i); |
| SET_VECTOR_ELT(data, i + start, Converter::Convert(slice)); |
| return Status::OK(); |
| }; |
| return IngestSome(array, n, ingest_one); |
| } |
| |
| bool Parallel() const { return false; } |
| }; |
| |
| class Converter_Int64 : public Converter { |
| public: |
| explicit Converter_Int64(const std::shared_ptr<ChunkedArray>& chunked_array) |
| : Converter(chunked_array) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| cpp11::writable::doubles data(n); |
| data.attr("class") = "integer64"; |
| return data; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| auto p_data = reinterpret_cast<int64_t*>(REAL(data)) + start; |
| std::fill_n(p_data, n, NA_INT64); |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| auto p_values = array->data()->GetValues<int64_t>(1); |
| if (!p_values) { |
| return Status::Invalid("Invalid data buffer"); |
| } |
| |
| auto p_data = reinterpret_cast<int64_t*>(REAL(data)) + start; |
| |
| if (array->null_count()) { |
| internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), array->offset(), |
| n); |
| for (R_xlen_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_data) { |
| *p_data = bitmap_reader.IsSet() ? p_values[i] : NA_INT64; |
| } |
| } else { |
| std::copy_n(p_values, n, p_data); |
| } |
| |
| return Status::OK(); |
| } |
| }; |
| |
| class Converter_Null : public Converter { |
| public: |
| explicit Converter_Null(const std::shared_ptr<ChunkedArray>& chunked_array) |
| : Converter(chunked_array) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| SEXP data = PROTECT(Rf_allocVector(LGLSXP, n)); |
| std::fill_n(LOGICAL(data), n, NA_LOGICAL); |
| Rf_classgets(data, Rf_mkString("vctrs_unspecified")); |
| UNPROTECT(1); |
| return data; |
| } |
| |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| return Status::OK(); |
| } |
| }; |
| |
| // Unlike other types, conversion of ExtensionType (chunked) arrays occurs at |
| // R level via the ExtensionType (or subclass) R6 instance. We do this via Allocate, |
| // since it is called once per ChunkedArray. |
| class Converter_Extension : public Converter { |
| public: |
| explicit Converter_Extension(const std::shared_ptr<ChunkedArray>& chunked_array) |
| : Converter(chunked_array) {} |
| |
| SEXP Allocate(R_xlen_t n) const { |
| auto extension_type = |
| dynamic_cast<const RExtensionType*>(chunked_array_->type().get()); |
| if (extension_type == nullptr) { |
| Rf_error("Converter_Extension can't be used with a non-R extension type"); |
| } |
| |
| return extension_type->Convert(chunked_array_); |
| } |
| |
| // At this point we have already done the conversion |
| Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { |
| return Status::OK(); |
| } |
| |
| Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array, |
| R_xlen_t start, R_xlen_t n, size_t chunk_index) const { |
| return Status::OK(); |
| } |
| }; |
| |
| bool ArraysCanFitInteger(ArrayVector arrays) { |
| bool all_can_fit = true; |
| auto i32 = arrow::int32(); |
| for (const auto& array : arrays) { |
| if (all_can_fit) { |
| all_can_fit = arrow::IntegersCanFit(*array->data(), *i32).ok(); |
| } |
| } |
| return all_can_fit; |
| } |
| |
| bool GetBoolOption(const std::string& name, bool default_) { |
| SEXP getOption = Rf_install("getOption"); |
| cpp11::sexp call = Rf_lang2(getOption, Rf_mkString(name.c_str())); |
| cpp11::sexp res = Rf_eval(call, R_BaseEnv); |
| if (TYPEOF(res) == LGLSXP) { |
| return LOGICAL(res)[0] == TRUE; |
| } else { |
| return default_; |
| } |
| } |
| |
| std::shared_ptr<Converter> Converter::Make( |
| const std::shared_ptr<ChunkedArray>& chunked_array) { |
| const auto& type = chunked_array->type(); |
| switch (type->id()) { |
| // direct support |
| case Type::INT32: |
| return std::make_shared<arrow::r::Converter_Int<arrow::Int32Type>>(chunked_array); |
| |
| case Type::DOUBLE: |
| return std::make_shared<arrow::r::Converter_Double<arrow::DoubleType>>( |
| chunked_array); |
| |
| // need to handle 1-bit case |
| case Type::BOOL: |
| return std::make_shared<arrow::r::Converter_Boolean>(chunked_array); |
| |
| case Type::BINARY: |
| return std::make_shared<arrow::r::Converter_Binary<arrow::BinaryArray>>( |
| chunked_array); |
| |
| case Type::LARGE_BINARY: |
| return std::make_shared<arrow::r::Converter_Binary<arrow::LargeBinaryArray>>( |
| chunked_array); |
| |
| case Type::FIXED_SIZE_BINARY: |
| return std::make_shared<arrow::r::Converter_FixedSizeBinary>( |
| chunked_array, checked_cast<const FixedSizeBinaryType&>(*type).byte_width()); |
| |
| // handle memory dense strings |
| case Type::STRING: |
| return std::make_shared<arrow::r::Converter_String<arrow::StringArray>>( |
| chunked_array); |
| |
| case Type::LARGE_STRING: |
| return std::make_shared<arrow::r::Converter_String<arrow::LargeStringArray>>( |
| chunked_array); |
| |
| case Type::DICTIONARY: |
| return std::make_shared<arrow::r::Converter_Dictionary>(chunked_array); |
| |
| case Type::DATE32: |
| return std::make_shared<arrow::r::Converter_Date32>(chunked_array); |
| |
| case Type::DATE64: |
| return std::make_shared<arrow::r::Converter_Date64>(chunked_array); |
| |
| // promotions to integer vector |
| case Type::INT8: |
| return std::make_shared<arrow::r::Converter_Int<arrow::Int8Type>>(chunked_array); |
| |
| case Type::UINT8: |
| return std::make_shared<arrow::r::Converter_Int<arrow::UInt8Type>>(chunked_array); |
| |
| case Type::INT16: |
| return std::make_shared<arrow::r::Converter_Int<arrow::Int16Type>>(chunked_array); |
| |
| case Type::UINT16: |
| return std::make_shared<arrow::r::Converter_Int<arrow::UInt16Type>>(chunked_array); |
| |
| // promotions to numeric vector, if they don't fit into int32 |
| case Type::UINT32: |
| if (ArraysCanFitInteger(chunked_array->chunks())) { |
| return std::make_shared<arrow::r::Converter_Int<arrow::UInt32Type>>( |
| chunked_array); |
| } else { |
| return std::make_shared<arrow::r::Converter_Double<arrow::UInt32Type>>( |
| chunked_array); |
| } |
| |
| case Type::UINT64: |
| if (ArraysCanFitInteger(chunked_array->chunks())) { |
| return std::make_shared<arrow::r::Converter_Int<arrow::UInt64Type>>( |
| chunked_array); |
| } else { |
| return std::make_shared<arrow::r::Converter_Double<arrow::UInt64Type>>( |
| chunked_array); |
| } |
| |
| case Type::HALF_FLOAT: |
| return std::make_shared<arrow::r::Converter_Double<arrow::HalfFloatType>>( |
| chunked_array); |
| |
| case Type::FLOAT: |
| return std::make_shared<arrow::r::Converter_Double<arrow::FloatType>>( |
| chunked_array); |
| |
| // time32 and time64 |
| case Type::TIME32: |
| return std::make_shared<arrow::r::Converter_Time<int32_t>>(chunked_array); |
| |
| case Type::TIME64: |
| return std::make_shared<arrow::r::Converter_Time<int64_t>>(chunked_array); |
| |
| case Type::DURATION: |
| return std::make_shared<arrow::r::Converter_Duration<int64_t>>(chunked_array); |
| |
| case Type::TIMESTAMP: |
| return std::make_shared<arrow::r::Converter_Timestamp<int64_t>>(chunked_array); |
| |
| case Type::INT64: |
| // Prefer integer if it fits, unless option arrow.int64_downcast is `false` |
| if (GetBoolOption("arrow.int64_downcast", true) && |
| ArraysCanFitInteger(chunked_array->chunks())) { |
| return std::make_shared<arrow::r::Converter_Int<arrow::Int64Type>>(chunked_array); |
| } else { |
| return std::make_shared<arrow::r::Converter_Int64>(chunked_array); |
| } |
| |
| case Type::DECIMAL32: |
| return std::make_shared<arrow::r::Converter_Decimal<Decimal32Type>>(chunked_array); |
| |
| case Type::DECIMAL64: |
| return std::make_shared<arrow::r::Converter_Decimal<Decimal64Type>>(chunked_array); |
| |
| case Type::DECIMAL128: |
| return std::make_shared<arrow::r::Converter_Decimal<Decimal128Type>>(chunked_array); |
| |
| case Type::DECIMAL256: |
| return std::make_shared<arrow::r::Converter_Decimal<Decimal256Type>>(chunked_array); |
| |
| // nested |
| case Type::STRUCT: |
| return std::make_shared<arrow::r::Converter_Struct>(chunked_array); |
| |
| case Type::LIST: |
| return std::make_shared<arrow::r::Converter_List<arrow::ListArray>>( |
| chunked_array, checked_cast<const arrow::ListType*>(type.get())->value_type()); |
| |
| case Type::LARGE_LIST: |
| return std::make_shared<arrow::r::Converter_List<arrow::LargeListArray>>( |
| chunked_array, |
| checked_cast<const arrow::LargeListType*>(type.get())->value_type()); |
| |
| case Type::FIXED_SIZE_LIST: |
| return std::make_shared<arrow::r::Converter_FixedSizeList>( |
| chunked_array, |
| checked_cast<const arrow::FixedSizeListType&>(*type).value_type(), |
| checked_cast<const arrow::FixedSizeListType&>(*type).list_size()); |
| |
| case Type::MAP: |
| return std::make_shared<arrow::r::Converter_List<arrow::MapArray>>( |
| chunked_array, checked_cast<const arrow::MapType&>(*type).value_type()); |
| |
| case Type::NA: |
| return std::make_shared<arrow::r::Converter_Null>(chunked_array); |
| |
| case Type::EXTENSION: |
| return std::make_shared<arrow::r::Converter_Extension>(chunked_array); |
| |
| default: |
| break; |
| } |
| |
| cpp11::stop("cannot handle Array of type <%s>", type->name().c_str()); |
| } |
| |
| std::shared_ptr<ChunkedArray> to_chunks(const std::shared_ptr<Array>& array) { |
| return std::make_shared<ChunkedArray>(array); |
| } |
| |
| std::shared_ptr<ChunkedArray> to_chunks( |
| const std::shared_ptr<ChunkedArray>& chunked_array) { |
| return chunked_array; |
| } |
| |
| template <typename Rectangle> |
| cpp11::writable::list to_data_frame(const std::shared_ptr<Rectangle>& data, |
| bool use_threads) { |
| int64_t nc = data->num_columns(); |
| int64_t nr = data->num_rows(); |
| cpp11::writable::strings names(nc); |
| |
| arrow::r::RTasks tasks(use_threads); |
| |
| cpp11::writable::list tbl(nc); |
| |
| for (int i = 0; i < nc; i++) { |
| names[i] = data->schema()->field(i)->name(); |
| tbl[i] = Converter::LazyConvert(to_chunks(data->column(i)), tasks); |
| } |
| |
| StopIfNotOk(tasks.Finish()); |
| |
| tbl.attr(R_NamesSymbol) = names; |
| tbl.attr(R_ClassSymbol) = arrow::r::data::classes_tbl_df; |
| tbl.attr(R_RowNamesSymbol) = arrow::r::short_row_names(static_cast<int>(nr)); |
| |
| return tbl; |
| } |
| |
| } // namespace r |
| } // namespace arrow |
| |
| // [[arrow::export]] |
| SEXP Array__as_vector(const std::shared_ptr<arrow::Array>& array) { |
| return arrow::r::Converter::Convert(array); |
| } |
| |
| // [[arrow::export]] |
| SEXP ChunkedArray__as_vector(const std::shared_ptr<arrow::ChunkedArray>& chunked_array, |
| bool use_threads = false) { |
| return arrow::r::Converter::Convert(chunked_array, use_threads); |
| } |
| |
| // [[arrow::export]] |
| cpp11::writable::list RecordBatch__to_dataframe( |
| const std::shared_ptr<arrow::RecordBatch>& batch, bool use_threads) { |
| return arrow::r::to_data_frame(batch, use_threads); |
| } |
| |
| // [[arrow::export]] |
| cpp11::writable::list Table__to_dataframe(const std::shared_ptr<arrow::Table>& table, |
| bool use_threads) { |
| return arrow::r::to_data_frame(table, use_threads); |
| } |