blob: 7fcb02bef3cb2066c0f9ccc76227a4be82eb5f23 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "./arrow_types.h"
#if defined(ARROW_R_WITH_ARROW)
#include <cmath>
#include <limits>
#include <type_traits>

#include <arrow/util/parallel.h>
#include <arrow/util/task-group.h>
namespace arrow {
namespace r {
using Rcpp::default_value;
using Rcpp::IntegerVector;
using Rcpp::LogicalVector_;
using Rcpp::no_init;
using Rcpp::Shield;
using Rcpp::StringVector_;
// Base class of all Array -> R vector converters.
//
// A converter handles the chunks of one column: Allocate() creates the R
// vector hosting the whole column, then the chunks are ingested one after the
// other, either serially on the calling thread (IngestSerial) or as one task
// per chunk appended to a task group (IngestParallel).
class Converter {
 public:
  // The chunk vector is copied (only shared_ptr's are copied). Holding a
  // reference instead would dangle whenever the converter is built from a
  // temporary ArrayVector, e.g. Make({some_array}).
  explicit Converter(const ArrayVector& arrays) : arrays_(arrays) {}

  virtual ~Converter() = default;

  // Allocate a vector of the right R type for this converter
  virtual SEXP Allocate(R_xlen_t n) const = 0;

  // data[ start:(start + n) ] = NA
  virtual Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const = 0;

  // ingest the values from the array into data[ start : (start + n)]
  virtual Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array,
                                   R_xlen_t start, R_xlen_t n) const = 0;

  // ingest one array: arrays made only of nulls go to Ingest_all_nulls(),
  // everything else to Ingest_some_nulls()
  Status IngestOne(SEXP data, const std::shared_ptr<arrow::Array>& array, R_xlen_t start,
                   R_xlen_t n) const {
    if (array->null_count() == n) {
      return Ingest_all_nulls(data, start, n);
    } else {
      return Ingest_some_nulls(data, array, start, n);
    }
  }

  // can this run in parallel ?
  // Converters whose ingestion calls into the R API (e.g. Converter_String)
  // return false so that they are ingested on the calling thread.
  virtual bool Parallel() const { return true; }

  // Ingest all the arrays serially, chunk after chunk
  Status IngestSerial(SEXP data) {
    R_xlen_t k = 0;
    for (const auto& array : arrays_) {
      auto n_chunk = array->length();
      RETURN_NOT_OK(IngestOne(data, array, k, n_chunk));
      k += n_chunk;
    }
    return Status::OK();
  }

  // ingest the arrays in parallel
  //
  // for each array, add a task to the task group
  //
  // The task group is Finish()ed in the caller; until then `data` and this
  // converter must stay alive, since the tasks capture both.
  void IngestParallel(SEXP data, const std::shared_ptr<arrow::internal::TaskGroup>& tg) {
    R_xlen_t k = 0;
    for (const auto& array : arrays_) {
      auto n_chunk = array->length();
      tg->Append([=] { return IngestOne(data, array, k, n_chunk); });
      k += n_chunk;
    }
  }

  // Converter factory
  static std::shared_ptr<Converter> Make(const ArrayVector& arrays);

