blob: b1c30eec0b3c3fe625f6f834ebaa77a9db171d4f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/ipc/feather.h"
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
#include <sstream> // IWYU pragma: keep
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
#include <flatbuffers/flatbuffers.h>
#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/chunked_array.h"
#include "arrow/io/interfaces.h"
#include "arrow/ipc/metadata_internal.h"
#include "arrow/ipc/options.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/util.h"
#include "arrow/ipc/writer.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/make_unique.h"
#include "arrow/visitor_inline.h"
#include "generated/feather_generated.h"
namespace arrow {
using internal::checked_cast;
using internal::make_unique;
class ExtensionType;
namespace ipc {
namespace feather {
namespace {
// Shorthand for the flatbuffers builder used to serialize V1 metadata.
using FBB = flatbuffers::FlatBufferBuilder;
// Magic bytes written at both the start and the end of a Feather V1 file.
constexpr const char* kFeatherV1MagicBytes = "FEA1";
// Byte alignment to which Feather V1 buffers and metadata are padded.
constexpr const int kFeatherDefaultAlignment = 8;
// Source of zero bytes for padding; a single padding run is always shorter
// than kFeatherDefaultAlignment bytes.
const uint8_t kPaddingBytes[kFeatherDefaultAlignment] = {0};
/// Round `nbytes` up to the next multiple of kFeatherDefaultAlignment.
inline int64_t PaddedLength(int64_t nbytes) {
  constexpr int64_t kAlign = kFeatherDefaultAlignment;
  const int64_t num_blocks = (nbytes + kAlign - 1) / kAlign;
  return num_blocks * kAlign;
}
/// Write `length` bytes taken from `data` starting at bit `bit_offset`,
/// followed by zero padding up to a multiple of kFeatherDefaultAlignment.
///
/// When `bit_offset` is not byte-aligned, the source bits are re-packed so
/// the written output starts at bit 0 of its first byte.
///
/// @param[in] stream destination stream
/// @param[in] data base pointer of the source bytes
/// @param[in] bit_offset offset, in bits, of the first source bit from `data`
/// @param[in] length number of bytes to write before padding
/// @param[out] bytes_written `length` plus the number of padding bytes written
Status WritePaddedWithOffset(io::OutputStream* stream, const uint8_t* data,
                             int64_t bit_offset, const int64_t length,
                             int64_t* bytes_written) {
  data = data + bit_offset / 8;
  const uint8_t bit_shift = static_cast<uint8_t>(bit_offset % 8);
  if (bit_shift == 0) {
    // Byte-aligned, including non-zero multiples of 8: after advancing `data`
    // the bytes can be written as-is, with no re-packing and no read past
    // data[length - 1].
    RETURN_NOT_OK(stream->Write(data, length));
  } else {
    // Re-pack through a small stack buffer: output byte i combines the high
    // bits of data[i] (shifted down) with the low bits of data[i + 1].
    // NOTE(review): the final output byte reads data[length], which can lie
    // past the end of the source bitmap when its trailing bits are unused;
    // this presumably relies on Arrow buffer padding. Fixing it needs the
    // logical bit count, which this signature does not carry.
    constexpr int64_t buffersize = 256;
    uint8_t buffer[buffersize];
    const uint8_t lshift = static_cast<uint8_t>(8 - bit_shift);
    const uint8_t* buffer_end = buffer + buffersize;
    uint8_t* buffer_it = buffer;
    for (const uint8_t* end = data + length; data != end;) {
      uint8_t r = static_cast<uint8_t>(*data++ >> bit_shift);
      uint8_t l = static_cast<uint8_t>(*data << lshift);
      uint8_t value = l | r;
      *buffer_it++ = value;
      if (buffer_it == buffer_end) {
        RETURN_NOT_OK(stream->Write(buffer, buffersize));
        buffer_it = buffer;
      }
    }
    if (buffer_it != buffer) {
      RETURN_NOT_OK(stream->Write(buffer, buffer_it - buffer));
    }
  }
  int64_t remainder = PaddedLength(length) - length;
  if (remainder != 0) {
    RETURN_NOT_OK(stream->Write(kPaddingBytes, remainder));
  }
  *bytes_written = length + remainder;
  return Status::OK();
}
/// Write `length` bytes from `data` followed by zero padding to the Feather
/// alignment; `bytes_written` receives the padded size.
Status WritePadded(io::OutputStream* stream, const uint8_t* data, int64_t length,
                   int64_t* bytes_written) {
  constexpr int64_t kNoBitOffset = 0;
  return WritePaddedWithOffset(stream, data, kNoBitOffset, length, bytes_written);
}
/// Kind of type metadata attached to a Feather V1 column. The enumerator
/// values index COLUMN_TYPE_ENUM_MAPPING, so their order must not change.
struct ColumnType {
  enum type {
    PRIMITIVE = 0,
    CATEGORY = 1,
    TIMESTAMP = 2,
    DATE = 3,
    TIME = 4
  };
};
/// Convert a flatbuffers TimeUnit to the Arrow TimeUnit sharing its
/// underlying integer value.
inline TimeUnit::type FromFlatbufferEnum(fbs::TimeUnit unit) {
  const int raw_unit = static_cast<int>(unit);
  return static_cast<TimeUnit::type>(raw_unit);
}
/// Write `length` zero bytes plus zero padding up to a multiple of
/// kFeatherDefaultAlignment.
///
/// For compatibility, we sometimes need to write placeholder data just to keep
/// producing files that can be read with an older reader.
///
/// @param[out] bytes_written total number of bytes written, including padding
Status WritePaddedBlank(io::OutputStream* stream, int64_t length,
                        int64_t* bytes_written) {
  // Emit the zeros in chunks instead of issuing one Write() call per byte,
  // which was very slow for large blank buffers.
  constexpr int64_t kChunkSize = 256;
  static const uint8_t kZeros[kChunkSize] = {0};
  const int64_t total = PaddedLength(length);
  for (int64_t remaining = total; remaining > 0;) {
    const int64_t chunk = std::min(remaining, kChunkSize);
    RETURN_NOT_OK(stream->Write(kZeros, chunk));
    remaining -= chunk;
  }
  *bytes_written = total;
  return Status::OK();
}
// ----------------------------------------------------------------------
// ReaderV1
/// Reader for the legacy Feather V1 format: column buffers followed by a
/// flatbuffers CTable, its uint32 length and the "FEA1" magic bytes.
class ReaderV1 : public Reader {
 public:
  /// Check the footer of `source`, then load, verify and decode its metadata
  /// into a schema.
  Status Open(const std::shared_ptr<io::RandomAccessFile>& source) {
    source_ = source;

    ARROW_ASSIGN_OR_RAISE(int64_t size, source->GetSize());
    const int magic_size = static_cast<int>(strlen(kFeatherV1MagicBytes));
    const int footer_size = magic_size + static_cast<int>(sizeof(uint32_t));

    // Footer layout: [metadata length: uint32][magic bytes]
    ARROW_ASSIGN_OR_RAISE(auto buffer, source->ReadAt(size - footer_size, footer_size));
    if (buffer->size() < footer_size ||
        memcmp(buffer->data() + sizeof(uint32_t), kFeatherV1MagicBytes, magic_size) !=
            0) {
      return Status::Invalid("Feather file footer incomplete");
    }

    // Copy the length out instead of dereferencing a reinterpret_cast pointer:
    // the footer bytes carry no alignment guarantee.
    uint32_t metadata_length = 0;
    std::memcpy(&metadata_length, buffer->data(), sizeof(metadata_length));
    // Sum in 64 bits so a corrupt, huge metadata_length cannot wrap around the
    // 32-bit range and slip past this check.
    const int64_t min_file_size =
        static_cast<int64_t>(magic_size) + footer_size + metadata_length;
    if (size < min_file_size) {
      return Status::Invalid("File is smaller than indicated metadata size");
    }

    ARROW_ASSIGN_OR_RAISE(
        metadata_buffer_,
        source->ReadAt(size - footer_size - metadata_length, metadata_length));
    if (metadata_buffer_->size() < static_cast<int64_t>(metadata_length)) {
      return Status::Invalid("File is smaller than indicated metadata size");
    }

    // The metadata is untrusted file content and flatbuffers accessors are not
    // bounds-checked, so verify it first. Every table takes at least 4 bytes,
    // so the buffer size is a safe upper bound on the table count.
    flatbuffers::Verifier verifier(
        metadata_buffer_->data(), static_cast<size_t>(metadata_buffer_->size()),
        /*max_depth=*/128,
        /*max_tables=*/static_cast<flatbuffers::uoffset_t>(metadata_buffer_->size()));
    if (!verifier.VerifyBuffer<fbs::CTable>(nullptr)) {
      return Status::Invalid("Feather file metadata is invalid or corrupt");
    }
    metadata_ = fbs::GetCTable(metadata_buffer_->data());
    return ReadSchema();
  }

