blob: 94adf640ebdbe44e2f2122fbbd1676b034b54eab [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Internal metadata serialization matters
#ifndef ARROW_IPC_METADATA_INTERNAL_H
#define ARROW_IPC_METADATA_INTERNAL_H
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>
#include <vector>
#include <flatbuffers/flatbuffers.h>
#include "arrow/buffer.h"
#include "arrow/ipc/Message_generated.h"
#include "arrow/ipc/Schema_generated.h"
#include "arrow/ipc/dictionary.h" // IWYU pragma: keep
#include "arrow/ipc/message.h"
#include "arrow/memory_pool.h"
#include "arrow/sparse_tensor.h"
#include "arrow/status.h"
namespace arrow {
// Forward declarations: the declarations below only take these types by
// pointer, reference or shared_ptr, so their full definitions are not needed.
class DataType;
class Schema;
class Tensor;
class SparseTensor;
// Short alias for the flatc-generated metadata namespace that provides
// flatbuf::Message, flatbuf::MetadataVersion, etc. used below.
namespace flatbuf = org::apache::arrow::flatbuf;
namespace io {
class OutputStream;
} // namespace io
namespace ipc {
// Forward declaration (arrow/ipc/dictionary.h is also included above).
class DictionaryMemo;
namespace internal {
// Metadata version of this implementation (currently V4).
// NOTE(review): presumably the version stamped on messages we write — confirm
// in the writer.
constexpr flatbuf::MetadataVersion kCurrentMetadataVersion =
    flatbuf::MetadataVersion_V4;

// Lowest metadata version this implementation works with (also V4 for now).
// NOTE(review): presumably enforced by the reader — confirm there.
constexpr flatbuf::MetadataVersion kMinMetadataVersion =
    flatbuf::MetadataVersion_V4;
/// Convert the flatbuffers-encoded metadata version enum into the
/// `MetadataVersion` type used by the rest of the IPC layer (presumably the
/// enum declared in arrow/ipc/message.h). NOTE(review): handling of unknown
/// values lives in the definition — confirm there.
MetadataVersion GetMetadataVersion(flatbuf::MetadataVersion version);
/// Magic byte sequence of the Arrow IPC file format: the six characters
/// "ARROW1" (NUL-terminated C string). NOTE(review): where it is written in
/// the file (start/end) is decided by the reader/writer, not visible here.
constexpr const char* kArrowMagicBytes = "ARROW1";
/// Per-field node metadata: a (length, null_count, offset) triple.
/// Member order matters — this is an aggregate that callers may
/// brace-initialize positionally.
struct FieldMetadata {
  int64_t length;      // length of the field
  int64_t null_count;  // number of nulls in the field
  int64_t offset;      // starting offset of the field
};
/// Location of one buffer inside a message body. Member order matters:
/// the struct is brace-initializable as {offset, length}.
struct BufferMetadata {
  /// Offset of the buffer's first byte, relative to the start of the memory
  /// page that holds it.
  int64_t offset;
  /// Size of the buffer in bytes (an absolute length, not an end offset).
  int64_t length;
};
/// Location of one block recorded in the file footer; WriteFileFooter takes
/// one list of these for dictionaries and one for record batches.
/// Member order matters: brace-initializable as
/// {offset, metadata_length, body_length}.
struct FileBlock {
  int64_t offset;           // where the block starts
  int32_t metadata_length;  // size of the block's metadata portion
  int64_t body_length;      // size of the block's body portion
};
// Read interface classes. We do not fully deserialize the flatbuffers so that
// individual field metadata can be retrieved from a very large schema without
// deserializing the entire schema.
// Construct a complete Schema from the message and add its
// dictionary-encoded fields to a DictionaryMemo instance. May be
// expensive for very large schemas if you are only interested in a
// few fields.
//
// \param[in] opaque_schema pointer to the serialized schema; its concrete
//   flatbuffers type is hidden behind void* (presumably flatbuf::Schema —
//   confirm in the definition)
// \param[in,out] dictionary_memo receives the dictionary-encoded fields
// \param[out] out the reconstructed Schema
// \return Status
Status GetSchema(const void* opaque_schema, DictionaryMemo* dictionary_memo,
std::shared_ptr<Schema>* out);
// Extract the metadata of a dense tensor from serialized message metadata
// (presumably a flatbuffers Tensor message — confirm in the definition).
//
// \param[in] metadata buffer holding the serialized message
// \param[out] type element value type
// \param[out] shape extent of each dimension
// \param[out] strides stride of each dimension (presumably in bytes — confirm)
// \param[out] dim_names name of each dimension
// \return Status
Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type,
std::vector<int64_t>* shape, std::vector<int64_t>* strides,
std::vector<std::string>* dim_names);
// EXPERIMENTAL: Extract the metadata of a sparse tensor from the message.
//
// \param[in] metadata buffer holding the serialized message
// \param[out] type element value type
// \param[out] shape extent of each dimension
// \param[out] dim_names name of each dimension
// \param[out] length presumably the number of stored (non-zero) values —
//   confirm in the definition
// \param[out] sparse_tensor_format_id which SparseTensorFormat the sparse
//   index uses
// \return Status
Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type,
std::vector<int64_t>* shape,
std::vector<std::string>* dim_names, int64_t* length,
SparseTensorFormat::type* sparse_tensor_format_id);
/// Verify that the `size` bytes starting at `data` hold a well-formed
/// flatbuffers Message and, on success, point *out at its root table.
///
/// Table nesting is limited to a depth of 128. Run this before touching any
/// field of a message read from an untrusted source.
///
/// \param[in] data start of the serialized message; not copied, so *out is
///   only valid while `data` stays alive
/// \param[in] size number of readable bytes at `data`; must be non-negative
/// \param[out] out set to the root Message on success; untouched on failure
/// \return Status::IOError if the input is rejected, otherwise OK
static inline Status VerifyMessage(const uint8_t* data, int64_t size,
                                   const flatbuf::Message** out) {
  // flatbuffers::Verifier takes a size_t length. A negative int64_t would wrap
  // around to an enormous length and defeat the verifier's bounds checks, and
  // a null pointer with a non-zero size would be dereferenced. Reject both.
  if (size < 0 || (data == nullptr && size > 0)) {
    return Status::IOError("Invalid flatbuffers message.");
  }
  flatbuffers::Verifier verifier(data, static_cast<size_t>(size),
                                 /*max_depth=*/128);
  if (!flatbuf::VerifyMessageBuffer(verifier)) {
    return Status::IOError("Invalid flatbuffers message.");
  }
  *out = flatbuf::GetMessage(data);
  return Status::OK();
}
/// Write serialized message metadata with a length prefix, followed by
/// padding so that the total size is a multiple of `alignment` bytes. Does
/// not make assumptions about whether the stream is already aligned.
///
/// <message_size: int32><message: const void*><padding>
///
/// \param[in] message a buffer containing the metadata to write
/// \param[in] alignment the required multiple for the total message size,
/// including length prefix, metadata, and padding. Usually 8 or 64
/// \param[in,out] file the OutputStream to write to
/// \param[out] message_length the total size of the payload written,
/// including padding
/// \return Status
Status WriteMessage(const Buffer& message, int32_t alignment, io::OutputStream* file,
int32_t* message_length);
// Serialize an arrow::Schema as a flatbuffers message.
//
// \param[in] schema a Schema instance
// \param[in,out] dictionary_memo tracks the schema's dictionaries and
// assigns their dictionary ids
// \param[out] out the serialized arrow::Buffer
// \return Status outcome
Status WriteSchemaMessage(const Schema& schema, DictionaryMemo* dictionary_memo,
std::shared_ptr<Buffer>* out);
/// Serialize the metadata of a record batch as a flatbuffers message.
///
/// \param[in] length presumably the number of rows in the batch — confirm
/// \param[in] body_length presumably the byte size of the message body
/// \param[in] nodes one FieldMetadata entry per field node
/// \param[in] buffers location of each body buffer
/// \param[out] out the serialized metadata
/// \return Status
Status WriteRecordBatchMessage(const int64_t length, const int64_t body_length,
const std::vector<FieldMetadata>& nodes,
const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out);
/// Serialize the metadata of a dense tensor as a flatbuffers message.
///
/// \param[in] tensor the tensor to describe
/// \param[in] buffer_start_offset offset recorded for the tensor's data
/// buffer (presumably relative to the start of the message body — confirm)
/// \param[out] out the serialized metadata
/// \return Status
Status WriteTensorMessage(const Tensor& tensor, const int64_t buffer_start_offset,
std::shared_ptr<Buffer>* out);
/// Serialize the metadata of a sparse tensor as a flatbuffers message.
///
/// \param[in] sparse_tensor the sparse tensor to describe
/// \param[in] body_length presumably the byte size of the message body
/// \param[in] buffers location of each body buffer
/// \param[out] out the serialized metadata
/// \return Status
Status WriteSparseTensorMessage(const SparseTensor& sparse_tensor, int64_t body_length,
const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out);
/// Write the file footer to `out`: the schema plus the FileBlock location of
/// every dictionary block and every record batch block.
///
/// \param[in] schema the file's schema
/// \param[in] dictionaries locations of the dictionary blocks
/// \param[in] record_batches locations of the record batch blocks
/// \param[in,out] out the OutputStream to write to
/// \return Status
Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
const std::vector<FileBlock>& record_batches,
io::OutputStream* out);
/// Serialize the metadata of a dictionary batch as a flatbuffers message.
///
/// \param[in] id the dictionary id (presumably as assigned by DictionaryMemo)
/// \param[in] length presumably the number of dictionary entries — confirm
/// \param[in] body_length presumably the byte size of the message body
/// \param[in] nodes one FieldMetadata entry per field node
/// \param[in] buffers location of each body buffer
/// \param[out] out the serialized metadata
/// \return Status
Status WriteDictionaryMessage(const int64_t id, const int64_t length,
const int64_t body_length,
const std::vector<FieldMetadata>& nodes,
const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out);
/// Copy the bytes held by a FlatBufferBuilder into a freshly allocated Buffer
/// from the default memory pool.
///
/// \param[in] fbb builder to copy from (presumably already finished by the
///   caller — GetSize()/GetBufferPointer() describe its current contents)
/// \param[out] out receives the new buffer; assigned only on success
/// \return the allocation error, if any, otherwise OK
static inline Status WriteFlatbufferBuilder(flatbuffers::FlatBufferBuilder& fbb,
                                            std::shared_ptr<Buffer>* out) {
  const int32_t num_bytes = fbb.GetSize();

  std::shared_ptr<Buffer> copy;
  RETURN_NOT_OK(AllocateBuffer(default_memory_pool(), num_bytes, &copy));

  const uint8_t* source = fbb.GetBufferPointer();
  std::memcpy(copy->mutable_data(), source, num_bytes);

  *out = copy;
  return Status::OK();
}
} // namespace internal
} // namespace ipc
} // namespace arrow
#endif // ARROW_IPC_METADATA_INTERNAL_H