 protected:
  // owned copy of the chunks of the column being converted
  ArrayVector arrays_;
};
// Write the NA value of R type RTYPE into every slot of data[start:(start+n)].
//
// `data` must be an R vector of type RTYPE; it is written through its raw
// data pointer.
template <int RTYPE>
Status AllNull_Ingest(SEXP data, R_xlen_t start, R_xlen_t n) {
  const auto na_value = Rcpp::default_value<RTYPE>();
  auto slice = Rcpp::internal::r_vector_start<RTYPE>(data) + start;
  for (R_xlen_t i = 0; i < n; ++i) {
    slice[i] = na_value;
  }
  return Status::OK();
}
// Ingest the values of `array` into the slice data[start:(start+n)].
//
// `p_values` points to the first value of `array` (as returned by
// ArrayData::GetValues, i.e. with the array offset applied). Each valid value
// is converted by `lambda`; null slots receive the NA value of RTYPE.
//
// Returns Status::Invalid("Invalid data buffer") when `p_values` is null.
template <int RTYPE, typename array_value_type, typename Lambda>
Status SomeNull_Ingest(SEXP data, R_xlen_t start, R_xlen_t n,
                       const array_value_type* p_values,
                       const std::shared_ptr<arrow::Array>& array, Lambda lambda) {
  if (p_values == nullptr) {
    return Status::Invalid("Invalid data buffer");
  }
  auto out = Rcpp::internal::r_vector_start<RTYPE>(data) + start;

  if (array->null_count() == 0) {
    // no null to look for: convert the whole slice in one go
    std::transform(p_values, p_values + n, out, lambda);
    return Status::OK();
  }

  arrow::internal::BitmapReader validity(array->null_bitmap()->data(), array->offset(), n);
  for (R_xlen_t i = 0; i < n; ++i) {
    out[i] = validity.IsSet() ? lambda(p_values[i]) : default_value<RTYPE>();
    validity.Next();
  }
  return Status::OK();
}
// Convert the chunks in `arrays` (n values in total) into a freshly allocated
// R vector, ingesting the chunks serially on the calling thread.
// Raises an R error (STOP_IF_NOT_OK) when ingestion fails.
SEXP ArrayVector__as_vector(R_xlen_t n, const ArrayVector& arrays) {
  std::shared_ptr<Converter> converter = Converter::Make(arrays);
  // keep the result protected while it is being filled
  Shield<SEXP> result(converter->Allocate(n));
  STOP_IF_NOT_OK(converter->IngestSerial(result));
  return result;
}
// Converter for Arrow types stored exactly like the R type RTYPE
// (int32 -> INTSXP, double -> REALSXP): values are copied unchanged.
template <int RTYPE>
class Converter_SimpleArray : public Converter {
  using Vector = Rcpp::Vector<RTYPE, Rcpp::NoProtectStorage>;
  using value_type = typename Vector::stored_type;

 public:
  explicit Converter_SimpleArray(const ArrayVector& arrays) : Converter(arrays) {}

  // the vector is created with NoProtectStorage: protecting it is up to the caller
  SEXP Allocate(R_xlen_t n) const { return Vector(no_init(n)); }

  Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
    return AllNull_Ingest<RTYPE>(data, start, n);
  }

  Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array,
                           R_xlen_t start, R_xlen_t n) const {
    // the Arrow value already is the R value
    auto identity = [](value_type value) { return value; };
    const value_type* values = array->data()->GetValues<value_type>(1);
    return SomeNull_Ingest<RTYPE, value_type>(data, start, n, values, array, identity);
  }
};
// Converter for date32 arrays: the int32 values are copied verbatim (by
// Converter_SimpleArray<INTSXP>) into an integer vector of class "Date".
class Converter_Date32 : public Converter_SimpleArray<INTSXP> {
  using Base = Converter_SimpleArray<INTSXP>;

 public:
  explicit Converter_Date32(const ArrayVector& arrays) : Base(arrays) {}

  SEXP Allocate(R_xlen_t n) const {
    IntegerVector days(no_init(n));
    days.attr("class") = "Date";
    return days;
  }
};
// Converter for utf8 string arrays to R character vectors.
//
// Ingestion creates CHARSXPs through the R API (Rf_mkCharLenCE,
// SET_STRING_ELT), so it must happen on the calling thread: Parallel()
// returns false.
class Converter_String : public Converter {
 public:
  explicit Converter_String(const ArrayVector& arrays) : Converter(arrays) {}

  SEXP Allocate(R_xlen_t n) const { return StringVector_(no_init(n)); }

  Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
    return AllNull_Ingest<STRSXP>(data, start, n);
  }

  Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array,
                           R_xlen_t start, R_xlen_t n) const {
    auto p_offset = array->data()->GetValues<int32_t>(1);
    if (!p_offset) {
      return Status::Invalid("Invalid offset buffer");
    }
    auto p_strings = array->data()->GetValues<char>(2, *p_offset);
    if (!p_strings) {
      // There is an offset buffer, but the data buffer is null.
      // There is at least one value in the array and not all the values are null
      // (IngestOne() sends all-null arrays to Ingest_all_nulls()), so all values
      // are either empty strings or nulls: only the nulls are written, relying on
      // the freshly allocated character vector already holding empty strings.
      if (array->null_count()) {
        arrow::internal::BitmapReader null_reader(array->null_bitmap_data(),
                                                  array->offset(), n);
        // R_xlen_t index: `n` may exceed the range of int for long vectors
        for (R_xlen_t i = 0; i < n; i++, null_reader.Next()) {
          if (null_reader.IsNotSet()) {
            SET_STRING_ELT(data, start + i, NA_STRING);
          }
        }
      }
      return Status::OK();
    }

    auto string_array = static_cast<arrow::StringArray*>(array.get());
    // GetView() points into the Arrow buffer, avoiding the std::string copy
    // GetString() would make for every element before Rf_mkCharLenCE copies it
    // again.
    if (array->null_count()) {
      // need to watch for nulls
      arrow::internal::BitmapReader null_reader(array->null_bitmap_data(),
                                                array->offset(), n);
      for (R_xlen_t i = 0; i < n; i++, null_reader.Next()) {
        if (null_reader.IsSet()) {
          SET_STRING_ELT(data, start + i, r_string(string_array->GetView(i)));
        } else {
          SET_STRING_ELT(data, start + i, NA_STRING);
        }
      }
    } else {
      for (R_xlen_t i = 0; i < n; i++) {
        SET_STRING_ELT(data, start + i, r_string(string_array->GetView(i)));
      }
    }
    return Status::OK();
  }

  bool Parallel() const { return false; }

  // make a UTF-8 encoded CHARSXP from the bytes of `view`
  inline SEXP r_string(const arrow::util::string_view& view) const {
    return Rf_mkCharLenCE(view.data(), view.size(), CE_UTF8);
  }
};
// Converter for boolean arrays (one bit per value) to R logical vectors.
class Converter_Boolean : public Converter {
 public:
  explicit Converter_Boolean(const ArrayVector& arrays) : Converter(arrays) {}