  /// Build schema_ from the column descriptions in the metadata.
  Status ReadSchema() {
    std::vector<std::shared_ptr<Field>> fields;
    const int num_columns = NumColumns();
    fields.reserve(num_columns);
    for (int i = 0; i < num_columns; ++i) {
      const fbs::Column* col = metadata_->columns()->Get(i);
      if (col->name() == nullptr) {
        return Status::Invalid("Feather file column ", i, " has no name");
      }
      std::shared_ptr<DataType> type;
      RETURN_NOT_OK(
          GetDataType(col->values(), col->metadata_type(), col->metadata(), &type));
      fields.push_back(::arrow::field(col->name()->str(), type));
    }
    schema_ = ::arrow::schema(std::move(fields));
    return Status::OK();
  }

  /// Derive a column's Arrow type from the storage type in `values` and the
  /// optional type metadata union (`metadata_type`, `metadata`).
  Status GetDataType(const fbs::PrimitiveArray* values, fbs::TypeMetadata metadata_type,
                     const void* metadata, std::shared_ptr<DataType>* out) {
#define PRIMITIVE_CASE(CAP_TYPE, FACTORY_FUNC) \
  case fbs::Type::CAP_TYPE:                    \
    *out = FACTORY_FUNC();                     \
    break;

    switch (metadata_type) {
      case fbs::TypeMetadata::CategoryMetadata: {
        auto meta = static_cast<const fbs::CategoryMetadata*>(metadata);
        if (meta == nullptr) {
          return Status::Invalid("Feather file column is missing its type metadata");
        }
        std::shared_ptr<DataType> index_type, dict_type;
        RETURN_NOT_OK(GetDataType(values, fbs::TypeMetadata::NONE, nullptr, &index_type));
        RETURN_NOT_OK(
            GetDataType(meta->levels(), fbs::TypeMetadata::NONE, nullptr, &dict_type));
        *out = dictionary(index_type, dict_type, meta->ordered());
        break;
      }
      case fbs::TypeMetadata::TimestampMetadata: {
        auto meta = static_cast<const fbs::TimestampMetadata*>(metadata);
        if (meta == nullptr) {
          return Status::Invalid("Feather file column is missing its type metadata");
        }
        TimeUnit::type unit = FromFlatbufferEnum(meta->unit());
        // The timezone is an optional flatbuffers string; absent means "".
        std::string tz;
        if (meta->timezone() != nullptr) {
          tz = meta->timezone()->str();
        }
        *out = timestamp(unit, tz);
      } break;
      case fbs::TypeMetadata::DateMetadata:
        *out = date32();
        break;
      case fbs::TypeMetadata::TimeMetadata: {
        auto meta = static_cast<const fbs::TimeMetadata*>(metadata);
        if (meta == nullptr) {
          return Status::Invalid("Feather file column is missing its type metadata");
        }
        *out = time32(FromFlatbufferEnum(meta->unit()));
      } break;
      default:
        if (values == nullptr) {
          return Status::Invalid("Feather file column is missing its values metadata");
        }
        switch (values->type()) {
          PRIMITIVE_CASE(BOOL, boolean);
          PRIMITIVE_CASE(INT8, int8);
          PRIMITIVE_CASE(INT16, int16);
          PRIMITIVE_CASE(INT32, int32);
          PRIMITIVE_CASE(INT64, int64);
          PRIMITIVE_CASE(UINT8, uint8);
          PRIMITIVE_CASE(UINT16, uint16);
          PRIMITIVE_CASE(UINT32, uint32);
          PRIMITIVE_CASE(UINT64, uint64);
          PRIMITIVE_CASE(FLOAT, float32);
          PRIMITIVE_CASE(DOUBLE, float64);
          PRIMITIVE_CASE(UTF8, utf8);
          PRIMITIVE_CASE(BINARY, binary);
          PRIMITIVE_CASE(LARGE_UTF8, large_utf8);
          PRIMITIVE_CASE(LARGE_BINARY, large_binary);
          default:
            return Status::Invalid("Unrecognized type");
        }
        break;
    }
#undef PRIMITIVE_CASE
    return Status::OK();
  }