  SEXP Allocate(R_xlen_t n) const { return LogicalVector_(no_init(n)); }

  Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
    return AllNull_Ingest<LGLSXP>(data, start, n);
  }

  Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array,
                           R_xlen_t start, R_xlen_t n) const {
    auto out = Rcpp::internal::r_vector_start<LGLSXP>(data) + start;
    // the values are bit-packed: take the raw buffer (absolute offset 0) and
    // let the bitmap reader apply the array offset
    const uint8_t* bits = array->data()->GetValues<uint8_t>(1, 0);
    if (bits == nullptr) {
      return Status::Invalid("Invalid data buffer");
    }
    arrow::internal::BitmapReader values(bits, array->offset(), n);

    if (array->null_count() == 0) {
      for (R_xlen_t i = 0; i < n; ++i, values.Next()) {
        out[i] = values.IsSet();
      }
      return Status::OK();
    }

    arrow::internal::BitmapReader validity(array->null_bitmap()->data(), array->offset(),
                                           n);
    for (R_xlen_t i = 0; i < n; ++i, values.Next(), validity.Next()) {
      out[i] = validity.IsSet() ? values.IsSet() : NA_LOGICAL;
    }
    return Status::OK();
  }
};
// Converter for dictionary arrays with a string dictionary to R factors
// (ordered factors when the dictionary type is ordered).
//
// The factor levels come from the dictionary of the first chunk only.
// NOTE(review): chunks carrying different dictionaries would get wrong
// labels — confirm callers guarantee a shared dictionary.
class Converter_Dictionary : public Converter {
 public:
  explicit Converter_Dictionary(const ArrayVector& arrays) : Converter(arrays) {}

  SEXP Allocate(R_xlen_t n) const {
    IntegerVector data(no_init(n));
    auto dict_array = static_cast<DictionaryArray*>(Converter::arrays_[0].get());
    auto dict = dict_array->dictionary();
    auto indices = dict_array->indices();
    // only index types whose values fit in an R integer are supported
    switch (indices->type_id()) {
      case Type::UINT8:
      case Type::INT8:
      case Type::UINT16:
      case Type::INT16:
      case Type::INT32:
        break;
      default:
        Rcpp::stop("Cannot convert Dictionary Array of type `%s` to R",
                   dict_array->type()->ToString());
    }

    if (dict->type_id() != Type::STRING) {
      Rcpp::stop("Cannot convert Dictionary Array of type `%s` to R",
                 dict_array->type()->ToString());
    }
    bool ordered = dict_array->dict_type()->ordered();

    data.attr("levels") = ArrayVector__as_vector(dict->length(), {dict});
    if (ordered) {
      data.attr("class") = Rcpp::CharacterVector::create("ordered", "factor");
    } else {
      data.attr("class") = "factor";
    }
    return data;
  }

  Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
    return AllNull_Ingest<INTSXP>(data, start, n);
  }

  Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array,
                           R_xlen_t start, R_xlen_t n) const {
    DictionaryArray* dict_array = static_cast<DictionaryArray*>(array.get());
    auto indices = dict_array->indices();
    switch (indices->type_id()) {
      case Type::UINT8:
        return Ingest_some_nulls_Impl<arrow::UInt8Type>(data, array, start, n);
      case Type::INT8:
        return Ingest_some_nulls_Impl<arrow::Int8Type>(data, array, start, n);
      case Type::UINT16:
        return Ingest_some_nulls_Impl<arrow::UInt16Type>(data, array, start, n);
      case Type::INT16:
        return Ingest_some_nulls_Impl<arrow::Int16Type>(data, array, start, n);
      case Type::INT32:
        return Ingest_some_nulls_Impl<arrow::Int32Type>(data, array, start, n);
      default:
        break;
    }
    // Allocate() rejects the other index types, so this is not expected to be
    // reached; report it instead of leaving the slice uninitialized.
    return Status::Invalid("Cannot convert Dictionary Array of type `" +
                           array->type()->ToString() + "` to R");
  }

 private:
  // Ingest the indices of the dictionary array `array`, whose index type is `Type`.
  template <typename Type>
  Status Ingest_some_nulls_Impl(SEXP data, const std::shared_ptr<arrow::Array>& array,
                                R_xlen_t start, R_xlen_t n) const {
    using value_type = typename arrow::TypeTraits<Type>::ArrayType::value_type;

    std::shared_ptr<Array> indices =
        static_cast<DictionaryArray*>(array.get())->indices();

    // convert the 0-based indices from the arrow Array
    // to 1-based indices used in R factors
    auto to_r_index = [](value_type value) { return static_cast<int>(value) + 1; };

    // the nulls of a dictionary array are read from its indices
    return SomeNull_Ingest<INTSXP, value_type>(
        data, start, n, indices->data()->GetValues<value_type>(1), indices, to_r_index);
  }
};
class Converter_Struct : public Converter {
public:
explicit Converter_Struct(const ArrayVector& arrays) : Converter(arrays), converters() {
auto first_array =
internal::checked_cast<arrow::StructArray*>(Converter::arrays_[0].get());
int nf = first_array->num_fields();
for (int i = 0; i < nf; i++) {
converters.push_back(Converter::Make({first_array->field(i)}));
}
}
SEXP Allocate(R_xlen_t n) const {
// allocate a data frame column to host each array
auto first_array =
internal::checked_cast<arrow::StructArray*>(Converter::arrays_[0].get());
auto type = first_array->struct_type();
int nf = first_array->num_fields();
Rcpp::List out(nf);
Rcpp::CharacterVector colnames(nf);
for (int i = 0; i < nf; i++) {
out[i] = converters[i]->Allocate(n);
colnames[i] = type->child(i)->name();
}
IntegerVector rn(2);
rn[0] = NA_INTEGER;
rn[1] = -n;
Rf_setAttrib(out, symbols::row_names, rn);
Rf_setAttrib(out, R_NamesSymbol, colnames);
Rf_setAttrib(out, R_ClassSymbol, Rf_mkString("data.frame"));
return out;
}
Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
int nf = converters.size();
for (int i = 0; i < nf; i++) {
STOP_IF_NOT_OK(converters[i]->Ingest_all_nulls(VECTOR_ELT(data, i), start, n));
}
return Status::OK();
}
Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array,
R_xlen_t start, R_xlen_t n) const {
auto struct_array = internal::checked_cast<arrow::StructArray*>(array.get());
int nf = converters.size();
// Flatten() deals with merging of nulls
ArrayVector arrays(nf);
STOP_IF_NOT_OK(struct_array->Flatten(default_memory_pool(), &arrays));
for (int i = 0; i < nf; i++) {
STOP_IF_NOT_OK(
converters[i]->Ingest_some_nulls(VECTOR_ELT(data, i), arrays[i], start, n));
}
return Status::OK();
}
private:
std::vector<std::shared_ptr<Converter>> converters;
};
// Convert a number of milliseconds into (fractional) seconds.
//
// The division is done in floating point: integer division would drop the
// sub-second part and truncate toward zero (e.g. 1500 -> 1, -1500 -> -1).
double ms_to_seconds(int64_t ms) { return static_cast<double>(ms) / 1000; }
// Converter for date64 arrays: the int64 values, taken as milliseconds, are
// converted to seconds and stored in a POSIXct double vector.
class Converter_Date64 : public Converter {
 public:
  explicit Converter_Date64(const ArrayVector& arrays) : Converter(arrays) {}