  /// Size in the file of a byte buffer of logical size `nbytes`.
  int64_t GetOutputLength(int64_t nbytes) {
    // XXX: Hack for Feather 0.3.0 for backwards compatibility with old files
    if (version() < 2) {
      // Feather files < 0.3.0 did not pad their buffers
      return nbytes;
    } else {
      return PaddedLength(nbytes);
    }
  }

  /// Load one array described by `meta` from the data source.
  ///
  /// The array's buffers are laid out back to back: an optional null bitmap
  /// (only when null_count > 0), offsets for binary-like types, then values.
  /// Whether the buffers alias the source or are copies depends on the input
  /// source (e.g. memory-mapped or not).
  Status LoadValues(std::shared_ptr<DataType> type, const fbs::PrimitiveArray* meta,
                    fbs::TypeMetadata metadata_type, const void* metadata,
                    std::shared_ptr<ArrayData>* out) {
    if (meta->length() < 0) {
      return Status::Invalid("Feather file column has a negative length");
    }
    ARROW_ASSIGN_OR_RAISE(auto buffer,
                          source_->ReadAt(meta->offset(), meta->total_bytes()));
    if (buffer->size() < meta->total_bytes()) {
      return Status::Invalid("Feather file column data is truncated");
    }

    std::vector<std::shared_ptr<Buffer>> buffers;
    int64_t offset = 0;
    // Append the next `nbytes` of `buffer`, rejecting metadata that points
    // past the end of the column data. Keeps offset <= buffer->size().
    auto append_slice = [&](int64_t nbytes) -> Status {
      if (nbytes < 0 || nbytes > buffer->size() - offset) {
        return Status::Invalid("Feather file column buffers exceed the column data");
      }
      buffers.push_back(SliceBuffer(buffer, offset, nbytes));
      offset += nbytes;
      return Status::OK();
    };

    if (type->id() == Type::DICTIONARY) {
      // Load the index type values
      type = checked_cast<const DictionaryType&>(*type).index_type();
    }

    // If there are nulls, the null bitmask is first
    if (meta->null_count() > 0) {
      RETURN_NOT_OK(append_slice(GetOutputLength(BitUtil::BytesForBits(meta->length()))));
    } else {
      buffers.push_back(nullptr);
    }

    if (is_binary_like(type->id())) {
      RETURN_NOT_OK(
          append_slice(GetOutputLength((meta->length() + 1) * sizeof(int32_t))));
    } else if (is_large_binary_like(type->id())) {
      RETURN_NOT_OK(
          append_slice(GetOutputLength((meta->length() + 1) * sizeof(int64_t))));
    }

    // Whatever remains is the values buffer.
    buffers.push_back(SliceBuffer(buffer, offset, buffer->size() - offset));
    *out = ArrayData::Make(type, meta->length(), std::move(buffers), meta->null_count());
    return Status::OK();
  }

  int version() const override { return metadata_->version(); }
  int64_t num_rows() const { return metadata_->num_rows(); }

  std::shared_ptr<Schema> schema() const override { return schema_; }

  /// Load the dictionary (category levels) of a categorical column.
  Status GetDictionary(int field_index, std::shared_ptr<ArrayData>* out) {
    const fbs::Column* col_meta = metadata_->columns()->Get(field_index);
    auto dict_meta = col_meta->metadata_as<fbs::CategoryMetadata>();
    if (dict_meta == nullptr || dict_meta->levels() == nullptr) {
      return Status::Invalid("Feather file column is missing its type metadata");
    }
    const auto& dict_type =
        checked_cast<const DictionaryType&>(*schema_->field(field_index)->type());
    return LoadValues(dict_type.value_type(), dict_meta->levels(),
                      fbs::TypeMetadata::NONE, nullptr, out);
  }

  /// Load a full column (with its dictionary, if categorical).
  Status GetColumn(int field_index, std::shared_ptr<ChunkedArray>* out) {
    const fbs::Column* col_meta = metadata_->columns()->Get(field_index);
    std::shared_ptr<ArrayData> data;
    auto type = schema_->field(field_index)->type();
    RETURN_NOT_OK(LoadValues(type, col_meta->values(), col_meta->metadata_type(),
                             col_meta->metadata(), &data));
    if (type->id() == Type::DICTIONARY) {
      RETURN_NOT_OK(GetDictionary(field_index, &data->dictionary));
      data->type = type;
    }
    *out = std::make_shared<ChunkedArray>(MakeArray(data));
    return Status::OK();
  }

  Status Read(std::shared_ptr<Table>* out) override {
    std::vector<std::shared_ptr<ChunkedArray>> columns;
    const int num_columns = schema_->num_fields();
    columns.reserve(num_columns);
    for (int i = 0; i < num_columns; ++i) {
      columns.emplace_back();
      RETURN_NOT_OK(GetColumn(i, &columns.back()));
    }
    *out = Table::Make(this->schema(), std::move(columns), this->num_rows());
    return Status::OK();
  }

  Status Read(const std::vector<int>& indices, std::shared_ptr<Table>* out) override {
    std::vector<std::shared_ptr<Field>> fields;
    std::vector<std::shared_ptr<ChunkedArray>> columns;
    fields.reserve(indices.size());
    columns.reserve(indices.size());
    auto my_schema = this->schema();
    for (auto field_index : indices) {
      if (field_index < 0 || field_index >= my_schema->num_fields()) {
        return Status::Invalid("Field index ", field_index, " is out of bounds");
      }
      columns.emplace_back();
      RETURN_NOT_OK(GetColumn(field_index, &columns.back()));
      fields.push_back(my_schema->field(field_index));
    }
    *out = Table::Make(::arrow::schema(std::move(fields)), std::move(columns),
                       this->num_rows());
    return Status::OK();
  }

  Status Read(const std::vector<std::string>& names,
              std::shared_ptr<Table>* out) override {
    std::vector<std::shared_ptr<Field>> fields;
    std::vector<std::shared_ptr<ChunkedArray>> columns;
    fields.reserve(names.size());
    columns.reserve(names.size());
    std::shared_ptr<Schema> sch = this->schema();
    for (const auto& name : names) {
      int field_index = sch->GetFieldIndex(name);
      if (field_index == -1) {
        return Status::Invalid("Field named ", name, " is not found");
      }
      columns.emplace_back();
      RETURN_NOT_OK(GetColumn(field_index, &columns.back()));
      fields.push_back(sch->field(field_index));
    }
    *out = Table::Make(::arrow::schema(std::move(fields)), std::move(columns),
                       this->num_rows());
    return Status::OK();
  }

 private:
  // Number of columns in the metadata; an absent columns vector means none.
  int NumColumns() const {
    return metadata_->columns() == nullptr
               ? 0
               : static_cast<int>(metadata_->columns()->size());
  }