  SEXP Allocate(R_xlen_t n) const {
    Rcpp::NumericVector data(no_init(n));
    data.attr("class") = Rcpp::CharacterVector::create("POSIXct", "POSIXt");
    return data;
  }

  Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
    return AllNull_Ingest<REALSXP>(data, start, n);
  }

  Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array,
                           R_xlen_t start, R_xlen_t n) const {
    // divide in floating point: `ms / 1000` on integers would drop the
    // sub-second part and truncate negative values toward zero
    auto convert = [](int64_t ms) { return static_cast<double>(ms) / 1000; };
    return SomeNull_Ingest<REALSXP, int64_t>(
        data, start, n, array->data()->GetValues<int64_t>(1), array, convert);
  }
};
// Converter for Arrow numeric types without an exact R counterpart: each value
// is converted to the storage type of RTYPE (int for INTSXP, double for
// REALSXP).
//
// Half floats are the exception: Arrow stores them as raw IEEE 754 binary16 bit
// patterns (their value_type is uint16_t), so they are decoded instead of being
// cast — a plain cast would turn the bit pattern of 1.0 (0x3C00) into 15360.
template <int RTYPE, typename Type>
class Converter_Promotion : public Converter {
  using r_stored_type = typename Rcpp::Vector<RTYPE>::stored_type;
  using value_type = typename TypeTraits<Type>::ArrayType::value_type;

 public:
  explicit Converter_Promotion(const ArrayVector& arrays) : Converter(arrays) {}

  SEXP Allocate(R_xlen_t n) const {
    return Rcpp::Vector<RTYPE, Rcpp::NoProtectStorage>(no_init(n));
  }

  Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
    return AllNull_Ingest<RTYPE>(data, start, n);
  }

  Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array,
                           R_xlen_t start, R_xlen_t n) const {
    auto convert = [](value_type value) {
      return Converter_Promotion::value_convert(value);
    };
    return SomeNull_Ingest<RTYPE, value_type>(
        data, start, n, array->data()->GetValues<value_type>(1), array, convert);
  }

 private:
  // convert one Arrow value to the R storage type
  static r_stored_type value_convert(value_type value) {
    return value_convert_impl(value, std::is_same<Type, arrow::HalfFloatType>());
  }

  // regular numeric types: plain cast
  static r_stored_type value_convert_impl(value_type value, std::false_type) {
    return static_cast<r_stored_type>(value);
  }

  // half floats: decode the binary16 bit pattern
  static r_stored_type value_convert_impl(value_type value, std::true_type) {
    return static_cast<r_stored_type>(HalfFloatBitsToDouble(value));
  }

  // Decode an IEEE 754 binary16 value (1 sign bit, 5 exponent bits with a
  // bias of 15, 10 mantissa bits).
  static double HalfFloatBitsToDouble(uint16_t bits) {
    const bool negative = (bits & 0x8000) != 0;
    const int exponent = (bits >> 10) & 0x1f;
    const int mantissa = bits & 0x3ff;
    double magnitude;
    if (exponent == 0) {
      // zero or subnormal: mantissa * 2^-24
      magnitude = std::ldexp(static_cast<double>(mantissa), -24);
    } else if (exponent == 0x1f) {
      // infinity or NaN
      magnitude = mantissa == 0 ? std::numeric_limits<double>::infinity()
                                : std::numeric_limits<double>::quiet_NaN();
    } else {
      // normal: (1 + mantissa / 2^10) * 2^(exponent - 15)
      magnitude = std::ldexp(static_cast<double>(mantissa + 1024), exponent - 25);
    }
    return negative ? -magnitude : magnitude;
  }
};
template <typename value_type>
class Converter_Time : public Converter {
public:
explicit Converter_Time(const ArrayVector& arrays) : Converter(arrays) {}
SEXP Allocate(R_xlen_t n) const {
Rcpp::NumericVector data(no_init(n));
data.attr("class") = Rcpp::CharacterVector::create("hms", "difftime");
data.attr("units") = Rcpp::CharacterVector::create("secs");
return data;
}
Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
return AllNull_Ingest<REALSXP>(data, start, n);
}
Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array,
R_xlen_t start, R_xlen_t n) const {
int multiplier = TimeUnit_multiplier(array);
auto convert = [=](value_type value) {
return static_cast<double>(value) / multiplier;
};
return SomeNull_Ingest<REALSXP, value_type>(
data, start, n, array->data()->GetValues<value_type>(1), array, convert);
}
private:
int TimeUnit_multiplier(const std::shared_ptr<Array>& array) const {
switch (static_cast<TimeType*>(array->type().get())->unit()) {
case TimeUnit::SECOND:
return 1;
case TimeUnit::MILLI:
return 1000;
case TimeUnit::MICRO:
return 1000000;
case TimeUnit::NANO:
return 1000000000;
default:
return 0;
}
}
};
// Converter for timestamp arrays to R POSIXct vectors.
//
// The ingestion (scaling the stored values to seconds according to the unit)
// is inherited from Converter_Time; only the class of the result differs.
template <typename value_type>
class Converter_Timestamp : public Converter_Time<value_type> {
  using Base = Converter_Time<value_type>;