  std::shared_ptr<io::RandomAccessFile> source_;
  std::shared_ptr<Buffer> metadata_buffer_;
  // Points into metadata_buffer_, which keeps the bytes alive.
  const fbs::CTable* metadata_ = nullptr;
  std::shared_ptr<Schema> schema_;
};
// ----------------------------------------------------------------------
// WriterV1
/// Placement of one array's buffers in a Feather V1 file, serialized as an
/// fbs::PrimitiveArray. All members are zero-initialized so a default
/// constructed instance never carries indeterminate values.
struct ArrayMetadata {
  // Flatbuffers storage type of the values.
  fbs::Type type{};
  // Absolute stream position at which the array's buffers start.
  int64_t offset = 0;
  int64_t length = 0;
  int64_t null_count = 0;
  // Total bytes written for the array's buffers, including padding.
  int64_t total_bytes = 0;
};
// Map an Arrow type id to the fbs::Type enumerator with the same name.
#define TO_FLATBUFFER_CASE(TYPE) \
  case Type::TYPE:               \
    return fbs::Type::TYPE;

/// Map an Arrow type to its Feather V1 flatbuffers storage type.
///
/// Temporal types are stored as their underlying integer width; their unit
/// and timezone are recorded separately in the column's type metadata.
///
/// @return TypeError for any type that Feather V1 cannot represent.
Result<fbs::Type> ToFlatbufferType(const DataType& type) {
  switch (type.id()) {
    TO_FLATBUFFER_CASE(BOOL);
    TO_FLATBUFFER_CASE(INT8);
    TO_FLATBUFFER_CASE(INT16);
    TO_FLATBUFFER_CASE(INT32);
    TO_FLATBUFFER_CASE(INT64);
    TO_FLATBUFFER_CASE(UINT8);
    TO_FLATBUFFER_CASE(UINT16);
    TO_FLATBUFFER_CASE(UINT32);
    TO_FLATBUFFER_CASE(UINT64);
    TO_FLATBUFFER_CASE(FLOAT);
    TO_FLATBUFFER_CASE(DOUBLE);
    TO_FLATBUFFER_CASE(LARGE_BINARY);
    TO_FLATBUFFER_CASE(BINARY);
    case Type::STRING:
      return fbs::Type::UTF8;
    case Type::LARGE_STRING:
      return fbs::Type::LARGE_UTF8;
    case Type::DATE32:
      return fbs::Type::INT32;
    case Type::TIMESTAMP:
      return fbs::Type::INT64;
    case Type::TIME32:
      return fbs::Type::INT32;
    case Type::TIME64:
      return fbs::Type::INT64;
    default:
      return Status::TypeError("Unsupported Feather V1 type: ", type.ToString(),
                               ". Use V2 format to serialize all Arrow types.");
  }
}

// Keep the helper macro local, like PRIMITIVE_CASE in ReaderV1.
#undef TO_FLATBUFFER_CASE
/// Serialize `array` into `fbb` as a PLAIN-encoded fbs::PrimitiveArray table.
inline flatbuffers::Offset<fbs::PrimitiveArray> GetPrimitiveArray(
    FBB& fbb, const ArrayMetadata& array) {
  const auto encoding = fbs::Encoding::PLAIN;
  return fbs::CreatePrimitiveArray(fbb, array.type, encoding, array.offset,
                                   array.length, array.null_count,
                                   array.total_bytes);
}
// Convert Feather enums to Flatbuffer enums
/// Arrow TimeUnit -> fbs::TimeUnit, preserving the underlying integer value.
inline fbs::TimeUnit ToFlatbufferEnum(TimeUnit::type unit) {
  const int raw_unit = static_cast<int>(unit);
  return static_cast<fbs::TimeUnit>(raw_unit);
}
// Indexed by ColumnType::type; entries must follow ColumnType's declaration order.
const fbs::TypeMetadata COLUMN_TYPE_ENUM_MAPPING[] = {
    fbs::TypeMetadata::NONE,               // PRIMITIVE
    fbs::TypeMetadata::CategoryMetadata,   // CATEGORY
    fbs::TypeMetadata::TimestampMetadata,  // TIMESTAMP
    fbs::TypeMetadata::DateMetadata,       // DATE
    fbs::TypeMetadata::TimeMetadata        // TIME
};

/// Map a Feather column kind to the matching flatbuffers TypeMetadata union tag.
inline fbs::TypeMetadata ToFlatbufferEnum(ColumnType::type column_type) {
  const int index = static_cast<int>(column_type);
  return COLUMN_TYPE_ENUM_MAPPING[index];
}
/// Per-column information collected while writing a Feather V1 column and
/// then serialized into that column's fbs::Column entry.
///
/// Members are default-initialized so fields that do not apply to a column's
/// kind (e.g. temporal_unit for a CATEGORY column) hold defined values.
struct ColumnMetadata {
  /// Build the TypeMetadata union value for `meta_type` in `fbb`; returns a
  /// null offset for PRIMITIVE columns.
  flatbuffers::Offset<void> WriteMetadata(FBB& fbb) const {  // NOLINT
    switch (this->meta_type) {
      case ColumnType::PRIMITIVE:
        // flatbuffer void
        return 0;
      case ColumnType::CATEGORY: {
        auto cat_meta = fbs::CreateCategoryMetadata(
            fbb, GetPrimitiveArray(fbb, this->category_levels), this->category_ordered);
        return cat_meta.Union();
      }
      case ColumnType::TIMESTAMP: {
        // An empty timezone is encoded as an absent (null) string.
        flatbuffers::Offset<flatbuffers::String> tz = 0;
        if (!this->timezone.empty()) {
          tz = fbb.CreateString(this->timezone);
        }
        auto ts_meta =
            fbs::CreateTimestampMetadata(fbb, ToFlatbufferEnum(this->temporal_unit), tz);
        return ts_meta.Union();
      }
      case ColumnType::DATE: {
        auto date_meta = fbs::CreateDateMetadata(fbb);
        return date_meta.Union();
      }
      case ColumnType::TIME: {
        auto time_meta =
            fbs::CreateTimeMetadata(fbb, ToFlatbufferEnum(this->temporal_unit));
        return time_meta.Union();
      }
      default:
        // Unknown kind: programming error; emit a null offset in release builds.
        DCHECK(false);
        return 0;
    }
  }

  ArrayMetadata values;
  ColumnType::type meta_type = ColumnType::PRIMITIVE;
  // Only meaningful for CATEGORY columns.
  ArrayMetadata category_levels;
  bool category_ordered = false;
  // Only meaningful for TIMESTAMP and TIME columns.
  TimeUnit::type temporal_unit = TimeUnit::SECOND;
  // A timezone name known to the Olson timezone database. For display purposes
  // because the actual data is all UTC
  std::string timezone;
};
Status WriteArrayV1(const Array& values, io::OutputStream* dst, ArrayMetadata* meta);

/// Type visitor that writes one array in Feather V1 layout: null bitmap (if
/// any nulls), offsets for binary-like types, then values, each padded to the
/// Feather alignment. The layout is recorded in `meta`.
struct ArrayWriterV1 {
  const Array& values;
  io::OutputStream* dst;
  ArrayMetadata* meta;

  /// Write `length` bytes starting `bit_offset` bits into `buffer`, or zeros
  /// when `buffer` is null, and add the padded size to meta->total_bytes.
  Status WriteBuffer(const uint8_t* buffer, int64_t length, int64_t bit_offset) {
    int64_t bytes_written = 0;
    if (buffer) {
      RETURN_NOT_OK(
          WritePaddedWithOffset(dst, buffer, bit_offset, length, &bytes_written));
    } else {
      RETURN_NOT_OK(WritePaddedBlank(dst, length, &bytes_written));
    }
    meta->total_bytes += bytes_written;
    return Status::OK();
  }

  // Types that Feather V1 cannot store.
  template <typename T>
  typename std::enable_if<
      is_nested_type<T>::value || is_null_type<T>::value || is_decimal_type<T>::value ||
          std::is_same<DictionaryType, T>::value || is_duration_type<T>::value ||
          is_interval_type<T>::value || is_fixed_size_binary_type<T>::value ||
          std::is_same<Date64Type, T>::value || std::is_same<Time64Type, T>::value ||
          std::is_same<ExtensionType, T>::value,
      Status>::type
  Visit(const T& type) {
    return Status::NotImplemented(type.ToString());
  }

  // Fixed-width types: write the values starting at the array's offset, which
  // may fall inside a byte for boolean arrays.
  template <typename T>
  typename std::enable_if<is_number_type<T>::value ||
                              std::is_same<Date32Type, T>::value ||
                              std::is_same<Time32Type, T>::value ||
                              is_timestamp_type<T>::value || is_boolean_type<T>::value,
                          Status>::type
  Visit(const T&) {
    const auto& prim_values = checked_cast<const PrimitiveArray&>(values);
    const auto& fw_type = checked_cast<const FixedWidthType&>(*values.type());
    const int64_t start_bit = prim_values.offset() * fw_type.bit_width();
    const int64_t nbytes = BitUtil::BytesForBits(values.length() * fw_type.bit_width());
    // A missing values buffer is written as zeros (as the binary path does for
    // missing offsets) so the file still holds a values buffer of the expected
    // size instead of nothing at all.
    const uint8_t* buffer = nullptr;
    if (prim_values.values()) {
      buffer = prim_values.values()->data() + start_bit / 8;
    }
    return WriteBuffer(buffer, nbytes, start_bit % 8);
  }

  // Binary-like types: write the offsets, then the value bytes they reference.
  template <typename T>
  enable_if_base_binary<T, Status> Visit(const T&) {
    using ArrayType = typename TypeTraits<T>::ArrayType;
    const auto& ty_values = checked_cast<const ArrayType&>(values);

    using offset_type = typename T::offset_type;
    const offset_type* offsets_data = nullptr;
    int64_t values_bytes = 0;
    if (ty_values.value_offsets()) {
      offsets_data = ty_values.raw_value_offsets();
      // All of the data has to be written because we don't have offset
      // shifting implemented here as with the IPC format
      values_bytes = offsets_data[values.length()];
    }
    RETURN_NOT_OK(WriteBuffer(reinterpret_cast<const uint8_t*>(offsets_data),
                              sizeof(offset_type) * (values.length() + 1),
                              /*bit_offset=*/0));

    const uint8_t* values_buffer = nullptr;
    if (ty_values.value_data()) {
      values_buffer = ty_values.value_data()->data();
    }
    return WriteBuffer(values_buffer, values_bytes, /*bit_offset=*/0);
  }