 public:
  explicit Converter_Timestamp(const ArrayVector& arrays) : Base(arrays) {}

  SEXP Allocate(R_xlen_t n) const {
    Rcpp::NumericVector seconds(no_init(n));
    seconds.attr("class") = Rcpp::CharacterVector::create("POSIXct", "POSIXt");
    return seconds;
  }
};
// Converter for decimal128 arrays to R double vectors.
//
// Each value goes through its decimal text representation (FormatValue) and is
// parsed back with std::stod, so digits beyond double precision are lost.
// NOTE(review): std::stod follows the C locale's LC_NUMERIC decimal point —
// confirm it is always "C" when this runs.
class Converter_Decimal : public Converter {
 public:
  explicit Converter_Decimal(const ArrayVector& arrays) : Converter(arrays) {}

  SEXP Allocate(R_xlen_t n) const { return Rcpp::NumericVector_(no_init(n)); }

  Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
    return AllNull_Ingest<REALSXP>(data, start, n);
  }

  Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array,
                           R_xlen_t start, R_xlen_t n) const {
    auto p_data = Rcpp::internal::r_vector_start<REALSXP>(data) + start;
    const auto& decimals_arr =
        internal::checked_cast<const arrow::Decimal128Array&>(*array);

    // FormatValue() returns a std::string that std::stod takes directly; going
    // through c_str() would build a second std::string for every value.
    auto to_double = [&decimals_arr](R_xlen_t i) {
      return std::stod(decimals_arr.FormatValue(i));
    };

    if (array->null_count()) {
      internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), array->offset(),
                                           n);
      for (R_xlen_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_data) {
        *p_data = bitmap_reader.IsSet() ? to_double(i) : NA_REAL;
      }
    } else {
      for (R_xlen_t i = 0; i < n; i++, ++p_data) {
        *p_data = to_double(i);
      }
    }
    return Status::OK();
  }
};
// Converter for list arrays to R lists: each non-null slot receives the R
// vector converted from its slice of the values array; null slots are left
// as NULL.
class Converter_List : public Converter {
 public:
  explicit Converter_List(const ArrayVector& arrays) : Converter(arrays) {}