  /// Fill in `meta` and write the array's buffers. Dictionary arrays are
  /// written as their indices.
  Status Write() {
    if (values.type_id() == Type::DICTIONARY) {
      return WriteArrayV1(*(checked_cast<const DictionaryArray&>(values).indices()), dst,
                          meta);
    }

    ARROW_ASSIGN_OR_RAISE(meta->type, ToFlatbufferType(*values.type()));
    ARROW_ASSIGN_OR_RAISE(meta->offset, dst->Tell());
    meta->length = values.length();
    meta->null_count = values.null_count();
    meta->total_bytes = 0;

    // Write the null bitmask; a null bitmap pointer is written as zeros.
    if (values.null_count() > 0) {
      RETURN_NOT_OK(WriteBuffer(values.null_bitmap_data(),
                                BitUtil::BytesForBits(values.length()), values.offset()));
    }
    // Write data buffer(s)
    return VisitTypeInline(*values.type(), this);
  }
};
/// Write `values` to `dst` in Feather V1 layout, recording its placement in
/// `meta`.
///
/// Null-typed arrays are written as all-null string arrays: as long as R
/// doesn't support NA, this keeps roundtrips stable.
Status WriteArrayV1(const Array& values, io::OutputStream* dst, ArrayMetadata* meta) {
  std::shared_ptr<Array> to_write;
  if (values.type_id() != Type::NA) {
    to_write = MakeArray(values.data());
  } else {
    // Same length, validity bitmap and null count; no offsets or character data.
    to_write = std::make_shared<StringArray>(values.length(), nullptr, nullptr,
                                             values.null_bitmap(), values.null_count());
  }
  return ArrayWriterV1{*to_write, dst, meta}.Write();
}
/// Write a single-chunk column to `dst` in Feather V1 layout and fill `out`
/// with its array placement and column type metadata.
///
/// @return Invalid unless the column consists of exactly one chunk.
Status WriteColumnV1(const ChunkedArray& values, io::OutputStream* dst,
                     ColumnMetadata* out) {
  if (values.num_chunks() > 1) {
    return Status::Invalid("Writing chunked arrays not supported in Feather V1");
  }
  // chunk(0) would index out of range for a column without chunks (e.g. a
  // table assembled from zero record batches).
  if (values.num_chunks() == 0) {
    return Status::Invalid("Writing a column with no chunks is not supported in Feather V1");
  }
  const Array& chunk = *values.chunk(0);
  RETURN_NOT_OK(WriteArrayV1(chunk, dst, &out->values));
  switch (chunk.type_id()) {
    case Type::DICTIONARY: {
      out->meta_type = ColumnType::CATEGORY;
      auto dictionary = checked_cast<const DictionaryArray&>(chunk).dictionary();
      RETURN_NOT_OK(WriteArrayV1(*dictionary, dst, &out->category_levels));
      out->category_ordered =
          checked_cast<const DictionaryType&>(*chunk.type()).ordered();
    } break;
    case Type::DATE32:
      out->meta_type = ColumnType::DATE;
      break;
    case Type::TIME32: {
      out->meta_type = ColumnType::TIME;
      out->temporal_unit = checked_cast<const Time32Type&>(*chunk.type()).unit();
    } break;
    case Type::TIMESTAMP: {
      const auto& ts_type = checked_cast<const TimestampType&>(*chunk.type());
      out->meta_type = ColumnType::TIMESTAMP;
      out->temporal_unit = ts_type.unit();
      out->timezone = ts_type.timezone();
    } break;
    default:
      out->meta_type = ColumnType::PRIMITIVE;
      break;
  }
  return Status::OK();
}
/// Serialize `table` to `dst` in the Feather V1 format: padded magic bytes,
/// each column's buffers, the padded flatbuffers CTable metadata, the
/// metadata length (uint32), and the magic bytes again.
Status WriteFeatherV1(const Table& table, io::OutputStream* dst) {
  const auto* magic = reinterpret_cast<const uint8_t*>(kFeatherV1MagicBytes);
  const auto magic_length = strlen(kFeatherV1MagicBytes);
  int64_t bytes_written;

  // Preamble
  RETURN_NOT_OK(WritePadded(dst, magic, magic_length, &bytes_written));

  // Column data, collecting one flatbuffers Column entry per column
  flatbuffers::FlatBufferBuilder fbb;
  std::vector<flatbuffers::Offset<fbs::Column>> fb_columns;
  fb_columns.reserve(table.num_columns());
  for (int col_index = 0; col_index < table.num_columns(); ++col_index) {
    ColumnMetadata col;
    RETURN_NOT_OK(WriteColumnV1(*table.column(col_index), dst, &col));
    // Nested objects must exist before CreateColumn starts its table.
    auto fb_name = fbb.CreateString(table.field(col_index)->name());
    auto fb_values = GetPrimitiveArray(fbb, col.values);
    auto fb_type_metadata = col.WriteMetadata(fbb);
    fb_columns.push_back(fbs::CreateColumn(fbb, fb_name, fb_values,
                                           ToFlatbufferEnum(col.meta_type),
                                           fb_type_metadata,
                                           /*user_metadata=*/0));
  }

  // File footer metadata
  auto fb_column_vector = fbb.CreateVector(fb_columns);
  auto root = fbs::CreateCTable(fbb, /*description=*/0, table.num_rows(),
                                fb_column_vector, kFeatherV1Version,
                                /*metadata=*/0);
  fbb.Finish(root);
  RETURN_NOT_OK(WritePadded(dst, fbb.GetBufferPointer(),
                            static_cast<int64_t>(fbb.GetSize()), &bytes_written));

  // Footer: padded metadata length, then magic bytes
  const auto metadata_size = static_cast<uint32_t>(bytes_written);
  RETURN_NOT_OK(dst->Write(&metadata_size, sizeof(uint32_t)));
  return dst->Write(kFeatherV1MagicBytes, strlen(kFeatherV1MagicBytes));
}
// ----------------------------------------------------------------------
// Reader V2
/// Reader for Feather V2 files, which are Arrow IPC files.
class ReaderV2 : public Reader {
 public:
  /// Open `source` as an Arrow IPC file and remember its schema.
  Status Open(const std::shared_ptr<io::RandomAccessFile>& source) {
    source_ = source;
    ARROW_ASSIGN_OR_RAISE(auto reader, RecordBatchFileReader::Open(source_));
    schema_ = reader->schema();
    return Status::OK();
  }

  int version() const override { return kFeatherV2Version; }

  std::shared_ptr<Schema> schema() const override { return schema_; }

  /// Read every record batch using `options` and combine them into a table.
  Status Read(const IpcReadOptions& options, std::shared_ptr<Table>* out) {
    ARROW_ASSIGN_OR_RAISE(auto reader, RecordBatchFileReader::Open(source_, options));
    const int num_batches = reader->num_record_batches();
    RecordBatchVector batches(num_batches);
    for (int i = 0; i < num_batches; ++i) {
      ARROW_ASSIGN_OR_RAISE(batches[i], reader->ReadRecordBatch(i));
    }
    return Table::FromRecordBatches(reader->schema(), batches).Value(out);
  }

  Status Read(std::shared_ptr<Table>* out) override {
    return Read(IpcReadOptions::Defaults(), out);
  }

  Status Read(const std::vector<int>& indices, std::shared_ptr<Table>* out) override {
    auto options = IpcReadOptions::Defaults();
    options.included_fields = indices;
    return Read(options, out);
  }

  /// Read only the named columns; fails if any name is not in the schema.
  Status Read(const std::vector<std::string>& names,
              std::shared_ptr<Table>* out) override {
    std::vector<int> indices;
    indices.reserve(names.size());
    std::shared_ptr<Schema> sch = this->schema();
    // Bind by reference: the loop only looks names up.
    for (const auto& name : names) {
      int field_index = sch->GetFieldIndex(name);
      if (field_index == -1) {
        return Status::Invalid("Field named ", name, " is not found");
      }
      indices.push_back(field_index);
    }
    return Read(indices, out);
  }

 private:
  std::shared_ptr<io::RandomAccessFile> source_;
  std::shared_ptr<Schema> schema_;
};
} // namespace
/// Detect whether `source` is a Feather V1 file or an Arrow IPC (Feather V2)
/// file from its leading magic bytes, and open it with the matching reader.
Result<std::shared_ptr<Reader>> Reader::Open(
    const std::shared_ptr<io::RandomAccessFile>& source) {
  // Pathological issue where the file is smaller than header and footer
  // combined
  ARROW_ASSIGN_OR_RAISE(int64_t size, source->GetSize());
  constexpr int64_t kMinFileSize = 12;  // 2 * 4 + 4
  if (size < kMinFileSize) {
    return Status::Invalid("File is too small to be a well-formed file");
  }

  // 6 is the max of len(FEA1) and len(ARROW1)
  constexpr int magic_size = 6;
  ARROW_ASSIGN_OR_RAISE(auto header, source->ReadAt(0, magic_size));
  const uint8_t* head = header->data();

  if (memcmp(head, kFeatherV1MagicBytes, strlen(kFeatherV1MagicBytes)) == 0) {
    std::shared_ptr<ReaderV1> v1_reader = std::make_shared<ReaderV1>();
    RETURN_NOT_OK(v1_reader->Open(source));
    return v1_reader;
  }
  if (memcmp(head, internal::kArrowMagicBytes, strlen(internal::kArrowMagicBytes)) ==
      0) {
    std::shared_ptr<ReaderV2> v2_reader = std::make_shared<ReaderV2>();
    RETURN_NOT_OK(v2_reader->Open(source));
    return v2_reader;
  }
  return Status::Invalid("Not a Feather V1 or Arrow IPC file");
}
/// Default write properties: LZ4 frame compression when Arrow was built with
/// LZ4 support, otherwise no compression.
WriteProperties WriteProperties::Defaults() {
  WriteProperties props;
#ifdef ARROW_WITH_LZ4
  props.compression = Compression::LZ4_FRAME;
#else
  props.compression = Compression::UNCOMPRESSED;
#endif
  return props;
}
/// Write `table` to `dst` as Feather V1 when requested; any other version is
/// written as an Arrow IPC file (Feather V2) using the configured compression
/// and chunk size.
Status WriteTable(const Table& table, io::OutputStream* dst,
                  const WriteProperties& properties) {
  if (properties.version == kFeatherV1Version) {
    return WriteFeatherV1(table, dst);
  }

  IpcWriteOptions ipc_options = IpcWriteOptions::Defaults();
  ipc_options.unify_dictionaries = true;
  ipc_options.allow_64bit = true;
  ARROW_ASSIGN_OR_RAISE(
      ipc_options.codec,
      util::Codec::Create(properties.compression, properties.compression_level));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatchWriter> writer,
                        MakeFileWriter(dst, table.schema(), ipc_options));
  RETURN_NOT_OK(writer->WriteTable(table, properties.chunksize));
  return writer->Close();
}
} // namespace feather
} // namespace ipc
} // namespace arrow