  SEXP Allocate(R_xlen_t n) const { return Rcpp::List(no_init(n)); }

  Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
    // nothing to do, list contain NULL by default
    return Status::OK();
  }

  Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array,
                           R_xlen_t start, R_xlen_t n) const {
    using internal::checked_cast;
    auto list_array = checked_cast<arrow::ListArray*>(array.get());
    auto values_array = list_array->values();

    // convert the values of slot i and store the result in the R list
    auto ingest_one = [&](R_xlen_t i) {
      auto slice =
          values_array->Slice(list_array->value_offset(i), list_array->value_length(i));
      SET_VECTOR_ELT(data, i + start, Array__as_vector(slice));
    };

    if (array->null_count()) {
      internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), array->offset(),
                                           n);
      for (R_xlen_t i = 0; i < n; i++, bitmap_reader.Next()) {
        if (bitmap_reader.IsSet()) ingest_one(i);
      }
    } else {
      for (R_xlen_t i = 0; i < n; i++) {
        ingest_one(i);
      }
    }
    return Status::OK();
  }

  // Ingestion allocates R objects (Array__as_vector) and stores them with
  // SET_VECTOR_ELT; the R API is not thread safe, so this converter must be
  // ingested on the calling thread, never in a task group.
  bool Parallel() const { return false; }
};
class Converter_Int64 : public Converter {
public:
explicit Converter_Int64(const ArrayVector& arrays) : Converter(arrays) {}
SEXP Allocate(R_xlen_t n) const {
Rcpp::NumericVector data(no_init(n));
data.attr("class") = "integer64";
return data;
}
Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const {
auto p_data = reinterpret_cast<int64_t*>(REAL(data)) + start;
std::fill_n(p_data, n, NA_INT64);
return Status::OK();
}
Status Ingest_some_nulls(SEXP data, const std::shared_ptr<arrow::Array>& array,
R_xlen_t start, R_xlen_t n) const {
auto p_values = array->data()->GetValues<int64_t>(1);
if (!p_values) {
return Status::Invalid("Invalid data buffer");
}
auto p_data = reinterpret_cast<int64_t*>(REAL(data)) + start;
if (array->null_count()) {
internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), array->offset(),
n);
for (R_xlen_t i = 0; i < n; i++, bitmap_reader.Next(), ++p_data) {
*p_data = bitmap_reader.IsSet() ? p_values[i] : NA_INT64;
}
} else {
std::copy_n(p_values, n, p_data);
}
return Status::OK();
}
};
std::shared_ptr<Converter> Converter::Make(const ArrayVector& arrays) {
switch (arrays[0]->type_id()) {
// direct support
case Type::INT32:
return std::make_shared<arrow::r::Converter_SimpleArray<INTSXP>>(arrays);
case Type::DOUBLE:
return std::make_shared<arrow::r::Converter_SimpleArray<REALSXP>>(arrays);
// need to handle 1-bit case
case Type::BOOL:
return std::make_shared<arrow::r::Converter_Boolean>(arrays);
// handle memory dense strings
case Type::STRING:
return std::make_shared<arrow::r::Converter_String>(arrays);
case Type::DICTIONARY:
return std::make_shared<arrow::r::Converter_Dictionary>(arrays);
case Type::DATE32:
return std::make_shared<arrow::r::Converter_Date32>(arrays);
case Type::DATE64:
return std::make_shared<arrow::r::Converter_Date64>(arrays);
// promotions to integer vector
case Type::INT8:
return std::make_shared<arrow::r::Converter_Promotion<INTSXP, arrow::Int8Type>>(
arrays);
case Type::UINT8:
return std::make_shared<arrow::r::Converter_Promotion<INTSXP, arrow::UInt8Type>>(
arrays);
case Type::INT16:
return std::make_shared<arrow::r::Converter_Promotion<INTSXP, arrow::Int16Type>>(
arrays);
case Type::UINT16:
return std::make_shared<arrow::r::Converter_Promotion<INTSXP, arrow::UInt16Type>>(
arrays);
// promotions to numeric vector
case Type::UINT32:
return std::make_shared<arrow::r::Converter_Promotion<REALSXP, arrow::UInt32Type>>(
arrays);
case Type::HALF_FLOAT:
return std::make_shared<
arrow::r::Converter_Promotion<REALSXP, arrow::HalfFloatType>>(arrays);
case Type::FLOAT:
return std::make_shared<arrow::r::Converter_Promotion<REALSXP, arrow::FloatType>>(
arrays);
// time32 ane time64
case Type::TIME32:
return std::make_shared<arrow::r::Converter_Time<int32_t>>(arrays);
case Type::TIME64:
return std::make_shared<arrow::r::Converter_Time<int64_t>>(arrays);
case Type::TIMESTAMP:
return std::make_shared<arrow::r::Converter_Timestamp<int64_t>>(arrays);
case Type::INT64:
return std::make_shared<arrow::r::Converter_Int64>(arrays);
case Type::DECIMAL:
return std::make_shared<arrow::r::Converter_Decimal>(arrays);
// nested
case Type::STRUCT:
return std::make_shared<arrow::r::Converter_Struct>(arrays);
case Type::LIST:
return std::make_shared<arrow::r::Converter_List>(arrays);
default:
break;
}
Rcpp::stop(tfm::format("cannot handle Array of type %s", arrays[0]->type()->name()));
return nullptr;
}
// Build a data frame of class c("tbl_df", "tbl", "data.frame") with `nc`
// columns of `nr` rows; every column is allocated and then filled on the
// calling thread by its converter. Raises an R error if an ingestion fails.
Rcpp::List to_dataframe_serial(
    int64_t nr, int64_t nc, const Rcpp::CharacterVector& names,
    const std::vector<std::shared_ptr<Converter>>& converters) {
  Rcpp::List tbl(nc);

  for (int i = 0; i < nc; i++) {
    const std::shared_ptr<Converter>& converter = converters[i];
    // store the new column in the (protected) list before filling it
    tbl[i] = converter->Allocate(nr);
    SEXP column = tbl[i];
    STOP_IF_NOT_OK(converter->IngestSerial(column));
  }

  tbl.attr("names") = names;
  tbl.attr("class") = Rcpp::CharacterVector::create("tbl_df", "tbl", "data.frame");
  // compact row names c(NA, -nr)
  tbl.attr("row.names") = Rcpp::IntegerVector::create(NA_INTEGER, -nr);
  return tbl;
}
// Same result as to_dataframe_serial(), but columns whose converter reports
// Parallel() are ingested by tasks on the CPU thread pool while the other
// columns are ingested on the calling thread. Allocation always happens on the
// calling thread. Raises an R error if any ingestion fails.
Rcpp::List to_dataframe_parallel(
    int64_t nr, int64_t nc, const Rcpp::CharacterVector& names,
    const std::vector<std::shared_ptr<Converter>>& converters) {
  Rcpp::List tbl(nc);

  // task group to ingest data in parallel
  auto tg = arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool());

  // Allocate every column here; as soon as a column exists, queue its
  // chunks on the task group when its converter allows it.
  for (int i = 0; i < nc; i++) {
    tbl[i] = converters[i]->Allocate(nr);
    SEXP column = tbl[i];
    if (converters[i]->Parallel()) {
      converters[i]->IngestParallel(column, tg);
    }
  }

  // Meanwhile, ingest the remaining columns on this thread, accumulating
  // errors in `status`.
  arrow::Status status = arrow::Status::OK();
  for (int i = 0; i < nc; i++) {
    if (converters[i]->Parallel()) {
      continue;
    }
    status &= converters[i]->IngestSerial(tbl[i]);
  }

  // wait for the parallel tasks before reporting anything
  status &= tg->Finish();
  STOP_IF_NOT_OK(status);

  tbl.attr("names") = names;
  tbl.attr("class") = Rcpp::CharacterVector::create("tbl_df", "tbl", "data.frame");
  tbl.attr("row.names") = IntegerVector::create(NA_INTEGER, -nr);
  return tbl;
}
} // namespace r
} // namespace arrow
// Convert a single Arrow array to an R vector (treated as a one-chunk column).
// [[arrow::export]]
SEXP Array__as_vector(const std::shared_ptr<arrow::Array>& array) {
  const arrow::ArrayVector chunks{array};
  return arrow::r::ArrayVector__as_vector(array->length(), chunks);
}
// Convert all the chunks of a chunked array into one R vector.
// [[arrow::export]]
SEXP ChunkedArray__as_vector(const std::shared_ptr<arrow::ChunkedArray>& chunked_array) {
  const arrow::ArrayVector& chunks = chunked_array->chunks();
  const int64_t total_length = chunked_array->length();
  return arrow::r::ArrayVector__as_vector(total_length, chunks);
}
// Convert a record batch to a tibble-classed data frame, ingesting columns in
// parallel when `use_threads` is true.
// [[arrow::export]]
Rcpp::List RecordBatch__to_dataframe(const std::shared_ptr<arrow::RecordBatch>& batch,
                                     bool use_threads) {
  const int64_t nc = batch->num_columns();
  const int64_t nr = batch->num_rows();
  Rcpp::CharacterVector names(nc);
  // one single-chunk ArrayVector per column, kept alive for the whole
  // conversion since the converters may refer to them
  std::vector<arrow::ArrayVector> arrays(nc);
  std::vector<std::shared_ptr<arrow::r::Converter>> converters(nc);

  for (int64_t i = 0; i < nc; i++) {
    names[i] = batch->column_name(i);
    arrays[i] = {batch->column(i)};
    converters[i] = arrow::r::Converter::Make(arrays[i]);
  }

  return use_threads ? arrow::r::to_dataframe_parallel(nr, nc, names, converters)
                     : arrow::r::to_dataframe_serial(nr, nc, names, converters);
}
// Convert a table to a tibble-classed data frame (one R column per table
// column, all chunks concatenated), ingesting in parallel when `use_threads`
// is true.
// [[arrow::export]]
Rcpp::List Table__to_dataframe(const std::shared_ptr<arrow::Table>& table,
                               bool use_threads) {
  const int64_t nc = table->num_columns();
  const int64_t nr = table->num_rows();
  Rcpp::CharacterVector names(nc);
  std::vector<std::shared_ptr<arrow::r::Converter>> converters(nc);

  for (int64_t i = 0; i < nc; i++) {
    auto column = table->column(i);
    // the chunks are owned by the table, which outlives the conversion
    converters[i] = arrow::r::Converter::Make(column->data()->chunks());
    names[i] = column->name();
  }

  return use_threads ? arrow::r::to_dataframe_parallel(nr, nc, names, converters)
                     : arrow::r::to_dataframe_serial(nr, nc, names, converters);
}
#endif